# Upload the data from .csv to PostgreSQL database

In [1]:
from sqlalchemy import create_engine
from pyarrow.dataset import dataset
import pyarrow.parquet as pq
import pandas as pd

from time import time

In [12]:
input_file = "yellow_tripdata_2021-01.parquet"
output_file = input_file.split(".")[0] + ".csv"
table_name = "yellow_taxi_data2"

In [13]:
# Download NY Taxi data
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/{input_file}

--2022-10-15 12:25:33--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 54.230.159.130, 54.230.159.160, 54.230.159.45, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|54.230.159.130|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 21686067 (21M) [application/x-www-form-urlencoded]
Saving to: ‘yellow_tripdata_2021-01.parquet’


2022-10-15 12:25:37 (5.95 MB/s) - ‘yellow_tripdata_2021-01.parquet’ saved [21686067/21686067]



__Convert data from parquet to csv__

In [14]:
df = pq.read_table(input_file)
df = df.to_pandas().to_csv(output_file)

In [3]:
# Create an engine to connect to postgresql
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7f84d899bad0>

In [16]:
# Drop table
query = f"""
DROP TABLE IF EXISTS "{table_name}";
"""

results = engine.execute(query)

In [17]:
def transform(df):
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [18]:
df_iter = pd.read_csv(output_file, iterator=True, chunksize=100000)

while True: 
    t_start = time()

    df = next(df_iter)

    transform(df)
    df.to_sql(name=table_name, con=engine, if_exists='append')

    t_end = time()

    print(f"Inserted {df.shape[0]} rows, took {(t_end - t_start):.2f}")
    
    # Restriction for the test purposes
    break

Inserted 100000 rows, took 8.81


In [19]:
!rm {input_file} {output_file}

In [20]:
print(pd.io.sql.get_schema(df, name=table_name, con=engine))


CREATE TABLE yellow_taxi_data2 (
	"Unnamed: 0" BIGINT, 
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [17]:
assert False

AssertionError: 

In [2]:
!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv

--2022-10-15 18:23:12--  https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 54.230.159.160, 54.230.159.45, 54.230.159.124, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|54.230.159.160|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [text/csv]
Saving to: ‘taxi+_zone_lookup.csv.1’


2022-10-15 18:23:12 (2.41 MB/s) - ‘taxi+_zone_lookup.csv.1’ saved [12322/12322]



In [4]:
df_zones = pd.read_csv('taxi+_zone_lookup.csv')

In [5]:
df_zones.head()

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [6]:
df_zones.to_sql(name='zones', con=engine, if_exists='replace')