To run this locally, [install Ploomber](https://docs.ploomber.io/en/latest/get-started/quick-start.html) and execute: `ploomber examples -n cookbook/incremental`

Questions? [Ask us on Slack.](https://ploomber.io/community/)


# Incremental processing

<!-- start description -->
A pipeline that processes new records from a database and uploads them.
<!-- end description -->

A common scenario is a pipeline that processes records incrementally. For example, we might load data from a data warehouse, process all historical records once, and store the results in another table. When we run the pipeline again, however, we want to process only the new records, since reprocessing every record would be time-consuming.

To achieve this, we can add a dynamic parameter to our pipeline that returns the index of the latest processed record. Let's look at the `pipeline.yaml`:

```yaml
# Content of pipeline.yaml
tasks:
  - source: tasks/load.py
    product:
      nb: output/load.ipynb
      data: output/raw.csv
    params:
      index: 
        dotted_path: params::get_index
        path_to_index: '{{root}}/index.json'

      path_to_db: '{{root}}/data.db'

    on_finish: hooks.check_if_new_records

  - source: tasks/process.py
    product:
      nb: output/process.ipynb
      data: output/processed.csv

  - source: tasks/upload.py
    product:
      nb: output/upload.ipynb
    params:
      path_to_db: '{{root}}/data.db'

    on_finish:
      dotted_path: hooks.store_index
      path_to_index: '{{root}}/index.json'
```

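The first task (`tasks/load.py`) has a dynamic parameter (`params::get_index`). Whenever Ploomber runs your pipeline, it imports `params.py`, calls `get_index()` (passing the remaining key, `path_to_index`, as a keyword argument), and assigns the returned value to the task's `index` parameter. The `on_finish` hooks complete the loop: `hooks.check_if_new_records` runs after `load`, and `hooks.store_index` runs after `upload` to save the latest index to `index.json`.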
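To make the mechanism concrete, here is a simplified sketch of how a `dotted_path` entry can be resolved: import the module, look up the function, and call it with the remaining keys as keyword arguments. It accepts both the `module::function` and `module.function` forms that appear in `pipeline.yaml`. This is an illustration only, not Ploomber's actual implementation, and `call_dotted_path` is a hypothetical name.

```python
import importlib
from typing import Any, Mapping


def call_dotted_path(spec: Mapping[str, Any]) -> Any:
    """Call the function named in spec['dotted_path'], passing the other keys as kwargs.

    Simplified illustration only; not Ploomber's actual implementation.
    """
    kwargs = dict(spec)  # copy so the caller's mapping is not modified
    dotted_path = kwargs.pop('dotted_path')

    # Accept both 'module::function' and 'module.function'
    if '::' in dotted_path:
        module_name, _, function_name = dotted_path.partition('::')
    else:
        module_name, _, function_name = dotted_path.rpartition('.')

    module = importlib.import_module(module_name)
    function = getattr(module, function_name)
    return function(**kwargs)


# Usage with a standard-library function, so the sketch runs anywhere
print(call_dotted_path({'dotted_path': 'json::dumps', 'obj': {'latest': 100}}))  # prints {"latest": 100}
```

With the `index` entry from `pipeline.yaml` (after `{{root}}` is rendered), this would amount to calling `get_index(path_to_index=...)` and using its return value as the parameter.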

Let's look at `params.py`:

```python
# Content of params.py
import json
from pathlib import Path


def get_index(path_to_index):
    path = Path(path_to_index)

    if not path.exists():
        return -1

    index = json.loads(path.read_text())
    return index['latest']

```

It loads the index from an `index.json` file and returns `-1` if the file doesn't exist yet (for example, on the first run). However, you can store the parameter anywhere you want.
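The `store_index` hook that writes this file isn't shown in this section. Below is a minimal sketch of a writer compatible with `get_index`: it stores the value under the `latest` key. The signature is hypothetical; the real hook receives `path_to_index` from `pipeline.yaml`, but how it obtains the latest index isn't shown, so this sketch takes it as an explicit `latest` argument.

```python
import json
import tempfile
from pathlib import Path


def store_index(latest, path_to_index):
    """Hypothetical writer: save the latest processed index under the 'latest' key."""
    Path(path_to_index).write_text(json.dumps({'latest': latest}))


def get_index(path_to_index):
    """Same logic as params.py: -1 means nothing has been processed yet."""
    path = Path(path_to_index)

    if not path.exists():
        return -1

    index = json.loads(path.read_text())
    return index['latest']


if __name__ == '__main__':
    # Round trip in a throwaway directory
    with tempfile.TemporaryDirectory() as tmp:
        path_to_index = Path(tmp, 'index.json')
        print(get_index(path_to_index))  # -1: no index file yet
        store_index(100, path_to_index)
        print(get_index(path_to_index))  # 100
```

Keeping the reader and the writer in agreement on the file format (a JSON object with a `latest` key) is the only contract; swapping the file for a database row or a key-value store would not change the pipeline.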

Let's now create a sample database and insert 100 records into a table named `numbers`:

```bash
# clean up data and parameters in case we have anything
rm -f data.db index.json
```

```bash
# create database and insert 100 rows
python insert.py
```

```
Appending 100 records...
Index range: 1, 100
Done.
```


```bash
# count rows in the table
python count.py
```

```
numbers has 100 rows
plus_one table does not exist
```
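`count.py` isn't shown in this section. A hypothetical sketch that produces the same two kinds of messages checks SQLite's `sqlite_master` catalog to see whether a table exists before counting its rows; `describe_table` is an illustrative name, not part of the example.

```python
import sqlite3


def describe_table(conn: sqlite3.Connection, table: str) -> str:
    """Summarize a table using the same wording as count.py's output."""
    # sqlite_master lists every table in the database
    exists = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table,),
    ).fetchone()

    if exists is None:
        return f'{table} table does not exist'

    # Identifiers can't be bound as parameters; only pass trusted table names
    (n_rows,) = conn.execute(f'SELECT COUNT(*) FROM "{table}"').fetchone()
    return f'{table} has {n_rows} rows'
```

A script like `count.py` could then open `data.db` with `sqlite3.connect` and print `describe_table(conn, name)` for `numbers` and `plus_one`.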


Let's run the pipeline. Our tasks will load all unprocessed records from the `numbers` table, transform the data and store the output in the `plus_one` table:

```bash
ploomber build --log info --force
```

```
Loading pipeline...

INFO:ploomber.dag.dag:Building DAG DAG("incremental")
Building task 'load':   0%|          | 0/3 [00:00<?, ?it/s]INFO:ploomber.tasks.abc.NotebookRunner:Starting execution: NotebookRunner: load -> MetaProduct({'data': File('output/raw.csv'), 'nb': File('output/load.ipynb')})
INFO:papermill:Input Notebook:  /var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmpg59y4re6.ipynb
INFO:papermill:Output Notebook: /Users/Edu/dev/projects-ploomber/cookbook/incremental/output/load.ipynb

Executing:   0%|          | 0/6 [00:00<?, ?cell/s]INFO:papermill:Executing notebook with kernel: python3

Executing:  17%|█▋        | 1/6 [00:01<00:08,  1.62s/cell]
Executing: 100%|██████████| 6/6 [00:02<00:00,  2.41cell/s]
INFO:ploomber.tasks.abc.NotebookRunner:Done. Operation took 2.5 seconds
Building task 'process':  33%|███▎      | 1/3 [00:02<00:05,  2.52s/it]INFO:ploomber.tasks.abc.NotebookRunner:Starting execution: NotebookRunner: process -> MetaProduct({'data': File('output/processed.csv'), 'nb': File('output

Storing index: 100

Building task 'upload': 100%|██████████| 3/3 [00:07<00:00,  2.38s/it]
- NotebookRunner: load -> MetaProduct({'data': File('output/raw.csv'), 'nb': File('output/load.ipynb')}) -
----- /Users/Edu/dev/projects-ploomber/cookbook/incremental/tasks/load.py ------
:10:1: 'json' imported but unused
:11:1: 'pathlib.Path' imported but unused
NotebookRunner: load -> MetaProduct({'data': File('output/raw.csv'), 'nb': File('output/load.ipynb')})

INFO:ploomber.dag.dag: DAG report:
name     Ran?      Elapsed (s)    Percentage
-------  ------  -------------  ------------
load     True          2.5057        35.2447
process  True          2.41283       33.9385
upload   True          2.1909        30.8168
```



Let's check the table counts:

```bash
python count.py
```

```
numbers has 100 rows
plus_one has 100 rows
```


Great, our pipeline processed the existing 100 rows in `numbers` and stored the results in the `plus_one` table.
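The sources of `tasks/load.py`, `tasks/process.py`, and `tasks/upload.py` aren't shown in this section. The following is a minimal sketch of one incremental pass using only `sqlite3`. It assumes a hypothetical schema `numbers(id INTEGER PRIMARY KEY, value INTEGER)` and a "plus one" transformation inferred from the output table's name; the real tasks exchange data through CSV files and notebooks, and `process_new_records` is an illustrative name.

```python
import sqlite3


def process_new_records(conn: sqlite3.Connection, index: int) -> int:
    """Copy rows with id > index from numbers into plus_one, adding one to each value.

    Returns the id of the latest processed record, or `index` if nothing was new.
    """
    # load: fetch only the records we haven't processed yet
    rows = conn.execute(
        'SELECT id, value FROM numbers WHERE id > ? ORDER BY id', (index,)
    ).fetchall()

    if not rows:
        return index

    # process: the (assumed) transformation adds one to each value
    processed = [(id_, value + 1) for id_, value in rows]

    # upload: append the results to the output table
    conn.execute(
        'CREATE TABLE IF NOT EXISTS plus_one (id INTEGER PRIMARY KEY, value INTEGER)'
    )
    conn.executemany('INSERT INTO plus_one (id, value) VALUES (?, ?)', processed)
    conn.commit()

    # the new "latest" index: what a hook like store_index would persist
    return rows[-1][0]
```

Starting from `index = -1` (what `get_index` returns on the first run) selects every record; feeding the returned id into the next run selects only rows added since then.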

Let's add another 100 rows to the `numbers` table:

```bash
python insert.py
```

```
Appending 100 records...
Index range: 101, 200
Done.
```
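`insert.py` isn't shown either. The hypothetical sketch below shows one way to get the behavior above, where the second batch starts at 101: continue the `id` sequence from the table's current maximum. The function name `append_records` and the `value = id` payload are assumptions.

```python
import sqlite3
from typing import Tuple


def append_records(conn: sqlite3.Connection, n: int = 100) -> Tuple[int, int]:
    """Append n rows to numbers, continuing the id sequence; return (first, last) id."""
    conn.execute(
        'CREATE TABLE IF NOT EXISTS numbers (id INTEGER PRIMARY KEY, value INTEGER)'
    )
    # Start right after the largest existing id (0 when the table is empty)
    (current_max,) = conn.execute('SELECT COALESCE(MAX(id), 0) FROM numbers').fetchone()
    first, last = current_max + 1, current_max + n

    # Hypothetical payload: each row's value equals its id
    conn.executemany(
        'INSERT INTO numbers (id, value) VALUES (?, ?)',
        [(i, i) for i in range(first, last + 1)],
    )
    conn.commit()
    return first, last
```

Because ids only grow, "latest processed id" is enough to tell old records from new ones, which is what makes the single-number index in `index.json` work.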


```bash
python count.py
```

```
numbers has 200 rows
plus_one has 100 rows
```


Process the data again:

```bash
ploomber build --log info --force
```

```
Loading pipeline...

INFO:ploomber.dag.dag:Building DAG DAG("incremental")
Building task 'load':   0%|          | 0/3 [00:00<?, ?it/s]INFO:ploomber.tasks.abc.NotebookRunner:Starting execution: NotebookRunner: load -> MetaProduct({'data': File('output/raw.csv'), 'nb': File('output/load.ipynb')})
INFO:papermill:Input Notebook:  /var/folders/3h/_lvh_w_x5g30rrjzb_xnn2j80000gq/T/tmp8byixllc.ipynb
INFO:papermill:Output Notebook: /Users/Edu/dev/projects-ploomber/cookbook/incremental/output/load.ipynb

Executing:   0%|          | 0/6 [00:00<?, ?cell/s]INFO:papermill:Executing notebook with kernel: python3

Executing:  17%|█▋        | 1/6 [00:01<00:07,  1.48s/cell]
Executing: 100%|██████████| 6/6 [00:02<00:00,  2.58cell/s]
INFO:ploomber.tasks.abc.NotebookRunner:Done. Operation took 2.3 seconds
Building task 'process':  33%|███▎      | 1/3 [00:02<00:04,  2.35s/it]INFO:ploomber.tasks.abc.NotebookRunner:Starting execution: NotebookRunner: process -> MetaProduct({'data': File('output/processed.csv'), 'nb': File('output

Storing index: 200

Building task 'upload': 100%|██████████| 3/3 [00:06<00:00,  2.27s/it]
- NotebookRunner: load -> MetaProduct({'data': File('output/raw.csv'), 'nb': File('output/load.ipynb')}) -
----- /Users/Edu/dev/projects-ploomber/cookbook/incremental/tasks/load.py ------
:10:1: 'json' imported but unused
:11:1: 'pathlib.Path' imported but unused
NotebookRunner: load -> MetaProduct({'data': File('output/raw.csv'), 'nb': File('output/load.ipynb')})

INFO:ploomber.dag.dag: DAG report:
name     Ran?      Elapsed (s)    Percentage
-------  ------  -------------  ------------
load     True          2.33977       34.4733
process  True          2.11967       31.2303
upload   True          2.32776       34.2964
```



```bash
python count.py
```

```
numbers has 200 rows
plus_one has 200 rows
```


Now our `plus_one` table has 200 records, and the last execution processed only the 100 new rows. If we run the pipeline again, it stops after running the `load` task, since there are no new records to process; this check happens in the `load` task's `on_finish` hook (`hooks.check_if_new_records`).

```bash
ploomber build --force
```

```
Loading pipeline...

Building task 'load':   0%|          | 0/3 [00:00<?, ?it/s]
Executing:   0%|          | 0/6 [00:00<?, ?cell/s]
Executing:  17%|█▋        | 1/6 [00:01<00:07,  1.47s/cell]
Executing: 100%|██████████| 6/6 [00:02<00:00,  2.55cell/s]
Building task 'load': 100%|██████████| 3/3 [00:02<00:00,  1.26it/s]
```


```bash
python count.py
```

```
numbers has 200 rows
plus_one has 200 rows
```
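`hooks.py` isn't shown in this section. The sketch below is one plausible shape for `check_if_new_records`: it assumes the hook receives the `load` task's `product`, reads the CSV that `load` wrote, and raises Ploomber's `DAGBuildEarlyStop` exception when the file has no data rows, which tells Ploomber to stop the build before running the remaining tasks. The CSV-row check is an assumption about what the real hook does.

```python
import csv

try:
    from ploomber.exceptions import DAGBuildEarlyStop
except ImportError:
    # Stand-in so this sketch also runs where Ploomber isn't installed
    class DAGBuildEarlyStop(Exception):
        pass


def check_if_new_records(product):
    """Hypothetical on_finish hook: stop the build if load wrote no data rows."""
    # product['data'] is assumed to be the CSV file written by the load task
    with open(str(product['data']), newline='') as f:
        n_rows = sum(1 for _ in csv.reader(f)) - 1  # exclude the header row

    if n_rows <= 0:
        raise DAGBuildEarlyStop('No new records to process')
```

Stopping right after `load` also means `store_index` never runs on an empty batch, so `index.json` keeps pointing at the last record that was actually uploaded.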
