In [3]:
%pip install numpy


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.1.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [4]:
import numpy as np

In [5]:
def init_grid(grid: np.ndarray):
    counter = 1
    for i in range(len(grid)):
        for j in range(len(grid[0])):
            if (i == 0 and j == 0) or (i == 3 and j == 3):
                grid[i, j] = 0
            else:
                grid[i, j] = counter
                counter += 1               

In [6]:
from typing import Tuple

def get_next_state(state: Tuple[int, int], action: str) -> Tuple[int, int]:
    i, j = state
    
    if action == 'up':
        next_i, next_j = max(0, i - 1), j
    elif action == 'down':
        next_i, next_j = min(3, i + 1), j
    elif action == 'left':
        next_i, next_j = i, max(0, j - 1)
    elif action == 'right':
        next_i, next_j = i, min(3, j + 1)
    
    return (next_i, next_j)

In [7]:
def init_state_value_func(): 
    return np.zeros((4, 4))

In [14]:
def init_policy():
    return np.ones((4, 4, 4)) * 0.25

In [31]:
from typing import List

def action_to_index(action: str, actions: List[str] = ['up', 'down', 'left', 'right']):
    return actions.index(action)
    

In [9]:
from typing import Tuple, Dict, List

def calculate_p(current_state: Tuple[int], action, next_state: Tuple[int], reward: int, adjacency: Dict[Tuple[int, int], List[Tuple[int, int]]]):
    if next_state in adjacency[current_state] and reward == -1:
        return 1
    return 0

In [58]:
def run_policy_improvement(v, policy, grid, actions, discount_factor):
    # optimal policy is the one that maximizes the expected state-value-action q(s,a)
    # argmax_a r(s, a) + gamma * v(s')
    policy_stable = True
    optimal_policy = policy.copy()
    for state in [(i, j) for i in range(len(grid)) for j in range(len(grid[0])) if not (i ==0 and j == 0) and not (i == 3 and j == 3)]: # loop over all non-terminal states
        init_best_action = policy[state[0], state[1], :].argmax()
        best_action = actions[0]
        best_state_action_value = None
        curr_state_value = 0
        for action in actions:
            next_state = get_next_state(state, action)
            reward = -1
            curr_state_value = (reward + discount_factor * v[next_state])
            if best_state_action_value is None or curr_state_value > best_state_action_value:
                best_state_action_value = curr_state_value
                best_action = action
        if init_best_action != action_to_index(best_action):
            policy_stable = False

        optimal_policy[state[0], state[1], :] = 0
        optimal_policy[state[0], state[1], action_to_index(best_action)] = 1

    return optimal_policy, policy_stable

In [None]:
THETA = 1e-10
DISCOUNT_FACTOR = 1

v = init_state_value_func()

grid = np.zeros((4, 4))
init_grid(grid)

pi_a_given_s = 1/4 # pi(a | s)

actions = ['up', 'down', 'left', 'right']

policy = init_policy()

In [None]:
def run_policy_evaluation(v, policy, grid, actions, discount_factor, theta):
    error = float('inf')
    counter = 0

    while error > theta:
        v_new = v.copy()
        for state in [(i, j) for i in range(len(grid)) for j in range(len(grid[0])) if not (i ==0 and j == 0) and not (i == 3 and j == 3)]: # loop over all non-terminal states
            curr_state_value = 0
            for action in actions:
                next_state = get_next_state(state, action)
                reward = -1
                curr_state_value += policy[state[0], state[1], action_to_index(action)] * (reward + discount_factor * v[next_state])

            v_new[state[0], state[1]] = curr_state_value
        error = np.max(np.abs(v_new - v))
        v = v_new
        counter += 1
        
    return v

In [72]:
def run_policy_iteration(v_init, policy_init, grid, actions, discount_factor, theta):
    policy_stable = False

    v = v_init.copy()
    policy = policy_init.copy()

    while not policy_stable:
        # Iterative Policy Evaluation
        v = run_policy_evaluation(v, policy, grid, actions, discount_factor, theta)
        
        # Policy Improvement
        new_policy, is_policy_stable = run_policy_improvement(v, policy, grid, actions, discount_factor)
        
        if is_policy_stable:
            policy_stable = True
        
        policy = new_policy
    
    return v, policy


In [73]:
THETA = 1e-10
DISCOUNT_FACTOR = 1

v = init_state_value_func()

grid = np.zeros((4, 4))
init_grid(grid)

actions = ['up', 'down', 'left', 'right']

policy = init_policy()

v_optimal, policy_optimal = run_policy_iteration(v_init=v, policy_init=policy, grid=grid, actions=actions, discount_factor=DISCOUNT_FACTOR, theta=THETA)

print("Optimal Value Function:")
print(v_optimal)
print("Optimal Policy:")
print(policy_optimal)

Optimal Value Function:
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]
Optimal Policy:
[[[0.25 0.25 0.25 0.25]
  [0.   0.   1.   0.  ]
  [0.   0.   1.   0.  ]
  [0.   1.   0.   0.  ]]

 [[1.   0.   0.   0.  ]
  [1.   0.   0.   0.  ]
  [1.   0.   0.   0.  ]
  [0.   1.   0.   0.  ]]

 [[1.   0.   0.   0.  ]
  [1.   0.   0.   0.  ]
  [0.   1.   0.   0.  ]
  [0.   1.   0.   0.  ]]

 [[1.   0.   0.   0.  ]
  [0.   0.   0.   1.  ]
  [0.   0.   0.   1.  ]
  [0.25 0.25 0.25 0.25]]]
