In [40]:
import random
import numpy as np

In [41]:
class Arm:
    """One bandit arm whose reward is drawn from a normal distribution.

    Despite its name, ``probability`` is used as the *mean* of the reward
    distribution (it is passed as ``loc`` to ``np.random.normal``), and
    ``deviation`` is used as its standard deviation (``scale``).

    Args:
        probability: Mean of the reward distribution.
        deviation: Standard deviation of the reward distribution; must be
            non-negative.

    Raises:
        ValueError: If ``deviation`` is negative.
    """

    def __init__(self, probability: float, deviation: float):
        # Fail fast: np.random.normal would raise the same ValueError, but only
        # on the first pull, far away from where the bad arm was configured.
        if deviation < 0:
            raise ValueError(f"deviation must be non-negative, got {deviation!r}")
        self.probability = probability
        self.deviation = deviation
        # Scale factor applied to every sampled reward.
        self.positive_reward = 1
        # Stored for completeness; not read by any method of this class.
        self.negative_reward = 0

    def pull(self) -> float:
        """Sample one reward from this arm.

        Returns:
            ``positive_reward`` times a draw from
            ``Normal(probability, deviation)``. The draw uses NumPy's global
            random state, so ``np.random.seed`` controls reproducibility.
        """
        sample = np.random.normal(self.probability, self.deviation)
        return self.positive_reward * sample

In [42]:
class Fair:
    """An ordered collection of bandit arms addressed by integer index.

    Attributes:
        arms: The arms, in the order they were added.
        length: Number of arms currently in ``arms``.
    """

    def __init__(self):
        self.arms = []
        self.length = 0

    def add_arm(self, arm: "Arm"):
        """Append ``arm`` to the fair and update ``length``."""
        self.arms.append(arm)
        # Derive the count from the list so the two can never drift apart.
        self.length = len(self.arms)

    def pull_at_machine(self, index: int) -> float:
        """Pull the arm stored at ``index`` and return its reward.

        Indexing follows normal Python list semantics, so a negative index
        counts from the end of the list.

        Args:
            index: Position of the arm in ``arms``.

        Returns:
            Whatever the selected arm's ``pull()`` returns.

        Raises:
            IndexError: If there is no arm at ``index``.
        """
        try:
            arm = self.arms[index]
        except IndexError as exc:
            # Only the lookup is guarded; errors raised by the arm itself
            # propagate untouched instead of being caught by a bare except.
            raise IndexError(
                f"Fair.pull_at_machine: no arm at index {index!r} "
                f"(fair has {len(self.arms)} arms)"
            ) from exc
        return arm.pull()

In [43]:
# (mean reward, reward standard deviation) for each arm, in index order.
# Index 16 (mean 0.91) has the highest mean; index 14 (mean 0.9) is a close second.
arm_parameters = [
    (0.7, 0.01),
    (0.8, 0.01),
    (0.6, 0.01),
    (0.5, 0.03),
    (0.7, 0.02),
    (0.7, 0.01),
    (0.8, 0.01),
    (0.6, 0.03),
    (0.5, 0.05),
    (0.7, 0.02),
    (0.7, 0.03),
    (0.8, 0.06),
    (0.6, 0.01),
    (0.5, 0.01),
    (0.9, 0.01),
    (0.7, 0.01),
    (0.91, 0.01),
    (0.6, 0.02),
    (0.5, 0.04),
    (0.7, 0.05),
]

# Build the 20-armed test bed by registering the arms in table order.
f = Fair()
for mean_reward, reward_std in arm_parameters:
    f.add_arm(Arm(mean_reward, reward_std))

In [44]:
class Agent:
    """Gradient bandit algorithm for reinforcement learning.

    The agent keeps one numeric preference ``H[a]`` per arm, turns the
    preferences into a softmax policy, samples an arm from it, and moves the
    preferences along the gradient of expected reward using the running
    average reward as a baseline.

    Args:
        fair: Object exposing ``length`` (number of arms) and
            ``pull_at_machine(index)``. The number of arms is captured at
            construction time; arms added afterwards are never selected.
        lr: Step size of the preference update.

    Attributes:
        H: Preference vector, initialised to ones (a uniform policy).
        trial_times: Number of pulls performed so far.
        total_reward: Sum of all rewards received so far.
    """

    def __init__(self, fair: "Fair", lr: float = 0.01):
        self.H = np.ones(fair.length)
        self.trial_times = 0
        self.fair = fair
        self.action_length = fair.length
        self.total_reward = 0
        self.lr = lr

    @staticmethod
    def _soft_max(H):
        """Return the softmax of ``H``."""
        # Subtracting the max leaves the result unchanged but keeps exp()
        # from overflowing for large preferences.
        shifted = H - np.max(H)
        weights = np.exp(shifted)
        return weights / np.sum(weights)

    def choose_action(self):
        """Sample one arm from the current policy, pull it, and learn from it.

        Returns:
            The reward obtained from the pulled arm.
        """
        # Choose an action based on the current policy.
        policy = self._soft_max(self.H)
        machine = np.random.choice(self.action_length, p=policy)
        reward = self.fair.pull_at_machine(machine)

        # The baseline is the average of all rewards *including* this one, so
        # the very first pull always has zero advantage and changes nothing.
        self.total_reward += reward
        self.trial_times += 1
        avr_reward = self.total_reward / self.trial_times

        # Gradient bandit update:
        #   H[a] += lr * (R - baseline) * (1[a == chosen] - policy[a])
        # Every arm is weighted by its OWN probability; using the chosen arm's
        # probability for all arms would not follow the expected-reward gradient.
        direction = -policy
        direction[machine] += 1.0
        self.H += self.lr * (reward - avr_reward) * direction
        return reward

    def drive(self, num_episodes: int, num_timesteps_per_eposode: int):
        """Run the agent and report the mean reward of each episode.

        Learned state (``H``, ``total_reward``, ``trial_times``) is not reset
        between episodes; an episode is only a reporting window.

        Args:
            num_episodes: Number of reporting windows.
            num_timesteps_per_eposode: Pulls per window. The misspelled name
                is kept so existing keyword callers continue to work.

        Returns:
            A list with one mean reward per episode.

        Raises:
            ZeroDivisionError: If ``num_timesteps_per_eposode`` is 0.
        """
        rewards = []
        for _ in range(num_episodes):
            episode_total = 0
            for _ in range(num_timesteps_per_eposode):
                episode_total += self.choose_action()
            rewards.append(episode_total / num_timesteps_per_eposode)
        return rewards


In [46]:
# Train a fresh agent on the fair for 10 episodes of 10,000 pulls each;
# the cell displays the mean reward of every episode.
a = Agent(f)
a.drive(num_episodes=10, num_timesteps_per_eposode=10000)

[0.7288470085496952,
 0.8976725953636907,
 0.9100367763601426,
 0.9099983118532134,
 0.9101128804208963,
 0.9099294524265764,
 0.9101290074545877,
 0.9101428213485291,
 0.9101382601726149,
 0.9100306754171573]