In [1]:
import pandas as pd

In [2]:
from sqlalchemy import create_engine

In [3]:
df_zones = pd.read_csv('../data/taxi_zone_lookup.csv')

In [4]:
df_taxi = pd.read_parquet('../data/green_tripdata_2025-11.parquet')

In [5]:
df_zones.head()

Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [6]:
df_taxi.info()

<class 'pandas.DataFrame'>
RangeIndex: 46912 entries, 0 to 46911
Data columns (total 21 columns):
 #   Column                 Non-Null Count  Dtype         
---  ------                 --------------  -----         
 0   VendorID               46912 non-null  int32         
 1   lpep_pickup_datetime   46912 non-null  datetime64[us]
 2   lpep_dropoff_datetime  46912 non-null  datetime64[us]
 3   store_and_fwd_flag     41343 non-null  str           
 4   RatecodeID             41343 non-null  float64       
 5   PULocationID           46912 non-null  int32         
 6   DOLocationID           46912 non-null  int32         
 7   passenger_count        41343 non-null  float64       
 8   trip_distance          46912 non-null  float64       
 9   fare_amount            46912 non-null  float64       
 10  extra                  46912 non-null  float64       
 11  mta_tax                46912 non-null  float64       
 12  tip_amount             46912 non-null  float64       
 13  tolls_amount

In [7]:
# Connection: postgresql://[user]:[password]@[hostname]:[port]/[database]
# Port 5433 is used to avoid conflicts with local Postgres installations
engine = create_engine('postgresql://admin:password123@localhost:5433/ny_taxi')

try:
    with engine.connect() as conn:
        print("✅ Connection Successful!")
except Exception as e:
    print(f"❌ Connection failed: {e}")

✅ Connection Successful!


In [8]:
# Check for negative values in columns that should be positive
inconsistent_data = df_taxi[(df_taxi.trip_distance <= 0) | (df_taxi.fare_amount < 0)]
print(f"Rows with inconsistent metrics: {len(inconsistent_data)}")

# Preview the DDL (Data Definition Language)
# This shows exactly how SQL will structure the table
print(pd.io.sql.get_schema(df_taxi, name='green_taxi_trips', con=engine))

Rows with inconsistent metrics: 1580

CREATE TABLE green_taxi_trips (
	"VendorID" INTEGER, 
	lpep_pickup_datetime TIMESTAMP WITHOUT TIME ZONE, 
	lpep_dropoff_datetime TIMESTAMP WITHOUT TIME ZONE, 
	store_and_fwd_flag TEXT, 
	"RatecodeID" FLOAT(53), 
	"PULocationID" INTEGER, 
	"DOLocationID" INTEGER, 
	passenger_count FLOAT(53), 
	trip_distance FLOAT(53), 
	fare_amount FLOAT(53), 
	extra FLOAT(53), 
	mta_tax FLOAT(53), 
	tip_amount FLOAT(53), 
	tolls_amount FLOAT(53), 
	ehail_fee FLOAT(53), 
	improvement_surcharge FLOAT(53), 
	total_amount FLOAT(53), 
	payment_type FLOAT(53), 
	trip_type FLOAT(53), 
	congestion_surcharge FLOAT(53), 
	cbd_congestion_fee FLOAT(53)
)




In [9]:
# Cast VendorID to 'Int64' (Capital 'I' allows for Nulls/NaNs)
df_taxi['VendorID'] = df_taxi['VendorID'].astype('Int64')
df_taxi['PULocationID'] = df_taxi['PULocationID'].astype('Int64')
df_taxi['DOLocationID'] = df_taxi['DOLocationID'].astype('Int64')

# Remove obvious outliers for the pilot load to keep data 'clean'
df_taxi_clean = df_taxi[df_taxi.fare_amount >= 0].head(100)

In [10]:
# Load Zones (Dimension Table)
df_zones.to_sql(name='zones', con=engine, if_exists='replace', index=False)

# Load Taxi Sample (Fact Table)
df_taxi_clean.to_sql(name='green_taxi_trips', con=engine, if_exists='replace', index=False)

print("Ingestion successful. Tables created: 'zones' and 'green_taxi_trips'.")

Ingestion successful. Tables created: 'zones' and 'green_taxi_trips'.


In [11]:
query = """
SELECT 
    t.lpep_pickup_datetime, 
    z."Zone" AS pickup_zone, 
    t.trip_distance
FROM green_taxi_trips t
JOIN zones z ON t."PULocationID" = z."LocationID"
LIMIT 5;
"""
pd.read_sql(query, con=engine)

Unnamed: 0,lpep_pickup_datetime,pickup_zone,trip_distance
0,2025-11-01 00:51:48,Astoria,4.67
1,2025-11-01 00:51:04,Astoria,0.73
2,2025-11-01 01:38:50,Astoria,0.57
3,2025-11-01 01:22:45,Astoria,0.96
4,2025-11-01 00:48:32,Belmont,26.03


In [12]:
engine.dispose()
print("Connection closed. Exploration results ready for script automation.")

Connection closed. Exploration results ready for script automation.
