# <center> Ingest NYC Taxi Trips Data into PostgreSQL </center>
---

In [1]:
import getpass
import itertools
import os
import shutil

import pandas as pd
import pyarrow.parquet as pq
from prefect import flow, task
from sqlalchemy import create_engine

#### Database Credentials

In [2]:
def __connect_db():
    """Prompt for PostgreSQL credentials and build a connection URL.

    The username and password are read interactively with ``getpass`` (not
    echoed). Host, port and database name come from the ``PG_HOST``,
    ``PG_PORT`` and ``PG_DATABASE`` environment variables. They default to
    ``localhost``, ``5432`` and ``ny_taxi`` when unset.

    Returns:
        A parsed SQLAlchemy URL object, suitable for ``create_engine``.
    """
    from urllib.parse import quote
    from sqlalchemy.engine.url import make_url

    username = getpass.getpass('Enter PostgreSQL Username: ')
    password = getpass.getpass('Enter PostgreSQL Password: ')
    # Without defaults an unset variable would be rendered as the literal
    # text "None" inside the URL, which cannot be parsed or connected to.
    host    = os.getenv('PG_HOST', 'localhost')
    port    = os.getenv('PG_PORT', '5432')
    dbname  = os.getenv('PG_DATABASE','ny_taxi')
    print()
    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # in a password cannot break the URL structure. make_url decodes them
    # again. Parsing the URL directly also avoids building a throwaway Engine.
    return make_url(
        f'postgresql://{quote(username, safe="")}:{quote(password, safe="")}'
        f'@{host}:{port}/{dbname}'
    )

In [3]:
def connect_to_database(uri=None):
    """Create a SQLAlchemy engine for ``uri``.

    If ``uri`` is falsy (the default), the user is prompted for credentials
    via ``__connect_db`` and the resulting URL is used instead.
    """
    target = uri if uri else __connect_db()
    return create_engine(target)

#### Utilities

In [4]:
def get_s3_url(service: str, year: int, month: int):
    """Return the S3 location of the monthly TLC parquet file for ``service``.

    The month is zero-padded to two digits, e.g. ``..._2019-01.parquet``.
    """
    filename = '{}_tripdata_{}-{:02d}.parquet'.format(service, year, month)
    return 's3://nyc-tlc/trip data/' + filename

In [5]:
def get_tablename(service: str, year: int, month: int):
    """Return the destination table name, e.g. ``yellow_taxi_trips_2019_01``."""
    parts = [service, 'taxi_trips', str(year), f'{month:02d}']
    return '_'.join(parts)

In [6]:
def get_local_filepath(service: str, year: int, month: int):
    """Return the local staging path for one service/month.

    The parent directory ``data/<service>/<year>/<MM>`` is created (if
    missing) as a side effect. The path is relative to the working directory.
    """
    mm = f'{month:02d}'
    folder = '/'.join(['data', service, str(year), mm])
    os.makedirs(folder, exist_ok=True)
    return f'{folder}/{service}_tripdata_{year}-{mm}.parquet'

In [7]:
def get_datetime_columns(service: str):
    """Return the raw pickup/dropoff datetime column names for ``service``.

    Yellow-cab files use the ``tpep_`` prefix and green-cab files the
    ``lpep_`` prefix. These are the same raw names ``get_column_mapper``
    renames. Any other service returns an empty list.
    """
    if service == 'yellow':
        return ['tpep_pickup_datetime', 'tpep_dropoff_datetime']
    if service == 'green':
        # Green files prefix their datetime columns with 'lpep_', not 'tpep_'.
        return ['lpep_pickup_datetime', 'lpep_dropoff_datetime']
    return []

In [8]:
def get_column_mapper(service: str) -> dict:
    """Return the raw-to-clean column rename mapping for ``service``.

    Yellow and green files share the same ID columns and differ only in the
    prefix of their pickup/dropoff datetime columns (``tpep_`` vs ``lpep_``).
    Any other service gets an empty mapping, i.e. nothing is renamed.
    """
    if service == 'yellow':
        prefix = 'tpep'
    elif service == 'green':
        prefix = 'lpep'
    else:
        return {}

    return {
        'VendorID': 'vendorid',
        'RatecodeID': 'ratecodeid',
        'PULocationID': 'pickup_locationid',
        'DOLocationID': 'dropoff_locationid',
        f'{prefix}_pickup_datetime': 'pickup_datetime',
        f'{prefix}_dropoff_datetime': 'dropoff_datetime',
    }

## Extract Data

In [9]:
# @task
def fetch_data(dataset_url: str, filepath: str, service: str):
    """Read one monthly trip-data parquet file into a DataFrame.

    Progress messages and the frame's ``info()`` summary are printed.
    ``filepath`` and ``service`` are accepted for call compatibility but are
    not used here.
    """
    print('Extracting Data...')
    print('Data File:', dataset_url, '\n')
    raw = pd.read_parquet(dataset_url, engine='fastparquet')
    raw.info()
    print('\nDone!')
    return raw

## Transform Data

In [10]:
# @task()
def clean_dataframe(df: pd.DataFrame, service: str):
    """Return a new DataFrame with ``df``'s columns renamed to the clean schema.

    The mapping comes from ``get_column_mapper(service)``. Columns absent
    from the mapping keep their names, and services with no mapping pass
    through unchanged. ``df`` itself is not modified.
    """
    # DataFrame.rename(columns=...) relabels the columns themselves.
    # rename_axis(columns=...) would only rename the columns Index's *name*
    # attribute, leaving every column label untouched.
    return df.rename(columns=get_column_mapper(service))

In [11]:
# @task()
def write_to_local(df: pd.DataFrame, filepath: str):
    """Save ``df`` to ``filepath`` as snappy-compressed parquet.

    The same path is returned so the caller can hand it to the load step.
    """
    target = filepath
    df.to_parquet(target, compression='snappy')
    return target

## Load Data

In [12]:
# @task
def upload_to_database(filename: str, tablename: str, uri):
    """Append a local parquet file to ``staging.<tablename>``, then delete it.

    Args:
        filename: path of the parquet file to load.
        tablename: destination table inside the ``staging`` schema.
        uri: connection URL passed to ``connect_to_database``. If falsy, the
            user is prompted for credentials.

    The local file is removed only after ``to_sql`` succeeds, so a failed
    load leaves it on disk. The engine is disposed even when reading or
    inserting raises.
    """
    print('Loading Data into Database...')
    print('Data Table:', tablename)
    engine = connect_to_database(uri)
    try:
        df = pd.read_parquet(filename, engine='fastparquet')
        # method='multi' packs many rows into each INSERT statement;
        # chunksize caps how many rows are written per batch.
        df.to_sql(tablename, con=engine, schema='staging', if_exists='append',
                  chunksize=100_000, method='multi')
        os.remove(filename)
    finally:
        # Release pooled connections on both success and failure.
        engine.dispose()
    print('Done!')

## ELT Pipeline

In [None]:
# @flow
def web_to_postgres(service: str, year: int, month: int, uri):
    """Run extract -> transform -> load for a single service and month.

    1. Fetch the month's parquet file from S3.
    2. Pass it through ``clean_dataframe``, stage it on local disk and print
       the cleaned frame's summary.
    3. Append the staged file to the database via ``upload_to_database``.
    """
    source_url = get_s3_url(service, year, month)
    staging_path = get_local_filepath(service, year, month)
    table = get_tablename(service, year, month)

    raw = fetch_data(source_url, staging_path, service)        # extract

    cleaned = clean_dataframe(raw, service)                    # transform
    written_path = write_to_local(cleaned, staging_path)

    print()
    cleaned.info()
    print()

    upload_to_database(written_path, table, uri)               # load

## Run ELT Pipeline

In [14]:
# @flow
def elt_pipeline(start_year: int, end_year: int, services: list):
    """Load every month from ``start_year`` to ``end_year`` (inclusive) for each service.

    Credentials are requested once, up front, and reused for every month.
    Iteration goes year by year, January through December, and within each
    month over ``services`` in order. The local ``data`` staging directory is
    removed at the end.
    """
    uri = __connect_db()
    years = range(start_year, end_year + 1)
    months = range(1, 12 + 1)
    for year, month, service in itertools.product(years, months, services):
        web_to_postgres(service, year, month, uri)
    # The staging directory only exists if at least one month was processed
    # (e.g. not when ``services`` is empty or start_year > end_year).
    if os.path.isdir('data'):
        shutil.rmtree('data')

In [None]:
# Load yellow and green cab trips for every month of 2019-2021.
# elt_pipeline prompts for the PostgreSQL username/password first.
elt_pipeline(2019, 2021, ['yellow','green'])

Enter PostgreSQL Username:  ········
Enter PostgreSQL Password:  ········



Extracting Data...
Data File: s3://nyc-tlc/trip data/yellow_tripdata_2019-01.parquet 

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 7696617 entries, 0 to 7696616
Data columns (total 19 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int64         
 1   tpep_pickup_datetime   datetime64[ns]
 2   tpep_dropoff_datetime  datetime64[ns]
 3   passenger_count        float64       
 4   trip_distance          float64       
 5   RatecodeID             float64       
 6   store_and_fwd_flag     object        
 7   PULocationID           int64         
 8   DOLocationID           int64         
 9   payment_type           int64         
 10  fare_amount            float64       
 11  extra                  float64       
 12  mta_tax                float64       
 13  tip_amount             float64       
 14  tolls_amount           float64       
 15  improvement_surcharge  float64       
 16  total_amount        

In [None]:
# Load FHV trips for every month of 2019. get_column_mapper has no mapping
# for 'fhv', so these tables keep the raw column names.
elt_pipeline(2019, 2019, ['fhv'])