# DLT workshop homework

🔹 **Base API URL:**  
```
https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api
```
🔹 **Data format:** Paginated JSON (1,000 records per page).  
🔹 **API Pagination:** Stop when an empty page is returned.

## **Question 1: dlt Version**

We installed dlt with duckdb in the image (see `requirements.txt`)

In [2]:
import dlt

In [3]:
dlt.__version__

'1.6.1'

## **Question 2: Define & Run the Pipeline (NYC Taxi API)**

Use dlt to extract all pages of data from the API.

Steps:

1️⃣ Use the `@dlt.resource` decorator to define the API source.

2️⃣ Implement automatic pagination using dlt's built-in REST client.

3️⃣ Load the extracted data into DuckDB for querying.



How many tables were created?

* 2
* 4
* 6
* 8


In [None]:

import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


# Define the API resource for NYC taxi data
@dlt.resource(name="rides")   # <--- The name of the resource (will be used as the table name)
def ny_taxi():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None
        )
    )

    for page in client.paginate("data_engineering_zoomcamp_api"):    # <--- API endpoint for retrieving taxi ride data
        yield page   # <--- yield data to manage memory


# define new dlt pipeline
pipeline = dlt.pipeline(pipeline_name="ny_taxi", destination="duckdb", dataset_name="taxi_rides")


# run the pipeline with the new resource
load_info = pipeline.run(ny_taxi, table_name="rides", write_disposition="replace")
print(load_info)






Pipeline ny_taxi load step completed in 0.68 seconds
1 load package(s) were loaded to destination duckdb and into dataset taxi_rides
The duckdb destination used duckdb:////workspaces/workshops/ny_taxi.duckdb location to store data
Load package 1739816151.7604206 is LOADED and contains no failed jobs


Unnamed: 0,end_lat,end_lon,fare_amt,passenger_count,payment_type,start_lat,start_lon,tip_amt,tolls_amt,total_amt,trip_distance,trip_dropoff_date_time,trip_pickup_date_time,surcharge,vendor_name,_dlt_load_id,_dlt_id,store_and_forward
0,40.742963,-73.980072,45.0,1,Credit,40.641525,-73.787442,9.0,4.15,58.15,17.52,2009-06-14 23:48:00+00:00,2009-06-14 23:23:00+00:00,0.0,VTS,1739816151.7604206,THuphxWgJnCAuA,
1,40.740187,-74.005698,6.5,1,Credit,40.722065,-74.009767,1.0,0.00,8.50,1.56,2009-06-18 17:43:00+00:00,2009-06-18 17:35:00+00:00,1.0,VTS,1739816151.7604206,BJDk/1v3VgQHZQ,
2,40.718043,-74.004745,12.5,5,Credit,40.761945,-73.983038,2.0,0.00,15.50,3.37,2009-06-10 18:27:00+00:00,2009-06-10 18:08:00+00:00,1.0,VTS,1739816151.7604206,lH74DvaHuvPIoQ,
3,40.739637,-73.985233,4.9,1,CASH,40.749802,-73.992247,0.0,0.00,5.40,1.11,2009-06-14 23:58:00+00:00,2009-06-14 23:54:00+00:00,0.5,VTS,1739816151.7604206,n1CDYwM1kvvggg,
4,40.730032,-73.852693,25.7,1,CASH,40.776825,-73.949233,0.0,4.15,29.85,11.09,2009-06-13 13:23:00+00:00,2009-06-13 13:01:00+00:00,0.0,VTS,1739816151.7604206,v/wIyyS5ztwU8A,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9995,40.783522,-73.970690,5.7,1,CASH,40.778560,-73.953660,0.0,0.00,5.70,1.16,2009-06-19 11:28:00+00:00,2009-06-19 11:22:00+00:00,0.0,VTS,1739816151.7604206,tsSKwAMmJcJ+MA,
9996,40.777200,-73.964197,4.1,1,CASH,40.779800,-73.974297,0.0,0.00,4.10,0.89,2009-06-17 07:43:00+00:00,2009-06-17 07:41:00+00:00,0.0,VTS,1739816151.7604206,hps4x1OH0KjZsw,
9997,40.780172,-73.957617,6.1,1,CASH,40.788388,-73.976758,0.0,0.00,6.10,1.30,2009-06-19 11:46:00+00:00,2009-06-19 11:39:00+00:00,0.0,VTS,1739816151.7604206,PTCPoQRYdJ5KfQ,
9998,40.777342,-73.957242,5.7,1,CASH,40.773828,-73.956690,0.0,0.00,6.20,0.97,2009-06-17 04:19:00+00:00,2009-06-17 04:13:00+00:00,0.5,VTS,1739816151.7604206,OlcHC1n081btpg,


In [11]:
print(pipeline.last_trace)

Run started at 2025-02-17 18:15:51.717332+00:00 and COMPLETED in 25.46 seconds with 4 steps.
Step extract COMPLETED in 24.05 seconds.

Load package 1739816151.7604206 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 0.67 seconds.
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- rides: 10000 row(s)

Load package 1739816151.7604206 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 0.69 seconds.
Pipeline ny_taxi load step completed in 0.68 seconds
1 load package(s) were loaded to destination duckdb and into dataset taxi_rides
The duckdb destination used duckdb:////workspaces/workshops/ny_taxi.duckdb location to store data
Load package 1739816151.7604206 is LOADED and contains no failed jobs

Step run COMPLETED in 25.46 seconds.
Pipeline ny_taxi load step completed in 0.68 seconds
1 load package(s) were loaded to destination duckdb and into dataset tax

In [9]:
# Start a connection to your database using native `duckdb` connection and look what tables were generated:

import duckdb

# A database '<pipeline_name>.duckdb' was created in working directory so just connect to it

# Connect to the DuckDB database
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# Set search path to the dataset
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# Describe the dataset
conn.sql("DESCRIBE").df()


Unnamed: 0,database,schema,name,column_names,column_types,temporary
0,ny_taxi,ny_taxi_dataset,_dlt_loads,"[load_id, schema_name, status, inserted_at, sc...","[VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME...",False
1,ny_taxi,ny_taxi_dataset,_dlt_pipeline_state,"[version, engine_version, pipeline_name, state...","[BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W...",False
2,ny_taxi,ny_taxi_dataset,_dlt_version,"[version, engine_version, inserted_at, schema_...","[BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR...",False
3,ny_taxi,ny_taxi_dataset,rides,"[end_lat, end_lon, fare_amt, passenger_count, ...","[DOUBLE, DOUBLE, DOUBLE, BIGINT, VARCHAR, DOUB...",False


## **Question 3: Explore the loaded data**

What is the number of records loaded?



In [20]:
# explore loaded data
rides_df = pipeline.dataset(dataset_type="default").rides.df()
print(f"The number of loaded records is {len(rides_df)}")

The number of loaded records is 10000


## **Question 4: Trip Duration Analysis**



* Calculate the average trip duration in minutes.


What is the average trip duration?

* 12.3049
* 22.3049
* 32.3049
* 42.3049

In [27]:
with pipeline.sql_client() as client:
    res = client.execute_sql(
            """
            SELECT
            AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
            FROM rides;
            """)


res



[(12.3049,)]