In [23]:
import numpy as np
import math
import matplotlib.pyplot as plt
from operator import add

In [24]:
class BanditGameNonStationary:
    """n-armed bandit whose true action values drift after every pull.

    The initial action values ``q`` are drawn from a normal distribution with
    mean ``mu`` and standard deviation ``sigma``.  Each call to :meth:`reward`
    is followed by one random-walk step applied to all action values.
    """

    def __init__(self, n, mu, sigma):
        """Create ``n`` arms with action values sampled from N(mu, sigma)."""
        self.n, self.mu, self.sigma = n, mu, sigma
        self.q = np.random.normal(mu, sigma, n)

    def reward(self, a):
        """Return a noisy reward for arm ``a`` and then let the values drift.

        The noise term is sampled from N(mu, sigma), i.e. it is centred on
        ``mu`` rather than on zero, so the expected reward is ``q[a] + mu``.
        """
        sample = np.random.normal(self.mu, self.sigma, 1)
        payout = self.q[a] + sample[0]
        # The drift happens after the reward has been computed.
        self.walk()
        return payout

    def walk(self):
        """Add an independent N(0, 0.01) increment to every action value."""
        drift = np.random.normal(0, 0.01, self.n)
        self.q = self.q + drift

    def trueReward(self, a):
        """Return the current true action value of arm ``a``."""
        return self.q[a]

    def bestAction(self):
        """Return the index of the arm with the highest current true value."""
        return self.q.argmax()

In [25]:
class BanditGame:
    """Stationary n-armed bandit.

    Action values ``q`` are sampled once from N(mu, sigma) at construction and
    never change, so the best arm is computed a single time and cached.
    """

    def __init__(self, n, mu, sigma):
        """Sample ``n`` action values and remember which arm is the best."""
        self.n, self.mu, self.sigma = n, mu, sigma
        self.q = np.random.normal(mu, sigma, n)
        self.bestA = self.q.argmax()

    def reward(self, a):
        """Return the value of arm ``a`` plus noise drawn from N(mu, sigma / 2)."""
        sample = np.random.normal(self.mu, self.sigma/2, 1)
        return self.q[a] + sample[0]

    def trueReward(self, a):
        """Return the true (noise-free) action value of arm ``a``."""
        return self.q[a]

    def bestAction(self):
        """Return the cached index of the arm with the highest true value."""
        return self.bestA

In [26]:
class Player:
    """Epsilon-greedy agent with sample-average action-value estimates.

    Attributes:
        eps: probability of taking an exploratory (uniformly random) action.
        n:   number of arms.
        N:   per-arm count of updates received so far.
        Q:   per-arm running mean of the rewards received so far.
    """

    def __init__(self, n, eps):
        """Create an agent for ``n`` arms that explores with probability ``eps``."""
        self.eps = eps
        self.n = n
        self.N = [0 for i in range(n)]
        self.Q = [0 for i in range(n)]

    def play(self):
        """Choose an arm: greedy with probability ``1 - eps``, else uniform.

        Ties in the greedy choice are resolved in favour of the lowest index.
        """
        if np.random.random() < (1 - self.eps):
            return self.Q.index(max(self.Q))
        # np.random.randint excludes its upper bound, so the bound must be n
        # (not n - 1); otherwise the last arm would never be explored and a
        # single-arm game would raise ValueError.
        return np.random.randint(0, self.n)

    def update(self, a, R):
        """Fold reward ``R`` for arm ``a`` into its incremental sample mean."""
        self.N[a] = self.N[a] + 1
        self.Q[a] = self.Q[a] + (R - self.Q[a])/self.N[a]

    def reset(self):
        """Forget everything learned: zero all counts and estimates."""
        self.N = [0 for i in range(self.n)]
        self.Q = [0 for i in range(self.n)]

    def whoIam(self):
        """Return a short label describing this agent (used as a plot legend)."""
        return 'Player, eps={}'.format(self.eps)

In [27]:
class UCBPlayer:
    """Upper-confidence-bound agent with sample-average value estimates.

    Attributes:
        c: weight of the exploration bonus.
        n: number of arms.
        N: per-arm count of updates received so far.
        Q: per-arm running mean of the rewards received so far.
        t: step counter; starts at 1 and grows by one on every update.
    """

    def __init__(self, n, c):
        """Create an agent for ``n`` arms with exploration weight ``c``."""
        self.c, self.n = c, n
        self.N = [0] * n
        self.Q = [0] * n
        self.t = 1

    def play(self):
        """Return the arm with the largest upper confidence bound.

        Arms that have never been updated get a score of 1e6 so that they are
        tried first; ties go to the lowest index.
        """
        scores = []
        for estimate, pulls in zip(self.Q, self.N):
            if pulls == 0:
                scores.append(1e6)
            else:
                scores.append(estimate + self.c*math.sqrt(math.log(self.t)/pulls))
        return max(range(len(scores)), key=scores.__getitem__)

    def update(self, a, R):
        """Advance the step counter and fold ``R`` into arm ``a``'s mean."""
        self.t += 1
        self.N[a] += 1
        self.Q[a] = self.Q[a] + (R - self.Q[a])/self.N[a]

    def reset(self):
        """Restore the freshly-constructed state (t = 1, zero counts/values)."""
        self.t = 1
        self.N = [0] * self.n
        self.Q = [0] * self.n

    def whoIam(self):
        """Return a short label describing this agent (used as a plot legend)."""
        return 'UCBPlayer, c={}'.format(self.c)

In [28]:
class AdaptivePlayer:
    """Epsilon-greedy agent whose exploration rate is annealed linearly.

    ``eps`` starts at ``epsBase`` and is lowered by a fixed amount ``h`` on
    each of the first ``nswitch`` calls to :meth:`play`; ``h`` is the slope of
    the line going from ``epsBase`` to ``epsBase / 10`` in ``nswitch`` steps.
    After ``nswitch`` calls the last computed ``eps`` is kept.  Action values
    are incremental sample averages.

    Attributes:
        eps:     current exploration probability.
        epsBase: initial exploration probability.
        n:       number of arms.
        N, Q:    per-arm update counts and running reward means.
        nswitch: number of ``play`` calls over which ``eps`` is annealed.
        iter:    number of ``play`` calls since construction / last reset.
        h:       per-call decrement applied to ``eps`` while annealing.
    """

    def __init__(self, n, eps, nswitch):
        """Create an agent for ``n`` arms starting at exploration rate ``eps``."""
        self.eps = eps
        self.epsBase = eps
        self.n = n
        self.N = [0 for i in range(n)]
        self.Q = [0 for i in range(n)]
        self.nswitch = nswitch
        self.iter = 0
        self.h = (self.epsBase - self.epsBase/10) / self.nswitch

    def play(self):
        """Update ``eps`` for this step, then choose an arm epsilon-greedily.

        Ties in the greedy choice are resolved in favour of the lowest index.
        """
        if self.iter < self.nswitch:
            self.eps = self.epsBase - self.h * self.iter
        self.iter = self.iter + 1
        if np.random.random() < (1 - self.eps):
            return self.Q.index(max(self.Q))
        # np.random.randint excludes its upper bound, so the bound must be n
        # (not n - 1); otherwise the last arm would never be explored.
        return np.random.randint(0, self.n)

    def update(self, a, R):
        """Fold reward ``R`` for arm ``a`` into its incremental sample mean."""
        self.N[a] = self.N[a] + 1
        self.Q[a] = self.Q[a] + (R - self.Q[a])/self.N[a]

    def reset(self):
        """Zero counts and estimates and restart the annealing schedule."""
        self.N = [0 for i in range(self.n)]
        self.Q = [0 for i in range(self.n)]
        self.iter = 0
        self.eps = self.epsBase

    def whoIam(self):
        """Return a short label describing this agent (used as a plot legend)."""
        return 'AdaptPlayer, eps={}, sw={}'.format(self.epsBase, self.nswitch)

In [29]:
class OptimisticTrackingPlayer:
    """Epsilon-greedy agent with constant step size and custom initial values.

    Attributes:
        eps:   probability of taking an exploratory (uniformly random) action.
        n:     number of arms.
        alpha: constant step size used in every value update.
        Q:     per-arm value estimates, all starting at ``QInit``.
        Qinit: the initial estimate, kept so that ``reset`` can restore it.
    """

    def __init__(self, n, eps, alpha, QInit):
        """Create an agent for ``n`` arms with every estimate set to ``QInit``."""
        self.eps = eps
        self.n = n
        self.alpha = alpha
        self.Q = [QInit for i in range(n)]
        self.Qinit = QInit

    def play(self):
        """Choose an arm: greedy with probability ``1 - eps``, else uniform.

        Ties in the greedy choice are resolved in favour of the lowest index.
        """
        if np.random.random() < (1 - self.eps):
            return self.Q.index(max(self.Q))
        # np.random.randint excludes its upper bound, so the bound must be n
        # (not n - 1); otherwise the last arm would never be explored.
        return np.random.randint(0, self.n)

    def update(self, a, R):
        """Move arm ``a``'s estimate a fraction ``alpha`` of the way to ``R``."""
        self.Q[a] = self.Q[a] + self.alpha*(R - self.Q[a])

    def reset(self):
        """Restore every estimate to the initial value."""
        self.Q = [self.Qinit for i in range(self.n)]

    def whoIam(self):
        """Return a short label describing this agent (used as a plot legend)."""
        return 'OptimisticTrackingPlayer, eps={}, alpha = {}, Qinit={}'.format(self.eps, self.alpha, self.Qinit)

In [30]:
class OptimisticPlayer:
    """Epsilon-greedy agent with sample-average updates and custom initial values.

    Because the step size is ``1 / N[a]``, the first update of an arm uses a
    step of 1 and therefore replaces that arm's initial estimate entirely.

    Attributes:
        eps:   probability of taking an exploratory (uniformly random) action.
        n:     number of arms.
        N:     per-arm count of updates received so far.
        Q:     per-arm value estimates, all starting at ``QInit``.
        Qinit: the initial estimate, kept so that ``reset`` can restore it.
    """

    def __init__(self, n, eps, QInit):
        """Create an agent for ``n`` arms with every estimate set to ``QInit``."""
        self.eps = eps
        self.n = n
        self.N = [0 for i in range(n)]
        self.Q = [QInit for i in range(n)]
        self.Qinit = QInit

    def play(self):
        """Choose an arm: greedy with probability ``1 - eps``, else uniform.

        Ties in the greedy choice are resolved in favour of the lowest index.
        """
        if np.random.random() < (1 - self.eps):
            return self.Q.index(max(self.Q))
        # np.random.randint excludes its upper bound, so the bound must be n
        # (not n - 1); otherwise the last arm would never be explored.
        return np.random.randint(0, self.n)

    def update(self, a, R):
        """Fold reward ``R`` for arm ``a`` into its estimate with step 1/N[a]."""
        self.N[a] = self.N[a] + 1
        self.Q[a] = self.Q[a] + (R - self.Q[a])/self.N[a]

    def reset(self):
        """Zero the counts and restore every estimate to the initial value."""
        self.N = [0 for i in range(self.n)]
        self.Q = [self.Qinit for i in range(self.n)]

    def whoIam(self):
        """Return a short label describing this agent (used as a plot legend)."""
        return 'OptimisticPlayer, eps={}, Qinit={}'.format(self.eps, self.Qinit)

In [31]:
class TrackingPlayer:
    """Epsilon-greedy agent that tracks rewards with a constant step size.

    Attributes:
        eps:   probability of taking an exploratory (uniformly random) action.
        n:     number of arms.
        alpha: constant step size used in every value update.
        Q:     per-arm value estimates, all starting at 0.
    """

    def __init__(self, n, eps, alpha):
        """Create an agent for ``n`` arms with step size ``alpha``."""
        self.eps = eps
        self.n = n
        self.alpha = alpha
        self.Q = [0 for i in range(n)]

    def play(self):
        """Choose an arm: greedy with probability ``1 - eps``, else uniform.

        Ties in the greedy choice are resolved in favour of the lowest index.
        """
        if np.random.random() < (1 - self.eps):
            return self.Q.index(max(self.Q))
        # np.random.randint excludes its upper bound, so the bound must be n
        # (not n - 1); otherwise the last arm would never be explored.
        return np.random.randint(0, self.n)

    def update(self, a, R):
        """Move arm ``a``'s estimate a fraction ``alpha`` of the way to ``R``."""
        self.Q[a] = self.Q[a] + self.alpha*(R - self.Q[a])

    def reset(self):
        """Set every estimate back to 0."""
        self.Q = [0 for i in range(self.n)]

    def whoIam(self):
        """Return a short label describing this agent (used as a plot legend)."""
        return 'TrackingPlayer, eps={}, a={}'.format(self.eps, self.alpha)

In [32]:
class TrackingUnbiasedPlayer:
    """Epsilon-greedy agent using the unbiased constant-step-size trick.

    Each arm is updated with step size ``alpha / o`` where ``o`` is a trace
    that starts at 0 and follows ``o <- o + alpha * (1 - o)``.  The very first
    update of an arm therefore uses a step of exactly 1 (removing the bias of
    the initial estimate) and later steps approach ``alpha``.

    Attributes:
        eps:   probability of taking an exploratory (uniformly random) action.
        n:     number of arms.
        alpha: target constant step size.
        Q:     per-arm value estimates, all starting at 0.
        o:     per-arm trace used to normalise the step size.
    """

    def __init__(self, n, eps, alpha):
        """Create an agent for ``n`` arms with target step size ``alpha``."""
        self.eps = eps
        self.n = n
        self.alpha = alpha
        self.Q = [0 for i in range(n)]
        # One trace per arm: the correction must count the updates of the arm
        # being updated.  A single trace shared by all arms would give an arm
        # first tried late a step of ~alpha instead of 1, leaving its estimate
        # biased towards the initial value.
        self.o = [0 for i in range(n)]

    def play(self):
        """Choose an arm: greedy with probability ``1 - eps``, else uniform.

        Ties in the greedy choice are resolved in favour of the lowest index.
        """
        if np.random.random() < (1 - self.eps):
            return self.Q.index(max(self.Q))
        # np.random.randint excludes its upper bound, so the bound must be n
        # (not n - 1); otherwise the last arm would never be explored.
        return np.random.randint(0, self.n)

    def update(self, a, R):
        """Advance arm ``a``'s trace and update its estimate with step alpha/o."""
        self.o[a] = self.o[a] + self.alpha*(1 - self.o[a])
        self.Q[a] = self.Q[a] + self.alpha/self.o[a]*(R - self.Q[a])

    def reset(self):
        """Set every estimate and every trace back to 0."""
        self.Q = [0 for i in range(self.n)]
        self.o = [0 for i in range(self.n)]

    def whoIam(self):
        """Return a short label describing this agent (used as a plot legend)."""
        return 'TrackingUnbiasedPlayer, eps={}, a={}'.format(self.eps, self.alpha)

In [33]:
class TestSuite:
    """A fixed collection of bandit problems used to benchmark players.

    ``Bandit`` is called as ``Bandit(n, mu, sigma)`` once per problem.  A
    player is any object offering ``play()``, ``update(a, R)`` and ``reset()``.
    """

    def __init__(self, Bandit, nBandits, n, mu, sigma):
        """Instantiate ``nBandits`` bandits, each built as ``Bandit(n, mu, sigma)``."""
        self.nBandits = nBandits
        self.bandits = [Bandit(n, mu, sigma) for _ in range(nBandits)]

    def playBandit(self, player, iBandit, iter):
        """Let ``player`` act for ``iter`` steps on bandit number ``iBandit``.

        Returns:
            (rewards, optimal): two lists of length ``iter`` holding, per step,
            the reward obtained and whether the chosen arm equalled the
            bandit's ``bestAction()`` at the moment of choosing.
        """
        collected, hits = [], []
        for _ in range(iter):
            bandit = self.bandits[iBandit]
            choice = player.play()
            # Optimality is judged before the reward is drawn.
            hits.append(choice == bandit.bestAction())
            payout = bandit.reward(choice)
            collected.append(payout)
            player.update(choice, payout)
        return collected, hits

    def playAllBandits(self, player, iter):
        """Run ``player`` on every bandit and average the per-step results.

        The player is reset after each bandit.

        Returns:
            (avgReward, fractionOptimalA): two lists of length ``iter`` with the
            per-step reward and optimal-action indicator averaged over all
            bandits.
        """
        rewardTotals = [0] * iter
        hitTotals = [0] * iter
        for idx in range(self.nBandits):
            stepRewards, stepHits = self.playBandit(player, idx, iter)
            rewardTotals = [acc + r for acc, r in zip(rewardTotals, stepRewards)]
            hitTotals = [acc + h for acc, h in zip(hitTotals, stepHits)]
            player.reset()

        count = self.nBandits
        return [tot / count for tot in rewardTotals], [tot / count for tot in hitTotals]

In [34]:
# Experiment configuration.
SEED = 0          # fixed seed so that Restart & Run All reproduces the same curves
N_BANDITS = 2000  # number of independent bandit problems that results are averaged over
N_ARMS = 10       # arms per bandit; the suite and every player must agree on this
np.random.seed(SEED)

# Alternative experiment: non-stationary bandits with constant-step-size players.
#suite = TestSuite(BanditGameNonStationary, N_BANDITS, N_ARMS, 0, 1)
#players = [TrackingUnbiasedPlayer(N_ARMS, 0.1, 0.1), TrackingPlayer(N_ARMS, 0.1, 0.1)]
suite = TestSuite(BanditGame, N_BANDITS, N_ARMS, 0, 1)
players = [Player(N_ARMS, 0.1), UCBPlayer(N_ARMS, 2), UCBPlayer(N_ARMS, 1)]

In [35]:
# Number of time steps every player gets on each bandit.
j = 1000

# Run each player through the whole suite, in order.  Every call yields
# (average reward per step, fraction of optimal actions per step).
results = [suite.playAllBandits(p, j) for p in players]
rewards = [avg for avg, _ in results]
fractionOptimalA = [frac for _, frac in results]

In [None]:
# Per-step fraction of bandits on which each player chose the optimal arm.
# Players and curves are paired positionally; looking a curve up by value
# would attach the wrong label whenever two players produce equal curves.
for player, curve in zip(players, fractionOptimalA):
    plt.plot(curve, label=player.whoIam())

plt.xlabel('Step')
plt.ylabel('Fraction of optimal actions')
plt.legend()
plt.show()

In [None]:
# Per-step reward of each player, averaged over all bandits.
# Players and curves are paired positionally; looking a curve up by value
# would attach the wrong label whenever two players produce equal curves.
for player, curve in zip(players, rewards):
    plt.plot(curve, label=player.whoIam())

plt.xlabel('Step')
plt.ylabel('Average reward')
plt.legend()
plt.show()