In [None]:
import numpy as np
import matplotlib.pyplot as plt


In [None]:
#class to contain the testbed: states, actions, rewards etc
#to incorporate question 2f too we will include an indicator that tells us if we want to use a random walk or not
class ArmedBanditTB:
    
    def __init__(self, nRuns, eps, q_init = 5, ucb = False, c=2, rw = False, alpha_const = False, alpha = 0.1, decay_eps = False):
        
        #number of times we will interact with environment
        self.nRuns= nRuns
        
        #eps for eps greedy algorithm (0 eps just means standard greedy)
        self.epsilon = eps
        
        #initial values for all Q_0(a_t)
        self.initial_q = q_init
        
        #if we want to use UCB or not
        self.ucb = ucb
        if self.ucb:
            self.c = c
        
        #constant alphas for updates or not
        self.alpha_const = alpha_const
        if self.alpha_const:
            self.alpha = alpha
        
        #implemented for part f when we want to use random walks to update our Q*(At)s
        self.rw = rw
        
        self.decay = decay_eps
        
        #calling function below to initialise a few more variables- case specific generalisations
        self.reset()
        
    def reset(self):
        
        self.Q = np.ones(10)*self.initial_q
        
        #for the ucb calculation if needed
        if self.ucb:
            self.U = np.zeros(10)
        
        #store rewards, count of how many times each action has been taken
        self.rewards = []
        #conventionally initialise all counts at 1 (no division by 0 errors)
        self.count = np.ones(10)
        
        #indicator of optimal move chosen or not at each step (for optimal choice ratio)
        self.count_correct = []
        
        #for part 2f we know we want to update Q* at each timestep as a random walk, start at 0
        if self.rw:
            self.q_actuals = np.zeros(10)
        #part 2e we will just initialise with some random normal(0,1) values
        else:
            self.q_actuals = np.random.normal(size=10)
            
        #will be required for part 2f when we want to also consider maximum reward attainable so far
        #therefore we store the history of the Q* now as its non stationary (at each time step we can now ascertain maximum reward so far)
        self.q_optimals_history = np.empty(shape=(10, self.nRuns))        
    
    #function to update Q recursively in alpha constant or not case
    def updateQ(self,action_index, R):
        if self.alpha_const:
            self.Q[action_index] = self.Q[action_index] + (self.alpha * (R-self.Q[action_index]))
        else:
            N = 1/self.count[action_index]
            #Q2b shows us this is how to update recursively in the sample average case
            self.Q[action_index] = self.Q[action_index] + (N * (R-self.Q[action_index]))
    
    #random walk updates- only needed in random walk case
    def updateTrueQ(self):
        self.q_actuals += np.random.normal(loc=0.0,scale=0.1,size=10)
    
    #normal reward with mean Q*(index) and Variance = 1
    def generateRewards(self, action_index):
        return np.random.normal(loc=self.q_actuals[action_index], scale = 1)
    
    #will return all indices where we have greedy choice (could be more than one)
    def findGreedy(self,arr):
        greedy_indices = []
        
        #greedy choice
        max_arr = np.max(arr)
        
        for i in range(len(arr)):
            if max_arr == arr[i]:
                greedy_indices.append(i)
        return greedy_indices
    
    #function that will make greedy choice- random choice from greedya actions
    def chooseGreedyAction(self, arr):
        return np.random.choice(self.findGreedy(arr))
    
    #function to check if action made was optimal (in line with Q*) or not
    def checkOptimal(self, action_index):
        optimal_act = np.argmax(self.q_actuals)

        if action_index == optimal_act:
            self.count_correct.append(1)
        else:
            self.count_correct.append(0)
    
    #make move (random or greedy) and output reward from move + make updates to scheme
    def makeMove(self, random=False):
        
        #if random i.e. eps case then just take a random index and return its associated reward
        if random:
            index = np.random.randint(10)
        
        #if not random want greedy choice- UCB case or standard case- call chooseGreedy function
        else:
            if self.ucb:
                overall_t = np.sum(self.count)
                self.U = self.c * np.sqrt(np.log(overall_t) / self.count)
                arr = self.U + self.Q
            else:
                arr = self.Q

            index = np.random.choice(self.findGreedy(arr))
        
        #generate associated reward
        R = self.generateRewards(index)
        
        #non-stationary case first update q_actuals (only update if random walk)
        if self.rw:
            self.updateTrueQ()
        
        #update count correct if optimal or not
        self.checkOptimal(index)
        
        #increment how many moves made with this index
        self.count[index] += 1
        
        #make recursive update to our estimate Qs; depending on if its alpha constant or not
        self.updateQ(index, R)
        
        return R
    
    #run trial, go through all time steps and pick epsilon greedy or greedy choice
    def runTrials(self):
        
        for i in range(self.nRuns):
            
            #store optimal Q* at this timestep as we go along (rw case)
            if self.rw:
                self.q_optimals_history[:,i]=self.q_actuals
            
            #condition on random not needed as if eps = 0 the unif will always be >0 anyway
            unif = np.random.random()
            if self.decay:
                eps = self.epsilon / (i+1)
            else:
                eps = self.epsilon
            if unif < eps:
                R = self.makeMove(random=True)
            else:
                R = self.makeMove(random=False)
            
            self.rewards.append(R)
            

In [None]:
#running part e- will initialise a 10 armed bandit as needed with 1000 timesteps and then run it 2000 times
def playBanditStationary(runs=2000,steps=1000, eps_val = 0, q_init_val = 5, ucb_indicator = False, decay_eps = False, verbose = 1):
    '''
    verbose = 0: don't print runs progress, verbose > 0: print runs progress
    Use 2000 runs as per the experiment carried out in the link provided
    '''
    bandit = ArmedBanditTB(nRuns=steps, eps = eps_val, q_init=q_init_val, ucb=ucb_indicator, decay_eps = decay_eps)
    
    rewards = np.zeros(shape=(steps,runs))
    correct_ratio = np.zeros(shape=(steps,runs))
    
    for i in range(runs):
        if verbose != 0:
            if i == runs-1:
                print("Run %i Completed" %runs)
            elif i == 0:
                print("Run 001 Completed")
            elif i%250 == 0:
                print("Run %i Completed" %i)
        
        bandit.reset()
        bandit.runTrials()
        
        rewards[:, i] = np.asarray(bandit.rewards)
        correct_ratio[:, i] = np.asarray(bandit.count_correct)
    
    #provides optimal action ratio
    correct_arr = correct_ratio.mean(axis=1)
    
    return rewards, correct_arr


In [None]:
#now obtaining results we want to test

#greedy with Q_0 = 5, no UCB
print("Greedy, Initial Values 5")
rewards_greedy, ratios_greedy = playBanditStationary()

#epsilons we want to test against the greedy, initial values 0
epsilons = [0.10, 0.01]
reward_eps, ratios_eps = [], []
for es in epsilons:
    print("\neps-Greedy with eps = %.2f, Initial Values 0, UCB" %es)
    rewards, ratios = playBanditStationary(eps_val=es, q_init_val= 0, ucb_indicator=True)
    reward_eps.append(rewards)
    ratios_eps.append(ratios)
    
#greedy with Q_0 = 0, no UCB (no verbose as know it works from above)
rewards_greedy_1, ratios_greedy_1 = playBanditStationary(q_init_val = 0, verbose=0)
    

In [None]:
rewards_eps_decay, ratios_decay_eps = playBanditStationary(eps_val=0.1, q_init_val= 0, ucb_indicator=True, decay_eps = True)
rewards_eps_decay5, ratios_decay_eps5 = playBanditStationary(eps_val=0.1, q_init_val= 5, ucb_indicator=True, decay_eps = True)

In [None]:
#resulting plots- comparing Q_0 = Greedy to epsGreedy counterparts
fig, (ax1,ax2) = plt.subplots(2,1,figsize=(10,8))
#average rewards plot
ax1.plot(rewards_greedy.mean(axis=1), label = 'Greedy, Initial Value 5')
for es2 in range(len(epsilons)):
    ax1.plot(reward_eps[es2].mean(axis=1),
             label = 'epsGreedy with Epsilon = %.2f, Initial Value 0, UCB' %epsilons[es2])
ax1.set_xlabel("Steps",fontsize=12)
ax1.set_ylabel("Average Reward", fontsize=12)
ax1.set_title("eps-Greedy vs Greedy", fontsize=12)
ax1.plot(rewards_eps_decay5.mean(axis=1), label = 'eps_t Decay Greedy, Initial Value 0')
ax1.legend()


#optimal rewards plot
ax2.plot(ratios_greedy*100, label = 'Greedy, Initial Value 5')
for es3 in range(len(epsilons)):
    ax2.plot(100*(ratios_eps[es3]),
             label = 'epsGreedy with Epsilon = %.2f, Initial Value 0, UCB' %epsilons[es3])
ax2.set_xlabel("Steps",fontsize=12)
ax2.set_ylabel("Optimal Action (%)", fontsize=12)
ax2.set_title("eps-Greedy vs Greedy", fontsize=12)
ax2.legend()

plt.tight_layout()
plt.show()

print("NOW USING Q_0 = 0 for ALL INSTANCES")

#now plots for Q_0 = 0 for all
fig, (ax1_1,ax2_1) = plt.subplots(2,1,figsize=(10,8))
#average rewards plot
ax1_1.plot(rewards_greedy_1.mean(axis=1), label = 'Greedy, Initial Value 0')
for es2 in range(len(epsilons)):
    ax1_1.plot(reward_eps[es2].mean(axis=1),
             label = 'epsGreedy with Epsilon = %.2f, Initial Value 0, UCB' %epsilons[es2])
ax1_1.set_xlabel("Steps",fontsize=12)
ax1_1.set_ylabel("Average Reward", fontsize=12)
ax1_1.set_title("eps-Greedy vs Greedy", fontsize=12)
ax1_1.plot(rewards_eps_decay.mean(axis=1), label = 'eps_t Decay Greedy, Initial Value 0')
ax1_1.plot(rewards_eps_decay5.mean(axis=1), label = 'eps_t Decay Greedy, Initial Value 5')
ax1_1.legend()


#optimal rewards plot
ax2_1.plot(ratios_greedy_1*100, label = 'Greedy, Initial Value 0')
for es in range(len(epsilons)):
    ax2_1.plot(100*(ratios_eps[es]),
             label = 'epsGreedy with Epsilon = %.2f, Initial Value 0, UCB' %epsilons[es])
ax2_1.set_xlabel("Steps",fontsize=12)
ax2_1.set_ylabel("Optimal Action (%)", fontsize=12)
ax2_1.set_title("eps-Greedy vs Greedy", fontsize=12)
ax2_1.plot(ratios_decay_eps*100, label = 'Greedy, Initial Value 0')
ax2_1.plot(ratios_decay_eps5*100, label = 'eps_t Greedy, Initial Value 5')
ax2_1.legend()

plt.tight_layout()
plt.show()


### Q2f

Now initialising Q* optimal values as a random walk (starting at 0) instead of a stationary value. Therefore, at each timestep we will update Q* by adding on a Normal(0,0.01) sample to each instance. 

Moreover, to showcase the limitations of a sample-average method in updating Q estimates, we will compare it to an action-value method that updates Q estimates at every increment with a constant alpha (=0.1) parameter. 

Mostly, the experimentation and resulting plots will remain the same. The main change will be now we are also interested in the maximum attainable reward which we use to compare our two methods (can be used to showcase the sub-optimality of one in time better). This change comes in the form of now also updating the Q* history (now a random walk so max changing) and storing the max reward attainable at that time. Then we will plot all rewards and see which method (constant alpha OR sample-average) provides a reward closer to the maximum attainable reward as well as look at the optimality % as usual.

Additionally, we will also use 10,000 steps instead of a 1000 in each run, Number of Runs remains at 2000.

In [None]:
def playBanditNonStationary(steps=10000, runs=2000, verbose = 1):
    '''
    Function to carry out all the testing required for Q2f
    First run the standard eps-Greedy algorithm with sample average Q estimate updates
    Then run the eps-Greedy algorithm with constant alpha Q estimate updates
    To grasp which one is better, we also compute and return the actual optimal Q*'s as the random walk progresses 
    '''
    
    
    print("eps-Greedy with eps=0.1, 2000 Runs, 10000 Steps")
    #standard set up but now with a random walk- eps=0.1 case (no UCB), now with random walk
    
    bandit_eps = ArmedBanditTB(nRuns = steps, q_init=0, eps=0.1, rw=True)
    
    rewards_eps = np.zeros(shape=(steps,runs))
    history_eps = np.zeros(shape=(steps,runs))
    correct_ratio_eps = np.zeros(shape=(steps,runs))
    
    #carry out runs and store results accordingly
    for i in range(runs):
        if verbose!=0:
            if i == runs-1:
                print("Run %i Completed" %runs)
            elif i == 0:
                print("Run 001 Completed")
            elif i%250 == 0:
                print("Run %i Completed" %i)
            
            bandit_eps.reset()
            bandit_eps.runTrials()
            rewards_eps[:,i]=np.asarray(bandit_eps.rewards)
            correct_ratio_eps[:,i]=np.asarray(bandit_eps.count_correct)
            
            #take max reward at time and store it accordingly- maximum reward could've been attained as per Q*
            history_eps[:,i]=np.max(bandit_eps.q_optimals_history,axis=0)
    
    
    
    #now doing the same in the second case but also initialising with a constant alpha, no UCB again
    print("\neps-Greedy with eps=0.1, 2000 Runs, 10000 Steps, Constant Alpha=0.1")
    bandit_alpha = ArmedBanditTB(nRuns=steps, q_init=0, eps=0.1, rw= True, alpha_const=True, alpha=0.1)
    rewards_alpha = np.zeros(shape=(steps, runs))
    history_alpha = np.zeros(shape=(steps,runs))
    correct_ratio_alpha = np.zeros(shape=(steps,runs))
    
    for i in range(runs):
        if verbose!=0:
            if i == runs-1:
                print("Run %i Completed" %runs)
            elif i == 0:
                print("Run 001 Completed")
            elif i%250 == 0:
                print("Run %i Completed" %i)
                
        bandit_alpha.reset()
        bandit_alpha.runTrials()
        rewards_alpha[:,i]=np.asarray(bandit_alpha.rewards)
        history_alpha[:,i]=np.max(bandit_alpha.q_optimals_history,axis=0)
        correct_ratio_alpha[:,i]=np.asarray(bandit_alpha.count_correct)
    
    return rewards_eps, rewards_alpha, history_eps, history_alpha, correct_ratio_eps, correct_ratio_alpha
    
    

In [None]:
#obtaining results for both tests
rw_rewards_eps, rw_rewards_alpha, rw_history_eps, rw_history_alpha, rw_ratio_eps, rw_ratio_alpha = playBanditNonStationary()

In [None]:
#producing plots required for both tests
fig2, (ax3, ax4) = plt.subplots(2,1,figsize=(10,8))
ax3.plot(rw_rewards_eps.mean(axis=1), label="Sample Average Method")
ax3.plot(rw_rewards_alpha.mean(axis=1), label="Constant Alpha = 0.1")
ax3.plot(rw_history_eps.mean(axis=1), label="Maximum Possible Reward")
ax3.set_xlabel("Steps", fontsize=12)
ax3.set_ylabel("Average Reward", fontsize=12)
ax3.set_title("Sample Average Method vs Constant Alpha Method vs Max Attainable Reward", fontsize=12)
ax3.legend()

ax4.plot(100*rw_ratio_eps.mean(axis=1), label="Sample Average Method")
ax4.plot(100*rw_ratio_alpha.mean(axis=1), label="Constant Alpha")
ax4.set_xlabel("Steps", fontsize=12)
ax4.set_ylabel("Optimal Action (%)", fontsize = 12)
ax4.set_title("Sample Average Method vs Constant Alpha Method", fontsize=12)
ax4.legend()

plt.tight_layout()
plt.show()
