In [None]:
import sys, os
if 'google.colab' in sys.modules and not os.path.exists('.setup_complete'):
    !wget -q https://raw.githubusercontent.com/yandexdataschool/Practical_RL/master/setup_colab.sh -O- | bash
    !pip -q install gymnasium
    !touch .setup_complete

# This code creates a virtual display to draw game images on.
# It will have no effect if your machine has a monitor.
if type(os.environ.get("DISPLAY")) is not str or len(os.environ.get("DISPLAY")) == 0:
    !bash ../xvfb start
    os.environ['DISPLAY'] = ':1'

In [None]:
from abc import ABCMeta, abstractmethod, abstractproperty
import enum

import numpy as np
np.set_printoptions(precision=3)
np.set_printoptions(suppress=True)

import matplotlib.pyplot as plt
import seaborn as sns

sns.set()
%matplotlib inline

## Bernoulli Bandit

We are going to implement several exploration strategies for simplest problem - bernoulli bandit.

The bandit has $K$ actions. Action produce 1.0 reward $r$ with probability $0 \le \theta_k \le 1$ which is unknown to agent, but fixed over time. Agent's objective is to minimize regret over fixed number $T$ of action selections:

$$\rho = T\theta^* - \sum_{t=1}^T \theta_{A_t}$$

Where $\theta^* = \max_k\{\theta_k\}$

**Real-world analogy:**

Clinical trials - we have $K$ pills and $T$ ill patient. After taking pill, patient is cured with probability $\theta_k$. Task is to find most efficient pill.

A research on clinical trials - https://arxiv.org/pdf/1507.08025.pdf

In [None]:
class BernoulliBandit:
    def __init__(self, n_actions=5):
        self._probs = np.random.random(n_actions)

    @property
    def action_count(self):
        return len(self._probs)

    def pull(self, action):
        if np.any(np.random.random() > self._probs[action]):
            return 0.0
        return 1.0

    def optimal_reward(self):
        """Used for regret calculation"""
        return np.max(self._probs)

    def action_value(self, action):
        """Used for regret calculation"""
        return self._probs[action]

    def step(self):
        """Used in nonstationary version"""
        pass

    def reset(self):
        """Used in nonstationary version"""

In [None]:
class AbstractAgent(metaclass=ABCMeta):
    def init_actions(self, n_actions):
        self._successes = np.zeros(n_actions)
        self._failures = np.zeros(n_actions)
        self._total_pulls = 0

    @abstractmethod
    def get_action(self):
        """
        Get current best action
        :rtype: int
        """
        pass

    def update(self, action, reward):
        """
        Observe reward from action and update agent's internal parameters
        :type action: int
        :type reward: int
        """
        self._total_pulls += 1
        if reward == 1:
            self._successes[action] += 1
        else:
            self._failures[action] += 1

    @property
    def name(self):
        return self.__class__.__name__


class RandomAgent(AbstractAgent):
    def get_action(self):
        return #YOUR CODE

### Epsilon-greedy agent

**for** $t = 1,2,...$ **do**

&nbsp;&nbsp; **for** $k = 1,...,K$ **do**

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $\hat\theta_k \leftarrow \alpha_k / (\alpha_k + \beta_k)$

&nbsp;&nbsp; **end for**

&nbsp;&nbsp; $x_t \leftarrow argmax_{k}\hat\theta$ with probability $1 - \epsilon$ or random action with probability $\epsilon$

&nbsp;&nbsp; Apply $x_t$ and observe $r_t$

&nbsp;&nbsp; $(\alpha_{x_t}, \beta_{x_t}) \leftarrow (\alpha_{x_t}, \beta_{x_t}) + (r_t, 1-r_t)$

**end for**

Implement the algorithm above in the cell below:

In [None]:
class EpsilonGreedyAgent(AbstractAgent):
    def __init__(self, epsilon=0.01):
        self._epsilon = epsilon

    def get_action(self):
        if np.random.random() < self._epsilon:
            return #YOUR CODE
        else:
            return #YOUR CODE

    @property
    def name(self):
        return self.__class__.__name__ + "(epsilon={})".format(self._epsilon)

### UCB Agent
Epsilon-greedy strategy have no preference for actions. It would be better to select among actions that are uncertain or have potential to be optimal. One can come up with idea of index for each action that represents optimality and uncertainty at the same time. One efficient way to do it is to use UCB1 algorithm:

**for** $t = 1,2,...$ **do**

&nbsp;&nbsp; **for** $k = 1,...,K$ **do**

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; $w_k \leftarrow \alpha_k / (\alpha_k + \beta_k) + \sqrt{2\log(t) \ / \ (\alpha_k + \beta_k)}$

&nbsp;&nbsp; **end for**

&nbsp;&nbsp; **end for**
 $x_t \leftarrow argmax_{k}w$

&nbsp;&nbsp; Apply $x_t$ and observe $r_t$

&nbsp;&nbsp; $(\alpha_{x_t}, \beta_{x_t}) \leftarrow (\alpha_{x_t}, \beta_{x_t}) + (r_t, 1-r_t)$

**end for**

__Note:__ in practice, one can multiply $\sqrt{2\log(t) \ / \ (\alpha_k + \beta_k)}$ by some tunable parameter to regulate agent's optimism and wilingness to abandon non-promising actions.

More versions and optimality analysis - https://homes.di.unimi.it/~cesabian/Pubblicazioni/ml-02.pdf

In [None]:
class UCBAgent(AbstractAgent):
    def get_action(self):
        mean_reward = #YOUR CODE
        numerator = #YOUR CODE
        denominator = #YOUR CODE
        return np.argmax(
            mean_reward + np.sqrt(2 * numerator / denominator)
        )

### Thompson sampling

UCB1 algorithm does not take into account actual distribution of rewards. If we know the distribution - we can do much better by using Thompson sampling:

**for** $t = 1,2,...$ **do**

&nbsp;&nbsp; **for** $k = 1,...,K$ **do**

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Sample $\hat\theta_k \sim beta(\alpha_k, \beta_k)$

&nbsp;&nbsp; **end for**

&nbsp;&nbsp; $x_t \leftarrow argmax_{k}\hat\theta$

&nbsp;&nbsp; Apply $x_t$ and observe $r_t$

&nbsp;&nbsp; $(\alpha_{x_t}, \beta_{x_t}) \leftarrow (\alpha_{x_t}, \beta_{x_t}) + (r_t, 1-r_t)$

**end for**


More on Thompson Sampling:
https://web.stanford.edu/~bvr/pubs/TS_Tutorial.pdf

In [None]:
class ThompsonSamplingAgent(AbstractAgent):
    def get_action(self):
        return #YOUR CODE

In [None]:
from collections import OrderedDict


def get_regret(env, agents, n_steps=5000, n_trials=50):
    scores = OrderedDict(
        {agent.name: [0.0 for step in range(n_steps)] for agent in agents}
    )

    for trial in range(n_trials):
        env.reset()

        for a in agents:
            a.init_actions(env.action_count)

        for i in range(n_steps):
            optimal_reward = env.optimal_reward()

            for agent in agents:
                action = agent.get_action()
                reward = env.pull(action)
                agent.update(action, reward)
                scores[agent.name][i] += optimal_reward - env.action_value(action)

            env.step()  # change bandit's state if it is unstationary

    for agent in agents:
        scores[agent.name] = np.cumsum(scores[agent.name]) / n_trials

    return scores


def plot_regret(agents, scores):
    plt.figure(figsize=(12, 8))
    for agent in agents:
        plt.plot(scores[agent.name])
    plt.legend([agent.name for agent in agents])

    plt.ylabel("regret")
    plt.xlabel("steps")

    plt.show()

In [None]:
# Uncomment agents
agents = [RandomAgent(), EpsilonGreedyAgent(), UCBAgent(), ThompsonSamplingAgent()]

regret = get_regret(BernoulliBandit(), agents, n_steps=1000, n_trials=1)
plot_regret(agents, regret)

# Contextual Bandits

## Linear UCB

The LinearUCB algorithm involves the following steps:
$\hat{r}_{t,a} = x_{t,a}^T \theta_a + \alpha \sqrt{x_{t,a}^T A_a^{-1} x_{t,a}}$

1. **Initialization**: For each action $a$, initialize:
   - $A_a = I_d$, the identity matrix.
   - $b_a = 0$, the zero vector.
2. **Loop for each round $t$**:
   - **Contextual Features**: Receive $x_{t,a}$ for each action $a$.
   - **Prediction and Selection**: Calculate for each action $a$: $\hat{r}_{t,a} = x_{t,a}^T \theta_a + \alpha \sqrt{x_{t,a}^T A_a^{-1} x_{t,a}}$ where $\theta_a = A_a^{-1} b_a$. Select action $a_t$ with the highest $\hat{r}_{t,a}$.
   - **Reward**: Observe reward $r_{t,a_t}$ for chosen action.
   - **Update**: Update $A_{a_t}$ and $b_{a_t}$ as follows: $A_{a_t} = A_{a_t} + x_{t,a_t} x_{t,a_t}^T, b_{a_t} = b_{a_t} + r_{t,a_t} x_{t,a_t}$

This method dynamically adjusts between exploring new actions and exploiting known rewards, using the context of decisions.


In [None]:
import gymnasium as gym
from gymnasium import spaces


class LinearContextualBanditEnv(gym.Env):

    def __init__(self, n_actions, n_contexts, context_dim):
        super().__init__()
        self.n_actions = n_actions
        self.n_contexts = n_contexts
        self.context_dim = context_dim

        self.contexts = np.random.randn(n_contexts, context_dim)
        self.theta = np.random.randn(context_dim, n_actions)
        self.reward_mean = self.contexts @ self.theta

        self.action_space = spaces.Discrete(n_actions)
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, shape=(context_dim,), dtype=np.float32
        )
        self.context_index = None

    def reset(self, **kwargs):
        # Generate a new context randomly
        self.context_index = np.random.choice(self.n_contexts)
        return self.contexts[self.context_index], {}

    def step(self, action):
        assert self.action_space.contains(action), action

        # Define reward logic based on the current context and chosen action
        self.context_index = np.random.choice(self.n_contexts)
        reward = self._calculate_reward(action, self.context_index)
        return self.contexts[self.context_index], reward, True, False, {}

    def get_context_index(self):
        return self.context_index

    def _calculate_reward(self, action, context_index):
        # Implement the logic to calculate the reward based on the action and context
        reward = self.reward_mean[context_index, action] + np.random.randn()
        return reward

    def render(self):
        # Optional: Implement rendering for human-readable output
        pass

In [None]:
class VanillaUCB:
    def __init__(self, n_contexts, n_actions):
        self.n_actions = n_actions
        self.n_contexts = n_contexts

        self.counts = np.zeros((self.n_contexts, n_actions))  # Count of actions taken
        self.values = np.zeros(
            (self.n_contexts, n_actions)
        )  # Estimated values of actions
        self.total_count = np.zeros(self.n_contexts)  # Total count of actions taken

    def get_action(self, context_index):
        numerator = #YOUR CODE
        denominator = #YOUR CODE
        upper_bound = self.values[context_index] + np.sqrt(2 * numerator / denominator)
        action = np.argmax(upper_bound)
        self.total_count[context_index] += 1
        return action

    def update(self, context_index, action, reward):
        self.counts[context_index, action] += 1
        n = self.counts[context_index, action]
        value = self.values[context_index, action]
        new_value = #YOUR CODE
        self.values[context_index, action] = new_value


class LinearUCB:
    def __init__(self, alpha, context_dim, n_actions):
        self.alpha = alpha  # exploration parameter
        self.context_dim = context_dim  # number of features
        self.n_actions = n_actions
        self.A = np.array(
            [np.identity(self.context_dim) for _ in range(self.n_actions)]
        )
        self.A_inv = self.A.copy()
        self.b = np.array([np.zeros(self.context_dim) for _ in range(self.n_actions)])

    def get_action(self, context):
        theta = #YOUR CODE
        p = #YOUR CODE
        return np.argmax(p)

    def update(self, context, action, reward):
        self.A[action] += #YOUR CODE
        self.A_inv[action] = np.linalg.inv(self.A[action])
        self.b[action] += #YOUR CODE

In [None]:
T = 1000
n_actions = 2
n_contexts = 5
context_dim = 4
alpha = 1.0  # exploration parameter

env = LinearContextualBanditEnv(n_actions, n_contexts, context_dim)
linear_ucb = LinearUCB(alpha, context_dim, n_actions)
vanilla_ucb = VanillaUCB(n_contexts, n_actions)

linear_regrets = [0]
vanilla_regrets = [0]

context, _ = env.reset()

# VanillaUCB
for t in range(T):
    action = linear_ucb.get_action(context)
    reward_mean = env.reward_mean[env.context_index]

    next_context, reward, _, _, _ = env.step(action)
    linear_ucb.update(context, action, reward)
    context = next_context
    linear_regrets.append(linear_regrets[-1] + reward_mean.max() - reward_mean[action])

env.reset()
context_index = env.get_context_index()
# LinearUCB
for t in range(T):

    action = vanilla_ucb.get_action(context_index)
    reward_mean = env.reward_mean[env.context_index]

    _, reward, _, _, _ = env.step(action)
    vanilla_ucb.update(context_index, action, reward)
    context_index = env.get_context_index()
    vanilla_regrets.append(
        vanilla_regrets[-1] + reward_mean.max() - reward_mean[action]
    )

plt.plot(linear_regrets[1:], label="LinearUCB")
plt.plot(vanilla_regrets[1:], label="VanillaUCB")
plt.legend()