In [None]:
import gymnasium as gym
import numpy as np
import os
import random
import matplotlib.pyplot as plt
from IPython.display import Image

## random policy

- Equal probability for selecting any action in any state

In [None]:
def random_policy(nS,nA):
    action_prob = [1/nA] * nA
    return [action_prob for _ in range(nS)]

# greedy policy

- Always select the action with the highest Q-value (predicted future reward) for the current state to exploit 

In [None]:
def greedy_policy(Q, nS, nA):
    # initialize arbitrary policy
    policy = np.zeros([nS,nA])
    
    # list of states in environment 
    S = [i for i in range(nS)]

    # loop over states
    for state in S:
        if all([ q == 0 for q in Q[state]]) :
            policy[state] = random_policy(1,nA)[0]
        
        else:
            best_action = np.argmax(Q[state]) 
            policy[state] = np.eye(nA)[best_action].tolist()

    return policy

## ε-greedy policy

- Initialize the exploration rate ε
- Generate a random number between 0 and 1.
- If the generated random number is less than ε, select a random action to explore.
- Otherwise, select the action with the highest Q-value for the current state to exploit.

In [None]:
def epsilon_greedy_policy(Q, nS, nA, epsilon):
    # initialize arbitraty policy
    policy = [[0 for _ in range(nA)] for _ in range (nS)]
    
    # list of states in environment 
    S = [i for i in range(nS)]

    # list of possible actions to be taken by the agent
    A = [i for i in range(nA)]

    # loop over states
    for state in S:

        # if all Q values in state are 0, agent picks random action
        if all([ q == 0 for q in Q[state][:]]):
            for action in range(nA):
                policy[state][action] = 1/nA

        else:
            # position of maximum Q value in state
            maxQ = np.argmax(Q[state])

            # probability agent acts randomly
            prob = [epsilon/nA for _ in range(nA)]

            # probability agent acts greedily
            prob[maxQ] = 1 - epsilon  + (epsilon/nA)

            for action in A:
                policy[state][action] = prob[action]
    return policy

Function to evaluate policy performance
----------------------------------------

- Evaluate average episode reward following a given policy 

In [None]:
def evaluate_average_reward(env, nA, policy, num_eval_episodes = 100):
    # store episode rewards 
    episode_rewards = []

    # list of possible actions to be taken by the agent
    A = [i for i in range(nA)]
    
    # loop over number of episodes 
    for _ in range(num_eval_episodes):
        state = env.reset()[0]
        episode_reward = 0

        # loop until episode ends
        while True:
            # pick action according to the current policy
            action = random.choices(A, weights=[policy[state][i] for i in range(nA)], k=1)[0]

            # take action
            next_state, reward, terminated, truncated, info = env.step(action)

            # update current state
            state = next_state

            # count cumulative episode reward:
            episode_reward += reward
        
            # exit loop when episode is terminated or truncated
            if terminated==True or truncated == True:
                episode_rewards.append(episode_reward)
                break
    
    env.reset()
    
    reward_average = np.mean(episode_rewards)

    return reward_average

Frozen Lake environment from gymnasium API
--------------------------------------------

In [None]:
env = gym.make('FrozenLake-v1',map_name="4x4",render_mode="ansi",is_slippery=False)

# restart episode with agent in state 0
next_state, info = env.reset()

print(env.render())

nS = env.observation_space.n
nA = env.action_space.n

print(f"Size of state space: {nS} \nSize of action space: {nA}")

Off-policy MC control
----------------------
---------------------

Off-policy methods are where actions taken by the agent for data aquisition follows a behaviour policy, which is different from the policy evaluated for control, called the target policy. This allows for more exploration of the state-action space that may have otherwise been missed if following the target policy, and thus reduces the risk of learning a sub-optimial policy. 

In order to evaluate the target policy from average returns sampled under the behaviour policy, we need to weight each return by an importance sampling ratio, which is the likelihood of the return under the target policy relative to the behaviour policy.

In [None]:
print("Importance sampling ratio:")
Image(filename='Pseudo-code/importance_sampling_ratio.png')

In [None]:
print("on-policy MC control algorithm:")
Image(filename='Pseudo-code/off-policy_MC_control.png')

Set hyperparameters

In [None]:
# Discount factor [0,1]
gamma = # ...
# Exploration rate [0,1]
epsilon = # ...
# Number of episodes [1,infty]
num_episodes = # ...

In [None]:
def offpolicy_control(env, nS, nA, num_episodes, alpha, gamma, epsilon):

    # initialize arbitary action-value function
    Q = # ...

    # list of possible actions to be taken by the agent
    A = [i for i in range(nA)]
    
    # list to store cumulative sum of weights, W, for a given state-action value
    C = np.zeros([nS,nA])

    # initialize epsilon-greedy policy
    b_policy = # ....

    average_reward = evaluate_average_reward(env, b_policy, num_eval_episodes = 100)
    
    # store number of policy iterations and average episode reward during training
    num_policy_iter = [0]
    average_rewards_list_t = [average_reward]
    average_rewards_list_b = [average_reward]

    # number of policy iterations
    n = 0
    
    for _ in range(num_episodes):
        n+=1
        # initialize initial state value
        state = 0

        # save states, actions and rewards for each episode
        states = [0]
        actions = []
        rewards = []

        # loop until episode terminates or truncates 
        while True:
            b_policy = epsilon_greedy_policy(Q, nS, nA, epsilon = 0.1)
            
            # pick action according to the current behaviour policy
            action = # ...

            # take action
            next_state, reward, terminated, truncated, info = env.step(action)

            if terminated == False and truncated == False:
                # save state, action and reward in episode
                
                # update current state
                state = next_state

            # if epsiode terminates or truncates, reset episode and exit loop
            if terminated == True or truncated == True:
                actions.append(action)
                rewards.append(reward)
                
                # reset episode
                # exit loop
    
        # initialize return value
        G = 0

        # initialize importance sampling ratio
        W = 1

        episode_length = len(states)
        
        # evaluate return for each state in episode
        # loop backwards from the end of the episode, as return is a function of all future rewards in an episode
        for i in reversed(range(episode_length)):

            # return at the current state-action pair is equal to the sum of the immediate reward plus the future discounted return
            G = # ...
            
            # cumulative sum of weights for each state-action pair
            C[states[i],actions[i]] = C[states[i],actions[i]] + W

            # incremental update rule for Q
            Q[states[i],actions[i]]= # ...

            t_policy = greedy_policy(Q,nS,nA)

            # if action taken is not the optimal action under the target policy
            if actions[i] != np.argmax(t_policy[states[i]]):
                # then t_policy(s,a) = 0 (as target policy is deterministic) and so update of W = 0
                break
            
            # if action taken is the optimial action, update W
            else:
                # where the probability the optimial action is taken under target policy is 1
                W = # ...

        average_reward_t = evaluate_average_reward(env, t_policy, num_eval_episodes = 100)
        average_reward_b = evaluate_average_reward(env, b_policy, num_eval_episodes = 100)

        average_rewards_list_t.append(average_reward_t)
        average_rewards_list_b.append(average_reward_b)

        num_policy_iter.append(n)
    
    return Q, t_policy, b_policy, num_policy_iter, average_rewards_list_t, average_rewards_list_b

In [None]:
Q, t_policy, b_policy, num_policy_iter, average_rewards_list_t, average_rewards_list_b = offpolicy_control(env, nS, nA, num_episodes, gamma, epsilon)

Evaluate the results
--------------------

In [None]:
# plot learning rate
fig, ax = plt.subplots()
ax.plot(num_policy_iter, average_rewards_list_b, color = 'black', label = "behaviour policy")
ax.plot(num_policy_iter, average_rewards_list_t, color = 'red', linestyle = 'dashed', label = "target policy")

ax.set_xlabel("Number of policy iterations")
ax.set_ylabel('Average reward')

ax.legend(prop = { "size": 10 })

plt.xticks(np.arange(num_policy_iter[0], num_policy_iter[-1]+1,  num_policy_iter[-1]*0.1))

plt.show()