#### 01 – Local Data Ingestion & Schema Mapping
> **Phase:** Research & Development (R&D)  
> **Source:** NYC Open Data (Green Taxi Trips)  
> **Destination:** Local PostgreSQL (Docker)

---
#### 🎯 Goal
Stage raw NYC Green Taxi data into a local Postgres database using batch ingestion. This notebook serves as the **Proof of Concept (PoC)** to validate:
* **Connectivity:** Ensuring the Python environment can reach the Dockerized Postgres.
* **Schema Inference:** Validating that Parquet/CSV types map correctly to SQL types.
* **Performance:** Testing batch-loading patterns that handle large datasets without exhausting memory.

---

In [None]:
import pandas as pd
from sqlalchemy import create_engine
import os
from dotenv import load_dotenv

#### Load Environment Configuration
> **Objective:** Securely load database credentials from a `.env` file.  
This practice ensures secrets stay out of source code and supports **environment portability** (e.g., switching from local to production).
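`os.getenv` returns `None` for an unset variable, so a missing credential would otherwise only show up later as a confusing connection error. The following is a minimal sketch of failing fast instead. The helper name `require_env` and the demo values are hypothetical and not part of this notebook.

```python
import os
from typing import Iterable, Mapping


def require_env(names: Iterable[str], env: Mapping[str, str] = os.environ) -> dict:
    """Return the requested variables, raising if any is unset or empty."""
    names = list(names)
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(
            f"Missing required environment variables: {', '.join(missing)}"
        )
    return {name: env[name] for name in names}


# Demo mapping (made-up values) so the sketch runs without a .env file.
demo_env = {
    "POSTGRES_USER": "demo_user",
    "POSTGRES_PASSWORD": "demo_pass",
    "POSTGRES_DB": "demo_db",
}
creds = require_env(
    ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DB"], env=demo_env
)
print(sorted(creds))
```

In the notebook itself, the `env` argument could be left at its default so the check runs against the variables loaded by `load_dotenv`.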

In [None]:
# load credentials from .env file
load_dotenv(dotenv_path='../.env')

In [None]:
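# read credentials from the environment (populated from .env above)
user = os.getenv('POSTGRES_USER')
password = os.getenv('POSTGRES_PASSWORD')
db = os.getenv('POSTGRES_DB')
# connection target: the Dockerized Postgres reached through localhost
host = 'localhost'
port = '5433'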

#### Establish Database Connection
Create a `SQLAlchemy` engine and verify connectivity to the Postgres instance running in Docker.
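One caveat when building the URL by string interpolation: a password containing characters such as `@` or `/` can make the URL parse incorrectly. A possible safeguard, sketched here with the standard library's `urllib.parse.quote_plus`, is to percent-encode the user and password first. The helper name `build_conn_url` and the sample credentials are hypothetical.

```python
from urllib.parse import quote_plus


def build_conn_url(user: str, password: str, host: str, port: str, db: str) -> str:
    """Build a postgresql:// URL with user and password percent-encoded."""
    return (
        f"postgresql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{db}"
    )


# Made-up credentials; the '@' and '/' in the password get escaped.
demo_url = build_conn_url("demo_user", "p@ss/word", "localhost", "5433", "demo_db")
print(demo_url)
```

The resulting string can be passed to `create_engine` in the same way as a hand-built URL.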

In [None]:
# create the SQLAlchemy engine
conn_url = f'postgresql://{user}:{password}@{host}:{port}/{db}'
engine = create_engine(conn_url)

In [None]:
# test connection
try:
    with engine.connect() as conn:
        print("Connection successful!")
except Exception as e:
    print(f"Connection failed: {e}")

#### Load Raw Source Data
* **Green Taxi Trips (Parquet):** Fact table containing trip-level records.
* **Zone Lookup (CSV):** Dimension table mapping location IDs to boroughs and zones.

In [None]:
green_taxi_path = '../data/green_tripdata_2025-11.parquet'
zones_lookup_path = '../data/taxi_zone_lookup.csv'

In [None]:
# load raw taxi trip data (fact table)
df_green_trips = pd.read_parquet(green_taxi_path)
# load zone lookup information (dimension table)
df_zones = pd.read_csv(zones_lookup_path)

#### Schema and Null Checks
Validate column data types and check for missing values before loading into Postgres.
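As a compact alternative to reading `info()` and `isnull().sum()` separately, the two checks can be combined into one summary table. This is only a sketch: the helper name `profile_columns` is hypothetical, and the sample frame uses made-up values under two column names from the trips data.

```python
import pandas as pd


def profile_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Summarize dtype, null count, and null percentage per column."""
    return pd.DataFrame(
        {
            "dtype": df.dtypes.astype(str),
            "null_count": df.isnull().sum(),
            "null_pct": (df.isnull().mean() * 100).round(2),
        }
    )


# Small made-up sample; one missing trip_distance value.
sample = pd.DataFrame(
    {
        "trip_distance": [1.2, None, 3.4, 0.8],
        "PULocationID": [74, 75, 74, 41],
    }
)
profile = profile_columns(sample)
print(profile)
```

Calling `profile_columns(df_green_trips)` would give the same view for the full fact table.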

In [None]:
# quick schema inspection to validate data types
df_green_trips.info()
df_zones.info()

In [None]:
# preview the first few rows
df_green_trips.head()

In [None]:
df_zones.head(2)

In [None]:
# null count per column
df_green_trips.isnull().sum()

In [None]:
df_zones.isnull().sum()

#### Preview Inferred SQL Schemas
Generate and review the inferred `CREATE TABLE` DDL to confirm table structure before creating tables in Postgres.
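The DDL can also be previewed without a running database. When `con` is omitted, `pd.io.sql.get_schema` falls back to pandas' built-in SQLite type mapping, so the type names may differ from what the Postgres engine produces. The sketch below uses a made-up two-row sample and a hypothetical table name.

```python
import pandas as pd

# Made-up rows under three column names from the trips data.
sample = pd.DataFrame(
    {
        "PULocationID": [74, 75],
        "trip_distance": [1.2, 3.4],
        "lpep_pickup_datetime": pd.to_datetime(
            ["2025-11-01 00:05:00", "2025-11-01 00:17:00"]
        ),
    }
)

# No `con` argument: pandas uses its SQLite fallback dialect.
ddl = pd.io.sql.get_schema(sample, name="green_taxi_trips_sample")
print(ddl)
```

Comparing this output with the engine-backed DDL is a quick way to see which type names are dialect-specific.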

In [None]:
# print the inferred CREATE TABLE statements (DDL) for both tables
print(pd.io.sql.get_schema(df_green_trips, name="green_taxi_trips", con=engine))
print(pd.io.sql.get_schema(df_zones, name="zones", con=engine))

#### Initialize Database Tables
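Create empty tables in Postgres by writing `head(0)`, which defines the structure without inserting rows. Because `if_exists="replace"` is set, an existing table with the same name is dropped and recreated.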

In [None]:
# create 'green_taxi_trips' table structure
df_green_trips.head(n=0).to_sql(
    name="green_taxi_trips",
    con=engine,
    if_exists="replace",
    index=False
)

In [None]:
# create the 'zones' table structure
df_zones.head(n=0).to_sql(
    name="zones",
    con=engine,
    if_exists="replace",
    index=False
)
print("Tables 'green_taxi_trips' and 'zones' initialized in Postgres.")

#### Load Dimension Table (`zones`)
Load the zone lookup table in a single operation since it is small and static.

In [None]:
# append zones data to the table
df_zones.to_sql(
    name="zones", 
    con=engine, 
    if_exists="append", 
    index=False
)

print(f"Loaded {len(df_zones)} records into the 'zones' table.")

#### Batch Load Fact Table (`green_taxi_trips`)
Stream the Parquet file in chunks to:
* **Avoid memory pressure:** Keep RAM usage low.
* **Support large datasets:** Process files bigger than system memory.
* **Mimic production:** Follow real-world ingestion patterns.

*Each batch is appended incrementally to the database.*
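The batch-append pattern itself can be illustrated without Parquet or Postgres. The sketch below swaps in the standard library's `sqlite3` and a generator of made-up rows. The helpers `iter_chunks` and `load_in_batches` and the `trips` table are hypothetical stand-ins, not this notebook's code. Only one chunk is held in memory at a time, and a final row count confirms that nothing was dropped.

```python
import sqlite3
from itertools import islice
from typing import Iterable, Iterator


def iter_chunks(rows: Iterable[tuple], size: int) -> Iterator[list]:
    """Yield lists of at most `size` rows without materializing the input."""
    iterator = iter(rows)
    while chunk := list(islice(iterator, size)):
        yield chunk


def load_in_batches(conn: sqlite3.Connection, rows: Iterable[tuple], batch_size: int) -> int:
    """Append rows batch by batch, committing after each batch."""
    total = 0
    for chunk in iter_chunks(rows, batch_size):
        with conn:  # commits on success, rolls back on error
            conn.executemany(
                "INSERT INTO trips (trip_id, distance) VALUES (?, ?)", chunk
            )
        total += len(chunk)
    return total


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE trips (trip_id INTEGER, distance REAL)")

# Generator of made-up rows: never fully held in memory.
source_rows = ((i, i * 0.5) for i in range(250))
inserted = load_in_batches(conn, source_rows, batch_size=100)

# Post-load check: the table should hold exactly what was sent.
table_count = conn.execute("SELECT COUNT(*) FROM trips").fetchone()[0]
print(inserted, table_count)
```

The same count comparison could be run against Postgres after the real load, using the Parquet file's row count as the expected value.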

In [None]:
import pyarrow.parquet as pq

In [None]:
# use ParquetFile to stream chunks
parquet_file = pq.ParquetFile(green_taxi_path)

In [None]:
# iterate through batches of 100,000 rows
for batch in parquet_file.iter_batches(batch_size=100000):
    t_start = pd.Timestamp.now()
    
    batch_df = batch.to_pandas()
    
    # append the chunk to the existing table
    batch_df.to_sql(
        name='green_taxi_trips', 
        con=engine, 
        if_exists='append',
        index=False
    )
    
    t_end = pd.Timestamp.now()
    # report the batch size alongside the elapsed time
    print(f"Inserted {len(batch_df):,} rows in {t_end - t_start}")

print("All data successfully loaded to Postgres!")