smart grid simulation

----

In [14]:
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, Optional, List, Tuple, Literal
from enum import Enum, auto
from collections import deque
import heapq
import numpy as np
import pandas as pd
import warnings

# silence future pandas groupby warning in our summaries (optional)
warnings.filterwarnings("ignore", category=FutureWarning, module="pandas.core.groupby")

# reproducibility
DEFAULT_SEED = 42
rng = np.random.default_rng(DEFAULT_SEED)

Time = float
ID = int


In [15]:
class PolicyType(Enum):
    FIFO = auto()
    NPPS = auto()   # Non-Preemptive Priority Scheduling
    WRR  = auto()   # Weighted Round Robin
    EDF  = auto()   # Earliest Deadline First

@dataclass
class SimConfig:
    # --- non-defaults first ---
    chi_arrival: float                   # χ — Poisson arrival rate
    lambda_ctrl: float                   # λ1 — controller service rate (exp)
    lambda_res: Dict[str, float]         # λ2 per resource {'PV':6, 'BAT':12, ...}
    setup_delay: float                   # t — fixed setup delay (ctrl & res)
    T: Time                              # total horizon
    N_ctrl: int                          # number of controller servers (M/M/N)
    transfer_overhead: Dict[str, float]  # C_time per resource (routing delay)
    route_probs: Dict[str, float]        # P routing probabilities to resources

    # --- defaults below ---
    ctrl_policy: PolicyType = PolicyType.FIFO

    # priorities & default deadlines (slots interpreted in same time unit)
    group_priority: Dict[str, int] = field(
        default_factory=lambda: {"essential": 3, "delay_sensitive": 2, "delay_tolerant": 1}
    )
    default_deadline_slots: Dict[str, Tuple[int,int]] = field(
        default_factory=lambda: {"delay_sensitive": (1,4), "delay_tolerant": (3,12)}
    )
    seed: int = DEFAULT_SEED

    def validate(self):
        assert self.chi_arrival > 0, "chi_arrival must be > 0"
        assert self.lambda_ctrl > 0, "lambda_ctrl must be > 0"
        assert all(v > 0 for v in self.lambda_res.values()), "All lambda_res must be > 0"
        assert self.N_ctrl >= 1, "N_ctrl must be >= 1"
        assert self.T > 0, "T must be > 0"
        s = sum(self.route_probs.values())
        assert abs(s - 1.0) < 1e-8, f"route_probs must sum to 1 (got {s})"
        # keys must align
        for r in self.transfer_overhead:
            assert r in self.lambda_res, f"transfer_overhead key {r} missing in lambda_res"
        for r in self.lambda_res:
            assert r in self.transfer_overhead, f"lambda_res key {r} missing in transfer_overhead"


In [16]:
from math import inf

@dataclass
class Request:
    rid: ID
    group: str                          # 'essential' | 'delay_sensitive' | 'delay_tolerant' | ...
    priority: int                       # for NPPS
    arrival_time: Time
    deadline_time: Optional[Time] = None

    # timestamps through system
    t_ctrl_start: Optional[Time] = None
    t_ctrl_end:   Optional[Time] = None
    routed_to:    Optional[str]  = None
    t_route_done: Optional[Time] = None
    t_res_start:  Optional[Time] = None
    t_res_end:    Optional[Time] = None

class BaseQueuePolicy:
    def push(self, req: Request) -> None: ...
    def pop(self, now: Time) -> Optional[Request]: ...
    def __len__(self) -> int: ...

class FIFOQueue(BaseQueuePolicy):
    def __init__(self): self._q = deque()
    def push(self, req: Request) -> None: self._q.append(req)
    def pop(self, now: Time) -> Optional[Request]:
        return self._q.popleft() if self._q else None
    def __len__(self) -> int: return len(self._q)

class NPPSQueue(BaseQueuePolicy):
    """Higher priority first; FIFO within same priority."""
    def __init__(self):
        self._counter = 0
        self._heap: List[Tuple[int, int, Request]] = []  # (-priority, order, req)
    def push(self, req: Request) -> None:
        self._counter += 1
        heapq.heappush(self._heap, (-int(req.priority), self._counter, req))
    def pop(self, now: Time) -> Optional[Request]:
        if not self._heap: return None
        _, _, req = heapq.heappop(self._heap)
        return req
    def __len__(self) -> int: return len(self._heap)

class EDFQueue(BaseQueuePolicy):
    """Earliest absolute deadline first; FIFO on ties; NaN/None treated as +inf."""
    def __init__(self):
        self._counter = 0
        self._heap: List[Tuple[float, int, Request]] = []  # (deadline, order, req)
    def push(self, req: Request) -> None:
        self._counter += 1
        dl = req.deadline_time if (req.deadline_time is not None) else inf
        heapq.heappush(self._heap, (float(dl), self._counter, req))
    def pop(self, now: Time) -> Optional[Request]:
        if not self._heap: return None
        _, _, req = heapq.heappop(self._heap)
        return req
    def __len__(self) -> int: return len(self._heap)

class WRRQueue(BaseQueuePolicy):
    """
    Weighted Round Robin over groups (each group has FIFO).
    Weights are positive numbers; internally normalized to small integers.
    """
    def __init__(self, group_weights: Dict[str, float]):
        assert all(w > 0 for w in group_weights.values()), "WRR weights must be > 0"
        self.fifos: Dict[str, deque] = {g: deque() for g in group_weights}
        # normalize to small integers for quotas
        g, w = zip(*group_weights.items())
        base = min(w)
        ints = {gi: max(1, int(round(wi / base))) for gi, wi in group_weights.items()}
        self.weights = ints
        self.groups = list(self.fifos.keys())
        self.idx = 0
        self.quota_left = dict(self.weights)

    def push(self, req: Request) -> None:
        if req.group not in self.fifos:
            # unseen group gets weight 1 lazily
            self.fifos[req.group] = deque()
            self.weights[req.group] = 1
            self.groups.append(req.group)
            self.quota_left[req.group] = 1
        self.fifos[req.group].append(req)

    def _advance(self): self.idx = (self.idx + 1) % len(self.groups)
    def _has_any(self) -> bool: return any(self.fifos[g] for g in self.groups)

    def pop(self, now: Time) -> Optional[Request]:
        if not self._has_any(): return None
        tried = 0
        while tried < len(self.groups):
            g = self.groups[self.idx]
            # reset cycle quotas if all exhausted
            if all(self.quota_left[x] == 0 for x in self.groups):
                self.quota_left = dict(self.weights)
            if self.quota_left[g] == 0 or not self.fifos[g]:
                self._advance(); tried += 1; continue
            self.quota_left[g] -= 1
            return self.fifos[g].popleft()
        # queues changed; reset quotas and retry once
        self.quota_left = dict(self.weights)
        return self.pop(now)


In [17]:
def make_queue(policy: PolicyType, *, wrr_weights: Optional[Dict[str, float]] = None) -> BaseQueuePolicy:
    if policy == PolicyType.FIFO: return FIFOQueue()
    if policy == PolicyType.NPPS: return NPPSQueue()
    if policy == PolicyType.EDF:  return EDFQueue()
    if policy == PolicyType.WRR:
        assert wrr_weights is not None and len(wrr_weights) > 0, "Provide wrr_weights for WRR"
        return WRRQueue(wrr_weights)
    raise NotImplementedError(policy)


In [18]:
class EventType(Enum):
    ARRIVAL = auto()       # to controller
    CTRL_FINISH = auto()   # controller finished a request
    RES_ARRIVAL = auto()   # arrival to a resource after transfer overhead
    RES_FINISH = auto()    # resource finished

@dataclass(order=True)
class Event:
    time: Time
    etype: EventType
    payload: dict = field(compare=False)

class ResourceServer:
    """Single-server M/M/1 resource with FIFO queue (per resource)."""
    def __init__(self, name: str, rate: float, setup_delay: float):
        self.name = name
        self.rate = rate
        self.setup_delay = setup_delay
        self.queue = deque()
        self.busy_until: Time = 0.0
        self.in_service: Optional[Request] = None

    def push(self, req: Request):
        self.queue.append(req)

    def try_start(self, now: Time, evq: List[Event]):
        if self.in_service is not None: return False
        if self.queue and self.busy_until <= now:
            req = self.queue.popleft()
            req.t_res_start = now
            svc = rng.exponential(1.0 / self.rate) + self.setup_delay
            finish = now + svc
            self.busy_until = finish
            self.in_service = req
            heapq.heappush(evq, Event(finish, EventType.RES_FINISH, {"res": self.name, "req": req}))
            return True
        return False

    def on_finish(self, now: Time):
        req = self.in_service
        if req: req.t_res_end = now
        self.in_service = None
        self.busy_until = now

class ResourcePool:
    def __init__(self, cfg: SimConfig):
        self.cfg = cfg
        self.resources: Dict[str, ResourceServer] = {
            rname: ResourceServer(rname, rate=lambda_rate, setup_delay=cfg.setup_delay)
            for rname, lambda_rate in cfg.lambda_res.items()
        }

    def route_choice(self) -> str:
        names = list(self.cfg.route_probs.keys())
        probs = np.array(list(self.cfg.route_probs.values()))
        return rng.choice(names, p=probs)

    def on_ctrl_finish(self, req: Request, now: Time, evq: List[Event]):
        rname = self.route_choice()
        req.routed_to = rname
        req.t_ctrl_end = now
        overhead = self.cfg.transfer_overhead.get(rname, 0.0)
        arrival_at_res = now + overhead
        req.t_route_done = arrival_at_res
        heapq.heappush(evq, Event(arrival_at_res, EventType.RES_ARRIVAL, {"res": rname, "req": req}))

    def on_res_arrival(self, rname: str, req: Request, now: Time, evq: List[Event]):
        res = self.resources[rname]
        res.push(req)
        res.try_start(now, evq)

    def on_res_finish(self, rname: str, now: Time, evq: List[Event]):
        res = self.resources[rname]
        res.on_finish(now)
        res.try_start(now, evq)

class MultiServerController:
    """Controller M/M/N with pluggable queue."""
    def __init__(self, cfg: SimConfig):
        self.cfg = cfg
        if cfg.ctrl_policy == PolicyType.WRR:
            # by default map weights from priorities (custom dict welcome too)
            wrr_weights = {g: float(p) for g, p in cfg.group_priority.items()}
            self.queue = make_queue(cfg.ctrl_policy, wrr_weights=wrr_weights)
        else:
            self.queue = make_queue(cfg.ctrl_policy)
        self.server_busy_until: List[Time] = [0.0] * cfg.N_ctrl
        self.in_service: Dict[int, Request] = {}

    def _first_free_server(self, now: Time) -> Optional[int]:
        for sid, busy_until in enumerate(self.server_busy_until):
            if busy_until <= now and self.in_service.get(sid) is None:
                return sid
        return None

    def try_start_service(self, now: Time, evq: List[Event]):
        started = 0
        while True:
            sid = self._first_free_server(now)
            if sid is None: break
            req = self.queue.pop(now)
            if req is None: break
            req.t_ctrl_start = now
            svc = rng.exponential(1.0 / self.cfg.lambda_ctrl) + self.cfg.setup_delay
            finish = now + svc
            self.server_busy_until[sid] = finish
            self.in_service[sid] = req
            heapq.heappush(evq, Event(finish, EventType.CTRL_FINISH, {"sid": sid, "req": req}))
            started += 1
        return started

    def on_arrival(self, req: Request, now: Time, evq: List[Event]):
        self.queue.push(req)
        self.try_start_service(now, evq)

    def on_finish(self, sid: int, now: Time) -> Request:
        self.server_busy_until[sid] = now
        return self.in_service.pop(sid)


In [19]:
def generate_next_arrival(prev_time: Time, chi: float) -> Time:
    return prev_time + rng.exponential(1.0 / chi)

def _sample_deadline_for_group(cfg: SimConfig, group: str, arrival_time: float) -> Optional[float]:
    if group in cfg.default_deadline_slots:
        lo, hi = cfg.default_deadline_slots[group]
        delta = float(rng.uniform(lo, hi))
        return arrival_time + delta
    return None

def bootstrap_requests(cfg: SimConfig, group_mix: Dict[str, float]) -> List[Request]:
    assert abs(sum(group_mix.values()) - 1.0) < 1e-8, "group_mix must sum to 1"
    t = 0.0
    rid = 0
    reqs: List[Request] = []
    groups = list(group_mix.keys())
    probs  = np.array(list(group_mix.values()))
    while t < cfg.T:
        t = generate_next_arrival(t, cfg.chi_arrival)
        if t >= cfg.T: break
        g = rng.choice(groups, p=probs)
        pr = cfg.group_priority.get(g, 1)
        dl = _sample_deadline_for_group(cfg, g, t)
        reqs.append(Request(
            rid=rid, group=g, priority=pr,
            arrival_time=t, deadline_time=dl
        ))
        rid += 1
    return reqs


In [20]:
def generate_next_arrival(prev_time: Time, chi: float) -> Time:
    return prev_time + rng.exponential(1.0 / chi)

def _sample_deadline_for_group(cfg: SimConfig, group: str, arrival_time: float) -> Optional[float]:
    if group in cfg.default_deadline_slots:
        lo, hi = cfg.default_deadline_slots[group]
        delta = float(rng.uniform(lo, hi))
        return arrival_time + delta
    return None

def bootstrap_requests(cfg: SimConfig, group_mix: Dict[str, float]) -> List[Request]:
    assert abs(sum(group_mix.values()) - 1.0) < 1e-8, "group_mix must sum to 1"
    t = 0.0
    rid = 0
    reqs: List[Request] = []
    groups = list(group_mix.keys())
    probs  = np.array(list(group_mix.values()))
    while t < cfg.T:
        t = generate_next_arrival(t, cfg.chi_arrival)
        if t >= cfg.T: break
        g = rng.choice(groups, p=probs)
        pr = cfg.group_priority.get(g, 1)
        dl = _sample_deadline_for_group(cfg, g, t)
        reqs.append(Request(
            rid=rid, group=g, priority=pr,
            arrival_time=t, deadline_time=dl
        ))
        rid += 1
    return reqs


In [21]:
def summarize_metrics(df: pd.DataFrame):
    """
    Returns (overall_df, per_group_df, miss_df or None)
    overall/per_group: count/valid, mean, median, p95 for core timing columns.
    miss_df: per-group deadline meeting/miss stats (+ __OVERALL__), if 'deadline' present.
    """
    if df.empty:
        return pd.DataFrame(), pd.DataFrame(), None

    cols = ["wait_ctrl", "svc_ctrl", "transfer", "wait_res", "svc_res", "total_response"]

    overall = df[cols].agg(["count", "mean", "median", lambda s: s.quantile(0.95)]).T
    overall.columns = ["count/valid", "mean", "median", "p95"]
    overall = overall.sort_index()

    # exclude grouping col from apply to avoid future behavior change
    per_group = df.groupby("group", dropna=False)[cols].apply(
        lambda g: g.agg(["count", "mean", "median", lambda s: s.quantile(0.95)]).T
    )
    per_group.columns = ["count/valid", "mean", "median", "p95"]

    # deadline miss table
    miss_table = None
    if "deadline" in df.columns:
        m = df.dropna(subset=["deadline"]).copy()
        if not m.empty:
            m["deadline_met"] = (m["res_end"] <= m["deadline"]).astype(int)
            g = m.groupby("group", dropna=False)["deadline_met"].agg(count="count", met="sum")
            g["misses"] = g["count"] - g["met"]
            g["miss_rate"] = g["misses"] / g["count"]
            overall_miss = pd.DataFrame({
                "count": [g["count"].sum()],
                "met": [g["met"].sum()],
                "misses": [g["misses"].sum()],
                "miss_rate": [g["misses"].sum() / max(1, g["count"].sum())]
            }, index=["__OVERALL__"])
            miss_table = pd.concat([g, overall_miss])

    return overall, per_group, miss_table


In [22]:
def run_simulation(cfg: SimConfig, group_mix: Dict[str, float]) -> pd.DataFrame:
    cfg.validate()
    # reset RNG per run
    global rng
    rng = np.random.default_rng(cfg.seed)

    reqs = bootstrap_requests(cfg, group_mix)
    controller = MultiServerController(cfg)
    pool = ResourcePool(cfg)

    # prime event queue with arrivals
    evq: List[Event] = []
    for req in reqs:
        heapq.heappush(evq, Event(req.arrival_time, EventType.ARRIVAL, {"req": req}))

    # main loop
    while evq:
        ev = heapq.heappop(evq)
        now = ev.time
        if now > cfg.T: break

        if ev.etype == EventType.ARRIVAL:
            controller.on_arrival(ev.payload["req"], now, evq)

        elif ev.etype == EventType.CTRL_FINISH:
            sid = ev.payload["sid"]
            req: Request = controller.on_finish(sid, now)
            pool.on_ctrl_finish(req, now, evq)
            controller.try_start_service(now, evq)

        elif ev.etype == EventType.RES_ARRIVAL:
            rname = ev.payload["res"]
            req: Request = ev.payload["req"]
            pool.on_res_arrival(rname, req, now, evq)

        elif ev.etype == EventType.RES_FINISH:
            rname = ev.payload["res"]
            pool.on_res_finish(rname, now, evq)

    # assemble results
    finished: List[Request] = [r for r in reqs if r.t_res_end is not None]

    def safe(x): return None if x is None else float(x)

    rows = []
    for r in finished:
        wait_ctrl = (r.t_ctrl_start - r.arrival_time) if (r.t_ctrl_start is not None) else None
        svc_ctrl  = (r.t_ctrl_end - r.t_ctrl_start) if (r.t_ctrl_end is not None and r.t_ctrl_start is not None) else None
        transfer  = (r.t_route_done - r.t_ctrl_end) if (r.t_route_done is not None and r.t_ctrl_end is not None) else None
        wait_res  = (r.t_res_start - r.t_route_done) if (r.t_res_start is not None and r.t_route_done is not None) else None
        svc_res   = (r.t_res_end - r.t_res_start) if (r.t_res_end is not None and r.t_res_start is not None) else None
        total     = (r.t_res_end - r.arrival_time) if (r.t_res_end is not None) else None

        rows.append({
            "rid": r.rid,
            "group": r.group,
            "priority": r.priority,
            "arrival": safe(r.arrival_time),
            "deadline": safe(r.deadline_time),
            "ctrl_start": safe(r.t_ctrl_start),
            "ctrl_end": safe(r.t_ctrl_end),
            "routed_to": r.routed_to,
            "route_done": safe(r.t_route_done),
            "res_start": safe(r.t_res_start),
            "res_end": safe(r.t_res_end),
            "wait_ctrl": safe(wait_ctrl),
            "svc_ctrl": safe(svc_ctrl),
            "transfer": safe(transfer),
            "wait_res": safe(wait_res),
            "svc_res": safe(svc_res),
            "total_response": safe(total),
        })

    return pd.DataFrame(rows)


In [23]:
def run_policy_sweep(cfg_base: SimConfig, group_mix: Dict[str, float],
                     policies: List[PolicyType]) -> pd.DataFrame:
    """
    Runs multiple controller policies on the same base config (cloned per run)
    and returns a compact summary table.
    """
    rows = []
    for pol in policies:
        cfg = SimConfig(
            chi_arrival=cfg_base.chi_arrival,
            lambda_ctrl=cfg_base.lambda_ctrl,
            lambda_res=dict(cfg_base.lambda_res),
            setup_delay=cfg_base.setup_delay,
            T=cfg_base.T,
            N_ctrl=cfg_base.N_ctrl,
            transfer_overhead=dict(cfg_base.transfer_overhead),
            route_probs=dict(cfg_base.route_probs),
            ctrl_policy=pol,
            group_priority=dict(cfg_base.group_priority),
            default_deadline_slots=dict(cfg_base.default_deadline_slots),
            seed=cfg_base.seed
        )
        df = run_simulation(cfg, group_mix)
        overall, per_group, miss = summarize_metrics(df)

        row = {
            "policy": pol.name,
            "count": overall.loc["total_response", "count/valid"],
            "total_mean": overall.loc["total_response", "mean"],
            "total_p95": overall.loc["total_response", "p95"],
            "wait_ctrl_mean": overall.loc["wait_ctrl", "mean"],
            "wait_res_mean": overall.loc["wait_res", "mean"],
            "svc_ctrl_mean": overall.loc["svc_ctrl", "mean"],
            "svc_res_mean": overall.loc["svc_res", "mean"],
            "transfer_mean": overall.loc["transfer", "mean"],
            "deadline_miss_rate": np.nan
        }
        if miss is not None and "__OVERALL__" in miss.index:
            row["deadline_miss_rate"] = float(miss.loc["__OVERALL__", "miss_rate"])

        rows.append(row)

    return pd.DataFrame(rows).sort_values(by=["total_mean", "total_p95"]).reset_index(drop=True)

def with_load(cfg: SimConfig, chi_new: float, seed: Optional[int] = None) -> SimConfig:
    return SimConfig(
        chi_arrival=chi_new,
        lambda_ctrl=cfg.lambda_ctrl,
        lambda_res=dict(cfg.lambda_res),
        setup_delay=cfg.setup_delay,
        T=cfg.T,
        N_ctrl=cfg.N_ctrl,
        transfer_overhead=dict(cfg.transfer_overhead),
        route_probs=dict(cfg.route_probs),
        ctrl_policy=cfg.ctrl_policy,
        group_priority=dict(cfg.group_priority),
        default_deadline_slots=dict(cfg.default_deadline_slots),
        seed=cfg.seed if seed is None else seed
    )


In [24]:
# === Utilization & deadline dashboards (re-add) ===
def utilization_report(cfg: SimConfig, df: pd.DataFrame) -> pd.DataFrame:
    """
    Rough utilization ρ estimates over horizon T:
      - Controller: sum(svc_ctrl) / (N_ctrl * T)
      - Resource s: sum(svc_res for routed_to==s) / T
    """
    if df.empty:
        return pd.DataFrame()

    out = []
    ctrl_busy = df["svc_ctrl"].dropna().sum()
    rho_ctrl = (ctrl_busy / cfg.N_ctrl) / cfg.T
    out.append({"unit": "CONTROLLER", "rho": rho_ctrl, "busy_time": ctrl_busy, "servers": cfg.N_ctrl})

    for rname in cfg.lambda_res.keys():
        busy = df.loc[df["routed_to"] == rname, "svc_res"].dropna().sum()
        rho = busy / cfg.T
        out.append({"unit": f"RES::{rname}", "rho": rho, "busy_time": busy, "servers": 1})

    rep = pd.DataFrame(out)
    rep["rho_clipped"] = rep["rho"].clip(upper=1.0)
    rep["warning"] = np.where(rep["rho"] >= 1.0, "UNSTABLE (ρ≥1)", "")
    return rep


def deadline_dashboard(df: pd.DataFrame) -> pd.DataFrame:
    """
    Per-group and overall deadline meeting stats.
    A request 'meets deadline' iff res_end <= deadline. NaN deadlines are ignored.
    """
    if "deadline" not in df.columns or df["deadline"].isna().all():
        return pd.DataFrame({"note": ["no deadlines present"]})

    m = df.dropna(subset=["deadline"]).copy()
    if m.empty:
        return pd.DataFrame({"note": ["no finite deadlines present"]})

    m["met"] = (m["res_end"] <= m["deadline"]).astype(int)
    g = m.groupby("group", dropna=False)["met"].agg(count="count", met="sum")
    g["misses"] = g["count"] - g["met"]
    g["miss_rate"] = g["misses"] / g["count"]

    overall = pd.DataFrame({
        "count": [g["count"].sum()],
        "met": [g["met"].sum()],
        "misses": [g["misses"].sum()],
        "miss_rate": [g["misses"].sum() / max(1, g["count"].sum())]
    }, index=["__OVERALL__"])

    return pd.concat([g, overall])


In [25]:
# # example usage
cfg = SimConfig(
    chi_arrival=8.0,
    # lambda_ctrl=10.0,
    # lambda_ctrl=6.0,
    lambda_ctrl=7.0,
    lambda_res={"PV":6.0, "BAT":12.0, "GRID":20.0},
    # setup_delay=0.02,
    setup_delay=0.05,
    T=240.0,
    # N_ctrl=2,
    N_ctrl=2,
    transfer_overhead={"PV":0.03, "BAT":0.02, "GRID":0.01},
    route_probs={"PV":0.35, "BAT":0.25, "GRID":0.40},
    ctrl_policy=PolicyType.FIFO,
)
group_mix = {"essential":0.2, "delay_sensitive":0.4, "delay_tolerant":0.4}

df = run_simulation(cfg, group_mix)
overall, per_group, miss = summarize_metrics(df)
rep = utilization_report(cfg, df)
dash = deadline_dashboard(df)

display(df.head(), overall, per_group, miss, rep, dash)

# try NPPS / EDF / WRR
table = run_policy_sweep(cfg, group_mix, [PolicyType.FIFO, PolicyType.NPPS, PolicyType.WRR, PolicyType.EDF])
display(table)

# stress
sweep_light = run_policy_sweep(with_load(cfg, 6.0), group_mix, [PolicyType.FIFO, PolicyType.NPPS, PolicyType.WRR, PolicyType.EDF])
sweep_heavy = run_policy_sweep(with_load(cfg, 12.0), group_mix, [PolicyType.FIFO, PolicyType.NPPS, PolicyType.WRR, PolicyType.EDF])
display(sweep_light, sweep_heavy)


Unnamed: 0,rid,group,priority,arrival,deadline,ctrl_start,ctrl_end,routed_to,route_done,res_start,res_end,wait_ctrl,svc_ctrl,transfer,wait_res,svc_res,total_response
0,0,delay_sensitive,2,0.300526,3.87632,0.300526,0.353951,GRID,0.363951,0.363951,0.420958,0.0,0.053424,0.01,0.0,0.057008,0.120432
1,1,essential,3,0.3355,,0.3355,0.455007,PV,0.485007,0.485007,0.582384,0.0,0.119506,0.03,0.0,0.097377,0.246883
2,2,delay_tolerant,1,0.517083,10.591662,0.517083,0.711306,PV,0.741306,0.741306,0.821118,0.0,0.194223,0.03,0.0,0.079812,0.304035
3,3,delay_sensitive,2,0.526995,2.639389,0.526995,0.738158,BAT,0.758158,0.758158,0.824794,0.0,0.211164,0.02,0.0,0.066636,0.2978
4,4,delay_tolerant,1,0.663123,11.067977,0.711306,0.82927,PV,0.85927,0.85927,0.928393,0.048183,0.117964,0.03,0.0,0.069123,0.26527


Unnamed: 0,count/valid,mean,median,p95
svc_ctrl,1938.0,0.190625,0.146467,0.473038
svc_res,1938.0,0.145558,0.105123,0.383167
total_response,1938.0,0.667422,0.540969,1.53764
transfer,1938.0,0.019185,0.02,0.03
wait_ctrl,1938.0,0.216612,0.080996,0.891066
wait_res,1938.0,0.095443,0.0,0.568512


Unnamed: 0_level_0,Unnamed: 1_level_0,count/valid,mean,median,p95
group,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
delay_sensitive,wait_ctrl,806.0,0.229397,0.089198,0.914565
delay_sensitive,svc_ctrl,806.0,0.190401,0.148373,0.479354
delay_sensitive,transfer,806.0,0.019132,0.02,0.03
delay_sensitive,wait_res,806.0,0.086341,0.0,0.50818
delay_sensitive,svc_res,806.0,0.145009,0.102272,0.394982
delay_sensitive,total_response,806.0,0.670279,0.544775,1.567524
delay_tolerant,wait_ctrl,748.0,0.210119,0.079057,0.881629
delay_tolerant,svc_ctrl,748.0,0.186698,0.142221,0.450087
delay_tolerant,transfer,748.0,0.019305,0.02,0.03
delay_tolerant,wait_res,748.0,0.100572,0.0,0.575437


Unnamed: 0,count,met,misses,miss_rate
delay_sensitive,806,786,20,0.024814
delay_tolerant,748,748,0,0.0
__OVERALL__,1554,1534,20,0.01287


Unnamed: 0,unit,rho,busy_time,servers,rho_clipped,warning
0,CONTROLLER,0.76965,369.432081,2,0.76965,
1,RES::PV,0.567352,136.164376,1,0.567352,
2,RES::BAT,0.276141,66.273762,1,0.276141,
3,RES::GRID,0.331887,79.652946,1,0.331887,


Unnamed: 0,count,met,misses,miss_rate
delay_sensitive,806,786,20,0.024814
delay_tolerant,748,748,0,0.0
__OVERALL__,1554,1534,20,0.01287


Unnamed: 0,policy,count,total_mean,total_p95,wait_ctrl_mean,wait_res_mean,svc_ctrl_mean,svc_res_mean,transfer_mean,deadline_miss_rate
0,NPPS,1938.0,0.667422,1.789528,0.216612,0.095443,0.190625,0.145558,0.019185,0.004505
1,FIFO,1938.0,0.667422,1.53764,0.216612,0.095443,0.190625,0.145558,0.019185,0.01287
2,EDF,1938.0,0.667422,1.692594,0.216612,0.095443,0.190625,0.145558,0.019185,0.002574
3,WRR,1938.0,0.667422,1.706696,0.216612,0.095443,0.190625,0.145558,0.019185,0.011583


Unnamed: 0,policy,count,total_mean,total_p95,wait_ctrl_mean,wait_res_mean,svc_ctrl_mean,svc_res_mean,transfer_mean,deadline_miss_rate
0,FIFO,1450.0,0.482512,1.039004,0.073042,0.055943,0.189084,0.145325,0.019117,0.003436
1,NPPS,1450.0,0.482512,1.072139,0.073042,0.055943,0.189084,0.145325,0.019117,0.003436
2,EDF,1450.0,0.482512,1.072418,0.073042,0.055943,0.189084,0.145325,0.019117,0.002577
3,WRR,1450.0,0.482512,1.075811,0.073042,0.055943,0.189084,0.145325,0.019117,0.004296


Unnamed: 0,policy,count,total_mean,total_p95,wait_ctrl_mean,wait_res_mean,svc_ctrl_mean,svc_res_mean,transfer_mean,deadline_miss_rate
0,EDF,2485.0,6.601965,8.383242,5.873411,0.36337,0.191853,0.15404,0.019292,0.037307
1,NPPS,2485.0,13.971645,76.399864,13.243091,0.36337,0.191853,0.15404,0.019292,0.392894
2,WRR,2485.0,14.208216,76.512315,13.479662,0.36337,0.191853,0.15404,0.019292,0.6069
3,FIFO,2485.0,18.376172,33.635775,17.647618,0.36337,0.191853,0.15404,0.019292,0.858652


In [26]:
# Next steps 

# Resource-side scheduling (carry priority/deadlines into PV/BAT/GRID): add res_policy (FIFO/NPPS/EDF/WRR) so differences show up in wait_res under PV stress.

# Outage scenarios: inject on/off events for sources; measure impact on delays & miss rate.

# (then) Hybrid selector and ML tasks (forecasting & clustering) atop the stable baseline.

In [27]:
# === Resource-side scheduling (policy-aware PV/BAT/GRID) ===
# Replaces ResourceServer and ResourcePool with policy-aware queues.
# Uses cfg.res_policy if present; defaults to FIFO. For WRR, uses cfg.res_wrr_weights or group_priority.

def _get_res_policy(cfg: SimConfig) -> PolicyType:
    return getattr(cfg, "res_policy", PolicyType.FIFO)

def _get_res_wrr_weights(cfg: SimConfig) -> Dict[str, float]:
    # default: weights proportional to group_priority
    return getattr(cfg, "res_wrr_weights", {g: float(p) for g, p in cfg.group_priority.items()})

class ResourceServer:
    """Single-server M/M/1 resource with pluggable queue policy (FIFO/NPPS/EDF/WRR)."""
    def __init__(self, name: str, rate: float, setup_delay: float,
                 policy: PolicyType, wrr_weights: Optional[Dict[str, float]] = None):
        self.name = name
        self.rate = rate
        self.setup_delay = setup_delay
        # build a queue with chosen policy
        if policy == PolicyType.WRR:
            if not wrr_weights:
                wrr_weights = {"essential": 3.0, "delay_sensitive": 2.0, "delay_tolerant": 1.0}
            self.queue = make_queue(PolicyType.WRR, wrr_weights=wrr_weights)
        else:
            self.queue = make_queue(policy)
        self.busy_until: Time = 0.0
        self.in_service: Optional[Request] = None

    def push(self, req: Request):
        self.queue.push(req)

    def try_start(self, now: Time, evq: List[Event]):
        if self.in_service is not None:
            return False
        if len(self.queue) > 0 and self.busy_until <= now:
            req = self.queue.pop(now)
            if req is None:
                return False
            req.t_res_start = now
            svc = rng.exponential(1.0 / self.rate) + self.setup_delay
            finish = now + svc
            self.busy_until = finish
            self.in_service = req
            heapq.heappush(evq, Event(finish, EventType.RES_FINISH, {"res": self.name, "req": req}))
            return True
        return False

    def on_finish(self, now: Time):
        req = self.in_service
        if req:
            req.t_res_end = now
        self.in_service = None
        self.busy_until = now

class ResourcePool:
    """Holds all resources and routes requests; now policy-aware per resource."""
    def __init__(self, cfg: SimConfig):
        self.cfg = cfg
        res_policy = _get_res_policy(cfg)
        wrr_weights = _get_res_wrr_weights(cfg) if res_policy == PolicyType.WRR else None
        self.resources: Dict[str, ResourceServer] = {
            rname: ResourceServer(
                rname,
                rate=lambda_rate,
                setup_delay=cfg.setup_delay,
                policy=res_policy,
                wrr_weights=wrr_weights
            )
            for rname, lambda_rate in cfg.lambda_res.items()
        }

    def route_choice(self) -> str:
        names = list(self.cfg.route_probs.keys())
        probs = np.array(list(self.cfg.route_probs.values()))
        return rng.choice(names, p=probs)

    def on_ctrl_finish(self, req: Request, now: Time, evq: List[Event]):
        rname = self.route_choice()
        req.routed_to = rname
        req.t_ctrl_end = now
        overhead = self.cfg.transfer_overhead.get(rname, 0.0)
        arrival_at_res = now + overhead
        req.t_route_done = arrival_at_res
        heapq.heappush(evq, Event(arrival_at_res, EventType.RES_ARRIVAL, {"res": rname, "req": req}))

    def on_res_arrival(self, rname: str, req: Request, now: Time, evq: List[Event]):
        res = self.resources[rname]
        res.push(req)
        res.try_start(now, evq)

    def on_res_finish(self, rname: str, now: Time, evq: List[Event]):
        res = self.resources[rname]
        res.on_finish(now)
        res.try_start(now, evq)


In [28]:
# keep your controller policy as you like, e.g. EDF to reduce misses:
cfg.ctrl_policy = PolicyType.EDF

# pick a resource policy (default was FIFO). try EDF to propagate deadlines:
cfg.res_policy = PolicyType.EDF
# or NPPS to keep group priorities through the resources:
# cfg.res_policy = PolicyType.NPPS
# or WRR with custom weights (optional):
# cfg.res_policy = PolicyType.WRR
# cfg.res_wrr_weights = {"essential": 3, "delay_sensitive": 2, "delay_tolerant": 1}

df = run_simulation(cfg, group_mix)
overall, per_group, miss = summarize_metrics(df)
rep = utilization_report(cfg, df)
dash = deadline_dashboard(df)
display(overall, miss, rep)


Unnamed: 0,count/valid,mean,median,p95
svc_ctrl,1938.0,0.190625,0.146467,0.473038
svc_res,1938.0,0.145558,0.105123,0.383167
total_response,1938.0,0.667422,0.47416,1.7003
transfer,1938.0,0.019185,0.02,0.03
wait_ctrl,1938.0,0.216612,0.043825,1.045168
wait_res,1938.0,0.095443,0.0,0.529995


Unnamed: 0,count,met,misses,miss_rate
delay_sensitive,806,803,3,0.003722
delay_tolerant,748,748,0,0.0
__OVERALL__,1554,1551,3,0.001931


Unnamed: 0,unit,rho,busy_time,servers,rho_clipped,warning
0,CONTROLLER,0.76965,369.432081,2,0.76965,
1,RES::PV,0.567352,136.164376,1,0.567352,
2,RES::BAT,0.276141,66.273762,1,0.276141,
3,RES::GRID,0.331887,79.652946,1,0.331887,


# 4.1

In [29]:
# === Outage events for resources (MTBF/MTTR or fixed blocks) ===

# extend event types
class EventType(Enum):
    ARRIVAL = auto()
    CTRL_FINISH = auto()
    RES_ARRIVAL = auto()
    RES_FINISH = auto()
    OUTAGE_DOWN = auto()
    OUTAGE_UP = auto()

@dataclass(order=True)
class Event:
    time: Time
    etype: EventType
    payload: dict = field(compare=False)

def _get_res_policy(cfg: SimConfig) -> PolicyType:
    return getattr(cfg, "res_policy", PolicyType.FIFO)

def _get_res_wrr_weights(cfg: SimConfig) -> Dict[str, float]:
    return getattr(cfg, "res_wrr_weights", {g: float(p) for g, p in cfg.group_priority.items()})

class ResourceServer:
    """
    Single-server M/M/1 resource with policy-aware queue + outage handling.
    If an outage occurs during service, we 'pause' and resume on UP.
    Uses an 'epoch' to invalidate stale RES_FINISH events.
    """
    def __init__(self, name: str, rate: float, setup_delay: float,
                 policy: PolicyType, wrr_weights: Optional[Dict[str, float]] = None):
        self.name = name
        self.rate = rate
        self.setup_delay = setup_delay
        # queue policy
        if policy == PolicyType.WRR:
            if not wrr_weights:
                wrr_weights = {"essential": 3.0, "delay_sensitive": 2.0, "delay_tolerant": 1.0}
            self.queue = make_queue(PolicyType.WRR, wrr_weights=wrr_weights)
        else:
            self.queue = make_queue(policy)
        # state
        self.available: bool = True
        self.busy_until: Time = 0.0
        self.in_service: Optional[Request] = None
        self.paused_remaining: Optional[float] = None
        self.epoch: int = 0  # increments on DOWN/UP to kill stale finish events

    def push(self, req: Request):
        self.queue.push(req)

    def try_start(self, now: Time, evq: List[Event]):
        if not self.available:
            return False
        if self.in_service is not None:
            return False
        if len(self.queue) == 0 or self.busy_until > now:
            return False
        # start fresh service
        req = self.queue.pop(now)
        if req is None:
            return False
        req.t_res_start = now
        svc = rng.exponential(1.0 / self.rate) + self.setup_delay
        finish = now + svc
        self.busy_until = finish
        self.in_service = req
        heapq.heappush(evq, Event(finish, EventType.RES_FINISH,
                                  {"res": self.name, "req": req, "epoch": self.epoch}))
        return True

    def on_finish(self, now: Time, ev: Event):
        # ignore stale finishes from a previous epoch (e.g., outage happened)
        if ev.payload.get("epoch", self.epoch) != self.epoch:
            return
        if not self.available:
            return  # should not happen for matching epoch, but guard anyway
        req = self.in_service
        if req:
            req.t_res_end = now
        self.in_service = None
        self.busy_until = now

    # --- outage handling ---
    def on_outage_down(self, now: Time):
        if not self.available:
            return
        self.available = False
        # invalidate in-flight finish events
        self.epoch += 1
        if self.in_service is not None:
            # compute remaining and pause
            remaining = max(0.0, self.busy_until - now)
            self.paused_remaining = remaining
            # mark server as free at 'now' but keep req in_service to resume
            self.busy_until = now

    def on_outage_up(self, now: Time, evq: List[Event]):
        if self.available:
            return
        self.available = True
        # new epoch for finishes after UP
        self.epoch += 1
        # resume paused service if any
        if self.in_service is not None and self.paused_remaining is not None:
            finish = now + self.paused_remaining
            self.busy_until = finish
            heapq.heappush(evq, Event(finish, EventType.RES_FINISH,
                                      {"res": self.name, "req": self.in_service, "epoch": self.epoch}))
            self.paused_remaining = None
        else:
            # try to start next job
            self.try_start(now, evq)

class ResourcePool:
    """Policy-aware resources with outage support + fixed routing."""
    def __init__(self, cfg: SimConfig):
        self.cfg = cfg
        res_policy = _get_res_policy(cfg)
        wrr_weights = _get_res_wrr_weights(cfg) if res_policy == PolicyType.WRR else None
        self.resources: Dict[str, ResourceServer] = {
            rname: ResourceServer(
                rname, rate=lambda_rate, setup_delay=cfg.setup_delay,
                policy=res_policy, wrr_weights=wrr_weights
            )
            for rname, lambda_rate in cfg.lambda_res.items()
        }

    def route_choice(self) -> str:
        names = list(self.cfg.route_probs.keys())
        probs = np.array(list(self.cfg.route_probs.values()))
        return rng.choice(names, p=probs)

    def on_ctrl_finish(self, req: Request, now: Time, evq: List[Event]):
        rname = self.route_choice()
        req.routed_to = rname
        req.t_ctrl_end = now
        overhead = self.cfg.transfer_overhead.get(rname, 0.0)
        arrival_at_res = now + overhead
        req.t_route_done = arrival_at_res
        heapq.heappush(evq, Event(arrival_at_res, EventType.RES_ARRIVAL, {"res": rname, "req": req}))

    def on_res_arrival(self, rname: str, req: Request, now: Time, evq: List[Event]):
        res = self.resources[rname]
        res.push(req)
        res.try_start(now, evq)

    def on_res_finish(self, rname: str, now: Time, evq: List[Event], ev: Event):
        res = self.resources[rname]
        res.on_finish(now, ev)
        res.try_start(now, evq)

    def on_outage_down(self, rname: str, now: Time):
        self.resources[rname].on_outage_down(now)

    def on_outage_up(self, rname: str, now: Time, evq: List[Event]):
        self.resources[rname].on_outage_up(now, evq)

# --- outage plan utilities ---
def schedule_outages(outage_plan: Optional[Dict[str, dict]], evq: List[Event], T: float, seed: int = DEFAULT_SEED):
    """
    outage_plan example:
    {
      "PV": {"mtbf": 120.0, "mttr": 10.0},               # exponential up/down in minutes
      "BAT": {"blocks": [{"start": 90.0, "dur": 15.0}]},  # fixed blocks
    }
    """
    if not outage_plan:
        return
    local_rng = np.random.default_rng(seed + 12345)
    for rname, spec in outage_plan.items():
        if "blocks" in spec:
            for b in spec["blocks"]:
                s = float(b["start"]); d = float(b["dur"])
                if s < T:
                    heapq.heappush(evq, Event(s, EventType.OUTAGE_DOWN, {"res": rname}))
                    if s + d < T:
                        heapq.heappush(evq, Event(s + d, EventType.OUTAGE_UP, {"res": rname}))
        else:
            # alternating UP (~Exp(mtbf)) and DOWN (~Exp(mttr)), starting in UP state
            mtbf = float(spec.get("mtbf", 0))
            mttr = float(spec.get("mttr", 0))
            if mtbf <= 0 or mttr <= 0:
                continue
            t = float(local_rng.exponential(mtbf))
            while t < T:
                heapq.heappush(evq, Event(t, EventType.OUTAGE_DOWN, {"res": rname}))
                t_up = t + float(local_rng.exponential(mttr))
                if t_up < T:
                    heapq.heappush(evq, Event(t_up, EventType.OUTAGE_UP, {"res": rname}))
                t = t_up + float(local_rng.exponential(mtbf))

# --- patch: run_simulation to accept outage_plan and pass events ---
def run_simulation(cfg: SimConfig, group_mix: Dict[str, float], outage_plan: Optional[Dict[str, dict]] = None) -> pd.DataFrame:
    cfg.validate()
    global rng
    rng = np.random.default_rng(cfg.seed)

    reqs = bootstrap_requests(cfg, group_mix)
    controller = MultiServerController(cfg)
    pool = ResourcePool(cfg)

    evq: List[Event] = []
    # arrivals
    for req in reqs:
        heapq.heappush(evq, Event(req.arrival_time, EventType.ARRIVAL, {"req": req}))
    # outages
    schedule_outages(outage_plan, evq, cfg.T, seed=cfg.seed)

    while evq:
        ev = heapq.heappop(evq)
        now = ev.time
        if now > cfg.T:
            break

        if ev.etype == EventType.ARRIVAL:
            controller.on_arrival(ev.payload["req"], now, evq)

        elif ev.etype == EventType.CTRL_FINISH:
            sid = ev.payload["sid"]
            req: Request = controller.on_finish(sid, now)
            pool.on_ctrl_finish(req, now, evq)
            controller.try_start_service(now, evq)

        elif ev.etype == EventType.RES_ARRIVAL:
            pool.on_res_arrival(ev.payload["res"], ev.payload["req"], now, evq)

        elif ev.etype == EventType.RES_FINISH:
            pool.on_res_finish(ev.payload["res"], now, evq, ev)

        elif ev.etype == EventType.OUTAGE_DOWN:
            pool.on_outage_down(ev.payload["res"], now)

        elif ev.etype == EventType.OUTAGE_UP:
            pool.on_outage_up(ev.payload["res"], now, evq)

    # assemble results (unchanged)
    finished: List[Request] = [r for r in reqs if r.t_res_end is not None]
    def safe(x): return None if x is None else float(x)
    rows = []
    for r in finished:
        wait_ctrl = (r.t_ctrl_start - r.arrival_time) if (r.t_ctrl_start is not None) else None
        svc_ctrl  = (r.t_ctrl_end - r.t_ctrl_start) if (r.t_ctrl_end is not None and r.t_ctrl_start is not None) else None
        transfer  = (r.t_route_done - r.t_ctrl_end) if (r.t_route_done is not None and r.t_ctrl_end is not None) else None
        wait_res  = (r.t_res_start - r.t_route_done) if (r.t_res_start is not None and r.t_route_done is not None) else None
        svc_res   = (r.t_res_end - r.t_res_start) if (r.t_res_end is not None and r.t_res_start is not None) else None
        total     = (r.t_res_end - r.arrival_time) if (r.t_res_end is not None) else None
        rows.append({
            "rid": r.rid, "group": r.group, "priority": r.priority,
            "arrival": safe(r.arrival_time), "deadline": safe(r.deadline_time),
            "ctrl_start": safe(r.t_ctrl_start), "ctrl_end": safe(r.t_ctrl_end),
            "routed_to": r.routed_to, "route_done": safe(r.t_route_done),
            "res_start": safe(r.t_res_start), "res_end": safe(r.t_res_end),
            "wait_ctrl": safe(wait_ctrl), "svc_ctrl": safe(svc_ctrl),
            "transfer": safe(transfer), "wait_res": safe(wait_res),
            "svc_res": safe(svc_res), "total_response": safe(total),
        })
    return pd.DataFrame(rows)


In [30]:
# no outages (baseline): df = run_simulation(cfg, group_mix)

# MTBF/MTTR style (minutes):
out_plan = {
    "PV":  {"mtbf": 120.0, "mttr": 10.0},
    "GRID":{"mtbf": 300.0, "mttr": 5.0},
    # fixed block outage:
    "BAT": {"blocks": [{"start": 90.0, "dur": 15.0}]}
}

df_out = run_simulation(cfg, group_mix, outage_plan=out_plan)
overall_out, pergrp_out, miss_out = summarize_metrics(df_out)
rep_out = utilization_report(cfg, df_out)
dash_out = deadline_dashboard(df_out)
display(overall_out, miss_out, rep_out)


Unnamed: 0,count/valid,mean,median,p95
svc_ctrl,1822.0,0.192702,0.150036,0.474089
svc_res,1822.0,0.174054,0.103755,0.376856
total_response,1822.0,2.972979,0.49902,27.181441
transfer,1822.0,0.01843,0.02,0.03
wait_ctrl,1822.0,0.204989,0.048707,0.963024
wait_res,1822.0,2.382805,0.0,26.007008


Unnamed: 0,count,met,misses,miss_rate
delay_sensitive,773,682,91,0.117723
delay_tolerant,715,665,50,0.06993
__OVERALL__,1488,1347,141,0.094758


Unnamed: 0,unit,rho,busy_time,servers,rho_clipped,warning
0,CONTROLLER,0.731463,351.102175,2,0.731463,
1,RES::PV,0.696548,167.17155,1,0.696548,
2,RES::BAT,0.257343,61.762411,1,0.257343,
3,RES::GRID,0.367466,88.191778,1,0.367466,


# 4.2