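**Please set `GCP_CREDENTIALS_PATH` (the path to your service-account credentials JSON file) and `BUCKET_URL` (e.g. `gs://<your-bucket>`) in your `.env` file or environment before running this notebook.**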

In [1]:
from dotenv import load_dotenv
load_dotenv()

import os

# Load GCP credentials path and bucket URL from environment variables
credentials_path = os.getenv("GCP_CREDENTIALS_PATH")
bucket_url = os.getenv("BUCKET_URL")

# os.getenv returns None for unset variables. open(None) and
# os.environ[...] = None both raise TypeError, which the FileNotFoundError
# handler below would not catch, so check for missing values explicitly.
if not credentials_path:
    print("GCP_CREDENTIALS_PATH is not set. Skipping credential setup.")
elif not bucket_url:
    print("BUCKET_URL is not set. Skipping credential setup.")
else:
    try:
        with open(credentials_path, "r") as f:
            credentials_json = f.read()
    except FileNotFoundError:
        print(f"Credentials file not found: {credentials_path}. Skipping credential setup.")
    else:
        # Export the raw credentials JSON and the bucket URL as environment variables.
        os.environ["DESTINATION__CREDENTIALS"] = credentials_json
        os.environ["BUCKET_URL"] = bucket_url

In [2]:
import dlt
import requests
import pandas as pd
from dlt.destinations import filesystem
from io import BytesIO

In [3]:
from google.cloud import storage
import json

# Load the service-account key as a dict. It is used here to build the
# storage client and again by later cells (e.g. to read "project_id").
with open(os.getenv("GCP_CREDENTIALS_PATH"), "r") as f:
    creds_dict = json.load(f)

client = storage.Client.from_service_account_info(creds_dict)

# Extract bucket name from BUCKET_URL: drop the "gs://" scheme and keep the
# first path segment.
bucket_name = bucket_url.replace("gs://", "").split("/")[0]

# Create bucket if it does not exist
if client.lookup_bucket(bucket_name):
    print(f"Bucket '{bucket_name}' already exists.")
else:
    try:
        bucket = client.create_bucket(bucket_name)
        print(f"Bucket '{bucket_name}' created.")
    except Exception as e:
        # Still best-effort (the notebook continues), but report the actual
        # error instead of assuming the cause is billing.
        print(f"Bucket '{bucket_name}' does not exist and could not be created: {e}. Skipping bucket creation.")

Bucket 'de-zoomcamp-taxi-data-2024' already exists.


Ingesting parquet files to GCS.

In [6]:
# Define a dlt source to download and process Parquet files as resources
@dlt.source(name="rides")
def download_parquet():
    """Yield one dlt resource per monthly Yellow Taxi Parquet file (months 1-6 of 2024).

    Each file is downloaded into memory, parsed into a pandas DataFrame and
    wrapped in a ``dlt.resource`` named after the file.

    Raises:
        requests.HTTPError: if a download responds with a 4xx/5xx status.
        requests.Timeout: if the server stops responding for 60 seconds.
    """
    prefix = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata"
    for month in range(1, 7):
        print(f"Downloading data for month: {month}")
        file_name = f"yellow_tripdata_2024-0{month}.parquet"
        url = f"{prefix}_2024-0{month}.parquet"
        response = requests.get(url, timeout=60)
        # Fail here on an HTTP error; otherwise the error page body would be
        # handed to read_parquet and surface as a confusing parse failure.
        response.raise_for_status()

        df = pd.read_parquet(BytesIO(response.content))

        # Return the dataframe as a dlt resource for ingestion
        yield dlt.resource(df, name=file_name)


# Build the pipeline that writes to the filesystem destination
# (a GCS bucket in this notebook, see the load output below)
gcs_destination = filesystem(layout="{schema_name}/{table_name}.{ext}")
gcs_pipeline = dlt.pipeline(
    pipeline_name="rides_pipeline",
    destination=gcs_destination,
    dataset_name="rides_dataset",
)

# Run the source and write every resource out as Parquet files
rides_source = download_parquet()
load_info = gcs_pipeline.run(rides_source, loader_file_format="parquet")

# Print the results
print(load_info)


Downloading data for month: 1
Downloading data for month: 2
Downloading data for month: 3
Downloading data for month: 4
Downloading data for month: 5
Downloading data for month: 6
Pipeline rides_pipeline load step completed in 35.48 seconds
1 load package(s) were loaded to destination filesystem and into dataset rides_dataset
The filesystem destination used gs://de-zoomcamp-taxi-data-2024 location to store data
Load package 1770677102.364394 is LOADED and contains no failed jobs


### Create an External Table in BigQuery referencing the Parquet files in GCS

In [13]:
# Create an external table in BigQuery referencing the Parquet files in GCS
from google.cloud import bigquery
import os

# project_id is read from the service-account key loaded earlier into creds_dict
project_id = creds_dict.get("project_id")
dataset_id = "rides_dataset"
external_table_id = "yellow_tripdata_external"

# Parquet files location in GCS (adjust path if needed).
# rstrip("/") avoids a double slash in the URI when BUCKET_URL ends with "/".
parquet_uri = f"{bucket_url.rstrip('/')}/rides_dataset/*.parquet"

bq_client = bigquery.Client.from_service_account_info(creds_dict, project=project_id)

# Check that the dataset is reachable. This does NOT create the dataset:
# a failure is only reported and the cell carries on.
dataset_ref = bigquery.Dataset(f"{project_id}.{dataset_id}")
try:
    bq_client.get_dataset(dataset_ref)
    print(f"Dataset {dataset_id} exists or you have access.")
except Exception as e:
    print(f"Warning: Could not access dataset {dataset_id}. This may be due to insufficient permissions, but if you created it manually and have access in the UI, you can ignore this warning. Error: {e}")

# Proceed to create the external table regardless of the above error
external_config = bigquery.ExternalConfig("PARQUET")
external_config.source_uris = [parquet_uri]

table_ref = f"{project_id}.{dataset_id}.{external_table_id}"
external_table = bigquery.Table(table_ref)
external_table.external_data_configuration = external_config

try:
    # exists_ok=True makes re-running this cell a no-op when the table exists
    external_table = bq_client.create_table(external_table, exists_ok=True)
    print(f"External table created: {external_table.full_table_id}")
except Exception as e:
    # Any failure lands here (missing dataset, bad URI, permissions, ...), so
    # do not claim a specific cause; surface the real error instead.
    print(f"Error: Could not create external table {table_ref}. If this is a permission error, check that you have bigquery.tables.create on dataset {dataset_id}. Details: {e}")

Dataset rides_dataset exists or you have access.
External table created: tokyo-epoch-486603-u6:rides_dataset.yellow_tripdata_external


### Create a regular table in BigQuery from the external table

In [15]:
# Materialize the external table into a regular BigQuery table
materialized_table_id = "yellow_tripdata"
materialized_table_ref = f"{project_id}.{dataset_id}.{materialized_table_id}"

# CREATE OR REPLACE ... AS SELECT: copies every row of the external table
query = (
    f"\nCREATE OR REPLACE TABLE `{materialized_table_ref}` AS\n"
    f"SELECT * FROM `{project_id}.{dataset_id}.{external_table_id}`\n"
)

job = bq_client.query(query)
# Block until the job has finished before reporting success
job.result()
print(f"Materialized table created: {materialized_table_ref}")

Materialized table created: tokyo-epoch-486603-u6.rides_dataset.yellow_tripdata


In [28]:
# Fetch the materialized table's metadata and list the names in its schema
table = bq_client.get_table(materialized_table_ref)
column_names = [field.name for field in table.schema]
print("Column names:", column_names)

Column names: ['vendor_id', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'ratecode_id', 'store_and_fwd_flag', 'pu_location_id', 'do_location_id', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee']


## Question 2. Data read estimation
    Write a query to count the distinct number of PULocationIDs for the entire dataset on both the tables.
    What is the estimated amount of data that will be read when this query is executed on the External Table and the Table?

In [17]:
# Count distinct pu_location_ids and estimate bytes processed for both tables
from google.cloud.bigquery import QueryJobConfig, QueryJob

def estimate_query_bytes(query):
    """Return the job's ``total_bytes_processed`` for ``query`` from a dry run.

    The query is submitted through the notebook-level ``bq_client`` with
    ``dry_run=True`` and the query cache disabled.
    """
    dry_run_job = bq_client.query(
        query,
        job_config=QueryJobConfig(dry_run=True, use_query_cache=False),
    )
    return dry_run_job.total_bytes_processed

# Same aggregate against the external table and the materialized table
external_table_query = (
    f"\nSELECT COUNT(DISTINCT pu_location_id) AS distinct_pu FROM `{project_id}.{dataset_id}.{external_table_id}`\n"
)
materialized_table_query = (
    f"\nSELECT COUNT(DISTINCT pu_location_id) AS distinct_pu FROM `{project_id}.{dataset_id}.{materialized_table_id}`\n"
)

# Dry-run estimate for the external table
external_bytes = estimate_query_bytes(external_table_query)
print(f"Estimated bytes read from external table: {external_bytes:,}")

# Dry-run estimate for the materialized table
materialized_bytes = estimate_query_bytes(materialized_table_query)
print(f"Estimated bytes read from materialized table: {materialized_bytes:,}")

# Actually execute both queries; each result is a single row with a single column
external_result = bq_client.query(external_table_query).result().to_dataframe()
external_count = external_result.iloc[0, 0]
materialized_result = bq_client.query(materialized_table_query).result().to_dataframe()
materialized_count = materialized_result.iloc[0, 0]
print(f"Distinct pu_location_ids (external table): {external_count}")
print(f"Distinct pu_location_ids (materialized table): {materialized_count}")

Estimated bytes read from external table: 0
Estimated bytes read from materialized table: 162,656,744




Distinct pu_location_ids (external table): 262
Distinct pu_location_ids (materialized table): 262


Ingesting data to Local Database

In [18]:
# Define a dlt resource to download and process Parquet files as single table
@dlt.resource(name="rides", write_disposition="replace")
def download_parquet():
    """Yield one pandas DataFrame per monthly Yellow Taxi Parquet file (months 1-6 of 2024).

    All yielded frames belong to the single ``rides`` resource declared by the
    decorator (write disposition ``replace``).

    Raises:
        requests.HTTPError: if a download responds with a 4xx/5xx status.
        requests.Timeout: if the server stops responding for 60 seconds.
    """
    prefix = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata'
    for month in range(1, 7):
        print(f"Downloading data for month: {month}")
        url = f"{prefix}_2024-0{month}.parquet"
        response = requests.get(url, timeout=60)
        # Fail here on an HTTP error; otherwise the error page body would be
        # handed to read_parquet and surface as a confusing parse failure.
        response.raise_for_status()
        df = pd.read_parquet(BytesIO(response.content))
        yield df

# Resolve the DuckDB file path and dataset name.
# NOTE: DUCKDB_PATH is removed from the environment right before it is read,
# so duckdb_path always falls back to 'data/rides_pipeline.db' here; only
# DUCKDB_DATASET can still be overridden through the environment / .env.
import os
os.environ.pop('DUCKDB_PATH', None)  # Drop any existing DUCKDB_PATH so the default below is always used
duckdb_path = os.getenv('DUCKDB_PATH', 'data/rides_pipeline.db')
duckdb_dataset = os.getenv('DUCKDB_DATASET', 'rides_dataset')

# Unset GCP-related env vars to avoid contamination (currently disabled)
# os.environ.pop('GCP_CREDENTIALS_PATH', None)
# os.environ.pop('BUCKET_URL', None)
# os.environ.pop('DESTINATION__CREDENTIALS', None)

# Export the path as DUCKDB_DATABASE as well (presumably read by dlt's config
# lookup; the path is also passed explicitly to the destination below)
os.environ['DUCKDB_DATABASE'] = duckdb_path

# Initialize the pipeline
duck_pipeline = dlt.pipeline(
    pipeline_name="rides_pipeline",
    destination=dlt.destinations.duckdb(duckdb_path),  # explicit location of the DuckDB database file
    dataset_name=duckdb_dataset,
    # NOTE(review): this pipeline_name is the same as the GCS pipeline's above; confirm sharing one dlt pipeline name is intended
)

# Run the pipeline to load Parquet data into DuckDB
info = duck_pipeline.run(download_parquet)

# Print the results
print(info)




Downloading data for month: 1
Downloading data for month: 2
Downloading data for month: 3
Downloading data for month: 4
Downloading data for month: 5
Downloading data for month: 6
Pipeline rides_pipeline load step completed in 3.47 seconds
1 load package(s) were loaded to destination duckdb and into dataset rides_dataset
The duckdb destination used duckdb:////app/notebooks/data/rides_pipeline.db location to store data
Load package 1770678707.380898 is LOADED and contains no failed jobs


In [19]:
import duckdb

# Open the database file the pipeline actually wrote to. The destination was
# created with duckdb_path, so "<pipeline_name>.duckdb" in the working
# directory would be a different (possibly stale or empty) database.
# The with-block closes the connection once the tables have been listed.
with duckdb.connect(duckdb_path) as conn:
    # Set search path to the dataset
    conn.sql(f"SET search_path = '{duck_pipeline.dataset_name}'")

    # Describe the dataset to see loaded tables
    res = conn.sql("DESCRIBE").df()
print(res)

         database         schema                 name  \
0  rides_pipeline  rides_dataset           _dlt_loads   
1  rides_pipeline  rides_dataset  _dlt_pipeline_state   
2  rides_pipeline  rides_dataset         _dlt_version   
3  rides_pipeline  rides_dataset                rides   

                                        column_names  \
0  [load_id, schema_name, status, inserted_at, sc...   
1  [version, engine_version, pipeline_name, state...   
2  [version, engine_version, inserted_at, schema_...   
3  [vendor_id, tpep_pickup_datetime, tpep_dropoff...   

                                        column_types  temporary  
0  [VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME...      False  
1  [BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W...      False  
2  [BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR...      False  
3  [INTEGER, TIMESTAMP WITH TIME ZONE, TIMESTAMP ...      False  


## Question 1. Counting records

    What is the count of records for the 2024 Yellow Taxi Data?

In [20]:
# Count the rows of the "rides" table through the pipeline's SQL client.
# The context variable is named sql_client so it does not overwrite the
# notebook-level `client` (the GCS storage client created earlier).
with duck_pipeline.sql_client() as sql_client:
    with sql_client.execute_query("SELECT count(1) FROM rides") as cursor:
        data = cursor.df()
print(data)

   count(1)
0  20332093


## Question 2. Data read estimation

    Write a query to count the distinct number of PULocationIDs for the entire dataset on both the tables.
    What is the estimated amount of data that will be read when this query is executed on the External Table and the Table?

In [21]:
# Count distinct pu_location_id in DuckDB table.
# The context variable is named sql_client so it does not overwrite the
# notebook-level `client` (the GCS storage client created earlier).
with duck_pipeline.sql_client() as sql_client:
    with sql_client.execute_query("SELECT COUNT(DISTINCT pu_location_id) AS distinct_pu FROM rides") as cursor:
        duckdb_distinct = cursor.df()
print("DuckDB Table distinct pu_location_id:", duckdb_distinct)



DuckDB Table distinct pu_location_id:    distinct_pu
0          262


## Question 3. Understanding columnar storage

    Write a query to retrieve the PULocationID from the table (not the external table) in BigQuery. Now write a query to retrieve the PULocationID and DOLocationID on the same table.

    Why are the estimated number of Bytes different?

        BigQuery is a columnar database, and it only scans the specific columns requested in the query. Querying two columns (PULocationID, DOLocationID) requires reading more data than querying one column (PULocationID), leading to a higher estimated number of bytes processed.
        BigQuery duplicates data across multiple storage partitions, so selecting two columns instead of one requires scanning the table twice, doubling the estimated bytes processed.
        BigQuery automatically caches the first queried column, so adding a second column increases processing time but does not affect the estimated bytes scanned.
        When selecting multiple columns, BigQuery performs an implicit join operation between them, increasing the estimated bytes processed


In [24]:
# Estimate bytes processed for selecting one vs two columns from materialized table
def estimate_query_bytes(query):
    """Return the job's ``total_bytes_processed`` for ``query`` from a dry run.

    Submits the query through the notebook-level ``bq_client`` with
    ``dry_run=True`` and the query cache disabled. A function with this name
    is also defined in an earlier cell; this definition replaces it once the
    cell runs.
    """
    dry_run_job = bq_client.query(
        query,
        job_config=bigquery.QueryJobConfig(dry_run=True, use_query_cache=False),
    )
    return dry_run_job.total_bytes_processed

# Both queries read the same materialized table; only the column list differs
single_col_query = f"SELECT pu_location_id FROM `{project_id}.{dataset_id}.{materialized_table_id}`"
two_col_query = f"SELECT pu_location_id, do_location_id FROM `{project_id}.{dataset_id}.{materialized_table_id}`"

# Dry-run each query in turn (one column first, then two)
single_col_bytes, two_col_bytes = (
    estimate_query_bytes(sql) for sql in (single_col_query, two_col_query)
)

print(f"Estimated bytes processed for one column (pu_location_id): {single_col_bytes:,}")
print(f"Estimated bytes processed for two columns (pu_location_id, do_location_id): {two_col_bytes:,}")

Estimated bytes processed for one column (pu_location_id): 162,656,744
Estimated bytes processed for two columns (pu_location_id, do_location_id): 325,313,488


## Question 4. Counting zero fare trips

How many records have a fare_amount of 0?

In [25]:
# Count the rows of the materialized BigQuery table whose fare_amount is 0
zero_fare_query = (
    "SELECT COUNT(*) AS zero_fare_count "
    f"FROM `{project_id}.{dataset_id}.{materialized_table_id}` "
    "WHERE fare_amount = 0"
)
# The result is a single row with a single column
zero_fare_df = bq_client.query(zero_fare_query).result().to_dataframe()
zero_fare_count = zero_fare_df.iloc[0, 0]
print(f"Number of records with fare_amount = 0: {zero_fare_count}")

Number of records with fare_amount = 0: 8333




## Question 5. Partitioning and clustering

What is the best strategy to make an optimized table in BigQuery if your query will always filter based on tpep_dropoff_datetime and order the results by VendorID? (Create a new table with this strategy.)

In [29]:
# Build a copy of the materialized table that is partitioned on the date of
# tpep_dropoff_datetime and clustered on vendor_id
optimized_table_id = "yellow_tripdata_partitioned_clustered"
optimized_table_ref = f"{project_id}.{dataset_id}.{optimized_table_id}"

create_optimized_query = (
    f"\nCREATE OR REPLACE TABLE `{optimized_table_ref}`\n"
    "PARTITION BY DATE(tpep_dropoff_datetime)\n"
    "CLUSTER BY vendor_id AS\n"
    f"SELECT * FROM `{project_id}.{dataset_id}.{materialized_table_id}`\n"
)

job = bq_client.query(create_optimized_query)
# Block until the job has finished before reporting success
job.result()
print(f"Optimized table created: {optimized_table_ref}")

Optimized table created: tokyo-epoch-486603-u6.rides_dataset.yellow_tripdata_partitioned_clustered


## Question 6. Partition benefits

Write a query to retrieve the distinct VendorIDs between tpep_dropoff_datetime 2024-03-01 and 2024-03-15 (inclusive)

Use the materialized table you created earlier in your from clause and note the estimated bytes. Now change the table in the from clause to the partitioned table you created for question 5 and note the estimated bytes processed. What are these values?


In [30]:
# Question 6: Partition benefits - Estimate bytes for materialized and partitioned tables
from datetime import date, timedelta

start_date = '2024-03-01'
end_date = '2024-03-15'

# tpep_dropoff_datetime carries a time of day, so "BETWEEN start AND end" with
# bare dates stops at end_date 00:00:00 and drops the rest of that day.
# A half-open range [start_date, end_date + 1 day) keeps the whole last day,
# which is what "inclusive" asks for. The bound is computed in Python so the
# SQL still compares the column against plain literals.
end_exclusive = (date.fromisoformat(end_date) + timedelta(days=1)).isoformat()

materialized_query = f'''
SELECT DISTINCT vendor_id FROM `{project_id}.{dataset_id}.{materialized_table_id}`
WHERE tpep_dropoff_datetime >= '{start_date}' AND tpep_dropoff_datetime < '{end_exclusive}'
'''

partitioned_table_id = "yellow_tripdata_partitioned_clustered"
partitioned_query = f'''
SELECT DISTINCT vendor_id FROM `{project_id}.{dataset_id}.{partitioned_table_id}`
WHERE tpep_dropoff_datetime >= '{start_date}' AND tpep_dropoff_datetime < '{end_exclusive}'
'''

# Estimate bytes processed for both queries (dry runs, nothing is executed)
materialized_bytes = estimate_query_bytes(materialized_query)
partitioned_bytes = estimate_query_bytes(partitioned_query)

print(f"Estimated bytes processed (materialized table): {materialized_bytes:,}")
print(f"Estimated bytes processed (partitioned table): {partitioned_bytes:,}")

# Optionally, run both queries and print distinct vendor_ids
materialized_vendor_ids = bq_client.query(materialized_query).result().to_dataframe()['vendor_id'].tolist()
partitioned_vendor_ids = bq_client.query(partitioned_query).result().to_dataframe()['vendor_id'].tolist()
print(f"Distinct VendorIDs (materialized table): {materialized_vendor_ids}")
print(f"Distinct VendorIDs (partitioned table): {partitioned_vendor_ids}")

Estimated bytes processed (materialized table): 325,313,488
Estimated bytes processed (partitioned table): 28,141,776




Distinct VendorIDs (materialized table): [6, 1, 2]
Distinct VendorIDs (partitioned table): [1, 6, 2]




## Question 7. External table storage

Where is the data stored in the External Table you created?
    GCP Bucket

## Question 8. Clustering best practices

It is best practice in Big Query to always cluster your data:

    FALSE.
    Clustering in BigQuery is not always best practice. It's useful when queries frequently filter or group by clustered columns, reducing data scanned and improving performance. Otherwise, clustering adds overhead and may not help. Use clustering based on query patterns.

## Question 9. Understanding table scans

No Points: Write a SELECT count(*) query FROM the materialized table you created. How many bytes does it estimate will be read? Why?


In [39]:
# Sanity check first: run a real COUNT(*) so an empty table is not mistaken
# for a genuine 0-byte estimate
row_count_query = f"SELECT COUNT(*) AS row_count FROM `{project_id}.{dataset_id}.{materialized_table_id}`"
row_count_df = bq_client.query(row_count_query).result().to_dataframe()
row_count = row_count_df.iloc[0, 0]
print(f"Materialized table row count: {row_count}")
if row_count == 0:
    print("Warning: Materialized table is empty. Estimated bytes processed will be 0.")

# Then dry-run SELECT count(*) on the same table to get the byte estimate
count_query = f"SELECT count(*) FROM `{project_id}.{dataset_id}.{materialized_table_id}`"
count_bytes = estimate_query_bytes(count_query)
print(f"Estimated bytes processed for SELECT count(*): {count_bytes:,}")

Materialized table row count: 20332093
Estimated bytes processed for SELECT count(*): 0


The estimated bytes processed for `SELECT count(*)` on the materialized table is 0, as the output above shows. `count(*)` does not reference any column, and for a native BigQuery table the row count is kept in the table's metadata, so BigQuery can return it without scanning any stored data. This is consistent with columnar storage: only the columns a query actually references are scanned, and this query references none. The estimate is no longer 0 once the query has to read column values, for example `SELECT count(pu_location_id)` or a `count(*)` with a `WHERE` filter on a column.