In [1]:
import numpy as np

In [3]:
class GridWorld:
    """Deterministic N x N grid world solved with dynamic programming.

    States are the integer cells ``(i, j)`` with ``0 <= i, j < N``; the same
    pair indexes ``value_function[i, j]`` and ``policy[i, j]``.  Four moves
    are available (see ``actions``).  A move that would leave the grid or
    enter an obstacle keeps the agent in place and yields ``reward_obstacle``;
    a move into the goal yields ``reward_goal``; every other move yields
    ``reward_default``.  The goal is terminal: it is never updated, so its
    value stays at its initial 0.

    Note: obstacle cells are still swept like ordinary cells, so they receive
    a value and a policy row computed as if the agent were standing on them.

    Attributes:
        N: side length of the grid.
        obstacles: set of blocked ``(i, j)`` tuples.
        goal: ``(i, j)`` tuple of the terminal cell (or ``None``).
        actions: list of ``(di, dj)`` offsets, in order: first index -1,
            first index +1, second index -1, second index +1.
        gamma: discount factor.
        theta: convergence threshold on the largest per-sweep value change.
        policy: array of shape ``(N, N, 4)`` with action probabilities,
            initialised to the uniform 0.25.
        value_function: array of shape ``(N, N)``, initialised to zeros.
    """

    def __init__(self, N, obstacles, goal, gamma=0.9, theta=1e-4,
                 reward_default=-1, reward_goal=100, reward_obstacle=-5):
        """Build the grid and initialise the value table and policy.

        Args:
            N: side length of the square grid.
            obstacles: iterable of ``(i, j)`` pairs (tuples, lists, ...) that
                cannot be entered; ``None`` means no obstacles.
            goal: ``(i, j)`` pair of the terminal cell; any 2-element
                sequence is accepted.
            gamma: discount factor.
            theta: stop a sweep loop once the largest value change is below
                this threshold.
            reward_default: reward for an ordinary move.
            reward_goal: reward for a move that enters the goal.
            reward_obstacle: reward for a blocked move (wall or obstacle).
        """
        self.N = N
        # Normalise to a set of int tuples: the membership test in is_valid
        # compares against (x, y) tuples, which never match lists.
        self.obstacles = {
            (int(x), int(y)) for x, y in (() if obstacles is None else obstacles)
        }
        # Same reason: the sweeps compare (i, j) == self.goal, and a tuple
        # never equals a list.
        self.goal = None if goal is None else (int(goal[0]), int(goal[1]))
        self.actions = [(-1, 0), (1, 0), (0, -1), (0, 1)]
        self.gamma = gamma
        self.theta = theta
        self.reward_default = reward_default
        self.reward_goal = reward_goal
        self.reward_obstacle = reward_obstacle
        self.policy = np.zeros((N, N, 4)) + 0.25
        self.value_function = np.zeros((N, N))

    def is_valid(self, x, y):
        """Return True when ``(x, y)`` is inside the grid and not an obstacle."""
        return 0 <= x < self.N and 0 <= y < self.N and (x, y) not in self.obstacles

    def _action_values(self, i, j):
        """Return the one-step look-ahead value of each action from ``(i, j)``.

        The result is a list aligned with ``self.actions``; each entry is
        ``reward + gamma * value_function[next_cell]`` read from the current
        ``self.value_function``.
        """
        values = []
        for dx, dy in self.actions:
            nx, ny = i + dx, j + dy
            if not self.is_valid(nx, ny):
                # Blocked move: pay the penalty and stay where we are.
                reward = self.reward_obstacle
                nx, ny = i, j
            elif (nx, ny) == self.goal:
                reward = self.reward_goal
            else:
                reward = self.reward_default
            values.append(reward + self.gamma * self.value_function[nx, ny])
        return values

    def value_iteration(self):
        """Run synchronous value iteration until the largest change < theta.

        Updates ``self.value_function`` in place of the previous table; does
        not touch ``self.policy``.
        """
        while True:
            delta = 0
            # Write into a copy so every cell in this sweep reads the values
            # of the previous sweep (synchronous update).
            new_value_function = np.copy(self.value_function)
            for i in range(self.N):
                for j in range(self.N):
                    if (i, j) == self.goal:
                        continue
                    new_value_function[i, j] = max(self._action_values(i, j))
                    delta = max(delta, abs(self.value_function[i, j] - new_value_function[i, j]))
            self.value_function = new_value_function
            if delta < self.theta:
                break

    def policy_iteration(self):
        """Alternate policy evaluation and improvement until the policy is stable."""
        policy_stable = False
        while not policy_stable:
            self.policy_evaluation()
            policy_stable = self.policy_improvement()

    def policy_evaluation(self):
        """Evaluate ``self.policy`` with synchronous sweeps until change < theta.

        Starts from the current ``self.value_function`` and replaces it with
        the converged estimate.
        """
        while True:
            delta = 0
            new_value_function = np.copy(self.value_function)
            for i in range(self.N):
                for j in range(self.N):
                    if (i, j) == self.goal:
                        continue
                    # Expected look-ahead value under the action probabilities.
                    v = 0
                    for prob, q in zip(self.policy[i, j], self._action_values(i, j)):
                        v += prob * q
                    new_value_function[i, j] = v
                    delta = max(delta, abs(self.value_function[i, j] - new_value_function[i, j]))
            self.value_function = new_value_function
            if delta < self.theta:
                break

    def policy_improvement(self):
        """Make the policy greedy with respect to ``self.value_function``.

        Every non-goal cell gets a one-hot row on its chosen action; the goal
        row is left all zeros.  If a cell's current row is already one-hot
        and that action is within ``theta`` of the best look-ahead value, the
        current action is kept, so near-ties caused by approximate evaluation
        are less likely to flip the policy back and forth.

        Returns:
            True if no non-goal row changed, False otherwise.
        """
        policy_stable = True
        new_policy = np.zeros_like(self.policy)
        for i in range(self.N):
            for j in range(self.N):
                if (i, j) == self.goal:
                    continue
                values = self._action_values(i, j)
                best_action = int(np.argmax(values))
                current_action = int(np.argmax(self.policy[i, j]))
                # Only a one-hot row names a single "current" action; the
                # initial uniform row always falls through to the argmax.
                if (self.policy[i, j, current_action] == 1
                        and values[current_action] >= values[best_action] - self.theta):
                    best_action = current_action
                new_policy[i, j, best_action] = 1
                if not np.array_equal(self.policy[i, j], new_policy[i, j]):
                    policy_stable = False
        self.policy = new_policy
        return policy_stable

In [4]:
# Problem set-up: a 5x5 board, three blocked cells on the diagonal, and the
# goal at cell (4, 4).
N = 5
goal = (4, 4)
obstacles = {(1, 1), (2, 2), (3, 3)}
gw = GridWorld(N=N, obstacles=obstacles, goal=goal)

# Value iteration fills gw.value_function.
print("Performing Value Iteration...")
gw.value_iteration()
print("Optimal Value Function:", gw.value_function, sep="\n")

# Policy iteration runs on the same object, so it starts from the value table
# left behind by the step above.
print("Performing Policy Iteration...")
gw.policy_iteration()
print("Optimal Policy (Probability of Actions at Each State):", gw.policy, sep="\n")

Performing Value Iteration...
Optimal Value Function:
[[ 42.612659  48.45851   54.9539    62.171     70.19    ]
 [ 48.45851   54.9539    62.171     70.19      79.1     ]
 [ 54.9539    62.171     70.19      79.1       89.      ]
 [ 62.171     70.19      79.1       89.       100.      ]
 [ 70.19      79.1       89.       100.         0.      ]]
Performing Policy Iteration...
Optimal Policy (Probability of Actions at Each State):
[[[0. 1. 0. 0.]
  [0. 0. 0. 1.]
  [0. 1. 0. 0.]
  [0. 1. 0. 0.]
  [0. 1. 0. 0.]]

 [[0. 1. 0. 0.]
  [0. 1. 0. 0.]
  [0. 0. 0. 1.]
  [0. 1. 0. 0.]
  [0. 1. 0. 0.]]

 [[0. 1. 0. 0.]
  [0. 1. 0. 0.]
  [0. 1. 0. 0.]
  [0. 0. 0. 1.]
  [0. 1. 0. 0.]]

 [[0. 1. 0. 0.]
  [0. 1. 0. 0.]
  [0. 1. 0. 0.]
  [0. 1. 0. 0.]
  [0. 1. 0. 0.]]

 [[0. 0. 0. 1.]
  [0. 0. 0. 1.]
  [0. 0. 0. 1.]
  [0. 0. 0. 1.]
  [0. 0. 0. 0.]]]
