## Wed, July 6th 2022

1. Create a conda environment (done)
2. Upload the datasets to a Postgres database using Python
3. Query the database from JupyterLab

### Dataset

Yellow Taxi Trip Data (January 2021) and the Taxi Zone lookup table: https://drive.google.com/drive/u/1/folders/1F0csrG_2vgfmB152dMMbw7jvAlWRUmim

### Create a connection to the Postgres database

In [1]:
from sqlalchemy import create_engine
import psycopg2
import pandas as pd

import os
from getpass import getpass
from urllib.parse import quote_plus

# Create the SQLAlchemy engine for the local Postgres database.
# Credentials are read from environment variables (prompting for the password
# when PG_PASSWORD is unset) so no secret is stored in the notebook itself.
PG_USER = os.environ.get("PG_USER", "postgres")
PG_PASSWORD = os.environ.get("PG_PASSWORD") or getpass("Postgres password: ")
PG_HOST = os.environ.get("PG_HOST", "localhost")
PG_PORT = os.environ.get("PG_PORT", "5432")
PG_DATABASE = os.environ.get("PG_DATABASE", "practice_dataset")

# quote_plus escapes characters such as '@' or ':' that would otherwise break the URL.
db_connection = create_engine(
    f"postgresql://{quote_plus(PG_USER)}:{quote_plus(PG_PASSWORD)}"
    f"@{PG_HOST}:{PG_PORT}/{PG_DATABASE}"
)

# Open one connection to fail fast if the server is unreachable, then close it
# instead of leaking it for the lifetime of the kernel.
with db_connection.connect() as test_connection:
    display(test_connection)


def query_result(query, con=db_connection):
    """Run a SQL query and return its result set as a pandas DataFrame.

    Parameters
    ----------
    query : str
        SQL text to execute.
    con : optional
        Engine or connection handed to ``pd.read_sql``; defaults to
        ``db_connection`` (bound when this cell runs).
    """
    return pd.read_sql(query, con=con)

<sqlalchemy.engine.base.Connection at 0x221b3500a00>

### Upload a CSV file to the Postgres database

In [2]:
# Load the taxi zone lookup table and preview a few random rows.
# A fixed random_state keeps the preview identical across re-runs.
df = pd.read_csv('taxi+_zone_lookup.csv')
df.sample(5, random_state=42)

Unnamed: 0,LocationID,Borough,Zone,service_zone
82,83,Queens,Elmhurst/Maspeth,Boro Zone
52,53,Queens,College Point,Boro Zone
129,130,Queens,Jamaica,Boro Zone
61,62,Brooklyn,Crown Heights South,Boro Zone
81,82,Queens,Elmhurst,Boro Zone


In [3]:
# (number of rows, number of columns) of the zone lookup frame
len(df.index), len(df.columns)

(265, 4)

In [8]:
# Create (or recreate) the taxi_zone table from df's columns only -- zero rows;
# the data itself is appended in a later cell.
df.iloc[:0].to_sql(
    'taxi_zone',
    db_connection,
    index=False,
    if_exists='replace',
)

In [9]:
# Sanity check: the new table should expose df's columns and contain no rows yet.
query_result(query="""
SELECT * FROM taxi_zone
""")

Unnamed: 0,LocationID,Borough,Zone,service_zone


In [10]:
# Insert every row of the zone lookup frame into the existing taxi_zone table.
# Note: re-running this cell without recreating the table duplicates the rows.
df.to_sql('taxi_zone', db_connection, index=False, if_exists='append')

In [11]:
# Verify the load: the row count reported by Postgres should equal len(df).
row_count_sql = """
SELECT COUNT(*) FROM taxi_zone
"""
query_result(row_count_sql)

Unnamed: 0,count
0,265


### Upload the first batch of a Parquet file to the Postgres database

In [12]:
# Load the whole January 2021 yellow-taxi trip file and report its dimensions.
df = pd.read_parquet("yellow_tripdata_2021-01.parquet")
n_rows, n_cols = df.shape
n_rows, n_cols

(1369769, 19)

In [14]:
# Create (or recreate) an empty yellow_taxi_data table whose columns mirror df;
# rows are inserted batch by batch in later cells.
df.iloc[:0].to_sql(
    'yellow_taxi_data',
    db_connection,
    index=False,
    if_exists='replace',
)

In [16]:
# Confirm the empty table exists and exposes the expected columns.
query = """
    SELECT * FROM yellow_taxi_data
"""

query_result(query)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee


In [21]:
import pyarrow.parquet as pq

# create generator for parquet file
parquet_file = pq.ParquetFile("./yellow_tripdata_2021-01.parquet")
parquet_data_generator = parquet_file.iter_batches(batch_size=100000)

# parse each batch to pandas
batch_df = next(parquet_data_generator).to_pandas()

# parse to datetime
batch_df["tpep_pickup_datetime"] = pd.to_datetime(batch_df["tpep_pickup_datetime"])
batch_df["tpep_dropoff_datetime"] = pd.to_datetime(batch_df["tpep_dropoff_datetime"])

# migrate to database
%time batch_df.to_sql(name='yellow_taxi_data', con=db_connection, if_exists='append', index=False)

CPU times: total: 20.7 s
Wall time: 50.2 s


In [22]:
# Count the rows inserted so far; after the trial cell this should equal its batch size.
query = """
    SELECT COUNT(*) FROM yellow_taxi_data
"""

query_result(query)

Unnamed: 0,count
0,100000


### Upload all Parquet data to the Postgres database

In [25]:
from time import perf_counter
import pandas as pd
import pyarrow.parquet as pq

TRIPS_PARQUET_PATH = "./yellow_tripdata_2021-01.parquet"

# (Re)create an empty yellow_taxi_data table whose columns mirror the file.
# `df` keeps the full trip frame because the duration analysis below uses it.
df = pd.read_parquet(TRIPS_PARQUET_PATH)
df.head(n=0).to_sql(
    name='yellow_taxi_data',
    con=db_connection,
    if_exists='replace',
    index=False
)

# Stream the file in record batches (iter_batches' default batch size) so each
# to_sql call inserts a bounded number of rows.
parquet_file = pq.ParquetFile(TRIPS_PARQUET_PATH)

# A for-loop ends cleanly when the batches run out (no manual
# while/next/StopIteration); perf_counter is the clock meant for elapsed time.
for record_batch in parquet_file.iter_batches():
    t_start = perf_counter()
    batch_df = record_batch.to_pandas()

    # no-op when the columns are already datetime64
    batch_df["tpep_pickup_datetime"] = pd.to_datetime(batch_df["tpep_pickup_datetime"])
    batch_df["tpep_dropoff_datetime"] = pd.to_datetime(batch_df["tpep_dropoff_datetime"])

    batch_df.to_sql(
        name='yellow_taxi_data',
        con=db_connection,
        if_exists='append',
        index=False
    )
    print('inserted another chunk, took %.3f second' % (perf_counter() - t_start))

print('completed')

inserted another chunk, took 37.625 second
inserted another chunk, took 39.127 second
inserted another chunk, took 30.128 second
inserted another chunk, took 29.762 second
inserted another chunk, took 29.479 second
inserted another chunk, took 29.430 second
inserted another chunk, took 31.076 second
inserted another chunk, took 30.416 second
inserted another chunk, took 29.366 second
inserted another chunk, took 30.024 second
inserted another chunk, took 31.052 second
inserted another chunk, took 29.924 second
inserted another chunk, took 29.726 second
inserted another chunk, took 29.088 second
inserted another chunk, took 29.643 second
inserted another chunk, took 29.301 second
inserted another chunk, took 28.878 second
inserted another chunk, took 29.412 second
inserted another chunk, took 29.044 second
inserted another chunk, took 28.255 second
inserted another chunk, took 24.780 second
completed


In [26]:
# Verify the full load: the table's row count must match the Parquet file's row count.
query = """
    SELECT COUNT(*) FROM yellow_taxi_data
"""

loaded_counts = query_result(query)
expected_rows = pq.ParquetFile("./yellow_tripdata_2021-01.parquet").metadata.num_rows
assert loaded_counts.iloc[0, 0] == expected_rows, (
    f"expected {expected_rows} rows in yellow_taxi_data, found {loaded_counts.iloc[0, 0]}"
)
loaded_counts

Unnamed: 0,count
0,1369769


In [28]:
# Preview ten stored rows (no ORDER BY, so Postgres picks which ones).
query = """
    SELECT * FROM yellow_taxi_data
    LIMIT 10
"""

query_result(query)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.1,1.0,N,142,43,2,8.0,3.0,0.5,0.0,0.0,0.3,11.8,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.2,1.0,N,238,151,2,3.0,0.5,0.5,0.0,0.0,0.3,4.3,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.7,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.6,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,
5,1,2021-01-01 00:16:29,2021-01-01 00:24:30,1.0,1.6,1.0,N,224,68,1,8.0,3.0,0.5,2.35,0.0,0.3,14.15,2.5,
6,1,2021-01-01 00:00:28,2021-01-01 00:17:28,1.0,4.1,1.0,N,95,157,2,16.0,0.5,0.5,0.0,0.0,0.3,17.3,0.0,
7,1,2021-01-01 00:12:29,2021-01-01 00:30:34,1.0,5.7,1.0,N,90,40,2,18.0,3.0,0.5,0.0,0.0,0.3,21.8,2.5,
8,1,2021-01-01 00:39:16,2021-01-01 01:00:13,1.0,9.1,1.0,N,97,129,4,27.5,0.5,0.5,0.0,0.0,0.3,28.8,0.0,
9,1,2021-01-01 00:26:12,2021-01-01 00:39:46,2.0,2.7,1.0,N,263,142,1,12.0,3.0,0.5,3.15,0.0,0.3,18.95,2.5,


In [33]:
# Trip-duration bounds in minutes; trips outside this range are dropped.
MIN_DURATION_MIN = 1
MAX_DURATION_MIN = 60

# Duration of each trip in minutes, from the pickup/dropoff timestamps.
df['duration'] = (
    df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']
).dt.total_seconds() / 60

# Keep trips inside the bounds; between() is inclusive on both ends
# (same as >= / <=), and NaN durations are excluded either way.
df = df[df['duration'].between(MIN_DURATION_MIN, MAX_DURATION_MIN)].copy()
df.shape

(1343254, 20)