In [19]:
from collections import defaultdict, namedtuple
from enum import Enum
from typing import Tuple, List
import random
from IPython.display import clear_output

Point = namedtuple('Point', ['x', 'y'])
class Direction(Enum):
  NORTH = "⬆"
  EAST = "⮕"
  SOUTH = "⬇"
  WEST = "⬅"

  def values(self):
    return [v for v in self]

In [20]:
class SimpleGridWorld(object):
  def __init__(self, width: int = 5, height: int = 5, debug: bool = False):
    self.width = width
    self.height = height
    self.debug = debug
    self.action_space = [d for d in Direction]
    self.reset()

In [21]:
class SimpleGridWorld(SimpleGridWorld):
  def reset(self):
    self.cur_pos = Point(x=0, y=(self.height - 1))
    self.goal = Point(x=(self.width - 1), y=0)
 
    if self.debug:
      print(self)
    return self.cur_pos, 0, False

In [22]:
class SimpleGridWorld(SimpleGridWorld):
  def step(self, action: Direction):
 
    if action == Direction.NORTH:
      self.cur_pos = Point(self.cur_pos.x, self.cur_pos.y + 1)
    elif action == Direction.EAST:
      self.cur_pos = Point(self.cur_pos.x + 1, self.cur_pos.y)
    elif action == Direction.SOUTH:
      self.cur_pos = Point(self.cur_pos.x, self.cur_pos.y - 1)
    elif action == Direction.WEST:
      self.cur_pos = Point(self.cur_pos.x - 1, self.cur_pos.y)
    # To Check if out of bounds
    if self.cur_pos.x >= self.width:
      self.cur_pos = Point(self.width - 1, self.cur_pos.y)
    if self.cur_pos.y >= self.height:
      self.cur_pos = Point(self.width, self.cur_pos.y - 1)
    if self.cur_pos.x < 0:
      self.cur_pos = Point(0, self.cur_pos.y)
    if self.cur_pos.y < 0:
      self.cur_pos = Point(self.cur_pos.x, 0)

    # If at goal, terminate
    is_terminal = self.cur_pos == self.goal

    reward = -1

    if self.debug:
      print(self)

    return self.cur_pos, reward, is_terminal

In [23]:
class SimpleGridWorld(SimpleGridWorld):
  def __repr__(self):
    res = ""
    for y in reversed(range(self.height)):
      for x in range(self.width):
        if self.goal.x == x and self.goal.y == y:
          if self.cur_pos.x == x and self.cur_pos.y == y:
            res += "@"
          else:
            res += "o"
          continue
        if self.cur_pos.x == x and self.cur_pos.y == y:
          res += "x"
        else:
          res += "_"
      res += "\n"
    return res

In [24]:
s = SimpleGridWorld(debug=True)
print("This shows a simple visualisation of the environment state.\n")
s.step(Direction.SOUTH)
print(s.step(Direction.SOUTH), "⬅ This displays the state and reward from the environment AFTER moving.\n")
s.step(Direction.SOUTH)
s.step(Direction.SOUTH)
s.step(Direction.EAST)
s.step(Direction.EAST)
s.step(Direction.EAST)
s.step(Direction.EAST)

x____
_____
_____
_____
____o

This shows a simple visualisation of the environment state.

_____
x____
_____
_____
____o

_____
_____
x____
_____
____o

(Point(x=0, y=2), -1, False) ⬅ This displays the state and reward from the environment AFTER moving.

_____
_____
_____
x____
____o

_____
_____
_____
_____
x___o

_____
_____
_____
_____
_x__o

_____
_____
_____
_____
__x_o

_____
_____
_____
_____
___xo

_____
_____
_____
_____
____@



(Point(x=4, y=0), -1, True)

In [25]:
class MonteCarloGeneration(object):
  def __init__(self, env: object, max_steps: int = 1000, debug: bool = False):
    self.env = env
    self.max_steps = max_steps
    self.debug = debug

  def run(self) -> List:
    buffer = []
    n_steps = 0 
    state, _, _ = self.env.reset() 
    terminal = False
    while not terminal: 
      action = random.choice(self.env.action_space) 
      next_state, reward, terminal = self.env.step(action)
      buffer.append((state, action, reward)) # Store the result
      state = next_state 
      n_steps += 1
      if n_steps >= self.max_steps:
        if self.debug:
          print("Terminated early due to large number of steps")
        terminal = True 
    return buffer

In [26]:
env = SimpleGridWorld(debug=True) # Instantiate the environment
generator = MonteCarloGeneration(env=env, max_steps=10, debug=True) # Instantiate the generation
trajectory = generator.run() # Generate a trajectory
print([t[1].value for t in trajectory]) 
print(f"total reward: {sum([t[2] for t in trajectory])}") 

x____
_____
_____
_____
____o

x____
_____
_____
_____
____o

_x___
_____
_____
_____
____o

__x__
_____
_____
_____
____o

___x_
_____
_____
_____
____o

_____
_____
_____
_____
____o

____x
_____
_____
_____
____o

___x_
_____
_____
_____
____o

_____
___x_
_____
_____
____o

_____
_____
___x_
_____
____o

_____
_____
_____
___x_
____o

_____
_____
_____
_____
___xo

Terminated early due to large number of steps
['⮕', '⮕', '⮕', '⬆', '⬅', '⬅', '⬇', '⬇', '⬇', '⬇']
total reward: -10


In [27]:
class MonteCarloExperiment(object):
  def __init__(self, generator: MonteCarloGeneration):
    self.generator = generator
    self.values = defaultdict(float)
    self.counts = defaultdict(float)

  def _to_key(self, state, action):
    return (state, action)
  
  def action_value(self, state, action) -> float:
    key = self._to_key(state, action)
    if self.counts[key] > 0:
      return self.values[key] / self.counts[key]
    else:
      return 0.0

In [28]:
class MonteCarloExperiment(MonteCarloExperiment):
  def run_episode(self) -> None:
    trajectory = self.generator.run() # Generate a trajectory
    episode_reward = 0
    for i, t in enumerate(reversed(trajectory)): 
      state, action, reward = t
      key = self._to_key(state, action)
      episode_reward += reward  
      self.values[key] += episode_reward 
      self.counts[key] += 1 

In [29]:
env = SimpleGridWorld(debug=False) # Instantiate the environment 
generator = MonteCarloGeneration(env=env, debug=True) # Instantiate the trajectory generator
agent = MonteCarloExperiment(generator=generator)
for i in range(4):
  agent.run_episode()
  print(f"Run {i}: ", [agent.action_value(Point(3,0), d) for d in env.action_space])

Run 0:  [0.0, 0.0, 0.0, 0.0]
Run 1:  [0.0, -1.0, 0.0, 0.0]
Run 2:  [0.0, -1.0, 0.0, 0.0]
Run 3:  [-18.0, -1.0, 0.0, 0.0]


In [30]:
def state_value_2d(env, agent):
    res = ""
    for y in reversed(range(env.height)):
      for x in range(env.width):
        if env.goal.x == x and env.goal.y == y:
          res += "   @  "
        else:
          state_value = sum([agent.action_value(Point(x,y), d) for d in env.action_space]) / len(env.action_space)
          res += f'{state_value:6.2f}'
        res += " | "
      res += "\n"
    return res

print(state_value_2d(env, agent))

-51.63 | -38.54 | -47.12 | -32.69 | -31.34 | 
-10.12 | -39.06 | -52.67 | -44.20 | -21.04 | 
-14.00 | -29.38 | -36.90 | -27.25 | -18.00 | 
-22.00 | -31.67 | -32.10 | -11.58 | -11.62 | 
 -8.00 | -30.92 | -23.92 |  -4.75 |    @   | 



In [31]:
env = SimpleGridWorld() # Instantiate the environment
generator = MonteCarloGeneration(env=env) # Instantiate the trajectory generator
agent = MonteCarloExperiment(generator=generator)
for i in range(1000):
  clear_output(wait=True)
  agent.run_episode()
  print(f"Iteration: {i}")
  
  print(state_value_2d(env, agent), flush=True)
 

Iteration: 999
-80.98 | -82.68 | -80.28 | -78.81 | -79.68 | 
-81.56 | -78.58 | -77.76 | -76.40 | -73.78 | 
-82.23 | -78.90 | -75.16 | -68.34 | -61.94 | 
-82.77 | -76.54 | -67.35 | -54.34 | -40.21 | 
-76.42 | -71.21 | -61.99 | -39.14 |    @   | 



In [32]:
def argmax(a):
    return max(range(len(a)), key=lambda x: a[x])

def next_best_value_2d(env, agent):
    res = ""
    for y in reversed(range(env.height)):
      for x in range(env.width):
        if env.goal.x == x and env.goal.y == y:
          res += "@"
        else:
          # Find the action that has the highest value
          loc = argmax([agent.action_value(Point(x,y), d) for d in env.action_space])
          res += f'{env.action_space[loc].value}'
        res += " | "
      res += "\n"
    return res

print(next_best_value_2d(env, agent))

⬆ | ⮕ | ⮕ | ⬇ | ⬇ | 
⮕ | ⮕ | ⬇ | ⬇ | ⬇ | 
⮕ | ⮕ | ⮕ | ⬇ | ⬇ | 
⮕ | ⮕ | ⮕ | ⬇ | ⬇ | 
⮕ | ⮕ | ⮕ | ⮕ | @ | 



In [33]:
env = SimpleGridWorld() # Instantiate the environment
generator = MonteCarloGeneration(env=env) # Instantiate the trajectory generator
agent = MonteCarloExperiment(generator=generator)
for i in range(1000):
  clear_output(wait=True)
  agent.run_episode()
  print(f"Iteration: {i}")
  print(state_value_2d(env, agent))
  print(next_best_value_2d(env, agent), flush=True)

Iteration: 999
-85.63 | -84.67 | -82.89 | -81.27 | -80.26 | 
-88.72 | -87.74 | -83.96 | -79.52 | -75.95 | 
-87.54 | -85.27 | -77.64 | -71.53 | -66.30 | 
-86.01 | -78.17 | -69.30 | -57.46 | -44.02 | 
-87.50 | -77.37 | -64.31 | -40.65 |    @   | 

⬆ | ⮕ | ⬆ | ⬆ | ⬇ | 
⮕ | ⬆ | ⬇ | ⬇ | ⬇ | 
⮕ | ⬇ | ⬇ | ⬇ | ⬇ | 
⮕ | ⮕ | ⮕ | ⬇ | ⬇ | 
⮕ | ⮕ | ⮕ | ⮕ | @ | 

