# ARIMA Forecasting Model for Azure Cost Management

This notebook implements ARIMA (AutoRegressive Integrated Moving Average) forecasting models for Azure cost prediction. ARIMA is a classical time series forecasting method that works well for stationary time series.

## ARIMA Model Features
- **AR (AutoRegressive)**: Uses past values to predict future values
- **I (Integrated)**: Handles non-stationary data through differencing
- **MA (Moving Average)**: Uses past forecast errors to predict future values
- **Automatic Parameter Selection**: Uses statistical tests to find optimal parameters
- **Seasonal ARIMA (SARIMA)**: Handles seasonal patterns

## Objectives
1. Load and prepare time series data for ARIMA
2. Perform stationarity tests and transformations
3. Find optimal ARIMA parameters using auto_arima
4. Train ARIMA models for different cost categories
5. Generate forecasts with confidence intervals
6. Evaluate model performance and compare with Prophet


In [None]:
# Import required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings
warnings.filterwarnings('ignore')

# ARIMA specific imports
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller, kpss
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import acorr_ljungbox
from pmdarima import auto_arima
from pmdarima.arima import ARIMA as PMDARIMA
import statsmodels.api as sm

# Load forecasting data
import pickle
with open('/Users/sabbineni/projects/acm/data/forecasting_data.pkl', 'rb') as f:
    forecasting_data = pickle.load(f)

print("Libraries imported successfully!")
print(f"Available time series: {list(forecasting_data.keys())}")

# Display data info
for key, ts_data in forecasting_data.items():
    print(f"{key}: {len(ts_data)} data points, "
          f"Date range: {ts_data.index.min()} to {ts_data.index.max()}")


In [None]:
# Stationarity Tests and Data Preparation
def test_stationarity(timeseries, title="Time Series"):
    """
    Perform stationarity tests on time series data.
    """
    print(f"\n=== Stationarity Tests for {title} ===")
    
    # ADF Test
    adf_result = adfuller(timeseries.dropna())
    print(f"ADF Test:")
    print(f"  ADF Statistic: {adf_result[0]:.6f}")
    print(f"  p-value: {adf_result[1]:.6f}")
    print(f"  Critical Values:")
    for key, value in adf_result[4].items():
        print(f"    {key}: {value:.6f}")
    
    if adf_result[1] <= 0.05:
        print("  ‚úÖ Series is stationary (reject null hypothesis)")
    else:
        print("  ‚ùå Series is non-stationary (fail to reject null hypothesis)")
    
    # KPSS Test
    try:
        kpss_result = kpss(timeseries.dropna(), regression='c')
        print(f"\nKPSS Test:")
        print(f"  KPSS Statistic: {kpss_result[0]:.6f}")
        print(f"  p-value: {kpss_result[1]:.6f}")
        print(f"  Critical Values:")
        for key, value in kpss_result[3].items():
            print(f"    {key}: {value:.6f}")
        
        if kpss_result[1] >= 0.05:
            print("  ‚úÖ Series is stationary (fail to reject null hypothesis)")
        else:
            print("  ‚ùå Series is non-stationary (reject null hypothesis)")
    except:
        print("  ‚ö†Ô∏è KPSS test failed (insufficient data or other issue)")

def make_stationary(timeseries, method='diff'):
    """
    Make time series stationary using differencing or log transformation.
    """
    if method == 'diff':
        return timeseries.diff().dropna()
    elif method == 'log_diff':
        return np.log(timeseries + 1).diff().dropna()
    elif method == 'log':
        return np.log(timeseries + 1)
    else:
        return timeseries

# Test stationarity for key categories
key_categories = ['Total', 'Compute', 'Storage', 'Database']
stationarity_results = {}

for category in key_categories:
    if category in forecasting_data:
        ts_data = forecasting_data[category]
        test_stationarity(ts_data, category)
        
        # Try different transformations
        print(f"\n--- Testing transformations for {category} ---")
        
        # Original data
        test_stationarity(ts_data, f"{category} (Original)")
        
        # First difference
        diff_data = make_stationary(ts_data, 'diff')
        if len(diff_data) > 10:
            test_stationarity(diff_data, f"{category} (1st Diff)")
        
        # Log transformation
        log_data = make_stationary(ts_data, 'log')
        test_stationarity(log_data, f"{category} (Log)")
        
        # Log + First difference
        log_diff_data = make_stationary(ts_data, 'log_diff')
        if len(log_diff_data) > 10:
            test_stationarity(log_diff_data, f"{category} (Log + 1st Diff)")
        
        stationarity_results[category] = {
            'original': ts_data,
            'diff': diff_data if len(diff_data) > 10 else None,
            'log': log_data,
            'log_diff': log_diff_data if len(log_diff_data) > 10 else None
        }


In [None]:
# Auto ARIMA Parameter Selection
def find_best_arima_params(timeseries, category_name, max_p=5, max_q=5, max_d=2):
    """
    Find the best ARIMA parameters using auto_arima.
    """
    print(f"\nFinding best ARIMA parameters for {category_name}...")
    
    try:
        # Use auto_arima to find best parameters
        model = auto_arima(
            timeseries,
            start_p=0, start_q=0,
            max_p=max_p, max_q=max_q, max_d=max_d,
            seasonal=True,
            m=7,  # Weekly seasonality
            stepwise=True,
            suppress_warnings=True,
            error_action='ignore',
            trace=True
        )
        
        print(f"Best ARIMA parameters for {category_name}: {model.order}")
        if hasattr(model, 'seasonal_order') and model.seasonal_order:
            print(f"Best seasonal parameters: {model.seasonal_order}")
        
        return model
        
    except Exception as e:
        print(f"Error finding parameters for {category_name}: {str(e)}")
        return None

# Find best parameters for each category
arima_models = {}
best_params = {}

for category in key_categories:
    if category in stationarity_results:
        ts_data = stationarity_results[category]['original']
        
        if len(ts_data) > 50:  # Need sufficient data
            model = find_best_arima_params(ts_data, category)
            if model:
                arima_models[category] = model
                best_params[category] = {
                    'order': model.order,
                    'seasonal_order': getattr(model, 'seasonal_order', None),
                    'aic': model.aic(),
                    'bic': model.bic()
                }
        else:
            print(f"Skipping {category} - insufficient data points")

print(f"\nSuccessfully found parameters for: {list(arima_models.keys())}")
print("\nBest parameters summary:")
for category, params in best_params.items():
    print(f"{category}: {params}")


In [None]:
# Train ARIMA Models and Generate Forecasts
def train_arima_model(timeseries, order, seasonal_order=None, periods=30):
    """
    Train ARIMA model and generate forecasts.
    """
    try:
        # Fit the model
        if seasonal_order:
            model = SARIMAX(timeseries, order=order, seasonal_order=seasonal_order)
        else:
            model = ARIMA(timeseries, order=order)
        
        fitted_model = model.fit(disp=False)
        
        # Generate forecasts
        forecast = fitted_model.forecast(steps=periods)
        conf_int = fitted_model.get_forecast(steps=periods).conf_int()
        
        return fitted_model, forecast, conf_int
        
    except Exception as e:
        print(f"Error training model: {str(e)}")
        return None, None, None

# Train models and generate forecasts
arima_results = {}
forecast_periods = 30

for category, model in arima_models.items():
    print(f"\nTraining ARIMA model for {category}...")
    
    ts_data = stationarity_results[category]['original']
    order = best_params[category]['order']
    seasonal_order = best_params[category]['seasonal_order']
    
    fitted_model, forecast, conf_int = train_arima_model(
        ts_data, order, seasonal_order, forecast_periods
    )
    
    if fitted_model is not None:
        arima_results[category] = {
            'model': fitted_model,
            'forecast': forecast,
            'conf_int': conf_int,
            'order': order,
            'seasonal_order': seasonal_order,
            'aic': fitted_model.aic,
            'bic': fitted_model.bic
        }
        
        print(f"Model trained successfully for {category}")
        print(f"Order: {order}, Seasonal: {seasonal_order}")
        print(f"AIC: {fitted_model.aic:.2f}, BIC: {fitted_model.bic:.2f}")
        print(f"Forecast mean: ${forecast.mean():.2f}")
        print(f"Forecast std: ${forecast.std():.2f}")
    else:
        print(f"Failed to train model for {category}")

print(f"\nSuccessfully trained models for: {list(arima_results.keys())}")


In [None]:
# Model Diagnostics and Evaluation
def evaluate_arima_model(model, category_name):
    """
    Perform diagnostic tests on ARIMA model.
    """
    print(f"\n=== Model Diagnostics for {category_name} ===")
    
    # Model summary
    print("Model Summary:")
    print(model.summary())
    
    # Residual analysis
    residuals = model.resid
    
    # Ljung-Box test for residual autocorrelation
    lb_test = acorr_ljungbox(residuals, lags=10, return_df=True)
    print(f"\nLjung-Box Test (p-values):")
    print(lb_test['lb_pvalue'].head())
    
    # Check if residuals are white noise
    if lb_test['lb_pvalue'].min() > 0.05:
        print("‚úÖ Residuals appear to be white noise (no autocorrelation)")
    else:
        print("‚ùå Residuals show autocorrelation (model may need improvement)")
    
    # Normality test
    from scipy import stats
    shapiro_stat, shapiro_p = stats.shapiro(residuals)
    print(f"\nShapiro-Wilk normality test:")
    print(f"Statistic: {shapiro_stat:.6f}, p-value: {shapiro_p:.6f}")
    
    if shapiro_p > 0.05:
        print("‚úÖ Residuals appear to be normally distributed")
    else:
        print("‚ùå Residuals are not normally distributed")
    
    return {
        'residuals': residuals,
        'ljung_box': lb_test,
        'shapiro_stat': shapiro_stat,
        'shapiro_p': shapiro_p
    }

# Evaluate all models
model_diagnostics = {}
for category, result in arima_results.items():
    diagnostics = evaluate_arima_model(result['model'], category)
    model_diagnostics[category] = diagnostics


In [None]:
# Visualize ARIMA Results
def plot_arima_results(timeseries, forecast, conf_int, category_name, periods=30):
    """
    Create comprehensive visualizations for ARIMA model results.
    """
    # Create future dates
    last_date = timeseries.index[-1]
    future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), periods=periods, freq='D')
    
    # Create subplots
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=(
            f'{category_name} - Historical Data and Forecast',
            f'{category_name} - Forecast with Confidence Intervals',
            f'{category_name} - Residuals',
            f'{category_name} - ACF of Residuals'
        ),
        specs=[[{"secondary_y": False}, {"secondary_y": False}],
               [{"secondary_y": False}, {"secondary_y": False}]]
    )
    
    # 1. Historical data and forecast
    fig.add_trace(
        go.Scatter(x=timeseries.index, y=timeseries.values, mode='lines',
                  name='Historical', line=dict(color='blue')),
        row=1, col=1
    )
    
    fig.add_trace(
        go.Scatter(x=future_dates, y=forecast, mode='lines',
                  name='Forecast', line=dict(color='red')),
        row=1, col=1
    )
    
    # 2. Forecast with confidence intervals
    fig.add_trace(
        go.Scatter(x=future_dates, y=forecast, mode='lines',
                  name='Forecast', line=dict(color='red')),
        row=1, col=2
    )
    
    # Add confidence intervals
    fig.add_trace(
        go.Scatter(x=future_dates, y=conf_int.iloc[:, 1], 
                  mode='lines', line=dict(width=0), showlegend=False),
        row=1, col=2
    )
    
    fig.add_trace(
        go.Scatter(x=future_dates, y=conf_int.iloc[:, 0], 
                  mode='lines', line=dict(width=0), fill='tonexty',
                  fillcolor='rgba(255,0,0,0.2)', name='95% CI'),
        row=1, col=2
    )
    
    # 3. Residuals
    if category_name in model_diagnostics:
        residuals = model_diagnostics[category_name]['residuals']
        fig.add_trace(
            go.Scatter(x=timeseries.index[1:], y=residuals, mode='lines',
                      name='Residuals', line=dict(color='green')),
            row=2, col=1
        )
    
    # 4. ACF of residuals
    if category_name in model_diagnostics:
        residuals = model_diagnostics[category_name]['residuals']
        from statsmodels.tsa.stattools import acf
        acf_values = acf(residuals, nlags=20)
        lags = range(len(acf_values))
        
        fig.add_trace(
            go.Bar(x=list(lags), y=acf_values, name='ACF'),
            row=2, col=2
        )
    
    fig.update_layout(height=800, title_text=f"ARIMA Model Results - {category_name}")
    fig.show()

# Create visualizations for each model
for category, result in arima_results.items():
    ts_data = stationarity_results[category]['original']
    plot_arima_results(
        ts_data, 
        result['forecast'], 
        result['conf_int'], 
        category, 
        forecast_periods
    )


In [None]:
# Save ARIMA Results
print("=== Saving ARIMA Results ===")

# Save models and results
import joblib
import os

# Create results directory
results_dir = '/Users/sabbineni/projects/acm/results/arima'
os.makedirs(results_dir, exist_ok=True)

# Save models
for category, result in arima_results.items():
    model_path = f"{results_dir}/arima_model_{category.lower()}.pkl"
    joblib.dump(result['model'], model_path)
    print(f"Saved model: {model_path}")

# Save forecasts
for category, result in arima_results.items():
    # Create forecast DataFrame
    last_date = stationarity_results[category]['original'].index[-1]
    future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), 
                                periods=forecast_periods, freq='D')
    
    forecast_df = pd.DataFrame({
        'date': future_dates,
        'forecast': result['forecast'],
        'lower_bound': result['conf_int'].iloc[:, 0],
        'upper_bound': result['conf_int'].iloc[:, 1]
    })
    
    forecast_path = f"{results_dir}/arima_forecast_{category.lower()}.csv"
    forecast_df.to_csv(forecast_path, index=False)
    print(f"Saved forecast: {forecast_path}")

# Save model parameters and diagnostics
params_df = pd.DataFrame(best_params).T
params_path = f"{results_dir}/arima_parameters.csv"
params_df.to_csv(params_path)
print(f"Saved parameters: {params_path}")

# Save diagnostics
if model_diagnostics:
    diagnostics_path = f"{results_dir}/arima_diagnostics.pkl"
    joblib.dump(model_diagnostics, diagnostics_path)
    print(f"Saved diagnostics: {diagnostics_path}")

# Create forecast comparison
forecast_comparison = pd.DataFrame()
for category, result in arima_results.items():
    last_date = stationarity_results[category]['original'].index[-1]
    future_dates = pd.date_range(start=last_date + pd.Timedelta(days=1), 
                                periods=forecast_periods, freq='D')
    forecast_comparison[category] = result['forecast'].values

forecast_comparison.index = future_dates
forecast_comparison.index.name = 'Date'

comparison_path = f"{results_dir}/arima_forecast_comparison.csv"
forecast_comparison.to_csv(comparison_path)
print(f"Saved forecast comparison: {comparison_path}")

print("\n‚úÖ ARIMA model implementation completed successfully!")
print("üìä Models trained, evaluated, and saved")
print("üîÆ Future forecasts generated for 30 days")
print("üìà Results ready for comparison with other models")
