<a href="https://colab.research.google.com/github/sepidehfat/NMA-RL/blob/main/social_hierarchy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#==== import libs
import gym
from gym import spaces
import numpy as np
import matplotlib.pyplot as plt

from collections import deque
import random
import gym
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam
import os

In [None]:
#===== Define environment
class GridWorld(gym.Env):
    """Single-agent grid world in which the agent balances food and oxygen.

    A tile whose value in ``grid`` is 0 is treated as water (see
    ``in_water``); any other value is land.  A step that ends on a water tile
    raises food and lowers oxygen, a step that ends on land lowers food and
    raises oxygen.  Both levels are kept inside ``[0, MAX_LEVEL]`` so that
    they stay valid members of the declared ``Discrete(101)`` spaces.  The
    episode ends as soon as either level reaches zero.

    NOTE(review): ``grid`` is initialised to all ones, so no tile is water
    until the grid is populated with zeros -- confirm the intended layout.

    Args:
        grid_size: Number of rows and of columns of the square grid.
    """

    MAX_LEVEL = 100        # upper bound of food and oxygen (Discrete(101))
    INITIAL_FOOD = 50
    INITIAL_OXYGEN = 100
    FOOD_TARGET = 50       # food level at which the food reward term is zero

    def __init__(self, grid_size=10):
        super().__init__()
        self.grid_size = grid_size
        self.num_tiles = self.grid_size * self.grid_size

        self._reset_state()

        # Define action space and observation space
        self.action_space = spaces.Discrete(4)  # Up, Down, Left, Right
        self.observation_space = spaces.Dict({
            'grid': spaces.Box(low=0, high=1, shape=(self.grid_size, self.grid_size), dtype=np.int8),
            'food': spaces.Discrete(self.MAX_LEVEL + 1),
            'oxygen': spaces.Discrete(self.MAX_LEVEL + 1)
        })

    def _reset_state(self):
        """Put grid, levels and agent position back to their initial values."""
        self.grid = np.ones((self.grid_size, self.grid_size), dtype=np.int8)
        self.food = self.INITIAL_FOOD
        self.oxygen = self.INITIAL_OXYGEN
        # Start in the centre tile; this is (5, 5) for the default 10x10 grid
        # and stays inside the grid for smaller sizes as well.
        centre = self.grid_size // 2
        self.agent_position = (centre, centre)

    def reset(self):
        """Reset the environment to its initial state and return the observation."""
        self._reset_state()
        return self._get_observation()

    def step(self, action):
        """Apply ``action`` and advance the environment by one step.

        Args:
            action: 0 = up, 1 = down, 2 = left, 3 = right.  Any other value
                leaves the agent where it is.

        Returns:
            Tuple ``(observation, reward, done, info)`` where ``info`` is an
            empty dict.
        """
        self._move_agent(action)

        if self.in_water():
            food_delta, oxygen_delta = 6, -10
        else:
            # On land oxygen is replenished by 10 (it was previously
            # overwritten with the constant 10).
            food_delta, oxygen_delta = -3, 10

        self.food = self._clamp(self.food + food_delta)
        self.oxygen = self._clamp(self.oxygen + oxygen_delta)

        # Calculate the reward
        reward = self._calculate_reward()

        # Check if episode is done (food or oxygen exhausted)
        done = self._is_episode_done()

        return self._get_observation(), reward, done, {}

    def render(self, mode='human'):
        """Print the grid, the agent position and the food/oxygen levels."""
        print(f"Grid:\n{self.grid}\n")
        print(f"Agent Position: {self.agent_position}")
        print(f"Food: {self.food}")
        print(f"Oxygen: {self.oxygen}")

    def close(self):
        """Nothing to clean up; present to satisfy the ``gym.Env`` interface."""
        pass

    def _clamp(self, level):
        """Limit a food/oxygen level to the range ``[0, MAX_LEVEL]``."""
        return max(0, min(self.MAX_LEVEL, level))

    def _move_agent(self, action):
        """Move the agent one tile; moves that would leave the grid are ignored."""
        x, y = self.agent_position

        if action == 0:  # Up
            x -= 1
        elif action == 1:  # Down
            x += 1
        elif action == 2:  # Left
            y -= 1
        elif action == 3:  # Right
            y += 1

        # Only commit the move if the new position is within grid boundaries
        if 0 <= x < self.grid_size and 0 <= y < self.grid_size:
            self.agent_position = (x, y)

    def in_water(self):
        """Return True if the agent's current tile is water (grid value 0)."""
        x, y = self.agent_position
        return self.grid[x, y] == 0

    def _calculate_reward(self):
        """Return the reward for the current food and oxygen levels.

        The food term is linear in the distance from ``FOOD_TARGET``:
        negative below it, positive above it and zero exactly at it, so
        running low on food is penalised more the lower the level gets.
        The oxygen term is a penalty that grows linearly from 0 at
        ``MAX_LEVEL`` to 1 at zero oxygen.
        """
        food_term = (self.food - self.FOOD_TARGET) / self.FOOD_TARGET
        oxygen_penalty = 1 - (self.oxygen / self.MAX_LEVEL)
        return food_term - oxygen_penalty

    def _is_episode_done(self):
        """Return True once food or oxygen has run out."""
        return self.food <= 0 or self.oxygen <= 0

    def _get_observation(self):
        """Return the current observation dict (grid, food, oxygen).

        The grid array is returned by reference, not copied.
        """
        return {
            'grid': self.grid,
            'food': self.food,
            'oxygen': self.oxygen
        }

# Register the environment so it can be built with ``gym.make('GridWorld')``.
gym.register(id='GridWorld', entry_point=GridWorld)

In [None]:
class DQNAgent:
    """Deep Q-learning agent with an epsilon-greedy policy and replay memory.

    The Q-network is a stack of fully connected layers, so states must be
    flat feature vectors.  Every ``state`` / ``next_state`` handed to ``act``,
    ``update`` and ``replay`` (via ``remember``) is passed unchanged to
    ``model.predict`` / ``model.fit`` and is indexed as a batch of one, i.e.
    it is expected to have shape ``(1, n_features)``.

    Args:
        state_shape: Shape of one state without the batch axis, e.g.
            ``(n_features,)``.  A bare integer ``n`` is accepted as
            shorthand for ``(n,)``.
        action_size: Number of discrete actions (size of the network output).
    """

    def __init__(self, state_shape, action_size):
        self.state_shape = state_shape
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # oldest transitions are dropped first
        self.gamma = 0.95  # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

        self.food = 50

    def _build_model(self):
        """Build and compile the fully connected Q-network."""
        # Keras needs a tuple for ``input_shape``; allow a plain feature count.
        if isinstance(self.state_shape, (int, np.integer)):
            input_shape = (int(self.state_shape),)
        else:
            input_shape = tuple(self.state_shape)

        model = Sequential()
        # ``Dense`` has no ``kernel_size`` argument (that belongs to
        # convolutional layers); passing it raises a TypeError.
        model.add(Dense(32, activation='relu', input_shape=input_shape))
        model.add(Dense(64, activation='relu'))
        model.add(Dense(128, activation='relu'))
        model.add(Dense(self.action_size, activation='linear'))
        # ``learning_rate`` replaces the deprecated ``lr`` argument name.
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action epsilon-greedily.

        With probability ``epsilon`` a uniformly random action is returned,
        otherwise the action with the highest predicted Q-value.
        """
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state, verbose=0)
        return np.argmax(act_values[0])

    def update(self, state, action, reward, next_state):
        """Do one online Q-learning update for a single transition.

        There is no ``done`` flag here, so the target always bootstraps from
        ``next_state``.
        """
        target = reward + self.gamma * np.amax(self.model.predict(next_state, verbose=0)[0])
        # Start from the current predictions so that only the Q-value of the
        # action actually taken contributes to the loss.
        target_f = self.model.predict(state, verbose=0)
        target_f[0][action] = target
        self.model.fit(state, target_f, epochs=1, verbose=0)

    def replay(self, batch_size):
        """Train on a random minibatch from memory, then decay epsilon.

        Does nothing (and does not decay epsilon) while the memory holds
        fewer than ``batch_size`` transitions, because ``random.sample``
        cannot draw a larger sample than the population.
        """
        if len(self.memory) < batch_size:
            return

        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                # Q-learning target: reward plus the discounted best
                # predicted value of the next state.
                target = (reward + self.gamma *
                          np.amax(self.model.predict(next_state, verbose=0)[0]))
            target_f = self.model.predict(state, verbose=0)
            target_f[0][action] = target
            # One gradient step towards the target value.
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        """Load network weights from the file ``name``."""
        self.model.load_weights(name)

    def save(self, name):
        """Save network weights to the file ``name``."""
        self.model.save_weights(name)


In [None]:
# ==== train the DQN ====
def encode_observation(observation):
    """Flatten a GridWorld observation dict into a ``(1, n_features)`` array.

    The grid is flattened and followed by the food and oxygen levels.  The
    leading axis of size one is the batch axis that ``DQNAgent`` indexes with
    ``[0]`` after calling ``model.predict``.
    """
    features = np.concatenate((
        np.asarray(observation['grid'], dtype=np.float32).ravel(),
        np.array([observation['food'], observation['oxygen']], dtype=np.float32),
    ))
    return features.reshape(1, -1)


# ---- initialization
Np = 10  # number of agents
env = gym.make('GridWorld')
state = encode_observation(env.reset())  # initial state
# One independent Q-network per agent, sized from the encoded observation.
players = [DQNAgent((state.shape[1],), env.action_space.n) for _ in range(Np)]
# we need a clear definition of the end of episode.

Neps = 1000
batch_size = 10  # try this first
for eps in range(Neps):
    # Every episode starts from a freshly reset environment.
    state = encode_observation(env.reset())
    done = False
    while not done:
        # get actions of all agents
        action = [player.act(state) for player in players]
        # NOTE(review): GridWorld.step currently models a single agent and
        # returns one scalar reward.  This loop needs a multi-agent step that
        # takes the list of Np actions and returns a reward per agent -- TODO.
        next_state, reward, done, _ = env.step(action)
        next_state = encode_observation(next_state)
        # Accept either a per-agent reward list or a single shared scalar.
        rewards = np.broadcast_to(np.asarray(reward, dtype=np.float32), (Np,))
        # the DQN should also be updated in each step, not implemented yet
        for p, player in enumerate(players):
            player.remember(state, action[p], rewards[p], next_state, done)
        state = next_state
    # train the model by replay, leverage memory
    for player in players:
        # random.sample inside replay needs at least batch_size transitions.
        if len(player.memory) >= batch_size:
            player.replay(batch_size)


SyntaxError: ignored

In [None]:
# Quick construction check: build the registered environment and one agent.
env = gym.make('GridWorld')
players = DQNAgent(state_shape=10, action_size=4)

# Manual checks, left disabled:
# env.reset()
# env.step()
# env.render()

TypeError: ignored