Steps:
1. Read the Parquet file in batches with PyArrow and convert each batch to a pandas DataFrame
2. Transform timestamp columns in the DataFrame
3. Create the Postgres table and load it in batches

In [1]:
import os
from urllib.parse import quote_plus

import pandas as pd
import psycopg2
import pyarrow.parquet as pq
from sqlalchemy import create_engine

In [2]:
# Record which pandas version produced the outputs in this notebook.
pandas_version = pd.__version__
pandas_version

'2.2.3'

In [3]:
# Open the Parquet file; its rows are read later, batch by batch, via iter_batches().
# The path is relative to the notebook's working directory.
PARQUET_PATH = "yellow_tripdata_2024-01.parquet"
parquet_file = pq.ParquetFile(PARQUET_PATH)

In [4]:
# Empty placeholder DataFrame for the sampled rows (no columns, no rows yet).
df: pd.DataFrame = pd.DataFrame()

In [5]:
# Read just enough batches to collect the first SAMPLE_ROWS rows for inspection.
# Batches are gathered in a list and concatenated once (concat inside the loop is
# quadratic and warns when the accumulator starts empty). iter_batches' batch_size
# is only an upper bound, so keep reading until enough rows have been seen.
SAMPLE_ROWS = 100
sample_batches = []
rows_collected = 0
for batch in parquet_file.iter_batches(batch_size=SAMPLE_ROWS):
    batch_df = batch.to_pandas()
    sample_batches.append(batch_df)
    rows_collected += len(batch_df)
    if rows_collected >= SAMPLE_ROWS:
        break

# Keep exactly the first SAMPLE_ROWS rows with a clean 0..n-1 index; an empty file
# yields an empty DataFrame instead of a "No objects to concatenate" error.
df = (
    pd.concat(sample_batches, ignore_index=True).head(SAMPLE_ROWS)
    if sample_batches
    else pd.DataFrame()
)

In [6]:
# df now contains the first 100 rows. Leave it as the last expression so Jupyter
# renders it as a rich table rather than print()'s plain, column-wrapped text.
df

    VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0          2  2024-01-01 00:57:55   2024-01-01 01:17:43                1   
1          1  2024-01-01 00:03:00   2024-01-01 00:09:36                1   
2          1  2024-01-01 00:17:06   2024-01-01 00:35:01                1   
3          1  2024-01-01 00:36:38   2024-01-01 00:44:56                1   
4          1  2024-01-01 00:46:51   2024-01-01 00:52:57                1   
..       ...                  ...                   ...              ...   
95         2  2024-01-01 00:54:16   2024-01-01 01:27:40                1   
96         2  2024-01-01 00:13:58   2024-01-01 00:33:39                1   
97         2  2024-01-01 00:35:53   2024-01-01 00:47:59                1   
98         2  2024-01-01 00:50:28   2024-01-01 01:38:39                1   
99         2  2024-01-01 00:18:24   2024-01-01 00:30:39                1   

    trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0      

In [7]:
from sqlalchemy import create_engine

In [8]:
# Create the Postgres engine from environment variables so credentials are not
# committed with the notebook. The defaults reproduce the values that were
# previously hardcoded here.
# NOTE(review): drop the password default once PG_PASSWORD is set everywhere.
pg_user = os.environ.get("PG_USER", "admin")
pg_password = os.environ.get("PG_PASSWORD", "password")
pg_host = os.environ.get("PG_HOST", "localhost")
pg_port = os.environ.get("PG_PORT", "5432")
pg_db = os.environ.get("PG_DB", "ny_taxi")

# quote_plus escapes characters such as '@' or '/' that would otherwise break the URL.
engine = create_engine(
    f"postgresql://{quote_plus(pg_user)}:{quote_plus(pg_password)}"
    f"@{pg_host}:{pg_port}/{pg_db}"
)

# Verify connectivity, then close the connection instead of leaking it.
with engine.connect():
    pass

<sqlalchemy.engine.base.Connection at 0x10e819520>

In [9]:
# Create (or recreate) an empty table with the sample's columns and dtypes;
# head(0) means no rows are written. index=False keeps the pandas index out of the
# schema, matching the batch load, which also writes with index=False.
df.head(0).to_sql(name="yellow_taxi_data", con=engine, if_exists="replace", index=False)

0

In [10]:
# Sanity check: count the rows currently in the table.
row_count_query = "select count(*) from yellow_taxi_data;"
pd.read_sql_query(row_count_query, con=engine)

Unnamed: 0,count
0,0


In [11]:
# Define the batch size and the maximum number of rows to load (None = whole file)
batch_size = 100_000
max_rows = 200_000
rows_inserted = 0

# Iterate over the file in batches
for batch in parquet_file.iter_batches(batch_size=batch_size):
    # Convert the batch to a pandas DataFrame, trimming it so the cap is never exceeded
    df = batch.to_pandas()
    if max_rows is not None:
        df = df.head(max_rows - rows_inserted)

    # Replace the table on the first batch so re-running this cell starts fresh;
    # append every subsequent batch.
    load_mode = 'replace' if rows_inserted == 0 else 'append'

    # Insert the DataFrame into the PostgreSQL table
    df.to_sql('yellow_taxi_data', engine, if_exists=load_mode, index=False)

    # Count the rows actually written (the last batch may be shorter than batch_size)
    rows_inserted += len(df)
    print(f' {rows_inserted} rows inserted')

    # Stop as soon as the cap is reached, without reading another batch
    if max_rows is not None and rows_inserted >= max_rows:
        break
    
    

 100000 rows inserted
 200000 rows inserted
