In [20]:
# for more about pandas
#https://github.com/alexeygrigorev/mlbookcamp-code/blob/master/course-zoomcamp/01-intro/09-pandas.md
# https://gist.github.com/ziritrion/9b80e47956adc0f20ecce209d494cd0a#pandas
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import time
from sqlalchemy import create_engine

In [None]:
# in the "data_dictionary_trip_records_yellow.pdf" file, we have access
# to an explanation on all the columns present in the dataset

In [3]:
# we load the data for yellow taxi trip records
# we have to load the data like this because the website
# used to have the data in csv format, but now it's in parquet format
trips = pq.read_table('data/yellow_tripdata_2021-01.parquet')
df = trips.to_pandas()

In [4]:
# still, we get our dataframe and now we can access the data
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.10,1.0,N,142,43,2,8.00,3.00,0.5,0.00,0.00,0.3,11.80,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.20,1.0,N,238,151,2,3.00,0.50,0.5,0.00,0.00,0.3,4.30,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.70,1.0,N,132,165,1,42.00,0.50,0.5,8.65,0.00,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.60,1.0,N,138,132,1,29.00,0.50,0.5,6.05,0.00,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.50,0.50,0.5,4.06,0.00,0.3,24.36,2.5,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1369764,2,2021-01-31 23:03:00,2021-01-31 23:33:00,,8.89,,,229,181,0,27.78,0.00,0.5,7.46,0.00,0.3,38.54,,
1369765,2,2021-01-31 23:29:00,2021-01-31 23:51:00,,7.43,,,41,70,0,32.58,0.00,0.5,0.00,6.12,0.3,39.50,,
1369766,2,2021-01-31 23:25:00,2021-01-31 23:38:00,,6.26,,,74,137,0,16.85,0.00,0.5,3.90,0.00,0.3,24.05,,
1369767,6,2021-01-31 23:01:06,2021-02-01 00:02:03,,19.70,,,265,188,0,53.68,0.00,0.5,0.00,0.00,0.3,54.48,,


In [5]:
# now we use this command to generate a statement to put the schema into our postgres
# convert dataframe to DDL
print(pd.io.sql.get_schema(df, name='yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


In [6]:
# Not good, this is general SQL statement. Postgres won't understand it. 
# We need to connect to the postgres db so that pandas can understand what we need to do
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
# NOTE: this did NOT work for me in any way until I created a new cell
# and executed the following line:
# !pip install psycopg2-binary
# (Source: https://stackoverflow.com/questions/65517587/modulenotfounderror-no-module-named-psycopg2-in-ipython)

In [7]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x151ef0da210>

In [8]:
# now we do the previous command again but we specify the connection
# and now we have the statement in postgresql
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [None]:
# Now we have another problem: we cannot do 1 million three hundred sixty nine thousand inserts at the same time.
# We might need to batch it
# Problem is, the video tutorial is from 2022 and back then they didn't have to deal with parquet files
# The following code is a mix of code from the zoomcamp's slack, the pyarrow and pandas documentation, and personal creative leaps

In [9]:
pf = pq.ParquetFile('data/yellow_tripdata_2021-01.parquet')

In [18]:
## IGNORE THIS CELL, I just wanted to see how batch worked
flag = False
for batch in pf.iter_batches(batch_size=10000):
    if flag == False:
        df = batch.to_pandas()
    else:
        flag = True
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2021-01-28 13:06:46,2021-01-28 13:15:50,,0.75,,,188,71,0,13.45,2.75,0.5,0.00,0.00,0.3,17.00,,
1,2,2021-01-28 13:47:35,2021-01-28 14:09:39,,3.53,,,188,89,0,13.45,2.75,0.5,0.00,0.00,0.3,17.00,,
2,6,2021-01-28 13:01:01,2021-01-28 13:01:03,,8.32,,,265,197,0,39.38,0.00,0.5,0.00,0.00,0.3,40.18,,
3,2,2021-01-28 13:52:00,2021-01-28 14:25:00,,8.54,,,95,140,0,30.12,2.75,0.5,0.00,6.12,0.3,39.79,,
4,2,2021-01-28 13:17:00,2021-01-28 13:31:00,,3.26,,,51,18,0,28.95,2.75,0.5,0.00,0.00,0.3,32.50,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
9764,2,2021-01-31 23:03:00,2021-01-31 23:33:00,,8.89,,,229,181,0,27.78,0.00,0.5,7.46,0.00,0.3,38.54,,
9765,2,2021-01-31 23:29:00,2021-01-31 23:51:00,,7.43,,,41,70,0,32.58,0.00,0.5,0.00,6.12,0.3,39.50,,
9766,2,2021-01-31 23:25:00,2021-01-31 23:38:00,,6.26,,,74,137,0,16.85,0.00,0.5,3.90,0.00,0.3,24.05,,
9767,6,2021-01-31 23:01:06,2021-02-01 00:02:03,,19.70,,,265,188,0,53.68,0.00,0.5,0.00,0.00,0.3,54.48,,


In [34]:
flag = False
logs = []
tot_start = time.time()
for batch in pf.iter_batches(batch_size=40000):
    t_start = time.time()
    table = pa.Table.from_batches([batch])
    df = table.to_pandas()
    
    if flag == False:
        df.to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')
        flag = True
    else:
        df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
        
    t_end = time.time()
    
    log = "Inserted another chunk in " + str(t_end - t_start) + " second"
    logs.append(log)
    print(log)
tot_end = time.time()
print("Finished ingesting data into the postgres database. It took " + str(len(log)) + " chunks and " + str(tot_end - tot_start) + " seconds.")

Inserted another chunk in 2.5044078826904297 second
Inserted another chunk in 2.9529852867126465 second
Inserted another chunk in 2.511495590209961 second
Inserted another chunk in 2.631087064743042 second
Inserted another chunk in 2.6561014652252197 second
Inserted another chunk in 3.2258317470550537 second
Inserted another chunk in 2.9702281951904297 second
Inserted another chunk in 3.0602636337280273 second
Inserted another chunk in 2.864670515060425 second
Inserted another chunk in 2.4757919311523438 second
Inserted another chunk in 2.6470301151275635 second
Inserted another chunk in 2.5266873836517334 second
Inserted another chunk in 2.4183850288391113 second
Inserted another chunk in 2.656714916229248 second
Inserted another chunk in 2.5153024196624756 second
Inserted another chunk in 2.770749568939209 second
Inserted another chunk in 2.4463677406311035 second
Inserted another chunk in 2.615565776824951 second
Inserted another chunk in 2.678389072418213 second
Inserted another ch

TypeError: can only concatenate str (not "float") to str