In [5]:
import gymnasium as gym
from gymnasium.spaces import Discrete, Box, Sequence, Dict
import numpy as np
from utils import methods


In [6]:

# Sampling ranges for one randomly generated scenario; get_realized_data picks
# one value from each entry.
CONFIG = dict(
    # candidate gamma shape parameters
    alpha_range=range(2, 8),
    # candidate gamma scale parameters: 1.0, 1.5, ..., 4.0
    beta_range=[round(step * 0.5, 1) for step in range(2, 9)],
    # candidate values for h: 0.06, 0.07, ..., 0.60
    h_range=[round(step * 0.01, 2) for step in range(6, 61)],
    # candidate values for c
    c_range=range(20, 30),
    # candidate number of intervals per scenario
    total=range(10, 40),
)

def get_realized_data(config, rng=None):
    """Sample one random scenario from the ranges in ``config``.

    Parameters
    ----------
    config : mapping
        Must provide the keys ``'alpha_range'``, ``'beta_range'``,
        ``'h_range'``, ``'c_range'`` and ``'total'``. One value is drawn
        from each with ``choice``.
    rng : numpy.random.Generator or numpy.random.RandomState, optional
        Source of randomness. When omitted, NumPy's global ``np.random``
        state is used (the original behaviour). Pass a seeded generator to
        make the draw reproducible without touching global state.

    Returns
    -------
    tuple
        ``(alpha, beta, h, c, total, intervals, travel_time)`` where
        ``intervals`` holds ``total`` samples of gamma(shape=alpha, scale=beta)
        and ``travel_time`` is the sum of every interval after the first three
        minus one exponential(scale=beta) draw, floored at ``2 * beta``.
    """
    # np.random (module) and Generator/RandomState objects all expose
    # choice / gamma / exponential, so they can be used interchangeably here.
    source = np.random if rng is None else rng

    alpha = source.choice(config['alpha_range'])
    beta = source.choice(config['beta_range'])
    h = source.choice(config['h_range'])
    c = source.choice(config['c_range'])
    total = source.choice(config['total'])

    intervals = source.gamma(shape=alpha, scale=beta, size=total)

    # Built-in sum is kept on purpose so seeded legacy runs give the same floats.
    travel_time = sum(intervals[3:]) - source.exponential(scale=beta)
    # Never let the travel time drop below two scale units.
    travel_time = max(beta * 2, travel_time)

    return alpha, beta, h, c, total, intervals, travel_time

In [7]:
class CustomEnv(gym.Env):
    """Episodic "wait or leave" environment over randomly generated intervals.

    Every episode samples a scenario with ``get_realized_data(CONFIG)``. The
    first three intervals are visible after ``reset``. On each ``step`` the
    clock advances by ``step_size`` and the agent either waits (``0``) or
    leaves (``1``). Leaving ends the episode with reward
    ``-methods.cal_cost(...)``; waiting yields reward ``0``.

    Invariant maintained by ``reset`` and ``step``:
    ``obs_intervals == intervals[:n]`` and ``N == total - n``.

    NOTE(review): ``reset`` returns only the observation and ``step`` returns
    the 4-tuple ``(obs, reward, done, info)``. The gymnasium ``Env`` API
    documents ``reset -> (obs, info)`` and a 5-tuple from ``step``; the return
    shapes are kept as-is here because the surrounding cells unpack them this
    way -- confirm before using wrappers that expect the newer API.
    """

    def __init__(self, step_size=1):
        """Create the environment.

        Parameters
        ----------
        step_size : number, optional
            Amount added to ``cur_time`` on every call to ``step``.
        """
        super().__init__()

        # Scenario state; -1 is a placeholder until reset() samples real values.
        self.alpha = -1
        self.beta = -1
        self.h = -1
        self.c = -1
        self.total = -1
        self.intervals = -1
        self.travel_time = -1
        self.cur_time = -1
        self.obs_intervals = -1
        self.n = -1  # number of intervals revealed so far
        self.N = -1  # number of intervals not yet revealed (total - n)
        self.cum_sum_intervals = -1

        self.step_size = step_size

        # 0 = wait, 1 = leave
        self.action_space = Discrete(2)

        self.observation_space = Dict({
            'h': Box(0, max(CONFIG['h_range']) + 1),
            'c': Box(0, max(CONFIG['c_range']) + 1),
            'n': Discrete(max(CONFIG['total']) + 1),
            'N': Discrete(max(CONFIG['total']) + 1),
            'obs_intervals': Sequence(Box(0, np.inf)),
            'travel_time': Box(0, np.inf),
            'cur_time': Box(0, np.inf)
        })

    def _get_obs(self):
        """Return the current observation as a dict keyed like ``observation_space``."""
        return {
            'h': self.h,
            'c': self.c,
            'n': self.n,
            'N': self.N,
            'obs_intervals': self.obs_intervals,
            'travel_time': self.travel_time,
            'cur_time': self.cur_time
        }

    def reset(self):
        """Sample a new scenario and return the initial observation.

        The clock starts at the end of the third interval, and exactly those
        three intervals are exposed in ``obs_intervals``.
        """
        (self.alpha, self.beta, self.h, self.c, self.total,
         self.intervals, self.travel_time) = get_realized_data(CONFIG)

        # cum_sum_intervals[k] is the time at which interval k ends.
        self.cum_sum_intervals = np.cumsum(self.intervals)

        self.n = 3
        self.N = self.total - self.n
        self.obs_intervals = self.intervals[:self.n]
        self.cur_time = self.cum_sum_intervals[self.n - 1]

        return self._get_obs()

    def step(self, action):
        """Advance the clock by ``step_size`` and apply ``action``.

        Parameters
        ----------
        action : int
            ``0`` to keep waiting, anything else to leave. The action is
            forced to "leave" once the clock reaches the end of the last
            interval.

        Returns
        -------
        tuple
            ``(observation, reward, done, info)``. Waiting gives reward ``0``
            and ``done=False``; leaving gives ``-cost`` and ``done=True``.
        """
        self.cur_time += self.step_size

        # Nothing is left to wait for once every interval has elapsed.
        if self.cur_time >= self.cum_sum_intervals[-1]:
            action = 1

        if action == 0:
            # Reveal every interval that has fully elapsed by cur_time. The
            # forced-leave check above guarantees cur_time < cum_sum[-1], so
            # n stops at total - 1 at the latest and the index stays in range.
            while self.cur_time >= self.cum_sum_intervals[self.n]:
                self.n += 1
                self.N -= 1
            # Re-slice instead of appending: obs_intervals is a NumPy array
            # (no .append), and slicing exposes exactly the completed
            # intervals rather than skipping one and leaking a later one.
            self.obs_intervals = self.intervals[:self.n]
            return self._get_obs(), 0, False, {}

        cost = methods.cal_cost(
            c=self.c,
            h=self.h,
            actual_time=self.cum_sum_intervals[-1],
            predicted_time=self.cur_time + self.travel_time,
        )
        # The episode is over: expose the full realised scenario.
        self.obs_intervals = self.intervals[:self.total]
        self.n = self.total
        self.N = 0
        return self._get_obs(), -cost, True, {}

    def render(self, mode='human'):
        """Print the current observation; ``mode`` is accepted but not used."""
        print(self._get_obs())

In [None]:
# Smoke-test the environment with a random policy.
c = CustomEnv()
c.reset()

done = False
while not done:
    # Ten 0s and a single 1: wait is picked 10 times out of 11 on average.
    _, reward, done, _ = c.step(np.random.choice([0] * 10 + [1]))
    print(c._get_obs())
    print(reward)

In [None]:
# Show the Dict observation space declared in CustomEnv.__init__.
print(c.observation_space)

In [None]:
import stable_baselines3
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.vec_env import SubprocVecEnv

# Create the environment and sample an initial scenario.
# NOTE(review): CustomEnv.reset() returns only the observation and step()
# returns a 4-tuple; confirm the installed stable-baselines3 version accepts
# that API and the Sequence entry in the observation space.
env = CustomEnv()
env.reset()

# Instantiate the agent (PPO with a multi-input policy, running on the CPU).
# Add verbose=1 to the constructor to get training logs.
model = PPO("MultiInputPolicy", env, device="cpu")

# Train the agent (only 100 timesteps -- enough for a smoke test).
model.learn(total_timesteps=100)


# Save the trained model to "ppo_custom_env".
model.save("ppo_custom_env")

# Reload the model from disk, replacing the in-memory one.
model = PPO.load("ppo_custom_env")

# Roll out one episode with the reloaded agent, printing each observation.
obs = env.reset()
done = False
while not done:
    action, _states = model.predict(obs)
    obs, rewards, done, info = env.step(action)
    env.render(mode='human')