In [None]:
import os
import sys

# Fix Spark Python environment mismatch
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# Hadoop + winutils
os.environ["HADOOP_HOME"] = "C:\\hadoop-3.3.6"
os.environ["PATH"] += ";C:\\hadoop-3.3.6\\bin"

# Java 17 (Spark 4 requirement)
os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk-17"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "\\bin;" + os.environ["PATH"]

# Fix Windows Hadoop user identity error
os.environ["HADOOP_USER_NAME"] = "root"

In [None]:
# Import all necessary libraries for PySpark MLlib
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler as SparkStandardScaler
from pyspark.ml.regression import (
    LinearRegression, DecisionTreeRegressor, RandomForestRegressor, 
    GBTRegressor, GeneralizedLinearRegression
)
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.stat import Correlation
import warnings
warnings.filterwarnings('ignore')

# Set up plotting style
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = (12, 8)

print("All PySpark MLlib libraries imported successfully!")

In [None]:
# Initialize Spark Session (if not already available)
try:
    spark
    print("Using existing Spark session")
except NameError:
    print("Creating new Spark session...")
    spark = SparkSession.builder \
        .appName("MLModelTraining") \
        .master("local[*]") \
        .config("spark.driver.memory", "4g") \
        .config("spark.executor.memory", "2g") \
        .getOrCreate()
    print("Spark session created successfully")

# Load the aeso_weather_Final dataset from parquet file
print("\nLoading aeso_weather_Final dataset from parquet file...")

parquet_path = "../data/aeso_weather_Final.parquet"

try:
    # Load the dataset from parquet file
    df_spark = spark.read.parquet(parquet_path)
    
    print(f"Successfully loaded dataset from {parquet_path}")
    print(f"Dataset shape: {df_spark.count()} rows, {len(df_spark.columns)} columns")
    print("Dataset columns:", df_spark.columns)
    
    # Show sample of the data
    print("\nSample of loaded data:")
    df_spark.show(5)
    
    print("\nData types:")
    df_spark.printSchema()
    
except Exception as e:
    print(f"Error loading dataset from parquet: {str(e)}")
    print("\nPlease ensure:")
    print("1. The AESOLoadDataCleaningAndFraming notebook has been run")
    print("2. The aeso_weather_Final dataset has been saved to parquet")
    print(f"3. The parquet file exists at: {parquet_path}")
    raise Exception("Could not load aeso_weather_Final dataset from parquet file")

In [None]:
# Explore the dataset structure and prepare for ML
print("Exploring dataset structure with PySpark...")

# Display basic statistics
print("Dataset Info:")
print(f"Shape: {df_spark.count()} rows, {len(df_spark.columns)} columns")

# Check for missing values
print("\nMissing values per column:")
for col in df_spark.columns:
    missing_count = df_spark.filter(F.col(col).isNull()).count()
    if missing_count > 0:
        print(f"{col}: {missing_count}")

print("\nFirst few rows:")
df_spark.show(5)

print("\nData types:")
df_spark.printSchema()

# Check for the target variable and get statistics
if 'load_mw' in df_spark.columns:
    print(f"\nTarget variable (load_mw) statistics:")
    df_spark.select('load_mw').describe().show()
    
    # For visualization, sample some data to pandas
    load_sample = df_spark.select('load_mw').sample(0.1, seed=42).toPandas()
    
    plt.figure(figsize=(10, 6))
    plt.hist(load_sample['load_mw'], bins=50, alpha=0.7, edgecolor='black')
    plt.title('Distribution of Energy Load (MW) - Sample')
    plt.xlabel('Load (MW)')
    plt.ylabel('Frequency')
    plt.grid(True, alpha=0.3)
    plt.show()
else:
    print("Warning: 'load_mw' column not found in dataset!")
    print("Available columns:", df_spark.columns)

In [None]:
# Prepare data for PySpark MLlib
print("Preparing data for PySpark MLlib...")

# The aeso_weather_Final should have these columns based on the framing notebook:
# - Numeric: temp_c, precip_mm
# - Categorical (encoded): region_typeEncoder, seasonEncoder, hourEncoder, etc.
# - Target: load_mw
# - Features vector: features (already assembled)

# Check if features vector already exists
if 'features' in df_spark.columns:
    print("Found existing 'features' vector column - will use this for ML")
    ml_df = df_spark.select('features', 'load_mw')
    
else:
    print("Creating features vector from individual columns...")
    
    # Find numeric columns (original numeric features)
    numeric_features = ['temp_c', 'precip_mm']
    
    # Find encoded categorical features (those ending with 'Encoder')
    encoded_features = [col for col in df_spark.columns if col.endswith('Encoder')]
    
    print(f"Numeric features: {numeric_features}")
    print(f"Encoded categorical features: {encoded_features}")
    
    # Combine all feature columns
    all_features = numeric_features + encoded_features
    
    # Filter to only existing columns
    existing_features = [col for col in all_features if col in df_spark.columns]
    print(f"Using features: {existing_features}")
    
    # Create VectorAssembler to combine features
    assembler = VectorAssembler(inputCols=existing_features, outputCol="features")
    ml_df = assembler.transform(df_spark).select('features', 'load_mw')

print("\nML DataFrame structure:")
ml_df.printSchema()
print(f"ML DataFrame count: {ml_df.count()}")

# Show sample
print("\nSample of ML data:")
ml_df.show(5, truncate=False)

# Handle missing values if any
missing_features = ml_df.filter(F.col('features').isNull()).count()
missing_target = ml_df.filter(F.col('load_mw').isNull()).count()

print(f"\nMissing features: {missing_features}")
print(f"Missing target values: {missing_target}")

# Remove rows with missing values
if missing_features > 0 or missing_target > 0:
    print("Removing rows with missing values...")
    ml_df = ml_df.filter(F.col('features').isNotNull() & F.col('load_mw').isNotNull())
    print(f"After removing missing values: {ml_df.count()} rows")

print("Data preparation completed!")

In [None]:
# Split data for training and testing using PySpark
print("Splitting data for training and testing...")

# Split the data (80% train, 20% test)
train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=42)

print(f"Training set size: {train_df.count()} samples")
print(f"Test set size: {test_df.count()} samples")
print(f"Total samples: {ml_df.count()}")

# Cache the datasets for better performance
train_df.cache()
test_df.cache()

print("Data splitting completed!")

In [None]:
# Define and train multiple PySpark MLlib models
print("Training multiple PySpark MLlib models...")

# Initialize PySpark ML models
models = {
    'Linear Regression': LinearRegression(featuresCol='features', labelCol='load_mw', predictionCol='prediction'),
    'Decision Tree': DecisionTreeRegressor(featuresCol='features', labelCol='load_mw', predictionCol='prediction', maxDepth=10, seed=42),
    'Random Forest': RandomForestRegressor(featuresCol='features', labelCol='load_mw', predictionCol='prediction', numTrees=100, seed=42),
    'Gradient Boosting': GBTRegressor(featuresCol='features', labelCol='load_mw', predictionCol='prediction', maxIter=100, seed=42),
    'GLM - Gaussian': GeneralizedLinearRegression(featuresCol='features', labelCol='load_mw', predictionCol='prediction', family='gaussian'),
    'GLM - Poisson': GeneralizedLinearRegression(featuresCol='features', labelCol='load_mw', predictionCol='prediction', family='poisson')
}

# Initialize evaluators
evaluators = {
    'RMSE': RegressionEvaluator(labelCol='load_mw', predictionCol='prediction', metricName='rmse'),
    'MAE': RegressionEvaluator(labelCol='load_mw', predictionCol='prediction', metricName='mae'),
    'R2': RegressionEvaluator(labelCol='load_mw', predictionCol='prediction', metricName='r2')
}

# Train models and collect results
results = {}
trained_models = {}

for name, model in models.items():
    print(f"Training {name}...")
    
    try:
        # Train the model
        fitted_model = model.fit(train_df)
        
        # Make predictions on test set
        predictions = fitted_model.transform(test_df)
        
        # Evaluate the model
        model_results = {}
        for metric_name, evaluator in evaluators.items():
            score = evaluator.evaluate(predictions)
            model_results[metric_name] = score
        
        results[name] = model_results
        trained_models[name] = fitted_model
        
        print(f"{name} - R2: {model_results['R2']:.4f}, RMSE: {model_results['RMSE']:.2f}, MAE: {model_results['MAE']:.2f}")
        
    except Exception as e:
        print(f"Error training {name}: {str(e)}")
        results[name] = {'RMSE': np.nan, 'MAE': np.nan, 'R2': np.nan}

print("\nAll models trained successfully!")

In [None]:
# Create comprehensive model comparison
print("Creating model comparison analysis...")

# Convert results to DataFrame for easier analysis
results_df = pd.DataFrame(results).T
results_df = results_df.round(4)
results_df_sorted = results_df.sort_values('R2', ascending=False)

print("Model Performance Ranking:")
print("=" * 70)
print(results_df_sorted.to_string())

# Visualization of model performance
fig, axes = plt.subplots(2, 2, figsize=(16, 12))

# R² Score comparison
model_names = results_df_sorted.index
r2_scores = results_df_sorted['R2']
colors = plt.cm.viridis(np.linspace(0, 1, len(model_names)))

bars1 = axes[0,0].barh(model_names, r2_scores, color=colors)
axes[0,0].set_title('Model Performance - R² Score')
axes[0,0].set_xlabel('R² Score')
axes[0,0].grid(axis='x', alpha=0.3)

# Add value labels on bars
for bar, score in zip(bars1, r2_scores):
    if not np.isnan(score):
        axes[0,0].text(bar.get_width() + 0.01, bar.get_y() + bar.get_height()/2, 
                       f'{score:.3f}', va='center')

# RMSE comparison
rmse_scores = results_df_sorted['RMSE']
bars2 = axes[0,1].barh(model_names, rmse_scores, color=colors)
axes[0,1].set_title('Model Performance - RMSE (Lower is Better)')
axes[0,1].set_xlabel('RMSE')
axes[0,1].grid(axis='x', alpha=0.3)

# MAE comparison
mae_scores = results_df_sorted['MAE']
bars3 = axes[1,0].barh(model_names, mae_scores, color=colors)
axes[1,0].set_title('Model Performance - MAE (Lower is Better)')
axes[1,0].set_xlabel('MAE')
axes[1,0].grid(axis='x', alpha=0.3)

# Prediction vs Actual for best model
best_model_name = results_df_sorted.index[0]
if best_model_name in trained_models:
    best_model = trained_models[best_model_name]
    best_predictions = best_model.transform(test_df)
    
    # Sample for visualization
    pred_sample = best_predictions.select('load_mw', 'prediction').sample(0.2, seed=42).toPandas()
    
    axes[1,1].scatter(pred_sample['load_mw'], pred_sample['prediction'], alpha=0.5)
    axes[1,1].plot([pred_sample['load_mw'].min(), pred_sample['load_mw'].max()], 
                   [pred_sample['load_mw'].min(), pred_sample['load_mw'].max()], 'r--', lw=2)
    axes[1,1].set_title(f'Best Model ({best_model_name}) - Predicted vs Actual')
    axes[1,1].set_xlabel('Actual Energy Load (MW)')
    axes[1,1].set_ylabel('Predicted Energy Load (MW)')
    axes[1,1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Feature importance analysis for tree-based models
print("Analyzing feature importance...")

# Get feature importance from tree-based models
tree_models = ['Decision Tree', 'Random Forest', 'Gradient Boosting']
feature_importance = {}

fig, axes = plt.subplots(1, 3, figsize=(18, 6))

for i, model_name in enumerate(tree_models):
    if model_name in trained_models:
        model = trained_models[model_name]
        
        # Get feature importances (available for tree-based models)
        if hasattr(model, 'featureImportances'):
            importances = model.featureImportances.toArray()
            
            # Create feature names (limited information from vector)
            feature_names = [f'Feature_{j}' for j in range(len(importances))]
            
            # Sort by importance
            indices = np.argsort(importances)[::-1]
            
            # Get top features (limit to top 15 for readability)
            top_n = min(15, len(importances))
            top_indices = indices[:top_n]
            top_importances = importances[top_indices]
            top_feature_names = [feature_names[j] for j in top_indices]
            
            # Plot
            axes[i].barh(range(len(top_importances)), top_importances)
            axes[i].set_title(f'{model_name} - Feature Importance')
            axes[i].set_yticks(range(len(top_importances)))
            axes[i].set_yticklabels(top_feature_names)
            axes[i].grid(axis='x', alpha=0.3)
            
            # Store for summary
            feature_importance[model_name] = dict(zip(top_feature_names, top_importances))
        else:
            axes[i].text(0.5, 0.5, f'{model_name}\nFeature importance\nnot available', 
                        ha='center', va='center', transform=axes[i].transAxes)
            axes[i].set_title(f'{model_name} - Feature Importance')

plt.tight_layout()
plt.show()

# Print top 5 features for each tree model
if feature_importance:
    print("\nTop 5 Most Important Features by Model:")
    print("=" * 60)
    for model_name, features in feature_importance.items():
        print(f"\n{model_name}:")
        for i, (feature, importance) in enumerate(list(features.items())[:5]):
            print(f"  {i+1}. {feature}: {importance:.4f}")
else:
    print("\nNo feature importance information available from the trained models.")

In [None]:
# Cross-validation and hyperparameter tuning for best model
print("Performing cross-validation and hyperparameter tuning...")

# Get the best performing model
best_model_name = results_df_sorted.index[0]
print(f"Best model: {best_model_name}")

# Perform cross-validation for Random Forest (if it's among the top performers)
if 'Random Forest' in results and not np.isnan(results['Random Forest']['R2']):
    print("\nPerforming cross-validation and hyperparameter tuning for Random Forest...")
    
    # Create parameter grid
    rf = RandomForestRegressor(featuresCol='features', labelCol='load_mw', seed=42)
    
    paramGrid = (ParamGridBuilder()
                 .addGrid(rf.numTrees, [50, 100, 150])
                 .addGrid(rf.maxDepth, [5, 10, 15])
                 .build())
    
    # Create cross validator
    evaluator = RegressionEvaluator(labelCol='load_mw', predictionCol='prediction', metricName='r2')
    crossval = CrossValidator(estimator=rf,
                             estimatorParamMaps=paramGrid,
                             evaluator=evaluator,
                             numFolds=3)
    
    # Fit cross validator
    print("Running cross-validation... (this may take a while)")
    cvModel = crossval.fit(train_df)
    
    # Get best model
    bestModel = cvModel.bestModel
    
    # Evaluate on test set
    predictions = bestModel.transform(test_df)
    tuned_r2 = evaluator.evaluate(predictions)
    tuned_rmse = RegressionEvaluator(labelCol='load_mw', predictionCol='prediction', metricName='rmse').evaluate(predictions)
    tuned_mae = RegressionEvaluator(labelCol='load_mw', predictionCol='prediction', metricName='mae').evaluate(predictions)
    
    print(f"\nTuned Random Forest Performance:")
    print(f"R²: {tuned_r2:.4f}")
    print(f"RMSE: {tuned_rmse:.2f}")
    print(f"MAE: {tuned_mae:.2f}")
    print(f"Best parameters: numTrees={bestModel.getNumTrees}, maxDepth={bestModel.getMaxDepth()}")
    
    # Update results with tuned model
    results['Tuned Random Forest'] = {
        'R2': tuned_r2,
        'RMSE': tuned_rmse,
        'MAE': tuned_mae
    }
    trained_models['Tuned Random Forest'] = bestModel

else:
    print("Random Forest not available or didn't perform well enough for hyperparameter tuning.")

In [None]:
# Advanced analysis: Model interpretability and insights
print("Analyzing model predictions and insights...")

# Get the overall best model
final_results = pd.DataFrame(results).T.sort_values('R2', ascending=False)
best_model_name = final_results.index[0]
best_model = trained_models[best_model_name]

print(f"Using {best_model_name} for detailed analysis...")

# Make predictions on test set
test_predictions = best_model.transform(test_df)

# Calculate residuals
test_with_residuals = test_predictions.withColumn('residual', F.col('load_mw') - F.col('prediction'))

print("Residual Analysis:")
test_with_residuals.select('residual').describe().show()

# Sample data for visualization
sample_data = test_with_residuals.select('load_mw', 'prediction', 'residual').sample(0.3, seed=42).toPandas()

# Create residual plots
fig, axes = plt.subplots(1, 3, figsize=(18, 6))

# Residuals vs Predicted
axes[0].scatter(sample_data['prediction'], sample_data['residual'], alpha=0.5)
axes[0].axhline(y=0, color='r', linestyle='--')
axes[0].set_title('Residuals vs Predicted Values')
axes[0].set_xlabel('Predicted Load (MW)')
axes[0].set_ylabel('Residuals')
axes[0].grid(True, alpha=0.3)

# Histogram of residuals
axes[1].hist(sample_data['residual'], bins=50, alpha=0.7, edgecolor='black')
axes[1].set_title('Distribution of Residuals')
axes[1].set_xlabel('Residuals')
axes[1].set_ylabel('Frequency')
axes[1].grid(True, alpha=0.3)

# Q-Q plot for residuals normality
from scipy import stats
stats.probplot(sample_data['residual'], dist="norm", plot=axes[2])
axes[2].set_title('Q-Q Plot of Residuals')
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Calculate prediction intervals
residual_std = sample_data['residual'].std()
print(f"\nPrediction Analysis:")
print(f"Residual Standard Deviation: {residual_std:.2f} MW")
print(f"95% Prediction Interval: ± {1.96 * residual_std:.2f} MW")

# Show some example predictions
print(f"\nExample Predictions from {best_model_name}:")
example_predictions = test_predictions.select('load_mw', 'prediction').limit(10).toPandas()
example_predictions['error'] = example_predictions['load_mw'] - example_predictions['prediction']
example_predictions['error_pct'] = (example_predictions['error'] / example_predictions['load_mw'] * 100).round(2)
print(example_predictions.round(2))

In [None]:
# Final summary and insights
print("=" * 80)
print("FINAL MODEL COMPARISON SUMMARY - PySpark MLlib")
print("=" * 80)

# Create final results DataFrame
final_results = pd.DataFrame(results).T
final_results = final_results.sort_values('R2', ascending=False, na_position='last')

print("\nFinal Model Rankings:")
print(final_results.round(4))

print("\n" + "=" * 80)
print("KEY INSIGHTS AND RECOMMENDATIONS")
print("=" * 80)

# Get best performing model
valid_results = final_results.dropna(subset=['R2'])
if len(valid_results) > 0:
    best_model_final = valid_results.index[0]
    best_r2 = valid_results.loc[best_model_final, 'R2']
    best_rmse = valid_results.loc[best_model_final, 'RMSE']
    best_mae = valid_results.loc[best_model_final, 'MAE']
    
    print(f"\n1. BEST PERFORMING MODEL: {best_model_final}")
    print(f"   - R² Score: {best_r2:.4f}")
    print(f"   - RMSE: {best_rmse:.2f} MW")
    print(f"   - MAE: {best_mae:.2f} MW")
    print(f"   - Explains {best_r2*100:.1f}% of the variance in energy load")
    
    print(f"\n2. PYSPARK MLLIB MODEL COMPARISON:")
    # Compare model types
    tree_models_in_results = [m for m in ['Decision Tree', 'Random Forest', 'Gradient Boosting'] if m in valid_results.index]
    linear_models_in_results = [m for m in ['Linear Regression', 'GLM - Gaussian'] if m in valid_results.index]
    
    if tree_models_in_results:
        tree_models_avg = valid_results.loc[tree_models_in_results, 'R2'].mean()
        print(f"   - Tree-based models average R²: {tree_models_avg:.4f}")
        print(f"   - Best tree model: {valid_results.loc[tree_models_in_results].iloc[0].name}")
    
    if linear_models_in_results:
        linear_models_avg = valid_results.loc[linear_models_in_results, 'R2'].mean()
        print(f"   - Linear models average R²: {linear_models_avg:.4f}")
        print(f"   - Best linear model: {valid_results.loc[linear_models_in_results].iloc[0].name}")
    
    if tree_models_in_results and linear_models_in_results:
        if tree_models_avg > linear_models_avg:
            print("   - Tree-based models outperform linear models in this dataset")
        else:
            print("   - Linear models perform competitively with tree-based models")
    
    print(f"\n3. DATA SCALABILITY WITH PYSPARK:")
    print(f"   - Successfully processed {ml_df.count():,} data points with PySpark")
    print("   - MLlib handles large-scale data efficiently")
    print("   - Models can scale to larger datasets without memory issues")
    
    print(f"\n4. FEATURE IMPORTANCE INSIGHTS:")
    if feature_importance:
        # Get most common top features across models
        all_features = []
        for model_features in feature_importance.values():
            all_features.extend(list(model_features.keys())[:3])
        
        from collections import Counter
        feature_counts = Counter(all_features)
        print("   Most important features across tree-based models:")
        for feature, count in feature_counts.most_common(5):
            print(f"   - {feature}: mentioned in {count} models")
    else:
        print("   Feature importance analysis was limited due to vector format")
        print("   Consider using the original feature columns for more detailed analysis")
    
    print(f"\n5. PERFORMANCE ASSESSMENT:")
    if best_r2 > 0.9:
        print("   - Excellent model performance (R² > 0.9)")
        print("   - Model predictions are highly reliable")
    elif best_r2 > 0.8:
        print("   - Good model performance (R² > 0.8)")
        print("   - Model provides reliable predictions with some uncertainty")
    elif best_r2 > 0.7:
        print("   - Decent model performance (R² > 0.7)")
        print("   - Model captures major patterns but has room for improvement")
    else:
        print("   - Model performance has significant room for improvement")
        print("   - Consider feature engineering or collecting more data")
    
    print(f"\n6. SPARK MLLIB ADVANTAGES:")
    print("   - Distributed computing capability for large datasets")
    print("   - Built-in feature processing and transformation")
    print("   - Seamless integration with Spark data processing pipeline")
    print("   - Scalable hyperparameter tuning with CrossValidator")
    
    print(f"\n7. RECOMMENDATIONS:")
    print(f"   - Deploy {best_model_final} for production energy load predictions")
    print(f"   - Expected prediction accuracy: ±{best_rmse:.1f} MW (RMSE)")
    print("   - Use Spark's batch prediction capabilities for real-time inference")
    
    if 'Tuned Random Forest' in results and not np.isnan(results['Tuned Random Forest']['R2']):
        tuned_r2 = results['Tuned Random Forest']['R2']
        if tuned_r2 > best_r2:
            print("   - Consider using tuned Random Forest as it shows improved performance")
    
    if best_r2 < 0.8:
        print("   - Recommendations for improvement:")
        print("     * Engineer more weather-related features (humidity, wind speed)")
        print("     * Include lagged features (previous hours/days)")
        print("     * Add seasonal and holiday indicators")
        print("     * Consider ensemble methods combining multiple algorithms")
    
    print("\n   - Dataset and processing summary:")
    print(f"     * Processed {ml_df.count():,} hourly energy load records")
    print(f"     * Features derived from weather and temporal data")
    print(f"     * Target variable range: Weather-dependent energy demand")
    print("     * Successfully leveraged Spark's distributed computing")
    
else:
    print("No valid model results found. Check data and model training process.")

print("\n" + "=" * 80)
print("PYSPARK MLLIB ANALYSIS COMPLETE")
print("=" * 80)
print("\nThis analysis used PySpark MLlib with the aeso_weather_Final dataset.")
print("Models were trained using distributed computing for scalability.")
print("The approach demonstrates enterprise-ready machine learning for energy forecasting.")