In [3]:
import gymnasium as gym

In [4]:
# make an environment

class SchedulerEnvironment(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 4}
    
    def __init__(self, render_mode=None, size=5, scenario_one = True):
        self.size = size  # The size of the square grid
        self.window_size = 512  # The size of the PyGame window
        self.scenario_one = scenario_one

        self.one_queue_size = 0.0
        self.two_queue_size = 0.0
        self.best_effort_queue_size = 0.0 
        
        PRIORITY_ONE_DELAY = 10
        PRIORITY_TWO_DELAY = 10
        BEST_EFFORT_DELAY = 10

        PRIORITY_ONE_SIZE = 10
        PRIORITY_TWO_SIZE = 10
        BEST_EFFORT_SIZE = 10
                
        self.observation_space = spaces.Dict( 
            {
                
                "queues": MultiDiscrete([
                    PRIORITY_ONE_SIZE,
                    PRIORITY_TWO_SIZE,
                    BEST_EFFORT_SIZE
                ]),
                
                "delay_counters": MultiDiscrete([
                    PRIORITY_ONE_DELAY,
                    PRIORITY_TWO_DELAY,
                    BEST_EFFORT_DELAY
                ]),
                
#                 boolean for timeslot delay during switch 
#                 zero for no delay, one for delay. 
                "switchCounter": Discrete([2])
                
            }
        )
        
        
        self.action_space = spaces.Discrete([3])
        
        self._action_to_queue = {
                0: self.observation_space["queues"][0],
                1: self.observation_space["queues"][1],
                2: self.observation_space["queues"][2]
            }


        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode
        
        """
        If human-rendering is used, `self.window` will be a reference
        to the window that we draw to. `self.clock` will be a clock that is used
        to ensure that the environment is rendered at the correct framerate in
        human-mode. They will remain `None` until human-mode is used for the
        first time.
        """
        self.window = None
        self.clock = None
                
    def _increment_size(self):
        
        PACKET_ARRIVAL_ONE = 0.3
        PACKET_ARRIVAL_TWO = 0.25
        PACKET_ARRIVAL_BEST_EFFORT = 0.4
        
        self.one_queue_size += PACKET_ARRIVAL_ONE
        if self.one_queue_size >= 1.0:
            self.one_queue_size -= 1
            self.observation_space["queues"][0] += 1
                
        self.two_queue_size += PACKET_ARRIVAL_TWO
        
        if self.two_queue_size >= 1.0:
            self.two_queue_size -= 1
            self.observation_space["queues"][1] += 1

        self.best_effort_queue_size += PACKET_ARRIVAL_BEST_EFFORT

        if self.best_effort_queue_size >= 1.0:
            self.best_effort_queue_size -= 1
            self.observation_space["queues"][2] += 1

        return

    def _increment_delay(self):
        # TODO: implement logic here, not sure what to do
        return

    def _modify_states(self):
        self._increment_size()
        self._increment_delay()
        return
    
    def _get_obs(self):
        return {
            "queues": self._observation_space["queues"],
            "delay_counters": self._observation_space["delay_counters"]
        }
    
    def _get_info(self):
        return {
#                 TODO: return info of some kind.
        }
    
    def reset(self, seed=None, options=None):
        # We need the following line to seed self.np_random
        super().reset(seed=seed)

#             TODO: randomise queues 
        self.one_queue_size = 4.0
        self.two_queue_size = 5.0
        self.best_effort_queue_size = 6.0
        
        self.observation_space["queues"][0] = (int) (self.one_queue_size)
        self.observation_space["queues"][1] = (int) (self.two_queue_size)
        self.observation_space["queues"][2] = (int) (self.best_effort_queue_size)
        
        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human":
            self._render_frame()

        return observation, info
    
    def step(self, action):
        # TODO: determine reward function 
        reward = 0
        if self.scenario_one:
            # remove the packet. 
            self._action_to_queue[action] -= 1

            # determine reward
 
        else: 
            # steps should be 0 1 or 2. 
            if (action == currentState):
                # remove packet from queue
                self._action_to_queue[action] -= 1

                # determine reward


            else:
                # switch queue 
                currentState = action

                # determine reward
        
        self._modify_states()

        # TODO: consider if observation made before or after modifying states. 
        observation = self._get_obs()
        info = self._get_info()

        if self.render_mode == "human":
            self._render_frame()

        return observation, reward, terminated, False, info
    
    def render(self):
        if self.render_mode == "rgb_array":
            return self._render_frame()

    def _render_frame(self):
        if self.window is None and self.render_mode == "human":
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode(
                (self.window_size, self.window_size)
            )
        if self.clock is None and self.render_mode == "human":
            self.clock = pygame.time.Clock()

        canvas = pygame.Surface((self.window_size, self.window_size))
        canvas.fill((255, 255, 255))
        pix_square_size = (
            self.window_size / self.size
        )  # The size of a single grid square in pixels

        # TODO: draw queues 

        # Finally, add some gridlines
        for x in range(self.size + 1):
            pygame.draw.line(
                canvas,
                0,
                (0, pix_square_size * x),
                (self.window_size, pix_square_size * x),
                width=3,
            )
            pygame.draw.line(
                canvas,
                0,
                (pix_square_size * x, 0),
                (pix_square_size * x, self.window_size),
                width=3,
            )

        if self.render_mode == "human":
            # The following line copies our drawings from `canvas` to the visible window
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            # We need to ensure that human-rendering occurs at the predefined framerate.
            # The following line will automatically add a delay to keep the framerate stable.
            self.clock.tick(self.metadata["render_fps"])
        else:  # rgb_array
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )
    
    def close(self):
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()
