# Computational Game Theory Assignment: Hysteretic Q-Learning

**Objective:** Explore and implement Hysteretic Q-Learning to evaluate their cooperative capabilities compared to the standard Q-Learning approach in a grid-based, two-agent environment.

In cooperative multi-agent reinforcement learning, agents must learn to work together to reach common goals while managing the risk of conflicting or uncoordinated actions. This notebook guides you through the process of implementing the algorithms to observe how different strategies affect learning in a cooperative setting.

### Notebook Outline:

1. **Environment Setup**: Develop a custom grid-world environment where two agents must cooperate to achieve shared goals.
2. **Q-Learning Implementation**: Implement a standard single-agent Q-Learning algorithm to understand the baseline behavior.
3. **Hysteretic Q-Learning Implementation**: Implement hysteresis to balance learning between positive and negative rewards, stabilizing updates in a cooperative setting.
    

### Imports

In [None]:
import numpy as np
from pprint import pprint

## Environment

In this assignment, we’ll use a Cooperative Grid World as our testing ground. This grid-world environment simulates a modular, cooperative scenario where two agents must learn to navigate and reach their respective goal positions while avoiding obstacles and penalties. The design allows for flexible configurations of grid size, agent starting positions, goal locations, and obstacle placements, enabling a range of cooperative challenges.

### Key Features:

- **Grid Layout**: Agents operate on a grid where each cell represents a discrete location.
- **Cooperation Requirement**: Both agents must learn to reach their goals while coordinating to avoid penalties from conflicting moves.
- **Reward Structure**: The environment includes positive rewards for coordination and reaching goals, with penalties for collisions and miscoordination, promoting cooperative strategies.

In [None]:
class CooperativeGridWorld:
    """
    Cooperative GridWorld environment for multi-agent reinforcement learning.

    In this environment, multiple agents must navigate a grid with obstacles to reach 
    their designated goal positions. Agents are penalized for colliding with obstacles
    or each other, and receive rewards for successful coordination (reaching their goals
    simultaneously). Mis-coordination penalties are given if only some agents reach their goals.
    """

    def __init__(self, 
                 grid_size: tuple = (5, 5), 
                 agent_start: tuple = ((0, 0), (4, 4)), 
                 obstacles: tuple = (), 
                 goals: tuple = ((2, 2), (2, 3)),
                 max_steps: int = 50,
                 step_reward: int = -0.01,
                 collision_penalty: int = -1,
                 coordination_reward: int = 10,
                 miscoordination_penalty: int = -10,
                 end_on_miscoordination: bool = True):
        """
        Initialize the CooperativeGridWorld environment.

        Parameters:
        -----------
        grid_size : tuple
            Dimensions of the grid.
        agent_start : tuple
            Initial positions of agents.
        obstacles : tuple
            Locations of obstacles on the grid.
        goals : tuple
            Designated goal locations for each agent.
        max_steps : int
            Maximum allowed steps in an episode.
        step_reward : int
            Step penalty to discourage unnecessary movement.
        collision_penalty : int
            Penalty for collisions between agents or with obstacles.
        coordination_reward : int
            Reward for agents reaching their respective goals simultaneously.
        miscoordination_penalty : int
            Penalty if only some agents reach their goals.
        end_on_miscoordination : bool
            Whether to end the episode if miscoordination occurs.
        """
        self.grid_size = grid_size
        self._initial_agent_positions = agent_start
        self.n_agents = len(self._initial_agent_positions)
        
        assert all(isinstance(pos, tuple) and len(pos) == 2 for pos in agent_start), "Agent start must be tuples of (x, y) positions"
        assert all(isinstance(obs, tuple) and len(obs) == 2 for obs in obstacles), "Obstacles must be tuples of (x, y) positions"
        assert all(isinstance(g_pos, tuple) and len(g_pos) == 2 for g_pos in goals), "Goals must be tuples of (x, y) positions"
        assert len(goals) == self.n_agents
        self.agent_goals = goals
        self.obstacles = obstacles
        self.max_steps = max_steps
        self.step_reward = step_reward
        self.collision_penalty = collision_penalty
        self.coordination_reward = coordination_reward
        self.miscoordination_penalty = miscoordination_penalty
        self.end_on_miscoordination = end_on_miscoordination
        self.agent_positions = None
        self.steps = 0

    def reset(self):
        """Resets the environment to the initial state."""
        self.agent_positions = list(self._initial_agent_positions)
        self.steps = 0
        return self.agent_positions

    def step(self, actions):
        """
        Takes a step in the environment for all agents.

        Parameters:
        -----------
        actions : list of int
            List of actions for each agent. Actions: 0 = up, 1 = down, 2 = left, 3 = right.

        Returns:
        --------
        tuple
            (agent_positions, rewards, done) where:
            - agent_positions : list of tuples - new positions of agents after the step.
            - rewards : list of int - rewards received by each agent.
            - done : bool - whether the episode has ended.
        """
        assert len(actions) == self.n_agents, "Number of actions must match number of agents"
        reward = self.step_reward
        self.steps += 1
        done = False

        # Update each agent’s position
        new_positions = []
        for i, action in enumerate(actions):
            new_position = self._move(self.agent_positions[i], action)
            # Check for obstacles
            if new_position not in self.obstacles:
                new_positions.append(new_position)
            else:
                new_positions.append(self.agent_positions[i])
                reward += self.collision_penalty

        # Resolve collisions iteratively until all collisions are handled
        while True:
            # Identify all colliding positions
            position_counts = {}
            for pos in new_positions:
                position_counts[pos] = position_counts.get(pos, 0) + 1
            
            colliding_positions = {pos for pos, count in position_counts.items() if count > 1}

            if not colliding_positions:
                # Exit loop if no collisions
                break

            # Revert agents involved in collisions to their original positions
            for i, new_pos in enumerate(new_positions):
                if new_pos in colliding_positions:
                    new_positions[i] = self.agent_positions[i]
                    reward += self.collision_penalty

        # Update agent positions after checking collisions
        self.agent_positions = new_positions

        # Calculate coordination rewards and check goal conditions
        if all(pos == self.agent_goals[i] for i, pos in enumerate(self.agent_positions)):
            reward += self.coordination_reward
            done = True
        elif any(pos == self.agent_goals[i] for i, pos in enumerate(self.agent_positions)):
            reward += self.miscoordination_penalty
            if self.end_on_miscoordination:
                done = True

        done |= self.steps >= self.max_steps
        
        return self.agent_positions, reward, done

    def _move(self, position: tuple, action: int) -> tuple:
        """Moves an agent based on the chosen action within grid boundaries, collision with obstacles and between agents are managed somewhere else."""
        raise NotImplementedError("You have to implement this.")

    def render(self):
        """Renders the current grid showing agent positions, obstacles, and goals."""
        grid = np.zeros(self.grid_size)
        for pos in self.obstacles:
            grid[pos] = -1  # Obstacle
        for goal in self.agent_goals:
            grid[goal] = 2  # Goal
        for idx, pos in enumerate(self.agent_positions):
            grid[pos] = idx + 3  # Agents
        print(grid)


## Environment Configurations for Assignment

For this assignment, you’ll experiment with three grid-world environments of size 5×5 designed to highlight different aspects of cooperative agent behavior. These environments will help you test how single-agent and multi-agent algorithms adapt to various navigational constraints.

### Environment 1: Single-Agent Pathfinding

**Description**: This environment contains a single agent that must navigate from a starting point at (0, 0) to a goal at (4, 4). Obstacles are placed in the grid, but a valid path exists that allows the agent to reach its goal.
Objective: Implement a basic Q-learning agent to find the optimal path to the goal, avoiding obstacles and minimizing steps taken.

### Environment 2: Two-Agent Cooperative Pathfinding with Separate Paths

**Description**: This environment has two agents starting from opposite corners: Agent 1 at (0, 0) and Agent 2 at (4, 4). Each agent’s goal is to reach the other’s starting position. Obstacles are arranged to provide at least two distinct paths, allowing both agents to reach their destinations without interference.

### Environment 3: Two-Agent Cooperative Pathfinding with a Unique Path

**Description**: Similar to Environment 2, this setup features two agents with opposite starting points and goals. However, the obstacle layout here creates a single viable path, meaning the agents must coordinate closely to avoid blocking one another.

In [None]:
# Environment Configurations

# 1. Single-Agent Environment Configuration
single_agent_env_conf = {
    "grid_size": (5, 5),
    "agent_start": [(0, 0)],
    "goals": [(4, 4)],
    "obstacles": [(1, 1), (1, 3), (3, 1), (3, 3)],
    "max_steps": 20,
}

print("Single-Agent Environment Configuration:")
pprint(single_agent_env_conf)

# 2. Two-Agent Environment with Separate Paths
two_agent_env_separate_paths_conf = {
    "grid_size": (5, 5),
    "agent_start": [(0, 0), (4, 4)],
    "goals": [(4, 4), (0, 0)],
    "obstacles": [(1, 1), (1, 2), (2, 1), (3, 3), (2, 3)],
    "max_steps": 30,
}

print("\nTwo-Agent Environment with Separate Paths Configuration:")
pprint(two_agent_env_separate_paths_conf)

# 3. Two-Agent Environment with a Unique Path
two_agent_env_unique_path_conf = {
    "grid_size": (5, 5),
    "agent_start": [(0, 0), (4, 4)],
    "goals": [(4, 4), (0, 0)],
    "obstacles": [(1, 0), (1, 1), (1, 3), (1, 4), (3, 0), (3, 1), (3, 3), (3, 4), (2,1), (2,3)],
    "max_steps": 30,
}

print("\nTwo-Agent Environment with Unique Path Configuration:")
pprint(two_agent_env_unique_path_conf)


Let's render the environments.

In [None]:
single_agent_env = CooperativeGridWorld(**single_agent_env_conf)
two_agent_env_separate_paths = CooperativeGridWorld(**two_agent_env_separate_paths_conf)
two_agent_env_unique_path = CooperativeGridWorld(**two_agent_env_unique_path_conf)

# Render the environments
print("\nSingle-Agent Environment:")
single_agent_env.reset()
single_agent_env.render()

print("\nTwo-Agent Environment with Separate Paths:")
two_agent_env_separate_paths.reset()
two_agent_env_separate_paths.render()

print("\nTwo-Agent Environment with Unique Path:")
two_agent_env_unique_path.reset()
two_agent_env_unique_path.render()

# Starting Code

In this section, we provide the foundational code for implementing the Q-learning agent, along with the essential learning function. This code serves as a starting point for your assignments, allowing you to focus on the implementation of Hysteretic Q-Learning. By building upon this code, you will create agents capable of learning in both single-agent and multi-agent environments. Review the provided code carefully, as it includes important functions and structures that will facilitate your development of the Q-learning algorithms.

In [None]:
class Agent:
    def __init__(self, gamma: float, n_actions: int, grid_size: tuple, n_agents: int, seed: int):
        """
        Initialize the Q-Learning agent.

        Parameters:
        -----------
        gamma : float
            Discount factor for future rewards.
        grid_size : tuple
            Size of the grid
        n_states : int
            Number 
        n_actions : int
            Number of actions available to the agent.
        seed: int
            Seed for the pseudo random number generator.
        """
        self.gamma = gamma
        self.n_actions = n_actions
        self.grid_size = grid_size
        self.q_values = np.zeros(grid_size * n_agents + (n_actions,))
        self.rng = np.random.default_rng(seed)
    
    def greedy(self, agents_pos: tuple) -> int:
        # agent pos is an n-agent tuple of positions lets make it a simple tuple
        agents_pos: tuple = sum(agents_pos, ())
        return np.argmax(self.q_values[agents_pos])
    
    def act(self, agent_pos: tuple) -> int:
        # TODO: need to implement exploration
        return self.greedy(agent_pos)
    
    def update(self, agents_pos: tuple, action: int, reward: float, done: bool, next_agents_pos: tuple):
        raise NotImplementedError("Subclasses must implement this method.")
        

In [None]:
def learn(env: CooperativeGridWorld, agents: list[Agent], n_episodes: int = 10000, eval_every: int = 100) -> tuple[dict, list[Agent]]:
    """
    Train the agents using Q-Learning.

    Parameters:
    -----------
    env : CooperativeGridWorld
        The environment to train the agents on.
    agents : list of Agent
        The agents to train.
    n_episodes : int
        Number of episodes to train the agents.
    eval_every: int
        Number of episode to do evaluation.

    Returns:
    --------
    tuple
        A tuple containing:
        - A dictionary with the rewards obtained in each episode, the length in each episode and the eval rewards and eval length
        - The trained agents.
    """
    rewards = np.zeros(n_episodes)
    lengths = np.zeros(n_episodes)
    eval_rewards = []
    eval_lengths = []
    for episode in range(n_episodes):
        state = env.reset()
        total_reward = 0
        step = 0
        done = False
        while not done:
            actions = [agent.act(state) for agent in agents]
            next_state, reward, done = env.step(actions)
            total_reward += reward
            for agent, action in zip(agents, actions):
                agent.update(state, action, reward, done, next_state)
            state = next_state
            step += 1
        rewards[episode] = total_reward
        lengths[episode] = step
        
        if episode % eval_every == 0:
            state = env.reset()
            total_reward = 0
            step = 0
            done = False
            while not done:
                actions = [agent.greedy(state) for agent in agents]
                state, reward, done = env.step(actions)
                total_reward += reward
                step += 1
            eval_rewards.append(total_reward)
            eval_lengths.append(step)
        
    metrics = {"reward": rewards, "lengths": lengths, "eval_rewards": np.array(eval_rewards), "eval_lengths": np.array(eval_lengths)}
    return metrics, agents

# Q-Learning

First, you need to implement the missing components of the Q-learning agent.


In [None]:
class QLearningAgent(Agent):
    def __init__(self, epsilon_max: float = 1.0, epsilon_min: float = 0.05, epsilon_decay: float = 0.9999, learning_rate: float = 0.1, **kwargs):
        """
        Initialize the Q-Learning agent.

        Parameters:
        -----------
        epsilon_max : float
            Maximum value for epsilon (exploration rate).
        epsilon_min : float
            Minimum value for epsilon.
        epsilon_decay : float
            Decay rate for epsilon.
        learning_rate : float
            Learning rate for updating Q-values.
        """
        self.epsilon_max = epsilon_max
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.learning_rate = learning_rate
        self.epsilon = epsilon_max
        super().__init__(**kwargs)
        
    def explore(self, agent_pos: tuple) -> int:
        raise NotImplementedError("You have to implement this.")
    
    def act(self, agent_pos: tuple) -> int:
        raise NotImplementedError("You have to implement this.")

    
    def update(self, agents_pos: tuple, action: int, reward: float, done: bool, next_agents_pos: tuple):
        raise NotImplementedError("You have to implement this.")


Your next task is to evaluate the performance of the Q-learning agent in the single-agent environment and the performance of independent Q-learning in the multi-agent settings. To ensure the robustness of your findings, average the results over multiple random seeds. After collecting the data, conduct a thorough analysis that includes visualizations such as plots to illustrate trends and comparisons. Feel free to experiment with different variations of the environments to gain deeper insights. You are also encouraged to study the impact of hyperparameters.


In [None]:
...

# Hysteretic Q-Learning

Hysteretic Q-Learning is a modification of traditional Q-learning that employs two distinct learning rates, $\alpha$ and $\beta$, where $\beta < \alpha$. The core idea behind using two learning rates is to enhance the learning stability of the agent by differentiating the updates based on whether it would increase or decrease the Q-value. This approach allows the agent to learn faster from successful actions while being more conservative with updates from less successful ones.

### Key Concepts
**Dual Learning Rates**:
- $\alpha$ (the primary learning rate) is used to update Q-values for positive td-error, encouraging the agent to reinforce successful behaviors.
- $\beta$ (the secondary learning rate) is used for updating Q-values for negative td-error, which helps stabilize learning by preventing drastic changes to the Q-values.



In [None]:
class HystereticQLearningAgent(Agent):
    def __init__(self, epsilon_max: float, epsilon_min: float, epsilon_decay: float, learning_rate_alpha: float, learning_rate_beta: float, **kwargs):
        """
        Initialize the Hysteretic Q-Learning agent.

        Parameters:
        -----------
        epsilon_max : float
            Maximum value for epsilon (exploration rate).
        epsilon_min : float
            Minimum value for epsilon.
        epsilon_decay : float
            Decay rate for epsilon.
        learning_rate_alpha : float
            Learning rate for updating Q-values when the update increases the Q-values.
        learning_rate_beta : float
            Learning rate for updating Q-values when the update decreases the Q-values.
        """
        self.epsilon_max = epsilon_max
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay
        self.learning_rate_alpha = learning_rate_alpha
        self.learning_rate_beta = learning_rate_beta
        self.epsilon = epsilon_max
        super().__init__(**kwargs)
        
    def explore(self, agent_pos: tuple) -> int:
        raise NotImplementedError("You have to implement this.")

    
    def act(self, agent_pos: tuple) -> int:
        raise NotImplementedError("You have to implement this.")

    
    def update(self, agents_pos: tuple, action: int, reward: float, done: bool, next_agents_pos: tuple):
        raise NotImplementedError("You have to implement this.")


Your next task is to evaluate the performance of Hysteretic Q-learning in the multi-agent environments. To ensure the robustness of your findings, average the results over multiple random seeds. After collecting the data, conduct a thorough analysis that includes visualizations such as plots to illustrate trends and comparisons. Feel free to experiment with different variations of the environments to gain deeper insights.

In [None]:
...

# More Complex Environments

In this section, your objective is to define one or two alternative environments that introduce greater challenges for the agents. By constructing these more complex scenarios, you will have the opportunity to benchmark the performance of both Independent Q-Learning and Hysteretic Q-Learning. After conducting experiments in these environments, carefully analyze and discuss the results from both algorithms. Consider aspects such as learning efficiency, coordination, and robustness to environmental changes. This analysis will help reveal how well each algorithm adapts to more challenging cooperative tasks.

In [None]:
...

# Different exploration

This final task consits in analyzing the impact of Boltzman Exploartion on both Independent Q-Learning and Hysteretic Q-Learning.

In [None]:
class BoltzmanQLearningAgent(QLearningAgent):
    ...

In [None]:
class BoltzmanHystereticQLearningAgent(HystereticQLearningAgent):
    ...