## Import statements

In [1]:
import numpy as np

## Modelling environments

In [2]:
# parameters
num_states = 2 # (0=rocky, 1=Ridge)
num_actions = 3 # (0=drilling, 1=digging, 2=push_debris)
discount_factor = 0.9

In [3]:
# initializing transition probabilities and rewards
transitions = np.zeros((num_states, num_actions, num_states))
rewards = np.zeros((num_states, num_actions, num_states))

In [4]:
# Mdp definition
# transition dynamics/uncertainities
transitions[0, 0, 0] = 0.30  # (rocky, drilling, rocky)
transitions[0, 1, 0] = 0.70  # (rocky, digging, rocky)
transitions[0, 2, 0] = 0.45  # (rocky, pushing, rocky)
transitions[0, 0, 1] = 0.70  # (rocky, drilling, ridge)
transitions[0, 1, 1] = 0.25  # (rocky, digging, ridge)
transitions[0, 2, 1] = 0.55  # (rocky, pushing, ridge)
transitions[1, 0, 0] = 0.60  # (ridge, drilling, rocky)
transitions[1, 1, 0] = 0.00  # (ridge, digging, rocky)
transitions[1, 2, 0] = 0.20   # (ridge, pushing, rocky)
transitions[1, 0, 1] = 0.40  # (ridge, drilling, ridge)
transitions[1, 1, 1] = 0.00  # (ridge, digging, ridge)
transitions[1, 2, 1] = 0.80  # (ridge, pushing, ridge)

# rewards
rewards[0, 0, 0] = 5  # (rocky, drilling, rocky)
rewards[0, 1, 0] = 7  # (rocky, digging, rocky)
rewards[0, 2, 0] = 9  # (rocky, pushing, rocky)
rewards[0, 0, 1] = 1  # (rocky, drilling, ridge)
rewards[0, 1, 1] = 7  # (rocky, digging, ridge)
rewards[0, 2, 1] = 5  # (rocky, pushing, ridge)
rewards[1, 0, 0] = 6  # (ridge, drilling, rocky)
rewards[1, 1, 0] = 0  # (ridge, digging, rocky)
rewards[1, 2, 0] = 10   # (ridge, pushing, rocky)
rewards[1, 0, 1] = 2  # (ridge, drilling, ridge)
rewards[1, 1, 1] = 0  # (ridge, digging, ridge)
rewards[1, 2, 1] = 2  # (ridge, pushing, ridge)

## Policy iteration

In [45]:
# policy iteration
def policy_evaluation(policy, transitions, rewards, discount_factor, tol=1e-6):
    values = np.zeros(num_states)

    while True:
        delta = 0
        for s in range(num_states):
            v = values[s]
            action = policy[s]

            values[s] = sum(transitions[s, action, s_next] * (rewards[s, action, s_next] + discount_factor * values[s_next]) for s_next in range(num_states))

            delta = max(delta, abs(v - values[s]))

        if delta < tol:
            break

    return values
    
def policy_iteration(policy, transitions, rewards, discount_factor, tol=1e-6):
    # Initialize q(s, a)
    q_values = np.zeros((num_states, num_actions))

    while True:
        stop = True  # Assume policy is stable initially
        # Evaluate the policy to get state values
        values = policy_evaluation(policy, transitions, rewards, discount_factor, tol)

        # Update the policy for each state
        for s in range(num_states):
            # Find the best action and its Q-value for this state
            best_action = policy[s]
            best_q_value = q_values[s, best_action]

            for action in range(num_actions):
                new_q_value = sum(
                    transitions[s, action, s_next] * (rewards[s, action, s_next] + discount_factor * values[s_next])
                    for s_next in range(num_states)
                )
                
                # Update if this action is better than the best one found so far
                if new_q_value > best_q_value:
                    best_q_value = new_q_value
                    best_action = action
                    stop = False  # Mark that we made a policy change
            
            # Update policy and q_values with the best action for state s
            policy[s] = best_action
            q_values[s, best_action] = best_q_value
        
        # Stop if the policy is stable
        if stop:
            break

    return values, policy


In [46]:
policy = np.zeros(num_states, dtype=int)
# deterministic policy
policy[0] = 0 
policy[1] = 2

values, policy = policy_iteration(policy, transitions, rewards, discount_factor)
values, policy

(array([57.53303394, 55.41849691]), array([2, 0]))

## Value iteration

In [41]:
def get_optimal_policy(values, transitions, rewards, discount_factor):
    q_values = np.zeros((num_states, num_actions))
    for s in range(num_states):
        for action in range(num_actions):
            q_values[s, action] = sum(transitions[s, action, s_next] * (rewards[s, action, s_next] + discount_factor * values[s_next]) for s_next in range(num_states))
            
    best_actions = np.argmax(q_values, axis=1)
    return best_actions
    


# value iteration
def value_iteration(transitions, rewards, discount_factor, tolerance=1e-6):
    values = np.zeros(num_states)

    while(True):
        delta = 0
        for s in range(num_states):
            v = values[s]

            # update value
            values[s] = max([sum(transitions[s, action, s_next] * (rewards[s, action, s_next]+discount_factor*values[s_next]) for s_next in range(num_states)) for action in range(num_actions)])

            delta = max(delta, abs(v-values[s]))
        
        if delta < tolerance:
            break
    
    best_actions = get_optimal_policy(values, transitions, rewards, discount_factor)

    return values, best_actions

In [47]:
values, best_actions = value_iteration(transitions, rewards, 0.9)
values, best_actions

(array([57.53303394, 55.41849691]), array([2, 0]))