In [1]:
import pandas as pd

READ DATA

In [20]:
df = pd.read_csv('data/yellow_tripdata_2021-01.csv', nrows=100)
df.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,


CHECK THE SCHEMA USING io.sql.get_schema

the pd.io.sql.get_schema() method can be used to generate a DDL for a table creation. 

In [4]:
print(pd.io.sql.get_schema(df, 'yellow_taxi_data'))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


Now we have the schema and a DDL we can use to create a table.

**SCHEMA DISCREPANCIES**

Notice that columns like the following:
- pickup date ("tpep_pickup_datetime" TEXT)
- dropoff date ("tpep_dropoff_datetime" TEXT)

are have a datatype of TEXT instead of datetime. So I will fix that before loading to postgres.

There are other datatypes like REAL, which are not optimal compared to decimals etc, but i will transform that later. For now I will fix the datetime format

In [21]:
df['tpep_pickup_datetime'] = pd.to_datetime(df.tpep_pickup_datetime)
df['tpep_dropoff_datetime'] = pd.to_datetime(df.tpep_dropoff_datetime)


CREATE A CONNECTION TO POSTGRES

In [12]:
from sqlalchemy import create_engine

In [13]:
engine = create_engine('postgresql://root:password@localhost:5432/ny_taxi_db')

Engine has been created successfuly. Now I will create a schema with the connection to have an actual SQL schema

In [24]:
print(pd.io.sql.get_schema(df, 'yellow_taxi_data', con=engine))

table_ddl = pd.io.sql.get_schema(df, 'yellow_taxi_data', con=engine)


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




Now that we have completed the first part of the ingesting
- loading the data
- transforming the data by converting datatypes
- creating a postgres connection using sqlalchemy
- generating a DDL for ingestion

Let's read the the data in chunks of 100k rows because we do not want to burden the memory to load all 1.3M

READ DATA IN CHUNKS

In [40]:
#iterate the loading process
df_iter = pd.read_csv("data/yellow_tripdata_2021-01.csv", iterator=True, chunksize=100000)
df_iter

<pandas.io.parsers.readers.TextFileReader at 0x210c768a760>

In [41]:
# load the next chunk
df = next(df_iter)

print("length of df:",len(df))

length of df: 100000


In [36]:
# perform transformations

df['tpep_pickup_datetime'] = pd.to_datetime(df.tpep_pickup_datetime)
df['tpep_dropoff_datetime'] = pd.to_datetime(df.tpep_dropoff_datetime)

In [43]:
# create the table 

df.head(0).to_sql(name="yellow_taxi_data", con=engine, if_exists="replace")

0

In [44]:
# write data to postgres in batches - chunks
while True:
    df = next(df_iter)
    df['tpep_pickup_datetime'] = pd.to_datetime(df.tpep_pickup_datetime, errors="coerce")
    df['tpep_dropoff_datetime'] = pd.to_datetime(df.tpep_dropoff_datetime, errors="coerce")
    df.to_sql(name="yellow_taxi_data", con=engine, if_exists="append")
    print("inserted a chunk of 100_000")

inserted a chunk of 100_000
inserted a chunk of 100_000
inserted a chunk of 100_000
inserted a chunk of 100_000
inserted a chunk of 100_000


  df['tpep_dropoff_datetime'] = pd.to_datetime(df.tpep_dropoff_datetime, errors="coerce")


inserted a chunk of 100_000
inserted a chunk of 100_000
inserted a chunk of 100_000
inserted a chunk of 100_000
inserted a chunk of 100_000


  df = next(df_iter)


inserted a chunk of 100_000
inserted a chunk of 100_000


StopIteration: 