In [None]:
#https://sourceforge.net/projects/swig/files/swigwin/swigwin-4.0.2/swigwin-4.0.2.zip/download?use_mirror=ixpeering

# 1. Import Dependencies

In [1]:
# Gymnasium imports
import gymnasium as gym 
from gymnasium import Env
from gymnasium.spaces import Discrete, Box, Dict, Tuple, MultiBinary, MultiDiscrete 

# Import helpers
import numpy as np
import random
import os

# Import stable baselines
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.evaluation import evaluate_policy

# 2. Types of Spaces

In [2]:
# Discrete space with three different values (0, 1 and 2)
Discrete(3)

Discrete(3)

In [None]:
# Continuous Box space with a 3x3 shape and every entry bounded to [0, 1];
# .sample() draws one random element from it
Box(0,1,shape=(3,3)).sample()

In [15]:
# Same 3x3 Box, but integer-valued with bounds 0..255 (image-like values)
Box(0,255,shape=(3,3), dtype=int).sample()

array([[ 43, 172,  95],
       [222, 130,  79],
       [215, 165,  98]])

In [16]:
# Tuple spaces are not supported by Stable Baselines3
# Combination of different spaces, addressed by position
Tuple((Discrete(2), Box(0,100, shape=(1,)))).sample()

(0, array([46.0833], dtype=float32))

In [17]:
# Combination of different spaces with keywords
# NOTE(review): recent Stable Baselines3 releases accept single-level Dict
# *observation* spaces via "MultiInputPolicy"; Dict action spaces are not
# supported -- confirm against the installed SB3 version
# Most likely the one I will use
Dict({'height':Discrete(2), "speed":Box(0,100, shape=(1,))}).sample()

OrderedDict([('height', 1), ('speed', array([65.581764], dtype=float32))])

In [18]:
# Binary set of values: a vector of 4 independent 0/1 flags
MultiBinary(4).sample()

array([0, 0, 1, 0], dtype=int8)

In [19]:
# Discrete set of values: three components with 5, 2 and 2 choices respectively
MultiDiscrete([5,2,2]).sample()

array([3, 0, 1])

# 3. Building an Environment

In [21]:
# Build an agent that gives the best shower possible

class ShowerEnv(Env):
    """Toy Gymnasium environment: keep a shower at a comfortable temperature.

    Observation: ``Box(0, 100, shape=(1,), float32)`` -- the current temperature.
    Actions:     ``Discrete(3)`` -- 0 lowers the temperature by one degree,
                 1 leaves it unchanged, 2 raises it by one degree.
    Reward:      +1 for every step the temperature is inside the comfort band
                 [COMFORT_LOW, COMFORT_HIGH], otherwise -1.
    Episode end: after EPISODE_LENGTH steps (reported through ``terminated``).
    """

    # Temperature limits; these are also the bounds of the observation space.
    TEMP_MIN = 0
    TEMP_MAX = 100
    # Inclusive temperature band that earns a positive reward.
    COMFORT_LOW = 37
    COMFORT_HIGH = 39
    # Episodes start at START_TEMP plus a uniform integer offset in
    # [-START_JITTER, +START_JITTER].
    START_TEMP = 38
    START_JITTER = 3
    # Number of steps (seconds of shower) per episode.
    EPISODE_LENGTH = 60

    def __init__(self):
        """Declare the action/observation spaces and the initial episode state."""
        # Actions we can take: down, stay, up
        self.action_space = Discrete(3)
        # One-element temperature array
        self.observation_space = Box(
            low=float(self.TEMP_MIN),
            high=float(self.TEMP_MAX),
            shape=(1,),
            dtype=np.float32,
        )
        # Set start temp and shower length
        self.state = self._sample_start_temp()
        self.shower_length = self.EPISODE_LENGTH

    def _sample_start_temp(self):
        """Draw a start temperature from the environment's own seeded RNG."""
        # Generator.integers excludes the upper bound, hence the +1.
        offset = self.np_random.integers(-self.START_JITTER, self.START_JITTER + 1)
        return self.START_TEMP + int(offset)

    def _get_obs(self):
        """Return the current temperature as a float32 array of shape (1,)."""
        return np.array([self.state], dtype=np.float32)

    def step(self, action):
        """Apply one action and advance the shower by one second.

        Args:
            action: 0, 1 or 2, shifting the temperature by -1, 0 or +1.

        Returns:
            ``(obs, reward, terminated, truncated, info)``. ``terminated``
            becomes True once the shower length is used up; ``truncated`` is
            always False and ``info`` is an empty dict.
        """
        # Map action {0, 1, 2} onto a temperature change of {-1, 0, +1}.
        self.state += int(action) - 1
        # Keep the temperature inside the declared observation-space bounds so
        # the returned observation is always a valid element of the space.
        self.state = min(max(self.state, self.TEMP_MIN), self.TEMP_MAX)
        # Reduce shower length by 1 second
        self.shower_length -= 1

        # Reward staying inside the comfort band, penalise everything else.
        in_comfort_band = self.COMFORT_LOW <= self.state <= self.COMFORT_HIGH
        reward = 1 if in_comfort_band else -1

        # The episode ends when the shower time runs out. It is reported as
        # `terminated` because callers loop on the third return value.
        terminated = self.shower_length <= 0
        truncated = False
        info = {}

        return self._get_obs(), reward, terminated, truncated, info

    def render(self):
        """Visualisation is not implemented for this environment."""
        pass

    def reset(self, seed=None, options=None):
        """Start a new episode.

        Args:
            seed: Optional seed; forwarded to ``Env.reset`` which (re)seeds
                ``self.np_random`` so episodes are reproducible per environment
                without touching the global ``random`` / ``np.random`` state.
            options: Accepted for Gymnasium API compatibility; unused.

        Returns:
            ``(obs, info)`` -- the initial observation and an empty info dict.
        """
        # Handle the seed
        super().reset(seed=seed)
        # Reset shower temperature and shower time
        self.state = self._sample_start_temp()
        self.shower_length = self.EPISODE_LENGTH

        info = {}
        return self._get_obs(), info


In [22]:
# Instantiate the custom environment; this instance is reused by the cells below
env = ShowerEnv()

In [15]:
# Draw a random element from the declared observation space
env.observation_space.sample()

array([29.095186], dtype=float32)

In [16]:
# Start a new episode; reset() returns an (observation, info) pair
env.reset()

(array([38.], dtype=float32), {})

In [11]:
from stable_baselines3.common.env_checker import check_env

In [23]:
# Run the Stable Baselines3 environment checker on the custom env, with warnings enabled
check_env(env, warn=True)

# 4. Test Environment

In [25]:
# Roll out a few episodes with uniformly random actions as a sanity check.
episodes = 5
for episode in range(1, episodes + 1):
    # Gymnasium's reset() returns an (observation, info) pair.
    obs, info = env.reset()
    terminated = truncated = False
    score = 0

    # An episode is over when it is either terminated or truncated.
    while not (terminated or truncated):
        action = env.action_space.sample()
        # step() returns (obs, reward, terminated, truncated, info), in that order.
        obs, reward, terminated, truncated, info = env.step(action)
        score += reward
    print('Episode:{} Score:{}'.format(episode, score))

Episode:1 Score:-24
Episode:2 Score:0
Episode:3 Score:12
Episode:4 Score:-30
Episode:5 Score:-12


# 5. Train Model

In [33]:
# Directory that receives the TensorBoard logs (relative to the notebook)
log_path = 'Logs'

In [34]:
# PPO agent with an MLP policy; training metrics are logged under log_path
model = PPO(
    "MlpPolicy",
    env,
    verbose=1,
    tensorboard_log=log_path,
)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [38]:
# Train the agent for 400k environment steps
model.learn(total_timesteps=400_000)

Logging to Logs/PPO_2
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 60       |
|    ep_rew_mean     | -22.9    |
| time/              |          |
|    fps             | 7188     |
|    iterations      | 1        |
|    time_elapsed    | 0        |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 60           |
|    ep_rew_mean          | -11.1        |
| time/                   |              |
|    fps                  | 4146         |
|    iterations           | 2            |
|    time_elapsed         | 0            |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0071394052 |
|    clip_fraction        | 0.157        |
|    clip_range           | 0.2          |
|    entropy_loss         | -0.895       |
|    explained_variance   | 0.00026 

<stable_baselines3.ppo.ppo.PPO at 0x31e943da0>

# 6. Save Model

In [31]:
# Store the model at Model/PPO, relative to the notebook
save_path = os.path.join(
    'Model',
    'PPO',
)
model.save(save_path)

In [39]:
# Evaluate the model over 10 episodes without rendering; the call's result is the cell output
evaluate_policy(
    model,
    env,
    n_eval_episodes=10,
    render=False,
)

(59.4, 0.9165151389911679)