<a href="https://colab.research.google.com/github/yashashwi-s/RLbook/blob/main/Policy_Value_iteration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import io
import sys
from gym import Env, spaces

In [None]:
# Action indices used as keys into the environment's transition table.
# Order matters: 0=up, 1=right, 2=down, 3=left.
UP, RIGHT, DOWN, LEFT = range(4)

In [3]:
class GridworldEnv(Env):
  """Deterministic 2-D grid world with terminal states in two corners.

  States are numbered row-major from 0 to ``nS - 1``. State 0 and state
  ``nS - 1`` are terminal. Every action taken in a non-terminal state yields
  a reward of -1.0; an action that would leave the grid keeps the agent in
  place. Terminal states loop onto themselves with reward 0.0.

  Attributes set by ``__init__``:
    shape: (rows, columns) of the grid.
    nS: number of states (product of ``shape``).
    nA: number of actions (4).
    P: transition table, ``P[s][a]`` is a list of
       ``(probability, next_state, reward, done)`` tuples.
    isd: uniform distribution over states (stored only; ``reset`` does not
         sample from it).
    start_state: state returned by ``reset`` (``nS // 2``).
  """
  metadata = {'render.modes': ['human', 'ansi']}

  def __init__(self, shape=(4,4)):
    self.shape = shape
    self.nS = np.prod(shape)
    self.nA = 4

    self.max_y = shape[0]
    self.max_x = shape[1]

    self.action_space = spaces.Discrete(self.nA)
    self.observation_space = spaces.Discrete(self.nS)

    # No current state until reset() is called.
    self.state = None
    self.start_state = self.nS // 2

    self.P = self._create_transition_probabilities()

    self.isd = np.ones(self.nS) / self.nS

  def _is_terminal(self, s):
    """Return True when ``s`` is the first or the last state of the grid."""
    return bool(s == 0 or s == (self.nS - 1))

  def _create_transition_probabilities(self):
    """Build the table ``P[s][a] -> [(prob, next_state, reward, done)]``.

    Every entry holds exactly one transition with probability 1.0.
    """
    P = {}

    for s in range(self.nS):
      # Row-major numbering: s == y * max_x + x.
      y, x = divmod(s, self.max_x)

      if self._is_terminal(s):
        # Terminal states absorb every action at zero reward.
        P[s] = {a: [(1.0, s, 0.0, True)] for a in range(self.nA)}
        continue

      # Moves that would cross a border leave the state unchanged.
      targets = {
        UP: s if y == 0 else s - self.max_x,
        RIGHT: s if x == (self.max_x - 1) else s + 1,
        DOWN: s if y == (self.max_y - 1) else s + self.max_x,
        LEFT: s if x == 0 else s - 1,
      }
      P[s] = {
        a: [(1.0, ns, -1.0, self._is_terminal(ns))]
        for a, ns in targets.items()
      }

    return P

  def reset(self):
    """Put the agent on ``start_state`` and return that state."""
    self.state = self.start_state
    return self.state

  def step(self, action):
    """Apply ``action`` and return ``(next_state, reward, done, info)``.

    Only the first entry of ``P[state][action]`` is used.

    Raises:
      RuntimeError: if called before ``reset()``.
      KeyError: if ``action`` is not a key of the transition table.
    """
    if self.state is None:
      # Without this guard the lookup below fails with an opaque KeyError(None).
      raise RuntimeError("Cannot call step() before reset()")

    transitions = self.P[self.state][action]
    prob, next_state, reward, done = transitions[0]
    self.state = next_state
    return next_state, reward, done, {}

  def render(self, mode='human'):
    """Draw the grid: ``x`` is the agent, ``T`` a terminal state, ``o`` other.

    In 'human' mode the grid is written to stdout and None is returned;
    in 'ansi' mode the grid is returned as a string.

    Raises:
      ValueError: if ``mode`` is not listed in ``metadata['render.modes']``.
    """
    if mode not in self.metadata['render.modes']:
      raise ValueError("Invalid render mode: {}".format(mode))

    outfile = io.StringIO() if mode == 'ansi' else sys.stdout
    last_col = self.shape[1] - 1

    for s in range(self.nS):
      x = s % self.shape[1]

      if self.state == s:
        output = " x "
      elif self._is_terminal(s):
        output = " T "
      else:
        output = " o "

      # Trim the padding at the row edges so lines carry no stray spaces.
      if x == 0:
        output = output.lstrip()
      if x == last_col:
        output = output.rstrip()

      outfile.write(output)

      if x == last_col:
        outfile.write("\n")

    if mode == 'ansi':
      return outfile.getvalue()

In [4]:
# Shared environment instance (default 4x4 grid) used by the cells below.
env = GridworldEnv()

In [5]:
#action estimation

def action_estimation(env, state, V, gamma):
  """One-step lookahead: expected return of every action taken in `state`.

  Args:
    env: object exposing `nA` (number of actions) and `P`, where
      `P[state][action]` is an iterable of
      `(prob, next_state, reward, done)` tuples.
    state: state whose actions are evaluated.
    V: current value estimate, indexable by state.
    gamma: discount factor applied to the value of the next state.

  Returns:
    A float array of length `env.nA`; entry `a` is the sum over the
    transitions of action `a` of `prob * (reward + gamma * V[next_state])`.
  """
  action_values = np.zeros(env.nA)
  for action in range(env.nA):
    expected = action_values[action]
    for prob, successor, reward, _done in env.P[state][action]:
      expected += prob * (reward + gamma * V[successor])
    action_values[action] = expected
  return action_values

In [6]:
def get_best_action(A):
  """Return `(index, value)` of the largest entry of `A`.

  The index comes from `np.argmax`, so on ties the lowest index wins.
  """
  greedy_index = np.argmax(A)
  greedy_value = A[greedy_index]
  return greedy_index, greedy_value

In [7]:
def policy_estimation(env, V, gamma):
  """Return the deterministic policy that is greedy with respect to `V`.

  For every state a one-step lookahead picks the action with the highest
  expected return (lowest index on ties) and the policy row is set to the
  one-hot vector of that action.

  `V` is never modified here, so a single pass over the states already
  yields the final policy; repeating the pass until "stable" would only
  recompute the same rows.

  Args:
    env: object exposing `nS`, `nA` and the transition table `P`.
    V: value estimate, indexable by state.
    gamma: discount factor.

  Returns:
    Array of shape `[env.nS, env.nA]` with exactly one 1.0 per row.
  """
  policy = np.zeros([env.nS, env.nA])

  for s in range(env.nS):
    best_action, _ = get_best_action(action_estimation(env, s, V, gamma))
    policy[s, best_action] = 1.0

  return policy


In [8]:
def policy_iteration(env, policy_estimator=policy_estimation, theta=0.0001, gamma=1, max_iterations=1000):
  """Alternate one policy-evaluation sweep with a policy update.

  Each iteration performs a single in-place sweep that sets `V[s]` to the
  expected one-step return under the current policy `p`, then asks
  `policy_estimator` for a new policy based on the updated `V`. The loop
  stops as soon as a sweep changes no state value by `theta` or more, or
  after `max_iterations` sweeps.

  Args:
    env: object exposing `nS` and the transition table `P`, where
      `P[s][a]` is an iterable of `(prob, next_state, reward, done)`.
    policy_estimator: callable `(env, V, gamma) -> policy` returning an
      array with one row of action probabilities per state.
    theta: convergence threshold on the largest per-sweep value change.
    gamma: discount factor.
    max_iterations: upper bound on the number of sweeps.

  Returns:
    Tuple `(policy, V)`.

  Side effects:
    Prints the policy after every update as a progress trace.
  """
  V = np.zeros(env.nS)
  p = policy_estimator(env, V, gamma)

  for _ in range(max_iterations):
    delta = 0
    for s in range(env.nS):
      # Expected return of following policy p for one step from s.
      expected_value = 0
      for a, action_prob in enumerate(p[s]):
        for prob, next_state, reward, _done in env.P[s][a]:
          expected_value += action_prob * prob * (reward + gamma*V[next_state])

      delta = max(delta, np.abs(expected_value - V[s]))
      # In-place update: later states in this sweep see the new value.
      V[s] = expected_value

    if delta < theta:
      break

    p = policy_estimator(env, V, gamma)
    print(p)

  return p, V

In [9]:
# Run policy iteration on the shared environment and report the result.
policy, v = policy_iteration(env)
print(policy)

# Each section is printed as: heading, data, blank line.
sections = (
  ("Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):",
   np.argmax(policy, axis=1).reshape(env.shape)),
  ("Value Function:", v),
  ("Reshaped Grid Value Function:", v.reshape(env.shape)),
)
for heading, data in sections:
  print(heading)
  print(data)
  print("")

[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]
[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]
[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]
[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]
Reshaped Grid Policy (0=up, 1=right,

In [11]:
def value_iteration(env, theta=0.0001, discount_factor=1.0, max_iterations=1000):
  """Value iteration with in-place sweeps over all states.

  Every sweep replaces `V[s]` by the best one-step lookahead value of `s`.
  Sweeping stops once the largest change within a sweep is below `theta`,
  or after `max_iterations` sweeps. The returned policy is the one
  `policy_estimation` derives from the final `V`.

  Args:
    env: object exposing `nS`, `nA` and the transition table `P`.
    theta: convergence threshold on the largest per-sweep value change.
    discount_factor: discount applied to the value of the next state.
    max_iterations: upper bound on the number of sweeps.

  Returns:
    Tuple `(policy, V)`.
  """
  V = np.zeros(env.nS)

  for _ in range(max_iterations):
    largest_change = 0

    for state in range(env.nS):
      # Best achievable one-step return from this state.
      lookahead = action_estimation(env, state, V, discount_factor)
      _, greedy_value = get_best_action(lookahead)

      change = np.abs(greedy_value - V[state])
      if change > largest_change:
        largest_change = change

      # Update in place (Sutton & Barto, eq. 4.10).
      V[state] = greedy_value

    if largest_change < theta:
      break

  policy = policy_estimation(env, V, discount_factor)

  return policy, V

In [12]:
# Run value iteration on the shared environment and report the result.
policy, v = value_iteration(env)

# Each section is printed as: heading, data, blank line.
sections = (
  ("Policy Probability Distribution:", policy),
  ("Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):",
   np.argmax(policy, axis=1).reshape(env.shape)),
  ("Value Function:", v),
  ("Reshaped Grid Value Function:", v.reshape(env.shape)),
)
for heading, data in sections:
  print(heading)
  print(data)
  print("")

Policy Probability Distribution:
[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]

Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):
[[0 3 3 2]
 [0 0 0 2]
 [0 0 1 2]
 [0 1 1 0]]

Value Function:
[ 0. -1. -2. -3. -1. -2. -3. -2. -2. -3. -2. -1. -3. -2. -1.  0.]

Reshaped Grid Value Function:
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]

