# Incremental Loading and Write Dispositions

## 1. Introduction

In this section, we will learn how to use `dlt` efficiently by loading only new or modified data, using two dlt features in tandem: write dispositions and incremental loading.

### ELT patterns

There are two ideal data source types, in terms of efficiency:
- an immutable source (eg. logs), from which we're able to extract only the new records

  In this case, we're able to use incremental loading with the `append` write disposition to load data in the most efficient way.
- a mutable source (eg. a database), but one from which we're able to extract new and modified records

  In this case, we can use the `merge` write disposition.

The diagram below describes the optimal ELT strategy given how we're able to extract data from a data source.

![](https://thescalableway.com/img/uT145YgjSn-960.webp)

Credit: https://thescalableway.com/blog/dlt-and-prefect-a-great-combo-for-streamlined-data-ingestion-pipelines/#efficiency

## A note on deletes

While `dlt` supports handling deleted records in the `merge` write disposition, doing so depends on the upstream source managing these records in a specific way (we need a column that indicates whether a record has been deleted). This is not common practice, and implementing it typically requires data engineering work at the data generation level (eg. collaborating with database admins). This makes it an advanced scenario, so we will not cover it in this notebook.

For now, assume that if records are deleted at the source, a full refresh must be performed.

For more information, see [dlt documentation](https://dlthub.com/docs/general-usage/incremental-loading#delete-records).

## 2. Incremental loading

### What is incremental loading

Incremental loading lets us extract and normalize only the new or modified records from a data source. It relies on a column that indicates when a record was last updated, which is typically a timestamp column such as `modified_at`, or a sequence number. This column is called a "cursor column", and the last processed value of this column is called a "cursor value".
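To make the two terms concrete, here is a minimal sketch in plain Python (no `dlt` involved). The helper `filter_new_records` and the sample records are hypothetical and only illustrate the idea: keep the records whose cursor column is greater than the stored cursor value, then advance the cursor value to the highest value seen.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def filter_new_records(
    records: list[dict[str, Any]],
    cursor_column: str,
    cursor_value: Optional[datetime],
) -> tuple[list[dict[str, Any]], Optional[datetime]]:
    """Return records newer than `cursor_value`, plus the advanced cursor value."""
    if cursor_value is None:
        # First run: there is no cursor value yet, so every record is new.
        new_records = list(records)
    else:
        new_records = [r for r in records if r[cursor_column] > cursor_value]

    # The new cursor value is the highest cursor column value seen so far.
    seen = [r[cursor_column] for r in new_records]
    if cursor_value is not None:
        seen.append(cursor_value)
    return new_records, (max(seen) if seen else None)


records = [
    {"id": 1, "modified_at": datetime(2024, 1, 1, tzinfo=timezone.utc)},
    {"id": 2, "modified_at": datetime(2024, 3, 1, tzinfo=timezone.utc)},
]

# First run: both records are extracted.
first_batch, cursor_value = filter_new_records(records, "modified_at", None)

# Record 1 is modified at the source after the first run.
records[0]["modified_at"] = datetime(2024, 6, 1, tzinfo=timezone.utc)

# Second run: only the modified record is extracted.
second_batch, cursor_value = filter_new_records(records, "modified_at", cursor_value)
print([r["id"] for r in second_batch], cursor_value)
```

In `dlt`, this bookkeeping is handled for us once we declare the cursor column, as shown later in this section.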

### A note on how dlt stores pipeline state

Let's get back to the first pipeline we've ever executed. Remember those odd `_dlt` tables that were created automatically?

| table_catalog          | table_schema | table_name              |
|------------------------|--------------|-------------------------|
| dummy_source_to_duckdb  | mydata       | person                  |
| dummy_source_to_duckdb  | mydata       | **_dlt_loads**          |
| dummy_source_to_duckdb  | mydata       | **_dlt_pipeline_state** |
| dummy_source_to_duckdb  | mydata       | **_dlt_version**        |

This is one of the instances where they're useful. Specifically, for incremental data loads, dlt tracks each incremental pipeline's cursor value in these tables, which allows each pipeline run to resume from where the previous run stopped.
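The idea of resuming from the previous run can be sketched without `dlt`. In the hypothetical example below, a JSON file stands in for the pipeline state; this is an assumption made for illustration and not how `dlt` implements its state storage. Timestamps are ISO 8601 strings in a single format, so they compare correctly as text.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def load_state(path: Path) -> dict[str, Any]:
    """Read the stored state, or return an empty state on the first run."""
    return json.loads(path.read_text()) if path.exists() else {}


def save_state(path: Path, state: dict[str, Any]) -> None:
    path.write_text(json.dumps(state))


def run_once(rows: list[dict[str, Any]], state_path: Path) -> list[dict[str, Any]]:
    """Return only rows newer than the stored cursor value, then persist it."""
    state = load_state(state_path)
    last_value = state.get("last_value")
    new_rows = [
        row for row in rows if last_value is None or row["updated_at"] > last_value
    ]
    if new_rows:
        state["last_value"] = max(row["updated_at"] for row in new_rows)
        save_state(state_path, state)
    return new_rows


state_path = Path(tempfile.mkdtemp()) / "pipeline_state.json"
rows = [
    {"id": 1, "updated_at": "2024-01-01T00:00:00+00:00"},
    {"id": 2, "updated_at": "2024-01-02T00:00:00+00:00"},
]

first_run = run_once(rows, state_path)  # Both rows are new.
rows.append({"id": 3, "updated_at": "2024-01-03T00:00:00+00:00"})
second_run = run_once(rows, state_path)  # Resumes after the stored cursor value.
print([row["id"] for row in second_run])
```

Because the cursor value is persisted between runs, the second run only picks up the row that arrived after the first run finished.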

### Incremental loading in action

In this pipeline, we will generate some fake incremental data. The dataset consists of 10 records, 2 of which are updated each time the pipeline is executed. Our goal is to load only the updated records in subsequent executions of the pipeline.

To start off, let's see what happens if we have a changing dataset, but don't configure incremental loading in `dlt`:

<div class="alert alert-block alert-warning">
⚠️ Please ensure that the load file format is set to parquet (as described in lesson 2) before proceeding.
</div>

In [None]:
from collections.abc import Generator
from datetime import UTC, datetime
from random import sample
from secrets import choice
from typing import Any

import dlt
from faker import Faker


fake = Faker()

n_users = 10


def person() -> Generator[dict[str, Any], None, None]:
    """Simulate data from a source.

    Each call yields `n_users` rows. Two randomly chosen rows are "updated"
    (they get the current timestamp in `updated_at`), while the remaining rows
    keep a static timestamp.

    In a real source, the last cursor value (`cursor.last_value`) could be used
    to filter only new data at the extract stage (eg, by passing it to a
    filtering parameter such as `since` in a REST API). This simulation does not
    define a `cursor` argument, so it always yields all rows.

    For more information on this usage, see
    https://dlthub.com/docs/general-usage/incremental-loading#incremental-loading-with-a-cursor-field.
    """
    ids = range(n_users)
    # Simulate updating two random rows.
    ids_to_update = sample(ids, 2)
    for _id in ids:
        yield {
            "id": _id,
            "name": fake.name(),
            "country": choice(["USA", "China", "Poland"]),
            "updated_at": datetime.now(UTC)
            if _id in ids_to_update
            else datetime(2024, 1, 1, 0, 0, 0, 0, UTC),
        }


pipeline = dlt.pipeline(
    pipeline_name="dummy_source_to_duckdb",
    destination=dlt.destinations.duckdb("incremental.duckdb"),
    dataset_name="bronze",
)

if __name__ == "__main__":
    load_info = pipeline.extract(data=person)
    load_info = pipeline.normalize()

print(load_info)

normalized_data_path = next(
    job.file_path
    for job in load_info.load_packages[0].jobs["new_jobs"]
    if job.job_file_info.table_name == "person"
)
print()
print(f"Normalized data path: {normalized_data_path}")

In [None]:
%%capture

# Install required dependency.
!uv add pandas

In [None]:
import pandas as pd


df = pd.read_parquet(normalized_data_path)
df

Now, re-execute the pipeline and check what the normalized data looks like.

That's right, we see 10 rows again, most of which were updated back in January 2024! This means that we're re-extracting and re-normalizing the same data over and over again.

Let's fix that by using the power of dlt resources.

Execute this pipeline **twice** to see that on the second run, only the updated records are normalized.

In [None]:
from collections.abc import Generator
from datetime import UTC, datetime
from random import sample
from secrets import choice
from typing import Any

import dlt
from faker import Faker


fake = Faker()

n_users = 10


@dlt.resource()  # NOTE: Make `person` a dlt resource
def person() -> Generator[dict[str, Any], None, None]:
    """Simulate data from a source.

    Each call yields `n_users` rows. Two randomly chosen rows are "updated"
    (they get the current timestamp in `updated_at`), while the remaining rows
    keep a static timestamp.

    In a real source, the last cursor value (`cursor.last_value`) could be used
    to filter only new data at the extract stage (eg, by passing it to a
    filtering parameter such as `since` in a REST API). This simulation always
    yields all rows and relies on the incremental hint applied below.

    For more information on this usage, see
    https://dlthub.com/docs/general-usage/incremental-loading#incremental-loading-with-a-cursor-field.
    """
    ids = range(n_users)
    # Simulate updating two random rows.
    ids_to_update = sample(ids, 2)
    for _id in ids:
        yield {
            "id": _id,
            "name": fake.name(),
            "country": choice(["USA", "China", "Poland"]),
            "updated_at": datetime.now(UTC)
            if _id in ids_to_update
            else datetime(2024, 1, 1, 0, 0, 0, 0, UTC),
        }


pipeline = dlt.pipeline(
    pipeline_name="dummy_source_to_duckdb",
    destination=dlt.destinations.duckdb("incremental.duckdb"),
    dataset_name="bronze",
)

# NOTE: Specify the cursor column.
person.apply_hints(incremental=dlt.sources.incremental("updated_at"))

if __name__ == "__main__":
    load_info = pipeline.extract(data=person)
    load_info = pipeline.normalize()

print(load_info)

normalized_data_path = next(
    job.file_path
    for job in load_info.load_packages[0].jobs["new_jobs"]
    if job.job_file_info.table_name == "person"
)
print()
print(f"Normalized data path: {normalized_data_path}")

In [None]:
import pandas as pd


df = pd.read_parquet(normalized_data_path)
df

Great! Now, how do we tell `dlt` to update these rows in the destination instead of, for example, appending them? Enter **write dispositions**.

## 3. Write Dispositions

Write dispositions tell `dlt` how data should be loaded into the destination. The three main ones are: "append", "replace", and "merge". In our case, we're interested in the "merge" disposition.

Below is a cheat sheet on which disposition to use in which scenario:

![dlt write dispositions decision tree](https://storage.googleapis.com/dlt-blog-images/flowchart_for_scd2.png)

Credit: [dlt documentation](https://dlthub.com/docs/general-usage/incremental-loading#two-simple-questions-determine-the-write-disposition-you-use)
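To build intuition for the three dispositions, here is a simplified sketch on plain Python lists. The function `apply_write_disposition` is hypothetical, and its `merge` branch is a basic upsert keyed on `id`; it is meant to show the observable effect on a table, not how `dlt` executes the load.

```python
from typing import Any

Row = dict[str, Any]


def apply_write_disposition(
    table: list[Row],
    new_rows: list[Row],
    write_disposition: str,
    primary_key: str = "id",
) -> list[Row]:
    """Return the table contents after loading `new_rows`."""
    if write_disposition == "append":
        # Keep everything that is already there and add the new rows.
        return table + new_rows
    if write_disposition == "replace":
        # Discard the existing contents entirely.
        return list(new_rows)
    if write_disposition == "merge":
        # Upsert: rows with a matching key are overwritten, others are added.
        merged = {row[primary_key]: row for row in table}
        merged.update({row[primary_key]: row for row in new_rows})
        return list(merged.values())
    raise ValueError(f"Unknown write disposition: {write_disposition}")


table = [{"id": 1, "country": "USA"}, {"id": 2, "country": "Poland"}]
new_rows = [{"id": 2, "country": "China"}, {"id": 3, "country": "USA"}]

appended = apply_write_disposition(table, new_rows, "append")
replaced = apply_write_disposition(table, new_rows, "replace")
merged = apply_write_disposition(table, new_rows, "merge")

print(len(appended), len(replaced), len(merged))
```

Note that only the `merge` branch needs a key to match rows on, which is why a primary key becomes relevant once we switch to that disposition.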

Let's first see what happens if we extract data incrementally, but don't specify any write disposition. Execute the pipeline below **twice**.

In [None]:
from collections.abc import Generator
from datetime import UTC, datetime
from random import sample
from secrets import choice
from typing import Any

import dlt
from faker import Faker


fake = Faker()

n_users = 10


@dlt.resource()
def person() -> Generator[dict[str, Any], None, None]:
    """Simulate data from a source.

    Each call yields `n_users` rows. Two randomly chosen rows are "updated"
    (they get the current timestamp in `updated_at`), while the remaining rows
    keep a static timestamp.

    In a real source, the last cursor value (`cursor.last_value`) could be used
    to filter only new data at the extract stage (eg, by passing it to a
    filtering parameter such as `since` in a REST API). This simulation always
    yields all rows and relies on the incremental hint applied below.

    For more information on this usage, see
    https://dlthub.com/docs/general-usage/incremental-loading#incremental-loading-with-a-cursor-field.
    """
    ids = range(n_users)
    # Simulate updating two random rows.
    ids_to_update = sample(ids, 2)
    for _id in ids:
        yield {
            "id": _id,
            "name": fake.name(),
            "country": choice(["USA", "China", "Poland"]),
            "updated_at": datetime.now(UTC)
            if _id in ids_to_update
            else datetime(2024, 1, 1, 0, 0, 0, 0, UTC),
        }


pipeline = dlt.pipeline(
    pipeline_name="dummy_source_to_duckdb",
    destination=dlt.destinations.duckdb("incremental.duckdb"),
    dataset_name="bronze",
)


person.apply_hints(incremental=dlt.sources.incremental("updated_at"))

if __name__ == "__main__":
    # NOTE: Use `run()` to execute the full EL process (extract, normalize, load).
    load_info = pipeline.run(person)

print(load_info)

In [None]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT * FROM bronze.person") as cursor:
        data = cursor.df()

data

In [None]:
# Clean up.
!rm -f incremental.duckdb

#### What happened?

We correctly extracted only the two modified rows, but we appended them to the destination table, resulting in two duplicate rows (assuming `id` is our unique identifier).
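A quick way to confirm such duplicates is to count how often each `id` occurs. The sketch below does this on an in-memory list that imitates the table after two runs; the helper `find_duplicate_keys` and the choice of updated ids (3 and 7) are made up for illustration.

```python
from collections import Counter
from typing import Any


def find_duplicate_keys(rows: list[dict[str, Any]], key: str = "id") -> list[Any]:
    """Return the key values that occur more than once, in sorted order."""
    counts = Counter(row[key] for row in rows)
    return sorted(value for value, count in counts.items() if count > 1)


# First run: all 10 rows are loaded.
rows = [{"id": _id, "updated_at": "2024-01-01"} for _id in range(10)]
# Second run: the two "updated" rows are appended instead of replacing the old ones.
rows += [{"id": 3, "updated_at": "2025-01-01"}, {"id": 7, "updated_at": "2025-01-01"}]

duplicates = find_duplicate_keys(rows)
print(len(rows), duplicates)
```

The table now holds 12 rows for 10 distinct ids, which is exactly the situation we want to avoid.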

#### Using the `merge` write disposition
Let's fix that by specifying the `merge` write disposition*.

*Note that for the merge disposition to work, we also need to specify the primary key of our resource. You can read more about each write disposition in the [dlt documentation](https://dlthub.com/docs/general-usage/incremental-loading).

Execute the pipeline below twice:

In [None]:
from collections.abc import Generator
from datetime import UTC, datetime
from random import sample
from secrets import choice
from typing import Any

import dlt
from faker import Faker


fake = Faker()

n_users = 10


@dlt.resource()
def person() -> Generator[dict[str, Any], None, None]:
    """Simulate data from a source.

    Each call yields `n_users` rows. Two randomly chosen rows are "updated"
    (they get the current timestamp in `updated_at`), while the remaining rows
    keep a static timestamp.

    In a real source, the last cursor value (`cursor.last_value`) could be used
    to filter only new data at the extract stage (eg, by passing it to a
    filtering parameter such as `since` in a REST API). This simulation always
    yields all rows and relies on the incremental hint applied below.

    For more information on this usage, see
    https://dlthub.com/docs/general-usage/incremental-loading#incremental-loading-with-a-cursor-field.
    """
    ids = range(n_users)
    # Simulate updating two random rows.
    ids_to_update = sample(ids, 2)
    for _id in ids:
        yield {
            "id": _id,
            "name": fake.name(),
            "country": choice(["USA", "China", "Poland"]),
            "updated_at": datetime.now(UTC)
            if _id in ids_to_update
            else datetime(2024, 1, 1, 0, 0, 0, 0, UTC),
        }


pipeline = dlt.pipeline(
    pipeline_name="dummy_source_to_duckdb",
    destination=dlt.destinations.duckdb("incremental.duckdb"),
    dataset_name="bronze",
)

person.apply_hints(incremental=dlt.sources.incremental("updated_at"))
person.apply_hints(primary_key="id")  # NOTE: Specify the primary key.

if __name__ == "__main__":
    # NOTE: Add `merge` write disposition.
    load_info = pipeline.run(person, write_disposition="merge")

print(load_info)

Great! Now we're **updating** the rows that have changed instead of **appending** them:

In [None]:
with pipeline.sql_client() as client:
    with client.execute_query("SELECT * FROM bronze.person") as cursor:
        data = cursor.df()

data

We still have 10 records, and we now have different `updated_at` values.

#### Bonus - using `_dlt_load_id`

We can also utilize `_dlt_load_id` to inspect or debug incremental loads:

In [None]:
with pipeline.sql_client() as client:
    with client.execute_query(
        "SELECT _dlt_load_id, count(*) as records FROM bronze.person GROUP BY _dlt_load_id"
    ) as cursor:
        data = cursor.df()
data

This column is created automatically by `dlt` (the behavior is configurable in the `config.toml` file).

**NOTE** This column will also come in handy when working with dlt-loaded datasets in the Transform layer, eg. when creating `dbt` models, as it will allow us to easily define incremental models in a standardized way and without any additional setup.
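As a rough illustration of that idea, the sketch below selects only the rows that belong to loads a downstream step has not processed yet. It runs on plain Python dictionaries; the load id values (`load_a`, `load_b`) are placeholders rather than real `dlt` load ids, and the helper `rows_from_new_loads` is hypothetical.

```python
from collections import Counter
from typing import Any


def rows_from_new_loads(
    rows: list[dict[str, Any]], processed_load_ids: set[str]
) -> list[dict[str, Any]]:
    """Return rows whose `_dlt_load_id` has not been processed downstream yet."""
    return [row for row in rows if row["_dlt_load_id"] not in processed_load_ids]


rows = [
    {"id": 0, "_dlt_load_id": "load_a"},
    {"id": 1, "_dlt_load_id": "load_a"},
    {"id": 2, "_dlt_load_id": "load_b"},
]

# Equivalent of the GROUP BY query above: number of records per load.
records_per_load = Counter(row["_dlt_load_id"] for row in rows)

# The downstream step has already handled `load_a`, so only `load_b` rows remain.
pending = rows_from_new_loads(rows, processed_load_ids={"load_a"})
print(dict(records_per_load), [row["id"] for row in pending])
```

An incremental model in the Transform layer could apply the same filter in SQL, processing each load exactly once.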

## Summary

In this lesson, we've learned:
- what incremental loading is, its related concepts (cursor column and cursor value), and how it fits into the Extract and Load process
- how to use incremental loading to extract only new or modified records from a data source
- how to use write dispositions to control how data is loaded into the destination
- how to use `_dlt_load_id` to inspect or debug incremental loads

