In [1]:
pip install gymnasium# > /dev/null 2>&1

Note: you may need to restart the kernel to use updated packages.


In this notebook, we'll implement an agent that plays several matrix (normal-form) games against opponents that have fixed strategies. The games include, among others, the Prisoner's Dilemma, the Chicken game, and the Stag Hunt game.

In [2]:
import numpy as np
import random
import matplotlib.pyplot as plt

import gymnasium as gym
from gymnasium import spaces

ModuleNotFoundError: No module named 'gymnasium'

In [None]:
class Qlearner:
    """Tabular Q-learning agent with epsilon-greedy exploration.

    The agent keeps a ``(state_size, action_size)`` table of Q-values, picks
    actions epsilon-greedily, and applies the one-step Q-learning update. It also
    records per-episode reward totals and a running average of those totals.

    Args:
        action_size: Number of discrete actions.
        state_size: Number of discrete states.
        learning_rate: Fixed step size. The value ``0.0`` is a sentinel that
            switches to a per-(state, action) step size of ``1 / visit_count``.
        gamma: Discount factor applied to the bootstrapped next-state value.
        epsilon: Initial probability of taking a uniformly random action.
        epsilon_min: Lower bound that ``epsilon`` is clipped to when decayed.
        epsilon_decay: Multiplicative decay applied to ``epsilon`` per finished
            episode (when ``update`` is called with ``update_epsilon=True``).
    """

    def __init__(
        self,
        action_size,
        state_size,
        learning_rate=0.0,
        gamma=0.95,
        epsilon=1.0,
        epsilon_min=0.01,
        epsilon_decay=0.995,
    ):
        self.action_size = action_size
        self.state_size = state_size

        # Q-table, one row per state and one column per action, starting at zero.
        self.qtable = np.zeros((self.state_size, self.action_size))

        # learning_rate == 0.0 selects the visit-count based step size 1/N(s, a).
        if learning_rate == 0.0:
            self.dynamic_lr = True
            self.action_counter = np.zeros((self.state_size, self.action_size))
        else:
            self.dynamic_lr = False
            self.learning_rate = learning_rate

        # Discount factor.
        self.gamma = gamma

        # Exploration parameters.
        self.epsilon = epsilon
        self.epsilon_min = epsilon_min
        self.epsilon_decay = epsilon_decay

        # Reward bookkeeping.
        self.rewards_this_episode = []  # rewards of the episode in progress
        self.episode_total_rewards = []  # one summed reward per finished episode
        self.average_episode_total_rewards = []  # running mean of the episode totals

        # Every (state, action) pair produced by select_action, in order.
        self.state_history = []
        self.action_history = []

    def reset_agent(self):
        """Forget everything learned: zero the Q-table and the visit counts.

        The visit counts are cleared together with the Q-table so that, in
        dynamic-learning-rate mode, the step size starts again at 1 instead of
        continuing from the counts of the previous run. Epsilon, the reward
        statistics and the state/action histories are left untouched.
        """
        self.qtable = np.zeros((self.state_size, self.action_size))
        if self.dynamic_lr:
            self.action_counter = np.zeros((self.state_size, self.action_size))

    def select_greedy(self, state):
        """Return an action with maximal Q-value in ``state``, breaking ties at random."""
        # np.argmax would always return the first maximal entry; sample uniformly
        # among all (numerically) maximal entries instead.
        q_values = self.qtable[state]
        best_actions = np.flatnonzero(np.isclose(q_values, q_values.max()))
        return np.random.choice(best_actions)

    def select_action(self, state):
        """Pick an action epsilon-greedily and append it to the histories.

        With probability ``epsilon`` a uniformly random action is returned,
        otherwise a greedy one (see ``select_greedy``).
        """
        if np.random.rand() < self.epsilon:
            # Use NumPy's generator here as well, so that np.random.seed alone
            # makes a run reproducible.
            action = np.random.randint(self.action_size)
        else:
            action = self.select_greedy(state)
        self.state_history.append(state)
        self.action_history.append(action)
        return action

    def update_epsilon(self):
        """Decay epsilon multiplicatively, never going below ``epsilon_min``."""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

    def update(self, state, action, new_state, reward, done, update_epsilon=True):
        """Apply one Q-learning update and maintain the reward statistics.

        Args:
            state: State in which ``action`` was taken.
            action: Action that was taken.
            new_state: State reached afterwards.
            reward: Reward received for the transition.
            done: Whether the episode ended with this transition. When true, the
                next-state value is not bootstrapped and the episode statistics
                are closed.
            update_epsilon: Whether to decay epsilon when the episode ends.
        """
        if self.dynamic_lr:
            self.action_counter[state, action] += 1
            lr = 1 / self.action_counter[state, action]
        else:
            lr = self.learning_rate

        # Q(s,a) <-- Q(s,a) + lr * [R + gamma * max_a' Q(s',a') - Q(s,a)]
        # `(not done)` multiplies the bootstrap term by 0 on terminal transitions.
        td_target = reward + (not done) * self.gamma * np.max(self.qtable[new_state])
        self.qtable[state, action] += lr * (td_target - self.qtable[state, action])

        self.rewards_this_episode.append(reward)

        if done:
            # The stored episode total is the plain (undiscounted) sum.
            episode_reward = self._calculate_episode_reward(self.rewards_this_episode, discount=False)
            self.episode_total_rewards.append(episode_reward)

            k = len(self.average_episode_total_rewards) + 1  # episodes finished so far
            self._calculate_average_episode_reward(k, episode_reward)

            if update_epsilon:
                self.update_epsilon()

            # Start the next episode with an empty reward list.
            self.rewards_this_episode = []

    def _calculate_episode_reward(self, rewards_this_episode, discount=False):
        """Sum the rewards of one episode, optionally discounting step ``i`` by ``gamma**i``."""
        if discount:
            return sum(self.gamma**i * reward for i, reward in enumerate(rewards_this_episode))
        return sum(rewards_this_episode)

    def _calculate_average_episode_reward(self, k, episode_reward):
        """Append the mean episode total after ``k`` episodes, computed incrementally."""
        if k > 1:
            # Running average: mean_k = (1 - 1/k) * mean_{k-1} + x_k / k
            average_episode_reward = (1 - 1 / k) * self.average_episode_total_rewards[-1] + episode_reward / k
        else:
            average_episode_reward = episode_reward
        self.average_episode_total_rewards.append(average_episode_reward)

    def print_rewards(self, episode, print_epsilon=True, print_q_table=True):
        """Print the total of episode ``episode`` (0-based) and the latest running average.

        Note that the per-episode total stored by ``update`` is the undiscounted sum.
        """
        print("Total (discounted) reward of this episode: ", self.episode_total_rewards[episode])
        print("Average total reward over all episodes until now: ", self.average_episode_total_rewards[-1])

        if print_epsilon:
            print("Epsilon:", self.epsilon)
        if print_q_table:
            print("Q-table: ", self.qtable)


In [None]:
class StaticNormalFormGame(gym.Env):
    """Two-action normal-form (matrix) game against a scripted opponent.

    Every episode consists of a single simultaneous move: the agent's action and
    the opponent's action index ``reward_matrix`` and the episode terminates
    immediately. There is only one observation, ``0``.

    Args:
        reward_matrix: Array indexed as ``reward_matrix[agent_action, opponent_action]``
            that yields the pair ``(agent_reward, opponent_reward)``.
        opponent_strategy: Callable that receives the agent's previous action
            (``None`` before the first move) and returns the opponent's action.
    """

    def __init__(self, reward_matrix, opponent_strategy):
        super().__init__()

        self.opponent_strategy = opponent_strategy

        # Per-step reward logs, appended to by step().
        self.agent_rewards = []
        self.opponent_rewards = []

        self.action_space = spaces.Discrete(2)
        self.observation_space = spaces.Discrete(1)  # Stateless

        self.reward_matrix = reward_matrix

        # The agent's most recent action, handed to the opponent strategy.
        self.prev_action = None

    def reset(self, *, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Accepts the ``seed`` and ``options`` keywords of the Gymnasium ``Env.reset``
        API; ``seed`` is forwarded to the base class, ``options`` is unused.

        ``prev_action`` is intentionally kept across resets: each episode is a
        single step, so clearing it here would mean the opponent strategy is
        always called with ``None``.
        """
        super().reset(seed=seed)
        info = {}
        return 0, info

    def step(self, action):
        """Play one round and return ``(observation, reward, terminated, truncated, info)``.

        Raises:
            ValueError: If ``action`` is not 0 or 1.
        """
        if action not in [0, 1]:
            raise ValueError("Invalid action. Must be 0 or 1.")

        # The opponent reacts to the agent's action from the previous round.
        opponent_action = self.opponent_strategy(self.prev_action)

        # Fetch rewards from matrix
        agent_reward, opponent_reward = self.reward_matrix[action, opponent_action]
        self.agent_rewards.append(agent_reward)
        self.opponent_rewards.append(opponent_reward)

        # Info dictionary containing opponent's reward and action
        info = {'opponent_reward': opponent_reward, 'opponent_action': opponent_action}

        # Remember this move for tit-for-tat or other history-based strategies
        self.prev_action = action

        # Single-state game, so new state is still 0, and the episode is terminated
        new_state = 0
        terminated = True
        truncated = False
        return new_state, agent_reward, terminated, truncated, info

In [None]:
# Payoff tables for 2x2 games. Entry [a, b] holds the pair
# (payoff of the row player who chose a, payoff of the column player who chose b).

# Prisoner's Dilemma -- action 0 = Cooperate (C), action 1 = Defect (D).
#   C/C: moderate reward for both (3,3)
#   D against C: the defector gets the high reward, the cooperator nothing (5,0 / 0,5)
#   D/D: low reward for both (1,1)
PD_MATRIX = np.array([
    [[3, 3], [0, 5]],
    [[5, 0], [1, 1]],
])

# Stag Hunt -- action 0 = hunt the stag, action 1 = hunt a rabbit.
#   stag/stag: best outcome for both (4,4)
#   rabbit against stag: the rabbit hunter gets 3, the stag hunter nothing (3,0 / 0,3)
#   rabbit/rabbit: smaller but safe reward (2,2)
STAG_HUNT_MATRIX = np.array([
    [[4, 4], [0, 3]],
    [[3, 0], [2, 2]],
])

# Chicken (Hawk-Dove game) -- action 0 = swerve, action 1 = go straight.
#   swerve/swerve: moderate reward for both (2,2)
#   straight against swerve: the straight driver gets 3, the swerver nothing (3,0 / 0,3)
#   straight/straight: crash, negative reward for both (-1,-1)
CHICKEN_MATRIX = np.array([
    [[2, 2], [0, 3]],
    [[3, 0], [-1, -1]],
])

# "Coin toss" variant -- action 0 = heads, action 1 = tails.
# Despite the similar theme this is NOT Matching Pennies (see MATCHING_PENNIES_MATRIX):
# with these numbers each player receives 1 exactly when the *other* player picks heads.
#   heads/heads: (1,1); tails/tails: (0,0); mismatches: (0,1) and (1,0)
TOIN_COSS_MATRIX = np.array([
    [[1, 1], [0, 1]],
    [[1, 0], [0, 0]],
])

# Battle of the Sexes -- the players try to coordinate on an activity (opera or football).
#   both pick action 0: (2,1), favouring the row player
#   both pick action 1: (1,2), favouring the column player
#   no coordination: nothing for either (0,0)
BATTLE_OF_SEXES_MATRIX = np.array([
    [[2, 1], [0, 0]],
    [[0, 0], [1, 2]],
])

# Matching Pennies -- zero-sum; action 0 = heads (H), action 1 = tails (T).
#   choices match (H/H or T/T): Player 1 wins (1,-1)
#   choices differ: Player 2 wins (-1,1)
MATCHING_PENNIES_MATRIX = np.array([
    [[1, -1], [-1, 1]],
    [[-1, 1], [1, -1]],
])


# Opponent Strategies
def cooperate(_):
    """Pure strategy: always play action 0, whatever the argument is."""
    return 0

def defect(_):
    """Pure strategy: always play action 1, whatever the argument is."""
    return 1

def tit_for_tat(prev_action):
    """Copy the given previous action; play 0 when there is none yet (``None``)."""
    return 0 if prev_action is None else prev_action

def mixed(_, p_cooperate=0.4):
    """Mixed strategy: play action 0 with probability ``p_cooperate``, else action 1.

    The first argument is ignored. The default of 0.4 reproduces the original
    fixed 0.4 / 0.6 split.

    Raises:
        ValueError: If ``p_cooperate`` is not within [0, 1].
    """
    if not 0.0 <= p_cooperate <= 1.0:
        raise ValueError("p_cooperate must be a probability between 0 and 1.")
    return np.random.choice([0, 1], p=[p_cooperate, 1.0 - p_cooperate])


# Environment: which game is played, and against which scripted opponent.
reward_matrix = BATTLE_OF_SEXES_MATRIX
opponent_strategy = mixed
env = StaticNormalFormGame(reward_matrix=reward_matrix, opponent_strategy=opponent_strategy)

# Agent that will play against the opponent.
learning_rate = 0.1
gamma = 0.95  # has no effect here: every episode ends after a single step, so nothing is bootstrapped
epsilon = 1.0
epsilon_min = 0.01
epsilon_decay = 0.99
qlearner = Qlearner(
    action_size=env.action_space.n,
    state_size=env.observation_space.n,
    learning_rate=learning_rate,
    gamma=gamma,
    epsilon=epsilon,
    epsilon_min=epsilon_min,
    epsilon_decay=epsilon_decay,
)

# Training: one agent update per environment step.
num_episodes = 400
for episode in range(num_episodes):
    state, info = env.reset()
    terminated = False
    truncated = False
    # An episode is over when it is terminated OR truncated; checking only
    # `terminated` would loop forever on an environment that truncates.
    while not (terminated or truncated):
        action = qlearner.select_action(state)
        new_state, reward, terminated, truncated, info = env.step(action)
        qlearner.update(state, action, new_state, reward, terminated)
        state = new_state  # just stays 0, the environment is stateless

# Report the last episode (0-based index) together with the running average.
qlearner.print_rewards(num_episodes - 1)

Total (discounted) reward of this episode:  2
Average total reward over all episodes until now:  0.8125000000000007
Epsilon: 0.017950553275045134
Q-table:  [[1.10563018 0.44092966]]
