# SmartPave Analytics: Feature Engineering

## Overview
This notebook creates advanced features for machine learning models to predict pavement degradation and optimize maintenance costs.

## Objectives
- Create time-based features
- Engineer traffic impact features
- Develop weather impact metrics
- Create maintenance history features
- Create geospatial and road-type features
- Prepare data for ML modeling and save the feature table to Snowflake


In [None]:
# Load data from Snowflake and create features
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')  # keep notebook output free of library warnings

# Attach to the Snowpark session supplied by the Snowflake notebook runtime
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# Point the session at the workshop database and schema
for context_statement in ("USE DATABASE DOT_workshop_test", "USE SCHEMA smartpave_analytics"):
    session.sql(context_statement).collect()

# Pull the full pavement condition table into pandas
df = session.sql("SELECT * FROM DOT_workshop_test.smartpave_analytics.pavement_condition").to_pandas()

# Snowflake upper-cases unquoted identifiers, so look columns up case-insensitively.
# The first matching column is used.
date_col = next((column_name for column_name in df.columns if column_name.upper() == 'DATE'), None)
if not date_col:
    print("⚠️ No date column found")
else:
    df[date_col] = pd.to_datetime(df[date_col])
    print(f"Loaded dataset: {len(df):,} records")
    print(f"Date range: {df[date_col].min()} to {df[date_col].max()}")

segment_id_col = next((column_name for column_name in df.columns if column_name.upper() == 'SEGMENT_ID'), None)
if not segment_id_col:
    print("⚠️ No segment ID column found")
else:
    print(f"Unique segments: {df[segment_id_col].nunique():,}")

# Maintenance records, with their DATE column parsed to datetimes when present
maintenance_df = session.sql("SELECT * FROM DOT_workshop_test.smartpave_analytics.maintenance_records").to_pandas()
if 'DATE' in maintenance_df.columns:
    maintenance_df['DATE'] = pd.to_datetime(maintenance_df['DATE'])


In [None]:
# Set database and schema context again so this cell can be run on its own
for context_statement in ("USE DATABASE DOT_workshop_test", "USE SCHEMA smartpave_analytics"):
    session.sql(context_statement).collect()

print("Loading data from Snowflake...")

_TABLE_PREFIX = "DOT_workshop_test.smartpave_analytics"


def _load_table(table_name, label):
    """Fetch ``table_name`` from the workshop schema into pandas and report its row count."""
    frame = session.sql(f"SELECT * FROM {_TABLE_PREFIX}.{table_name}").to_pandas()
    print(f"{label}: {len(frame):,} records")
    return frame


condition_df = _load_table("pavement_condition", "Condition data")
roads_df = _load_table("road_network", "Road network data")
maintenance_df = _load_table("maintenance_records", "Maintenance data")
traffic_df = _load_table("traffic_data", "Traffic data")

print("\nData loaded successfully!")


In [None]:
# Data preprocessing and column detection
print("Preprocessing data and detecting columns...")

# Snowflake upper-cases unquoted identifiers, so match names case-insensitively.
# Within each table the last matching column wins.
condition_matches = {'DATE': None, 'CONDITION_SCORE': None, 'SEGMENT_ID': None}
for column_name in condition_df.columns:
    if column_name.upper() in condition_matches:
        condition_matches[column_name.upper()] = column_name

# A SEGMENT_ID column found in the road network overrides the one found above.
road_matches = {'ROAD_TYPE': None, 'SEGMENT_ID': condition_matches['SEGMENT_ID']}
for column_name in roads_df.columns:
    if column_name.upper() in road_matches:
        road_matches[column_name.upper()] = column_name

date_col = condition_matches['DATE']
condition_score_col = condition_matches['CONDITION_SCORE']
segment_id_col = road_matches['SEGMENT_ID']
road_type_col = road_matches['ROAD_TYPE']

print("Detected columns:")
for label, detected in (
    ("Date", date_col),
    ("Condition Score", condition_score_col),
    ("Segment ID", segment_id_col),
    ("Road Type", road_type_col),
):
    print(f"  {label}: {detected}")

# Parse date columns to datetimes
if date_col:
    condition_df[date_col] = pd.to_datetime(condition_df[date_col])
    print(f"Date range: {condition_df[date_col].min()} to {condition_df[date_col].max()}")

# These tables are converted in place only when they carry an upper-case DATE column
for dated_frame in (maintenance_df, traffic_df):
    if 'DATE' in dated_frame.columns:
        dated_frame['DATE'] = pd.to_datetime(dated_frame['DATE'])

print("Data preprocessing complete!")


In [None]:
# Feature Engineering: Time-based Features
print("Creating time-based features...")

# Order inspections chronologically within each segment so diff() compares consecutive visits
condition_df = condition_df.sort_values([segment_id_col, date_col])
by_segment = condition_df.groupby(segment_id_col)

# Whole days between consecutive inspections, and the score change over that gap
inspection_gap_days = by_segment[date_col].diff().dt.days
score_change = by_segment[condition_score_col].diff()
condition_df['days_since_last_inspection'] = inspection_gap_days
condition_df['condition_change'] = score_change

# Score change per day; a zero-day gap is treated as missing instead of dividing by zero
condition_df['degradation_rate'] = score_change / inspection_gap_days.replace(0, np.nan)

# Calendar components of the inspection date
calendar_parts = condition_df[date_col].dt
condition_df['month'] = calendar_parts.month
condition_df['quarter'] = calendar_parts.quarter
condition_df['year'] = calendar_parts.year
condition_df['day_of_year'] = calendar_parts.dayofyear

# Season classification (meteorological seasons, Northern Hemisphere convention)
_SEASON_BY_MONTH = {
    12: 'Winter', 1: 'Winter', 2: 'Winter',
    3: 'Spring', 4: 'Spring', 5: 'Spring',
    6: 'Summer', 7: 'Summer', 8: 'Summer',
    9: 'Fall', 10: 'Fall', 11: 'Fall',
}


def get_season(month):
    """Return the season name for a calendar month number.

    Args:
        month: Month number 1-12. It may be an int or a float: ``Series.dt.month``
            yields floats when the date column contains NaT.

    Returns:
        'Winter', 'Spring', 'Summer' or 'Fall'. Returns None for a missing (NaN)
        or out-of-range month instead of silently labelling it 'Fall'.
    """
    # Dict lookup matches 3, 3.0 and numpy integer/float scalars alike (equal hashes);
    # NaN never matches a key, so it maps to None.
    return _SEASON_BY_MONTH.get(month)

# Label every inspection with its season, derived from the month column
condition_df['season'] = condition_df['month'].map(get_season)

inspection_gap_count = condition_df['days_since_last_inspection'].notna().sum()
degradation_count = condition_df['degradation_rate'].notna().sum()
season_counts = condition_df['season'].value_counts().to_dict()

print("✅ Time-based features created")
print(f"  - Days since last inspection: {inspection_gap_count:,} records")
print(f"  - Degradation rate: {degradation_count:,} records")
print(f"  - Seasonal data: {season_counts}")


In [None]:
# Feature Engineering: Traffic Impact Features
print("Creating traffic impact features...")

# Merge with traffic data
traffic_cols = ['SEGMENT_ID', 'TRAFFIC_VOLUME', 'PEAK_HOUR_FACTOR', 'TRUCK_PERCENTAGE']
available_traffic_cols = [col for col in traffic_cols if col in traffic_df.columns]

# SEGMENT_ID is the join key, so the traffic table is only usable when it has one.
if 'SEGMENT_ID' in available_traffic_cols:
    # traffic_data may hold several rows per segment (preprocessing parses a DATE
    # column on it). Joining those rows on SEGMENT_ID alone would repeat every
    # inspection once per traffic row, inflating the dataset and corrupting the
    # per-segment rolling/lag features computed later. Collapse the table to one
    # row per segment (mean of each numeric metric) before joining.
    traffic_per_segment = (
        traffic_df[available_traffic_cols]
        .groupby('SEGMENT_ID')
        .mean(numeric_only=True)
        .reset_index()
    )

    # Merge traffic data with condition data (one traffic row per segment)
    condition_with_traffic = condition_df.merge(
        traffic_per_segment,
        left_on=segment_id_col,
        right_on='SEGMENT_ID',
        how='left'
    )

    # Traffic stress features
    if 'TRAFFIC_VOLUME' in condition_with_traffic.columns:
        volume = condition_with_traffic['TRAFFIC_VOLUME']
        # DataFrame.get falls back to the scalar default when the column is absent
        condition_with_traffic['traffic_stress'] = volume * condition_with_traffic.get('PEAK_HOUR_FACTOR', 1.0)
        condition_with_traffic['heavy_truck_impact'] = volume * condition_with_traffic.get('TRUCK_PERCENTAGE', 0.0) / 100

        # Traffic categories; include_lowest keeps a volume of exactly 0 in 'Low'
        # instead of leaving it outside the first (0, 1000] bin as NaN.
        condition_with_traffic['traffic_category'] = pd.cut(
            volume,
            bins=[0, 1000, 5000, 10000, float('inf')],
            labels=['Low', 'Medium', 'High', 'Very High'],
            include_lowest=True
        )

        print("✅ Traffic features created")
        print(f"  - Traffic stress: {condition_with_traffic['traffic_stress'].notna().sum():,} records")
        print(f"  - Heavy truck impact: {condition_with_traffic['heavy_truck_impact'].notna().sum():,} records")
        print(f"  - Traffic categories: {condition_with_traffic['traffic_category'].value_counts().to_dict()}")
    else:
        print("⚠️ Traffic volume data not available")
        condition_with_traffic = condition_df.copy()
else:
    print("⚠️ No traffic data available")
    condition_with_traffic = condition_df.copy()


In [None]:
# Feature Engineering: Weather Impact Features
print("Creating weather impact features...")

# Derive weather damage features from any raw weather columns already on the data
weather_cols = ['PRECIPITATION', 'FREEZE_THAW_CYCLES', 'TEMPERATURE_AVG']
available_weather_cols = [name for name in weather_cols if name in condition_with_traffic.columns]

if not available_weather_cols:
    print("⚠️ No weather data available")
    # Zero-valued placeholders so later cells always find these columns
    for placeholder in ('freeze_thaw_damage', 'precipitation_damage', 'temperature_stress', 'total_weather_damage'):
        condition_with_traffic[placeholder] = 0
else:
    # raw weather column -> (derived feature name, scaling rule)
    damage_rules = {
        'FREEZE_THAW_CYCLES': ('freeze_thaw_damage', lambda series: series * 0.1),
        'PRECIPITATION': ('precipitation_damage', lambda series: series * 0.05),
        # distance from 20 degrees, scaled
        'TEMPERATURE_AVG': ('temperature_stress', lambda series: abs(series - 20) * 0.01),
    }
    for raw_col, (feature_col, rule) in damage_rules.items():
        if raw_col in condition_with_traffic.columns:
            condition_with_traffic[feature_col] = rule(condition_with_traffic[raw_col])

    # Sum whichever damage components exist (NaN counts as 0 in the row sum)
    weather_damage_cols = [
        name for name in ['freeze_thaw_damage', 'precipitation_damage', 'temperature_stress']
        if name in condition_with_traffic.columns
    ]
    if weather_damage_cols:
        condition_with_traffic['total_weather_damage'] = condition_with_traffic[weather_damage_cols].sum(axis=1)

    print("✅ Weather features created")
    print(f"  - Available weather columns: {available_weather_cols}")
    print(f"  - Weather damage features: {len(weather_damage_cols)} created")


In [None]:
# Feature Engineering: Maintenance History Features
print("Creating maintenance history features...")

# Merge with maintenance data
maintenance_cols = ['SEGMENT_ID', 'DATE', 'COST', 'REPAIR_TYPE']
available_maintenance_cols = [col for col in maintenance_cols if col in maintenance_df.columns]

# SEGMENT_ID and DATE are both required: the code below sorts, groups and joins on them.
if {'SEGMENT_ID', 'DATE'}.issubset(available_maintenance_cols):
    # Days between consecutive maintenance events on the same segment
    maintenance_df_sorted = maintenance_df.sort_values(['SEGMENT_ID', 'DATE'])
    maintenance_df_sorted['days_since_last_maintenance'] = maintenance_df_sorted.groupby('SEGMENT_ID')['DATE'].diff().dt.days

    # NOTE(review): this is an exact-date join, so the maintenance columns are only
    # populated on inspections dated the same day as a maintenance record; several
    # records for one segment/date would duplicate that inspection row.
    condition_with_maintenance = condition_with_traffic.merge(
        maintenance_df_sorted[available_maintenance_cols + ['days_since_last_maintenance']],
        left_on=[segment_id_col, date_col],
        right_on=['SEGMENT_ID', 'DATE'],
        how='left'
    )

    # Maintenance frequency features: records per segment; a segment with no
    # maintenance records gets 0 rather than NaN (matching the fallback branch).
    maintenance_counts = maintenance_df.groupby('SEGMENT_ID').size().reset_index(name='maintenance_frequency')
    condition_with_maintenance = condition_with_maintenance.merge(
        maintenance_counts,
        left_on=segment_id_col,
        right_on='SEGMENT_ID',
        how='left'
    )
    condition_with_maintenance['maintenance_frequency'] = condition_with_maintenance['maintenance_frequency'].fillna(0)

    # Cost features (per-segment aggregates over all maintenance records)
    if 'COST' in condition_with_maintenance.columns:
        cost_stats = maintenance_df.groupby('SEGMENT_ID')['COST'].agg(['mean', 'sum', 'count']).reset_index()
        cost_stats.columns = ['SEGMENT_ID', 'avg_maintenance_cost', 'total_maintenance_cost', 'maintenance_count']
        condition_with_maintenance = condition_with_maintenance.merge(cost_stats, on='SEGMENT_ID', how='left')
        # No records means nothing was spent; the average is left NaN because it is undefined.
        zero_when_missing = ['total_maintenance_cost', 'maintenance_count']
        condition_with_maintenance[zero_when_missing] = condition_with_maintenance[zero_when_missing].fillna(0)

    print("✅ Maintenance features created")
    print(f"  - Days since last maintenance: {condition_with_maintenance['days_since_last_maintenance'].notna().sum():,} records")
    print(f"  - Maintenance frequency: {condition_with_maintenance['maintenance_frequency'].notna().sum():,} records")
else:
    print("⚠️ No maintenance data available")
    condition_with_maintenance = condition_with_traffic.copy()
    # Add dummy maintenance features
    condition_with_maintenance['days_since_last_maintenance'] = np.nan
    condition_with_maintenance['maintenance_frequency'] = 0
    condition_with_maintenance['avg_maintenance_cost'] = 0
    condition_with_maintenance['total_maintenance_cost'] = 0


In [None]:
# Feature Engineering: Geospatial and Road Type Features
print("Creating geospatial and road type features...")

# Debug: Show available columns in road network data
print(f"Available road network columns: {list(roads_df.columns)}")

# Merge with road network data
road_cols = ['SEGMENT_ID', 'ROAD_TYPE', 'SEGMENT_LENGTH_MILES', 'LATITUDE', 'LONGITUDE']
available_road_cols = [col for col in road_cols if col in roads_df.columns]
print(f"Using road columns: {available_road_cols}")

# SEGMENT_ID is the join key; without it the merge below cannot run.
if 'SEGMENT_ID' in available_road_cols:
    condition_final = condition_with_maintenance.merge(
        roads_df[available_road_cols],
        left_on=segment_id_col,
        right_on='SEGMENT_ID',
        how='left'
    )

    # Road type flags (case-insensitive substring match; missing types count as False)
    if 'ROAD_TYPE' in condition_final.columns:
        condition_final['is_highway'] = condition_final['ROAD_TYPE'].str.contains('Highway', case=False, na=False)
        condition_final['is_arterial'] = condition_final['ROAD_TYPE'].str.contains('Arterial', case=False, na=False)
        condition_final['is_local'] = condition_final['ROAD_TYPE'].str.contains('Local', case=False, na=False)

    # Length-based features; include_lowest keeps a 0-mile segment in 'Short'
    # instead of leaving it outside the first (0, 0.5] bin as NaN.
    if 'SEGMENT_LENGTH_MILES' in condition_final.columns:
        condition_final['length_category'] = pd.cut(
            condition_final['SEGMENT_LENGTH_MILES'],
            bins=[0, 0.5, 1.0, 2.0, float('inf')],
            labels=['Short', 'Medium', 'Long', 'Very Long'],
            include_lowest=True
        )

    # Geographic features (if coordinates available)
    if 'LATITUDE' in condition_final.columns and 'LONGITUDE' in condition_final.columns:
        # Simple geographic zones: five equal-width bands over the observed range
        condition_final['latitude_zone'] = pd.cut(condition_final['LATITUDE'], bins=5, labels=['Zone1', 'Zone2', 'Zone3', 'Zone4', 'Zone5'])
        condition_final['longitude_zone'] = pd.cut(condition_final['LONGITUDE'], bins=5, labels=['ZoneA', 'ZoneB', 'ZoneC', 'ZoneD', 'ZoneE'])

    print("✅ Geospatial features created")

    # Check if road type features were created
    road_type_features = ['is_highway', 'is_arterial', 'is_local']
    created_road_features = [col for col in road_type_features if col in condition_final.columns]

    if created_road_features:
        road_type_count = sum(condition_final[col].sum() for col in created_road_features)
        print(f"  - Road type features: {road_type_count:,} records")
    else:
        print("  - Road type features: Not created (ROAD_TYPE column not found)")

    # Check if length categories were created
    if 'length_category' in condition_final.columns:
        print(f"  - Length categories: {condition_final['length_category'].value_counts().to_dict()}")
    else:
        print("  - Length categories: Not created (SEGMENT_LENGTH_MILES column not found)")
else:
    print("⚠️ No road network data available")
    condition_final = condition_with_maintenance.copy()


In [None]:
# Feature Engineering: Advanced ML Features
print("Creating advanced ML features...")


def _window_slope(values):
    """Return the least-squares slope of ``values`` against positions 0..n-1.

    Equal to ``np.polyfit(range(n), values, 1)[0]`` on finite input, but NaN
    entries are dropped (np.polyfit may fail on them) and windows with fewer
    than two usable points get a slope of 0.0.
    """
    y = np.asarray(values, dtype=float)
    x = np.arange(y.size, dtype=float)
    finite = np.isfinite(y)
    if finite.sum() < 2:
        return 0.0
    x, y = x[finite], y[finite]
    x_centered = x - x.mean()
    return float(np.dot(x_centered, y - y.mean()) / np.dot(x_centered, x_centered))


# Rolling, lag and trend features are positional within each segment. Order rows
# chronologically per segment (stable sort) and give them a unique index so the
# groupby results, indexed by the original row labels, align when assigned back.
condition_final = condition_final.sort_values([segment_id_col, date_col], kind='mergesort').reset_index(drop=True)
grouped_scores = condition_final.groupby(segment_id_col)[condition_score_col]

# Rolling window features
# NOTE(review): these windows count inspections (rows), not calendar days, despite
# the "d" suffix. Use time-based windows (rolling(f'{window}D', on=date_col))
# if calendar spans are intended.
window_sizes = [30, 90, 180]
for window in window_sizes:
    rolling_scores = grouped_scores.rolling(window=window, min_periods=1)
    condition_final[f'condition_avg_{window}d'] = rolling_scores.mean().reset_index(0, drop=True)
    condition_final[f'condition_std_{window}d'] = rolling_scores.std().reset_index(0, drop=True)

# Lag features
# NOTE(review): shift(lag) steps back `lag` inspections; the "m" (months) suffix
# only holds if inspections are monthly. Confirm against the data.
for lag in [1, 2, 3, 6, 12]:
    condition_final[f'condition_lag_{lag}m'] = grouped_scores.shift(lag)

# Trend features: slope over the last (up to) 3 inspections. raw=True passes plain
# ndarrays, avoiding a Series construction per window.
condition_final['condition_trend_3m'] = (
    grouped_scores.rolling(window=3, min_periods=1)
    .apply(_window_slope, raw=True)
    .reset_index(0, drop=True)
)

# Interaction features
if 'traffic_stress' in condition_final.columns and 'total_weather_damage' in condition_final.columns:
    condition_final['traffic_weather_interaction'] = condition_final['traffic_stress'] * condition_final['total_weather_damage']

# Risk score (composite feature)
risk_components = []
if 'traffic_stress' in condition_final.columns:
    risk_components.append('traffic_stress')
if 'total_weather_damage' in condition_final.columns:
    risk_components.append('total_weather_damage')
if 'days_since_last_maintenance' in condition_final.columns:
    risk_components.append('days_since_last_maintenance')

if risk_components:
    # Min-max normalize each component. A constant column gives 0/0 = NaN here,
    # which mean(axis=1) then skips, so it does not drag the score down.
    for component in risk_components:
        if component in condition_final.columns:
            condition_final[f'{component}_normalized'] = (condition_final[component] - condition_final[component].min()) / (condition_final[component].max() - condition_final[component].min())

    normalized_components = [f'{comp}_normalized' for comp in risk_components if f'{comp}_normalized' in condition_final.columns]
    if normalized_components:
        condition_final['risk_score'] = condition_final[normalized_components].mean(axis=1)

print("✅ Advanced ML features created")
print(f"  - Rolling window features: {len([col for col in condition_final.columns if 'condition_avg_' in col or 'condition_std_' in col])} features")
print(f"  - Lag features: {len([col for col in condition_final.columns if 'condition_lag_' in col])} features")
# risk_score only exists when at least one risk component was available
if 'risk_score' in condition_final.columns:
    print(f"  - Risk score: {condition_final['risk_score'].notna().sum():,} records")
else:
    print("  - Risk score: not created (no risk components available)")


In [None]:
# Save processed features to Snowflake
print("Saving processed features to Snowflake...")

# Only the Snowflake round-trip is guarded. A failed save is reported and the
# notebook carries on with the in-memory features. The summary further down is
# computed from condition_final alone, so it always runs, and its own errors are
# not misreported as save failures.
try:
    # Drop the table if it exists, then write the processed features
    session.sql("DROP TABLE IF EXISTS DOT_workshop_test.smartpave_analytics.pavement_features").collect()
    session.create_dataframe(condition_final).write.mode("overwrite").save_as_table("DOT_workshop_test.smartpave_analytics.pavement_features")
except Exception as e:
    print(f"❌ Error saving features: {e}")
    print("Features created but not saved to Snowflake")
else:
    print("✅ Features saved to Snowflake table: pavement_features")
    print(f"  - Total records: {len(condition_final):,}")
    print(f"  - Total features: {len(condition_final.columns)}")

# Feature summary: columns are grouped by substring markers, so a column can
# appear in more than one category.
feature_categories = {
    'Time-based': [col for col in condition_final.columns if any(x in col for x in ['month', 'quarter', 'year', 'season', 'days_since', 'degradation_rate'])],
    'Traffic': [col for col in condition_final.columns if any(x in col for x in ['traffic', 'TRAFFIC_VOLUME', 'PEAK_HOUR', 'TRUCK'])],
    'Weather': [col for col in condition_final.columns if any(x in col for x in ['weather', 'freeze', 'precipitation', 'temperature'])],
    'Maintenance': [col for col in condition_final.columns if any(x in col for x in ['maintenance', 'COST', 'REPAIR_TYPE'])],
    'Geospatial': [col for col in condition_final.columns if any(x in col for x in ['ROAD_TYPE', 'LATITUDE', 'LONGITUDE', 'SEGMENT_LENGTH', 'is_', 'zone'])],
    'ML Features': [col for col in condition_final.columns if any(x in col for x in ['condition_avg_', 'condition_std_', 'condition_lag_', 'condition_trend_', 'risk_score', 'interaction'])]
}

print("\n📊 Feature Engineering Summary:")
for category, features in feature_categories.items():
    if features:
        print(f"  {category}: {len(features)} features")
        print(f"    - {', '.join(features[:5])}{'...' if len(features) > 5 else ''}")


In [None]:
# Performance estimation for cell 10
# Rough, record-count-based guidance only; nothing here is measured.
print("📊 Performance Estimation for Advanced ML Features")
print("=" * 50)

# Get record counts
total_records = len(condition_final)
unique_segments = condition_final[segment_id_col].nunique() if segment_id_col else 0

print(f"Total records: {total_records:,}")
print(f"Unique segments: {unique_segments:,}")
# Keep the label on the N/A branch too (previously only a bare "N/A" was printed)
if unique_segments > 0:
    print(f"Records per segment (avg): {total_records/unique_segments:.1f}")
else:
    print("Records per segment (avg): N/A")

# Time estimation based on record count (fixed thresholds)
if total_records < 10000:
    estimated_time = "1-3 minutes"
    complexity = "Low"
elif total_records < 50000:
    estimated_time = "3-8 minutes"
    complexity = "Medium"
elif total_records < 100000:
    estimated_time = "8-15 minutes"
    complexity = "High"
else:
    estimated_time = "15+ minutes"
    complexity = "Very High"

print(f"\n⏱️  Estimated processing time: {estimated_time}")
print(f"📈 Complexity level: {complexity}")

# Factors affecting performance (window/lag counts mirror the advanced ML features cell)
print(f"\n🔍 Performance factors:")
print(f"  - Rolling windows: 3 windows × {unique_segments:,} segments")
print(f"  - Lag features: 5 lags × {total_records:,} records")
print(f"  - Trend calculations: {unique_segments:,} segments")
print(f"  - Groupby operations: {unique_segments:,} groups")

if total_records > 50000:
    print(f"\n💡 Consider optimizing if taking too long:")
    print(f"  - Reduce window sizes (30, 60 days)")
    print(f"  - Fewer lag features (1, 3, 6 months)")
    print(f"  - Simpler trend calculation")


In [None]:
# OPTIMIZED VERSION for Large Datasets (9.6M records)
print("🚀 Creating OPTIMIZED ML features for large dataset...")
print(f"Processing {len(condition_final):,} records across {condition_final[segment_id_col].nunique():,} segments")

# Use a subset for testing if needed
# condition_final = condition_final.sample(n=min(100000, len(condition_final))).copy()
# print(f"Using subset for testing: {len(condition_final):,} records")


def _window_slope(values):
    """Return the least-squares slope of ``values`` against positions 0..n-1.

    Equal to ``np.polyfit(range(n), values, 1)[0]`` on finite input, but NaN
    entries are dropped (np.polyfit may fail on them) and windows with fewer
    than two usable points get a slope of 0.0.
    """
    y = np.asarray(values, dtype=float)
    x = np.arange(y.size, dtype=float)
    finite = np.isfinite(y)
    if finite.sum() < 2:
        return 0.0
    x, y = x[finite], y[finite]
    x_centered = x - x.mean()
    return float(np.dot(x_centered, y - y.mean()) / np.dot(x_centered, x_centered))


# Rolling, lag and trend features are positional within each segment. Order rows
# chronologically per segment (stable sort) and give them a unique index so the
# groupby results, indexed by the original row labels, align when assigned back.
condition_final = condition_final.sort_values([segment_id_col, date_col], kind='mergesort').reset_index(drop=True)
# Build the segment grouping once and reuse it for every feature below
grouped_scores = condition_final.groupby(segment_id_col)[condition_score_col]

# OPTIMIZED: Reduced rolling window features
# NOTE(review): windows count inspections (rows), not calendar days, despite the "d" suffix.
print("Creating rolling window features (optimized)...")
window_sizes = [30, 90]  # Reduced from [30, 90, 180]
for window in window_sizes:
    print(f"  Processing {window}-day windows...")
    rolling_scores = grouped_scores.rolling(window=window, min_periods=1)
    condition_final[f'condition_avg_{window}d'] = rolling_scores.mean().reset_index(0, drop=True)
    condition_final[f'condition_std_{window}d'] = rolling_scores.std().reset_index(0, drop=True)

# OPTIMIZED: Reduced lag features
# NOTE(review): shift(lag) steps back `lag` inspections; "m" assumes monthly inspections.
print("Creating lag features (optimized)...")
for lag in [1, 3, 6]:  # Reduced from [1, 2, 3, 6, 12]
    print(f"  Processing {lag}-month lags...")
    condition_final[f'condition_lag_{lag}m'] = grouped_scores.shift(lag)

# OPTIMIZED: Simplified trend features. Closed-form slope on raw ndarrays instead
# of np.polyfit on a Series per window.
print("Creating trend features (simplified)...")
condition_final['condition_trend_3m'] = (
    grouped_scores.rolling(window=3, min_periods=1)
    .apply(_window_slope, raw=True)
    .reset_index(0, drop=True)
)

# OPTIMIZED: Simplified interaction features
print("Creating interaction features...")
if 'traffic_stress' in condition_final.columns and 'total_weather_damage' in condition_final.columns:
    condition_final['traffic_weather_interaction'] = condition_final['traffic_stress'] * condition_final['total_weather_damage']

# OPTIMIZED: Simplified risk score
print("Creating risk score...")
risk_components = []
if 'traffic_stress' in condition_final.columns:
    risk_components.append('traffic_stress')
if 'total_weather_damage' in condition_final.columns:
    risk_components.append('total_weather_damage')
if 'days_since_last_maintenance' in condition_final.columns:
    risk_components.append('days_since_last_maintenance')

if risk_components:
    # Min-max normalization; a constant (or all-NaN) component is set to 0
    # rather than dividing by a zero range.
    for component in risk_components:
        if component in condition_final.columns:
            max_val = condition_final[component].max()
            min_val = condition_final[component].min()
            if max_val > min_val:
                condition_final[f'{component}_normalized'] = (condition_final[component] - min_val) / (max_val - min_val)
            else:
                condition_final[f'{component}_normalized'] = 0

    normalized_components = [f'{comp}_normalized' for comp in risk_components if f'{comp}_normalized' in condition_final.columns]
    if normalized_components:
        condition_final['risk_score'] = condition_final[normalized_components].mean(axis=1)

print("✅ OPTIMIZED ML features created")
print(f"  - Rolling window features: {len([col for col in condition_final.columns if 'condition_avg_' in col or 'condition_std_' in col])} features")
print(f"  - Lag features: {len([col for col in condition_final.columns if 'condition_lag_' in col])} features")
# risk_score only exists when at least one risk component was available
if 'risk_score' in condition_final.columns:
    print(f"  - Risk score: {condition_final['risk_score'].notna().sum():,} records")
else:
    print("  - Risk score: not created (no risk components available)")
