# Install Prerequisites: python3.8+

In [44]:
# !pip install stable-baselines3[extra]
# # !pip install sb3-contrib

See Algorithms, Parameters and More:
1. https://stable-baselines3.readthedocs.io/en/master/index.html
2. https://sb3-contrib.readthedocs.io/en/master/index.html
3. https://github.com/DLR-RM/stable-baselines3/

# Define Custom Gym Environment

In [1]:
import gymnasium as gym
import numpy as np
from gymnasium import spaces

import simpy
import random
random.seed(42)
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder

import gc
gc.collect()
NUM_DATA = 1000
SLOW_POWER = 1
FAST_POWER = 10
SLOW_RATE = 5
FAST_RATE = 10
ARRIVAL_RATE = 1        #np.random.exponential(1/ARRIVAL_RATE)
MIN_LATENCY = 0.01
MAX_LATENCY = 0.1
MIN_COMPUTATION_REQ = 1
MAX_COMPUTATION_REQ = 1000
NUM_FAST_SERVERS = 2
NUM_SLOW_SERVERS = 10
NUM_SERVERS = NUM_FAST_SERVERS+NUM_SLOW_SERVERS
PRIORITIES = 3
COST_MUL = 0.0005 # multiplier of cost in Reward calculation

class Server:
    def __init__(self, env, slow_capacity, fast_capacity, slow_computing_power, fast_computing_power, slow_rate, fast_rate):
        self.env = env
        self.slow_capacity = slow_capacity
        self.fast_capacity = fast_capacity 
        self.slow_computing_power = slow_computing_power
        self.fast_computing_power = fast_computing_power
        self.slow_rate = slow_rate
        self.fast_rate = fast_rate
        self.slow_resource = simpy.PriorityResource(env, capacity=slow_capacity)
        self.fast_resource = simpy.PriorityResource(env, capacity=fast_capacity)
        
    def process_packet(self, packet, completion_event):
        if packet.server_assigned=="fast":
            yield self.env.timeout(packet.latencies[0])
            completion_event.succeed()
            computation_time = packet.computational_requirement / self.fast_computing_power
            packet.cost = packet.computational_requirement * self.fast_rate

            with self.fast_resource.request(priority=packet.priority) as req:
                yield req
                packet.wait_end_time = self.env.now
                yield self.env.timeout(computation_time)
                
        else:
            yield self.env.timeout(packet.latencies[1])
            completion_event.succeed()
            computation_time = packet.computational_requirement / self.slow_computing_power
            packet.cost = packet.computational_requirement * self.slow_rate
            
            with self.slow_resource.request(priority=packet.priority) as req:
                yield req
                packet.wait_end_time = self.env.now
                yield self.env.timeout(computation_time)
                
                

class Packet:
    def __init__(self, env, name, id, creq, lats):
        self.env = env
        self.name = name
        self.id = id
        self.computational_requirement = creq  # Random computational requirement
        self.latencies = lats  # Random latency from each server
        self.wait_start_time = 0
        self.wait_end_time = 0
        self.wait = 0
        self.done_time = 0
        self.server_assigned = None
        self.priority = 1
        self.cost = 0
    def _wait(self):
        self.wait = self.wait_end_time - self.wait_start_time
"""-----------------------------------------------------------------------------------------------------------"""
class CustomEnv(gym.Env):
    """Custom Environment that follows gym interface."""

    def __init__(self,alpha = 0.5):
        super().__init__()
        self.waits = 0
        self.costs = 0
        self.packets=[]
        self.packet_id = 0
        self.alpha = alpha # is the α is a parameter that determines the trade-off between energy consumption and task execution delay.

        
        self.action_space = spaces.Discrete(6)
        self._action_code = {
            0: ["fast", 1],
            1: ["fast", 2],
            2: ["fast", 3],
            3: ["slow", 1],
            4: ["slow", 2],
            5: ["slow", 3]
        }
        self.observation_space = spaces.Dict({
            'comp_req'   : spaces.Box(low=MIN_COMPUTATION_REQ, high=MAX_COMPUTATION_REQ, shape=(1,), dtype=np.int32),
            'latency'    : spaces.Box(low=MIN_LATENCY, high=MAX_LATENCY, shape=(2,), dtype=np.float32),
            'queue_size' : spaces.Box(low=0, high=NUM_DATA, shape=(2,), dtype=np.int32),
            'using'      : spaces.Box(low=0, high=max(NUM_FAST_SERVERS,NUM_SLOW_SERVERS), shape=(2,), dtype=np.int32),
            'priorities' : spaces.Box(low=0, high=PRIORITIES*NUM_DATA, shape=(2,), dtype=np.int32),    
        })

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.waits = 0
        self.costs = 0
        self.packets=[]
        self.packet_id = 0
        
        self.env = simpy.Environment()
        self.server = Server(self.env, NUM_SLOW_SERVERS, NUM_FAST_SERVERS, SLOW_POWER, FAST_POWER, SLOW_RATE, FAST_RATE)
        
        self.current_state = self.observation_space.sample()
        self.packet_id = 0
        self.current_state['queue_size'] = np.array([len(self.server.fast_resource.queue), len(self.server.slow_resource.queue)], dtype=np.int32)
        self.current_state['using'] = np.array([self.server.fast_resource.count, self.server.slow_resource.count], dtype=np.int32)
        self.current_state['priorities'] = np.array([sum([i.priority if self.server.fast_resource.queue else 0 for i in self.server.fast_resource.queue]), sum([i.priority if self.server.slow_resource.queue else 0 for i in self.server.slow_resource.queue])], dtype=np.int32)
        return self.current_state, {}

    def _make_packet(self,action, id, state, completion_event):
        packet = Packet( self.env, f'Packet_{id}', id, state['comp_req'], state['latency'])
        packet.wait_start_time = self.env.now
        [s, p] = self._action_code[action]
        packet.server_assigned = s
        packet.priority = p
        yield self.env.process(self.server.process_packet(packet, completion_event))
        packet.done_time = self.env.now
        packet._wait()
        self.waits += packet.wait
        self.costs += packet.cost
        self.packets.append(packet)
        
    def _packet_generator(self, action, id, state, completion_event):
        yield self.env.timeout(np.random.exponential(1/ARRIVAL_RATE))
        self.env.process(self._make_packet(action, id, state, completion_event))
        
    def step(self, action):
        completion_event = self.env.event()
        self.env.process(self._packet_generator(action, self.packet_id, self.current_state, completion_event))
        self.env.run(until=completion_event)

        self.current_state = self.observation_space.sample()
        self.packet_id +=1
        self.current_state['queue_size'] = np.array([len(self.server.fast_resource.queue), len(self.server.slow_resource.queue)], dtype=np.int32)
        self.current_state['using'] = np.array([self.server.fast_resource.count, self.server.slow_resource.count], dtype=np.int32)
        self.current_state['priorities'] = np.array([sum([i.priority if self.server.fast_resource.queue else 0 for i in self.server.fast_resource.queue]), sum([i.priority if self.server.slow_resource.queue else 0 for i in self.server.slow_resource.queue])], dtype=np.int32)

        reward = 1
        terminated = self.packet_id == NUM_DATA
        if terminated:
            self.env.run()
            reward = 1/(self.waits/NUM_DATA) + COST_MUL/(self.costs/NUM_DATA)
            # reward = - self.alpha * (self.waits) - (1 - self.alpha) * (self.costs)
        
        return self.current_state, reward, terminated, False, {}

    def close(self):
        del self.env

        return 0

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


## Check Environment for Errors

In [2]:
from stable_baselines3.common.env_checker import check_env
# from gymnasium.utils.env_checker import check_env
env=CustomEnv()
check_env(env, warn=True, skip_render_check=True)




# Random Action Simulation

In [3]:
import gymnasium as gym

env=CustomEnv()
observation, info = env.reset()

for i in range(NUM_DATA):
    observation, reward, terminated, truncated, info = env.step(env.action_space.sample())

print(f"average waits: {env.waits/(env.packet_id+1)}\n")
print(f"average cost: {env.costs/(env.packet_id+1)}\n")

env.close( )

average waits: [8836.44422684]

average cost: [3705.35464535]



0

# Train RL Model & Simulate

In [4]:
import gymnasium as gym

from stable_baselines3 import PPO, DQN, A2C
from sb3_contrib import TRPO

env = CustomEnv(0.9)

model = PPO("MultiInputPolicy", env, verbose=1, learning_rate=0.0001, n_steps=8, batch_size=2048)
# model = DQN("MultiInputPolicy", env, verbose=1, batch_size=32, target_update_interval=100)
# model = A2C("MultiInputPolicy", env, verbose=1)
# model = TRPO("MultiInputPolicy", env, verbose=1)



model.learn(total_timesteps=5_000, log_interval=100) #30_000

obs,_ = env.reset()
for i in range(NUM_DATA):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, tru, info = env.step(int(action))
    
print(f"average waits: {env.waits/(env.packet_id+1)}\n")
print(f"average cost: {env.costs/(env.packet_id+1)}\n")

env.close()

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


We recommend using a `batch_size` that is a factor of `n_steps * n_envs`.
Info: (n_steps=8 and n_envs=1)


------------------------------------------
| time/                   |              |
|    fps                  | 70           |
|    iterations           | 100          |
|    time_elapsed         | 11           |
|    total_timesteps      | 800          |
| train/                  |              |
|    approx_kl            | 0.0009318888 |
|    clip_fraction        | 0            |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.73        |
|    explained_variance   | -0.00226     |
|    learning_rate        | 0.0001       |
|    loss                 | 6.99         |
|    n_updates            | 990          |
|    policy_gradient_loss | -0.0103      |
|    value_loss           | 14.3         |
------------------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 1e+03        |
|    ep_rew_mean          | 999          |
| time/                   |              |
|    fps   

0

## Simulate Trained Model

In [6]:
obs,_ = env.reset()
for i in range(NUM_DATA):
    action, _states = model.predict(obs, deterministic=True)
    obs, reward, done, tru, info = env.step(int(action))
print(f"average waits: {env.waits/(env.packet_id+1)}\n")
print(f"average cost: {env.costs/(env.packet_id+1)}\n")

env.close()

average waits: [11846.99384512]

average cost: [4951.18881119]



0

# Save Packet Data

In [148]:
pd.DataFrame([e.__dict__ for e in env.packets]).to_csv("packets.csv")