In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy.stats as stats
import cvxpy as cp
from typing import Dict, Any, Tuple

## Production System Class
This class randomly generates a fleet of convex generators. Parameters: `n_machines` controls fleet size, `seed` ensures reproducibility, `alpha/beta/gamma_bounds` define the cost coefficients, and `capacity_bounds` set min/max outputs. Methods `describe()` and `total_capacity()` summarize the fleet.

In [2]:
class ProductionSystem:
    """
    Holds the convex generator fleet (cost curves + capacity bounds).
    """
    def __init__(self, n_machines: int = 5, seed: int = 42,
                 alpha_bounds: Tuple[float, float] = (0.05, 0.2),
                 beta_bounds: Tuple[float, float] = (2.0, 5.0),
                 gamma_bounds: Tuple[float, float] = (10.0, 20.0),
                 capacity_bounds: Tuple[float, float] = (50.0, 150.0)):
        rng = np.random.default_rng(seed)
        self.n = n_machines
        
        # Quadratic coefficients cost(x) = alpha x^2 + beta x + gamma
        self.alpha = rng.uniform(alpha_bounds[0], alpha_bounds[1], n_machines)
        self.beta = rng.uniform(beta_bounds[0], beta_bounds[1], n_machines)
        self.gamma = rng.uniform(gamma_bounds[0], gamma_bounds[1], n_machines)
        
        # Lower/upper capacity bounds (MW)
        self.l = np.zeros(n_machines)
        self.u = rng.uniform(capacity_bounds[0], capacity_bounds[1], n_machines)
        
    def describe(self) -> pd.DataFrame:
        machines = np.arange(1, self.n + 1)
        data = {
            "alpha": self.alpha,
            "beta": self.beta,
            "gamma": self.gamma,
            "min_cap": self.l,
            "max_cap": self.u,
        }
        return pd.DataFrame(data, index=machines).rename_axis("machine")
        
    def total_capacity(self) -> float:
        return float(np.sum(self.u))

## Economic Dispatch Solver
`solve_dispatch` enforces chance constraints. Inputs: `system` (fleet), `mu_D`/`sigma_D` (demand stats), `reliability` (probability level), and `mode` (Gaussian vs. Chebyshev). It builds CVXPY variable `x`, computes effective demand `D_eff`, sets up the quadratic cost objective, enforces capacity/demand constraints, solves, and returns primal/dual results.

In [3]:
def solve_dispatch(
    system: ProductionSystem,
    mu_D: float,
    sigma_D: float,
    reliability: float = 0.95,
    mode: str = "normal",
) -> Dict[str, Any]:
    """Solve the stochastic production planning problem under Normal or Chebyshev reliability buffering.

    We convert a probabilistic customer demand requirement into a deterministic
    effective demand D_eff then solve:
        minimize   sum_i (alpha_i x_i^2 + beta_i x_i)
        subject to sum_i x_i >= D_eff
                   0 <= x_i <= max_capacity_i
    Fixed costs gamma_i are added after solving for reporting.

    Returns dictionary with optimal production vector, total cost (incl. gamma),
    effective demand enforced, and dual multipliers:
      lambda : shadow price of 1 additional unit of required demand
      nu_u   : scarcity indicator per machine at its capacity
      nu_l   : lower bound duals (typically zero here)
    """
    if sigma_D < 0 or mu_D < 0:
        raise ValueError("Demand statistics must be non-negative")
    if not 0.0 < reliability < 1.0:
        raise ValueError("Reliability must lie strictly between 0 and 1")

    x = cp.Variable(system.n)
    alpha_risk = 1.0 - reliability

    if mode == "normal":
        z_score = stats.norm.ppf(reliability)
        D_eff = mu_D + z_score * sigma_D
    elif mode == "robust":
        k_robust = np.sqrt((1 - alpha_risk) / alpha_risk)
        D_eff = mu_D + k_robust * sigma_D
    else:
        raise ValueError("Mode must be 'normal' or 'robust'")

    if D_eff > system.total_capacity() + 1e-6:
        raise ValueError("Effective customer demand exceeds installed production capacity")

    objective = cp.Minimize(
        cp.sum(cp.multiply(system.alpha, x**2) + cp.multiply(system.beta, x))
    )

    c_demand = [cp.sum(x) >= D_eff]
    c_upper = [x <= system.u]
    c_lower = [x >= system.l]
    constraints = c_demand + c_upper + c_lower

    prob = cp.Problem(objective, constraints)
    prob.solve(solver=cp.OSQP, warm_start=True)

    if prob.status not in {"optimal", "optimal_inaccurate"}:
        return {"status": prob.status}

    dispatch = x.value
    total_cost = float(
        np.sum(system.alpha * dispatch**2 + system.beta * dispatch + system.gamma)
    )

    return {
        "status": prob.status,
        "x": dispatch,
        "cost": total_cost,
        "objective": prob.value,
        "D_eff": float(D_eff),
        "lambda": c_demand[0].dual_value,
        "nu_u": c_upper[0].dual_value,
        "nu_l": c_lower[0].dual_value,
    }

## KKT Verification
`verify_kkt` receives the system and solver results, computes the stationarity residual (`grad_f - λ + ν⁺ - ν⁻`) and complementary slackness (`(∑x - D_eff) * λ`), prints both, and reports whether they are below tolerance.

In [4]:
def verify_kkt(system: ProductionSystem, res: Dict[str, Any], tol: float = 1e-4) -> Dict[str, float]:
    """Compute stationarity and complementary slackness residuals."""
    
    if res.get("status") not in {"optimal", "optimal_inaccurate"}:
        print("Cannot verify KKT: solution unavailable.")
        return {}
    
    dispatch = res["x"]
    lam = res["lambda"]
    nu_u = res["nu_u"]
    nu_l = res["nu_l"]
    
    grad_f = 2 * system.alpha * dispatch + system.beta
    stationarity_residual = grad_f - lam + nu_u - nu_l
    max_stationarity = float(np.max(np.abs(stationarity_residual)))
    
    total_prod = float(np.sum(dispatch))
    slack = total_prod - res["D_eff"]
    comp_slack_error = float(abs(slack * lam))
    
    print("\n--- KKT CHECK ---")
    print(f"Stationarity residual: {max_stationarity:.2e}")
    print(f"Complementary slackness residual: {comp_slack_error:.2e}")
    
    if max_stationarity < tol and comp_slack_error < tol:
        print("KKT conditions satisfied within tolerance.")
    else:
        print("Warning: residuals exceed tolerance.")
    
    return {"stationarity": max_stationarity, "slackness": comp_slack_error}

In [5]:
# 1. Create the factory
my_factory = ProductionSystem(n_machines=5)

# 2. Define the customer demand scenario
MEAN_DEMAND = 300
SIGMA = 30
RELIABILITY = 0.95  # 95% service level

# 3. Run the Normal (Gaussian) buffering model
print("--- NORMAL MODEL (Gaussian demand) ---")
res_normal = solve_dispatch(
    my_factory, MEAN_DEMAND, SIGMA, reliability=RELIABILITY, mode="normal"
)
print(f"Total Production Cost: ${res_normal['cost']:,.2f}")
print(f"Effective required production (D_eff): {res_normal['D_eff']:.2f} units")

# 4. Run the Robust (Chebyshev) buffering model
print("\n--- ROBUST MODEL (Distribution-free) ---")
res_robust = solve_dispatch(
    my_factory, MEAN_DEMAND, SIGMA, reliability=RELIABILITY, mode="robust"
)
print(f"Total Production Cost: ${res_robust['cost']:,.2f}")
print(f"Effective required production (D_eff): {res_robust['D_eff']:.2f} units")

# 5. Verify optimality (Normal model)
verify_kkt(my_factory, res_normal)

--- NORMAL MODEL (Gaussian demand) ---
Total Production Cost: $4,305.68
Effective required production (D_eff): 349.35 units

--- ROBUST MODEL (Distribution-free) ---
Total Production Cost: $6,336.46
Effective required production (D_eff): 430.77 units

--- KKT CHECK ---
Stationarity residual: 3.52e-13
Complementary slackness residual: 1.23e-12
KKT conditions satisfied within tolerance.


{'stationarity': 3.5216274341109965e-13, 'slackness': 1.2315451736330222e-12}