In [None]:
# =============================================================================
# PROJECT ATLAS: MODULE 04a - SETUP AND DATA LOADING
# =============================================================================
#
# OBJECTIVE: Shared environment and data loading for Risk Management modules
# USAGE: %run ./04a_Setup_and_Data.ipynb in downstream notebooks
# =============================================================================

# -----------------------------------------------------------------------------
# § 1. ENVIRONMENT SETUP
# -----------------------------------------------------------------------------

import polars as pl
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
import os
import glob
import warnings
from typing import Optional, Dict
from datetime import datetime
from pathlib import Path

warnings.filterwarnings('ignore')

# =============================================================================
# 1.2 CONFIGURATION CONSTANTS
# =============================================================================

# Base directories for the aggregate and sample extracts. Both are relative
# paths, so they resolve against the kernel's current working directory.
AGG_DIR = './HVFHV subsets 2019-2025 - Aggregates/Aggregates_Processed/'
SAMPLE_DIR = './HVFHV subsets 2019-2025 - Samples/'

# Dataset locations keyed by a short name.
#   'executive'      -> agg_executive_daily.csv inside AGG_DIR
#   'sample_pattern' -> glob pattern (note the '*') matching the per-file
#                       sample parquet files inside SAMPLE_DIR; this is the
#                       entry passed to load_full_sample_data() further down
#   'network'        -> agg_network_monthly.parquet inside AGG_DIR
# NOTE(review): 'executive' and 'network' are not read anywhere in the cells
# visible here - presumably they are consumed by downstream notebooks; confirm.
DATA_PATHS = {
    'executive': os.path.join(AGG_DIR, 'agg_executive_daily.csv'),
    'sample_pattern': os.path.join(SAMPLE_DIR, 'tlc_sample_*_processed.parquet'),
    'network': os.path.join(AGG_DIR, 'agg_network_monthly.parquet')
}

# =============================================================================
# PLOTLY + UBER STYLE BOOTSTRAP
# =============================================================================
# NOTE: Path and plotly.io were already imported in the environment-setup
# section of this cell; repeating the imports here is harmless.
from pathlib import Path
import plotly.io as pio

# Styling module that is not part of this notebook; it must be importable
# from the kernel's path or this cell fails with ImportError.
import uber_style as ub 

# Register the template object exposed by uber_style under the name "uber"
# and make it the default template for figures created after this point.
pio.templates["uber"] = ub.uber_style_template
pio.templates.default = "uber"

# Star-import of uber_style's public names into the notebook namespace.
# NOTE(review): which names this brings in depends on uber_style itself; the
# explicit assignments below define the color constants regardless.
from uber_style import *

# Export color constants explicitly for downstream notebooks
UBER_BLACK = ub.UBER_BLACK
UBER_RED = ub.UBER_RED
UBER_GREEN = ub.UBER_GREEN
UBER_ORANGE = ub.UBER_ORANGE
UBER_PURPLE = ub.UBER_PURPLE
UBER_BROWN = ub.UBER_BROWN
UBER_YELLOW = ub.UBER_YELLOW
UBER_WHITE = ub.UBER_WHITE
GRAY_900 = ub.GRAY_900
GRAY_700 = ub.GRAY_700
GRAY_600 = ub.GRAY_600
GRAY_500 = ub.GRAY_500
GRAY_300 = ub.GRAY_300
GRAY_100 = ub.GRAY_100

# Create missing gray shades (interpolated)
# These three are defined locally as literals rather than read from uber_style.
GRAY_800 = "#222222"  # Between GRAY_900 (#141414) and GRAY_700 (#333333)
GRAY_400 = "#CCCCCC"  # Between GRAY_500 (#AFAFAF) and GRAY_300 (#E2E2E2)
GRAY_200 = "#ECECEC"  # Between GRAY_300 (#E2E2E2) and GRAY_100 (#F6F6F6)

# Folder (relative to the working directory) used by the plot cache helpers
# below. It is created as a side effect of running this cell; exist_ok=True
# makes re-running the cell safe.
PLOT_DIR = Path("plots")
PLOT_DIR.mkdir(exist_ok=True)


def _plot_paths(fig_name: str):
    """Build the on-disk locations used to persist one figure.

    Args:
        fig_name: Base file name of the figure (without extension).

    Returns:
        Tuple ``(json_path, html_path)`` of paths inside ``PLOT_DIR``.
    """
    return tuple(PLOT_DIR / f"{fig_name}.{suffix}" for suffix in ("json", "html"))


def load_plot_if_exists(fig_name: str):
    """Load a previously saved figure from its JSON file, if there is one.

    Args:
        fig_name: Base file name of the figure (without extension).

    Returns:
        ``(fig, True)`` when the JSON file exists and was parsed,
        ``(None, False)`` when no JSON file exists for that name.
    """
    cached_json = _plot_paths(fig_name)[0]

    # Guard clause: nothing cached under this name.
    if not cached_json.exists():
        return None, False

    serialized = cached_json.read_text(encoding="utf-8")
    return pio.from_json(serialized), True


def save_plot(fig, fig_name: str):
    """
    Save figure as JSON + HTML (no show).

    Args:
        fig: Plotly figure to persist.
        fig_name: Base file name (without extension); the files are written
            to the locations returned by ``_plot_paths``.

    Returns:
        None. The function only writes files.
    """
    json_path, html_path = _plot_paths(fig_name)

    # Serialize BEFORE touching the file system. Opening the target first
    # would truncate it, so a serialization error would leave an empty JSON
    # file behind that load_plot_if_exists() would later pick up as a cache hit.
    payload = pio.to_json(fig)

    # The output folder is created at setup time, but may have been removed
    # since; recreating it here is cheap and idempotent.
    json_path.parent.mkdir(parents=True, exist_ok=True)

    # JSON
    with open(json_path, "w", encoding="utf-8") as f:
        f.write(payload)

    # HTML (plotly.js is referenced from the CDN rather than embedded)
    pio.write_html(
        fig,
        file=str(html_path),
        include_plotlyjs="cdn",
        auto_open=False
    )

# Startup banner. The notebook name printed here now matches this file
# (04a_Setup_and_Data, see the header and the %run usage line); it previously
# named a different notebook. The f-prefix was dropped because the string
# has no placeholders.
print("‚úÖ Environment configured successfully")
print("   - Notebook: 04a_Setup_and_Data")

print("üé® Uber BI template + color system loaded successfully")

üé® Uber BI template + color system loaded successfully


In [2]:
# =============================================================================
# § 2. STATISTICAL UTILITY FUNCTIONS
# =============================================================================

def format_number(x: float, pos: Optional[int] = None) -> str:
    """Format large numbers with K/M suffixes.

    Args:
        x: Value to format. Negative values are abbreviated by magnitude,
            e.g. ``-2_500_000`` -> ``'-2.5M'``.
        pos: Unused. NOTE(review): presumably kept so the function can be
            passed as a tick-formatter callback taking (value, position) -
            confirm against callers.

    Returns:
        ``'<v>M'`` with one decimal for |x| >= 1e6, ``'<v>K'`` with no
        decimals for |x| >= 1e3, otherwise the value with no decimals.
    """
    # Compare on magnitude so negative values pick the same suffix as their
    # positive counterparts; the sign is kept by formatting x itself.
    magnitude = abs(x)
    if magnitude >= 1e6:
        return '{:1.1f}M'.format(x * 1e-6)
    if magnitude >= 1e3:
        return '{:1.0f}K'.format(x * 1e-3)
    return '{:1.0f}'.format(x)

def format_currency(x: float, pos: Optional[int] = None) -> str:
    """Format currency values with $ prefix.

    Args:
        x: Amount to format. Negative amounts are abbreviated by magnitude
            and rendered with the sign in front of the dollar symbol,
            e.g. ``-1_500_000`` -> ``'-$1.5M'``.
        pos: Unused. NOTE(review): presumably kept so the function can be
            passed as a tick-formatter callback taking (value, position) -
            confirm against callers.

    Returns:
        ``'$<v>M'`` with one decimal for |x| >= 1e6, ``'$<v>K'`` with no
        decimals for |x| >= 1e3, otherwise ``'$<v>'`` with no decimals.
    """
    # Format the magnitude and prepend the sign separately; formatting the
    # signed value directly would produce '$-5' and would never abbreviate
    # negative amounts.
    sign = '-' if x < 0 else ''
    magnitude = abs(x)
    if magnitude >= 1e6:
        return '{}${:1.1f}M'.format(sign, magnitude * 1e-6)
    if magnitude >= 1e3:
        return '{}${:1.0f}K'.format(sign, magnitude * 1e-3)
    return '{}${:1.0f}'.format(sign, magnitude)

def calculate_modified_zscore(data: np.ndarray) -> np.ndarray:
    """
    Calculate modified Z-score using median absolute deviation (MAD).
    More robust to outliers than standard Z-score.

    The score is ``0.6745 * (x - median) / MAD``. Median and MAD are computed
    over the non-NaN entries only; NaN inputs yield NaN scores instead of
    silently turning the whole result into zeros.

    Args:
        data: 1D numpy array (any array-like accepted by ``np.asarray``)

    Returns:
        Float array of modified Z-scores with the same shape as ``data``.
        When the MAD is zero (or no value is observed) every non-NaN entry
        scores 0.0.
    """
    # Work in float so the zero-MAD fallback has the same dtype as the
    # regular result, even for integer input.
    values = np.asarray(data, dtype=float)
    missing = np.isnan(values)
    observed = values[~missing]

    # Score used whenever no scale is available: 0.0 for observed values,
    # NaN where the input itself was NaN. Also covers empty input.
    no_scale = np.where(missing, np.nan, 0.0)
    if observed.size == 0:
        return no_scale

    median = np.median(observed)
    mad = np.median(np.abs(observed - median))
    if mad > 0:
        return 0.6745 * (values - median) / mad
    return no_scale

def flag_outliers_iqr(df: pl.DataFrame, column: str, multiplier: float = 3.0) -> pl.DataFrame:
    """
    Mark rows whose value lies outside the IQR fences of a column.

    A row is flagged when its value is below ``Q1 - multiplier * IQR`` or
    above ``Q3 + multiplier * IQR``.

    Args:
        df: Polars DataFrame
        column: Name of the column to test
        multiplier: Width of the fences in IQR units
            (1.5 = moderate, 3.0 = extreme)

    Returns:
        Copy of ``df`` with an extra boolean column ``<column>_outlier``
    """
    target = pl.col(column)

    first_quartile = df.select(target.quantile(0.25)).item()
    third_quartile = df.select(target.quantile(0.75)).item()
    fence_width = multiplier * (third_quartile - first_quartile)

    too_low = target < (first_quartile - fence_width)
    too_high = target > (third_quartile + fence_width)

    return df.with_columns((too_low | too_high).alias(f'{column}_outlier'))

print("‚úÖ Statistical utility functions loaded")

‚úÖ Statistical utility functions loaded


In [3]:
# =============================================================================
# § 3. DATA LOADING FUNCTIONS (MEMORY-OPTIMIZED)
# =============================================================================

def load_full_sample_data(pattern: str, use_lazy: bool = True, n_rows: Optional[int] = None) -> pl.DataFrame:
    """
    Load and merge all sample parquet files with full feature set.

    Required columns are checked against the file schema BEFORE the quality
    filter runs, so a schema problem is reported as a clear ``ValueError``
    up front instead of as a Polars column error from inside the filter
    (or only after the whole dataset has been materialized).

    Args:
        pattern: Glob pattern for sample files
        use_lazy: If True (and ``n_rows`` is None), scan the files lazily and
            apply the quality filter before collecting
        n_rows: Optional row limit for testing (None = load all). When set,
            the files are read eagerly and the limit is applied before the
            quality filter, so fewer than ``n_rows`` rows may be returned.

    Returns:
        Combined Polars DataFrame with all trips that pass the quality filter
        (positive distance, duration, cost and speed; speed <= 120 km/h)

    Raises:
        FileNotFoundError: No file matches ``pattern``.
        ValueError: One or more required columns are missing.
    """
    sample_files = sorted(glob.glob(pattern))

    if not sample_files:
        raise FileNotFoundError(f"No files found matching pattern: {pattern}")

    print(f"   üìÇ Located {len(sample_files)} sample files")
    for f in sample_files:
        print(f"      - {os.path.basename(f)}")

    # Columns the downstream anomaly analysis cannot work without
    required_cols = [
        'trip_km', 'duration_min', 'total_rider_cost',
        'pickup_datetime', 'pickup_borough', 'dropoff_borough',
        'speed_kmh'
    ]

    # Optional columns (weather data)
    optional_cols = ['conditions', 'temp', 'weather_state']

    # Data quality filter, defined once and shared by both loading modes
    quality_filter = (
        (pl.col('trip_km') > 0) &
        (pl.col('duration_min') > 0) &
        (pl.col('total_rider_cost') > 0) &
        (pl.col('speed_kmh') > 0) &
        (pl.col('speed_kmh') <= 120)
    )

    if use_lazy and n_rows is None:
        print("   üîß Using lazy evaluation (scan_parquet) for memory efficiency...")
        frame = pl.scan_parquet(sample_files)
    else:
        # For testing with row limit or explicit eager loading
        if n_rows is not None:
            print(f"   üß™ Test mode: Loading first {n_rows:,} rows only...")
        frame = pl.read_parquet(sample_files, n_rows=n_rows)

    # Validate the schema before filtering/collecting; the filter does not
    # add or remove columns, so these names also describe the final frame.
    available_cols = _frame_column_names(frame)
    missing = [col for col in required_cols if col not in available_cols]
    if missing:
        raise ValueError(f"Missing critical columns: {missing}")

    frame = frame.filter(quality_filter)
    df = frame.collect() if isinstance(frame, pl.LazyFrame) else frame

    # Weather columns are optional: warn, but do not fail, when absent
    missing_weather = [c for c in optional_cols if c not in available_cols]
    if missing_weather:
        print(f"   ‚ö†Ô∏è  Weather columns not fully available: {missing_weather}")

    return df


def _frame_column_names(frame) -> list:
    """Return the column names of an eager or lazy Polars frame without loading data."""
    # Newer Polars exposes collect_schema(); fall back to .columns when the
    # installed version does not have it.
    collect_schema = getattr(frame, "collect_schema", None)
    if collect_schema is not None:
        return list(collect_schema().names())
    return list(frame.columns)

def calculate_executive_daily_from_sample(df_sample: pl.DataFrame) -> pl.DataFrame:
    """
    Aggregate trip-level rows into one row of executive metrics per day.

    The day is the calendar date of ``pickup_datetime``. Speed, duration,
    cost and distance statistics are computed directly from the trips.
    ``total_revenue`` and ``total_gross_booking_value`` are both the sum of
    ``total_rider_cost``. ``dominant_weather`` and ``avg_temp`` are added
    only when the ``conditions`` / ``temp`` columns are present.

    Args:
        df_sample: Full trip-level sample data

    Returns:
        Daily aggregates sorted by ``date``
    """
    print("   üîß Calculating daily metrics from trip-level data...")

    fare = pl.col('total_rider_cost')
    distance = pl.col('trip_km')
    speed = pl.col('speed_kmh')

    # Core metrics, in the column order expected in the result
    metrics = [
        pl.count().alias('total_trips'),
        fare.sum().alias('total_revenue'),
        fare.sum().alias('total_gross_booking_value'),
        distance.sum().alias('total_km_traveled'),
        speed.mean().alias('avg_speed_kmh'),
        speed.std().alias('std_speed_kmh'),
        pl.col('duration_min').mean().alias('avg_duration_min'),
        fare.mean().alias('avg_cost'),
        distance.mean().alias('avg_distance_km'),
    ]

    # Weather metrics are appended only when their source column exists
    present = df_sample.columns
    if 'conditions' in present:
        metrics.append(pl.col('conditions').mode().first().alias('dominant_weather'))
    if 'temp' in present:
        metrics.append(pl.col('temp').mean().alias('avg_temp'))

    trips_by_day = df_sample.with_columns(
        pl.col('pickup_datetime').dt.date().alias('date')
    ).group_by('date')
    daily_agg = trips_by_day.agg(metrics).sort('date')

    print(f"   ‚úÖ Calculated {daily_agg.height:,} days of metrics")
    return daily_agg

print("‚úÖ Data loading functions defined")

‚úÖ Data loading functions defined


In [None]:
# =============================================================================
# § 4. LOAD DATA (EXECUTE ONCE)
# =============================================================================

# Check if data already loaded (prevent re-loading on %run)
# Load only when either dataset name is absent from the global namespace.
# The guard checks for the NAMES only: if df_sample / df_daily exist but hold
# stale or partial data, this cell will not refresh them.
if 'df_sample' not in globals() or 'df_daily' not in globals():
    print("\n" + "=" * 80)
    print("‚è≥ LOADING DATA FOR RISK MANAGEMENT ANALYSIS")
    print("=" * 80)

    try:
        # Load full sample data with lazy evaluation
        print("\nüìä Loading Full Sample Data (tlc_sample_*_processed)...")
        print("   üí° Using lazy evaluation to handle 5M+ rows efficiently...")

        # Trip-level frame: every file matching the sample glob pattern,
        # already passed through the quality filter of load_full_sample_data.
        df_sample = load_full_sample_data(
            DATA_PATHS['sample_pattern'],
            use_lazy=True,  # Memory-efficient lazy loading
            n_rows=None     # Change to e.g. 100000 for testing
        )

        print(f"\n   ‚úÖ Loaded: {df_sample.height:,} trips")
        print(f"   üíæ Memory footprint: {df_sample.estimated_size('mb'):.1f} MB")
        # Min/max of pickup_datetime, each pulled out of a 1x1 frame via [0,0]
        print(f"   üìÖ Date range: {df_sample.select(pl.col('pickup_datetime').min())[0,0]} to {df_sample.select(pl.col('pickup_datetime').max())[0,0]}")

        # Calculate daily metrics FROM SAMPLE DATA (correct approach)
        print("\nüìä Calculating Daily Executive Metrics from Sample...")
        df_daily = calculate_executive_daily_from_sample(df_sample)

        print(f"   ‚úÖ Generated {df_daily.height:,} daily records with ACTUAL speed metrics")
        print(f"   üìä Avg Speed Range: {df_daily['avg_speed_kmh'].min():.1f} - {df_daily['avg_speed_kmh'].max():.1f} km/h")

        print("\n" + "=" * 80)
        print("‚úÖ DATA LOADING COMPLETE - Ready for analysis modules")
        print("=" * 80)
        print("\nüí° Available datasets:")
        # NOTE(review): the row/day counts in the two messages below are
        # hard-coded text, not computed; the recorded run of this cell
        # reports 9,830,241 trips.
        print("   - df_sample : Trip-level data (5M+ rows)")
        print("   - df_daily  : Daily aggregates (2K+ days)")
        print("\n‚ö° Use %run ./04a_Setup_and_Data.ipynb in other notebooks to import")

    except Exception as e:
        # Print a short summary, then re-raise so the failure is not hidden
        # and the original traceback is still shown.
        print(f"\n‚ùå ERROR: Data loading failed")
        print(f"   Details: {str(e)}")
        raise
else:
    # Both names already exist (e.g. the notebook was run again in the same
    # kernel): report their sizes instead of reloading.
    print("\n‚úÖ Data already loaded in memory - skipping reload")
    print(f"   - df_sample: {df_sample.height:,} trips ({df_sample.estimated_size('mb'):.1f} MB)")
    print(f"   - df_daily: {df_daily.height:,} daily records")


‚è≥ LOADING DATA FOR RISK MANAGEMENT ANALYSIS

üìä Loading Full Sample Data (tlc_sample_*_processed)...
   üí° Using lazy evaluation to handle 5M+ rows efficiently...
   üìÇ Located 7 sample files
      - tlc_sample_2019_processed.parquet
      - tlc_sample_2020_processed.parquet
      - tlc_sample_2021_processed.parquet
      - tlc_sample_2022_processed.parquet
      - tlc_sample_2023_processed.parquet
      - tlc_sample_2024_processed.parquet
      - tlc_sample_2025_processed.parquet
   üîß Using lazy evaluation (scan_parquet) for memory efficiency...

   ‚úÖ Loaded: 9,830,241 trips

   ‚úÖ Loaded: 9,830,241 trips
   üíæ Memory footprint: 3694.2 MB
   üìÖ Date range: 2019-02-01 00:00:16 to 2025-09-30 23:58:55

üìä Calculating Daily Executive Metrics from Sample...
   üîß Calculating daily metrics from trip-level data...
   üíæ Memory footprint: 3694.2 MB
   üìÖ Date range: 2019-02-01 00:00:16 to 2025-09-30 23:58:55

üìä Calculating Daily Executive Metrics from Sample...
 