# **Homework: Speed up your pipeline**

### **Goal**

Use the public **Jaffle Shop API** to build a `dlt` pipeline and apply everything you've learned about performance:

- Chunking
- Parallelism
- Buffer control
- File rotation
- Worker tuning

Your task is to **make the pipeline as fast as possible**, while keeping the results correct.



### **What you’ll need**

- API base: `https://jaffle-shop.scalevector.ai/api/v1`
- Docs: [https://jaffle-shop.scalevector.ai/docs](https://jaffle-shop.scalevector.ai/docs)
- Start with these endpoints:
  - `/customers`
  - `/orders`
  - `/products`

Each of them returns **paged responses** — so you'll need to handle pagination.



### **What to implement**

1. **Extract** from the API using `dlt`
   - Use `dlt.resource` and [`RESTClient`](https://dlthub.com/docs/devel/general-usage/http/rest-client) with proper pagination

2. **Apply all performance techniques**
   - Group resources into sources
   - Yield **chunks/pages**, not single rows
   - Use `parallelized=True`
   - Set `EXTRACT__WORKERS`, `NORMALIZE__WORKERS`, and `LOAD__WORKERS`
   - Tune buffer sizes and enable **file rotation**

3. **Measure performance**
   - Time the extract, normalize, and load stages separately
   - Compare a naive version vs. optimized version
   - Log thread info or `pipeline.last_trace` if helpful


### **Deliverables**

Share your code as a Google Colab or [GitHub Gist](https://gist.github.com/) in Homework Google Form. **This step is required for certification.**


It should include:
- Working pipeline for at least 2 endpoints
- Before/after timing comparison
- A short explanation of what changes made the biggest difference if there're any differences

In [21]:
import dlt
import os

from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

In [22]:
os.environ["EXTRACT__WORKERS"] = "3"

In [23]:
base_url = 'https://jaffle-shop.scalevector.ai/api/v1'
page_size = 1_000

In [24]:
@dlt.source(name='jaffle_shop')
def jaffle_shop_source():

    client = RESTClient(
        base_url=base_url, 
        paginator=PageNumberPaginator(
            page_param='page',
            base_page=1,
            total_path=None
        )
    )

    @dlt.resource(name="customers", primary_key="id", parallelized=True)
    def get_customers():
        for page in client.paginate("customers"):
            yield page

    @dlt.resource(name="products", primary_key="sku", parallelized=True)
    def get_products():
        for page in client.paginate("products"):
            yield page

    @dlt.resource(name="orders", primary_key="id", parallelized=True)
    def get_orders():
        for page in client.paginate("orders", params={"page_size": page_size}):
            yield page

    return [get_customers, get_orders, get_products]

In [25]:
pipeline = dlt.pipeline(
    destination='duckdb',
    dataset_name='optimized-jaffle_shop',
    full_refresh=True,
    progress='log'
)

  full_refresh_argument_deprecated("pipeline", full_refresh)


In [26]:
%timeit

pipeline.extract(jaffle_shop_source())

----------------------------- Extract jaffle_shop ------------------------------
Resources: 0/3 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 51.05 MB (84.10%) | CPU usage: 0.00%

----------------------------- Extract jaffle_shop ------------------------------
Resources: 0/3 (0.0%) | Time: 2.90s | Rate: 0.00/s
products: 10  | Time: 0.00s | Rate: 1446311.72/s
Memory usage: 48.22 MB (84.50%) | CPU usage: 0.00%

----------------------------- Extract jaffle_shop ------------------------------
Resources: 0/3 (0.0%) | Time: 2.95s | Rate: 0.00/s
products: 10  | Time: 0.05s | Rate: 214.13/s
customers: 100  | Time: 0.00s | Rate: 34952533.33/s
Memory usage: 53.08 MB (84.50%) | CPU usage: 0.00%

----------------------------- Extract jaffle_shop ------------------------------
Resources: 0/3 (0.0%) | Time: 12.96s | Rate: 0.00/s
products: 10  | Time: 10.06s | Rate: 0.99/s
customers: 100  | Time: 10.01s | Rate: 9.99/s
orders: 1000  | Time: 0.00s | Rate: 349525333.33/s
Memory usage: 110.81 MB (83.

ExtractInfo(pipeline=<dlt.pipeline.pipeline.Pipeline object at 0x10a9241c0>, metrics={'1748260293.5200179': [{'started_at': DateTime(2025, 5, 26, 11, 51, 33, 525768, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2025, 5, 26, 12, 6, 24, 378966, tzinfo=Timezone('UTC')), 'schema_name': 'jaffle_shop', 'job_metrics': {'products.b2810c038d.typed-jsonl': DataWriterMetrics(file_path='/Users/redperiabras/.dlt/pipelines/dlt_ipykernel_launcher/normalize/864818dd52e7deb6/1748260293.5200179/new_jobs/products.b2810c038d.0.typed-jsonl', items_count=10, file_size=1416, created=1748260296.4261482, last_modified=1748260307.907465), 'customers.d2386d29ef.typed-jsonl': DataWriterMetrics(file_path='/Users/redperiabras/.dlt/pipelines/dlt_ipykernel_launcher/normalize/864818dd52e7deb6/1748260293.5200179/new_jobs/customers.d2386d29ef.0.typed-jsonl', items_count=935, file_size=64816, created=1748260296.4731278, last_modified=1748260334.629886), 'orders.b7253007de.typed-jsonl': DataWriterMetrics(file_path='/U

In [27]:
%%timeit

pipeline.normalize()

----------------- Normalize jaffle_shop in 1748260293.5200179 ------------------
Files: 0/4 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 49.28 MB (86.30%) | CPU usage: 0.00%

----------------- Normalize jaffle_shop in 1748260293.5200179 ------------------
Files: 0/4 (0.0%) | Time: 0.00s | Rate: 0.00/s
Items: 0  | Time: 0.00s | Rate: 0.00/s
Memory usage: 49.45 MB (86.30%) | CPU usage: 0.00%

----------------- Normalize jaffle_shop in 1748260293.5200179 ------------------
Files: 5/4 (125.0%) | Time: 5.20s | Rate: 0.96/s
Items: 0  | Time: 5.20s | Rate: 0.00/s
Memory usage: 186.28 MB (86.10%) | CPU usage: 0.00%

----------------- Normalize jaffle_shop in 1748260293.5200179 ------------------
Files: 5/4 (125.0%) | Time: 5.21s | Rate: 0.96/s
Items: 153794  | Time: 5.21s | Rate: 29531.72/s
Memory usage: 191.12 MB (86.00%) | CPU usage: 0.00%

8.52 ms ± 1.09 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [28]:
%%timeit

load_info = pipeline.load()



-------------------- Load jaffle_shop in 1748260293.5200179 --------------------
Jobs: 0/5 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 156.14 MB (85.10%) | CPU usage: 0.00%





-------------------- Load jaffle_shop in 1748260293.5200179 --------------------
Jobs: 3/5 (60.0%) | Time: 5.82s | Rate: 0.52/s
Memory usage: 629.62 MB (84.70%) | CPU usage: 0.00%





-------------------- Load jaffle_shop in 1748260293.5200179 --------------------
Jobs: 5/5 (100.0%) | Time: 6.30s | Rate: 0.79/s
Memory usage: 205.27 MB (80.80%) | CPU usage: 0.00%

10.6 ms ± 947 μs per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [30]:
print(pipeline.last_trace)

Run started at 2025-05-26 11:51:33.508046+00:00 and COMPLETED in 50 minutes and 44.50 seconds with 17 steps.
Step extract COMPLETED in 14 minutes and 50.90 seconds.

Load package 1748260293.5200179 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 5.25 seconds.
Normalized data for the following tables:
- orders: 61948 row(s)
- orders__items: 90900 row(s)
- customers: 935 row(s)
- _dlt_pipeline_state: 1 row(s)
- products: 10 row(s)

Load package 1748260293.5200179 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.01 seconds.
No data found to normalize

Step normalize COMPLETED in 0.01 seconds.
No data found to normalize

Step normalize COMPLETED in 0.01 seconds.
No data found to normalize

Step normalize COMPLETED in 0.01 seconds.
No data found to normalize

Step normalize COMPLETED in 0.01 seconds.
No data found to normalize

Step normalize COMPLETED in 0.01 seconds.
N