# Data Cleaning Pipeline



## 1. Imports and Setup

In [None]:
import pandas as pd
import numpy as np
from datetime import datetime, timezone
from typing import Tuple, Dict, Any
import warnings
warnings.filterwarnings('ignore')

import matplotlib.pyplot as plt
import seaborn as sns

pd.set_option('display.max_columns', None)
pd.set_option('display.precision', 2)
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

print("Libraries imported successfully")
print(f"Pandas version: {pd.__version__}")
print(f"NumPy version: {np.__version__}")
print(f"Matplotlib version: {plt.matplotlib.__version__}")
print(f"Seaborn version: {sns.__version__}")

Libraries imported successfully
Pandas version: 2.2.3
NumPy version: 2.2.6
Matplotlib version: 3.10.0
Seaborn version: 0.13.2


In [None]:
import os
OUTPUT_DIR = '../outputs/images/cleaning'
os.makedirs(OUTPUT_DIR, exist_ok=True)

def save_figure(filename, dpi=300, bbox_inches='tight'):
    filepath = os.path.join(OUTPUT_DIR, filename)
    plt.savefig(filepath, dpi=dpi, bbox_inches=bbox_inches)
    print(f"Saved figure: {filename}")

print(f"Output directory configured: {OUTPUT_DIR}")

✓ Output directory configured: ../outputs/images/cleaning


## 2. Data Loading and Type Enforcement


In [None]:
def load_and_enforce_types(filepath: str) -> pd.DataFrame:
    """Load dataset and enforce correct data types."""
    print("\n" + "="*80)
    print("STEP 1: DATA LOADING AND TYPE ENFORCEMENT")
    print("="*80)
    
    dtype_spec = {
        'order_category': 'category',
        'item_count': 'Int64',
        'actual_delivery_time_minutes': 'float64',
        'estimated_delivery_time_lower_minutes': 'float64',
        'estimated_delivery_time_upper_minutes': 'float64',
        'venue_location_h3_index': 'str',
        'customer_location_h3_index': 'str',
        'courier_supply_index': 'float64',
        'precipitation': 'float64'
    }
    
    df = pd.read_csv(filepath, dtype=dtype_spec, parse_dates=['order_placed_at_utc'])
    
    initial_rows = len(df)
    print(f"\nLoaded {initial_rows:,} rows from {filepath}")
    
    if df['order_placed_at_utc'].dt.tz is None:
        df['order_placed_at_utc'] = df['order_placed_at_utc'].dt.tz_localize('UTC')
        print("Localized timestamps to UTC")
    else:
        df['order_placed_at_utc'] = df['order_placed_at_utc'].dt.tz_convert('UTC')
        print("Converted timestamps to UTC")
    
    assert df['order_placed_at_utc'].dtype == 'datetime64[ns, UTC]'
    assert df['order_category'].dtype.name == 'category'
    assert df['venue_location_h3_index'].dtype == 'object'
    assert df['customer_location_h3_index'].dtype == 'object'
    
    print("\nType enforcement complete")
    print("\nData types:")
    print(df.dtypes)
    
    print(f"\nDate range: {df['order_placed_at_utc'].min()} to {df['order_placed_at_utc'].max()}")
    print(f"Number of days: {(df['order_placed_at_utc'].max() - df['order_placed_at_utc'].min()).days}")
    
    return df

## 3. Structural Validation (Duplicates)

Remove exact duplicate rows and identify potential duplicate orders.

In [None]:
def remove_duplicates(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """Remove exact and potential duplicate orders."""
    print("\n" + "="*80)
    print("STEP 2: STRUCTURAL VALIDATION (DUPLICATES)")
    print("="*80)
    
    initial_rows = len(df)
    print(f"\nStarting rows: {initial_rows:,}")
    
    df_clean = df.drop_duplicates()
    exact_duplicates_removed = initial_rows - len(df_clean)
    print(f"\nRemoved {exact_duplicates_removed:,} exact duplicate rows")
    
    duplicate_cols = [
        'order_placed_at_utc',
        'venue_location_h3_index',
        'customer_location_h3_index',
        'item_count',
        'estimated_delivery_time_lower_minutes',
        'estimated_delivery_time_upper_minutes'
    ]
    
    before_potential = len(df_clean)
    df_clean = df_clean.drop_duplicates(subset=duplicate_cols, keep='first')
    potential_duplicates_removed = before_potential - len(df_clean)
    print(f"Removed {potential_duplicates_removed:,} potential duplicate orders")
    
    total_removed = initial_rows - len(df_clean)
    print(f"\nTotal rows removed: {total_removed:,}")
    print(f"Remaining rows: {len(df_clean):,}")
    
    log = {
        'exact_duplicates': exact_duplicates_removed,
        'potential_duplicates': potential_duplicates_removed
    }
    
    return df_clean, log

## 4. Target Cleaning

Clean the target variable `actual_delivery_time_minutes`:
- Remove non-positive values
- Remove unrealistic values (> 180 minutes = 3 hours)
- Do not impute

In [None]:
def clean_target(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """Clean target variable by removing invalid values."""
    print("\n" + "="*80)
    print("STEP 3: TARGET CLEANING")
    print("="*80)
    
    initial_rows = len(df)
    print(f"\nStarting rows: {initial_rows:,}")
    
    print("\nTarget statistics BEFORE cleaning:")
    print(df['actual_delivery_time_minutes'].describe())
    print(f"Missing values: {df['actual_delivery_time_minutes'].isna().sum()}")
    
    non_positive = (df['actual_delivery_time_minutes'] <= 0).sum()
    df_clean = df[df['actual_delivery_time_minutes'] > 0].copy()
    print(f"\nRemoved {non_positive:,} rows with actual_delivery_time_minutes <= 0")
    
    unrealistic = (df_clean['actual_delivery_time_minutes'] > 180).sum()
    df_clean = df_clean[df_clean['actual_delivery_time_minutes'] <= 180].copy()
    print(f"Removed {unrealistic:,} rows with actual_delivery_time_minutes > 180")
    
    print("\nTarget statistics AFTER cleaning:")
    print(df_clean['actual_delivery_time_minutes'].describe())
    
    total_removed = initial_rows - len(df_clean)
    print(f"\nTotal rows removed: {total_removed:,}")
    print(f"Remaining rows: {len(df_clean):,}")
    
    log = {
        'target_non_positive': non_positive,
        'target_unrealistic': unrealistic
    }
    
    return df_clean, log

## 5. ETA Consistency Checks

Validate estimated delivery time bounds and create derived features.

In [None]:
def validate_and_engineer_eta(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """Validate ETA consistency and create derived features."""
    print("\n" + "="*80)
    print("STEP 4: ETA CONSISTENCY CHECKS")
    print("="*80)
    
    initial_rows = len(df)
    print(f"\nStarting rows: {initial_rows:,}")
    
    df_clean = df.copy()
    
    lower_invalid = (df_clean['estimated_delivery_time_lower_minutes'] <= 0).sum()
    df_clean = df_clean[df_clean['estimated_delivery_time_lower_minutes'] > 0].copy()
    print(f"\nRemoved {lower_invalid:,} rows with estimated_delivery_time_lower_minutes <= 0")
    
    upper_invalid = (df_clean['estimated_delivery_time_upper_minutes'] <= 0).sum()
    df_clean = df_clean[df_clean['estimated_delivery_time_upper_minutes'] > 0].copy()
    print(f"Removed {upper_invalid:,} rows with estimated_delivery_time_upper_minutes <= 0")
    
    inconsistent = (
        df_clean['estimated_delivery_time_lower_minutes'] > 
        df_clean['estimated_delivery_time_upper_minutes']
    ).sum()
    df_clean = df_clean[
        df_clean['estimated_delivery_time_lower_minutes'] <= 
        df_clean['estimated_delivery_time_upper_minutes']
    ].copy()
    print(f"Removed {inconsistent:,} rows where lower > upper")
    
    df_clean['estimate_width'] = (
        df_clean['estimated_delivery_time_upper_minutes'] - 
        df_clean['estimated_delivery_time_lower_minutes']
    )
    df_clean['estimate_midpoint'] = (
        df_clean['estimated_delivery_time_upper_minutes'] + 
        df_clean['estimated_delivery_time_lower_minutes']
    ) / 2
    print("\nCreated derived features: estimate_width, estimate_midpoint")
    
    print("\nEstimate width statistics:")
    print(df_clean['estimate_width'].describe())
    
    zero_width = (df_clean['estimate_width'] == 0).sum()
    large_width = (df_clean['estimate_width'] > 180).sum()
    print(f"\nRows with zero estimate width: {zero_width:,} ({zero_width/len(df_clean)*100:.2f}%)")
    print(f"Rows with estimate width > 180 minutes: {large_width:,} ({large_width/len(df_clean)*100:.2f}%)")
    
    total_removed = initial_rows - len(df_clean)
    print(f"\nTotal rows removed: {total_removed:,}")
    print(f"Remaining rows: {len(df_clean):,}")
    
    log = {
        'eta_lower_invalid': lower_invalid,
        'eta_upper_invalid': upper_invalid,
        'eta_inconsistent': inconsistent,
        'zero_width_count': zero_width,
        'large_width_count': large_width
    }
    
    return df_clean, log

## 6. Feature Sanity Checks

Validate and clean individual features based on domain knowledge.

In [None]:
def validate_features(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """Validate and clean individual features based on domain knowledge."""
    print("\n" + "="*80)
    print("STEP 5: FEATURE SANITY CHECKS")
    print("="*80)
    
    initial_rows = len(df)
    print(f"\nStarting rows: {initial_rows:,}")
    
    df_clean = df.copy()
    
    print("\n--- courier_supply_index ---")
    print(f"Range: [{df_clean['courier_supply_index'].min():.2f}, {df_clean['courier_supply_index'].max():.2f}]")
    courier_invalid = (df_clean['courier_supply_index'] < 0).sum()
    df_clean = df_clean[df_clean['courier_supply_index'] >= 0].copy()
    print(f"Removed {courier_invalid:,} rows with courier_supply_index < 0")
    
    print("\n--- precipitation ---")
    print(f"Range: [{df_clean['precipitation'].min():.2f}, {df_clean['precipitation'].max():.2f}]")
    precip_negative = (df_clean['precipitation'] < 0).sum()
    df_clean = df_clean[df_clean['precipitation'] >= 0].copy()
    print(f"Removed {precip_negative:,} rows with precipitation < 0")
    
    precip_excessive = (df_clean['precipitation'] > 100).sum()
    df_clean = df_clean[df_clean['precipitation'] <= 100].copy()
    print(f"Removed {precip_excessive:,} rows with precipitation > 100")
    
    print("\n--- item_count ---")
    print(f"Range: [{df_clean['item_count'].min()}, {df_clean['item_count'].max()}]")
    item_invalid = (df_clean['item_count'] <= 0).sum()
    df_clean = df_clean[df_clean['item_count'] > 0].copy()
    print(f"Removed {item_invalid:,} rows with item_count <= 0")
    
    print("\nNumeric feature summary:")
    numeric_cols = [
        'courier_supply_index', 
        'precipitation', 
        'item_count',
        'actual_delivery_time_minutes'
    ]
    print(df_clean[numeric_cols].describe())
    
    total_removed = initial_rows - len(df_clean)
    print(f"\nTotal rows removed: {total_removed:,}")
    print(f"Remaining rows: {len(df_clean):,}")
    
    log = {
        'courier_supply_invalid': courier_invalid,
        'precipitation_negative': precip_negative,
        'precipitation_excessive': precip_excessive,
        'item_count_invalid': item_invalid
    }
    
    return df_clean, log

## 7. H3 Validation

Validate H3 geospatial indexes for consistency.

In [None]:
def validate_h3_indexes(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, int]]:
    """Validate H3 geospatial indexes."""
    print("\n" + "="*80)
    print("STEP 6: H3 VALIDATION")
    print("="*80)
    
    initial_rows = len(df)
    print(f"\nStarting rows: {initial_rows:,}")
    
    df_clean = df.copy()
    
    venue_nulls = df_clean['venue_location_h3_index'].isna().sum()
    customer_nulls = df_clean['customer_location_h3_index'].isna().sum()
    print(f"\nNull values:")
    print(f"  venue_location_h3_index: {venue_nulls:,}")
    print(f"  customer_location_h3_index: {customer_nulls:,}")
    
    df_clean = df_clean.dropna(subset=['venue_location_h3_index', 'customer_location_h3_index']).copy()
    null_removed = venue_nulls + customer_nulls
    print(f"Removed {null_removed:,} rows with null H3 indexes")
    
    venue_lengths = df_clean['venue_location_h3_index'].str.len()
    customer_lengths = df_clean['customer_location_h3_index'].str.len()
    
    print(f"\nH3 string length distribution:")
    print(f"  venue_location_h3_index: {venue_lengths.value_counts().to_dict()}")
    print(f"  customer_location_h3_index: {customer_lengths.value_counts().to_dict()}")
    
    expected_length = venue_lengths.mode()[0]
    print(f"\nExpected H3 length: {expected_length}")
    
    before_length_check = len(df_clean)
    df_clean = df_clean[
        (venue_lengths == expected_length) & 
        (customer_lengths == expected_length)
    ].copy()
    length_invalid = before_length_check - len(df_clean)
    print(f"Removed {length_invalid:,} rows with inconsistent H3 lengths")
    
    empty_venue = (df_clean['venue_location_h3_index'] == '').sum()
    empty_customer = (df_clean['customer_location_h3_index'] == '').sum()
    df_clean = df_clean[
        (df_clean['venue_location_h3_index'] != '') & 
        (df_clean['customer_location_h3_index'] != '')
    ].copy()
    empty_removed = empty_venue + empty_customer
    if empty_removed > 0:
        print(f"Removed {empty_removed:,} rows with empty H3 strings")
    
    print(f"\nUnique locations:")
    print(f"  Unique venues: {df_clean['venue_location_h3_index'].nunique():,}")
    print(f"  Unique customers: {df_clean['customer_location_h3_index'].nunique():,}")
    
    total_removed = initial_rows - len(df_clean)
    print(f"\nTotal rows removed: {total_removed:,}")
    print(f"Remaining rows: {len(df_clean):,}")
    
    log = {
        'h3_nulls': null_removed,
        'h3_length_invalid': length_invalid,
        'h3_empty': empty_removed
    }
    
    return df_clean, log

## 8. Missing Values Handling

Handle missing values with explicit strategies for each column.

In [None]:
def handle_missing_values(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """Handle missing values with explicit strategies."""
    print("\n" + "="*80)
    print("STEP 7: MISSING VALUES HANDLING")
    print("="*80)
    
    initial_rows = len(df)
    print(f"\nStarting rows: {initial_rows:,}")
    
    missing_counts = df.isna().sum()
    missing_pct = (missing_counts / len(df) * 100).round(2)
    missing_summary = pd.DataFrame({
        'Missing Count': missing_counts,
        'Missing %': missing_pct
    })
    missing_summary = missing_summary[missing_summary['Missing Count'] > 0].sort_values('Missing %', ascending=False)
    
    print("\nMissing value summary:")
    if len(missing_summary) > 0:
        print(missing_summary)
    else:
        print("No missing values found!")
    
    df_clean = df.copy()
    imputation_log = {}
    
    if 'precipitation' in missing_summary.index:
        precip_missing = df_clean['precipitation'].isna().sum()
        df_clean['precipitation'] = df_clean['precipitation'].fillna(0)
        print(f"\nFilled {precip_missing:,} missing precipitation values with 0")
        imputation_log['precipitation_filled'] = precip_missing
    
    cols_to_drop = []
    for col in missing_summary.index:
        if col != 'precipitation' and missing_summary.loc[col, 'Missing %'] < 1.0:
            if col != 'actual_delivery_time_minutes':
                cols_to_drop.append(col)
    
    if cols_to_drop:
        before_drop = len(df_clean)
        df_clean = df_clean.dropna(subset=cols_to_drop).copy()
        rows_dropped = before_drop - len(df_clean)
        print(f"\nDropped {rows_dropped:,} rows with missing values in: {cols_to_drop}")
        imputation_log['rows_dropped_missing'] = rows_dropped
    
    if df_clean['actual_delivery_time_minutes'].isna().sum() > 0:
        target_missing = df_clean['actual_delivery_time_minutes'].isna().sum()
        df_clean = df_clean.dropna(subset=['actual_delivery_time_minutes']).copy()
        print(f"\nDropped {target_missing:,} rows with missing target variable")
        imputation_log['target_missing_dropped'] = target_missing
    
    final_missing = df_clean.isna().sum().sum()
    print(f"\nFinal missing values: {final_missing}")
    
    if final_missing > 0:
        print("\nWARNING: Some missing values remain:")
        print(df_clean.isna().sum()[df_clean.isna().sum() > 0])
    else:
        print("No missing values remain")
    
    total_removed = initial_rows - len(df_clean)
    print(f"\nTotal rows removed: {total_removed:,}")
    print(f"Remaining rows: {len(df_clean):,}")
    
    return df_clean, imputation_log

## 9. Time-Based Split

Create train/validation/test splits based on time ordering to prevent data leakage.

In [None]:
def create_time_based_split(
    df: pd.DataFrame, 
    train_pct: float = 0.70, 
    val_pct: float = 0.15,
    test_pct: float = 0.15
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Create time-based train/validation/test split to prevent data leakage."""
    print("\n" + "="*80)
    print("STEP 8: TIME-BASED SPLIT")
    print("="*80)
    
    assert abs(train_pct + val_pct + test_pct - 1.0) < 1e-6
    
    df_sorted = df.sort_values('order_placed_at_utc').reset_index(drop=True)
    print(f"\nSorted {len(df_sorted):,} rows by order_placed_at_utc")
    
    n = len(df_sorted)
    train_end = int(n * train_pct)
    val_end = int(n * (train_pct + val_pct))
    
    train_df = df_sorted.iloc[:train_end].copy()
    val_df = df_sorted.iloc[train_end:val_end].copy()
    test_df = df_sorted.iloc[val_end:].copy()
    
    print(f"\nSplit sizes:")
    print(f"  Train: {len(train_df):,} rows ({len(train_df)/n*100:.1f}%)")
    print(f"  Val:   {len(val_df):,} rows ({len(val_df)/n*100:.1f}%)")
    print(f"  Test:  {len(test_df):,} rows ({len(test_df)/n*100:.1f}%)")
    
    print(f"\nDate ranges:")
    print(f"  Train: {train_df['order_placed_at_utc'].min()} to {train_df['order_placed_at_utc'].max()}")
    print(f"  Val:   {val_df['order_placed_at_utc'].min()} to {val_df['order_placed_at_utc'].max()}")
    print(f"  Test:  {test_df['order_placed_at_utc'].min()} to {test_df['order_placed_at_utc'].max()}")
    
    assert train_df['order_placed_at_utc'].max() <= val_df['order_placed_at_utc'].min()
    assert val_df['order_placed_at_utc'].max() <= test_df['order_placed_at_utc'].min()
    print("\nVerified no temporal overlap between splits")
    
    print("\nTarget distribution across splits:")
    print(f"  Train - Mean: {train_df['actual_delivery_time_minutes'].mean():.2f}, "
          f"Std: {train_df['actual_delivery_time_minutes'].std():.2f}")
    print(f"  Val   - Mean: {val_df['actual_delivery_time_minutes'].mean():.2f}, "
          f"Std: {val_df['actual_delivery_time_minutes'].std():.2f}")
    print(f"  Test  - Mean: {test_df['actual_delivery_time_minutes'].mean():.2f}, "
          f"Std: {test_df['actual_delivery_time_minutes'].std():.2f}")
    
    return train_df, val_df, test_df

## 10. Main Pipeline Function

Orchestrate the entire cleaning pipeline.

In [None]:
def run_cleaning_pipeline(
    filepath: str,
    output_dir: str = '../outputs/'
) -> Dict[str, Any]:
    """Execute the complete data cleaning pipeline."""
    print("\n" + "#"*80)
    print("#" + " "*78 + "#")
    print("#" + " "*20 + "DATA CLEANING PIPELINE" + " "*36 + "#")
    print("#" + " "*78 + "#")
    print("#"*80)
    
    import os
    os.makedirs(output_dir, exist_ok=True)
    
    pipeline_log = {
        'input_file': filepath,
        'pipeline_start': datetime.now(timezone.utc).isoformat()
    }
    
    df = load_and_enforce_types(filepath)
    pipeline_log['initial_rows'] = len(df)
    pipeline_log['date_range_start'] = df['order_placed_at_utc'].min().isoformat()
    pipeline_log['date_range_end'] = df['order_placed_at_utc'].max().isoformat()
    
    df, dup_log = remove_duplicates(df)
    pipeline_log.update(dup_log)
    
    df, target_log = clean_target(df)
    pipeline_log.update(target_log)
    
    df, eta_log = validate_and_engineer_eta(df)
    pipeline_log.update(eta_log)
    
    df, feature_log = validate_features(df)
    pipeline_log.update(feature_log)
    
    df, h3_log = validate_h3_indexes(df)
    pipeline_log.update(h3_log)
    
    df, missing_log = handle_missing_values(df)
    pipeline_log.update(missing_log)
    
    pipeline_log['final_rows'] = len(df)
    pipeline_log['total_rows_removed'] = pipeline_log['initial_rows'] - pipeline_log['final_rows']
    pipeline_log['retention_rate'] = f"{len(df) / pipeline_log['initial_rows'] * 100:.2f}%"
    
    train_df, val_df, test_df = create_time_based_split(df)
    pipeline_log['train_rows'] = len(train_df)
    pipeline_log['val_rows'] = len(val_df)
    pipeline_log['test_rows'] = len(test_df)
    
    print("\n" + "="*80)
    print("SAVING CLEANED DATASETS")
    print("="*80)
    
    full_path = os.path.join(output_dir, 'cleaned_full.csv')
    train_path = os.path.join(output_dir, 'train.csv')
    val_path = os.path.join(output_dir, 'val.csv')
    test_path = os.path.join(output_dir, 'test.csv')
    
    df.to_csv(full_path, index=False)
    train_df.to_csv(train_path, index=False)
    val_df.to_csv(val_path, index=False)
    test_df.to_csv(test_path, index=False)
    
    print(f"\nSaved cleaned_full.csv ({len(df):,} rows)")
    print(f"Saved train.csv ({len(train_df):,} rows)")
    print(f"Saved val.csv ({len(val_df):,} rows)")
    print(f"Saved test.csv ({len(test_df):,} rows)")
    print(f"\nAll files saved to: {output_dir}")
    
    pipeline_log['pipeline_end'] = datetime.now(timezone.utc).isoformat()
    
    return {
        'cleaned_df': df,
        'train_df': train_df,
        'val_df': val_df,
        'test_df': test_df,
        'log': pipeline_log
    }

## 11. Execute Pipeline

Run the complete data cleaning pipeline.

In [44]:
# Execute the pipeline
results = run_cleaning_pipeline(
    filepath='../data/orders_spring_2022.csv',
    output_dir='../outputs/'
)


################################################################################
#                                                                              #
#                    DATA CLEANING PIPELINE                                    #
#                                                                              #
################################################################################

STEP 1: DATA LOADING AND TYPE ENFORCEMENT

Loaded 24,942 rows from ../data/orders_spring_2022.csv
✓ Localized timestamps to UTC

✓ Type enforcement complete

Data types:
order_placed_at_utc                      datetime64[ns, UTC]
order_category                                      category
item_count                                             Int64
actual_delivery_time_minutes                         float64
estimated_delivery_time_lower_minutes                float64
estimated_delivery_time_upper_minutes                float64
venue_location_h3_index                               obj

## 12. Pipeline Summary Report

Display comprehensive summary of the cleaning process.

In [None]:
def print_pipeline_summary(log: Dict[str, Any]):
    """Print a comprehensive summary of the cleaning pipeline."""
    print("\n" + "#"*80)
    print("#" + " "*78 + "#")
    print("#" + " "*25 + "PIPELINE SUMMARY" + " "*37 + "#")
    print("#" + " "*78 + "#")
    print("#"*80)
    
    print("\n" + "="*80)
    print("DATASET INFORMATION")
    print("="*80)
    print(f"Input file: {log['input_file']}")
    print(f"Date range: {log['date_range_start'][:10]} to {log['date_range_end'][:10]}")
    print(f"Initial rows: {log['initial_rows']:,}")
    print(f"Final rows: {log['final_rows']:,}")
    print(f"Rows removed: {log['total_rows_removed']:,}")
    print(f"Retention rate: {log['retention_rate']}")
    
    print("\n" + "="*80)
    print("ROWS REMOVED PER RULE")
    print("="*80)
    
    removal_reasons = [
        ('Exact duplicates', 'exact_duplicates'),
        ('Potential duplicate orders', 'potential_duplicates'),
        ('Target non-positive', 'target_non_positive'),
        ('Target unrealistic (>180 min)', 'target_unrealistic'),
        ('ETA lower invalid', 'eta_lower_invalid'),
        ('ETA upper invalid', 'eta_upper_invalid'),
        ('ETA inconsistent (lower>upper)', 'eta_inconsistent'),
        ('Courier supply invalid', 'courier_supply_invalid'),
        ('Precipitation negative', 'precipitation_negative'),
        ('Precipitation excessive', 'precipitation_excessive'),
        ('Item count invalid', 'item_count_invalid'),
        ('H3 nulls', 'h3_nulls'),
        ('H3 length invalid', 'h3_length_invalid'),
        ('H3 empty strings', 'h3_empty'),
    ]
    
    total_logged = 0
    for label, key in removal_reasons:
        if key in log:
            count = log[key]
            if count > 0:
                print(f"  {label:.<60} {count:>8,}")
                total_logged += count
    
    if 'precipitation_filled' in log:
        print(f"\n  {'Precipitation values filled with 0':.<60} {log['precipitation_filled']:>8,}")
    if 'rows_dropped_missing' in log:
        print(f"  {'Rows dropped for other missing values':.<60} {log['rows_dropped_missing']:>8,}")
        total_logged += log['rows_dropped_missing']
    if 'target_missing_dropped' in log:
        print(f"  {'Rows dropped for missing target':.<60} {log['target_missing_dropped']:>8,}")
        total_logged += log['target_missing_dropped']
    
    print(f"\n  {'TOTAL':.<60} {total_logged:>8,}")
    
    print("\n" + "="*80)
    print("TRAIN/VAL/TEST SPLIT")
    print("="*80)
    print(f"  Training set:   {log['train_rows']:>8,} rows ({log['train_rows']/log['final_rows']*100:>5.1f}%)")
    print(f"  Validation set: {log['val_rows']:>8,} rows ({log['val_rows']/log['final_rows']*100:>5.1f}%)")
    print(f"  Test set:       {log['test_rows']:>8,} rows ({log['test_rows']/log['final_rows']*100:>5.1f}%)")
    
    print("\n" + "="*80)
    print("KEY FEATURES ENGINEERED")
    print("="*80)
    print("  - estimate_width: Upper bound - Lower bound")
    print("  - estimate_midpoint: (Upper bound + Lower bound) / 2")
    
    if 'zero_width_count' in log:
        print(f"\n  Rows with zero estimate width: {log['zero_width_count']:,}")
    if 'large_width_count' in log:
        print(f"  Rows with large estimate width (>180 min): {log['large_width_count']:,}")
    
    print("\n" + "="*80)
    print("PIPELINE STATUS")
    print("="*80)
    print("- Data loading and type enforcement")
    print("- Structural validation (duplicates)")
    print("- Target variable cleaning")
    print("- ETA consistency validation")
    print("- Feature sanity checks")
    print("- H3 geospatial index validation")
    print("- Missing value handling")
    print("- Time-based train/val/test split")
    print("\nPIPELINE COMPLETED SUCCESSFULLY")
    
    print("\n" + "#"*80 + "\n")

print_pipeline_summary(results['log'])


################################################################################
#                                                                              #
#                         PIPELINE SUMMARY                                     #
#                                                                              #
################################################################################

DATASET INFORMATION
Input file: ../data/orders_spring_2022.csv
Date range: 2022-02-01 to 2022-04-29
Initial rows: 24,942
Final rows: 23,865
Rows removed: 1,077
Retention rate: 95.68%

ROWS REMOVED PER RULE
  Target unrealistic (>180 min)...............................       26
  ETA lower invalid...........................................       36

  TOTAL.......................................................       62

TRAIN/VAL/TEST SPLIT
  Training set:     16,705 rows ( 70.0%)
  Validation set:    3,580 rows ( 15.0%)
  Test set:          3,580 rows ( 15.0%)

KEY FEATURES ENGINEERED


## 13. Final Data Quality Checks

Verify the cleaned dataset meets all quality criteria.

In [None]:
cleaned_df = results['cleaned_df']
train_df = results['train_df']
val_df = results['val_df']
test_df = results['test_df']

print("Final Quality Checks")
print("="*80)

missing_count = cleaned_df.isna().sum().sum()
print(f"\n1. Missing values: {missing_count} {'PASS' if missing_count == 0 else 'FAIL'}")

target_valid = (
    (cleaned_df['actual_delivery_time_minutes'] > 0).all() and
    (cleaned_df['actual_delivery_time_minutes'] <= 180).all()
)
print(f"2. Target in valid range (0, 180]: {'PASS' if target_valid else 'FAIL'}")

eta_valid = (
    (cleaned_df['estimated_delivery_time_lower_minutes'] > 0).all() and
    (cleaned_df['estimated_delivery_time_upper_minutes'] > 0).all() and
    (cleaned_df['estimated_delivery_time_lower_minutes'] <= 
     cleaned_df['estimated_delivery_time_upper_minutes']).all()
)
print(f"3. ETA estimates valid and consistent: {'PASS' if eta_valid else 'FAIL'}")

features_valid = (
    (cleaned_df['courier_supply_index'] >= 0).all() and
    (cleaned_df['precipitation'] >= 0).all() and
    (cleaned_df['precipitation'] <= 100).all() and
    (cleaned_df['item_count'] > 0).all()
)
print(f"4. All features in valid ranges: {'PASS' if features_valid else 'FAIL'}")

h3_valid = (
    cleaned_df['venue_location_h3_index'].notna().all() and
    cleaned_df['customer_location_h3_index'].notna().all() and
    (cleaned_df['venue_location_h3_index'].str.len().nunique() == 1) and
    (cleaned_df['customer_location_h3_index'].str.len().nunique() == 1)
)
print(f"5. H3 indexes valid and consistent: {'PASS' if h3_valid else 'FAIL'}")

no_duplicates = not cleaned_df.duplicated().any()
print(f"6. No duplicate rows: {'PASS' if no_duplicates else 'FAIL'}")

time_ordered = (
    train_df['order_placed_at_utc'].max() <= val_df['order_placed_at_utc'].min() and
    val_df['order_placed_at_utc'].max() <= test_df['order_placed_at_utc'].min()
)
print(f"7. Splits properly time-ordered: {'PASS' if time_ordered else 'FAIL'}")

total_split = len(train_df) + len(val_df) + len(test_df)
splits_match = total_split == len(cleaned_df)
print(f"8. Split sizes sum to total: {'PASS' if splits_match else 'FAIL'}")

print("\n" + "="*80)
if all([missing_count == 0, target_valid, eta_valid, features_valid, 
        h3_valid, no_duplicates, time_ordered, splits_match]):
    print("\nALL QUALITY CHECKS PASSED")
    print("\nDataset is ready for model training!")
else:
    print("\nWARNING: SOME QUALITY CHECKS FAILED - REVIEW REQUIRED")

print("\n" + "="*80)

Final Quality Checks

1. Missing values: 0 ✓
2. Target in valid range (0, 180]: ✓
3. ETA estimates valid and consistent: ✓
4. All features in valid ranges: ✓
5. H3 indexes valid and consistent: ✓
6. No duplicate rows: ✓
7. Splits properly time-ordered: ✓
8. Split sizes sum to total: ✓


✓✓✓ ALL QUALITY CHECKS PASSED ✓✓✓

Dataset is ready for model training!



## 14. Data Overview and Statistics

Display final dataset characteristics.

In [47]:
print("\nCleaned Dataset Overview")
print("="*80)

print("\nDataset Shape:")
print(f"  Rows: {len(cleaned_df):,}")
print(f"  Columns: {len(cleaned_df.columns)}")

print("\nColumn Names:")
print(f"  {', '.join(cleaned_df.columns)}")

print("\nData Types:")
print(cleaned_df.dtypes)

print("\nNumerical Features Summary:")
print(cleaned_df.describe())

print("\nCategorical Feature:")
print(f"  order_category value counts:")
print(cleaned_df['order_category'].value_counts())

print("\nMemory Usage:")
memory_mb = cleaned_df.memory_usage(deep=True).sum() / 1024 / 1024
print(f"  {memory_mb:.2f} MB")


Cleaned Dataset Overview

Dataset Shape:
  Rows: 23,865
  Columns: 12

Column Names:
  order_placed_at_utc, order_category, item_count, actual_delivery_time_minutes, estimated_delivery_time_lower_minutes, estimated_delivery_time_upper_minutes, venue_location_h3_index, customer_location_h3_index, courier_supply_index, precipitation, estimate_width, estimate_midpoint

Data Types:
order_placed_at_utc                      datetime64[ns, UTC]
order_category                                      category
item_count                                             Int64
actual_delivery_time_minutes                         float64
estimated_delivery_time_lower_minutes                float64
estimated_delivery_time_upper_minutes                float64
venue_location_h3_index                               object
customer_location_h3_index                            object
courier_supply_index                                 float64
precipitation                                        float64
estimate

## 14.5. Feature Engineering

Create additional predictive features to improve model performance:
1. **H3 Distance**: Calculate geographic distance between venue and customer locations
2. **Venue Friction Score**: Identify venues with historically longer delivery times using target encoding

### 14.5.1 H3 Distance Feature

Calculate geographic distance between venue and customer using H3 hexagonal indexes.
Distance is converted from H3 grid distance to approximate kilometers.

In [None]:
def calculate_h3_distance_features(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate distance features from H3 geospatial indexes."""
    try:
        import h3
        
        print("Calculating H3 distance features...")
        
        df['h3_grid_distance'] = df.apply(
            lambda row: h3.grid_distance(
                row['venue_location_h3_index'],
                row['customer_location_h3_index']
            ),
            axis=1
        )
        
        df['h3_distance_km'] = df['h3_grid_distance'] * 0.5
        
        print(f"H3 distance features created")
        print(f"  - h3_grid_distance (hexagon units)")
        print(f"  - h3_distance_km (approximate delivery distance)")
        print(f"\nDistance Statistics:")
        print(f"  Mean: {df['h3_distance_km'].mean():.2f} km")
        print(f"  Median: {df['h3_distance_km'].median():.2f} km")
        print(f"  Max: {df['h3_distance_km'].max():.2f} km")
        print(f"  Same location (0 km): {(df['h3_distance_km'] == 0).sum():,} orders")
        
        return df
        
    except ImportError:
        print("WARNING: h3 library not available. Install with: pip install h3")
        print("Skipping H3 distance features")
        df['h3_grid_distance'] = 0
        df['h3_distance_km'] = 0.0
        return df
    except Exception as e:
        print(f"WARNING: Error calculating H3 distances: {e}")
        df['h3_grid_distance'] = 0
        df['h3_distance_km'] = 0.0
        return df

print("\n" + "="*80)
print("FEATURE ENGINEERING: H3 DISTANCE")
print("="*80)

train_df = calculate_h3_distance_features(train_df)
print(f"\nApplied to training set ({len(train_df):,} rows)")

val_df = calculate_h3_distance_features(val_df)
print(f"Applied to validation set ({len(val_df):,} rows)")

test_df = calculate_h3_distance_features(test_df)
print(f"Applied to test set ({len(test_df):,} rows)")

cleaned_df = calculate_h3_distance_features(cleaned_df)
print(f"Applied to full cleaned dataset ({len(cleaned_df):,} rows)")


FEATURE ENGINEERING: H3 DISTANCE
Calculating H3 distance features...
✓ H3 distance features created
  - h3_grid_distance (hexagon units)
  - h3_distance_km (approximate delivery distance)

Distance Statistics:
  Mean: 0.91 km
  Median: 0.50 km
  Max: 7.00 km
  Same location (0 km): 2,905 orders

✓ Applied to training set (16,705 rows)
Calculating H3 distance features...
✓ H3 distance features created
  - h3_grid_distance (hexagon units)
  - h3_distance_km (approximate delivery distance)

Distance Statistics:
  Mean: 0.94 km
  Median: 0.50 km
  Max: 6.00 km
  Same location (0 km): 625 orders
✓ Applied to validation set (3,580 rows)
Calculating H3 distance features...
✓ H3 distance features created
  - h3_grid_distance (hexagon units)
  - h3_distance_km (approximate delivery distance)

Distance Statistics:
  Mean: 0.94 km
  Median: 0.50 km
  Max: 5.50 km
  Same location (0 km): 612 orders
✓ Applied to test set (3,580 rows)
Calculating H3 distance features...
✓ H3 distance features creat

### 14.5.2 Venue Friction Score (Target Encoding)

Calculate venue-specific delay patterns using historical data to capture systematic delays from certain venues.

**Methodology:**
1. Calculate error = actual_delivery_time - estimated_delivery_time_upper
2. Identify delayed orders (error > 0)
3. For each venue, calculate:
   - **Mean Extra Time**: Average delay in minutes when delayed
   - **Delay Rate**: Percentage of orders that were delayed
4. **Friction Score** = Delay Rate × Mean Extra Time

This captures venues that are consistently problematic (high delay rate AND high delay magnitude).

**Important:** Use training data only to avoid data leakage, then apply to val/test sets.

In [None]:
def calculate_venue_friction_score(train_df: pd.DataFrame) -> Dict[str, float]:
    """Calculate venue friction scores from training data using target encoding."""
    print("Calculating venue friction scores from training data...")
    
    train_df['delivery_error'] = (
        train_df['actual_delivery_time_minutes'] - 
        train_df['estimated_delivery_time_upper_minutes']
    )
    
    train_df['is_delayed'] = (train_df['delivery_error'] > 0).astype(int)
    
    venue_stats = train_df.groupby('venue_location_h3_index').agg({
        'delivery_error': ['mean', 'count'],
        'is_delayed': 'mean'
    })
    
    venue_stats.columns = ['mean_extra_time', 'order_count', 'delay_rate']
    
    min_orders = 10
    venue_stats = venue_stats[venue_stats['order_count'] >= min_orders]
    
    delayed_only = train_df[train_df['is_delayed'] == 1].groupby('venue_location_h3_index')['delivery_error'].mean()
    venue_stats['mean_delay_when_late'] = delayed_only
    venue_stats['mean_delay_when_late'] = venue_stats['mean_delay_when_late'].fillna(0)
    
    venue_stats['friction_score'] = (
        venue_stats['delay_rate'] * venue_stats['mean_delay_when_late']
    )
    
    venue_stats['overall_mean_error'] = venue_stats['mean_extra_time']
    
    print(f"Calculated friction scores for {len(venue_stats):,} venues")
    print(f"  (Venues with at least {min_orders} orders)")
    
    print(f"\nFriction Score Statistics:")
    print(f"  Mean: {venue_stats['friction_score'].mean():.3f}")
    print(f"  Median: {venue_stats['friction_score'].median():.3f}")
    print(f"  75th percentile: {venue_stats['friction_score'].quantile(0.75):.3f}")
    print(f"  90th percentile: {venue_stats['friction_score'].quantile(0.90):.3f}")
    print(f"  95th percentile: {venue_stats['friction_score'].quantile(0.95):.3f}")
    print(f"  Max: {venue_stats['friction_score'].max():.3f}")
    
    print(f"\nTop 10 Venues with Highest Friction Scores:")
    top_venues = venue_stats.nlargest(10, 'friction_score')[
        ['delay_rate', 'mean_delay_when_late', 'friction_score', 'order_count']
    ]
    top_venues['delay_rate_pct'] = (top_venues['delay_rate'] * 100).round(1)
    print(top_venues[['delay_rate_pct', 'mean_delay_when_late', 'friction_score', 'order_count']].to_string())
    
    return venue_stats['friction_score'].to_dict()


def apply_venue_friction_score(df: pd.DataFrame, friction_mapping: Dict[str, float]) -> pd.DataFrame:
    """Apply pre-calculated venue friction scores to a dataframe."""
    df['venue_friction_score'] = df['venue_location_h3_index'].map(friction_mapping).fillna(0)
    return df


print("\n" + "="*80)
print("FEATURE ENGINEERING: VENUE FRICTION SCORE")
print("="*80)

venue_friction_mapping = calculate_venue_friction_score(train_df.copy())

print(f"\nApplying venue friction scores...")
train_df = apply_venue_friction_score(train_df, venue_friction_mapping)
print(f"Training set: {len(train_df):,} rows")
print(f"  - Known venues: {(train_df['venue_friction_score'] != 0).sum():,}")
print(f"  - Unknown venues (score=0): {(train_df['venue_friction_score'] == 0).sum():,}")

val_df = apply_venue_friction_score(val_df, venue_friction_mapping)
print(f"Validation set: {len(val_df):,} rows")
print(f"  - Known venues: {(val_df['venue_friction_score'] != 0).sum():,}")
print(f"  - Unknown venues (score=0): {(val_df['venue_friction_score'] == 0).sum():,}")

test_df = apply_venue_friction_score(test_df, venue_friction_mapping)
print(f"Test set: {len(test_df):,} rows")
print(f"  - Known venues: {(test_df['venue_friction_score'] != 0).sum():,}")
print(f"  - Unknown venues (score=0): {(test_df['venue_friction_score'] == 0).sum():,}")

cleaned_df = apply_venue_friction_score(cleaned_df, venue_friction_mapping)
print(f"Full cleaned dataset: {len(cleaned_df):,} rows")

if 'delivery_error' in train_df.columns:
    train_df = train_df.drop(columns=['delivery_error', 'is_delayed'])

print("\nVenue friction score feature engineering complete!")


FEATURE ENGINEERING: VENUE FRICTION SCORE
Calculating venue friction scores from training data...
✓ Calculated friction scores for 96 venues
  (Venues with at least 10 orders)

Friction Score Statistics:
  Mean: 4.189
  Median: 3.791
  75th percentile: 4.799
  90th percentile: 6.776
  95th percentile: 8.060
  Max: 9.490

Top 10 Venues with Highest Friction Scores:
                         delay_rate_pct  mean_delay_when_late  friction_score  order_count
venue_location_h3_index                                                                   
881126d221fffff                    51.9                 18.28            9.49          131
881126d229fffff                    38.7                 23.80            9.21           31
881126d0cdfffff                    38.9                 22.49            8.75           18
881126d18bfffff                    40.0                 20.96            8.38           70
881126d233fffff                    35.7                 22.73            8.12         

### 14.5.4 Save Updated Datasets with New Features

Re-save train/val/test splits with all engineered features included.

### 14.5.3 Distance × Weather Interaction Feature

Create interaction feature between distance and precipitation based on EDA findings showing strong correlation improvement.

**Rationale from EDA:**
- Distance affects delivery time directly
- Weather (precipitation) slows down couriers
- The combined effect is non-additive: rain has a stronger impact on longer distances
- This interaction captures how weather conditions amplify distance-based delays

In [None]:
def create_distance_weather_interaction(df: pd.DataFrame) -> pd.DataFrame:
    """Create interaction feature between distance and weather."""
    df['distance_x_weather'] = df['h3_distance_km'] * df['precipitation']
    return df

print("\n" + "="*80)
print("FEATURE ENGINEERING: DISTANCE × WEATHER INTERACTION")
print("="*80)

print("\nCreating distance × weather interaction feature...")
print("Formula: distance_x_weather = h3_distance_km × precipitation")

train_df = create_distance_weather_interaction(train_df)
print(f"\nApplied to training set")
print(f"  - Non-zero interactions: {(train_df['distance_x_weather'] > 0).sum():,} ({(train_df['distance_x_weather'] > 0).sum()/len(train_df)*100:.1f}%)")
print(f"  - Mean: {train_df['distance_x_weather'].mean():.3f}")
print(f"  - Median: {train_df['distance_x_weather'].median():.3f}")
print(f"  - Max: {train_df['distance_x_weather'].max():.3f}")

val_df = create_distance_weather_interaction(val_df)
print(f"Applied to validation set")
print(f"  - Non-zero interactions: {(val_df['distance_x_weather'] > 0).sum():,} ({(val_df['distance_x_weather'] > 0).sum()/len(val_df)*100:.1f}%)")

test_df = create_distance_weather_interaction(test_df)
print(f"Applied to test set")
print(f"  - Non-zero interactions: {(test_df['distance_x_weather'] > 0).sum():,} ({(test_df['distance_x_weather'] > 0).sum()/len(test_df)*100:.1f}%)")

cleaned_df = create_distance_weather_interaction(cleaned_df)
print(f"Applied to full cleaned dataset")

print("\nDistance × weather interaction feature created!")
print("\nInterpretation:")
print("  - High values: Long distance + rainy weather -> expect significant delays")
print("  - Zero values: Either no distance or no precipitation")


FEATURE ENGINEERING: DISTANCE × WEATHER INTERACTION

Creating distance × weather interaction feature...
Formula: distance_x_weather = h3_distance_km × precipitation

✓ Applied to training set
  - Non-zero interactions: 2,116 (12.7%)
  - Mean: 0.097
  - Median: 0.000
  - Max: 9.600
✓ Applied to validation set
  - Non-zero interactions: 529 (14.8%)
✓ Applied to test set
  - Non-zero interactions: 273 (7.6%)
✓ Applied to full cleaned dataset

✓ Distance × weather interaction feature created!

Interpretation:
  • High values: Long distance + rainy weather → expect significant delays
  • Zero values: Either no distance or no precipitation
  • This feature helps model capture weather's amplified effect on long routes


In [None]:
print("\n" + "="*80)
print("SAVING DATASETS WITH ENGINEERED FEATURES")
print("="*80)

output_dir = '../outputs/'

cleaned_path = output_dir + 'cleaned_full.csv'
cleaned_df.to_csv(cleaned_path, index=False)
print(f"Saved: {cleaned_path}")
print(f"  Shape: {cleaned_df.shape}")
print(f"  Columns: {len(cleaned_df.columns)}")

train_path = output_dir + 'train.csv'
train_df.to_csv(train_path, index=False)
print(f"Saved: {train_path}")
print(f"  Shape: {train_df.shape}")

val_path = output_dir + 'val.csv'
val_df.to_csv(val_path, index=False)
print(f"Saved: {val_path}")
print(f"  Shape: {val_df.shape}")

test_path = output_dir + 'test.csv'
test_df.to_csv(test_path, index=False)
print(f"Saved: {test_path}")
print(f"  Shape: {test_df.shape}")

print(f"\nAll datasets updated with new features!")
print(f"\nNew columns added:")
print(f"  1. h3_grid_distance")
print(f"  2. h3_distance_km")
print(f"  3. venue_friction_score")
print(f"  4. distance_x_weather")


SAVING DATASETS WITH ENGINEERED FEATURES
✓ Saved: ../outputs/cleaned_full.csv
  Shape: (23865, 16)
  Columns: 16
✓ Saved: ../outputs/train.csv
  Shape: (16705, 16)
✓ Saved: ../outputs/val.csv
  Shape: (3580, 16)
✓ Saved: ../outputs/test.csv
  Shape: (3580, 16)

✓ All datasets updated with new features!

New columns added:
  1. h3_grid_distance
  2. h3_distance_km
  3. venue_friction_score
  4. distance_x_weather
