# Workshop "Data Ingestion with dlt": Homework DE Zoomcamp
## Dataset & API

We’ll use NYC Taxi data via the same custom API from the workshop:

🔹 Base API URL:

`https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api`

🔹 Data format: paginated JSON (1,000 records per page).

🔹 API pagination: stop when an empty page is returned.
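The stopping rule above can be sketched in plain Python. This is a minimal illustration, not dlt's implementation. `paginate` and `fake_fetch_page` are hypothetical names, and the stub returns three full pages of 1,000 records followed by an empty page instead of calling the real API.

```python
from typing import Callable, Iterator


def paginate(fetch_page: Callable[[int], list], first_page: int = 1) -> Iterator[list]:
    """Yield pages 1, 2, 3, ... until fetch_page returns an empty page."""
    page_number = first_page
    while True:
        page = fetch_page(page_number)
        if not page:  # an empty page means there is no more data
            return
        yield page
        page_number += 1


requested = []  # records which page numbers were requested


def fake_fetch_page(page_number: int) -> list:
    """Hypothetical stand-in for the HTTP call: 3 full pages, then an empty one."""
    requested.append(page_number)
    if page_number > 3:
        return []
    return [{"page": page_number, "row": i} for i in range(1000)]


pages = list(paginate(fake_fetch_page))
total_records = sum(len(page) for page in pages)
print(len(pages), total_records, requested)  # 3 3000 [1, 2, 3, 4]
```

The empty page still costs one request: four requests are made to collect three pages of data.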

### Question 1
Install dlt and check its version.

```
!pip install "dlt[duckdb]"
```


```
!dlt --version
```

```
dlt 1.6.1
```


Solution Question 1: dlt 1.6.1
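Besides `dlt --version`, the installed version can be checked from Python, which is handy inside a notebook. A small sketch using the standard library's `importlib.metadata`; `installed_version` is a hypothetical helper name.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed distribution's version, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


print("dlt", installed_version("dlt"))
```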

### Question 2: Define & Run the Pipeline (NYC Taxi API)
Use dlt to extract all pages of data from the API.

*How many tables were created?*

Steps:

1️⃣ Use the `@dlt.resource` decorator to define the API resource.

2️⃣ Implement automatic pagination using dlt's built-in REST client.

3️⃣ Load the extracted data into DuckDB for querying.
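As a rough, dependency-free analogue of these three steps (not how dlt works internally), a generator can stand in for the resource and an in-memory SQLite database for DuckDB. `rides`, `SAMPLE_PAGES` and the two-column schema are illustrative assumptions, and the pages are hard-coded rather than paginated from the API.

```python
import sqlite3
from typing import Iterator

# Illustrative, hypothetical pages standing in for API responses
SAMPLE_PAGES = [
    [{"fare_amt": 13.7, "trip_distance": 3.85}, {"fare_amt": 6.5, "trip_distance": 1.31}],
    [{"fare_amt": 8.9, "trip_distance": 2.4}],
]


def rides() -> Iterator[list]:
    """Plays the role of the resource: yields one page at a time."""
    yield from SAMPLE_PAGES


conn = sqlite3.connect(":memory:")  # stand-in for the DuckDB destination
conn.execute("CREATE TABLE rides (fare_amt REAL, trip_distance REAL)")
for page in rides():
    # Named placeholders are filled from each record dict
    conn.executemany("INSERT INTO rides VALUES (:fare_amt, :trip_distance)", page)

row_count = conn.execute("SELECT COUNT(*) FROM rides").fetchone()[0]
print(row_count)  # 3
```

Consuming the generator page by page means only one page has to be held in memory at a time, which is the point of yielding pages from the resource.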

```python
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


# Define the API resource for NYC taxi data; the resource name is used as the table name
@dlt.resource(name="rides")
def ny_taxi():
    client = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        # Request pages 1, 2, 3, ...; total_path=None because the response carries no
        # total page count, so paging ends when an empty page is returned
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None,
        ),
    )

    # API endpoint for retrieving taxi ride data
    for page in client.paginate("data_engineering_zoomcamp_api"):
        yield page  # yield one page at a time to keep memory use low
```


```python
# Iterating the resource directly runs the extraction; each printed item is one ride record
for record in ny_taxi():
    print(record)
```

```
Streaming output truncated to the last 5000 lines.
{'End_Lat': 40.765793, 'End_Lon': -73.978255, 'Fare_Amt': 13.7, 'Passenger_Count': 1, 'Payment_Type': 'CASH', 'Rate_Code': None, 'Start_Lat': 40.721385, 'Start_Lon': -74.000033, 'Tip_Amt': 0.0, 'Tolls_Amt': 0.0, 'Total_Amt': 13.7, 'Trip_Distance': 3.85, 'Trip_Dropoff_DateTime': '2009-06-08 15:59:00', 'Trip_Pickup_DateTime': '2009-06-08 15:37:00', 'mta_tax': None, 'store_and_forward': None, 'surcharge': 0.0, 'vendor_name': 'VTS'}
{'End_Lat': 40.745177, 'End_Lon': -73.994827, 'Fare_Amt': 6.5, 'Passenger_Count': 1, 'Payment_Type': 'CASH', 'Rate_Code': None, 'Start_Lat': 40.748835, 'Start_Lon': -73.977327, 'Tip_Amt': 0.0, 'Tolls_Amt': 0.0, 'Total_Amt': 6.5, 'Trip_Distance': 1.31, 'Trip_Dropoff_DateTime': '2009-06-05 13:35:00', 'Trip_Pickup_DateTime': '2009-06-05 13:27:00', 'mta_tax': None, 'store_and_forward': None, 'surcharge': 0.0, 'vendor_name': 'VTS'}
{'End_Lat': 40.766573, 'End_Lon': -73.978343, 'Fare_Amt': 8.9
```
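Although `ny_taxi` yields whole pages (lists), the output above prints one dict per ride, so iterating the resource appears to unpack each page into individual records. The same flattening in plain Python, with hypothetical pages standing in for API responses:

```python
from itertools import chain
from typing import Iterator


def pages() -> Iterator[list]:
    """Hypothetical pages; each list stands for one API response."""
    yield [{"vendor_name": "VTS", "fare_amt": 13.7}, {"vendor_name": "VTS", "fare_amt": 6.5}]
    yield [{"vendor_name": "VTS", "fare_amt": 8.9}]


# chain.from_iterable turns an iterator of lists into an iterator over their items
records = list(chain.from_iterable(pages()))
for record in records:
    print(record)
```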

```python
# Create the pipeline
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data",
)
```

```python
# Run the pipeline: extract, normalize and load the data into DuckDB
load_info = pipeline.run(ny_taxi)
print(load_info)
```

```
Pipeline ny_taxi_pipeline load step completed in 2.37 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:////content/ny_taxi_pipeline.duckdb location to store data
Load package 1739603381.4691353 is LOADED and contains no failed jobs
```
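The load package id looks like a Unix timestamp of when the package was created. Converting it, assuming that interpretation, gives a time consistent with the run start reported by `pipeline.last_trace` (2025-02-15 07:09:41 UTC).

```python
from datetime import datetime, timezone

load_id = "1739603381.4691353"  # copied from the load info output above
# Assumption: the load id is seconds since the Unix epoch
created_at = datetime.fromtimestamp(float(load_id), tz=timezone.utc)
print(created_at.strftime("%Y-%m-%d %H:%M:%S %Z"))  # 2025-02-15 07:09:41 UTC
```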


```python
# Show timing and row counts for each step of the last run
print(pipeline.last_trace)
```

```
Run started at 2025-02-15 07:09:41.310960+00:00 and COMPLETED in 25.85 seconds with 4 steps.
Step extract COMPLETED in 20.56 seconds.

Load package 1739603381.4691353 is EXTRACTED and NOT YET LOADED to the destination and contains no failed jobs

Step normalize COMPLETED in 1.95 seconds.
Normalized data for the following tables:
- rides: 10000 row(s)
- _dlt_pipeline_state: 1 row(s)

Load package 1739603381.4691353 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs

Step load COMPLETED in 3.18 seconds.
Pipeline ny_taxi_pipeline load step completed in 2.37 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:////content/ny_taxi_pipeline.duckdb location to store data
Load package 1739603381.4691353 is LOADED and contains no failed jobs

Step run COMPLETED in 25.85 seconds.
Pipeline ny_taxi_pipeline load step completed in 2.37 seconds
1 load package(s) were loaded to destination duckdb and 
```

```python
# Start a connection to DuckDB
import duckdb
from google.colab import data_table  # Colab-only: renders DataFrames as interactive tables

data_table.enable_dataframe_formatter()

# The pipeline created '<pipeline_name>.duckdb' in the working directory, so connect to that file
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# Point the search path at the dataset so tables resolve without a schema prefix
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# Describe the tables in the database
conn.sql("DESCRIBE").df()
```

| | database | schema | name | column_names | column_types | temporary |
|---|---|---|---|---|---|---|
| 0 | ny_taxi_pipeline | ny_taxi_data | _dlt_loads | [load_id, schema_name, status, inserted_at, sc... | [VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME... | False |
| 1 | ny_taxi_pipeline | ny_taxi_data | _dlt_pipeline_state | [version, engine_version, pipeline_name, state... | [BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W... | False |
| 2 | ny_taxi_pipeline | ny_taxi_data | _dlt_version | [version, engine_version, inserted_at, schema_... | [BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR... | False |
| 3 | ny_taxi_pipeline | ny_taxi_data | rides | [end_lat, end_lon, fare_amt, passenger_count, ... | [DOUBLE, DOUBLE, DOUBLE, BIGINT, VARCHAR, DOUB... | False |


Solution Question 2: 4 tables (`rides` holds the ride data; the three `_dlt_` tables are dlt's internal metadata):
  * _dlt_loads
  * _dlt_pipeline_state
  * _dlt_version
  * rides
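Only `rides` holds ride data; the other three share the `_dlt_` prefix of dlt's bookkeeping tables. A small sketch that separates them by that prefix, using the table names from the `DESCRIBE` output above:

```python
table_names = ["_dlt_loads", "_dlt_pipeline_state", "_dlt_version", "rides"]

# In this listing, dlt's internal tables all start with "_dlt_"
internal_tables = [name for name in table_names if name.startswith("_dlt_")]
data_tables = [name for name in table_names if not name.startswith("_dlt_")]
print(internal_tables, data_tables)
```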

### Question 3: Explore the Loaded Data
What is the total number of records extracted?

  * 2500
  * 5000
  * 7500
  * 10000


```python
# Load the rides table into a pandas DataFrame and inspect its columns
df = pipeline.dataset(dataset_type="default").rides.df()
df.info()
```

```
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 18 columns):
 #   Column                  Non-Null Count  Dtype
---  ------                  --------------  -----
 0   end_lat                 10000 non-null  float64
 1   end_lon                 10000 non-null  float64
 2   fare_amt                10000 non-null  float64
 3   passenger_count         10000 non-null  int64
 4   payment_type            10000 non-null  object
 5   start_lat               10000 non-null  float64
 6   start_lon               10000 non-null  float64
 7   tip_amt                 10000 non-null  float64
 8   tolls_amt               10000 non-null  float64
 9   total_amt               10000 non-null  float64
 10  trip_distance           10000 non-null  float64
 11  trip_dropoff_date_time  10000 non-null  da
```

```python
# Number of rows in the rides table
print(len(df))
```

```
10000
```


Solution Question 3: 10000 records
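As a cross-check against the API description (1,000 records per page), 10,000 records correspond to 10 full pages, and the empty-page stopping rule adds one more request, assuming every non-empty page is full.

```python
RECORDS_PER_PAGE = 1_000  # page size from the API description
total_records = 10_000    # rows in the rides table

full_pages, remainder = divmod(total_records, RECORDS_PER_PAGE)
# The final request returns the empty page that stops pagination
requests_made = full_pages + (1 if remainder else 0) + 1
print(full_pages, remainder, requests_made)  # 10 0 11
```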

### Question 4: Trip Duration Analysis
Calculate the average trip duration in minutes.

What is the average trip duration?

  * 12.3049
  * 22.3049
  * 32.3049
  * 42.3049
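Trip duration is dropoff time minus pickup time. As a worked example on the two complete records printed earlier (only these two rows, so the result is not the dataset average):

```python
from datetime import datetime

# (pickup, dropoff) pairs copied from the two complete sample records above
trips = [
    ("2009-06-08 15:37:00", "2009-06-08 15:59:00"),
    ("2009-06-05 13:27:00", "2009-06-05 13:35:00"),
]
FMT = "%Y-%m-%d %H:%M:%S"

durations = [
    (datetime.strptime(dropoff, FMT) - datetime.strptime(pickup, FMT)).total_seconds() / 60
    for pickup, dropoff in trips
]
average_minutes = sum(durations) / len(durations)
print(durations, average_minutes)  # [22.0, 8.0] 15.0
```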


```python
# Average trip duration in minutes, computed inside DuckDB
with pipeline.sql_client() as client:
    res = client.execute_sql(
        """
        SELECT
            AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
        FROM rides;
        """
    )
    # The result is a list of row tuples; this query returns a single row
    print(res)
```

```
[(12.3049,)]
```


Solution Question 4: 12.3049 minutes
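A caveat on `date_diff`: per DuckDB's documentation it counts the minute boundaries crossed between the two timestamps rather than the exact elapsed time. For timestamps on whole minutes, as in the sample records printed earlier, the two readings agree. The sketch below emulates the boundary count in Python; the helper names and the timestamps where the two differ are hypothetical.

```python
from datetime import datetime


def truncate_to_minute(ts: datetime) -> datetime:
    return ts.replace(second=0, microsecond=0)


def minute_boundaries(start: datetime, end: datetime) -> int:
    """Mimics DuckDB's date_diff('minute', start, end): minute boundaries crossed."""
    delta = truncate_to_minute(end) - truncate_to_minute(start)
    return int(delta.total_seconds() // 60)


def elapsed_minutes(start: datetime, end: datetime) -> float:
    """Exact elapsed time in fractional minutes."""
    return (end - start).total_seconds() / 60


# Hypothetical timestamps 20 seconds apart that straddle a minute boundary
start = datetime(2009, 6, 8, 15, 36, 50)
end = datetime(2009, 6, 8, 15, 37, 10)
print(minute_boundaries(start, end), round(elapsed_minutes(start, end), 3))  # 1 0.333
```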