In [48]:
import pyarrow.parquet as pq
import pandas as pd


# Read the data

In [49]:
trips = pq.read_table('yellow_tripdata_2022-01.parquet')

### Convert parquet to pandas

In [50]:
trips = trips.to_pandas()

### Preview the data

In [51]:
trips.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2022-01-01 00:35:40,2022-01-01 00:53:29,2.0,3.8,1.0,N,142,236,1,14.5,3.0,0.5,3.65,0.0,0.3,21.95,2.5,0.0
1,1,2022-01-01 00:33:43,2022-01-01 00:42:07,1.0,2.1,1.0,N,236,42,1,8.0,0.5,0.5,4.0,0.0,0.3,13.3,0.0,0.0
2,2,2022-01-01 00:53:21,2022-01-01 01:02:19,1.0,0.97,1.0,N,166,166,1,7.5,0.5,0.5,1.76,0.0,0.3,10.56,0.0,0.0
3,2,2022-01-01 00:25:21,2022-01-01 00:35:23,1.0,1.09,1.0,N,114,68,2,8.0,0.5,0.5,0.0,0.0,0.3,11.8,2.5,0.0
4,2,2022-01-01 00:36:48,2022-01-01 01:14:20,1.0,4.3,1.0,N,68,163,1,23.5,0.5,0.5,3.0,0.0,0.3,30.3,2.5,0.0


### Convert it to CSV


In [52]:
trips.to_csv('yellow_tripdata_2022-01.csv')  

# Get data to the database
### Get schema

In [53]:
schema = pd.io.sql.get_schema(trips, name='yellow_taxi_data')

In [54]:
print(schema)

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


### Convert text to timestamp

In [55]:
trips.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
trips.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

### Put it to postgres

In [56]:
from sqlalchemy import create_engine
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [58]:
print(pd.io.sql.get_schema(trips, name='yellow_taxi_data',con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




### Create a table, insert data chunk by chunk

In [73]:
df_iter = pd.read_csv('yellow_tripdata_2022-01.csv', iterator=True, chunksize=100000)

In [74]:
df=next(df_iter)

In [75]:
len(df)

100000

In [81]:
df.head(n=0).to_sql(name='yellow_taxi_data',con=engine, if_exists='replace')

0

In [77]:
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [78]:
%time df.to_sql(name='yellow_taxi_data',con=engine, if_exists='append')


CPU times: user 4.4 s, sys: 82.2 ms, total: 4.49 s
Wall time: 14.6 s


1000

In [79]:
from time import time

In [82]:
while True:
    t_start = time()
    df=next(df_iter)
    df.to_sql(name='yellow_taxi_data',con=engine, if_exists='append')
    t_end = time()
    print('inserted another chunk..., took %.3f second' %(t_end-t_start))

inserted another chunk..., took 14.237 second
inserted another chunk..., took 14.076 second
inserted another chunk..., took 13.727 second
inserted another chunk..., took 13.911 second
inserted another chunk..., took 14.531 second
inserted another chunk..., took 17.135 second
inserted another chunk..., took 14.697 second
inserted another chunk..., took 14.839 second
inserted another chunk..., took 18.444 second
inserted another chunk..., took 22.927 second
inserted another chunk..., took 15.235 second
inserted another chunk..., took 16.562 second
inserted another chunk..., took 21.595 second
inserted another chunk..., took 15.332 second
inserted another chunk..., took 15.265 second
inserted another chunk..., took 21.326 second
inserted another chunk..., took 20.864 second
inserted another chunk..., took 15.000 second
inserted another chunk..., took 19.291 second
inserted another chunk..., took 74.861 second


  df=next(df_iter)


inserted another chunk..., took 15.914 second
inserted another chunk..., took 9.912 second


StopIteration: 