In [1]:
!pip install pandas sqlalchemy psycopg2-binary
import os
from time import time
from urllib.parse import quote

import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine



In [2]:
# Load the whole Parquet file into one in-memory DataFrame.
df = pd.read_parquet(path='yellow_tripdata_2024-01.parquet')

In [3]:
# Convert the pickup/dropoff columns to datetime dtype. pd.to_datetime returns a
# new Series rather than modifying df, so the result has to be assigned back.
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

# Show the converted dropoff column as the cell output.
df['tpep_dropoff_datetime']

0         2024-01-01 01:17:43
1         2024-01-01 00:09:36
2         2024-01-01 00:35:01
3         2024-01-01 00:44:56
4         2024-01-01 00:52:57
                  ...        
2964619   2024-01-31 23:54:36
2964620   2024-01-31 23:27:52
2964621   2024-01-31 23:38:00
2964622   2024-01-31 23:25:14
2964623   2024-02-01 00:13:30
Name: tpep_dropoff_datetime, Length: 2964624, dtype: datetime64[us]

In [4]:
# Preview the CREATE TABLE statement pandas derives from df's dtypes
# (no database connection is passed here).
pd.io.sql.get_schema(frame=df, name='yellow_taxi_data')

'CREATE TABLE "yellow_taxi_data" (\n"VendorID" INTEGER,\n  "tpep_pickup_datetime" TIMESTAMP,\n  "tpep_dropoff_datetime" TIMESTAMP,\n  "passenger_count" REAL,\n  "trip_distance" REAL,\n  "RatecodeID" REAL,\n  "store_and_fwd_flag" TEXT,\n  "PULocationID" INTEGER,\n  "DOLocationID" INTEGER,\n  "payment_type" INTEGER,\n  "fare_amount" REAL,\n  "extra" REAL,\n  "mta_tax" REAL,\n  "tip_amount" REAL,\n  "tolls_amount" REAL,\n  "improvement_surcharge" REAL,\n  "total_amount" REAL,\n  "congestion_surcharge" REAL,\n  "Airport_fee" REAL\n)'

In [5]:
# Read the connection settings from the environment so credentials do not have
# to be written into the notebook. Each default equals the value that used to
# be hard-coded, so with no variables set the URL is unchanged.
# User and password are percent-encoded so characters such as '@', ':' or '/'
# cannot break the URL.
pg_user = quote(os.environ.get('PG_USER', 'root'), safe='')
pg_password = quote(os.environ.get('PG_PASSWORD', 'root'), safe='')
pg_host = os.environ.get('PG_HOST', 'localhost')
pg_port = os.environ.get('PG_PORT', '5432')
pg_database = os.environ.get('PG_DB', 'ny_taxi')

engine = create_engine(
    f'postgresql://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{pg_database}'
)

In [6]:
# Open a connection through the engine to confirm the database is reachable;
# the Connection object is shown as the cell output.
# NOTE(review): the returned Connection is never closed in this cell.
engine.connect()

<sqlalchemy.engine.base.Connection at 0x11b107d40>

In [7]:
# Same schema preview as before, but passing the engine as `con` so the DDL is
# generated for that connection; print() renders the newlines.
print(
    pd.io.sql.get_schema(
        frame=df,
        name='yellow_taxi_data',
        con=engine,
    )
)


CREATE TABLE yellow_taxi_data (
	"VendorID" INTEGER, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" INTEGER, 
	"DOLocationID" INTEGER, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	"Airport_fee" FLOAT(53)
)




In [8]:
# Create (or replace) an empty table with df's column layout by writing zero rows.
# index=False keeps the DataFrame index out of the schema; insert_chunks appends
# rows with index=False, so an "index" column created here would stay NULL.
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace', index=False)

0

In [9]:
def insert_chunks(parquet_file, table_name, engine):
    """Append every row group of a Parquet file to a SQL table.

    Row groups are read and written one at a time. Loading is best-effort: a
    row group that raises is reported and skipped, and the remaining groups are
    still processed. If any group failed, a summary line is printed at the end
    so that a partial load is not mistaken for a complete one.

    Args:
        parquet_file: Object exposing ``num_row_groups`` and
            ``read_row_group(i)`` whose result has ``to_pandas()``
            (for example a ``pyarrow.parquet.ParquetFile``).
        table_name: Name of the destination table; rows are appended to it.
        engine: Connectable handed to ``DataFrame.to_sql`` as ``con``.

    Returns:
        None. Progress, errors and the failure summary are printed.
    """
    num_row_groups = parquet_file.num_row_groups
    failed_groups = []  # 1-based numbers of the row groups that raised

    for group_index in range(num_row_groups):
        group_number = group_index + 1  # human-friendly numbering for messages
        t_start = time()

        try:
            chunk = parquet_file.read_row_group(group_index).to_pandas()

            print(f"Row group {group_number}/{num_row_groups}, records: {len(chunk)}")

            # Make sure both timestamp columns are datetimes before writing.
            chunk['tpep_pickup_datetime'] = pd.to_datetime(chunk['tpep_pickup_datetime'])
            chunk['tpep_dropoff_datetime'] = pd.to_datetime(chunk['tpep_dropoff_datetime'])

            # index=False: the DataFrame index is not written as a column.
            chunk.to_sql(name=table_name, con=engine, if_exists='append', index=False)

            t_end = time()
            print(f"Inserted chunk {group_number}/{num_row_groups}, took {t_end - t_start:.3f} seconds")

        except Exception as e:
            # Best-effort load: report the failure and carry on with the next group.
            print(f"Error inserting chunk {group_number}: {e}")
            failed_groups.append(group_number)

    if failed_groups:
        print(f"{len(failed_groups)}/{num_row_groups} row groups failed: {failed_groups}")

# Open the Parquet file and append its row groups to the yellow_taxi_data table.
parquet_file = pq.ParquetFile('yellow_tripdata_2024-01.parquet')
insert_chunks(
    parquet_file=parquet_file,
    table_name='yellow_taxi_data',
    engine=engine,
)

Row group 1/3, records: 1048576
Inserted chunk 1/3, took 42.183 seconds
Row group 2/3, records: 1048576
Inserted chunk 2/3, took 42.909 seconds
Row group 3/3, records: 867472
Inserted chunk 3/3, took 34.292 seconds


In [12]:
# df_iter = pd.read_csv('yellow_tripdata_2024-01.csv', iterator=True, chunksize=100000)
# df = next(df_iter)
# len(df)

# pd.to_datetime(df.tpep_pickup_datetime)
# pd.to_datetime(df.tpep_dropoff_datetime)
# df.head()

# # create the table from the header only (column names and types, no rows)
# df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

# check the columns & column types in pgcli 
# \d yellow_taxi_data 
# %time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

In [11]:
# insert all data in chunks 
# while True: 
#     t_start = time()
#     df = next(df_iter)
    
#     pd.to_datetime(df.tpep_pickup_datetime)
#     pd.to_datetime(df.tpep_dropoff_datetime)

#     df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

#     t_end = time()

#     print('inserted another chunk..., took %.3f second' % (t_end - t_start))