<a href="https://colab.research.google.com/github/neven-x/Social-Hierarchy-RL/blob/main/RL_Social_Hierarchy_Demo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import colors

import gym
from gym import spaces

import functools
import random
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

from pettingzoo.utils.env import ParallelEnv


In [None]:
class Hierarchy_Grid(ParallelEnv):
    """Multi-agent grid world in which agents contest food tiles.

    Every agent occupies one cell of a square ``grid_size`` x ``grid_size``
    grid.  Each step all agents move, and agents that end up sharing a food
    tile resolve pairwise conflicts: each side independently decides to
    fight or leave, rewards are handed out accordingly, and the pairwise
    propensity to fight is nudged by the reward received.

    Positions are stored as ``(row, col)`` tuples of Python ints so that they
    can be compared with ``==``, looked up with ``in`` and used directly as a
    single-cell NumPy index.
    """

    metadata = {
        "name": "Hierarchy Grid",
    }

    # Action id -> (row delta, col delta).
    _MOVES = {
        0: (-1, 0),  # Up
        1: (1, 0),   # Down
        2: (0, -1),  # Left
        3: (0, 1),   # Right
        4: (0, 0),   # Stay
    }

    def __init__(self, grid_size, num_agents, max_iter):
        """Create the environment; call ``reset`` before the first ``step``.

        Args:
            grid_size: Side length of the square grid.
            num_agents: Number of agents; they are named ``0 .. num_agents-1``.
            max_iter: Number of steps after which every agent is terminated.
        """
        self.timestep = None
        self.grid_size = grid_size
        self.max_iter = max_iter
        self.agents = np.arange(num_agents)
        self.agent_positions = None
        self.food_positions = None
        self.fight_probs = None
        self.rewards = {name: 0 for name in self.agents}

        # The agent count is always derived from ``self.agents`` so the two
        # can never disagree.
        self.observation_space = spaces.MultiDiscrete(
            [self.grid_size, self.grid_size, len(self.agents) + 1]
        )
        self.action_space = spaces.Discrete(5)

        self.food_position_map = np.zeros((self.grid_size, self.grid_size))
        self.agent_position_maps = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _sigmoid(x):
        """Return the logistic function of ``x``."""
        return 1 / (1 + np.exp(-x))

    def _position_map(self, position):
        """Return a grid of zeros with a single 1 at ``position``."""
        grid = np.zeros((self.grid_size, self.grid_size))
        # Index with a tuple: indexing with a length-2 array would select
        # two whole rows instead of one cell.
        grid[tuple(position)] = 1
        return grid

    def _place_agent(self, agent, position):
        """Record ``agent`` at ``position`` and refresh its occupancy map."""
        position = tuple(int(coord) for coord in position)
        self.agent_positions[agent] = position
        self.agent_position_maps[agent] = self._position_map(position)

    def _observe(self):
        """Build the per-agent observation dictionary.

        The observation is one array of shape
        ``(num_agents + 1, grid_size, grid_size)``: one occupancy layer per
        agent (in ``self.agents`` order) followed by the food layer.  All
        agents receive the same array object.
        """
        layers = [self.agent_position_maps[name] for name in self.agents]
        layers.append(self.food_position_map)
        stacked = np.stack(layers)
        return {name: stacked for name in self.agents}

    # ------------------------------------------------------------------
    # Environment API
    # ------------------------------------------------------------------
    def reset(self, seed=None, options=None):
        """Start a new episode with random agent and food positions.

        Args:
            seed: Optional seed applied to NumPy's global random generator.
            options: Accepted for API compatibility; unused.

        Returns:
            ``(observations, infos)`` where both are dicts keyed by agent.
        """
        if seed is not None:
            np.random.seed(seed)

        self.timestep = 0
        agent_count = len(self.agents)
        self.fight_probs = {name: np.zeros(agent_count) for name in self.agents}
        self.rewards = {name: 0 for name in self.agents}

        self.agent_positions = {}
        self.agent_position_maps = {}
        for agent in self.agents:
            self._place_agent(agent, np.random.randint(0, self.grid_size, 2))

        food = tuple(int(coord) for coord in np.random.randint(0, self.grid_size, 2))
        self.food_positions = [food]
        self.food_position_map = self._position_map(food)

        infos = {name: {} for name in self.agents}
        return self._observe(), infos

    def step(self, actions):
        """Advance the environment by one step.

        Args:
            actions: Mapping from agent name to an action id (0=Up, 1=Down,
                2=Left, 3=Right, 4=Stay).

        Returns:
            ``(observations, rewards, terminations, truncations, infos)``,
            each a dict keyed by agent.  ``rewards`` holds the running totals
            accumulated since the last ``reset``.
        """
        self.timestep += 1

        for agent in self.agents:
            self.move_agent(agent, actions[agent])

        self.several_on_food_tile()

        # The episode ends once ``max_iter`` steps have been taken.
        finished = self.timestep >= self.max_iter
        terminations = {name: finished for name in self.agents}
        truncations = {name: False for name in self.agents}
        infos = {name: {} for name in self.agents}

        # TODO: Add check if any agents have 0 food in which case they die

        # Hand out a copy so later steps do not mutate what the caller holds.
        return self._observe(), dict(self.rewards), terminations, truncations, infos

    def move_agent(self, agent, action):
        """Move ``agent`` one cell according to ``action``.

        Moves that would leave the grid are ignored, as are unknown action
        ids; the agent then stays where it is.
        """
        row, col = self.agent_positions[agent]
        d_row, d_col = self._MOVES.get(action, (0, 0))
        row, col = row + d_row, col + d_col

        # Only commit the move if the target is inside the grid.
        if 0 <= row < self.grid_size and 0 <= col < self.grid_size:
            self._place_agent(agent, (row, col))

    def conflict(self, agent1, agent2):
        """Resolve a contest between two agents sharing a food tile.

        Each agent fights with probability ``sigmoid(fight_probs[a][b])`` and
        otherwise leaves (is relocated).  A lone fighter earns 5; if both
        fight, a fair coin picks the winner (+5) and the loser gets -5.  Each
        agent's propensity to fight this opponent is then moved in the
        direction of its decision, scaled by the reward it received.
        """
        prob1 = self._sigmoid(self.fight_probs[agent1][agent2])
        prob2 = self._sigmoid(self.fight_probs[agent2][agent1])

        # Agents make decision to fight or leave
        # 1 == fight, 0 == leave
        decision1 = bool(np.random.binomial(1, prob1))
        decision2 = bool(np.random.binomial(1, prob2))

        # Outcome of the fight, used only when both decide to stay:
        # +1 means agent1 wins, -1 means agent2 wins.
        outcome = 1 if np.random.binomial(1, 0.5) else -1

        if not decision1:
            self.relocate_agent(agent1)
        if not decision2:
            self.relocate_agent(agent2)

        reward_dict = {
            (False, False): (0, 0),
            (True, False): (5, 0),
            (False, True): (0, 5),
            (True, True): (5 * outcome, -5 * outcome),
        }

        # Allocate rewards based on decisions and fight outcome
        reward1, reward2 = reward_dict[(decision1, decision2)]
        self.rewards[agent1] += reward1
        self.rewards[agent2] += reward2

        # Update future fighting propensities
        lr = 0.01
        self.fight_probs[agent1][agent2] += lr * reward1 * (int(decision1) - prob1)
        self.fight_probs[agent2][agent1] += lr * reward2 * (int(decision2) - prob2)

    def several_on_food_tile(self):
        """Trigger conflicts on every food tile that holds several agents.

        Agents on the same tile are paired consecutively (first with second,
        second with third, ...), in ``agent_positions`` order.
        """
        for food_tile in self.food_positions:
            target = tuple(food_tile)
            agents_on_tile = [
                agent
                for agent, position in self.agent_positions.items()
                if tuple(position) == target
            ]

            # ``zip`` yields nothing for fewer than two agents.
            for first, second in zip(agents_on_tile[:-1], agents_on_tile[1:]):
                self.conflict(first, second)

    def relocate_agent(self, agent):
        """Move ``agent`` to the nearest free cell along a row or column.

        Candidates are tried at increasing distance, in the order Right,
        Left, Down, Up; the first one that lies inside the grid and is not
        occupied by another agent is taken.  If no such cell exists the agent
        stays where it is.
        """
        row, col = self.agent_positions[agent]
        occupied = {
            tuple(position)
            for other, position in self.agent_positions.items()
            if other != agent
        }
        directions = ((0, 1), (0, -1), (1, 0), (-1, 0))  # Right, Left, Down, Up

        # Beyond ``grid_size - 1`` steps every candidate is off the grid, so
        # the search is bounded instead of looping forever.
        for step in range(1, self.grid_size):
            for d_row, d_col in directions:
                candidate = (row + d_row * step, col + d_col * step)
                inside = (
                    0 <= candidate[0] < self.grid_size
                    and 0 <= candidate[1] < self.grid_size
                )
                if inside and candidate not in occupied:
                    self._place_agent(agent, candidate)
                    return

    def render(self):
        """Draw the food layer (green) and the agent layer (red) with matplotlib."""
        plt.figure(figsize=(5, 5), frameon=False)

        plt.title("Grid World", size=13)
        ticks = np.arange(0, self.grid_size, 1)
        plt.xticks(ticks)
        plt.yticks(ticks)

        # Start from an empty grid so rendering also works with no agents.
        empty = np.zeros((self.grid_size, self.grid_size))
        agent_position_map = sum(self.agent_position_maps.values(), empty)

        extent = [0, self.grid_size, 0, self.grid_size]
        plt.imshow(self.food_position_map, vmax=2, cmap='Greens', alpha=0.4, extent=extent)
        plt.imshow(agent_position_map, vmax=2, cmap='Reds', alpha=0.4, extent=extent)

        plt.gca().grid()

        plt.show()

# Make the environment constructible through gym under the id
# 'Hierarchy_Grid', with a 10x10 grid, 10 agents and a 200-step limit
# passed to the constructor as keyword arguments.
gym.register(
    id="Hierarchy_Grid",
    entry_point=Hierarchy_Grid,
    kwargs=dict(grid_size=10, num_agents=10, max_iter=200),
)

# Build one instance through the registry.
env = gym.make("Hierarchy_Grid")

  and should_run_async(code)
  logger.warn(f"Overriding environment {spec.id}")
  deprecation(
  deprecation(


In [None]:
class DQNAgent:
    """Epsilon-greedy agent backed by a small convolutional Q-network.

    The agent keeps a bounded replay memory and a Keras model that maps a
    state of shape ``state_shape`` to one Q-value per action.
    """

    def __init__(self, state_shape, action_size):
        """Set hyper-parameters and build the Q-network.

        Args:
            state_shape: Shape of a single (unbatched) state fed to the model.
            action_size: Number of discrete actions.
        """
        self.state_shape = state_shape
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 1  # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        """Return the compiled Q-network (two conv layers, one dense layer)."""
        # Imported here because the file-level imports only bring in Dense.
        from keras.layers import Conv2D, Flatten

        model = Sequential()
        model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=self.state_shape))
        model.add(Conv2D(64, kernel_size=(3, 3), activation='relu'))
        model.add(Flatten())
        model.add(Dense(128, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        # ``learning_rate`` is the supported keyword; ``lr`` is deprecated.
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        """Append one transition to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action for ``state`` with an epsilon-greedy policy.

        Args:
            state: A single state of shape ``state_shape``, or the same state
                already carrying a leading batch axis of size 1.

        Returns:
            The chosen action id as an ``int``.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)

        state = np.asarray(state)
        # The model expects a batch; add the batch axis when it is missing.
        if state.ndim == len(self.state_shape):
            state = state[np.newaxis, ...]
        act_values = self.model.predict(state, verbose=0)
        return int(np.argmax(act_values[0]))

    def load(self, name):
        """Load model weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save model weights to the file ``name``."""
        self.model.save_weights(name)


In [None]:
# 'Hierarchy_Grid' is the only environment id this file registers; the
# previous id 'GridWorld' is not registered anywhere in it.
env = gym.make('Hierarchy_Grid')

n_actions = env.action_space.n
# NOTE(review): this shape is hard-coded and is not derived from the
# environment's observations -- confirm it matches what the agent is fed.
state_shape = (10, 10, 3)

dqn = DQNAgent(state_shape, n_actions)

  logger.warn(
  deprecation(
  deprecation(
  super().__init__(name, **kwargs)
