In [1]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm.auto import tqdm

In [2]:
class IndependentPricingEnv: 
    
    def __init__(self, actions, curves, n_users, seed=0):
        assert curves.shape == actions.shape, "Shape of the action not coherent"
        self.n_products = actions.shape[0]
        self.n_actions = actions.shape[1]
        self.n_users = n_users
        self.actions = actions # dim 0: product; dim 1: actions
        self.curves = curves # dim 0: product; dim 1: actions
        self.actions_to_idx = {}
        for itm in range(self.n_products):
            self.actions_to_idx[itm] = {}
        for itm in range(self.n_products):
            for i, act in enumerate(self.actions[itm, :]):
                self.actions_to_idx[itm][act] = i
        self.reset(seed)  
    
    def step(self, action, convrate_only=True):
        assert action.ndim == 1, "The action must be 1-dimensional"
        assert action.shape[0] == self.n_products, "The action must be of dimension n_products"
        sales = np.zeros((self.n_products, self.n_users))
        for itm in range(self.n_products):
            sales[itm, :] = self.curves[itm, self.actions_to_idx[itm][action[itm]]]
        sales = sales > np.random.uniform(0, 1, sales.shape)
        if convrate_only:
            return sales.sum(axis=1) / self.n_users
        else: 
            sales
                
    
    def reset(self, seed=0):
        np.random.seed(seed)

In [5]:
class OptimisticKernelizedBanditAgent: 

    
    def __init__(self, horizon, actions, sigma=0.5, make_plots=True):
        self.horizon = horizon
        self.sigma_process = sigma  
        self.actions = actions # shape (action_number, action_dimension)
        self.action_dim = self.actions.shape[1]
        self.n_actions = self.actions.shape[0]
        self.make_plots = make_plots
        self.action_enum = np.linspace(0, self.n_actions-1, self.n_actions, dtype=int)
        self.reset()

        
    def pull(self):
        if self.nosamples:
            self.last_action = self.actions[np.random.choice(self.action_enum), :]
        else:
            beta = np.log(self.horizon)
            self.last_action = self.actions[np.argmax(self.mu + beta * self.sigma)]
        return self.last_action
            
    
    def update(self, reward):
        if self.x_vect is None:
            self.x_vect = np.array([self.last_action]).reshape(1, self.action_dim)
            self.y_vect = np.array([reward]).reshape(1, 1)
        else:
            self.x_vect = np.vstack((self.x_vect, np.array([self.last_action]).reshape(1, self.action_dim)))
            self.y_vect = np.vstack((self.y_vect, np.array([reward]).reshape(1, 1)))

    
    def compute(self):
        k = rbf(self.x_vect, self.x_vect)
        k = k + (self.sigma_process * np.eye(self.y_vect.shape[0]))
        k_inv = np.linalg.inv(k)  # can be improved (see, e.g., https://doi.org/10.1016/S0898-1221(01)00278-4)
        self.mu = np.zeros(self.n_actions)
        self.sigma = np.zeros(self.n_actions)
        for i, x_plt_i in enumerate(list(self.actions)):
            k_star = rbf(self.x_vect, np.array([x_plt_i]).reshape(1, self.action_dim))
            self.mu[i] = k_star.T @ k_inv @ self.y_vect
            self.sigma[i] = 1 - k_star.T @ k_inv @ k_star
        self.nosamples = False
        return self.actions, self.mu, self.sigma

    
    def reset(self):
        self.nosamples = True
        self.x_vect = None
        self.y_vect = None
    

    def compute(self):
        k = rbf(self.x_vect, self.x_vect)
        k = k + (self.sigma_process * np.eye(self.y_vect.shape[0]))
        k_inv = np.linalg.inv(k)  # can be improved (see, e.g., https://doi.org/10.1016/S0898-1221(01)00278-4)
        self.mu = np.zeros(self.n_actions)
        self.sigma = np.zeros(self.n_actions)
        for i, x_plt_i in enumerate(list(self.actions)):
            k_star = rbf(self.x_vect, np.array([x_plt_i]).reshape(1, self.action_dim))
            self.mu[i] = k_star.T @ k_inv @ self.y_vect
            self.sigma[i] = 1 - k_star.T @ k_inv @ k_star
        self.nosamples = False
        return self.actions, self.mu, self.sigma

In [7]:
class Runner: 

    
    def __init__(self, agent, env):
        self.agent = agent
        self.env = env

    
    def run_simulations(self, horizon, n_runs, draw=False, draw_every=10): 
        # actions are in [0, 1] for simplicity
        for run_i in range(n_runs):
            self.agent.reset()
            self.env.reset(seed=run_i)
            plt.figure()
            cols = 2
            f, ax = plt.subplots(int(horizon/(draw_every)), cols, figsize=(cols*6,horizon/draw_every*4))
            actions = np.zeros((horizon, self.agent.action_dim))
            rewards = np.zeros(horizon)
            plot_count = 0
            for t in tqdm(range(horizon)):
                actions[t, :] = self.agent.pull()
                rewards[t] = self.env.step(actions[t, :])
                self.agent.update(rewards[t])
                if draw and t % draw_every == 0:
                    # int(t/cols), t % cols
                    x_plt, mu, sigma = self.agent.compute()
                    ax[plot_count, 0].plot(x_plt.ravel(), mu)
                    ax[plot_count, 0].plot(x_plt.ravel(), self.env.mean0(x_plt))
                    ax[plot_count, 0].fill_between(x_plt.ravel(), mu-sigma, mu+sigma, alpha=0.3)
                    ax[plot_count, 0].set_title("t = " + str(t+1))
                    ax[plot_count, 0].scatter(actions[:t+1], rewards[:t+1])
                    ax[plot_count, 0].set_xlim([0, 1])
                    ax[plot_count, 0].set_ylim([-0.1, 1.1])
                    ax[plot_count, 1].hist(actions[:t+1], bins=11, alpha=0.1)
                    ax[plot_count, 1].set_xlim([0, 1])
                    for act in np.unique(actions[:t+1]):
                        avg = rewards[:t+1]
                        mask = actions[:t+1] == act
                        avg = np.mean(avg[mask.ravel()])
                        ax[plot_count, 0].scatter(act, avg, color="r")
                    plot_count = plot_count + 1
            """
            fig = plt.figure()
            ax = fig.add_subplot(projection='3d')
            ax.scatter(x_plt[:, 0], x_plt[:, 1], mu, marker="o")
            ax.scatter(x_plt[:, 0], x_plt[:, 1], 
                       self.env.mean0(x_plt[:, 0]) * self.env.mean1(x_plt[:, 1]), marker="x")
            """

In [9]:
# %matplotlib qt

horizon = 200

mean0 = lambda x : (np.power(x, 2) - np.power(x, 3)) * 2 + 0.3
mean1 = lambda x : (np.power(1-x, 2) - np.power(1-x, 3)) * 2 + 0.3

actions0 = np.linspace(0, 1, 6)
actions1 = np.linspace(0, 1, 6)
actions0, actions1 = np.meshgrid(actions0, actions1)
actions = np.hstack((actions0.ravel().reshape(-1, 1), actions1.ravel().reshape(-1, 1)))


agent = OptimisticKernelizedBanditAgent(horizon=horizon,
        actions=np.linspace(0, 1, 11).reshape(-1, 1))
env2d = BernoulliEnvironmentTwoDims(mean0, mean1)
env1d = BernoulliEnvironmentOneDim(mean0)
runner = Runner(agent, env1d)
runner.run_simulations(horizon=horizon, n_runs=1, draw=True, draw_every=2)

NameError: name 'BernoulliEnvironmentTwoDims' is not defined