# Electricity portfolio VaR & ES framework with real exposures and EWMA covariance.

Core features:
- Reads market data from Excel:
    - prices.xlsx: wide format, index = Date, columns = product names, values = prices
    - positions.xlsx: columns = ["Product", "MWh", "Sensitivity"]
- Computes:
    - Historical VaR & ES
    - Parametric VaR & ES using EWMA covariance 
    - Monte Carlo VaR & ES using EWMA covariance
    - Backtesting (Kupiec POF, Christoffersen independence)
    - Volatility and correlation stress tests
- Builds a Plotly dashboard:
    - Row 1: Histograms with VaR & ES lines (Historical, Parametric, Monte Carlo)
    - Row 2: Enhanced exposure weights plot (sorted, top-5 highlight, category colors, Pareto curve)
    - Row 3: Metrics table
- Logs all key metrics and exposure weights to an Excel file (path taken from `KPI_filename` in config.yaml, e.g. KPI.xlsx)

In [1]:
import os
import yaml
from datetime import datetime
import numpy as np
import pandas as pd
from scipy.stats import norm, chi2
import plotly.graph_objects as go
from plotly.subplots import make_subplots


# Configuration

# Load run settings from config.yaml in the current working directory.
# safe_load is used, so only plain YAML types are constructed.
with open("config.yaml", "r") as f:
    config = yaml.safe_load(f)

# Every key below is required: a missing key raises KeyError at import time.
alpha = config["alpha"]                 # VaR / ES confidence level (Backtester uses 1 - alpha as the expected exceedance rate)
n_days = config["n_days"]               # Number of historical days; not referenced by the code visible in this file
n_mc = config["n_mc"]                   # Monte Carlo scenarios
kpi_filename = config["KPI_filename"]   # Excel file that receives one row of metrics per run
dashboard_name = config["dashboard_name"]  # Output path of the Plotly HTML dashboard
ewma_lambda = config["ewma_lambda"]     # EWMA decay factor passed to ewma_covariance
k = config["volatility_stress_factor"]  # Default volatility multiplier for StressTester.volatility_stress
delta_rho = config["correlation_stress_factor"]  # Default correlation shift for StressTester.correlation_stress

# Input workbooks read by main().
prices_xlsx = "prices.xlsx"   
positions_xlsx = "positions.xlsx"  


# Instrument and Portfolio Classes
class Instrument:
    """
    A single electricity product held in the book, with its price history
    and position data.

    Attributes:
    - name: product identifier (e.g. "MBL_1", "MPL_3", etc.)
    - prices: price history (pd.Series indexed by Date)
    - mwh: size of the position in MWh
    - price_sensitivity: delta of the position with respect to price
    """

    def __init__(self, name: str, prices: pd.Series, mwh: float, price_sensitivity: float):
        self.name, self.prices = name, prices
        self.mwh, self.price_sensitivity = mwh, price_sensitivity

    @property
    def returns(self) -> pd.Series:
        """
        Log-returns of the price series:
            r_t = ln(P_t / P_{t-1})
        Rows whose log-return is NaN (including the first observation) are dropped.
        Returns:
            pd.Series of log-returns on the remaining dates of the price index.
        """
        previous_prices = self.prices.shift(1)
        log_ratio = np.log(self.prices / previous_prices)
        return log_ratio.dropna()

    @property
    def exposure(self) -> float:
        """
        Exposure valued at the last price in the series:
            Exposure_i = Price_i * MWh_i * price_sensitivity_i
        Returns:
            float: exposure value.
        """
        last_price = self.prices.iloc[-1]
        notional = last_price * self.mwh * self.price_sensitivity
        return float(notional)


class Portfolio:
    """
    A collection of electricity instruments treated as one book.

    Offers:
    - a date-aligned matrix of instrument returns
    - the vector of instrument exposures
    - the portfolio return series and the matching loss series
    """

    def __init__(self, instruments: list[Instrument]):
        self.instruments = instruments

    def get_returns_matrix(self) -> pd.DataFrame:
        """
        Assemble instrument returns into one date-aligned table.
        Dates on which any instrument has no return are dropped.
        Returns:
            pd.DataFrame: columns = instrument names, index = dates, values = returns.
        """
        per_instrument = {}
        for instrument in self.instruments:
            per_instrument[instrument.name] = instrument.returns
        return pd.DataFrame(per_instrument).dropna()

    def get_exposure_vector(self) -> np.ndarray:
        """
        Collect the exposure of every instrument, in portfolio order.
        Returns:
            np.ndarray: one exposure per instrument.
        """
        values = [instrument.exposure for instrument in self.instruments]
        return np.array(values, dtype=float)

    def portfolio_return_series(self) -> pd.Series:
        """
        Portfolio return as the exposure-weighted sum of instrument returns:
            R_{P,t} = sum_i w_i * r_{i,t}
        with
            w_i = Exposure_i / sum_j |Exposure_j|
        Returns:
            pd.Series: portfolio returns over time, named "Portfolio_Return".
        """
        aligned_returns = self.get_returns_matrix()
        exposure_vec = self.get_exposure_vector()
        # Weights keep the sign of each exposure; only the gross total is used to scale
        gross_exposure = np.sum(np.abs(exposure_vec))
        weights = exposure_vec / gross_exposure
        weighted = aligned_returns.dot(weights)
        weighted.name = "Portfolio_Return"
        return weighted

    def portfolio_loss_series(self) -> pd.Series:
        """
        Portfolio loss, defined as the negated portfolio return:
            L_t = - R_{P,t}
        Returns:
            pd.Series: portfolio losses over time, named "Portfolio_Loss".
        """
        losses = -self.portfolio_return_series()
        losses.name = "Portfolio_Loss"
        return losses

# EWMA Covariance

In [2]:
def ewma_covariance(returns_df: pd.DataFrame, lambda_: float = ewma_lambda) -> np.ndarray:
    """
    Compute the EWMA covariance matrix.
    Recursion:
        Σ_t = λ Σ_{t-1} + (1 - λ) r_t r_t^T
    where:
        - λ is the decay factor
        - r_t is the vector of returns at time t
    The recursion is seeded with the sample covariance of the whole input and
    then updated with rows 1..n-1; row 0 only enters through the seed.
    Args:
        returns_df: DataFrame of returns (rows = time, columns = instruments)
        lambda_: EWMA decay factor, must lie in [0, 1]
    Returns:
        np.ndarray: EWMA covariance matrix, always 2-D (1x1 for a single instrument).
    Raises:
        ValueError: if lambda_ is outside [0, 1], the input is not 2-D,
            or fewer than two observations are supplied.
    """
    # A factor outside [0, 1] turns the update into a non-convex combination
    # (negative weights), which no longer yields a covariance estimate.
    if not 0.0 <= lambda_ <= 1.0:
        raise ValueError(f"lambda_ must lie in [0, 1], got {lambda_!r}")

    # Convert to numpy array: shape (n_observations, n_instruments)
    r = np.asarray(returns_df, dtype=float)
    if r.ndim != 2:
        raise ValueError(f"returns_df must be 2-dimensional, got {r.ndim} dimension(s)")
    n_obs = r.shape[0]
    if n_obs < 2:
        raise ValueError("at least two return observations are required for the sample covariance seed")

    # Seed with the sample covariance; np.cov returns a 0-d array for a single
    # column, so force a matrix shape.
    sigma = np.atleast_2d(np.cov(r, rowvar=False))
    # Recursively update the covariance matrix
    for t in range(1, n_obs):
        sigma = lambda_ * sigma + (1.0 - lambda_) * np.outer(r[t], r[t])

    return sigma

# VaR & ES Calculator

In [3]:
class VaRCalculator:
    """
    Computes Historical, Parametric, and Monte Carlo VaR & ES for the portfolio.

    All measures are expressed as losses (positive = loss) at confidence level
    alpha, i.e. VaR is the alpha-quantile of the loss distribution.

    Uses:
    - Historical VaR/ES directly from the loss distribution.
    - Parametric VaR/ES using EWMA covariance and normality assumption.
    - Monte Carlo VaR/ES using EWMA covariance and simulated returns.
    """

    def __init__(self, portfolio: Portfolio, alpha: float = alpha):
        self.portfolio = portfolio
        self.alpha = alpha
        # Returns matrix and loss series are computed once and reused
        self.returns_df = portfolio.get_returns_matrix()
        self.loss_series = portfolio.portfolio_loss_series()

    # Historical VaR & ES
    def historical_var(self) -> float:
        """
        Historical VaR at confidence level alpha:
            VaR_alpha = quantile_alpha(Losses)
        Returns:
            float: historical VaR.
        """
        return float(np.quantile(self.loss_series.values, self.alpha))

    def historical_es(self) -> float:
        """
        Historical Expected Shortfall (ES) at confidence level alpha:
            ES_alpha = E[Loss | Loss >= VaR_alpha]
        Returns:
            float: historical ES.
        """
        var = self.historical_var()
        tail_losses = self.loss_series[self.loss_series >= var]
        return float(tail_losses.mean())

    # Parametric VaR & ES (EWMA)
    def parametric_inputs(self):
        """
        Compute parametric inputs based on EWMA covariance:
        - mu_vec: vector of mean returns per instrument
        - sigma_mat: EWMA covariance matrix
        - mu_p: portfolio mean return
        - sigma_p: portfolio volatility
        - w_norm: normalized exposure-based weights
        Returns:
            (mu_vec, sigma_mat, mu_p, sigma_p, w_norm)
        """
        rets = self.returns_df
        mu_vec = rets.mean().values
        sigma_mat = ewma_covariance(rets, lambda_=ewma_lambda)
        exposures = self.portfolio.get_exposure_vector()
        w_norm = exposures / np.sum(np.abs(exposures))

        # Portfolio mean and variance
        mu_p = w_norm @ mu_vec
        sigma_p2 = w_norm @ sigma_mat @ w_norm
        sigma_p = np.sqrt(sigma_p2)

        return mu_vec, sigma_mat, mu_p, sigma_p, w_norm

    def parametric_var(self) -> float:
        """
        Parametric VaR under normality assumption.

        With returns R ~ N(μ_p, σ_p²) the loss L = -R is N(-μ_p, σ_p²), so its
        alpha-quantile is

            VaR_alpha = -μ_p + σ_p * z_alpha

        where z_alpha is the alpha-quantile of the standard normal distribution
        and alpha is the confidence level (same convention as historical_var
        and parametric_es).

        Returns:
            float: parametric VaR.
        """
        _, _, mu_p, sigma_p, _ = self.parametric_inputs()
        z = norm.ppf(self.alpha)
        # The volatility term is added: z is positive for alpha > 0.5, and the
        # VaR of a loss distribution must grow with volatility.
        var = -mu_p + sigma_p * z
        return var

    def parametric_es(self) -> float:
        """
        Parametric ES under normality assumption:

            ES_alpha = -μ_p + σ_p * φ(z_alpha) / (1 - α)

        where:
            - φ is the standard normal pdf
            - z_alpha is the alpha-quantile of the standard normal distribution.

        Returns:
            float: parametric ES.
        """
        _, _, mu_p, sigma_p, _ = self.parametric_inputs()
        z = norm.ppf(self.alpha)
        phi_z = norm.pdf(z)
        es = -mu_p + sigma_p * phi_z / (1 - self.alpha)
        return es

    # ---------------- Monte Carlo VaR & ES (EWMA) ----------------

    def monte_carlo_var_es(self, n_scenarios: int = n_mc):
        """
        Monte Carlo VaR & ES using EWMA covariance:

        Steps:
        1. Estimate mean vector and EWMA covariance matrix.
        2. Simulate multivariate normal returns.
        3. Compute portfolio returns and losses.
        4. Compute VaR and ES from simulated loss distribution.

        Draws come from NumPy's global random state, so results vary between
        runs unless the caller seeds it.

        Args:
            n_scenarios: number of Monte Carlo scenarios.

        Returns:
            (var_mc, es_mc, sim_losses)
        """
        # Same mean vector, covariance and weights as the parametric method
        mu_vec, sigma_mat, _, _, w_norm = self.parametric_inputs()

        # Simulate multivariate normal returns
        sim_rets = np.random.multivariate_normal(mu_vec, sigma_mat, size=n_scenarios)
        # Portfolio returns from simulated scenarios; losses are negative returns
        sim_losses = -(sim_rets @ w_norm)

        # Monte Carlo VaR and ES
        var_mc = np.quantile(sim_losses, self.alpha)
        es_mc = sim_losses[sim_losses >= var_mc].mean()

        return var_mc, es_mc, sim_losses

# Backtesting

In [4]:
class Backtester:
    """
    Implements:
    - Kupiec Proportion of Failures (POF) test
    - Christoffersen independence test

    Uses a binary exceedance indicator:
        I_t = 1 if Loss_t > VaR_t, else 0

    Both statistics are evaluated from log-likelihoods rather than raw
    likelihood products, so long samples do not underflow to zero.
    """

    def __init__(self, loss_series: pd.Series, var_series: pd.Series, alpha: float = alpha):
        self.loss_series = loss_series
        self.var_series = var_series
        self.alpha = alpha
        self.indicator = self._exceedance_indicator()

    def _exceedance_indicator(self) -> pd.Series:
        """
        Build the exceedance indicator series:

            I_t = 1 if Loss_t > VaR_t, else 0

        Returns:
            pd.Series: indicator of VaR breaches.
        """
        ind = (self.loss_series > self.var_series).astype(int)
        ind.name = "Exceedance_Indicator"
        return ind

    @staticmethod
    def _xlogy(count, prob) -> float:
        """Return count * ln(prob), with the convention 0 * ln(0) = 0."""
        if count == 0:
            return 0.0
        # A zero probability with a positive count legitimately yields -inf
        with np.errstate(divide="ignore"):
            return float(count * np.log(prob))

    def kupiec_pof_test(self):
        """
        Kupiec Proportion of Failures (POF) test.

        Tests whether the observed exceedance frequency matches the expected
        frequency under the model (1 - alpha):

            LR_pof = -2 * [ ln L(p0) - ln L(p_hat) ],
            ln L(p) = N ln(p) + (T - N) ln(1 - p)

        The boundary cases N = 0 and N = T are evaluated with the same formula
        (0 * ln 0 = 0) instead of being forced to LR = 0, so a model with no
        exceedances at all can still be rejected as too conservative.

        Returns:
            (LR_pof, p_value, N, T)
            - LR_pof: likelihood ratio statistic
            - p_value: p-value under chi-square(1)
            - N: number of exceedances
            - T: total number of observations
        """
        I = self.indicator
        T = len(I)
        N = I.sum()
        p0 = 1 - self.alpha
        p_hat = N / T if T > 0 else 0.0

        ll_null = self._xlogy(N, p0) + self._xlogy(T - N, 1 - p0)
        ll_alt = self._xlogy(N, p_hat) + self._xlogy(T - N, 1 - p_hat)
        # p_hat maximises the likelihood, so the statistic is non-negative in
        # exact arithmetic; clip tiny negative rounding errors.
        LR_pof = max(-2.0 * (ll_null - ll_alt), 0.0)

        p_value = float(chi2.sf(LR_pof, df=1))
        return LR_pof, p_value, N, T

    def christoffersen_independence_test(self):
        """
        Christoffersen independence test.

        Tests whether exceedances are independent over time by looking at
        the transition counts between states (0 -> 0, 0 -> 1, 1 -> 0, 1 -> 1).

        The pooled exceedance probability under the null is estimated from the
        same transitions as the alternative,
            pi = (n01 + n11) / (n00 + n01 + n10 + n11),
        so that the two likelihoods are nested.

        Returns:
            (LR_ind, p_value, (n00, n01, n10, n11))
        """
        I = np.asarray(self.indicator.values)
        prev, curr = I[:-1], I[1:]

        # Count transitions between consecutive days
        n00 = int(np.sum((prev == 0) & (curr == 0)))
        n01 = int(np.sum((prev == 0) & (curr == 1)))
        n10 = int(np.sum((prev == 1) & (curr == 0)))
        n11 = int(np.sum((prev == 1) & (curr == 1)))

        n0 = n00 + n01
        n1 = n10 + n11
        n_transitions = n0 + n1

        # Conditional and pooled exceedance probabilities
        pi01 = n01 / n0 if n0 > 0 else 0.0
        pi11 = n11 / n1 if n1 > 0 else 0.0
        pi = (n01 + n11) / n_transitions if n_transitions > 0 else 0.0

        # Log-likelihood under null (independent with probability pi)
        ll_null = self._xlogy(n00 + n10, 1 - pi) + self._xlogy(n01 + n11, pi)
        # Log-likelihood under alternative (different probabilities after 0 and 1)
        ll_alt = (self._xlogy(n00, 1 - pi01) + self._xlogy(n01, pi01) +
                  self._xlogy(n10, 1 - pi11) + self._xlogy(n11, pi11))

        LR_ind = max(-2.0 * (ll_null - ll_alt), 0.0)

        p_value = float(chi2.sf(LR_ind, df=1))
        return LR_ind, p_value, (n00, n01, n10, n11)

# Stress Testing

In [5]:
class StressTester:
    """
    Performs volatility and correlation stress testing using EWMA covariance.
    - Volatility stress: scales the covariance matrix by k^2.
    - Correlation stress: shifts off-diagonal correlations by delta_rho,
      clipped to [-1, 1]; the diagonal stays at 1.
    """

    def __init__(self, portfolio: Portfolio):
        self.portfolio = portfolio
        self.returns_df = portfolio.get_returns_matrix()
        self.exposures = portfolio.get_exposure_vector()
        self.w_norm = self.exposures / np.sum(np.abs(self.exposures))

    def _portfolio_vol(self, sigma: np.ndarray) -> float:
        """Portfolio volatility sqrt(w' Σ w) for a given covariance matrix."""
        sigma_p2 = self.w_norm @ sigma @ self.w_norm
        # A stressed matrix is not guaranteed to be positive semi-definite;
        # floor the variance at zero rather than returning NaN.
        return np.sqrt(max(sigma_p2, 0.0))

    def base_covariance(self) -> np.ndarray:
        """
        Compute the base EWMA covariance matrix.
        Returns:
            np.ndarray: covariance matrix.
        """
        return ewma_covariance(self.returns_df, lambda_=ewma_lambda)

    def base_portfolio_vol(self) -> float:
        """
        Compute the base portfolio volatility from EWMA covariance.
        Returns:
            float: portfolio volatility.
        """
        return self._portfolio_vol(self.base_covariance())

    def volatility_stress(self, k=k):
        """
        Apply a volatility stress by scaling the covariance matrix:
            Σ_stress = k^2 * Σ
        This implies portfolio volatility scales by k.
        Args:
            k: volatility scaling factor.
        Returns:
            (sigma_stress, sigma_p_base, sigma_p_stress)
        """
        # The EWMA matrix is estimated once and shared by both outputs
        sigma = self.base_covariance()
        sigma_stress = (k ** 2) * sigma
        sigma_p_base = self._portfolio_vol(sigma)
        sigma_p_stress = k * sigma_p_base
        return sigma_stress, sigma_p_base, sigma_p_stress

    def correlation_stress(self, delta_rho=delta_rho):
        """
        Apply a correlation stress by shifting all pairwise correlations by
        delta_rho, clipped to the valid range [-1, 1].

        Steps:
        1. Convert covariance to correlation matrix.
        2. Add delta_rho to off-diagonal elements only (clipped to [-1, 1]).
        3. Convert back to covariance using original standard deviations.

        Variances are left untouched, so only the dependence structure is
        stressed. The shifted matrix may fail to be positive semi-definite; in
        that case the stressed portfolio variance is floored at zero.

        Args:
            delta_rho: correlation shift (may be negative).

        Returns:
            (sigma_stress, sigma_p_base, sigma_p_stress)
        """
        sigma = self.base_covariance()
        stds = np.sqrt(np.diag(sigma))
        std_outer = np.outer(stds, stds)

        # Compute correlation matrix R = D^{-1} Σ D^{-1}; instruments with zero
        # variance produce 0/0 and are treated as uncorrelated.
        with np.errstate(divide='ignore', invalid='ignore'):
            R = sigma / std_outer
        R = np.nan_to_num(R, nan=0.0)

        # Shift only the off-diagonal entries and keep them valid correlations
        off_diagonal = ~np.eye(R.shape[0], dtype=bool)
        R_stress = R.copy()
        R_stress[off_diagonal] = np.clip(R[off_diagonal] + delta_rho, -1.0, 1.0)
        np.fill_diagonal(R_stress, 1.0)

        # Convert back to covariance: Σ_stress = D R_stress D
        sigma_stress = R_stress * std_outer

        # Portfolio volatility under base and stressed correlations
        sigma_p_stress = self._portfolio_vol(sigma_stress)
        sigma_p_base = self._portfolio_vol(sigma)

        return sigma_stress, sigma_p_base, sigma_p_stress

# Result Logging to Excel

In [6]:
def append_results_to_excel(results: dict, excel_path: str = kpi_filename):
    """
    Add one timestamped row of results to an Excel log.
    - The row index is the time at which this function runs.
    - An existing file is read back and rewritten with the new row at the end;
      a missing file is created with just this row.
    Args:
        results: dictionary of metric_name -> value
        excel_path: path to the Excel file
    """
    stamped_row = pd.DataFrame(results, index=[pd.Timestamp(datetime.now())])

    # First run: nothing to merge with
    if not os.path.exists(excel_path):
        stamped_row.to_excel(excel_path)
        return

    history = pd.read_excel(excel_path, index_col=0)
    pd.concat([history, stamped_row]).to_excel(excel_path)


# Plotly Dashboard

In [7]:
def _add_loss_histogram(fig, col, losses, bar_color, trace_name, var_value, es_value, label):
    """Add one row-1 loss histogram with dashed VaR and dotted ES marker lines."""
    fig.add_trace(
        go.Histogram(
            x=losses,
            nbinsx=50,
            name=trace_name,
            marker_color=bar_color,
            showlegend=False
        ),
        row=1, col=col
    )
    # (value, annotation height in paper coordinates, colour, dash style, measure name)
    markers = (
        (var_value, 0.95, "red", "dash", "VaR"),
        (es_value, 0.85, "purple", "dot", "ES"),
    )
    for value, y_pos, line_color, dash_style, measure in markers:
        fig.add_vline(x=value, line=dict(color=line_color, dash=dash_style), row=1, col=col)
        fig.add_annotation(
            x=value, y=y_pos, xref=f"x{col}", yref="paper",
            text=f"{label} {measure} = {value:.4f}",
            showarrow=True, arrowhead=2, ax=20, ay=-30,
            font=dict(color=line_color)
        )


def _category_color(name):
    """Map an instrument name to a bar colour based on its product prefix."""
    prefix_colors = (
        ("MBL", "royalblue"),
        ("MPL", "seagreen"),
        ("Q_", "darkorange"),
        ("M_", "purple"),
        ("H_", "brown"),
    )
    for prefix, color in prefix_colors:
        if name.startswith(prefix):
            return color
    return "grey"


def _format_metric(name, value):
    """Format one metrics-table cell: counts plain, test statistics unitless, the rest as percent."""
    if isinstance(value, (bool, np.bool_)):
        return str(value)
    if isinstance(value, (int, np.integer)):
        # Counts such as the number of observations are not percentages
        return str(int(value))
    if isinstance(value, (float, np.floating)):
        # Likelihood-ratio statistics and p-values are unitless
        if name.endswith(("_LR", "_p_value")):
            return f"{value:.4f}"
        return f"{value:.2f}%"
    return str(value)


def build_dashboard(loss_series: pd.Series,
                    hist_var: float,
                    hist_es: float,
                    para_var: float,
                    para_es: float,
                    mc_losses: np.ndarray,
                    mc_var: float,
                    mc_es: float,
                    exposure_weights: dict,
                    metrics: dict,
                    html_path: str = dashboard_name):
    """
    Build and save the Plotly HTML dashboard.

    Layout:
    - Row 1 (3 columns): Histograms with VaR & ES lines
        1. Historical VaR/ES
        2. Parametric VaR/ES (drawn over the historical losses)
        3. Monte Carlo VaR/ES (drawn over the simulated losses)
    - Row 2 (full width): Enhanced exposure weights plot
        - Sorted ascending by signed weight
        - Category color-coding
        - Top-5 exposures highlighted
        - Cumulative curve on a secondary y-axis
    - Row 3 (full width): Metrics table

    Args:
        loss_series: portfolio loss series
        hist_var, hist_es: historical VaR and ES
        para_var, para_es: parametric VaR and ES
        mc_losses, mc_var, mc_es: Monte Carlo losses, VaR, ES
        exposure_weights: dict of instrument_name -> normalized weight
        metrics: dict of metric_name -> value (for table); floats are shown as
            percentages unless the name ends in "_LR" or "_p_value", integers
            are shown as plain counts
        html_path: output HTML file path
    """

    # Create a 3x3 grid of subplots with custom specs. Row 2 declares a
    # secondary y-axis so the cumulative curve gets its own scale without
    # touching the axes of the row-1 histograms.
    fig = make_subplots(
        rows=3, cols=3,
        specs=[
            [{"type": "histogram"}, {"type": "histogram"}, {"type": "histogram"}],
            [{"type": "xy", "colspan": 3, "secondary_y": True}, None, None],
            [{"type": "table", "colspan": 3}, None, None],
        ],
        subplot_titles=("Historical VaR", "Parametric VaR", "Monte Carlo VaR",
                        "Exposure Weights", "Key Metrics")
    )

    # ---------------- Row 1: VaR & ES histograms ----------------
    _add_loss_histogram(fig, 1, loss_series, "steelblue", "Losses", hist_var, hist_es, "Hist")
    _add_loss_histogram(fig, 2, loss_series, "seagreen", "Losses", para_var, para_es, "Param")
    _add_loss_histogram(fig, 3, mc_losses, "darkorange", "Sim Losses", mc_var, mc_es, "MC")

    # ---------------- Row 2: Enhanced Exposure Weights Plot ----------------
    # 1. Sort exposure weights ascending for better readability
    sorted_items = sorted(exposure_weights.items(), key=lambda item: item[1])
    inst_names = [name for name, _ in sorted_items]
    inst_weights = [weight for _, weight in sorted_items]

    # 2. Highlight the top 5 exposures in crimson (the single largest when
    #    there are fewer than 5); everything else keeps its category colour
    if inst_weights:
        top5_threshold = inst_weights[-5] if len(inst_weights) >= 5 else inst_weights[-1]
        colors = [
            "crimson" if weight >= top5_threshold else _category_color(name)
            for name, weight in zip(inst_names, inst_weights)
        ]
    else:
        colors = []

    # 3. Add bar chart (exposure weights) on the primary axis
    fig.add_trace(
        go.Bar(
            x=inst_names,
            y=inst_weights,
            marker_color=colors,
            hovertemplate="<b>%{x}</b><br>Weight: %{y:.6f}<extra></extra>",
            name="Exposure Weights"
        ),
        row=2, col=1, secondary_y=False
    )

    # 4. Add cumulative exposure curve on the secondary axis of the same subplot
    cum_weights = np.cumsum(inst_weights)
    fig.add_trace(
        go.Scatter(
            x=inst_names,
            y=cum_weights,
            mode="lines+markers",
            name="Cumulative Weight",
            line=dict(color="black", width=2)
        ),
        row=2, col=1, secondary_y=True
    )

    # 5. Axis formatting for exposure plot
    fig.update_xaxes(tickangle=45, row=2, col=1)
    fig.update_yaxes(title_text="Weight", row=2, col=1, secondary_y=False)
    fig.update_yaxes(title_text="Cumulative Weight", row=2, col=1, secondary_y=True)

    # ---------------- Row 3: Metrics Table ----------------
    # Convert metrics dict into a 2-column table: Metric | Value
    table_header = ["Metric", "Value"]
    table_values = [
        list(metrics.keys()),
        [_format_metric(name, value) for name, value in metrics.items()]
    ]

    fig.add_trace(
        go.Table(
            header=dict(values=table_header, fill_color="lightgrey", align="left"),
            cells=dict(values=table_values, align="left")
        ),
        row=3, col=1
    )

    # ---------------- Layout and Hover Line ----------------
    spike_style = dict(showspikes=True, spikemode="across", spikesnap="cursor", spikethickness=1)
    fig.update_layout(
        title="Electricity Portfolio VaR & ES Dashboard",
        bargap=0.1,
        template="plotly_white",
        height=1000,

        # Vertical hover line (spike) across subplots
        hovermode="x",
        spikedistance=-1,
        xaxis=dict(spike_style),
        xaxis2=dict(spike_style),
        xaxis3=dict(spike_style),
        xaxis4=dict(spike_style),
    )

    # Axis labels for histograms
    for col in (1, 2, 3):
        fig.update_xaxes(title_text="Loss", row=1, col=col)
    fig.update_yaxes(title_text="Frequency", row=1, col=1)

    # Write dashboard to the path requested by the caller
    fig.write_html(html_path)

# Main Execution

In [8]:
def main():
    """
    Main orchestration function:

    1. Load price and position data from Excel.
    2. Build Instrument objects and the Portfolio.
    3. Compute portfolio losses.
    4. Compute Historical, Parametric, and Monte Carlo VaR & ES.
    5. Run backtests (Kupiec, Christoffersen).
    6. Run stress tests (volatility and correlation) with the configured factors.
    7. Compute normalized exposure weights.
    8. Log all metrics and weights to Excel.
    9. Build and save the Plotly dashboard.

    Rate-like metrics (means, volatilities, VaR, ES, weights) are logged in
    percent; test statistics, p-values and counts are logged unscaled.

    Raises:
        ValueError: if no product in the price file has a matching position.
    """

    # 1. Load real price data (wide format: Date index, product columns)
    price_df = pd.read_excel(prices_xlsx, index_col=0, parse_dates=True)

    # 2. Load positions (MWh and Sensitivity) from Excel
    pos_df = pd.read_excel(positions_xlsx)
    # Expected columns: Product, MWh, Sensitivity
    mwh_map = dict(zip(pos_df["Product"], pos_df["MWh"]))
    sens_map = dict(zip(pos_df["Product"], pos_df["Sensitivity"]))

    # 3. Build instruments from real data; products without a position are skipped
    instruments = [
        Instrument(
            name=col,
            prices=price_df[col].dropna(),
            mwh=float(mwh_map[col]),
            price_sensitivity=float(sens_map.get(col, 1.0)),  # default sensitivity = 1.0
        )
        for col in price_df.columns
        if col in mwh_map
    ]

    if not instruments:
        raise ValueError("No instruments created. Check that product names in prices.xlsx match positions.xlsx.")

    # 4. Create portfolio from instruments
    portfolio = Portfolio(instruments=instruments)

    # 5. Portfolio losses (used for backtesting and the dashboard histograms)
    port_losses = portfolio.portfolio_loss_series()

    # 6. VaR & ES calculations
    var_calc = VaRCalculator(portfolio=portfolio, alpha=alpha)

    # Historical VaR & ES
    hist_var = var_calc.historical_var()
    hist_es = var_calc.historical_es()

    # Parametric VaR & ES (EWMA)
    _, _, mu_p, sigma_p, _ = var_calc.parametric_inputs()
    para_var = var_calc.parametric_var()
    para_es = var_calc.parametric_es()

    # Monte Carlo VaR & ES (EWMA)
    mc_var, mc_es, mc_losses = var_calc.monte_carlo_var_es(n_mc)

    # 7. Backtesting using Historical VaR.
    # NOTE(review): the VaR is a single full-sample quantile compared against
    # the same sample, so this is an in-sample check rather than a rolling
    # out-of-sample backtest.
    var_series_hist = pd.Series(hist_var, index=port_losses.index, name="VaR_Hist")
    backtester_hist = Backtester(loss_series=port_losses, var_series=var_series_hist, alpha=alpha)
    kupiec_LR, kupiec_p, N_exc, T_obs = backtester_hist.kupiec_pof_test()
    christ_LR, christ_p, _ = backtester_hist.christoffersen_independence_test()

    # 8. Stress testing (volatility and correlation) using the factors loaded
    #    from config.yaml rather than hard-coded values
    stress_tester = StressTester(portfolio=portfolio)
    _, sigma_p_base_vol, sigma_p_stress_vol = stress_tester.volatility_stress(k=k)
    _, sigma_p_base_corr, sigma_p_stress_corr = stress_tester.correlation_stress(delta_rho=delta_rho)

    # 9. Normalized exposure weights in percent (for Excel logging + dashboard)
    exposures = portfolio.get_exposure_vector()
    w_norm = exposures / np.sum(np.abs(exposures))
    exposure_weights = {inst.name: float(w * 100) for inst, w in zip(instruments, w_norm)}

    # 10. Collect metrics for Excel and table
    results = {
        "Portfolio_Mean": mu_p * 100,
        "Portfolio_Volatility": sigma_p * 100,
        "Hist_VaR": hist_var * 100,
        "Hist_ES": hist_es * 100,
        "Para_VaR": para_var * 100,
        "Para_ES": para_es * 100,
        "MC_VaR": mc_var * 100,
        "MC_ES": mc_es * 100,
        # Likelihood-ratio statistics are unitless and therefore not scaled
        "Kupiec_LR": kupiec_LR,
        "Kupiec_p_value": kupiec_p,
        "Christoffersen_LR": christ_LR,
        "Christoffersen_p_value": christ_p,
        "N_exceedances": N_exc,
        "T_observations": int(T_obs),
        "Base_Portfolio_Volatility": sigma_p_base_vol * 100,
        "Vol_Stress_Portfolio_Volatility": sigma_p_stress_vol * 100,
        "Corr_Stress_Base_Volatility": sigma_p_base_corr * 100,
        "Corr_Stress_Portfolio_Volatility": sigma_p_stress_corr * 100,
    }

    # Add exposure weights to Excel metrics (one column per instrument)
    for name, w in exposure_weights.items():
        results[f"{name}_weight"] = w

    # Append results to Excel log
    append_results_to_excel(results, kpi_filename)

    # 11. Build and save the dashboard
    build_dashboard(
        loss_series=port_losses,
        hist_var=hist_var,
        hist_es=hist_es,
        para_var=para_var,
        para_es=para_es,
        mc_losses=mc_losses,
        mc_var=mc_var,
        mc_es=mc_es,
        exposure_weights=exposure_weights,
        metrics=results,
        html_path=dashboard_name
    )

    print("Computation complete.")
    print(f"Dashboard saved to: {dashboard_name}")
    print(f"Results appended to: {kpi_filename}")



In [9]:
# Run the full pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Computation complete.
Dashboard saved to: electricity_risk_report.html
Results appended to: KPI.xlsx
