<a href="https://colab.research.google.com/github/matisandacz/CS234-Reinforcement-Learning/blob/main/Multi_Armed_Bandit.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Original Code: https://github.com/ankonzoid/LearningX/blob/master/classical_RL/multiarmed_bandit/multiarmed_bandit.py

The code below adapts it to support M bandits

In [None]:
"""

 multiarmed_bandit.py  (author: Anson Wong / git: ankonzoid)

 We solve the multi-armed bandit problem using a classical epsilon-greedy
 agent with reward-average sampling as the estimate to action-value Q.
 This algorithm follows closely with the notation of Sutton's RL textbook.

 We set up bandit arms with fixed probability distribution of success,
 and receive stochastic rewards from each arm of +1 for success,
 and 0 reward for failure.

 The incremental update rule action-value Q for each (action a, reward r):
   n += 1
   Q(a) <- Q(a) + 1/n * (r - Q(a))

 where:
   n = number of times action "a" was performed
   Q(a) = value estimate of action "a"
   r(a) = reward of sampling action bandit (bandit) "a"

 Derivation of the Q incremental update rule:
   Q_{n+1}(a)
   = 1/n * (r_1(a) + r_2(a) + ... + r_n(a))
   = 1/n * ((n-1) * Q_n(a) + r_n(a))
   = 1/n * (n * Q_n(a) + r_n(a) - Q_n(a))
   = Q_n(a) + 1/n * (r_n(a) - Q_n(a))

"""
import os
import numpy as np
import matplotlib.pyplot as plt
np.random.seed(0)

class Environment:
  """This is a simulated environment.
  In reality for model-free tasks, we would interact with a real environment

  bandits: Number of bandits (Slot machines)
  actions: Number of arms for each bandit
  probs: Probability of success for each arm of each bandit.
  This is an array containing the probabilities for winning for each arm of each bandit.

  """

  def __init__(self, probs):
     # Success probabilities for each arm.
     # In practice, we don't know these values. We want to find them.
      self.gaussian_mean = 0
      self.gaussian_variance = 1
      self.probs = probs

  """
  Performs a step in the environment.
  You need to pick a bandit, and then you need to pick an action
  """
  def step(self, bandit, action):
      # Pull arm and get stochastic reward (1 for success, 0 for failure)
      base_reward = 1 if (np.random.random()  < self.probs[bandit][action]) else 0
      noise = np.random.normal(self.gaussian_mean, self.gaussian_variance) # Sample noise from Gaussian distribution
      return base_reward + noise # Return reward with added noise

class Agent:

    def __init__(self, nBandits, nActions, eps):
        self.nBandits = nBandits # Number of bandits
        self.nActions = nActions # Number of actions
        self.eps = eps # probability of exploration vs exploitation.
        self.n = np.zeros((nBandits, nActions), dtype=int) # action counts n(bandit, a)
        self.Q = np.zeros((nBandits, nActions), dtype=float) # value Q(bandit, a)

    def update_Q(self, bandit, action, reward):
        # Update Q action-value given (action, reward)
        self.n[bandit][action] += 1
        # A computation efficient strategy to store the value function.
        # This is the average cumilative reward per action
        self.Q[bandit][action] += (1.0/self.n[bandit][action]) * (reward - self.Q[bandit][action])

    def get_action(self):
        # Epsilon-greedy policy
        if np.random.random() < self.eps: # explore
            bandit = np.random.randint(self.nBandits)
            action = np.random.randint(self.nActions)
            return bandit, action
        else: # exploit
            # Find the indices of the maximum Q-value
            max_indices = np.argwhere(self.Q == np.amax(self.Q))  # Get all indices of max Q-values
            # Randomly select one of the maximum indices
            chosen_index = np.random.choice(max_indices.shape[0])
            bandit, action = max_indices[chosen_index]
            return bandit, action


def experiment(env, agent, probs, N_episodes):
    """Start multi-armed bandit simulation"""
    actions, rewards = [], []
    for episode in range(N_episodes):
        bandit, action = agent.get_action() # sample policy # integer correcponding to the arm taken.
        reward = env.step(bandit, action) # take step + get reward # integer 0/1
        agent.update_Q(bandit, action, reward) # update Q
        actions.append((bandit, action)) # list of tuples containing bandit action pairs
        rewards.append(reward) # list of ints
    return np.array(actions), np.array(rewards) # 500 x 1, 500 x 1

def create_bandit_probabilities(nBandits, nActions):
    """
    Creates an array of probabilities for winning for multiple bandits.

    Args:
      nBandits: The number of bandits.
      nActions: The number of actions per bandit.

    Returns:
      A NumPy array of shape (nBandits, nActions) containing probabilities.
    """
    probabilities = []
    for _ in range(nBandits):
      # Generate random numbers that will become probabilities
      random_numbers = np.random.rand(nActions)
      # Normalize the numbers to sum to 1 (creating a distribution)
      probabilities.append(random_numbers / np.sum(random_numbers))

    #probabilities[0][3] = 0.9
    #probabilities[0][4] = 0.1
    #for i in range(nActions):
    #  if i != 3 and i != 4:
    #    probabilities[0][i] = 0

    return np.array(probabilities)

# Settings
probs = create_bandit_probabilities(10, 10)

# This is the ground truth we don't know initially.
N_steps = 500 # number of steps (per episode)
N_experiments = 5000 # number of experiments to perform
eps = 0.1 # probability of random exploration (fraction)
save_fig = True # save file in same directory
output_dir = os.path.join(os.getcwd(), "output")
env = Environment(probs) # initialize arm probabilities
agent = Agent(len(env.probs), len(env.probs[0]), eps)  # initialize agent
####

# Run multi-armed bandit experiments
print("Running multi-armed bandits with nActions = {}, eps = {}".format(len(probs), eps))
R = np.zeros((N_steps,))  # reward history sum. 500 x 1
A = np.zeros((N_steps, len(probs), len(probs[0])))  # action history sum 500 x nBandits x nAactions
steps = np.zeros((N_experiments,)) # steps history 500 x 1
avg_rewards = np.zeros((N_experiments,)) # average rewards history 500 x 1
print("Running {} experiments".format(N_experiments))
for i in range(N_experiments):
    actions, rewards = experiment(env, agent, probs, N_steps)  # perform experiment. actions = 500x1, rewards = 500x1
    if i == 0:
      steps[0] = N_steps
    else:
      steps[i] = steps[i-1] + N_steps
    avg_rewards[i] = np.sum(rewards) / len(rewards)
    if (i + 1) % (N_experiments / 100) == 0:
        print("[Experiment {}/{}] ".format(i + 1, N_experiments) +
              "n_steps = {}, ".format(N_steps) +
              "reward_avg = {}".format(np.sum(rewards) / len(rewards)))
    R += rewards # Adding rewards for every time step across all experiments: 500x1
    for j, a in enumerate(actions):
        # step_index x num_actions = 500 x 10.
        # Each cell holds number of actions a in time step j across all experiments
        A[j][a[0]][a[1]] += 1

Running multi-armed bandits with nActions = 10, eps = 0.1
Running 5000 experiments
[Experiment 50/5000] n_steps = 500, reward_avg = 0.1474520487272523
[Experiment 100/5000] n_steps = 500, reward_avg = 0.26158514876457356
[Experiment 150/5000] n_steps = 500, reward_avg = 0.22187519529275454
[Experiment 200/5000] n_steps = 500, reward_avg = 0.24634795499051407
[Experiment 250/5000] n_steps = 500, reward_avg = 0.2894358048908714
[Experiment 300/5000] n_steps = 500, reward_avg = 0.3259415358088585
[Experiment 350/5000] n_steps = 500, reward_avg = 0.28030234941310495
[Experiment 400/5000] n_steps = 500, reward_avg = 0.3266505289951367
[Experiment 450/5000] n_steps = 500, reward_avg = 0.32110938694555863
[Experiment 500/5000] n_steps = 500, reward_avg = 0.18638222218807562
[Experiment 550/5000] n_steps = 500, reward_avg = 0.2546778493671617
[Experiment 600/5000] n_steps = 500, reward_avg = 0.188414495940578
[Experiment 650/5000] n_steps = 500, reward_avg = 0.22327016884385079
[Experiment 700

In [23]:
import numpy as np

# Assuming A is your 3D array with shape (N_steps, N_bandits, N_actions)
N_steps, N_bandits, N_actions = A.shape

for bandit in range(N_bandits):
    for action in range(N_actions):
        print(f"Bandit {bandit}, Action {action}: {np.sum(A[:, bandit, action])}")

Bandit 0, Action 0: 2482.0
Bandit 0, Action 1: 2709.0
Bandit 0, Action 2: 2501.0
Bandit 0, Action 3: 2720.0
Bandit 0, Action 4: 2540.0
Bandit 0, Action 5: 2577.0
Bandit 0, Action 6: 2505.0
Bandit 0, Action 7: 2624.0
Bandit 0, Action 8: 2527.0
Bandit 0, Action 9: 2551.0
Bandit 1, Action 0: 2948.0
Bandit 1, Action 1: 2735.0
Bandit 1, Action 2: 2854.0
Bandit 1, Action 3: 2569.0
Bandit 1, Action 4: 2565.0
Bandit 1, Action 5: 2559.0
Bandit 1, Action 6: 2706.0
Bandit 1, Action 7: 2616.0
Bandit 1, Action 8: 2481.0
Bandit 1, Action 9: 2654.0
Bandit 2, Action 0: 2614.0
Bandit 2, Action 1: 2528.0
Bandit 2, Action 2: 2610.0
Bandit 2, Action 3: 2628.0
Bandit 2, Action 4: 2536.0
Bandit 2, Action 5: 2626.0
Bandit 2, Action 6: 2504.0
Bandit 2, Action 7: 2500.0
Bandit 2, Action 8: 2541.0
Bandit 2, Action 9: 2452.0
Bandit 3, Action 0: 2547.0
Bandit 3, Action 1: 2544.0
Bandit 3, Action 2: 2431.0
Bandit 3, Action 3: 2626.0
Bandit 3, Action 4: 2475.0
Bandit 3, Action 5: 2537.0
Bandit 3, Action 6: 2640.0
B