# Building a Data Pipeline with dlt

In this notebook, we will build a complete data pipeline from scratch using **dlt**.

Our goal is simple:

→ Fetch real data from an API  
→ Turn it into clean relational tables  
→ Load it into a database  
→ Explore and analyze it  

We will use the **Open Library API** as our data source and **DuckDB** as our database.

Along the way, you will learn:

- What a dlt source is  
- What a dlt pipeline does  
- How data moves through Extract → Normalize → Load  
- How to inspect and explore the final dataset  

By the end, you will understand not just how to run a pipeline, but what happens at each stage.


## 📦 Step 0: Install Dependencies


In [None]:
# install dependencies first (only if uv is not used)
!pip -q install dlt[duckdb]

<p>In this notebook we will use:</p>

<ul>
  <li><strong>dlt</strong> to extract, normalize, and load data</li>
  <li><strong>DuckDB</strong> as the destination database (runs locally inside Colab)</li>
</ul>

<p>
  DuckDB is great for beginners because it requires no setup and no credentials.
</p>

## 📚 Step 1: Import Libraries


<p>In this cell we import the libraries we will use throughout the notebook:</p>

<ul>
  <li><strong>dlt</strong> is the main library for building and running the pipeline</li>
  <li><strong>rest_api_source</strong> helps us define an API source using a simple configuration</li>
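  <li><strong>islice</strong> (from <code>itertools</code>) is a small Python helper for previewing only a few records</li>
  <li><strong>numpy</strong> and <strong>pandas</strong> are imported as well; pandas backs the DataFrame we get when previewing a loaded table</li>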
</ul>
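
As a minimal sketch of how `islice` previews records, assume a hypothetical generator `fake_book_records` that stands in for a stream of API results (it is not part of dlt). `islice` takes the first few items and leaves the rest of the stream untouched:

```python
from itertools import islice


def fake_book_records():
    """Hypothetical stand-in for a stream of API records (not part of dlt)."""
    n = 0
    while True:  # endless on purpose: islice stops it early
        n += 1
        yield {"key": f"/works/OL{n}W", "title": f"Book {n}"}


# Take only the first three records; the generator is never exhausted.
preview = list(islice(fake_book_records(), 3))
for record in preview:
    print(record)
```

The same pattern should work on any iterable of records, which makes it handy for peeking at a source before running a full pipeline.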


In [24]:
import dlt
import numpy as np
import pandas as pd
from itertools import islice
from dlt.sources.rest_api import rest_api_source

## 🔗 Step 2: Define the API Source (Open Library)

<p>
  In <strong>dlt</strong>, a <strong>source</strong> is the part of your pipeline that knows how to fetch data from somewhere.
  In this notebook, our source fetches data from the <strong>Open Library Search API</strong>.
</p>

<p>
  We define the source using <code>rest_api_source</code>, which lets us describe an API in a simple
  Python dictionary instead of writing lots of request code.
</p>

<p>
  📖 <strong>Open Library Search API docs:</strong><br>
  <a href="https://openlibrary.org/dev/docs/api/search" target="_blank">
    https://openlibrary.org/dev/docs/api/search
  </a>
</p>

In [3]:
def openlibrary_source(query: str = "harry potter"):

    return rest_api_source({
        "client": {
            "base_url": "https://openlibrary.org",
        },
        "resource_defaults": {
            "primary_key": "key",
            "write_disposition": "replace",
        },
        "resources": [
            {
                "name": "books",
                "endpoint": {
                    "path": "search.json",
                    "params": {
                        "q": query,
                        "limit": 100,
                    },
                    "data_selector": "docs",
                    "paginator": {
                        "type": "offset",
                        "limit": 100,
                        "offset_param": "offset",
                        "limit_param": "limit",
                        "total_path": "numFound",
                    },
                },
            },
        ],
    })
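
To make the `offset` paginator settings less abstract, here is a rough pure-Python sketch of the loop they describe. It is an illustration only, not dlt's implementation: `fake_search` is a hypothetical stand-in for the `search.json` endpoint, and the response shape (`numFound`, `docs`) simply mirrors the `total_path` and `data_selector` values in the config above.

```python
def fake_search(offset: int, limit: int, total: int = 250) -> dict:
    """Hypothetical endpoint: returns one page of results plus the total count."""
    stop = min(offset + limit, total)
    return {"numFound": total, "docs": [{"key": i} for i in range(offset, stop)]}


def paginate(limit: int = 100):
    """Yield pages until the offset reaches the total reported by the API."""
    offset = 0
    while True:
        response = fake_search(offset=offset, limit=limit)
        yield response["docs"]  # plays the role of data_selector="docs"
        offset += limit  # advance the offset query parameter
        if offset >= response["numFound"]:  # plays the role of total_path="numFound"
            break


page_sizes = [len(page) for page in paginate(limit=100)]
print(page_sizes)  # [100, 100, 50]
```

With 250 fake results and a page size of 100, the loop makes three requests and the last page is only partly filled.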


## 🔧 Step 3: Create the dlt Pipeline

In [4]:
pipeline = dlt.pipeline(
    pipeline_name="ol_demo",
    destination="duckdb",
    dataset_name="ol_data",
    progress="log",  # log pipeline progress to the console (optional)
)

## 🔍 Understanding the Pipeline

At this point we have defined two key building blocks:

- **The source** describes where the data comes from and how to fetch it from the API.  
- **The pipeline** describes where the data should go (DuckDB) and keeps track of tables, schemas, and run history.  

---

Instead of running everything at once, we will now run the pipeline in three separate phases so you can clearly see what happens at each stage:

1. **Extract**: download raw data from the API  
2. **Normalize**: turn nested JSON into relational tables  
3. **Load**: write those tables into DuckDB  
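
As a mental model, the three phases can be pictured as three plain functions chained together. The sketch below is a toy illustration with hypothetical names (`toy_extract`, `toy_normalize`, `toy_load`) and an in-memory dictionary instead of DuckDB. It only mirrors the order of the steps, not how dlt implements them.

```python
def toy_extract():
    """Pretend to download raw records from an API."""
    return [
        {"key": "/works/OL1W", "title": "Book 1", "rating": {"average": 4.5, "count": 10}}
    ]


def toy_normalize(records):
    """Flatten one level of nested dicts into columns joined by '__'."""
    rows = []
    for record in records:
        row = {}
        for field, value in record.items():
            if isinstance(value, dict):
                for sub_field, sub_value in value.items():
                    row[f"{field}__{sub_field}"] = sub_value
            else:
                row[field] = value
        rows.append(row)
    return rows


def toy_load(rows, database, table_name):
    """Append rows to a table in an in-memory 'database' (a plain dict)."""
    database.setdefault(table_name, []).extend(rows)
    return database


database = toy_load(toy_normalize(toy_extract()), {}, "books")
print(database["books"])
```

Each function only needs the output of the previous one, which is why the phases can be run separately and inspected in between.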




![ETL Diagram](https://github.com/anair123/data-engineering-zoomcamp/blob/workshop/dlt_2026/cohorts/2026/workshops/dlt/images/etl_diagram.png?raw=1)

Once these steps make sense, we will run the full workflow again using one command:

```python
pipeline.run(source)
```


## ⬇️ Step 4: Extract

Now we run the first stage of the pipeline: **Extract**.

Extract means:

- dlt sends requests to the Open Library API
- the raw JSON responses are downloaded
- the results are stored in dlt's local working folder

At this stage, the data is **not** in DuckDB yet. We are just confirming that we successfully pulled data from the API.

In [5]:
extract_info = pipeline.extract(openlibrary_source())

------------------------------- Extract rest_api -------------------------------
Resources: 0/1 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 114.05 MB (53.00%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 0/1 (0.0%) | Time: 0.91s | Rate: 0.00/s
books: 100  | Time: 0.00s | Rate: 18236104.35/s
Memory usage: 117.30 MB (53.00%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 0/1 (0.0%) | Time: 2.06s | Rate: 0.00/s
books: 400  | Time: 1.14s | Rate: 350.12/s
Memory usage: 118.18 MB (53.00%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 0/1 (0.0%) | Time: 3.61s | Rate: 0.00/s
books: 700  | Time: 2.69s | Rate: 259.86/s
Memory usage: 118.30 MB (53.00%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 0/1 (0.0%) | Time: 5.00s | Rate: 0

---

### What we will print

After extraction, we will print a small summary showing:

- which **resources** were extracted
- which **tables** will be created later
- how many rows were extracted per resource

This helps confirm that the pipeline is working before we move on to normalization.

In [6]:
load_id = extract_info.loads_ids[-1]
m = extract_info.metrics[load_id][0]

print("Resources:", list(m["resource_metrics"].keys()))
print("Tables:", list(m["table_metrics"].keys()))
print("Load ID:", load_id)
print()

for resource, rm in m["resource_metrics"].items():
    print(f"Resource: {resource}")
    print(f"rows extracted: {rm.items_count}")
    print()

Resources: ['books']
Tables: ['books']
Load ID: 1772033868.1635435

Resource: books
rows extracted: 3759



### What you should see after Extract

In our case, Extract shows only **one resource and one table**:

- **Resources:** `['books']`  
- **Tables:** `['books']`

That is expected.

The `search` endpoint returns a list of book results, so dlt stores those rows in a single table called `books`. The interesting part comes next, because many fields inside each row are lists or nested objects. Those will turn into additional tables during **Normalize**.

Example output:

- **3759 rows extracted** means we pulled 3759 search results (books) in this run; your count may differ as the Open Library catalog changes  

---

## 🔄 Step 5: Normalize

Now we run **Normalize**. This is where dlt transforms raw JSON into a clean relational structure.

During normalization, dlt does three key things:

### 1. Adds Tracking Columns to the Main Table

dlt adds special columns to every table:
- `_dlt_id`: A unique identifier for each row
- `_dlt_load_id`: Links each row to the load job that created it

### 2. Flattens Nested Data into Child Tables

APIs often return nested JSON. For example, a book can have multiple authors (a list), multiple editions, and multiple identifiers.

dlt flattens these nested structures into separate **child tables** with names like:
- `books__author_name`
- `books__author_key`
- `books__language`

Each child table has a `_dlt_parent_id` column that references `_dlt_id` in the parent table. This is how dlt maintains relationships.
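
The parent-child link can be sketched in a few lines of plain Python. This is a simplified illustration, not dlt's code: the helper `split_list_field` and the `uuid`-based IDs are assumptions made for the example, and dlt generates its own `_dlt_id` values differently. The `value` and `_dlt_list_idx` column names follow what dlt typically produces for lists of simple values.

```python
import uuid


def split_list_field(record: dict, table: str, field: str):
    """Split one list-valued field of a record into a parent row and child rows."""
    parent = {k: v for k, v in record.items() if k != field}
    parent["_dlt_id"] = uuid.uuid4().hex  # stand-in for dlt's row identifier
    children = [
        {
            "value": item,
            "_dlt_parent_id": parent["_dlt_id"],  # points back to the parent row
            "_dlt_list_idx": position,  # position of the item in the original list
            "_dlt_id": uuid.uuid4().hex,
        }
        for position, item in enumerate(record.get(field, []))
    ]
    return parent, {f"{table}__{field}": children}


book = {"key": "/works/OL1W", "title": "Book 1", "author_name": ["Author A", "Author B"]}
parent_row, child_tables = split_list_field(book, "books", "author_name")

print(parent_row["title"], "->", [c["value"] for c in child_tables["books__author_name"]])
```

One book with two authors becomes one row in `books` and two rows in `books__author_name`, which is why child tables can hold more rows than their parent.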

### 3. Creates Metadata Tables

dlt also creates internal tables to track pipeline state:
- `_dlt_loads`: Tracks load history (when data was loaded, status)
- `_dlt_pipeline_state`: Stores pipeline state for incremental loading
- `_dlt_version`: Tracks schema versions

In the next cell, we will print a summary showing which tables were created.


In [7]:
normalize_info = pipeline.normalize()

------------------- Normalize rest_api in 1772033868.1635435 -------------------
Files: 0/2 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 124.87 MB (53.10%) | CPU usage: 0.00%

------------------- Normalize rest_api in 1772033868.1635435 -------------------
Files: 0/2 (0.0%) | Time: 0.00s | Rate: 0.00/s
Items: 0  | Time: 0.00s | Rate: 0.00/s
Memory usage: 124.87 MB (53.10%) | CPU usage: 0.00%

------------------- Normalize rest_api in 1772033868.1635435 -------------------
Files: 10/2 (500.0%) | Time: 0.38s | Rate: 26.50/s
Items: 23075  | Time: 0.38s | Rate: 61240.27/s
Memory usage: 130.05 MB (53.20%) | CPU usage: 0.00%



In [8]:
load_id = normalize_info.loads_ids[-1]
m = normalize_info.metrics[load_id][0]

print("Load ID:", load_id)
print()

print("Tables created/updated:")
for table_name, tm in m["table_metrics"].items():
    # skip dlt internal tables to keep it beginner-friendly
    if table_name.startswith("_dlt"):
        continue
    print(f"  - {table_name}: {tm.items_count} rows")


Load ID: 1772033868.1635435

Tables created/updated:
  - books: 3759 rows
  - books__author_key: 4637 rows
  - books__author_name: 4637 rows
  - books__ia: 3431 rows
  - books__ia_collection: 2730 rows
  - books__language: 3748 rows
  - books__id_standard_ebooks: 12 rows
  - books__id_librivox: 64 rows
  - books__id_project_gutenberg: 56 rows


### What happened during Normalize?

After running `pipeline.normalize()`, we now see multiple tables instead of just one.

Tables created/updated:

- `books`
- `books__author_key`
- `books__author_name`
- `books__ia`
- `books__ia_collection`
- `books__language`
- `books__id_standard_ebooks`
- `books__id_librivox`
- `books__id_project_gutenberg`

---

### What does this mean?

We started with **3759 book search results** in the `books` table.

During normalization:

- Each book may have **more than one author**, so the author lists were split into:
  - `books__author_name`
  - `books__author_key`

- Each book may list **several languages**, so the `language` list became:
  - `books__language`

- The `ia` field (Internet Archive IDs) is a list, so it became:
  - `books__ia`

- The `ia_collection` list became:
  - `books__ia_collection`

- Lists of external identifiers (Standard Ebooks, LibriVox, Project Gutenberg) became:
  - `books__id_standard_ebooks`
  - `books__id_librivox`
  - `books__id_project_gutenberg`

This is the key moment in the pipeline.

The data has been transformed from nested JSON into a **relational structure** with multiple linked tables. This makes it much easier to query and analyze.

---

### Schema Visualization

dlt keeps the schema it inferred on the pipeline object. Run the next cell to print the schema name and the list of parent and child tables it contains:

In [14]:
# Display the schema name and the tables dlt inferred
print("default_schema_name:", pipeline.default_schema_name)
print("schemas:", list(pipeline.schemas.keys()))
print(pipeline.default_schema)

default_schema_name: rest_api
schemas: ['rest_api']
<dlt.Schema(name='rest_api', version=2, tables=['_dlt_version', '_dlt_loads', 'books', '_dlt_pipeline_state', 'books__author_key', 'books__author_name', 'books__ia', 'books__ia_collection', 'books__language', 'books__id_standard_ebooks', 'books__id_librivox', 'books__id_project_gutenberg'], version_hash='ZJIabaQJ9DAYgsR04wEVeXOgU80roBUfdvrR2YoBEyU=')>


## 📤 Step 6: Load

Now we run the final stage of the pipeline: **Load**.

Load means:

- dlt creates tables in DuckDB (if they do not already exist)
- the normalized rows are inserted into those tables
- the pipeline records the load in its internal tracking tables



In [16]:
load_info = pipeline.load()

--------------------- Load rest_api in 1772033868.1635435 ----------------------
Jobs: 0/10 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 153.74 MB (54.30%) | CPU usage: 0.00%

--------------------- Load rest_api in 1772033868.1635435 ----------------------
Jobs: 10/10 (100.0%) | Time: 0.96s | Rate: 10.38/s
Memory usage: 178.20 MB (55.20%) | CPU usage: 0.00%




After this step, the data is fully stored in the database and ready to query.

At this point:

- The `books` table contains our books
- The related tables (such as `books__author_name` and `books__language`) contain the exploded nested data
- Everything is now queryable using `pipeline.dataset()` or SQL

This is the moment where the data officially moves from "pipeline processing" into a database you can explore.
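
As a small illustration of querying the exploded tables with SQL, joining a child table back to its parent is a plain join on `_dlt_parent_id = _dlt_id`. The sketch below uses Python's built-in `sqlite3` with a few made-up rows so that it runs anywhere. The table and column names follow the ones above, while the sample values are invented. Against the real DuckDB dataset a similar query should apply, possibly with the dataset name as a schema prefix.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE books (_dlt_id TEXT, title TEXT);
    CREATE TABLE books__author_name (_dlt_parent_id TEXT, value TEXT);
    INSERT INTO books VALUES ('b1', 'Book 1'), ('b2', 'Book 2');
    INSERT INTO books__author_name VALUES
        ('b1', 'Author A'), ('b1', 'Author B'), ('b2', 'Author C');
    """
)

# Re-attach each author to its book through the parent-child key.
rows = conn.execute(
    """
    SELECT b.title, a.value AS author
    FROM books AS b
    JOIN books__author_name AS a ON a._dlt_parent_id = b._dlt_id
    ORDER BY b.title, a.value
    """
).fetchall()

for title, author in rows:
    print(title, "|", author)
conn.close()
```

A book with two authors appears twice in the result, once per matching child row.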

## 🚀 Step 7: Run the Full Pipeline

Now that we have walked through each step individually, we can run the entire workflow using a single command:



In [17]:
load_info = pipeline.run(openlibrary_source())

------------------------------- Extract rest_api -------------------------------
Resources: 0/1 (0.0%) | Time: 0.00s | Rate: 0.00/s
Memory usage: 169.66 MB (55.10%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 0/1 (0.0%) | Time: 0.94s | Rate: 0.00/s
books: 100  | Time: 0.00s | Rate: 32263876.92/s
Memory usage: 169.79 MB (55.10%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 0/1 (0.0%) | Time: 2.66s | Rate: 0.00/s
books: 400  | Time: 1.72s | Rate: 232.37/s
Memory usage: 170.41 MB (55.30%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 0/1 (0.0%) | Time: 4.34s | Rate: 0.00/s
books: 600  | Time: 3.41s | Rate: 176.14/s
Memory usage: 171.29 MB (55.50%) | CPU usage: 0.00%

------------------------------- Extract rest_api -------------------------------
Resources: 0/1 (0.0%) | Time: 5.54s | Rate: 0

<h3>What does <code>pipeline.run()</code> do?</h3>

<p>
  <code>pipeline.run()</code> simply combines the three steps we already executed manually:
</p>

<ol>
  <li><strong>Extract</strong>: fetch data from the Open Library API</li>
  <li><strong>Normalize</strong>: convert nested JSON into relational tables</li>
  <li><strong>Load</strong>: write those tables into DuckDB</li>
</ol>

<p>In other words, this:</p>

<pre><code>pipeline.run(source)</code></pre>

<p>is equivalent to:</p>

<pre><code>pipeline.extract(source)
pipeline.normalize()
pipeline.load()</code></pre>

<p>
  There is no hidden magic. It just runs the full ELT process in order.
</p>


## 🔎 Step 8: Inspect the Loaded Data

Now that the data is loaded into DuckDB, we can inspect it using `pipeline.dataset()`.

This gives us a convenient Python interface for exploring the tables that dlt created, without writing SQL.

---

### List available tables

First, let's see what tables exist in the dataset:

In [18]:
ds = pipeline.dataset()

In [19]:
ds.tables

['books',
 'books__author_key',
 'books__author_name',
 'books__ia',
 'books__ia_collection',
 'books__language',
 'books__id_standard_ebooks',
 'books__id_librivox',
 'books__id_project_gutenberg',
 '_dlt_version',
 '_dlt_loads',
 '_dlt_pipeline_state']

In [25]:
df = ds.books.df()      # main table
df.head(3)

|   | cover_edition_key | cover_i | ebook_access | edition_count | first_publish_year | has_fulltext | key | lending_edition_s | lending_identifier_s | public_scan_b | title | _dlt_load_id | _dlt_id | subtitle |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | OL61027601M | 15155833 | borrowable | 397 | 1997 | True | /works/OL82563W | OL38565767M | harrypotterylapi0000rowl_q5r6 | False | Harry Potter and the Philosopher's Stone | 1772034231.1452715 | 4NoSitrDvSc68Q | |
| 1 | OL26378158M | 15158660 | printdisabled | 144 | 2007 | True | /works/OL82586W | | | False | Harry Potter and the Deathly Hallows | 1772034231.1452715 | 8A6CSf4X2TZ08A | |
| 2 | OL26234270M | 10580435 | borrowable | 279 | 1999 | True | /works/OL82536W | OL48101764M | bdrc-W8LS66814 | False | Harry Potter and the Prisoner of Azkaban | 1772034231.1452715 | WjCwQXPIRCE9Vg | |


## 💡 Conclusion

### What dlt handled for us

✔ API requests  
✔ JSON normalization  
✔ Table creation  
✔ Database loading  
✔ Simple dataset inspection  

---

### But there are still friction points

• Getting the REST API config exactly right  
• Remembering paginator syntax  
• Remembering how to inspect tables  
• Debugging schema or pagination issues  
• Writing Python or SQL to get insights  

It works... but it still takes effort.

---

## 🚀 Next Up: LLM-Powered Workflows

dlt now integrates LLMs directly into the workflow to make:

• Pipeline runs easier  
• Debugging faster  
• Schema inspection simpler  
• Data analysis more natural  

Instead of writing glue code, you can use natural language.

In the workshop, we will see what that looks like.
