- See instructions based on <em> ../coverages.ipynb </em>

In [1]:
import numpy as np
import pandas as pd
from autogluon.timeseries.metrics import TimeSeriesScorer
from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor
from plotnine import *
from statsmodels.tsa.arima_process import ArmaProcess
from typing import Any, Dict, Type

In [2]:
########################################################################
# 1. Helper function: compute coverage for one prediction window.
########################################################################

def help_calc_coverages_for_1_window(
    target: str,
    test_data: TimeSeriesDataFrame,
    prediction_length: int,
    predictions: TimeSeriesDataFrame
) -> float:
    """
    Given:
      - test_data: a TimeSeriesDataFrame whose target column holds the observerd values.
      - predictions: a TimeSeriesDataFrame containing forecasts for the last prediction_length
                     time points in test_data. Its next-to-last column holds the lower
                     prediction bound and its last column holds the upper bound.
    
    This function computes the coverage of the prediction intervals (i.e. the fraction of
    true values that lie between the lower and upper bounds).
    """
    # Get the true values for the test period.
    # (Assume that test_data is ordered so that its last prediction_length rows are the test set.)
    obs_values = test_data[target].iloc[-prediction_length:]
    
    # The next-to-last column contains the lower bound and the last column the upper bound.
    lower_bounds = predictions.iloc[:, -2]
    upper_bounds = predictions.iloc[:, -1]

    print(pd.DataFrame({"Obs":obs_values, predictions.columns[1]:lower_bounds, predictions.columns[2]:upper_bounds})) # for checking

    # Compute the fraction of true values that fall within the interval.
    in_interval = ((obs_values >= lower_bounds) & (obs_values <= upper_bounds)).mean()

    return float(in_interval)

In [3]:

########################################################################
# 2. Compute coverages on one window using one (or more) models.
########################################################################

def calc_coverages_for_1_window(
    window_data: pd.DataFrame,
    timestamp_col: str,
    target: str | None,
    prediction_length: int,
    eval_metric: str | TimeSeriesScorer | None,
    ci_level: float,
    time_limit: int | None,
    hyperparameters: Dict[str | Type, Any]
) -> pd.DataFrame:
    """
    Given a block of data (window_data) with observation times (in timestamp_col)
    and targets (in target), this function splits the window into training and test parts.
    The training part is the first (window length - prediction_length) observations and
    the test part is all the window length observations.
    
    For each model specified in the hyperparameters dict, a TimeSeriesPredictor is trained
    (with the given eval_metric and within time_limit seconds) and used to produce forecasts
    along with ci_level prediction intervals. The test-set coverage is computed
    using help_calc_coverages_for_1_window.
    
    The function returns a DataFrame with one row per model and columns:
       test_start_time, test_end_time, model, coverage.
    """
    
    # Assume window_data is already modified with timestamp, target columns.
    # Add item_id col
    window_data["item_id"] = 0
    # Create AutoGluon TimeSeriesDataFrame objects.
    window_ts = TimeSeriesDataFrame.from_data_frame(window_data)

    # Split the window into training and test data.
    train_df = window_data.iloc[:-prediction_length].copy()
    test_df = window_data.iloc[-prediction_length:].copy() # ease for getting start & end time

    train_ts, test_ts = window_ts.train_test_split(prediction_length) # predictor & helper function

    # Get the start and end times of the test period.
    test_start_time = test_df[timestamp_col].iloc[0]
    test_end_time   = test_df[timestamp_col].iloc[-1]
    
    # Compute the quantile levels corresponding to the desired ci_level.
    # For example, if ci_level=0.95, we use lower quantile 0.025 and upper quantile 0.975.
    lower_quantile = (1 - ci_level) / 2
    upper_quantile = 1 - lower_quantile

    results = []
    
    # Iterate over models specified in hyperparameters.
    for model_name, params in hyperparameters.items():

        # Infer the frequency from the training timestamps.
        freq = pd.infer_freq(train_df[timestamp_col])
        
        # Create a predictor.
        predictor = TimeSeriesPredictor(
            prediction_length = prediction_length,
            target = target,
            eval_metric = eval_metric,
            freq = freq,
            quantile_levels = [lower_quantile, upper_quantile]
        )
        
        # Fit the predictor using only this model's hyperparameters.
        predictor.fit(train_ts, hyperparameters = {model_name: params}, time_limit = time_limit)
        
        # Get predictions including quantile forecasts.
        predictions = predictor.predict(train_ts)
        
        # Compute the coverage using our helper function.
        coverage = help_calc_coverages_for_1_window(target, test_ts, prediction_length, predictions)
        
        results.append({
            "test_start_time": test_start_time,
            "test_end_time": test_end_time,
            "model": model_name,
            "coverage": coverage
        })
    
    return pd.DataFrame(results, 
                        columns = ["test_start_time", "test_end_time", "model", "coverage"])


In [4]:
########################################################################
# 3. Compute coverages across many windows.
########################################################################

def calc_coverages(
    data: pd.DataFrame,
    train_size: int,
    prediction_length: int,
    stride: int,
    timestamp_col: str,
    target: str | None,
    eval_metric: str | TimeSeriesScorer | None,
    ci_level: float,
    time_limit: int | None,
    hyperparameters: Dict[str | Type, Any]
) -> pd.DataFrame:
    """
    Carves data into overlapping windows. Each window has train_size + prediction_length
    observations. The first train_size observations are used for training and the final
    prediction_length for testing. Windows are separated by stride.
    
    For each window and for each model in hyperparameters, the test-set coverage is computed
    (using calc_coverages_for_1_window). The results are returned in a DataFrame with columns:
       test_start_time, test_end_time, model, coverage.
    """
    results = []
    total_window_length = train_size + prediction_length
    
    # Loop over all starting indices for windows that fit in data.
    for start in range(0, len(data) - total_window_length + 1, stride):

        window_data = data.iloc[start : start + total_window_length].copy()

        window_coverages = calc_coverages_for_1_window(
            window_data = window_data,
            timestamp_col = timestamp_col,
            target = target,
            prediction_length = prediction_length,
            eval_metric = eval_metric,
            ci_level = ci_level,
            time_limit = time_limit,
            hyperparameters = hyperparameters
        )

        results.append(window_coverages)
    
    if results:
        return pd.concat(results, ignore_index=True)[["test_start_time", "test_end_time", "model", "coverage"]]
    else:
        # If no window fits, return an empty DataFrame with the correct columns.
        return pd.DataFrame(columns=["test_start_time", "test_end_time", "model", "coverage"])

In [5]:
########################################################################
# 4. Simulate an AR(1) process.
########################################################################

def simulate_ar1(phi: float, sigma: float, n: int, rng: np.random.Generator) -> pd.DataFrame:
    """
    Simulate a Gaussian AR(1) process defined by:
         y[t] = phi * y[t-1] + epsilon[t],
    where the epsilon's are independent N(0,sigma^2) random variables.
    
    The process is stationary only if |phi| < 1.
    A ValueError is raised if |phi| >= 1.
    
    Returns a DataFrame with two columns:
      - timestamp: starting at "2020-01-01 00:00:00" with a one-minute frequency.
      - target: the simulated time series.
    """
    if abs(phi) >= 1:
        raise ValueError("The AR(1) process is stationary only if |phi| < 1.")
    
    # The ARMA process in statsmodels is specified with the convention:
    #    ar_params are the coefficients for L^1, L^2, ... in the polynomial:
    #         1 - phi * L
    # so we set ar = [1, -phi] and ma = [1].
    ar = np.array([1, -phi])
    ma = np.array([1])
    arma_process = ArmaProcess(ar, ma)
    
    # Generate a sample of length n.
    sample = arma_process.generate_sample(nsample=n, scale=sigma, distrvs = lambda size: rng.standard_normal(size))
    
    # Create a timestamp series (one-minute steps).
    timestamps = pd.date_range(start="2020-01-01 00:00:00", periods=n, freq='T')
    
    return pd.DataFrame({"timestamp": timestamps, "target": sample})

In [6]:
########################################################################
# 5. Compute phi from FVE.
########################################################################

def calc_phi_from_fve(fve: float) -> float:
    """
    For the AR(1) model y[t] = phi*y[t-1] + epsilon[t] (with stationary variance),
    one can show that the optimal one‐step linear predictor is phi*y[t-1] and that
    the fraction of variance explained (FVE) is given by
    
         FVE = phi^2.
    
    This function returns the nonnegative phi (i.e. sqrt(fve)) that yields the given FVE.
    A ValueError is raised if fve is not in the interval [0, 1).
    """
    if not (0 <= fve < 1):
        raise ValueError("fve must be in the interval [0, 1).")
    return np.sqrt(fve)

In [7]:
########################################################################
# 6. Test the code.
########################################################################

if __name__ == "__main__":
    fve = 0.9
    phi = calc_phi_from_fve(fve)
    sigma = 1
    n = 1000
    rpg = np.random.default_rng(123)
    # Simulate the AR(1) series.
    window_data = simulate_ar1(phi, sigma, n, rpg)
    
    # Set up parameters for evaluation.
    timestamp_col = "timestamp"
    target = "target"
    prediction_length = 100
    eval_metric = "RMSE" 
    ci_level = 0.95
    time_limit = 60
    hyperparameters = {"AutoARIMA": {}, "PatchTST": {}}
    
    # Run the coverage evaluation for one window.
    coverage_df = calc_coverages_for_1_window(
        window_data, timestamp_col, target, prediction_length,
        eval_metric, ci_level, time_limit, hyperparameters
    )
    
    print(coverage_df)
    # Since the prediction intervals are calibrated at 95%, the coverage should be around 0.95.

Beginning AutoGluon training... Time limit = 60s
AutoGluon will save models to 'c:\Users\Administrator\Desktop\URPS\urps_2025\notebooks\Jingyi_notebooks\AutogluonModels\ag-20250204_184713'
AutoGluon Version:  1.2
Python Version:     3.11.11
Operating System:   Windows
Platform Machine:   AMD64
Platform Version:   10.0.19045
CPU Count:          16
GPU Count:          0
Memory Avail:       4.95 GB / 15.73 GB (31.5%)
Disk Space Avail:   72.76 GB / 200.00 GB (36.4%)

Fitting with arguments:
{'enable_ensemble': True,
 'eval_metric': RMSE,
 'freq': 'min',
 'hyperparameters': {'AutoARIMA': {}},
 'known_covariates_names': [],
 'num_val_windows': 1,
 'prediction_length': 100,
 'quantile_levels': [0.025000000000000022, 0.975],
 'random_seed': 123,
 'refit_every_n_windows': 1,
 'refit_full': False,
 'skip_model_selection': False,
 'target': 'target',
 'time_limit': 60,
 'verbosity': 2}

Provided train_data has 900 rows, 1 time series. Median time series length is 900 (min=900, max=900). 

Provide

                                  Obs  0.025000000000000022     0.975
item_id timestamp                                                    
0       2020-01-01 15:00:00  4.841164              3.035982  6.981537
        2020-01-01 15:01:00  5.326229              1.964472  7.466225
        2020-01-01 15:02:00  6.155110              1.152297  7.727629
        2020-01-01 15:03:00  8.303626              0.485910  7.878035
        2020-01-01 15:04:00  9.405896             -0.080984  7.961571
...                               ...                   ...       ...
        2020-01-01 16:35:00 -1.432573             -5.546097  6.284125
        2020-01-01 16:36:00 -1.268630             -5.546715  6.283556
        2020-01-01 16:37:00 -0.832870             -5.547294  6.283022
        2020-01-01 16:38:00 -1.853398             -5.547835  6.282522
        2020-01-01 16:39:00 -1.519172             -5.548343  6.282053

[100 rows x 3 columns]


	-3.6299       = Validation score (-RMSE)
	25.40   s     = Training runtime
	0.01    s     = Validation (prediction) runtime
Not fitting ensemble as only 1 model was trained.
Training complete. Models trained: ['PatchTST']
Total runtime: 25.43 s
Best model: PatchTST
Best model score: -3.6299
Model not specified in predict, will default to the model with the best validation score: PatchTST


                                  Obs  0.025000000000000022     0.975
item_id timestamp                                                    
0       2020-01-01 15:00:00  4.841164              0.838783  7.556188
        2020-01-01 15:01:00  5.326229             -0.343544  7.558633
        2020-01-01 15:02:00  6.155110             -0.695125  8.064848
        2020-01-01 15:03:00  8.303626             -0.954300  8.061183
        2020-01-01 15:04:00  9.405896             -0.698660  8.805294
...                               ...                   ...       ...
        2020-01-01 16:35:00 -1.432573             -5.431295  6.274995
        2020-01-01 16:36:00 -1.268630             -5.073925  5.779355
        2020-01-01 16:37:00 -0.832870             -5.464878  6.106858
        2020-01-01 16:38:00 -1.853398             -3.542656  5.221827
        2020-01-01 16:39:00 -1.519172             -3.874869  5.493094

[100 rows x 3 columns]
      test_start_time       test_end_time      model  coverage
0 2