This Google Colab notebook guides students through the concept of a Markov Decision Process (MDP) using a grid maze as the running example. It also covers setting up a Gymnasium environment and implementing Value Iteration.

In [None]:
"""Install requisite packages"""
!pip install swig
!pip install gymnasium[box2d,classic-control]
!pip install numpy
!pip install matplotlib


In [None]:
"""Import packages"""
import io
import time

import ipywidgets
from IPython.display import display, clear_output
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

"""Utility functions"""
def image_to_bytes(image):
  img = Image.fromarray(image, 'RGB')

  # Convert the PIL image to bytes
  with io.BytesIO() as output:
    img.save(output, format="PNG")
    image_data = output.getvalue()
  return image_data

We want to represent a maze environment in which an agent can move in the four cardinal directions and walls restrict movement. For a given maze of size WIDTHxHEIGHT, our MDP is defined as:
- S: The state space is {0, 1, 2, ..., WIDTH*HEIGHT-1}, representing the row-major flattened indices of the maze (state = x*WIDTH + y, where x is the row and y is the column). For example, for a 5x5 maze, state 8 represents x=1, y=3.
- A: The action space is {0, 1, 2, 3}, representing the directions UP, DOWN, LEFT, and RIGHT respectively.
- R(S, A) -> ℝ: The reward function takes a state and an action and returns a reward. For our simple problem, we give a constant penalty of -1 for every action the agent takes. No reward is given at the goal.
- P(S, A, S) -> [0,1]: The transition function gives the probability of starting in a given state, taking a certain action, and ending up in a given next state. Our maze environment is deterministic, so all probabilities are either 0 or 1. For any two distinct non-neighbor states S1 and S2, P(S1, ⋅, S2)=0. If state S3 is directly to the left of S4 and neither is a wall, then P(S4, LEFT, S3)=1. A move into a wall or off the map leaves the agent where it is.
- H: The horizon is the maximum number of actions our agent is allowed to take.
- γ (gamma): Gamma is a value in [0,1] which acts as a discount factor to make states distant in the future less important. γ=1 means there is no discounting. γ=0.95 means that rewards lose 5% of their value each step into the future.
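
The state indexing and the discount factor can be checked with a few lines of plain Python. This is a minimal sketch, assuming the row-major layout used by the `MazeMDP` code (index = row * WIDTH + column); the helper names `state_to_cell`, `cell_to_state`, and `discounted_return` are illustrative and not part of the notebook.

```python
def state_to_cell(state, width):
    """Convert a flat state index to (row, column), assuming row-major order."""
    return divmod(state, width)

def cell_to_state(row, col, width):
    """Convert (row, column) back to the flat state index."""
    return row * width + col

def discounted_return(rewards, gamma):
    """Sum of rewards where the reward at step t is weighted by gamma**t."""
    return sum(gamma ** t * r for t, r in enumerate(rewards))

WIDTH = 5
print(state_to_cell(8, WIDTH))       # (1, 3)
print(cell_to_state(1, 3, WIDTH))    # 8

# Three steps of the constant -1 penalty, with and without discounting
print(discounted_return([-1, -1, -1], gamma=1.0))   # -3.0
print(discounted_return([-1, -1, -1], gamma=0.5))   # -1.75
```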

In [None]:
class MazeMDP():

  MOVES: dict[int, np.ndarray] = {
         0: np.array([-1, 0]), #UP
         1: np.array([1, 0]),  #DOWN
         2: np.array([0, -1]), #LEFT
         3: np.array([0, 1])   #RIGHT
         }

  def __init__(self, maze, H=100, gamma=.95):
    """Initialize the MDP"""
    self.maze = np.array(maze)

    self.obstacles = [tuple(loc) for loc in np.argwhere(self.maze == 1)]
    def get_state_index_from_maze(value):
      locations = [tuple(loc) for loc in np.argwhere(self.maze == value)]
      assert len(locations) == 1, f"There should be exactly one {value} in the maze"
      row, col = locations[0]
      # Row-major flat index: row * number_of_columns + column
      return row * self.maze.shape[1] + col
    self.init_state = get_state_index_from_maze(2)
    self.goal_state = get_state_index_from_maze(3)

    """Create the MDP functions"""
    self.S = set(range(self.maze.shape[0] * self.maze.shape[1]))
    self.A = set(range(len(self.MOVES)))
    self.R = self.build_rewards()
    self.P = self.build_transitions()

    self.H = H
    self.gamma = gamma


  def build_rewards(self):
    """Build reward function"""
    # make every state and action give -1 reward, to penalize long paths
    rewards = np.full((len(self.S), len(self.A)), -1)
    # all actions in the goal state give zero reward
    rewards[self.goal_state, :] = 0

    return rewards

  def build_transitions(self):
    """Build transitions function"""
    transitions = np.zeros((len(self.S), len(self.A), len(self.S)))

    # Go through all states and all actions
    for s in self.S:
      for a in self.A:
        # By default, the next state is the current state
        s_p = s

        # Convert state to maze index
        xy = np.array([s // self.maze.shape[1], s % self.maze.shape[1]])

        # Get new potential state
        xy_new = xy + self.MOVES[a]

        # Check that new state is valid
        is_in_bounds = 0 <= xy_new[0] < self.maze.shape[0] and 0 <= xy_new[1] < self.maze.shape[1]
        is_free_space = tuple(xy_new) not in self.obstacles

        # Compute the new s' if the transition is valid
        if is_in_bounds and is_free_space:
          s_p = xy_new[0] * self.maze.shape[1] + xy_new[1]

        # Assign transition probability to 1
        transitions[s,a,s_p] = 1

    return transitions

Now that we've clearly defined the MDP, we want to provide an interface for an agent to interact with the environment. This environment is quite simple, but environments can become very complex. Gymnasium is a package that provides a common Environment interface across many such environments, along with other useful utilities. `gymnasium` is often abbreviated as `gym` because it is a well-maintained fork of OpenAI's Gym. A `gym.Env` has the following interface:
- `observation_space`: The observation space defines what data types the states are represented by. For example, an integer or a vector of floats.
- `action_space`: The action space defines what data types the actions are represented by. It is common to have an integer represent different choices.
- `reset()` -> `observation, info`: The reset function marks the start of a new episode. It resets the environment back to its starting state. It returns the starting observation (and a dictionary of auxiliary info).
- `step(action)` -> `observation, reward, termination, truncation, info`: The step function executes an action chosen by the agent. The resulting state is returned along with the reward for that action. Termination signals that the episode ended inside the MDP (such as by reaching the goal), while truncation signals that it was cut off from outside (such as by time running out). Either one means the current episode is done.
- `render()`: Render is for visualization purposes and is optional.
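
The interaction loop implied by this interface can be sketched without any real environment. The `CountdownEnv` class below is a hypothetical stand-in, not a Gymnasium class: it only mimics the `reset()` and `step()` return shapes so that the loop structure is visible.

```python
class CountdownEnv:
    """Hypothetical toy environment that mimics the reset/step return shapes."""

    def __init__(self, start=3, max_steps=10):
        self.start = start
        self.max_steps = max_steps

    def reset(self):
        self.value = self.start
        self.steps = 0
        return self.value, {}              # observation, info

    def step(self, action):
        self.value -= action               # action 1 counts down, action 0 waits
        self.steps += 1
        terminated = self.value <= 0       # the task itself is finished
        truncated = not terminated and self.steps >= self.max_steps  # time limit
        reward = -1
        return self.value, reward, terminated, truncated, {}

env = CountdownEnv()
observation, info = env.reset()
total_reward = 0
done = False
while not done:
    action = 1                             # a fixed policy: always count down
    observation, reward, terminated, truncated, info = env.step(action)
    total_reward += reward
    done = terminated or truncated

print(total_reward)  # -3
```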

Let's see `gymnasium` in action with a few of their built-in environments. You can refer to https://gymnasium.farama.org/ for more details.

In [None]:
import gymnasium as gym

# Lots of environments to choose from. Here are some examples
# "CartPole-v1", "MountainCar-v0", "Acrobot-v1", "LunarLander-v3", "CarRacing-v3"
# (Gymnasium releases before 1.0 use "LunarLander-v2" and "CarRacing-v2")
environment_id = "CartPole-v1"
env = gym.make(environment_id, render_mode="rgb_array")

def agent_policy(observation):
  # You can choose actions with any method.
  # This just chooses a random action
  return env.action_space.sample()

print("Observation space:", env.observation_space)
print("Action space:", env.action_space)
# Jupyter notebook stuff to render the output
image_widget = ipywidgets.Image()
display(image_widget)

observation, info = env.reset()
for _ in range(500):
  action = agent_policy(observation)
  observation, reward, terminated, truncated, info = env.step(action)

  done = terminated or truncated
  if done:
    # restart the episode when the previous finishes
    observation, info = env.reset()

  # Render
  image = env.render()
  image_widget.value = image_to_bytes(image)

env.close()

Observation space: Box([-4.8               -inf -0.41887903        -inf], [4.8               inf 0.41887903        inf], (4,), float32)
Action space: Discrete(2)



Now that you are familiar with what Gymnasium does, let's define a gym environment for our previously defined MazeMDP class. In most cases, you would just build the MDP directly into the environment.

In [None]:
"""Define the Maze Gym Environment"""
class MazeEnv(gym.Env):

  def __init__(self, mdp, render_mode = "ansi"):
    """Initialize the maze environment setting all variables and parsing the map"""
    self.mdp = mdp

    self.agent_curr_state = self.mdp.init_state
    self.steps_taken = 0

    # Set action space and observation space
    self.action_space = gym.spaces.Discrete(len(self.mdp.A))
    self.observation_space = gym.spaces.Discrete(len(self.mdp.S))

    # Rendering
    self.render_mode = render_mode

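  def reset(self, seed=None, options=None):
    """Reset the environment to its initial state"""
    # Match the gym.Env signature and let the base class handle seeding
    super().reset(seed=seed)

    self.agent_curr_state = self.mdp.init_state
    self.steps_taken = 0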

    initial_obs = self.agent_curr_state
    initial_info = {}

    return initial_obs, initial_info

  def step(self, action):
    """Take an action in the environment"""

    assert 0 <= action < len(self.mdp.A), f"Action should be one of {sorted(self.mdp.A)}"

    # Get reward according to state and action
    reward = self.mdp.R[self.agent_curr_state, action]

    # Sample next state weighted by transition probabilities
    self.agent_curr_state = np.random.choice(list(self.mdp.S), 1, p=self.mdp.P[self.agent_curr_state, action,:])[0]

    # Make outputs for gym environment
    next_obs = self.agent_curr_state

    self.steps_taken += 1
    has_reached_goal = self.agent_curr_state == self.mdp.goal_state
    has_reached_time_limit = self.steps_taken >= self.mdp.H

    # terminated indicates that the episode ended inside the MDP (goal reached)
    terminated = bool(has_reached_goal)
    # truncated indicates that the episode was ended due to some external
    # condition, here the step limit H
    truncated = bool(has_reached_time_limit and not has_reached_goal)
    # info can contain auxiliary information for logging and debugging
    info = {}

    return next_obs, reward, terminated, truncated, info

  def render(self):
    """ Render the environment """

    if self.render_mode == "ansi":
      # ansi is text mode
      return render_text(self.agent_curr_state, self.mdp.maze)
    else:
      raise ValueError(f"Unsupported rendering mode {self.render_mode}")

def render_text(agent_state, maze):
  """Get a text form of the maze"""
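  # Indexed by maze cell value: free space, wall, start, goal, teleporter
  character_map = ["-", "X", "-", "G", "T"]
  # maze.shape is (number of rows, number of columns)
  nrows, ncols = maze.shape

  string = ""
  for i in range(nrows):
    for j in range(ncols):
      s = (i * ncols + j)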
      if s == agent_state:
        string += "A"
      else:
        string += character_map[maze[i, j]]
    string += "\n"

  return string


We can test out this environment with an agent that selects random actions.

In [None]:
STEPS_PER_SECOND = 10
output = ipywidgets.Output()
display(output)

# Create the maze MDP. 0 is free space, 1 is walls,
# 2 is the starting location, 3 is the goal location
mdp = MazeMDP([[2,0,0,0,0],
               [1,1,1,1,0],
               [0,0,0,0,0],
               [0,1,0,1,1],
               [0,1,0,0,3]])

# Create the maze gym environment
maze_env = MazeEnv(mdp)

terminated = truncated = done = False
obs, info = maze_env.reset()
num_steps = 0

# Run the maze environment until the episode terminates
while not done:
  # sample a random action
  action = maze_env.action_space.sample()

  next_obs, reward, terminated, truncated, info = maze_env.step(action)
  done = terminated or truncated
  obs = next_obs

  # Render
  with output:
    clear_output(wait=True)
    print(maze_env.render())
  time.sleep(1/STEPS_PER_SECOND)
  num_steps += 1

print(f"Finished in {num_steps} steps")


Finished in 100 steps


Below, we provide the code for running Value Iteration, a method for solving an MDP. We can use it to test the effects of different reward functions on the Agent's behavior.
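
Value Iteration repeatedly applies the Bellman backup V(s) ← max_a Σ_s' P(s, a, s') [R(s, a) + γ V(s')] until the values stop changing. The sketch below applies it to a hypothetical three-state corridor (states 0 and 1, with the goal at state 2) using plain Python lists rather than NumPy arrays. The corridor, the names, and the -1 step penalty are assumptions chosen for illustration.

```python
GAMMA = 0.9
GOAL = 2
N_STATES, N_ACTIONS = 3, 2   # actions: 0 = LEFT, 1 = RIGHT

def next_state(s, a):
    """Deterministic corridor dynamics; moving off either end leaves s unchanged."""
    return max(s - 1, 0) if a == 0 else min(s + 1, N_STATES - 1)

def backup(v):
    """One Bellman backup sweep with a -1 reward for every action."""
    new_v = []
    for s in range(N_STATES):
        if s == GOAL:
            new_v.append(0.0)            # the goal value is pinned to 0
            continue
        new_v.append(max(-1 + GAMMA * v[next_state(s, a)] for a in range(N_ACTIONS)))
    return new_v

v = [0.0] * N_STATES
for _ in range(100):
    new_v = backup(v)
    converged = max(abs(x - y) for x, y in zip(v, new_v)) < 1e-6
    v = new_v
    if converged:
        break

print([round(x, 2) for x in v])  # [-1.9, -1.0, 0.0]
```

State 1 is one step from the goal (value -1), and state 0 is two steps away (value -1 + 0.9 * -1 = -1.9), so the values encode the discounted cost of the shortest path.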

In [None]:
class ValueIterationAgent():
  """Agent class for solving tabular MDPs"""

  def __init__(self, env):
    """Initialize the agent. env should have a tabular mdp"""

    assert hasattr(env, 'mdp'), "Gymnasium Environment does not have associated MDP"

    self.env = env
    self.mdp = env.mdp
    self.policy = None
    self.v = None


  def value_improvement(self, epsilon=.01):
    """Run value iteration and extract a greedy policy"""
    mdp = self.mdp

    # Initialize v. Any initialization works;
    # the easiest is all zeros
    v = np.zeros(len(mdp.S))

    # Loop until the horizon is reached
    for _ in range(mdp.H):

      old_v = v

      # Perform Bellman Backup
      v = np.max(np.sum( mdp.P * (mdp.R[:,:,np.newaxis] + mdp.gamma * v), axis=-1), axis=-1)
      # Set goal state value to 0
      v[mdp.goal_state] = 0

      # Exit if threshold is reached
      if np.max(np.abs(old_v-v)) < epsilon:
        break

    # Determine the best actions with bellman backups
    action_reward = np.sum(mdp.P * (mdp.R[:,:,np.newaxis] + mdp.gamma * v), axis=-1)

    # Compute the optimal policy
    best_actions = (action_reward - np.max(action_reward, axis=1)[:,np.newaxis]) == 0
    policy = best_actions * (1 / np.sum( best_actions, axis=1)[:,np.newaxis])

    self.policy = policy
    self.v = v

    return policy, v

  def get_action(self, state):
    """Get the action given by the policy"""

    # If policy is none, return a random action
    if self.policy is None:
      print("Warning: Value iteration policy has not been computed yet")
      return self.env.action_space.sample()

    # Get the action given by the policy
    action = np.random.choice(list(self.mdp.A), 1, p=self.policy[state,:])[0]

    return action

  def get_value(self, state):
    """Get the value of a given state"""

    return self.v[state]

Now, let's do some guided exploration with a new reward function.

1. Try a reward function with all zeros for all actions and 1 in the goal state. Does this reward function change the agent's behavior as you change gamma between 0 and 1?
2. Refer to the MazeMDP code for an example of how to implement the reward function.
3. Make a reward function of your own. Does this reward function solve the maze optimally? Under what conditions (goal location, agent starting location) does the agent move to the goal in the fewest steps? Some things to try:

   * Give a negative reward every time the agent moves down.
   * Give a negative reward every time the agent moves right.
   * Change the location of the agent so that it has to move up and right to get to the goal.
   * What reward do you need to assign to the goal for the agent to still reach it?
   * Does this change if you make the map bigger?

4. Can you find a reward function that generally allows the agent to solve the maze in the least number of steps? (Hint: look at the original implementation!)
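
The per-action penalties suggested above can be prototyped as a table indexed by state and action. This is a sketch under assumptions: `build_action_penalty_rewards` and its parameters are hypothetical names, plain lists stand in for the NumPy array that `MazeMDP.build_rewards` returns, and the goal row is kept at zero as in the original reward function.

```python
def build_action_penalty_rewards(n_states, n_actions, penalties, goal_state):
    """Reward table R[s][a]: zero by default, minus a per-action penalty."""
    rewards = [[0.0 - penalties.get(a, 0.0) for a in range(n_actions)]
               for _ in range(n_states)]
    # Mirror the original MazeMDP: every action in the goal state gives zero
    rewards[goal_state] = [0.0] * n_actions
    return rewards

DOWN, RIGHT = 1, 3   # action ids as defined in MazeMDP.MOVES
R = build_action_penalty_rewards(25, 4, {DOWN: 1.0, RIGHT: 2.0}, goal_state=24)
print(R[0])    # [0.0, -1.0, 0.0, -2.0]
print(R[24])   # [0.0, 0.0, 0.0, 0.0]
```

Inside the notebook, the table would need to be a NumPy array (for example by wrapping the result in `np.array(...)`), because the value iteration code indexes it with `mdp.R[:,:,np.newaxis]`.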



In [None]:
class AltRewardMazeMDP(MazeMDP):
  def __init__(self, maze, H=100, gamma=.95):
    super().__init__(maze, H=H, gamma=gamma)

  def build_rewards(self):
    """Build reward function"""
    #raise NotImplementedError("Please implement this :) ")
    return

test_maze = [[2,0,0,0,0],
             [1,1,1,1,0],
             [0,0,0,0,0],
             [0,1,0,1,1],
             [0,1,0,0,3]]

test_maze_2 = [[2,0,0,0,0,1,0,0,0,0],
               [1,1,1,1,0,1,1,1,0,1],
               [0,0,0,0,0,1,0,1,0,0],
               [0,1,0,1,1,0,0,0,0,1],
               [0,1,0,1,0,0,0,1,0,0],
               [0,1,0,0,0,0,1,0,0,0],
               [0,1,0,1,1,1,1,1,1,1],
               [0,0,0,1,0,0,0,0,0,0],
               [0,1,0,0,0,0,1,1,1,0],
               [0,1,0,1,0,1,3,0,0,0],]

gamma = 0.99
test_mdp = AltRewardMazeMDP(test_maze_2, gamma=gamma )



Below, we provide testing code to see how your Maze Agent interacts with the environment after solving the MDP with value iteration. Is it solving the Maze optimally? Why or why not?

In [None]:
STEPS_PER_SECOND = 2
output = ipywidgets.Output()
display(output)

# Create the maze gym environment
maze_env = MazeEnv(test_mdp)

# Create and call a value iteration agent
agent = ValueIterationAgent(maze_env)
agent.value_improvement()

terminated = truncated = done = False
obs, info = maze_env.reset()
num_steps = 0

# Run the maze environment until the episode terminates
while not done and num_steps < 1000:
  # Get best action from the agent policy
  action = agent.get_action(obs)

  next_obs, reward, terminated, truncated, info = maze_env.step(action)
  done = terminated or truncated
  obs = next_obs

  # Render
  with output:
    clear_output(wait=True)
    print(maze_env.render())
  time.sleep(1/STEPS_PER_SECOND)
  num_steps += 1

print(f"Finished in {num_steps} steps")


Finished in 27 steps


For some fun, we are now going to modify the environment transitions by adding teleport squares. When the agent takes any action from a teleport square, it is moved to another teleport square on the map at random.

We specify a teleport state on the map with the number '4'.
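
One possible reading of "moved to another square at random" is a uniform distribution over the teleport squares. The sketch below builds a single transition row under that assumption, in plain Python, for a hypothetical six-state map. Whether the agent's own square counts as a destination is a design choice, exposed here through the illustrative `include_self` flag; `teleport_row` is not part of the notebook.

```python
def teleport_row(n_states, teleport_states, current, include_self=True):
    """Return P(current, a, .) for a teleport square as a list of probabilities."""
    targets = [t for t in teleport_states if include_self or t != current]
    row = [0.0] * n_states
    if not targets:
        # A lone teleport square has nowhere else to go: stay in place
        row[current] = 1.0
        return row
    for t in targets:
        row[t] = 1.0 / len(targets)
    return row

row = teleport_row(n_states=6, teleport_states=[1, 4, 5], current=4, include_self=False)
print(row)  # [0.0, 0.5, 0.0, 0.0, 0.0, 0.5]
```

Every row of a transition function must sum to 1, which is a useful check on any implementation of the exercise.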

In [None]:
class TeleporterMazeMDP(MazeMDP):
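  def __init__(self, maze, H=100, gamma=.95):
    # Teleport squares must be known before super().__init__ runs,
    # because it calls build_transitions()
    self.teleport_states = [tuple(loc) for loc in np.argwhere(np.array(maze) == 4)]

    super().__init__(maze, H=H, gamma=gamma)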

  def build_transitions(self):
    """Build transitions function"""
    transitions = np.zeros((len(self.S), len(self.A), len(self.S)))

    raise NotImplementedError("Please implement this :)")

    # Go through all states and all actions
    for s in self.S:
      for a in self.A:
        # By default, the next state is the current state
        s_p = s

        # Convert state to maze index
        xy = np.array([s // self.maze.shape[1], s % self.maze.shape[1]])

        # Define the logic for when the agent is not in a teleport state
        # (compare as a tuple; `in` on a NumPy array is ambiguous)
        if tuple(xy) not in self.teleport_states:

          # Get new potential state
          xy_new = xy + self.MOVES[a]

          # Check that new state is valid
          is_in_bounds = 0 <= xy_new[0] < self.maze.shape[0] and 0 <= xy_new[1] < self.maze.shape[1]
          is_free_space = tuple(xy_new) not in self.obstacles

          # Compute the new s' if the transition is valid
          if is_in_bounds and is_free_space:
            s_p = xy_new[0] * self.maze.shape[1] + xy_new[1]

          # Assign transition probability to 1
          transitions[s,a,s_p] = 1

        else:
          # Define the logic for the if agent is in a teleport state

          # Get the flat index of the teleport states
          # (loc is used to avoid reusing the name of the loop variable s)
          tp_states = np.array([loc[0]*self.maze.shape[1] + loc[1] for loc in self.teleport_states])

          # Get how many teleport states there are

          # Define a uniform probability of moving to each teleport state

          # Assign that uniform probability to transitions[s,a,tp_states]

    return transitions


tp_test_maze = [[2,0,0,0,0],
                [1,1,1,4,0],
                [4,0,0,0,0],
                [0,1,0,1,1],
                [0,4,0,0,3]]

tp_test_maze_2 = [[2,0,0,0,0,1,0,0,0,0],
                  [1,1,4,1,0,1,1,4,0,1],
                  [0,0,0,0,0,1,0,1,0,0],
                  [0,1,0,1,1,0,0,0,0,1],
                  [0,1,0,1,4,0,0,1,0,4],
                  [0,1,0,0,0,0,1,0,0,0],
                  [0,1,0,1,1,1,1,1,1,1],
                  [0,0,0,1,0,0,0,0,0,0],
                  [0,4,0,0,0,4,1,1,4,0],
                  [0,1,0,1,0,1,3,0,0,0],]

gamma = 0.99
test_mdp = TeleporterMazeMDP(tp_test_maze_2, gamma=gamma )

Below, we provide code to test your TeleporterMazeMDP environment. Notice that the Gym environment we wrote requires *no changes* even though we are using a different underlying MDP. This is the power of the Gym API!

1. How does the optimal policy change with the number and location of teleporting squares in the map? If there is 1 square? 2 squares? 4 squares?

2. What happens if you change the probability of teleporting?

3. Try to make teleporting optional. To do this, add a 5th action (id=4) that will only teleport the agent to another teleporting square if that action is taken. In non-teleporting squares the agent will remain in the same square when taking the teleporting action. How does this change the optimal policy?

4. Does changing the reward function change the utility of teleporting squares? What if there is a cost associated with a teleporting square? To test this, add a build_rewards() method to TeleporterMazeMDP and implement it with a different reward function.

In [None]:
STEPS_PER_SECOND = 2
output = ipywidgets.Output()
display(output)

# Create the maze gym environment
maze_env = MazeEnv(test_mdp)

# Create and call a value iteration agent
agent = ValueIterationAgent(maze_env)
agent.value_improvement()

terminated = truncated = done = False
obs, info = maze_env.reset()
num_steps = 0

# Run the maze environment until the episode terminates
while not done and num_steps < 1000:
  # Get best action from the agent policy
  action = agent.get_action(obs)

  next_obs, reward, terminated, truncated, info = maze_env.step(action)
  done = terminated or truncated
  obs = next_obs

  # Render
  with output:
    clear_output(wait=True)
    print(maze_env.render())
  time.sleep(1/STEPS_PER_SECOND)
  num_steps += 1

print(f"Finished in {num_steps} steps")

Further exploration! A few more ideas if you are interested:

1. Add a lava tile where the agent incurs a large negative reward for touching the lava tile. When does the agent avoid the lava tiles? Or not? How does this depend on the reward function and the discount factor (gamma)?

2. Add more actions. Allow the agent to move along the diagonals. This should be a straightforward extension (add more moves to the MOVES dictionary!)
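
For the diagonal-move idea, a minimal sketch of the extended move table is shown here. Tuples stand in for the NumPy arrays in `MazeMDP.MOVES`, and the ids 4 to 7, `DIAGONAL_MOVES`, `ALL_MOVES`, and `apply_move` are hypothetical names chosen for illustration.

```python
MOVES = {
    0: (-1, 0),   # UP
    1: (1, 0),    # DOWN
    2: (0, -1),   # LEFT
    3: (0, 1),    # RIGHT
}

# Hypothetical diagonal extension: the ids 4-7 are an arbitrary choice
DIAGONAL_MOVES = {
    4: (-1, -1),  # UP-LEFT
    5: (-1, 1),   # UP-RIGHT
    6: (1, -1),   # DOWN-LEFT
    7: (1, 1),    # DOWN-RIGHT
}

ALL_MOVES = {**MOVES, **DIAGONAL_MOVES}

def apply_move(cell, action):
    """Return the (row, column) reached from cell, ignoring walls and bounds."""
    d_row, d_col = ALL_MOVES[action]
    return (cell[0] + d_row, cell[1] + d_col)

print(len(ALL_MOVES))          # 8
print(apply_move((2, 2), 5))   # (1, 3)
```

Because the action set is derived from the move table, any check on the action id range, such as the one in `MazeEnv.step`, needs to cover the new ids as well.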