<a href="https://colab.research.google.com/github/mohamedhassan279/Markov-Decision-Process/blob/main/Markov_Decision_Process.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [200]:
from enum import Enum
import copy
import numpy as np

# 3x3 grid of per-cell rewards, collected when the agent enters a cell.
# Every cell costs -1 except the two terminal cells in the top row:
# (0, 2) pays 10 and (0, 0) holds a placeholder that each experiment overwrites.
world = np.full((3, 3), -1, dtype=np.float64)
world[0, 0] = 0
world[0, 2] = 10

gamma = 0.8          # discount applied to the value of the successor cell
epsilon = 1e-9       # sweeps stop once the largest value change drops below this
success_prop = 0.8   # probability of moving in the chosen direction
fail_prop = 0.1      # probability of each of the two sideways slips

In [201]:
class Direction(Enum):
    """Actions available to the agent, each stored as a (row_delta, col_delta) offset.

    Row indices grow downward, so UP is -1 on the row axis. Iterating over the
    enum yields members in the order declared here; the solvers keep the first
    action that attains the best value, so this order decides ties.
    """
    STAY = (0, 0)    # remain in the current cell
    UP = (-1, 0)     # one row up
    DOWN = (1, 0)    # one row down
    LEFT = (0, -1)   # one column left
    RIGHT = (0, 1)   # one column right

In [202]:
def isEnd(i, j):
    """Return True when (i, j) is one of the two terminal cells, (0, 0) or (0, 2)."""
    return i == 0 and j in (0, 2)


def isWall(i, j):
    """Return True when (i, j) lies outside the 3x3 grid (valid indices are 0..2)."""
    if i < 0 or i > 2:
        return True
    return j < 0 or j > 2

In [203]:
def get_successor(i, j, direction):
    """List the cells reachable from (i, j) when attempting `direction`.

    Three candidates are considered, in this order:
      1. the intended move (offset `direction.value`) with probability success_prop;
      2. a slip along the offset with its row/column components swapped, with
         probability fail_prop;
      3. a slip along the negated swapped offset, with probability fail_prop.
    For UP/DOWN/LEFT/RIGHT the two slips are the perpendicular moves; for STAY
    all three candidates are the current cell.

    Returns a list of (row, col, probability) tuples. Candidates that fall
    outside the grid (per isWall) are omitted, so the returned probabilities
    can sum to less than 1.
    NOTE(review): confirm that dropping blocked moves, rather than keeping the
    agent in place, is the intended transition model.
    """
    d_row, d_col = direction.value
    candidates = (
        (i + d_row, j + d_col, success_prop),
        (i + d_col, j + d_row, fail_prop),
        (i - d_col, j - d_row, fail_prop),
    )
    return [(x, y, prob) for x, y, prob in candidates if not isWall(x, y)]


def reward(i, j):
    """Return the reward stored in the global `world` grid at row i, column j."""
    return world[i, j]

In [204]:
def value_iteration():
    """Solve the grid world by repeated in-place Bellman optimality sweeps.

    Each sweep visits the non-terminal cells in row-major order and replaces
    the cell's value with the best expected return over all Direction members.
    Values are updated in place, so cells later in a sweep already see the new
    values of earlier cells. Sweeping stops once the largest change in a sweep
    is below `epsilon`.

    Returns:
        (values, policy): a 3x3 float array of state values and a 3x3 object
        array holding the best Direction per cell. Terminal cells are never
        updated, so they keep value 0 and Direction.STAY.
    """
    values = np.zeros_like(world, dtype=np.float64)
    best_actions = np.array([[Direction.STAY] * 3 for _ in range(3)])
    converged = False
    while not converged:
        largest_change = 0  # max |old value - new value| seen in this sweep
        for row, col in np.ndindex(3, 3):
            if isEnd(row, col):
                continue
            previous = values[row, col]
            best_value = float('-inf')
            for action in Direction:
                # Expected return of `action`: sum over successors of
                # prob * (reward on entry + discounted successor value).
                expected = 0
                for nxt_row, nxt_col, prob in get_successor(row, col, action):
                    expected += prob * (reward(nxt_row, nxt_col) + gamma * values[nxt_row, nxt_col])
                # Strict comparison: on ties the earliest-declared action wins.
                if expected > best_value:
                    best_value = expected
                    best_actions[row, col] = action
            values[row, col] = best_value
            largest_change = max(largest_change, float(abs(previous - values[row, col])))
        converged = largest_change < epsilon
    return values, best_actions

In [205]:
def print_policy(policy):
    """Print a 3x3 policy as arrows, one grid row per line.

    UP/DOWN/LEFT/RIGHT are drawn as arrows; any other cell content (including
    Direction.STAY) is drawn as 'E'. Each symbol is followed by a space.
    """
    arrows = (
        (Direction.UP, '↑'),
        (Direction.DOWN, '↓'),
        (Direction.LEFT, '←'),
        (Direction.RIGHT, '→'),
    )
    for row in range(3):
        for col in range(3):
            cell = policy[row][col]
            # First matching action wins; anything unmatched falls back to 'E'.
            glyph = next((symbol for action, symbol in arrows if cell == action), 'E')
            print(glyph, end=' ')
        print()

In [206]:
# Run value iteration once per candidate reward for the top-left terminal cell.
# Note: this overwrites world[0][0] in place, so the grid keeps the last r afterwards.
for r in (100, 3, 0, -3):
    world[0, 0] = r
    optimal_values, optimal_policy = value_iteration()
    print(f'r = {r}')
    print("optimal values:-", optimal_values, sep='\n')
    print()
    print("optimal policy:-")
    print_policy(optimal_policy)
    print('----------------------------------------------------------------------------------')

r = 100
optimal values:-
[[ 0.         85.01888433  0.        ]
 [85.01888433 63.98605414 43.67295431]
 [57.28454339 47.15571778 32.77349572]]

optimal policy:-
E ← E 
↑ ← ← 
↑ ↑ ← 
----------------------------------------------------------------------------------
r = 3
optimal values:-
[[0.         8.31716852 0.        ]
 [2.82806385 5.21460644 8.31716852]
 [1.1339466  2.79982174 4.64697359]]

optimal policy:-
E → E 
→ ↑ ↑ 
↑ ↑ ↑ 
----------------------------------------------------------------------------------
r = 0
optimal values:-
[[0.         8.31694028 0.        ]
 [2.52274391 5.21175346 8.31694028]
 [1.09027121 2.79445577 4.64639824]]

optimal policy:-
E → E 
→ → ↑ 
→ ↑ ↑ 
----------------------------------------------------------------------------------
r = -3
optimal values:-
[[0.         8.31692548 0.        ]
 [2.22058017 5.21156853 8.31692548]
 [1.06470391 2.79227733 4.6462145 ]]

optimal policy:-
E → E 
→ → ↑ 
→ ↑ ↑ 
-------------------------------------------------------

In [207]:
def generate_randomized_policy(rng=None):
    """Build a 3x3 policy with a uniformly random move in every non-terminal cell.

    Parameters
    ----------
    rng : numpy.random.Generator or numpy.random.RandomState, optional
        Source of randomness. Pass a seeded generator to make the starting
        policy reproducible. When omitted, NumPy's global random state is used,
        which matches the previous behaviour.

    Returns
    -------
    numpy.ndarray
        3x3 object array of Direction members. Terminal cells (as reported by
        isEnd) hold Direction.STAY; every other cell holds one of
        UP/DOWN/LEFT/RIGHT.
    """
    choose = np.random.choice if rng is None else rng.choice
    # STAY is excluded so every non-terminal cell starts with an actual move.
    moves = [d for d in Direction if d != Direction.STAY]
    randomized_policy = np.array([[Direction.STAY for _ in range(3)] for _ in range(3)])
    # Row-major traversal keeps the order of random draws the same as before.
    for i, j in np.ndindex(randomized_policy.shape):
        # Use the shared terminal test instead of a second hard-coded cell list.
        if not isEnd(i, j):
            randomized_policy[i, j] = choose(moves)
    return randomized_policy

In [208]:
def policy_evaluation(policy):
    """Compute the state values obtained by following `policy`.

    Repeatedly sweeps the non-terminal cells in row-major order, replacing each
    value with the expected return of the action `policy[row][col]`. Updates
    are made in place, so later cells in a sweep use the new values of earlier
    ones. Sweeping stops once the largest change in a sweep is below `epsilon`.

    Returns:
        3x3 float array of values; terminal cells are never updated and stay 0.
    """
    values = np.zeros_like(world, dtype=np.float64)
    while True:
        largest_change = 0  # max |old value - new value| seen in this sweep
        for row, col in np.ndindex(3, 3):
            if isEnd(row, col):
                continue
            previous = values[row, col]
            expected = 0
            for nxt_row, nxt_col, prob in get_successor(row, col, policy[row][col]):
                expected += prob * (reward(nxt_row, nxt_col) + gamma * values[nxt_row, nxt_col])
            values[row, col] = expected
            largest_change = max(largest_change, float(abs(previous - values[row, col])))
        if largest_change < epsilon:
            return values

In [209]:
def policy_improvement(V, policy):
    """Make `policy` greedy with respect to the value table `V`, in place.

    For every non-terminal cell, each Direction member is scored by its
    expected return under `V`, and the best-scoring action is written into
    `policy[row][col]`. Ties go to the earliest-declared action because the
    comparison is strict. Terminal cells are left untouched. Returns None.
    """
    for row, col in np.ndindex(3, 3):
        if isEnd(row, col):
            continue
        best_value = float('-inf')
        for action in Direction:
            expected = 0
            for nxt_row, nxt_col, prob in get_successor(row, col, action):
                expected += prob * (reward(nxt_row, nxt_col) + gamma * V[nxt_row][nxt_col])
            if expected > best_value:
                best_value = expected
                policy[row][col] = action

In [210]:
def policy_iteration(initial_policy=None):
    """Solve the grid world by alternating policy evaluation and greedy improvement.

    Parameters
    ----------
    initial_policy : 3x3 array-like of Direction, optional
        Policy to start from. It is copied, so the caller's object is never
        modified. When omitted, a random policy from
        generate_randomized_policy() is used, which matches the previous
        behaviour. Supplying a fixed policy makes the run reproducible.

    Returns
    -------
    (V, policy)
        The value table of the final policy and the policy itself, returned
        once an improvement step leaves the policy unchanged.

    Side effects
    ------------
    Prints "iteration  <n>" followed by the policy being evaluated, once per
    iteration.
    """
    if initial_policy is None:
        current_policy = generate_randomized_policy()
    else:
        # np.array copies by default; dtype=object keeps the enum members as-is.
        current_policy = np.array(initial_policy, dtype=object)

    iteration = 1
    while True:
        V = policy_evaluation(current_policy)
        print("iteration ", iteration)
        iteration += 1
        print_policy(current_policy)
        # Improve a copy so the evaluated policy stays available for comparison.
        # A shallow copy is enough: the cells are shared, immutable enum members.
        improved_policy = current_policy.copy()
        policy_improvement(V, improved_policy)
        if np.array_equal(current_policy, improved_policy):
            return V, current_policy
        # improved_policy is already a fresh array, so no second copy is needed.
        current_policy = improved_policy

In [211]:
# Run policy iteration once per candidate reward for the top-left terminal cell.
# Note: this overwrites world[0][0] in place, so the grid keeps the last r afterwards.
for r in (100, 3, 0, -3):
    world[0, 0] = r
    optimal_values, optimal_policy = policy_iteration()
    print(f'r = {r}')
    print("optimal values:-", optimal_values, sep='\n')
    print()
    print("optimal policy:-")
    print_policy(optimal_policy)
    print('----------------------------------------------------------------------------------')

iteration  1
E ← E 
↓ ↓ → 
↓ → ↑ 
iteration  2
E ← E 
↑ ↑ ↑ 
← ↓ → 
iteration  3
E ← E 
↑ ↑ ← 
↑ ↑ ↑ 
iteration  4
E ← E 
↑ ← ← 
↑ ↑ ← 
r = 100
optimal values:-
[[ 0.         85.01888433  0.        ]
 [85.01888433 63.98605414 43.67295431]
 [57.28454339 47.15571778 32.77349572]]

optimal policy:-
E ← E 
↑ ← ← 
↑ ↑ ← 
----------------------------------------------------------------------------------
iteration  1
E → E 
← ↓ ↓ 
↓ ← ↑ 
iteration  2
E → E 
↑ ↑ ↑ 
← ↓ ↓ 
iteration  3
E → E 
→ ↑ ↑ 
↑ ↑ ↑ 
r = 3
optimal values:-
[[0.         8.31716852 0.        ]
 [2.82806385 5.21460644 8.31716852]
 [1.1339466  2.79982174 4.64697359]]

optimal policy:-
E → E 
→ ↑ ↑ 
↑ ↑ ↑ 
----------------------------------------------------------------------------------
iteration  1
E ↑ E 
↓ → ↑ 
← ← ↑ 
iteration  2
E → E 
→ → ↑ 
↓ → ↑ 
iteration  3
E → E 
→ → ↑ 
→ ↑ ↑ 
r = 0
optimal values:-
[[0.         8.31694028 0.        ]
 [2.52274391 5.21175346 8.31694028]
 [1.09027121 2.79445577 4.64639824]]

optimal 