# Building a Data Pipeline for NYC Yellow Taxi Trip Data

In this notebook, we will build a complete data pipeline using **dlt** to process NYC Yellow Taxi Trip data.

Our goals are to:

→ Fetch real trip data from the Data Engineering Zoomcamp API  
→ Turn it into clean relational tables  
→ Load it into DuckDB  
→ Explore and analyze it  

We will use the **Data Engineering Zoomcamp API** as our data source and **DuckDB** as our database.

Along the way, you will learn:

- How to configure a paginated REST API source  
- How to handle pagination that stops on empty pages  
- How the Extract → Normalize → Load process works  
- How to inspect and explore NYC taxi trip data  

By the end, you will have a working pipeline processing real-world transportation data.

## 📦 Step 0: Install Dependencies

```python
# install dlt with its DuckDB extra (quotes stop the shell from treating [] as a pattern)
!pip -q install "dlt[duckdb]"
```

In this notebook we will use:

- **dlt** to extract, normalize, and load data
- **DuckDB** as the destination database (runs locally)

DuckDB is well suited to local data analysis: it runs inside the Python process and stores data in a local file, so it needs no server setup and no credentials.

## 📚 Step 1: Import Libraries

In this cell we import the libraries we will use throughout the notebook:

- **dlt** is the main library for building and running the pipeline
- **rest_api_source** helps us define an API source using a simple configuration
- **islice** (from `itertools`) is a small Python helper for previewing only a few records

```python
import dlt
from itertools import islice
from dlt.sources.rest_api import rest_api_source
```
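`islice` is imported here but not used in the later cells. As a minimal illustration of what it is for, the sketch below previews the first few items of a lazy generator without consuming the rest. The generator `fake_pages` is a hypothetical stand-in for a paginated API, not part of dlt.

```python
from itertools import islice


def fake_pages(page_size=3, n_pages=4):
    """Hypothetical stand-in for a paginated API: yields one record at a time."""
    for page in range(1, n_pages + 1):
        for i in range(page_size):
            yield {"page": page, "row": i}


records = fake_pages()
preview = list(islice(records, 5))  # take only the first 5 records
print(preview)

# The generator is paused, not exhausted: the next item is record number 6.
print(next(records))
```

Because `islice` stops pulling items once it has enough, the remaining pages are never produced, which is what makes it handy for quick previews.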

## 🔗 Step 2: Define the API Source (NYC Yellow Trip Data)

In **dlt**, a **source** is the part of your pipeline that knows how to fetch data from somewhere.
In this notebook, our source fetches NYC Yellow Taxi trip data from the **Data Engineering Zoomcamp API**.

We define the source using `rest_api_source`, which lets us describe an API in a simple Python dictionary.

**API Specifications:**
- **Base URL:** https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api
- **Format:** Paginated JSON
- **Page Size:** 1,000 records per page
- **Pagination:** Stops when an empty page is returned
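The pagination rule above can be sketched in plain Python. This is a toy model of page-number pagination that stops on the first empty page, not dlt's implementation; `fetch_page` is a hypothetical function that fakes the API with an in-memory list, so no network call is made.

```python
from typing import Callable, Iterator, List


def paginate(fetch_page: Callable[[int], List[dict]], base_page: int = 1) -> Iterator[List[dict]]:
    """Request page base_page, base_page + 1, ... and stop at the first empty page."""
    page = base_page
    while True:
        records = fetch_page(page)
        if not records:  # empty page: nothing left to fetch
            return
        yield records
        page += 1


# Hypothetical fake API: 2,500 records served 1,000 per page.
ALL_RECORDS = [{"trip": i} for i in range(2500)]
PAGE_SIZE = 1000
requested = []


def fetch_page(page: int) -> List[dict]:
    requested.append(page)  # remember which pages were asked for
    start = (page - 1) * PAGE_SIZE  # page numbers start at 1
    return ALL_RECORDS[start:start + PAGE_SIZE]


pages = list(paginate(fetch_page))
print([len(p) for p in pages])  # sizes of the non-empty pages
print(requested)                # includes the final, empty page
```

Note the extra request at the end: the loop only learns that the data has run out when a page comes back empty.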

```python
def nyc_yellow_trip_source():
    """
    Creates a dlt source that fetches NYC Yellow Taxi trip data
    from the Data Engineering Zoomcamp API.
    """
    return rest_api_source({
        "client": {
            "base_url": "https://us-central1-dlthub-analytics.cloudfunctions.net",
        },
        "resources": [
            {
                "name": "yellow_trips",
                "endpoint": {
                    "path": "data_engineering_zoomcamp_api",
                    "paginator": {
                        "type": "page_number",
                        "page_param": "page",  # page number is sent as ?page=N
                        "total_path": None,  # no total page count in the response: stop at the first empty page
                        "base_page": 1,  # page numbering starts at 1
                    },
                },
            },
        ],
    })
```

## 🔧 Step 3: Create the dlt Pipeline

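```python
pipeline = dlt.pipeline(
    pipeline_name="nyc_yellow_trip_pipeline",
    destination="duckdb",
    dataset_name="nyc_taxi_data",
    progress="log",  # optional: print progress logs while each step runs
)
```

`pipeline_name` identifies the pipeline and its local state, `destination="duckdb"` selects the database, and `dataset_name` is the schema that will hold the tables. By default, the DuckDB destination stores the data in a file named after the pipeline (`nyc_yellow_trip_pipeline.duckdb`) in the current working directory.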

## 🔍 Understanding the Pipeline

At this point we have defined two key building blocks:

- **The source** describes where the data comes from and how to fetch it from the API.  
- **The pipeline** describes where the data should go (DuckDB) and keeps track of tables, schemas, and run history.  

---

Instead of running everything at once, we will now run the pipeline in three separate phases so you can clearly see what happens at each stage:

1. **Extract**: download raw data from the API  
2. **Normalize**: turn nested JSON into relational tables  
3. **Load**: write those tables into DuckDB  

Once these steps make sense, we will run the full workflow again using one command:

```python
pipeline.run(source)
```

## ⬇️ Step 4: Extract

Now we run the first stage of the pipeline: **Extract**.

Extract means:

- dlt sends requests to the Data Engineering Zoomcamp API
- the raw JSON responses are downloaded
- the results are stored in dlt's local working folder

At this stage, the data is **not** in DuckDB yet. We are just confirming that we successfully pulled data from the API.
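Before running the real extract, here is a toy sketch that makes the "not in DuckDB yet" point concrete: an extract step that only writes raw JSON records to a local folder. The folder (a temporary directory), the file name `yellow_trips.jsonl`, and the field names are assumptions for illustration; dlt uses its own working-directory layout.

```python
import json
import tempfile
from pathlib import Path


def extract_to_folder(pages, folder: Path) -> Path:
    """Append every record from every page to a JSON Lines file; no database involved."""
    out = folder / "yellow_trips.jsonl"  # hypothetical file name
    with out.open("w", encoding="utf-8") as f:
        for page in pages:
            for record in page:
                f.write(json.dumps(record) + "\n")
    return out


# Two fake pages of raw API records (hypothetical field names).
fake_pages = [[{"id": 1, "fare": 9.5}, {"id": 2, "fare": 12.0}], [{"id": 3, "fare": 7.25}]]

with tempfile.TemporaryDirectory() as tmp:
    path = extract_to_folder(fake_pages, Path(tmp))
    lines = path.read_text(encoding="utf-8").splitlines()

print(len(lines), "raw records written")
print(json.loads(lines[0]))
```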

```python
extract_info = pipeline.extract(nyc_yellow_trip_source())
```

---

### What we will print

After extraction, we will print a small summary showing:

- which **resources** were extracted
- which **tables** will be created later
- how many rows were extracted per resource

This helps confirm that the pipeline is working before we move on to normalization.

```python
load_id = extract_info.loads_ids[-1]  # most recent load ID from this extract
m = extract_info.metrics[load_id][0]  # extract metrics recorded for that load

print("Resources:", list(m["resource_metrics"].keys()))
print("Tables:", list(m["table_metrics"].keys()))
print("Load ID:", load_id)
print()

# rows extracted per resource
for resource, rm in m["resource_metrics"].items():
    print(f"Resource: {resource}")
    print(f"rows extracted: {rm.items_count}")
    print()
```

```
Resources: ['yellow_trips']
Tables: ['yellow_trips']
Load ID: 1772214533.8725717

Resource: yellow_trips
rows extracted: 10000
```



### What you should see after Extract

After extraction, you should see:

- **Resources:** `['yellow_trips']`  
- **Tables:** `['yellow_trips']`

The number of rows extracted depends on how many taxi trip records are available from the API. Each page contains up to 1,000 records, and the pagination continues until an empty page is returned.
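As a quick check on the numbers above: with at most 1,000 records per page and a stop on the first empty page, the run that reported 10,000 rows needed at least 10 non-empty pages plus one empty page. The sketch below does that arithmetic, assuming every page except possibly the last is full.

```python
import math

rows_extracted = 10_000  # value printed by the summary cell above
page_size = 1_000        # documented page size of the API

data_pages = math.ceil(rows_extracted / page_size)
requests = data_pages + 1  # one more request that returns the empty page
print(data_pages, requests)
```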

---

## 🔄 Step 5: Normalize

Now we run **Normalize**. This is where dlt transforms raw JSON into a clean relational structure.

During normalization, dlt does three key things:

### 1. Adds Tracking Columns

dlt adds special columns so that every row can be traced:
- `_dlt_id`: a unique identifier for each row (added to every table, including child tables)
- `_dlt_load_id`: links each row to the load package that created it (added to top-level tables such as `yellow_trips`)
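A toy version of this step attaches a per-row ID and a shared load ID to each record. It is a sketch, not dlt's code: the random ID format is an assumption, and the load ID is built from a Unix timestamp only because the load IDs printed in this notebook (for example `1772214533.8725717`) look like one.

```python
import secrets
import time


def add_tracking_columns(rows, load_id):
    """Return copies of rows with _dlt_id (unique per row) and _dlt_load_id (shared per load)."""
    tracked = []
    for row in rows:
        new_row = dict(row)
        new_row["_dlt_id"] = secrets.token_urlsafe(10)  # random ID; format is an assumption
        new_row["_dlt_load_id"] = load_id
        tracked.append(new_row)
    return tracked


load_id = str(time.time())  # timestamp-style load ID for this toy load
rows = add_tracking_columns([{"fare": 9.5}, {"fare": 12.0}], load_id)
for r in rows:
    print(r)
```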

### 2. Flattens Nested Data

If the API response contains nested structures, dlt flattens them:
- nested objects become extra columns in the same table, with names joined by a double underscore (for example `field__subfield`)
- arrays (lists) become separate child tables with names like `yellow_trips__nested_field`

Each child table has a `_dlt_parent_id` column that references `_dlt_id` in the parent table.
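These flattening rules can be illustrated with a simplified sketch: nested objects become `parent__child` columns in the same row, and each list becomes rows in a child table linked back through `_dlt_parent_id`, with the item's position stored in `_dlt_list_idx` as dlt does for child tables. This is a toy model with hypothetical field names (`payment`, `stops`), not dlt's normalizer; it handles lists of objects only and uses a counter instead of random IDs so the output is predictable.

```python
from itertools import count

_ids = count(1)  # deterministic stand-in for dlt's random row IDs


def normalize(record, table, tables, parent_id=None, list_idx=None):
    """Flatten one record into `table`; lists of objects go to `table__<field>`."""
    row = {"_dlt_id": str(next(_ids))}
    if parent_id is not None:
        row["_dlt_parent_id"] = parent_id
        row["_dlt_list_idx"] = list_idx
    children = []

    def flatten(obj, prefix=""):
        for key, value in obj.items():
            name = f"{prefix}{key}"
            if isinstance(value, dict):
                flatten(value, f"{name}__")     # nested object -> extra columns
            elif isinstance(value, list):
                children.append((name, value))  # list -> child table
            else:
                row[name] = value

    flatten(record)
    tables.setdefault(table, []).append(row)
    for name, items in children:
        for idx, item in enumerate(items):
            normalize(item, f"{table}__{name}", tables, row["_dlt_id"], idx)
    return tables


trip = {"fare": 9.5, "payment": {"type": "card", "tip": 2.0},
        "stops": [{"zone": "A"}, {"zone": "B"}]}
tables = normalize(trip, "yellow_trips", {})
for name, rows in tables.items():
    print(name, rows)
```

dlt's real normalizer handles more cases (such as lists of plain values) than this toy does.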

### 3. Sets Up Metadata Tables

dlt also tracks the pipeline itself in internal tables, which are created in the destination during Load:
- `_dlt_loads`: load history (each load ID, when it was loaded, and its status)
- `_dlt_pipeline_state`: the stored pipeline state, used for example by incremental loading
- `_dlt_version`: schema versions
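Load IDs tie these tables together: the `_dlt_load_id` on each row matches a load ID recorded in `_dlt_loads`. The IDs printed in this notebook look like Unix timestamps (seconds since 1970), so, assuming that reading is correct, one can be turned into a human-readable UTC time:

```python
from datetime import datetime, timezone

load_id = "1772214533.8725717"  # Load ID printed by the Extract summary above

loaded_at = datetime.fromtimestamp(float(load_id), tz=timezone.utc)
print(loaded_at.strftime("%Y-%m-%d %H:%M:%S %Z"))  # → 2026-02-27 17:48:53 UTC
```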

In the next cell, we will print a summary showing which tables were created.

```python
normalize_info = pipeline.normalize()
```


```python
load_id = normalize_info.loads_ids[-1]
m = normalize_info.metrics[load_id][0]

print("Load ID:", load_id)
print()

print("Tables created/updated:")
for table_name, tm in m["table_metrics"].items():
    # skip dlt internal tables to keep it beginner-friendly
    if table_name.startswith("_dlt"):
        continue
    print(f"  - {table_name}: {tm.items_count} rows")
```


### What happened during Normalize?

After running `pipeline.normalize()`, the data has been transformed from raw JSON into a **relational structure**.

If the NYC Yellow Taxi trip records contain nested lists, you may also see child tables created automatically by dlt; nested objects only add `__`-joined columns to `yellow_trips`.

---

### Schema Visualization

dlt can render the schema as a visual diagram. Run the next cell to see the table relationships:

```python
# Display schema
pipeline.default_schema
```


## 📤 Step 6: Load

Now we run the final stage of the pipeline: **Load**.

Load means:

- dlt creates tables in DuckDB (if they do not already exist)
- the normalized rows are inserted into those tables
- the pipeline records the load in its internal tracking tables
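The three Load actions can be sketched with Python's built-in `sqlite3` module standing in for DuckDB. That swap is made only so the example runs without extra packages; dlt's DuckDB loader works differently. The table name `loads` (playing the role of `_dlt_loads`) and the column names are hypothetical.

```python
import sqlite3

LOAD_ID = "1772214533.8725717"  # load ID reused from the Extract summary
rows = [
    {"_dlt_id": "a1", "_dlt_load_id": LOAD_ID, "fare": 9.5},
    {"_dlt_id": "a2", "_dlt_load_id": LOAD_ID, "fare": 12.0},
]

con = sqlite3.connect(":memory:")  # in-memory stand-in for the DuckDB file

# 1. Create the tables if they do not already exist.
con.execute("CREATE TABLE IF NOT EXISTS yellow_trips (_dlt_id TEXT PRIMARY KEY, _dlt_load_id TEXT, fare REAL)")
con.execute("CREATE TABLE IF NOT EXISTS loads (load_id TEXT PRIMARY KEY, status TEXT)")

# 2. Insert the normalized rows (named placeholders match the dict keys).
con.executemany("INSERT INTO yellow_trips VALUES (:_dlt_id, :_dlt_load_id, :fare)", rows)

# 3. Record the load in the tracking table, then commit data and record together.
con.execute("INSERT INTO loads VALUES (?, ?)", (LOAD_ID, "loaded"))
con.commit()

print(con.execute("SELECT COUNT(*), SUM(fare) FROM yellow_trips").fetchone())
print(con.execute("SELECT * FROM loads").fetchall())
```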

```python
load_info = pipeline.load()
```


After this step, the data is fully stored in the database and ready to query.

At this point:

- The `yellow_trips` table contains NYC Yellow Taxi trip records
- Any child tables contain the rows unpacked from nested lists
- Everything is now queryable using `pipeline.dataset()` or SQL

This is the moment where the data officially moves from "pipeline processing" into a database you can explore.

## 🚀 Step 7: Run the Full Pipeline

Now that we have walked through each step individually, we can run the entire workflow using a single command:

```python
load_info = pipeline.run(nyc_yellow_trip_source())
```

### What does `pipeline.run()` do?

`pipeline.run()` simply combines the three steps we already executed manually:

1. **Extract**: fetch data from the Data Engineering Zoomcamp API
2. **Normalize**: convert nested JSON into relational tables
3. **Load**: write those tables into DuckDB

In other words, this:

```python
pipeline.run(source)
```

is equivalent to:

```python
pipeline.extract(source)
pipeline.normalize()
pipeline.load()
```

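There is no hidden magic: it runs the full ELT process in order. One difference from calling the steps yourself: if an earlier run left pending packages (data that was extracted or normalized but never loaded), `run()` first finishes those and ignores the new data passed to it.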
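A toy model of that ordering, including the pending-package rule that dlt's own messages describe ("If you pass new data to extract to next run, it will be ignored"). The `ToyPipeline` class is hypothetical and only mimics the control flow; it is not dlt's `Pipeline`.

```python
class ToyPipeline:
    """Hypothetical stand-in that mimics the order of the pipeline steps."""

    def __init__(self):
        self.extracted = []    # packages waiting for normalize
        self.normalized = []   # packages waiting for load
        self.destination = []  # rows that reached the "database"

    def extract(self, data):
        self.extracted.append(list(data))

    def normalize(self):
        # toy "normalization": wrap each item in a dict
        self.normalized.extend([{"value": x} for x in pkg] for pkg in self.extracted)
        self.extracted.clear()

    def load(self):
        for pkg in self.normalized:
            self.destination.extend(pkg)
        self.normalized.clear()

    def has_pending(self):
        return bool(self.extracted or self.normalized)

    def run(self, data):
        if self.has_pending():
            # finish the earlier packages; the new data is ignored this time
            self.normalize()
            self.load()
            return
        self.extract(data)
        self.normalize()
        self.load()


p = ToyPipeline()
p.run([1, 2])   # normal run: extract -> normalize -> load
p.extract([3])  # an extract whose normalize/load never happened
p.run([4, 5])   # finishes the pending package and ignores [4, 5]
print(p.destination)
```

With real dlt, pending packages can be inspected with `dlt pipeline nyc_yellow_trip_pipeline info` or discarded with `dlt pipeline nyc_yellow_trip_pipeline drop-pending-packages`.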

## 🔎 Step 8: Inspect the Loaded Data

Now that the data is loaded into DuckDB, we can inspect it using `pipeline.dataset()`.

This gives us a convenient Python interface for exploring the tables that dlt created, without writing SQL.

---

### List available tables

First, let's see what tables exist in the dataset:

```python
ds = pipeline.dataset()
ds.tables
```

### Preview the yellow_trips table

Let's look at the first few rows of the main table:

```python
df = ds.yellow_trips.df()      # main table
df.head(10)
```

### Basic Data Analysis

Let's explore some basic statistics about the NYC Yellow Taxi trips:

```python
# Total number of trips
print(f"Total trips: {len(df)}")
print()

# Display data types
print("Data types:")
print(df.dtypes)
```
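Beyond counting rows, pandas can summarize every numeric column in one call with `describe()`. The sketch below uses a small hand-made DataFrame with hypothetical column names (`trip_distance`, `total_amt`) so it runs on its own; on the real data, pick column names from the `df.dtypes` output.

```python
import pandas as pd

# Hypothetical sample rows; replace with the real `df` and its actual column names.
sample = pd.DataFrame({
    "trip_distance": [1.2, 3.4, 0.8, 5.0],
    "total_amt": [8.5, 15.0, 6.0, 22.5],
})

summary = sample.describe()  # count, mean, std, min, quartiles, max per numeric column
print(summary)

# Share of trips longer than 2 (in whatever unit the column uses).
print((sample["trip_distance"] > 2).mean())
```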

## 💡 Conclusion

### What we accomplished

In this notebook, we built a complete data pipeline that:

✔ Fetches NYC Yellow Taxi trip data from a REST API  
✔ Handles pagination automatically (stops on empty page)  
✔ Normalizes JSON into relational tables  
✔ Loads data into DuckDB  
✔ Provides easy data inspection and analysis  

---

### What dlt handled for us

✔ API requests with proper pagination  
✔ JSON to relational normalization  
✔ Table creation with proper schemas  
✔ Database loading  
✔ Simple dataset inspection  

---

### Key Takeaways

• **Extract** downloads raw data from the API  
• **Normalize** converts JSON into clean relational tables  
• **Load** writes data into the destination database  
• `pipeline.run()` executes all three steps in sequence  
• The resulting data is easily queryable via `pipeline.dataset()` or SQL  

You now have a working pipeline processing real NYC Yellow Taxi trip data!