In [1]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler, RobustScaler, FunctionTransformer
from sklearn.metrics import mean_squared_error
from math import sqrt
import matplotlib.pyplot as plt
import ray
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, AutoETS, AutoTheta, AutoCES, AutoTBATS
import random

# Seed every global RNG this notebook imports so that a Restart & Run All
# reproduces the same numbers. Seeding only `random` leaves NumPy's global
# generator unseeded.
# NOTE(review): these seeds apply to the driver process only; Ray workers are
# separate processes and are not seeded here.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

In [2]:
# Execute the preprocessing notebook via IPython's %run magic before anything
# else in this notebook runs.
# NOTE(review): `price_only_df`, used by the cells below, is not defined in this
# notebook and is presumably created by preprocessing.ipynb — confirm.
%run preprocessing.ipynb

# RUN THE SYNTHETIC DATA GENERATION PIPELINE

# SETUP

In [3]:
# Tag every row with a constant series identifier; the cross-validation input
# built later selects the 'ds', 'y' and 'unique_id' columns.
# NOTE(review): this adds the column to `price_only_df` in place. The frame is
# presumably created by preprocessing.ipynb — confirm.
price_only_df['unique_id'] = 'price_only'


In [4]:
# --- Rolling cross-validation settings (read by the functions defined below) ---

# How many periods ahead each forecast extends
forecast_horizon: int = 3

# Step between consecutive rolling windows
step_size: int = 3

# How many rolling windows the cross-validation evaluates
n_windows: int = 16

In [5]:
# Bring up Ray for parallel processing; `ignore_reinit_error=True` lets this
# cell be re-run while Ray is already initialised.
ray.init(ignore_reinit_error=True)

# Seasonal period shared by every model: 12 = annual seasonality for monthly data
season_length = 12

# Candidate statistical models, each constructed with the same seasonal period
models = [
    model_cls(season_length=season_length)
    for model_cls in (
        AutoARIMA,  # Automated ARIMA model selection
        AutoETS,    # Automated Exponential Smoothing
        AutoTheta,  # Automated Theta method
        AutoCES,    # Automated Complex Exponential Smoothing
    )
]

# Factory for the StatsForecast object used by each experiment
def get_stats_forecaster():
    """
    Build a new StatsForecast instance around the module-level ``models`` list.

    Every call constructs a fresh object, always with ``freq='M'`` and
    ``n_jobs=-1``.

    Returns:
        StatsForecast: Forecaster wrapping ``models`` with the settings above
    """
    forecaster_settings = {'models': models, 'freq': 'M', 'n_jobs': -1}
    return StatsForecast(**forecaster_settings)

def prepare_data(df, use_scaler=False):
    """
    Prepares data for forecasting by handling data types and optional scaling.

    The input frame is copied, so the caller's DataFrame is never modified.
    When scaling is requested, the MinMax scaler is fitted only on the rows
    that precede the cross-validation windows (the last
    ``n_windows * step_size`` rows are excluded from the fit) and then applied
    to the whole series. Fitting on the full series would let the evaluation
    windows' min/max influence the model inputs (look-ahead leakage).

    Args:
        df (pd.DataFrame): Input dataframe with time series data; must contain
            a 'y' column. Rows are used in their existing order.
        use_scaler (bool): Whether to apply MinMax scaling to the target variable

    Returns:
        tuple: (Prepared DataFrame with all original rows, Fitted scaler or
            None if scaling not used)
    """
    prepared = df.copy()

    # Ensure 'y' is numeric; unparseable entries become NaN instead of raising
    prepared['y'] = pd.to_numeric(prepared['y'], errors='coerce')

    scaler = None
    if use_scaler:
        # Number of leading rows that never fall inside a cross-validation window
        # (relies on the module-level n_windows / step_size settings).
        train_size = len(prepared) - n_windows * step_size

        # If the series is too short to leave any pre-window rows, fall back to
        # fitting on everything rather than fitting on an empty frame.
        if train_size > 0:
            fit_frame = prepared[['y']].iloc[:train_size]
        else:
            fit_frame = prepared[['y']]

        scaler = MinMaxScaler().fit(fit_frame)
        # Values in the evaluation windows may land outside [0, 1]; that is
        # expected, because their range was deliberately not seen at fit time.
        prepared['y'] = scaler.transform(prepared[['y']]).ravel()

    # The full series is returned: the cross-validation routine performs the
    # rolling train/test splitting itself.
    return prepared, scaler

def calculate_metrics(actual, predicted):
    """
    Calculates multiple performance metrics for forecast evaluation.

    Args:
        actual (array-like): True values
        predicted (array-like): Predicted values; must contain the same number
            of elements as ``actual``

    Returns:
        tuple: (RMSE, Directional Accuracy, Turning Point Accuracy), each a float.
            Directional accuracy is NaN when fewer than 2 points are given and
            turning point accuracy is NaN when fewer than 3 points are given,
            because those metrics are undefined for such short inputs.

    Raises:
        ValueError: If the inputs are empty, differ in length, or contain
            NaN / infinite values.
    """
    actual = np.asarray(actual, dtype=float).ravel()
    predicted = np.asarray(predicted, dtype=float).ravel()

    # Validate explicitly so failures point at the real problem
    if actual.size != predicted.size:
        raise ValueError(
            f"actual and predicted must have the same length "
            f"(got {actual.size} and {predicted.size})"
        )
    if actual.size == 0:
        raise ValueError("actual and predicted must contain at least one value")
    if not (np.isfinite(actual).all() and np.isfinite(predicted).all()):
        raise ValueError("actual and predicted must not contain NaN or infinite values")

    # Root Mean Square Error
    rmse = float(np.sqrt(np.mean(np.square(actual - predicted))))

    # Directional Accuracy - share of steps where both series move the same way.
    # A step where either series is flat (product == 0) counts as a miss.
    actual_diff = np.diff(actual)
    pred_diff = np.diff(predicted)
    if actual_diff.size:
        directional_accuracy = float(np.mean((actual_diff * pred_diff) > 0))
    else:
        # Fewer than two points: no movement to compare (avoids mean of empty array)
        directional_accuracy = float('nan')

    # Turning Point Accuracy - share of interior points where both series agree
    # on whether the direction of movement flips (a sign change between
    # consecutive differences).
    actual_turns = (actual_diff[:-1] * actual_diff[1:]) < 0
    pred_turns = (pred_diff[:-1] * pred_diff[1:]) < 0
    if actual_turns.size:
        turning_point_accuracy = float(np.mean(actual_turns == pred_turns))
    else:
        # Fewer than three points: no interior point can be a turning point
        turning_point_accuracy = float('nan')

    return rmse, directional_accuracy, turning_point_accuracy

@ray.remote
def run_experiment(df, model_names, use_scaler=False):
    """
    Runs forecasting experiment with cross-validation for multiple models.

    Declared as a Ray remote function: invoke it with ``run_experiment.remote(...)``
    and collect the return value with ``ray.get``.

    Args:
        df (pd.DataFrame): Input dataframe with time series data; must contain
            the 'ds', 'y' and 'unique_id' columns
        model_names (list): Names of the cross-validation output columns to
            evaluate. Names with no matching column are skipped with a warning.
        use_scaler (bool): Whether to apply MinMax scaling

    Returns:
        tuple: (Results DataFrame with metrics, Cross-validation DataFrame with predictions)
            Metrics and predictions are reported in the original (unscaled)
            units of 'y', whether or not scaling was used for fitting.
    """
    import warnings  # local import keeps this block self-contained

    # Initialize forecaster and prepare data
    stats_forecaster = get_stats_forecaster()
    prepared_df, scaler = prepare_data(df, use_scaler)

    # Keep only the columns handed to cross-validation and force a float target
    cv_input = prepared_df[['ds', 'y', 'unique_id']].copy()
    cv_input['y'] = cv_input['y'].astype(float)

    # Perform rolling window cross-validation
    crossvalidation_df = stats_forecaster.cross_validation(
        df=cv_input,
        h=forecast_horizon,
        step_size=step_size,
        n_windows=n_windows,
    )

    # Work out which requested models actually produced a prediction column.
    # Previously unmatched names were dropped silently, so a typo or alias
    # mismatch simply produced a shorter results table.
    available_models = [name for name in model_names if name in crossvalidation_df.columns]
    missing_models = [name for name in model_names if name not in crossvalidation_df.columns]
    if missing_models:
        warnings.warn(
            f"No cross-validation column found for model(s) {missing_models}; "
            f"available columns: {list(crossvalidation_df.columns)}"
        )

    # Inverse transform actuals and predictions if scaling was applied
    if scaler is not None:
        for column in ['y', *available_models]:
            crossvalidation_df[column] = scaler.inverse_transform(
                crossvalidation_df[[column]]
            ).ravel()

    # Calculate performance metrics for each model
    results = []
    for model in available_models:
        rmse, dir_acc, turn_acc = calculate_metrics(
            crossvalidation_df['y'].values,
            crossvalidation_df[model].values
        )

        # Lower is better: the accuracies are flipped into error rates and
        # averaged with RMSE using equal weights.
        # NOTE(review): RMSE is in the units of 'y' while the two error rates
        # lie in [0, 1], so RMSE can dominate this score.
        weighted_score = (rmse + (1 - dir_acc) + (1 - turn_acc)) / 3

        results.append({
            'Model': model,
            'RMSE': rmse,
            'Directional_Accuracy': dir_acc,
            'Turning_Point_Accuracy': turn_acc,
            'Weighted_Score': weighted_score
        })

    return pd.DataFrame(results), crossvalidation_df

# Define statistical models to evaluate. Each name must match a prediction
# column in the cross-validation output.
# NOTE(review): 'CES' is presumably the column name statsforecast gives AutoCES — confirm.
model_names = ['AutoARIMA', 'AutoETS', 'AutoTheta', 'CES']

try:
    # Submit both experiments before collecting either, so they can run in parallel
    experiment_ref_no_scale = run_experiment.remote(price_only_df, model_names, use_scaler=False)
    experiment_ref_with_scale = run_experiment.remote(price_only_df, model_names, use_scaler=True)

    # Collect results from parallel processes (blocks until each task finishes)
    results_df_no_scale, crossvalidation_df_no_scale = ray.get(experiment_ref_no_scale)
    results_df_with_scale, crossvalidation_df_with_scale = ray.get(experiment_ref_with_scale)

    # Display performance metrics for both scaling approaches
    print("\nModel Performance Metrics (No Scaling):")
    print(results_df_no_scale.to_string(index=False))
    print("\nModel Performance Metrics (With MinMax Scaling):")
    print(results_df_with_scale.to_string(index=False))

    # Identify best performing models: the lowest Weighted_Score wins
    best_model_no_scale = results_df_no_scale.loc[results_df_no_scale['Weighted_Score'].idxmin(), 'Model']
    best_model_with_scale = results_df_with_scale.loc[results_df_with_scale['Weighted_Score'].idxmin(), 'Model']
    print(f"\nBest Model (No Scaling): {best_model_no_scale}")
    print(f"Best Model (With MinMax Scaling): {best_model_with_scale}")
finally:
    # Clean up Ray resources even when an experiment or the reporting above
    # raises, so a failed run does not leave the local Ray runtime alive.
    ray.shutdown()





2024-11-03 23:46:46,096	INFO worker.py:1781 -- Started a local Ray instance.
