---
jupyter:
  jupytext:
    text_representation:
      extension: .md
      format_name: markdown
      format_version: '1.3'
      jupytext_version: 1.13.0
  kernelspec:
    display_name: Python 3 (ipykernel)
    language: python
    name: python3
---

# Incremental processing

A pipeline that processes new records from a database and uploads them.

A common scenario is a pipeline that processes records incrementally. For example, we might load data from a data warehouse, process all historical records once, and store the results in another table. However, when running the pipeline again, we may want to process new records only, since it would be time-consuming to process all records again.

To do so, we can add a dynamic parameter to our pipeline that returns the index of the latest processed record. Let's look at `pipeline.yaml`:

<% expand('pipeline.yaml') %>

The first task (`tasks/load.py`) has a dynamic parameter (`params::get_index`). Whenever Ploomber runs your pipeline, it imports `params.py`, calls `get_index()`, and assigns the returned value to the `index` parameter in the task.

Let's look at `params.py`:

<% expand('params.py') %>

You can see that it loads the parameter from an `index.json` file; however, you can store the parameter anywhere you want.
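The file expanded above contains the actual implementation; for orientation, here is a minimal sketch of the idea, assuming the index is stored as JSON under an `index` key (the key name is an assumption):

```python
# Hypothetical sketch of params.py; names mirror the example, details may differ.
import json
from pathlib import Path


def get_index():
    """Return the index of the latest processed record (0 on the first run)."""
    path = Path('index.json')

    if not path.exists():
        # no file yet: nothing has been processed, start from the beginning
        return 0

    return json.loads(path.read_text())['index']
```

Returning `0` when the file is missing is what lets the very first run process the full table.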

Let's now create a sample database and insert 100 records into a table named `numbers`:

```sh
# clean up data and parameters in case we have anything
rm -f data.db index.json
# create database and insert 100 rows
python insert.py
# count rows in the table
python count.py
```
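The example ships its own `insert.py`; a sketch of what such a script could look like, assuming `data.db` is a SQLite database and `numbers` has a single integer column (both assumptions):

```python
# Hypothetical sketch of insert.py; the example's actual script may differ.
import sqlite3

conn = sqlite3.connect('data.db')
cur = conn.cursor()
cur.execute('CREATE TABLE IF NOT EXISTS numbers (value INTEGER)')

# append 100 new consecutive values, continuing from the current row count,
# so running the script again adds another batch instead of duplicating rows
start = cur.execute('SELECT COUNT(*) FROM numbers').fetchone()[0]
cur.executemany('INSERT INTO numbers (value) VALUES (?)',
                [(i,) for i in range(start, start + 100)])

conn.commit()
conn.close()
```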

Let's run the pipeline. Our tasks will load all unprocessed records from the `numbers` table, transform the data, and store the output in the `plus_one` table:

```sh
ploomber build --log info --force
```

Let's check the table counts:

```sh
python count.py
```

Great, our pipeline processed the existing 100 rows in `numbers` and stored the results in the `plus_one` table.
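The key piece of the load step is the filter on the dynamic `index` parameter. An illustrative sketch of that logic (not the example's actual `tasks/load.py`), assuming SQLite and using `rowid` as the incremental index:

```python
# Illustrative sketch of an incremental load: fetch only rows past the
# last processed index. Table/column names follow the example above.
import sqlite3


def load_unprocessed(db_path, index):
    """Return (rowid, value) pairs from `numbers` with rowid > index."""
    conn = sqlite3.connect(db_path)
    rows = conn.execute(
        'SELECT rowid, value FROM numbers WHERE rowid > ?', (index,)
    ).fetchall()
    conn.close()
    return rows
```

On the first run `index` is 0, so every row qualifies; on later runs only rows inserted after the stored index are returned.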

Let's add another 100 rows to the `numbers` table:

```sh
python insert.py
python count.py
```

Process the data again:

```sh
ploomber build --log info --force
python count.py
```

Now our `plus_one` table has 200 records, and the last execution only processed the 100 new rows. Note that if we run the pipeline again, it'll stop after running the load task, since there are no new records to process.
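In Ploomber, a task can end the build gracefully by raising `DAGBuildEarlyStop` (from `ploomber.exceptions`); whether the example uses exactly this mechanism is an assumption. A self-contained sketch of the pattern, with a stand-in exception so the snippet runs without Ploomber installed:

```python
# Sketch of the early-stop pattern; EarlyStop stands in for Ploomber's
# DAGBuildEarlyStop so the snippet is self-contained.
class EarlyStop(Exception):
    """Raised when there is nothing left to process."""


def load(new_rows, index):
    """Hypothetical load step: stop the pipeline if no new records exist."""
    if not new_rows:
        raise EarlyStop(f'no records after index {index}, stopping')
    return new_rows
```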

```sh
ploomber build --force
python count.py
```