In [1]:
import pandas as pd

  from pandas.core.computation.check import NUMEXPR_INSTALLED


In [19]:
pip install -U SQLAlchemy

Collecting SQLAlchemy
  Downloading SQLAlchemy-2.0.23-cp38-cp38-macosx_10_9_x86_64.whl (2.1 MB)
[K     |████████████████████████████████| 2.1 MB 4.1 MB/s eta 0:00:01
Installing collected packages: SQLAlchemy
  Attempting uninstall: SQLAlchemy
    Found existing installation: SQLAlchemy 1.3.24
    Uninstalling SQLAlchemy-1.3.24:
      Successfully uninstalled SQLAlchemy-1.3.24
Successfully installed SQLAlchemy-2.0.23
Note: you may need to restart the kernel to use updated packages.


In [6]:
import os 
cwd = os.getcwd()
print (cwd)

/Users/usmankhaliq/Projects/data-engineering-lessons/week-1/ingesting_ny_taxi_data_to_postgres


In [2]:
pd.__version__

'1.5.3'

Lets load the parquet file in pandas

In [2]:
dataset_parquet = pd.read_parquet("yellow_tripdata_2023-01.parquet", engine="pyarrow")

In [3]:
dataset_parquet.dtypes

VendorID                          int64
tpep_pickup_datetime     datetime64[ns]
tpep_dropoff_datetime    datetime64[ns]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
airport_fee                     float64
dtype: object

In [4]:
dataset_parquet.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,N,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,14.3,2.5,0.0
1,2,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.1,1.0,N,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,16.9,2.5,0.0
2,2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,N,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,34.9,2.5,0.0
3,1,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.9,1.0,N,138,7,1,12.1,7.25,0.5,0.0,0.0,1.0,20.85,0.0,1.25
4,2,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,N,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0


Lets get the schema of the parquet file so that we can create a table in postgresql with the same schema as our parquet file. The following command generates a DDL of the parquet data.

In [5]:
print(pd.io.sql.get_schema(dataset_parquet, name="yellow_taxi_data"))

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


The above DDL query might not work directly in postgres. We therefore need to connect pandas to postgres first. 

In [6]:
from sqlalchemy import create_engine

In [7]:
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [8]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7fc83a267a00>

In [9]:
print(pd.io.sql.get_schema(dataset_parquet, name="yellow_taxi_data", con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




Since this table is very big(3 million rows), we need to iterate through the parquet file while writing the data in the table in chunks. First, lets create the table only.

In [10]:
dataset_parquet.head(n=0).to_sql( 
    name='yellow_taxi_data',
    con=engine,
    if_exists='replace'
)

0

In [11]:
import pyarrow.parquet as pq
parquet_file = pq.ParquetFile('yellow_tripdata_2023-01.parquet')

In [12]:
for table in parquet_file.iter_batches(batch_size=10000):
    df = table.to_pandas()
    df.to_sql(name='yellow_taxi_data',con=engine,if_exists='append')