In [2]:
!pip install gym==0.7
import gym
import numpy as np

Collecting gym==0.7
  Downloading gym-0.7.0.tar.gz (149 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/149.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.5/149.5 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting pyglet>=1.2.0 (from gym==0.7)
  Downloading pyglet-2.0.17-py3-none-any.whl.metadata (7.9 kB)
Downloading pyglet-2.0.17-py3-none-any.whl (936 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m936.6/936.6 kB[0m [31m20.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: gym
  Building wheel for gym (setup.py) ... [?25l[?25hdone
  Created wheel for gym: filename=gym-0.7.0-py3-none-any.whl size=201576 sha256=371e8525d90ea3e1b1735210af4d1aeff94b8e66073c9e495de60c80e614a370
  Stored in directory: /root/.cache/pip/wheels/45/41/63/c8ad08982323c2d78191ea49280294935111fd8e5010534f85
Successf

# 0. Monte Carlo
Write the function `def monte_carlo(env, V, policy, episodes=5000, max_steps=100, alpha=0.1, gamma=0.99):` that performs the Monte Carlo algorithm:


- `env` is the openAI environment instance
- `V` is a `numpy.ndarray` of shape `(s,)` containing the value estimate
- `policy` is a function that takes in a state and returns the next action to take
- `episodes` is the total number of episodes to train over
- `max_steps` is the maximum number of steps per episode
- `alpha` is the learning rate
- `gamma` is the discount rate
- Returns: `V`, the updated value estimate

In [7]:
np.random.seed(0)

env = gym.make('FrozenLake8x8-v0')
LEFT, DOWN, RIGHT, UP = 0, 1, 2, 3

def policy(s):
    p = np.random.uniform()
    if p > 0.5:
        if s % 8 != 7 and env.desc[s // 8, s % 8 + 1] != b'H':
            return RIGHT
        elif s // 8 != 7 and env.desc[s // 8 + 1, s % 8] != b'H':
            return DOWN
        elif s // 8 != 0 and env.desc[s // 8 - 1, s % 8] != b'H':
            return UP
        else:
            return LEFT
    else:
        if s // 8 != 7 and env.desc[s // 8 + 1, s % 8] != b'H':
            return DOWN
        elif s % 8 != 7 and env.desc[s // 8, s % 8 + 1] != b'H':
            return RIGHT
        elif s % 8 != 0 and env.desc[s // 8, s % 8 - 1] != b'H':
            return LEFT
        else:
            return UP

V = np.where(env.desc == b'H', -1, 1).reshape(64).astype('float64')
np.set_printoptions(precision=4)
env.seed(0)
print(monte_carlo(env, V, policy).reshape((8, 8)))

INFO:gym.envs.registration:Making new env: FrozenLake8x8-v0
[2024-09-20 11:47:51,023] Making new env: FrozenLake8x8-v0


[[ 0.81    0.9     0.4783  0.4305  0.3874  0.4305  0.6561  0.9   ]
 [ 0.9     0.729   0.5905  0.4783  0.5905  0.2824  0.2824  0.3874]
 [ 1.      0.5314  0.729  -1.      1.      0.3874  0.2824  0.4305]
 [ 1.      0.5905  0.81    0.9     1.     -1.      0.3874  0.6561]
 [ 1.      0.6561  0.81   -1.      1.      1.      0.729   0.5314]
 [ 1.     -1.     -1.      1.      1.      1.     -1.      0.9   ]
 [ 1.     -1.      1.      1.     -1.      1.     -1.      1.    ]
 [ 1.      1.      1.     -1.      1.      1.      1.      1.    ]]


In [6]:
def monte_carlo(env, V, policy, episodes=5000,
                max_steps=100, alpha=0.1, gamma=0.99):
    """ function that performs the monte carlo algorithm

        Args:
            env (gym.Env): gym environment instance.
            V (numpy.ndarray): value estimate
            policy (function): takes in a state and returns the next action
                to take
            episodes (int): total number of episodes to train over,
                default is 5000.
            max_steps (int): maximum number of steps per episode,
                default is 100.
            alpha (float): learning rate for value estimate update,
                default is 0.1.
            gamma (float): discount rate for future rewards, default is 0.99.


        Returns:
            np.ndarray: V
                - V (numpy.ndarray): updated value estimate
    """

    for i in range(episodes):
        # reset environment
        state = env.reset()

        # keep track of each state and its reward
        states = []
        rewards = []
        done = False

        for _ in range(max_steps):
            # get new action
            action = policy(state)

            # take a step and document the states and rewards
            next_state, reward, done, info = env.step(action)
            states.append(state)
            rewards.append(reward)

            # go to next state
            state = next_state

            # check if episode is finished
            if done:
                break

        # keep track of the accumulated reward
        returns = 0
        for step in range(len(states) - 1, -1, -1):

            # get the state and reward from this timestep for this episode
            state = states[step]
            reward = rewards[step]

            # calculate accumulated reward
            returns = (gamma * returns) + reward

            # if the state hasnt been visited yet in this episode
            if state not in states[:i]:

                # update the value estimate
                V[state] += alpha * (returns - V[state])
    return V


# 1. TD(λ)
Write the function `def td_lambtha(env, V, policy, lambtha, episodes=5000, max_steps=100, alpha=0.1, gamma=0.99):` that performs the TD(λ) algorithm:

- `env` is the openAI environment instance
- `V` is a `numpy.ndarray` of shape `(s,)` containing the value estimate
- `policy` is a function that takes in a state and returns the next action to take
- `lambtha` is the eligibility trace factor
- `episodes` is the total number of episodes to train over
- `max_steps` is the maximum number of steps per episode
- `alpha` is the learning rate
- `gamma` is the discount rate
- Returns: `V`, the updated value estimate

In [5]:
np.random.seed(0)

env = gym.make('FrozenLake8x8-v0')
LEFT, DOWN, RIGHT, UP = 0, 1, 2, 3

def policy(s):
    p = np.random.uniform()
    if p > 0.5:
        if s % 8 != 7 and env.desc[s // 8, s % 8 + 1] != b'H':
            return RIGHT
        elif s // 8 != 7 and env.desc[s // 8 + 1, s % 8] != b'H':
            return DOWN
        elif s // 8 != 0 and env.desc[s // 8 - 1, s % 8] != b'H':
            return UP
        else:
            return LEFT
    else:
        if s // 8 != 7 and env.desc[s // 8 + 1, s % 8] != b'H':
            return DOWN
        elif s % 8 != 7 and env.desc[s // 8, s % 8 + 1] != b'H':
            return RIGHT
        elif s % 8 != 0 and env.desc[s // 8, s % 8 - 1] != b'H':
            return LEFT
        else:
            return UP

V = np.where(env.desc == b'H', -1, 1).reshape(64).astype('float64')
np.set_printoptions(precision=4)
env.seed(0)
print(td_lambtha(env, V, policy, 0.9).reshape((8, 8)))

INFO:gym.envs.registration:Making new env: FrozenLake8x8-v0
[2024-09-20 11:47:31,975] Making new env: FrozenLake8x8-v0


[[-0.774  -0.8288 -0.8065 -0.7214 -0.6344 -0.548  -0.4152 -0.4393]
 [-0.7643 -0.7553 -0.776  -0.6273 -0.4213 -0.4698 -0.3294 -0.4009]
 [-0.8883 -0.8796 -0.9215 -1.     -0.669  -0.37   -0.2522 -0.4788]
 [-0.9091 -0.907  -0.9199 -0.9078 -0.8009 -1.     -0.3478 -0.1532]
 [-0.8774 -0.9579 -0.9336 -1.     -0.7624 -0.8244 -0.6629 -0.1192]
 [-0.9308 -1.     -1.      0.6361 -0.7978 -0.715  -1.      0.3673]
 [-0.9145 -1.     -0.5743 -0.0703 -1.     -0.3774 -1.      0.9231]
 [-0.8599 -0.8444 -0.7795 -1.      1.      0.4657  0.5018  1.    ]]


In [4]:
def td_lambtha(env, V, policy, lambtha, episodes=5000,
               max_steps=100, alpha=0.1, gamma=0.99):
    """ function that performs the TD(λ) algorithm

        Args:
            env (gym.Env): gym environment instance.
            V (numpy.ndarray): value estimate
            policy (function): takes in a state and returns the next action
                to take
            lambtha (float):  eligibility trace factor
            episodes (int): total number of episodes to train over,
                default is 5000.
            max_steps (int): maximum number of steps per episode,
                default is 100.
            alpha (float): learning rate, default is 0.1.
            gamma (float): discount rate for future rewards, default is 0.99.


        Returns:
            np.ndarray: V
                - V (numpy.ndarray): updated value estimate
    """

    for episode in range(episodes):
        # reset the environment and EoE flag
        state = env.reset()
        end_of_episode = False

        # initialize empty array to keep track of which states have been
        # visited and how many times
        eligibilty_trace = np.zeros_like(V)

        for _ in range(max_steps):
            # use policy to decide which action to take
            action = policy(state)

            # use selected action and get new reward
            current_state, reward, end_of_episode, _ = env.step(action)

            # caculate the temporal difference error
            td_error = reward + gamma * V[current_state] - V[state]
            # add one each time a certain state is visited
            eligibilty_trace[state] += 1

            # update the value estimate using the eligibilty trace
            V += alpha * td_error * eligibilty_trace

            # decay the eligibility trace
            eligibilty_trace *= gamma * lambtha

            # go to next state
            state = current_state

            # check if episode is over
            if end_of_episode:
                break

    return V


# 2. SARSA(λ)

Write the function `def sarsa_lambtha(env, Q, lambtha, episodes=5000, max_steps=100, alpha=0.1, gamma=0.99, epsilon=1, min_epsilon=0.1, epsilon_decay=0.05):` that performs SARSA(λ):

- `env` is the openAI environment instance
- `Q` is a `numpy.ndarray` of shape `(s,a)` containing the Q table
- `lambtha` is the eligibility trace factor
- `episodes` is the total number of episodes to train over
- `max_steps` is the maximum number of steps per episode
- `alpha` is the learning rate
- `gamma` is the discount rate
- `epsilon` is the initial threshold for epsilon greedy
- `min_epsilon` is the minimum value that epsilon should decay to
- `epsilon_decay` is the decay rate for updating epsilon between episodes
- Returns: `Q`, the updated Q table

In [1]:
def ep_greedy_policy(Q, state, epsilon):
    """ function that performs the epsilon greedy policy

        Args:
            Q (numpy.ndarray): Q-table where rows represent states and columns
                represent actions.
            state (int): the current state of the agent
            epsilon (float): initial exploration rate for the epsilon-greedy
                policy, default is 1.
    """
    # use epsilon-greedy to decide the next move
    # choose a random value between 0 and 1
    choice = np.random.random()
    # decide to explore or exploit based on the epsilon
    if choice < epsilon:
        # exploration: pick a random action
        action = np.random.randint(len(Q[state]))
    else:
        # exploitation: pick the action with highest q-value
        action = np.argmax(Q[state, :])

    return action


def sarsa_lambtha(env, Q, lambtha, episodes=5000, max_steps=100, alpha=0.1,
                  gamma=0.99, epsilon=1, min_epsilon=0.1, epsilon_decay=0.05):
    """ function that that performs SARSA(λ)

        Args:
            env (gym.Env): gym environment instance.
            Q (numpy.ndarray): Q-table where rows represent states and columns
                represent actions.
            lambtha (float):  eligibility trace factor
            episodes (int): total number of episodes to train over,
                default is 5000.
            max_steps (int): maximum number of steps per episode,
                default is 100.
            alpha (float): learning rate for Q-learning, default is 0.1.
            gamma (float): discount rate for future rewards, default is 0.99.
            epsilon (float): initial exploration rate for the epsilon-greedy
                policy, default is 1.
            min_epsilon (float): minimum value that epsilon should decay to,
                default is 0.1.
            epsilon_decay (float): rate at which epsilon decays after each
                episode, default is 0.05.

        Returns:
            np.ndarray: Q
                - Q (numpy.ndarray): updated Q-table after sarsa lambda.
    """
    for episode in range(episodes):
        # reset the environment and EoE flag
        state = env.reset()
        end_of_episode = False
        eligibility_trace = np.zeros_like(Q)
        action = ep_greedy_policy(Q, state, epsilon)

        for _ in range(max_steps):

            # use selected action and get new reward
            current_state, reward, end_of_episode, _ = env.step(action)

            # find next action for the td error
            next_action = ep_greedy_policy(Q, current_state, epsilon)

            # calculate the temporal difference error using the immediate
            # reward and the discounted estimate of the future rewards
            td_error = reward + \
                (gamma * Q[current_state, next_action]) - Q[state, action]

            # add one each time a certain state is visited
            eligibility_trace[state, action] += 1

            # update the Q value using the eligibility trace
            Q[state, action] = Q[state, action] + alpha * \
                td_error * eligibility_trace[state, action]

            # decay the eligibility trace
            eligibility_trace *= gamma * lambtha

            # set the new state and action
            state = current_state
            action = next_action

            # check if the episode is complete
            if end_of_episode:
                break

        # epsilon decay - as the agent explores throughout training, we can
        # reduce the epsilon to encourage exploitation
        epsilon = min_epsilon + (epsilon - min_epsilon) * \
            np.exp(-epsilon_decay * episode)

    return Q


In [3]:
np.random.seed(0)
env = gym.make('FrozenLake8x8-v0')
Q = np.random.uniform(size=(64, 4))
np.set_printoptions(precision=4)
env.seed(0)
print(sarsa_lambtha(env, Q, 0.9))

INFO:gym.envs.registration:Making new env: FrozenLake8x8-v0
[2024-09-20 11:47:19,882] Making new env: FrozenLake8x8-v0
  result = entry_point.load(False)


[[0.6189 0.6679 0.6419 0.6232]
 [0.6605 0.6211 0.6169 0.6075]
 [0.638  0.5846 0.5795 0.5705]
 [0.4008 0.5194 0.317  0.3719]
 [0.3511 0.4706 0.4719 0.4602]
 [0.463  0.461  0.4824 0.4369]
 [0.3563 0.4596 0.3743 0.481 ]
 [0.4513 0.4234 0.4002 0.395 ]
 [0.6523 0.6906 0.6434 0.6497]
 [0.6921 0.6352 0.6378 0.6332]
 [0.6354 0.5387 0.5762 0.5833]
 [0.326  0.4566 0.3632 0.3228]
 [0.4371 0.4556 0.49   0.411 ]
 [0.566  0.3604 0.3804 0.2725]
 [0.5469 0.2942 0.4672 0.2923]
 [0.159  0.2049 0.5482 0.1812]
 [0.7034 0.7204 0.6864 0.6745]
 [0.6763 0.7257 0.6648 0.6513]
 [0.6942 0.5519 0.5537 0.4395]
 [0.2828 0.1202 0.2961 0.1187]
 [0.4236 0.496  0.3147 0.4507]
 [0.5585 0.4804 0.7179 0.2561]
 [0.5519 0.5437 0.4268 0.5985]
 [0.244  0.6455 0.3958 0.3561]
 [0.7232 0.7526 0.7191 0.7237]
 [0.7093 0.7441 0.7324 0.7189]
 [0.6982 0.7739 0.747  0.729 ]
 [0.73   0.7849 0.5068 0.646 ]
 [0.6839 0.7876 0.6701 0.6463]
 [0.8811 0.5813 0.8817 0.6925]
 [0.7589 0.5385 0.6984 0.6519]
 [0.4696 0.6705 0.1542 0.3001]
 [0.775 