In [None]:
!pip install gymnasium
!pip install plotly
from myst_nb import glue

# Let's practice

We will see a very simle grid world problem. In this environment the agent start always in the same position and needs to reach a cell in the world.
To complicate things for the agent, one cell is a trap and if the agent go on it the game is lost.

Visualy the world will look like this:

```{figure} /lectures/mdp/dynamic-programming/grid_world.png
:align: center
:width: 70%
```

Since MDPs (Markov Decision Processes) are designed to solve stochastic problems, we will introduce some stochasticity to the problem. Specifically, we will add uncertainty to the agent's movements. When the agent decides to move in one direction, such as UP, there is an 80% chance of moving in the desired direction and a 10% chance of moving to one of the adjacent cells.

```{figure} /lectures/mdp/dynamic-programming/transition.png
:align: center
:width: 70%
```

If the agent moves into a wall, the agent stays in the same cell instead.

## Markov Decision Process

Before implementing the environment and solving it, we need to define it as a MDP $\Omega = (S,A,T,R)$, where:
- $S$ is the state space: each state $s$ will be a cell of the grid such as $\forall s \in S, s \in [0, 11]$.
- $A$ is the action space: each action $a$ will be a direction such as $\forall a \in A, a \in \{UP, DOWN, LEFT, RIGHT\}$.
- $T$ is the transition function. Moving in a specific direction have a succes chance of $80\%$, and there is a $20\%$ probability of going in the adjacent cells instead.
- $R$ is the reward function: Entering the goal gives $+10$, falling in the trap gives $-10$, and all the other actions gives $0$.

## Environment Implementation

Again, we will implement the environment using `Gym` conventions and call it `GridWorld`.

```python
class GridWorld(gymnasium.Env):
  def __init__(self):
    pass

  def step(self, action: int):
    pass
```

### State space and Action space

Based on our definition of the MDP, the state space represents all the states possible, which in this problem is all the cells the agent can reach, and the action space is all the action the agent can perform.

In both cases the spaces are discrete, so we can use the `Discrete` type of `Gym` to represent them.

```python
self.action_space = spaces.Discrete(4) # Up, Down, Left, Right
self.observation_space = spaces.Discrete(12) # 12 cells
```

```{margin} State space
There is not one way to represent the state space for a problem.
In this problem we could represent it by the grid itself where the agent will be represented by the cell having boolean equal to `True`.
You would have `spaces.MultiDiscrete(np.array([3, 4]))`.
```

```{note}
We consider the a state space containing 12 states, so it includes the wall cell. The agent cannot technically go on it, but it eases the implementation so we include it.
```

### Transition function

In reinforcement learning, we don't need to explicitly implement the transition function. However, solving MDPs with value iteration requires it: 

```{math}
V(s_{k+1}) = \sum_a\pi(a|s)\sum_{s'}p(s'|s,a)\left[ r(s,a) + \gamma v_k(s') \right]
```

`Gym` doesn't provide any method or convention of doing it, so we'll create a variable `P` representing our transiton function. It will be a dictionary of dictionary, where the first key is the current state and the second key will be the action. It will return the list of list containing the probability, the state and the reward.

```python
self.P = { 0: { 0: [[0.9, 0, 0], [0.1, 1, 0]], 1: [[0.8, 4, 0], [0.1, 1, 0], [0.1, 0, 0]], 2: [[0.9, 0, 0], [0.1, 4, 0]], 3: [[0.8, 1, 0], [0.1, 4, 0], [0.1, 0, 0]]},
               1: { 0: [[0.8, 1, 0], [0.1, 0, 0], [0.1, 2, 0]],  1: [[0.8, 1, 0], [0.1, 0, 0], [0.1, 2, 0]], 2: [[0.8, 0, 0], [0.2, 1, 0]], 3: [[0.8, 2, 0], [0.2, 1, 0]]},
               2: { 0: [[0.8, 2, 0], [0.1, 1, 0], [0.1, 3, 0]], 1: [[0.8, 6, 0], [0.1, 1, 0], [0.1, 3, 0]], 2: [[0.8, 1, 0], [0.1, 2, 0], [0.1, 6, 0]], 3: [[0.8, 3, 0], [0.1, 2, 0], [0.1, 6, 0]]},
               3: { 0: [[0.9, 3, 0], [0.1, 2, 0]], 1: [[0.8, 7, -10], [0.1, 2, 0], [0.1, 3, 0]], 2: [[0.8, 2, 0], [0.1, 3, 0], [0.1, 7, -10]], 3: [[0.9, 3, 0], [0.1, 7, -10]]},
               4: { 0: [[0.8, 0, 0], [0.2, 4, 0]], 1: [[0.8, 8, 0], [0.2, 4, 0]], 2: [[0.8, 4, 0], [0.1, 0, 0], [0.1, 8, 0]], 3: [[0.8, 4, 0], [0.1, 0, 0], [0.1, 8, 0]]},
               5: { 0: [[1, 5, 0]], 1: [[1, 5, 0]], 2: [[1, 5, 0]], 3: [[1, 5, 0]]},
               6: { 0: [[0.8, 2, 0], [0.1, 6, 0], [0.1, 7, -10]], 1: [[0.8, 10, 0], [0.1, 6, 0], [0.1, 7, -10]], 2: [[0.8, 6, 0], [0.1, 10, 0], [0.1, 2, 0]], 3: [[0.8, 7, -10], [0.1, 2, 0], [0.1, 10, 0]]},
               7: { 0: [[1, 7, -10]], 1: [[1, 7, -10]], 2: [[1, 7, -10]], 3: [[1, 7, -10]]},
               8: { 0: [[0.8, 4, 0], [0.1, 9, 0], [0.1, 8, 0]], 1: [[0.9, 8, 0], [0.1, 9, 0]], 2: [[0.9, 8, 0], [0.1, 4, 0]], 3: [[0.8, 9, 0], [0.1, 4, 0], [0.1, 8, 0]]},
               9: { 0: [[0.8, 9, 0], [0.1, 8, 0], [0.1, 10, 0]], 1: [[0.8, 9, 0], [0.1, 8, 0], [0.1, 10, 0]], 2: [[0.8, 8, 0], [0.2, 9, 0]], 3: [[0.8, 10, 0], [0.2, 9, 0]]},
               10: { 0: [[0.8, 6, 0], [0.1, 9, 0], [0.1, 11, 10]], 1: [[0.8, 10, 0], [0.1, 9, 0], [0.1, 11, 10]], 2: [[0.8, 9, 0], [0.1, 6, 0], [0.1, 10, 0]], 3: [[0.8, 11, 10], [0.1, 6, 0], [0.1, 10, 0]]},
               11: { 0: [[1, 11, 10]], 1: [[1, 11, 10]], 2: [[1, 11, 10]], 3: [[1, 11, 10]]},
              } 
```

It is preferable to not use the variable `P` for applying the transition in the environment. We can implement it by modifying the action based on a random number to simulate the error. If the random number is below $0.8$, then it is a success and the action stay the same. Otherwise the action is changed to the clockwise or counter clockwise action.

```{figure} /lectures/mdp/dynamic-programming/actions.png
:align: center
:width: 50%
```

```python
prob = np.random.random()
if prob < 0.80:
  actual_action = action
elif prob < 0.90:
  # Adjacent cell "clockwise"
  actual_action = (action + 1) % 4
else:
  # Adjacent cell "counter clockwise"
  actual_action = (action - 1) % 4  
```

Our state is represented as an integer, but we can convert it to coordinates.

```python
r = np.floor(self.state / 3)
c = self.state % 4
```

Once it is done we can apply the action and modify the coordinates.

```python
if actual_action == 0:
  r = max(0, r - 1)
elif actual_action == 2:
  r = min(2, r + 1)
elif actual_action == 1:
  c = max(0, c - 1)
elif actual_action == 3:
  c = min(3, c + 1)
```
 And finally, we convert it back to an integer.

```python
self.state = r * 4 + c
```

If we put everyting together, we obtain a function that will manage the transtion for us.

```python

  def _transition(self, action: int):
    """
    Transition function.
    :param action: Action to take
    """
    r = np.floor(self.state / 3)
    c = self.state % 4

    prob = np.random.random()
    if prob < 0.80:
      actual_action = action
    elif prob < 0.90:
      # Adjacent cell "clockwise"
      actual_action = (action + 1) % 4
    else:
      # Adjacent cell "counter clockwise"
      actual_action = (action - 1) % 4


    if actual_action == 0:
      r = max(0, r - 1)
    elif actual_action == 2:
      r = min(2, r + 1)
    elif actual_action == 1:
      c = max(0, c - 1)
    elif actual_action == 3:
      c = min(3, c + 1)
    self.state = r * 4 + c
```

### Reward function

The reward is straightforward, if we reach the $+10$ ($-10$) cell the agent receive $+10$ ($-10$), otherwise the rewards is always $0$.

We implement the reward directly in the `step` function.

```python

  def step(self, action: int):
    self._transition(action)
    
    done = False

    reward = 0
    if self.state == 11:
      reward = 10
      done = True
    elif self.state == 7:
      reward = -10
      done = True

    # Return the observation, reward, done flag, and info
    return self.state, reward, done, {}
```

### Everything together

The full envronment is provided below. You will see it contains a way to render the environment, it can be useful sometimes.

```{admonition} Activity
:class: activity

The function `render` doesn't show the position of the agent and it is up to you to finish it.
```


In [None]:
import gymnasium
from gymnasium import spaces
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle

class GridWorld(gymnasium.Env):
  def __init__(self):
    # Define the action and observation spaces
    self.action_space = spaces.Discrete(4) # Up, Down, Left, Right
    self.observation_space = spaces.Discrete(12) # 12 cells

    self.P = { 0: { 0: [[0.9, 0, 0], [0.1, 1, 0]], 1: [[0.8, 4, 0], [0.1, 1, 0], [0.1, 0, 0]], 2: [[0.9, 0, 0], [0.1, 4, 0]], 3: [[0.8, 1, 0], [0.1, 4, 0], [0.1, 0, 0]]},
               1: { 0: [[0.8, 1, 0], [0.1, 0, 0], [0.1, 2, 0]],  1: [[0.8, 1, 0], [0.1, 0, 0], [0.1, 2, 0]], 2: [[0.8, 0, 0], [0.2, 1, 0]], 3: [[0.8, 2, 0], [0.2, 1, 0]]},
               2: { 0: [[0.8, 2, 0], [0.1, 1, 0], [0.1, 3, 0]], 1: [[0.8, 6, 0], [0.1, 1, 0], [0.1, 3, 0]], 2: [[0.8, 1, 0], [0.1, 2, 0], [0.1, 6, 0]], 3: [[0.8, 3, 0], [0.1, 2, 0], [0.1, 6, 0]]},
               3: { 0: [[0.9, 3, 0], [0.1, 2, 0]], 1: [[0.8, 7, -10], [0.1, 2, 0], [0.1, 3, 0]], 2: [[0.8, 2, 0], [0.1, 3, 0], [0.1, 7, -10]], 3: [[0.9, 3, 0], [0.1, 7, -10]]},
               4: { 0: [[0.8, 0, 0], [0.2, 4, 0]], 1: [[0.8, 8, 0], [0.2, 4, 0]], 2: [[0.8, 4, 0], [0.1, 0, 0], [0.1, 8, 0]], 3: [[0.8, 4, 0], [0.1, 0, 0], [0.1, 8, 0]]},
               5: { 0: [[1, 5, 0]], 1: [[1, 5, 0]], 2: [[1, 5, 0]], 3: [[1, 5, 0]]},
               6: { 0: [[0.8, 2, 0], [0.1, 6, 0], [0.1, 7, -10]], 1: [[0.8, 10, 0], [0.1, 6, 0], [0.1, 7, -10]], 2: [[0.8, 6, 0], [0.1, 10, 0], [0.1, 2, 0]], 3: [[0.8, 7, -10], [0.1, 2, 0], [0.1, 10, 0]]},
               7: { 0: [[1, 7, -10]], 1: [[1, 7, -10]], 2: [[1, 7, -10]], 3: [[1, 7, -10]]},
               8: { 0: [[0.8, 4, 0], [0.1, 9, 0], [0.1, 8, 0]], 1: [[0.9, 8, 0], [0.1, 9, 0]], 2: [[0.9, 8, 0], [0.1, 4, 0]], 3: [[0.8, 9, 0], [0.1, 4, 0], [0.1, 8, 0]]},
               9: { 0: [[0.8, 9, 0], [0.1, 8, 0], [0.1, 10, 0]], 1: [[0.8, 9, 0], [0.1, 8, 0], [0.1, 10, 0]], 2: [[0.8, 8, 0], [0.2, 9, 0]], 3: [[0.8, 10, 0], [0.2, 9, 0]]},
               10: { 0: [[0.8, 6, 0], [0.1, 9, 0], [0.1, 11, 10]], 1: [[0.8, 10, 0], [0.1, 9, 0], [0.1, 11, 10]], 2: [[0.8, 9, 0], [0.1, 6, 0], [0.1, 10, 0]], 3: [[0.8, 11, 10], [0.1, 6, 0], [0.1, 10, 0]]},
               11: { 0: [[1, 11, 10]], 1: [[1, 11, 10]], 2: [[1, 11, 10]], 3: [[1, 11, 10]]},
              } 

    # Initialize the state
    self.state = 0

  def step(self, action: int):

    self._transition(action)

    done = False

    reward = 0
    if self.state == 11:
      reward = 10
      done = True
    elif self.state == 7:
      reward = -10
      done = True


    # Return the observation, reward, done flag, and info
    return self.state, reward, done, {}

  def _transition(self, action: int):
    """
    Transition function.
    :param action: Action to take
    """
    r = np.floor(self.state / 3)
    c = self.state % 4

    prob = np.random.random()
    if prob < 0.80:
      actual_action = action
    elif prob < 0.90:
      # Adjacent cell "clockwise"
      actual_action = (action + 1) % 4
    else:
      # Adjacent cell "counter clockwise"
      actual_action = (action - 1) % 4


    if actual_action == 0:
      r = max(0, r - 1)
    elif actual_action == 2:
      r = min(2, r + 1)
    elif actual_action == 1:
      c = max(0, c - 1)
    elif actual_action == 3:
      c = min(3, c + 1)
    self.state = r * 4 + c

  def reset(self):
    """
    Reset the environment.
    """
    self.state = 0
    return self.state

  def render(self, render="human"):
    fig, ax = plt.subplots()
    ax.set_xlim(0, 4)
    ax.set_ylim(0, 3)
    ax.set_aspect('equal')


    for i in range(4):
      for j in range(3):
        if j * 4 + i == 11:
          rect = Rectangle((i, j), 1, 1, edgecolor='black', facecolor='green')
          ax.add_patch(rect)
        elif j * 4 + i == 7:
          rect = Rectangle((i, j), 1, 1, edgecolor='black', facecolor='red')
          ax.add_patch(rect)
        elif j * 4 + i == 5:
          rect = Rectangle((i, j), 1, 1, edgecolor='black', facecolor='grey')
          ax.add_patch(rect)
        else:
          rect = Rectangle((i, j), 1, 1, edgecolor='black', facecolor='white')
          ax.add_patch(rect)

    ax.tick_params(axis='both',       # changes apply to both axis
                    which='both',      # both major and minor ticks are affected
                    bottom=False,      # ticks along the bottom edge are off
                    top=False,         # ticks along the top edge are off
                    left=False,
                    right=False,
                    labelbottom=False,
                    labelleft=False) # labels along the bottom edge are off

    plt.show()


## Value Iteration

Value Iteration is very easy to implement, and take advantage of the information provide by the environment to simplify the code.


In [None]:
def value_iteration(env, gamma=0.9, theta=0.0001):
  V = np.zeros(env.observation_space.n)
  while True:
    delta = 0
    for s in range(env.observation_space.n):
      v = V[s]
      V[s] = max([sum([p * (r + gamma * V[s_]) for p, s_, r in env.P[s][a]]) for a in env.P[s]])
      delta = max(delta, np.abs(v - V[s]))
      V[7] = -10
      V[11] = 10
    if delta < theta:
      break
  pi = np.zeros(env.observation_space.n)
  for s in range(env.observation_space.n):
    pi[s] = np.argmax([sum([p * (r + gamma * V[s_]) for p, s_, r in env.P[s][a]]) for a in env.P[s]])
  return V, pi

We can use the observation space to intialize the value function with zeros.

```python
V = np.zeros(env.observation_space.n)
```

We update the value for each state by looping other them (using the observation space again) and we apply the formula. This is where we actually need `P`, because it simplifies the code a lot.

```python
V[s] = max([sum([p * (r + gamma * V[s_]) for p, s_, r in env.P[s][a]]) for a in env.P[s]])
```
```{margin} Theta
The value of theta is up to you, but if theta is too big yo may not converge to the optimal policy.
Too small, it could take a long time to converge.
```

Finally, once we converged (based on $\theta$) we calculate the policy the same way we calculated the value function but we use `argmax` instead.

```python
for s in range(env.observation_space.n):
    pi[s] = np.argmax([sum([p * (r + gamma * V[s_]) for p, s_, r in env.P[s][a]]) for a in env.P[s]])
```


In [None]:
from IPython.display import display, clear_output

def value_iteration_interact(env, gamma=0.9, theta=0.0001, step=0):
  V = np.zeros(env.observation_space.n)
  for i in range(step):
    delta = 0
    for s in range(env.observation_space.n):
      v = V[s]
      V[s] = max([sum([p * (r + gamma * V[s_]) for p, s_, r in env.P[s][a]]) for a in env.P[s]])
      delta = max(delta, np.abs(v - V[s]))
      V[7] = -10
      V[11] = 10
    if delta < theta:
      break
  pi = np.zeros(env.observation_space.n)
  for s in range(env.observation_space.n):
    pi[s] = np.argmax([sum([p * (r + gamma * V[s_]) for p, s_, r in env.P[s][a]]) for a in env.P[s]])
  return V, pi

def render(labels):
  fig, ax = plt.subplots()
  ax.set_xlim(0, 4)
  ax.set_ylim(0, 3)
  ax.set_aspect('equal')

  for i in range(4):
    for j in range(3):
      if j * 4 + i == 11:
        rect = Rectangle((i, j), 1, 1, edgecolor='black', facecolor='green')
        ax.add_patch(rect)
      elif j * 4 + i == 7:
        rect = Rectangle((i, j), 1, 1, edgecolor='black', facecolor='red')
        ax.add_patch(rect)
      elif j * 4 + i == 5:
        rect = Rectangle((i, j), 1, 1, edgecolor='black', facecolor='grey')
        ax.add_patch(rect)
      else:
        rect = Rectangle((i, j), 1, 1, edgecolor='black', facecolor='white')
        ax.add_patch(rect)
      ax.text(i + 0.5, j + 0.5, int(labels[j * 4 + i]), ha='center', va='center')

  ax.tick_params(axis='both',        # changes apply to both axis
                  which='both',      # both major and minor ticks are affected
                  bottom=False,      # ticks along the bottom edge are off
                  top=False,         # ticks along the top edge are off
                  left=False,
                  right=False,
                  labelbottom=False,
                  labelleft=False)   # labels along the bottom edge are off
  display(fig)
  plt.close(fig)

def update(gamma=0.9, step=0):
  V, pi = value_iteration_interact(env, gamma, step=step)
  render(V)

In [None]:
from ipywidgets import *

env = GridWorld()
interact(update, gamma = (0.5,0.9,0.1), step = (0, 50, 1));

In [None]:
import plotly.graph_objects as go
import numpy as np

def value_iteration_interact(env, gamma=0.9, theta=0.0001, step=0):
  V = np.zeros(env.observation_space.n)
  V[7] = -10
  V[11] = 10
  for i in range(step):
    delta = 0
    for s in range(env.observation_space.n):
      v = V[s]
      V[s] = max([sum([p * (r + gamma * V[s_]) for p, s_, r in env.P[s][a]]) for a in env.P[s]])
      delta = max(delta, np.abs(v - V[s]))
      V[7] = -10
      V[11] = 10
    if delta < theta:
      break
  pi = np.zeros(env.observation_space.n)
  for s in range(env.observation_space.n):
    pi[s] = np.argmax([sum([p * (r + gamma * V[s_]) for p, s_, r in env.P[s][a]]) for a in env.P[s]])
  return V, pi


env = GridWorld()
# Create figure
fig = go.Figure()

# Add traces, one for each slider step
for step in range(0, 15):
    V, _ = value_iteration_interact(env, 0.9, step=step) 
    x, y, text = [], [], []
    for i in range(4):
      for j in range(3):
        if j * 4 + i == 11:
          fig.add_shape(type="rect", x0=i, y0=j, x1=i + 1, y1=j + 1, line=dict(color="black"), fillcolor="green", layer="below")
        elif j * 4 + i == 7:
          fig.add_shape(type="rect", x0=i, y0=j, x1=i + 1, y1=j + 1, line=dict(color="black"), fillcolor="red", layer="below")
        elif j * 4 + i == 5:
          fig.add_shape(type="rect", x0=i, y0=j, x1=i + 1, y1=j + 1, line=dict(color="black"), fillcolor="gray", layer="below")
        else:
          fig.add_shape(type="rect", x0=i, y0=j, x1=i + 1, y1=j + 1, line=dict(color="black"), fillcolor="white", layer="below")
        x.append(i + 0.5)
        y.append(j + 0.5)
        text.append(int(V[j * 4 + i]))
    fig.add_trace(go.Scatter(
      x=x,
      y=y,
      text=text,
      mode="text",
      textfont=dict(
          color="black",
          size=18,
          family="Arail",
      ),
      visible=False
    )
)
fig.data[0].visible = True
# Create and add slider
steps = []
for i in range(len(fig.data)):
    step = dict(
        method="update",
        args=[{"visible": [False] * len(fig.data)}],  # layout attribute
    )
    step["args"][0]["visible"][i] = True  # Toggle i'th trace to "visible"
    steps.append(step)

sliders = [dict(
    active=0,
    pad={"t": 50},
    steps=steps
)]

fig.update_xaxes(
    showticklabels=False,
    showgrid=False,
    zeroline=False,
    automargin=True
)

fig.update_yaxes(
    showticklabels=False,
    showgrid=False,
    zeroline=False,
    scaleanchor="x",
    scaleratio=1,
    automargin=True
)

fig.update_layout(
    autosize=False,
    sliders=sliders
)

fig.show()
