# Download dataset

In [3]:
## download
# !wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz
# !wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
# !wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-01.parquet

## extract (removes .gz and leaves CSV)
# !gunzip yellow_tripdata_2021-01.csv.gz
# !gunzip yellow_tripdata_2025-01.parquet

# Install dependencies

In [2]:
## check python version 
# !python --version

## Install required packages
# !pip install sqlalchemy psycopg2-binary pandas
# !pip install pyarrow 

# Test database connection

In [1]:
from sqlalchemy import create_engine
import pandas as pd

engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

query = """
        SELECT *
        FROM pg_catalog.pg_tables
        WHERE schemaname != 'pg_catalog' AND 
            schemaname != 'information_schema';
        """
print(pd.read_sql(query, con=engine))

Empty DataFrame
Columns: [schemaname, tablename, tableowner, tablespace, hasindexes, hasrules, hastriggers, rowsecurity]
Index: []


In [2]:
import os 

os.getcwd()

'/workspaces/DataEngineering/Docker + SQL'

In [3]:
!ls

1-Introduction-to-Docker.md		 connect-ingest-test.ipynb
2-Ingesting-NY-Taxi-Data-to-Postgres.md  ny_taxi_pg_data
Dockerfile				 taxi_zone_lookup.csv
Dockerfile.old				 yellow_tripdata_2021-01.csv
README.md				 yellow_tripdata_2025-01.parquet


In [4]:
# Check connection
with engine.connect() as conn: 
    query = """ SELECT 1 as number; """
    print(pd.read_sql(query, con=conn))

   number
0       1


# Ingest CSV File -> Postgres Database

In [1]:
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

In [6]:
df = pd.read_csv(
    "yellow_tripdata_2021-01.csv",
    # parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    low_memory=False  # avoids dtype warning
)

df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1369765 entries, 0 to 1369764
Data columns (total 18 columns):
 #   Column                 Non-Null Count    Dtype  
---  ------                 --------------    -----  
 0   VendorID               1271413 non-null  float64
 1   tpep_pickup_datetime   1369765 non-null  object 
 2   tpep_dropoff_datetime  1369765 non-null  object 
 3   passenger_count        1271413 non-null  float64
 4   trip_distance          1369765 non-null  float64
 5   RatecodeID             1271413 non-null  float64
 6   store_and_fwd_flag     1271413 non-null  object 
 7   PULocationID           1369765 non-null  int64  
 8   DOLocationID           1369765 non-null  int64  
 9   payment_type           1271413 non-null  float64
 10  fare_amount            1369765 non-null  float64
 11  extra                  1369765 non-null  float64
 12  mta_tax                1369765 non-null  float64
 13  tip_amount             1369765 non-null  float64
 14  tolls_amount      

In [7]:
# convert datetime columns to appropriate format
df['tpep_pickup_datetime'] = pd.to_datetime(df['tpep_pickup_datetime'])
df['tpep_dropoff_datetime'] = pd.to_datetime(df['tpep_dropoff_datetime'])

df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1369765 entries, 0 to 1369764
Data columns (total 18 columns):
 #   Column                 Non-Null Count    Dtype         
---  ------                 --------------    -----         
 0   VendorID               1271413 non-null  float64       
 1   tpep_pickup_datetime   1369765 non-null  datetime64[ns]
 2   tpep_dropoff_datetime  1369765 non-null  datetime64[ns]
 3   passenger_count        1271413 non-null  float64       
 4   trip_distance          1369765 non-null  float64       
 5   RatecodeID             1271413 non-null  float64       
 6   store_and_fwd_flag     1271413 non-null  object        
 7   PULocationID           1369765 non-null  int64         
 8   DOLocationID           1369765 non-null  int64         
 9   payment_type           1271413 non-null  float64       
 10  fare_amount            1369765 non-null  float64       
 11  extra                  1369765 non-null  float64       
 12  mta_tax                13697

In [7]:
# generate schema
print(pd.io.sql.get_schema(df, name='yellow_tripdata_2021_01', con=engine)) # convert dataframe to DDL format


CREATE TABLE yellow_tripdata_2021_01 (
	"VendorID" BIGINT, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count BIGINT, 
	trip_distance FLOAT(53), 
	"RatecodeID" BIGINT, 
	store_and_fwd_flag TEXT, 
	"PULocationID" BIGINT, 
	"DOLocationID" BIGINT, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53)
)




In [None]:
# df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000)
# df_iter



<pandas.io.parsers.readers.TextFileReader at 0x789809d4aad0>

In [None]:
# next(df_iter)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1,2021-01-01 00:30:10,2021-01-01 00:36:12,1,2.10,1,N,142,43,2,8.0,3.0,0.5,0.00,0.0,0.3,11.80,2.5
1,1,2021-01-01 00:51:20,2021-01-01 00:52:19,1,0.20,1,N,238,151,2,3.0,0.5,0.5,0.00,0.0,0.3,4.30,0.0
2,1,2021-01-01 00:43:30,2021-01-01 01:11:06,1,14.70,1,N,132,165,1,42.0,0.5,0.5,8.65,0.0,0.3,51.95,0.0
3,1,2021-01-01 00:15:48,2021-01-01 00:31:01,0,10.60,1,N,138,132,1,29.0,0.5,0.5,6.05,0.0,0.3,36.35,0.0
4,2,2021-01-01 00:31:49,2021-01-01 00:48:21,1,4.94,1,N,68,33,1,16.5,0.5,0.5,4.06,0.0,0.3,24.36,2.5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
99995,1,2021-01-04 14:04:31,2021-01-04 14:08:52,3,0.70,1,N,234,224,2,5.0,2.5,0.5,0.00,0.0,0.3,8.30,2.5
99996,1,2021-01-04 14:18:46,2021-01-04 14:35:45,2,3.30,1,N,234,236,1,14.5,2.5,0.5,3.55,0.0,0.3,21.35,2.5
99997,1,2021-01-04 14:42:41,2021-01-04 14:59:22,2,4.70,1,N,236,79,1,17.0,2.5,0.5,4.05,0.0,0.3,24.35,2.5
99998,2,2021-01-04 14:39:02,2021-01-04 15:09:37,2,17.95,2,N,132,148,1,52.0,0.0,0.5,5.00,0.0,0.3,60.30,2.5


In [None]:
# count = 0
# while True:
#     try:
#         chunk = next(df_iter)
#         print(f'batch_count: {count+1} data_len:{len(chunk)}')
#         count += 1
#     except StopIteration:
#         break

batch_count: 1 data_len:100000
batch_count: 2 data_len:100000
batch_count: 3 data_len:100000
batch_count: 4 data_len:100000
batch_count: 5 data_len:100000
batch_count: 6 data_len:100000
batch_count: 7 data_len:100000
batch_count: 8 data_len:100000
batch_count: 9 data_len:100000
batch_count: 10 data_len:100000
batch_count: 11 data_len:100000
batch_count: 12 data_len:100000
batch_count: 13 data_len:69765


  chunk = next(df_iter)


In [2]:
# load data and divide into chucks. ingest in one whole load will crash the kernel
df_iter = pd.read_csv('yellow_tripdata_2021-01.csv', iterator=True, chunksize=100000, parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"])

# get first chunk
df = next(df_iter) 

# first load data header
df.head(0).to_sql(name='yellow_tripdata_2021_01', con=engine, if_exists='replace', index=False)

0

In [4]:
# insert row by row
%time df.to_sql(name='yellow_tripdata_2021_01', con=engine, if_exists='append', index=False)

CPU times: user 4.91 s, sys: 107 ms, total: 5.01 s
Wall time: 7.88 s


1000

In [3]:
from time import time

# insert row by row
count = 0
while True:
    t_start = time()
    try:
        chunk = next(df_iter)
        # print(f'batch_count: {count+1} data_len:{len(chunk)}')
        chunk.to_sql(name='yellow_tripdata_2021_01', con=engine, if_exists='append', index=False)
        count+=1 
        print(f'Inserted batch {count} took {time() - t_start:.2f} seconds')
    except StopIteration:
        break

Inserted batch 1 took 8.63 seconds
Inserted batch 2 took 8.49 seconds
Inserted batch 3 took 8.42 seconds
Inserted batch 4 took 8.54 seconds
Inserted batch 5 took 8.79 seconds
Inserted batch 6 took 8.58 seconds
Inserted batch 7 took 8.22 seconds
Inserted batch 8 took 8.75 seconds
Inserted batch 9 took 8.12 seconds
Inserted batch 10 took 8.32 seconds
Inserted batch 11 took 8.50 seconds


  chunk = next(df_iter)


Inserted batch 12 took 9.78 seconds
Inserted batch 13 took 5.35 seconds


# Ingest PARQUET File -> Postgres Database

Here we will be using the `.parquet `file we downloaded and do the following:
- **Check metadata** and table datatypes of the paraquet file/table
- **Convert the paraquet file to pandas dataframe** and check the datatypes. Additionally check the data dictionary to make sure you have the right datatypes in pandas, as pandas will automatically create the table in our database.
- **Generate the DDL CREATE statement**from pandas for a sanity check.
- **Create a connection** to our database using SQLAlchemy
- **Convert our huge parquet file into a iterable** that has batches of 100,000 rows and load it into our database.

In [4]:
import pandas as pd 
import pyarrow.parquet as pq
from time import time

In [5]:
# Read metadata 
pq.read_metadata('yellow_tripdata_2025-01.parquet')

<pyarrow._parquet.FileMetaData object at 0x7aa5b18e7100>
  created_by: parquet-cpp-arrow version 16.1.0
  num_columns: 20
  num_rows: 3475226
  num_row_groups: 4
  format_version: 2.6
  serialized_size: 11212

In [7]:
# Read file, read the table from file and check schema
file = pq.ParquetFile('yellow_tripdata_2025-01.parquet')
table = file.read()
table.schema

VendorID: int32
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: int64
trip_distance: double
RatecodeID: int64
store_and_fwd_flag: large_string
PULocationID: int32
DOLocationID: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
Airport_fee: double
cbd_congestion_fee: double

In [8]:
# Convert to pandas and check data 
df = table.to_pandas()
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3475226 entries, 0 to 3475225
Data columns (total 20 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   VendorID               int32         
 1   tpep_pickup_datetime   datetime64[us]
 2   tpep_dropoff_datetime  datetime64[us]
 3   passenger_count        float64       
 4   trip_distance          float64       
 5   RatecodeID             float64       
 6   store_and_fwd_flag     object        
 7   PULocationID           int32         
 8   DOLocationID           int32         
 9   payment_type           int64         
 10  fare_amount            float64       
 11  extra                  float64       
 12  mta_tax                float64       
 13  tip_amount             float64       
 14  tolls_amount           float64       
 15  improvement_surcharge  float64       
 16  total_amount           float64       
 17  congestion_surcharge   float64       
 18  Airport_fee           

In [9]:
'''
We need to first create the connection to our postgres database. 
We can feed the connection information to generate the CREATE SQL query for the specific server. 
SQLAlchemy supports a variety of servers.
'''

# Create an open SQL database connection object or a SQLAlchemy connectable
from sqlalchemy import create_engine

engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')
engine.connect()

<sqlalchemy.engine.base.Connection at 0x7aa5bc3a7290>

In [10]:
# Generate CREATE SQL statement from schema for validation
print(pd.io.sql.get_schema(df, name='yellow_taxi_data', con=engine))


CREATE TABLE yellow_taxi_data (
	"VendorID" INTEGER, 
	tpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	tpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	"RatecodeID" FLOAT(53), 
	store_and_fwd_flag TEXT, 
	"PULocationID" INTEGER, 
	"DOLocationID" INTEGER, 
	payment_type BIGINT, 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	"Airport_fee" FLOAT(53), 
	cbd_congestion_fee FLOAT(53)
)




Datatypes for the table looks good! Since we used paraquet file the datasets seem to have been preserved. You may have to convert some datatypes so it is always good to do this check.

In [11]:
df.shape

(3475226, 20)

There are 3,475,226 rows in our dataset. We are going to use the parquet_file.iter_batches() function to create batches of 100,000, convert them into pandas and then load it into the postgres database.

In [None]:
#This part is for testing


# Creating batches of 100,000 for the paraquet file
batches_iter = file.iter_batches(batch_size=100000)
batches_iter

# Take the first batch for testing
df = next(batches_iter).to_pandas()
df

# Creating just the table in postgres
df.head(0).to_sql(name='ny_taxi_data',con=engine, if_exists='replace')

0

In [14]:
# Insert values into the table 
t_start = time()
count = 0
for batch in file.iter_batches(batch_size=100000):
    count+=1
    batch_df = batch.to_pandas()
    print(f'inserting batch {count}...')
    b_start = time()
    
    batch_df.to_sql(name='ny_taxi_data',con=engine, if_exists='append')
    b_end = time()
    print(f'inserted! time taken {b_end-b_start:10.3f} seconds.\n')
    
t_end = time()   
print(f'Completed! Total time taken was {t_end-t_start:10.3f} seconds for {count} batches.') 

inserting batch 1...
inserted! time taken      9.466 seconds.

inserting batch 2...
inserted! time taken      9.216 seconds.

inserting batch 3...
inserted! time taken      9.674 seconds.

inserting batch 4...
inserted! time taken      8.957 seconds.

inserting batch 5...
inserted! time taken      9.322 seconds.

inserting batch 6...
inserted! time taken      9.811 seconds.

inserting batch 7...
inserted! time taken      9.331 seconds.

inserting batch 8...
inserted! time taken      9.520 seconds.

inserting batch 9...
inserted! time taken      9.891 seconds.

inserting batch 10...
inserted! time taken      9.993 seconds.

inserting batch 11...
inserted! time taken     10.292 seconds.

inserting batch 12...
inserted! time taken      9.838 seconds.

inserting batch 13...
inserted! time taken      9.933 seconds.

inserting batch 14...
inserted! time taken     10.224 seconds.

inserting batch 15...
inserted! time taken      9.579 seconds.

inserting batch 16...
inserted! time taken     10

# pgcli command

In [None]:
# pgcli -h localhost -p 5432 -u root -d ny_taxi
# \dt
# \d yellow_tripdata_2021_01
# select * from yellow_tripdata_2021_01 limit 10;
# select count(*) from yellow_tripdata_2021_01;