In [1]:
import numpy as np
import gymnasium as gym
import random
import time
from tqdm import tqdm
from collections import defaultdict
from IPython.display import clear_output

In [2]:
env = gym.make("FrozenLake-v1", render_mode="human", is_slippery=False)
env.reset()

env.render()

In [3]:
# observation space - states
print(env.observation_space)
print(env.observation_space.n)

# action space - (0) left, (1) down, (2) right, (3) up
print(env.action_space)
print(env.action_space.n)

Discrete(16)
16
Discrete(4)
4


# Frozen Lake - Stochastic movement

Note that if `is_slippery` is True (default), then the agent will move in the intended direction with a probability of 1/3, otherwise they will move in either *perpendicular* direction with an equal probability of 1/3 in both directions. Hence, an action `right` may result in the agent moving, `down` or `up` instead!

In [4]:
env.reset()
terminated = False
while(not terminated):
    # generate and apply random action
    randomAction = env.action_space.sample()
    next_state, reward, terminated, truncated, info = env.step(randomAction)
    print(["left", "down", "right", "up"][randomAction])
    if not terminated : time.sleep(2)

print("Episode terminated!")

  if not isinstance(terminated, (bool, np.bool8)):


left
left


2023-01-11 19:44:43.497 python[94486:5782127] IMKClient Stall detected, *please Report* your user scenario attaching a spindump (or sysdiagnose) that captures the problem - (imkxpc_bundleIdentifierWithReply:) block performed very slowly (1.97 secs).
2023-01-11 19:44:43.497 python[94486:5782127] IMKClient Stall detected, *please Report* your user scenario attaching a spindump (or sysdiagnose) that captures the problem - (imkxpc_bundleIdentifierWithReply:) block performed very slowly (1.97 secs).


up
up
left
down
up
right
up
up
down
Episode terminated!


## Transition Probabilities

`p(s'|s,a)` denotes the transition probability to next state `s'` from current state `s` with action `a`.

In [None]:
# env.P[state][action]
# Output: transition probability, next state, reward, is terminal
env.P[0][1]

# Building an agent

In [5]:
class FrozenLakeAgent:
    def __init__(
        self,
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95
    ):
        """
        Initialize a Reinforcement Learning agent with an empty dictionary of state-action values (q_values),
        a learning rate and an epsilon.
        Args:
        - learning_rate: Amount with which to weight newly learned reward vs old reward (1 - lr)
        - initial epsilon: The initial probability w/ with we sample random action (exploration)
        - epsilon_decay: Value by which epsilon value decays through subtraction
        - final_epsilon: Epsilon value at which decay stops
        - discount_factor: The factor by which future rewards are counted, i.e. expected return on next state (recursive)
        """
        self.q_values = defaultdict(lambda: np.zeros(env.action_space.n))

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon
        
        self.training_error = []
    
    def get_action(self, state: tuple[int, int, bool]) -> int:
        """
        Returns the best action with probability (1 - epsilon) -> exploitation. 
        Otherwise a random action with probability epsilon to ensure exploration.
        """
        if np.random.random() < self.epsilon:
            return env.action_space.sample()
        else:
            return int(np.argmax(self.q_values[state]))
    
    def update(
        self,
        state: tuple[int, int, bool],
        action: int,
        reward: float,
        terminated: bool,
        next_state: tuple[int, int, bool]
    ):
        """
        Updates the Q-value of an action.
        The Q-value update is equivalent to the following weighting of old and new information by the learning rate:
        # self.q_values[state][action] = (1 - self.lr) * self.q_values[state][action] +
        #                                self.lr * (reward + self.discount_factor * future_q_value)
        The temporal difference is the difference between the old and new value over one (time) step.
        """
        future_q_value = (not terminated) * np.max(self.q_values[next_state]) 
        temporal_difference = reward + self.discount_factor * future_q_value - self.q_values[state][action]
        self.q_values[state][action] = self.q_values[state][action] + self.lr * temporal_difference
        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)

In [None]:
print(agent.q_values)

# Write training loop

In [6]:
learning_rate = 0.01
n_episodes = 1000
start_epsilon = 1.0
epsilon_decay = start_epsilon / (n_episodes / 2)
final_epsilon = 0.1

agent = FrozenLakeAgent(
    learning_rate = learning_rate,
    initial_epsilon = start_epsilon,
    epsilon_decay = epsilon_decay,
    final_epsilon = final_epsilon,
)

env = gym.wrappers.RecordEpisodeStatistics(env, deque_size=n_episodes)
for episode in tqdm(range(n_episodes)):
    state, info = env.reset()
    done = False
    
    #play one episode
    while not done:
        action = agent.get_action(state)
        next_state, reward, terminated, truncated, info = env.step(action)

        # update the agent
        agent.update(state, action, reward, terminated, next_state)

        # update done status and state
        done = terminated or truncated
        state = next_state

    # once a game is finished we decay epsilon -> converge towards exploitation
    agent.decay_epsilon()

 45%|████▍     | 446/1000 [15:49<16:22,  1.77s/it]  

In [None]:

q_table = np.zeros((state_space_size, action_space_size))
print(q_table)

In [None]:
num_episodes = 10000
max_steps_per_episode = 100

learning_rate = 0.1
discount_rate = 0.99

exploration_rate = 1
max_exploration_rate = 1
min_exploration_rate = 0.01
exploration_decay_rate = 0.001

In [None]:
rewards_all_episodes = []

# Q-learning algorithmm
for episode in range(num_episodes):
    state, _ = env.reset()

    done = False
    rewards_current_episode = 0

    for step in range(max_steps_per_episode):

        # Exploration-exploitation trade-off
        exploration_rate_treshold = random.uniform(0, 1)
        if exploration_rate_treshold > exploration_rate:
            action = np.argmax(q_table[state, :])
        else:
            action = env.action_space.sample()
        
        new_state, reward, done, info, _ = env.step(action)

        # Update Q-table for Q(s,a)
        q_table[state, action] = q_table[state, action] * (1 - learning_rate) + \
            learning_rate * (reward + discount_rate * np.max(q_table[new_state, :])) 

        state = new_state
        rewards_current_episode += reward
        if done == True:
            break

        # Exploration rate decay
        exploration_rate = min_exploration_rate + \
            (max_exploration_rate - min_exploration_rate) * np.exp(-exploration_decay_rate * episode)

        rewards_all_episodes.append(rewards_current_episode)

# Calculate and print the average reward per thousand episodes
rewards_per_thousand_episodes = np.array_split(np.array(rewards_all_episodes), num_episodes / 1000)
count = 1000
print("********Average reward per thousand episodes********\n")
for r in rewards_per_thousand_episodes: 
    print(count, ": ", str(sum(r / 1000)))
    count += 1000

# Print updated Q-table
print("\n\n********Q-table********\n")
print(q_table)