In [1]:
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import PageNumberPaginator

@dlt.resource(name="rides", write_disposition="replace")
def ny_taxi():
    """Yield pages of NYC taxi ride records from the Zoomcamp demo API.

    The endpoint is paged by page number starting at 1 and exposes no total
    count (``total_path=None``), so termination is left to the paginator's
    own stop condition (presumably the first empty page -- confirm against
    the installed dlt version's PageNumberPaginator defaults).

    Each yielded item is one page as returned by ``RESTClient.paginate``.
    """
    paginator = PageNumberPaginator(base_page=1, total_path=None)
    api = RESTClient(
        base_url="https://us-central1-dlthub-analytics.cloudfunctions.net",
        paginator=paginator,
    )
    yield from api.paginate("data_engineering_zoomcamp_api")


# Load into a local DuckDB database under the `ny_taxi_data` dataset.
pipeline = dlt.pipeline(
    pipeline_name="ny_taxi_pipeline",
    destination="duckdb",
    dataset_name="ny_taxi_data",
)

# The write disposition ("replace") is declared once, on the `ny_taxi`
# resource itself. It is deliberately not repeated here: a run-level value
# overrides the resource's, so a duplicate would silently mask any later
# change made on the resource.
load_info = pipeline.run(ny_taxi)
print(load_info)

Pipeline ny_taxi_pipeline load step completed in 1.27 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:////workspaces/data-engineering-zoomcamp/workshop-01-ingestion-with-dlt/ny_taxi_pipeline.duckdb location to store data
Load package 1739467233.995203 is LOADED and contains no failed jobs


In [2]:
print(str(load_info))  # re-show the load summary produced by the pipeline run above

Pipeline ny_taxi_pipeline load step completed in 1.27 seconds
1 load package(s) were loaded to destination duckdb and into dataset ny_taxi_data
The duckdb destination used duckdb:////workspaces/data-engineering-zoomcamp/workshop-01-ingestion-with-dlt/ny_taxi_pipeline.duckdb location to store data
Load package 1739467233.995203 is LOADED and contains no failed jobs


In [None]:
from google.cloud import bigquery

# NOTE(review): the dlt pipeline in the first cell loads into DuckDB
# (destination="duckdb"), not BigQuery. This cell only finds tables if a
# BigQuery dataset with this name was populated separately -- confirm intended.
dataset_name = "ny_taxi_data"
client = bigquery.Client()  # project and credentials are inferred from the environment
tables = [*client.list_tables(dataset_name)]

In [None]:
# One line per table listed in the previous cell.
print(f"Tables in dataset {dataset_name}:")
for table_id in (listed.table_id for listed in tables):
    print(f"- {table_id}")

In [None]:
# Row count per table, via one COUNT(*) query each.
for table in tables:
    # Build the fully-qualified ID from the listed table's own reference.
    # Using `client.project` + `dataset_name` breaks when `dataset_name` is
    # passed in "project.dataset" form, which `list_tables` also accepts.
    table_ref = f"{table.project}.{table.dataset_id}.{table.table_id}"
    query = f"SELECT COUNT(*) AS row_count FROM `{table_ref}`"
    result = client.query(query).result()
    # COUNT(*) without GROUP BY returns exactly one row.
    row_count = next(iter(result))["row_count"]
    print(f"Table {table.table_id} has {row_count} rows")

In [None]:
# Read the loaded `rides` table back from the destination as a DataFrame.
(
    pipeline
    .dataset(dataset_type="default")
    .rides
    .df()
)

In [None]:
# Average trip duration over the loaded `rides` table, computed in DuckDB.
# The connection is bound as `sql_client`, not `client`: `client` is the
# BigQuery client the table-inspection cells rely on. Rebinding it here would
# leave a closed DuckDB client in its place if those cells are re-run.
# NOTE: DuckDB's date_diff counts minute *boundaries* crossed between the two
# timestamps (10:00:59 -> 10:01:00 counts as 1), not elapsed whole minutes.
with pipeline.sql_client() as sql_client:
    res = sql_client.execute_sql(
            """
            SELECT
            AVG(date_diff('minute', trip_pickup_date_time, trip_dropoff_date_time))
            FROM rides;
            """
    )
    print(res)