<a href="https://colab.research.google.com/github/sagnik-sudo/Workload-Forecasting-Redset/blob/deepar/workload_forecasting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Amazon Redset Workload Forecasting

## What is Redset?
Redset is a dataset released by Amazon in 2024, comprising three months of user query metadata from a selected sample of Amazon Redshift instances. It includes query metadata for **200 provisioned and 200 serverless instances**, offering insights into user interactions with these database services. While not representative of the entire Redshift fleet, Redset serves as a valuable resource for developing **new benchmarks and exploring machine learning techniques**, such as **workload forecasting**, tailored to these specific workloads.

## What do we do in this notebook?

In this notebook, we analyze **Amazon Redset**, a dataset containing query metadata from Amazon Redshift instances, to explore **workload forecasting techniques** for **intelligent resource scaling**. Our primary objectives are:

### 1. Baseline Model Evaluation
- We evaluate **traditional forecasting baselines**, such as:
  - **AutoGluon DeepAR**
  - **Seasonal Naive Models**
- These models establish reference points for **workload prediction**.
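
To illustrate what a seasonal naive baseline does, the sketch below repeats the last observed season to cover the forecast horizon. It is a minimal NumPy version that assumes hourly data with a weekly season of 168 hours; the function name `seasonal_naive_forecast` is illustrative and is not part of this notebook's code.

```python
import numpy as np

def seasonal_naive_forecast(history, horizon, season_length=168):
    """Repeat the last full season of `history` to cover `horizon` steps."""
    history = np.asarray(history, dtype=float)
    if len(history) < season_length:
        raise ValueError("history must contain at least one full season")
    last_season = history[-season_length:]
    # Tile the last season as often as needed, then cut to the horizon.
    repeats = int(np.ceil(horizon / season_length))
    return np.tile(last_season, repeats)[:horizon]

# Toy example with a season of 4 steps
toy_history = [1, 2, 3, 4, 5, 6, 7, 8]
toy_forecast = seasonal_naive_forecast(toy_history, horizon=6, season_length=4)
print(toy_forecast)  # [5. 6. 7. 8. 5. 6.]
```

Any learned model should beat this forecast on workloads with a stable weekly rhythm; otherwise the extra complexity is hard to justify.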

### 2. Development of RNN-based Forecasting Models
- We implement **Recurrent Neural Network (RNN)-based models** to improve **forecasting accuracy**.
- These models aim to **capture complex workload patterns** and **improve upon the baselines**.

### 3. Comparison Between Baselines and RNN-based Approaches
- Using the **Redset dataset**, we compare the performance of our **custom RNN models** with:
  - **AutoGluon DeepAR**
  - **Statistical forecasting methods** (e.g., ARIMA, ETS)
- We use metrics such as **Q-error** and **forecast accuracy** to assess improvements.
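
For reference, the Q-error used in this notebook's `evaluate` method can be written as follows, where $y_t$ is the actual value, $\hat{y}_t$ the forecast, and $H$ the forecast horizon. The code additionally adds a small $\epsilon$ to each denominator to avoid division by zero.

$$
\text{q-error}_t = \max\left(\frac{\hat{y}_t}{y_t},\ \frac{y_t}{\hat{y}_t}\right),
\qquad
\text{Q-error} = \frac{1}{H}\sum_{t=1}^{H} \text{q-error}_t
$$

A value of 1 corresponds to a perfect forecast; over- and under-estimation by the same factor are penalized equally.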

---

### Reference
For more details on the forecasting methodologies and benchmark comparisons, we refer to the **attached paper: "Forecasting Algorithms for Intelligent Resource Scaling: An Experimental Analysis"**.  
This paper provides insights into **workload forecasting challenges**, evaluation metrics, and strategies for improving predictive accuracy in cloud environments.

In [None]:
!pip install autogluon.timeseries

Collecting autogluon.timeseries
  Downloading autogluon.timeseries-1.2-py3-none-any.whl.metadata (12 kB)
Collecting lightning<2.6,>=2.2 (from autogluon.timeseries)
  Downloading lightning-2.5.0.post0-py3-none-any.whl.metadata (40 kB)
Collecting pytorch-lightning (from autogluon.timeseries)
  Downloading pytorch_lightning-2.5.0.post0-py3-none-any.whl.metadata (21 kB)
Collecting accelerate<1.0,>=0.34.0 (from autogluon.timeseries)
  Downloading accelerate-0.34.2-py3-none-any.whl.metadata (19 kB)
Collecting gluonts<0.17,>=0.15.0 (from autogluon.timeseries)
  Downloading gluonts-0.16.0-py3-none-any.whl.metadata (9.8 kB)
Collecting statsforecast<1.8,>=1.7.0 (from autogluon.timeseries)
  Downloading statsforecast-1.7.8-cp311-cp311-manylinux_2_17_x86_64.many

In [None]:
import os
from typing import Dict, List

import numpy as np
import pandas as pd
from autogluon.timeseries import TimeSeriesPredictor

class DeepAR:
    """
    DeepAR implementation for workload forecasting using AutoGluon TimeSeries.
    This implementation explicitly configures AutoGluon to use the DeepAR model.
    Supports training, prediction, evaluation, and saving/loading models.
    """

    def __init__(self, prediction_length: int, freq: str = "h", hyperparameters: Dict = None):
        print("Initializing DeepARAutogluonTS Model...")
        self.prediction_length = prediction_length
        self.freq = freq
        self.model = None

        # Default hyperparameters for the DeepAR model in AutoGluon.
        # By wrapping the parameters in a dictionary with the key "DeepAR",
        # we ensure that AutoGluon uses only the DeepAR model.
        default_deepar_hp = {
            "epochs": 50,
            "learning_rate": 1e-3,
            "num_layers": 2,
            "hidden_size": 40,
            "dropout_rate": 0.1,
            "batch_size": 32,
            "context_length": prediction_length,
        }
        # Explicitly force usage of DeepAR by setting the key.
        self.hyperparameters = hyperparameters or {"DeepAR": default_deepar_hp}

    def prepare_data(self, data: pd.DataFrame, target_column: str = "query_count") -> pd.DataFrame:
        """
        Converts a pandas DataFrame into the long format expected by AutoGluon TimeSeries.
        Assumes input DataFrame has columns 'timestamp' and target_column.
        """
        data = data.copy()
        data["timestamp"] = pd.to_datetime(data["timestamp"])
        data = data.sort_values("timestamp")
        # Add a constant item_id since we are forecasting a single time series.
        data["item_id"] = "item_1"
        # Rearranging columns if needed.
        data = data[["item_id", "timestamp", target_column]]
        return data

    def train(self, train_data: pd.DataFrame, target_column: str = "query_count"):
        """
        Trains the DeepAR model using AutoGluon TimeSeries.
        """
        prepared_data = self.prepare_data(train_data, target_column)

        # Initialize the TimeSeriesPredictor with the target, prediction length, and frequency.
        print("Training started using DeepAR...")
        self.model = TimeSeriesPredictor(
            target=target_column,
            prediction_length=self.prediction_length,
            freq=self.freq,
            eval_metric='WQL'
        )

        # Fit the predictor with the specified DeepAR hyperparameters.
        self.model.fit(
            train_data=prepared_data,
            hyperparameters=self.hyperparameters,
            # You can pass additional parameters such as time_limit if needed.
        )
        print("Training completed using DeepAR.")

    def predict(self, test_data: pd.DataFrame, target_column: str = "query_count") -> pd.DataFrame:
        """
        Generates predictions using the trained model.
        Returns a DataFrame with timestamps and prediction statistics.
        """
        if self.model is None:
            raise ValueError("Model must be trained before making predictions.")

        prepared_data = self.prepare_data(test_data, target_column)
        predictions = self.model.predict(prepared_data)

        # Extract forecasts for our single time series (item_id = "item_1").
        # After selecting the item, the index holds the forecast timestamps and the
        # columns hold the mean and the forecast quantiles.
        forecast = predictions.loc["item_1"]
        forecast = forecast.reset_index().rename(columns={"index": "timestamp"})

        # AutoGluon names quantile columns after their level (e.g. '0.1', '0.5', '0.9').
        # The median ('0.5') serves as the point forecast; '0.1' and '0.9' give the bounds.
        lower_bound = forecast["0.1"] if "0.1" in forecast.columns else None
        upper_bound = forecast["0.9"] if "0.9" in forecast.columns else None
        mean_forecast = forecast["0.5"] if "0.5" in forecast.columns else forecast.iloc[:, 1]  # fallback: first forecast column

        predictions_df = pd.DataFrame({
            "timestamp": forecast["timestamp"],
            "mean": mean_forecast,
            "lower_bound": lower_bound,
            "upper_bound": upper_bound,
        })

        return predictions_df

    def evaluate(self, test_data: pd.DataFrame, target_column: str = "query_count") -> Dict[str, float]:
        """
        Evaluates the model using custom metrics: q-error, MAE, and RME.

        Forecasts and actuals are paired by position rather than merged on
        timestamps: the actual values are taken to be the last `prediction_length`
        rows of the prepared test data (sorted by timestamp). Note that `predict`
        forecasts the `prediction_length` steps that follow the data it receives,
        so the forecast timestamps lie after the end of `test_data`.

        Returns a dictionary with the computed metrics.
        """
        if self.model is None:
            raise ValueError("Model must be trained before evaluation.")

        # Prepare the data in long format
        prepared_data = self.prepare_data(test_data, target_column)
        # Sort the prepared data to ensure chronological order
        prepared_data = prepared_data.sort_values("timestamp").reset_index(drop=True)

        # Get forecast predictions (using the mean forecast as our estimate)
        predictions_df = self.predict(test_data, target_column)
        predictions_df = predictions_df.reset_index(drop=True)

        # Instead of merging by timestamp, assume that the actual values for the forecast horizon
        # are the last `prediction_length` rows of the prepared data.
        if len(prepared_data) < self.prediction_length:
            raise ValueError("Not enough test data to cover the forecast horizon.")

        actual_forecast = prepared_data.iloc[-self.prediction_length:].reset_index(drop=True)

        if len(actual_forecast) != len(predictions_df):
            raise ValueError("Mismatch between forecast length and actuals length.")

        forecast = predictions_df["mean"].values
        actual = actual_forecast[target_column].values

        # To avoid division by zero, add a small epsilon
        epsilon = 1e-10

        # Compute MAE
        mae = np.mean(np.abs(forecast - actual))

        # Compute q-error for each forecast point
        q_errors = np.maximum(forecast / (actual + epsilon), actual / (forecast + epsilon))
        q_error = np.mean(q_errors)

        # Compute Relative Mean Error (RME)
        rme = np.mean(np.abs(forecast - actual) / (np.abs(actual) + epsilon))

        metrics = {
            "q_error": q_error,
            "mae": mae,
            "rme": rme
        }

        print("Evaluation Metrics:")
        print(f"Q-error: {q_error:.4f}")
        print(f"MAE: {mae:.4f}")
        print(f"RME: {rme:.4f}")

        return metrics

    def save_model(self):
        """
        Saves the trained model to disk.
        """
        if self.model is None:
            raise ValueError("No trained model to save.")

        self.model.save()
        print("Model saved successfully.")

    def load_model(self, path: str):
        """
        Loads a trained model from disk.
        """
        if not os.path.exists(path):
            raise FileNotFoundError(f"No model file found at {path}")

        self.model = TimeSeriesPredictor.load(path)
        print(f"Model loaded from {path}.")

    def train_with_cv_and_tuning(
        self,
        train_data: pd.DataFrame,
        target_column: str = "query_count",
        hyperparams_list: List[Dict] = None,
        num_val_windows: int = 3,
        eval_metric: str = "WQL"
    ):
        """
        Trains the DeepAR model with multiple hyperparameter configurations
        and rolling-window cross-validation, then picks the best model.

        :param train_data: The training DataFrame (with 'timestamp' and target_column).
        :param target_column: The name of the target column.
        :param hyperparams_list: A list of dictionaries of hyperparameters for DeepAR.
        :param num_val_windows: Number of rolling windows to use for cross-validation.
        :param eval_metric: The evaluation metric to use, e.g. "WQL", "MASE", etc.
        """
        if hyperparams_list is None or len(hyperparams_list) == 0:
            # default to a single set
            hyperparams_list = [
                {
                    "epochs": 20,
                    "learning_rate": 1e-3,
                    "num_layers": 2,
                    "hidden_size": 40,
                    "dropout_rate": 0.1,
                    "batch_size": 32
                }
            ]

        # Prepare data (long format)
        prepared_data = self.prepare_data(train_data, target_column=target_column)

        # Construct a hyperparameters dict recognized by AutoGluon (list under "DeepAR" key)
        hyperparams = {"DeepAR": hyperparams_list}

        # Create a new predictor
        predictor = TimeSeriesPredictor(
            target=target_column,
            prediction_length=self.prediction_length,
            freq=self.freq,
            eval_metric=eval_metric
        )

        # Fit with cross-validation (num_val_windows) and the given hyperparams
        predictor.fit(
            train_data=prepared_data,
            hyperparameters=hyperparams,
            num_val_windows=num_val_windows,
            verbosity=2  # for more detailed logs
        )

        # Store the best predictor in self.model
        self.model = predictor
        print("Cross-validation and hyperparameter tuning complete. Best model stored in self.model.")
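
The predictor above is scored with `WQL`, which is built on the quantile (pinball) loss. The sketch below shows only that building block with made-up numbers; the normalization AutoGluon applies on top of it is not reproduced here, and `pinball_loss` is an illustrative helper rather than an AutoGluon function.

```python
import numpy as np

def pinball_loss(actual, quantile_forecast, q):
    """Average quantile (pinball) loss of a forecast for quantile level q."""
    actual = np.asarray(actual, dtype=float)
    quantile_forecast = np.asarray(quantile_forecast, dtype=float)
    diff = actual - quantile_forecast
    # Under-forecasts are weighted by q, over-forecasts by (1 - q).
    return float(np.mean(np.maximum(q * diff, (q - 1) * diff)))

actual = [10.0, 20.0, 30.0]
low = [8.0, 18.0, 33.0]  # hypothetical 0.1-quantile forecast
print(pinball_loss(actual, low, q=0.1))
```

For a low quantile such as 0.1, forecasting above the actual value (the third point) costs far more than forecasting below it, which is what pushes the 0.1-quantile forecast toward the lower end of the distribution.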

In [None]:
from datetime import datetime

class DataLoadError(Exception):
    """Custom error raised when there is a problem loading the data."""

    def __init__(self, message, code=None):
        """
        Args:
            message (str): A descriptive error message.
            code (int, optional): An optional error code for more granular error handling.
        """
        # Capture the current timestamp
        self.timestamp = datetime.now()
        self.message = message
        self.code = code

        # Format the error message with timestamp and error code (if provided)
        timestamp_str = self.timestamp.strftime('%Y-%m-%d %H:%M:%S')
        if code is not None:
            full_message = f"[{timestamp_str}] [Error {code}] {message}"
        else:
            full_message = f"[{timestamp_str}] {message}"

        super().__init__(full_message)

    def log_error(self):
        """Log the error details along with the timestamp."""
        print(f"Error Logged at {self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}: {self}")

class DataSplitError(Exception):
    """Custom error raised when there is an issue splitting the dataset."""

    def __init__(self, message, code=None):
        """
        Args:
            message (str): A descriptive error message.
            code (int, optional): An optional error code for more granular error handling.
        """
        self.timestamp = datetime.now()
        self.message = message
        self.code = code

        timestamp_str = self.timestamp.strftime('%Y-%m-%d %H:%M:%S')
        if code is not None:
            full_message = f"[{timestamp_str}] [Error {code}] {message}"
        else:
            full_message = f"[{timestamp_str}] {message}"

        super().__init__(full_message)

    def log_error(self):
        """Log the error details along with the timestamp."""
        print(f"Error Logged at {self.timestamp.strftime('%Y-%m-%d %H:%M:%S')}: {self}")

In [None]:
import os
import pandas as pd
# from utility.errors import (
#     DataLoadError,
#     DataSplitError,
# )


class DataManager:
    """Helper class for loading data and creating train/test splits."""

    def __init__(self, cluster_type, instance_id):
        """
        Initialize the DataManager with a specific cluster type and instance ID.

        Args:
            cluster_type (str): either 'serverless' or 'provisioned'.
            instance_id (int): indicates which instance is being considered.
        """
        self.cluster_type = cluster_type
        self.instance_id = instance_id
        self.data = None

    def load_data(self):
        """
        Loads the data into a dataframe for training and testing.

        Returns:
            pd.DataFrame: A dataframe with columns:
                - instance_id: id of the instance
                - timestamp: hourly timestamp in the format 'YYYY-MM-DD HH'
                - query_count: total number of queries in that hour
                - runtime: combined execution time of all queries in that hour
                - bytes_scanned: total amount of Gigabytes scanned in that hour

        Raises:
            DataLoadError: If the cluster_type is invalid, no appropriate file is found,
                           or if the file's columns do not match the expected ones.
        """
        # Validate cluster_type
        if self.cluster_type not in ["serverless", "provisioned"]:
            raise DataLoadError(
                "Invalid cluster_type. Must be 'serverless' or 'provisioned'.",
                code=1001,
            )

        # Define expected filenames based on cluster_type
        parquet_file = f"{self.cluster_type}.parquet"
        csv_file = f"{self.cluster_type}.csv"

        # Attempt to load the dataframe from the available file
        if os.path.exists(parquet_file):
            try:
                df = pd.read_parquet(parquet_file)
            except Exception as e:
                raise DataLoadError(
                    f"Error reading the parquet file: {e}", code=1002
                )
        elif os.path.exists(csv_file):
            try:
                df = pd.read_csv(csv_file)
            except Exception as e:
                raise DataLoadError(
                    f"Error reading the CSV file: {e}", code=1003
                )
        else:
            raise DataLoadError(
                f"Neither {parquet_file} nor {csv_file} was found in the working directory.",
                code=1004,
            )

        # Define the expected columns
        expected_columns = {
            "instance_id",
            "timestamp",
            "query_count",
            "runtime",
            "bytes_scanned",
        }

        # Check if dataframe columns exactly match the expected columns
        if set(df.columns) != expected_columns or len(df.columns) != len(
            expected_columns
        ):
            raise DataLoadError(
                "The loaded dataframe does not have the required columns.",
                code=1005,
            )

        # Filter the dataframe to only include rows with the specified instance_id
        # Copy the filtered rows so later column assignments do not act on a view of df
        self.data = df[df["instance_id"] == self.instance_id].copy()
        return self.data

    def train_test_split(self, data=None):
        """
        Splits the data into two training and test sets. The first training set spans the first N-2 weeks,
        and the second training set spans the first N-1 weeks. The test sets consist of the week immediately
        following the respective training set.

        Args:
            data (pd.DataFrame, optional): DataFrame to be split. If not provided, uses the loaded data.

        Returns:
            tuple: (first training set, first test set, second training set, second test set)

        Raises:
            DataSplitError: If there are fewer than 3 weeks in the data.
        """
        # Use the loaded data if none is provided
        if data is None:
            if self.data is None:
                raise DataSplitError(
                    "No data available to split. Please load data first.",
                    code=2001,
                )
            data = self.data.copy()
        else:
            data = data.copy()

        # Ensure that the 'timestamp' column is in datetime format
        if not pd.api.types.is_datetime64_any_dtype(data["timestamp"]):
            data["timestamp"] = pd.to_datetime(data["timestamp"])

        # Reset the latest timestamp to midnight. This ensures that
        # only full days are included.
        latest_date = data["timestamp"].max().normalize()
        data = data[data["timestamp"] < latest_date]

        # Create a new column 'week' representing the week period (using ISO week)
        data["week"] = data["timestamp"].dt.to_period("W")

        # Get a sorted list of unique weeks in the data
        unique_weeks = sorted(data["week"].unique())
        N = len(unique_weeks)

        if N < 3:
            raise DataSplitError(
                "Not enough weeks in data for splitting. Need at least 3 weeks.",
                code=2000,
            )

        # First split:
        #   Training set: weeks[0] to weeks[N-3] (i.e. first N-2 weeks)
        #   Test set: the week immediately after, i.e. week at index N-2
        train1_weeks = unique_weeks[: N - 2]
        test1_week = unique_weeks[N - 2]

        # Second split:
        #   Training set: weeks[0] to weeks[N-2] (i.e. first N-1 weeks)
        #   Test set: the week immediately after, i.e. the last week
        train2_weeks = unique_weeks[: N - 1]
        test2_week = unique_weeks[-1]

        # Create the splits based on the week periods
        train1 = data[data["week"].isin(train1_weeks)].copy()
        test1 = data[data["week"] == test1_week].copy()
        train2 = data[data["week"].isin(train2_weeks)].copy()
        test2 = data[data["week"] == test2_week].copy()

        for df in (train1, test1, train2, test2):
            df.drop(columns=["week"], inplace=True)

        return train1, test1, train2, test2
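
The weekly split logic can be checked on synthetic data. The sketch below is a standalone illustration, assuming four full Monday-to-Sunday weeks of hourly rows; the `demo_*` names are made up for this example and mirror the steps of `train_test_split` without calling it.

```python
import pandas as pd

# Four full weeks of synthetic hourly data, starting on Monday 2024-03-04
ts = pd.date_range("2024-03-04", periods=4 * 7 * 24, freq=pd.Timedelta(hours=1))
demo = pd.DataFrame({"timestamp": ts, "query_count": range(len(ts))})

# Label every row with its weekly period and collect the distinct weeks
demo["week"] = demo["timestamp"].dt.to_period("W")
weeks = sorted(demo["week"].unique())
n_weeks = len(weeks)

# Split 1: train on the first N-2 weeks, test on the following week
demo_train1 = demo[demo["week"].isin(weeks[: n_weeks - 2])]
demo_test1 = demo[demo["week"] == weeks[n_weeks - 2]]
# Split 2: train on the first N-1 weeks, test on the last week
demo_train2 = demo[demo["week"].isin(weeks[: n_weeks - 1])]
demo_test2 = demo[demo["week"] == weeks[-1]]

print(n_weeks, len(demo_train1), len(demo_test1), len(demo_train2), len(demo_test2))
# 4 336 168 504 168
```

Each test set holds exactly 168 hourly rows here, which matches a one-week forecast horizon.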


In [None]:
import logging
import os
from datetime import datetime

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
# from utility.helpers import DataManager
# import visualization
# from utility.baseline_models import DeepAR

# Work with instance 96 of the provisioned clusters
datamanager = DataManager('provisioned', 96)

In [None]:
# Load the data
data = datamanager.load_data()
# Convert timestamp to datetime
data['timestamp'] = pd.to_datetime(data['timestamp'])
# Sort the data by timestamp
data = data.sort_values('timestamp')

In [None]:
# Visualize data
# visualization.visualize_data(data)
import numpy as np

# Apply a log1p transform to the target; all later forecasts are on this log scale
data['query_count'] = np.log1p(data['query_count'])
data
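
Because `query_count` is log-transformed here, forecasts produced from it are on the same log scale. The following minimal sketch, using made-up counts, shows how `np.expm1` maps such values back to raw counts.

```python
import numpy as np

counts = np.array([0.0, 9.0, 99.0])
log_counts = np.log1p(counts)    # log(1 + x), well defined at x = 0
restored = np.expm1(log_counts)  # inverse transform: exp(y) - 1

print(np.allclose(restored, counts))  # True
```

Error metrics computed on the log scale are not directly comparable with metrics computed on raw query counts, so it helps to state which scale a reported number refers to.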

In [None]:
# Split into training and test data:
# Following the approach in the paper (p. 132), for a cluster with N weeks of data,
# the first train-test split includes N-2 weeks for training and the following week
# for testing. The second train-test split contains the first N-1 weeks for training
# and the following week for testing, representing a scenario of re-training a model
# each week and forecasting for the next week

train1, test1, train2, test2 = datamanager.train_test_split(data)

print(f"train1 shape: {train1.shape}")
print(f"test1 shape: {test1.shape}")
print(f"train2 shape: {train2.shape}")
print(f"test2 shape: {test2.shape}")

# Boundary rows of each split: last training row and first test row
print(train1.iloc[-1])
print(test1.iloc[0])
print(train2.iloc[-1])
print(test2.iloc[0])

## Baseline Model: DeepAR

In [None]:
print(train1['timestamp'].max(), test1['timestamp'].min())

In [None]:
print(train1.isnull().sum())  # Check for NaNs
train1 = train1.fillna(0)  # Fill missing values with zero so the cleaned frame is used for training

In [None]:
# Define the forecast horizon: one week of hourly data (168 hours)
prediction_length = 168

# Forecast should start one hour after the last training timestamp
start_forecast = test1['timestamp'].min()
end_forecast = start_forecast + pd.Timedelta(hours=prediction_length - 1)
print("Forecast horizon:", start_forecast, "to", end_forecast)

# Filter test1 to only include rows within the forecast horizon
test_forecast = test1[(test1['timestamp'] >= start_forecast) & (test1['timestamp'] <= end_forecast)]
print("Filtered test_forecast timestamps:")
print(test_forecast[['timestamp', 'query_count']].count())

hyperparameters = {
    'DeepAR': {
        'num_layers': 2,          # Number of RNN layers
        'hidden_size': 40,        # Hidden units per layer
        'dropout_rate': 0.2,      # Dropout to reduce overfitting
        'learning_rate': 5e-4,    # Lower learning rate for more stable training
        'patience': 5,            # Patience (epochs without improvement)
        'max_epochs': 50,         # Upper bound on training epochs
        'context_length': 168,    # Condition on the previous week (168 hours)
        'use_feat_dynamic_real': True,  # Note: prepare_data passes no extra feature columns
        'batch_size': 16,         # Smaller batches for training stability
        'freq': 'H',              # Hourly data
        'verbosity': 2
    }
}

# os.environ["AUTOGLUON_DEVICE"] = "cpu"

# Enable verbose AutoGluon logging
logging.basicConfig(level=logging.INFO)
logging.getLogger("autogluon").setLevel(logging.DEBUG)

# Instantiate the DeepAR model using AutoGluon
model = DeepAR(prediction_length=prediction_length, freq="h", hyperparameters=hyperparameters)

# Train the model on the training data
model.train(train1, target_column="query_count")

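# Forecast the week that follows the training data. AutoGluon predicts the
# `prediction_length` steps after the end of the data it is given, so the
# training history (not the test week itself) is passed in here.
predictions_df = model.predict(train1, target_column="query_count")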
print("Predictions:")
print(predictions_df.head())

# # Evaluate the model on the filtered test data
# evaluation_results = model.evaluate(test_forecast, target_column="query_count")
# print("Evaluation Metrics:")
# print(evaluation_results)

# Save the trained model to disk
model.save_model()

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Optionally, convert timestamps to datetime if they aren't already
test1['timestamp'] = pd.to_datetime(test1['timestamp'])
predictions_df['timestamp'] = pd.to_datetime(predictions_df['timestamp'])

plt.figure(figsize=(12, 6))

plt.plot(test_forecast['timestamp'], test_forecast['query_count'], label='Actual')
plt.plot(predictions_df['timestamp'], predictions_df['mean'], label='Predicted', linestyle='--', color='red')

# Shade the band between the 0.1 and 0.9 quantile forecasts
plt.fill_between(predictions_df['timestamp'],
                 predictions_df['lower_bound'],
                 predictions_df['upper_bound'],
                 color='red', alpha=0.3, label='10%-90% quantile band')

plt.xlabel('Timestamp')
plt.ylabel('log1p(Query Count)')
plt.title('Actual vs Predicted Values')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
evaluation_results = model.evaluate(test_forecast, target_column="query_count")
print("Evaluation Metrics:")
print(evaluation_results)
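
As a cross-check that does not depend on row order, the same metrics can be computed after joining forecasts and actuals on their timestamps. The sketch below uses small made-up frames; `aligned_metrics` is an illustrative helper and is not part of the `DeepAR` class above.

```python
import numpy as np
import pandas as pd

def aligned_metrics(actuals, forecasts, target="query_count", eps=1e-10):
    """Join on timestamp, then compute q-error, MAE and RME."""
    merged = actuals.merge(forecasts, on="timestamp", how="inner")
    if merged.empty:
        raise ValueError("No overlapping timestamps between actuals and forecasts.")
    y = merged[target].to_numpy(dtype=float)
    y_hat = merged["mean"].to_numpy(dtype=float)
    # Same formulas as in DeepAR.evaluate, applied to timestamp-aligned pairs
    q_error = np.mean(np.maximum(y_hat / (y + eps), y / (y_hat + eps)))
    mae = np.mean(np.abs(y_hat - y))
    rme = np.mean(np.abs(y_hat - y) / (np.abs(y) + eps))
    return {"q_error": float(q_error), "mae": float(mae), "rme": float(rme)}

stamps = pd.to_datetime(["2024-03-01 00:00", "2024-03-01 01:00"])
demo_actuals = pd.DataFrame({"timestamp": stamps, "query_count": [10.0, 20.0]})
demo_forecasts = pd.DataFrame({"timestamp": stamps, "mean": [20.0, 10.0]})
demo_metrics = aligned_metrics(demo_actuals, demo_forecasts)
print(demo_metrics)
```

An inner join raises an error when the two frames share no timestamps, which makes a misaligned forecast window visible instead of silently pairing unrelated hours.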

In [None]:
print("Test Forecast Shape:", test_forecast.shape)
print("Predictions Shape:", predictions_df.shape)

print("Test Forecast Head:")
print(test_forecast.head())

print("Predictions DataFrame Head:")
print(predictions_df.head())

print("Prediction timestamps range:", predictions_df["timestamp"].min(), "to", predictions_df["timestamp"].max())
print("Test data timestamps range:", test_forecast["timestamp"].min(), "to", test_forecast["timestamp"].max())