In [1]:
import pyarrow.parquet as pq
import pandas as pd
from sqlalchemy import create_engine
from time import time

In [2]:
engine = create_engine('postgresql://root:root@localhost:5433/ny_taxi')
#engine = create_engine("postgresql+psycopg2://user:password@host:port/dbname?client_encoding=utf8")


In [3]:
engine.connect()

<sqlalchemy.engine.base.Connection at 0x1bf46c15690>

In [4]:
#Trying the sql connection

query = """
SELECT 1 as number;"""

pd.read_sql(query, con=engine)

Unnamed: 0,number
0,1


In [5]:
parquet_file = 'yellow_tripdata_2021-01.parquet'

In [6]:
df = pd.read_parquet(parquet_file, engine="pyarrow")

In [7]:
# Open the Parquet file
parquet_file = pq.ParquetFile(parquet_file)

# Iterate over row groups
for i in range(parquet_file.num_row_groups):
    df_chunk = parquet_file.read_row_group(i).to_pandas()
    # Process the chunk (df_chunk)
    # print(df_chunk)

In [8]:
# Function to yield chunks of the Parquet file
def read_parquet_in_chunks(parquet_file, chunksize=100000):
    for i in range(parquet_file.num_row_groups):
        df_chunk = parquet_file.read_row_group(i).to_pandas()
        for start in range(0, len(df_chunk), chunksize):
            yield df_chunk[start:start + chunksize]

# Create an iterator
df_iter = read_parquet_in_chunks(parquet_file, chunksize=100000)

# Get the first chunk
df = next(df_iter)
#print(df)

In [9]:
# Check the data types of each column
print(df.dtypes)

VendorID                          int64
tpep_pickup_datetime     datetime64[us]
tpep_dropoff_datetime    datetime64[us]
passenger_count                 float64
trip_distance                   float64
RatecodeID                      float64
store_and_fwd_flag               object
PULocationID                      int64
DOLocationID                      int64
payment_type                      int64
fare_amount                     float64
extra                           float64
mta_tax                         float64
tip_amount                      float64
tolls_amount                    float64
improvement_surcharge           float64
total_amount                    float64
congestion_surcharge            float64
airport_fee                     float64
dtype: object


In [10]:
df.head(100)
len(df)

100000

In [11]:
table_name='yellow_taxi_data'

In [12]:
while True:
    t_start= time()
    df = next(df_iter)
    df.loc[:, "tpep_pickup_datetime"] = pd.to_datetime(df["tpep_pickup_datetime"])
    df.loc[:, "tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"])
    df.to_sql(name=table_name, con=engine, if_exists='replace')
    t_end = time()
    print("inserted another chunk...,it took %.3f seconds"  %(t_end - t_start))


inserted another chunk..., took 8.267 seconds
inserted another chunk..., took 8.038 seconds
inserted another chunk..., took 7.900 seconds
inserted another chunk..., took 7.737 seconds
inserted another chunk..., took 8.018 seconds
inserted another chunk..., took 8.213 seconds
inserted another chunk..., took 8.410 seconds
inserted another chunk..., took 7.973 seconds
inserted another chunk..., took 8.305 seconds
inserted another chunk..., took 8.246 seconds
inserted another chunk..., took 7.978 seconds
inserted another chunk..., took 7.853 seconds
inserted another chunk..., took 5.469 seconds


StopIteration: 

In [30]:

query = """
SELECT *
FROM pg_catalog.pg_tables
WHERE schemaname != 'pg_catalog' AND 
    schemaname != 'information_schema';
"""

pd.read_sql(query, con=engine)


Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
0,public,yellow_taxi_data,root,,True,False,False,False


In [31]:

query = """
SELECT *
FROM yellow_taxi_data LIMIT 10;
"""

pd.read_sql(query, con=engine)


Unnamed: 0,false,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,
5,5,1,2021-01-01 00:16:29,2021-01-01 00:24:30,1.0,1.6,1.0,N,224,68,1,8.0,3.0,0.5,2.35,0.0,0.3,14.15,2.5,
6,6,1,2021-01-01 00:00:28,2021-01-01 00:17:28,1.0,4.1,1.0,N,95,157,2,16.0,0.5,0.5,0.0,0.0,0.3,17.3,0.0,
7,7,1,2021-01-01 00:12:29,2021-01-01 00:30:34,1.0,5.7,1.0,N,90,40,2,18.0,3.0,0.5,0.0,0.0,0.3,21.8,2.5,
8,8,1,2021-01-01 00:39:16,2021-01-01 01:00:13,1.0,9.1,1.0,N,97,129,4,27.5,0.5,0.5,0.0,0.0,0.3,28.8,0.0,
9,9,1,2021-01-01 00:26:12,2021-01-01 00:39:46,2.0,2.7,1.0,N,263,142,1,12.0,3.0,0.5,3.15,0.0,0.3,18.95,2.5,


In [32]:
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)


