In [1]:
# !uv pip install google-cloud-bigquery-storage

In [2]:
# !uv pip install dlt[bigquery] requests

In [1]:
import os
import dlt
import requests
from google.cloud import bigquery_storage
from google.cloud import bigquery
import pandas as pd
from io import BytesIO
from tqdm.notebook import tqdm
from dlt.destinations import filesystem

In [2]:
# Locate the service-account key relative to the directory the kernel was
# started in: <cwd>/terraform/keys.json.
PROJECT_ROOT = os.getcwd()
TERRAFORM_DIR = os.path.join(PROJECT_ROOT, "terraform")
KEYS_PATH = os.path.join(TERRAFORM_DIR, "keys.json")

# Export everything the Google client libraries and dlt read from the
# environment in a single step.
os.environ.update(
    {
        "GOOGLE_APPLICATION_CREDENTIALS": KEYS_PATH,
        "DESTINATION__BIGQUERY__LOCATION": "US",
        "BUCKET_URL": "gs://de-zoomcamp-terraform-484919-data-lake",
    }
)

print("Using credentials:", os.environ["GOOGLE_APPLICATION_CREDENTIALS"])

Using credentials: /workspaces/de-zoomcamp/03-data-warehouse/pipeline/terraform/keys.json


In [3]:
# Build the BigQuery client (no explicit arguments are passed) and show which
# project it resolved to, as a quick sanity check before loading anything.
client = bigquery.Client()
authenticated_project = client.project
print("Authenticated project:", authenticated_project)

Authenticated project: de-zoomcamp-terraform-484919


In [None]:
# -----------------------------
# Create the pipelines
# -----------------------------
# Each pipeline gets its own name: dlt uses the pipeline name to identify the
# local working directory and state, so two pipelines with different
# destinations should not share one.
pipeline_bq = dlt.pipeline(
    pipeline_name="nyc_taxi_pipeline",
    destination="bigquery",
    dataset_name="nyc_taxi_dataset",
)

pipeline_gcs = dlt.pipeline(
    pipeline_name="nyc_taxi_pipeline_gcs",
    destination=filesystem(layout="{schema_name}/{table_name}.{ext}"),
    dataset_name="nyc_taxi_dataset",
)

# (connect, read) timeouts in seconds so a stalled download fails instead of
# blocking the notebook forever.
REQUEST_TIMEOUT = (10, 300)


# -----------------------------
# Sources (defined once, parameterised per month)
# -----------------------------
@dlt.source(name="yellow_tripdata_2024_dataset")
def process_bigquery(trips):
    """Wrap one month of trips as a dlt source for the BigQuery load.

    Args:
        trips: DataFrame holding a single month of yellow-taxi trips.

    Yields:
        One resource named ``yellow_tripdata_2024``; the name is the same for
        every month so all months land in a single table.
    """
    yield dlt.resource(trips, name="yellow_tripdata_2024")


@dlt.source(name="yellow_tripdata_2024")
def process_gcs_bucket(trips, month):
    """Wrap one month of trips as a dlt source for the GCS load.

    Args:
        trips: DataFrame holding a single month of yellow-taxi trips.
        month: Month number (1-12), used to give each month its own resource.

    Yields:
        One resource named ``yellow_tripdata_2024_<MM>``.
    """
    yield dlt.resource(trips, name=f"yellow_tripdata_2024_{month:02d}")


# -----------------------------
# List of months to fetch
# -----------------------------
months = range(1, 7)

# -----------------------------
# Process one month at a time
# -----------------------------
for month in tqdm(months, desc="Processing months", unit="month"):
    url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-{month:02d}.parquet"
    print(f"Fetching month {month}: {url}")

    # Download the Parquet file and parse it from memory
    response = requests.get(url, timeout=REQUEST_TIMEOUT)
    response.raise_for_status()
    df = pd.read_parquet(BytesIO(response.content))

    # The first month replaces the shared BigQuery table and later months
    # append to it, so re-running this cell does not duplicate rows.
    bq_write_disposition = "replace" if month == months[0] else "append"

    # Run pipelines
    load_info = pipeline_bq.run(
        process_bigquery(df),
        loader_file_format="parquet",
        write_disposition=bq_write_disposition,
    )
    print(f"Month {month} loaded. Info: {load_info}")

    load_info = pipeline_gcs.run(
        process_gcs_bucket(df, month),
        loader_file_format="parquet",
    )
    print(f"Month {month} loaded. Info: {load_info}")

In [None]:
# Count the rows in the consolidated BigQuery table to verify the load.
table_ref = f"{client.project}.nyc_taxi_dataset.yellow_tripdata_2024"
query = f"""
    SELECT COUNT(*) AS row_count FROM `{table_ref}`
"""

# Run the query, wait for it to finish, and read row_count off the first row.
result = client.query(query).result()
rows = list(result)
total_rows = rows[0].row_count

print(f"Total number of rows across all 6 months: {total_rows}")