# ETL Pipeline
## EDA 

In [53]:
import pandas as pd
import os
from time import time

In [2]:
pd.__version__

'2.2.0'

In [31]:
data_folder= 'data'
file_csv= 'yellow_taxi_2021_01.csv'
data_desti = os.path.join(data_folder, file_csv)

In [37]:
# Reading parquet file directly from the URL into pandas
# But the read_parquet() function does not allow chunking and iteration
# Hence the better option is to download the file to the disk as csv and then read it from the disk in chunks
url="https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet"
df_raw=pd.read_parquet(url)

In [4]:
df_raw.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [5]:
df_raw.shape

(1369769, 19)

In [14]:
# Creating a new dataframe with only 100 rows to generate the schema
df = df_raw[:100].copy()

In [15]:
df.shape

(100, 19)

In [16]:
# Generating the schema required for creating a table in the db for this dataset
print(pd.io.sql.get_schema(df, 'yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


### Testing Data Type transformations

In [17]:
# Converting the datatype pf columns total_amount, passenger_count, RatecodeID to appropriate type
df.passenger_count = df['passenger_count'].astype('int')
df.RatecodeID = df['RatecodeID'].astype('int')
df.total_amount = df['total_amount'].astype('int')

In [18]:
print(pd.io.sql.get_schema(df, 'yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" INTEGER,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


### Using SQL Alchemy to extract the DDL

In [20]:
# Generating the actual DDL using sqlalchemy
from sqlalchemy import create_engine
engine = create_engine('postgresql://codespace:root@localhost:5432/ny_taxi')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7f95bff72790>

In [21]:
print(pd.io.sql.get_schema(df, 'yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount BIGINT, 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




## Extraction and Loading in Batches

In [38]:
# Pandas cannot chunk while using parquer files
# Hence writing parquer files as csv to disk 
df_raw.to_csv(data_desti, index=False)

In [81]:
# Now reading that csv in chunk format
df_iter = pd.read_csv(data_desti, iterator=True, chunksize=100000, low_memory=False)
df = next(df_iter)

In [64]:
df.shape

(100000, 19)

In [57]:
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


In [58]:
# extract columns only
#col_names = df.columns.to_list()

In [82]:
# inserting column names as first write to db
df.head(0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

In [75]:
#Testing insertion of one chunk
#%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: user 5.05 s, sys: 43.8 ms, total: 5.1 s
Wall time: 7.55 s


1000

In [83]:
while df is not None:
    t_start = time()
    # Converting the datatype pf columns total_amount, passenger_count, RatecodeID to appropriate type
    df.passenger_count = df['passenger_count'].fillna(0).astype('int')
    df.RatecodeID = df['RatecodeID'].fillna(0).astype('int')
    df.total_amount = df['total_amount'].fillna(0).astype('int')
    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
    t_end = time()
    print(f'Inserted chunk in {t_end-t_start:0,.2f} seconds')
    df = next(df_iter, None)

Inserted chunk in 7.41 seconds
Inserted chunk in 6.75 seconds
Inserted chunk in 7.30 seconds
Inserted chunk in 7.16 seconds
Inserted chunk in 7.55 seconds
Inserted chunk in 7.24 seconds
Inserted chunk in 7.18 seconds
Inserted chunk in 6.89 seconds
Inserted chunk in 7.51 seconds
Inserted chunk in 6.94 seconds
Inserted chunk in 7.47 seconds
Inserted chunk in 7.09 seconds
Inserted chunk in 6.86 seconds
Inserted chunk in 5.46 seconds
