In [8]:
import numpy as np

In [9]:
ACTION_SPACE = ('U', 'D', 'L', 'R')

In [10]:
class Grid:
  def __init__(self, rows, cols, start):
    self.rows = rows
    self.cols = cols
    self.i = start[0]
    self.j = start[1]

  def set(self, rewards, actions):
    # rewards : dict of (i, j) -> reward
    # actions : dist of (i, j) -> list of possible actions
    self.rewards = rewards
    self.actions = actions

  def set_state(self, s):
    self.i = s[0]
    self.j = s[1]

  def current_state(self):
    return (self.i, self.j)

  def is_terminal(self, s):
    return s not in self.actions

  def reset(self):
    # put agent back in start position
    self.i = 2
    self.j = 0
    return (self.i, self.j)

  def get_next_state(self, s, a):
    i, j = s[0], s[1]

    if a in self.actions[(i, j)]:
      if a == 'U':
        i -= 1
      elif a == 'D':
        i += 1
      elif a == 'L':
        j -= 1
      elif a == 'R':
        j += 1

    return i, j

  def move(self, action):
    if action in self.actions[(self.i, self.j)]:
      if action == 'U':
        self.i -= 1
      elif action == 'D':
        self.i += 1
      elif action == 'L':
        self.j -= 1
      elif action == 'R':
        self.j += 1

    return self.rewards.get((self.i, self.j), 0)

  def undo_move(self, action):
    if action == 'U':
      self.i += 1
    elif action == 'D':
      self.i -= 1
    elif action == 'L':
      self.j += 1
    elif action == 'R':
      self.j -= 1

  def game_over(self):
    return (self.i, self.j) not in self.actions

  def all_states(self):
    return set(self.actions.keys()) | set(self.rewards.keys())

In [11]:
def standard_grid():
  g = Grid(3, 4, (2, 0))
  rewards = {(0, 3): 1, (1, 3): -1}
  actions = {
      (0, 0): ('D', 'R'),
      (0, 1): ('L', 'R'),
      (0, 2): ('L', 'D', 'R'),
      (1, 0): ('U', 'D'),
      (1, 2): ('U', 'D', 'R'),
      (2, 0): ('U', 'R'),
      (2, 1): ('L', 'R'),
      (2, 2): ('L', 'R', 'U'),
      (2, 3): ('L', 'U'),
  }
  g.set(rewards, actions)
  return g



In [12]:
SMALL_ENOUGH = 1e-3
GAMMA = 0.9

In [18]:
def get_transition_probs_and_rewards(grid):
  # key -> (s, a, s')
  # value -> p(s' | s, a)

  transition_probs = {}

  # key -> (s, a, s')
  # value -> r(s, a, s')

  rewards = {}


  for i in range(grid.rows):
    for j in range(grid.cols):
      s = (i, j)
      if not grid.is_terminal(s):
        for a in ACTION_SPACE:
          s2 = grid.get_next_state(s, a)
          transition_probs[(s, a, s2)] = 1
          if s2 in grid.rewards:
            rewards[(s, a, s2)] = grid.rewards[s2]

  return transition_probs, rewards

In [22]:
def evaluate_deterministic_policy(grid, policy):
  # initialize V(s) = 0
  V = {}
  for s in grid.all_states():
    V[s] = 0

  # repeat till convergence
  it = 0
  while True:
    biggest_change = 0
    for s in grid.all_states():
      if not grid.is_terminal(s):
        old_v = V[s]
        new_v = 0 # will accumulate
        for a in ACTION_SPACE:
          for s2 in grid.all_states():
            # deterministic action policy
            action_prob = 1 if policy.get(s) == a else 0

            # reward is a function of (s, a, s'), 0 if not specified
            r = rewards.get((s, a, s2), 0)
            new_v += action_prob * transition_probs.get((s, a, s2), 0) * (r + GAMMA * V[s2])

        V[s] = new_v
        biggest_change = max(biggest_change, np.abs(old_v - V[s]))
    it += 1
    if biggest_change < SMALL_ENOUGH:
      break
  return V

In [16]:
from tabulate import tabulate

In [15]:
def print_values(V, g) :
  table = []
  for i in range(g.rows) :
    row = []
    for j in range(g.cols) :
      v = V.get((i, j), 0)
      row.append(v)
    table.append(row)
  print(tabulate(table, tablefmt="grid", floatfmt=".3f"))

def print_policy(P, g) :
  table = []
  for i in range(g.rows) :
    row = []
    for j in range(g.cols) :
      a = P.get((i, j), ' ')
      row.append(a)
    table.append(row)
  print(tabulate(table, tablefmt="grid"))

In [19]:
grid = standard_grid()
transition_probs, rewards = get_transition_probs_and_rewards(grid)

print("rewards\n")
print_values(grid.rewards, grid)

rewards

+---+---+---+----+
| 0 | 0 | 0 |  1 |
+---+---+---+----+
| 0 | 0 | 0 | -1 |
+---+---+---+----+
| 0 | 0 | 0 |  0 |
+---+---+---+----+


In [20]:
policy = {}
for s in grid.actions.keys():
  policy[s] = np.random.choice(ACTION_SPACE)

print("initial policy")
print_policy(policy, grid)

initial policy
+---+---+---+---+
| R | L | L |   |
+---+---+---+---+
| D |   | D |   |
+---+---+---+---+
| U | D | L | U |
+---+---+---+---+


In [23]:
# repeat until convergence
while True:

  # policy evaluation step
  V = evaluate_deterministic_policy(grid, policy)

  # policy improvement step
  is_policy_stable = True
  for s in grid.actions.keys():
    old_a = policy[s]
    new_a = None
    best_value = float('-inf')

    # finding the best action
    for a in ACTION_SPACE:
      v = 0
      for s2 in grid.all_states():
        r = rewards.get((s, a, s2), 0)
        v += transition_probs.get((s, a, s2), 0) * (r + GAMMA * V[s2])

      if v > best_value:
        best_value = v
        new_a = a
    policy[s] = new_a
    if new_a != old_a:
      is_policy_stable = False
  if is_policy_stable:
    break

In [24]:
print("values:\n")
print_values(V, grid)
print("\npolicy:\n")
print_values(policy, grid)

values:

+-------+-------+-------+-------+
| 0.810 | 0.900 | 1.000 | 0.000 |
+-------+-------+-------+-------+
| 0.729 | 0.000 | 0.900 | 0.000 |
+-------+-------+-------+-------+
| 0.656 | 0.729 | 0.810 | 0.729 |
+-------+-------+-------+-------+

policy:

+---+---+---+---+
| R | R | R | 0 |
+---+---+---+---+
| U | 0 | U | 0 |
+---+---+---+---+
| U | R | U | L |
+---+---+---+---+
