# Example of extracting data with dlt

In [2]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


def paginated_getter():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        # Define pagination strategy - page-based pagination
        paginator=PageNumberPaginator(   # <--- Pages are numbered (1, 2, 3, ...)
            base_page=1,   # <--- Start from page 1
            total_path=None    # <--- No total count of pages provided by API, pagination should stop when a page contains no result items
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):    # <--- API endpoint for retrieving taxi ride data
        yield page   # remember about memory management and yield data

for i, page_data in enumerate(paginated_getter()):
    print(page_data)
    if i > 2:
        break # break for testing

[{'End_Lat': 40.742963, 'End_Lon': -73.980072, 'Fare_Amt': 45.0, 'Passenger_Count': 1, 'Payment_Type': 'Credit', 'Rate_Code': None, 'Start_Lat': 40.641525, 'Start_Lon': -73.787442, 'Tip_Amt': 9.0, 'Tolls_Amt': 4.15, 'Total_Amt': 58.15, 'Trip_Distance': 17.52, 'Trip_Dropoff_DateTime': '2009-06-14 23:48:00', 'Trip_Pickup_DateTime': '2009-06-14 23:23:00', 'mta_tax': None, 'store_and_forward': None, 'surcharge': 0.0, 'vendor_name': 'VTS'}, {'End_Lat': 40.740187, 'End_Lon': -74.005698, 'Fare_Amt': 6.5, 'Passenger_Count': 1, 'Payment_Type': 'Credit', 'Rate_Code': None, 'Start_Lat': 40.722065, 'Start_Lon': -74.009767, 'Tip_Amt': 1.0, 'Tolls_Amt': 0.0, 'Total_Amt': 8.5, 'Trip_Distance': 1.56, 'Trip_Dropoff_DateTime': '2009-06-18 17:43:00', 'Trip_Pickup_DateTime': '2009-06-18 17:35:00', 'mta_tax': None, 'store_and_forward': None, 'surcharge': 1.0, 'vendor_name': 'VTS'}, {'End_Lat': 40.718043, 'End_Lon': -74.004745, 'Fare_Amt': 12.5, 'Passenger_Count': 5, 'Payment_Type': 'Credit', 'Rate_Code': N
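The stopping rule configured above (keep requesting numbered pages until one comes back empty) can be illustrated without any network access. This is a minimal sketch of the idea, not dlt's implementation: `FAKE_API`, `fetch_page`, and `paginate` are hypothetical names that stand in for the real HTTP calls.

```python
from typing import Callable, Dict, Iterator, List

# Hypothetical in-memory "API": page number -> list of records.
FAKE_API: Dict[int, List[dict]] = {
    1: [{"id": 1}, {"id": 2}],
    2: [{"id": 3}, {"id": 4}],
    3: [{"id": 5}],
}


def fetch_page(page: int) -> List[dict]:
    """Stand-in for an HTTP request; unknown pages return an empty list."""
    return FAKE_API.get(page, [])


def paginate(fetch: Callable[[int], List[dict]], base_page: int = 1) -> Iterator[List[dict]]:
    """Yield one page at a time and stop at the first empty page."""
    page = base_page
    while True:
        items = fetch(page)
        if not items:
            break
        yield items  # only the current page is held in memory
        page += 1


pages = list(paginate(fetch_page))
print(len(pages), sum(len(p) for p in pages))  # prints: 3 5
```

Because `paginate` is a generator, a consumer that breaks out of the loop early never triggers the remaining requests.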

# Normalizing data with dlt

- Automatically detects schema – No need to define column types manually.
- Flattens nested JSON – Converts complex structures into table-ready formats.
- Handles data type conversion – Converts dates, numbers, and booleans correctly.
- Splits lists into child tables – Ensures relational integrity for better analysis.
- Schema evolution support – Adapts to changes in data structure over time.
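To make schema detection and schema evolution more concrete, here is a rough stdlib-only sketch. It is an illustration under simplifying assumptions, not how dlt is implemented: `infer_type` and `evolve_schema` are hypothetical helpers, the type names are chosen for readability, and columns whose values are `None` are simply skipped.

```python
from datetime import datetime
from typing import Any, Dict, List


def infer_type(value: Any) -> str:
    """Map a Python value to a simple column type name."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "bool"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        try:
            datetime.fromisoformat(value)  # ISO-like strings become timestamps
            return "timestamp"
        except ValueError:
            return "text"
    return "unknown"


def evolve_schema(schema: Dict[str, str], rows: List[dict]) -> Dict[str, str]:
    """Return a copy of the schema extended with columns not seen before."""
    updated = dict(schema)
    for row in rows:
        for column, value in row.items():
            if value is not None and column not in updated:
                updated[column] = infer_type(value)
    return updated


# First batch: three typed columns, one column that is always None.
schema = evolve_schema({}, [
    {"vendor_name": "VTS", "pickup": "2009-06-14 23:23:00", "fare": 45.0, "rate_code": None},
])
# Second batch introduces a new column; existing columns are left untouched.
schema = evolve_schema(schema, [{"vendor_name": "VTS", "passenger_count": 1}])
print(schema)
# {'vendor_name': 'text', 'pickup': 'timestamp', 'fare': 'double', 'passenger_count': 'bigint'}
```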

In [3]:
data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "coordinates": {
            "start": {"lon": -73.787442, "lat": 40.641525},
            "end": {"lon": -73.980072, "lat": 40.742963}
        },
        "passengers": [
            {"name": "John", "rating": 4.9},
            {"name": "Jack", "rating": 3.9}
        ]
    }
]

## How dlt normalizes this data automatically

Instead of manually flattening fields and extracting nested lists, we can load it directly into dlt:

In [4]:
import dlt

# Define a dlt pipeline with automatic normalization
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_data",
    destination="duckdb",
    dataset_name="taxi_rides",
)

# Run the pipeline with raw nested data
info = pipeline.run(data, table_name="rides", write_disposition="replace")

# Print the load summary
print(info)

print(pipeline.last_trace)

Pipeline ny_taxi_data load step completed in 1.30 seconds
1 load package(s) were loaded to destination duckdb and into dataset taxi_rides
The duckdb destination used duckdb:///c:\Users\swang\projects\data_engineer_zoomcamp\week_4\dbt workshop\ny_taxi_data.duckdb location to store data
Load package 1740090483.3783855 is LOADED and contains no failed jobs
Run started at 2025-02-20 22:28:03.225441+00:00 and COMPLETED in 1.76 seconds with 4 steps.
Step extract COMPLETED in 0.11 seconds.

Load package 1740090483.3783855 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.12 seconds.
Normalized data for the following tables:
- rides: 1 row(s)
- rides__passengers: 2 row(s)
- _dlt_pipeline_state: 1 row(s)

Load package 1740090483.3783855 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 1.36 seconds.
Pipeline ny_taxi_data load step completed in 1.30 seconds
1 load package(s) were loa

## What happens behind the scenes?
After running this pipeline, dlt automatically transforms the data into the following normalized structure:

Main table: rides

In [5]:
pipeline.dataset(dataset_type="default").rides.df()

Unnamed: 0,vendor_name,record_hash,time__pickup,time__dropoff,coordinates__start__lon,coordinates__start__lat,coordinates__end__lon,coordinates__end__lat,_dlt_load_id,_dlt_id
0,VTS,b00361a396177a9cb410ff61f20015ad,2009-06-14 23:23:00+00:00,2009-06-14 23:48:00+00:00,-73.787442,40.641525,-73.980072,40.742963,1740090483.3783855,+XSNuHo4anwwVg


This table displays structured taxi ride data, including vendor details, timestamps, coordinates, and dlt metadata.

Child table: rides__passengers

In [6]:
pipeline.dataset(dataset_type="default").rides__passengers.df()

Unnamed: 0,name,rating,_dlt_parent_id,_dlt_list_idx,_dlt_id
0,John,4.9,+XSNuHo4anwwVg,0,6UkbwUF5qmjZUA
1,Jack,3.9,+XSNuHo4anwwVg,1,I3rIcvTPnVb+zA
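The shape of the two tables above (nested keys joined with `__`, list items moved to a child table that points back to its parent) can be reproduced with a small sketch. This is only an approximation of the idea: `flatten` and `normalize` are hypothetical helpers, list items are assumed to be dictionaries, and the simplified `_row_id`, `_parent_id`, and `_list_idx` fields stand in for the `_dlt_id`, `_dlt_parent_id`, and `_dlt_list_idx` columns shown in the output.

```python
from typing import Any, Dict, List, Tuple


def flatten(record: Dict[str, Any], parent_key: str = "") -> Tuple[Dict[str, Any], Dict[str, List[Any]]]:
    """Split a nested record into flat columns and lists destined for child tables."""
    columns: Dict[str, Any] = {}
    lists: Dict[str, List[Any]] = {}
    for key, value in record.items():
        name = f"{parent_key}__{key}" if parent_key else key
        if isinstance(value, dict):
            sub_columns, sub_lists = flatten(value, name)
            columns.update(sub_columns)
            lists.update(sub_lists)
        elif isinstance(value, list):
            lists[name] = value
        else:
            columns[name] = value
    return columns, lists


def normalize(table: str, records: List[dict]) -> Dict[str, List[dict]]:
    """Build a parent table plus one child table per list field."""
    tables: Dict[str, List[dict]] = {table: []}
    for row_id, record in enumerate(records):
        columns, lists = flatten(record)
        columns["_row_id"] = row_id
        tables[table].append(columns)
        for list_name, items in lists.items():
            child = tables.setdefault(f"{table}__{list_name}", [])
            for idx, item in enumerate(items):
                child_row, _ = flatten(item)  # lists nested inside items are ignored here
                child_row["_parent_id"] = row_id
                child_row["_list_idx"] = idx
                child.append(child_row)
    return tables


ride = {
    "vendor_name": "VTS",
    "time": {"pickup": "2009-06-14 23:23:00", "dropoff": "2009-06-14 23:48:00"},
    "coordinates": {"start": {"lon": -73.787442, "lat": 40.641525}},
    "passengers": [{"name": "John", "rating": 4.9}, {"name": "Jack", "rating": 3.9}],
}
tables = normalize("rides", [ride])
print(sorted(tables))            # ['rides', 'rides__passengers']
print(list(tables["rides"][0]))  # flat column names such as 'coordinates__start__lon'
```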


# Loading data

Now that we’ve covered extracting and normalizing data, the final step is loading the data into a destination. This is where the processed data is stored, making it ready for querying, analysis, or further transformations.

## How data loading happens without dlt

Before dlt, data engineers had to manually handle schema validation, batch processing, error handling, and retries for every destination. This process becomes especially complex when loading data into data warehouses and data lakes, where performance optimization, partitioning, and incremental updates are critical.

### Example: Loading data into a database without dlt
A basic pipeline requires:

1. Setting up a database connection.
2. Creating tables and defining schemas.
3. Handling schema changes manually.
4. Writing queries to insert/update data.

In [7]:
import duckdb

# 1. Create a connection to a DuckDB database file (created on disk if it does not exist)
conn = duckdb.connect("ny_taxi_manual.db")

# 2. Create the rides Table
# Since our dataset has nested structures, we must manually flatten it before inserting data.
conn.execute("""
CREATE TABLE IF NOT EXISTS rides (
    record_hash TEXT PRIMARY KEY,
    vendor_name TEXT,
    pickup_time TIMESTAMP,
    dropoff_time TIMESTAMP,
    start_lon DOUBLE,
    start_lat DOUBLE,
    end_lon DOUBLE,
    end_lat DOUBLE
);
""")

# 3. Insert Data Manually
# Since JSON data has nested fields, we need to extract and transform them before inserting them into DuckDB.
data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "coordinates": {
            "start": {"lon": -73.787442, "lat": 40.641525},
            "end": {"lon": -73.980072, "lat": 40.742963}
        }
    }
]

# Prepare data for insertion
flattened_data = [
    (
        ride["record_hash"],
        ride["vendor_name"],
        ride["time"]["pickup"],
        ride["time"]["dropoff"],
        ride["coordinates"]["start"]["lon"],
        ride["coordinates"]["start"]["lat"],
        ride["coordinates"]["end"]["lon"],
        ride["coordinates"]["end"]["lat"]
    )
    for ride in data
]

# Insert into DuckDB
conn.executemany("""
INSERT INTO rides (record_hash, vendor_name, pickup_time, dropoff_time, start_lon, start_lat, end_lon, end_lat)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", flattened_data)

print("Data successfully loaded into DuckDB!")


# 4. Query Data in DuckDB
# Now that the data is loaded, we can query it using DuckDB’s SQL engine.
df = conn.execute("SELECT * FROM rides").df()

conn.close()

Data successfully loaded into DuckDB!
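The cell above does not show what "handling schema changes manually" looks like in practice. The following sketch gives one possible shape of that work. It uses Python's built-in `sqlite3` module as a stand-in for DuckDB so that it runs without extra packages; `add_missing_columns`, `insert_row`, and the `tip_amt` column are hypothetical, and the identifiers are interpolated into SQL only because they come from trusted code.

```python
import sqlite3
from typing import Dict

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE rides (record_hash TEXT PRIMARY KEY, vendor_name TEXT)")


def add_missing_columns(conn: sqlite3.Connection, table: str, row: dict, sql_types: Dict[str, str]) -> None:
    """Add a column for every key in the row that the table does not have yet."""
    # PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk) per column.
    existing = {info[1] for info in conn.execute(f"PRAGMA table_info({table})")}
    for column in row:
        if column not in existing:
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {sql_types[column]}")


def insert_row(conn: sqlite3.Connection, table: str, row: dict) -> None:
    """Insert a single row using positional placeholders for the values."""
    columns = ", ".join(row)
    placeholders = ", ".join("?" for _ in row)
    conn.execute(f"INSERT INTO {table} ({columns}) VALUES ({placeholders})", tuple(row.values()))


# A record arrives with a column the table has never seen.
new_row = {"record_hash": "b00361a396177a9cb410ff61f20015ad", "vendor_name": "VTS", "tip_amt": 9.0}
add_missing_columns(conn, "rides", new_row, sql_types={"tip_amt": "REAL"})
insert_row(conn, "rides", new_row)

columns = [info[1] for info in conn.execute("PRAGMA table_info(rides)")]
rows = conn.execute("SELECT * FROM rides").fetchall()
print(columns)  # ['record_hash', 'vendor_name', 'tip_amt']
conn.close()
```

Every new field requires this kind of check before each insert, which is the bookkeeping that a loading library is meant to take over.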


### Example: Loading data into a database with dlt

To use the full power of dlt, it is better to wrap our API client in the @dlt.resource decorator, which denotes a logical grouping of data within a data source, typically holding data of similar structure and origin:

In [16]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


# Define the API resource for NYC taxi data
@dlt.resource(name="rides")   # <--- The name of the resource (will be used as the table name)
def ny_taxi():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):    # <--- API endpoint for retrieving taxi ride data
        yield page   # <--- yield data to manage memory


# define new dlt pipeline
pipeline = dlt.pipeline(destination="duckdb")

# run the pipeline with the new resource
load_info = pipeline.run(ny_taxi, write_disposition="replace")
print(load_info)

# explore loaded data
pipeline.dataset(dataset_type="default").rides.df()

Pipeline dlt_ipykernel_launcher load step completed in 1.89 seconds
1 load package(s) were loaded to destination duckdb and into dataset dlt_ipykernel_launcher_dataset
The duckdb destination used duckdb:///c:\Users\swang\projects\data_engineer_zoomcamp\week_4\dbt workshop\dlt_ipykernel_launcher.duckdb location to store data
Load package 1740094695.233581 is LOADED and contains no failed jobs


Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,surcharge,vendor_name,_dlt_load_id,_dlt_id,store_and_forward
0,40.742963,-73.980072,45.0,1,Credit,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00+00:00,2009-06-14 23:23:00+00:00,0.0,VTS,1740094695.233581,xndNhPblTaRCKA,
1,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.00,8.50,1.56,2009-06-18 17:43:00+00:00,2009-06-18 17:35:00+00:00,1.0,VTS,1740094695.233581,eymKyuj+VVc3mA,
2,40.718043,-74.004745,12.5,5,Credit,40.761945,-73.983038,2.0,0.00,15.50,3.37,2009-06-10 18:27:00+00:00,2009-06-10 18:08:00+00:00,1.0,VTS,1740094695.233581,m55Uemm1QC8Eow,
3,40.739637,-73.985233,4.9,1,CASH,40.749802,-73.992247,0.0,0.00,5.40,1.11,2009-06-14 23:58:00+00:00,2009-06-14 23:54:00+00:00,0.5,VTS,1740094695.233581,aDhv79m6xxMI/Q,
4,40.730032,-73.852693,25.7,1,CASH,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00+00:00,2009-06-13 13:01:00+00:00,0.0,VTS,1740094695.233581,/SNB4+J/r2gdWw,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,40.783522,-73.970690,5.7,1,CASH,40.778560,-73.953660,0.0,0.00,5.70,1.16,2009-06-19 11:28:00+00:00,2009-06-19 11:22:00+00:00,0.0,VTS,1740094695.233581,6s08QBVumcx8jw,
9996,40.777200,-73.964197,4.1,1,CASH,40.779800,-73.974297,0.0,0.00,4.10,0.89,2009-06-17 07:43:00+00:00,2009-06-17 07:41:00+00:00,0.0,VTS,1740094695.233581,FN79cy4Bh2APrg,
9997,40.780172,-73.957617,6.1,1,CASH,40.788388,-73.976758,0.0,0.00,6.10,1.30,2009-06-19 11:46:00+00:00,2009-06-19 11:39:00+00:00,0.0,VTS,1740094695.233581,JaCkPa9DXsSXUA,
9998,40.777342,-73.957242,5.7,1,CASH,40.773828,-73.956690,0.0,0.00,6.20,0.97,2009-06-17 04:19:00+00:00,2009-06-17 04:13:00+00:00,0.5,VTS,1740094695.233581,oNTbgYLvdx3dpA,


## Incremental Loading
Incremental loading allows us to update datasets by loading only new or changed data, instead of replacing the entire dataset. This makes pipelines faster and more cost-effective by reducing redundant data processing.

## How does incremental loading work?
Incremental loading works alongside two key concepts:

- Incremental extraction – Only extracts the new or modified data rather than retrieving everything again.
- State tracking – Keeps track of what has already been loaded, ensuring that only new data is processed.

In dlt, state is stored in a separate table at the destination, allowing pipelines to track what has been processed.

### Incremental loading methods in dlt
dlt provides two ways to load data incrementally:

#### 1. Append (adding new records)
- Best for immutable or stateless data, such as taxi ride records.
- Each run adds new records without modifying previous data.
- Can also be used to create a history of changes (slowly changing dimensions).

Example:

- If taxi ride data is loaded daily, only new rides are added, rather than reloading the full history.
- If tracking changes in a list of vehicles, each version is stored as a new row for auditing.

----

#### 2. Merge (updating existing records)
- Best for updating existing records (stateful data).
- Replaces old records with updated ones based on a unique key.
- Useful for tracking status changes, such as payment updates.

Example:

- A taxi ride's payment status could change from "booked" to "cancelled", requiring an update.
- A customer profile might be updated with a new email or phone number.
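The difference between the two methods can be shown on plain Python lists. This is a conceptual sketch only: `append`, `merge`, and the `ride_id` key are hypothetical and say nothing about how a destination actually performs these operations.

```python
from typing import Dict, List


def append(table: List[dict], new_rows: List[dict]) -> List[dict]:
    """Append: keep every existing row and add the new ones after them."""
    return table + new_rows


def merge(table: List[dict], new_rows: List[dict], key: str) -> List[dict]:
    """Merge: a new row replaces the existing row that has the same key."""
    by_key: Dict[object, dict] = {row[key]: row for row in table}
    for row in new_rows:
        by_key[row[key]] = row
    return list(by_key.values())


existing = [
    {"ride_id": 1, "status": "booked"},
    {"ride_id": 2, "status": "booked"},
]
incoming = [
    {"ride_id": 2, "status": "cancelled"},  # update to an existing ride
    {"ride_id": 3, "status": "booked"},     # brand-new ride
]

appended = append(existing, incoming)
merged = merge(existing, incoming, key="ride_id")
print(len(appended))  # 4: both versions of ride 2 are kept
print(merged)         # 3 rows: ride 2 now has status 'cancelled'
```

With append, the table keeps a history of every version. With merge, it holds only the latest state per key.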

### Example: Incremental loading with dlt
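The goal: load only trips that ended from June 15, 2009 onward, skipping the older ones.

Using dlt, we set up an incremental filter that keeps only trips from a certain date onward. Because the API request itself does not change, every page is still downloaded, and the filter is applied to the records the resource yields: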


In [9]:
cursor_date = dlt.sources.incremental("Trip_Dropoff_DateTime", initial_value="2009-06-15")

This tells dlt:

- Start date: June 15, 2009 (initial_value).
- Field to track: Trip_Dropoff_DateTime (our timestamp).

As you run the pipeline repeatedly, dlt will keep track of the latest Trip_Dropoff_DateTime value processed. It will skip records older than this date in future runs.
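The bookkeeping described above can be sketched in a few lines of plain Python. This is a simplified model, not dlt's implementation: `incremental_filter` and the `state` dictionary are hypothetical, and the boundary rules (inclusive on the first run, strictly greater afterwards) are an assumption that may differ from dlt's exact boundary and deduplication handling.

```python
from typing import Dict, List


def incremental_filter(rows: List[dict], cursor_field: str, state: Dict[str, str], initial_value: str) -> List[dict]:
    """Keep rows newer than the stored cursor value and advance the cursor."""
    last_value = state.get("last_value")
    # ISO-formatted timestamps compare correctly as plain strings.
    if last_value is None:
        fresh = [row for row in rows if row[cursor_field] >= initial_value]
    else:
        fresh = [row for row in rows if row[cursor_field] > last_value]
    if fresh:
        state["last_value"] = max(row[cursor_field] for row in fresh)
    return fresh


rows = [
    {"Trip_Dropoff_DateTime": "2009-06-14 23:48:00"},
    {"Trip_Dropoff_DateTime": "2009-06-18 17:43:00"},
    {"Trip_Dropoff_DateTime": "2009-06-10 18:27:00"},
]
state: Dict[str, str] = {}

first_run = incremental_filter(rows, "Trip_Dropoff_DateTime", state, "2009-06-15")
second_run = incremental_filter(rows, "Trip_Dropoff_DateTime", state, "2009-06-15")
print(len(first_run), len(second_run))  # prints: 1 0
print(state)  # {'last_value': '2009-06-18 17:43:00'}
```

The second call sees the same rows but returns nothing, because the stored cursor value already covers them.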
    
Let's make the data resource incremental using dlt.sources.incremental:

In [10]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


@dlt.resource(name="rides", write_disposition="append")
def ny_taxi(
    cursor_date=dlt.sources.incremental(
        "Trip_Dropoff_DateTime",   # <--- field to track, our timestamp
        initial_value="2009-06-15",   # <--- start date June 15, 2009
        )
    ):
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):
        yield page

Finally, we run our pipeline and load the fresh taxi rides data:

In [11]:
# define new dlt pipeline
pipeline = dlt.pipeline(pipeline_name="ny_taxi", destination="duckdb", dataset_name="ny_taxi_data")

# run the pipeline with the new resource
load_info = pipeline.run(ny_taxi)
print(pipeline.last_trace)

Run started at 2025-02-20 22:50:55.932777+00:00 and COMPLETED in 25.14 seconds with 4 steps.
Step extract COMPLETED in 22.99 seconds.

Load package 1740091856.068268 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.82 seconds.
Normalized data for the following tables:
- rides: 5325 row(s)
- _dlt_pipeline_state: 1 row(s)

Load package 1740091856.068268 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 1.21 seconds.
Pipeline ny_taxi load step completed in 1.16 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:///c:\Users\swang\projects\data_engineer_zoomcamp\week_4\dbt workshop\ny_taxi.duckdb location to store data
Load package 1740091856.068268 is LOADED and contains no failed jobs

Step run COMPLETED in 25.14 seconds.
Pipeline ny_taxi load step completed in 1.16 seconds
1 load package(s) were loaded

Only 5325 rows passed the filter and were loaded into the duckdb destination. Running the same pipeline again shows state tracking at work: dlt finds no records newer than the stored cursor value, so nothing is loaded:

In [None]:
# define new dlt pipeline
pipeline = dlt.pipeline(pipeline_name="ny_taxi", destination="duckdb", dataset_name="ny_taxi_data")

# run the pipeline with the new resource
load_info = pipeline.run(ny_taxi)
print(pipeline.last_trace)

Run started at 2025-02-20 22:51:38.417959+00:00 and COMPLETED in 22.17 seconds with 4 steps.
Step extract COMPLETED in 22.00 seconds.

Load package 1740091898.5471213 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.06 seconds.
No data found to normalize

Step load COMPLETED in 0.03 seconds.
Pipeline ny_taxi load step completed in ---
0 load package(s) were loaded to destination duckdb and into dataset None
The duckdb destination used duckdb:///c:\Users\swang\projects\data_engineer_zoomcamp\week_4\dbt workshop\ny_taxi.duckdb location to store data

Step run COMPLETED in 22.17 seconds.
Pipeline ny_taxi load step completed in ---
0 load package(s) were loaded to destination duckdb and into dataset None
The duckdb destination used duckdb:///c:\Users\swang\projects\data_engineer_zoomcamp\week_4\dbt workshop\ny_taxi.duckdb location to store data


### Example: Loading data into a Data Warehouse (BigQuery)

In [13]:
!pip install dlt[bigquery]

Defaulting to user installation because normal site-packages is not writeable


The system cannot find the path specified.


In [14]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


@dlt.resource(name="rides", write_disposition="replace")
def ny_taxi():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):
        yield page

#### Choosing a destination

Switching between data warehouses (BigQuery, Snowflake, Redshift) or data lakes (S3, Google Cloud Storage, Parquet files) in dlt only requires changing the destination parameter in your pipeline configuration.

In [15]:
pipeline = dlt.pipeline(
    pipeline_name='taxi_data',
    destination='duckdb', # <--- to test pipeline locally
    dataset_name='taxi_rides',
)

pipeline = dlt.pipeline(
    pipeline_name='taxi_data',
    destination='bigquery', # <--- to run pipeline in production
    dataset_name='taxi_rides',
)
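One way to avoid editing the code when moving from local testing to production is to read the destination from the environment. The sketch below assumes a hypothetical `TAXI_DESTINATION` variable and a hypothetical `pipeline_settings` helper; it only builds the keyword arguments and does not import dlt, so it can be run anywhere.

```python
import os
from typing import Dict, Mapping, Optional


def pipeline_settings(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build pipeline keyword arguments, defaulting to a local duckdb destination."""
    env = os.environ if env is None else env
    return {
        "pipeline_name": "taxi_data",
        # TAXI_DESTINATION is a made-up variable name used for illustration.
        "destination": env.get("TAXI_DESTINATION", "duckdb"),
        "dataset_name": "taxi_rides",
    }


local = pipeline_settings({})
production = pipeline_settings({"TAXI_DESTINATION": "bigquery"})
print(local["destination"], production["destination"])  # prints: duckdb bigquery

# With dlt installed, the result could be passed on as:
# pipeline = dlt.pipeline(**pipeline_settings())
```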



#### Set Credentials

The next logical step is to set credentials using dlt's TOML providers or environment variables (ENVs).

In [None]:
import os
from google.colab import userdata

os.environ["DESTINATION__BIGQUERY__CREDENTIALS"] = userdata.get('BIGQUERY_CREDENTIALS')

pipeline = dlt.pipeline(
    pipeline_name="taxi_data",
    destination="bigquery",
    dataset_name="taxi_rides",
    dev_mode=True,
)

info = pipeline.run(ny_taxi)
print(info)
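The environment variable set above is built from upper-cased section names joined by double underscores. Assuming that naming pattern, a tiny hypothetical helper (`env_key` is not part of dlt) can build such names and makes typos less likely:

```python
def env_key(*sections: str) -> str:
    """Join configuration sections into an upper-case, double-underscore key."""
    return "__".join(section.upper() for section in sections)


key = env_key("destination", "bigquery", "credentials")
print(key)  # DESTINATION__BIGQUERY__CREDENTIALS
```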