In [9]:
import gym
from gym import spaces
import numpy as np

class PacketSchedulingEnv(gym.Env):
    """Single-server packet scheduler over three FIFO traffic queues.

    Queue indices: 0 = video, 1 = voice, 2 = best effort. Every call to
    ``step`` first admits new arrivals, then advances the clock by one tick,
    then serves the head-of-line packet of the queue chosen by the action.

    Observation (float32, length ``2 * num_queues``): for each queue in index
    order, ``[queue_length, mean_waiting_time]``, where the mean waiting time
    is clipped to ``max_delay``.

    Reward for one step:
      * chosen queue is empty: -1
      * video / voice packet: +1 if its delay is <= the class target, else -2
      * best-effort packet: ``max(0, 1 - delay / max_delay)`` (linear decay)

    Uses the classic Gym API: ``reset`` returns only the observation and
    ``step`` returns ``(observation, reward, done, info)``.
    """

    metadata = {"render.modes": ["human"]}

    def __init__(self, max_queue_size=50, max_delay=10, seed=None,
                 arrival_rates=(0.6, 0.5, 1.0), max_steps=None):
        """Build the environment.

        Args:
            max_queue_size: Capacity of each queue; arrivals to a full queue
                are dropped. Must be positive.
            max_delay: Cap applied to the mean delay in the observation and
                normaliser of the best-effort reward. Must be positive.
            seed: Seed for the arrival random generator (None = unseeded).
            arrival_rates: Per-queue probability that one packet arrives in a
                step (one Bernoulli draw per queue per step). A rate of 1.0
                or more yields an arrival on every step.
            max_steps: If given, ``step`` reports ``done=True`` once the clock
                reaches this many ticks. None keeps episodes open-ended.

        Raises:
            ValueError: On non-positive sizes/limits or when the number of
                arrival rates does not match the number of queues.
        """
        super().__init__()

        if max_queue_size <= 0:
            raise ValueError("max_queue_size must be positive")
        if max_delay <= 0:
            # max_delay is a divisor in the best-effort reward.
            raise ValueError("max_delay must be positive")
        if max_steps is not None and max_steps <= 0:
            raise ValueError("max_steps must be positive or None")

        # Traffic classes: 0 - Video, 1 - Voice, 2 - Best Effort
        self.num_queues = 3
        self.max_queue_size = max_queue_size
        self.max_delay = max_delay
        self.max_steps = max_steps

        self.arrival_rates = tuple(arrival_rates)
        if len(self.arrival_rates) != self.num_queues:
            raise ValueError(
                f"arrival_rates must have {self.num_queues} entries, "
                f"got {len(self.arrival_rates)}"
            )

        # Delay targets in ticks: video <= 3, voice <= 5, best effort has none.
        self.qos_targets = [3, 5, None]

        # Each queue stores the arrival timestamp of every waiting packet.
        self.queues = [[] for _ in range(self.num_queues)]
        self.time = 0
        self.rng = np.random.default_rng(seed)

        # Per-dimension bounds: lengths are limited by max_queue_size and mean
        # delays by max_delay, so every observation lies inside the space even
        # when max_delay > max_queue_size.
        high = np.array(
            [max_queue_size, max_delay] * self.num_queues, dtype=np.float32
        )
        self.observation_space = spaces.Box(
            low=np.zeros_like(high), high=high, dtype=np.float32
        )

        # Action space: select one of the queues to serve
        self.action_space = spaces.Discrete(self.num_queues)

    def reset(self, seed=None):
        """Empty all queues, rewind the clock and return the first observation.

        Args:
            seed: When not None, re-create the arrival generator from this
                seed so the following episode is reproducible.
        """
        if seed is not None:
            self.rng = np.random.default_rng(seed)
        self.queues = [[] for _ in range(self.num_queues)]
        self.time = 0
        return self._get_state()

    def _get_state(self):
        """Return ``[len_0, delay_0, len_1, delay_1, ...]`` as float32."""
        state = []
        for q in self.queues:
            if q:
                avg_delay = np.mean([self.time - t for t in q])
            else:
                avg_delay = 0
            state.extend([len(q), min(avg_delay, self.max_delay)])
        return np.array(state, dtype=np.float32)

    def _generate_arrivals(self):
        """Admit at most one new packet per queue, stamped with the current time."""
        for i, rate in enumerate(self.arrival_rates):
            # Draw before the capacity check so the random stream does not
            # depend on queue occupancy.
            arrived = self.rng.random() < rate
            if arrived and len(self.queues[i]) < self.max_queue_size:
                self.queues[i].append(self.time)

    def step(self, action):
        """Advance one tick and serve the queue selected by ``action``.

        Returns:
            ``(observation, reward, done, info)``; ``info`` is always empty
            and ``done`` is True only when ``max_steps`` is set and reached.

        Raises:
            ValueError: If ``action`` is not a valid queue index. Without
                this check a negative action would silently wrap around to
                another queue through Python's negative list indexing.
        """
        if not self.action_space.contains(action):
            raise ValueError(
                f"invalid action {action!r}; expected an integer in "
                f"[0, {self.num_queues - 1}]"
            )

        self._generate_arrivals()
        # The clock moves before service, so a packet served in the same step
        # it arrived in has a delay of 1.
        self.time += 1

        queue = self.queues[action]
        if queue:
            delay = self.time - queue.pop(0)
            target = self.qos_targets[action]
            if target is None:
                # Best effort: reward decays linearly with delay, floored at 0.
                reward = max(0, 1.0 - delay / self.max_delay)
            elif delay <= target:
                reward = 1
            else:
                reward = -2
        else:
            # Penalty for selecting an empty queue
            reward = -1

        done = self.max_steps is not None and self.time >= self.max_steps
        return self._get_state(), reward, done, {}

    def render(self, mode="human"):
        """Print the current waiting time of every packet, one line per queue."""
        for i, q in enumerate(self.queues):
            print(f"Queue {i}: {[self.time - t for t in q]}")



In [10]:
# Seed the arrival process so a Restart & Run All reproduces the same traffic.
env = PacketSchedulingEnv(seed=42)

In [11]:
# Smoke test: drive the environment with a uniformly random policy.
NUM_STEPS = 20
env.action_space.seed(0)  # make the sampled actions repeatable as well
state = env.reset()
for _ in range(NUM_STEPS):
    action = env.action_space.sample()
    next_state, reward, done, _ = env.step(action)
    env.render()
    print(f"Action: {action}, Reward: {reward}")
    state = next_state  # keep `state` pointing at the latest observation
    if done:
        break


Queue 0: [1]
Queue 1: []
Queue 2: [1]
Action: 1, Reward: 1
Queue 0: [2]
Queue 1: []
Queue 2: [2, 1]
Action: 1, Reward: -1
Queue 0: [1]
Queue 1: [1]
Queue 2: [3, 2, 1]
Action: 0, Reward: 1
Queue 0: [2, 1]
Queue 1: [2, 1]
Queue 2: [3, 2, 1]
Action: 2, Reward: 0.6
Queue 0: [2, 1]
Queue 1: [3, 2]
Queue 2: [4, 3, 2, 1]
Action: 0, Reward: 1
Queue 0: [3, 2, 1]
Queue 1: [4, 3, 1]
Queue 2: [4, 3, 2, 1]
Action: 2, Reward: 0.5
Queue 0: [4, 3, 2, 1]
Queue 1: [4, 2]
Queue 2: [5, 4, 3, 2, 1]
Action: 1, Reward: 1
Queue 0: [4, 3, 2]
Queue 1: [5, 3, 1]
Queue 2: [6, 5, 4, 3, 2, 1]
Action: 0, Reward: -2
Queue 0: [5, 4, 3]
Queue 1: [6, 4, 2]
Queue 2: [6, 5, 4, 3, 2, 1]
Action: 2, Reward: 0.30000000000000004
Queue 0: [5, 4, 1]
Queue 1: [7, 5, 3]
Queue 2: [7, 6, 5, 4, 3, 2, 1]
Action: 0, Reward: -2
Queue 0: [6, 5, 2, 1]
Queue 1: [8, 6, 4]
Queue 2: [7, 6, 5, 4, 3, 2, 1]
Action: 2, Reward: 0.19999999999999996
Queue 0: [7, 6, 3, 2, 1]
Queue 1: [9, 7, 5]
Queue 2: [7, 6, 5, 4, 3, 2, 1]
Action: 2, Reward: 0.19999