In [None]:
#https://sourceforge.net/projects/swig/files/swigwin/swigwin-4.0.2/swigwin-4.0.2.zip/download?use_mirror=ixpeering

# 1. Import Dependencies

In [47]:
import gymnasium as gym 
from gymnasium import Env
from gymnasium.spaces import Discrete, Box, Dict, Tuple, MultiBinary, MultiDiscrete 
import numpy as np
import random
import os
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.evaluation import evaluate_policy
from typing import Optional # for type hinting

# 2. Types of Spaces

In [3]:
# A discrete space with 3 possible values; the bare expression displays its repr.
Discrete(3)

Discrete(3)

In [6]:
# Draw one random element from the 3-value discrete space.
Discrete(3).sample()

np.int64(2)

In [4]:
# Sample a 3x3 array from a Box bounded by 0 and 1 (no dtype given; the output below is float32).
Box(0,1,shape=(3,3)).sample()

array([[0.68517005, 0.74287844, 0.51207143],
       [0.11664689, 0.55905044, 0.00550268],
       [0.27708128, 0.00790686, 0.34266573]], dtype=float32)

In [7]:
# Same 3x3 shape, but bounded by 0 and 255 with an integer dtype requested.
Box(0,255,shape=(3,3), dtype=int).sample()

array([[ 78,  70,  52],
       [244,  44, 161],
       [ 62,  32,   2]])

In [8]:
# Tuple space combining a Discrete(2) with a 1-element Box(0, 100);
# sample() returns one value per sub-space, in order.
Tuple((Discrete(2), Box(0,100, shape=(1,)))).sample()

(np.int64(1), array([35.283913], dtype=float32))

In [9]:
# Dict space with named sub-spaces 'height' and 'speed';
# sample() returns a dict carrying the same keys.
Dict({'height':Discrete(2), "speed":Box(0,100, shape=(1,))}).sample()

{'height': np.int64(0), 'speed': array([39.625473], dtype=float32)}

In [10]:
# MultiBinary(4): a sample is a length-4 array of 0/1 flags.
MultiBinary(4).sample()

array([0, 1, 1, 1], dtype=int8)

In [21]:
# MultiDiscrete([5,2,3]): three discrete components sampled together into one array.
MultiDiscrete([5,2,3]).sample()

array([3, 1, 0])

# 3. Building an Environment

In [61]:
class ShowerEnv(Env):
    """Custom Gymnasium environment for a shower temperature control problem.

    Once per step the agent nudges the temperature down, leaves it alone, or
    nudges it up, and is rewarded for every step the temperature sits inside
    the target band.

    Observation: ``Box(0, 100, shape=(1,), dtype=float32)`` holding the
        current temperature.
    Actions: ``Discrete(3)`` -- 0 = down by one, 1 = stay, 2 = up by one.
    Reward: +1 when the temperature is within 37-39 (inclusive), else -1.
    Episode end: ``terminated`` becomes True once 60 steps have been taken.
    """

    # Bounds of the observation space; the state is clipped to them in step().
    MIN_TEMP = 0.0
    MAX_TEMP = 100.0
    # Start temperature is START_TEMP plus an integer offset in [-START_SPREAD, START_SPREAD].
    START_TEMP = 38
    START_SPREAD = 3
    # Inclusive band that earns the positive reward.
    TARGET_LOW = 37
    TARGET_HIGH = 39
    # Number of steps in one episode.
    SHOWER_LENGTH = 60

    def __init__(self):
        """Create the action/observation spaces and an initial state."""
        # Actions we can take: 0 = down, 1 = stay, 2 = up
        self.action_space = Discrete(3)
        # Single-element temperature array
        self.observation_space = Box(
            low=self.MIN_TEMP, high=self.MAX_TEMP, shape=(1,), dtype=np.float32
        )
        # Use the same representation as reset() so that a step() issued
        # before the first reset() still returns a valid observation.
        self.state = self._sample_start_temperature()
        # Remaining steps in the current episode
        self.shower_length = self.SHOWER_LENGTH

    def _sample_start_temperature(self):
        """Return a new float32 array of shape (1,) with a random start temperature."""
        # self.np_random is the generator that reset(seed=...) re-seeds, so
        # seeded resets are reproducible (the global `random` module is not).
        offset = self.np_random.integers(-self.START_SPREAD, self.START_SPREAD + 1)
        return np.array([self.START_TEMP + offset], dtype=np.float32)

    def step(self, action):
        """Advance the environment by one step.

        Args:
            action: 0, 1 or 2; the temperature changes by ``action - 1``.

        Returns:
            Tuple ``(state, reward, terminated, truncated, info)`` where
            ``state`` is a float32 array of shape (1,), ``reward`` is 1 or -1,
            ``terminated`` is True once the episode's steps are used up,
            ``truncated`` is always False and ``info`` is an empty dict.
        """
        # 0 - 1 = -1, 1 - 1 = 0, 2 - 1 = +1
        moved = self.state + (action - 1)
        # Build a NEW array every step (no in-place +=) so observations handed
        # out earlier are never mutated, and keep it inside the declared Box.
        self.state = np.clip(moved, self.MIN_TEMP, self.MAX_TEMP).astype(np.float32)
        # One more step used up
        self.shower_length -= 1

        # Reward: 1 inside the target band, -1 outside
        temperature = float(self.state[0])
        reward = 1 if self.TARGET_LOW <= temperature <= self.TARGET_HIGH else -1

        # The episode ends when no steps remain
        terminated = self.shower_length <= 0
        truncated = False

        # Placeholder for diagnostic info
        info = {}

        return self.state, reward, terminated, truncated, info

    def render(self):
        """Rendering is not implemented for this environment."""
        pass

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        """Start a new episode.

        Args:
            seed: Optional seed forwarded to ``Env.reset`` to re-seed
                ``self.np_random``.
            options: Accepted for API compatibility; unused.

        Returns:
            Tuple ``(state, info)`` with the initial float32 observation of
            shape (1,) and an empty info dict.
        """
        super().reset(seed=seed)
        # New start temperature and a full-length episode
        self.state = self._sample_start_temperature()
        self.shower_length = self.SHOWER_LENGTH
        info = {}
        return self.state, info

In [62]:
# Instantiate the custom shower environment.
env=ShowerEnv()

In [50]:
# Draw a random observation from the environment's observation space.
env.observation_space.sample()

array([96.43866], dtype=float32)

In [51]:
# Start a new episode; ShowerEnv.reset returns a (state, info) pair.
env.reset()

array([37.])

In [52]:
from stable_baselines3.common.env_checker import check_env

In [63]:
# Run Stable-Baselines3's environment checker on the custom env, with warnings enabled.
check_env(env, warn=True)

# 4. Test Environment

In [38]:
# Roll out a few episodes with random actions as a smoke test of the environment.
episodes = 5
for episode in range(1, episodes+1):
    # reset() returns (state, info), not a bare state
    state, info = env.reset()
    done = False
    score = 0

    while not done:
        env.render()
        action = env.action_space.sample()
        # step() returns five values; the episode is over when either flag is set
        n_state, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
        score += reward
    print('Episode:{} Score:{}'.format(episode, score))
env.close()

Episode:1 Score:-18
Episode:2 Score:-60
Episode:3 Score:-14
Episode:4 Score:-54
Episode:5 Score:-40


In [64]:
# Close the environment before handing it to the training code below.
env.close()

# 5. Train Model

In [65]:
# Relative directory passed to PPO below as its TensorBoard log location.
log_path = os.path.join('Training', 'Logs')

In [66]:
# Build a PPO agent with an MLP policy on the custom env; verbose=1 prints progress
# and TensorBoard logs go under log_path.
model = PPO("MlpPolicy", env, verbose=1, tensorboard_log=log_path)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [67]:
# Train the agent for 400,000 environment steps (episodes are 60 steps long).
model.learn(total_timesteps=400000)

Logging to Training/Logs/PPO_4
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 60       |
|    ep_rew_mean     | -31.1    |
| time/              |          |
|    fps             | 7000     |
|    iterations      | 1        |
|    time_elapsed    | 0        |
|    total_timesteps | 2048     |
---------------------------------
------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 60           |
|    ep_rew_mean          | -29.4        |
| time/                   |              |
|    fps                  | 4353         |
|    iterations           | 2            |
|    time_elapsed         | 0            |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0039025033 |
|    clip_fraction        | 0.0167       |
|    clip_range           | 0.2          |
|    entropy_loss         | -1.09        |
|    explained_variance   |

<stable_baselines3.ppo.ppo.PPO at 0x14d0fff90>

# 6. Save Model

In [70]:
# Assemble the destination in two stages (folder, then file stem) and
# write the trained model there.
saved_models_dir = os.path.join('Training', 'Saved Models')
save_path = os.path.join(saved_models_dir, 'PPO_shower_env')
model.save(path=save_path)

In [73]:
# Evaluate the trained policy over 10 episodes without rendering, then
# report the mean and standard deviation of the episode rewards.
evaluation = evaluate_policy(
    model,
    env,
    n_eval_episodes=10,
    render=False,
)
mean_reward, std_reward = evaluation
print(f'Mean reward: {mean_reward} +/- {std_reward}')

Mean reward: 59.0 +/- 1.0
