<a href="https://colab.research.google.com/github/yusarc/gcp-data-warehouse-bigquery/blob/main/dezoomcamp_gcp_dlt_rides.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import os
from google.colab import userdata  # Colab-specific module

# Set both variables in a single call; the credentials value is fetched from
# Colab's `userdata` store under the name "GCP_CREDENTIALS" rather than being
# written into the notebook.
os.environ.update(
    {
        "DESTINATION__CREDENTIALS": userdata.get("GCP_CREDENTIALS"),
        "BUCKET_URL": "gs://dezoomcamp_hw3_arcan_2025",
    }
)

In [9]:
!pip install dlt[bigquery]


Collecting dlt[bigquery]
  Downloading dlt-1.21.0-py3-none-any.whl.metadata (14 kB)
Collecting giturlparse>=0.10.0 (from dlt[bigquery])
  Downloading giturlparse-0.14.0-py2.py3-none-any.whl.metadata (4.9 kB)
Collecting jsonpath-ng>=1.5.3 (from dlt[bigquery])
  Downloading jsonpath_ng-1.7.0-py3-none-any.whl.metadata (18 kB)
Collecting pathvalidate>=2.5.2 (from dlt[bigquery])
  Downloading pathvalidate-3.3.1-py3-none-any.whl.metadata (12 kB)
Collecting pendulum>=2.1.2 (from dlt[bigquery])
  Downloading pendulum-3.2.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.8 kB)
Collecting rich-argparse>=1.6.0 (from dlt[bigquery])
  Downloading rich_argparse-1.7.2-py3-none-any.whl.metadata (14 kB)
Collecting semver>=3.0.0 (from dlt[bigquery])
  Downloading semver-3.0.4-py3-none-any.whl.metadata (6.8 kB)
Downloading giturlparse-0.14.0-py2.py3-none-any.whl (16 kB)
Downloading jsonpath_ng-1.7.0-py3-none-any.whl (30 kB)
Downloading pathvalidate-3.3.1-py3-none-any.whl (24 kB)
Do

In [10]:
!pip install dlt[gs]




In [11]:
!pip install dlt[duckdb]




In [18]:
import dlt
import requests
import pandas as pd
from dlt.destinations import filesystem
from io import BytesIO

In [19]:
@dlt.resource(name="rides", write_disposition="replace")
def download_parquet():
    """Yield the `yellow_tripdata` Parquet files for 2024-01 through 2024-06.

    Each monthly file is downloaded over HTTP, parsed with pandas and yielded
    as one DataFrame, so the consumer receives one batch per month.

    Yields:
        pandas.DataFrame: the full contents of one monthly Parquet file.

    Raises:
        requests.HTTPError: if the server responds with a 4xx/5xx status.
        requests.Timeout: if connecting or reading exceeds the timeout.
    """
    prefix = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata"

    for month in range(1, 7):
        # Zero-pad through the format spec instead of a literal "0" so the
        # URL stays correct if the range is ever extended past month 9.
        url = f"{prefix}_2024-{month:02d}.parquet"

        # (connect, read) timeouts keep a stalled connection from hanging the cell.
        response = requests.get(url, timeout=(10, 120))
        # Fail here on an HTTP error; otherwise the error body would be passed
        # to the Parquet reader and surface as a confusing parse failure.
        response.raise_for_status()

        # One batch of data per month.
        yield pd.read_parquet(BytesIO(response.content))


In [20]:
# Start with a local test: load into DuckDB first.
local_pipeline_options = {
    "pipeline_name": "rides_pipeline",
    "destination": "duckdb",
    "dataset_name": "rides_dataset",
}
pipeline = dlt.pipeline(**local_pipeline_options)

In [21]:
import os

# The DuckDB test does not need GCP credentials, so remove the variable.
# A missing variable is fine; nothing is echoed to the cell output.
try:
    del os.environ["DESTINATION__CREDENTIALS"]
except KeyError:
    pass


In [22]:
# Run the pipeline with the `download_parquet` resource and keep the object
# returned by the run.
load_info = pipeline.run(download_parquet)

# Print the returned load summary.
print(load_info)


Pipeline rides_pipeline load step completed in 27.18 seconds
1 load package(s) were loaded to destination duckdb and into dataset rides_dataset
The duckdb destination used duckdb:////content/rides_pipeline.duckdb location to store data
Load package 1770237320.4104617 is LOADED and contains no failed jobs


In [23]:
import duckdb

# Open the DuckDB file named after the local pipeline. The context manager
# closes the connection when the block ends, even if one of the queries fails.
with duckdb.connect(f"{pipeline.pipeline_name}.duckdb") as conn:
    # Resolve unqualified table names against the pipeline's dataset schema.
    conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

    # Materialise both results as DataFrames while the connection is still open.
    res = conn.sql("DESCRIBE").df()
    row_count = conn.sql("SELECT count(*) AS row_count FROM rides").df()

print(res)
print(row_count)


         database         schema                 name  \
0  rides_pipeline  rides_dataset           _dlt_loads   
1  rides_pipeline  rides_dataset  _dlt_pipeline_state   
2  rides_pipeline  rides_dataset         _dlt_version   
3  rides_pipeline  rides_dataset                rides   

                                        column_names  \
0  [load_id, schema_name, status, inserted_at, sc...   
1  [version, engine_version, pipeline_name, state...   
2  [version, engine_version, inserted_at, schema_...   
3  [vendor_id, tpep_pickup_datetime, tpep_dropoff...   

                                        column_types  temporary  
0  [VARCHAR, VARCHAR, BIGINT, TIMESTAMP WITH TIME...      False  
1  [BIGINT, BIGINT, VARCHAR, VARCHAR, TIMESTAMP W...      False  
2  [BIGINT, BIGINT, TIMESTAMP WITH TIME ZONE, VAR...      False  
3  [INTEGER, TIMESTAMP WITH TIME ZONE, TIMESTAMP ...      False  
   row_count
0   20332093


In [27]:
import os

# Clear the variable that was set earlier in the notebook.
# A missing variable is fine; nothing is echoed to the cell output.
try:
    del os.environ["DESTINATION__CREDENTIALS"]
except KeyError:
    pass


In [28]:
import os

# Path to the service account JSON file.
os.environ.update(GOOGLE_APPLICATION_CREDENTIALS="/content/gcs.json")


In [29]:
# Pipeline object targeting BigQuery; it uses the same dataset name as the
# local DuckDB pipeline.
bigquery_pipeline_options = {
    "pipeline_name": "rides_pipeline_bq",
    "destination": "bigquery",
    "dataset_name": "rides_dataset",
}
bq_pipeline = dlt.pipeline(**bigquery_pipeline_options)


In [33]:
import os
import json
from google.colab import userdata

# Fetch the JSON string from the Colab secret
creds_json = userdata.get("GCP_CREDENTIALS")

# Parse the JSON string into a dict
creds = json.loads(creds_json)

# Display only the key names, not the values: the dict includes `private_key`.
creds.keys()


dict_keys(['type', 'project_id', 'private_key_id', 'private_key', 'client_email', 'client_id', 'auth_uri', 'token_uri', 'auth_provider_x509_cert_url', 'client_x509_cert_url', 'universe_domain'])

In [34]:
# Set the environment variables that dlt expects for BigQuery, one per
# service-account field; the variable suffix is the upper-cased field name.
for cred_field in ("project_id", "client_email", "private_key"):
    os.environ[f"DESTINATION__BIGQUERY__CREDENTIALS__{cred_field.upper()}"] = creds[cred_field]


In [35]:
# Create the BigQuery pipeline object again, with the same arguments as the
# earlier cell.
# NOTE(review): presumably re-created so it is built after the BigQuery
# credential environment variables were set — confirm whether this is required.
bq_pipeline = dlt.pipeline(
    pipeline_name="rides_pipeline_bq",
    destination="bigquery",
    dataset_name="rides_dataset",
)

# Run the BigQuery pipeline with the `download_parquet` resource and keep the
# object returned by the run.
bq_load_info = bq_pipeline.run(download_parquet)

# Print the returned load summary.
print(bq_load_info)


Pipeline rides_pipeline_bq load step completed in 36.17 seconds
1 load package(s) were loaded to destination bigquery and into dataset rides_dataset
The bigquery destination used bigquery@gcp-data-warehouse-486419.iam.gserviceaccount.com@gcp-data-warehouse-486419 location to store data
Load package 1770238301.5603597 is LOADED and contains no failed jobs
