In [1]:
import numpy as np

In [2]:
class normalbandit(object):
    """
    k-armed bandit whose rewards are normally distributed.

    parameters:
        arms: number of arms (k); defaults to 10

    attributes:
        k: number of arms
        _mu: per-arm mean reward, each drawn from a normal distribution with
            mean 0 and standard deviation 1
        _bestarm: index of the arm with the largest mean

    functions:
        pull: draw one reward from arm i
    """

    def __init__(self, arms=10):
        self.k = arms
        # One mean per arm, sampled once when the bandit is created.
        arm_means = np.random.normal(loc=0, scale=1, size=self.k)
        self._mu = arm_means
        self._bestarm = np.argmax(arm_means)

    def pull(self, i):
        """Return a reward for arm ``i``: normal around its mean, std 1."""
        centre = self._mu[i]
        return np.random.normal(loc=centre, scale=1)


In [3]:
class epsilongreedy(normalbandit):
    """
    Epsilon-greedy strategy for a k-armed bandit.

    On every step a uniform random number is drawn; if it exceeds ``epsilon``
    the arm with the highest running-average reward is pulled, otherwise an
    arm is chosen uniformly at random.

    parameters:
        normalbandit: bandit object to play; it must expose ``k``,
            ``_bestarm`` and ``pull(i)``
        turns: loop bound for ``run``; ``turns - 1`` pulls are performed
        epsilon: probability of choosing a random arm

    usage:
        call ``initialize()`` to (re)set the statistics, then ``run()``.

    Note: the base-class initializer is not invoked; all bandit access goes
    through the wrapped ``self._bandit`` object.
    """

    def __init__(self,  normalbandit, turns=10, epsilon=.01):
        self._bandit =  normalbandit
        self._epsilon = epsilon
        self._turns = turns

    def initialize(self):
        """Reset pull counts, reward estimates and the per-step history."""
        k = self._bandit.k
        steps = self._turns - 1
        self._choice = [0 for _ in range(k)]        # pulls per arm
        # Tiny random initial estimates so that argmax breaks ties randomly;
        # the first real pull of an arm overwrites its estimate completely.
        self._avgreward = 0.0001*np.random.uniform(0, 1, k)
        self._sequence = []                         # arms pulled, in order
        self._reward = [0 for _ in range(steps)]    # reward obtained per step
        self._optarm = [0 for _ in range(steps)]    # 1 where best arm was pulled

    def run(self):
        """Play ``turns - 1`` steps, updating the statistics in place."""
        for t in range(1, self._turns):
            if np.random.uniform() > self._epsilon:
                # Exploit: arm with the highest estimated reward so far.
                arm = np.argmax(self._avgreward)
            else:
                # Explore: uniform arm in [0, k). randint's upper bound is
                # exclusive; this replaces the deprecated
                # np.random.random_integers(0, k - 1).
                arm = np.random.randint(0, self._bandit.k)

            self._choice[arm] += 1
            self._sequence.append(arm)
            reward = self._bandit.pull(arm)
            # Incremental mean: new = old + (reward - old) / n
            self._avgreward[arm] += (reward - self._avgreward[arm]) / self._choice[arm]

            if arm == self._bandit._bestarm:
                self._optarm[t - 1] = 1
            self._reward[t - 1] = reward

In [4]:
class softmax(normalbandit):
    """
    Softmax (Boltzmann) exploration strategy for a k-armed bandit.

    Each step an arm is sampled with probability proportional to
    ``exp(average_reward / temperature)``.

    parameters:
        normalbandit: bandit object to play; it must expose ``k``,
            ``_bestarm`` and ``pull(i)``
        turns: loop bound for ``run``; ``turns - 1`` pulls are performed
        temperature: divisor applied to the average rewards before the
            exponential; smaller values make the choice greedier

    usage:
        call ``initialize()`` to (re)set the statistics, then ``run()``.
    """

    def __init__(self,  normalbandit, turns=10, temperature=.01):
        self._bandit =  normalbandit
        self._temperature = temperature
        self._turns = turns

    def initialize(self):
        """Reset pull counts, reward estimates and the per-step history."""
        k = self._bandit.k
        steps = self._turns - 1
        self._choice = [0 for _ in range(k)]        # pulls per arm
        self._avgreward = 0.0001*np.random.uniform(0, 1, k)
        self._cumreward = [0 for _ in range(k)]     # summed reward per arm
        self._sequence = []                         # arms pulled, in order
        self._reward = [0 for _ in range(steps)]    # reward obtained per step
        self._optarm = [0 for _ in range(steps)]    # 1 where best arm was pulled

    def run(self):
        """Play ``turns - 1`` steps, updating the statistics in place."""
        for t in range(1, self._turns):
            logits = self._avgreward / self._temperature
            # Subtract the maximum before exponentiating: the probabilities
            # are mathematically unchanged, but exp() can no longer overflow
            # to inf (which produced NaN probabilities and a ValueError in
            # np.random.choice for small temperatures).
            logits = logits - np.max(logits)
            weights = np.exp(logits)
            softmax_probab = weights / np.sum(weights)

            # Draw a scalar directly instead of a 1-element array, whose
            # int() conversion is deprecated in recent NumPy.
            arm = int(np.random.choice(self._bandit.k, p=softmax_probab))

            self._choice[arm] += 1
            self._sequence.append(arm)
            reward = self._bandit.pull(arm)
            self._cumreward[arm] += reward
            self._avgreward[arm] = self._cumreward[arm] / self._choice[arm]

            if arm == self._bandit._bestarm:
                self._optarm[t - 1] = 1
            self._reward[t - 1] = reward

In [5]:
# ucb1

class ucb1(normalbandit):
    """
    UCB1 strategy for a k-armed bandit.

    Each step the arm maximising ``average_reward + alpha * bonus`` is pulled,
    where ``bonus = sqrt(2 * log(t) / pulls)`` is recomputed after every pull.

    parameters:
        normalbandit: bandit object to play; it must expose ``k``,
            ``_bestarm`` and ``pull(i)``
        turns: loop bound for ``run``; ``turns - 1`` pulls are performed
        alpha: multiplier applied to the exploration bonus

    usage:
        call ``initialize()`` to (re)set the statistics, then ``run()``.
    """

    def __init__(self, normalbandit, turns=10, alpha=1):
        self._bandit =  normalbandit
        self._turns = turns
        self._alpha = alpha

    def initialize(self):
        """Reset pull counts, reward estimates, bonuses and the history."""
        k = self._bandit.k
        steps = self._turns - 1
        self._choice = [0 for _ in range(k)]        # pulls per arm
        self._avgreward = 0.0001*np.random.uniform(0, 1, k)
        self._cumreward = [0 for _ in range(k)]     # summed reward per arm
        self._sequence = []                         # arms pulled, in order
        self._reward = [0 for _ in range(steps)]    # reward obtained per step
        self._optarm = [0 for _ in range(steps)]    # 1 where best arm was pulled
        self._P = np.zeros(k)                       # exploration bonus per arm

    def run(self):
        """Play ``turns - 1`` steps, updating the statistics in place."""
        for t in range(1, self._turns):
            arm = np.argmax(np.add(self._avgreward, self._alpha*self._P))
            self._choice[arm] += 1
            self._sequence.append(arm)
            reward = self._bandit.pull(arm)
            self._cumreward[arm] += reward
            self._avgreward[arm] = self._cumreward[arm] / self._choice[arm]

            # Arms that were never pulled get an infinite bonus so they are
            # tried first. The previous code divided by zero counts and relied
            # on the resulting NaN/inf (with RuntimeWarnings) for this.
            counts = np.asarray(self._choice, dtype=float)
            pulled = counts > 0
            bonus = np.full(counts.shape, np.inf)
            bonus[pulled] = np.sqrt((2 * np.log(t)) / counts[pulled])
            self._P = bonus

            if arm == self._bandit._bestarm:
                self._optarm[t - 1] = 1
            self._reward[t - 1] = reward


In [6]:
# linucb

In [7]:
import numpy as np


class UserAdvert(object):
    """
    Contextual bandit environment:

    There are 3 types of advertisements and each user is represented by a
    vector. Your task is to build a contextual bandit that gives an appropriate
    action (suggests an appropriate ad for each user).

    The reward is the profit (in $), as a result of playing that advertisement.

    check sample.py to see how to use function

    parameters:
        path: CSV file to load; columns 0-3 are the user features and
            column 4 is the class label (defaults to "ads.csv")
        seed: value passed to ``np.random.seed`` (defaults to 100)

    NOTE: construction reseeds NumPy's *global* random generator, so creating
    an instance also resets the random stream seen by any other code that
    uses ``np.random``.
    """
    def __init__(self, path="ads.csv", seed=100):
        # Set random seed (global NumPy RNG)
        np.random.seed(seed)

        # Load data and shuffle the rows in place
        raw = np.loadtxt(path, delimiter=",")
        np.random.shuffle(raw)

        # Column 4 is the label; columns 0-3 are standardised per column
        self.labels = raw[:, 4]
        features = raw[:, :4]
        features = features - features.mean(axis=0)
        std = features.std(axis=0)
        # A constant column has zero std; dividing would yield NaN features.
        std[std == 0] = 1.0
        self.data = features / std

        # Set internal variables
        self.counter = 0
        self.num = self.data.shape[0]
        # means[dataClass][action]: mean reward of each action for each class
        self.means = [[3, 1, 1],
                      [1, 3, 1],
                      [1, 1, 3]]
        # Passed to np.random.normal as its scale argument in getReward
        self.var = 1.0

    def getState(self):
        """
        Advance to the next row (wrapping around at the end) and return it as
        ``{"stateVec": feature_vector, "stateId": row_index}``.

        The counter is incremented before the row is read, so the first call
        returns row 1 and row 0 is only reached after wrapping.
        """
        self.counter = (self.counter + 1) % self.num
        return {
            "stateVec": self.data[self.counter],
            "stateId": self.counter
        }

    def getReward(self, stateId, action):
        """
        Get reward for performing 'action' on 'stateId'
        """
        assert(action in [0, 1, 2] and type(action) is int), \
            "Invalid action, action must be an int which is 0, 1 or 2"
        #  Add 0.2 to avoid rounding errors
        dataClass = int((self.labels[stateId]) + 0.2)
        return np.random.normal(self.means[dataClass][action], self.var)


In [8]:
ACTION_SIZE = 3  # number of actions; row count of the weight matrix in learnBandit
STATE_SIZE = 4  # length of a state vector; column count of the weight matrix
TRAIN_STEPS = 10000  # Change this if needed
LOG_INTERVAL = 10  # an evaluation pass runs every LOG_INTERVAL training steps

def _softmax_policy(w, state_vec):
    """Return the softmax action probabilities of ``w @ state_vec`` (1-D)."""
    logits = np.matmul(w, np.reshape(state_vec, (-1, 1))).ravel()
    # Shifting by the max leaves the probabilities unchanged but keeps exp()
    # from overflowing as the weights grow.
    logits = logits - np.max(logits)
    exp_logits = np.exp(logits)
    return exp_logits / np.sum(exp_logits)


def learnBandit():
    """
    Train a linear softmax policy on the UserAdvert environment and plot the
    average evaluation reward recorded every LOG_INTERVAL training steps.

    The weights are updated with a policy-gradient step:
    ``w += lr * (reward - baseline) * (onehot(action) - policy) x stateVec``.
    """
    # Imported up front so a missing matplotlib fails before training starts;
    # `plt` was previously used below without ever being imported.
    import matplotlib.pyplot as plt

    env = UserAdvert()
    rew_vec = []
    w = np.random.normal(0, 1, size=(ACTION_SIZE, STATE_SIZE))
    LEARNING_RATE = 0.01
    BASELINE = 2     # constant subtracted from the reward in the update
    N_TEST = 450     # number of evaluation steps per logging point

    for train_step in range(TRAIN_STEPS):
        state = env.getState()
        stateVec = state["stateVec"]
        stateId = state["stateId"]

        # ---- UPDATE code below ------
        policy = _softmax_policy(w, stateVec)
        action = int(np.random.choice(ACTION_SIZE, p=policy))

        reward = env.getReward(stateId, action)
        # ----------------------------
        # updating the policy parameters
        factor = -policy
        factor[action] += 1                   # onehot(action) - policy
        delta = np.outer(factor, stateVec)    # shape (ACTION_SIZE, STATE_SIZE)

        w = w + LEARNING_RATE * (reward - BASELINE) * delta

        # ----------------------------

        if train_step % LOG_INTERVAL == 0:
            # UserAdvert() reseeds NumPy's global RNG. Save and restore the
            # state so evaluation does not reset the random stream used by
            # training every LOG_INTERVAL steps.
            rng_state = np.random.get_state()
            count = 0
            test = UserAdvert()
            for e in range(N_TEST):
                teststate = test.getState()
                testV = teststate["stateVec"]
                testI = teststate["stateId"]
                # ---- UPDATE code below ------
                # Evaluate on the *test* state (the training stateVec was
                # used here by mistake).
                test_policy = _softmax_policy(w, testV)
                # ----------------------------
                act = int(np.random.choice(ACTION_SIZE, p=test_policy))
                test_reward = test.getReward(testI, act)
                count += (test_reward / float(N_TEST))
            rew_vec.append(count)
            np.random.set_state(rng_state)

    # ---- UPDATE code below ------
    # One x value per recorded evaluation; this also stays consistent when
    # TRAIN_STEPS is not a multiple of LOG_INTERVAL.
    x = list(range(len(rew_vec)))
    fig1 = plt.figure(figsize=(10, 7)).add_subplot(111)
    fig1.set_xlabel('epochs')
    fig1.set_ylabel('Average Returns for lr = 0.01')
    fig1.plot(x, rew_vec)
    plt.show()


# Run the training loop (and show the plot) when executed as the main module.
if __name__ == '__main__':
    learnBandit()


OSError: ads.csv not found.