# 03 - Weather-Ridership Integration Pipeline
*Integration of Processed Subway Ridership and Weather Data*

---

## Objective
Integrate processed 2024 Manhattan subway ridership and weather data into a unified dataset for downstream feature engineering and predictive modeling.

---

## Input Data Sources

- `data/processed/subway_data_cleaned.parquet`  
  → Hourly ridership data (1,052,709 records, 121 stations)

- `data/processed/weather_data_cleaned.parquet`  
  → Hourly weather data (8,783 records, 13 columns including `transit_timestamp`)

---

## Integration Goals

- Time-based merge using `transit_timestamp`
- Validate alignment across both datasets (timezones, hourly resolution)
- Output clean integrated dataset for modeling
- Prepare for exploratory analysis and feature development
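
The time-based merge named in the first goal can be illustrated on toy data. The sketch below is a minimal example, assuming one weather row per hour and many ridership rows per hour. The two-station layout and all values are invented for illustration and are not taken from the project files.

```python
import pandas as pd

# Toy ridership: two hypothetical stations observed over two hours
ridership = pd.DataFrame({
    "station_complex_id": [8, 8, 9, 9],
    "transit_timestamp": pd.to_datetime(
        ["2024-01-01 00:00", "2024-01-01 01:00"] * 2
    ),
    "ridership": [100, 40, 70, 20],
})

# Toy weather: exactly one row per hour
weather = pd.DataFrame({
    "transit_timestamp": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 01:00"]),
    "temp": [5.0, 4.5],
})

# A left join keeps every ridership row and attaches the weather for its hour
merged = ridership.merge(weather, on="transit_timestamp", how="left")
print(merged)
```

Each weather row is repeated once per station, so the row count of the result equals the row count of the ridership frame.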

In [1]:
# =========================================
# Weather-Ridership Integration Pipeline
# =========================================

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json
from pathlib import Path
from datetime import datetime, timedelta
import pytz
import warnings

# Suppress warnings for clean output
warnings.filterwarnings("ignore")

# Display settings
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", 100)
pd.set_option("display.float_format", "{:.2f}".format)
plt.style.use("default")
sns.set_palette("husl")

# Execution header
print("Weather-Ridership Integration Pipeline")
print("=" * 60)
print(f"Execution Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("Objective: Merge cleaned subway and weather data into unified dataset")
print("=" * 60)


Weather-Ridership Integration Pipeline
Execution Time: 2025-07-28 19:47:04
Objective: Merge cleaned subway and weather data into unified dataset


In [2]:
# =========================================
# Directory and File Configuration
# =========================================

# Base directory paths
PROJECT_DIR = Path(".").resolve().parents[0]
DATA_DIR = PROJECT_DIR / "data"
PROCESSED_DIR = DATA_DIR / "processed"
INTEGRATION_DIR = PROCESSED_DIR / "integration"
INTEGRATION_DIR.mkdir(parents=True, exist_ok=True)

# File paths
RIDERSHIP_FILE = PROCESSED_DIR / "subway_data_cleaned.parquet"
WEATHER_FILE = PROCESSED_DIR / "weather_data_cleaned.parquet"
OUTPUT_FILE = INTEGRATION_DIR / "weather_ridership_integrated_2024.parquet"

# File checks
print("Checking input files...")
print(f"  Ridership: {RIDERSHIP_FILE.name} - {'FOUND' if RIDERSHIP_FILE.exists() else 'NOT FOUND'}")
print(f"  Weather:   {WEATHER_FILE.name} - {'FOUND' if WEATHER_FILE.exists() else 'NOT FOUND'}")
print(f"  Output directory: {INTEGRATION_DIR.resolve()}")

# Validate required files
if not RIDERSHIP_FILE.exists() or not WEATHER_FILE.exists():
    raise FileNotFoundError("Required processed files not found. Please run the earlier cleaning notebooks.")

print("\nFile check complete. Ready to begin integration.")

Checking input files...
  Ridership: subway_data_cleaned.parquet - FOUND
  Weather:   weather_data_cleaned.parquet - FOUND
  Output directory: C:\Users\neasa\manhattan-subway\data\processed\integration

File check complete. Ready to begin integration.
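
The `PROJECT_DIR` lookup in the cell above works only when the notebook is launched from a folder one level below the project root. A more location-independent variant might walk up the directory tree until it finds a `data` folder. This is a sketch only: the helper name `find_project_root` and the choice of `data` as the marker are assumptions for illustration, not part of the original notebook.

```python
from pathlib import Path


def find_project_root(start: Path, marker: str = "data") -> Path:
    """Return `start` or its nearest ancestor that contains a `marker` directory."""
    start = start.resolve()
    for candidate in [start, *start.parents]:
        if (candidate / marker).is_dir():
            return candidate
    raise FileNotFoundError(f"No directory containing '{marker}' found at or above {start}")
```

In the notebook it could be used as `PROJECT_DIR = find_project_root(Path.cwd())`.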


In [3]:
# =========================================
# Step 1: Load and Validate Input Data Quality
# =========================================

print("=" * 60)
print("INPUT DATA QUALITY REVIEW")
print("=" * 60)

# Quality assessment file paths
SUBWAY_QUALITY_FILE = PROCESSED_DIR / "subway_data_quality_assessment.json"
WEATHER_QUALITY_FILE = PROCESSED_DIR / "weather_data_quality_assessment.json"

# Check for quality files
has_quality = SUBWAY_QUALITY_FILE.exists() and WEATHER_QUALITY_FILE.exists()

if has_quality:
    # Load quality JSONs
    with open(SUBWAY_QUALITY_FILE, "r") as f:
        subway_quality = json.load(f)
    with open(WEATHER_QUALITY_FILE, "r") as f:
        weather_quality = json.load(f)

    # Subway metrics
    subway_score = subway_quality.get("quality_scores", {}).get("Overall Score", 0)
    subway_records = subway_quality.get("dataset_info", {}).get("total_records", "N/A")
    subway_stations = subway_quality.get("dataset_info", {}).get("unique_stations", "N/A")
    subway_temporal_pct = subway_quality.get("quality_scores", {}).get("Temporal Coverage", 0)

    # Weather metrics
    weather_score = weather_quality.get("quality_metrics", {}).get("overall_score", 0)
    weather_records = weather_quality.get("processing_summary", {}).get("final_integration_records", "N/A")
    weather_pct = weather_quality.get("processing_summary", {}).get("processing_success_rate", 0)
    weather_ready_flag = weather_quality.get("feature_summary", {}).get("integration_ready", False)

    # Display summary
    print("SUBWAY RIDERSHIP DATA QUALITY")
    print(f"  Overall Score:         {subway_score:.1f}/100")
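    # Apply the thousands separator only to numeric values; the "N/A" fallback is a string
    subway_records_text = f"{subway_records:,}" if isinstance(subway_records, (int, float)) else str(subway_records)
    print(f"  Total Records:         {subway_records_text}")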
    print(f"  Unique Stations:       {subway_stations}")
    print(f"  Temporal Coverage:     {subway_temporal_pct:.1f}%")

    print("\nWEATHER DATA QUALITY")
    print(f"  Overall Score:         {weather_score:.1f}/100")
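    # Apply the thousands separator only to numeric values; the "N/A" fallback is a string
    weather_records_text = f"{weather_records:,}" if isinstance(weather_records, (int, float)) else str(weather_records)
    print(f"  Final Records:         {weather_records_text}")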
    print(f"  Success Rate:          {weather_pct:.1f}%")
    print(f"  Integration Ready:     {weather_ready_flag}")

    # Integration readiness logic
    ridership_ready = subway_score >= 95
    weather_ready = weather_score >= 95 and weather_ready_flag
    integration_ready = ridership_ready and weather_ready

    print("\nINTEGRATION READINESS")
    print(f"  Ridership Data Ready:  {'YES' if ridership_ready else 'NO'}")
    print(f"  Weather Data Ready:    {'YES' if weather_ready else 'NO'}")
    print(f"  Integration Status:    {'READY' if integration_ready else 'REVIEW NEEDED'}")

    if not integration_ready:
        print("\nWarning: One or both datasets fall below the preferred quality threshold.")
        print("Proceeding with integration, but results should be validated downstream.")
    else:
        print("\nDatasets meet quality standards for integration.")

else:
    print("No quality assessment files found.")
    print("Proceeding without formal quality check.")
    integration_ready = True

print("\nProceeding with integration...")


INPUT DATA QUALITY REVIEW
No quality assessment files found.
Proceeding without formal quality check.

Proceeding with integration...


In [4]:
# =========================================
# Step 2: Load Processed Datasets
# =========================================

print("=" * 60)
print("LOADING PROCESSED DATASETS")
print("=" * 60)

# Confirm file paths again (should be previously defined)
if not RIDERSHIP_FILE.exists() or not WEATHER_FILE.exists():
    raise FileNotFoundError("Processed input files not found. Please run prior notebooks.")

# -------------------------------
# Load Ridership Data
# -------------------------------
print("Loading ridership data...")
ridership_df = pd.read_parquet(RIDERSHIP_FILE)
ridership_df['transit_timestamp'] = pd.to_datetime(ridership_df['transit_timestamp'])

print(f"+ Ridership records loaded:   {len(ridership_df):,}")
print(f"  • Unique stations:          {ridership_df['station_complex_id'].nunique()}")
print(f"  • Date range:               {ridership_df['transit_timestamp'].min()} to {ridership_df['transit_timestamp'].max()}")
print(f"  • Features:                 {len(ridership_df.columns)}")
print(f"  • Memory usage:             {ridership_df.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

# -------------------------------
# Load Weather Data
# -------------------------------
print("\nLoading weather data...")
weather_df = pd.read_parquet(WEATHER_FILE)
weather_df['transit_timestamp'] = pd.to_datetime(weather_df['transit_timestamp'])

print(f"+ Weather records loaded:     {len(weather_df):,}")
print(f"  • Date range:               {weather_df['transit_timestamp'].min()} to {weather_df['transit_timestamp'].max()}")
print(f"  • Features:                 {len(weather_df.columns)}")
print(f"  • Memory usage:             {weather_df.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

# -------------------------------
# Preview Samples and Structure
# -------------------------------
print("\nRIDERSHIP DATA SAMPLE:")
display(ridership_df.head(3))

print("\nWEATHER DATA SAMPLE:")
display(weather_df.head(3))

print("\nRIDERSHIP COLUMNS:")
print(f"  ({len(ridership_df.columns)}): {list(ridership_df.columns)}")

print("\nWEATHER COLUMNS:")
print(f"  ({len(weather_df.columns)}): {list(weather_df.columns)}")


LOADING PROCESSED DATASETS
Loading ridership data...
+ Ridership records loaded:   1,052,709
  • Unique stations:          121
  • Date range:               2024-01-01 00:00:00 to 2024-12-31 23:00:00
  • Features:                 11
  • Memory usage:             177.2 MB

Loading weather data...
+ Weather records loaded:     8,783
  • Date range:               2024-01-01 00:00:00 to 2024-12-31 23:00:00
  • Features:                 13
  • Memory usage:             1.8 MB

RIDERSHIP DATA SAMPLE:


| | station_complex_id | station_complex | transit_timestamp | ridership | latitude | longitude | is_cbd | hour | day_of_week | month | date |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 8 | 5 Av/59 St (N,R,W) | 2024-01-01 00:00:00 | 1141 | 40.76 | -73.97 | 1 | 0 | 0 | 1 | 2024-01-01 |
| 1 | 8 | 5 Av/59 St (N,R,W) | 2024-01-01 01:00:00 | 351 | 40.76 | -73.97 | 1 | 1 | 0 | 1 | 2024-01-01 |
| 2 | 8 | 5 Av/59 St (N,R,W) | 2024-01-01 02:00:00 | 94 | 40.76 | -73.97 | 1 | 2 | 0 | 1 | 2024-01-01 |



WEATHER DATA SAMPLE:


| | transit_timestamp | temp | feels_like | dew_point | humidity | wind_speed | pressure | visibility | rain_1h | snow_1h | weather_main | weather_description | clouds_all |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2024-01-01 00:00:00 | 5.75 | 2.53 | -1.44 | 59.0 | 4.47 | 1016.0 | 10000.0 | 0.12 | | Rain | light rain | 100.0 |
| 1 | 2024-01-01 01:00:00 | 5.43 | 2.32 | -1.51 | 60.0 | 4.12 | 1016.0 | 10000.0 | 0.14 | | Rain | light rain | 100.0 |
| 2 | 2024-01-01 02:00:00 | 5.45 | 2.35 | -1.1 | 62.0 | 4.12 | 1016.0 | 10000.0 | | | Clouds | overcast clouds | 100.0 |



RIDERSHIP COLUMNS:
  (11): ['station_complex_id', 'station_complex', 'transit_timestamp', 'ridership', 'latitude', 'longitude', 'is_cbd', 'hour', 'day_of_week', 'month', 'date']

WEATHER COLUMNS:
  (13): ['transit_timestamp', 'temp', 'feels_like', 'dew_point', 'humidity', 'wind_speed', 'pressure', 'visibility', 'rain_1h', 'snow_1h', 'weather_main', 'weather_description', 'clouds_all']


In [5]:
# =========================================
# Step 3: Timezone Alignment and Temporal Validation
# =========================================

print("=" * 60)
print("TIMEZONE ALIGNMENT AND TEMPORAL VALIDATION")
print("=" * 60)

# Check timezone awareness
ridership_tz = ridership_df['transit_timestamp'].dt.tz
weather_tz = weather_df['transit_timestamp'].dt.tz

print(f"Ridership timezone: {ridership_tz}")
print(f"Weather timezone:   {weather_tz}")

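# Ensure both are timezone-naive for merging. Tz-aware values are first converted
# to a common zone so wall-clock hours line up; dropping the timezone directly
# would keep each dataset's own clock and could shift one side by several hours.
LOCAL_TZ = "America/New_York"  # assumption: both datasets describe Manhattan local time

if ridership_tz is not None:
    ridership_df['transit_timestamp'] = ridership_df['transit_timestamp'].dt.tz_convert(LOCAL_TZ).dt.tz_localize(None)
    print("→ Timezone removed from ridership timestamps")

if weather_tz is not None:
    weather_df['transit_timestamp'] = weather_df['transit_timestamp'].dt.tz_convert(LOCAL_TZ).dt.tz_localize(None)
    print("→ Timezone removed from weather timestamps")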

# Validate date range overlap
print("\nTEMPORAL OVERLAP ANALYSIS")

ridership_range = (
    ridership_df['transit_timestamp'].min(),
    ridership_df['transit_timestamp'].max()
)
weather_range = (
    weather_df['transit_timestamp'].min(),
    weather_df['transit_timestamp'].max()
)

overlap_start = max(ridership_range[0], weather_range[0])
overlap_end = min(ridership_range[1], weather_range[1])

print(f"Ridership range: {ridership_range[0]} to {ridership_range[1]}")
print(f"Weather range:   {weather_range[0]} to {weather_range[1]}")
print(f"Overlap period:  {overlap_start} to {overlap_end}")

if overlap_start >= overlap_end:
    raise ValueError("ERROR: No temporal overlap between ridership and weather datasets")

# Compute overlap coverage from the range endpoints only
# (this does not detect missing hours inside the range)
overlap_hours = (overlap_end - overlap_start).total_seconds() / 3600
ridership_hours = (ridership_range[1] - ridership_range[0]).total_seconds() / 3600
weather_hours = (weather_range[1] - weather_range[0]).total_seconds() / 3600

ridership_coverage = (overlap_hours / ridership_hours) * 100
weather_coverage = (overlap_hours / weather_hours) * 100

print(f"\nOverlap confirmed: {overlap_hours:.0f} hours")
print(f"  • Ridership coverage: {ridership_coverage:.1f}%")
print(f"  • Weather coverage:   {weather_coverage:.1f}%")

# Interpret coverage
if ridership_coverage >= 95 and weather_coverage >= 95:
    print("Temporal alignment: EXCELLENT")
else:
    print("Warning: Overlap is below recommended threshold")


TIMEZONE ALIGNMENT AND TEMPORAL VALIDATION
Ridership timezone: None
Weather timezone:   None

TEMPORAL OVERLAP ANALYSIS
Ridership range: 2024-01-01 00:00:00 to 2024-12-31 23:00:00
Weather range:   2024-01-01 00:00:00 to 2024-12-31 23:00:00
Overlap period:  2024-01-01 00:00:00 to 2024-12-31 23:00:00

Overlap confirmed: 8783 hours
  • Ridership coverage: 100.0%
  • Weather coverage:   100.0%
Temporal alignment: EXCELLENT
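
The coverage figures above compare only the first and last timestamps, so they cannot reveal gaps inside the range. Because 2024 is a leap year, a complete hourly grid from 2024-01-01 00:00 to 2024-12-31 23:00 has 8,784 slots, while the weather file holds 8,783 records, so at least one hourly slot appears to be absent. A minimal sketch of a gap check follows. `find_missing_hours` is a hypothetical helper, shown on toy data rather than on the project files.

```python
import pandas as pd


def find_missing_hours(timestamps: pd.Series) -> pd.DatetimeIndex:
    """Return hourly slots between the min and max timestamp that never occur."""
    expected = pd.date_range(
        timestamps.min(), timestamps.max(), freq=pd.Timedelta(hours=1)
    )
    return expected.difference(pd.DatetimeIndex(timestamps.unique()))


# Toy example: the 02:00 slot is deliberately left out
toy = pd.Series(pd.to_datetime([
    "2024-03-10 00:00", "2024-03-10 01:00", "2024-03-10 03:00",
]))
missing = find_missing_hours(toy)
print(list(missing))
```

Applied to `weather_df['transit_timestamp']`, the same helper would list which slot or slots are absent, which the endpoint-based coverage percentage cannot show.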


In [6]:
# =========================================
# Step 4: Core Dataset Integration
# =========================================

print("=" * 60)
print("CORE DATASET INTEGRATION")
print("=" * 60)

# Pre-integration metrics
print("Pre-integration stats:")
print(f"• Ridership records: {len(ridership_df):,}")
print(f"• Weather records:   {len(weather_df):,}")

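# Perform left join on 'transit_timestamp'.
# validate="many_to_one" makes pandas raise a MergeError if weather timestamps
# are not unique, which would otherwise silently duplicate ridership rows.
print("\nJoining on 'transit_timestamp' (left join)...")
integrated_df = ridership_df.merge(
    weather_df,
    on="transit_timestamp",
    how="left",
    validate="many_to_one",
    suffixes=("", "_weather")
)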

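# Post-integration stats
# (a non-null 'temp' is used as a proxy for a successful weather match,
# which assumes 'temp' is never missing in the weather source)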
total_records = len(integrated_df)
matched_weather = integrated_df['temp'].notna().sum()
missing_weather = total_records - matched_weather
match_rate = matched_weather / total_records * 100

print("\nPost-integration stats:")
print(f"• Total records:     {total_records:,}")
print(f"• Matched weather:   {matched_weather:,} ({match_rate:.1f}%)")
print(f"• Missing weather:   {missing_weather:,} ({100 - match_rate:.1f}%)")
print(f"• Columns in output: {len(integrated_df.columns)}")

# Integration quality assessment
if match_rate >= 98:
    quality = "EXCELLENT"
elif match_rate >= 95:
    quality = "VERY GOOD"
elif match_rate >= 90:
    quality = "GOOD"
else:
    quality = "NEEDS REVIEW"

print(f"\nIntegration quality: {quality}")
if match_rate < 95:
    print("Note: Some ridership records may lack weather data")
else:
    print("Weather coverage is sufficient for feature engineering")

# Display sample
print("\nSample of integrated data:")
sample_columns = [
    "station_complex_id", "transit_timestamp", "ridership",
    "temp", "weather_main", "is_cbd", "humidity", "wind_speed"
]
# Keep only the columns present in the merged frame, capped at seven for a compact preview
sample_columns = [col for col in sample_columns if col in integrated_df.columns][:7]

print(f"Columns shown: {sample_columns}")
display(integrated_df[sample_columns].head(3))


CORE DATASET INTEGRATION
Pre-integration stats:
• Ridership records: 1,052,709
• Weather records:   8,783

Joining on 'transit_timestamp' (left join)...

Post-integration stats:
• Total records:     1,052,709
• Matched weather:   1,052,709 (100.0%)
• Missing weather:   0 (0.0%)
• Columns in output: 23

Integration quality: EXCELLENT
Weather coverage is sufficient for feature engineering

Sample of integrated data:
Columns shown: ['station_complex_id', 'transit_timestamp', 'ridership', 'temp', 'weather_main', 'is_cbd', 'humidity']


| | station_complex_id | transit_timestamp | ridership | temp | weather_main | is_cbd | humidity |
|---|---|---|---|---|---|---|---|
| 0 | 8 | 2024-01-01 00:00:00 | 1141 | 5.75 | Rain | 1 | 59.0 |
| 1 | 8 | 2024-01-01 01:00:00 | 351 | 5.43 | Rain | 1 | 60.0 |
| 2 | 8 | 2024-01-01 02:00:00 | 94 | 5.45 | Clouds | 1 | 62.0 |
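
The match rate above treats a non-null `temp` as evidence of a successful join, which would undercount matches if `temp` itself were missing in the weather source. One alternative, sketched here on toy data, is the `indicator` option of `DataFrame.merge`, which records whether each row found a partner. The frames and values below are invented for illustration.

```python
import pandas as pd

left = pd.DataFrame({
    "transit_timestamp": pd.to_datetime(
        ["2024-01-01 00:00", "2024-01-01 01:00", "2024-01-01 02:00"]
    ),
    "ridership": [100, 40, 10],
})

# Weather has a genuine NaN temperature at 01:00 and no row at all for 02:00
right = pd.DataFrame({
    "transit_timestamp": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 01:00"]),
    "temp": [5.0, float("nan")],
})

# indicator=True adds a '_merge' column with 'both', 'left_only' or 'right_only'
merged = left.merge(right, on="transit_timestamp", how="left", indicator=True)

matched_by_key = int((merged["_merge"] == "both").sum())  # rows that found a weather row
matched_by_temp = int(merged["temp"].notna().sum())       # rows with a usable temperature
print(matched_by_key, matched_by_temp)
```

The two counts differ whenever a weather row exists but carries a missing temperature, so the `_merge` column separates join failures from gaps in the weather measurements themselves.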


In [7]:
# =========================================
# Step 5: Basic Dataset Validation
# =========================================

print("=" * 60)
print("BASIC DATASET VALIDATION")
print("=" * 60)

print("Validating structure and completeness of integrated dataset...\n")

# Dataset summary
print("Integrated Dataset Overview:")
print(f"• Total records:       {len(integrated_df):,}")
print(f"• Unique stations:     {integrated_df['station_complex_id'].nunique()}")
print(f"• Unique timestamps:   {integrated_df['transit_timestamp'].nunique():,}")
print(f"• Date range:          {integrated_df['transit_timestamp'].min()} to {integrated_df['transit_timestamp'].max()}")
print(f"• Total columns:       {len(integrated_df.columns)}")

# Quality checks
print("\nBasic Quality Checks:")

# Duplicate detection
duplicates = integrated_df.duplicated(subset=["station_complex_id", "transit_timestamp"]).sum()
print(f"• Duplicate rows (station + time): {duplicates}")

# Weather data coverage
missing_weather = integrated_df['temp'].isna().sum()
print(f"• Records missing weather info:    {missing_weather}")

# Validation result
if duplicates == 0 and missing_weather == 0:
    print("• Integration status:              EXCELLENT")
elif duplicates == 0 and missing_weather <= 0.05 * len(integrated_df):
    print("• Integration status:              GOOD (minor missing weather)")
else:
    print("• Integration status:              NEEDS REVIEW")


BASIC DATASET VALIDATION
Validating structure and completeness of integrated dataset...

Integrated Dataset Overview:
• Total records:       1,052,709
• Unique stations:     121
• Unique timestamps:   8,783
• Date range:          2024-01-01 00:00:00 to 2024-12-31 23:00:00
• Total columns:       23

Basic Quality Checks:
• Duplicate rows (station + time): 0
• Records missing weather info:    0
• Integration status:              EXCELLENT
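
The checks above print their results but do not stop the notebook when something is wrong. A stricter variant could collect the same checks in a function that raises on failure. This is a sketch under assumptions: `validate_integrated` is a hypothetical helper, and it relies on the column names used in this notebook.

```python
import pandas as pd


def validate_integrated(df: pd.DataFrame, expected_rows: int) -> dict:
    """Run basic integrity checks and raise ValueError if any of them fails."""
    keys = ["station_complex_id", "transit_timestamp"]
    report = {
        # A many-to-one left join should leave the left row count unchanged
        "row_count_ok": len(df) == expected_rows,
        "duplicate_keys": int(df.duplicated(subset=keys).sum()),
        "missing_temp": int(df["temp"].isna().sum()),
    }
    if not report["row_count_ok"] or report["duplicate_keys"] or report["missing_temp"]:
        raise ValueError(f"Integrated dataset failed validation: {report}")
    return report


# Toy example with one station and two hours
toy = pd.DataFrame({
    "station_complex_id": [8, 8],
    "transit_timestamp": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 01:00"]),
    "temp": [5.0, 4.5],
})
report = validate_integrated(toy, expected_rows=2)
print(report)
```

In the notebook, a call such as `validate_integrated(integrated_df, expected_rows=len(ridership_df))` would halt execution before the save step if any check failed.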


In [8]:
# =========================================
# Step 6: Save Integrated Dataset
# =========================================

print("=" * 60)
print("SAVING INTEGRATED DATASET")
print("=" * 60)

# Export dataset using predefined OUTPUT_FILE
integrated_df.to_parquet(OUTPUT_FILE, index=False)

# Report output summary
file_size_mb = OUTPUT_FILE.stat().st_size / (1024 * 1024)
print(f"Integrated dataset saved: {OUTPUT_FILE.name}")
print(f"Location: {OUTPUT_FILE.resolve()}")
print(f"File size: {file_size_mb:.1f} MB")
print(f"Total records: {len(integrated_df):,}")
print(f"Total columns: {len(integrated_df.columns)}")

# Final status message
print("\n" + "=" * 60)
print("INTEGRATION COMPLETED SUCCESSFULLY")
print("=" * 60)
print("Next steps:")
print("1. Temporal pattern analysis (Notebook 04)")
print("2. Weather-ridership correlation analysis (Notebook 05)")
print("3. Feature engineering for modeling (Notebook 06)")


SAVING INTEGRATED DATASET
Integrated dataset saved: weather_ridership_integrated_2024.parquet
Location: C:\Users\neasa\manhattan-subway\data\processed\integration\weather_ridership_integrated_2024.parquet
File size: 10.3 MB
Total records: 1,052,709
Total columns: 23

INTEGRATION COMPLETED SUCCESSFULLY
Next steps:
1. Temporal pattern analysis (Notebook 04)
2. Weather-ridership correlation analysis (Notebook 05)
3. Feature engineering for modeling (Notebook 06)
