## Dependencies and Query Engine

In [2]:
import os
from time import time

import pandas as pd
import pyarrow.parquet as pq
from sqlalchemy import create_engine

# Show up to 50 columns when a DataFrame is rendered.
pd.options.display.max_columns = 50

# Connection URL: taken from the NY_TAXI_DB_URL environment variable when it is
# set, otherwise the local development database. Prefer the environment
# variable so that real credentials never have to be written into the notebook.
db_url = os.environ.get(
    "NY_TAXI_DB_URL",
    "postgresql://root:root@localhost:5432/ny_taxi",
)
db_con = create_engine(db_url)

# Open a connection immediately as a connectivity check. The returned
# Connection is only displayed as the cell output; it is not stored or closed
# here.
db_con.connect()

<sqlalchemy.engine.base.Connection at 0x7fe2dec1bdc0>

In [3]:
def sql_query(query, con=db_con):
    """Run a SQL query and return its result set as a pandas DataFrame.

    Parameters
    ----------
    query
        SQL passed unchanged to ``pd.read_sql``.
    con
        Database connectable handed to ``pd.read_sql``; defaults to the
        notebook-level ``db_con``.

    Returns
    -------
    Whatever ``pd.read_sql`` returns for ``query``.
    """
    result_frame = pd.read_sql(sql=query, con=con)
    return result_frame

def sql_data_manipulation(query, con=db_con):
    """Execute a SQL statement through ``con.execute`` and return its result.

    Parameters
    ----------
    query
        Statement passed unchanged to ``con.execute``.
    con
        Object providing ``execute``; defaults to the notebook-level
        ``db_con``.

    Returns
    -------
    The object returned by ``con.execute(query)``.
    """
    execution_result = con.execute(query)
    return execution_result

## Batch preprocess and upload all data to the database

In [None]:
def migrate_data_to_postgresdb(
        file_path="../yellow_tripdata_2021-01.parquet",
        table_name="yellow_taxi_data",
        db_connection=db_con,
    ):
    """Upload a yellow-taxi parquet file to a database table, batch by batch.

    Steps:
      1. (Re)create ``table_name`` as an empty table whose columns come from
         the parquet schema.
      2. Append every record batch of the file to that table, printing how
         long each batch took.
      3. Add an auto-incrementing ``trip_id`` primary-key column.

    Parameters
    ----------
    file_path : str
        Path of the parquet file to upload. The file must contain the
        ``tpep_pickup_datetime`` and ``tpep_dropoff_datetime`` columns.
    table_name : str
        Destination table. An existing table with this name is replaced.
    db_connection
        SQLAlchemy connectable passed to ``DataFrame.to_sql`` and used to run
        the final ``ALTER TABLE``. Defaults to the notebook-level ``db_con``.

    Raises
    ------
    KeyError
        If a batch lacks one of the two datetime columns.
    """
    parquet_file = pq.ParquetFile(file_path)

    # Create the empty table from the schema alone, so the whole file never
    # has to be loaded into memory just to obtain the column definitions.
    header_df = parquet_file.schema_arrow.empty_table().to_pandas()
    header_df.to_sql(
        name=table_name,
        con=db_connection,
        if_exists='replace',
        index=False,
    )

    # Upload one record batch at a time. The timer starts before the batch is
    # read so the reported duration covers reading, converting and inserting.
    t_start = time()
    for record_batch in parquet_file.iter_batches():
        batch_df = record_batch.to_pandas()

        for column in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
            batch_df[column] = pd.to_datetime(batch_df[column])

        batch_df.to_sql(
            name=table_name,
            con=db_connection,
            if_exists='append',
            index=False,
        )
        t_end = time()
        print('inserted another chunk, took %.3f second' % (t_end - t_start))
        t_start = t_end
    print('completed')

    # Add trip_id as primary key on the table that was actually written.
    # The identifier is double-quoted (embedded quotes doubled) so that the
    # statement targets exactly ``table_name``.
    quoted_table = '"' + table_name.replace('"', '""') + '"'
    query = f"""
    ALTER TABLE {quoted_table}
        ADD COLUMN trip_id SERIAL PRIMARY KEY;
    """
    db_connection.execute(query)
    
# Run the migration with its default arguments; the %time magic reports how
# long the call takes.
%time migrate_data_to_postgresdb()