In [None]:
def forecast(
    self,
    x_input: np.ndarray,
    y_period: int,
    *,
    draw_z: bool = True,
    quantiles=(5, 50, 95),
    seed: int = 123,
) -> ForecastOutput:
    """Generate a multi-step forecast using the fitted CCC-GARCH model.

    Args:
        x_input: Extra observations to append to the stored history before
            simulating. A 1-D array is treated as a single row; ``None`` or
            an empty array means "no new observations".
        y_period: Number of steps to forecast.
        draw_z: Forwarded unchanged to ``_simulate_ccc_garch_paths``.
        quantiles: Percentiles (0-100) summarised across simulated paths.
        seed: Seed forwarded to the simulator. The default (123) reproduces
            the value that used to be hard-coded.

    Returns:
        ``ForecastOutput`` whose ``y_hat`` holds ``y_period`` rows of
        across-path means, with ``extras["quantiles"]`` mapping each
        requested percentile to a nested list. When the model state is
        incomplete, the simulator yields nothing, or any step raises, an
        all-zero ``y_hat`` is returned instead (best-effort behaviour).

    Raises:
        ValueError: If the model has not been calibrated.
    """

    if not self.is_calibrated:
        raise ValueError("Model must be calibrated before forecasting")

    def _zero_forecast():
        # Build every row separately: ``[[0.0] * n] * k`` would repeat ONE
        # inner list k times, so mutating one row would mutate all of them.
        n_series = len(self.y_features)
        return ForecastOutput(y_hat=[[0.0] * n_series for _ in range(y_period)])

    def _quantile_key(q):
        # Integer-valued percentiles keep their int key (5, 50, 95) so existing
        # lookups still work; fractional ones (2.5) keep their float value
        # instead of being truncated, which could make two keys collide.
        q = float(q)
        return int(q) if q.is_integer() else q

    if not hasattr(self, 'idata') or not hasattr(self, 'pivot_data'):
        # Fallback to simple approach
        return _zero_forecast()

    try:
        # ---- Build the history used by the simulator ------------------------
        # None would become a 0-d NaN array and break the stacking below, so
        # it is mapped to an empty array ("no additional rows").
        x_input = np.asarray([] if x_input is None else x_input, dtype=float)
        if x_input.ndim == 1:
            x_input = x_input[np.newaxis, :]

        # Convert to the model's internal units (no scaling if data_scale is unset)
        scale = getattr(self, "data_scale", 1.0)
        x_input_scaled = x_input / scale

        # Seed the simulator with the stored history plus any new rows.
        # NOTE(review): self.x_full is presumably saved, already scaled, by
        # calibration -- confirm against the calibrate method.
        X_hist = self.x_full
        if x_input_scaled.size > 0:
            X_hist = np.vstack([self.x_full, x_input_scaled])

        # ---- Simulate without premature averaging ---------------------------
        # Only the simulated paths are used; the other two return values are ignored.
        forecasts, _, _ = self._simulate_ccc_garch_paths(
            self.idata,
            X_hist,
            steps=y_period,
            draw_z=draw_z,
            seed=seed,
            rescale=True,  # presumably returns paths in the original data scale
        )
        # forecasts is expected to be (steps, paths, series); axis 1 is reduced below

        if forecasts is None or forecasts.shape[0] == 0:
            return _zero_forecast()

        # ---- Summaries AFTER simulation -------------------------------------
        mean_path = forecasts.mean(axis=1)

        # Expected output format: List[List[float]], one row per forecast step
        y_hat = [mean_path[t].tolist() for t in range(y_period)]

        extras = dict(
            quantiles={
                _quantile_key(q): np.percentile(forecasts, q, axis=1).tolist()
                for q in quantiles
            },
        )

        return ForecastOutput(y_hat=y_hat, extras=extras)

    except Exception as e:
        # Deliberate best-effort: report the failure and hand back zeros
        print(f"Forecast generation failed: {e}")
        return _zero_forecast()


In [None]:
def evaluate(
    self,
    x_test: np.ndarray,
    y_test: np.ndarray,
    *_, **__
) -> ForecastEvaluationOutput:
    """Evaluate multi-step forecast accuracy against ``y_test`` using RMSE.

    Args:
        x_test: Passed straight to ``self.forecast`` as the extra history.
        y_test: Realised values with one row per forecast step; a 1-D array
            is treated as a single step.
        *_, **__: Accepted and ignored.

    Returns:
        ``ForecastEvaluationOutput`` carrying the forecast, the overall RMSE
        and ``extras["per_series_rmse"]``. Only finite forecast errors are
        scored; when there are none (overall, or for one series) the value
        999.0 is reported in place of an RMSE.

    Raises:
        ValueError: If the model is not calibrated, or the forecast shape
            differs from ``y_test``.
    """

    if not self.is_calibrated:
        raise ValueError("Model must be calibrated before evaluation")

    y_test = np.asarray(y_test, dtype=float)
    if y_test.ndim == 1:
        y_test = y_test[np.newaxis, :]

    H, n = y_test.shape

    # Forecast as many steps as there are rows in y_test
    fcst = self.forecast(x_test, H, draw_z=True)
    y_hat = np.asarray(fcst.y_hat, dtype=float)

    if y_hat.shape != y_test.shape:
        raise ValueError(f"Shape mismatch: forecast {y_hat.shape} vs y_test {y_test.shape}")

    diff = y_hat - y_test
    mask = np.isfinite(diff)  # drops NaN and +/-inf alike
    no_data = 999.0  # sentinel reported when nothing can be scored

    if not mask.any():
        rmse = no_data
        per_series_rmse = [no_data] * n
    else:
        rmse = float(np.sqrt(np.mean(diff[mask] ** 2)))
        # Apply the same finite mask per column so the per-series figures
        # agree with the overall one (nanmean alone would let inf through
        # and warn on an all-NaN column).
        per_series_rmse = [
            float(np.sqrt(np.mean(diff[mask[:, j], j] ** 2)))
            if mask[:, j].any()
            else no_data
            for j in range(n)
        ]

    return ForecastEvaluationOutput(
        y_hat=y_hat.tolist(),
        error_type=ErrorType.RMSE,
        error_value=rmse,
        extras={"per_series_rmse": per_series_rmse}
    )


In [None]:
def plot_forecast_fanchart(self, forecast_output: ForecastOutput, series_index: int = 0, title=None, savepath=None):
    """Draw the mean forecast of one series with a 5-95 percentile band.

    Args:
        forecast_output: Object exposing ``y_hat`` (steps x series) and,
            optionally, ``extras["quantiles"]`` keyed by percentile.
        series_index: Column of ``y_hat`` / the quantile arrays to plot.
        title: Figure title; defaults to "CCC-GARCH Forecast".
        savepath: If given, the figure is written there. The figure is
            always closed afterwards and is never shown interactively.
    """
    import matplotlib.pyplot as plt

    # extras may be absent or None (e.g. a forecast built without quantiles);
    # calling .get on None would raise, so fall back to an empty mapping.
    extras = getattr(forecast_output, "extras", None) or {}
    qs = extras.get("quantiles") or {}
    y_hat = np.asarray(forecast_output.y_hat)

    H = y_hat.shape[0]
    x_axis = np.arange(1, H + 1)  # horizons are labelled 1..H

    # The band needs both edges; also accept string keys in case the
    # quantile dict went through a JSON round-trip.
    lower = qs.get(5, qs.get("5"))
    upper = qs.get(95, qs.get("95"))

    fig, ax = plt.subplots(figsize=(8, 4.5))
    try:
        ax.plot(x_axis, y_hat[:, series_index], label="Mean forecast")

        if lower is not None and upper is not None:
            q5 = np.asarray(lower)[:, series_index]
            q95 = np.asarray(upper)[:, series_index]
            ax.fill_between(x_axis, q5, q95, alpha=0.25, label="90% PI")

        ax.set_xlabel("Horizon")
        ax.set_ylabel(f"Series {series_index}")
        ax.set_title(title or "CCC-GARCH Forecast")
        ax.legend()
        fig.tight_layout()
        if savepath:
            fig.savefig(savepath, dpi=150, bbox_inches="tight")
    finally:
        # Close this specific figure even if plotting or saving failed,
        # so repeated calls do not accumulate open figures.
        plt.close(fig)


In [None]:
def plot_in_sample_fit(self, idata, y_true, series_names=None):
    """Plot observed data against the posterior-mean fit, one panel per series.

    The fitted line is the posterior ``mu`` averaged over chains and draws;
    the shaded band is that mean plus/minus twice the equally averaged
    ``sigma``. ``y_true`` must be 2-D (observations x series). Panels are
    titled from ``series_names``, or "Series 1", "Series 2", ... when it is
    omitted. The figure is displayed with ``plt.show()``.
    """
    posterior = idata.posterior
    fitted_mean = posterior['mu'].mean(dim=["chain", "draw"]).values
    fitted_sd = posterior['sigma'].mean(dim=["chain", "draw"]).values

    n_obs, n_series = y_true.shape
    labels = series_names
    if labels is None:
        labels = [f"Series {k+1}" for k in range(n_series)]

    fig, axes = plt.subplots(n_series, 1, figsize=(12, 4*n_series), sharex=True)
    # A single panel comes back as a bare Axes; wrap it so indexing is uniform
    panels = [axes] if n_series == 1 else axes
    time_index = range(n_obs)

    for col in range(n_series):
        ax = panels[col]
        ax.plot(y_true[:, col], label="Observed", color="black")

        center = fitted_mean[:, col]
        ax.plot(center, label="Fitted mean", color="red")

        ax.fill_between(
            time_index,
            center - 2 * fitted_sd[:, col],
            center + 2 * fitted_sd[:, col],
            color="red", alpha=0.2, label="±2σ"
        )
        ax.set_title(f"In-sample fit: {labels[col]}")
        ax.legend()

    plt.tight_layout()
    plt.show()


In [None]:
def plot_forecast(self, forecasts, y_test=None, series_names=None):
    """Draw a fan chart per series from simulated CCC-GARCH forecast paths.

    ``forecasts`` is indexed as (steps, draws, series). Each panel shows the
    across-draw median with a shaded 5th-95th percentile band, plus the
    matching column of ``y_test`` when it is supplied. Panels are titled
    from ``series_names``, or "Series 1", "Series 2", ... when it is
    omitted. The figure is displayed with ``plt.show()``.
    """
    horizon, n_series = forecasts.shape[0], forecasts.shape[2]
    labels = series_names if series_names is not None else [
        f"Series {k+1}" for k in range(n_series)
    ]

    fig, axes = plt.subplots(n_series, 1, figsize=(12, 4*n_series), sharex=True)
    # A single panel comes back as a bare Axes; wrap it so indexing is uniform
    panels = [axes] if n_series == 1 else axes
    step_index = range(horizon)
    has_actuals = y_test is not None

    for col in range(n_series):
        ax = panels[col]

        # Percentiles are taken across draws (axis 1 of the steps x draws slice)
        lower, median, upper = np.percentile(forecasts[:, :, col], [5, 50, 95], axis=1)

        ax.plot(median, label="Forecast median", color="blue")
        ax.fill_between(step_index, lower, upper, color="blue", alpha=0.2, label="90% CI")
        if has_actuals:
            ax.plot(y_test[:, col], label="Actual future", color="black")

        ax.set_title(f"Forecast: {labels[col]}")
        ax.legend()

    plt.tight_layout()
    plt.show()


In [None]:
import arviz as az

# Overlay the posterior-mean fit of the first series on the observed data.
# mu/sigma are averaged over chains and draws before plotting.
mu_post = idata.posterior["mu"].mean(dim=("chain", "draw"))
sigma_post = idata.posterior["sigma"].mean(dim=("chain", "draw"))

# First-series fitted mean and the half-width of its ±2σ band
_fit_0 = mu_post[:, 0]
_half_band_0 = 2 * sigma_post[:, 0]

plt.plot(Y[:, 0], label="Observed series 1")
plt.plot(_fit_0, label="Fitted mean")
plt.fill_between(
    range(len(Y)),
    _fit_0 - _half_band_0,
    _fit_0 + _half_band_0,
    alpha=0.2,
    label="±2σ",
)
plt.legend()
