In [2]:
import numpy as np 
from numpy.linalg import pinv
from scipy.stats import bernoulli
from math import log

import random
import sys
import matplotlib.pyplot as plt
%matplotlib inline


In [3]:
class LinTS:
    def __init__(self, d, sigma, lambda_reg=1.0, alpha=1.0):
        """
        LinTS algorithm for linear contextual bandits.

        Parameters:
        - d: Dimension of the action space.
        - sigma: Standard deviation of the noise (assumes sub-Gaussian noise).
        - lambda_reg: Regularization parameter for the covariance matrix.
        - alpha: Scaling factor for exploration (optional, typically set to 1.0).
        """
        self.d = d  # Dimension of the feature space
        self.sigma = sigma  # Noise standard deviation
        self.lambda_reg = lambda_reg  # Regularization parameter
        self.alpha = alpha  # Exploration scaling factor
        self.reset()

    def reset(self):
        """
        Reset all internal estimators and counters for independent experiments.
        """
        self.t = 0  # Time step counter
        self.hat_theta = np.zeros(self.d)  # Estimated parameter vector
        self.cov = 1/self.lambda_reg * np.identity(self.d)  # Covariance matrix (B_t)
        self.invcov = pinv(self.cov)  # Inverse of the covariance matrix
        self.b_t = np.zeros(self.d)  # Accumulated rewards vector

    def get_action(self, arms):
        """
        Choose an action using the LinTS strategy.

        Parameters:
        - arms: Array of shape (K, d) representing the available actions.

        Returns:
        - The chosen action (vector of dimension d).
        """
        # Sample a parameter vector from the posterior distribution
        sampled_theta = np.random.multivariate_normal(self.hat_theta, self.invcov)
        estimated_rewards = np.dot(arms, sampled_theta)
        best_arm_idx = np.argmax(estimated_rewards) # Choose the action with the highest estimated reward
        return arms[best_arm_idx]

    def receive_reward(self, chosen_arm, reward):
        """
        Update the internal model with the observed reward for the chosen action.

        Parameters:
        - chosen_arm: The action (vector) that was chosen.
        - reward: The observed reward.
        """
        # Update covariance matrix and its inverse
        self.cov += 1/self.sigma**2*np.outer(chosen_arm, chosen_arm)
        self.invcov = pinv(self.cov)

        # Update the b_t vector with the reward
        self.b_t += 1/self.sigma**2*(reward * chosen_arm)

        # Update the least squares estimate of theta
        self.hat_theta = np.dot(self.invcov, self.b_t)
        self.t += 1

    def name(self):
        """
        Returns the name of the algorithm for identification.
        """
        return f'LinTS(alpha={self.alpha})'

In [4]:
def play(environment, agent, Nmc, T, pseudo_regret=True):
    """
    Play one Nmc trajectories over a horizon T for the specified agent.
    Return the agent's name (sring) and the collected data in an nd-array.
    """

    data = np.zeros((Nmc, T))



    for n in range(Nmc):
        agent.reset()
        for t in range(T):
            action_set = environment.get_action_set()
            action = agent.get_action(action_set)
            reward = environment.get_reward(action)
            agent.receive_reward(action,reward)

            # compute instant (pseudo) regret
            means = environment.get_means()
            best_reward = np.max(means)
            if pseudo_regret:
              # pseudo-regret removes some of the noise and corresponds to the metric studied in class
              data[n,t] = best_reward - np.dot(environment.theta,action)
            else:
              data[n,t]= best_reward - reward # this can be negative due to the noise, but on average it's positive

    return agent.name(), data


def experiment(environment, agents, Nmc, T,pseudo_regret=True):
    """
    Play Nmc trajectories for all agents over a horizon T. Store all the data in a dictionary.
    """

    all_data = {}

    for agent in agents:
        agent_id, regrets = play(environment, agent,Nmc, T,pseudo_regret)

        all_data[agent_id] = regrets

    return all_data

In [5]:
def ActionsGenerator(K, d, mean=None):
    """
    Generate K actions in a d-dimensional space uniformly sampled on the unit sphere.

    Parameters:
    - K (int): Number of actions to generate.
    - d (int): Dimension of the action space.
    - mean (array-like, optional): Mean vector for generating actions. Defaults to the origin.

    Returns:
    - np.ndarray: An array of shape (K, d) with K action vectors uniformly distributed on the unit sphere.
    """
    # Generate K random points from a normal distribution
    random_points = np.random.normal(size=(K, d))
    
    # Normalize each vector to lie on the unit sphere
    norms = np.linalg.norm(random_points, axis=1, keepdims=True)
    unit_sphere_points = random_points / norms

    # Optionally adjust the mean of the distribution
    if mean is not None:
        mean = np.array(mean).reshape(1, -1)  # Ensure mean is compatible with unit_sphere_points
        unit_sphere_points += mean
        # Re-normalize to ensure points still lie on the sphere
        unit_sphere_points = unit_sphere_points / np.linalg.norm(unit_sphere_points, axis=1, keepdims=True)

    return unit_sphere_points

In [6]:
class LinearBandit:

    def __init__(self, theta, K, var=1., fixed_actions=None):
      """
      theta: d-dimensional vector (bounded) representing the hidden parameter
      K: number of actions per round (random action vectors generated each time)
      pb_type: string in 'fixed', 'iid', 'nsr' (please ignore NotSoRandom)
      """
      self.d = np.size(theta)
      self.theta = theta
      self.K = K
      self.var = var
      self.current_action_set = np.zeros(self.d)
      if fixed_actions is not None:
         self.pb_type = 'fixed'
         self.fixed_actions = fixed_actions
      else :
         self.pb_type = 'iid'


    def get_action_set(self):
      """
      Generates a set of vectors in dimension self.d. Use your ActionsGenerator
      Alternatively, the set of actions is fixed a priori (given as input).
      Implement a condition to return the fixed set when one is given
      """
      if self.pb_type == 'fixed':
          # If fixed actions are provided, use them
          self.current_action_set = self.fixed_actions
      else:
          # Generate K actions uniformly on the unit sphere
          self.current_action_set = ActionsGenerator(self.K, self.d)
      return self.current_action_set
      #return np.eye(self.d) ## dummy return, it only returns d actions (TODO)


    def get_reward(self, action):
      """ sample reward given action and the model of this bandit environment
      action: d-dimensional vector (action chosen by the learner)
      """
      mean = np.dot(action, self.theta)
      return np.random.normal(mean, scale=self.var)

    def get_means(self):
      return np.dot(self.current_action_set, self.theta)

In [None]:
sigma = 1
theta = np.
d = 1
T = 100  # Finite Horizon
N = 100  # Monte Carlo simulations
K = 10
iid_env = LinearBandit(theta, K, var=sigma**2)
linTS = LinTS(d, sigma, lambda_reg=1.0, alpha=1.0)

In [None]:
linucb_vs_greedy = experiment(iid_env, [linTS], Nmc=N, T=T, pseudo_regret=True)

In [None]:
def run_experiment(n_arms, n_trials, true_success_probs, strategy="rpm"):
    """Simulates an experiment with RPM ou Equal Allocation stragies
    
    Args :
        n_arms (int) : number of arms/machines
        n_trials (int) : number of trials
        true_success_probs (array) : array containing the true success probability of each arm
        strategy (str) : strategy used (rpm or equal allocation)

    Returns : 
        regrets (list) : list of regrets at each step 
        cumulative_regret (float) : total regret at the end of the experiment
    """

    optimal_reward = max(true_success_probs) #proba of success of the optimal arm

    Y_t = np.zeros(n_arms)  # Succès cumulatifs observés pour chaque bras
    N_t = np.zeros(n_arms)  # Nombre total cumulatifs de tirages pour chaque bras
    
    # n_prior_obs = 10**6
    # champion_index = 0
    # N_t[champion_index] = n_prior_obs
    # prior_obs = np.random.rand(n_prior_obs) < true_success_probs[champion_index]
    # Y_t[champion_index] = np.sum(prior_obs)

    regrets = []
    cumulative_regret = 0
    
    for t in range(1, n_trials + 1):
        n_observations = 1000 # Nb of observation for one trial

        # Calculer les probabilités d'optimalité
        
        win_probs = compute_win_prob(Y_t, N_t, 100) # w_at for each arm
        if strategy=="rpm":
            chosen_arm = np.random.choice(range(1,n_arms+1), p=win_probs, size=n_observations)
        if strategy=="equal allocation":
            chosen_arm = np.random.choice(range(1,n_arms+1), p=[1/n_arms]*n_arms, size=n_observations)
        
        hat_theta = np.dot(invcov, b_t)
        cov += 1/sigma**2*np.outer(chosen_arm, chosen_arm)
        invcov = pinv(self.cov)

        # Actualiser N_t, remplir n_t
        np.add.at(N_t, chosen_arm-1, 1) # Nombre total de tirages pour chaque bras 
        n_t = np.zeros(n_arms)  # Nombre de tirages pour chaque bras à cette étape
        np.add.at(n_t, chosen_arm-1, 1)

        # Actualiser Y_t et calculer le regret 
        regret_t = 0
        for a in range(n_arms):
            obs = np.random.rand(int(n_t[a])) < true_success_probs[a] # Simule des observations selon la probabilité réelle de succès de ce bras
            Y_t[a] += np.sum(obs) #mets à jour le nombre de succès de ce bras

            regret_t +=  n_t[a]*(optimal_reward - true_success_probs[a])
        regrets.append(regret_t)
        cumulative_regret += regret_t
        
        if max(win_probs)>0.95: 
            break

    
    return regrets, cumulative_regret

In [16]:
def generate_factor_vectors_with_zeros(levels):
    factor_vectors = []
    for level in levels:
        # Créer des vecteurs binaires pour un facteur avec une seule position "active"
        vectors = np.eye(level, dtype=int)
        
        # Ajouter un vecteur de zéros au début
        zeros_vector = np.zeros((1, level), dtype=int)
        vectors = np.vstack((zeros_vector, vectors))
        
        factor_vectors.append(vectors)
    return factor_vectors
generate_factor_vectors_with_zeros([2, 3])

[array([[0, 0],
        [1, 0],
        [0, 1]]),
 array([[0, 0, 0],
        [1, 0, 0],
        [0, 1, 0],
        [0, 0, 1]])]

In [29]:
import itertools
def generate_factor_vectors_with_zeros(levels):
    factor_vectors = []
    for level in levels:
        # Créer des vecteurs binaires pour un facteur avec une seule position "active"
        vectors = np.eye(level-1, dtype=int)
        
        # Ajouter un vecteur de zéros au début
        zeros_vector = np.zeros((1, level-1), dtype=int)
        vectors = np.vstack((zeros_vector, vectors))
        
        factor_vectors.append(vectors)
    return factor_vectors

def X_possibilities(levels):
    # Produire toutes les combinaisons dans le même ordre
    all_combinations = list(itertools.product(*generate_factor_vectors_with_zeros(levels)))

    # Convertir les combinaisons en vecteurs concaténés
    combined_vectors = [np.hstack(combination) for combination in all_combinations]
    #ajouter l'intercept
    combined_vectors_with_leading_one = [np.insert(vector, 0, 1) for vector in combined_vectors]

    return combined_vectors_with_leading_one

In [32]:
np.shape(X_possibilities([2,3,4,5]))

(120, 11)

In [17]:

import numpy as np
import pandas as pd

def X_possibilities():
    # Nombre de bits pour chaque facteur
    bits_factor_1 = 1  # 2 niveaux
    bits_factor_2 = 2  # 3 niveaux
    bits_factor_3 = 3  # 4 niveaux
    bits_factor_4 = 4  # 5 niveaux

    # Générer les combinaisons pour chaque facteur
    factor_1_combinations = list(itertools.product([0, 1], repeat=bits_factor_1))
    factor_1_combinations
    factor_2_combinations = list(itertools.product([0, 1], repeat=bits_factor_2))
    factor_3_combinations = list(itertools.product([0, 1], repeat=bits_factor_3))
    factor_4_combinations = list(itertools.product([0, 1], repeat=bits_factor_4))

    # Réduction à 120 combinaisons possibles en tenant compte des niveaux de chaque facteur
    factor_2_combinations = factor_2_combinations[:3]
    factor_3_combinations = factor_3_combinations[:4]
    factor_4_combinations = factor_4_combinations[:5]

    # Produit cartésien pour toutes les combinaisons
    all_combinations = itertools.product(
        [(1,)],  # Intercept always present
        factor_1_combinations,
        factor_2_combinations,
        factor_3_combinations,
        factor_4_combinations
    )

    # Applatir et convertir en vecteurs binaires
    binary_vectors = [
        np.hstack(combination).astype(int)
        for combination in all_combinations
    ]

    return binary_vectors
X_possibilities()

[array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]),
 array([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]),
 array([1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0]),
 array([1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1]),
 array([1, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0]),
 array([1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0]),
 array([1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1]),
 array([1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0]),
 array([1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1]),
 array([1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0]),
 array([1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0]),
 array([1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1]),
 array([1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0]),
 array([1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 1]),
 array([1, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0]),
 array([1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0]),
 array([1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1]),
 array([1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 0]),
 array([1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 1]),
 array([1, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0]),
 array([1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]),
 array([1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1]),
 array([1, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0]),
 array([1, 