<a href="https://colab.research.google.com/github/joblazek/psp-auction/blob/main/PSP_priority_queue_Lamport_single_seller.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [21]:
from __future__ import annotations

import math, heapq
from typing import Dict, Tuple, List, Optional
import numpy as np
import pandas as pd

# ------------------------------
# Event types
# ------------------------------
# Event-type tags stored as the third field of each priority-queue entry.
# BUYER_COMPUTE carries payload (i,): buyer i recomputes its candidate bid.
BUYER_COMPUTE = 1
# POST_BID carries payload (i, v, w, lc_send): apply bid (v, w) for buyer i.
POST_BID      = 2

# ------------------------------
# Market state (dict of arrays)
# ------------------------------

def make_market(I: int,
                Q_max: float,
                epsilon: float,
                budget_range=(5.0, 20.0),
                q_range=(1.0, 50.0),
                kappa_range=(1.0, 3.5),
                seed: int = 12345,
                jitter: float = 0.01,
                apply_jitter: float = 0.02) -> Dict:
    """
    Build the state dictionary for a market of I buyers and one seller.

    Buyer parameters are drawn uniformly from the given (low, high) ranges
    using a generator seeded with `seed`, in the order budgets, capacities,
    valuation intensities.

    Per-buyer arrays (length I):
      b, qbar, kappa   : budget, demand cap, valuation intensity
      bid_q, bid_p     : current bid quantity / price (start at zero)
      LC_buyer         : buyer logical clocks
      LC_cell          : last-applied Lamport stamp per buyer cell
      retro_applies    : number of retrograde overwrites applied
      retro_mass       : accumulated retrograde depth

    Scalars / containers:
      I, Q_max, epsilon, jitter, apply_jitter
      pq, seq, t       : event heap, push sequence counter, current time
      rng              : the numpy Generator used for all later draws
      retro_hist       : {depth -> count} histogram
      compute_count, apply_count, compute_method
    """
    gen = np.random.default_rng(seed)

    # Draw order matters for reproducibility: budgets, then caps, then kappas.
    budgets, caps, intensities = (
        gen.uniform(*bounds, size=I)
        for bounds in (budget_range, q_range, kappa_range)
    )

    def _zeros(dtype):
        return np.zeros(I, dtype=dtype)

    market: Dict = dict(
        I=I,
        Q_max=float(Q_max),
        epsilon=float(epsilon),
        b=budgets,
        qbar=caps,
        kappa=intensities,
        bid_q=_zeros(float),
        bid_p=_zeros(float),
    )
    # priority queue entries are (timestamp, seq, etype, payload)
    market.update(pq=[], seq=0, t=0.0)
    # timing noise
    market.update(jitter=float(jitter), apply_jitter=float(apply_jitter))
    market["rng"] = gen
    # Lamport clocks
    market.update(LC_buyer=_zeros(np.int64), LC_cell=_zeros(np.int64))
    # retrograde diagnostics
    market.update(
        retro_applies=_zeros(np.int64),
        retro_mass=_zeros(np.int64),
        retro_hist={},
    )
    # counters
    market.update(compute_count=0, apply_count=0)
    market["compute_method"] = "ladder"
    return market

# ------------------------------
# Math primitives (vector-friendly)
# ------------------------------

def theta_i(i: int, z: float, M: Dict) -> float:
    """Valuation θ_i(z) = κ_i·q̄_i·m − ½·κ_i·m², where m = min(z, q̄_i)."""
    cap = float(M["qbar"][i])
    slope = float(M["kappa"][i])
    # Quantity beyond the buyer's demand cap adds no value.
    m = min(z, cap)
    linear_part = slope * cap * m
    quadratic_part = 0.5 * slope * m * m
    return linear_part - quadratic_part

def theta_i_prime(i: int, z: float, M: Dict) -> float:
    """Marginal valuation θ'_i(z): κ_i·(q̄_i − z) below the cap q̄_i, else 0."""
    cap = float(M["qbar"][i])
    slope = float(M["kappa"][i])
    if z < cap:
        return slope * (cap - z)
    return 0.0

def _others_mask(i: int, I: int) -> np.ndarray:
    m = np.ones(I, dtype=bool); m[i] = False; return m

def Q_i(i: int, p_i: float, M: Dict) -> float:
    """Seller capacity left for buyer i once rivals bidding strictly above p_i are served.

    The result is clamped at zero.
    """
    rivals = _others_mask(i, M["I"])
    rival_q = M["bid_q"][rivals]
    rival_p = M["bid_p"][rivals]
    outbid_volume = float(np.sum(rival_q[rival_p > p_i]))
    return max(M["Q_max"] - outbid_volume, 0.0)

def Q_i_bar(i: int, p_i: float, M: Dict) -> float:
    """Seller capacity left for buyer i once rivals bidding at or above p_i are served.

    Same as Q_i but ties count against buyer i; the result is clamped at zero.
    """
    rivals = _others_mask(i, M["I"])
    rival_q = M["bid_q"][rivals]
    rival_p = M["bid_p"][rivals]
    blocked_volume = float(np.sum(rival_q[rival_p >= p_i]))
    return max(M["Q_max"] - blocked_volume, 0.0)

def P_i(i: int, z: float, M: Dict) -> float:
    """
    Price density P_i(z; s_-i) = inf { y ≥ 0 : Q_i(y) ≥ z }.

    Only 0 and the rivals' bid prices are tried, in ascending order; the
    first one whose Q_i reaches z is returned, or +inf when none does.
    """
    rivals = _others_mask(i, M["I"])
    # np.unique returns the candidate prices sorted ascending.
    candidates = np.unique(np.concatenate(([0.0], M["bid_p"][rivals])))
    feasible = (float(y) for y in candidates if Q_i(i, float(y), M) >= z)
    return next(feasible, float("inf"))

def integral_P(i: int, z: float, M: Dict, ladder: Optional[List[Tuple[float,float]]] = None) -> float:
    """Integrate the step price function P_i over [0, z] using a price ladder.

    `ladder` is a list of (slice_size, price) pairs; when omitted it is built
    with build_price_ladder(i, M). Slices are consumed in order until z is
    exhausted. Any part of z beyond the ladder's total size contributes nothing.
    """
    if z <= 0.0:
        return 0.0
    steps = build_price_ladder(i, M) if ladder is None else ladder
    outstanding = float(z)
    total = 0.0
    for width, price in steps:
        if outstanding <= 0.0:
            break
        used = min(width, outstanding)
        total += used * price
        outstanding -= used
    return float(total)

def a_i(i: int, M: Dict) -> float:
    """Allocation for buyer i: its bid quantity capped by Q̄_i at its own bid price."""
    requested = float(M["bid_q"][i])
    own_price = float(M["bid_p"][i])
    available = Q_i_bar(i, own_price, M)
    return min(requested, available)

def c_i(i: int, M: Dict) -> float:
    """Cost c_i = ∫₀^{a_i} P_i(ζ) dζ, charged on buyer i's current allocation.

    The integral is evaluated with integral_P, the ladder-based integrator
    defined in this file. The previous call target, `integral_P_exact`, is not
    defined in this file, so the call failed with NameError unless a stale
    definition happened to be present in the interpreter session.
    """
    return integral_P(i, a_i(i, M), M)

def u_i(i: int, M: Dict) -> float:
    """Utility of buyer i under the current bids: value of its allocation minus its cost."""
    value = theta_i(i, a_i(i, M), M)
    payment = c_i(i, M)
    return value - payment

# sup G_i and t_i

def sup_G_i(i: int, M: Dict) -> float:
    """
    Largest quantity buyer i can take while walking its price ladder.

    Each ladder slice has size S at a constant price r. Within a slice the
    increment is the smallest of:
        slice cap : S
        demand    : q̄_i − z
        marginal  : (q̄_i − r/κ_i) − z, floored at 0 (0 when κ_i ≤ 0)
        budget    : (b_i − cost)/r, floored at 0 (unbounded when r ≤ 0)
    The walk stops at the first slice that is not taken in full.
    """
    slope = float(M["kappa"][i])
    cap = float(M["qbar"][i])
    wallet = float(M["b"][i])

    bought = 0.0
    spent = 0.0
    for width, price in build_price_ladder(i, M):
        if bought >= cap:
            break

        # Quantity at which marginal value drops to this slice's price.
        marginal_ceiling = max(0.0, cap - price / slope) if slope > 0.0 else 0.0
        room_marginal = max(0.0, marginal_ceiling - bought)
        room_demand = cap - bought
        if price <= 0.0:
            room_budget = float("inf")
        else:
            room_budget = max(0.0, (wallet - spent) / price)

        step = min(width, room_demand, room_marginal, room_budget)
        if step <= 0.0:
            break
        bought += step
        spent += step * price
        if step < width:
            # Some constraint bound before the slice was exhausted.
            break
    return float(bought)

def compute_t_i(i: int, M: Dict) -> Tuple[float, float]:
    """
    Candidate bid t_i = (v_i, w_i) for buyer i.

    v_i is sup_G_i backed off by ε / θ'_i(0) and floored at zero (no back-off
    when θ'_i(0) is not positive); w_i is the marginal valuation θ'_i(v_i).
    """
    reach = sup_G_i(i, M)
    top_marginal = theta_i_prime(i, 0.0, M)  # equals κ_i · q̄_i
    if top_marginal > 0:
        backoff = float(M["epsilon"]) / top_marginal
    else:
        backoff = 0.0
    quantity = max(reach - backoff, 0.0)
    price = theta_i_prime(i, quantity, M)
    return quantity, price

# ------------------------------
# Priority queue engine
# ------------------------------

def push(M: Dict, t: float, etype: int, payload: Tuple):
    """Enqueue an event at time t; the incremented sequence number breaks timestamp ties."""
    ticket = M["seq"] + 1
    M["seq"] = ticket
    entry = (float(t), int(ticket), int(etype), payload)
    heapq.heappush(M["pq"], entry)

def pop(M: Dict):
    """Remove and return the earliest queued event, or None when the queue is empty."""
    queue = M["pq"]
    if not queue:
        return None
    return heapq.heappop(queue)

def schedule_all_buyers(M: Dict, t0: float = 0.0):
    """Queue one BUYER_COMPUTE event per buyer at t0 plus a random offset in [0, jitter)."""
    gen = M["rng"]
    for buyer in range(M["I"]):
        offset = float(gen.random() * M["jitter"])
        push(M, t0 + offset, BUYER_COMPUTE, (buyer,))

# ------------------------------
# Ladder helpers
# ------------------------------
def build_price_ladder(i: int, M: Dict) -> List[Tuple[float, float]]:
    """
    Price ladder for buyer i as a list of (slice_size, price) pairs.

    Candidate prices are 0 together with the rivals' finite bid prices, in
    ascending order. For each candidate, the slice size is the growth of
    Q_i over the capacity already accounted for; growth of at most 1e-12 is
    skipped and carried into the next candidate.
    """
    rivals = _others_mask(i, M["I"])
    prices = np.unique(np.concatenate(([0.0], M["bid_p"][rivals])))
    prices = prices[np.isfinite(prices)]
    prices.sort()

    threshold = 1e-12
    slices: List[Tuple[float, float]] = []
    unlocked = 0.0  # capacity already covered by earlier slices
    for price in prices:
        capacity = Q_i(i, float(price), M)
        gain = capacity - unlocked
        if gain > threshold:
            slices.append((float(gain), float(price)))
            unlocked = capacity
    return slices

# ------------------------------
# Lamport helpers (single-seller cells)
# ------------------------------

def lamport_on_read(i: int, M: Dict):
    """
    Lamport tick for a BUYER_COMPUTE read by buyer i.

    The buyer's clock becomes one more than the larger of its own clock and
    the highest stamp over all cells.
    """
    newest_cell = int(np.max(M["LC_cell"]))
    own_clock = int(M["LC_buyer"][i])
    leading = own_clock if own_clock >= newest_cell else newest_cell
    M["LC_buyer"][i] = leading + 1

def lamport_on_apply(i: int, lc_send: int, M: Dict) -> int:
    """
    Lamport update when a POST_BID stamped lc_send is applied to cell i.

    Returns the retrograde depth: how far the cell's stamp was ahead of
    lc_send (0 if it was not ahead). The cell's stamp then becomes one more
    than the larger of the two.
    """
    stamp = int(lc_send)
    current = int(M["LC_cell"][i])
    if current > stamp:
        depth, leading = current - stamp, current
    else:
        depth, leading = 0, stamp
    M["LC_cell"][i] = leading + 1
    return depth

def _bump_retro_stats(i: int, retro: int, M: Dict):
    if retro <= 0: return
    M["retro_applies"][i] += 1
    M["retro_mass"][i]    += retro
    M["retro_hist"][retro] = M["retro_hist"].get(retro, 0) + 1

# ------------------------------
# Event handlers
# ------------------------------

def handle_buyer_compute(M, i, verbose=False):
    """
    Handle a BUYER_COMPUTE event for buyer i.

    Ticks the buyer's Lamport clock, computes the candidate bid (v, w) with
    compute_t_i, and compares the buyer's utility under the candidate with its
    utility under its current bid, both evaluated on the same snapshot of
    the market. If the gain exceeds ε, a POST_BID event carrying
    (i, v, w, lc_send) is scheduled after a random delay; otherwise nothing
    is posted.

    Args:
        M: market state dictionary.
        i: buyer index.
        verbose: print a one-line trace of the decision.

    Returns:
        True if a POST_BID was scheduled, False if the candidate was suppressed.
    """
    M["compute_count"] += 1

    # Lamport read tick (local); the resulting clock value stamps the bid.
    lamport_on_read(i, M)
    lc_send = int(M["LC_buyer"][i])

    # Utility on the snapshot the buyer actually saw.
    old_q, old_p = M["bid_q"][i], M["bid_p"][i]
    old_u = u_i(i, M)

    v, w = compute_t_i(i, M)

    # What-if evaluation: temporarily install the candidate in the shared
    # arrays. The restore runs in `finally` so that an exception raised while
    # evaluating utility cannot leave an un-posted bid in the market state.
    M["bid_q"][i], M["bid_p"][i] = v, w
    try:
        new_u = u_i(i, M)
    finally:
        M["bid_q"][i], M["bid_p"][i] = old_q, old_p

    if new_u <= old_u + float(M["epsilon"]):
        if verbose:
            print(f"BUYER_COMPUTE i={i}: suppressed (Δu={new_u-old_u:.6f} ≤ ε)")
        return False

    delay = 1e-3 + float(M["rng"].random() * M["apply_jitter"])
    push(M, M["t"] + delay, POST_BID, (i, float(v), float(w), lc_send))
    if verbose:
        print(f"BUYER_COMPUTE i={i}: accepted (Δu={new_u-old_u:.6f})")
    return True

def handle_post_bid(M, i, v, w, lc_send, verbose=False):
    """
    Handle a POST_BID event: install bid (v, w) for buyer i unconditionally.

    Also refreshes the diagnostic clearing price M["p_star"], advances the
    cell's Lamport stamp, and records the retrograde depth of this apply.
    """
    before = (M["bid_q"][i], M["bid_p"][i])
    M["bid_q"][i] = v
    M["bid_p"][i] = w
    M["apply_count"] += 1

    # Diagnostic only: clearing price after the new bid is in place.
    M["p_star"] = winners_and_pstar(M)[1]

    retro = lamport_on_apply(i, lc_send, M)
    _bump_retro_stats(i, retro, M)

    if not verbose:
        return
    prev_q, prev_p = before
    print(f"POST_BID applied: i={i}, (q,p)=({prev_q:.4f},{prev_p:.4f}) -> ({v:.4f},{w:.4f}), "
          f"p_star={M.get('p_star',0.0):.4f}, retro={retro}")

# ------------------------------
# Runner
# ------------------------------

def run(M: Dict, steps: int = 1000, verbose: bool = False):
    """
    Process up to `steps` events from the priority queue in timestamp order.

    Stops early when the queue is empty. Each BUYER_COMPUTE event is followed
    by a new one for the same buyer roughly one time unit later; POST_BID
    events are applied. Any other event type raises RuntimeError.
    """
    for _step in range(steps):
        event = pop(M)
        if event is None:
            return
        stamp, _seq, kind, data = event
        M["t"] = float(stamp)

        if kind == POST_BID:
            buyer, qty, price, lc_send = data
            handle_post_bid(M, int(buyer), float(qty), float(price), int(lc_send), verbose=verbose)
            continue

        if kind != BUYER_COMPUTE:
            raise RuntimeError(f"Unknown event type: {kind}")

        (buyer,) = data
        handle_buyer_compute(M, buyer, verbose=verbose)
        # Next compute for this buyer: one time unit ahead plus jitter.
        wake_at = M["t"] + 1.0 + float(M["rng"].random() * M["jitter"])
        push(M, wake_at, BUYER_COMPUTE, (buyer,))

# ------------------------------
# Diagnostics / Reporting
# ------------------------------

def winners_and_pstar(M: Dict) -> Tuple[List[int], float]:
    """
    Diagnostic single-seller clearing: greedy fill by descending bid price.

    Returns (winners, p_star). Winners are the buyers that receive a positive
    quantity before capacity Q_max is used up. p_star is the bid price of the
    first buyer reached after capacity is full, or 0.0 if no such buyer exists.
    Allocation for control purposes is still computed by a_i().
    """
    capacity = M["Q_max"]
    # sorted() is stable, so equal prices keep ascending buyer order.
    by_price = sorted(range(M["I"]), key=lambda j: float(M["bid_p"][j]), reverse=True)

    chosen: List[int] = []
    filled = 0.0
    clearing = 0.0
    for j in by_price:
        if filled >= capacity:
            clearing = float(M["bid_p"][j])
            break
        portion = min(float(M["bid_q"][j]), capacity - filled)
        if portion > 0.0:
            chosen.append(j)
            filled += portion
    return chosen, float(clearing)

def market_totals(M: Dict) -> Tuple[float, float, float]:
    """Return market-wide (total allocation, total value, total utility) over all buyers."""
    buyers = range(M["I"])
    alloc = np.array([a_i(j, M) for j in buyers], dtype=float)
    value = np.array([theta_i(j, alloc[j], M) for j in buyers], dtype=float)
    paid = np.array([c_i(j, M) for j in buyers], dtype=float)
    surplus = value - paid
    totals = (alloc.sum(), value.sum(), surplus.sum())
    return float(totals[0]), float(totals[1]), float(totals[2])

def print_market_config(M: Dict, round_decimals: int = 3, return_frames: bool = False):
    """
    Tabulate the run parameters and the per-buyer parameters.

    With return_frames=True the two DataFrames (meta, buyers) are returned;
    otherwise both are printed and nothing is returned. Buyer parameters are
    rounded to `round_decimals`.
    """
    n_buyers = M["I"]

    buyer_cols = {"Buyer": [f"B_{i}" for i in range(n_buyers)]}
    for label, key in (("b_i", "b"), ("q̄_i", "qbar"), ("κ_i", "kappa")):
        buyer_cols[label] = np.round(M[key], round_decimals)
    df_buyers = pd.DataFrame(buyer_cols)

    meta = {"I": n_buyers}
    for key in ("Q_max", "epsilon", "jitter", "apply_jitter"):
        meta[key] = M[key]
    df_meta = pd.DataFrame([meta])

    if return_frames:
        return df_meta, df_buyers

    for heading, frame in (("\n=== Market Config ===", df_meta),
                           ("\n=== Buyers ===", df_buyers)):
        print(heading)
        print(frame.to_string(index=False))

def print_snapshot_wide(M: Dict,
                        cols: Tuple[str, ...] = ("Buyer","q_i","p_i","a_i","c_i","u_i"),
                        round_decimals: int = 3,
                        include_totals: bool = False) -> pd.DataFrame:
    """
    Build a one-row-per-buyer table of bids, allocation, cost and utility.

    Only the columns named in `cols` are kept, in that order, and values are
    rounded to `round_decimals`. With include_totals=True a final "TOTALS" row
    is appended carrying the summed a_i and u_i where those columns exist.
    """
    records = [
        {
            "Buyer": f"B_{i}",
            "q_i": float(M["bid_q"][i]),
            "p_i": float(M["bid_p"][i]),
            "a_i": a_i(i, M),
            "c_i": c_i(i, M),
            "u_i": u_i(i, M),
        }
        for i in range(M["I"])
    ]
    table = pd.DataFrame(records)
    selected = [c for c in cols if c in table.columns]
    table = table[selected].round(round_decimals)

    if not include_totals:
        return table

    total_alloc, _total_value, total_util = market_totals(M)
    totals_row = dict.fromkeys(table.columns, "")
    totals_row[table.columns[0]] = "TOTALS"
    for column, amount in (("a_i", total_alloc), ("u_i", total_util)):
        if column in table.columns:
            totals_row[column] = round(amount, round_decimals)
    return pd.concat([table, pd.DataFrame([totals_row])], ignore_index=True)

def print_snapshot_summary(M: Dict, round_decimals: int = 3) -> pd.DataFrame:
    """Return a one-row DataFrame summarising the run: time, totals, winners, p*, counters."""
    total_alloc, total_value, total_util = market_totals(M)
    winners, pstar = winners_and_pstar(M)

    def _r(x):
        return round(x, round_decimals)

    record = {
        "t":        _r(M["t"]),
        "TotalAlloc": _r(total_alloc),
        "TotalValue": _r(total_value),
        "TotalUtil":  _r(total_util),
        "Winners":    len(winners),
        "p*":         _r(pstar),
        "RetroApplies": int(np.sum(M["retro_applies"])),
        "Computes": int(M["compute_count"]),
        "Applies":  int(M["apply_count"]),
    }
    return pd.DataFrame([record])

def retro_stats(M: Dict) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Summarise retrograde applies.

    Returns:
        (df_hist, df_buyer) where
        df_hist  has columns retro_depth, count — one row per depth recorded
                 in M["retro_hist"], sorted by depth (empty frame with the same
                 columns when nothing was recorded);
        df_buyer has one row per buyer with retro_applies, retro_mass and the
                 current LC_cell / LC_buyer clock values.
    """
    hist_rows = [{"retro_depth": depth, "count": count}
                 for depth, count in sorted(M["retro_hist"].items())]
    # Passing `columns` keeps the schema identical whether or not rows exist.
    df_hist = pd.DataFrame(hist_rows, columns=["retro_depth", "count"])
    df_buyer = pd.DataFrame({
        "Buyer": [f"B_{i}" for i in range(M["I"])],
        "retro_applies": M["retro_applies"],
        "retro_mass":    M["retro_mass"],
        "LC_cell":       M["LC_cell"],
        "LC_buyer":      M["LC_buyer"],
    })
    return df_hist, df_buyer

# ------------------------------
# Example usage
# ------------------------------

if __name__ == "__main__":
    # Demo run: 10 buyers, seller capacity 100, ε = 5, fixed seed.
    M = make_market(I=10, Q_max=100.0, epsilon=5.0, seed=2025,
                    jitter=0.01, apply_jitter=0.02)

    # Optional: randomize initial bids to avoid zero-price symmetry.
    # Each buyer starts at a random quantity below its cap, priced at its
    # marginal valuation for that quantity. Draws come from the market RNG.
    for i in range(M["I"]):
        q = float(M["rng"].uniform(0.0, M["qbar"][i]))
        p = theta_i_prime(i, q, M)
        M["bid_q"][i], M["bid_p"][i] = q, p

    # Print the run parameters and the per-buyer parameters.
    cfg, buyers = print_market_config(M, round_decimals=3, return_frames=True)
    print(cfg.to_string(index=False)); print(buyers.to_string(index=False))

    # Seed one BUYER_COMPUTE per buyer, then process up to 1000 events.
    schedule_all_buyers(M, t0=0.0)
    run(M, steps=1000, verbose=True)

    # Build the final-state reports.
    wide = print_snapshot_wide(M,
                               cols=("Buyer","q_i","p_i","a_i","c_i","u_i"),
                               round_decimals=3,
                               include_totals=True)
    summary = print_snapshot_summary(M, round_decimals=3)
    df_hist, df_buyer = retro_stats(M)

    # Display (stdout)
    print("\n=== Snapshot ===")
    print(wide.to_string(index=False))
    print("\n=== Summary ===")
    print(summary.to_string(index=False))
    print("\n=== Retrograde Histogram ===")
    print(df_hist.to_string(index=False))
    print("\n=== Retrograde by Buyer ===")
    print(df_buyer.to_string(index=False))

    # Market-wide totals at two-decimal precision.
    Talloc, Tval, Tutil = market_totals(M)
    print(f"\nMarket Totals -> TotalAlloc: {Talloc:.2f}, TotalValue: {Tval:.2f}, TotalUtility: {Tutil:.2f}")


 I  Q_max  epsilon  jitter  apply_jitter
10  100.0      5.0    0.01          0.02
Buyer    b_i   q̄_i   κ_i
  B_0 19.917 20.088 1.519
  B_1 10.730 12.281 2.448
  B_2 17.407  9.181 2.477
  B_3 17.559  8.518 3.220
  B_4 19.637 48.730 1.275
  B_5  6.158 21.771 2.241
  B_6  9.762  4.094 1.725
  B_7 18.793 25.088 2.215
  B_8 15.138 15.129 3.371
  B_9  9.287 11.369 3.316
BUYER_COMPUTE i=6: suppressed (Δu=0.000000 ≤ ε)
BUYER_COMPUTE i=1: accepted (Δu=26.330681)
BUYER_COMPUTE i=0: accepted (Δu=52.169352)
BUYER_COMPUTE i=5: suppressed (Δu=-349.189600 ≤ ε)
BUYER_COMPUTE i=3: suppressed (Δu=-20.400519 ≤ ε)
BUYER_COMPUTE i=8: suppressed (Δu=-200.279724 ≤ ε)
BUYER_COMPUTE i=4: suppressed (Δu=-500.876096 ≤ ε)
BUYER_COMPUTE i=2: suppressed (Δu=-4.019087 ≤ ε)
BUYER_COMPUTE i=9: suppressed (Δu=-65.993773 ≤ ε)
BUYER_COMPUTE i=7: suppressed (Δu=-400.978350 ≤ ε)
POST_BID applied: i=0, (q,p)=(15.2686,7.3180) -> (2.4750,26.7461), p_star=6.5518, retro=0
POST_BID applied: i=1, (q,p)=(9.6039,6.5518) -> (1.2554