In [None]:
Data loading

Here we will be using the .paraquet file we downloaded and do the following:

Check metadata and table datatypes of the paraquet file/table
Convert the paraquet file to pandas dataframe and check the datatypes. Additionally check the data dictionary to make sure you have the right datatypes in pandas, as pandas will automatically create the table in our database.
Generate the DDL CREATE statement from pandas for a sanity check.
Create a connection to our database using SQLAlchemy
Convert our huge paraquet file into a iterable that has batches of 100,000 rows and load it into our database.

In [3]:
import pandas as pd 
import pyarrow.parquet as pq
from time import time

In [6]:
import os
print(os.getcwd())
print(os.listdir())

/home/iamomowale/dtc_de_zoomcamp_2025/01-docker-terraform/i_docker_sql
['.ipynb_checkpoints', 'Untitled.ipynb', 'yellow_tripdata_2021-01.parquet', 'ny_taxi_postgres_data', 'Dockerfile', 'docker-compose.yaml']


In [7]:
# Read metadata 
pq.read_metadata('yellow_tripdata_2021-01.parquet')

<pyarrow._parquet.FileMetaData object at 0x7ca804f18130>
  created_by: parquet-cpp-arrow version 7.0.0
  num_columns: 19
  num_rows: 1369769
  num_row_groups: 1
  format_version: 1.0
  serialized_size: 10382

In [8]:
# Read file, read the table from file and check schema
file = pq.ParquetFile('yellow_tripdata_2021-01.parquet')
table = file.read()
table.schema

VendorID: int64
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
RatecodeID: double
store_and_fwd_flag: string
PULocationID: int64
DOLocationID: int64
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 2492

In [28]:
# Convert to pandas and check data 
df = table.to_pandas()
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 76518 entries, 0 to 76517
Data columns (total 20 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   VendorID               76518 non-null  int64         
 1   lpep_pickup_datetime   76518 non-null  datetime64[us]
 2   lpep_dropoff_datetime  76518 non-null  datetime64[us]
 3   store_and_fwd_flag     40471 non-null  object        
 4   RatecodeID             40471 non-null  float64       
 5   PULocationID           76518 non-null  int64         
 6   DOLocationID           76518 non-null  int64         
 7   passenger_count        40471 non-null  float64       
 8   trip_distance          76518 non-null  float64       
 9   fare_amount            76518 non-null  float64       
 10  extra                  76518 non-null  float64       
 11  mta_tax                76518 non-null  float64       
 12  tip_amount             76518 non-null  float64       
 13  t

In [14]:
pip install psycopg2-binary

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [16]:
# Create an open SQL database connection object or a SQLAlchemy connectable


from sqlalchemy import create_engine
engine = create_engine("postgresql://root:root@localhost:5435/ny_taxi")
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7ca7da4680a0>

In [32]:
# Generate CREATE SQL statement from schema for validation
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" BIGINT, 
	lpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	lpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" FLOAT(53), 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee TEXT, 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type FLOAT(53), 
	trip_type FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [33]:
#This part is for testing

# Creating batches of 100,000 for the paraquet file
batches_iter = file.iter_batches(batch_size=100000)
batches_iter

# Take the first batch for testing
df = next(batches_iter).to_pandas()
df

# Creating just the table in postgres
#df.head(0).to_sql(name='ny_taxi_data',con=engine, if_exists='replace')

Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge
0,2,2021-01-01 00:15:56,2021-01-01 00:19:52,N,1.0,43,151,1.0,1.01,5.50,0.50,0.5,0.00,0.00,,0.3,6.80,2.0,1.0,0.00
1,2,2021-01-01 00:25:59,2021-01-01 00:34:44,N,1.0,166,239,1.0,2.53,10.00,0.50,0.5,2.81,0.00,,0.3,16.86,1.0,1.0,2.75
2,2,2021-01-01 00:45:57,2021-01-01 00:51:55,N,1.0,41,42,1.0,1.12,6.00,0.50,0.5,1.00,0.00,,0.3,8.30,1.0,1.0,0.00
3,2,2020-12-31 23:57:51,2021-01-01 00:04:56,N,1.0,168,75,1.0,1.99,8.00,0.50,0.5,0.00,0.00,,0.3,9.30,2.0,1.0,0.00
4,2,2021-01-01 00:16:36,2021-01-01 00:16:40,N,2.0,265,265,3.0,0.00,-52.00,0.00,-0.5,0.00,0.00,,-0.3,-52.80,3.0,1.0,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
76513,2,2021-01-31 21:38:00,2021-01-31 22:16:00,,,81,90,,17.63,56.23,2.75,0.0,0.00,6.12,,0.3,65.40,,,
76514,2,2021-01-31 22:43:00,2021-01-31 23:21:00,,,35,213,,18.36,46.66,0.00,0.0,12.20,6.12,,0.3,65.28,,,
76515,2,2021-01-31 22:16:00,2021-01-31 22:27:00,,,74,69,,2.50,18.95,2.75,0.0,0.00,0.00,,0.3,22.00,,,
76516,2,2021-01-31 23:10:00,2021-01-31 23:37:00,,,168,215,,14.48,48.87,2.75,0.0,0.00,6.12,,0.3,58.04,,,


In [19]:
# Insert values into the table 
t_start = time()
count = 0
for batch in file.iter_batches(batch_size=100000):
    count+=1
    batch_df = batch.to_pandas()
    print(f'inserting batch {count}...')
    b_start = time()
    
    batch_df.to_sql(name='ny_taxi_data',con=engine, if_exists='append')
    b_end = time()
    print(f'inserted! time taken {b_end-b_start:10.3f} seconds.\n')
    
t_end = time()   
print(f'Completed! Total time taken was {t_end-t_start:10.3f} seconds for {count} batches.')

inserting batch 1...
inserted! time taken     10.072 seconds.

inserting batch 2...
inserted! time taken     10.300 seconds.

inserting batch 3...
inserted! time taken     10.355 seconds.

inserting batch 4...
inserted! time taken     12.662 seconds.

inserting batch 5...
inserted! time taken     11.208 seconds.

inserting batch 6...
inserted! time taken     11.374 seconds.

inserting batch 7...
inserted! time taken     12.000 seconds.

inserting batch 8...
inserted! time taken      9.888 seconds.

inserting batch 9...
inserted! time taken      9.945 seconds.

inserting batch 10...
inserted! time taken      9.939 seconds.

inserting batch 11...
inserted! time taken     10.439 seconds.

inserting batch 12...
inserted! time taken     11.610 seconds.

inserting batch 13...
inserted! time taken     10.649 seconds.

inserting batch 14...
inserted! time taken      6.273 seconds.

Completed! Total time taken was    147.296 seconds for 14 batches.


In [20]:
from time import time
import pandas as pd 
import pyarrow.parquet as pq
from sqlalchemy import create_engine

In [21]:
url = 'https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv'
url = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet'

file_name = url.rsplit('/', 1)[-1].strip()
file_name

'yellow_tripdata_2021-01.parquet'

In [23]:
#This code is a rough code and seems to be working. The cleaned up version will be in data-loading-parquet.py file.

if '.csv' in file_name:
    print('yay') 
    df = pd.read_csv(file_name, nrows=10)
    df_iter = pd.read_csv(file_name, iterator=True, chunksize=100000)
elif '.parquet' in file_name:
    print('oh yea')
    file = pq.ParquetFile(file_name)
    df = next(file.iter_batches(batch_size=10)).to_pandas()
    df_iter = file.iter_batches(batch_size=100000)
else: 
    print('Error. Only .csv or .parquet files allowed.')
    sys.exit() 

oh yea


In [43]:
from sqlalchemy import create_engine
import pandas as pd
import pyarrow.parquet as pq

engine = create_engine('postgresql://root:root@localhost:5435/ny_taxi')

In [45]:
import pyarrow.parquet as pq
from time import time

green_file = 'green_tripdata_2019-10.parquet'
green_pf = pq.ParquetFile(green_file)

t_start = time()
count = 0

for batch in green_pf.iter_batches(batch_size=100000):
    count += 1
    batch_df = batch.to_pandas()
    print(f'inserting batch {count} into green_taxi_rides...')
    b_start = time()

    batch_df.to_sql(name='green_taxi_rides', con=engine, if_exists='append', index=False)
    b_end = time()
    print(f'inserted! time taken {b_end - b_start:10.3f} seconds.\n')

t_end = time()
print(f'Completed! Total time taken was {t_end - t_start:10.3f} seconds for {count} batches.')


inserting batch 1 into green_taxi_rides...
inserted! time taken     10.497 seconds.

inserting batch 2 into green_taxi_rides...
inserted! time taken      9.424 seconds.

inserting batch 3 into green_taxi_rides...
inserted! time taken     10.165 seconds.

inserting batch 4 into green_taxi_rides...
inserted! time taken      9.750 seconds.

inserting batch 5 into green_taxi_rides...
inserted! time taken      6.189 seconds.

Completed! Total time taken was     46.377 seconds for 5 batches.


In [40]:
zone_df = pd.read_csv('taxi_zone_lookup.csv')
zone_df.to_sql('taxi_zones', con=engine, if_exists='replace', index=False)
print('Taxi zone lookup uploaded successfully.')

Taxi zone lookup uploaded successfully.
