In [11]:
!pip -q install "stable-baselines3==2.2.1" gymnasium==0.29 pyarrow tqdm

import gymnasium as gym, numpy as np, pandas as pd, pyarrow.dataset as ds, pathlib, random, collections, math, tqdm
from stable_baselines3 import DQN
from cost_model import CostModel

In [12]:
# --- input data --------------------------------------------------------------
DATA = pathlib.Path("../data")

# Block catalogue, read fully into a DataFrame.
blocks = (
    ds.dataset(DATA / "blocks.parquet")
      .to_table()
      .to_pandas()
)

# Daily workloads are kept as a pyarrow dataset; later cells project / filter it.
wl_ds = ds.dataset(DATA / "workloads_daily.parquet")

# --- fee tables from the provider cost model ---------------------------------
cm = CostModel("../provider_configs/qpu_demo.yml")

# Per-tier fee lookups, e.g. {'Atom': 0.01, ...} according to the original note.
EXEC = cm.exec_fee
TRANSFER = cm.transfer_fee

# Lease fee scaled by 24 to get a daily $ figure
# (presumably cm.lease_fee is hourly -- confirm against CostModel).
LEASE = dict((tier, cm.lease_fee[tier] * 24) for tier in cm.lease_fee)

In [13]:
# Job-count matrix: one row per block size (qpu_units), one column per day
# (0-179 per the original note); (size, day) pairs with no rows become 0.
jobs_tbl = (
    wl_ds.to_table(columns=["qpu_units", "day", "n_workloads"])
    .to_pandas()
    .groupby(["qpu_units", "day"])["n_workloads"]
    .sum()
    .unstack(fill_value=0)
)

# qpu_units -> lease_day lookup built from the block catalogue
lease_day = blocks.set_index("qpu_units")["lease_day"].to_dict()

# Lifetime-average jobs per block size: total jobs over the horizon divided by
# the number of days from the block's lease day through day 179.
tot_jobs = jobs_tbl.sum(axis="columns")
active_days = pd.Series(lease_day).rsub(180)      # 180 - lease_day
# Sizes present on only one side of the division come out NaN and are set to 0.
avg_jobs = tot_jobs.div(active_days).fillna(0)

def cheapest(avg):
    """Map an average job count to a tier name.

    The thresholds are inclusive lower bounds: 900 or more gives "Atom",
    176 or more gives "Photon", and anything else gives "Spin".
    """
    for floor, tier in ((900, "Atom"), (176, "Photon")):
        if avg >= floor:
            return tier
    return "Spin"

# Static baseline "B": each block size gets the tier chosen by cheapest() from
# its lifetime-average jobs (0 when the size has no entry in avg_jobs).
tag_map_B = {
    size: cheapest(avg_jobs.get(size, 0))
    for size in blocks["qpu_units"]
}

In [14]:
class QPUEnv(gym.Env):
    """
    Tier-selection environment that follows a single block per episode.

    Each episode draws one block size at random from ``blocks.qpu_units`` and
    runs from that block's lease day through day 179, one step per day.

    Observation: [days_left, avg7_jobs/1e6, current_tag (0,1,2), jobs_today/1e6]
                 ``avg7_jobs`` is a smoothed running value (6/7 of the previous
                 value + 1/7 of today's jobs), seeded with the first day's jobs.
    Action:      target tier for today: 0 -> Atom, 1 -> Photon, 2 -> Spin.
                 Choosing the tier the block is already on keeps it there and
                 incurs no transfer fee.
    Reward:      –(incremental cost) of the current day only: transfer fee (if
                 the tier changed) + daily lease + execution and trigger fees
                 for today's jobs.  ``self.total`` tracks the episode's
                 cumulative cost separately.
    """
    # Action / tag index -> tier name used as key in the fee tables.
    _TIER_NAMES = {0: "Atom", 1: "Photon", 2: "Spin"}

    def __init__(self):
        super().__init__()
        self.block_ids = blocks.qpu_units.values
        self.action_space      = gym.spaces.Discrete(3)
        self.observation_space = gym.spaces.Box(0, np.inf, shape=(4,), dtype=np.float32)

    def reset(self, seed=None, options=None):
        """Start a new episode on a randomly drawn block; returns (obs, info)."""
        # Let Gymnasium seed self.np_random so reset(seed=...) is reproducible.
        super().reset(seed=seed)
        self.size   = int(self.np_random.choice(self.block_ids))
        self.day    = lease_day[self.size]
        self.tag    = 0                                  # start Atom
        self.jobs_today = int(jobs_tbl.at[self.size, self.day])
        self.avg7  = self.jobs_today
        self.total = 0.0
        return self._obs(), {}

    def _obs(self):
        """Build the 4-element float32 observation for the current day."""
        return np.array([179-self.day,
                         self.avg7/1e6,
                         self.tag,
                         self.jobs_today/1e6], dtype=np.float32)

    def step(self, action):
        """Apply today's tier choice, bill the day, and advance to the next day.

        Returns the Gymnasium 5-tuple (obs, reward, terminated, truncated, info).
        The episode terminates once day 179 has been billed; the terminal
        observation is all zeros.
        """
        action = int(action)                             # accept NumPy integer actions
        step_cost = 0.0
        if action != self.tag:                           # pay transfer
            step_cost += TRANSFER[self._TIER_NAMES[action]]
            self.tag = action
        typ = self._TIER_NAMES[self.tag]
        step_cost += LEASE[typ] \
                   + EXEC[typ]*self.jobs_today \
                   + cm.trigger_fee*self.jobs_today
        self.total += step_cost
        # Reward is today's cost only; using the running total would charge
        # earlier days again on every later step.
        reward = -float(step_cost)

        self.day += 1
        done = self.day == 180
        if done:
            return np.zeros(4, dtype=np.float32), reward, True, False, {}
        self.jobs_today = int(jobs_tbl.at[self.size, self.day])
        self.avg7 = (self.avg7*6 + self.jobs_today)/7
        return self._obs(), reward, False, False, {}

In [15]:
TRAIN_NEW = False          # ← set True to retrain
SEED      = 42             # seed for SB3's pseudo-random generators, so a retrain is repeatable

env = QPUEnv()
MODEL_PATH = pathlib.Path("models/qpu_dqn.zip")
MODEL_PATH.parent.mkdir(exist_ok=True, parents=True)

# Train only on request or when no saved model exists; otherwise reuse the
# cached model so a full notebook re-run stays cheap.
if TRAIN_NEW or not MODEL_PATH.exists():
    print("▶ training DQN …")
    model = DQN("MlpPolicy", env,
                learning_rate=1e-3,
                buffer_size=50_000,
                batch_size=256,
                gamma=0.99,
                exploration_fraction=0.2,
                seed=SEED,
                verbose=0)
    model.learn(total_timesteps=50_000)
    model.save(MODEL_PATH)
    print("✔ model saved →", MODEL_PATH)
else:
    model = DQN.load(MODEL_PATH, env=env)
    print("✔ loaded existing model", MODEL_PATH)

✔ loaded existing model models/qpu_dqn.zip


In [None]:
def simulate(policy):
    """
    Replay the 180-day workload history and total the cost of two strategies.

    * "B"   -- every block stays on its static tier from ``tag_map_B``.
    * "DQN" -- every block starts on its ``tag_map_B`` tier and may be moved
               by ``policy`` each time a workload row for it is processed.

    The two strategies keep separate tier maps, so the agent's moves never
    change what the baseline is billed.

    Parameters
    ----------
    policy : object exposing ``predict(obs, deterministic=True)`` whose first
        return value is the action index (0 Atom, 1 Photon, 2 Spin).

    Returns
    -------
    tuple
        ``(baseline_cost, dqn_cost)``.
    """
    tiers = ["Atom", "Photon", "Spin"]
    costs = {"B":0.0, "DQN":0.0}
    tag_B_series = pd.Series(tag_map_B)      # baseline tiers never change
    tag_dqn = tag_map_B.copy()               # agent's own mutable copy

    # Per-block window of the last 7 processed job counts and their sum.
    # NOTE(review): QPUEnv feeds the policy a smoothed value (6/7 old + 1/7 new),
    # not a true 7-sample mean -- confirm which feature the model should see.
    rolling_q = collections.defaultdict(collections.deque)
    rolling_s = collections.defaultdict(int)

    for day in range(180):
        jobs_d = wl_ds.filter(ds.field("day")==day).to_table().to_pandas()

        # blocks whose lease has started by today
        active = blocks.qpu_units[blocks.lease_day<=day]

        # baseline lease fees, always from the untouched baseline tiers
        vc_B = tag_B_series.reindex(active).value_counts()
        costs["B"] += sum(LEASE[t]*n for t, n in vc_B.items())

        # NOTE(review): one decision is taken per workload row; if a block can
        # have several rows on the same day this differs from QPUEnv's
        # one-step-per-day setup -- verify against the workload schema.
        for r in jobs_d.itertuples(index=False):
            sz, jobs = r.qpu_units, r.n_workloads

            # baseline cost on the static tier
            typB = tag_map_B[sz]
            costs["B"] += EXEC[typB]*jobs + cm.trigger_fee*jobs

            # rolling stats for policy
            rolling_q[sz].append(jobs)
            rolling_s[sz]+=jobs
            if len(rolling_q[sz])>7:
                rolling_s[sz]-=rolling_q[sz].popleft()
            avg7 = rolling_s[sz]/len(rolling_q[sz])

            # policy action, using the same observation layout as QPUEnv._obs
            obs = np.array([179-day, avg7/1e6, tiers.index(tag_dqn[sz]), jobs/1e6], dtype=np.float32)
            act = int(policy.predict(obs, deterministic=True)[0])   # NumPy value -> plain index
            new_tag = tiers[act]
            if new_tag != tag_dqn[sz]:
                costs["DQN"] += TRANSFER[new_tag]
                tag_dqn[sz] = new_tag
            costs["DQN"] += EXEC[new_tag]*jobs + cm.trigger_fee*jobs

        # Agent lease fees are billed after today's decisions, so lease and
        # execution are charged on the same tier (as in QPUEnv.step).
        vc_DQN = pd.Series(tag_dqn).reindex(active).value_counts()
        costs["DQN"] += sum(LEASE[t]*n for t, n in vc_DQN.items())
    return costs["B"], costs["DQN"]

# Replay the full horizon once, then report both totals and the agent's
# relative saving over baseline B.
cost_B, cost_DQN = simulate(model)
print(
    f"Baseline B : ${cost_B:,.2f}",
    f"DQN Agent  : ${cost_DQN:,.2f}",
    f"→ Savings vs B : {(cost_B-cost_DQN)/cost_B*100:,.1f} %",
    sep="\n",
)