In [113]:
import math
import random

import numpy as np

In [118]:
# Monte Carlo
class EpsilonGreedy():
    """Epsilon-greedy multi-armed bandit policy.

    Attributes:
        epsilon: threshold compared against a uniform random draw in
            ``select_arm``; draws above it exploit, the rest explore.
        counts: per-arm number of pulls recorded through ``update``.
        values: per-arm running average of the rewards seen so far.
    """

    def __init__(self, epsilon, counts, values):
        self.epsilon, self.counts, self.values = epsilon, counts, values

    def initialize(self, n_arms):
        """Reset the pull counts and value estimates for ``n_arms`` arms."""
        self.counts = [0] * n_arms
        self.values = [0.0] * n_arms

    def select_arm(self):
        """Return the index of the arm to pull next."""
        # Exploit: a draw above epsilon picks the best-looking arm.
        if random.random() > self.epsilon:
            return ind_max(self.values)
        # Explore: otherwise pick an arm uniformly at random.
        return random.randrange(len(self.values))

    def update(self, chosen_arm, reward):
        """Fold ``reward`` into the running average of ``chosen_arm``."""
        self.counts[chosen_arm] += 1
        pulls = self.counts[chosen_arm]

        previous_estimate = self.values[chosen_arm]
        # Weighted mean of the old estimate (pulls - 1 samples) and the new reward.
        self.values[chosen_arm] = (
            ((pulls - 1) / float(pulls)) * previous_estimate
            + (1 / float(pulls)) * reward
        )

        
def ind_max(x):
    """Return the index of the first occurrence of the largest element of ``x``."""
    return x.index(max(x))


class BernoulliArm():
    """Bandit arm whose ``draw`` yields a reward of 1.0 or 0.0, governed by ``p``."""

    def __init__(self, p):
        self.p = p

    def draw(self):
        """Return 1.0 with probability ``p`` (e.g. a click happened), otherwise 0.0."""
        return 0.0 if random.random() > self.p else 1.0

In [42]:
# Black and white test
# Four low-probability arms and one high-probability arm, in shuffled order.
means = [0.1, 0.1, 0.1, 0.1, 0.9]
random.shuffle(means)
n_arms = len(means)
arms = [BernoulliArm(mu) for mu in means]

In [75]:
# Scratch check: one 0.0 placeholder per (simulation, time step) pair.
num_sims = 10
horizon = 1
[0.0] * (num_sims * horizon)

[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

In [116]:
def test_algorithm(algo, arms, num_sims, horizon):
    """
    algo: an bandit algoirthm we want to test.
    arms: an array of arms we want to simulate draws from.
    num_sims: a fixed number of simulations to run to avearge over the noise in each simulation.
    horizon: The number of the times each algorithm is allowed to pull on arms during simulation.
    """
    # 初始化
    chosen_arms = [0.0 for i in range(num_sims * horizon)]
    rewards = [0.0 for i in range(num_sims * horizon)]
    cumulative_rewards = [0.0 for i in range(num_sims * horizon)]
    sim_nums = [0.0 for i in range(num_sims * horizon)]
    times = [0.0 for i in range(num_sims * horizon)]
    
    for sim in range(num_sims):
        sim = sim + 1
        algo.initialize(len(arms))
        
        for t in range(num_sims):
            # 在多次simulation中
            sim = sim + 1
            algo.initialize(len(arms))
            # 在一次simulation中，可以pull arm的次数。
            for t in range(horizon):
                t = t + 1
                index = (sim - 1) * horizon + t - 1 
                sim_nums[index] = sim
                times[index] = t
                
                # 来看下每次选的是哪个arm
                chosen_arm = algo.select_arm()
                chosen_arms[index] = chosen_arm
                
                reward = arms[chosen_arms[index]].draw()
                rewards[index] = reward
                
                if t == 1:
                    cumulative_rewards[index] = reward
                else:
                    cumulative_rewards[index] = cumulative_rewards[index - 1] + reward
                # 更新reward
                algo.update(chosen_arm, reward)
            return [sim_nums, times, chosen_arms, rewards, cumulative_rewards]
                

In [129]:
import random
random.seed(1)
means = [0.1, 0.1, 0.1, 0.1, 0.9]
n_arms = len(means)
random.shuffle(means)
arms = [BernoulliArm(mu) for mu in means]
print('Best arm is ' + str(ind_max(means)))
# The context manager closes the file even if the long-running simulation
# below is interrupted or raises.
with open('standard_result.tsv', 'w') as f:
    # A. Repeat the experiment for each of the five epsilon values.
    for epsilon in [0.1, 0.2, 0.3, 0.4, 0.5]:
        # 1. Build the EpsilonGreedy object.
        algo = EpsilonGreedy(epsilon, [], [])
        # 2. Initialize it for the current set of arms.
        algo.initialize(n_arms)
        # 3. Run 5000 simulations of 250 pulls each.
        results = test_algorithm(algo, arms, 5000, 250)
        # 4. Write one tab-separated row per pull, prefixed with epsilon.
        for row in zip(*results):
            f.write(str(epsilon) + '\t')
            f.write('\t'.join([str(field) for field in row]) + '\n')

Best arm is 2
0.0


KeyboardInterrupt: 

In [131]:
# UCB
class UCB1():
    """UCB1 multi-armed bandit policy.

    Each arm is pulled once first; afterwards the arm with the largest
    ``value + sqrt(2 * ln(total_pulls) / arm_pulls)`` is selected.

    Attributes:
        counts: per-arm number of pulls recorded through ``update``.
        values: per-arm running average of the rewards seen so far.
    """

    def __init__(self, counts, values):
        self.counts = counts
        self.values = values

    def initialize(self, n_arms):
        """Reset the pull counts and value estimates for ``n_arms`` arms."""
        self.counts = [0] * n_arms
        self.values = [0.0] * n_arms

    # Backward-compatible alias for the original, misspelled method name.
    initilize = initialize

    def select_arm(self):
        """Return the index of the arm to pull next."""
        n_arms = len(self.counts)
        # Make sure every arm has been pulled at least once; this also keeps
        # the division below away from zero.
        for arm in range(n_arms):
            if self.counts[arm] == 0:
                return arm

        total_counts = sum(self.counts)
        ucb_values = []
        for arm in range(n_arms):
            # Exploration bonus shrinks as the arm accumulates pulls.
            bonus = math.sqrt((2 * math.log(total_counts)) / float(self.counts[arm]))
            ucb_values.append(self.values[arm] + bonus)
        # First index of the largest upper confidence bound.
        return ucb_values.index(max(ucb_values))

    def update(self, chosen_arm, reward):
        """Fold ``reward`` into the running average of ``chosen_arm``."""
        self.counts[chosen_arm] = self.counts[chosen_arm] + 1
        n = self.counts[chosen_arm]
        value = self.values[chosen_arm]
        # Weighted mean of the old estimate (n - 1 samples) and the new reward.
        new_value = ((n - 1) / float(n)) * value + (1 / float(n)) * reward
        self.values[chosen_arm] = new_value