<a href="https://colab.research.google.com/github/spindouken/atlas-machine_learning/blob/main/reinforcement_learning/Q_Learning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Intro/Project/Game Explanation

This project was finalized 1/10/2024.<br>
Author: Mason Counts

gymnasium frozen lake documentation:
https://gymnasium.farama.org/environments/toy_text/frozen_lake/

**Explanation of frozen lake** from gym doc:<br>
The game starts with the player at location [0,0] of the frozen lake grid world with the goal located at far extent of the world e.g. [3,3] for the 4x4 environment.

Holes in the ice are distributed in set locations when using a pre-determined map or in random locations when a random map is generated.

The player makes moves until they reach the goal or fall in a hole.

The lake is slippery (unless disabled) so the player may move perpendicular to the intended direction sometimes (see is_slippery).

Randomly generated worlds will always have a path to the goal.
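
The slippery behavior described above can be sketched without the library. This is a minimal illustration, not gym's own code: it assumes the action encoding from the FrozenLake documentation (0=LEFT, 1=DOWN, 2=RIGHT, 3=UP), row-major state numbering, and an equal 1/3 chance for the intended direction and each of the two perpendicular directions. `move` and `slippery_outcomes` are hypothetical helper names.

```python
# Action encoding assumed from the FrozenLake docs: 0=LEFT, 1=DOWN, 2=RIGHT, 3=UP
LEFT, DOWN, RIGHT, UP = 0, 1, 2, 3


def move(row, col, action, nrows, ncols):
    """Apply one move, staying in place when it would leave the grid."""
    if action == LEFT:
        col = max(col - 1, 0)
    elif action == DOWN:
        row = min(row + 1, nrows - 1)
    elif action == RIGHT:
        col = min(col + 1, ncols - 1)
    elif action == UP:
        row = max(row - 1, 0)
    return row, col


def slippery_outcomes(state, action, nrows, ncols):
    """Return [(probability, next_state), ...] for one slippery move.

    Hypothetical helper: the intended direction and the two
    perpendicular directions are each assumed to have probability 1/3.
    """
    row, col = divmod(state, ncols)  # row-major state numbering
    outcomes = []
    for actual in ((action - 1) % 4, action, (action + 1) % 4):
        new_row, new_col = move(row, col, actual, nrows, ncols)
        outcomes.append((1 / 3, new_row * ncols + new_col))
    return outcomes


# Moving LEFT from the top-left corner of an 8x8 map
for probability, next_state in slippery_outcomes(0, LEFT, 8, 8):
    print(round(probability, 3), next_state)
```

The next states come out as 0, 0 and 8, which is consistent with the `env.P[0][0]` transition list printed for the slippery environment in this notebook.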

Project Explanation:
This project implements and demonstrates Q-learning, a fundamental reinforcement learning (RL) algorithm, on OpenAI Gym's FrozenLakeEnv. The goal is to train an agent to navigate a grid-like environment (the frozen lake) and reach a goal while avoiding holes. The project tasks are explained below:

0. The FrozenLakeEnv Environment
Description: This environment is a grid of blocks, where each block can be either start (S), goal (G), safe (F), or hole (H). The agent starts at S and aims to reach G without falling into H.
Challenge: The environment can be deterministic or stochastic ('slippery'). In a slippery environment, the agent's actions don't always result in the intended outcomes, mimicking the uncertainty of a real frozen lake.
<br>
1. Q-Learning Algorithm
Purpose: Q-learning is a model-free reinforcement learning algorithm used to find an optimal action-selection policy for any given finite Markov decision process (MDP). It works by learning an action-value function that ultimately gives the expected utility of taking a given action in a given state and following the optimal policy thereafter.
Application: In this project, Q-learning is used to train an agent to navigate the FrozenLakeEnv. The agent learns from its environment by interacting with it and receiving feedback in the form of rewards.
<br>
3. Training the Agent (train function)
Process: The agent is trained over many episodes. In each episode, the agent makes decisions based on the current Q-table, observes the outcomes, and updates the Q-table using the Q-learning algorithm.
Outputs: The training process outputs an updated Q-table, which represents the learned policy, and a list of total rewards per episode, which is a measure of how well the agent is learning over time.
<br>
4. Testing the Agent (play function)
Purpose: After training, the play function is used to test the performance of the trained agent. It uses the learned Q-table to navigate the environment, always choosing the action with the highest Q-value.
Output: The function displays each state of the board, providing a visual representation of the agent's path through the environment. The total reward for each episode is printed, indicating whether the agent reached the goal without falling into a hole.
<br>
Significance of the Project
Understanding RL Concepts: This project is a practical demonstration of key reinforcement learning concepts like exploration vs. exploitation, learning rate, discount factor, and the balance between them.

## Code begins!

In [None]:
import gym

def load_frozen_lake(desc=None, map_name=None, is_slippery=False):
    """
    loads the pre-made FrozenLakeEnv environment from OpenAI's gym

    desc: either None or a list of lists containing
      a custom description of the map to load for the environment
    map_name: either None or a string containing
      the pre-made map to load
      Note: If both desc and map_name are None,
        the environment will load a randomly generated 8x8 map
    is_slippery: a boolean to determine if the ice is slippery

    Returns: the environment
    """
    env = gym.make('FrozenLake-v1', desc=desc, map_name=map_name, is_slippery=is_slippery)
    return env

In [None]:
#!/usr/bin/env python3

import numpy as np

np.random.seed(0)
env = load_frozen_lake()
print(env.desc)
print(env.P[0][0])
env = load_frozen_lake(is_slippery=True)
print(env.desc)
print(env.P[0][0])
desc = [['S', 'F', 'F'], ['F', 'H', 'H'], ['F', 'F', 'G']]
env = load_frozen_lake(desc=desc)
print(env.desc)
env = load_frozen_lake(map_name='4x4')
print(env.desc)


[[b'S' b'F' b'F' b'F' b'F' b'F' b'F' b'H']
 [b'H' b'F' b'F' b'F' b'F' b'H' b'F' b'F']
 [b'F' b'H' b'F' b'H' b'H' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'H' b'F' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'F' b'F' b'H' b'F']
 [b'F' b'F' b'F' b'F' b'F' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'H' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'F' b'F' b'F' b'G']]
[(1.0, 0, 0.0, False)]
[[b'S' b'F' b'H' b'F' b'H' b'F' b'H' b'F']
 [b'H' b'F' b'F' b'F' b'F' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'F' b'F' b'F' b'F']
 [b'F' b'H' b'F' b'F' b'F' b'F' b'F' b'F']
 [b'F' b'F' b'H' b'F' b'F' b'F' b'F' b'H']
 [b'F' b'F' b'F' b'F' b'F' b'H' b'F' b'H']
 [b'F' b'F' b'H' b'F' b'H' b'F' b'H' b'F']
 [b'F' b'F' b'H' b'F' b'F' b'F' b'F' b'G']]
[(0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 8, 0.0, True)]
[[b'S' b'F' b'F']
 [b'F' b'H' b'H']
 [b'F' b'F' b'G']]
[[b'S' b'F' b'F' b'F']
 [b'F' b'H' b'F' b'H']
 [b'F' b'F' b'F' b'H']
 [b'H' b'F' b'F' b'G']]



To implement the q_init function, we need to initialize a Q-table for the reinforcement learning agent in the FrozenLake environment. The Q-table is a matrix where each row represents a state in the environment, and each column represents an action the agent can take in that state. The values in the table are initialized to zero and will be updated during the learning process.

In [None]:
#!/usr/bin/env python3
"""
initializes the Q-table
"""
import numpy as np


def q_init(env):
    """
    initializes the Q-table for the FrozenLakeEnv environment

    env: the FrozenLakeEnv instance

    Returns: the Q-table as a numpy.ndarray of zeros
    """
    # retrieve the number of states from the observations space
    stateCount = env.observation_space.n

    # retrieve the number of actions from the action space
    actionCount = env.action_space.n

    # initialize Qtable with zeros
    Qtable = np.zeros((stateCount, actionCount))

    # the Q table will have number of rows = # states
    #  and number of columns = # actions
    return Qtable

In [None]:
print(env.observation_space.n)
print(env.action_space.n)

16
4


In [None]:
env = load_frozen_lake()
Q = q_init(env)
print('Q-Table Total States (map size(ex. 8x8 = 64)):')
print(Q.shape)
env = load_frozen_lake(is_slippery=True)
Q = q_init(env)
print('Q-Table Transition Probabilities (same as Q-Table size):')
print(Q.shape)
desc = [['S', 'F', 'F'], ['F', 'H', 'H'], ['F', 'F', 'G']]
env = load_frozen_lake(desc=desc)
Q = q_init(env)
print('Q-table generation for 3x3 map:')
print(Q.shape)
env = load_frozen_lake(map_name='4x4')
Q = q_init(env)
print('Q-table generation for 4x4 map:')
print(Q.shape)


Q-Table Total States (map size(ex. 8x8 = 64)):
(64, 4)
Q-Table Transition Probabilities (same as Q-Table size):
(64, 4)
Q-table generation for 3x3 map:
(9, 4)
Q-table generation for 4x4 map:
(16, 4)
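
Each row of the Q-table corresponds to one grid square. The sketch below shows how a state index maps to a grid position and back, assuming row-major numbering (the same convention `render_game` relies on later in this notebook). `state_to_cell` and `cell_to_state` are hypothetical helper names.

```python
def state_to_cell(state, ncols):
    """Convert a state index into (row, col), assuming row-major numbering."""
    return divmod(state, ncols)


def cell_to_state(row, col, ncols):
    """Convert (row, col) back into a state index."""
    return row * ncols + col


# On the 3x3 custom map, state 7 is the bottom-middle square
print(state_to_cell(7, 3))     # (2, 1)
# and the goal in the bottom-right corner is state 8
print(cell_to_state(2, 2, 3))  # 8
```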


In [None]:
#!/usr/bin/env python3
"""
uses epsilon-greedy to determine the next action
"""
import numpy as np


def epsilon_greedy(Q, state, epsilon):
    """
    uses epsilon-greedy to determine the next action

    Q is a numpy.ndarray containing the Q-table
    state is the current state of the agent in the environment
    epsilon is the probability of selecting a random action

    p is sampled with numpy.random.uniform to decide whether to
      explore or exploit:
    If exploring (p < epsilon): select a random action from all
      possible actions using numpy.random.randint.
    If exploiting (p >= epsilon): select the action
      with the highest Q-value for the current state.

    Returns: the index of the chosen action
    """
    # will agent explore or exploit?
    if np.random.uniform(0, 1) < epsilon:
        # explore: choose a random action
        chosenAction = np.random.randint(Q.shape[1])
    else:
        # exploit: choose the best action based on current Q-table
        chosenAction = np.argmax(Q[state])

    return chosenAction

In [None]:
desc = [['S', 'F', 'F'], ['F', 'H', 'H'], ['F', 'F', 'G']]
env = load_frozen_lake(desc=desc)
Q = q_init(env)
Q[7] = np.array([0.5, 0.7, 1, -1])
np.random.seed(0)
print('Testing epsilon greedy function with custom 3x3 env w/ zerod Q-table except for state 7:')

print(epsilon_greedy(Q, 7, 0.5))
print('The output is 2, meaning eps greedy chose to exploit')
np.random.seed(1)
print(epsilon_greedy(Q, 7, 0.5))
print('The output is 0, meaning eps greedy chose to explore')



Testing epsilon greedy function with custom 3x3 env w/ zerod Q-table except for state 7:
2
The output is 2, meaning eps greedy chose to exploit
0
The output is 0, meaning eps greedy chose to explore
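
Because the random action is drawn from all actions, exploring can still land on the greedy action. The sketch below works out the resulting selection probabilities for the test row above; `action_probabilities` is a hypothetical helper, and it assumes ties are broken by the first maximum, as `np.argmax` does.

```python
def action_probabilities(q_row, epsilon):
    """Probability that epsilon-greedy picks each action for one state."""
    n_actions = len(q_row)
    # first index with the highest Q-value (same tie-breaking as np.argmax)
    best = max(range(n_actions), key=lambda a: q_row[a])
    # exploring spreads epsilon uniformly over every action
    probs = [epsilon / n_actions] * n_actions
    # exploiting adds the remaining mass to the greedy action
    probs[best] += 1 - epsilon
    return probs


print(action_probabilities([0.5, 0.7, 1, -1], 0.5))
# [0.125, 0.125, 0.625, 0.125]
```

So with epsilon = 0.5 and four actions, an output of 2 is expected 62.5% of the time, and only part of that comes from exploiting.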


The train function performs the following steps:

1. Iterate over the number of episodes.
2. For each episode, reset the environment and iterate for a maximum number of steps.
3. Use the epsilon-greedy strategy to choose an action.
4. Take the action and observe the new state and reward.
5. If the agent falls into a hole, change the reward to -1.
6. Update the Q-table using the Q-learning formula.
7. After each episode, decay epsilon (never below min_epsilon) and store the episode's total reward.

**Q-learning Update Formula:**
Q[state, action] = Q[state, action] + alpha * (reward + gamma * max(Q[new_state]) - Q[state, action])
<br>
**Return:**
the updated Q-table and the list of total rewards per episode.
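
The update formula above can be traced by hand on a single table entry. This is a small worked sketch using the default `alpha=0.1` and `gamma=0.99`; `q_update` is a hypothetical helper that applies the formula to one value.

```python
def q_update(q_sa, reward, max_q_next, alpha=0.1, gamma=0.99):
    """Apply one Q-learning update to a single Q[state, action] value."""
    return q_sa + alpha * (reward + gamma * max_q_next - q_sa)


# Stepping onto the goal: reward 1, and the terminal state's Q-values stay 0
q = 0.0
q = q_update(q, reward=1, max_q_next=0.0)
print(round(q, 4))  # 0.1
q = q_update(q, reward=1, max_q_next=0.0)
print(round(q, 4))  # 0.19

# Stepping into a hole with the -1 penalty
print(round(q_update(0.0, reward=-1, max_q_next=0.0), 4))  # -0.1
```

Repeating the goal update moves the value a tenth of the remaining distance toward 1 each time, so the entry for the move onto the goal approaches 1.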

In [None]:
"""
performs Q-learning
"""


def train(
    env,
    Q,
    episodes=5000,
    max_steps=100,
    alpha=0.1,
    gamma=0.99,
    epsilon=1,
    min_epsilon=0.1,
    epsilon_decay=0.05,
):
    """
    performs Q-learning

    env is the FrozenLakeEnv instance
    Q is a numpy.ndarray containing the Q-table
    episodes is the total number of episodes to train over
    max_steps is the maximum number of steps per episode
    alpha is the learning rate
    gamma is the discount rate
    epsilon is the initial threshold for epsilon greedy
    min_epsilon is the minimum value that epsilon should decay to
    epsilon_decay is the decay rate for updating epsilon between episodes
    When the agent falls in a hole, the reward should be updated to be -1
    Returns: Q, total_rewards
    Q is the updated Q-table
    total_rewards is a list containing the rewards per episode
    """
    total_rewards = []

    for episode in range(episodes):
        state = env.reset()
        done = False
        total_reward = 0

        for step in range(max_steps):
            # choose an action with the epsilon_greedy helper defined above
            action = epsilon_greedy(Q, state, epsilon)

            new_state, reward, done, info = env.step(action)

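            # an episode that ends with no reward is treated as a fall into a hole
            # (this may also match an episode ended by the env's own step limit)
            if done and reward == 0:
                reward = -1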

            # Q-learning formula
            Q[state, action] = Q[state, action] + alpha * (reward + gamma * np.max(Q[new_state]) - Q[state, action])

            total_reward += reward
            state = new_state

            if done:
                break

        epsilon = max(min_epsilon, epsilon - epsilon_decay)
        total_rewards.append(total_reward)

    return Q, total_rewards
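

With the defaults above, epsilon shrinks by a fixed 0.05 after every episode, so exploration drops to its floor very early. The sketch below replays just that schedule; `linear_epsilon_schedule` is a hypothetical helper that mirrors the `max(min_epsilon, epsilon - epsilon_decay)` line in `train`.

```python
def linear_epsilon_schedule(episodes, epsilon=1.0, min_epsilon=0.1,
                            epsilon_decay=0.05):
    """Return the epsilon value used during each episode."""
    schedule = []
    for _ in range(episodes):
        schedule.append(epsilon)
        # same linear decay rule as the train function
        epsilon = max(min_epsilon, epsilon - epsilon_decay)
    return schedule


schedule = linear_epsilon_schedule(25)
# first episode index at which epsilon has reached the floor
# (small tolerance to absorb floating-point rounding)
first_floor = next(i for i, e in enumerate(schedule) if e <= 0.1 + 1e-9)
print(first_floor)  # 18
```

Out of 5000 training episodes, all but the first 18 run with epsilon at 0.1. A smaller `epsilon_decay` would keep the agent exploring for longer, which may matter on larger or slippery maps.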

In [None]:
np.random.seed(0)
desc = [['S', 'F', 'F'], ['F', 'H', 'H'], ['F', 'F', 'G']]
env = load_frozen_lake(desc=desc)
Q = q_init(env)

Q, total_rewards  = train(env, Q)
print(Q)
split_rewards = np.split(np.array(total_rewards), 10)
for i, rewards in enumerate(split_rewards):
    print((i+1) * 500, ':', np.mean(rewards))


[[0.96059593 0.970299   0.95097357 0.96059591]
 [0.96059481 0.         0.02547319 0.        ]
 [0.25367681 0.         0.         0.        ]
 [0.97029691 0.9801     0.         0.9605959 ]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.98009953 0.98009941 0.99       0.97029715]
 [0.98009832 0.98999981 1.         0.        ]
 [0.         0.         0.         0.        ]]
500 : 0.892
1000 : 0.958
1500 : 0.924
2000 : 0.932
2500 : 0.942
3000 : 0.948
3500 : 0.95
4000 : 0.924
4500 : 0.958
5000 : 0.944


In [None]:
import numpy as np
from IPython.display import clear_output
import time

# how many games to have the agent play
play_count = 3

def render_game(env, state, grid_size):
    """
    Custom render function for the game.
    env: the game environment
    state: current state of the agent
    grid_size: size of the grid
    """
    grid = np.array(env.desc).reshape(grid_size, grid_size)
    row, col = state // grid_size, state % grid_size

    for r in range(grid_size):
        for c in range(grid_size):
            if r == row and c == col:
                print("\033[91m{}\033[00m".format(grid[r, c].decode("utf-8")), end=" ")
            else:
                print(grid[r, c].decode("utf-8"), end=" ")
        print("")

def play(env, Q, max_steps=100, grid_size=4):
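    """
    plays play_count episodes (a module-level setting), always taking the
    action with the highest Q-value, and prints each episode's total reward

    env is the FrozenLakeEnv instance
    Q is a numpy.ndarray containing the Q-table
    max_steps is the maximum number of steps in each episode
    grid_size is the side length of the (square) grid
    """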
    for episode in range(play_count):
        state = env.reset()
        done = False
        total_rewards = 0
        print("*****EPISODE ", episode+1, "*****\n\n\n\n")
        time.sleep(1)

        for step in range(max_steps):
            clear_output(wait=True)
            render_game(env, state, grid_size)
            time.sleep(0.3)

            action = np.argmax(Q[state])
            new_state, reward, done, info = env.step(action)
            total_rewards += reward

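            if done:
                clear_output(wait=True)
                # note: this re-renders the pre-move state, so the final frame
                # highlights the square the agent left, not the goal or hole
                render_game(env, state, grid_size)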
                if reward == 1:
                    print("****The agent ascended!****")
                else:
                    print("****The agent fell into the abyss!****")
                time.sleep(3)
                break

            state = new_state

        print(f"Episode: {episode + 1}, Total Rewards: {total_rewards}\n")

    env.close()



In [None]:

np.random.seed(0)
desc = [['S', 'F', 'F'], ['F', 'H', 'H'], ['F', 'F', 'G']]
env = load_frozen_lake(desc=desc)
Q = q_init(env)

Q, total_rewards = train(env, Q)
print("Total rewards from training:", total_rewards)

# Play the game
play(env, Q, max_steps=100, grid_size=3)

S F F 
F H H 
F [91mF[00m G 
****The agent ascended!****
Episode: 3, Total Rewards: 1.0



The code below expands the project: each agent run builds a map of random size (4x4 to 8x8) with a random layout. Stochasticity is also introduced by setting is_slippery to True in main(). With the same short training schedule, the agent now struggles to reach the goal.
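
One way to put a number on how much the agent struggles is to summarize `total_rewards` instead of printing the full list. The sketch below is a hypothetical helper, not part of the original project. It assumes the reward scheme used by `train`, where an episode total of 1 means the goal was reached, and the sample list is made up purely for illustration.

```python
def success_rate(total_rewards):
    """Fraction of episodes whose total reward is 1 (goal reached)."""
    if not total_rewards:
        return 0.0
    wins = sum(1 for reward in total_rewards if reward == 1)
    return wins / len(total_rewards)


def windowed_success(total_rewards, window=500):
    """Success rate over consecutive blocks of `window` episodes."""
    return [
        success_rate(total_rewards[i:i + window])
        for i in range(0, len(total_rewards), window)
    ]


# Made-up episode totals, for illustration only
sample = [1, -1, -1, 0, 1, -1, -1, -1]
print(success_rate(sample))         # 0.25
print(windowed_success(sample, 4))  # [0.25, 0.25]
```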

In [None]:
import gym
import numpy as np
from gym.envs.toy_text.frozen_lake import generate_random_map

def load_frozen_lake(desc=None, map_name=None, is_slippery=False):
    """
    Loads a FrozenLake environment with a random map if desc and map_name are None.
    """
    if desc is None and map_name is None:
        # generate a random square map with a side length between 4 and 8
        size = int(np.random.choice([4, 5, 6, 7, 8]))
        desc = generate_random_map(size=size)
    env = gym.make('FrozenLake-v1', desc=desc, map_name=map_name, is_slippery=is_slippery)
    return env


In [None]:
def main():
    np.random.seed(0)

    # number of maps to generate
    num_maps = 5

    for i in range(num_maps):
        print(f"Map {i+1}:")

        # load a random map
        env = load_frozen_lake(is_slippery=True)
        Q = q_init(env)

        # train the agent
        Q, total_rewards = train(env, Q)
        print("Total rewards from training:", total_rewards)

        # play the game
        play(env, Q, max_steps=1000, grid_size=env.nrow)

        print("\n\n")

if __name__ == "__main__":
    main()


S H F F H F 
F F F F F F 
[91mF[00m H H H F H 
H F H H F H 
F F H F F H 
F F F F F G 
****The agent fell into the abyss!****


KeyboardInterrupt: 