In [1]:
import numpy

In [2]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


def paginated_getter():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        # Define pagination strategy - page-based pagination
        paginator=PageNumberPaginator(   # <--- Pages are numbered (1, 2, 3, ...)
            base_page=1,   # <--- Start from page 1
            total_path=None    # <--- No total count of pages provided by API, pagination should stop when a page contains no result items
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):    # <--- API endpoint for retrieving taxi ride data
        yield page   # remember about memory management and yield data

for page_data in paginated_getter():
    print(page_data)

[{'End_Lat': 40.742963, 'End_Lon': -73.980072, 'Fare_Amt': 45.0, 'Passenger_Count': 1, 'Payment_Type': 'Credit', 'Rate_Code': None, 'Start_Lat': 40.641525, 'Start_Lon': -73.787442, 'Tip_Amt': 9.0, 'Tolls_Amt': 4.15, 'Total_Amt': 58.15, 'Trip_Distance': 17.52, 'Trip_Dropoff_DateTime': '2009-06-14 23:48:00', 'Trip_Pickup_DateTime': '2009-06-14 23:23:00', 'mta_tax': None, 'store_and_forward': None, 'surcharge': 0.0, 'vendor_name': 'VTS'}, {'End_Lat': 40.740187, 'End_Lon': -74.005698, 'Fare_Amt': 6.5, 'Passenger_Count': 1, 'Payment_Type': 'Credit', 'Rate_Code': None, 'Start_Lat': 40.722065, 'Start_Lon': -74.009767, 'Tip_Amt': 1.0, 'Tolls_Amt': 0.0, 'Total_Amt': 8.5, 'Trip_Distance': 1.56, 'Trip_Dropoff_DateTime': '2009-06-18 17:43:00', 'Trip_Pickup_DateTime': '2009-06-18 17:35:00', 'mta_tax': None, 'store_and_forward': None, 'surcharge': 1.0, 'vendor_name': 'VTS'}, {'End_Lat': 40.718043, 'End_Lon': -74.004745, 'Fare_Amt': 12.5, 'Passenger_Count': 5, 'Payment_Type': 'Credit', 'Rate_Code': N

KeyboardInterrupt: 

In [1]:
data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "coordinates": {
            "start": {"lon": -73.787442, "lat": 40.641525},
            "end": {"lon": -73.980072, "lat": 40.742963}
        },
        "passengers": [
            {"name": "John", "rating": 4.9},
            {"name": "Jack", "rating": 3.9}
        ]
    }
]

In [2]:
import dlt

# Define a dlt pipeline with automatic normalization
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_data",
    destination="duckdb",
    dataset_name="taxi_rides",
)

# Run the pipeline with raw nested data
info = pipeline.run(data, table_name="rides", write_disposition="replace")

# Print the load summary
print(info)

Pipeline ny_taxi_data load step completed in 1.07 seconds
1 load package(s) were loaded to destination duckdb and into dataset taxi_rides
The duckdb destination used duckdb:////Users/shivsk/Documents/Projects/Data_engineering_course/dlt/ny_taxi_data.duckdb location to store data
Load package 1739790165.1638021 is LOADED and contains no failed jobs


In [3]:
print(pipeline.last_trace)

Run started at 2025-02-17 11:02:45.027157+00:00 and COMPLETED in 1.30 seconds with 4 steps.
Step extract COMPLETED in 0.06 seconds.

Load package 1739790165.1638021 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.05 seconds.
Normalized data for the following tables:
- rides: 1 row(s)
- rides__passengers: 2 row(s)

Load package 1739790165.1638021 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 1.09 seconds.
Pipeline ny_taxi_data load step completed in 1.07 seconds
1 load package(s) were loaded to destination duckdb and into dataset taxi_rides
The duckdb destination used duckdb:////Users/shivsk/Documents/Projects/Data_engineering_course/dlt/ny_taxi_data.duckdb location to store data
Load package 1739790165.1638021 is LOADED and contains no failed jobs

Step run COMPLETED in 1.30 seconds.
Pipeline ny_taxi_data load step completed in 1.07 seconds
1 load package(s) were load

In [6]:
pipeline.dataset(dataset_type="default").rides.df()

Unnamed: 0,vendor_name,record_hash,time__pickup,time__dropoff,coordinates__start__lon,coordinates__start__lat,coordinates__end__lon,coordinates__end__lat,_dlt_load_id,_dlt_id
0,VTS,b00361a396177a9cb410ff61f20015ad,2009-06-14 23:23:00+00:00,2009-06-14 23:48:00+00:00,-73.787442,40.641525,-73.980072,40.742963,1739790165.163802,RN7vrV1MD0K9SQ


In [8]:
pipeline.dataset(dataset_type="default").rides__passengers.df()

Unnamed: 0,name,rating,_dlt_parent_id,_dlt_list_idx,_dlt_id
0,John,4.9,RN7vrV1MD0K9SQ,0,ljWDmXgQap2ovg
1,Jack,3.9,RN7vrV1MD0K9SQ,1,3SxQFg2upfScEw


In [9]:
import duckdb

# 1. Create a connection to an in-memory DuckDB database
conn = duckdb.connect("ny_taxi_manual.db")

# 2. Create the rides Table
# Since our dataset has nested structures, we must manually flatten it before inserting data.
conn.execute("""
CREATE TABLE IF NOT EXISTS rides (
    record_hash TEXT PRIMARY KEY,
    vendor_name TEXT,
    pickup_time TIMESTAMP,
    dropoff_time TIMESTAMP,
    start_lon DOUBLE,
    start_lat DOUBLE,
    end_lon DOUBLE,
    end_lat DOUBLE
);
""")

# 3. Insert Data Manually
# Since JSON data has nested fields, we need to extract and transform them before inserting them into DuckDB.
data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "coordinates": {
            "start": {"lon": -73.787442, "lat": 40.641525},
            "end": {"lon": -73.980072, "lat": 40.742963}
        }
    }
]

# Prepare data for insertion
flattened_data = [
    (
        ride["record_hash"],
        ride["vendor_name"],
        ride["time"]["pickup"],
        ride["time"]["dropoff"],
        ride["coordinates"]["start"]["lon"],
        ride["coordinates"]["start"]["lat"],
        ride["coordinates"]["end"]["lon"],
        ride["coordinates"]["end"]["lat"]
    )
    for ride in data
]

# Insert into DuckDB
conn.executemany("""
INSERT INTO rides (record_hash, vendor_name, pickup_time, dropoff_time, start_lon, start_lat, end_lon, end_lat)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", flattened_data)

print("Data successfully loaded into DuckDB!")


# 4. Query Data in DuckDB
# Now that the data is loaded, we can query it using DuckDB’s SQL engine.
df = conn.execute("SELECT * FROM rides").df()

conn.close()
df

Data successfully loaded into DuckDB!


Unnamed: 0,record_hash,vendor_name,pickup_time,dropoff_time,start_lon,start_lat,end_lon,end_lat
0,b00361a396177a9cb410ff61f20015ad,VTS,2009-06-14 23:23:00,2009-06-14 23:48:00,-73.787442,40.641525,-73.980072,40.742963


In [10]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


# Define the API resource for NYC taxi data
@dlt.resource(name="rides")   # <--- The name of the resource (will be used as the table name)
def ny_taxi():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):    # <--- API endpoint for retrieving taxi ride data
        yield page   # <--- yield data to manage memory


# define new dlt pipeline
pipeline = dlt.pipeline(destination="duckdb")


# run the pipeline with the new resource
load_info = pipeline.run(ny_taxi, write_disposition="replace")
print(load_info)


# explore loaded data
pipeline.dataset(dataset_type="default").rides.df()

Pipeline dlt_ipykernel_launcher load step completed in 1.26 seconds
1 load package(s) were loaded to destination duckdb and into dataset dlt_ipykernel_launcher_dataset
The duckdb destination used duckdb:////Users/shivsk/Documents/Projects/Data_engineering_course/dlt/dlt_ipykernel_launcher.duckdb location to store data
Load package 1739790752.0950432 is LOADED and contains no failed jobs


Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,surcharge,vendor_name,_dlt_load_id,_dlt_id,store_and_forward
0,40.742963,-73.980072,45.0,1,Credit,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00+00:00,2009-06-14 23:23:00+00:00,0.0,VTS,1739790752.0950432,/wSgav7AMmsPig,
1,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.00,8.50,1.56,2009-06-18 17:43:00+00:00,2009-06-18 17:35:00+00:00,1.0,VTS,1739790752.0950432,sQp/T/WDoL3v2A,
2,40.718043,-74.004745,12.5,5,Credit,40.761945,-73.983038,2.0,0.00,15.50,3.37,2009-06-10 18:27:00+00:00,2009-06-10 18:08:00+00:00,1.0,VTS,1739790752.0950432,rlxvxWl0Im6lSg,
3,40.739637,-73.985233,4.9,1,CASH,40.749802,-73.992247,0.0,0.00,5.40,1.11,2009-06-14 23:58:00+00:00,2009-06-14 23:54:00+00:00,0.5,VTS,1739790752.0950432,IsFNBu6+NWDe+g,
4,40.730032,-73.852693,25.7,1,CASH,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00+00:00,2009-06-13 13:01:00+00:00,0.0,VTS,1739790752.0950432,vZp0f4WygIciaw,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,40.783522,-73.970690,5.7,1,CASH,40.778560,-73.953660,0.0,0.00,5.70,1.16,2009-06-19 11:28:00+00:00,2009-06-19 11:22:00+00:00,0.0,VTS,1739790752.0950432,GoEOfKoGi2F9OQ,
9996,40.777200,-73.964197,4.1,1,CASH,40.779800,-73.974297,0.0,0.00,4.10,0.89,2009-06-17 07:43:00+00:00,2009-06-17 07:41:00+00:00,0.0,VTS,1739790752.0950432,U7vrJd2Tvgpn4g,
9997,40.780172,-73.957617,6.1,1,CASH,40.788388,-73.976758,0.0,0.00,6.10,1.30,2009-06-19 11:46:00+00:00,2009-06-19 11:39:00+00:00,0.0,VTS,1739790752.0950432,cvGjuA7gPj+iFw,
9998,40.777342,-73.957242,5.7,1,CASH,40.773828,-73.956690,0.0,0.00,6.20,0.97,2009-06-17 04:19:00+00:00,2009-06-17 04:13:00+00:00,0.5,VTS,1739790752.0950432,WM/Y+nnWXSlCUw,


### Incremental loading

In [11]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


@dlt.resource(name="rides", write_disposition="append")
def ny_taxi(
    cursor_date=dlt.sources.incremental(
        "Trip_Dropoff_DateTime",   # <--- field to track, our timestamp
        initial_value="2009-06-15",   # <--- start date June 15, 2009
        )
    ):
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):
        yield page

In [12]:
# define new dlt pipeline
pipeline = dlt.pipeline(pipeline_name="ny_taxi_incremental", destination="duckdb", dataset_name="ny_taxi_data")


# run the pipeline with the new resource
load_info = pipeline.run(ny_taxi)
print(pipeline.last_trace)

Run started at 2025-02-17 11:17:27.293926+00:00 and COMPLETED in 31.32 seconds with 4 steps.
Step extract COMPLETED in 29.97 seconds.

Load package 1739791047.384795 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.50 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- rides: 5325 row(s)

Load package 1739791047.384795 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 0.76 seconds.
Pipeline ny_taxi_incremental load step completed in 0.73 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:////Users/shivsk/Documents/Projects/Data_engineering_course/dlt/ny_taxi_incremental.duckdb location to store data
Load package 1739791047.384795 is LOADED and contains no failed jobs

Step run COMPLETED in 31.32 seconds.
Pipeline ny_taxi_incremental load step completed in 0.73 seconds

In [13]:
with pipeline.sql_client() as client:
    res = client.execute_sql(
            """
            SELECT
            MIN(trip_dropoff_date_time)
            FROM rides;
            """
        )
    print(res)

[(datetime.datetime(2009, 6, 15, 0, 6, tzinfo=<UTC>),)]


In [14]:
# define new dlt pipeline
pipeline = dlt.pipeline(pipeline_name="ny_taxi_incremental", destination="duckdb", dataset_name="ny_taxi_data")


# run the pipeline with the new resource
load_info = pipeline.run(ny_taxi)
print(pipeline.last_trace)

Run started at 2025-02-17 11:20:40.644486+00:00 and COMPLETED in 30.89 seconds with 4 steps.
Step extract COMPLETED in 30.79 seconds.

Load package 1739791240.724498 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.03 seconds.
No data found to normalize

Step load COMPLETED in 0.01 seconds.
Pipeline ny_taxi_incremental load step completed in ---
0 load package(s) were loaded to destination duckdb and into dataset None
The duckdb destination used duckdb:////Users/shivsk/Documents/Projects/Data_engineering_course/dlt/ny_taxi_incremental.duckdb location to store data

Step run COMPLETED in 30.89 seconds.
Pipeline ny_taxi_incremental load step completed in ---
0 load package(s) were loaded to destination duckdb and into dataset None
The duckdb destination used duckdb:////Users/shivsk/Documents/Projects/Data_engineering_course/dlt/ny_taxi_incremental.duckdb location to store data


In [None]:
pipeline = dlt.pipeline(
    pipeline_name='taxi_data',
    destination='bigquery', # <--- to run pipeline in production
    dataset_name='taxi_rides',
)

zsh:1: no matches found: dlt[bigquery]
Note: you may need to restart the kernel to use updated packages.


In [None]:
import os

# Set the environment variable for BigQuery credentials
os.environ["DESTINATION__BIGQUERY__CREDENTIALS"] = "path/to/your/credentials/file.json"