# Green Taxi Data Pipeline - PostgreSQL Integration
Load the November 2025 green taxi trip data from a Parquet file and insert it into a PostgreSQL database running in Docker.

## Section 1: Import Required Libraries

In [1]:
import os
import subprocess
from pathlib import Path
from urllib.parse import quote

import pandas as pd
import psycopg2
import requests
from sqlalchemy import create_engine

# Runs only if every import above succeeded; a missing package raises first.
print("Libraries imported successfully")

Libraries imported successfully


## Section 2: Load Parquet Data

In [2]:
# Read the November 2025 green taxi trips from Parquet into `df`, report the
# frame's shape and column names, then preview the first rows as rich output.
parquet_path = (
    Path('/workspaces/Data-Engineering-Zoomcamp/pipeline/data')
    / 'green_tripdata_2025-11.parquet'
)
print("Loading parquet file: " + str(parquet_path))
df = pd.read_parquet(parquet_path)

summary_lines = [
    "\nDataFrame loaded successfully!",
    f"Shape: {df.shape}",
    f"\nColumns: {df.columns.tolist()}",
    "\nFirst few rows:",
]
print("\n".join(summary_lines))
df.head()

Loading parquet file: /workspaces/Data-Engineering-Zoomcamp/pipeline/data/green_tripdata_2025-11.parquet

DataFrame loaded successfully!
Shape: (46912, 21)

Columns: ['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'ehail_fee', 'improvement_surcharge', 'total_amount', 'payment_type', 'trip_type', 'congestion_surcharge', 'cbd_congestion_fee']

First few rows:


Unnamed: 0,VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,...,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,cbd_congestion_fee
0,2,2025-11-01 00:34:48,2025-11-01 00:41:39,N,1.0,74,42,1.0,0.74,7.2,...,0.5,1.94,0.0,,1.0,11.64,1.0,1.0,0.0,0.0
1,2,2025-11-01 00:18:52,2025-11-01 00:24:27,N,1.0,74,42,2.0,0.95,7.2,...,0.5,0.0,0.0,,1.0,9.7,2.0,1.0,0.0,0.0
2,2,2025-11-01 01:03:14,2025-11-01 01:15:24,N,1.0,83,160,1.0,2.19,13.5,...,0.5,5.0,0.0,,1.0,21.0,1.0,1.0,0.0,0.0
3,2,2025-11-01 00:10:57,2025-11-01 00:24:53,N,1.0,166,127,1.0,5.44,24.7,...,0.5,0.5,0.0,,1.0,27.7,1.0,1.0,0.0,0.0
4,1,2025-11-01 00:03:48,2025-11-01 00:19:38,N,1.0,166,262,1.0,3.2,18.4,...,1.5,1.0,0.0,,1.0,24.65,1.0,1.0,2.75,0.0


## Section 3: Establish PostgreSQL Connection

In [3]:
# PostgreSQL connection parameters. Every value can be overridden through the
# standard libpq environment variables (PGHOST, PGPORT, PGDATABASE, PGUSER,
# PGPASSWORD) so real credentials never have to be written into the notebook;
# the defaults match the local Docker container from the command printed below.
db_config = {
    'host': os.environ.get('PGHOST', 'localhost'),
    'port': int(os.environ.get('PGPORT', '5432')),
    'database': os.environ.get('PGDATABASE', 'ny_taxi'),
    'user': os.environ.get('PGUSER', 'root'),
    'password': os.environ.get('PGPASSWORD', 'root')
}

# Create connection string for SQLAlchemy. User name and password are
# percent-encoded so characters such as '@', ':' or '/' inside them cannot
# corrupt the URL (SQLAlchemy decodes them again when it parses the URL).
_db_user = quote(db_config['user'], safe='')
_db_password = quote(db_config['password'], safe='')
connection_string = f"postgresql://{_db_user}:{_db_password}@{db_config['host']}:{db_config['port']}/{db_config['database']}"

print("PostgreSQL Connection Parameters:")
print(f"  Host: {db_config['host']}")
print(f"  Port: {db_config['port']}")
print(f"  Database: {db_config['database']}")
print(f"  User: {db_config['user']}")

# Create SQLAlchemy engine and open one connection to prove the server is
# reachable; the with-block closes that connection again on exit.
try:
    engine = create_engine(connection_string)
    with engine.connect():
        print("\nâœ“ Successfully connected to PostgreSQL!")
except Exception as e:
    print(f"\nâœ— Connection failed: {e}")
    print("Make sure the PostgreSQL container is running with:")
    print("  docker run -it --rm -e POSTGRES_USER='root' -e POSTGRES_PASSWORD='root' -e POSTGRES_DB='ny_taxi' -v ny_taxi_postgres_data:/var/lib/postgresql -p 5432:5432 postgres:18")

PostgreSQL Connection Parameters:
  Host: localhost
  Port: 5432
  Database: ny_taxi
  User: root

âœ“ Successfully connected to PostgreSQL!


## Section 4: Create Table and Insert Data

In [4]:
# Write the trips to the 'green_taxi' table in batches of 5,000 rows. The table
# is dropped and recreated first (if_exists='replace'); 'append' would add to
# an existing table and 'fail' would raise instead. Errors are printed rather
# than raised.
table_name = 'green_taxi'
try:
    engine = create_engine(connection_string)
    # A plain copy of the loaded frame is inserted; no columns are converted here.
    df_copy = df.copy()
    print(f"Inserting data into PostgreSQL table '{table_name}'...")
    df_copy.to_sql(
        table_name,
        engine,
        if_exists='replace',
        index=False,
        chunksize=5000,
    )
    row_total = len(df)
    print(f"âœ“ Successfully inserted {row_total} rows into '{table_name}' table!")
except Exception as e:
    print(f"âœ— Error inserting data: {e}")

Inserting data into PostgreSQL table 'green_taxi'...
âœ“ Successfully inserted 46912 rows into 'green_taxi' table!


## Section 5: Execute SQL Queries

In [5]:
# Confirm the insert by counting the rows now stored in green_taxi.
try:
    engine = create_engine(connection_string)
    query = "SELECT COUNT(*) as row_count FROM green_taxi;"
    result = pd.read_sql(query, engine)
    row_count = result['row_count'].iloc[0]
    print(f"âœ“ Table 'green_taxi' contains {row_count} rows")
except Exception as e:
    print(f"âœ— Error querying table: {e}")

âœ“ Table 'green_taxi' contains 46912 rows


In [6]:
# Read five rows back out of PostgreSQL (no ORDER BY, so which five is up to
# the server) to eyeball how the data was stored.
try:
    engine = create_engine(connection_string)
    query = "SELECT * FROM green_taxi LIMIT 5;"
    sample_df = pd.read_sql(query, engine)
    print("Sample rows from green_taxi table:", sample_df, sep="\n")
except Exception as e:
    print(f"âœ— Error reading sample data: {e}")

Sample rows from green_taxi table:
   VendorID lpep_pickup_datetime lpep_dropoff_datetime store_and_fwd_flag  \
0         2  2025-11-01 00:34:48   2025-11-01 00:41:39                  N   
1         2  2025-11-01 00:18:52   2025-11-01 00:24:27                  N   
2         2  2025-11-01 01:03:14   2025-11-01 01:15:24                  N   
3         2  2025-11-01 00:10:57   2025-11-01 00:24:53                  N   
4         1  2025-11-01 00:03:48   2025-11-01 00:19:38                  N   

   RatecodeID  PULocationID  DOLocationID  passenger_count  trip_distance  \
0         1.0            74            42              1.0           0.74   
1         1.0            74            42              2.0           0.95   
2         1.0            83           160              1.0           2.19   
3         1.0           166           127              1.0           5.44   
4         1.0           166           262              1.0           3.20   

   fare_amount  ...  mta_tax  tip_amoun

In [7]:
# NOTE: every line of this cell is commented out, so running it does nothing.
# It holds a disabled revenue breakdown: trip count, total and average
# total_amount per payment_type, largest total first. Uncomment it to run it,
# or delete the cell if the analysis is no longer needed.
# # Aggregate queries - Revenue analysis
# try:
#     engine = create_engine(connection_string)
    
#     # Total revenue by payment type
#     query = """
#     SELECT 
#         payment_type,
#         COUNT(*) as trip_count,
#         ROUND(SUM(total_amount)::numeric, 2) as total_revenue,
#         ROUND(AVG(total_amount)::numeric, 2) as avg_fare
#     FROM green_taxi
#     GROUP BY payment_type
#     ORDER BY total_revenue DESC;
#     """
    
#     revenue_df = pd.read_sql(query, engine)
#     print("\nðŸ“Š Revenue Analysis by Payment Type:")
#     print(revenue_df)
    
# except Exception as e:
#     print(f"âœ— Error in aggregate query: {e}")

In [8]:
# Question 3. For the trips in November 2025, how many trips had a trip_distance of less than or equal to 1 mile?
# November is selected with the half-open pickup range [2025-11-01, 2025-12-01),
# so every timestamp on 30 November is included and nothing from December is.
try:
    engine = create_engine(connection_string)
    
    query = """SELECT COUNT(*) AS trips_le_1_mile
                FROM green_taxi
                WHERE lpep_pickup_datetime >= '2025-11-01'
                AND lpep_pickup_datetime <  '2025-12-01'
                AND trip_distance <= 1;"""
    # A dedicated result name (same scheme as result_q6 in the Question 6 cell)
    # keeps the sample_df preview from the sample-rows cell intact.
    result_q3 = pd.read_sql(query, engine)
    print("Question 3. For the trips in November 2025, how many trips had a trip_distance of less than or equal to 1 mile?:")
    print(result_q3)
    
except Exception as e:
    print(f"âœ— Error answering Question 3: {e}")

Question 3. For the trips in November 2025, how many trips had a trip_distance of less than or equal to 1 mile?:
   trips_le_1_mile
0             8007


In [9]:
# Question 4. Which was the pick up day with the longest trip distance? Only consider trips with trip_distance less than 100 miles. 
# Takes the longest single trip per pickup day in November 2025 (only trips
# under 100 miles, as the question specifies) and keeps the top day.
# pickup_day is a secondary sort key so a tie between days always resolves to
# the earliest day instead of an arbitrary one.
try:
    engine = create_engine(connection_string)
    
    query = """SELECT CAST(lpep_pickup_datetime AS DATE) AS pickup_day,
                       MAX(trip_distance) AS max_dist
                FROM green_taxi
                WHERE lpep_pickup_datetime >= '2025-11-01'
                  AND lpep_pickup_datetime <  '2025-12-01'
                  AND trip_distance < 100
                GROUP BY 1
                ORDER BY max_dist DESC, pickup_day
                LIMIT 1;"""
    # Dedicated result name so earlier results (e.g. sample_df) are not overwritten.
    result_q4 = pd.read_sql(query, engine)
    print("Which was the pick up day with the longest trip distance?:")
    print(result_q4)
    
except Exception as e:
    print(f"âœ— Error answering Question 4: {e}")

Which was the pick up day with the longest trip distance?:
   pickup_day  max_dist
0  2025-11-14     88.03


## Section 6: Download and Ingest Taxi Zones Data

In [10]:
# Download taxi_zone_lookup.csv from GitHub using requests (imported with the
# other libraries at the top of the notebook). If anything goes wrong, fall
# back to the copy of the file kept in the repository.
url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv"
zones_download_path = '/workspaces/Data-Engineering-Zoomcamp/pipeline/data/taxi_zone_lookup_download.csv'

print("Downloading taxi_zone_lookup.csv from GitHub...")
print(f"URL: {url}\n")

try:
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    # Create the data directory if needed, so a fresh checkout does not fall
    # back to the local copy just because the folder is missing.
    Path(zones_download_path).parent.mkdir(parents=True, exist_ok=True)

    # Save the file
    with open(zones_download_path, 'wb') as f:
        f.write(response.content)

    file_size = Path(zones_download_path).stat().st_size
    print("âœ“ Download successful!")
    print(f"  Saved to: {zones_download_path}")
    print(f"  File size: {file_size:,} bytes")

except Exception as e:
    print(f"âœ— Download failed: {e}")
    # No other remote source is tried; the repository copy is used instead.
    print("  Falling back to the local copy...")
    zones_download_path = '/workspaces/Data-Engineering-Zoomcamp/pipeline/taxi_zone_lookup.csv'
    print(f"  Will use local file: {zones_download_path}")

Downloading taxi_zone_lookup.csv from GitHub...
URL: https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv

âœ“ Download successful!
  Saved to: /workspaces/Data-Engineering-Zoomcamp/pipeline/data/taxi_zone_lookup_download.csv
  File size: 12,322 bytes


In [11]:
# Read the zone lookup into `zones_df`, preferring the CSV chosen by the
# download cell and falling back to the repository copy if that file is absent.
print("\nLoading zones data from CSV...")

fallback_zones_path = '/workspaces/Data-Engineering-Zoomcamp/pipeline/taxi_zone_lookup.csv'
if Path(zones_download_path).exists():
    zones_path = zones_download_path
else:
    zones_path = fallback_zones_path

zones_df = pd.read_csv(zones_path)

print("âœ“ Zones data loaded successfully!")
print(f"  Shape: {zones_df.shape}")
print(f"  Columns: {zones_df.columns.tolist()}")
print("\nFirst 5 rows:")
print(zones_df.head())


Loading zones data from CSV...
âœ“ Zones data loaded successfully!
  Shape: (265, 4)
  Columns: ['LocationID', 'Borough', 'Zone', 'service_zone']

First 5 rows:
   LocationID        Borough                     Zone service_zone
0           1            EWR           Newark Airport          EWR
1           2         Queens              Jamaica Bay    Boro Zone
2           3          Bronx  Allerton/Pelham Gardens    Boro Zone
3           4      Manhattan            Alphabet City  Yellow Zone
4           5  Staten Island            Arden Heights    Boro Zone


In [12]:
# Store the zone lookup in its own 'zones' table (replacing any previous copy)
# so trips can be joined to zone and borough names in SQL.
try:
    engine = create_engine(connection_string)
    print("\nInserting zones data into PostgreSQL table 'zones'...")
    zones_df.to_sql(
        name='zones',
        con=engine,
        if_exists='replace',
        index=False,
        chunksize=5000,
    )
    zone_total = len(zones_df)
    print(f"âœ“ Successfully inserted {zone_total} zones into 'zones' table!")
except Exception as e:
    print(f"âœ— Error inserting zones data: {e}")


Inserting zones data into PostgreSQL table 'zones'...
âœ“ Successfully inserted 265 zones into 'zones' table!


In [13]:
# Sanity-check the 'zones' table: report how many rows it holds, then show five.
try:
    engine = create_engine(connection_string)

    count_query = "SELECT COUNT(*) as zone_count FROM zones;"
    count_result = pd.read_sql(count_query, engine)
    zone_count = count_result.at[0, 'zone_count']
    print("\nâœ“ Zones table verified!")
    print(f"  Total zones: {zone_count}")

    sample_query = "SELECT * FROM zones LIMIT 5;"
    sample_zones = pd.read_sql(sample_query, engine)
    print("\nSample zones from database:", sample_zones, sep="\n")
except Exception as e:
    print(f"âœ— Error verifying zones table: {e}")


âœ“ Zones table verified!
  Total zones: 265

Sample zones from database:
   LocationID        Borough                     Zone service_zone
0           1            EWR           Newark Airport          EWR
1           2         Queens              Jamaica Bay    Boro Zone
2           3          Bronx  Allerton/Pelham Gardens    Boro Zone
3           4      Manhattan            Alphabet City  Yellow Zone
4           5  Staten Island            Arden Heights    Boro Zone


In [14]:
# Enrich a handful of trips with pickup/drop-off zone and borough names by
# joining green_taxi to the zones lookup twice (once per location column).
# Identifiers are double-quoted because pandas.to_sql created both tables with
# their original mixed-case column names (e.g. "VendorID", "PULocationID");
# PostgreSQL folds unquoted identifiers to lowercase, which would not match.
try:
    engine = create_engine(connection_string)

    query = """
    SELECT 
        g."VendorID",
        g."lpep_pickup_datetime",
        g."lpep_dropoff_datetime",
        g."trip_distance",
        g."fare_amount",
        g."total_amount",
        pz."Zone" as pickup_zone,
        pz."Borough" as pickup_borough,
        dz."Zone" as dropoff_zone,
        dz."Borough" as dropoff_borough
    FROM green_taxi g
    LEFT JOIN zones pz ON g."PULocationID" = pz."LocationID"
    LEFT JOIN zones dz ON g."DOLocationID" = dz."LocationID"
    LIMIT 10;
    """

    joined_result = pd.read_sql(query, engine)
    print("\nâœ“ Sample trips enriched with zone information:", joined_result, sep="\n")
except Exception as e:
    print(
        f"âœ— Error in join query: {e}",
        "\n  Note: Column names are case-sensitive in PostgreSQL when using quotes.",
        sep="\n",
    )


âœ“ Sample trips enriched with zone information:
   VendorID lpep_pickup_datetime lpep_dropoff_datetime  trip_distance  \
0         2  2025-11-01 00:34:48   2025-11-01 00:41:39           0.74   
1         2  2025-11-01 00:18:52   2025-11-01 00:24:27           0.95   
2         2  2025-11-01 01:03:14   2025-11-01 01:15:24           2.19   
3         2  2025-11-01 00:10:57   2025-11-01 00:24:53           5.44   
4         1  2025-11-01 00:03:48   2025-11-01 00:19:38           3.20   
5         1  2025-11-01 00:42:13   2025-11-01 01:04:50           5.10   
6         2  2025-11-01 00:05:41   2025-11-01 00:39:20           9.80   
7         2  2025-11-01 00:42:14   2025-11-01 01:13:20           5.01   
8         2  2025-11-01 00:03:08   2025-11-01 00:06:27           0.63   
9         2  2025-11-01 00:56:33   2025-11-01 01:01:34           1.15   

   fare_amount  total_amount          pickup_zone pickup_borough  \
0          7.2         11.64    East Harlem North      Manhattan   
1         

In [18]:
# Question 5. Which was the pickup zone with the largest total_amount (sum of all trips) on November 18th, 2025? 
# Only the pickup zone matters, so zones is joined once (on PULocationID).
# The day is selected with the half-open range [2025-11-18, 2025-11-19)
# instead of casting every row to DATE. It matches the same rows but leaves the
# pickup column unwrapped, so an index on it could be used.
try:
    engine = create_engine(connection_string)
    
    query = """SELECT pz."Zone" AS pickup_zone,
                       SUM(g."total_amount") AS total_amt
                FROM green_taxi g
                LEFT JOIN zones pz ON g."PULocationID" = pz."LocationID"
                WHERE g."lpep_pickup_datetime" >= '2025-11-18'
                  AND g."lpep_pickup_datetime" <  '2025-11-19'
                GROUP BY 1
                ORDER BY total_amt DESC
                LIMIT 1;
            """
    # Dedicated result name so earlier results (e.g. sample_df) are not overwritten.
    result_q5 = pd.read_sql(query, engine)
    print("Which was the pickup zone with the largest total_amount :")
    print(result_q5)
    
except Exception as e:
    print(f"âœ— Error answering Question 5: {e}")

Which was the pickup zone with the largest total_amount :
         pickup_zone  total_amt
0  East Harlem North    9281.92


In [19]:
# Question 6: For passengers picked up in "East Harlem North" in Nov 2025, 
# which drop-off zone had the largest tip?
# Answered as the drop-off zone of the single trip with the highest tip_amount.
# November is selected with the same half-open pickup range used for Questions
# 3 and 4 rather than EXTRACT(YEAR/MONTH). It matches the same rows but keeps the
# pickup column unwrapped, so an index on it could be used.

try:
    engine = create_engine(connection_string)
    
    query = """
    SELECT 
        dz."Zone" as dropoff_zone,
        t."tip_amount",
        t."lpep_pickup_datetime",
        pz."Zone" as pickup_zone
    FROM green_taxi t
    JOIN zones pz ON t."PULocationID" = pz."LocationID"
    JOIN zones dz ON t."DOLocationID" = dz."LocationID"
    WHERE pz."Zone" = 'East Harlem North'
        AND t."lpep_pickup_datetime" >= '2025-11-01'
        AND t."lpep_pickup_datetime" <  '2025-12-01'
    ORDER BY t."tip_amount" DESC
    LIMIT 1;
    """
    
    result_q6 = pd.read_sql(query, engine)
    
    if not result_q6.empty:
        print("âœ“ Question 6: Drop-off zone with largest tip from 'East Harlem North' (Nov 2025)")
        print(result_q6)
    else:
        print("âš  No trips found from 'East Harlem North' in November 2025")
    
except Exception as e:
    print(f"âœ— Error: {e}")
    print("\nðŸ’¡ Verify the exact zone name using:")
    print("   SELECT DISTINCT \"Zone\" FROM zones ORDER BY \"Zone\";")


âœ“ Question 6: Drop-off zone with largest tip from 'East Harlem North' (Nov 2025)
     dropoff_zone  tip_amount lpep_pickup_datetime        pickup_zone
0  Yorkville West       81.89  2025-11-30 16:30:27  East Harlem North
