In [21]:
import io
import os
import requests
import pandas as pd
from google.cloud import storage
from google.cloud import bigquery

# 1. Upload data to GCS

In [None]:
"""
Pre-reqs: 
1. `pip install pandas pyarrow requests google-cloud-storage google-cloud-bigquery`
2. Set GOOGLE_APPLICATION_CREDENTIALS to your project/service-account key
3. Set GCP_GCS_BUCKET as your bucket or change default value of BUCKET
"""

In [15]:
# services = ['fhv','green','yellow']
# Base URL that the monthly trip-data file names are appended to.
init_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data'
# Keep credentials that are already exported in the environment; only fall
# back to the local key file when GOOGLE_APPLICATION_CREDENTIALS is unset.
os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", "terrademo/keys/my-creds.json")
# Switch out the bucket name here, or set GCP_GCS_BUCKET in the environment.
BUCKET = os.environ.get("GCP_GCS_BUCKET", "mage-zoomcamp-nb")

In [17]:
def upload_to_gcs(bucket, object_name, local_file):
    """
    Upload a local file to a Google Cloud Storage bucket.

    Ref: https://cloud.google.com/storage/docs/uploading-objects#storage-upload-object-python

    Args:
        bucket: name of the destination bucket.
        object_name: name of the object to create inside the bucket.
        local_file: path of the local file whose contents are uploaded.
    """
    # Optional workaround (disabled) to prevent timeouts for files > 6 MB on
    # an 800 kbps upload link, see
    # https://github.com/googleapis/python-storage/issues/74:
    #   storage.blob._MAX_MULTIPART_SIZE = 5 * 1024 * 1024  # 5 MB
    #   storage.blob._DEFAULT_CHUNKSIZE = 5 * 1024 * 1024  # 5 MB

    gcs_client = storage.Client()
    target_blob = gcs_client.bucket(bucket).blob(object_name)
    target_blob.upload_from_filename(local_file)


def web_to_gcs(year, service):
    """
    Download the twelve monthly parquet files for one service/year and
    upload each of them to the GCS bucket named by BUCKET.

    For every month the file `{service}_tripdata_{year}-{MM}.parquet` is
    fetched from `init_url`, written to the current working directory,
    uploaded as `{service}/{file_name}`, and then removed locally.

    Args:
        year: four-digit year used in the file name, e.g. '2022'.
        service: service prefix used in the file name and as the GCS
            folder, e.g. 'green'.

    Raises:
        requests.HTTPError: if a monthly file cannot be downloaded
            (non-2xx response), so an error page is never uploaded as data.
    """
    for month_number in range(1, 13):

        # zero-padded month part of the file name ("01" .. "12")
        month = f"{month_number:02d}"

        # parquet file name as published at init_url
        file_name = f"{service}_tripdata_{year}-{month}.parquet"

        # download the file with requests and store it locally
        request_url = f"{init_url}/{file_name}"
        r = requests.get(request_url)
        r.raise_for_status()
        with open(file_name, 'wb') as local_file:
            local_file.write(r.content)
        print(f"Local: {file_name}")

        try:
            # upload it to gcs
            upload_to_gcs(BUCKET, f"{service}/{file_name}", file_name)
            print(f"GCS: {service}/{file_name}")
        finally:
            # The local copy is only removed AFTER the upload attempt;
            # removing it first leaves nothing for the upload to read.
            if os.path.exists(file_name):
                os.remove(file_name)


# Transfer the 2022 green-taxi files (one per month) to GCS.
web_to_gcs(year='2022', service='green')

Local: green_tripdata_2022-01.parquet
GCS: green/green_tripdata_2022-01.parquet
Local: green_tripdata_2022-02.parquet
GCS: green/green_tripdata_2022-02.parquet
Local: green_tripdata_2022-03.parquet
GCS: green/green_tripdata_2022-03.parquet
Local: green_tripdata_2022-04.parquet
GCS: green/green_tripdata_2022-04.parquet
Local: green_tripdata_2022-05.parquet
GCS: green/green_tripdata_2022-05.parquet
Local: green_tripdata_2022-06.parquet
GCS: green/green_tripdata_2022-06.parquet
Local: green_tripdata_2022-07.parquet
GCS: green/green_tripdata_2022-07.parquet
Local: green_tripdata_2022-08.parquet
GCS: green/green_tripdata_2022-08.parquet
Local: green_tripdata_2022-09.parquet
GCS: green/green_tripdata_2022-09.parquet
Local: green_tripdata_2022-10.parquet
GCS: green/green_tripdata_2022-10.parquet
Local: green_tripdata_2022-11.parquet
GCS: green/green_tripdata_2022-11.parquet
Local: green_tripdata_2022-12.parquet
GCS: green/green_tripdata_2022-12.parquet


In [24]:
# GCS URIs of the twelve monthly 2022 green-taxi parquet files.
# The bucket comes from BUCKET so the URIs always point at the bucket the
# files were uploaded to, even when GCP_GCS_BUCKET overrides the default.
uris = [
    f"gs://{BUCKET}/green/green_tripdata_2022-{month_number:02d}.parquet"
    for month_number in range(1, 13)
]

# 2. Upload data to BigQuery (create table) 

In [23]:
# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
table_id = "terraform-course-411914.taxi.green_taxi_2022"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    # Load jobs append by default, so re-running this cell would duplicate
    # every row. Truncating first makes the cell safe to re-run.
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# Start a load job that reads all parquet files listed in `uris`.
load_job = client.load_table_from_uri(
    uris, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

# Fetch the table metadata to report how many rows it now holds.
destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

Loaded 840402 rows.


# 3. Create External table in BigQuery (create table) 

In [25]:
# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
table_id = "terraform-course-411914.taxi.green_taxi_2022_ext"

# TODO(developer): Set the external source format of your table.
# Note that the set of allowed values for external data sources is
# different than the set used for loading data (see :class:`~google.cloud.bigquery.job.SourceFormat`).
external_source_format = "PARQUET"

# Create ExternalConfig object with external source format
external_config = bigquery.ExternalConfig(external_source_format)
# Set source_uris that point to your data in Google Cloud
external_config.source_uris = uris

# TODO(developer) You have the option to set a reference_file_schema_uri, which points to
# a reference file for the table schema.
# The first source file (January) is used so the reference always lives in
# the same bucket as the data instead of repeating a hard-coded path.
reference_file_schema_uri = uris[0]

external_config.reference_file_schema_uri = reference_file_schema_uri

table = bigquery.Table(table_id)
# Set the external data configuration of the table
table.external_data_configuration = external_config
# exists_ok=True keeps the cell re-runnable: without it a second run raises
# a Conflict error because the table already exists.
table = client.create_table(table, exists_ok=True)  # Make an API request.

print(
    f"Created table with external source format {table.external_data_configuration.source_format}"
)

Created table with external source format PARQUET
