# Data ingestion into Postgres 

In this notebook, we ingest the **Yellow Taxi Trip Records - January 2025** dataset from [NYC TLC Trip Record Data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page) into Postgres.

With the downloaded Parquet file, we will:
- Check the metadata before consumption.
- Check whether the schema data types need normalization, and apply it if so.
- Ingest the dataset into the Postgres database in chunks.

## Data exploration

In [11]:
import pyarrow.parquet as pq

In [12]:
parquet_file = pq.ParquetFile('yellow_tripdata_2025-01.parquet')

In [13]:
parquet_file.metadata

<pyarrow._parquet.FileMetaData object at 0x7f744af1b470>
  created_by: parquet-cpp-arrow version 16.1.0
  num_columns: 20
  num_rows: 3475226
  num_row_groups: 4
  format_version: 2.6
  serialized_size: 11212

The dataset contains approximately **3.48 million rows** (3,475,226), stored in **4 Parquet row groups**.
To avoid exhausting memory when processing on a *t3.medium AWS EC2 instance* (4 GB RAM), we will **load and insert the data in batches of 100,000 rows**.

This batch size provides a good balance between:

- Minimizing memory usage,
- Maintaining reasonable database insertion speed, and
- Allowing the process to resume efficiently in case of interruptions.
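
As a rough check on this choice, the arithmetic below estimates how many batches the ingestion needs and how much numeric data one batch holds. It is only a sketch: it assumes every batch except the last is full, and it counts 8 bytes per value for all 20 columns, ignoring string storage and pandas overhead.

```python
import math

NUM_ROWS = 3_475_226   # num_rows from the Parquet metadata above
NUM_COLUMNS = 20       # num_columns from the Parquet metadata above
BATCH_SIZE = 100_000   # batch size chosen in the text

# Number of batches if every batch except the last one is full (assumption).
num_batches = math.ceil(NUM_ROWS / BATCH_SIZE)
last_batch_rows = NUM_ROWS - (num_batches - 1) * BATCH_SIZE

# Rough size of one batch, assuming 8 bytes per value in every column.
approx_batch_mib = BATCH_SIZE * NUM_COLUMNS * 8 / 1024**2

print(f'{num_batches} batches, last one with {last_batch_rows} rows')
print(f'about {approx_batch_mib:.1f} MiB of numeric data per batch')
```

Under these assumptions a single batch stays far below the 4 GB of RAM available, which leaves room for the temporary copies created while converting and inserting.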

In [14]:
parquet_file.schema.to_arrow_schema() # parquet schema

VendorID: int32
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: int64
trip_distance: double
RatecodeID: int64
store_and_fwd_flag: large_string
PULocationID: int32
DOLocationID: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
Airport_fee: double
cbd_congestion_fee: double

## Verifying whether the schema changes from Parquet to pandas to SQL

In [15]:
# Taking only 100 rows, just to check the schema.
# The iterator is named sample_iter to avoid shadowing the built-in iter().
sample_iter = parquet_file.iter_batches(batch_size=100)
df = next(sample_iter).to_pandas()
df

,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1,1.60,1,N,229,237,1,10.0,3.5,0.5,3.00,0.00,1.0,18.00,2.5,0.0,0.0
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1,0.50,1,N,236,237,1,5.1,3.5,0.5,2.02,0.00,1.0,12.12,2.5,0.0,0.0
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1,0.60,1,N,141,141,1,5.1,3.5,0.5,2.00,0.00,1.0,12.10,2.5,0.0,0.0
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3,0.52,1,N,244,244,2,7.2,1.0,0.5,0.00,0.00,1.0,9.70,0.0,0.0,0.0
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3,0.66,1,N,244,116,2,5.8,1.0,0.5,0.00,0.00,1.0,8.30,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,1,2025-01-01 00:19:30,2025-01-01 00:27:25,0,1.00,1,N,211,158,1,9.3,3.5,0.5,2.85,0.00,1.0,17.15,2.5,0.0,0.0
96,1,2025-01-01 00:33:13,2025-01-01 00:40:08,0,1.20,1,N,158,68,1,8.6,3.5,0.5,1.00,0.00,1.0,14.60,2.5,0.0,0.0
97,1,2025-01-01 00:45:05,2025-01-01 01:20:32,0,1.80,1,N,68,50,1,28.2,3.5,0.5,8.30,0.00,1.0,41.50,2.5,0.0,0.0
98,2,2025-01-01 00:04:29,2025-01-01 00:55:58,9,31.97,5,N,132,265,2,90.0,0.0,0.0,0.00,20.32,1.0,111.32,0.0,0.0,0.0


In [16]:
df.info() # pandas schema

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 20 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   VendorID               100 non-null    int32         
 1   tpep_pickup_datetime   100 non-null    datetime64[us]
 2   tpep_dropoff_datetime  100 non-null    datetime64[us]
 3   passenger_count        100 non-null    int64         
 4   trip_distance          100 non-null    float64       
 5   RatecodeID             100 non-null    int64         
 6   store_and_fwd_flag     100 non-null    object        
 7   PULocationID           100 non-null    int32         
 8   DOLocationID           100 non-null    int32         
 9   payment_type           100 non-null    int64         
 10  fare_amount            100 non-null    float64       
 11  extra                  100 non-null    float64       
 12  mta_tax                100 non-null    float64       
 13  tip_am

In [17]:
import pandas as pd

print(pd.io.sql.get_schema(df, name='yellow_taxi_data')) #sql schema

CREATE TABLE "yellow_taxi_data" (
"VendorID" INTEGER,
  "tpep_pickup_datetime" TIMESTAMP,
  "tpep_dropoff_datetime" TIMESTAMP,
  "passenger_count" INTEGER,
  "trip_distance" REAL,
  "RatecodeID" INTEGER,
  "store_and_fwd_flag" TEXT,
  "PULocationID" INTEGER,
  "DOLocationID" INTEGER,
  "payment_type" INTEGER,
  "fare_amount" REAL,
  "extra" REAL,
  "mta_tax" REAL,
  "tip_amount" REAL,
  "tolls_amount" REAL,
  "improvement_surcharge" REAL,
  "total_amount" REAL,
  "congestion_surcharge" REAL,
  "Airport_fee" REAL,
  "cbd_congestion_fee" REAL
)


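The data types seem to persist from Parquet to SQL. Note that `get_schema` was called without a `con` argument, so the DDL above uses pandas' generic fallback type names (`INTEGER`, `REAL`, `TEXT`); the types Postgres actually receives through the SQLAlchemy engine may be named differently. **It's important to check the schemas so that we can cast columns if needed**.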
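
One case where a cast can become necessary is an integer column that contains missing values: pandas stores it as `float64` unless a nullable integer dtype is requested. The sketch below illustrates this with a made-up two-row sample (the values are hypothetical, not taken from the dataset); whether any column of the full file actually needs such a cast has not been checked here, since only the first 100 rows were inspected.

```python
import pandas as pd

# Hypothetical sample: an integer column with one missing value.
sample = pd.DataFrame({
    'passenger_count': [1, None],
    'fare_amount': [10.0, 5.1],
})

# The missing value forces pandas to store the column as float64.
print(sample['passenger_count'].dtype)

# Casting to the nullable integer dtype keeps integers and marks the gap as <NA>.
casted = sample.astype({'passenger_count': 'Int64'})
print(casted['passenger_count'].dtype)
print(casted)
```

The same `astype` call could be applied to each batch before writing it, so that every batch reaches the database with identical column types.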

## Ingesting data

### Connect to Postgres

In [18]:
# Import the create_engine function from SQLAlchemy
# SQLAlchemy is a library that allows Python to interact with databases
from sqlalchemy import create_engine

# Create a database engine: it manages the connections that let Python talk to PostgreSQL
# Format of the connection string is:
# 'postgresql://<username>:<password>@<host>:<port>/<database>'
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

Breakdown of the connection string:
- postgresql   → tells SQLAlchemy which database dialect to use
- root:root    → username and password (user = root, password = root)
- @localhost   → the database is running locally (on the same machine)
- :5432        → the port PostgreSQL listens on (default is 5432)
- /ny_taxi     → the name of the database to connect to (ny_taxi)
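
The breakdown above can be reproduced with the standard library. The sketch below splits the same connection string with `urllib.parse.urlsplit`; it is for illustration only and is not how SQLAlchemy parses the URL internally.

```python
from urllib.parse import urlsplit

url = 'postgresql://root:root@localhost:5432/ny_taxi'
parts = urlsplit(url)

# Map each piece of the URL to the role described in the breakdown.
components = {
    'dialect': parts.scheme,
    'username': parts.username,
    'password': parts.password,
    'host': parts.hostname,
    'port': parts.port,
    'database': parts.path.lstrip('/'),
}

for name, value in components.items():
    print(f'{name:>8}: {value}')
```

Splitting the string this way makes it easy to confirm that each component is what you expect before handing the URL to `create_engine`.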

### Iterate data consumption

In [None]:
# Reset the table in case it already exists: writing zero rows with
# if_exists='replace' drops and recreates it with the DataFrame's columns.
# index=False keeps the pandas index out of the table.
df.head(0).to_sql(name='yellow_taxi_data', con=engine, if_exists='replace', index=False)

In [21]:
# Load the data in chunks of 100,000 rows to avoid memory overload:
parquet_iter = parquet_file.iter_batches(batch_size=100000)

In [22]:
from time import time

n = 0
t_start = time()
for batch in parquet_iter: # batch = next(parquet_iter)
    n += 1
    print(f'Ingesting batch {n}...')
    b_start = time()
    df_batch = batch.to_pandas()
    # index=False: each batch's index restarts at 0, so it carries no information
    df_batch.to_sql(name='yellow_taxi_data', con=engine, if_exists='append', index=False)
    b_end = time()
    print(f'Time taken: {b_end-b_start:10.3f} seconds.\n')

t_end = time()
print(f'The ingestion is finished! Total time taken: {t_end-t_start:10.3f} seconds for {n} batches.')


Ingesting batch 1...
Time taken:     11.241 seconds.

Ingesting batch 2...
Time taken:     11.236 seconds.

Ingesting batch 3...
Time taken:     11.328 seconds.

Ingesting batch 4...
Time taken:     11.568 seconds.

Ingesting batch 5...
Time taken:     12.210 seconds.

Ingesting batch 6...
Time taken:     11.446 seconds.

Ingesting batch 7...
Time taken:     11.338 seconds.

Ingesting batch 8...
Time taken:     11.346 seconds.

Ingesting batch 9...
Time taken:     11.325 seconds.

Ingesting batch 10...
Time taken:     11.944 seconds.

Ingesting batch 11...
Time taken:     11.757 seconds.

Ingesting batch 12...
Time taken:     11.430 seconds.

Ingesting batch 13...
Time taken:     11.459 seconds.

Ingesting batch 14...
Time taken:     11.435 seconds.

Ingesting batch 15...
Time taken:     11.833 seconds.

Ingesting batch 16...
Time taken:     11.776 seconds.

Ingesting batch 17...
Time taken:     11.371 seconds.

Ingesting batch 18...
Time taken:     11.434 seconds.

Ingesting batch 19.