In [2]:
import numpy as np

In [3]:
#Policy e-greedy
# def e_greedy_action_choice(q_estimates, epsilon):
#     if np.random.rand(1,1).item() <= epsilon:
#         return np.random.randint(len(q_estimates))
    
#     else:
#         return np.argmax(q_estimates)
    
# def e_greedy_policy(action_idx, state_idx, q_estimates, epsilon):
#     """
#     π(a|s)
#     Returns the probability of taking action a in state s under an epsilon-greedy policy.
#     """
#     if np.argmax(q_estimates[state_idx]) == action_idx:
#         return (1 - epsilon) + (epsilon / len(q_estimates[state_idx]))
#     return (epsilon / len(q_estimates[state_idx]))

In [57]:
def numpy_argmax_all_indices(arr):
    """
    Collect every position at which a NumPy array attains its maximum.

    Unlike np.argmax, tied maxima are all reported. An empty array
    yields an empty list.
    """
    if arr.size != 0:
        peak = np.max(arr)
        # np.nonzero gives one index array per axis; for a 1D input the
        # first (and only) entry holds the positions we are after.
        hits = np.nonzero(arr == peak)
        return hits[0].tolist()

    return []

In [116]:
#Policy Evaluation
# Moves expressed as (row offset, column offset); policies refer to them
# by their index in this list.
ACTIONS = [
    (1, 0),   # index 0: row + 1
    (-1, 0),  # index 1: row - 1
    (0, -1),  # index 2: column - 1
    (0, 1),   # index 3: column + 1
]
GRID = 4                # side length of the square grid
N_STATES = GRID * GRID  # states are numbered row-major: 0 .. N_STATES - 1
GAMMA = 1               # discount factor
delta = 0.01            # module-level value; the functions below take their own `delta` argument
THETA = 1e-5            # policy_evaluation keeps sweeping while the largest change is >= THETA
# Starting policy: every action index is allowed in every state.
r_policy = [list(range(len(ACTIONS))) for _ in range(N_STATES)]

def get_prob_policy(policy, state, action):
    """
    Probability that `policy` selects `action` in `state`.

    policy[state] is the collection of actions allowed in that state; each
    allowed action gets an equal share, anything else gets 0.
    """
    allowed = policy[state]
    return 1/len(allowed) if action in allowed else 0

def step(state, action):
    """
    One environment transition: returns (next_state, reward).

    `state` is a row-major cell index and `action` a (row offset, column
    offset) pair. The first and last states are absorbing: they return
    themselves with reward 0. Every other move costs -1; a move that would
    leave the grid keeps the agent where it is.
    """
    if state in (0, N_STATES - 1):
        return state, 0

    row, col = divmod(state, GRID)
    d_row, d_col = action
    target_row, target_col = d_row + row, d_col + col

    inside = 0 <= target_row < GRID and 0 <= target_col < GRID
    next_state = target_row * GRID + target_col if inside else state
    return next_state, -1

# Zero-initialised value estimates, one entry per state.
v = np.zeros((N_STATES,))

def policy_evaluation(v, policy, delta):
    """
    Iterative policy evaluation with in-place sweeps.

    Each sweep replaces v[s], for every state s, by the expected
    `reward + GAMMA * v[next_state]` over the actions in ACTIONS, weighted by
    get_prob_policy(policy, s, a). Sweeping stops once the largest change to
    any v[s] within a sweep is below THETA.

    Parameters
    ----------
    v : array indexed by state (length N_STATES)
        Value estimates; updated in place.
    policy : sequence indexed by state
        Passed to get_prob_policy together with the state and action index.
    delta : float
        Accepted for backward compatibility only. It used to seed the loop
        condition, so a value below THETA silently skipped the evaluation;
        at least one sweep is now always performed regardless of its value.

    Returns
    -------
    The same `v` object, for convenience.
    """
    while True:
        # Largest absolute change seen during this sweep.
        delta = 0
        for s in range(N_STATES):
            v_old = v[s]
            v_new = 0
            for a, action in enumerate(ACTIONS):
                new_s, reward = step(s, action)
                v_new += get_prob_policy(policy, s, a) * (reward + GAMMA * v[new_s])
            v[s] = v_new
            delta = max(delta, abs(v_old - v[s]))

        if delta < THETA:
            break

    return v

In [117]:
#Policy Improvement
def policy_improvement(policy, state_value):
    """
    Greedy policy improvement, keeping every tied best action.

    For each non-terminal state (all states except the first and the last),
    each action in ACTIONS is scored as `GAMMA * value(neighbour) - 1`; a move
    that would leave the grid scores -inf so it is never selected. policy[state]
    is then replaced by the list of indices (into ACTIONS) of all top-scoring
    actions.

    Parameters
    ----------
    policy : mutable sequence indexed by state
        Updated in place; terminal entries are left untouched.
    state_value : array-like with GRID * GRID elements
        State values, either as a (GRID, GRID) grid or as a flat row-major
        vector such as the one policy_evaluation fills in.

    Returns
    -------
    The same `policy` object, for convenience.
    """
    # Accept both the flat and the 2-D layout.
    values = np.asarray(state_value).reshape(GRID, GRID)

    for state in range(N_STATES):
        if state == 0 or state == N_STATES - 1:
            continue

        row, col = divmod(state, GRID)
        action_values = []
        # Offsets come from ACTIONS so the indices stored in the policy always
        # line up with the ones policy_evaluation looks up.
        for d_row, d_col in ACTIONS:
            r, c = row + d_row, col + d_col
            if 0 <= r < GRID and 0 <= c < GRID:
                action_values.append(GAMMA * values[r][c] - 1)
            else:
                action_values.append(float("-inf"))

        policy[state] = numpy_argmax_all_indices(np.array(action_values))

    return policy

In [124]:
#Policy Iteration
def policy_iteration(policy, delta):
    """
    Alternate policy evaluation and greedy improvement until the policy
    stops changing.

    Works on a copy of `policy`. Each round evaluates the current policy from
    a zero value vector, rounds the values, reshapes them to (GRID, GRID) and
    improves a copy of the policy against them. `delta` is forwarded to
    policy_evaluation. Prints the number of rounds and returns the final
    policy.
    """
    current = policy.copy()
    rounds = 0
    while True:
        rounds += 1
        values = np.zeros(N_STATES)
        policy_evaluation(values, current, delta)

        candidate = current.copy()
        # Rounded values are what the improvement step compares.
        policy_improvement(candidate, np.round(values.reshape(GRID, GRID)))

        if candidate == current:
            break
        current = candidate

    print('Iterations: ', rounds)
    return candidate
        

In [126]:
# Run policy iteration starting from r_policy.
optimized_policy = policy_iteration(policy=r_policy, delta=0.01)

Iterations:  3


In [127]:
# Scratch cell: a single Poisson draw with rate 3 (no seed is set, so the value varies between runs).
np.random.poisson(lam=3)

2