# Fixed Confidence Best Arm Identification in Bayesian Settings

This code is the official implementation of 'Fixed Confidence Best Arm Identification in Bayesian Settings.'

To proceed, simply press 'shift+enter' for each cell. 

### Requirements
The following cell includes all the required packages for this code.

In [27]:
import numpy as np
from scipy.stats import norm
from scipy.integrate import quad
from scipy.optimize import minimize, minimize_scalar
import random
import time

## Prior Distribution class

This class is for implementing prior distribution $H$ and computing prior-dependent constants such as $L(H)$ efficiently. 
*prior\_distribution* takes three main parameters:
- *prior\_mean*: corresponds to $(m_i)_{i=1}^k$, the mean vector of the prior distribution.
- *prior\_std*: corrsponds to $(\xi_i)_{i=1}^k$, the vector of standard deviations.
- *instance\_std*: corresponds to $(\sigma_i)_{i=1}^k$, the standard deviations of the reward distributions. 

We will mainly use two methods from this distribution:
- sample\_instance(): Sample $(\mu_i)_{i=1}^k$ from the prior distribution $H$. 
- get_Delta_0($\delta$): Compute $\Delta_0$, which is defined in Algorithm 1. 

In [3]:
class prior_distribution:
  def __init__(self, prior_mean, prior_std, instance_std):
    self.K=np.size(prior_mean)
    self.prior_mean = prior_mean
    self.prior_std = prior_std
    self.prior_cov_mat = np.diag(self.prior_std**2)
    self.instance_std = instance_std
    self._get_Lij()

  def sample_instance(self):
    return np.random.multivariate_normal(self.prior_mean, self.prior_cov_mat)

  def _get_Lij(self):
    self.Lij_whole = np.zeros((self.K, self.K)) #whole list of Lij
    for i in range(self.K):
      for j in range(self.K):
        if i==j:
          self.Lij_whole[i][j]=0
        else:
          integrand= lambda x: self._integrand(x,i,j)
          self.Lij_whole[i][j]=quad(integrand, -np.inf, np.inf)[0]
    self.Lij=np.sum(self.Lij_whole)

  def _integrand(self,x,i,j):
    product=1
    for s in range(self.K):
      if (s==i or s==j):
        product=product*norm.pdf(self._standardize(x,s))
      else:
        product=product*norm.cdf(self._standardize(x,s))
    return product

  def _standardize(self,x,i):
    return (x-self.prior_mean[i])/self.prior_std[i]

  def get_Delta_0(self, delta):
    return delta/(4*self.Lij)



# Best-Arm-Identification Algorithm
## 0) Parent Class
Each BAI algorithm will have the following four methonds as its main method:
- \_\_init\_\_: for initialization
- sample(): A sampling rule $(A_t)_t$, which determines the arm to draw at round $t$ based on the previous history.
- stopping\_criterion(): when to stop the sampling
- update(): Update the information after each sampling.
- recommendation(): A decision rule $J$, which determines the arm the forecaster recommends based on his sampling history

All algorithms take only two inputs:
- prior_dist: corresponds to $H$ in our main paper. It will be *prior\_distribution* instance.
- delta: the confidence level $\delta$ in our main paper. 

In [4]:
from ast import Pass
class BAI_Algorithm:                    #Parent class for all algorithms
  def __init__(self, prior_dist, delta):
    Pass

  def sample(self):
    raise NotImplementedError

  def stopping_criterion(self): # False for continue, True for stop
    raise NotImplementedError

  def update(self):
    raise NotImplementedError

  def recommendation(self):
    raise NotImplementedError

## 1) Top-Two Thompson Sampling

is a class that implements the algorithm devised in the paper ['Simple bayesian algorithms for best arm identification'](https://proceedings.mlr.press/v49/russo16.html)


In [5]:
class TTTS(BAI_Algorithm):              #Top Two Thompson Sampling - UNDER PROGRESS
  def __init__(self, prior_dist, delta):
    self.beta = 0.5
    self.prior_dist = prior_dist
    self.K= prior_dist.K
    self.delta = delta
    self.C = 1
    self.alpha=1
    self.hist=[ [] for _ in range(self.K) ] # list of empty lists
    self.pos_mean=np.zeros(self.K)
    self.pos_std=np.zeros(self.K)
    self.n_list=np.zeros(self.K)
    self.m_hat = np.zeros(self.K)

  def sample(self):
    if np.min(self.n_list)==0:
      return np.argmin(self.n_list)
    alpha=np.zeros(self.K)
    index_sample=np.random.binomial(1,self.beta)
    alpha=self.posterior_sample()
    bestI=np.argmax(alpha)

    final_index=bestI         #With probability beta, return best index from the sample

    if index_sample==0:       #With prob. 1-beta, repeat sampling until new best index appears
      bestJ=bestI
      while bestI==bestJ:
        alpha=self.posterior_sample()
        _mask = np.zeros(self.K, dtype=bool)
        _mask[bestI] = True
        masked = np.ma.array(alpha, mask=_mask)
        bestJ=np.argmax(masked)
      final_index=bestJ
    return final_index

  def posterior_sample(self):               #Function that calculates posterior distribution

    sample_result=np.zeros(self.K)
    for i in range(self.K):
      sample_result[i]=np.random.normal(self.pos_mean[i],self.pos_std[i])
    return sample_result


  def update(self, a, reward):
                          #Posterior update
    self.m_hat[a] = (reward+self.n_list[a]*self.m_hat[a])/(self.n_list[a]+1)               #sample mean
    self.n_list[a] = self.n_list[a]+1
    m_p   = self.prior_dist.prior_mean[a]       #prior mean
    var_a = self.prior_dist.instance_std[a]**2  # variance of the arm
    var_p = self.prior_dist.prior_std[a]**2     # variance of the prior distribution
    self.pos_mean[a] = (m_p*var_p + self.n_list[a]*self.m_hat[a]*var_a)/(self.n_list[a]*var_a + var_p)
    self.pos_std[a]  = np.sqrt(var_a*var_p/(self.n_list[a]*var_a + var_p))




  def stopping_criterion(self):
    T=np.sum(self.n_list)
    if np.min(self.n_list)==0:
      return False
    i_max=np.argmax(self.m_hat)
    W = np.zeros(self.K)
    for j in range(self.K):
      if i_max==j:
          W[j]=np.inf
      else:
        tempi=self.n_list[i_max]/self.prior_dist.instance_std[i_max]**2
        tempj=self.n_list[j]/self.prior_dist.instance_std[j]**2
        infx=(tempi*self.m_hat[i_max]+tempj*self.m_hat[j])/(tempi+tempj)
        W[j]=self.n_list[i_max]*self.Kinf_neg(i_max,infx)+self.n_list[j]*self.Kinf_pos(j,infx)

    minWij=np.min(W)
    threshold = np.log(self.C*(T**self.alpha) / self.delta) ####################### Need to be adjusted later

    if minWij>threshold:
      return True
    else:
      return False



  def Kinf_neg(self, i, x):
    if x>self.m_hat[i]:
      return 0
    else:
      return self.KL_div(i,x)

  def Kinf_pos(self,j,x):
    if x<self.m_hat[j]:
      return 0
    else:
      return self.KL_div(j,x)

  def KL_div(self,i,x):
    return (x-self.m_hat[i])**2/(2*prior_dist.instance_std[i]**2)

  def recommendation(self):
    return np.argmax(self.m_hat)



## 2) Top-Two UCB
is a class that implements the algorithm devised in the paper ['Non-asymptotic analysis of a ucb-based top two algorithm'](https://proceedings.neurips.cc/paper_files/paper/2023/hash/d9b564716709357b4bccec9fc9ad04d2-Abstract-Conference.html)


In [6]:
class TTUCB(BAI_Algorithm):              #Top Two Thompson Sampling - UNDER PROGRESS
  def __init__(self, prior_dist, delta):
    self.beta = 0.5
    self.prior_dist = prior_dist
    self.K= prior_dist.K
    self.delta = delta
    self.n_la = np.zeros((self.K, self.K)) # number of arm 'a'(second) pull when the leader was 'l'(first)
    self.n_list=np.zeros(self.K) # number of each arm pull
    self.leader_list=np.zeros(self.K)
    self.m_hat = np.zeros(self.K)
    self.width = np.ones(self.K) # Not precisely width, without log factor
    self.UCB = self.m_hat+self.width*np.inf

  def sample(self):
    if np.min(self.n_list)==0: #Pull each arm at least once
        return np.argmin(self.n_list)
    bestI=np.argmax(self.m_hat)
    leader_ind = np.argmax(self.UCB)
    if self.beta*self.leader_list[leader_ind]<self.n_la[leader_ind][leader_ind]: #Pick Challenger
        challenger_measure = np.zeros(self.K)
        for i in range(self.K):
            if i==leader_ind:
                challenger_measure[i]=np.inf
            else:
                challenger_measure[i]=np.max(self.m_hat[leader_ind]-self.m_hat[i], 0)/np.sqrt(1/self.n_list[leader_ind]+1/self.n_list[i])
        challenger_ind=np.argmin(challenger_measure)
        return challenger_ind
    return leader_ind
    
    



  def update(self, a, reward):
      leader_ind = np.argmax(self.UCB)
      self.n_la[leader_ind][a]=self.n_la[leader_ind][a]+1
      self.leader_list[leader_ind]=self.leader_list[leader_ind]+1
      self.m_hat[a] = (reward+self.n_list[a]*self.m_hat[a])/(self.n_list[a]+1)               #sample mean
      self.n_list[a]=self.n_list[a]+1

      T=np.sum(self.n_list)
      self.width[a]=1/np.sqrt(self.n_list[a])
      self.UCB=self.m_hat+self.width*np.sqrt(4*np.log(T+1))
      


  def stopping_criterion(self):
    T=np.sum(self.n_list)
    if np.min(self.n_list)==0:
      return False
    i_max=np.argmax(self.m_hat)
    W = np.zeros(self.K)
    minW = np.inf
    for j in range(self.K):
        if i_max==j:
            W[j]=np.inf
        else:
            W[j]=(self.m_hat[i_max]-self.m_hat[j])/np.sqrt(1/self.n_list[i_max]+1/self.n_list[j])
        minW=np.min((minW, W[j]))

    C=2*self.C_G(0.5*np.log((self.K-1)/self.delta))+4*np.log(4+np.log((T-1)/2))
    threshold = np.sqrt(2*C)
    if minW>threshold:
      return True
    else:
      return False


  def recommendation(self):
    return np.argmax(self.m_hat)

  def C_G(self,x):
      return x+ np.log(x)

## 3) Our Algorithm (Algorithm 1, Successive Elimination)

In [7]:
class Elimination(BAI_Algorithm):
  def __init__(self, prior_dist, delta):
    self.prior_dist = prior_dist
    self.K= prior_dist.K
    self.delta=delta
    self.Delta0=prior_dist.get_Delta_0(delta)
    self.survived_arms = list(range(self.K))
    self.need_elimination = False
    self.Delta_safe = np.inf

    #self.hist=[ [] for _ in range(self.K) ] # list of empty lists
    self.n_list = np.zeros(self.K)
    self.m_hat = np.zeros(self.K)


  def sample(self):
#    n_list = [len(self.hist[i]) for i in self.survived_arms]
    chosen = np.argmin(self.n_list[self.survived_arms])
    if chosen==len(self.survived_arms)-1:
      self.need_elimination=True
    return self.survived_arms[chosen]

  def recommendation(self):
    if len(self.survived_arms)==1:
      return self.survived_arms[0]
    elif self.Delta_safe<self.Delta0:
      return random.choice(self.survived_arms)
    else:
      print('Error: recommendation should be done only after stopping criterion')
      return -1

  def stopping_criterion(self):
    if self.need_elimination:
      self.elim()

    if len(self.survived_arms)==1:                # Note that these two parameters, self,survived_arms and self.Delta_safe only changes when elim() has been called.
      return True
    elif self.Delta_safe<self.Delta0:
      return True
    else:
      return False


  def elim(self):
    ucbs=np.zeros(len(self.survived_arms))
    lcbs=np.zeros(len(self.survived_arms))
    next_survived=[]

    for i in range(len(self.survived_arms)):                      # Compute UCB and LCB
      ucbs[i], lcbs[i] = self.conf_bounds(self.m_hat,self.survived_arms[i])
    lcbmax=np.max(lcbs)                               # Maximum LCB
    ucbmax=np.max(ucbs)                               # Maximum UCB
    for i in range(len(self.survived_arms)):                       # Include all arms which satisfies UCB>LCBMAX to a new basket
      if ucbs[i]>lcbmax:
        next_survived.append(self.survived_arms[i])

    self.Delta_safe=ucbmax-lcbmax
    self.survived_arms=next_survived
    self.need_elimination = False

  def conf_bounds(self,m_hat,i):
    m_hat=self.m_hat[i]
    n=self.n_list[i]
    width=np.sqrt(
        2*self.prior_dist.instance_std[i]**2
        *np.log(12*self.K*n**2/self.delta**2/np.pi**2)/n
    )
    return m_hat+width, m_hat-width

  def update(self, a, reward):
    self.m_hat[a]=(self.m_hat[a]*self.n_list[a]+reward)/(self.n_list[a]+1)
    self.n_list[a]=self.n_list[a]+1


# General Experiment Design
A class designed to effectively manage the experimental environment. This is mainly for guaranteeing same instance samples $(\mu_i)_{i=1}^k$ for all algorithms. It takes the following major inputs:
- prior\_dist: corresponds to $H$ in our main paper. It will be *prior\_distribution* instance.
- $\delta$: Error probability. 
- algolist: list of algorithms for this experiment. 

Methods:
- single\_experiment(mean, algoname): run single experiment for *algoname* with given mean $(\mu_i)_{i=1}^k$. Returns stopping time and whether the prediction is correct or wrong. 
- monte\_carlo\_experiment(num\_of\_exp): repeat experiment *num\_of\_exp* times, but for each repetition all algorithms share same $(\mu_i)_{i=1}^k$. Output is expected stopping time and success rate of each algorithm. 


In [8]:
class Experiment:
  def __init__(self, prior_dist, delta, algolist):
    self.delta=delta
    self.algolist=algolist                              # Strings of algorithm names, such as ['Elim', 'TTTS']
    self.prior_dist=prior_dist
    self.K=self.prior_dist.K                            # Number of Arms
    self.stopping_time_hist = []                        # Record of Stopping time
    self.success_hist = []             
    self.time_spent=[]                                  # Record of time spent (unit: sec)


  def monte_carlo_experiment(self, num_of_exp): # running multiple experiment - num_of_exp times
    exp_stopping_time=np.zeros(len(self.algolist))
    success_rate=np.zeros(len(self.algolist))
    for i in range(num_of_exp):
      mean=self.prior_dist.sample_instance()
      for alg in range(len(self.algolist)):
        algoname=self.algolist[alg]
        start_time=time.time()
        sample_stopping_time, sample_success = self.single_experiment(mean, algoname)
        elapsed=time.time()-start_time
        self.stopping_time_hist.append(sample_stopping_time)
        self.success_hist.append(sample_success)
        self.time_spent.append(elapsed)
        exp_stopping_time[alg] =exp_stopping_time[alg]+sample_stopping_time/num_of_exp
        success_rate[alg]      =success_rate[alg]+sample_success/num_of_exp
    return exp_stopping_time, success_rate



  def single_experiment(self, mean, algoname):
    print("Starting an experiment - mean ")
    print(mean)
    alg=BAI_Algorithm(self.prior_dist,self.delta)
    if algoname=='Elim':
      print("Algo: Elim")
      alg = Elimination(self.prior_dist, self.delta)
    elif algoname=='TTTS':
      print("Algo: TTTS")
      alg = TTTS(self.prior_dist, self.delta)
    elif algoname=='TTUCB':
      print("Algo: TTUCB")
      alg = TTUCB(self.prior_dist, self.delta)
    answer=np.argmax(mean)
    std=self.prior_dist.instance_std
    stopping_time=0
    while not alg.stopping_criterion():
      a=alg.sample()
      reward = np.random.normal(mean[a],std[a])
      alg.update(a, reward)
      stopping_time=stopping_time+1
    print('Final stopping time: %d'%stopping_time)
    if answer==alg.recommendation():
      return stopping_time,1
    else:
      return stopping_time,0


# Start of the Main Code
Parameters
- $K$: number of arms.
- *prior\_mean*: corresponds to $(m_i)_{i=1}^k$, the mean vector of the prior distribution.
- *prior\_std*: corrsponds to $(\xi_i)_{i=1}^k$, the vector of standard deviations.
- *instance\_std*: corresponds to $(\sigma_i)_{i=1}^k$, the standard deviations of the reward distributions. 
- $\delta$: error probability.

In [19]:
K=2
prior_mean=np.zeros(K)
prior_std=np.ones(K)
instance_std=np.ones(K)
delta=0.1

Create prior distribution $H$

In [None]:
prior_dist=prior_distribution(prior_mean, prior_std,instance_std)
prior_dist.get_Delta_0(delta)

Experiment instance

In [11]:
myExp=Experiment(prior_dist, delta, ['TTTS', 'TTUCB', 'Elim'])

Run the experiment 1000 times.
- res: average stopping time variable

- ult: success rate variable

In [None]:
res, ult= myExp.monte_carlo_experiment(1000)

### Stopping time variables

- ElimStop: Stopping time history for our algorithm
- TTTSStop: Stopping time history for TTTS
- TTUCBStop: Stopping time history for TTUCB

### Computation time variables

- ElimCompTime: Computation time history for our algorithm
- TTTSComp: Computation time history for TTTS
- TTUCBComp: Computation time history for TTUCB


In [59]:
lnow=1000
ElimStop = [myExp.stopping_time_hist[3*i+2] for i in range(lnow)]
TTTSStop = [myExp.stopping_time_hist[3*i+1] for i in range(lnow)]
TTUCBStop = [myExp.stopping_time_hist[3*i] for i in range(lnow)]
ElimCompTime = [myExp.time_spent[3*i+2] for i in range(lnow)]
TTTSCompTime = [myExp.time_spent[3*i+1] for i in range(lnow)]
TTUCBCompTime = [myExp.time_spent[3*i] for i in range(lnow)]
np.savetxt('Elim_K2_1000.txt', ElimStop, delimiter=',')
np.savetxt('TTTS_K2_1000.txt', TTTSStop, delimiter=',')
np.savetxt('TTUCB_K2_1000.txt', TTUCBStop, delimiter=',')
np.savetxt('Elim_Comptime_K2_1000.txt', ElimCompTime, delimiter=',')
np.savetxt('TTTS_Comptime_K2_1000.txt', TTTSCompTime, delimiter=',')
np.savetxt('TTUCB_Comptime_K2_1000.txt', TTUCBCompTime, delimiter=',')

In [None]:
print(np.mean(ElimStop))
print(np.mean(TTTSStop))
print(np.mean(TTUCBStop))

In [None]:
print(np.mean(ElimCompTime))
print(np.mean(TTTSCompTime))
print(np.mean(TTUCBCompTime))

In [None]:
print(np.max(ElimStop))
print(np.max(TTTSStop))
print(np.max(TTUCBStop))