<a href="https://colab.research.google.com/github/otitamario/sp-pa-gep/blob/main/experiments/Example_5_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Clone the project repository into the current working directory of the runtime.
# NOTE: when the checkout already exists (e.g. on a re-run) git refuses to clone
# and prints "destination path ... already exists"; the existing checkout is reused.
!git clone https://github.com/otitamario/sp-pa-gep.git

# Change into the repository root. The path is relative, so this assumes the
# kernel is still in the directory that contains the checkout.
%cd sp-pa-gep

# Add the current directory (now the repo root) to the import path so that
# project modules such as `src.benchkit`, imported in a later cell, can be found.
import sys
sys.path.append(".")

fatal: destination path 'sp-pa-gep' already exists and is not an empty directory.
/content/sp-pa-gep


In [2]:
# Re-enter the checkout by absolute path and update it from the remote.
# NOTE(review): the pull is not pinned to a commit, so the project code
# (e.g. src/benchkit.py) may differ between runs of this notebook.
%cd /content/sp-pa-gep
!git pull

/content/sp-pa-gep
remote: Enumerating objects: 7, done.
remote: Counting objects: 100% (7/7), done.
remote: Compressing objects: 100% (4/4), done.
remote: Total 4 (delta 3), reused 0 (delta 0), pack-reused 0 (from 0)
Unpacking objects: 100% (4/4), 1.30 KiB | 265.00 KiB/s, done.
From https://github.com/otitamario/sp-pa-gep
   b5c8d30..18102ff  main       -> origin/main
Updating b5c8d30..18102ff
Fast-forward
 src/benchkit.py | 31 ++++++++++++++++---------------
 1 file changed, 16 insertions(+), 15 deletions(-)


In [3]:
import os

# Directory (relative to the current working directory) that receives the saved plots.
FIGDIR = "figures"

# Create it up front; an already existing directory is not an error.
os.makedirs(FIGDIR, exist_ok=True)

In [4]:
import os
import numpy as np
import matplotlib.pyplot as plt
from typing import Dict, List

# -----------------------------
# Problem data (from the draft)
# -----------------------------
# Linear term of the test problem.
q = np.array([1.0, -2.0, -1.0, 2.0, -1.0])

# Symmetric block-diagonal matrix (blocks of size 2, 2 and 1); it multiplies the
# reference point u_ref in the inner QP's linear term.
P = np.array([
    [3.1, 2.0, 0.0, 0.0, 0.0],
    [2.0, 3.6, 0.0, 0.0, 0.0],
    [0.0, 0.0, 3.5, 2.0, 0.0],
    [0.0, 0.0, 2.0, 3.3, 0.0],
    [0.0, 0.0, 0.0, 0.0, 3.0],
])

# Symmetric block-diagonal matrix with the same block pattern; it is the
# quadratic part of the inner QP (before the 1/r regularization is added).
Q = np.array([
    [1.6, 1.0, 0.0, 0.0, 0.0],
    [1.0, 1.6, 0.0, 0.0, 0.0],
    [0.0, 0.0, 1.5, 1.0, 0.0],
    [0.0, 0.0, 1.0, 1.5, 0.0],
    [0.0, 0.0, 0.0, 0.0, 2.0],
])

# Problem dimension, read off the data so it cannot drift from q, P and Q.
n = q.shape[0]

# -----------------------------
# Sets K and C(x)
# -----------------------------
def proj_box(x, lo, hi):
    """Clamp every entry of `x` to [lo, hi], i.e. project `x` onto that box; returns a new array."""
    return np.clip(x, lo, hi)

def C_upper(x):
    """Upper bound a(x) of the state-dependent box C(x) = [0, a(x)]^5, capped at 2."""
    total = float(np.sum(x))
    # a(x) is 1 while the coordinate sum is at most 5 and grows affinely beyond that.
    bound = 1.0 if total <= 5.0 else (total + 5.0) / 10.0
    # The cap keeps C(x) inside K = [0, 2]^5.
    return min(bound, 2.0)

def proj_C_of(x, y):
    """Project `y` onto C(x) = [0, a(x)]^5, where the state `x` only determines the bound a(x)."""
    return proj_box(y, 0.0, C_upper(x))

def proj_K(y):
    """Project `y` onto the fixed box K = [0, 2]^5 (entrywise clamp to [0, 2])."""
    return proj_box(y, lo=0.0, hi=2.0)

# -----------------------------
# Inner solver for resolvent (approx)
# -----------------------------
def solve_inner_qp_on_box(u_ref, x_n, r, y0=None, inner_it=300, inner_tol=1e-10):
    """
    Projected-gradient solver for the regularized inner problem with the constraint frozen at u_ref.

    Approximately solves the strongly convex quadratic program

        minimize_y  0.5 y^T (Q + (1/r) I) y + (P u_ref + q - (1/r) x_n)^T y
        subject to  y in C(u_ref) = [0, a(u_ref)]^5,

    which is used as a practical surrogate for the regularized EP step when C depends on u.

    Parameters
    ----------
    u_ref : reference point; it fixes both the linear term P u_ref and the box C(u_ref).
    x_n : current outer iterate; enters the linear term through -(1/r) x_n.
    r : regularization parameter; only 1/r is used.
    y0 : optional starting point. When omitted, x_n is used. Either one is first
        projected onto C(u_ref).
    inner_it : maximum number of projected-gradient steps.
    inner_tol : stop as soon as two consecutive iterates differ by at most this
        amount in Euclidean norm.

    Returns
    -------
    The last projected-gradient iterate (always a point of C(u_ref)).
    """
    inv_r = 1.0 / r
    hessian = Q + inv_r * np.eye(n)
    linear = P @ u_ref + q - inv_r * x_n

    # Constant step 1/L, with L = lambda_max(hessian) the Lipschitz constant of the gradient.
    step = 1.0 / np.linalg.eigvalsh(hessian).max()

    start = x_n if y0 is None else y0
    y = proj_C_of(u_ref, start.copy())

    for _ in range(inner_it):
        y_prev = y
        y = proj_C_of(u_ref, y_prev - step * (hessian @ y_prev + linear))
        if np.linalg.norm(y - y_prev) <= inner_tol:
            break

    return y

def resolvent_state_dependent(x_n, r=1.0, fp_it=200, fp_tol=1e-8, inner_it=400, omega=0.5):
    """
    Damped fixed-point approximation of the state-dependent resolvent at x_n.

    Each sweep freezes the constraint set at the current guess u, solves the
    inner QP on C(u) (warm-started at u), clamps the result to K and then moves
    u a fraction `omega` of the way towards it.

    Parameters
    ----------
    x_n : point at which the resolvent is evaluated.
    r : regularization parameter forwarded to the inner QP solver.
    fp_it : maximum number of fixed-point sweeps.
    fp_tol : stop when the damped update ||u_new - u|| is at most this value.
    inner_it : iteration cap forwarded to the inner QP solver.
    omega : relaxation weight; 1.0 would be the undamped fixed-point map.

    Returns:
        u, fp_iters, fp_hit_max, fp_converged
        (approximate resolvent, number of sweeps performed, whether the sweep
        cap was reached without meeting fp_tol, whether fp_tol was met)
    """
    # Initial guess: x_n clamped to K.
    u = proj_K(x_n.copy())

    for sweep in range(1, fp_it + 1):
        # Undamped map: inner-QP solution on C(u), clamped to K.
        candidate = proj_K(
            solve_inner_qp_on_box(u_ref=u, x_n=x_n, r=r, y0=u, inner_it=inner_it)
        )

        # Relaxed update; the stopping test below measures this damped step.
        u_prev, u = u, (1.0 - omega) * u + omega * candidate

        if np.linalg.norm(u - u_prev) <= fp_tol:
            return u, sweep, False, True

    # Sweep budget exhausted without meeting the tolerance.
    return u, fp_it, True, False

# -----------------------------
# SPPA / WPPA outer iterations (step functions are defined in the next cell)
# -----------------------------


In [5]:
import src.benchkit as bk
import time

# --- Outer loop (passed to bk.run) ---
maxit = 200      # max_iter: cap on outer iterations
stop_tol = 0.0   # stop_tol_step: step-size stopping threshold

# --- Resolvent approximation (passed to resolvent_state_dependent) ---
r = 1.0          # regularization parameter of the inner QP (enters as 1/r)
fp_it = 200      # cap on fixed-point sweeps per resolvent evaluation
inner_it = 400   # cap on projected-gradient steps per inner QP solve

def _make_ppa_step(next_iterate):
    """
    Build the step function shared by the SPPA and WPPA variants.

    Parameters
    ----------
    next_iterate : callable ``(x, u, alpha) -> x_new`` that combines the current
        iterate x, the approximate resolvent u and the weight alpha into the
        next iterate. This is the only place where the two methods differ.

    Returns
    -------
    ``step_fn(x, k) -> (x_new, info)`` where ``info`` is a dict with the keys
    "u", "residual", "inner_iters", "inner_hit_max", "ok" and
    "constraint_violation".

    The module-level settings ``r``, ``fp_it`` and ``inner_it`` are read when
    the step is executed, not when it is built.
    """
    def step_fn(x, k):
        # Vanishing weight alpha_k = 1 / (k + 2).
        alpha = 1.0 / (k + 2.0)

        u, fp_iters, fp_hit_max, fp_ok = resolvent_state_dependent(
            x, r=r, fp_it=fp_it, fp_tol=1e-8, inner_it=inner_it, omega=0.5
        )
        # Distance between the iterate and its approximate resolvent.
        residual = float(np.linalg.norm(x - u))
        x_new = next_iterate(x, u, alpha)

        return x_new, {
            "u": u,
            "residual": residual,
            "inner_iters": fp_iters,
            "inner_hit_max": fp_hit_max,
            "ok": fp_ok,
            # Reported as a constant; no violation is actually measured here.
            "constraint_violation": 0.0,
        }
    return step_fn


def make_sppa_step(anchor=None):
    """
    Step function for SPPA: ``x_new = alpha * anchor + (1 - alpha) * u``.

    Parameters
    ----------
    anchor : optional fixed anchor point. When omitted, the module-level
        ``u_anchor`` is looked up each time the step runs, which is the
        original behaviour and requires ``u_anchor`` to be defined before the
        first step is executed.
    """
    def anchored(x, u, alpha):
        pivot = u_anchor if anchor is None else anchor
        return alpha * pivot + (1.0 - alpha) * u
    return _make_ppa_step(anchored)


def make_wppa_step():
    """Step function for WPPA: ``x_new = alpha * x + (1 - alpha) * u``."""
    def averaged(x, u, alpha):
        return alpha * x + (1.0 - alpha) * u
    return _make_ppa_step(averaged)

In [6]:
# Common starting point for both methods; SPPA also uses a copy of it as its anchor.
x0 = np.ones(n) * 1.8  # any point in [0,2]^5
u_anchor = x0.copy()   # read by the SPPA step function when it runs

# Arguments shared by both runs, so the two methods are compared under identical settings.
run_kwargs = dict(
    x0=x0,
    max_iter=maxit,
    stop_tol_step=stop_tol,
    residual_fn=None,
    error_fn=None,
)

logs_sppa, sum_sppa = bk.run(
    method_name="SPPA",
    step_fn=make_sppa_step(),
    **run_kwargs,
)

logs_wppa, sum_wppa = bk.run(
    method_name="WPPA",
    step_fn=make_wppa_step(),
    **run_kwargs,
)

# Plots go to the directory created at the top of the notebook (FIGDIR)
# instead of a second hard-coded copy of the same path.
bk.make_standard_plots(
    logs_by_method={"SPPA": logs_sppa, "WPPA": logs_wppa},
    outdir=FIGDIR,
    tag="ex53_qep_gep",
    plot_residual=True,
    plot_error=False,
)

# print() is intentional: the LaTeX source must appear verbatim for copy-paste.
print(bk.latex_tables_split(
    [sum_sppa, sum_wppa],
    caption_perf="Computational performance for Experiment 3.",
    label_perf="tab:ex53_perf",
    caption_acc="Accuracy and feasibility metrics for Experiment 3.",
    label_acc="tab:ex53_acc",
))

\begin{table}[!ht]
\centering
\begin{tabular}{lrrrrrrrr}
\hline
Method & It. & Tot (s) & Avg res (s) & Step$_f$ & Avg in. & Max in. & \% max & Succ. (\%)\\
\hline
SPPA & 200 & 1.0416 & 0.00520 & 1.0166e-04 & 22.37 & 29 & 0.0 & 100.0 \\
WPPA & 200 & 0.0643 & 0.00031 & \le 1e-16 & 2.02 & 29 & 0.0 & 100.0 \\
\hline
\end{tabular}
\caption{Computational performance for Experiment 3.}
\label{tab:ex53_perf}
\end{table}

\begin{table}[!ht]
\centering
\begin{tabular}{lrr}
\hline
Method & Constr. viol. & $R(x_n)$\\
\hline
SPPA & 0 & 1.8441e-02 \\
WPPA & 0 & \le 1e-16 \\
\hline
\end{tabular}
\caption{Accuracy and feasibility metrics for Experiment 3.}
\label{tab:ex53_acc}
\end{table}

