<a href="https://colab.research.google.com/github/lfmartins/markov-decision-processes/blob/main/markov_decision_processes.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

As an example of MDP, let's consider a robot that moves in a $4\times4$ grid. The objective is to mimimize the number of steps the robot takes to reach the one of the target cells $(0,0)$ and $(3,3)$. 

If the robot is in grid cell $(i,j)$, there are at most 4 actions, corresponding to the robot moving north, east, south and west (if the robot is on one of the edges not all actions are allowed). The reward of taking any one of the actions is $-1$, except at the target cells, in which case the reward is zero. 

A discrete MDP is defined in terms of two functions:

- $p(s' \mid s, a)$ is the probability that the process will be in state $s'$ at time $t$, given that the process is in sate $s$ at time $t-1$ and action $a$ is selected. (The notation $p(s' \mid s,a)$ is used to resemble the notation for conditional probability, but it just represents a function of three variables, $s'$, $s$ and $a$.
- $r(s,a)$ is the reward obtained for selecting action $a$.

A *randomized policy* is represented by function of two variables $\pi(a|s)$ that, to every action $a$ available in state $s$, associates the probability that action $a$ is chosen when in state $s$.

To represent a MDP in Python we define the following data structures.

A MDP is represented by objects of the class `MDP`. This is a very thin implementation. A `MDP` holds information about a set of states. States are created by the method `add_state()`. When a state is added, the only information that is recorded is the `state_id`. This means that just adding the states does not define the structure of the chain.

To each state, we associate a set of actions through the `add_action()` method. An action specifies a reward and a set of transition probabilities.

In [1]:
import numpy as np

class MDP(object):
  def __init__(self):
    self.states = dict()
    self.states_list = []
    self.terminal_states = set()
  
  def add_state(self, state_id, terminal=False):
    if state_id in self.states:
      raise ValueError(f'state {state_id} already exists')
    self.states[state_id] = dict()
    self.states_list.append(state_id)
    if terminal:
      self.terminal_states.add(terminal)
  
  def add_action(self, state_id, action_id, reward, tprobs):
    if not state_id in self.states:
      raise KeyError(f'state {state_id} does not exist')
    if action_id in self.states[state_id]:
      raise ValueError('repeated action for state {state_id}')
    self.states[state_id][action_id] = {'reward': reward, 'tprobs': tprobs}

  def reward(self, state_id, action_id):
    return self.states[state_id][action_id]['reward']
    
  def transition_prob(self, state1_id, action_id, state2_id):
    return self.states[state1_id][action_id]['tprobs'][state2_id]

  def pretty_print(self):
    for state_id in self.states:
      print(f'State {state_id}')
      for action_id in self.states[state_id]:
        print(f'  Action {action_id}:')
        print(f'    Reward: {self.reward(state_id, action_id)}')
        print(f'    Transition probabilities: ', end='')
        for state2_id in self.states[state_id][action_id]['tprobs']:
          print(f'({state2_id}, {self.transition_prob(state_id, action_id, state2_id)})', end=' ')
        print()
    
  def value_function(self, policy, gamma, iterations=1000):
    value = dict((state_id, 0.0) for state_id in self.states)
    for _ in range(iterations):
      for state_id in self.states:
        if state_id in self.terminal_states:
          value[state_id] = 0.0
          continue
        value[state_id] = sum(
          policy[state_id][action_id] * (
            self.reward(state_id, action_id) + gamma * sum(
              self.transition_prob(state_id, action_id, state1_id) * value[state1_id]
              for state1_id in self.states[state_id][action_id]['tprobs']))
            for action_id in policy[state_id])
      return value

  def transition_matrix(self, policy):
    n = len(self.states_list)
    tm = np.zeros(n, n, np.float64)
    for i, s in enumerate(states):


SyntaxError: ignored

As an example, let's implement a MDP corresponding to the tiny robot example.

In [None]:
tiny_robot = MDP()
tiny_robot.add_state(1)
tiny_robot.add_state(2)
tiny_robot.add_state(3)
tiny_robot.add_state(4)

In [None]:
tiny_robot.add_action(1, 'A', 1, {1: 1/3, 2: 2/3})
tiny_robot.add_action(1, 'B', 4, {2: 1/2, 4: 1/2})
tiny_robot.add_action(2, 'A', 2, {2: 1/3, 3: 2/3})
tiny_robot.add_action(2, 'B', 3, {1: 1/2, 3: 1/2})
tiny_robot.add_action(3, 'A', 3, {3: 1/3, 4: 2/3})
tiny_robot.add_action(3, 'B', 2, {2: 1/2, 4: 1/2})
tiny_robot.add_action(4, 'A', 4, {4: 1/3, 1: 2/3})
tiny_robot.add_action(4, 'B', 1, {1: 1/2, 3: 1/2})

In [None]:
tiny_robot.pretty_print()

State 1
  Action A:
    Reward: 1
    Transition probabilities: (1, 0.3333333333333333) (2, 0.6666666666666666) 
  Action B:
    Reward: 4
    Transition probabilities: (2, 0.5) (4, 0.5) 
State 2
  Action A:
    Reward: 2
    Transition probabilities: (2, 0.3333333333333333) (3, 0.6666666666666666) 
  Action B:
    Reward: 3
    Transition probabilities: (1, 0.5) (3, 0.5) 
State 3
  Action A:
    Reward: 3
    Transition probabilities: (3, 0.3333333333333333) (4, 0.6666666666666666) 
  Action B:
    Reward: 2
    Transition probabilities: (2, 0.5) (4, 0.5) 
State 4
  Action A:
    Reward: 4
    Transition probabilities: (4, 0.3333333333333333) (1, 0.6666666666666666) 
  Action B:
    Reward: 1
    Transition probabilities: (1, 0.5) (3, 0.5) 


In [None]:
policy = {
    1: {'A': 1/2, 'B': 1/2},
    2: {'A': 1/4, 'B': 3/4},
    3: {'A': 2/3, 'B': 1/3},
    4: {'A':   0, 'B':   1}
}

In [None]:
value = tiny_robot.value_function(policy, gamma=0.8, iterations=1000)

In [None]:
for state_id, v in value.items():
    print(f'{state_id}: {v}')

1: 2.5
2: 3.5
3: 3.1333333333333333
4: 3.2533333333333334


In [None]:
import numpy as np


In [None]:
M = np.array([[1,2,3],[2,-1,4],[-3,3,2]])
M

array([[ 1,  2,  3],
       [ 2, -1,  4],
       [-3,  3,  2]])

In [None]:
b = np.array([1,3,2]).reshape(3,1)
b

array([[1],
       [3],
       [2]])

In [None]:
v = np.linalg.inv(M) @ b
v

array([[-0.62162162],
       [-0.56756757],
       [ 0.91891892]])

In [None]:
M @ v

array([[1.],
       [3.],
       [2.]])

In [None]:
gw = MDP()
n = 4
for i in range(n):
  for j in range(n):
    gw.add_state((i,j))

In [None]:
# Interior cells
for i in range(1, n - 1):
  for j in range(1, n - 1):
    gw.add_action((i, j), 'U', -1, {(i - 1, j): 1})
    gw.add_action((i, j), 'L', -1, {(i, j - 1): 1})
    gw.add_action((i, j), 'D', -1, {(i + 1, j): 1})
    gw.add_action((i, j), 'R', -1, {(i, j + 1): 1})
# Top and bottom borders, not corners
for j in range(1, n - 1):
  gw.add_action((0, j), 'U', -1, {(0, j): 1})
  gw.add_action((0, j), 'L', -1, {(0, j - 1): 1})
  gw.add_action((0, j), 'D', -1, {(1, j): 1})
  gw.add_action((0, j), 'R', -1, {(0, j + 1): 1})
  gw.add_action((n - 1, j), 'U', -1, {(n - 1, j): 1})
  gw.add_action((n - 1, j), 'L', -1, {(n - 1, j - 1): 1})
  gw.add_action((n - 1, j), 'D', -1, {(n - 1, j): 1})
  gw.add_action((n - 1, j), 'R', -1, {(n - 1, j + 1): 1})
# Right and left borders, not corners
for i in range(1, n - 1):
  gw.add_action((i, 0), 'U', -1, {(i - 1, 0): 1})
  gw.add_action((i, 0), 'L', -1, {(i, 0): 1})
  gw.add_action((i, 0), 'D', -1, {(i + 1, 0): 1})
  gw.add_action((i, 0), 'R', -1, {(i, 1): 1})
  gw.add_action((i, n - 1), 'U', -1, {(i - 1, n - 1): 1})
  gw.add_action((i, n - 1), 'L', -1, {(i, n - 2): 1})
  gw.add_action((i, n - 1), 'D', -1, {(i, n - 1): 1})
  gw.add_action((i, n - 1), 'R', -1, {(i, n - 1): 1})
# Corners
gw.add_action((0, 0), 'U', 0, {(0, 0): 1})
gw.add_action((0, 0), 'L', 0, {(0, 0): 1})
gw.add_action((0, 0), 'D', 0, {(0, 0): 1})
gw.add_action((0, 0), 'R', 0, {(0, 1): 1})
gw.add_action((n - 1, n - 1), 'U', 0, {(n - 1, n - 1): 1})
gw.add_action((n - 1, n - 1), 'L', 0, {(n - 1, n - 1): 1})
gw.add_action((n - 1, n - 1), 'D', 0, {(n - 1, n - 1): 1})
gw.add_action((n - 1, n - 1), 'R', 0, {(n - 1, n - 1): 1})
gw.add_action((0, n - 1), 'U', -1, {(0, n - 1): 1})
gw.add_action((0, n - 1), 'L', -1, {(0, n - 2): 1})
gw.add_action((0, n - 1), 'D', -1, {(1, n - 1): 1})
gw.add_action((0, n - 1), 'R', -1, {(0, n - 1): 1})
gw.add_action((n - 1, 0), 'U', -1, {(n - 2, 0): 1})
gw.add_action((n - 1, 0), 'L', -1, {(n - 1, 0): 1})
gw.add_action((n - 1, 0), 'D', -1, {(n - 1, 0): 1})
gw.add_action((n - 1, 0), 'R', -1, {(n - 1, 1): 1})

Let's now print all the information about the MDP. This also demonstrates how to access the elements defining the structure of the chain.

In [None]:
for state_id, actions in gw.states.items():
  print(f'State {state_id}:')
  state_info = gw.states[state_id]
  for action_id, action_info in state_info.items():
    print(f'  Action {action_id}: Reward: {action_info["reward"]}')
    for next_state_id, tprob in action_info['tprobs'].items():
      print(f'    {next_state_id}: {tprob}')

Let's now see how to specify a policy. A (randomized) policy assigns to each state a probability distribution on the set of actions avaiable at that state. As an example, let's consider the policy that assigns equal probabilities to each of the actions associated to a state. In this case, there are 4 actions possible at each state. So we can use the following code to set up a policy:

In [None]:
action_probs = {'U': 1/4, 'L': 1/4, 'D':1/4, 'R': 1/4}
policy = dict()
for i in range(n):
  for j in range(n):
    policy[(i,j)] = action_probs

In [None]:
value = gw.value_function(policy, 1, {(0,0), (n-1, n-1)}, 5000)
for state_id, v in value.items():
  print(f'value)

{(0, 0): 0.0, (0, 1): -8.231134329204314, (0, 2): -11.614077205594233, (0, 3): -11.631948674342972, (1, 0): -4.792491442094327, (1, 1): -9.079325782018712, (1, 2): -10.979148613235413, (1, 3): -10.176209877960307, (2, 0): -5.840639986358598, (2, 1): -8.314528743540793, (2, 2): -9.046981587368405, (2, 3): -8.852809166144027, (3, 0): -4.80741689927569, (3, 1): -5.291167618417454, (3, 2): -4.041439826553388, (3, 3): 0.0}


We can use the following MDP as a test case. The state space is $\mathscr{S}=\{1,2,3,4\}$. In each state, there are two possible actions, labeled $A$ and $B$. The following tables specify the transition probabilities and rewards for each action:



In [None]:
example_mdp = MDP()
example_mdp.add_state(1)
example_mdp.add_state(2)
example_mdp.add_state(3)
example_mdp.add_state(4)

In [None]:
example_mdp.add_action(1, 'A', 1, {1: 1/3, 2})

In [None]:
  def value_function(self, policy, discount_factor  , iterations=1000):
    value = dict(zip(self.states, len(self.states) * [0.0]))
    for n in range(iterations):
      for state_id, state_info in self.states.items():
        if state_id in absorbing_states:
          value[state_id] = 0.0
          continue
        acc = 0
        for action_id, tprob in policy[state_id].items():
          acc += (state_info[action_id]['reward'] +
                 discount_factor * 
                  sum(prob * value[next_state_id] 
                      for next_state_id, prob
                      in state_info[action_id]['tprobs'].items()))
          value[state_id] = tprob * acc
    return value 

In [None]:
n = 4  # Grid size
# We initialize the grid to "empty" cells
states = [[None for j in range(n)] for i in range(n)]
states

[[None, None, None, None],
 [None, None, None, None],
 [None, None, None, None],
 [None, None, None, None]]

In [None]:
# Inner cells
for i in range(1, n-1):
  for j in range(1, n-1):
    states[i][j] = {
        'N': {'reward': -1, 'tprobs': {(i-1, j): 1, (i-1, j): 0, (i, j+1): 0, (i+1, j): 0}},
        'W': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 1, (i, j+1): 0, (i+1, j): 0}},
        'S': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 0, (i, j+1): 1, (i+1, j): 0}},
        'E': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 0, (i, j+1): 0, (i+1, j): 1}},
    }
# Edge cells, not corners
for j in range(1, n-1):
  # North edge
  states[0][j] = {
      'W': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 1, (i, j+1): 0, (i+1, j): 0}},
      'S': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 0, (i, j+1): 1, (i+1, j): 0}},
      'E': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 0, (i, j+1): 0, (i+1, j): 1}},
  }
  # West edge
  states[j][0] = {
      'N': {'reward': -1, 'tprobs': {(i-1, j): 1, (i-1, j): 0, (i, j+1): 0, (i+1, j): 0}},
      'W': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 1, (i, j+1): 0, (i+1, j): 0}},
      'S': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 0, (i, j+1): 1, (i+1, j): 0}},
  }
  # South edge 
  states[n-1][j] = {
      'N': {'reward': -1, 'tprobs': {(i-1, j): 1, (i-1, j): 0, (i, j+1): 0, (i+1, j): 0}},
      'W': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 1, (i, j+1): 0, (i+1, j): 0}},
      'E': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 0, (i, j+1): 0, (i+1, j): 1}},
  }
  # East edge
  states[j][n-1] = {
      'N': {'reward': -1, 'tprobs': {(i-1, j): 1, (i-1, j): 0, (i, j+1): 0, (i+1, j): 0}},
      'W': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 1, (i, j+1): 0, (i+1, j): 0}},
      'S': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 0, (i, j+1): 1, (i+1, j): 0}},
  }
# Nortwest corner (terminal state)
states[0][0] = {
    'T': {'reward': 0, 'tprobs': {(0,0): 1}}
}
# Southwest corner
states[n-1][0] = {
    'N': {'reward': -1, 'tprobs': {(i-1, j): 1, (i-1, j): 0, (i, j+1): 0, (i+1, j): 0}},
    'E': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 0, (i, j+1): 0, (i+1, j): 1}},
}
# Southeast corner (terminal state)
states[n-1][n-1] = {
    'T': {'reward': 0, 'tprobs': {(0,0): 1}}
}
# Northeast corner 
states[0][n-1] = {
    'W': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 1, (i, j+1): 0, (i+1, j):0}},
    'S': {'reward': -1, 'tprobs': {(i-1, j): 0, (i-1, j): 0, (i, j+1): 1, (i+1, j):0}},
}

A randomized policy $\pi$ associates to each state a probability distribution on the set of all actions available at that state. Let's first consider the policy that assumes all actions at each state are equally likely:

In [None]:
policy = [[None for j in range(n)] for i in range(n)]
# Inner cells
for i in range(0, n):
  for j in range(0,n):
    p = 1 / len(states[i][j])
    policy[i][j] = dict()
    for key in states[i][j]:
      policy[i][j][key] = p

Let's now compute the value $V_{\pi}$ of this policy. We first derive a mathematical equation and then show how to implement its solution in Python. Suppose that the process currently is at state $s$. We then choose an action according to the probability distribution $\pi(\cdot|s)$. Let $a$ be the resulting action. Then, we receive a reward $r(s,a)$. So, the expected immediate reward when in state $s$ is:
$$
\sum_{a\in A(s)}\pi(a|s)r(s,a)
$$
Then the process transitions to a new state according to the probability distribution $P(\cdot|s,a)$. The expected future cost is:
$$
\sum_{a\in A(s)}\pi(a|s)\sum_{u}P(u|s,a)V_{\pi}(u)
$$
We conclude that the system equations for $V_{\pi}(\cdot)$ is:
$$
V_{\pi}(s) = \sum_{a \in A(s)}\pi(a|s)\left[r(s,a)+\sum_{u}P(u|s,a)V_{\pi}(u)\right]
$$
Notice that this is a linear system on the unknown value function $V_{\pi}(\cdot)$. Instead of using a standard method (such as Gaussian Elimination), it use use an interactive method to compute the solution. We use the Gauss-Seidel method, which has a particularly simple implementation in this case.

We choose an initial approximation for the value function, $V_{\pi}(s)$, by choosing random values or simply using zeros. Then, we iterate the formula for $V_{\pi}$ given above. This is implemented in the following code. 


In [None]:
niter = 100
vfunc = [[0.0 for j in range(n)] for i in range(n)]
for _ in range(niter):
  for i in range(n):
    for j in range(n):
      for a, p in policy[i][j].items():
        acc = states[i][j][a]['reward']
        for s, q in states[i][j][a]['tprobs'].items():
          k, l = s
          acc += q * vfunc[k][l]
        vfunc[i][j] = p * acc

In [None]:
for i in range(n):
  for j in range(n):
    print(f"{vfunc[i][j]:8.5}", end=' ')
  print()

     0.0 -0.49417 -0.49417 -0.74126 
-0.45688 -0.34266 -0.34266 -0.45688 
-0.49417 -0.37063 -0.37063 -0.48252 
-0.74126 -0.48252 -0.48252      0.0 


In [None]:
for key, item in d.items():
  print(key, item)

NameError: ignored

The following code is to check the result

In [None]:
n = 4
v = [[0.0 for i in range(n)] for j in range(n)]
for _ in range(30):
  # Inner cells 
  for i in range(1, n-1):
    for j in range(1, n-1):
      v[i][j] = -1 + (1/4) * (v[i-1][j] + v[i][j-1] + v[i+1,j] + v[i][j+1])
  # Top and bottom rows
    for j in range(1, n-1):
      v[j][0] = -1 + (1/3) * (v[])