In [1]:
import numpy as np
import matplotlib.pyplot as plt
import random
from collections import defaultdict


In [2]:
class GridEnvironment:
    """Square grid world with randomly scattered obstacle cells.

    Attributes:
        size: Side length of the grid.
        grid: ``size x size`` float array; obstacle cells hold -1, all others 0.
        start: Fixed start cell ``(0, 0)`` (top-left corner).
        goal: Fixed goal cell ``(size - 1, size - 1)`` (bottom-right corner).
        obstacles: List of obstacle coordinates in row-major order.
    """

    def __init__(self, size=100, obstacle_prob=0.2):
        """Build the grid and scatter obstacles.

        Args:
            size: Number of rows and columns.
            obstacle_prob: Independent probability that any cell other than
                the start or the goal becomes an obstacle.
        """
        far_corner = size - 1
        self.size = size
        self.grid = np.zeros((size, size))
        self.start = (0, 0)
        self.goal = (far_corner, far_corner)
        self.obstacles = self._place_obstacles(obstacle_prob)

    def _place_obstacles(self, obstacle_prob):
        """Mark random cells as obstacles in ``self.grid`` and return them.

        The start and goal cells are skipped before any random number is
        drawn, so they are never blocked.
        """
        reserved = (self.start, self.goal)
        blocked = []
        for row in range(self.size):
            for col in range(self.size):
                cell = (row, col)
                if cell in reserved:
                    continue
                if random.random() < obstacle_prob:
                    self.grid[cell] = -1  # -1 flags an obstacle
                    blocked.append(cell)
        return blocked

    def is_terminal(self, state):
        """Return True when ``state`` is the goal cell."""
        return state == self.goal


In [3]:
class MDP:
    """Deterministic grid MDP layered on top of a grid environment.

    Attributes:
        env: The wrapped environment; must expose ``size``, ``goal`` and
            ``obstacles``.
        gamma: Discount factor.
        rewards: Mapping from cell to reward. Unlisted cells default to -0.1,
            the goal is 1.0 and every obstacle cell is -1.0.

    Note:
        A blocked move returns the originating cell, so callers that look up
        ``rewards[next_state]`` are charged the reward of the cell the agent
        stays in, not the obstacle's -1.0.
    """

    def __init__(self, grid_env, gamma=0.9):
        """Store the environment and build the reward table.

        Args:
            grid_env: Environment providing ``size``, ``goal`` and ``obstacles``.
            gamma: Discount factor applied to future value.
        """
        self.env = grid_env
        self.gamma = gamma
        self.rewards = defaultdict(lambda: -0.1)  # Small penalty for each move
        self.rewards[self.env.goal] = 1.0         # Reward for reaching the goal
        for obstacle in self.env.obstacles:
            self.rewards[obstacle] = -1.0         # Penalty for hitting an obstacle
        # Snapshot of the obstacle cells for O(1) membership tests; scanning
        # the environment's list on every transition is linear in the number
        # of obstacles. Like ``rewards``, this reflects the environment as it
        # was when the MDP was constructed.
        self._obstacle_cells = frozenset(self.env.obstacles)

    def get_actions(self, state):
        """Return the moves that keep ``state`` inside the grid.

        Actions are ``(dx, dy)`` offsets, listed in the fixed order
        up, down, left, right.
        """
        x, y = state
        last = self.env.size - 1
        actions = []
        if x > 0:
            actions.append((-1, 0))  # Move up
        if x < last:
            actions.append((1, 0))   # Move down
        if y > 0:
            actions.append((0, -1))  # Move left
        if y < last:
            actions.append((0, 1))   # Move right
        return actions

    def get_next_state(self, state, action):
        """Apply ``action`` to ``state`` and return the resulting cell.

        The agent stays in ``state`` when the move would enter an obstacle
        or leave the grid. The latter matters for callers that pass an
        action not produced by ``get_actions(state)``: without the bounds
        check they would receive coordinates outside the grid.
        """
        x, y = state
        dx, dy = action
        nx, ny = x + dx, y + dy
        size = self.env.size
        if not (0 <= nx < size and 0 <= ny < size):
            return state  # No movement if the move leaves the grid
        next_state = (nx, ny)
        if next_state in self._obstacle_cells:
            return state  # No movement if obstacle
        return next_state


In [4]:
class PolicyIteration:
    """Policy iteration over every cell of the MDP's square grid.

    Attributes:
        mdp: The model; must expose ``env.size``, ``env.is_terminal``,
            ``gamma``, ``rewards``, ``get_actions`` and ``get_next_state``.
        policy: Dict mapping each grid cell that has at least one legal
            action to its current action.
        value_function: Mapping from cell to estimated value (default 0.0).
        theta: Convergence threshold for policy evaluation.
    """

    def __init__(self, mdp, theta=1e-3):
        """Create the solver with a random initial policy.

        Args:
            mdp: The model to solve.
            theta: Evaluation stops once the largest per-sweep value change
                falls below this threshold.
        """
        self.mdp = mdp
        self.theta = theta
        self.value_function = defaultdict(float)
        # Each state draws its initial action from *its own* legal actions.
        # Drawing from the actions of (0, 0) for every state would only ever
        # yield down/right, which point off the grid on the bottom and right
        # edges.
        self.policy = {}
        for state in self._grid_states():
            actions = mdp.get_actions(state)
            if actions:
                self.policy[state] = random.choice(actions)

    def _grid_states(self):
        """Iterate over all ``(row, col)`` cells of the grid in row-major order."""
        size = self.mdp.env.size
        return np.ndindex((size, size))

    def policy_evaluation(self):
        """Sweep the grid in place until values change by less than ``theta``.

        Terminal states and states without a policy entry (no legal action)
        are skipped and keep their current value.
        """
        while True:
            delta = 0
            for state in self._grid_states():
                if self.mdp.env.is_terminal(state):
                    continue
                action = self.policy.get(state)
                if action is None:
                    continue
                v = self.value_function[state]
                next_state = self.mdp.get_next_state(state, action)
                reward = self.mdp.rewards[next_state]
                self.value_function[state] = reward + self.mdp.gamma * self.value_function[next_state]
                delta = max(delta, abs(v - self.value_function[state]))
            if delta < self.theta:
                break

    def policy_improvement(self):
        """Make the policy greedy with respect to the current values.

        Returns:
            True if no state changed its action, False otherwise.
        """
        policy_stable = True
        for state in self._grid_states():
            if self.mdp.env.is_terminal(state):
                continue
            actions = self.mdp.get_actions(state)
            if not actions:
                continue  # Nothing to choose from; avoids max() on an empty dict
            old_action = self.policy.get(state)
            action_values = {}
            for action in actions:
                next_state = self.mdp.get_next_state(state, action)
                action_values[action] = self.mdp.rewards[next_state] + self.mdp.gamma * self.value_function[next_state]
            best_action = max(action_values, key=action_values.get)
            # Keep the current action when it is already a maximiser. Switching
            # between equally good actions would mark the policy as unstable
            # without improving it and can keep the outer loop from ending.
            if old_action in action_values and action_values[old_action] >= action_values[best_action]:
                best_action = old_action
            self.policy[state] = best_action
            if old_action != best_action:
                policy_stable = False
        return policy_stable

    def iterate_policy(self):
        """Alternate evaluation and improvement until the policy is stable."""
        while True:
            self.policy_evaluation()
            if self.policy_improvement():
                break


In [None]:
class QLearningAgent:
    """Epsilon-greedy Q-learning agent with a dictionary-backed Q-table.

    Attributes:
        mdp: The model; must expose ``env.start``, ``env.is_terminal``,
            ``gamma``, ``rewards``, ``get_actions`` and ``get_next_state``.
        q_table: ``q_table[state][action]`` holds the learned action value.
            Only state-action pairs that have been updated are stored; every
            other pair is treated as 0.0.
        alpha: Learning rate.
        epsilon: Probability of taking a uniformly random action.
        episodes: Number of episodes run by ``learn``.
        max_steps: Optional cap on steps per episode (``None`` = no cap).
    """

    def __init__(self, mdp, alpha=0.1, epsilon=0.1, episodes=200, max_steps=None):
        """Configure the agent.

        Args:
            mdp: The model to learn on.
            alpha: Learning rate of the Q-update.
            epsilon: Exploration probability.
            episodes: Number of training episodes.
            max_steps: If given, an episode is cut off after this many steps.
                Without a cap an episode only ends at a terminal state, so it
                runs forever when the goal cannot be reached from the start.
        """
        self.mdp = mdp
        self.q_table = defaultdict(lambda: defaultdict(float))
        self.alpha = alpha
        self.epsilon = epsilon
        self.episodes = episodes
        self.max_steps = max_steps

    def _q_value(self, state, action):
        """Read Q(state, action), defaulting to 0.0, without creating table rows."""
        row = self.q_table.get(state)  # .get does not trigger the default factory
        if not row:
            return 0.0
        return row.get(action, 0.0)

    def _best_value(self, state):
        """Return the largest Q-value over all legal actions of ``state`` (0.0 if terminal)."""
        if self.mdp.env.is_terminal(state):
            return 0.0
        actions = self.mdp.get_actions(state)
        if not actions:
            return 0.0
        return max(self._q_value(state, action) for action in actions)

    def choose_action(self, state):
        """Pick an action for ``state`` using the epsilon-greedy rule.

        The greedy branch ranks *all* legal actions, counting untried ones as
        0.0. Ranking only the actions already stored in the table would make
        the agent repeat whatever it tried first, even when that action has a
        negative value. Ties are broken uniformly at random.
        """
        actions = self.mdp.get_actions(state)
        if random.uniform(0, 1) < self.epsilon:
            # Randomly pick an action if exploring
            return random.choice(actions)
        scored = [(self._q_value(state, action), action) for action in actions]
        top = max(value for value, _ in scored)
        return random.choice([action for value, action in scored if value == top])

    def learn(self):
        """Run ``episodes`` episodes of Q-learning starting from ``env.start``."""
        for _ in range(self.episodes):
            state = self.mdp.env.start
            steps = 0
            while not self.mdp.env.is_terminal(state):
                if self.max_steps is not None and steps >= self.max_steps:
                    break
                action = self.choose_action(state)
                next_state = self.mdp.get_next_state(state, action)
                reward = self.mdp.rewards[next_state]

                # Bootstrap from the best legal action in the next state;
                # untried actions count as 0.0 and terminal states as 0.0.
                best_next_action_value = self._best_value(next_state)

                # Q-learning update
                td_target = reward + self.mdp.gamma * best_next_action_value
                td_error = td_target - self.q_table[state][action]
                self.q_table[state][action] += self.alpha * td_error

                state = next_state
                steps += 1


In [None]:
# Seed Python's RNG so the obstacle layout, the initial policy and the
# epsilon-greedy exploration are identical on every restart-and-run-all.
SEED = 42
random.seed(SEED)

# Initialize environment and MDP
grid_env = GridEnvironment()
mdp = MDP(grid_env)

# Dynamic Programming (Policy Iteration)
dp_agent = PolicyIteration(mdp)
dp_agent.iterate_policy()
print("DP Policy Iteration Completed.")

# Q-learning
q_learning_agent = QLearningAgent(mdp)
q_learning_agent.learn()
print("Q-learning Completed.")

# Benchmark results
# NOTE: the two totals are not over the same set of states. The DP value
# function has an entry for every cell touched by the sweeps, while the
# Q-table only holds states the agent actually updated during training.
dp_values = sum(dp_agent.value_function.values())
q_learning_values = sum(
    # default=0.0 keeps the sum well-defined should a state have no stored actions
    max(action_values.values(), default=0.0)
    for action_values in q_learning_agent.q_table.values()
)

print(f"Total DP Value Function: {dp_values}")
print(f"Total Q-learning Value Function: {q_learning_values}")
