### without dlt

- Schema management is manual – If the schema changes, you need to update table structures manually 
- No automatic retries – If the network fails, data may be lost
- No incremental loading – Every run reloads everything, making it slow and expensive
- More code to maintain – A simple pipeline quickly becomes complex

In [None]:
import duckdb

In [None]:
# Create a connection to a file-backed DuckDB database (NOT in-memory):
# everything is persisted to ny_taxi_manual.db, so the table and its rows
# are still there the next time this notebook connects.
conn = duckdb.connect("ny_taxi_manual.db")

In [None]:
# Create the rides table by hand: every column and its type has to be spelled
# out up front, and IF NOT EXISTS keeps re-runs from failing on an existing table.
create_rides_sql = """
CREATE TABLE IF NOT EXISTS rides (
    record_hash TEXT PRIMARY KEY,
    vendor_name TEXT,
    pickup_time TIMESTAMP,
    dropoff_time TIMESTAMP,
    start_lon DOUBLE,
    start_lat DOUBLE,
    end_lon DOUBLE,
    end_lat DOUBLE
);
"""
conn.execute(create_rides_sql)

In [None]:
# Insert data manually.
# The records are nested JSON-style dicts, so every field has to be pulled out
# by hand and flattened into a row before it can be inserted into DuckDB.
data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "coordinates": {
            "start": {"lon": -73.787442, "lat": 40.641525},
            "end": {"lon": -73.980072, "lat": 40.742963}
        }
    }
]

# Flatten each nested record into a tuple whose order matches the INSERT's
# column list below.
flattened_data = [
    (
        ride["record_hash"],
        ride["vendor_name"],
        ride["time"]["pickup"],
        ride["time"]["dropoff"],
        ride["coordinates"]["start"]["lon"],
        ride["coordinates"]["start"]["lat"],
        ride["coordinates"]["end"]["lon"],
        ride["coordinates"]["end"]["lat"]
    )
    for ride in data
]

# Insert into DuckDB.
# record_hash is the table's PRIMARY KEY and the database lives in a file, so a
# plain INSERT would raise a duplicate-key constraint error when this cell is
# re-run. INSERT OR REPLACE (DuckDB >= 0.8) overwrites the existing row instead,
# making the cell safe to run more than once.
conn.executemany("""
INSERT OR REPLACE INTO rides (record_hash, vendor_name, pickup_time, dropoff_time, start_lon, start_lat, end_lon, end_lat)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", flattened_data)

print("Data successfully loaded into DuckDB!")

In [None]:
# Read the whole rides table back as a DataFrame, then release the connection
# (and with it the lock on the database file).
rides_query = "SELECT * FROM rides"
df = conn.execute(rides_query).df()
conn.close()

### with dlt 

- Supports multiple destinations (BigQuery, GCS, S3, Redshift, Snowflake, Postgres, ...)
- Optimized for performance – Uses batch loading, parallelism, and streaming for fast and scalable data transfer
- Schema-aware – Ensures that column names, data types, and structures match the destination’s requirements
- Incremental loading – Avoids unnecessary reloading by only inserting new or updated records
- Resilience & retries – Automatically handles failures, ensuring data is loaded without missing records

In [None]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

In [None]:
# dlt resource for the NYC taxi API; name="rides" becomes the destination table.
@dlt.resource(name="rides")
def ny_taxi():
    """Yield NYC taxi ride data from the zoomcamp API, one page at a time.

    Pages are numbered from 1. ``total_path=None`` means no total page count is
    read from the response (per dlt docs, paging then stops at an empty page).
    """
    api = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(base_page=1, total_path=None),
    )
    # Stream pages straight through instead of collecting them in memory.
    yield from api.paginate("data_engineering_zoomcamp_api")

In [None]:
# Create a dlt pipeline that loads into a local DuckDB database.
# No pipeline_name or dataset_name is passed, so dlt chooses default names.
pipeline = dlt.pipeline(
    destination="duckdb",
)

In [None]:
# Load the resource, replacing whatever a previous run put in the rides table,
# and print the load summary.
load_info = pipeline.run(
    ny_taxi,
    write_disposition="replace",
)
print(load_info)

In [None]:
# Explore the loaded data: the rides table of the pipeline's dataset, returned
# as a DataFrame so the notebook displays it as the cell output.
loaded_dataset = pipeline.dataset(dataset_type="default")
loaded_dataset.rides.df()

### Incremental loading with dlt

In [None]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

In [None]:
# Incremental version of the resource: only rides with a drop-off time from
# June 15, 2009 onward are appended to the rides table.
# Note: the body never reads cursor_date and still pages through the whole API.
# dlt itself filters the yielded records against the cursor, so older trips are
# downloaded but skipped at load time rather than at request time.
@dlt.resource(name="rides", write_disposition="append")
def ny_taxi(
    cursor_date=dlt.sources.incremental(
        "Trip_Dropoff_DateTime",  # record field used as the cursor
        initial_value="2009-06-15",  # lower bound used on the first run
    ),
):
    """Yield NYC taxi ride pages; dlt applies the incremental cursor filter."""
    api = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(base_page=1, total_path=None),
    )
    yield from api.paginate("data_engineering_zoomcamp_api")

In [None]:
# Create (or reattach to) a named pipeline so the incremental cursor persists
# between runs: running this cell a second time should load no new rides.
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi",
    destination="duckdb",
    dataset_name="ny_taxi_data",
)

# Run the incremental resource and print the full trace of this run.
load_info = pipeline.run(ny_taxi)
print(pipeline.last_trace)

In [None]:
# Sanity-check the incremental filter: the earliest loaded drop-off time is
# expected to be on or after the cursor's initial value (2009-06-15).
earliest_dropoff_sql = """
            SELECT
            MIN(trip_dropoff_date_time)
            FROM rides;
            """

with pipeline.sql_client() as sql_client:
    res = sql_client.execute_sql(earliest_dropoff_sql)
    print(res)

### Loading data into a Data Warehouse (BigQuery)

In [None]:
# install the dependencies
!pip install dlt[bigquery]

In [None]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator
import os
import json
from google.colab import userdata

In [None]:
# Put the BigQuery service-account key into the environment variable dlt reads
# for BigQuery destination credentials.
# credentials.json must already be in the working directory: run the
# key-upload cell first if it is not.
# A dedicated variable is used so that Colab's `userdata` (imported above)
# is not shadowed by a plain dict.
with open("credentials.json", "r", encoding="utf-8") as f:
    # Parse and re-serialize so malformed JSON fails here, not inside dlt.
    bq_credentials = json.dumps(json.load(f))

os.environ["DESTINATION__BIGQUERY__CREDENTIALS"] = bq_credentials

In [None]:
# Upload the service-account key file to the Colab runtime.
# Prerequisite: create a service account, grant it the BigQuery Admin role,
# and create a key for it. The credentials cell reads the key from
# credentials.json.
from google.colab import files

uploaded = files.upload()

In [None]:
# Resource that fetches the rides from the API; each run replaces the
# previous contents of the rides table.
@dlt.resource(name="rides", write_disposition="replace")
def ny_taxi():
    """Yield NYC taxi ride data from the zoomcamp API, page by page."""
    api = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(base_page=1, total_path=None),
    )
    yield from api.paginate("data_engineering_zoomcamp_api")

In [None]:
# BigQuery pipeline: rides land in the "taxi_rides" dataset.
# dev_mode=True enables dlt's development mode (per dlt docs, each run uses a
# fresh, suffixed dataset — confirm for your dlt version).
pipeline = dlt.pipeline(
    destination="bigquery",
    pipeline_name="taxi_data",
    dataset_name="taxi_rides",
    dev_mode=True,
)

# Run the load and print its summary.
info = pipeline.run(ny_taxi)
print(info)

### Loading data into a Data Lake (Parquet on Local FS or S3)

In [None]:
import os
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

In [None]:
# Target directory for the filesystem destination: a local folder here
# (the Colab working directory); presumably a cloud bucket URL could be
# supplied instead.
os.environ.update({"BUCKET_URL": "/content"})

In [None]:
# Resource that fetches the rides from the API; each run replaces the
# previously written rides data.
@dlt.resource(name="rides", write_disposition="replace")
def ny_taxi():
    """Yield NYC taxi ride data from the zoomcamp API, page by page."""
    api = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(base_page=1, total_path=None),
    )
    yield from api.paginate("data_engineering_zoomcamp_api")

In [None]:
# Filesystem pipeline: files are written under BUCKET_URL into "fs_data".
pipeline = dlt.pipeline(
    destination="filesystem",
    pipeline_name="fs_pipeline",
    dataset_name="fs_data",
)

# Write the rides as Parquet files (csv or jsonl are the other choices).
load_info = pipeline.run(
    ny_taxi,
    loader_file_format="parquet",
)
print(load_info)

In [None]:
# Explore the written files through the pipeline's dataset: the rides table
# is returned as a DataFrame and shown as the cell output.
fs_dataset = pipeline.dataset(dataset_type="default")
fs_dataset.rides.df()

### Loading to Delta Lake or Iceberg

In [None]:
!pip install "dlt[pyiceberg]"

In [None]:
import os
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

In [None]:
# The Iceberg tables are also written under the local /content directory.
os.environ.update({"BUCKET_URL": "/content"})

In [None]:
# Resource that fetches the rides from the API; each run replaces the
# previously written rides table.
@dlt.resource(name="rides", write_disposition="replace")
def ny_taxi():
    """Yield NYC taxi ride data from the zoomcamp API, page by page."""
    api = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(base_page=1, total_path=None),
    )
    yield from api.paginate("data_engineering_zoomcamp_api")

In [None]:
# Filesystem pipeline for the Iceberg load. It reuses the "fs_pipeline" name
# but writes into its own "fs_iceberg_data" dataset.
pipeline = dlt.pipeline(
    destination="filesystem",
    pipeline_name="fs_pipeline",
    dataset_name="fs_iceberg_data",
)

# Store the rides in the Iceberg table format with Parquet data files,
# then print the load summary.
load_info = pipeline.run(
    ny_taxi,
    table_format="iceberg",
    loader_file_format="parquet",
)
print(load_info)