In [1]:
!pip install gymnasium
!pip install pettingzoo

Collecting gymnasium
  Downloading gymnasium-0.27.1-py3-none-any.whl (883 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m12.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting gymnasium-notices>=0.0.1
  Downloading gymnasium_notices-0.0.1-py3-none-any.whl (2.8 kB)
Collecting jax-jumpy>=0.2.0
  Downloading jax_jumpy-0.2.0-py3-none-any.whl (11 kB)
Collecting typing-extensions>=4.3.0
  Downloading typing_extensions-4.5.0-py3-none-any.whl (27 kB)
Installing collected packages: gymnasium-notices, typing-extensions, jax-jumpy, gymnasium
  Attempting uninstall: typing-extensions
    Found existing installation: typing_extensions 4.1.1
    Uninstalling typing_extensions-4.1.1:
      Successfully uninstalled typing_extensions-4.1.1
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
tensorflow-io 0.21.0 requires te

In [2]:
import os
import pandas as pd
import numpy as np

In [3]:
import copy
import functools
import random
from enum import Enum

import gymnasium
import numpy as np
from gymnasium.spaces import Box, Discrete, MultiDiscrete, Sequence

from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector, wrappers

In [4]:
SIMULATION_LENGTH = 2000  # Number of timesteps in one episode

# Per-job action codes. The environment exposes 5 + 1 + 1 = 7 actions per job
# (allocate to one of 5 machines, allocate to remote, NOOP), so every one of the
# 7 codes needs a name and a slot in MOVES.
# NOTE(review): NOOP keeps its existing value; REMOTE takes the remaining code.
# Confirm this encoding when the action processing in step() is implemented.
NOOP = 5
REMOTE = 6
MOVES = [0, 1, 2, 3, 4, NOOP, REMOTE]

class MachineState(Enum):
    """State of a single machine: FREE, or busy with a job of the given type.

    The LOW / MED / HIGH members are also used as the job type for the entries
    stored in the environment's job queues.
    """
    FREE = 0  # no job (also the "no job generated" result of workload generation)
    LOW = 1   # Low job
    MED = 2   # Medium job
    HIGH = 3  # High job

In [5]:
# Job model: there are three job types -- Low (L), Medium (M) and High (H).
#
# alpha_* is the probability that a job of that type completes at any particular
# timestep, so L jobs are shorter and H jobs take longer.
alpha_L = 0.7
alpha_M = 0.3
alpha_H = 0.1

# Reward for each job type.
R_L = 20
R_M = 50
R_H = 80


In [6]:

def make_env(render_mode=None):
    """
    Build a `raw_env` and wrap it in the default PettingZoo wrappers.

    Args:
        render_mode: forwarded to `raw_env`. "ansi" is special-cased: the raw
            environment is created in "human" mode and wrapped so that what it
            prints to the terminal is captured.

    Returns:
        The wrapped environment.
    """
    wants_ansi = render_mode == "ansi"
    env = raw_env(render_mode="human" if wants_ansi else render_mode)

    if wants_ansi:
        # This wrapper is only for environments which print results to the terminal
        env = wrappers.CaptureStdoutWrapper(env)

    # wrappers.AssertOutOfBoundsWrapper (error handling for discrete action spaces)
    # is deliberately not applied here: it was disabled because it causes errors.

    # Provides a wide variety of helpful user errors. Strongly recommended.
    return wrappers.OrderEnforcingWrapper(env)


class raw_env(AECEnv):
    """PettingZoo AEC environment in which two datacentres schedule incoming jobs.

    Every timestep each datacentre (agent) submits a list holding one action per
    job in its queue. Once the last agent of the round has acted, the simulation
    clock advances by one and a new job may arrive at each datacentre.

    Processing of the submitted actions, job completion and the real reward
    function are still TODO in `step`; rewards are currently placeholders.
    """

    # The metadata holds environment constants. From gymnasium, we inherit the
    # "render_modes" metadata which specifies which modes can be put into the
    # render() method. At least human mode should be supported.
    # The "name" metadata allows the environment to be pretty printed.
    metadata = {"render_modes": ["human"], "name": "dmrs_v0"}

    NUM_DATACENTRES = 2
    MACHINES_PER_DATACENTRE = 5
    # Per job: allocate to one of the machines, allocate to remote, or NOOP
    NUM_ACTIONS = MACHINES_PER_DATACENTRE + 1 + 1

    ##### Helper Functions #####
    def _generate_workload(self):
        """Draw the job arriving at one datacentre for one timestep.

        Returns:
            MachineState.LOW, MED or HIGH for a new job of that type, or
            MachineState.FREE when no job arrives. All four outcomes are
            equally likely.
        """
        population = [MachineState.LOW, MachineState.MED, MachineState.HIGH, MachineState.FREE]
        # "No job" must carry real weight, otherwise a job arrives every timestep
        # and the callers' `!= MachineState.FREE` checks can never trigger.
        weights = [0.25, 0.25, 0.25, 0.25]
        return self._rng.choices(population=population, weights=weights)[0]

    def _blank_schedule(self):
        """Return fresh `(machine_states, job_queues)` covering the whole simulation.

        machine_states[t][d][m] starts as MachineState.FREE for every timestep t,
        datacentre d and machine m; job_queues[t][d] starts as an empty list.
        """
        machine_states = [
            [
                [MachineState.FREE for _ in range(self.MACHINES_PER_DATACENTRE)]
                for _ in range(self.NUM_DATACENTRES)
            ]
            for _ in range(SIMULATION_LENGTH)
        ]
        job_queues = [
            [[] for _ in range(self.NUM_DATACENTRES)] for _ in range(SIMULATION_LENGTH)
        ]
        return machine_states, job_queues

    def _spawn_jobs(self):
        """Try to generate one job per datacentre and queue it at the current timestep."""
        for agent in self.agents:
            new_job = self._generate_workload()
            if new_job != MachineState.FREE:
                agent_num = self.agent_name_mapping[agent]
                self.job_queues[self.curr_timestep][agent_num].append(new_job)

    ##### Environment Definitions #####
    def __init__(self, render_mode=None):
        """
        The init method takes in environment arguments and
         should define the following attributes:
        - possible_agents
        - action_spaces
        - observation_spaces
        These attributes should not be changed after initialization.
        """
        self.possible_agents = ["datacentre_" + str(r) for r in range(self.NUM_DATACENTRES)]
        self.agent_name_mapping = dict(
            zip(self.possible_agents, list(range(len(self.possible_agents))))
        )
        # Source of randomness for job arrivals. Defaults to the module-level
        # `random` generator; reset(seed=...) swaps in a dedicated seeded one.
        self._rng = random
        # machine_states encodes the entire system state for all timesteps;
        # job_queues encodes the list of jobs arriving at all timesteps.
        self.machine_states, self.job_queues = self._blank_schedule()
        # Defined here so that action_space() can be queried before reset()
        self.curr_timestep = 0

        # Allocate to 5 machines, allocate to remote, NOOP
        self._action_spaces = {
            agent: Sequence(Discrete(self.NUM_ACTIONS)) for agent in self.possible_agents
        }

        # 5 machines, workload, workload_reward
        # TODO: this Box no longer matches what observe() returns (a dict with
        # "machine_states" and "job_queue"); it still needs to be redefined.
        single_agent_observation_space = Box(
            low=-100, high=100, dtype=np.int32, shape=(self.MACHINES_PER_DATACENTRE + 1 + 1, )
        )
        self._observation_spaces = {
            agent: single_agent_observation_space for agent in self.possible_agents
        }
        self.render_mode = render_mode

    # this cache ensures that same space object is returned for the same agent
    # allows action space seeding to work as expected
    @functools.lru_cache(maxsize=None)
    def observation_space(self, agent):
        """Return the observation space of `agent`.

        TODO: this Box (5 machines, workload, workload_reward) no longer matches
        the dict observations returned by observe().
        """
        return Box(
            low=-100, high=100, dtype=np.int32, shape=(self.MACHINES_PER_DATACENTRE + 1 + 1, )
        )

    def action_space(self, agent):
        """Return the action space of `agent` for the current timestep.

        The space has one dimension per job currently in the agent's queue, each
        with NUM_ACTIONS choices (allocate to a machine, allocate to remote,
        NOOP). It is intentionally not cached: its size changes with the queue.
        """
        if self.curr_timestep < SIMULATION_LENGTH:
            job_queue = self.job_queues[self.curr_timestep][self.agent_name_mapping[agent]]
        else:
            # The simulation has run past its last timestep: nothing left to schedule
            job_queue = []
        return MultiDiscrete(np.full(len(job_queue), self.NUM_ACTIONS, dtype=np.int64))

    def render(self):
        """
        Renders the environment. In human mode, it can print to terminal, open
        up a graphical window, or open up some other display that a human can see and understand.
        """
        if self.render_mode is None:
            gymnasium.logger.warn(
                "You are calling render method without specifying any render mode."
            )
            return

        # Once the clock has reached SIMULATION_LENGTH there is no state left to
        # show, even though the agents are only removed on their next (dead) step.
        if len(self.agents) == 2 and self.curr_timestep < SIMULATION_LENGTH:
            state = self._get_state()
            string = f"Current state: datacentre_0: {state[self.agents[0]]} , datacentre_1: {state[self.agents[1]]}"
        else:
            string = "Game over"
        print(string)

    def observe(self, agent):
        """
        Observe should return the observation of the specified agent. This function
        should return a sane observation (though not necessarily the most up to date possible)
        at any time after reset() is called.
        """
        # The observation is the agent's entry of the state captured at the start
        # of the current timestep (see reset() and step()).
        return self.observations[agent]

    def close(self):
        """
        Close should release any graphical displays, subprocesses, network connections
        or any other environment data which should not be kept around after the
        user is no longer using the environment.
        """
        pass

    def _get_state(self):
        """Return the state of every live agent at the current timestep.

        Returns:
            dict mapping agent name to a dict with
            - "machine_states": one entry per machine, 0 if free, 1 if occupied
            - "job_queue": a copy of the list of jobs queued for that agent
        """
        timestep = self.curr_timestep
        state = {}
        for agent in self.agents:
            agent_num = self.agent_name_mapping[agent]
            state[agent] = {
                "machine_states": [
                    0 if machine_state == MachineState.FREE else 1
                    for machine_state in self.machine_states[timestep][agent_num]
                ],
                # Copied so that holders of an observation cannot alter the queue
                "job_queue": list(self.job_queues[timestep][agent_num]),
            }
        return state

    def reset(self, seed=None, return_info=False, options=None):
        """
        Reset needs to initialize the following attributes
        - agents
        - rewards
        - _cumulative_rewards
        - terminations
        - truncations
        - infos
        - agent_selection
        And must set up the environment so that render(), step(), and observe()
        can be called without issues.
        Here it sets up the state dictionary which is used by step() and the observations dictionary which is used by step() and observe()

        If `seed` is given, job arrivals are drawn from a generator seeded with
        it, so the episode is reproducible. `return_info` and `options` are
        accepted but not used.
        """
        if seed is not None:
            self._rng = random.Random(seed)

        self.machine_states, self.job_queues = self._blank_schedule()
        self.curr_timestep = 0
        self.agents = self.possible_agents[:]
        self.rewards = {agent: 0 for agent in self.agents}
        self._cumulative_rewards = {agent: 0 for agent in self.agents}
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}
        self.actions = {agent: [] for agent in self.agents}

        # Try and generate a job for each data centre, then capture the
        # observations that include those jobs
        self._spawn_jobs()
        self.observations = self._get_state()

        # Our agent_selector utility allows easy cyclic stepping through the agents list.
        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()

    def step(self, action):
        """
        step(action) takes in an action for the current agent (specified by
        agent_selection) and needs to update
        - rewards
        - _cumulative_rewards (accumulating the rewards)
        - terminations
        - truncations
        - infos
        - agent_selection (to the next agent)
        And any internal state used by observe() or render()

        `action` is a list with one action per job in the agent's job queue
        (None for an agent that is already terminated or truncated).
        """
        if (
            self.terminations[self.agent_selection]
            or self.truncations[self.agent_selection]
        ):
            # handles stepping an agent which is already dead
            # accepts a None action for the one agent, and moves the agent_selection to
            # the next dead agent,  or if there are no more dead agents, to the next live agent
            self._was_dead_step(action)
            return

        agent = self.agent_selection

        # If the agent is first, we should reset all state for a clean observation this time step
        if self._agent_selector.is_first():
            self.actions = {name: [] for name in self.agents}

            # no rewards are allocated until both players give an action
            self._clear_rewards()

        # the agent which stepped last had its _cumulative_rewards accounted for
        # (because it was returned by last()), so the _cumulative_rewards for this
        # agent should start again at 0
        self._cumulative_rewards[agent] = 0

        # stores action of current agent
        self.actions[agent] = action

        # collect reward if it is the last agent to act
        if self._agent_selector.is_last():
            # TODO: Process actions for each agent
            # TODO: Workloads are completed based on their alpha values
            # TODO: Calculate rewards for all agents. Placeholder:
            self.rewards[self.agents[0]] = 1
            self.rewards[self.agents[1]] = 2

            # Advance the clock first, then decide truncation from the new time.
            # Testing before the increment let the episode run one round too many
            # on stale observations.
            self.curr_timestep += 1
            episode_over = self.curr_timestep >= SIMULATION_LENGTH

            # The truncations dictionary must be updated for all players.
            self.truncations = {name: episode_over for name in self.agents}

            if not episode_over:
                # Generate the jobs for the coming timestep before capturing the
                # observations, so the observations include them.
                self._spawn_jobs()
                self.observations = self._get_state()

        # selects the next agent.
        self.agent_selection = self._agent_selector.next()
        # Adds .rewards to ._cumulative_rewards
        self._accumulate_rewards()

        if self.render_mode == "human":
            self.render()

print("Loaded environment")

Loaded environment


In [7]:
def policy(agent, state_observation, job):
    """Random policy: pick one of the 5 + 1 + 1 actions uniformly at random.

    Args:
        agent: name of the acting agent (unused).
        state_observation: the agent's observed machine states (unused).
        job: the job the action is for (unused).

    Returns:
        An integer action in the range 0..6.
    """
    num_actions = 5 + 1 + 1
    chosen_action = np.random.randint(num_actions)
    return chosen_action

In [8]:
NUM_EPISODES = 1

env = make_env()

steps_done = 0

# One list of returns per datacentre (nothing is appended to them in this cell)
episode_returns = {"datacentre_0": [], "datacentre_1": []}

for i_episode in range(NUM_EPISODES):
    print("=====> episode ", i_episode+1, "/", NUM_EPISODES)
    env.reset()
    _observation, _reward, termination, truncation, _info = env.last()

    # Bookkeeping for the timestep in progress: the actions each datacentre
    # chose, and the observations they are acting on.
    actions_taken = dict.fromkeys(("datacentre_0", "datacentre_1"))
    state_observations = {
        name: env.observe(name) for name in ("datacentre_0", "datacentre_1")
    }

    for agent in env.agent_iter():
        print(f"Timestep: {env.env.curr_timestep}")
        if termination or truncation:
            # Finished agents are stepped with a None action
            env.step(None)
            continue

        # Use the policy to get an action for each job in the job queue
        agent_view = state_observations[agent]
        job_queue = agent_view['job_queue']
        actions = []
        for job in job_queue:
            action = policy(agent, agent_view['machine_states'], job)
            actions.append(action)
        print(f"{agent} is taking actions {actions} for job queue {job_queue}")
        actions_taken[agent] = actions
        env.step(actions)

        # last() now reports on the agent that acts next
        _observation, _reward, termination, truncation, _info = env.last()

        # Only once datacentre_1 (the last agent of the round) has acted is the
        # timestep complete.
        if agent != "datacentre_1":
            continue

        # Should have advanced to the next timestep (t)
        print(f"Finished one step, now at timestep {env.env.curr_timestep}")
        # TODO: Update agents' policies
        print(f"Rewards (t-1): {env.rewards}")
        print(f"Actions taken (t-1): {actions_taken}")
        print(f"State Observations (t-1): {state_observations}") # previous state
        new_state_observations = {
            name: env.observe(name) for name in ("datacentre_0", "datacentre_1")
        }
        print(f"State Observations (t): {new_state_observations}")
        # state_observations + actions_taken + rewards => update policy with these

        # Advance state
        state_observations = new_state_observations
        # Reset actions taken
        actions_taken = dict.fromkeys(("datacentre_0", "datacentre_1"))

        print()

=====> episode  1 / 1
Timestep: 0
datacentre_0 is taking actions [2] for job queue [<MachineState.LOW: 1>]
Timestep: 0
datacentre_1 is taking actions [6] for job queue [<MachineState.LOW: 1>]
Finished one step, now at timestep 1
Rewards (t-1): {'datacentre_0': 1, 'datacentre_1': 2}
Actions taken (t-1): {'datacentre_0': [2], 'datacentre_1': [6]}
State Observations (t-1): {'datacentre_0': {'machine_states': [0, 0, 0, 0, 0], 'job_queue': [<MachineState.LOW: 1>]}, 'datacentre_1': {'machine_states': [0, 0, 0, 0, 0], 'job_queue': [<MachineState.LOW: 1>]}}
State Observations (t): {'datacentre_0': {'machine_states': [0, 0, 0, 0, 0], 'job_queue': [<MachineState.HIGH: 3>]}, 'datacentre_1': {'machine_states': [0, 0, 0, 0, 0], 'job_queue': [<MachineState.MED: 2>]}}

Timestep: 1
datacentre_0 is taking actions [4] for job queue [<MachineState.HIGH: 3>]
Timestep: 1
datacentre_1 is taking actions [3] for job queue [<MachineState.MED: 2>]
Finished one step, now at timestep 2
Rewards (t-1): {'datacentre