In [1]:
import pandas as pd
from sqlalchemy import create_engine
from time import time

In [3]:
# Load only the first 5 rows and inspect them (columns, values, inferred dtypes).
# NOTE: dtypes inferred from such a small sample are not representative of the
# whole file, e.g. tolls_amount is all zeros here and therefore comes out as an
# integer column, and the two datetime columns are plain text.
df_TEST = pd.read_csv('yellow_tripdata_2021-01.csv', nrows=5)
df_TEST

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.1,1,N,142,43,2,8.0,3.0,0.5,0.0,0,0.3,11.8,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.2,1,N,238,151,2,3.0,0.5,0.5,0.0,0,0.3,4.3,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.7,1,N,132,165,1,42.0,0.5,0.5,8.65,0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.6,1,N,138,132,1,29.0,0.5,0.5,6.05,0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0,0.3,24.36,2.5


In [5]:
# The generic (SQLite-flavoured) DDL shows that the pickup and dropoff columns
# are read as TEXT, although they really hold datetimes.
# We will fix their type during the ingestion.
yellow_taxi_ddl = pd.io.sql.get_schema(df_TEST, name='yellow_taxi_data')
print(yellow_taxi_ddl)

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" INTEGER,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL
)


In [7]:
# Next we need a schema in the Postgres dialect, which is why sqlalchemy is imported.
# The engine below points at our dockerized Postgres server; the username, password
# and db name (root / root / ny_taxi) are the ones we defined in the Dockerfile.

# The first run may complain that psycopg2 is missing. On Linux Ubuntu 22.04,
# installing it needs the PostgreSQL development packages first, then pip:
#
# sudo apt-get install libpq-dev
# pip install psycopg2

pg_url = 'postgresql://root:root@localhost:5432/ny_taxi'
engine = create_engine(pg_url)

postgres_ddl = pd.io.sql.get_schema(df_TEST, name='yellow_taxi_data', con=engine)
print(postgres_ddl)


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TEXT, 
	tpep_dropoff_datetime TEXT, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount BIGINT, 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [8]:
# Read the csv into an iterator, so that we will ingest it in chunks of 100,000 rows.
# chunksize is an integer row count (written as an int literal rather than 1e5).
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=100_000)

# Create a connection with the Postgres db
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')


def _parse_datetimes(frame):
    """Return a copy of ``frame`` with the pickup/dropoff columns converted to datetime.

    The CSV reader leaves these two columns as text. Converting them before
    ``to_sql`` makes both the created table columns and the inserted values
    timestamps rather than TEXT.
    """
    return frame.assign(
        tpep_pickup_datetime=pd.to_datetime(frame['tpep_pickup_datetime']),
        tpep_dropoff_datetime=pd.to_datetime(frame['tpep_dropoff_datetime']),
    )


# Ingest every chunk. A `for` loop stops cleanly when the file is exhausted,
# instead of `next()` raising StopIteration out of a `while True` loop.
header_written = False
for df in df_iter:
    t_start = time()

    # Fix the type of these two fields: they should be datetime, not text
    df = _parse_datetimes(df)

    if not header_written:
        # First we write the header (an empty frame => only the CREATE TABLE runs);
        # we can then check it inside pgcli using "\d yellow_taxi_data".
        # The header is derived from the first real chunk, with the datetimes already
        # parsed, instead of the 5-row df_TEST preview: that preview leaves the two
        # datetime columns as TEXT and infers tolls_amount as an integer (all zeros),
        # so fractional tolls in later rows would be forced into an integer column.
        # NOTE(review): dtypes are still inferred from this one chunk; pass `dtype=`
        # to to_sql if a later chunk could need a wider type.
        df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')
        header_written = True

    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

    t_end = time()

    print('inserted another chunk... it took %.3f seconds' % (t_end - t_start))

inserted another chunk... it took 11.256 seconds
inserted another chunk... it took 11.149 seconds
inserted another chunk... it took 11.184 seconds
inserted another chunk... it took 11.316 seconds
inserted another chunk... it took 11.193 seconds
