# **Workshop "Data Ingestion with dlt": Homework**


In [None]:
!python --version

Python 3.11.11


---

## **Dataset & API**

We’ll use **NYC Taxi data** via the same custom API from the workshop:

🔹 **Base API URL:**  
```
https://us-central1-dlthub-analytics.cloudfunctions.net/data_engineering_zoomcamp_api
```
🔹 **Data format:** Paginated JSON (1,000 records per page).  
🔹 **API Pagination:** Stop when an empty page is returned.  
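
The stop-on-empty-page rule can be sketched in plain Python. This is only an illustration: `iter_pages` and `fake_fetch` are hypothetical names, and the fake fetcher stands in for a real HTTP call so the example runs offline.

```python
from typing import Callable, Iterator


def iter_pages(fetch_page: Callable[[int], list], first_page: int = 1) -> Iterator[list]:
    """Yield pages one by one until the fetcher returns an empty page."""
    page_number = first_page
    while True:
        records = fetch_page(page_number)
        if not records:  # an empty page marks the end of the data
            break
        yield records
        page_number += 1


# Hypothetical stand-in for the API: 3 pages of 2 records each, then empty pages.
def fake_fetch(page_number: int) -> list:
    if page_number > 3:
        return []
    return [{"page": page_number, "row": i} for i in range(2)]


pages = list(iter_pages(fake_fetch))
total_records = sum(len(page) for page in pages)
print(len(pages), total_records)
```

With the real API, the fetcher would issue an HTTP request per page instead of building records locally.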

## **Question 1: dlt Version**

1. **Install dlt**:

In [2]:
 !pip install dlt[duckdb]

Collecting dlt[duckdb]
  Downloading dlt-1.6.1-py3-none-any.whl.metadata (11 kB)
Collecting giturlparse>=0.10.0 (from dlt[duckdb])
  Downloading giturlparse-0.12.0-py2.py3-none-any.whl.metadata (4.5 kB)
Collecting hexbytes>=0.2.2 (from dlt[duckdb])
  Downloading hexbytes-1.3.0-py3-none-any.whl.metadata (3.3 kB)
Collecting jsonpath-ng>=1.5.3 (from dlt[duckdb])
  Downloading jsonpath_ng-1.7.0-py3-none-any.whl.metadata (18 kB)
Collecting makefun>=1.15.0 (from dlt[duckdb])
  Downloading makefun-1.15.6-py2.py3-none-any.whl.metadata (3.2 kB)
Collecting pathvalidate>=2.5.2 (from dlt[duckdb])
  Downloading pathvalidate-3.2.3-py3-none-any.whl.metadata (12 kB)
Collecting pendulum>=2.1.2 (from dlt[duckdb])
  Downloading pendulum-3.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.9 kB)
Collecting rich-argparse<2.0.0,>=1.6.0 (from dlt[duckdb])
  Downloading rich_argparse-1.7.0-py3-none-any.whl.metadata (14 kB)
Collecting semver>=3.0.0 (from dlt[duckdb])
  Downloading semve

> Or install a different extra, such as `bigquery` or `redshift`, if you prefer another primary destination. For this assignment, we’ll still do a quick test with DuckDB.

2. **Check** the version:


In [None]:
!dlt --version

or:

In [3]:
import dlt
print("dlt version:", dlt.__version__)

dlt version: 1.6.1


**Answer**:  
- Provide the **version** you see in the output.
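
A third way to check the version, sketched here with the standard library, reads the installed package metadata without importing the package. The helper name `installed_version` is hypothetical; it returns `None` when the package is not installed.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of a package, or None if it is missing."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


print("dlt version:", installed_version("dlt"))
```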

## **Question 2: Define & Run the Pipeline (NYC Taxi API)**

Use dlt to extract all pages of data from the API.

Steps:

1️⃣ Use the `@dlt.resource` decorator to define the API source.

2️⃣ Implement automatic pagination using dlt's built-in REST client.

3️⃣ Load the extracted data into DuckDB for querying.



In [6]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator


# your code is here
# name of table
@dlt.resource(name='rides')
def ny_taxi():
    client = RESTClient(
        base_url='https://us-central1-dlthub-analytics.cloudfunctions.net',
        paginator=PageNumberPaginator(
            base_page=1,
            total_path=None,
        )
    )
    # api endpoint
    for page in client.paginate("data_engineering_zoomcamp_api"):
        yield page

# Define the pipeline: its name, destination, and dataset name.
# The resource above (which yields API data) is passed to pipeline.run() later.
pipeline_name = "ny_taxi_pipeline"
dataset_name = "ny_taxi_data"
pipeline = dlt.pipeline(
    pipeline_name=pipeline_name,
    destination="duckdb",
    dataset_name=dataset_name
)

Load the data into DuckDB to test:






In [5]:
load_info = pipeline.run(ny_taxi)
print(load_info)

Pipeline ny_taxi_pipeline load step completed in 1.81 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:////content/ny_taxi_pipeline.duckdb location to store data
Load package 1739591823.5433455 is LOADED and contains no failed jobs


Start a connection to your database using the native `duckdb` connection and see which tables were generated:

In [7]:
import duckdb
from google.colab import data_table
data_table.enable_dataframe_formatter()

# A database '<pipeline_name>.duckdb' was created in working directory so just connect to it

# Connect to the DuckDB database
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# Set search path to the dataset
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# Describe the dataset
conn.sql("DESCRIBE").df()

| | database | schema | name | column_names | column_types | temporary |
|---|---|---|---|---|---|---|
| 0 | ny_taxi_pipeline | ny_taxi_data | _dlt_loads | [load_id, schema_name, status, inserted_at, sc... | [VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME... | False |
| 1 | ny_taxi_pipeline | ny_taxi_data | _dlt_pipeline_state | [version, engine_version, pipeline_name, state... | [BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W... | False |
| 2 | ny_taxi_pipeline | ny_taxi_data | _dlt_version | [version, engine_version, inserted_at, schema_... | [BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR... | False |
| 3 | ny_taxi_pipeline | ny_taxi_data | rides | [end_lat, end_lon, fare_amt, passenger_count, ... | [DOUBLE, DOUBLE, DOUBLE, BIGINT, VARCHAR, DOUB... | False |


**Answer:**
* How many tables were created?

Four tables were created:

1. `_dlt_loads`
2. `_dlt_pipeline_state`
3. `_dlt_version`
4. `rides`
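
Three of the four tables are dlt's own bookkeeping tables, and only `rides` holds the API data. A small sketch of that split, assuming the `_dlt_` name prefix is a reliable marker for internal tables (`split_tables` is a hypothetical helper, and the table names are copied from the `DESCRIBE` output above):

```python
def split_tables(table_names):
    """Separate dlt bookkeeping tables (prefixed with `_dlt_`) from data tables."""
    internal = sorted(name for name in table_names if name.startswith("_dlt_"))
    data = sorted(name for name in table_names if not name.startswith("_dlt_"))
    return internal, data


# Table names as listed in the DESCRIBE output.
table_names = ["_dlt_loads", "_dlt_pipeline_state", "_dlt_version", "rides"]

internal, data = split_tables(table_names)
print(f"{len(table_names)} tables: {len(internal)} internal, {len(data)} data")
```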

## **Question 3: Explore the loaded data**

Inspect the table `rides`:


In [8]:
df = pipeline.dataset(dataset_type="default").rides.df()
df.describe()

| | end_lat | end_lon | fare_amt | passenger_count | start_lat | start_lon | tip_amt | tolls_amt | total_amt | trip_distance | surcharge | store_and_forward |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 10000.0 | 10000.0 | 10000.0 | 10000.0 | 10000.0 | 10000.0 | 10000.0 | 10000.0 | 10000.0 | 10000.0 | 10000.0 | 135.0 |
| mean | 40.381455 | -73.30134 | 10.066001 | 2.0853 | 40.356752 | -73.25825 | 0.606341 | 0.159485 | 11.165727 | 2.73583 | 0.3339 | 0.044444 |
| std | 3.870109 | 7.024986 | 8.245156 | 2.580095 | 3.994385 | 7.250808 | 1.451652 | 0.857254 | 9.448327 | 3.145651 | 0.37192 | 0.206848 |
| min | 0.0 | -74.330058 | 2.5 | 1.0 | 0.0 | -75.233332 | 0.0 | 0.0 | 2.5 | 0.0 | 0.0 | 0.0 |
| 25% | 40.736812 | -73.991246 | 5.7 | 1.0 | 40.737349 | -73.991837 | 0.0 | 0.0 | 6.2 | 1.02 | 0.0 | 0.0 |
| 50% | 40.754705 | -73.979959 | 7.7 | 1.0 | 40.754095 | -73.981639 | 0.0 | 0.0 | 8.5 | 1.73 | 0.5 | 0.0 |
| 75% | 40.76911 | -73.964989 | 11.3 | 3.0 | 40.768396 | -73.967952 | 0.8 | 0.0 | 12.5 | 3.04 | 0.5 | 0.0 |
| max | 41.310787 | 0.005538 | 194.0 | 208.0 | 41.156413 | 0.001023 | 38.8 | 16.0 | 232.8 | 40.21 | 1.0 | 1.0 |


**Answer:**
* What is the total number of records extracted?

- 10,000

## **Question 4: Trip Duration Analysis**

Run the SQL query below to:

* Calculate the average trip duration in minutes.

In [9]:
with pipeline.sql_client() as client:
    res = client.execute_sql(
            """
            SELECT
            AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
            FROM rides;
            """
        )
    # Prints the result rows as a list of tuples
    print(res)

[(12.3049,)]


**Answer:**
* What is the average trip duration?

- 12.3 minutes
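
To see what the query measures, here is a plain-Python sketch of two ways to average trip durations. It assumes that DuckDB's `date_diff('minute', ...)` counts the minute boundaries crossed between two timestamps rather than the exact elapsed time. The two sample trips are invented for illustration and are not taken from the dataset.

```python
from datetime import datetime


def minute_boundaries(start: datetime, end: datetime) -> int:
    """Count minute boundaries crossed between two timestamps."""
    start_floor = start.replace(second=0, microsecond=0)
    end_floor = end.replace(second=0, microsecond=0)
    return int((end_floor - start_floor).total_seconds() // 60)


def elapsed_minutes(start: datetime, end: datetime) -> float:
    """Exact elapsed time in fractional minutes."""
    return (end - start).total_seconds() / 60


# Invented sample trips (pickup, dropoff); not real records.
trips = [
    (datetime(2009, 6, 14, 23, 23, 0), datetime(2009, 6, 14, 23, 48, 0)),
    (datetime(2009, 6, 14, 10, 0, 50), datetime(2009, 6, 14, 10, 5, 10)),
]

avg_boundaries = sum(minute_boundaries(s, e) for s, e in trips) / len(trips)
avg_elapsed = sum(elapsed_minutes(s, e) for s, e in trips) / len(trips)
print(avg_boundaries, round(avg_elapsed, 2))
```

The two averages differ whenever pickup and dropoff seconds are not aligned, so the boundary-based figure is an approximation of the true mean duration.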

## **Submitting the solutions**

* Form for submitting: TBA




## **Solution**

We will publish the solution here after the deadline.