In [1]:
import numpy as np

# LineWorld MDP

In [2]:
S = [0, 1, 2, 3, 4] # agent position in line world
A = [0, 1] # 0: Left, 1: Right
R = [-1.0, 0.0, 1.0]
p = np.zeros((len(S), len(A), len(S), len(R))) # state, action, next_state, reward_index
T = [0, 4]

p[3, 0, 2, 1] = 1.0
p[2, 0, 1, 1] = 1.0
p[1, 0, 0, 0] = 1.0

p[3, 1, 4, 2] = 1.0
p[2, 1, 3, 1] = 1.0
p[1, 1, 2, 1] = 1.0

# Iterative Policy Evaluation

In [3]:
from typing import List

In [4]:
def iterative_policy_evaluation(
    pi: np.ndarray,
    S: List[int],
    A: List[int],
    R: List[float],
    T: List[int], # terminal states
    p: np.ndarray,
    theta: float = 0.00001,
    gamma: float = 0.9999999,
):
  V = np.random.random((len(S),))
  V[T] = 0.0

  while True:
    delta = 0.0

    for s in S:
      v = V[s]
      total = 0.0
      for a in A:
        sub_total = 0.0
        for s_p in S:
          for r_index in range(len(R)):
            r = R[r_index]
            sub_total += p[s, a, s_p, r_index] * (r + gamma * V[s_p])
        total += pi[s, a] * sub_total
      V[s] = total
      abs_diff = np.abs(v - V[s])
      delta = np.maximum(delta, abs_diff)

    if delta < theta:
      break
  return V




In [5]:
pi_always_right = np.zeros((len(S), len(A)))
pi_always_right[:, 1] = 1.0

In [6]:
iterative_policy_evaluation(pi_always_right, S, A, R, T, p)

array([0.       , 0.9999998, 0.9999999, 1.       , 0.       ])

In [7]:
pi_always_left = np.zeros((len(S), len(A)))
pi_always_left[:, 0] = 1.0

In [8]:
iterative_policy_evaluation(pi_always_left, S, A, R, T, p)

array([ 0.       , -1.       , -0.9999999, -0.9999998,  0.       ])

In [9]:
pi_uniform_random = np.ones((len(S), len(A))) * 0.5

In [10]:
iterative_policy_evaluation(pi_uniform_random, S, A, R, T, p)

array([ 0.00000000e+00, -4.99993279e-01,  6.72123684e-06,  5.00003361e-01,
        0.00000000e+00])

In [11]:
pi_weird_random = np.zeros((len(S), len(A)))
pi_weird_random[:, 1] = 0.7
pi_weird_random[:, 0] = 0.3

In [12]:
iterative_policy_evaluation(pi_weird_random, S, A, R, T, p)

array([0.        , 0.18275208, 0.68965118, 0.90689533, 0.        ])

In [13]:
(0.7 * (0 + 0.9999999 * 0.68965317) + 0.3 * (-1 + 0.99999999 * 0.0))

0.1827571707242781

# Policy Iteration

In [14]:
def policy_iteration(
    S: List[int],
    A: List[int],
    R: List[int],
    T: List[int],
    p: np.ndarray,
    theta: float = 0.00001,
    gamma: float = 0.999999,
):
  V = np.random.random((len(S),))
  V[T] = 0.0
  pi = np.array([np.random.choice(A) for s in S])
  pi[T] = 0

  while True:

    # Policy Evaluation
    while True:
      delta = 0.0

      for s in S:
        v = V[s]
        total = 0.0
        for s_p in S:
          for r_index in range(len(R)):
            r = R[r_index]
            total += p[s, pi[s], s_p, r_index] * (r + gamma * V[s_p])
        V[s] = total
        abs_diff = np.abs(v - V[s])
        delta = np.maximum(delta, abs_diff)

      if delta < theta:
        break

    # Policy Improvement

    policy_stable = True
    for s in S:
      old_action = pi[s]
      best_a = None
      best_a_score = -999999999.99999
      for a in A:
        score = 0.0
        for s_p in S:
          for r_index in range(len(R)):
            r = R[r_index]
            score += p[s, a, s_p, r_index] * (r + gamma * V[s_p])
        if best_a is None or score > best_a_score:
          best_a = a
          best_a_score = score
      if best_a != old_action:
        policy_stable = False
      pi[s] = best_a

    if policy_stable:
      break

  return pi, V

In [15]:
policy_iteration(S, A, R, T, p)

(array([0, 1, 1, 1, 0]),
 array([0.      , 0.999998, 0.999999, 1.      , 0.      ]))

In [16]:
class MonteCarloEnv:

  def num_states(self) -> int:
    raise NotImplementedError()

  def num_actions(self) -> int:
    raise NotImplementedError()

  def step(self, a: int):
    raise NotImplementedError()

  def score(self) -> float:
    raise NotImplementedError()

  def is_game_over(self) -> bool:
    raise NotImplementedError()

  def reset(self):
    raise NotImplementedError()



# Line World for Monte Carlo algorithms

In [17]:
class LineWorld(MonteCarloEnv):
  def __init__(self):
    self.s = 2
    self.inner_score = 0.0

  def num_states(self) -> int:
    return 5

  def num_actions(self) -> int:
    return 2

  def state(self) -> int:
    return self.s

  def step(self, a: int):
    assert(a == 1 or a == 0)
    if self.is_game_over():
      raise Exception("Youpi")

    if a == 0:
      self.s -= 1
    else:
      self.s += 1

    if self.s == 0:
      self.inner_score -= 1.0
    if self.s == 4:
      self.inner_score += 1.0

  def score(self) -> float:
    return self.inner_score

  def is_game_over(self) -> bool:
    return self.s == 0 or self.s == 4

  def reset(self):
    self.s = 2
    self.inner_score = 0.0


In [18]:
from tqdm import tqdm

In [19]:
def first_visit_monte_carlo_prediction(
    pi: np.ndarray,
    env: MonteCarloEnv,
    iterations_count: int,
    gamma: float = 0.9999999,
):
  V = np.random.random((env.num_states(),))
  Returns = [[] for s in range(env.num_states())]

  all_actions = np.arange(env.num_actions())

  for it in tqdm(range(iterations_count)):
    env.reset()


    trajectory_states = []
    trajectory_actions = []
    trajectory_rewards = []

    while not env.is_game_over():
      s = env.state()
      a = np.random.choice(all_actions, p=pi[s])

      prev_score = env.score()
      env.step(a)
      r = env.score() - prev_score

      trajectory_states.append(s)
      trajectory_actions.append(a)
      trajectory_rewards.append(r)

    terminal_state = env.state()
    V[terminal_state] = 0.0

    G = 0

    for t in reversed(range(len(trajectory_states))):
      s_t = trajectory_states[t]
      a_t = trajectory_actions[t]
      r_t_plus_1 = trajectory_rewards[t]

      G = gamma * G + r_t_plus_1

      if s_t not in trajectory_states[0:t]:
        Returns[s_t].append(G)
        V[s_t] = np.mean(Returns[s_t])

  return V

In [20]:
def better_first_visit_monte_carlo_prediction(
    pi: np.ndarray,
    env: MonteCarloEnv,
    iterations_count: int,
    gamma: float = 0.9999999,
):
  V = np.random.random((env.num_states(),))
  Returns = [0.0 for s in range(env.num_states())]
  Returns_counts = [0 for s in range(env.num_states())]

  all_actions = np.arange(env.num_actions())

  trajectory_states = []
  trajectory_actions = []
  trajectory_rewards = []

  for it in tqdm(range(iterations_count)):
    env.reset()

    trajectory_states.clear()
    trajectory_actions.clear()
    trajectory_rewards.clear()

    while not env.is_game_over():
      s = env.state()
      a = np.random.choice(all_actions, p=pi[s])

      prev_score = env.score()
      env.step(a)
      r = env.score() - prev_score

      trajectory_states.append(s)
      trajectory_actions.append(a)
      trajectory_rewards.append(r)

    terminal_state = env.state()
    V[terminal_state] = 0.0

    G = 0

    for t in reversed(range(len(trajectory_states))):
      s_t = trajectory_states[t]
      a_t = trajectory_actions[t]
      r_t_plus_1 = trajectory_rewards[t]

      G = gamma * G + r_t_plus_1

      if s_t not in trajectory_states[0:t]:
        Returns[s_t] = (Returns[s_t] * Returns_counts[s_t] + G) / (Returns_counts[s_t] + 1)
        Returns_counts[s_t] += 1
        V[s_t] = Returns[s_t]

  return V

In [None]:
better_first_visit_monte_carlo_prediction(
    pi_uniform_random,
    LineWorld(),
    100_000,
)

 61%|██████    | 60554/100000 [00:16<00:07, 5412.82it/s]

In [None]:
better_first_visit_monte_carlo_prediction(
    pi_always_right,
    LineWorld(),
    100_000,
)

In [None]:
pi_do_not_evaluate_me = np.zeros((len(S), len(A)))
pi_do_not_evaluate_me[1, 1] = 1.0
pi_do_not_evaluate_me[2, 0] = 1.0
pi_do_not_evaluate_me[3, 1] = 1.0

In [None]:
iterative_policy_evaluation(
    pi_do_not_evaluate_me,
    S,
    A,
    R,
    T,
    p
)

In [None]:
## DO NOT RUN THIS OR IT WILL LOOP TO INFINITY !!!
# better_first_visit_monte_carlo_prediction(
#     pi_do_not_evaluate_me,
#     LineWorld(),
#     100_000,
# )

In [None]:
def on_policy_monte_carlo_control(
    env: MonteCarloEnv,
    iterations_count: int,
    gamma: float = 0.999999,
    epsilon: float = 0.1,
):
  pi = (1.0 / env.num_actions()) * np.ones((env.num_states(), env.num_actions()))
  Q = np.random.random((env.num_states(), env.num_actions()))
  Returns_counts = np.zeros((env.num_states(), env.num_actions()))

  trajectory_states = []
  trajectory_actions = []
  trajectory_rewards = []

  all_actions = np.arange(env.num_actions())

  for it in tqdm(range(iterations_count)):
    env.reset()

    trajectory_states.clear()
    trajectory_actions.clear()
    trajectory_rewards.clear()

    while not env.is_game_over():
      s = env.state()
      a = np.random.choice(all_actions, p=pi[s])

      prev_score = env.score()
      env.step(a)
      r = env.score() - prev_score

      trajectory_states.append(s)
      trajectory_actions.append(a)
      trajectory_rewards.append(r)

    terminal_state = env.state()
    Q[terminal_state, :] = 0.0

    G = 0

    for t in reversed(range(len(trajectory_states))):
      s_t = trajectory_states[t]
      a_t = trajectory_actions[t]
      r_t_plus_1 = trajectory_rewards[t]

      G = gamma * G + r_t_plus_1

      if (s_t, a_t) not in zip(trajectory_states[0:t], trajectory_actions[0: t]):
        Q[s_t, a_t] = (Q[s_t, a_t] * Returns_counts[s_t, a_t] + G) / (Returns_counts[s_t, a_t] + 1)
        Returns_counts[s_t] += 1
        best_a = np.argmax(Q[s_t])

        pi[s_t, :] = epsilon / env.num_actions()
        pi[s_t, best_a] = 1.0 - epsilon + epsilon / env.num_actions()
  return pi, Q

In [None]:
on_policy_monte_carlo_control(LineWorld(), 1_000_000)