In [158]:
import numpy as np
import random
import matplotlib.pyplot as plt
import copy
import gymnasium as gym
from gymnasium import spaces
from pettingzoo import ParallelEnv

num_agents = 10                   
num_rounds = 200                    
b = [2,5,10]             # Benefit (focus on b/c=5 as in paper)
c = 1                    # Cost of cooperation
chi = 0.01              # Reputation assignment error

# Learning parameters
learning_rate = 0.01
gamma = 0.99
epsilon = 0.1
num_episodes = 1000

num_seeds = 20           # Number of random seeds for each experiment

In [159]:
class MatrixGame(ParallelEnv):
    metadata = {'render_modes': ['human']}

    def __init__(self, reward_matrix, agents):
        self.agents = agents 
        self.possible_agents = self.agents[:]
        self.reward_matrix = reward_matrix
        self.norm = [0, 0, 0, 0]

    def reset(self, seed=None, options=None):
        self.agents = self.possible_agents[:]
        self.rewards = {agent: 0.0 for agent in self.agents}
        self.states = {agent: np.random.choice([0,1]) for agent in self.agents}

        return self.states
    
    def get_action_rules(self, action_rule):
        # Convert rule_id to 4-bit binary
        bits = [(action_rule >> i) & 1 for i in range(4)]  # bits[0]=LSB, bits[3]=MSB
        return bits

    def determine_action(self, agent1, agent2, action_rules):
        state1 = self.states[agent1]
        state2 = self.states[agent2]

        action_rule1 = self.get_action_rules(action_rules[agent1])
        action_rule2 = self.get_action_rules(action_rules[agent2])

        action1 = self.action_func(action_rule1, state1, state2)
        action2 = self.action_func(action_rule2, state2, state1)

        return action1, action2
            
    
    def action_func(self, action_rule, focal_state, opponent_state):
        if focal_state == 0 and focal_state == 0:
            return action_rule[3]  # Bit 3
        elif focal_state == 0 and opponent_state == 1:
            return action_rule[2]  # Bit 2
        elif focal_state == 1 and opponent_state == 0:
            return action_rule[1]  # Bit 1
        else:  # (1,1)
            return action_rule[0]  # Bit 0

    def determine_state(self, focal_action, opponent_state):
        if focal_action == 0 and opponent_state == 0:
            return self.norm[3]  # Bit 3
        elif focal_action == 0 and opponent_state == 1:
            return self.norm[2]  # Bit 2
        elif focal_action == 1 and opponent_state == 0:
            return self.norm[1]  # Bit 1
        else:  # (1,1)
            return self.norm[0]  # Bit 0

    def step(self, action_rules):
        pairings = []
        players = self.agents.copy()
        for _ in range(num_agents//2):
            index = random.randrange(len(players))
            elem1 = players.pop(index)

            index = random.randrange(len(players))
            elem2 = players.pop(index)

            pairings.append((elem1, elem2))
            

        for pair in pairings:
            action1, action2 = self.determine_action(pair[0], pair[1], action_rules)

            reward1 = self.reward_matrix[action1][action2]
            reward2 = self.reward_matrix[action2][action1]

            self.rewards[pair[0]] = reward1
            self.rewards[pair[1]] = reward2

            state1 = self.determine_state(action1, self.states[pair[1]])
            state2 = self.determine_state(action2, self.states[pair[0]])

            if (random.random() < chi):
                state1 = 1-state1
            if (random.random() < chi):
                state2 = 1-state2

            self.states[pair[0]] = state1
            self.states[pair[1]] = state2

        return self.states, self.rewards

In [160]:
class Qlearner:
    """A Q-learning agent"""

    def __init__(
        self,
        action_size=16,
        state_size=2,
        learning_rate=0.01,
        gamma=0.99,
        epsilon=0.1,
    ):
        self.action_size = action_size
        self.state_size = state_size

        # initialize the Q-table: (State x Agent Action)
        self.qtable = np.zeros((self.state_size, self.action_size))

        self.learning_rate = learning_rate
        self.gamma = gamma  # discount factor
        self.epsilon = epsilon # exploration

        # tracking rewards/progress:
        self.rewards_this_episode = []  # during an episode, save every time step's reward
        self.episode_total_rewards = []  # each episode, sum the rewards, possibly with a discount factor
        self.average_episode_total_rewards = []  # the average (discounted) episode reward to indicate progress

        self.state_history = []
        self.action_history = []

    def reset_agent(self):
        self.qtable = np.zeros((self.state_size, self.action_size))

    def select_greedy(self, state):
        # np.argmax(self.qtable[state]) will select first entry if two or more Q-values are equal, but we want true randomness:
        return np.random.choice(np.flatnonzero(np.isclose(self.qtable[state], self.qtable[state].max())))

    def select_action(self, state):
        if np.random.rand() < self.epsilon:
            action = random.randrange(self.action_size)
        else:
            action = self.select_greedy(state)
        self.state_history.append(state)
        self.action_history.append(action)
        return action

    def update(self, state, action, new_state, reward, done, update_epsilon=True):
        lr = self.learning_rate
        self.qtable[state, action] += lr * (reward + (not done) * self.gamma * np.max(self.qtable[new_state]) - self.qtable[state, action])

        self.rewards_this_episode.append(reward)

        if done:
            # track total reward:
            episode_reward = self._calculate_episode_reward(self.rewards_this_episode, discount=False)
            self.episode_total_rewards.append(episode_reward)

            k = len(self.average_episode_total_rewards) + 1  # amount of episodes that have passed
            self._calculate_average_episode_reward(k, episode_reward)
            
            # reset the rewards for the next episode:
            self.rewards_this_episode = []

    def _calculate_episode_reward(self, rewards_this_episode, discount=False):
        if discount:
            return sum([self.gamma**i * reward for i, reward in enumerate(rewards_this_episode)])
        return sum(rewards_this_episode)

    def _calculate_average_episode_reward(self, k, episode_reward):
        if k > 1:  # running average is more efficient:
            average_episode_reward = (1 - 1 / k) * self.average_episode_total_rewards[-1] + episode_reward / k
        else:
            average_episode_reward = episode_reward
        self.average_episode_total_rewards.append(average_episode_reward)

    def print_rewards(self, episode, print_epsilon=True, print_q_table=True):
        # print("Episode ", episode + 1)
        print("Total (discounted) reward of this episode: ", self.episode_total_rewards[episode])
        print("Average total reward over all episodes until now: ", self.average_episode_total_rewards[-1])

        print("Epsilon:", self.epsilon) if print_epsilon else None
        print("Q-table: ", self.qtable) if print_q_table else None

In [None]:
payoff = np.array([[0, b[2]],[-c, b[2]-c]])

run = 0
np.random.seed(run)
random.seed(run)

agents = [Qlearner() for _ in range(10)]
env = MatrixGame(payoff, agents)

for episode in range(num_episodes):
    print(f"Number of episodes: {episode}")
    obs = env.reset()
    for round in range(num_rounds):
        action_rules = {agent: agent.select_action(obs[agent]) for agent in agents}
        next_obs, rewards = env.step(action_rules)

        for agent in agents:
            if round == num_rounds-1:
                agent.update(obs[agent], action_rules[agent], next_obs[agent], rewards[agent], done=True)
            else:
                agent.update(obs[agent], action_rules[agent], next_obs[agent], rewards[agent], done=False)
        obs = next_obs

average_agent_round_payoff = np.zeros(num_episodes//2)
for agent in agents:
    average_round_payoff = np.array(agent.episode_total_rewards[num_episodes//2:])/num_rounds
    print(average_round_payoff)
    average_agent_round_payoff += average_round_payoff


average_agent_round_payoff = (average_agent_round_payoff.sum()/(10*(num_episodes//2)))/(b[2]-c)

print(average_agent_round_payoff)

[7.695 7.345 7.65  7.6   7.41  7.3   7.415 7.295 7.195 7.35  7.515 7.39
 7.38  7.77  7.455 7.39  7.61  7.55  7.475 7.915 7.055 7.57  6.905 8.07
 7.55  7.265 7.365 7.7   7.55  7.64  7.305 7.765 6.555 5.97  6.385 4.825
 5.91  6.38  5.78  6.31  5.505 6.51  5.065 5.29  5.395 6.62  5.445 5.7
 6.335 5.455 5.525 4.435 4.63  5.065 6.34  6.25  4.465 4.045 5.46  3.015
 2.3   3.18  2.25  4.975 1.775 3.825 2.83  6.605 3.54  4.18  4.695 3.975
 7.03  6.825 3.575 4.39  2.9   4.945 3.07  7.045 3.085 2.465 4.48  3.385
 6.115 6.005 3.    3.52  5.135 7.03  4.95  6.245 5.605 3.45  3.87  7.765
 6.085 4.63  3.33  3.95  4.605 2.325 2.68  4.41  2.73  2.94  3.485 3.54
 3.22  5.91  4.845 1.745 7.015 3.295 5.585 4.12  3.855 1.78  2.69  3.305
 3.19  3.96  3.565 3.02  3.765 2.955 7.5   7.76  7.45  6.36  5.815 4.15
 5.76  6.47  1.515 3.12  2.815 2.725 8.24  5.85  7.115 4.4   3.74  5.395
 4.33  2.625 5.955 2.76  3.875 2.255 3.365 2.215 3.505 5.38  3.735 3.805
 3.395 2.06  2.95  2.63  3.02  3.55  3.605 3.735 3.565 6.