In [9]:
import bandit as b
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from itertools import product
from BaseAgent import AbstractAgent
np.random.seed(0)

# Multi-Armed Bandits: 

Three medicine where the effectiveness varies randomly over time. 

non-stationary MAB scenario? bc the reward distributions are changing over time.

In [2]:
bandit_final_env =b.Bandits_final()

The agent implements a variant of the Upper Confidence Bound (UCB) algorithm, a popular method in multi-armed bandit problems. It uses a sliding window mechanism to adapt to potentially non-stationary environments (where the properties of the environment can change over time).

1. Action Selection with UCB: This part calculates the Upper Confidence Bound for each action (or "arm") based on recent rewards. It balances exploration (trying new actions) and exploitation (using actions known to yield high rewards). Actions with fewer trials or high uncertainty get a "bonus" to their estimated value, encouraging exploration.
2. Sliding Window: This mechanism ensures that the agent's decisions are based on recent experiences by maintaining a fixed-size list of the most recent rewards for each action. This helps adapt to changes in the environment's dynamics.
3. Learning from Interaction: After performing an action and observing a reward, the agent updates its knowledge (the sliding windows of rewards)

In [13]:
class SlidingWindowUCB:
    def __init__(self, k, window_size=50, c=1):
        self.k = k
        self.window_size = window_size
        self.c = c
        self.windows = {i: [] for i in range(k)}
        self.t = 0
    
    def select_action(self):
        self.t += 1
        ucb_values = []
        for arm in range(self.k):
            rewards = self.windows[arm]
            if len(rewards) < 1:
                ucb_values.append(float('inf'))
            else:
                mean_reward = np.mean(rewards)
                bonus = self.c * np.sqrt(np.log(self.t) / len(rewards))
                ucb_values.append(mean_reward + bonus)
        return np.argmax(ucb_values)
    
    def update(self, action, reward):
        rewards = self.windows[action]
        if len(rewards) >= self.window_size:
            rewards.pop(0)
        rewards.append(reward)


agent = SlidingWindowUCB(k=3, window_size=50, c=1)

total_reward = 0
for episode in range(1000):
    obs, reward, terminated, truncated, info = bandit_final_env.reset()
    while not terminated:
        action = agent.select_action()
        _, reward, terminated, _, _ = bandit_final_env.step(action)
        agent.update(action, reward)
        total_reward += reward

print("Total Reward:", total_reward)

Total Reward: 4130.602790310084


# Decay epsilon greedy:

In [14]:
def calculate_regret(mab, cumulative_reward, t, print_=False):
    if mab.state is not None:
        optimal_reward = mab.means[mab.state][mab.get_optimal_action()] * t
    else:
        optimal_reward = mab.means[mab.get_optimal_action()] * t
    #if print_:
       # print("optimal reward at time step", t, ":", optimal_reward)
        #print("cumulative reward at time step", t, ":", cumulative_reward)
    return optimal_reward - cumulative_reward

 
def update_expected_reward(expected_reward,action_count,current_action,reward):
    expected_reward[current_action]+=(1/action_count[current_action]) * (reward-expected_reward[current_action])

def get_confidence_bounds(t,action_count):
    return np.sqrt(2*np.log(t)/action_count)
## here i am calculating the 


In [15]:
def decaying_epsilon_greedy(mab,T,epsilon, alpha,print_=False):
    mab.reset()

    # initialise the expected reward and action count and cumulative reward
    expected_reward = np.ones(mab.k)*100
    action_count = np.zeros(mab.k)
    cumulative_rewards = 0
    
    for _ in range(T):
        if np.random.uniform(0,1) < epsilon:
            current_action = np.random.randint(mab.k)
        else:
            current_action = np.argmax(expected_reward)
            
        current_state = mab.step(current_action)        # maybe not state...
        cumulative_rewards += current_state[1]
        action_count[current_action] += 1
        update_expected_reward(expected_reward,action_count,current_action,current_state[1])
        epsilon*=alpha
    
    if print_:
        for i in range(mab.k):
            print("Expected reward action", i, ":", expected_reward[i], "Action count:", action_count[i])

    regret = calculate_regret(mab, cumulative_rewards, T,print_)   

    if print_:
        print('-------------------------------------\n\n')

    return np.argmax(expected_reward), regret

In [16]:
print(decaying_epsilon_greedy(bandit_final_env,1000,0.5,0.99))

(2, -167.20138791448608)


In [17]:
def epsilon_greedy(mab,T,epsilon,print_=False):
    mab.reset()

    # initialise the expected reward and action count and cumulative reward
    expected_reward = np.ones(mab.k)*1000
    action_count = np.zeros(mab.k)
    cumulative_rewards = 0
    regret2 = np.zeros(T)
    
    for t in range(T):
        if np.random.uniform(0,1) < epsilon:
            current_action = np.random.randint(mab.k)
        else:
            current_action = np.argmax(expected_reward)
            
        current_state = mab.step(current_action)        # maybe not state...
        cumulative_rewards += current_state[1]
        action_count[current_action] += 1
        update_expected_reward(expected_reward,action_count,current_action,current_state[1])
        regret2[t] = calculate_regret(mab, cumulative_rewards, t,print_)
    if print_:
        for i in range(mab.k):
            print("Expected reward action", i, ":", expected_reward[i], "Action count:", action_count[i])
            
        
    regret = calculate_regret(mab, cumulative_rewards, T,print_)   

    if print_:
        print("regert", regret2)
        print('-------------------------------------\n\n')

    return np.argmax(expected_reward), regret2


In [18]:
def ucb(mab,T,c,print_=False):
    mab.reset()

    # initialise the expected reward and action count and cumulative reward
    expected_reward = np.zeros(mab.k)
    action_count = np.zeros(mab.k)
    cumulative_rewards =0

    # initialisation, pull each arm once and use the rewards to initialise the expected reward and action count
    # OBS! not a smart thing to do in real hospital setting, with trying drugs on patients ...
    # (needed to avoid dividing by zero in the first iteration)
    for i in range(mab.k):
        expected_reward[i] = mab.step(i)[1]
        action_count[i] += 1
    
    for t in range(mab.k+1,T+1):
        confidence_bounds = get_confidence_bounds(t,action_count)
        best_action = np.argmax(expected_reward + c*confidence_bounds)

        current_state = mab.step(best_action)        # maybe not state...
        cumulative_rewards += current_state[1]
        action_count[best_action] += 1
        update_expected_reward(expected_reward,action_count,best_action,current_state[1])
    
    if print_:
        for i in range(mab.k):
            print("Expected reward action", i, ":", expected_reward[i], "Action count:", action_count[i])
        
    regret = calculate_regret(mab, cumulative_rewards, T,print_)   

    if print_:
        print('-------------------------------------\n\n')

    return np.argmax(expected_reward), regret

ucb(bandit_final_env,1000,0.5)

NameError: name 'b1' is not defined

In [None]:
def run_experiments(mab, T, epsilon, alpha, c, print_=False):
    eps_greed=epsilon_greedy(mab,T,epsilon,print_)
    
    dec_greed=decaying_epsilon_greedy(mab,T,epsilon,alpha,print_)
    ucb_res=ucb(mab,T,c,print_)
    if print_:
        print(f'epsilon greedy:          medicine: {eps_greed[0]}  regret: {eps_greed[1]}')
        print(f'decaying epsiol greedy:  medicine: {dec_greed[0]}  regret: {dec_greed[1]}')
        print(f'ucb:                     medicine: {ucb_res[0]}  regret: {ucb_res[1]}')
        print('-------------------------------------\n')
    return eps_greed,dec_greed,ucb_res

run_experiments(bandit_final_env,1000,0.5,0.99,0.5,True)