In [21]:
import sys
import random
import numpy as np

# Chooses an item from a list based on associated weights.
def choose_weighted_item(items, weights):
    total_weight = sum(weights)
    random_value = random.uniform(0, total_weight)
    cumulative_weight = 0
    for item, weight in zip(items, weights):
        cumulative_weight += weight
        if cumulative_weight >= random_value:
            return item
    raise AssertionError("Shouldn't get here")

# Represents a Markov Decision Process.
class MarkovDecisionProcess:
    def __init__(self, state_transitions, state_rewards, initial_state=None):
        self._validate_parameters(state_transitions, state_rewards)
        self._state_transitions = state_transitions
        self._state_rewards = state_rewards
        self._initial_state = initial_state
        self.state_count = len(state_transitions)
        self.reset_current_state()

    # Returns all states in the MDP.
    def get_all_states(self):
        return tuple(self._state_transitions.keys())

    # Returns valid actions for a given state.
    def get_valid_actions(self, state):
        return tuple(self._state_transitions.get(state, {}).keys())

    # Checks if a state is terminal.
    def is_terminal_state(self, state):
        return not bool(self.get_valid_actions(state))

    # Returns probabilities of next states given a state and action.
    def get_next_state_probabilities(self, state, action):
        if action not in self.get_valid_actions(state):
            raise AssertionError(f"cannot do action {action} from state {state}")
        return self._state_transitions[state][action]

    # Returns the probability of transitioning to a specific next state.
    def get_transition_probability(self, state, action, next_state):
        return self.get_next_state_probabilities(state, action).get(next_state, 0.0)

    # Returns the reward of transitioning to a specific next state.
    def get_transition_reward(self, state, action, next_state):
        if action not in self.get_valid_actions(state):
            raise AssertionError(f"cannot do action {action} from state {state}")
        return self._state_rewards.get(state, {}).get(action, {}).get(next_state, 0.0)

    # Resets the current state to the initial state.
    def reset_current_state(self):
        if self._initial_state is None:
            self._current_state = random.choice((*self._state_transitions.keys(),))
        elif self._initial_state in self._state_transitions:
            self._current_state = self._initial_state
        elif callable(self._initial_state):
            self._current_state = self._initial_state()
        else:
            raise ValueError(f"initial state {self._initial_state} should be either a state or a function() -> state")
        return self._current_state

    # Executes an action and returns the next state, reward, and done flag.
    def perform_action(self, action):
        next_states, probabilities = zip(*self.get_next_state_probabilities(self._current_state, action).items())
        next_state = choose_weighted_item(next_states, weights=probabilities)
        reward = self.get_transition_reward(self._current_state, action, next_state)
        is_done = self.is_terminal_state(next_state)
        self._current_state = next_state
        return next_state, reward, is_done, {}

    # Prints the current state.
    def display_current_state(self):
        print("Currently at %s" % self._current_state)

    # Validates input parameters for the MDP.
    def _validate_parameters(self, state_transitions, state_rewards):
        for state in state_transitions:
            assert isinstance(state_transitions[state], dict), "state_transitions for %s should be a dict" % state
            for action in state_transitions[state]:
                assert isinstance(state_transitions[state][action], dict), "state_transitions for %s, %s should be a dict" % (state, action)
                next_state_probabilities = state_transitions[state][action]
                assert len(next_state_probabilities) != 0, "from state %s action %s leads to no next states" % (state, action)
                sum_probabilities = sum(next_state_probabilities.values())
                assert abs(sum_probabilities - 1) <= 1e-10, "probabilities for state %s action %s sum to %f" % (state, action, sum_probabilities)
        for state in state_rewards:
            assert isinstance(state_rewards[state], dict), "state_rewards for %s should be a dict" % state
            for action in state_rewards[state]:
                assert isinstance(state_rewards[state][action], dict), "state_rewards for %s, %s should be a dict" % (state, action)
        assert None not in state_transitions, "Do not use None as a state identifier."
        assert None not in state_rewards, "Do not use None as an action identifier."

# Environment Setup

# Grid dimensions.
num_rows = 3
num_columns = 4
all_states = list(range(12))  # States 0 to 11.
goal_state = 3
hole_state = 7
wall_state = 5
terminal_states = [goal_state, hole_state]
non_terminal_states = [s for s in all_states if s not in terminal_states]  # Includes wall state 5.

# Actions.
available_actions = ['left', 'right', 'up', 'down']

# Action to direction mapping.
action_directions = {
    'left': (0, -1),
    'right': (0, 1),
    'up': (-1, 0),
    'down': (1, 0)
}

# Stochastic probabilities.
intended_probability = 0.8
orthogonal_probability = 0.1  # 0.2 split between two orthogonal directions.

# Orthogonal actions per action.
orthogonal_actions = {
    'up': ['left', 'right'],
    'down': ['left', 'right'],
    'left': ['up', 'down'],
    'right': ['up', 'down']
}

# Helper functions.
def get_row_column(state):
    return divmod(state, num_columns)

def get_state_index(row, column):
    if 0 <= row < num_rows and 0 <= column < num_columns:
        return row * num_columns + column
    return None

# Initialize transition probabilities and rewards.
state_transitions = {s: {a: {} for a in available_actions} for s in non_terminal_states}
state_rewards = {s: {a: {} for a in available_actions} for s in non_terminal_states}

for state in non_terminal_states:
    row, column = get_row_column(state)
    for action in available_actions:
        # Intended direction.
        row_change, column_change = action_directions[action]
        next_row = row + row_change
        next_column = column + column_change
        next_state = get_state_index(next_row, next_column)
        if next_state is None or next_state == wall_state:
            next_state = state
        state_transitions[state][action][next_state] = state_transitions[state][action].get(next_state, 0) + intended_probability
        state_rewards[state][action][next_state] = 1.0 if next_state == goal_state else -1.0 if next_state == hole_state else -0.01

        # Orthogonal directions.
        for orthogonal_action in orthogonal_actions[action]:
            row_change_orthogonal, column_change_orthogonal = action_directions[orthogonal_action]
            next_row_orthogonal = row + row_change_orthogonal
            next_column_orthogonal = column + column_change_orthogonal
            next_state_orthogonal = get_state_index(next_row_orthogonal, next_column_orthogonal)
            if next_state_orthogonal is None or next_state_orthogonal == wall_state:
                next_state_orthogonal = state
            state_transitions[state][action][next_state_orthogonal] = state_transitions[state][action].get(next_state_orthogonal, 0) + orthogonal_probability
            state_rewards[state][action][next_state_orthogonal] = 1.0 if next_state_orthogonal == goal_state else -1.0 if next_state_orthogonal == hole_state else -0.01

# Create MDP environment.
environment = MarkovDecisionProcess(state_transitions, state_rewards, initial_state=0)

# 1. Policy Iteration.

# Calculates the value function for a given policy.
def calculate_value_function(policy, environment, discount_factor=1.0, convergence_tolerance=1e-8):
    value_function = {state: 0.0 for state in all_states}
    evaluation_count = 0
    while True:
        delta = 0
        for state in non_terminal_states:
            old_value = value_function[state]
            action = policy[state]
            state_value = sum(
                probability * (environment.get_transition_reward(state, action, next_state) + discount_factor * value_function[next_state])
                for next_state, probability in environment.get_next_state_probabilities(state, action).items()
            )
            value_function[state] = state_value
            delta = max(delta, abs(old_value - state_value))
            evaluation_count += 1
        if delta < convergence_tolerance:
            break
    return value_function, evaluation_count

# Generates an improved policy based on the value function.
def generate_improved_policy(environment, value_function, discount_factor=1.0):
    policy = {}
    for state in non_terminal_states:
        action_values = {
            action: sum(
                probability * (environment.get_transition_reward(state, action, next_state) + discount_factor * value_function[next_state])
                for next_state, probability in environment.get_next_state_probabilities(state, action).items()
            )
            for action in available_actions
        }
        policy[state] = max(action_values, key=action_values.get)
    return policy

# Runs the policy iteration algorithm.
def run_policy_iteration(environment, discount_factor=1.0, max_iterations=1000):
    policy = {state: random.choice(available_actions) for state in non_terminal_states}
    total_evaluations = 0
    for iteration in range(max_iterations):
        value_function, evaluation_count = calculate_value_function(policy, environment, discount_factor)
        total_evaluations += evaluation_count
        new_policy = generate_improved_policy(environment, value_function, discount_factor)
        if new_policy == policy:
            print(f"Policy Iteration converged after {iteration + 1} policy improvements, {total_evaluations} total evaluations")
            return new_policy, value_function
        policy = new_policy
    print("Policy Iteration did not converge within max iterations")
    return policy, value_function

# Run Policy Iteration.
print('=== Policy Iteration ===')
pi_policy, pi_value_function = run_policy_iteration(environment)

# Display policy.
print('Optimal Policy (PI):')
action_symbols = {'up': '^', 'down': 'v', 'left': '<', 'right': '>'}
grid = [[' ' for _ in range(num_columns)] for _ in range(num_rows)]
for state in all_states:
    row, column = get_row_column(state)
    if state == goal_state:
        grid[row][column] = 'G'
    elif state == hole_state:
        grid[row][column] = 'H'
    elif state == wall_state:
        grid[row][column] = 'W'
    elif state in pi_policy:
        grid[row][column] = action_symbols[pi_policy[state]]
for row in grid:
    print(' '.join(row))

# 2. Value Iteration.

# Runs the value iteration algorithm.
def run_value_iteration(environment, discount_factor=1.0, convergence_tolerance=1e-8, max_iterations=1000):
    value_function = {state: 0.0 for state in all_states}
    for iteration in range(max_iterations):
        delta = 0
        for state in non_terminal_states:
            old_value = value_function[state]
            action_values = [sum(probability * (environment.get_transition_reward(state, action, next_state) + discount_factor * value_function[next_state])
                                    for next_state, probability in environment.get_next_state_probabilities(state, action).items())
                                    for action in available_actions]
            value_function[state] = max(action_values)
            delta = max(delta, abs(old_value - value_function[state]))
        if delta < convergence_tolerance:
            print(f'Value Iteration converged after {iteration+1} iterations')
            break
    policy = generate_improved_policy(environment, value_function, discount_factor)
    return policy, value_function

# Run Value Iteration.
print('\n=== Value Iteration ===')
vi_policy, vi_value_function = run_value_iteration(environment)

# Display policy.
print('Optimal Policy (VI):')
grid = [[' ' for _ in range(num_columns)] for _ in range(num_rows)]
for state in all_states:
    row, column = get_row_column(state)
    if state == goal_state:
        grid[row][column] = 'G'
    elif state == hole_state:
        grid[row][column] = 'H'
    elif state == wall_state:
        grid[row][column] = 'W'
    elif state in vi_policy:
        grid[row][column] = action_symbols[vi_policy[state]]
for row in grid:
    print(' '.join(row))

# 3. Comparison of PI and VI.

print('\n=== Comparison ===')
policies_equal = pi_policy == vi_policy
print(f'Policies are {"identical" if policies_equal else "different"}')
print('Convergence comparison is based on the output above:')
print('- PI reports number of policy improvements and total evaluations (iterations within policy evaluation).')
print('- VI reports total iterations until value function convergence.')
print('Typically, VI converges faster in terms of total computational steps, as PI requires multiple evaluations per policy update.')

=== Policy Iteration ===
Policy Iteration converged after 5 policy improvements, 317400 total evaluations
Optimal Policy (PI):
> > > G
^ W < H
^ < < v

=== Value Iteration ===
Value Iteration converged after 148 iterations
Optimal Policy (VI):
> > > G
^ W < H
^ < < v

=== Comparison ===
Policies are identical
Convergence comparison is based on the output above:
- PI reports number of policy improvements and total evaluations (iterations within policy evaluation).
- VI reports total iterations until value function convergence.
Typically, VI converges faster in terms of total computational steps, as PI requires multiple evaluations per policy update.
