In [2]:
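# Install dependencies (local testing only).
# To hide the install output, put `%%capture` on the very first line of this
# cell — cell magics only work in a Jupyter/IPython kernel.
#!uv add "dlt[duckdb]"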

In [3]:
import dlt
import duckdb
import requests
import pandas as pd
from dlt.destinations import filesystem
from io import BytesIO

## Ingesting data into the database

In [4]:
# Define a dlt resource that downloads the monthly yellow-taxi Parquet files
# and loads all of them into a single "rides" table (replaced on every run).
@dlt.resource(name="rides", write_disposition="replace")
def download_parquet():
    """Yield one pandas DataFrame per monthly Parquet file (January-June 2024).

    Each file is fetched over HTTP and parsed in memory; every yielded frame
    is loaded into the same ``rides`` table.

    Raises:
        requests.HTTPError: if a download returns a non-2xx status, instead of
            letting ``pd.read_parquet`` fail on an HTML/XML error body.
        requests.Timeout: if the server stops responding for longer than the
            timeout below.
    """
    prefix = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata'

    for month in range(1, 7):
        # Zero-pad with a format spec: a literal "0" prefix would build invalid
        # names such as "2024-010" if the range ever reaches months 10-12.
        url = f"{prefix}_2024-{month:02d}.parquet"
        # requests has no default timeout; without one a stalled connection
        # would hang the notebook indefinitely.
        response = requests.get(url, timeout=60)
        response.raise_for_status()

        yield pd.read_parquet(BytesIO(response.content))


# Build the dlt pipeline. DuckDB is the local/testing destination;
# switch `destination` to "bigquery" for the production load.
pipeline = dlt.pipeline(
    pipeline_name="rides_pipeline",
    dataset_name="rides_dataset",
    destination="duckdb",
)

# Run the pipeline on the `download_parquet` resource and print the load summary.
info = pipeline.run(download_parquet)
print(info)


Pipeline rides_pipeline load step completed in 5.56 seconds
1 load package(s) were loaded to destination duckdb and into dataset rides_dataset
The duckdb destination used duckdb:////home/rgc/project-zoomcamp/data_engineering_zoomcamp_2026/Homeworks/3_BigQuery/rides_pipeline.duckdb location to store data
Load package 1770214902.32594 is LOADED and contains no failed jobs


In [5]:
# Open the DuckDB file written by the pipeline (named after the pipeline and
# resolved relative to the current working directory), then make the dataset
# schema the default so its tables can be referenced unqualified.
db_path = f"{pipeline.pipeline_name}.duckdb"
conn = duckdb.connect(db_path)
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# List the tables in the dataset together with their column names and types.
res = conn.sql("DESCRIBE").df()
print(res)

         database         schema                 name  \
0  rides_pipeline  rides_dataset           _dlt_loads   
1  rides_pipeline  rides_dataset  _dlt_pipeline_state   
2  rides_pipeline  rides_dataset         _dlt_version   
3  rides_pipeline  rides_dataset                rides   

                                        column_names  \
0  [load_id, schema_name, status, inserted_at, sc...   
1  [version, engine_version, pipeline_name, state...   
2  [version, engine_version, inserted_at, schema_...   
3  [vendor_id, tpep_pickup_datetime, tpep_dropoff...   

                                        column_types  temporary  
0  [VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME...      False  
1  [BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W...      False  
2  [BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR...      False  
3  [INTEGER, TIMESTAMP WITH TIME ZONE, TIMESTAMP ...      False  


<div style="color: #5bde7e;">

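## Export the database to Parquet (`3_data_backup` folder)

Keeps a local Parquet copy of every table so the data can be used offline without downloading it again.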

</div>

In [9]:
# Dump the whole DuckDB database (schema + data) as Parquet files into a local
# folder, so the data is available offline.
backup_dir = "./3_data_backup"
export_sql = f"EXPORT DATABASE '{backup_dir}' (FORMAT PARQUET)"
conn.execute(export_sql)
print("Export parquet terminé.")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Export parquet terminé.


<div style="color: #5bde7e;">

## Total Record Count

</div>



In [7]:
# Count the rows loaded into the `rides` table, querying the pipeline's
# destination through dlt's SQL client.
with pipeline.sql_client() as client:
    # Plain string: the query has no placeholders, so an f-string is pointless.
    with client.execute_query("SELECT count(1) FROM rides") as cursor:
        data = cursor.df()
print(data)

   count(1)
0  20332093


In [None]:
# NOTE(review): this unexecuted cell repeats the row-count query of the
# previous cell (In [7]); delete it or reuse it for a different query.
with pipeline.sql_client() as client:
    with client.execute_query("SELECT count(1) FROM rides") as cursor:
        data = cursor.df()
print(data)