In [5]:
# imports
from pathlib import Path
import urllib.request
import pandas as pd
from google.cloud import bigquery
from prefect import task, flow
from prefect_gcp.cloud_storage import cloud_storage_upload_blob_from_file
from prefect_gcp import GcpCredentials

print('Setup Complete')

Setup Complete


In [4]:
# Parameters
year=2019
month=2
dataset_file = f"fhv_tripdata_{year}-{month:02}"
dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/{dataset_file}.csv.gz"

# Deployment 1
# Fetch the data from url
@task(log_prints=True, name='fetch-file-dataset_url')
def fetch(dataset_url):
    filename, _ = urllib.request.urlretrieve(dataset_url)
    return filename


# Upload dataset from web to gcs
@flow(log_prints=True, name="web-to-gcs")
def web_to_gcs(path: str, year: int, month: int):
    gcp_credentials = GcpCredentials.load("ny-taxi-gcp-creds")  # inferred from env, or set service_account_file
    blob = cloud_storage_upload_blob_from_file(file=path, 
                                                bucket="ny_taxi_bucket_de_2023",
                                                blob=f"2019/fhv_tripdata_{year}-{month:02}.csv.gz",
                                                gcp_credentials=gcp_credentials)
    return blob



In [6]:
@flow()
def etl_web_gcs_bq():
    path = fetch(dataset_url)
    web_to_gcs(path, year=2019, month=2)
    # fetch data from github
    # save data to gcs
    # transfer data to bq

if __name__=="__main__":
 etl_web_gcs_bq()   


In [8]:
# Deployment 2
# load GCP Credentials
@task(log_prints=True, name="get-gcp-creds")
def get_bigquery_creds():
    gcp_creds_block = GcpCredentials.load("prefect-gcs-2023-creds")
    gcp_creds = gcp_creds_block.get_credentials_from_service_account()
    return gcp_creds


# Upload data from GCS to BigQuery
@flow(log_prints=True, name="etl-gcs-to-bq")
def etl_gcs_to_bq(year: int, month: int):

    gcp_creds = get_bigquery_creds()
    client = bigquery.Client(credentials=gcp_creds)
    table_id = "dtc-de-2023.ny_taxi.ny_taxi_tripdata_2019"

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        schema=[
            bigquery.SchemaField("dispatching_base_num", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("pickup_datetime", "DATETIME", mode="NULLABLE"),
            bigquery.SchemaField("dropOff_datetime", "DATETIME", mode="NULLABLE"),
            bigquery.SchemaField("PUlocationID", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("DOlocationID", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField(
                "SR_Flag",
                "FLOAT",
                mode="NULLABLE",
            ),
            bigquery.SchemaField("Affiliated_base_number", "STRING", mode="NULLABLE"),
        ],
    )
    uri = f"gs://ny_taxi_bucket_de_2023/2019/fhv_tripdata_{year}-{month:02}.csv.gz"

    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )  # Make an API request.

    load_job.result()  # Waits for the job to complete.

    destination_table = client.get_table(table_id)
    print(f"Loaded {destination_table.num_rows} rows.")


# Parent flow ETL
@flow(log_prints=True, name="etl-parent-to-bq")
def etl_parent_bq_flow(
    year: int = 2019, months: list[int] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
):
    for month in months:
        etl_gcs_to_bq(year, month)


# run main
if __name__ == "__main__":
    year = 2019
    months = [2]

    etl_parent_bq_flow(year, months)



 `@task(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`

 `@flow(name='my_unique_name', ...)`



 `@task(name='my_unique_name', ...)`


BadRequest: 400 Error while reading data, error message: Could not parse 'pickup_datetime' as TIMESTAMP for field pickup_datetime (position 1) starting at location 0  with message 'Could not parse 'pickup_datetime' as a timestamp. Required format is YYYY-MM-DD HH:MM[:SS[.SSSSSS]] or YYYY/MM/DD HH:MM[:SS[.SSSSSS]]' File: gs://ny_taxi_bucket_de_2023/2019/fhv_tripdata_2019-02.csv.gz