# section 1: Generating Simulation Dataset

In [None]:
import os,subprocess,time,random,logging
from pathlib import Path
from typing import List,Dict,Tuple
import json

import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support,accuracy_score,roc_auc_score
from scipy.stats import spearmanr
from sklearn.feature_extraction import DictVectorizer
import matplotlib.pyplot as plt
from collections import deque
from filelock import FileLock


In [30]:
#parameter
RND=42
np.random.seed(RND) 
random.seed(RND)

HORIZON=10
VW_IMAGE="vowpalwabbit/vw-rel-alpine:9.9.0"
WORKDIR=Path.cwd()
VW_TRAIN_FILE = WORKDIR/"contextual_bandit_adf.vw"
VW_MODEL_FILE=WORKDIR/"model.vw"
PENDING_LABELS=WORKDIR/"pending_labels.vw"
PENDING_LOCK = FileLock(str(WORKDIR/"vw_resume.lock"))
MIN_PROP=1e-3



In [3]:
def clean_interests_for_vw(s:str)->str:
    if pd.isna(s) or s=="":
        return "interests_none"
    return "".join(f"interests_{t.strip()}" for t in str(s).split(","))

def make_shared_line(row:pd.Series)->str:
    return f"shared |user age_group={row.age_group} device_type={row.device} location={row.location} {clean_interests_for_vw(row.interests)}"

def make_action_feat_line(row:pd.Series)->str:
    return f"|campaign campaign={row.campaign} type={row.campaign_type} media={row.media_format} audience={row.target_audience} category={row.category}"

def adf_example(shared:str,actions:List[str])->str:
    return "\n".join([shared]+actions+[""])

## Simulate Users, campaigns and per-action 10-day engagement 

In [4]:
N_IMP=800
N_CAMPAIGNS=4


age_groups =["18-24","25-34","35-44","45-54","55-64"]
devices =["mobile","tablet","laptop"]
locations =["US","EU","Asia"]
interest_pool =["tech","sports","fashion","food","finance"]
campaign_types =["discount","new-launch","awareness","loyalty"]
media_formats=["video","image","text"]
target_aud=["youth","adult","senior"]
categories =["electronics","clothing","food","finance","travel"]

rows=[]

for imp in range(N_IMP):
    user={
        "age_group":random.choice(age_groups),
        "device":random.choice(devices),
        "location":random.choice(locations),
        "interests":",".join(random.sample(interest_pool,k=random.randint(1,3)))
    }


    for cid in range(N_CAMPAIGNS):
        camp={
            "campaign":f"Campaign-{cid+1}",
            "campaign_type":random.choice(campaign_types),
            "media_format":random.choice(media_formats),
            "target_audience":random.choice(target_aud),
            "category":random.choice(categories)
        }
        
        daily_scores =[]

        for d in range(1,HORIZON+1):
            base=0.05
            if "tech" in user["interests"] and camp["category"] in ("electronics","finance"):
                base+=0.12
            if user["device"]=="mobile" and camp["media_format"]=="video":
                base+=0.06
            
            minutes = np.random.exponential(scale=3.0)
            minutes_score = min(minutes/20.0,1.0)
            clicks = np.random.binomial(1,min(base+0.02*minutes_score,0.9))
            revisit = np.random.binomial(1,0.05)
            daily_score=0.5*clicks + 0.4*minutes_score+0.1*revisit
            daily_score *=(0.95**(d-1)) #daily declay

            daily_scores.append(float(daily_score))

        reward_10d=float(sum(daily_scores))

        rows.append({
            "impression_id":f"imp_{imp}",
            "age_group":user["age_group"],
            "device":user["device"],
            "location":user["location"],
            "interests":user["interests"],
            "campaign":camp["campaign"],
            "campaign_type":camp["campaign_type"],
            "media_format":camp["media_format"],
            "target_audience":camp["target_audience"],
            "category":camp["category"],
            "daily_scores":daily_scores,
            "reward_10d":reward_10d
            
        })

df_pairs=pd.DataFrame(rows)

In [7]:
df_pairs.sample(6)

Unnamed: 0,impression_id,age_group,device,location,interests,campaign,campaign_type,media_format,target_audience,category,daily_scores,reward_10d
2714,imp_678,25-34,mobile,US,sports,Campaign-3,new-launch,text,senior,clothing,"[0.2604664662795057, 0.022526849010310428, 0.0...",0.473899
2763,imp_690,55-64,mobile,US,finance,Campaign-4,discount,text,adult,finance,"[0.05542124015514102, 0.005346758189419973, 0....",0.254231
1063,imp_265,18-24,laptop,Asia,tech,Campaign-4,loyalty,text,senior,travel,"[0.011100516750271213, 0.1677556856545594, 0.0...",0.612034
1267,imp_316,25-34,tablet,EU,"sports,food",Campaign-4,loyalty,image,adult,travel,"[0.034070264239137046, 0.0034042040144594515, ...",0.343691
2035,imp_508,18-24,laptop,Asia,"fashion,food",Campaign-4,loyalty,text,senior,electronics,"[0.14339728234688554, 0.05831366940615855, 0.0...",0.707973
1804,imp_451,25-34,tablet,US,"finance,sports",Campaign-1,awareness,text,adult,finance,"[0.08867316121202912, 0.30847939617362713, 0.0...",0.792876


## Create a logging policy ( eps greedy) and logged dataset

In [8]:
eps=0.16 #exploration probability for logging policy
rng=np.random.default_rng(RND)

logged=[]

for imp_id,grp in df_pairs.groupby("impression_id"):
    grp=grp.reset_index(drop=True)
   
    rewards=grp['reward_10d'].values.astype(float)
    best_idx = int(np.argmax(rewards))

    if rng.random()<eps:
        chosen_idx =int(rng.integers(0,len(rewards)))
    
    else:
        chosen_idx = best_idx
    
    if chosen_idx == best_idx:
        prob=(1.0-eps)+eps/len(rewards)

    else:
        prob = eps/len(rewards)
    
    chosen_row=grp.iloc[chosen_idx].to_dict()

    chosen_row["impression_id"]=imp_id
    chosen_row["logging_prob"]=float(prob)
    chosen_row['candidate_campaigns']=list(grp['campaign'].values)
    chosen_row["all_rewards"]=grp.set_index('campaign')['reward_10d'].to_dict()
    chosen_row['best_idx']=best_idx
    chosen_row['chosen_idx']=chosen_idx
    logged.append(chosen_row)

df_logged = pd.DataFrame(logged)



In [9]:
df_logged[["impression_id","campaign","logging_prob","chosen_idx","best_idx"]].sample(6)

Unnamed: 0,impression_id,campaign,logging_prob,chosen_idx,best_idx
722,imp_749,Campaign-2,0.88,1,1
405,imp_463,Campaign-1,0.88,0,0
47,imp_140,Campaign-1,0.04,0,3
270,imp_341,Campaign-2,0.88,1,1
271,imp_342,Campaign-1,0.88,0,0
667,imp_7,Campaign-1,0.88,0,0


## Surrogate Training dataset: (User,Campaign) pairs as rows - Train RandomForestRegressor

In [10]:
def make_feature_dict(row):
    d= {
        "age_group":row["age_group"],
        "device":row["device"],
        "location":row["location"],
        "campaign":row["campaign"],
        "campaign_type":row["campaign_type"],
        "media_format":row["media_format"],
        "target_audience":row["target_audience"],
        "category":row["category"]
    }
    for t in str(row["interests"]).split(","):
        d[f"interests_{t.strip()}"]=1
    
    return d

In [11]:
q50,q80 = df_pairs['reward_10d'].quantile([0.5,0.8]).values

def tier_label(x):
    if x<=q50:
        return "low"
    elif x<=q80:
        return "med"
    else: 
        return "high"

df_pairs["tier"]=df_pairs["reward_10d"].apply(tier_label)

dicts = [make_feature_dict(r) for _,r in df_pairs.iterrows()]

dv=DictVectorizer(sparse=False)
X=dv.fit_transform(dicts)
y_class=df_pairs['tier'].values
X_train,X_val,y_train,y_val = train_test_split(X,y_class,test_size=0.2,random_state=RND,stratify=y_class)
clf=RandomForestClassifier(n_estimators=200,random_state=RND,n_jobs=-1)
clf.fit(X_train,y_train)

0,1,2
,n_estimators,200
,criterion,'gini'
,max_depth,
,min_samples_split,2
,min_samples_leaf,1
,min_weight_fraction_leaf,0.0
,max_features,'sqrt'
,max_leaf_nodes,
,min_impurity_decrease,0.0
,bootstrap,True


In [13]:
y_val_pred=clf.predict(X_val)
acc=accuracy_score(y_val,y_val_pred)

print("Classifier accuracy(val):",acc)
print("Per-class metrics (precision,recall,fscore):")
print(precision_recall_fscore_support(y_val,y_val_pred,labels=clf.classes_))

Classifier accuracy(val): 0.509375
Per-class metrics (precision,recall,fscore):
(array([0.48571429, 0.56210526, 0.26315789]), array([0.265625  , 0.834375  , 0.13020833]), array([0.34343434, 0.67169811, 0.17421603]), array([128, 320, 192]))


In [14]:
tier_means=df_pairs.groupby("tier")['reward_10d'].mean().to_dict()
classes=list(clf.classes_)

print("Tier means:",tier_means,"Classifier classes order",classes)

probs=clf.predict_proba(X)


tier_mean_array=np.array([tier_means[c] for c in classes])

expected_reward =(probs*tier_mean_array.reshape(1,-1)).sum(axis=1) #continuous expected reward

df_pairs['pred_expected_reward']=expected_reward
scaler_expected=MinMaxScaler()
df_pairs['pred_expected_norm']=scaler_expected.fit_transform(df_pairs[['pred_expected_reward']])
scaler_true=MinMaxScaler()
df_pairs['reward_norm']=scaler_true.fit_transform(df_pairs[['reward_10d']])


rho,pval=spearmanr(df_pairs['pred_expected_reward'],df_pairs['reward_10d'])

print(f"Spearman rank correlation (pre_expected vs true):{rho:.4f} (p={pval:.3g})")


surrogate_clf=clf
surrogate_dv=dv
surrogate_tier_means=tier_means
surrogate_classes=classes
surrogate_expected_scaler=scaler_expected
true_reward_scaler=scaler_true

Tier means: {'high': 1.5116901315424458, 'low': 0.5058367003431165, 'med': 0.9307193012587082} Classifier classes order ['high', 'low', 'med']
Spearman rank correlation (pre_expected vs true):0.7895 (p=0)


In [15]:
def short_proxy_from_daily(daily_scores:List[float],k=2)->float:
    return float(np.mean(daily_scores[:k]))

def alpha_for_day(day:int,alpha_start=0.9,alpha_min=0.1,decary_every=7,decay_step=0.05):
    steps=day // decary_every

    return max(alpha_min,alpha_start-steps*decay_step)

df_pairs['short_proxy']=df_pairs['daily_scores'].apply(lambda x:short_proxy_from_daily(x,k=2))
current_day=0
alpha0=alpha_for_day(current_day)

df_pairs['composite_pred']=df_pairs.apply(
    lambda r:alpha0* r['short_proxy'] +(1.0-alpha0)*r['pred_expected_norm'],
    axis=1
)

print("Composite_pred stats:",df_pairs['composite_pred'].describe().to_dict())

Composite_pred stats: {'count': 3200.0, 'mean': 0.1246259441356468, 'std': 0.10868927781024668, 'min': 0.008022047599311168, '25%': 0.0526768330612524, '50%': 0.08691627873208185, '75%': 0.14433178340300326, 'max': 0.6197879167074504}


In [16]:
lines =[]
for imp_id,grp in df_pairs.groupby("impression_id"):
    shared_row = grp.iloc[0]
    shared= make_shared_line(shared_row)
    logged_row=df_logged[df_logged['impression_id']==imp_id].iloc[0]

    actions=[]

    for _,act in grp.reset_index(drop=True).iterrows():
        feat= make_action_feat_line(act)
        if act['campaign']==logged_row['campaign']:
            comp=float(act['composite_pred'])
            cost=0.0-comp
            prob=float(logged_row['logging_prob'])
            label=f"0:{cost:.6f}:{prob:.6f}"
            actions.append(f"{label} {feat}")
        else:
            actions.append(feat)
            lines.append(shared)
            lines.extend(actions)
            lines.append("")
    
VW_TRAIN_FILE.write_text("\n".join(lines))
print("Wrote VW ADF training file:",VW_TRAIN_FILE,"examples:",len(lines)//(N_CAMPAIGNS+2))


Wrote VW ADF training file: /home/pritam/contextual-bandit/contextual_bandit_adf.vw examples: 1804


In [17]:
def run_vw(args:list):
    cmd=["vw"]+args
    print("Running:"," ".join(cmd))
    subprocess.run(cmd,check=True)

run_vw(["--cb_explore_adf","-d",str(VW_TRAIN_FILE),"-f",f"{VW_MODEL_FILE}.cover","--cover","8","--quiet"])

print("Training Complete. Model saved as:",VW_MODEL_FILE.name+".cover")

Running: vw --cb_explore_adf -d /home/pritam/contextual-bandit/contextual_bandit_adf.vw -f /home/pritam/contextual-bandit/model.vw.cover --cover 8 --quiet
Training Complete. Model saved as: model.vw.cover


In [19]:
def predict_with_model(model_path:str,shared_line:str,campaign_lines:List[str])->Tuple[int,float,str]:
    input_file=WORKDIR/"deploy_input.vw"
    out_file =WORKDIR/"deploy_pred.txt"

    with open(input_file,"w") as f:
        f.write(shared_line+"\n")
        for c in campaign_lines:
            f.write(c+"\n")
    
    cmd=[
        "vw","-i",str(model_path),"-t","--cb_explore_adf","-d",str(input_file),"-p",str(out_file),"--quiet"
    ]

    print("Running:"," ".join(cmd))
    subprocess.run(cmd,check=True)

    with open(out_file) as f:
        line = f.readline().strip()

    
    best_idx,best_prob = None,-1.0
    for token in line.split(","):
        if ":" in token:
            a, p = token.split(":")
            p=float(p)
            a=int(a)

            if p>best_prob:
                best_idx,best_prob=a,p
        else:
            best_idx,best_prob=int(token),1.0
    
    return best_idx,best_prob,line


shared_example="shared |user age_group=25-34 device_type=mobile location=US "+clean_interests_for_vw("tech,food")
campaign_lines =[
    "|campaign campaign=Campaign-1 type=awareness media=video audience=senior category=travel","|campaign campaign=Campaign-2 type=discount media=image audience=youth category=electronics","|campaign campaign=Campaign-3 type=new-launch media=text audience=adult category=clothing","|campaign campaign=Campaign-4 type=loyalty media=image audience=adult category=finance"
]

model_for_pred=f"{VW_MODEL_FILE}.cover"

idx,prob,raw=predict_with_model(model_for_pred,shared_example,campaign_lines)

print("Chosen Idx (0 based):",idx," Prob:",prob)
print("chosen campaign:",campaign_lines[idx])


Running: vw -i /home/pritam/contextual-bandit/model.vw.cover -t --cb_explore_adf -d /home/pritam/contextual-bandit/deploy_input.vw -p /home/pritam/contextual-bandit/deploy_pred.txt --quiet
Chosen Idx (0 based): 2  Prob: 0.375
chosen campaign: |campaign campaign=Campaign-3 type=new-launch media=text audience=adult category=clothing


In [20]:
def read_vw_pred_pmf(pred_file:Path)->List[List[Tuple[int,float]]]:
   #each line: tokens "a:p ..."-> return list of (a,p) tuples per impression
   out=[]
   with open(pred_file,'r') as f:
      for i,ln in enumerate(f,start=1):
         ln=ln.strip()
         if not ln:
            # print(f"[Line {i}] Empty line -skipping")
            continue

         # print(f"[Line {i}] Raw:{ln}")
         parts=ln.split(",")
         row=[]
         for part in parts:
            subparts=part.split(":")
            if len(subparts)!=2:
               print(f"[Line {i}] Unexpected format: '{part} - split into {subparts}")
               continue

            try:
               action,prob=subparts
               row.append((int(action),float(prob)))
            except ValueError as e:
               print(f"[Line {i}] ValueError on part '{part}:{e}")
            
         out.append(row)

        
    
   return out

In [21]:
pred_out ="pred_cover_on_train.txt"

vw_cmd=[
   "vw",
   "-t",
   "-i"
   f"{VW_MODEL_FILE}.cover",
   "--cb_explore_adf",
   "-d",VW_TRAIN_FILE.name,
   "-p",pred_out,
   "--quiet"
]

subprocess.run(vw_cmd, check=True)

pmfs=read_vw_pred_pmf(Path(pred_out))

# pmfs

imp_to_order={}
imp_to_surrogate_expected ={}
for imp_id,grp in df_pairs.groupby("impression_id"):
   imp_to_order[imp_id]=list(grp['campaign'].values)
   imp_to_surrogate_expected[imp_id]=list(grp['pred_expected_norm'].values)

ips_vals=[]
dr_vals=[]

for i,row in df_logged.reset_index(drop=True).iterrows():
    imp_id=row['impression_id']
    logged_action = row['campaign']
    logged_prob=row['logging_prob']
    true_reward=row['all_rewards'][logged_action]
    pmf=pmfs[i]


    pi=None

    for a,p in pmf:
        camp=imp_to_order[imp_id][a]

        if camp==logged_action:
            pi=p
            break
    
    if pi==None:
      pi=MIN_PROP
    
    ips_vals.append((pi/logged_prob)*true_reward)

    idx_in_order = imp_to_order[imp_id].index(logged_action)

    q_hat=imp_to_surrogate_expected[imp_id][idx_in_order]

    dr_vals.append(q_hat+(pi/logged_prob)*(true_reward-q_hat))

ips_est=np.mean(ips_vals)
dr_est=np.mean(dr_vals)

print("IPS estimate (avg reward):",ips_est)
print("Doubly Robust estimate:",dr_est)

IPS estimate (avg reward): 0.9638392375653324
Doubly Robust estimate: 1.0971705870241504


In [22]:
SURROGATE_RETRAIN_BATCH =500
VW_QUIET =True
ETA_ALPHA=1e-2
ETA_W=1e-2
W_MAX=10.0

## Online Learning

In [23]:
class OnlineShortProxyUpdater:
    def __init__(self,alpha_init=0.9,w_init=None,eta_alpha=ETA_ALPHA,eta_w=ETA_W,W_MAX=W_MAX):
        self.alpha=float(np.clip(alpha_init,0.0,1.0))
        self.w=np.array(w_init if w_init is not None else [],dtype=float)
        self.eta_alpha=float(eta_alpha)
        self.eta_w=float(eta_w)
        self.W_MAX=float(W_MAX)

    
    def predict_short_proxy(self,c_vec):
        c_vec=np.asarray(c_vec,dtype=float)
        if self.w.size == 0:
            return float(np.mean(c_vec)) if c_vec.size else 0.0
        return float(np.dot(self.w,c_vec))
    
    def composite(self,c_vec,R_hat_norm):
        short_proxy = self.predict_short_proxy(c_vec)
        return float(self.alpha)*short_proxy + (1.0 - float(self.alpha))*float(R_hat_norm)
    
    def update(self,c_vec,R_hat_norm,r_obs_norm):

        c_vec = np.asarray(c_vec,dtype=float)
        if self.w.size ==0:
            self.w=np.zeros_like(c_vec)
        
        short_proxy = float(np.dot(self.w,c_vec))
        C_pred = self.alpha*short_proxy +(1.0 - self.alpha)*float(R_hat_norm)
        eps=C_pred - float(r_obs_norm)
        grad_alpha =2.0*eps*( -float(R_hat_norm))
        self.alpha=float(np.clip(self.alpha -self.eta_alpha*grad_alpha,0.0,1.0))

        grad_w =2.0*eps*self.alpha*c_vec
        self.w=np.clip(self.w-self.eta_w*grad_w,-self.W_MAX,self.W_MAX)

        return {"alpha":self.alpha,"w":self.w.copy(),"error":float(eps),"C_pred":float(C_pred)}      

In [24]:
    
current_day=0
alpha_prior = alpha_for_day(current_day,alpha_start=0.9,alpha_min=0.1,decary_every=7,decay_step=0.05)
ONLINE_UPDATER = OnlineShortProxyUpdater(alpha_init=alpha_prior,w_init=[0.4,0.4,0.2])


DEPLOY_LOG ={}
SURROGATE_BUFFER = deque()

def vw_predict_pmf(model_path:Path,shared_line:str,campaign_lines:list):
    inp=WORKDIR/"deploy_input.vw"
    out =WORKDIR/"deploy_pred.txt"
    with open(inp,"w") as f:
        f.write(shared_line+"\n")
        for c in campaign_lines:
            f.write(c+"\n")
    
    cmd=[
        "vw","-i",str(model_path),"-t","--cb_explore_adf","-d",str(inp),"-p",str(out),"--quiet"
    ]

    subprocess.run(cmd,check=True)

    content =Path(out).read_text().splitlines()
    if not content:
        return []
    return read_vw_pred_pmf(content[0])

In [25]:
logging.basicConfig(level=logging.INFO)
logger=logging.getLogger("online-loop")

In [None]:
def decide_and_log(impression_id:str,user_row:pd.Series,candidate_rows:pd.DataFrame,short_proxy_component:list,chosen_policy:str="sample"):
    shared_line =make_shared_line(user_row)
    campaign_lines=[make_action_feat_line(r) for _,r in candidate_rows.iterrows()]

    try:
        dicts=[make_feature_dict(r) for _,r in candidate_rows.iterrows()]

        X=surrogate_dv.transform(dicts)
        probs=surrogate_clf.predict_proba(X)
        tier_mean_arr=np.array([surrogate_tier_means[c] for c in surrogate_clf.classes_]) 
        pred_expected =(probs*tier_mean_arr.reshape(1,-1)).sum(axis=1)
        R_hat_norms=surrogate_expected_scaler.transform(pred_expected.reshape(-1,1)).ravel()
    except Exception as e:
        logger.warning("Surrogate on-the-fly failed; falling back to zeros %s",e)

        R_hat_norms=np.zeros(len(campaign_lines),dtype=float)
    
    pmf = vw_predict_pmf(VW_MODEL_FILE,shared_line,campaign_lines)
    if not pmf:
        pmf =[(i+1,1.0/len(campaign_lines)) for i in range(len(campaign_lines))]
    
    s=sum(p for _,p in pmf)
    pmf = [(a,(p/s) if s>0 else 1.0/len(pmf)) for a,p in pmf]

    if chosen_policy == "argmax":
        chosen = max(pmf,key=lambda t:t[1])[0]
    
    else:
        actions,probs = zip(*pmf)
        probs=np.array(probs,dtype=float)

        if probs.sum <=0:
            probs = np.ones_like(probs)/len(probs)
        
        else:
            probs = probs/probs.sum()
        
        chosen =int(np.random.choice(actions,p=probs))

    
    c_vec = np.asarray(short_proxy_component,dtype=float)
    short_proxy_scaler = ONLINE_UPDATER.predict_short_proxy(c_vec)
    R_hat_norms_for_chosen = float(R_hat_norms[chosen])
    composite_pred=ONLINE_UPDATER.composite(c_vec,R_hat_norms_for_chosen)


    # record in DEPLOY_LOG (persists in Redis in prod)

    p_log = float(dict(pmf).get(chosen,MIN_PROP))

    DEPLOY_LOG[impression_id]={
        "shared_line":shared_line,
        "campaign_lines":campaign_lines,
        "chosen_idx":chosen,
        "chosen_campaign":candidate_rows.iloc[chosen]['campaign'],
        'pmf':pmf,
        "p_log":max(p_log,MIN_PROP),
        "c_vec":c_vec.tolist(),
        "R_hat_norms":[float(x) for x in R_hat_norms],
        "composite_pred":float(composite_pred),
        "short_proxy_scaler":float(short_proxy_scaler),
        "timestamp":time.time()
    }

    return {
        "impression_id":impression_id,
        "chosen_idx":chosen,
        "chosen_campaign":DEPLOY_LOG[impression_id],
        "pmf":pmf,
        "composite_pred":composite_pred,
        "alpha":ONLINE_UPDATER.alpha,
        "w":ONLINE_UPDATER.w.copy()
    }


In [35]:
def retrain_surrogate_from_buffer():
    global surrogate_clf,surrogate_dv,surrogate_tier_means,surrogate_expected_scaler,df_pairs
    new_rows=[]

    while SURROGATE_BUFFER:
        imp_id,idx0,r_true=SURROGATE_BUFFER.popleft()

        grp=df_pairs[df_pairs['impression_id']==imp_id].reset_index(drop=True)

        if grp.empty:
            continue
        row=grp.iloc[idx0]

    
    new_rows.append((make_feature_dict(row),r_true))

    if not new_rows:
        logger.info("Surrogate retrain called but no new rows found")
        return
    
    existing_dicts =[make_feature_dict(r) for _,r in df_pairs.iterrows()]
    existing_rewards = list(df_pairs['reward_10d'].values)

    augmented_dicts = existing_dicts + [d for d,_ in new_rows]
    augmented_rewards = np.array(existing_rewards+[r for _,r in new_rows],dtype=float)

    q50,q80=np.quantile(augmented_rewards,[0.5,0.8])

    augmented_tiers = [tier_label(rv) for rv in augmented_rewards]

    dv2=DictVectorizer(sparse=False)
    X_aug = dv2.fit_transform(augmented_dicts)
    clf2=RandomForestClassifier(n_estimators=200,random_state=RND,n_jobs=-1)
    clf2.fit(X_aug,augmented_tiers)


    df_aug = pd.DataFrame({"tier":augmented_tiers,"reward":augmented_rewards})

    tier_means_new = df_aug.groupby('tier')['reward'].mean().to_dict()

    surrogate_clf=clf2
    surrogate_dv=dv2
    surrogate_tier_means=tier_means_new


    X_all = dv2.transform([make_feature_dict(r) for _,r in df_pairs.iterrows()])
    probs_all = clf2.predict_proba(X_all)
    tier_mean_arr=np.array([tier_means_new[c] for c in clf2.classes_])
    df_pairs['pred_expected_reward']=(probs_all*tier_mean_arr.reshape(1,-1)).sum(axis=1)
    df_pairs['pred_expected_norm']=surrogate_expected_scaler.fit_transform(df_pairs[['pred_expected_reward']])
    df_pairs['reward_norm']=true_reward_scaler.fit_transform(df_pairs[['reward_10d']])

    logger.info("[Surrogate] Retrained on %d total examples (incl. %d new).",len(augmented_rewards),len(new_rows))

        

    

In [37]:
def observe_and_update(impression_id:str,r_true_raw:float,resume_vw_immediately:bool=True):

    if impression_id not in DEPLOY_LOG:
        raise KeyError(f"Unknown impression_id {impression_id}")
    
    info = DEPLOY_LOG.pop(impression_id)

    r_true_norm = float(true_reward_scaler.transform([[float(r_true_raw)]])[0,0])

    cost=-r_true_norm

    labeled_actions =[]

    for i,line in enumerate(info['campaign_lines']):
        if i == info['chosen_idx']:
            labeled_actions.append(f"0:{cost:.6}:{info['p_log']:.6f} {line}")
        else:
            labeled_actions.append(line)
    
    with open(PENDING_LABELS,"a") as f:
        f.write(adf_example(info['shared_line'],labeled_actions))
    

    c_vec=np.asarray(info["c_vec"],dtype=float)
    R_hat_norm_chosen=float(info['R_hat_norms'][info['chosen_idx']])
    upd = ONLINE_UPDATER.update(c_vec,R_hat_norm_chosen,r_true_norm)
    logger.info("Updated alpga/w: alpha:%.4f err=%.4f w=%s",upd['alpha'],upd['error'],upd['w'].tolist())


    if resume_vw_immediately:
        with PENDING_LOCK:
            if PENDING_LABELS.exists() and PENDING_LABELS.stat().st_size >0:
                cmd=["vw","--cb_explore_adf","-i",str(VW_MODEL_FILE),"-d",str(PENDING_LABELS),"-f",str(VW_MODEL_FILE),"--resume","--quiet"]
                subprocess.run(cmd,check=True)

                archived = WORKDIR/f"archived_labels_{int(time.time())}.vw"

                PENDING_LABELS.rename(archived)


    SURROGATE_BUFFER.append((impression_id,info['chosen_idx'],float(r_true_raw)))

    if len(SURROGATE_BUFFER) >= SURROGATE_RETRAIN_BATCH:
        retrain_surrogate_from_buffer()
    
    return {"impression_id":impression_id,"r_true_raw":float(r_true_raw),"alpha":ONLINE_UPDATER.alpha}