# Policy Synthesis in a Finite MDP  
This notebook compares these two methods, **Risk-neutral Value Iteration vs Robust Risk-sensitive Synthesis (Cantelli, 2 moments)**, on the same finite MDP (a betting-game example adapted from PRISM case study):

1. **Risk-neutral Value Iteration (baseline)**: Optimizes the expected cumulative reward/cost.

2. **Robust Risk-sensitive Value Iteration (our method)**: Optimizes expected performance while enforcing a conservative lower bound on a *chance constraint* using the **(one-sided) Cantelli inequality**.

In [136]:
import time
import numpy as np
from typing import Optional, Dict, Tuple
from mdp import MDP, build_betting_game_mdp

GLOBAL_SEED = 0

In [137]:
def value_iteration(
    mdp: MDP,
    tol: float = 1e-9,
    max_iter: int = 500,
    objective: str = "min",  # "min" (cost) or "max" (reward)
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Standard Value Iteration on a dense MDP.

    objective="min": V(s) = min_a E[R + gamma V(s')]
    objective="max": V(s) = max_a E[R + gamma V(s')]

    Returns:
        pi: greedy deterministic policy (S,)
        V : value function (S,)
    """
    objective = objective.lower()
    if objective not in ("min", "max"):
        raise ValueError("objective must be 'min' or 'max'")

    S, A = mdp.n_states, mdp.n_actions
    V = np.zeros(S, dtype=float)

    for _ in range(max_iter):
        V_old = V.copy()
        Q = np.zeros((S, A), dtype=float)

        for a in range(A):
            Q[:, a] = (mdp.P[:, a, :] * (mdp.R[:, a, :] + mdp.gamma * V_old[None, :])).sum(axis=1)

        if objective == "min":
            V = Q.min(axis=1)
        else:
            V = Q.max(axis=1)

        if np.max(np.abs(V - V_old)) <= tol:
            break

    # greedy policy extraction
    pi = np.zeros(S, dtype=int)
    for s in range(S):
        if mdp.terminal[s]:
            pi[s] = 0
            continue

        q_sa = np.zeros(A, dtype=float)
        for a in range(A):
            q_sa[a] = float(mdp.P[s, a, :] @ (mdp.R[s, a, :] + mdp.gamma * V))

        pi[s] = int(np.argmin(q_sa)) if objective == "min" else int(np.argmax(q_sa))

    return pi, V

In [138]:
def evaluate_policy_mean_var(mdp: MDP, pi: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Exact (linear-system) evaluation of mean and second raw moment under a fixed policy.

    Returns:
        v  : E[G | s]
        m2 : E[G^2 | s]
    """
    S = mdp.n_states
    T = np.where(~mdp.terminal)[0]  # transient
    Z = np.where(mdp.terminal)[0]   # terminal

    Ppi = np.zeros((S, S), dtype=float)
    b = np.zeros(S, dtype=float)
    r2 = np.zeros(S, dtype=float)

    for s in range(S):
        a = int(pi[s])
        Prow = mdp.P[s, a]
        Rrow = mdp.R[s, a]
        Ppi[s, :] = Prow
        b[s] = float(Prow @ Rrow)               # E[r]
        r2[s] = float(Prow @ (Rrow * Rrow))     # E[r^2]

    # Mean: v_T = (I - gamma B)^(-1) b_T
    v = np.zeros(S, dtype=float)
    if T.size:
        B = Ppi[np.ix_(T, T)]
        A = np.eye(T.size) - mdp.gamma * B
        v[T] = np.linalg.solve(A, b[T])
    v[Z] = 0.0

    # Cross term: E[r * v(s')]
    ErV = np.zeros(S, dtype=float)
    for s in T:
        a = int(pi[s])
        ErV[s] = float(mdp.P[s, a] @ (mdp.R[s, a] * v))

    # Second moment: m2_T = (I - gamma^2 B)^(-1) [ E[r^2] + 2 gamma E[r v(s')] ]
    m2 = np.zeros(S, dtype=float)
    if T.size:
        B = Ppi[np.ix_(T, T)]
        A2 = np.eye(T.size) - (mdp.gamma ** 2) * B
        rhs = r2[T] + 2.0 * mdp.gamma * ErV[T]
        m2[T] = np.linalg.solve(A2, rhs)
    m2[Z] = 0.0

    return v, m2

In [139]:
def sample_returns(
    mdp: MDP,
    pi: np.ndarray,
    n_episodes: int = 10_000,
    max_steps: int = 1_000,
    rng: Optional[np.random.Generator] = None,
) -> np.ndarray:
    """Sample discounted returns G under deterministic policy pi for CVaR estimation"""
    if rng is None:
        rng = np.random.default_rng()

    S = mdp.n_states
    returns = np.zeros(n_episodes, dtype=float)

    for ep in range(n_episodes):
        s = mdp.start_state
        G = 0.0
        disc = 1.0

        for _ in range(max_steps):
            if mdp.terminal[s]:
                break
            a = int(pi[s])
            probs = mdp.P[s, a, :]
            s_next = rng.choice(S, p=probs)
            r = float(mdp.R[s, a, s_next])

            G += disc * r
            disc *= mdp.gamma
            s = s_next

        returns[ep] = G

    return returns

## Chance constraint (conservative bound)
We target a constraint of the form **R\_min:** $\Pr(G^\pi \le \tau) \ge \alpha$ or **R\_max:** $\Pr(G^\pi \ge \tau) \ge \alpha$. During synthesis we use the Cantelli’s inequality with $n=2$ to obtain a conservative lower bound \(\rho(\pi)\) on the satisfaction probability.

In [140]:
def constraint_satisfied(
    mu0: float,
    std0: float,
    tau: float,
    alpha: float,
    alpha_cvar: float,
    returns: Optional[np.ndarray],
    objective: str = "min",
):
    """
    Report:
      - Cantelli lower bound (prob_lb)
      - empirical chance probability (prob_emp)
      - empirical CVaR_alpha_cvar (cvar_alpha)
    """
    objective = objective.lower()
    if objective not in ("min", "max"):
        raise ValueError("objective must be 'min' or 'max'")

    var0 = float(std0 ** 2)

    def cantelli_right_lb(mu: float, std: float, tau_: float) -> float:
        gap = mu - tau_
        if std <= 1e-15:
            return float(gap >= 0.0)
        if gap <= 0.0:
            return 0.0
        return float((gap * gap) / (std * std + gap * gap))

    # Cantelli bound on the chance constraint
    if objective == "max":
        prob_lb = cantelli_right_lb(mu0, std0, tau)          # P(G >= tau)
    else:
        prob_lb = cantelli_right_lb(-mu0, std0, -tau)        # P(G <= tau) == P(-G >= -tau)

    satisfied_cantelli = bool(prob_lb >= alpha)

    def empirical_cvar(samples: np.ndarray, alpha_level: float, objective: str) -> float:
        xs = np.sort(np.asarray(samples, dtype=float))
        N = xs.size
        tail_mass = max(1e-6, 1.0 - alpha_level)
        k = max(1, int(np.ceil(tail_mass * N)))
        if objective == "max":
            tail = xs[:k]     # worst = smallest
        else:
            tail = xs[-k:]    # worst = largest
        return float(tail.mean())

    prob_emp = None
    satisfied_emp = None

    if returns is None or len(returns) == 0:
        cvar_alpha = float(mu0)
    else:
        xs = np.asarray(returns, dtype=float)
        if objective == "max":
            prob_emp = float(np.mean(xs >= tau))
        else:
            prob_emp = float(np.mean(xs <= tau))

        satisfied_emp = bool(prob_emp >= alpha)
        cvar_alpha = empirical_cvar(xs, alpha_cvar, objective)

    return satisfied_cantelli, float(prob_lb), var0, float(cvar_alpha), prob_emp, satisfied_emp

In [141]:
def robust_vi(
    mdp,
    alpha: float,
    tau: float,
    beta: float = 10.0,          # c-update multiplier/divider (paper Eq. c-update)
    eps: float = 1e-6,           # barrier slack
    max_outer: int = 50,
    R_max: int= 100.0,
    vi_tol: float = 1e-9,
    verbose: bool = False,
    tol_prob: float = 1e-6,
    lambda_init: float = 1.0,    # initial c
    tie_noise: float = 1e-9,
    hysteresis_eps: float = 1e-12,
    max_eval_iters: int = 200,
    c_max: float = 5e4,
    c_min: float = 1e-6,
    change_frac_tol: float = 0.0,
    min_outer_iters: int = 5,
    rho_stall_patience: int = 5,
    objective: str = "min",
    pi_init: Optional[np.ndarray] = None,
) -> Dict[str, object]:
    """
    Cantelli-based barrier-shaped policy improvement (Algorithm-1).

    Returns keys compatible with your old printing:
        pi, mu0, std0, J, prob_lb, outer_iters, lambda
    where:
        J = best barrier objective L_c (smaller is better in both modes after sign handling)
    """
    objective = objective.lower()
    if objective not in ("min", "max"):
        raise ValueError("objective must be 'min' or 'max'")

    S, A = int(mdp.n_states), int(mdp.n_actions)
    s0 = int(mdp.start_state)
    P = np.asarray(mdp.P, dtype=float)
    R = np.asarray(mdp.R, dtype=float)
    gamma = float(mdp.gamma)
    terminal = np.asarray(mdp.terminal, dtype=bool)

    rng = np.random.default_rng()

    # ------------------ Cantelli probability lower bound ------------------
    def cantelli_right_lb(mu: float, std: float, tau_: float) -> float:
        gap = mu - tau_
        if std <= 1e-15:
            return 1.0 if gap >= 0.0 else 0.0
        if gap <= 0.0:
            return 0.0
        den = std * std + gap * gap
        return (gap * gap) / den if den > 0 else 1.0

    def chance_prob_lb(mu0: float, std0: float) -> float:
        if objective == "max":
            return cantelli_right_lb(mu0, std0, tau)
        else:
            return cantelli_right_lb(-mu0, std0, -tau)

    # ------------------ Paper-like barrier objective L_c ------------------
    def barrier_Lc(mu0: float, std0: float, c: float) -> Tuple[float, float]:
        """
        Returns (Lc, rho). Lc is a scalar cost to minimize.
        We use a piecewise penalty so infeasible region has non-vanishing push.
        """
        rho = chance_prob_lb(mu0, std0)
        x = rho - alpha

        # base term: objective handling
        if objective == "min":
            base = mu0
        else:
            base = -mu0  # maximize mu0 <=> minimize -mu0

        if x >= 0.0:
            # log barrier inside feasible region
            Lc = base - c * np.log(x + eps)
        else:
            # linear extension outside feasible region
            Lc = base - ((c * x) / eps + c * np.log(eps) - R_max)

        return float(Lc), float(rho)

    # ------------------ Policy evaluation: u1,u2 ------------------
    def policy_eval_u1_u2(pi: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        u1 = np.zeros(S, dtype=float)
        u2 = np.zeros(S, dtype=float)

        Ppi = np.zeros((S, S), dtype=float)
        Rpi = np.zeros((S, S), dtype=float)
        for s in range(S):
            a = int(pi[s])
            Ppi[s, :] = P[s, a, :]
            Rpi[s, :] = R[s, a, :]

        ER = (Ppi * Rpi).sum(axis=1)
        ER2 = (Ppi * (Rpi ** 2)).sum(axis=1)

        for _ in range(max_eval_iters):
            u1_old = u1.copy()
            u2_old = u2.copy()

            u1 = ER + gamma * (Ppi @ u1_old)
            ERu1 = (Ppi * Rpi) @ u1_old
            u2 = ER2 + 2.0 * gamma * ERu1 + (gamma ** 2) * (Ppi @ u2_old)

            u1[terminal] = 0.0
            u2[terminal] = 0.0

            if max(np.max(np.abs(u1 - u1_old)), np.max(np.abs(u2 - u2_old))) <= vi_tol:
                break

        return u1, u2

    # ------------------ One-step propagated moments Q1,Q2 ------------------
    def one_step_Q1_Q2(u1: np.ndarray, u2: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        Q1 = np.zeros((S, A), dtype=float)
        Q2 = np.zeros((S, A), dtype=float)

        for a in range(A):
            Psa = P[:, a, :]
            Rsa = R[:, a, :]

            ER = (Psa * Rsa).sum(axis=1)
            ER2 = (Psa * (Rsa ** 2)).sum(axis=1)

            EV1 = Psa @ u1
            ERu1 = (Psa * Rsa) @ u1
            EV2 = Psa @ u2

            Q1[:, a] = ER + gamma * EV1
            Q2[:, a] = ER2 + 2.0 * gamma * ERu1 + (gamma ** 2) * EV2

        Q1[terminal, :] = 0.0
        Q2[terminal, :] = 0.0
        return Q1, Q2

    # ------------------ Compute k1,k2 from dL/du (n=2) ------------------
    def compute_k1_k2(u1_s0: float, u2_s0: float, c: float) -> Tuple[float, float]:
        mu0 = float(u1_s0)
        m2_0 = float(u2_s0)
        var0 = max(m2_0 - mu0 * mu0, 0.0)
        std0 = float(np.sqrt(var0))

        # unify direction for rho (right-tail Cantelli)
        if objective == "max":
            mu_tilde, tau_tilde, sign_mu = mu0, tau, +1.0
        else:
            mu_tilde, tau_tilde, sign_mu = -mu0, -tau, -1.0

        Delta = mu_tilde - tau_tilde
        if Delta <= 0.0 or std0 <= 1e-15:
            rho = 0.0
            drho_du1 = 0.0
            drho_du2 = 0.0
        else:
            D = max(var0 + Delta * Delta, 1e-12)
            rho = (Delta * Delta) / D

            drho_dmu_tilde = (2.0 * Delta * var0 + 2.0 * mu_tilde * (Delta * Delta)) / (D * D)
            drho_du1 = sign_mu * drho_dmu_tilde
            drho_du2 = -(Delta * Delta) / (D * D)

        # derivative of barrier w.r.t rho:
        # feasible: -c/(rho-alpha+eps)
        # infeasible: -(c/eps)  (linear extension)
        if rho - alpha >= 0.0:
            dLc_drho = -c / (rho - alpha + eps)
        else:
            dLc_drho = -c / eps

        # base term derivative: d(base)/du1 is +1 (min) or -1 (max), and d(base)/du2 is 0
        if objective == "min":
            dbase_du1 = 1.0
        else:
            dbase_du1 = -1.0

        k1 = dbase_du1 + dLc_drho * drho_du1
        k2 = dLc_drho * drho_du2
        return float(k1), float(k2)

    # ------------------ Greedy policy improvement ------------------
    def greedy_improve(pi: np.ndarray, u1: np.ndarray, u2: np.ndarray, c: float) -> Tuple[np.ndarray, float]:
        k1, k2 = compute_k1_k2(u1[s0], u2[s0], c)
        Q1, Q2 = one_step_Q1_Q2(u1, u2)

        J = k1 * Q1 + k2 * Q2  # cost-to-minimize surrogate

        if tie_noise > 0:
            J = J + tie_noise * rng.standard_normal(J.shape)

        pi_new = pi.copy()
        switches = 0
        for s in range(S):
            if terminal[s]:
                pi_new[s] = 0
                continue

            a_cur = int(pi[s])
            a_best = int(np.argmin(J[s, :]))

            # hysteresis: switch only if meaningfully better
            if J[s, a_best] + hysteresis_eps < J[s, a_cur]:
                pi_new[s] = a_best
                switches += 1

        changed_frac = switches / max(1, np.sum(~terminal))
        return pi_new, float(changed_frac)

    # ------------------ Initialization (same as before) ------------------
    if pi_init is None:
        pi = np.zeros(S, dtype=int)
    else:
        pi = np.array(pi_init, dtype=int).copy()
    pi[terminal] = 0

    c = float(max(lambda_init, c_min))

    # Evaluate initial policy
    u1, u2 = policy_eval_u1_u2(pi)
    mu0 = float(u1[s0])
    std0 = float(np.sqrt(max(float(u2[s0] - mu0 * mu0), 0.0)))
    Lc, rho = barrier_Lc(mu0, std0, c)

    best_pi = pi.copy()
    best_mu0, best_std0 = mu0, std0
    best_rho = rho
    best_J = Lc

    # stopping helpers
    prev_rho = rho
    rho_stall = 0

    # ------------------ Outer iterations ------------------
    for it in range(1, max_outer + 1):
        # Greedy improvement under current c
        pi_new, changed_frac = greedy_improve(pi, u1, u2, c)

        # Evaluate new policy
        u1_new, u2_new = policy_eval_u1_u2(pi_new)
        mu0_new = float(u1_new[s0])
        std0_new = float(np.sqrt(max(float(u2_new[s0] - mu0_new * mu0_new), 0.0)))
        Lc_new, rho_new = barrier_Lc(mu0_new, std0_new, c)

        if verbose:
            print(f"[OUT] it={it:02d} c={c:.6g} mu0={mu0_new:.6f} rho={rho_new:.6f} "
                  f"Lc={Lc_new:.6f} {'FEAS' if rho_new >= alpha else 'INF'} "
                  f"chg={changed_frac*100:.2f}%")

        # Best-so-far by barrier objective
        if Lc_new < best_J - 1e-12:
            best_pi = pi_new.copy()
            best_mu0, best_std0 = mu0_new, std0_new
            best_rho = rho_new
            best_J = Lc_new

        # Update c (paper-style)
        if rho_new < alpha:
            c = min(beta * c, c_max)
        else:
            c = max(c / beta, c_min)

        # Convergence tracking
        if abs(rho_new - prev_rho) <= tol_prob:
            rho_stall += 1
        else:
            rho_stall = 0
        prev_rho = rho_new

        # Advance iterate
        pi_unchanged = np.array_equal(pi_new, pi)
        pi = pi_new
        u1, u2 = u1_new, u2_new

        # Stopping (do not stop too early)
        if it >= min_outer_iters:
            if pi_unchanged and changed_frac <= change_frac_tol:
                if verbose:
                    print("[STOP] policy unchanged.")
                break
            if rho_stall >= rho_stall_patience:
                if verbose:
                    print("[STOP] rho stagnated.")
                break

    return {
        "pi": best_pi,
        "mu0": float(best_mu0),
        "std0": float(best_std0),
        "prob_lb": float(best_rho),
    }

## Running the comparison
We build the betting-game MDP, run the risk-neutral value iteration, then the robust risk-sensitive VI with a Cantelli-based chance constraint. For each method, we report the expected reward, the variance / std, a conservative satisfaction lower bound (Cantelli), empirical CVaR and runtime.

In [143]:
# -----------------------------
# Build environment
# -----------------------------
mdp = build_betting_game_mdp(
    M=5, MAX_MONEY=100, STAGES=10,
    p_win=0.7, p_jackpot=0.05, gamma=1.0
)

s0 = mdp.start_state
print(f"MDP: S={mdp.n_states}, A={mdp.n_actions}, gamma={mdp.gamma}, start={s0}, terminals={int(np.sum(mdp.terminal))}")

# -----------------------------
# Settings
# -----------------------------
alpha = 0.7
tau = 100.0
alpha_cvar = 0.7
N_samples = 5000
beta = 10
eps = 1e-6

# -----------------------------
# 1) Risk-neutral Value Iteration
# -----------------------------
t0 = time.perf_counter()
pi_rn, V_rn = value_iteration(mdp, tol=1e-2, max_iter=50, objective="min")
v_mu_rn, v_m2_rn = evaluate_policy_mean_var(mdp, pi_rn)
mu0_rn = float(v_mu_rn[s0])
std0_rn = float(np.sqrt(max(float(v_m2_rn[s0] - mu0_rn * mu0_rn), 0.0)))
rn_time = time.perf_counter() - t0

returns_rn = sample_returns(
    mdp, pi_rn, n_episodes=N_samples, max_steps=1000,
    rng=np.random.default_rng(GLOBAL_SEED)
)
rn_ok_cant, rn_prob_lb, rn_var, rn_cvar, rn_prob_emp, rn_ok_emp = constraint_satisfied(
    mu0_rn, std0_rn, tau, alpha, alpha_cvar, returns_rn, objective="min"
)

print("\n[RISK-NEUTRAL VALUE ITERATION]")
print(f"time={rn_time:.6f}s")
print(
    f"mu0={mu0_rn:.6f}, std0={std0_rn:.6f}, var={rn_var:.6f}, "
    f"prob_lb(Cantelli)={rn_prob_lb:.6f},"
    f"CVaRα(emp)={rn_cvar:.6f}, "
    f"satisfies(Cantelli)={rn_ok_cant}"
)

# -----------------------------
# 2) ROBUST RISK-SENSITIVE-VI (Cantelli chance constraint)
# -----------------------------
t1 = time.perf_counter()
res_rsvi = robust_vi(
    mdp,
    alpha=alpha,
    tau=tau,
    beta=beta,
    eps=eps,
    max_outer=50,
    vi_tol=1e-2,
    objective="min",
)
rs_time = time.perf_counter() - t1

returns_rs = sample_returns(
    mdp, res_rsvi["pi"], n_episodes=N_samples, max_steps=1000,
    rng=np.random.default_rng(GLOBAL_SEED)
)
rs_ok_cant, rs_prob_lb, rs_var, rs_cvar, rs_prob_emp, rs_ok_emp = constraint_satisfied(
    res_rsvi["mu0"], res_rsvi["std0"], tau, alpha, alpha_cvar, returns_rs, objective="min"
)

print("\n[Robust RISK-SENSITIVE VALUE ITERATION]")
print(f"time={rs_time:.6f}s")
print(
    f"mu0={res_rsvi['mu0']:.6f}, std0={res_rsvi['std0']:.6f}, var={rs_var:.6f}, "
    f"prob_lb(Cantelli)={rs_prob_lb:.6f},"
    f"CVaRα(emp)={rs_cvar:.6f}, "
    f"satisfies(Cantelli)={rs_ok_cant}"
)

MDP: S=1011, A=7, gamma=1.0, start=5, terminals=1

[RISK-NEUTRAL VALUE ITERATION]
time=0.314973s
mu0=61.921383, std0=29.778478, var=886.757735, prob_lb(Cantelli)=0.620515,CVaRα(emp)=92.735510, satisfies(Cantelli)=False

[Robust RISK-SENSITIVE VALUE ITERATION]
time=5.790376s
mu0=71.391637, std0=17.759828, var=315.411500, prob_lb(Cantelli)=0.721823,CVaRα(emp)=90.596935, satisfies(Cantelli)=True
