<a href="https://colab.research.google.com/github/jmhuer/utaustin_optimization/blob/main/homework12/LinUCB_assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# LinUCB

In this exercise, we start looking at linear bandits with a finite number of arms.

The set-up is captured by the following Python class:

In [101]:
import numpy as np
from matplotlib import pylab as plt
from tqdm import tqdm

# Every 4-dimensional binary feature vector with exactly two active features
# (4 choose 2 = 6 arms).
available_arms = np.array([
  (1, 1, 0, 0),
  (1, 0, 1, 0),
  (1, 0, 0, 1),
  (0, 1, 1, 0),
  (0, 1, 0, 1),
  (0, 0, 1, 1)])

class Context_arm(object):
  """Linear bandit environment with a finite set of arm feature vectors.

  The expected reward of an arm is the dot product of the hidden parameter
  ``theta`` with the arm's feature vector. The environment records every
  reward it hands out so that the learner's total reward can be compared
  with the reward of always playing the best arm.
  """

  def __init__(self, available_arms=available_arms,gaussian_bandit=True):
    """
    input:
        available_arms: sequence of feature vectors, one per arm
        gaussian_bandit: True for Gaussian rewards, False for Bernoulli rewards
    """
    self.available_arms = np.array(available_arms)
    self.gaussian_bandit = gaussian_bandit
    self.num_arms = len(self.available_arms)
    self.theta = np.array((0.1, 0.2, 0.2, 0.3)) # hidden parameter the learner has to estimate
    self.num_features = len(self.available_arms[0])

    # rewards handed out since the last clear_reward_hist()
    self.rewards_history = []
    # number of pulls since the last clear_reward_hist()
    self.total_pull = 0

  def pull_arm(self, arm_idx):
    """Draw a reward for arm ``arm_idx``, record it and return it."""
    expected = self.theta.dot(self.available_arms[arm_idx])
    if self.gaussian_bandit:
      # mean theta.arm plus standard normal noise scaled by 1/2
      reward = expected + np.random.randn() / 2
    else:  # Bernoulli bandit: reward 1 with probability theta.arm
      reward = 1 if np.random.random() < expected else 0

    self.total_pull += 1
    self.rewards_history.append(reward)
    return reward

  def genie_reward(self, arms=None):
    """Expected total reward of playing the best arm on every recorded pull.

    input:
        arms: optional collection of feature vectors to choose the best arm
              from; defaults to (and, when empty, falls back to) the
              environment's own arms
    ret:
        largest expected reward among ``arms`` times ``total_pull``
    """
    # `not arms` raises ValueError for a multi-row ndarray, so test explicitly.
    if arms is None or len(arms) == 0:
      arms = self.available_arms
    # The expected reward is theta.arm for both reward models.
    best_mean = np.max([self.theta.dot(arm) for arm in arms])
    return best_mean * self.total_pull

  def my_rewards(self):
    """Sum of all rewards handed out since the last clear_reward_hist()."""
    return sum(self.rewards_history)

  def clear_reward_hist(self):
    """Forget all recorded rewards and reset the pull counter."""
    self.rewards_history = []
    self.total_pull = 0

In [102]:
#utils 

class Empirical_mean:
    """Per-arm running reward totals, pull counts and empirical means.

    All three attributes are float arrays with one entry per arm:
        rewards: sum of the rewards recorded for each arm
        count:   number of rewards recorded for each arm
        mean:    rewards / count for arms with at least one reward, else 0

    Read the mean of arm ``i`` as ``self.mean[i]``. A ``mean(indx)`` method
    cannot coexist with the ``mean`` array attribute (the instance attribute
    shadows it), so none is defined.
    """
    def __init__(self,num_arms):
        self.rewards = np.zeros(num_arms)
        self.count = np.zeros(num_arms)
        self.mean = np.zeros(num_arms)

    def add_new_reward(self, reward, indx):
        """Record ``reward`` for arm ``indx`` and refresh that arm's mean."""
        self.rewards[indx] += reward
        self.count[indx] += 1
        self.mean[indx] = self.rewards[indx] / self.count[indx]

    def reset(self):
        """Set every statistic back to zero, rebinding fresh arrays."""
        # Build zeros directly: multiplying the old values by 0 turns any
        # inf/NaN reward into NaN instead of 0.
        self.rewards = np.zeros_like(self.rewards)
        self.count = np.zeros_like(self.count)
        self.mean = np.zeros_like(self.mean)


## Goal of this exercise
1. Basic LinUCB algorithm implementation, for both Gaussian Reward and Bernoulli rewards
2. Plot the regret VS horizon ($n$).
3. Compare LinUCB with original UCB (for Gaussian rewards only) and KL-UCB (for Bernoulli rewards only)



# 1. Basic LinUCB algorithm implementation, for both Gaussian Reward and Bernoulli rewards

## LinUCB implemented below

In [103]:



def LinUCB(arm, N, alpha=2.0):
  """Run LinUCB with disjoint per-arm linear models for N rounds.

  Each arm keeps its own ridge-regression statistics (A, b); parameters are
  not shared across arms. Every round the arm with the largest upper
  confidence bound  theta_a . x_a + alpha * sqrt(x_a^T A_a^{-1} x_a)  is
  pulled and its statistics are updated with the observed reward.

  input:
      arm: environment exposing available_arms, num_arms, num_features,
           pull_arm(idx) and my_rewards()
      N: number of rounds to play
      alpha: width multiplier of the confidence bonus (default 2.0)
  ret:
      arm.my_rewards() after the N pulls
  """
  contexts = arm.available_arms
  num_arms = arm.num_arms

  # A, b are initialised up front for simplicity (the Yahoo paper creates
  # them lazily when a new action shows up).
  A = [np.identity(arm.num_features) for _ in range(num_arms)]
  b = [np.zeros(arm.num_features) for _ in range(num_arms)]
  # Only the pulled arm's A changes in a round, so cache the inverses and
  # refresh a single one per round instead of inverting every matrix twice.
  A_inv = [np.linalg.inv(Aa) for Aa in A]

  # MAIN LOOP
  for _ in range(N):
    upper_bounds = np.array([
        ((A_inv[a] @ b[a]) @ x) + alpha * np.sqrt(x @ (A_inv[a] @ x))
        for (a, x) in zip(range(num_arms), contexts)])
    chosen = np.argmax(upper_bounds)
    reward = arm.pull_arm(chosen)

    x = contexts[chosen]
    A[chosen] += np.outer(x, x)
    b[chosen] += reward * x
    A_inv[chosen] = np.linalg.inv(A[chosen])

  # DONE
  return arm.my_rewards()





#

## UCB implemented below

In [104]:

def UCB(arm, N):
  """Run the classic UCB index policy for N rounds, ignoring arm features.

  Each arm's index is its empirical mean plus sqrt(2*log(1/delta)/pulls)
  with delta = 1/N**2; an arm that has never been pulled gets an infinite
  index.

  input:
      arm: environment exposing num_arms, pull_arm(idx) and my_rewards()
      N: number of rounds to play
  ret:
      arm.my_rewards() after the N pulls
  """
  def exploration_bonus(delta, pulls):
    if pulls > 0:
      return np.sqrt((2*np.log(1/delta))/pulls)
    return float('inf')

  delta = 1/N**2
  stats = Empirical_mean(arm.num_arms)
  for _ in range(N):
    indices = []
    for avg, pulls in zip(stats.mean, stats.count):
      indices.append(avg + exploration_bonus(delta, pulls))
    chosen = np.argmax(indices)
    reward = arm.pull_arm(chosen)
    stats.add_new_reward(reward, chosen)
  return arm.my_rewards()


## KL-UCB implemented below

In [105]:
  #(relative entropy or KL divergence defined below)
  def klBern(x, y):
    """KL divergence KL(Bernoulli(x) || Bernoulli(y)).

    Both arguments are clamped to [1e-15, 1 - 1e-15] so the logarithms stay
    finite at 0 and 1.
    """
    tiny = 1e-15

    def clamp(v):
      return min(max(v, tiny), 1 - tiny)

    x = clamp(x)
    y = clamp(y)
    success_term = x*np.log(x/y)
    failure_term = (1-x)*np.log((1-x) / (1-y))
    return success_term + failure_term

def dkl_bernoulli(p, q):
    """Derivative with respect to q of KL(Bernoulli(p) || Bernoulli(q)).

    Both arguments are clamped to [1e-15, 1 - 1e-15] before evaluating
    (q - p) / (q * (1 - q)).
    """
    tiny = 1e-15

    def clamp(v):
        return min(max(v, tiny), 1 - tiny)

    p = clamp(p)
    q = clamp(q)
    return (q-p)/(q*(1.0-q))

def max_newton(kl_distance, empiral_mean, k, t,dkl, precision = 1e-6, max_iterations = 50):
    """KL-UCB index of arm k: solve kl_distance(p, q) = log(t)/N_k for q > p.

    The root is searched with Newton's method, with q kept inside
    [p + 0.1, 0.9], where p is the empirical mean of arm k floored at 0.1.

    input:
        kl_distance: callable (p, q) -> divergence
        empiral_mean: object with per-arm ``count`` and ``rewards`` sequences
        k: arm index
        t: current round, expected to be >= 1 (its logarithm is taken)
        dkl: callable (p, q) -> derivative of kl_distance with respect to q
        precision: stop once the squared residual drops below this value
        max_iterations: cap on the number of Newton steps
    ret:
        inf for an arm that has never been pulled, 1 when p >= 1,
        otherwise the index q
    """
    Nk = empiral_mean.count[k]
    if Nk <= 0:
        # No observations yet: avoid 0/0 and force the arm to be explored.
        return float('inf')
    Sk = empiral_mean.rewards[k]
    delta = 0.1
    logtdt = np.log(t)/Nk
    p = max(Sk/Nk, delta)
    if p>=1: return 1

    lower, upper = p + delta, 1 - delta
    if lower >= upper:
        # The search interval is empty; the upper clamp wins.
        return upper

    q = lower
    for n in range(max_iterations):
        f  = logtdt - kl_distance(p, q)
        if f*f < precision: break
        df = - dkl(p, q)
        if df == 0: break  # flat derivative: a Newton step is undefined
        # The update must happen inside the loop, otherwise only a single
        # Newton step is ever taken.
        q_next = min(upper, max(q - f / df, lower))
        if q_next == q: break  # pinned at a clamp, no further progress
        q = q_next
    return q

def KLUCB(arm, N):
    """Run the KL-UCB index policy for N rounds, ignoring arm features.

    In round 0 every arm has an infinite index; afterwards each arm's index
    comes from max_newton using klBern and dkl_bernoulli, with the 0-based
    round number passed as the time argument.

    input:
        arm: environment exposing num_arms, pull_arm(idx) and my_rewards()
        N: number of rounds to play
    ret:
        arm.my_rewards() after the N pulls
    """
    stats = Empirical_mean(arm.num_arms)

    def arm_index(k, t):
        if t > 0:
            return max_newton(klBern, stats, k, t, dkl_bernoulli)
        return float('inf')

    for round_idx in range(N):
        indices = [arm_index(k, round_idx) for k in range(arm.num_arms)]
        chosen = np.argmax(indices)
        reward = arm.pull_arm(chosen)
        stats.add_new_reward(reward, chosen)
    return arm.my_rewards()

## Utils for plotting 

In [106]:

def regret_vs_horizon(arm, Ns:list, REPEAT:int, algorithm: type(lambda x: None)):
  """Average regret of ``algorithm`` on ``arm`` for each horizon in ``Ns``.

  For every horizon the algorithm is run REPEAT times; the regret of one run
  is arm.genie_reward() minus the reward returned by the algorithm. The
  horizon is printed as a progress indicator.

  input:
      arm: environment exposing genie_reward() and clear_reward_hist()
      Ns: horizons (number of rounds) to evaluate
      REPEAT: number of independent runs per horizon, at least 1
      algorithm: callable (arm, N) -> total reward collected in N rounds
  ret:
      list with the mean regret for each horizon, in the order of Ns
  raises:
      ValueError: if REPEAT is smaller than 1
  """
  if REPEAT < 1:
    raise ValueError("REPEAT must be at least 1")

  regret = []
  for NUM_RUNs in Ns:
    print(NUM_RUNs)
    total_regret = 0
    for _ in range(REPEAT):
      # Start from a clean slate so history left on the arm by an earlier
      # caller cannot leak into this run's reward and pull totals.
      arm.clear_reward_hist()
      rewards = algorithm(arm, NUM_RUNs)
      total_regret += arm.genie_reward() - rewards
      arm.clear_reward_hist()
    regret.append(total_regret / REPEAT)
  return regret


import plotly.graph_objects as graph
def plot(all_history:list, title:str, log = False):
    """
    Draw one line per entry of all_history in a single plotly figure.

    input:
        all_history: list of dicts, each with keys "x", "y" and "legend"
        title: figure title
        log: when truthy, the x axis uses a logarithmic scale
    ret:
        None: show plotly fig
    """
    layout = graph.Layout(title=graph.layout.Title(text=title))
    fig = graph.Figure(layout = layout)
    for series in all_history:
        trace = graph.Scatter(x = series["x"],
                              y = series["y"],
                              name = series["legend"])
        fig.add_trace(trace)
    if log:
        fig.update_xaxes(type="log")
    fig.show()



# 2. Plot the regret VS horizon ($n$).

In [None]:

# Horizons double from 200 up to 6400 rounds.
Ninit = 100
Ns  = [Ninit * (2**i) for i in range(1, 7)]

gauss_arm     = Context_arm(gaussian_bandit=True)
bernoulli_arm = Context_arm(gaussian_bandit=False)


# Mean LinUCB regret over 100 runs per horizon, for each reward model.
bern_LinUCB_regret = regret_vs_horizon(bernoulli_arm, Ns, REPEAT=100, algorithm=LinUCB)
LinUCB_regret      = regret_vs_horizon(gauss_arm, Ns, REPEAT=100, algorithm=LinUCB)


plot_LinUCB_regret = {"legend": "mean_LinUBC_regret", 
                      "x": Ns , 
                      "y": LinUCB_regret}

plot_bern_LinUCB_regret = {"legend": "mean_bern_LinUCB_regret", 
                           "x": Ns , 
                           "y": bern_LinUCB_regret}


# Titles name the reward model and the axis scale actually used; the
# Bernoulli figure was previously labelled "Log" while drawn on a linear axis.
plot([plot_LinUCB_regret], title="regret VS horizon - Gaussian rewards (linear axis)", log = False)
plot([plot_bern_LinUCB_regret], title="regret VS horizon - Bernoulli rewards (linear axis)", log = False)
# Log-scaled horizon axis variants:
# plot([plot_LinUCB_regret], title="regret VS horizon - Gaussian rewards (log axis)", log = True)
# plot([plot_bern_LinUCB_regret], title="regret VS horizon - Bernoulli rewards (log axis)", log = True)




















  

  










#

# 3a. Compare LinUCB with original UCB (for Gaussian rewards only) 

In [None]:

# Horizons double from 200 up to 6400 rounds.
Ninit = 100
Ns  = [Ninit * 2**k for k in range(1, 7)]

gauss_arm = Context_arm(gaussian_bandit=True)

# Both learners are evaluated on the same Gaussian environment, UCB first.
UCB_regret = regret_vs_horizon(gauss_arm, Ns, REPEAT=100, algorithm=UCB)
LinUCB_regret = regret_vs_horizon(gauss_arm, Ns, REPEAT=100, algorithm=LinUCB)

plot_LinUCB_regret = dict(legend="mean_LinUBC_regret", x=Ns, y=LinUCB_regret)
plot_UCB_regret = dict(legend="mean_UBC_regret", x=Ns, y=UCB_regret)

plot([plot_UCB_regret, plot_LinUCB_regret],
     title="regret VS horizon - linear",
     log=False)
# plot([plot_UCB_regret,plot_LinUCB_regret], title="regret VS horizon - Log", log = True)














































#


# 3b. Compare LinUCB with KL-UCB (for Bernoulli rewards only)

In [None]:

# Horizons double from 200 up to 6400 rounds.
Ninit = 100
Ns  = [Ninit * 2**k for k in range(1, 7)]

bernoulli_arm = Context_arm(gaussian_bandit=False)

# Both learners are evaluated on the same Bernoulli environment, KL-UCB first.
KLUCB_regret = regret_vs_horizon(bernoulli_arm, Ns, REPEAT=100, algorithm=KLUCB)
LinUCB_regret = regret_vs_horizon(bernoulli_arm, Ns, REPEAT=100, algorithm=LinUCB)

plot_LinUCB_regret = dict(legend="mean_LinUBC_regret", x=Ns, y=LinUCB_regret)
plot_KLUCB_regret = dict(legend="mean_KLUCB_regret", x=Ns, y=KLUCB_regret)

plot([plot_LinUCB_regret, plot_KLUCB_regret],
     title="regret VS horizon - linear",
     log=False)
# plot([plot_LinUCB_regret,plot_KLUCB_regret], title="regret VS horizon - Log" , log = True)
