In [1]:
import random
import math
import numpy as np

#### Environment



In [2]:
class Environment:
    """Bernoulli multi-armed bandit.

    Pulling arm ``i`` pays a reward of 1 with probability ``proba[i]`` and 0
    otherwise.

    Parameters
    ----------
    proba : sequence of float
        Success probability of each arm, indexed by action.
    """

    def __init__(self, proba):
        self.proba = proba

    @property
    def number_of_actions(self):
        """Number of arms, i.e. ``len(proba)``."""
        return len(self.proba)

    def do(self, action):
        """Pull arm ``action`` once and return the sampled 0/1 reward."""
        # A single uniform draw decides success; the global `random` state is used.
        if random.random() < self.proba[action]:
            return 1
        return 0

#### Bandit

In [3]:
class UGapE:
    """Base UGapE arm-selection strategy for a multi-armed bandit (``m == 1`` only).

    Subclasses supply the confidence width through :meth:`compute_β`. This
    class keeps the per-arm reward history, derives the lower/upper bounds
    ``L``/``U`` and the gap index ``B``, and decides which arm to pull next.

    Parameters
    ----------
    ϵ : float
        Stored on the instance; not used by this base class.
    m : int
        Number of arms to identify. Only ``m == 1`` is supported.
    env : object
        Must expose ``number_of_actions`` and ``do(action) -> reward``.

    Attributes
    ----------
    arms : list of int
        ``list(range(env.number_of_actions))``.
    β, U, L, B
        ``None`` until the first full pass of :meth:`get_action`; afterwards
        the latest confidence widths, upper bounds, lower bounds and gap
        indices (one entry per arm).
    t : int
        Round counter; starts at 1 and grows by one per :meth:`update`.
    """

    def __init__(self, ϵ, m, env, *args, **kwargs):
        # TODO: Make it work for more than 1 arm?
        assert m == 1

        self.ϵ = ϵ
        self.m = m
        self.env = env

        self.arms = list(range(env.number_of_actions))
        self._rewards = {arm: [] for arm in self.arms}

        # Statistics are only available once every arm has been pulled.
        for param in ['β', 'U', 'L', 'B']:
            setattr(self, param, None)

        self.t = 1

    def play_round(self):
        """Choose an arm, pull it in the environment and record the reward."""
        action = self.get_action()
        reward = self.env.do(action)
        self.update(action, reward)

    def get_action(self):
        """Return the index of the arm to pull next.

        Arms that were never pulled are returned first (in index order).
        Otherwise ``β``, ``L``, ``U`` and ``B`` are refreshed, ``J`` is the arm
        with the smallest ``B``, ``u`` is the arm outside ``J`` with the
        largest upper bound, and whichever of the two has the wider
        confidence interval is returned (ties go to ``J``).
        """
        # Initialisation phase: every arm needs at least one observation.
        for arm in self.arms:
            if not self._rewards[arm]:
                return arm

        # With a single arm there is no competitor to compare against.
        if len(self.arms) == 1:
            return self.arms[0]

        # TODO: the stopping condition is not implemented.
        μ = np.array([np.mean(self._rewards[arm]) for arm in self.arms])
        T = np.array([len(self._rewards[arm]) for arm in self.arms])
        self.β = self.compute_β(T=T)
        self.L, self.U = μ - self.β, μ + self.β
        self.B = self.compute_B()

        # m == 1, so the candidate set J is the single arm with the smallest B.
        candidate = int(np.argmin(self.B))
        # Strongest challenger: highest upper bound among the remaining arms
        # (first one wins on ties, as `max` keeps the earliest maximum).
        challenger = max(
            (arm for arm in self.arms if arm != candidate),
            key=lambda arm: self.U[arm],
        )
        # Pull whichever of the two is currently the most uncertain.
        if self.β[candidate] >= self.β[challenger]:
            return candidate
        return challenger

    def update(self, action, reward):
        """Append ``reward`` to the history of ``action`` and advance ``t``."""
        self._rewards[action].append(reward)
        self.t += 1

    def compute_β(self, *args, **kwargs):
        """Return the per-arm confidence widths; must be overridden.

        :meth:`get_action` calls this with the keyword ``T`` (array of pull
        counts, one per arm).
        """
        raise NotImplementedError()

    def compute_B(self):
        """Return ``[max_{i != k} U[i] - L[k] for each arm k]`` as a list.

        Raises
        ------
        ValueError
            If there are fewer than two arms (no ``i != k`` exists).
        """
        if len(self.arms) < 2:
            raise ValueError("compute_B requires at least two arms")

        U = np.asarray(self.U, dtype=float)
        L = np.asarray(self.L, dtype=float)

        # For every arm the best *other* upper bound is the global maximum,
        # except for the arg-max arm itself, which falls back to the runner-up.
        order = np.argsort(U)
        best, runner_up = order[-1], order[-2]
        best_other = np.full(U.shape, U[best])
        best_other[best] = U[runner_up]

        return list(best_other - L)

class UGapEb(UGapE):
    """UGapE variant whose confidence width is ``b * sqrt(a / T_k)``.

    Parameters
    ----------
    ϵ, m : forwarded to :class:`UGapE`.
    n : stored on the instance; not used by this class.
        NOTE(review): presumably the round budget of the fixed-budget
        setting — confirm.
    a : float
        Numerator inside the square root of the width.
    b : float, keyword-only, default 1
        Multiplicative scale of the width. Previously hard-coded to 1.
        NOTE(review): in the UGapE paper ``b`` is the bound on the reward
        range — confirm before changing it.
    *args, **kwargs : forwarded to :class:`UGapE` (e.g. ``env``).
    """

    def __init__(self, ϵ, m, n, a, *args, b=1, **kwargs):
        super().__init__(ϵ, m, *args, **kwargs)
        self.n = n
        self.a = a
        self.b = b

    def compute_β(self, T, *args, **kwargs):
        """Return ``b * sqrt(a / T)`` element-wise for the pull counts ``T``."""
        # asarray lets plain lists of counts be used as well as numpy arrays.
        return self.b * np.sqrt(self.a / np.asarray(T, dtype=float))

class UGapEc(UGapE):
    """UGapE variant with width ``b * sqrt(c * log(4 K (t-1)^3 / δ) / T_k)``.

    ``K`` is the number of arms and ``t`` the round counter kept by
    :class:`UGapE`.

    Parameters
    ----------
    ϵ, m : forwarded to :class:`UGapE`.
    δ : float
        Divisor inside the logarithm.
        NOTE(review): presumably the allowed failure probability — confirm.
    c : float
        Multiplier of the logarithmic term.
    b : float, keyword-only, default 1
        Multiplicative scale of the width. Previously hard-coded to 1.
        NOTE(review): in the UGapE paper ``b`` is the bound on the reward
        range — confirm before changing it.
    *args, **kwargs : forwarded to :class:`UGapE` (e.g. ``env``).
    """

    def __init__(self, ϵ, m, δ, c, *args, b=1, **kwargs):
        super().__init__(ϵ, m, *args, **kwargs)
        self.δ = δ
        self.c = c
        self.b = b

    def compute_β(self, T, *args, **kwargs):
        """Return the per-arm confidence widths for the pull counts ``T``.

        Requires ``self.t > 1``; ``math.log`` raises ``ValueError`` otherwise.
        ``get_action`` only calls this after every arm has been pulled once.
        """
        K = len(self.arms)
        log_term = math.log((4 * K * (self.t - 1) ** 3) / self.δ)
        # asarray lets plain lists of counts be used as well as numpy arrays.
        return self.b * np.sqrt(
            (self.c * log_term) / np.asarray(T, dtype=float)
        )

In [4]:
# Six-armed test bed; arm 2 has the largest success probability (0.9).
env = Environment(proba=[0.5, 0.2, 0.9, 0.6, 0.7, 0.1])
# Sanity check: pull arm 2 once and display the sampled 0/1 reward.
env.do(2)

1

In [6]:
# NOTE(review): this import rebinds `UGapEc`, shadowing the class of the same
# name defined earlier in this notebook, so the cells below exercise the
# packaged implementation rather than the in-notebook one — confirm that this
# is intended. The execution counter also jumps from In [4] to In [6], so a
# cell appears to have been removed; re-check with Restart & Run All.
from pundit.bandits import UGapEc

In [7]:
# Build the learner on top of `env`; every setting is passed by keyword.
algo = UGapEc(env=env, m=1, ϵ=0.01, δ=0.05, c=0.5)

In [8]:
# Let the learner interact with the environment for a fixed number of rounds.
N_ROUNDS = 2_000
for _ in range(N_ROUNDS):
    algo.play_round()

In [9]:
# Display the learner's `B` attribute after the run (one value per arm).
# NOTE(review): presumably the gap index max_{i != k} U_i - L_k from the last
# `get_action` call, as in the in-notebook UGapE — confirm for the imported class.
algo.B

[0.9781879408429209,
 1.5333375756238947,
 0.0790066087846435,
 0.5874322735098487,
 0.48938255622892357,
 1.49421478232296]