### Import

In [1]:
import os
import copy
import cma
import gymnasium as gym
import numpy as np

from gymnasium.spaces import Dict, Box, Discrete, MultiDiscrete

from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.callbacks import CheckpointCallback


### Utils

In [2]:
def is_between(A, b):
    return A[0] < b <= A[1]

def calculate_soil_evap_rate(T, R):
    A0 = 1.0
    A1 = 0.1
    return np.float32(A0*(1 + A1*T)*(1-R*0.01))

def calculate_plant_evap_rate(T, R, light_on_off):
    A2 = 1.0
    A3 = 0.1
    A4 = 0.1
    return np.float32(A2*(1 + A3*T)*(1-R*0.01)*(1+A4*light_on_off))

### PlantAirControl Environment

In [3]:
class PlantAirControl(gym.Env):
    # Constants are hard-coded for now but can set up to read from a spreadsheet
    CHAMBER_VOLUME = 1.0
    NON_LIGHT_HEAT = 0.5
    LIGHT_HEAT = 1.0

    FAN_MAX_WATTAGE = 0.5 # kW
    HEAT_MAX_WATTAGE = 3 # kW
    COOL_MAX_WATTAGE = 3 # kW

    FAN_MAX_AIR_INOUT_RATE = 1.0 
    HEAT_MAX_WATER_TEMP_UP_RATE = 1.0
    COOL_MAX_WATER_TEMP_DOWN_RATE = 1.0

    DESIRED_TEMPS = [25.0, 32.0] # Celcius
    DESIRED_HUMIDS = [70.0, 85.0] # Percentage humidity

    def __init__(self, render_mode=None,
                light_heat = LIGHT_HEAT,
                non_light_heat = NON_LIGHT_HEAT,
                chamber_volume = CHAMBER_VOLUME,

                fan_max_wattage = FAN_MAX_WATTAGE,
                heat_max_wattage = HEAT_MAX_WATTAGE,
                cool_max_wattage = COOL_MAX_WATTAGE,

                fan_max_air_inout_rate = FAN_MAX_AIR_INOUT_RATE,
                heat_max_water_temp_up_rate = HEAT_MAX_WATER_TEMP_UP_RATE,
                cool_max_water_temp_down_rate = COOL_MAX_WATER_TEMP_DOWN_RATE,

                desired_temps = DESIRED_TEMPS,
                desired_humids = DESIRED_HUMIDS,
                ):
        # Set up some variables
        self.chamber_volume = np.float32(chamber_volume)
        self.light_heat = np.float32(light_heat)
        self.non_light_heat = np.float32(non_light_heat)

        self.fan_max_wattage = np.float32(fan_max_wattage)
        self.heat_max_wattage = np.float32(heat_max_wattage)
        self.cool_max_wattage = np.float32(cool_max_wattage)

        self.fan_max_air_inout_rate = np.float32(fan_max_air_inout_rate)
        self.heat_max_water_temp_up_rate = np.float32(heat_max_water_temp_up_rate)
        self.cool_max_water_temp_down_rate = np.float32(cool_max_water_temp_down_rate)

        self.dersired_temps = np.float32(np.array(desired_temps))
        self.dersired_humids = np.float32(np.array(desired_humids))

        # Specify the action_space
        self.action_space = MultiDiscrete([101]*3) # e.g., fan capacity, heating component capacity, cooling component capacity
        
        # Speicfy the observation_space
        self.observation_space = Dict({"InTemp/InHumid/OutTemp/OutHumid/Energy": Box(-100, 100, shape=(5,)),
                                       "LightOnOff": Discrete(2),
                                        "Status": Discrete(2),})
        
        # # Logger objects for longer time scale
        # self.inside_temps = []
        # self.inside_humids = []
        # self.outside_temps = []
        # self.outside_humids = []
        # self.fan_controls = []
        # self.heat_controls = []
        # self.cool_controls = []

    def reset(self, seed=None):
        super().reset(seed=seed) # To enable self.np_random seeding
        rng = self.np_random

        # Start state initialisation
        self.light_on_off = rng.integers(2)

        self.inside_temp = rng.uniform(low = 15, high = 45)
        self.inside_humid = rng.uniform(low = 40, high = 90)
        self.outside_temp = rng.uniform(low = 0, high = 30)
        self.outside_humid = rng.uniform(low = 30, high = 70)

        self.energy = 0
        
        self.plant_OK = 0 # 0 means not in the box yet, 1 means OK
        
        observation = {"InTemp/InHumid/OutTemp/OutHumid/Energy": np.float32(np.array([self.inside_temp, 
                                                                          self.inside_humid, 
                                                                          self.outside_temp,
                                                                          self.outside_humid,
                                                                          self.energy])),
                        "LightOnOff": self.light_on_off,
                        "Status": self.plant_OK}

        info = {}

        return observation, info

    def step(self, action):
        # # Log things for longer time scale
        # self.inside_temps.append(self.inside_temp)
        # self.inside_humids.append(self.inside_humid)
        # self.outside_temps.append(self.outside_temp)
        # self.outside_humids.append(self.outside_humid)
        # self.fan_controls.append(action[0])
        # self.heat_controls.append(action[1])
        # self.cool_controls.append(action[2])

        # Caculate some quantities
        self.plant_evap_rate = calculate_plant_evap_rate(self.inside_temp, self.inside_humid, self.light_on_off)
        self.soil_evap_rate = calculate_soil_evap_rate(self.inside_temp, self.inside_humid)

        # Interprete action into parameters of mathematical model
        self.fan_air_inout_rate = (action[0] * self.fan_max_air_inout_rate)/100
        self.heat_water_temp_up_rate = (action[1] * self.heat_max_water_temp_up_rate)/100
        self.cool_water_temp_down_rate = (action[2] * self.cool_max_water_temp_down_rate)/100

        self.fan_wattage = (action[0] * self.fan_max_wattage)/100
        self.heat_wattage = (action[1] * self.heat_max_wattage)/100
        self.cool_wattage = (action[2] * self.cool_max_wattage)/100

        # Update observations via model for temp, humididty
        self.inside_temp += (self.non_light_heat + self.light_on_off * self.light_heat)/self.chamber_volume \
            + 5.0 * self.fan_air_inout_rate * (self.outside_temp - self.inside_temp) \
            + 5.0 * (2*self.heat_water_temp_up_rate - self.cool_water_temp_down_rate)

        self.inside_humid += (self.plant_evap_rate + self.soil_evap_rate)/self.chamber_volume \
            + 5.0 * self.fan_air_inout_rate * (self.outside_humid - self.inside_humid) \
            + 5.0 * (self.heat_water_temp_up_rate - 2*self.cool_water_temp_down_rate)

        self.energy += self.fan_wattage + self.heat_wattage + self.cool_wattage

        if is_between(self.dersired_temps, self.inside_temp) and is_between(self.dersired_humids, self.inside_humid):
            self.plant_OK = 1

        observation = {"InTemp/InHumid/OutTemp/OutHumid/Energy": np.float32(np.array([self.inside_temp, 
                                                                          self.inside_humid, 
                                                                          self.outside_temp,
                                                                          self.outside_humid,
                                                                          self.energy])),
                        "LightOnOff": self.light_on_off,
                        "Status": self.plant_OK}

        # Reward
        reward = self.plant_OK * 10 - self.energy

        terminated = True

        info = {}
            
        return observation, reward, terminated, False, info

### Testing

In [4]:
env = PlantAirControl()
episodes = 10
for episode in range(1, episodes + 1):
    obsINI, infoINI  = env.reset()
    print(obsINI)
    score = 0
    terminated = False
    truncated = False

    while not terminated or truncated:
        action = env.action_space.sample()
        obs, reward, terminated, truncated, info = env.step(action)
        print(obs)
        print(f"The final reward is {reward}")

{'InTemp/InHumid/OutTemp/OutHumid/Energy': array([22.600061, 67.65732 , 26.657713, 64.71329 ,  0.      ],
      dtype=float32), 'LightOnOff': 1, 'Status': 0}
{'InTemp/InHumid/OutTemp/OutHumid/Energy': array([37.099064, 72.06908 , 26.657713, 64.71329 ,  2.995   ],
      dtype=float32), 'LightOnOff': 1, 'Status': 0}
The final reward is -2.9949999999999997
{'InTemp/InHumid/OutTemp/OutHumid/Energy': array([28.222027, 76.547356, 29.562115, 68.03025 ,  0.      ],
      dtype=float32), 'LightOnOff': 0, 'Status': 0}
{'InTemp/InHumid/OutTemp/OutHumid/Energy': array([32.099163, 65.788666, 29.562115, 68.03025 ,  0.545   ],
      dtype=float32), 'LightOnOff': 0, 'Status': 0}
The final reward is -0.545
{'InTemp/InHumid/OutTemp/OutHumid/Energy': array([36.408325, 47.318813,  7.525201, 46.402637,  0.      ],
      dtype=float32), 'LightOnOff': 0, 'Status': 0}
{'InTemp/InHumid/OutTemp/OutHumid/Energy': array([-21.13455 ,  52.492916,   7.525201,  46.402637,   2.14    ],
      dtype=float32), 'LightOnOf

### Training

In [5]:
log_path = os.path.join(os.getcwd(), "Logs")
save_path = os.path.join(os.getcwd(), "Saved Models")

In [6]:
 # Set up the training environment
n_envs = 8
envs = make_vec_env(PlantAirControl, 
                    n_envs=n_envs,
                    seed=42,
                    vec_env_cls=DummyVecEnv)

# Set up the training model
model = PPO("MultiInputPolicy", 
            envs, 
            verbose=0, 
            tensorboard_log= os.path.join(log_path, "Training"), 
            device = "cpu")

# Set up checkpoints for during training
checkpoint_callback = CheckpointCallback(save_freq= 1e5, 
                                            save_path=os.path.join(save_path, 'Checkpoints'),
                                            name_prefix='Checkpoint',
                                            save_replay_buffer=True,
                                            save_vecnormalize=True,
                                            verbose = 0)

# Training
model.learn(2e6, callback = checkpoint_callback, progress_bar = True)

# Save the model
model_final_path = os.path.join(save_path, 'Final') 
model.save(model_final_path)

Output()

KeyboardInterrupt: 

### Finetune with Evolutionary Strategies

In [None]:
model_final_path = os.path.join(os.getcwd(), "Saved Models", 'Final')

In [None]:
# Turn RL problem into classic optimisation problem
class evaluate_action:
    def __init__(self, env, obsINI):
        self.env = env
        self.obsINI = obsINI

    def __call__(self, action):
        action = np.floor(action)
        env_copy = copy.deepcopy(self.env)
        # Reset environment
        score = 0
        terminated = False
        truncated = False
        while not terminated or truncated:
            obs, reward, terminated, truncated, info = env_copy.step(action)
            score += reward
        fitness = 10 - score # This turns a maximisation problem into a minimsation problem
        return fitness

In [None]:
# Fine-tune model with Evolutionary Strategies
def ESOptimisation(env, obsINI):
    # Copy action
    model = PPO.load(model_final_path, env)
    action, _  = model.predict(obsINI, deterministic = True)

    # Orginal model score
    env_copy = copy.deepcopy(env)
    original_score = 0
    terminated = False
    truncated = False
    while not terminated or truncated:
        obs, reward, terminated, truncated, info = env_copy.step(action)
        original_score += reward

    # Finetune
    env_copy = copy.deepcopy(env)
    fun = evaluate_action(env_copy, obsINI)
    upper_buffer = np.float32(np.array([1 - 1e-7] * 3)) # Add buffer to give equal prob for Max action in ES 
    upper_bounds = np.float32(np.array([100]*3)) + upper_buffer
    lower_bounds = np.float32(np.array([0] * 3))

    print(f"Fine-tuning in progress ...")
    # ES starting from no action
    x, es= cma.fmin2(fun, action, 25,
                      {'integer_variables': list(range(len(lower_bounds))),
                       'bounds': [lower_bounds, upper_bounds], 
                       'tolflatfitness':10, 
                       'tolfun': 1e-6,
                       'tolfunhist': 1e-7,
                       'verbose': -10},
                        restarts=2,
                        eval_initial_x = True,
                        )

    final_score = 10 - fun(x)
    final_action = np.floor(x)

    return final_action

### Deployment

In [None]:
# Create an environment
env = PlantAirControl()

In [None]:
# Load RL model
model_final_path = os.path.join(os.getcwd(), "Saved Models", 'Final')
model = PPO.load(model_final_path, env)

In [None]:
episodes = 5
for episode in range(1, episodes + 1):
    obsINI, infoINI  = env.reset()
    score = 0
    terminated = False
    truncated = False

    while not terminated or truncated:
        action, _  = model.predict(obsINI, deterministic = False)
        obs, reward, terminated, truncated, info = env.step(action)
        score += reward
        print(f"The final score is {score}")

-0.7549999999999999
-0.87
-0.905
-0.64
-0.375


In [None]:
episodes = 5
for episode in range(1, episodes + 1):
    obsINI, infoINI  = env.reset()
    action = ESOptimisation(env, obsINI)
    score = 0
    terminated = False
    truncated = False

    while not terminated or truncated:
        obs, reward, terminated, truncated, info = env.step(action)
        score += reward
        print(f"The final score is {score}")

Fine-tuning in progress ...
The final score is 8.505
8.505
Fine-tuning in progress ...
The final score is 0.0
0.0
Fine-tuning in progress ...
The final score is 0.0
0.0
Fine-tuning in progress ...
The final score is 0.0
0.0
Fine-tuning in progress ...
The final score is 0.0
0.0
