In [3]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize

In [4]:
def sample_from_normal_dist(mu, sigma):
    return np.random.normal(mu)

In [5]:
class MultiArmedBandit:
    def __init__(self, arm_count, time_horizon, ground_truth_means):
        self.K = arm_count
        self.T = time_horizon
        self.curr_t = 0
        self.total_reward = 0
        self.ground_truth_means = ground_truth_means
        self.best_arm_mean = np.max(ground_truth_means)
        self.arm_rewards = np.zeros(self.K)
        self.reward_history = []
        self.regret_history = []
        self.empirical_means = np.zeros(self.K)
        self.arm_sample_count = np.zeros(self.K, dtype = np.int32)
        self.upper_confidence_bounds = np.zeros(self.K)
        self.lower_confidence_bounds = np.zeros(self.K)
    
    def ChooseBestArm(self):
        return np.argmax(self.upper_confidence_bounds)
    
    def UpdateEmpiricalMean(self, a, curr_sample):
        self.curr_t += 1
        self.arm_sample_count[a] += 1
        self.arm_rewards[a] += curr_sample
        self.empirical_means[a] = float(self.arm_rewards[a])/float(self.arm_sample_count[a])
    
    def UpdateConfidenceBounds(self, a):
        T=self.T
        self.upper_confidence_bounds[a] = self.empirical_means[a] + np.sqrt(float(2*np.log(T))/float(self.arm_sample_count[a]))
        self.lower_confidence_bounds[a] = self.empirical_means[a] - np.sqrt(float(2*np.log(T))/float(self.arm_sample_count[a]))
        
    def Update(self, a, curr_sample):
        self.UpdateEmpiricalMean(a, curr_sample)
        self.UpdateConfidenceBounds(a)
    
    def SampleOnce(self):
        for arm in range(self.K):
            curr_sample = np.random.binomial(1, self.ground_truth_means[arm])
            self.Update(arm, curr_sample)
            self.total_reward += curr_sample
            self.reward_history.append(self.total_reward)
            self.regret_history.append(self.best_arm_mean*(len(self.reward_history))-self.total_reward)

    def BanditPlay(self):
        self.SampleOnce()
        for t in range(self.T-self.K):
            curr_arm = self.ChooseBestArm()
            curr_sample = np.random.binomial(1, self.ground_truth_means[curr_arm])
            self.Update(curr_arm, curr_sample)
            self.total_reward += curr_sample
            self.reward_history.append(self.total_reward)
            self.regret_history.append(self.best_arm_mean*(len(self.reward_history))-self.total_reward)

In [6]:
class MultiplePlayers:
    def __init__(self, a_num_players, a_margin, a_arm_count, a_time_horizon, seed_ground_truth_means):
        self.m_num_players = a_num_players
        self.m_arm_count = a_arm_count
        self.m_margin = a_margin
        self.m_time_horizon = a_time_horizon
        self.m_ground_truth_means_array = np.zeros((self.m_num_players, self.m_arm_count))
        for col in range(self.m_arm_count):
            l = max(0.0, seed_ground_truth_means[col]-self.m_margin)
            h = min(seed_ground_truth_means[col]+self.m_margin, 1.0)
            self.m_ground_truth_means_array[:,col] = np.random.uniform(low=l, high=h, size=self.m_num_players)
        self.m_players = [MultiArmedBandit(self.m_arm_count, self.m_time_horizon, self.m_ground_truth_means_array[r,:]) for r in range(self.m_num_players)]

    def CalculateUCB_width(self, alpha, n, m, X, Y, epsilon):
        T=self.m_time_horizon
        ucb_wt = np.sqrt(2*np.log(T)*(float(alpha**2)/float(n) + float((1-alpha)**2)/float(m))) + (1-alpha)*epsilon
        return ucb_wt

    def BestWeightedUCB(self, n, m, X, Y, epsilon):
        T=self.m_time_horizon
        A=2*np.log(T)*(float(1)/float(n)+float(1)/float(m))
        B=-4*np.log(T)/float(m)
        C=2*np.log(T)/float(m)
        D=-1*epsilon
        S=float(4*A*C*(np.power(D,2))-np.power(B*D,2))/float(4*np.power(A,3)-4*np.power(A*D,2))
        if(S<0):
            if(self.CalculateUCB_width(0, n, m, X, Y, epsilon) < self.CalculateUCB_width(1, n, m, X, Y, epsilon)):
                UCB_alpha_star =  float(Y)/float(m) + self.CalculateUCB_width(0, n, m, X, Y, epsilon)
            else:
                UCB_alpha_star =  float(X)/float(n) + self.CalculateUCB_width(1, n, m, X, Y, epsilon)
        else:
            if(D>=0):
                alpha_star = -np.sqrt(S) - float(B)/float(2*A)
            else:
                alpha_star = np.sqrt(S) - float(B)/float(2*A)
            if(alpha_star<0):
                alpha_star = 0
            if(alpha_star>1):
                alpha_star = 1
            UCB_alpha_star = alpha_star*float(X)/float(n) + (1-alpha_star)*float(Y)/float(m) + self.CalculateUCB_width(alpha_star, n, m, X, Y, epsilon)
        return UCB_alpha_star

    def UpdateConfidenceBounds_best_weighted(self, curr_player):
        for a in range(self.m_arm_count):
            m_other_players = 0
            Y_a = 0
            for p in range(self.m_num_players):
                if(p != curr_player):
                    m_other_players += self.m_players[p].arm_sample_count[a]
                    Y_a += self.m_players[p].arm_rewards[a]
            n = self.m_players[curr_player].arm_sample_count[a]
            X_a = self.m_players[curr_player].arm_rewards[a]
            best_ucb = self.BestWeightedUCB(n,m,X_a,Y_a,self.m_margin)
            self.m_players[curr_player].upper_confidence_bounds[a] = best_ucb

    def ConcurrentBanditPlay_best_weighting(self):
        '''
            All the players play in parallel, weighting is corresponding to best alpha
        '''
        for player in self.m_players:
            player.SampleOnce()
        for t in range(self.m_time_horizon-self.m_arm_count):
            for p in range(self.m_num_players):
                p_arm = self.m_players[p].ChooseBestArm()
                p_sample = np.random.binomial(1, self.m_players[p].ground_truth_means[p_arm])
                self.m_players[p].Update(p_arm, p_sample)
                self.m_players[p].total_reward += p_sample
                self.m_players[p].reward_history.append(self.m_players[p].total_reward)
                self.m_players[p].regret_history.append(self.m_players[p].best_arm_mean*(len(self.m_players[p].reward_history))-self.m_players[p].total_reward)
            for p in range(self.m_num_players):
                self.UpdateConfidenceBounds_best_weighted(p)