**Using GCP ADC**

In [6]:
import os
# Disabled alternative: take destination credentials from a `userdata` secret store
# (presumably Colab's `google.colab.userdata` — confirm before re-enabling).
# os.environ["DESTINATION__CREDENTIALS"] = userdata.get("GCP_CREDENTIALS")
# Bucket URL read back later through os.environ["BUCKET_URL"] when the filesystem destination is built.
os.environ["BUCKET_URL"] = "gs://de-zc-pmg-data"

In [2]:
import os
import dlt
from dlt.destinations import filesystem

# 1. Point to the credentials file generated by 'gcloud auth application-default login'.
# The path is resolved from the current user's home directory rather than hard-coding
# /home/codespace, so the cell works in Codespaces and on any other machine.
creds_path = os.path.expanduser("~/.config/gcloud/application_default_credentials.json")
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_path

# 2. Verify the file actually exists (sanity check)
if not os.path.exists(creds_path):
    print("⚠️ WARNING: Credentials file not found! Did you run 'gcloud auth application-default login'?")
else:
    print(f"✅ Credentials found at: {creds_path}")

# 3. Set your bucket URL
MY_BUCKET = "gs://de-zc-pmg-data"

✅ Credentials found at: /home/codespace/.config/gcloud/application_default_credentials.json


In [10]:
%%capture
# Install for production
uv install dlt[bigquery, gs]

In [11]:
%%capture
# Install for testing
!pip install dlt[duckdb]

In [2]:
import dlt
import requests
import pandas as pd
from dlt.destinations import filesystem
from io import BytesIO

Ingesting parquet files to GCS.

In [None]:
# Define a dlt source to download and process Parquet files as resources
@dlt.source(name="rides")
def download_parquet():
    """Yield one dlt resource per monthly yellow-taxi Parquet file (2024-01 .. 2024-06).

    Each file is downloaded over HTTP, parsed in memory into a pandas DataFrame and wrapped
    in a ``dlt.resource`` named after the file.

    Raises:
        requests.HTTPError: if a download answers with a 4xx/5xx status.
        requests.Timeout: if the server stops responding for longer than the timeout.
    """
    prefix = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata"
    for month in range(1, 7):
        # Zero-pad through the format spec so the pattern stays correct for any month number.
        suffix = f"_2024-{month:02d}.parquet"
        file_name = f"yellow_tripdata{suffix}"
        url = f"{prefix}{suffix}"

        # The timeout bounds connect/read inactivity, not the total download time.
        response = requests.get(url, timeout=60)
        # Fail here on an HTTP error instead of handing an error page to the Parquet reader.
        response.raise_for_status()

        df = pd.read_parquet(BytesIO(response.content))

        # Return the dataframe as a dlt resource for ingestion
        yield dlt.resource(df, name=file_name)


# Filesystem destination rooted at the bucket named by the BUCKET_URL environment variable,
# using the layout template below for the written files.
gcs_destination = filesystem(
    bucket_url=os.environ["BUCKET_URL"],
    layout="{schema_name}/{table_name}.{ext}",
)

# Initialize the pipeline
pipeline = dlt.pipeline(
    pipeline_name="rides_pipeline",
    destination=gcs_destination,
    dataset_name="rides_dataset",
)

# Run the pipeline: the source's resources are written to the bucket as Parquet files
load_info = pipeline.run(download_parquet(), loader_file_format="parquet")

# Print the results
print(load_info)


In [5]:
import dlt
import os
from dlt.destinations import filesystem

# ---------------------------------------------------------
# SETUP: AUTHENTICATION
# ---------------------------------------------------------
# 1. Point to the ADC (Application Default Credentials) file (do not read it, just point to it)
creds_path = os.path.expanduser("~/.config/gcloud/application_default_credentials.json")
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_path

# 2. CRITICAL: Set your Project ID explicitly
# ADC keys don't always contain the project ID, causing gcsfs to fail.
# Use your actual GCP Project ID here (not the project name).
os.environ["GCP_PROJECT_ID"] = "project-c94cb82b-89ba-4659-98a"
os.environ["GOOGLE_CLOUD_PROJECT"] = "project-c94cb82b-89ba-4659-98a" # Set both just to be safe

# 3. Define Bucket
MY_BUCKET = "gs://de-zc-pmg-data"

# ---------------------------------------------------------
# PIPELINE
# ---------------------------------------------------------
pipeline = dlt.pipeline(
    pipeline_name="rides_pipeline",
    destination=filesystem(
        bucket_url=MY_BUCKET,
        # No explicit credentials=... argument:
        # we let dlt/gcsfs find the env vars set above automatically
        layout="{schema_name}/{table_name}.{ext}"
    ),
    dataset_name="rides_ny_taxi",
)

# ---------------------------------------------------------
# RUN
# ---------------------------------------------------------
# Uses the `download_parquet` definition from an earlier cell.
try:
    # Call the decorated function so dlt receives the source/resource object it produces,
    # the same way the earlier GCS cell invokes it.
    load_info = pipeline.run(download_parquet(), loader_file_format="parquet")
    print(load_info)
except Exception as e:
    # Best-effort diagnostics: report the error with the two most likely causes.
    print(f"\n❌ ERROR: {e}")
    print("\n🔍 TROUBLESHOOTING:")
    print("1. Did you replace 'your-project-id-here' with your actual Project ID?")
    print(f"2. Does this file exist? {creds_path}")

Processing yellow_tripdata_2024-01.parquet...
Processing yellow_tripdata_2024-02.parquet...
Processing yellow_tripdata_2024-03.parquet...
Processing yellow_tripdata_2024-04.parquet...
Processing yellow_tripdata_2024-05.parquet...
Processing yellow_tripdata_2024-06.parquet...
Pipeline rides_pipeline load step completed in 1 minute and 3.44 seconds
1 load package(s) were loaded to destination filesystem and into dataset rides_ny_taxi
The filesystem destination used gs://de-zc-pmg-data location to store data
Load package 1770726527.519993 is LOADED and contains no failed jobs


Ingesting data to Database

In [None]:
# Define a dlt resource to download and process Parquet files as single table
# NOTE: this reuses the name `download_parquet` from the earlier @dlt.source cell and
# therefore replaces that definition in the kernel once this cell has run.
@dlt.resource(name="rides", write_disposition="replace")
def download_parquet():
    """Yield the monthly yellow-taxi Parquet files (2024-01 .. 2024-06) as DataFrames.

    Every yielded DataFrame belongs to the single resource named "rides", which is declared
    with write_disposition="replace".

    Raises:
        requests.HTTPError: if a download answers with a 4xx/5xx status.
        requests.Timeout: if the server stops responding for longer than the timeout.
    """
    prefix = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata'

    for month in range(1, 7):
        # Zero-pad through the format spec so the pattern stays correct for any month number.
        url = f"{prefix}_2024-{month:02d}.parquet"

        # The timeout bounds connect/read inactivity, not the total download time.
        response = requests.get(url, timeout=60)
        # Fail here on an HTTP error instead of handing an error page to the Parquet reader.
        response.raise_for_status()

        df = pd.read_parquet(BytesIO(response.content))

        yield df


# Initialize the pipeline.
# Exactly one `destination=` line should be active: swap the commented line below to switch
# between the local DuckDB test target and BigQuery.
pipeline = dlt.pipeline(
    pipeline_name="rides_pipeline",
    # destination="duckdb",  # Use DuckDB for testing
    destination="bigquery",  # Use BigQuery for production
    dataset_name="rides_dataset",
)

# Run the pipeline to load the downloaded Parquet data into the destination selected above
# (BigQuery as written). The resource object is passed directly, without calling it.
info = pipeline.run(download_parquet)

# Print the results
print(info)




In [None]:
import duckdb

# Open the DuckDB database file named after the pipeline, relative to the working directory.
# NOTE(review): the pipeline cell above currently selects destination="bigquery"; this cell
# presumably only applies when the duckdb destination is enabled instead — confirm.
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# Set search path to the dataset
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# Describe the dataset to see loaded tables
res = conn.sql("DESCRIBE").df()
print(res)

In [None]:
# Count the rows of the "rides" table (the table is named after the resource) through the
# pipeline's SQL client; both context managers are released when the block exits.
with pipeline.sql_client() as client, client.execute_query(f"SELECT count(1) FROM rides") as cursor:
    data = cursor.df()
print(data)