<a href="https://colab.research.google.com/github/lfmartins/introduction-to-computational-mathematics/blob/main/introduction_to_markov_decision_processes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

As an example of an MDP, let's consider a robot that moves in a $4\times4$ grid. The objective is to minimize the number of steps the robot takes to reach one of the target cells, $(0,0)$ or $(3,3)$.

If the robot is in grid cell $(i,j)$, there are at most 4 actions, corresponding to the robot moving north, east, south and west (if the robot is on one of the edges not all actions are allowed). The reward of taking any one of the actions is $-1$, except at the target cells, in which case the reward is zero. 
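
The set of allowed actions can be sketched in a few lines. The helper name `allowed_actions` is hypothetical, and the sketch assumes that the row index $i$ grows to the south and the column index $j$ grows to the east. It ignores the special role of the target cells.

```python
def allowed_actions(i, j, n=4):
  """Map each allowed action at cell (i, j) to the cell it leads to.

  Assumes row i grows to the south and column j grows to the east.
  """
  moves = {'N': (i - 1, j), 'E': (i, j + 1), 'S': (i + 1, j), 'W': (i, j - 1)}
  # Keep only the moves that stay inside the n x n grid
  return {a: (k, l) for a, (k, l) in moves.items()
          if 0 <= k < n and 0 <= l < n}

print(allowed_actions(1, 1))  # inner cell: all four actions
print(allowed_actions(0, 3))  # northeast corner: only 'S' and 'W'
```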

A discrete MDP is defined in terms of a function $p$:

$p(s', r \mid s, a)$ is the probability that the process will be in state $s'$ at time $t$ and receive reward $r$, given that the process is in state $s$ at time $t-1$ and action $a$ is selected. (The notation $p(s', r \mid s,a)$ is used to resemble the notation for conditional probability, but it just represents a function of four variables, $s'$, $r$, $s$ and $a$.)

A *randomized policy* is represented by a function of two variables, $\pi(a|s)$, that associates to every action $a$ available in state $s$ the probability that $a$ is chosen when the process is in state $s$.
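
To make the definition concrete, here is a minimal sketch of how an action could be drawn from a randomized policy stored as nested dictionaries. The function `sample_action` and the two-state `example_policy` are hypothetical names introduced only for this illustration.

```python
import random

def sample_action(policy, s, rng=random):
  """Draw an action at state s according to the distribution policy[s]."""
  actions = list(policy[s].keys())
  weights = list(policy[s].values())
  return rng.choices(actions, weights=weights, k=1)[0]

# Hypothetical two-state policy used only for this illustration
example_policy = {
    'x': {'left': 0.25, 'right': 0.75},
    'y': {'left': 1.0, 'right': 0.0},
}
random.seed(0)
draws = [sample_action(example_policy, 'x') for _ in range(10_000)]
print(draws.count('right') / len(draws))  # close to 0.75
```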

To represent an MDP in Python we define the following data structures.

An MDP is represented by objects of the class `MDP`. An `MDP` holds information about a set of states. States are created by the method `add_state()`. When a state is added, the only information that is recorded is the `state_id`. This means that just adding the states does not define the structure of the chain.

To each state, we associate a set of actions through the `add_action()` method. An action specifies a probability distribution on the set $\mathscr{S}\times\mathscr{R}$.

This version has no error checking, but it should eventually be added.

In [None]:
class MDP(object):
  def __init__(self):
    self.states = dict()
  
  def add_state(self, state_id):
    self.states[state_id] = dict()

  def add_action(self, state_id, action_id, probs):
    self.states[state_id][action_id] = probs
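
The error checking mentioned above could take roughly the following form. This is only a sketch: the class name `CheckedMDP`, the method `check_targets()` and the tolerance are assumptions, not part of the class defined in this notebook.

```python
import math

class CheckedMDP:
  """Variant of the MDP container with basic validation (illustrative)."""

  def __init__(self):
    self.states = dict()

  def add_state(self, state_id):
    if state_id in self.states:
      raise ValueError(f"state {state_id!r} already exists")
    self.states[state_id] = dict()

  def add_action(self, state_id, action_id, probs):
    if state_id not in self.states:
      raise KeyError(f"unknown state {state_id!r}")
    if any(p < 0 for p in probs.values()):
      raise ValueError("probabilities must be non-negative")
    if not math.isclose(sum(probs.values()), 1.0, abs_tol=1e-9):
      raise ValueError("probabilities must sum to 1")
    self.states[state_id][action_id] = probs

  def check_targets(self):
    """Verify that every (next_state, reward) key points to a known state."""
    for s, actions in self.states.items():
      for a, probs in actions.items():
        for next_state, _reward in probs:
          if next_state not in self.states:
            raise ValueError(
                f"action {a!r} at state {s!r} leads to "
                f"unknown state {next_state!r}")

checked = CheckedMDP()
checked.add_state(1)
checked.add_state(2)
checked.add_action(1, 'A', {(1, 1): 1/3, (2, 1): 2/3})
checked.check_targets()  # passes: both target states exist
```

Checking the target states in a separate method lets the states and actions be added in any order.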

Before turning to the gridworld example, we can use the following smaller MDP as a test case. The state space is $\mathscr{S}=\{1,2,3,4\}$. In each state, there are two possible actions, labeled $A$ and $B$. The transition probabilities and rewards for each action are specified by the dictionaries passed to `add_action()` below, whose keys are pairs $(s', r)$.

In [None]:
mdp = MDP()
mdp.add_state(1)
mdp.add_state(2)
mdp.add_state(3)
mdp.add_state(4)

We now add the actions:

In [None]:
mdp.add_action(1, 'A', {(1, 1): 1/3, (2, 1): 2/3})
mdp.add_action(1, 'B', {(2, 1): 1/2, (4, 1): 1/2})
mdp.add_action(2, 'A', {(2, 2): 1/3, (3, 1): 2/3})
mdp.add_action(2, 'B', {(1, 1): 1/2, (3, 1): 1/2})
mdp.add_action(3, 'A', {(3, 3): 1/3, (3, 4): 2/3})
mdp.add_action(3, 'B', {(2, 1): 1/2, (4, 1): 1/2})
mdp.add_action(4, 'A', {(4, 4): 1/3, (1, 1): 2/3})
mdp.add_action(4, 'B', {(1, 1): 1/2, (3, 1): 1/2})

According to this setup, the value of $p(s',r \mid s, a)$ is accessed in the `MDP` object as:

    self.states[s][a][s', r]

In [None]:
mdp.states[1]['A'][1, 1]

0.3333333333333333
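
Two derived quantities follow directly from the joint distribution: the expected reward $r(s,a)=\sum_{s',r} r\,p(s',r\mid s,a)$ and the state transition probability $P(s'\mid s,a)=\sum_{r} p(s',r\mid s,a)$. A minimal sketch of both, with hypothetical helper names `expected_reward` and `transition_probs`:

```python
from collections import defaultdict

def expected_reward(probs):
  """r(s, a): sum of r * p(s', r | s, a) over all (s', r) pairs."""
  return sum(r * p for (_next_state, r), p in probs.items())

def transition_probs(probs):
  """P(s' | s, a): sum of p(s', r | s, a) over all rewards r."""
  marginal = defaultdict(float)
  for (next_state, _r), p in probs.items():
    marginal[next_state] += p
  return dict(marginal)

# Distribution of action 'A' at state 1 in the test case above
probs_1A = {(1, 1): 1/3, (2, 1): 2/3}
print(expected_reward(probs_1A))
print(transition_probs(probs_1A))
```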

A policy is represented by a dictionary that associates to each state a probability distribution over the actions available in that state.

In [None]:
policy = {
    1: {'A': 1/2, 'B': 1/2},
    2: {'A': 1/4, 'B': 3/4},
    3: {'A': 2/3, 'B': 1/3},
    4: {'A':   0, 'B': 1}
}

$\pi(a\mid s)$ is represented by `policy[s][a]`.

We now need the code to solve Bellman's equation.
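
For the dictionary representation used in the test case, one possible sketch iterates the discounted Bellman equation $V_{\pi}(s)=\sum_{a}\pi(a\mid s)\sum_{s',r}p(s',r\mid s,a)\left[r+\gamma V_{\pi}(s')\right]$. The discount factor $\gamma$ is an assumption introduced here: the test case has no terminal state, so the undiscounted sum of rewards would not converge. The names `evaluate_policy`, `test_states` and `test_policy` are hypothetical.

```python
def evaluate_policy(states, policy, gamma=0.9, sweeps=500):
  """In-place iterative policy evaluation (a sketch).

  states[s][a] maps (next_state, reward) pairs to probabilities and
  policy[s][a] is the probability of choosing a at s. The discount
  factor gamma is an assumption of this sketch.
  """
  v = {s: 0.0 for s in states}
  for _ in range(sweeps):
    for s in states:
      v[s] = sum(
          pi_a * sum(p * (r + gamma * v[s2])
                     for (s2, r), p in states[s][a].items())
          for a, pi_a in policy[s].items())
  return v

# Test case from above, written as a plain dictionary (same layout as mdp.states)
test_states = {
    1: {'A': {(1, 1): 1/3, (2, 1): 2/3}, 'B': {(2, 1): 1/2, (4, 1): 1/2}},
    2: {'A': {(2, 2): 1/3, (3, 1): 2/3}, 'B': {(1, 1): 1/2, (3, 1): 1/2}},
    3: {'A': {(3, 3): 1/3, (3, 4): 2/3}, 'B': {(2, 1): 1/2, (4, 1): 1/2}},
    4: {'A': {(4, 4): 1/3, (1, 1): 2/3}, 'B': {(1, 1): 1/2, (3, 1): 1/2}},
}
test_policy = {
    1: {'A': 1/2, 'B': 1/2},
    2: {'A': 1/4, 'B': 3/4},
    3: {'A': 2/3, 'B': 1/3},
    4: {'A': 0, 'B': 1},
}
v = evaluate_policy(test_states, test_policy)
for s, value in v.items():
  print(s, value)
```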

In [None]:
n = 4  # Grid size
# We initialize the grid to "empty" cells
states = [[None for j in range(n)] for i in range(n)]
states

[[None, None, None, None],
 [None, None, None, None],
 [None, None, None, None],
 [None, None, None, None]]

In [None]:
# Row i grows to the south and column j grows to the east, so the moves are
# N: (i-1, j), W: (i, j-1), S: (i+1, j), E: (i, j+1). All moves are deterministic.
# Inner cells
for i in range(1, n-1):
  for j in range(1, n-1):
    states[i][j] = {
        'N': {'reward': -1, 'tprobs': {(i-1, j): 1}},
        'W': {'reward': -1, 'tprobs': {(i, j-1): 1}},
        'S': {'reward': -1, 'tprobs': {(i+1, j): 1}},
        'E': {'reward': -1, 'tprobs': {(i, j+1): 1}},
    }
# Edge cells, not corners
for j in range(1, n-1):
  # North edge
  states[0][j] = {
      'W': {'reward': -1, 'tprobs': {(0, j-1): 1}},
      'S': {'reward': -1, 'tprobs': {(1, j): 1}},
      'E': {'reward': -1, 'tprobs': {(0, j+1): 1}},
  }
  # West edge
  states[j][0] = {
      'N': {'reward': -1, 'tprobs': {(j-1, 0): 1}},
      'S': {'reward': -1, 'tprobs': {(j+1, 0): 1}},
      'E': {'reward': -1, 'tprobs': {(j, 1): 1}},
  }
  # South edge
  states[n-1][j] = {
      'N': {'reward': -1, 'tprobs': {(n-2, j): 1}},
      'W': {'reward': -1, 'tprobs': {(n-1, j-1): 1}},
      'E': {'reward': -1, 'tprobs': {(n-1, j+1): 1}},
  }
  # East edge
  states[j][n-1] = {
      'N': {'reward': -1, 'tprobs': {(j-1, n-1): 1}},
      'W': {'reward': -1, 'tprobs': {(j, n-2): 1}},
      'S': {'reward': -1, 'tprobs': {(j+1, n-1): 1}},
  }
# Northwest corner (terminal state)
states[0][0] = {
    'T': {'reward': 0, 'tprobs': {(0, 0): 1}}
}
# Southwest corner
states[n-1][0] = {
    'N': {'reward': -1, 'tprobs': {(n-2, 0): 1}},
    'E': {'reward': -1, 'tprobs': {(n-1, 1): 1}},
}
# Southeast corner (terminal state)
states[n-1][n-1] = {
    'T': {'reward': 0, 'tprobs': {(n-1, n-1): 1}}
}
# Northeast corner
states[0][n-1] = {
    'W': {'reward': -1, 'tprobs': {(0, n-2): 1}},
    'S': {'reward': -1, 'tprobs': {(1, n-1): 1}},
}

A randomized policy $\pi$ associates to each state a probability distribution on the set of all actions available at that state. Let's first consider the policy that assumes all actions at each state are equally likely:

In [None]:
policy = [[None for j in range(n)] for i in range(n)]
# All cells: every available action gets the same probability
for i in range(0, n):
  for j in range(0,n):
    p = 1 / len(states[i][j])
    policy[i][j] = dict()
    for key in states[i][j]:
      policy[i][j][key] = p

Let's now compute the value $V_{\pi}$ of this policy. We first derive a mathematical equation and then show how to implement its solution in Python. Suppose that the process currently is at state $s$. We then choose an action according to the probability distribution $\pi(\cdot|s)$. Let $a$ be the resulting action. Then, we receive a reward $r(s,a)$. So, the expected immediate reward when in state $s$ is:
$$
\sum_{a\in A(s)}\pi(a|s)r(s,a)
$$
Then the process transitions to a new state according to the probability distribution $P(\cdot|s,a)$. The expected future value is:
$$
\sum_{a\in A(s)}\pi(a|s)\sum_{u}P(u|s,a)V_{\pi}(u)
$$
We conclude that the system of equations for $V_{\pi}(\cdot)$ is:
$$
V_{\pi}(s) = \sum_{a \in A(s)}\pi(a|s)\left[r(s,a)+\sum_{u}P(u|s,a)V_{\pi}(u)\right]
$$
Notice that this is a linear system in the unknown value function $V_{\pi}(\cdot)$. Instead of a standard direct method (such as Gaussian elimination), we compute the solution iteratively with the Gauss-Seidel method, which has a particularly simple implementation in this case.

We choose an initial approximation for the value function, $V_{\pi}(s)$, by choosing random values or simply using zeros. Then, we iterate the formula for $V_{\pi}$ given above. This is implemented in the following code. 
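
Written out, one sweep of this iteration can be sketched as follows, assuming the states are visited in a fixed order $s_1, s_2, \ldots, s_N$ (the superscript $(k)$, counting sweeps, is notation introduced here):
$$
V^{(k+1)}_{\pi}(s_m) = \sum_{a \in A(s_m)}\pi(a|s_m)\left[r(s_m,a)+\sum_{l<m}P(s_l|s_m,a)V^{(k+1)}_{\pi}(s_l)+\sum_{l\ge m}P(s_l|s_m,a)V^{(k)}_{\pi}(s_l)\right]
$$
Values already updated in the current sweep are used immediately. This is what distinguishes the method from the Jacobi iteration, where the right-hand side uses only $V^{(k)}_{\pi}$.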


In [None]:
niter = 100
vfunc = [[0.0 for j in range(n)] for i in range(n)]
for _ in range(niter):
  for i in range(n):
    for j in range(n):
      # Average over actions of the reward plus the expected value of the next state
      total = 0.0
      for a, p in policy[i][j].items():
        acc = states[i][j][a]['reward']
        for s, q in states[i][j][a]['tprobs'].items():
          k, l = s
          acc += q * vfunc[k][l]
        total += p * acc
      vfunc[i][j] = total

In [None]:
for i in range(n):
  for j in range(n):
    print(f"{vfunc[i][j]:8.5}", end=' ')
  print()


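The following code checks the result by applying the same update directly, with the equation for each type of cell written out explicitly.

In [None]:
n = 4
v = [[0.0 for i in range(n)] for j in range(n)]
for _ in range(100):  # Same number of sweeps as niter above
  # Inner cells
  for i in range(1, n-1):
    for j in range(1, n-1):
      v[i][j] = -1 + (1/4) * (v[i-1][j] + v[i][j-1] + v[i+1][j] + v[i][j+1])
  # Edge cells, not corners
  for j in range(1, n-1):
    v[0][j] = -1 + (1/3) * (v[0][j-1] + v[1][j] + v[0][j+1])          # North edge
    v[j][0] = -1 + (1/3) * (v[j-1][0] + v[j][1] + v[j+1][0])          # West edge
    v[n-1][j] = -1 + (1/3) * (v[n-1][j-1] + v[n-2][j] + v[n-1][j+1])  # South edge
    v[j][n-1] = -1 + (1/3) * (v[j-1][n-1] + v[j][n-2] + v[j+1][n-1])  # East edge
  # Non-terminal corners; (0, 0) and (n-1, n-1) are terminal and stay at zero
  v[0][n-1] = -1 + (1/2) * (v[0][n-2] + v[1][n-1])
  v[n-1][0] = -1 + (1/2) * (v[n-2][0] + v[n-1][1])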
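
An independent check is to solve the linear system exactly instead of iterating. The sketch below is not the notebook's method: it uses Gauss-Jordan elimination with exact rational arithmetic from the standard library. It assumes the uniform policy over the moves that stay inside the grid, a reward of $-1$ per move, and value zero at the two target cells. All names (`neighbors`, `aug`, `exact`) are introduced only for this illustration.

```python
from fractions import Fraction

n = 4
terminals = {(0, 0), (n - 1, n - 1)}
moves = [(-1, 0), (0, -1), (1, 0), (0, 1)]  # N, W, S, E as (row, column) steps

def neighbors(i, j):
  """Cells reachable from (i, j) by one move that stays inside the grid."""
  return [(i + di, j + dj) for di, dj in moves
          if 0 <= i + di < n and 0 <= j + dj < n]

# Build the augmented matrix [A | b] of the system A v = b, one row per cell.
# Terminal cells contribute the equation v = 0; every other cell contributes
# v(s) - (1/m) * (sum of v over its m neighbors) = -1.
index = {(i, j): i * n + j for i in range(n) for j in range(n)}
size = n * n
aug = [[Fraction(0)] * (size + 1) for _ in range(size)]
for (i, j), row in index.items():
  aug[row][row] = Fraction(1)
  if (i, j) in terminals:
    continue
  nbrs = neighbors(i, j)
  for cell in nbrs:
    aug[row][index[cell]] -= Fraction(1, len(nbrs))
  aug[row][size] = Fraction(-1)

# Gauss-Jordan elimination with exact rational arithmetic
for col in range(size):
  pivot = next(r for r in range(col, size) if aug[r][col] != 0)
  aug[col], aug[pivot] = aug[pivot], aug[col]
  aug[col] = [x / aug[col][col] for x in aug[col]]
  for r in range(size):
    if r != col and aug[r][col] != 0:
      factor = aug[r][col]
      aug[r] = [x - factor * y for x, y in zip(aug[r], aug[col])]

# After elimination the last column holds the solution
exact = [[aug[index[(i, j)]][size] for j in range(n)] for i in range(n)]
for row in exact:
  print(" ".join(f"{float(x):8.2f}" for x in row))
```

Under these assumptions the exact values along the top row are $0$, $-11$, $-15.5$ and $-16.5$, and the two inner cells of the second row are $-14.5$ and $-16$. The iterative values should approach these as the number of sweeps grows.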