In [52]:
# imports
import os
from datetime import datetime
from pathlib import Path
import urllib.request
import pandas as pd
from prefect import task, flow
from prefect_gcp import GcpCredentials
print("Setup Complete")

Setup Complete


In [10]:
# Get data from Github url
def get_data_from_web(dataset_url: str):
    filename, _ = urllib.request.urlretrieve(dataset_url)
    return filename

url_yellow = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2020-04.csv.gz"
url_green = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/green/green_tripdata_2019-02.csv.gz"
source_file_green = get_data_from_web(url_green)
source_file_yellow = get_data_from_web(url_yellow)

In [49]:
# Read and tweak to fix the dtypes of pick-up and drop-off
def read_tweak_df(src: str, color: str) -> pd.DataFrame:
    dict_types = {"store_and_fwd_flag": str}
    cols_dict = {"tpep_pickup_datetime": "pickup_datetime", 
                 "tpep_dropoff_datetime": "dropoff_datetime",
                 "lpep_pickup_datetime": "pickup_datetime", 
                 "lpep_dropoff_datetime": "dropoff_datetime"
                 }
    
    df = (pd.read_csv(src, parse_dates=[1,2], dtype=dict_types, compression="gzip")
          .assign(category = color)
          .rename(columns=cols_dict)
          .fillna(value={"passenger_count": 0})
        )
    print(f"Data frame number of rows: {df.shape[0]}")
    return df
 
df_green = read_tweak_df(source_file_green, color="green")
df_green.head()

Data frame number of rows: 575685


Unnamed: 0,VendorID,pickup_datetime,dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,category
0,2,2019-02-01 00:10:19,2019-02-01 00:21:43,N,1,92,135,1,2.79,11.0,...,0.5,3.08,0.0,,0.3,15.38,1,1,0.0,green
1,2,2019-02-01 00:02:16,2019-02-01 00:24:37,N,1,66,36,1,4.46,17.5,...,0.5,3.76,0.0,,0.3,22.56,1,1,0.0,green
2,2,2019-02-01 00:37:19,2019-02-01 00:43:07,N,1,255,112,1,1.26,6.0,...,0.5,1.46,0.0,,0.3,8.76,1,1,0.0,green
3,1,2019-02-01 00:10:10,2019-02-01 00:12:21,N,1,75,238,1,0.7,4.0,...,0.5,0.0,0.0,,0.3,5.3,2,1,0.0,green
4,1,2019-02-01 00:30:19,2019-02-01 00:46:14,N,1,75,48,1,3.9,14.5,...,0.5,0.0,0.0,,0.3,15.8,2,1,0.0,green


In [47]:
df_green.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 575685 entries, 0 to 575684
Data columns (total 21 columns):
 #   Column                 Non-Null Count   Dtype         
---  ------                 --------------   -----         
 0   VendorID               575685 non-null  int64         
 1   pickup_datetime        575685 non-null  datetime64[ns]
 2   dropoff_datetime       575685 non-null  datetime64[ns]
 3   store_and_fwd_flag     575685 non-null  object        
 4   RatecodeID             575685 non-null  int64         
 5   PULocationID           575685 non-null  int64         
 6   DOLocationID           575685 non-null  int64         
 7   passenger_count        575685 non-null  int64         
 8   trip_distance          575685 non-null  float64       
 9   fare_amount            575685 non-null  float64       
 10  extra                  575685 non-null  float64       
 11  mta_tax                575685 non-null  float64       
 12  tip_amount             575685 non-null  floa

In [50]:
df_yellow = read_tweak_df(source_file_yellow, color="yellow")
df_yellow.head()

Data frame number of rows: 237993


Unnamed: 0,VendorID,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,category
0,1.0,2020-04-01 00:41:22,2020-04-01 01:01:53,1.0,1.2,1.0,N,41,24,2.0,5.5,0.5,0.5,0.0,0.0,0.3,6.8,0.0,yellow
1,1.0,2020-04-01 00:56:00,2020-04-01 01:09:25,1.0,3.4,1.0,N,95,197,1.0,12.5,0.5,0.5,2.75,0.0,0.3,16.55,0.0,yellow
2,1.0,2020-04-01 00:00:26,2020-04-01 00:09:25,1.0,2.8,1.0,N,237,137,1.0,10.0,3.0,0.5,1.0,0.0,0.3,14.8,2.5,yellow
3,1.0,2020-04-01 00:24:38,2020-04-01 00:34:38,0.0,2.6,1.0,N,68,142,1.0,10.0,3.0,0.5,1.0,0.0,0.3,14.8,2.5,yellow
4,2.0,2020-04-01 00:13:24,2020-04-01 00:18:26,1.0,1.44,1.0,Y,263,74,1.0,6.5,0.5,0.5,3.0,0.0,0.3,13.3,2.5,yellow


In [51]:
df_yellow.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 237993 entries, 0 to 237992
Data columns (total 19 columns):
 #   Column                 Non-Null Count   Dtype         
---  ------                 --------------   -----         
 0   VendorID               218480 non-null  float64       
 1   pickup_datetime        237993 non-null  datetime64[ns]
 2   dropoff_datetime       237993 non-null  datetime64[ns]
 3   passenger_count        237993 non-null  float64       
 4   trip_distance          237993 non-null  float64       
 5   RatecodeID             218480 non-null  float64       
 6   store_and_fwd_flag     218480 non-null  object        
 7   PULocationID           237993 non-null  int64         
 8   DOLocationID           237993 non-null  int64         
 9   payment_type           218480 non-null  float64       
 10  fare_amount            237993 non-null  float64       
 11  extra                  237993 non-null  float64       
 12  mta_tax                237993 non-null  floa

In [43]:
df_yellow.passenger_count.isna().sum()

19513

In [56]:
# Write DataFrame to BigQuery
def write_bq(df: pd.DataFrame, year: int) -> None:
    gcp_credentials_block = GcpCredentials.load("ny-taxi-gcp-creds")
    df.to_gbq(
        destination_table=f"ny_taxi.yellow_tripdata_{year}",
        project_id="dtc-de-2023",
        credentials=gcp_credentials_block.get_credentials_from_service_account(),
        chunksize=500_000,
        if_exists="append",
    )
    return

In [57]:
# THIS WORKING FINE JUST SET THE OS ENVIRON
# write_bq(df_yellow, year=2019)

100%|██████████| 1/1 [00:00<00:00, 3816.47it/s]


In [None]:
# Define ETL
def etl_web_to_bq(year: int, month: int, color: str):
    color = "yellow"
    year = 2019
    month = 1
    dataset_url = f"https://github.com/DataTalksClub/nyc-tlc-data/releases/download/{color}/{color}_tripdata_{year}-{month:02}.csv.gz

    # Execution
    # Extract data from web
    data_file = get_data_from_web(dataset_url)
    # Read and tweak data frame
    df = read_tweak_df(source_file_green, color=color)
    # Write to BQ
    write_bq(df)
    
    return
    

In [None]:
# Parent ETL
def parent_etl_web_to_bq(years: list(int), months: list(int)):
    year = [2019, 2020]
    months = [1,2,3,4,5,6,7,8,9,10,11,12]
    for year in years:
        for month in months:
            etl_web_to_bq(year, month)

In [None]:
if __name__ == "__main__":
    parent_etl_web_to_bq()


In [2]:
colors = ["green", "yellow"]   
months = [1,2,3,4,5,6,7,8,9,10,11,12]
for color in colors:
    for month in months:
        print(f"{color}-{month}")

green-1
green-2
green-3
green-4
green-5
green-6
green-7
green-8
green-9
green-10
green-11
green-12
yellow-1
yellow-2
yellow-3
yellow-4
yellow-5
yellow-6
yellow-7
yellow-8
yellow-9
yellow-10
yellow-11
yellow-12
