In [8]:
import os
from google.colab import userdata

# Hand the Colab-stored secret and the target bucket to dlt via environment variables.
os.environ.update(
    {
        "DESTINATION__CREDENTIALS": userdata.get('GCP_CREDENTIALS'),
        # Placeholder: replace with your own bucket before running
        # (presumably a full URL such as gs://<bucket> - confirm against the dlt docs).
        "BUCKET_URL": "YOUR_BUCKET_NAME_HERE",
    }
)

In [3]:
# Install for production
%%capture
!pip install dlt[bigquery, gs]

In [4]:
# Install for testing
%%capture
!pip install dlt[duckdb]

In [5]:
import dlt
import requests
import pandas as pd
from dlt.destinations import filesystem
from io import BytesIO

1. You can easily implement your own sources, as long as you yield data in a way that is compatible with dlt, such as JSON objects, Python lists and dictionaries, pandas dataframes, and arrow tables.

2. A pipeline is a connection that moves data from your Python code to a destination. The pipeline accepts dlt sources or resources.



In [None]:
# Define a dlt source that downloads monthly Parquet files and exposes each as a resource
@dlt.source(name="rides")
def download_parquet():
    """Yield one dlt resource per monthly yellow-taxi Parquet file (2024-01 to 2024-06).

    Each file is downloaded over HTTP, held fully in memory, parsed into a
    pandas DataFrame, and wrapped in a dlt resource named after the file.

    Yields:
        A dlt resource wrapping the DataFrame for one month.

    Raises:
        requests.HTTPError: if a download returns a non-2xx status.
        requests.Timeout: if the server does not respond within the timeout.
    """
    for month in range(1, 7):
        file_name = f"yellow_tripdata_2024-0{month}.parquet"
        url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{file_name}"

        # A timeout keeps a stalled connection from hanging the notebook forever.
        response = requests.get(url, timeout=60)
        # Fail fast on 4xx/5xx instead of handing an error page to the Parquet reader.
        response.raise_for_status()

        df = pd.read_parquet(BytesIO(response.content))

        # Yield the dataframe as a dlt resource for ingestion
        yield dlt.resource(df, name=file_name)

# Build the pipeline against a filesystem destination; output files are laid out
# as "<schema_name>/<table_name>.<ext>".
# NOTE(review): no bucket URL is passed here - presumably dlt resolves it from
# configuration / environment variables; confirm.
pipeline = dlt.pipeline(
    pipeline_name="rides_pipeline",
    destination=filesystem(layout="{schema_name}/{table_name}.{ext}"),
)

# Run the pipeline: load every resource from the source, written as Parquet files
# to the filesystem destination.
load_info = pipeline.run(download_parquet(), loader_file_format="parquet")

# Show the load summary
print(load_info)