In [None]:
import dlt
import requests
import pandas as pd
import pyarrow.parquet as pq
import io
import os
from google.cloud import bigquery

# Path to the JSON key file, exposed through GOOGLE_APPLICATION_CREDENTIALS
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "gcs.json"

# Common URL prefix of the monthly files; the two-digit month and ".parquet"
# are appended when each download URL is built
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-"
MONTHS = [str(month_number).zfill(2) for month_number in range(1, 7)]  # ['01', '02', ..., '06']

# Define a dlt resource for fetching Parquet data
@dlt.resource(name="ny_taxi_dlt", write_disposition="replace")
def paginated_getter():
    """Download each monthly Parquet file and yield it as a pyarrow Table.

    For every entry in ``MONTHS`` the file URL is built from ``BASE_URL``,
    the file is downloaded in 1 MB chunks into an in-memory buffer, and the
    buffer is parsed with ``pyarrow.parquet.read_table``.

    Yields:
        pyarrow.Table: one table per month that was downloaded and parsed
        successfully and has at least one row.

    Notes:
        * A month that fails to download or parse is reported with ``print``
          and skipped; iteration continues with the next month.
        * The first month whose file has zero rows ends the iteration.
    """
    # (connect, read) timeouts in seconds. Without a timeout, requests can
    # wait indefinitely on a stalled connection.
    request_timeout = (10, 120)

    for month in MONTHS:
        url = f"{BASE_URL}{month}.parquet"

        # Only the download and parse steps are guarded, so a problem raised
        # while the consumer handles a yielded table is not misreported as a
        # fetch failure.
        try:
            with requests.get(url, stream=True, timeout=request_timeout) as response:
                response.raise_for_status()  # Raise an error for failed requests

                # Read file in chunks and store in a buffer
                buffer = io.BytesIO()
                for chunk in response.iter_content(chunk_size=1024 * 1024):  # Read in 1MB chunks
                    buffer.write(chunk)

            # The response is closed here, before parsing and yielding, so the
            # connection is not held open while the table is being consumed.
            buffer.seek(0)  # Reset buffer position
            table = pq.read_table(buffer)
        except Exception as e:
            print(f"Failed to fetch data for month {month}: {e}")
            continue

        print(f'Got month {month} with {len(table)} records')

        if table.num_rows == 0:
            break  # Stop if no more data
        yield table

# Build the dlt pipeline that targets BigQuery (dev_mode enabled)
pipeline = dlt.pipeline(
    destination="bigquery",
    dataset_name="ny_taxi_parquet_dlt_8",
    pipeline_name="ny_taxi_pipeline_dlt",
    dev_mode=True,
)

# Instantiate the resource and hand it to the pipeline for loading
taxi_resource = paginated_getter()
load_info = pipeline.run(taxi_resource)

# Report the load summary first, then the last normalize step's details
print(load_info)
normalize_info = pipeline.last_trace.last_normalize_info
print(normalize_info)

Got month 01 with 2964624 records
Got month 02 with 3007526 records
Got month 03 with 3582628 records
Got month 04 with 3514289 records
Got month 05 with 3723833 records
Got month 06 with 3539193 records




Pipeline ny_taxi_pipeline_dlt load step completed in 1 minute and 24.80 seconds
1 load package(s) were loaded to destination bigquery and into dataset ny_taxi_parquet_dlt_8_20250218053652
The bigquery destination used de-zoomcamp-warehouse@de-zoomcamp-warehouse.iam.gserviceaccount.com@de-zoomcamp-warehouse location to store data
Load package 1739900212.89182 is LOADED and contains no failed jobs
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- ny_taxi_dlt: 20332093 row(s)

Load package 1739900212.89182 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs


In [15]:
import dlt
import requests
import pandas as pd
import io
import os
from google.cloud import bigquery

# Path to the JSON key file, exposed through GOOGLE_APPLICATION_CREDENTIALS
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "gcs.json"

# URL prefix shared by the monthly Parquet files, and the month suffixes to fetch
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-"
MONTHS = ["%02d" % month_number for month_number in range(1, 7)]  # ['01', '02', ..., '06']

# Define a dlt resource for fetching Parquet data
@dlt.resource(name="ny_taxi_dlt", write_disposition="replace")
def paginated_getter():
    """Download each monthly Parquet file and yield it as a Pandas DataFrame.

    For every entry in ``MONTHS`` the file URL is built from ``BASE_URL``,
    the file is downloaded in full, and the bytes are parsed with
    ``pandas.read_parquet``.

    Yields:
        pandas.DataFrame: one frame per month that was downloaded and parsed
        successfully and is not empty.

    Notes:
        * A month that fails to download or parse is reported with ``print``
          and skipped; iteration continues with the next month.
        * The first month whose frame is empty ends the iteration.
    """
    # (connect, read) timeouts in seconds. Without a timeout, requests can
    # wait indefinitely on a stalled connection.
    request_timeout = (10, 120)

    for month in MONTHS:
        url = f"{BASE_URL}{month}.parquet"

        # Only the download and parse steps are guarded, so a problem raised
        # while the consumer handles a yielded frame is not misreported as a
        # fetch failure.
        try:
            response = requests.get(url, timeout=request_timeout)
            response.raise_for_status()

            # Convert response content into a Pandas DataFrame
            page_parquet = pd.read_parquet(io.BytesIO(response.content))
        except Exception as e:
            print(f"Failed to fetch data for month {month}: {e}")
            continue

        print(f'Got month {month} with {len(page_parquet)} records')

        if page_parquet.empty:
            break  # Stop if no more data
        yield page_parquet

# Build the dlt pipeline that targets BigQuery
pipeline = dlt.pipeline(
    destination="bigquery",
    dataset_name="ny_taxi_parquet_dlt_4",
    pipeline_name="ny_taxi_pipeline_dlt",
)

# Instantiate the resource and hand it to the pipeline for loading
taxi_resource = paginated_getter()
load_info = pipeline.run(taxi_resource)

# Report the load summary first, then the last normalize step's details
print(load_info)
normalize_info = pipeline.last_trace.last_normalize_info
print(normalize_info)

Got month 01 with 2964624 records
Got month 02 with 3007526 records
Got month 03 with 3582628 records
Got month 04 with 3514289 records
Got month 05 with 3723833 records
Got month 06 with 3539193 records




Pipeline ny_taxi_pipeline_dlt load step completed in 1 minute and 13.53 seconds
1 load package(s) were loaded to destination bigquery and into dataset ny_taxi_parquet_dlt_4
The bigquery destination used de-zoomcamp-warehouse@de-zoomcamp-warehouse.iam.gserviceaccount.com@de-zoomcamp-warehouse location to store data
Load package 1739888325.855125 is LOADED and contains no failed jobs
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- ny_taxi_dlt: 20332093 row(s)

Load package 1739888325.855125 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs


# Variant with `_dlt_load_id` and `_dlt_id` columns (rows yielded as dicts)

In [11]:
import dlt
import requests
import pandas as pd
import io
import os
from google.cloud import bigquery

# Path to the JSON key file, exposed through GOOGLE_APPLICATION_CREDENTIALS
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "gcs.json"

# URL prefix shared by the monthly Parquet files, and the month suffixes to fetch
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-"
MONTHS = [format(month_number, "02d") for month_number in range(1, 7)]  # ['01', '02', ..., '06']

# Define a dlt resource for fetching Parquet data
@dlt.resource(name="ny_taxi_dlt", write_disposition="replace")
def paginated_getter():
    """Download each monthly Parquet file and yield its rows as a list of dicts.

    For every entry in ``MONTHS`` the file URL is built from ``BASE_URL``,
    the file is downloaded in full, parsed with ``pandas.read_parquet`` and
    converted with ``DataFrame.to_dict(orient="records")``.

    Yields:
        list[dict]: one list per month, holding one dict per row, for every
        month that was downloaded and converted successfully and is not empty.

    Notes:
        * A month that fails to download, parse or convert is reported with
          ``print`` and skipped; iteration continues with the next month.
        * The first month whose frame is empty ends the iteration.
    """
    # (connect, read) timeouts in seconds. Without a timeout, requests can
    # wait indefinitely on a stalled connection.
    request_timeout = (10, 120)

    for month in MONTHS:
        url = f"{BASE_URL}{month}.parquet"

        # Everything except the yield is guarded, so a problem raised while
        # the consumer handles the yielded rows is not misreported as a fetch
        # failure.
        try:
            response = requests.get(url, timeout=request_timeout)
            response.raise_for_status()  # Raise an error for failed requests

            # Convert response content into a Pandas DataFrame
            page_parquet = pd.read_parquet(io.BytesIO(response.content))

            print(f'Got month {month} with {len(page_parquet)} records')

            # Convert to list of dictionaries for dlt; an empty frame is
            # represented by an empty list so the stop check below sees it.
            records = [] if page_parquet.empty else page_parquet.to_dict(orient="records")
        except Exception as e:
            print(f"Failed to fetch data for month {month}: {e}")
            continue

        if not records:
            break  # Stop if no more data
        yield records

# Build the dlt pipeline that targets BigQuery
pipeline = dlt.pipeline(
    destination="bigquery",
    dataset_name="ny_taxi_parquet_dlt",
    pipeline_name="ny_taxi_pipeline_dlt",
)

# Instantiate the resource and hand it to the pipeline for loading
taxi_resource = paginated_getter()
load_info = pipeline.run(taxi_resource)

# Report the load summary first, then the last normalize step's details
print(load_info)
normalize_info = pipeline.last_trace.last_normalize_info
print(normalize_info)


Got month 01 with 2964624 records
Got month 02 with 3007526 records
Got month 03 with 3582628 records
Got month 04 with 3514289 records
Got month 05 with 3723833 records
Got month 06 with 3539193 records




Pipeline ny_taxi_pipeline_dlt load step completed in 2 minutes and 34.67 seconds
1 load package(s) were loaded to destination bigquery and into dataset ny_taxi_parquet_dlt
The bigquery destination used de-zoomcamp-warehouse@de-zoomcamp-warehouse.iam.gserviceaccount.com@de-zoomcamp-warehouse location to store data
Load package 1739875054.172373 is LOADED and contains no failed jobs
Normalized data for the following tables:
- _dlt_pipeline_state: 1 row(s)
- ny_taxi_dlt: 20332093 row(s)

Load package 1739875054.172373 is NORMALIZED and NOT YET LOADED to the destination and contains no failed jobs


# Approach:
First, I tested yielding the data by printing it directly, following this method: [Extracting Data with dlt](https://dev.to/cmcrawford2/extracting-data-with-dlt-9hl).
Then, I replaced the print statements with a dlt pipeline to load the data into BigQuery.

In [5]:
import os
import requests
import dlt
from dlt.sources.filesystem import filesystem, read_parquet
import pandas as pd
import io


In [6]:

# URL prefix shared by the monthly Parquet files
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-"
# Two-digit month suffixes to fetch: ['01', '02', ..., '06']
MONTHS = list(map("{:02d}".format, range(1, 7)))

def paginated_getter():
    """Download each monthly Parquet file and yield it as a Pandas DataFrame.

    For every entry in ``MONTHS`` the file URL is built from ``BASE_URL``,
    the file is downloaded in full, and the bytes are parsed with
    ``pandas.read_parquet``.

    Yields:
        pandas.DataFrame: one frame per month that was downloaded and parsed
        successfully and is not empty.

    Notes:
        * A month that fails to download or parse is reported with ``print``
          and skipped; iteration continues with the next month.
        * The first month whose frame is empty ends the iteration.
    """
    # (connect, read) timeouts in seconds. Without a timeout, requests can
    # wait indefinitely on a stalled connection.
    request_timeout = (10, 120)

    for month in MONTHS:
        url = f"{BASE_URL}{month}.parquet"

        # Only the download and parse steps are guarded, so a problem raised
        # while the consumer handles a yielded frame is not misreported as a
        # fetch failure.
        try:
            response = requests.get(url, timeout=request_timeout)
            response.raise_for_status()  # Raise an error for failed requests

            # Convert response content into a Pandas DataFrame
            page_parquet = pd.read_parquet(io.BytesIO(response.content))
        except Exception as e:
            print(f"Failed to fetch data for month {month}: {e}")
            continue

        print(f'Got month {month} with {len(page_parquet)} records')

        if page_parquet.empty:
            break  # Stop if no more data
        yield page_parquet

if __name__ == '__main__':
    # Preview run: print the first five rows of every monthly DataFrame
    monthly_frames = paginated_getter()
    for page_data in monthly_frames:
        preview = page_data.head()
        print(preview)


Got month 01 with 2964624 records
   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         2  2024-01-01 00:57:55   2024-01-01 01:17:43              1.0   
1         1  2024-01-01 00:03:00   2024-01-01 00:09:36              1.0   
2         1  2024-01-01 00:17:06   2024-01-01 00:35:01              1.0   
3         1  2024-01-01 00:36:38   2024-01-01 00:44:56              1.0   
4         1  2024-01-01 00:46:51   2024-01-01 00:52:57              1.0   

   trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0           1.72         1.0                  N           186            79   
1           1.80         1.0                  N           140           236   
2           4.70         1.0                  N           236            79   
3           1.40         1.0                  N            79           211   
4           0.80         1.0                  N           211           148   

   payment_type  fare_amount  extra  mta

KeyboardInterrupt: 