In [1]:
#import pandas
import pandas as pd

In [2]:
# Path to the Parquet file
file_path = 'yellow_tripdata_2021-01.parquet'

# Read the Parquet file into a Pandas DataFrame
df = pd.read_parquet(file_path, engine='pyarrow')  # or engine='fastparquet'

# Display the first few rows of the DataFrame
# print(df.head())

# Path for the output CSV file
output_csv_path = 'yellow_tripdata_2021-01.csv'

# Write the DataFrame to a CSV file
df.to_csv(output_csv_path, index=False)

# This script read the parquet file and converted it into a csv format 

In [2]:
# because the original dataset is in parquet format, and I want to use the csv format, i will import it now.
df = pd.read_csv('yellow_tripdata_2021-01.csv', nrows=100)

In [3]:
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1.0,2.10,1.0,N,142,43,2,8.0,3.0,0.5,0.00,0.0,0.3,11.80,2.5,
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1.0,0.20,1.0,N,238,151,2,3.0,0.5,0.5,0.00,0.0,0.3,4.30,0.0,
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1.0,14.70,1.0,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0,
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0.0,10.60,1.0,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0,
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1.0,4.94,1.0,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,2,2021-01-01 00:12:41,2021-01-01 00:26:47,1.0,4.13,1.0,N,161,226,1,14.5,0.5,0.5,3.66,0.0,0.3,21.96,2.5,
96,2,2021-01-01 00:23:29,2021-01-01 00:35:03,2.0,4.12,1.0,N,162,74,2,13.5,0.5,0.5,0.00,0.0,0.3,17.30,2.5,
97,2,2021-01-01 00:46:17,2021-01-01 00:54:25,2.0,2.22,1.0,N,144,170,1,9.0,0.5,0.5,2.56,0.0,0.3,15.36,2.5,
98,2,2021-01-01 00:28:16,2021-01-01 00:51:44,1.0,7.11,1.0,N,264,264,2,23.5,0.5,0.5,0.00,0.0,0.3,24.80,0.0,


In [4]:
#print the schema of the table
print(pd.io.sql.get_schema(df, name='yellow_tripdata_2021-01.csv'))

CREATE TABLE "yellow_tripdata_2021-01.csv" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TEXT,
  "tpep_dropoff_datetime" TEXT,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


In [5]:
# converting the TEXT datatype to TIMESTAMP
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

In [6]:
print(pd.io.sql.get_schema(df, name='yellow_tripdata_2021-01.csv'))

CREATE TABLE "yellow_tripdata_2021-01.csv" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" REAL,
  "trip_distance" REAL,
  "RatecodeID" REAL,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "airport_fee" REAL
)


In [7]:
#importing sqlalchemy to connect to database
from sqlalchemy import create_engine


In [8]:
# setting up engine
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [9]:
# Test the connection 
connection = engine.connect()
print("Connection successful!")

Connection successful!


In [11]:
print(pd.io.sql.get_schema(df, name='yellow_tripdata_2021-01.csv', con=engine))


CREATE TABLE "yellow_tripdata_2021-01.csv" (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	airport_fee FLOAT(53)
)




In [12]:
# Ingesting the data in chunks
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000)

In [14]:
# iterating to the next 100000
df = next(df_iter)

In [15]:
# verifying that the chunksize is indeed 100000
len(df)

100000

In [18]:
# creating a table using the header of the csv file
df.head(n=0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace')

0

In [19]:
# inserting the first 100000 chunk of the data into the database. I added a time feature to calculate the time it took
%time df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')

CPU times: user 8.24 s, sys: 221 ms, total: 8.46 s
Wall time: 14.5 s


1000

In [21]:
from time import time 

In [23]:


while True:
    t_start = time()
    
    df = next(df_iter)
    
    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    
    df.to_sql(name='yellow_taxi_data', con=engine, if_exists='append')
    
    t_end = time()
    
    print('inserted another chunk, took %.3f second' % (t_end - t_start))

inserted another chunk, took 15.750 second
inserted another chunk, took 15.560 second
inserted another chunk, took 14.081 second
inserted another chunk, took 17.946 second
inserted another chunk, took 16.275 second


  df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)


inserted another chunk, took 18.477 second
inserted another chunk, took 26.244 second
inserted another chunk, took 22.365 second
inserted another chunk, took 21.109 second
inserted another chunk, took 26.139 second


  df = next(df_iter)


inserted another chunk, took 15.589 second
inserted another chunk, took 10.710 second


StopIteration: 