In [None]:
from dotenv import load_dotenv
import os
import http.client
import json
import pandas as pd
import numpy as np

class DataLoader:
    def __init__(self):
        # Load environment variables
        load_dotenv()
        
        # Initialize common attributes
        self.domain = os.getenv("DOMAIN")
        self.workspace_id = os.getenv("WORKSPACE_ID")
        self.api_key = os.getenv("API_KEY")
        
        # Dataset IDs
        self.world_bank_dataset_id = os.getenv("WORLD_BANK_DATASET_ID")
        self.agro_gov_dataset_id = os.getenv("AGRO_GOV_DATASET_ID")
        self.vbma_dataset_id = os.getenv("VBMA_DATASET_ID")
        
        # Common headers
        self.headers = {
            "API_KEY": self.api_key,
            "Content-Type": "application/json"
        }

    def _make_request(self, dataset_id):
        """Make HTTP request to API endpoint"""
        conn = http.client.HTTPSConnection(self.domain)
        conn.request(
            "GET",
            f"/api/v1/workspaces/{self.workspace_id}/datasets/{dataset_id}/full",
            headers=self.headers
        )
        res = conn.getresponse()
        data = res.read()
        return json.loads(data.decode("utf-8"))

    def _flatten_data(self, rows):
        """Flatten nested JSON data structure"""
        flattened_data = []
        for row in rows:
            cells = row["cells"]
            cells["row_id"] = row["row_id"]
            flattened_data.append(cells)
        return flattened_data

    def load_price_data(self):
        """Load and process World Bank rice price data"""
        parsed_data = self._make_request(self.world_bank_dataset_id)
        flattened_data = self._flatten_data(parsed_data["data"])
        
        # Convert to DataFrame and clean
        price_df = pd.DataFrame(flattened_data)
        price_df = price_df.replace("…", pd.NA)
        price_df = price_df.apply(pd.to_numeric, errors="ignore")
        
        # Keep only Rice and Date columns
        price_df = price_df[["Rice, Viet Namese 5%", "Date"]]
        
        # Convert Date format from YYYYMM to YYYY-MM-DD
        price_df['Date'] = pd.to_datetime(price_df['Date'].astype(str).str.replace('M', '-'), format='%Y-%m') + pd.offsets.MonthBegin(0)
        
        # Sort by Date
        return price_df.sort_values('Date')

    def load_news_data(self):
        """Load and process agricultural news data"""
        parsed_data = self._make_request(self.agro_gov_dataset_id)
        flattened_data = self._flatten_data(parsed_data["data"])
        
        # Convert to DataFrame and clean
        news_df = pd.DataFrame(flattened_data)
        news_df = news_df.replace("…", pd.NA)
        news_df = news_df.apply(pd.to_numeric, errors="ignore")
        
        # Convert DATE column to datetime
        news_df['DATE'] = pd.to_datetime(news_df['DATE'], format='%d | %m | %Y')
        
        # Sort by DATE
        return news_df.sort_values('DATE')

    def load_vbma_data(self):
        """Load and process VBMA data"""
        parsed_data = self._make_request(self.vbma_dataset_id)
        flattened_data = self._flatten_data(parsed_data["data"])
        
        # Convert to DataFrame
        df = pd.DataFrame(flattened_data)
        
        # Clean data - first replace "..." with NA, then replace "N/A" and "-" with 0
        df = df.replace("…", pd.NA)
        df = df.replace(["N/A", "-"], 0)
        df = df.apply(pd.to_numeric, errors="ignore")
        
        # Drop row_id before transposing
        df = df.drop('row_id', axis=1)
        
        # Transpose and set headers
        df = df.transpose()
        df.columns = df.iloc[0]
        df = df.iloc[1:]
        
        # Reset index and rename columns
        df = df.reset_index().rename(columns={'index': 'ds'})
        
        # Clean date strings
        df['ds'] = df['ds'].str.replace('- ', '')
        df['ds'] = df['ds'].str.replace('T', '')
        df['ds'] = df['ds'].str.replace(r'\([^)]*\)', '', regex=True).str.strip()
        
        # Convert to datetime
        df['ds'] = pd.to_datetime(df['ds'], format='mixed') + pd.offsets.MonthBegin(0)
        # Drop the first row:
        df = df.iloc[1:]
        
        return df


if __name__ == "__main__":
    loader = DataLoader()
    price_df = loader.load_price_data()
    news_df = loader.load_news_data()
    vbma_df = loader.load_vbma_data()

# STATISTICAL MODELS


In [21]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler, RobustScaler, FunctionTransformer
from sklearn.metrics import mean_squared_error
from math import sqrt
import matplotlib.pyplot as plt
import ray
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, AutoETS, AutoTheta, AutoCES, AutoTBATS
import random

random.seed(42)

In [22]:
# Create stats_df from price_df and add unique_id column
stats_df = price_df.copy()
stats_df = stats_df.rename(columns={'Rice, Viet Namese 5%': 'y', 'Date': 'ds'})
stats_df['unique_id'] = 'stats'
stats_df = stats_df[['ds', 'y', 'unique_id']]  # Reorder columns
stats_df = stats_df.reset_index(drop=True)  # Reset index to increase incrementally

# Handle missing values by forward filling and then backward filling
stats_df['y'] = stats_df['y'].fillna(method='ffill').fillna(method='bfill')

stats_df

In [23]:


# Number of periods to forecast ahead
forecast_horizon = 6
# Size of each rolling window step
step_size = 1
# Total number of rolling windows for cross-validation
n_windows = 36

# Initialize Ray for parallel processing
ray.init(ignore_reinit_error=True)

# Define the models and forecaster
season_length = 12  # annual seasonality for monthly data
# List of statistical forecasting models with seasonal components
models = [
    AutoARIMA(season_length=season_length),  # Automated ARIMA model selection
    AutoETS(season_length=season_length),    # Automated Exponential Smoothing
    AutoTheta(season_length=season_length),  # Automated Theta method
    AutoCES(season_length=season_length)     # Automated Complex Exponential Smoothing
]

# Create StatsForecast object with parallel processing
def get_stats_forecaster():
    """
    Creates and returns a StatsForecast object with the defined models.
    
    Returns:
        StatsForecast: Configured forecaster with parallel processing enabled
    """
    return StatsForecast(models=models, freq='M', n_jobs=-1)

def prepare_data(df, use_scaler=False):
    """
    Prepares data for forecasting by handling data types and optional scaling.
    
    Args:
        df (pd.DataFrame): Input dataframe with time series data
        use_scaler (bool): Whether to apply MinMax scaling to the target variable
        
    Returns:
        tuple: (Prepared DataFrame, Fitted scaler or None if scaling not used)
    """
    # Ensure 'y' column is numeric
    df = df.copy()
    df['y'] = pd.to_numeric(df['y'], errors='coerce')
    
    # Handle any remaining missing values
    df['y'] = df['y'].fillna(method='ffill').fillna(method='bfill')
    
    # Apply MinMax scaling if requested
    scaler = None
    if use_scaler:
        scaler = MinMaxScaler()
        df['y'] = scaler.fit_transform(df[['y']])
    
    # Split the data into train and test sets based on rolling window parameters
    train_size = len(df) - n_windows * step_size
    train_df = df[:train_size]
    test_df = df[train_size:]
    
    return pd.concat([train_df, test_df]), scaler

def calculate_metrics(actual, predicted):
    """
    Calculates multiple performance metrics for forecast evaluation.
    
    Args:
        actual (array-like): True values
        predicted (array-like): Predicted values
        
    Returns:
        tuple: (RMSE, Directional Accuracy, Turning Point Accuracy)
    """
    actual = np.asarray(actual).flatten()
    predicted = np.asarray(predicted).flatten()

    # Root Mean Square Error
    rmse = sqrt(mean_squared_error(actual, predicted))
    
    # Directional Accuracy - measures correct prediction of up/down movements
    actual_diff = np.diff(actual)
    pred_diff = np.diff(predicted)
    directional_accuracy = np.mean((actual_diff * pred_diff) > 0)
    
    # Turning Point Accuracy - measures correct prediction of trend changes
    actual_turns = (actual_diff[:-1] * actual_diff[1:]) < 0
    pred_turns = (pred_diff[:-1] * pred_diff[1:]) < 0
    turning_point_accuracy = np.mean(actual_turns == pred_turns)
    
    return rmse, directional_accuracy, turning_point_accuracy

@ray.remote
def run_experiment(df, model_names, use_scaler=False):
    """
    Runs forecasting experiment with cross-validation for multiple models.
    
    Args:
        df (pd.DataFrame): Input dataframe with time series data
        model_names (list): List of model names to evaluate
        use_scaler (bool): Whether to apply MinMax scaling
        
    Returns:
        tuple: (Results DataFrame with metrics, Cross-validation DataFrame with predictions)
    """
    # Initialize forecaster and prepare data
    stats_forecaster = get_stats_forecaster()
    prepared_df, scaler = prepare_data(df, use_scaler)
    
    # Prepare for cross-validation
    cv_df = prepared_df[['ds', 'y', 'unique_id']].copy()
    cv_df['y'] = cv_df['y'].astype(float)

    # Perform rolling window cross-validation
    crossvalidation_df = stats_forecaster.cross_validation(
        df=cv_df,
        h=forecast_horizon,
        step_size=step_size,
        n_windows=n_windows
    )

    # Inverse transform predictions if scaling was applied
    if scaler:
        crossvalidation_df['y'] = scaler.inverse_transform(crossvalidation_df[['y']])
        for model in model_names:
            if model in crossvalidation_df.columns:
                crossvalidation_df[model] = scaler.inverse_transform(crossvalidation_df[[model]])

    # Calculate performance metrics for each model
    results = []
    for model in model_names:
        if model in crossvalidation_df.columns:
            rmse, dir_acc, turn_acc = calculate_metrics(
                crossvalidation_df['y'].values,
                crossvalidation_df[model].values
            )
            
            # Calculate weighted score (equal weights for all metrics)
            weighted_score = (rmse + (1 - dir_acc) + (1 - turn_acc)) / 3
            
            results.append({
                'Model': model,
                'RMSE': rmse,
                'Directional_Accuracy': dir_acc,
                'Turning_Point_Accuracy': turn_acc,
                'Weighted_Score': weighted_score
            })
    
    return pd.DataFrame(results), crossvalidation_df

# Define statistical models to evaluate
model_names = ['AutoARIMA', 'AutoETS', 'AutoTheta', 'CES']

# Run parallel experiments with and without data scaling
experiment_ref_no_scale = run_experiment.remote(stats_df, model_names, use_scaler=False)
experiment_ref_with_scale = run_experiment.remote(stats_df, model_names, use_scaler=True)

# Collect results from parallel processes
results_df_no_scale, crossvalidation_df_no_scale = ray.get(experiment_ref_no_scale)
results_df_with_scale, crossvalidation_df_with_scale = ray.get(experiment_ref_with_scale)

# Display performance metrics for both scaling approaches
print("\nModel Performance Metrics (No Scaling):")
print(results_df_no_scale.to_string(index=False))
print("\nModel Performance Metrics (With MinMax Scaling):")
print(results_df_with_scale.to_string(index=False))

# Identify best performing models based on weighted score
best_model_no_scale = results_df_no_scale.loc[results_df_no_scale['Weighted_Score'].idxmin(), 'Model']
best_model_with_scale = results_df_with_scale.loc[results_df_with_scale['Weighted_Score'].idxmin(), 'Model']
print(f"\nBest Model (No Scaling): {best_model_no_scale}")
print(f"Best Model (With MinMax Scaling): {best_model_with_scale}")

# Clean up Ray resources
ray.shutdown()


# ML MODELS

In [24]:
# Standard libraries
import random
import multiprocessing
from math import sqrt

# Data manipulation
import numpy as np
import pandas as pd

# Machine learning
from sklearn.metrics import mean_squared_error
import optuna

# MLForecast
from mlforecast import MLForecast
from mlforecast.auto import (
    AutoMLForecast,
    AutoElasticNet, 
    AutoXGBoost,
    AutoLightGBM,
    AutoCatboost
)
from mlforecast.target_transforms import LocalStandardScaler
from mlforecast.lag_transforms import ExponentiallyWeightedMean, RollingMean

# Visualization
import matplotlib.pyplot as plt
from utilsforecast.plotting import plot_series

# Core forecasting utilities
from coreforecast.scalers import LocalStandardScaler, LocalMinMaxScaler
from coreforecast.grouped_array import GroupedArray

# Set up multiprocessing and seeds
print(multiprocessing.cpu_count())

def set_seeds(seed=42):
    np.random.seed(seed)
    random.seed(seed)
    optuna.logging.set_verbosity(optuna.logging.WARNING)

set_seeds()
def catboost_model_params(trial: optuna.Trial):
    return {
        'subsample': trial.suggest_float('subsample', 0.5, 1.0)
    }

def calculate_metrics(actual, predicted):
    """Calculate multiple performance metrics for forecasting evaluation.
    
    Args:
        actual (array-like): The actual/true values
        predicted (array-like): The predicted/forecasted values
        
    Returns:
        tuple: A tuple containing:
            - rmse (float): Root Mean Square Error
            - directional_accuracy (float): Proportion of correctly predicted directions (0-1)
            - turning_point_accuracy (float): Proportion of correctly predicted turning points (0-1)
            - weighted_score (float): Combined score weighing all three metrics equally
    """
    # Convert inputs to numpy arrays and flatten
    actual = np.asarray(actual).flatten()
    predicted = np.asarray(predicted).flatten()

    # Calculate RMSE
    rmse = sqrt(mean_squared_error(actual, predicted))
    
    # Calculate directional accuracy (proportion of correctly predicted up/down movements)
    actual_diff = np.diff(actual)
    pred_diff = np.diff(predicted)
    directional_accuracy = np.mean((actual_diff * pred_diff) > 0)
    
    # Calculate turning point accuracy (proportion of correctly predicted trend changes)
    actual_turns = (actual_diff[:-1] * actual_diff[1:]) < 0  # True when direction changes
    pred_turns = (pred_diff[:-1] * pred_diff[1:]) < 0
    turning_point_accuracy = np.mean(actual_turns == pred_turns)
    
    # Calculate weighted score - lower is better
    # Combines RMSE with penalties for poor directional and turning point accuracy
    weighted_score = (rmse + (1 - directional_accuracy) + (1 - turning_point_accuracy)) / 3
    
    return rmse, directional_accuracy, turning_point_accuracy, weighted_score


def run_forecasting_pipeline(stats_df, horizon=6, step_size=1, n_windows=36):
    """Run an automated machine learning forecasting pipeline with multiple models.
    
    This function implements a complete forecasting workflow including:
    - Train/test splitting
    - Data preprocessing and scaling
    - Model training with cross-validation
    - Prediction generation
    - Performance evaluation and visualization
    
    Args:
        stats_df (pd.DataFrame): Input dataframe containing target variable 'y',
            datetime column 'ds', ID column 'unique_id' and optional macro features
        horizon (int, optional): Number of future periods to forecast. Defaults to 3.
        step_size (int, optional): Number of periods between cross-validation windows. Defaults to 3.
        n_windows (int, optional): Number of cross-validation windows. Defaults to 16.
            
    Returns:
        tuple: A tuple containing:
            - auto_mlf (AutoMLForecast): The fitted forecasting model
            - predictions (pd.DataFrame): Future predictions
            - cv_results (dict): Cross-validation results for each model
            - metrics_df (pd.DataFrame): Performance metrics comparison
    """
    # Split data into train and test sets
    # Test set size is determined by number of windows * step size
    train_size = len(stats_df) - n_windows * step_size
    train_df = stats_df[:train_size].copy()
    test_df = stats_df[train_size:].copy()  

    # Basic preprocessing - fill missing values with 0
    processed_df = stats_df.copy()
    processed_df.fillna(0)

    # Identify any exogenous (macro) features by excluding standard columns
    macro_features = processed_df.columns.difference(['unique_id', 'ds', 'y'])
    has_exog = len(macro_features) > 0

    # Scale macro features if present using local min-max scaling
    if has_exog:
        scaler = LocalMinMaxScaler()
        
        # First scale training data
        for feature in macro_features:
            train_values = train_df[feature].values
            indptr = np.array([0, len(train_values)])
            grouped_train = GroupedArray(train_values, indptr)
            scaled_train_values = scaler.fit_transform(grouped_train)
            train_df[feature] = scaled_train_values

        # Then scale full dataset using fitted scaler
        for feature in macro_features:
            full_values = processed_df[feature].values
            indptr = np.array([0, len(full_values)])
            grouped_full = GroupedArray(full_values, indptr)
            scaled_full_values = scaler.transform(grouped_full)
            processed_df[feature] = scaled_full_values

    # Initialize dictionary of models to evaluate
    models = {
        'elasticnet': AutoElasticNet(),  # Linear model with L1/L2 regularization
        'xgboost': AutoXGBoost(),        # Gradient boosting with trees
        'lightgbm': AutoLightGBM(),      # Light gradient boosting
        'catboost': AutoCatboost(config = catboost_model_params)  # Categorical boosting
    }

    # Configure automated ML forecasting framework
    auto_mlf = AutoMLForecast(
        models=models,
        freq='MS',  # Monthly frequency
        season_length=12,  # Annual seasonality
        fit_config=lambda trial: {
            'static_features': [],
            'dropna': True,
            'keep_last_n': None
        },
        num_threads=12  # Parallel processing
    )

    # Fit models with cross-validation
    print("Performing optimization and cross-validation...")
    auto_mlf.fit(
        train_df,
        n_windows=2,
        h=1,
        num_samples=20,
        step_size=1
    )

    # Generate future prediction dataframe
    print("\nGenerating predictions...")
    any_model = next(iter(auto_mlf.models_.values()))
    future_df = any_model.make_future_dataframe(h=horizon)
    
    # Handle future macro features if present
    if has_exog:
        # Get last known values for each series
        last_dates = stats_df.groupby('unique_id')['ds'].max()
        future_values = []
        
        # Create future macro data using last known values
        for idx, row in future_df.iterrows():
            uid = row['unique_id']
            last_known_values = stats_df[stats_df['unique_id'] == uid].loc[
                stats_df['ds'] == last_dates[uid], 
                macro_features
            ].iloc[0]
            
            future_values.append({
                'unique_id': uid,
                'ds': row['ds'],
                **last_known_values
            })
        
        # Scale future macro features
        future_macro_df = pd.DataFrame(future_values)
        for feature in macro_features:
            future_values = future_macro_df[feature].values
            indptr = np.array([0, len(future_values)])
            grouped_future = GroupedArray(future_values, indptr)
            scaled_future_values = scaler.transform(grouped_future)
            future_macro_df[feature] = scaled_future_values
        
        # Generate predictions with exogenous features
        predictions = auto_mlf.predict(horizon, X_df=future_macro_df)
    else:
        # Generate predictions without exogenous features
        predictions = auto_mlf.predict(horizon)

    # Evaluate models using cross-validation
    cv_results = {}
    metrics = {}

    # Loop through each model for evaluation
    for model_name, model in auto_mlf.models_.items():
        # Perform cross-validation on last 48 periods
        cv_df = model.cross_validation(
            df=processed_df,
            n_windows=36,
            h=6,
            step_size=1,
            static_features=[],
            dropna=True,
        )
        cv_results[model_name] = cv_df
        actual = cv_df['y']
        predicted = cv_df[model_name]
        
        # Calculate performance metrics
        rmse, dir_acc, turn_acc, weighted_score = calculate_metrics(actual, predicted)
        metrics[model_name] = {
            'RMSE': rmse,
            'Directional Accuracy': dir_acc,
            'Turning Point Accuracy': turn_acc,
            'Weighted Score': weighted_score
        }

        # Create evaluation plots
        plt.figure(figsize=(15, 6))
        print(f"\nMetrics for {model_name}:")
        print(f"RMSE: {rmse:.4f}")
        print(f"Directional Accuracy: {dir_acc:.4f}")
        print(f"Turning Point Accuracy: {turn_acc:.4f}")
        print(f"Weighted Score: {weighted_score:.4f}")
        
        # Print value ranges for validation
        print(f"\nValue ranges for {model_name}:")
        print("Original data range:", stats_df['y'].min(), "-", stats_df['y'].max())
        print("Predicted data range:", cv_df[model_name].min(), "-", cv_df[model_name].max())
        print("Time range:", cv_df['ds'].min(), "-", cv_df['ds'].max())
    # Create comparison metrics dataframe
    metrics_df = pd.DataFrame(metrics).round(4)
    print("\nModel Comparison Metrics:")
    print(metrics_df)

    # Identify best performing model based on weighted score
    best_model = min(metrics.items(), key=lambda x: x[1]['Weighted Score'])
    print(f"\nBest Model: {best_model[0]} (Weighted Score: {best_model[1]['Weighted Score']:.4f})")

    return auto_mlf, predictions, cv_results, metrics_df

# Run the forecasting pipeline
auto_mlf, predictions, cv_results, metrics_df = run_forecasting_pipeline(stats_df, horizon=6, step_size=1)
