# Lesson 9: Performance Optimisation

This notebook contains my deliverables for Lesson 9 of the dlt Advanced course.

It implements a dlt pipeline that queries the following endpoints of the [Jaffle Shop API](https://jaffle-shop.scalevector.ai/docs):
* `/customers`
* `/orders`
* `/products`

## Setup

This notebook assumes you're running it from the [project repo](https://github.com/jarcelao/dlt-advanced-2025), i.e. that you already have `uv` and the project dependencies installed.

Otherwise, run the cell below to install dlt with DuckDB support:

In [None]:
%%capture
%pip install "dlt[duckdb]"

## Naive Pipeline

Let's start with the naive approach. This version follows dlt best practices but leaves every performance-related setting at its default.

In [2]:
import dlt

from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


client = RESTClient(
    base_url="https://jaffle-shop.scalevector.ai/api/v1",
    paginator=PageNumberPaginator(base_page=1, total_path=""),
)


@dlt.resource(
    table_name="customers",
    write_disposition="merge",
    primary_key="id"
)
def get_customers():
    yield from client.paginate("/customers")


@dlt.resource(
    table_name="orders", 
    write_disposition="merge", 
    primary_key="id"
)
def get_orders():
    yield from client.paginate("/orders")


@dlt.resource(
    table_name="products",
    write_disposition="merge",
    primary_key="sku"
)
def get_products():
    yield from client.paginate("/products")


@dlt.source
def jaffle_shop_data():
    return get_customers(), get_orders(), get_products()


pipeline = dlt.pipeline(
    pipeline_name="jaffle_shop_pipeline",
    destination="duckdb",
    dataset_name="jaffle_shop",
    dev_mode=True
)


pipeline.run(jaffle_shop_data())
print(pipeline.last_trace)

Run started at 2025-05-10 05:08:16.911087+00:00 and COMPLETED in 16 minutes and 52.11 seconds with 4 steps.
Step extract COMPLETED in 16 minutes and 25.26 seconds.

Load package 1746853696.9559867 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 15.00 seconds.
Normalized data for the following tables:
- orders: 61948 row(s)
- orders__items: 90900 row(s)
- customers: 935 row(s)
- _dlt_pipeline_state: 1 row(s)
- products: 10 row(s)

Load package 1746853696.9559867 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 11.82 seconds.
Pipeline jaffle_shop_pipeline load step completed in 11.78 seconds
1 load package(s) were loaded to destination duckdb and into dataset jaffle_shop_20250510050816
The duckdb destination used duckdb:////workspaces/dlt-advanced-2025/jaffle_shop_pipeline.duckdb location to store data
Load package 1746853696.9559867 is LOADED and contains no failed jobs


That took its sweet time! Can we do much better by applying some optimizations?

## Optimized Pipeline

Now we run the same pipeline with all of the following optimization techniques applied:
- Chunking
- Parallelism
- Buffer control
- File rotation
- Worker tuning
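
To build some intuition for why parallelism pays off in an I/O-bound extract step, here is a stdlib-only sketch. It is an analogy, not dlt's actual implementation: `fake_fetch_pages` is a hypothetical stand-in for a paginated API that sleeps to mimic network latency, and the page counts and latency are made-up numbers. Running the three resources in a `ThreadPoolExecutor` lets their waits overlap, which is roughly what `parallelized=True` plus `EXTRACT__WORKERS` in the cell below ask dlt to do.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical per-page network latency, in seconds.
LATENCY_S = 0.05

# Made-up page counts per resource; orders dominates, as in the real pipeline.
RESOURCES = {"customers": 2, "orders": 6, "products": 1}


def fake_fetch_pages(name: str, n_pages: int) -> list[str]:
    """Hypothetical stand-in for paginating one resource, one page after another."""
    pages = []
    for page in range(1, n_pages + 1):
        time.sleep(LATENCY_S)  # pretend to wait on the network
        pages.append(f"{name}:page{page}")
    return pages


def run_sequential() -> tuple[float, int]:
    """Fetch each resource in turn; wall time is the SUM of all waits."""
    start = time.perf_counter()
    total = sum(len(fake_fetch_pages(name, n)) for name, n in RESOURCES.items())
    return time.perf_counter() - start, total


def run_threaded(workers: int = 4) -> tuple[float, int]:
    """Fetch resources concurrently; wall time approaches the SLOWEST resource."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = pool.map(lambda item: fake_fetch_pages(*item), RESOURCES.items())
        total = sum(len(pages) for pages in results)
    return time.perf_counter() - start, total


if __name__ == "__main__":
    seq_s, seq_n = run_sequential()
    par_s, par_n = run_threaded()
    print(f"sequential: {seq_n} pages in {seq_s:.2f}s")
    print(f"threaded:   {par_n} pages in {par_s:.2f}s")
```

Note that threading across resources cannot make the slowest resource itself any faster: in this sketch each resource still fetches its pages one after another, so the threaded run is bounded below by the `orders` pages.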

In [4]:
import dlt
import os

from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


os.environ["EXTRACT__WORKERS"] = "4"
os.environ["DATA_WRITER__BUFFER_MAX_ITEMS"] = "10000"
os.environ["EXTRACT__DATA_WRITER__FILE_MAX_ITEMS"] = "10000"

os.environ["NORMALIZE__WORKERS"] = "4"
os.environ["NORMALIZE__DATA_WRITER__BUFFER_MAX_ITEMS"] = "10000"
os.environ["NORMALIZE__DATA_WRITER__FILE_MAX_ITEMS"] = "10000"

os.environ["LOAD__WORKERS"] = "4"


client = RESTClient(
    base_url="https://jaffle-shop.scalevector.ai/api/v1",
    paginator=PageNumberPaginator(base_page=1, total_path=""),
)


@dlt.resource(
    table_name="customers",
    write_disposition="merge",
    primary_key="id",
    parallelized=True,
)
def get_customers():  # noqa: F811
    yield from client.paginate("/customers")


@dlt.resource(
    table_name="orders",
    write_disposition="merge",
    primary_key="id",
    parallelized=True
)
def get_orders():  # noqa: F811
    yield from client.paginate("/orders", params={"page_size": 1000})


@dlt.resource(
    table_name="products",
    write_disposition="merge",
    primary_key="sku",
    parallelized=True,
)
def get_products():  # noqa: F811
    yield from client.paginate("/products")


@dlt.source
def jaffle_shop_data():
    return get_customers(), get_orders(), get_products()


pipeline = dlt.pipeline(
    pipeline_name="jaffle_shop_pipeline",
    destination="duckdb",
    dataset_name="jaffle_shop",
    dev_mode=True
)


pipeline.run(jaffle_shop_data())
print(pipeline.last_trace)

Run started at 2025-05-10 05:26:48.773800+00:00 and COMPLETED in 7 minutes and 35.19 seconds with 4 steps.
Step extract COMPLETED in 7 minutes and 12.70 seconds.

Load package 1746854808.8049765 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 14.81 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- customers: 935 row(s)
- products: 10 row(s)
- orders: 61948 row(s)
- orders__items: 90900 row(s)

Load package 1746854808.8049765 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 7.65 seconds.
Pipeline jaffle_shop_pipeline load step completed in 7.62 seconds
1 load package(s) were loaded to destination duckdb and into dataset jaffle_shop_20250510052648
The duckdb destination used duckdb:////workspaces/dlt-advanced-2025/jaffle_shop_pipeline.duckdb location to store data
Load package 1746854808.8049765 is LOADED and contains no failed jobs


That looks much better! Let's dig into why in the conclusion.

## Conclusion

In this exercise, we were able to cut the overall pipeline runtime from 16 minutes and 52.11 seconds to 7 minutes and 35.19 seconds. That's roughly a 2.2x speedup!
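
The per-stage speedups can be checked with a few lines of Python. The timings below are copied directly from the two pipeline traces in this notebook; the helper name `speedups` is just for illustration.

```python
# Stage timings (seconds) copied from the two pipeline traces in this notebook.
NAIVE = {"extract": 16 * 60 + 25.26, "normalize": 15.00, "load": 11.82, "total": 16 * 60 + 52.11}
OPTIMIZED = {"extract": 7 * 60 + 12.70, "normalize": 14.81, "load": 7.65, "total": 7 * 60 + 35.19}


def speedups(before: dict[str, float], after: dict[str, float]) -> dict[str, float]:
    """Return the before/after runtime ratio per stage (greater than 1 means faster)."""
    return {stage: before[stage] / after[stage] for stage in before}


for stage, ratio in speedups(NAIVE, OPTIMIZED).items():
    print(f"{stage:<9} {NAIVE[stage]:>8.2f}s -> {OPTIMIZED[stage]:>7.2f}s  ({ratio:.2f}x)")
```

On these numbers, Extract improves by about 2.28x and Load by about 1.55x, while Normalize is essentially flat (about 1.01x); the overall ratio works out to about 2.22x.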

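I believe most of this speedup came from optimizing the Extract step, which dominates both runs: 16 minutes and 25.26 seconds of the naive run and 7 minutes and 12.70 seconds of the optimized one. By comparison, Normalize barely changed (15.00 vs. 14.81 seconds) and Load dropped from 11.82 to 7.65 seconds. Within Extract, the `/orders` endpoint took the longest in both versions. Setting `parallelized=True` and tuning the extract workers let dlt fetch the three resources concurrently, and raising the `page_size` parameter to 1000 for `/orders` reduced the number of API calls I needed to make. Because `/customers` (935 rows) and `/products` (10 rows) are tiny next to `/orders`, the larger page size is probably the bigger of the two contributions.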
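
A back-of-the-envelope sketch of why a larger `page_size` cuts API calls: with page-number pagination, the number of requests is roughly `ceil(rows / page_size)`, plus one more if the paginator must receive an empty page to learn that it is done. That trailing empty request is an assumption about how `PageNumberPaginator` stops when no total count is available. The row count comes from the traces above; the API's default page size isn't stated in this notebook, so the smaller page sizes below are hypothetical comparison points, and `estimated_requests` is a hypothetical helper.

```python
import math

ORDERS_ROWS = 61_948  # orders rows reported by the normalize step in both traces


def estimated_requests(rows: int, page_size: int, trailing_empty_page: bool = True) -> int:
    """Hypothetical helper: estimate page-number pagination requests for `rows` items."""
    pages = math.ceil(rows / page_size)
    # Assumption: without a total count, one extra request returns an empty page to signal the end.
    return pages + (1 if trailing_empty_page else 0)


for page_size in (10, 100, 1000):  # 10 and 100 are hypothetical baselines
    print(f"page_size={page_size:>4}: ~{estimated_requests(ORDERS_ROWS, page_size)} requests")
```

Since each request pays a full network round trip, going from hundreds or thousands of requests down to a few dozen is a large saving for a resource that is fetched one page at a time.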