In [None]:
import sinergym.utils.controllers as controllers
from sinergym.utils.wrappers import NormalizeObservation
import gymnasium as gym
import numpy as np
import random
import pandas as pd

In [None]:
def get_controller(env_name):
    """Return the Sinergym controller class selected by ``env_name``.

    Matching is by substring and checked in order: a name containing
    ``"5zone"`` yields ``controllers.RBC5Zone``, otherwise a name containing
    ``"data"`` yields ``controllers.RBCDatacenter``, and anything else falls
    back to ``controllers.RandomController``.

    The class itself is returned, not an instance.
    """
    if "5zone" in env_name:
        return controllers.RBC5Zone
    if "data" in env_name:
        return controllers.RBCDatacenter
    # No rule-based controller matched.
    return controllers.RandomController

In [None]:
def get_env_info(env_name):
    """Create the observation-normalised environment and collect its metadata.

    Args:
        env_name: Gymnasium environment id. Ids containing ``"5zone"`` or
            ``"datacenter"`` are supported.

    Returns:
        A 4-tuple ``(env, observation_variables, action_variables,
        combined_ranges)``. ``env`` is wrapped in ``NormalizeObservation``;
        the two variable lists are read from the env via ``get_wrapper_attr``;
        ``combined_ranges`` maps ``"Heating_Setpoint_RL"`` and
        ``"Cooling_Setpoint_RL"`` to ``[low, high]`` bounds taken from the
        first and second action-space dimension respectively.

    Raises:
        ValueError: If ``env_name`` matches neither supported family.
    """
    config_params = {"runperiod": (1, 1, 2022, 31, 12, 2022)}

    if "5zone" in env_name:
        # Change range of heating and cooling setpoint to match user comfort.
        new_action_space = gym.spaces.Box(
            low=np.array([15.0, 23.0], dtype=np.float32),
            high=np.array([23.0, 30.0], dtype=np.float32),
            shape=(2,),
            dtype=np.float32,
        )
        env = gym.make(
            env_name, config_params=config_params, action_space=new_action_space
        )
        range_low, range_high = new_action_space.low, new_action_space.high
    elif "datacenter" in env_name:
        env = gym.make(env_name, config_params=config_params)
        # This branch used to leave the ranges unassigned, so building the
        # range lists below raised UnboundLocalError. Use the environment's
        # own action-space bounds instead.
        # NOTE(review): assumes the datacenter action space is a Box ordered
        # (heating setpoint, cooling setpoint) - confirm against Sinergym.
        range_low, range_high = env.action_space.low, env.action_space.high
    else:
        raise ValueError(
            f"Unsupported environment {env_name!r}: "
            "expected an id containing '5zone' or 'datacenter'."
        )

    env = NormalizeObservation(env)

    heating_range = [range_low[0], range_high[0]]
    cooling_range = [range_low[1], range_high[1]]
    combined_ranges = {
        "Heating_Setpoint_RL": heating_range,
        "Cooling_Setpoint_RL": cooling_range,
    }
    print("Ranges:\n", combined_ranges)
    return (
        env,
        env.get_wrapper_attr('observation_variables'),
        env.get_wrapper_attr('action_variables'),
        combined_ranges,
    )

In [None]:
def normalise(values, v_range, action=False):
    """Min-max scale ``values`` using the bounds in ``v_range``.

    Args:
        values: NumPy array or pandas Series to scale. The input is never
            modified; a new object is returned.
        v_range: Two-element sequence ``[low, high]``.
        action: When true (and the range is non-degenerate) the result is
            mapped to ``[-1, 1]`` instead of ``[0, 1]``.

    Returns:
        The scaled values, clipped to ``[0, 1]`` (or ``[-1, 1]`` for actions).

    Notes:
        * If ``low == high`` the scaling step is skipped to avoid a division
          by zero; the values are only NaN-filled and clipped to ``[0, 1]``,
          regardless of ``action``.
        * NaNs are replaced by 0 on the ``[0, 1]`` path only; the action path
          returns before that step, so NaNs pass through it unchanged.
    """
    low, high = v_range[0], v_range[1]
    span = high - low

    if span == 0:
        # No new object is produced by scaling on this path, so copy
        # explicitly: the NaN replacement below assigns in place and would
        # otherwise overwrite the caller's data.
        values = values.copy()
    else:
        values = (values - low) / span
        if action:
            return np.clip(2 * values - 1, -1, 1)

    values[np.isnan(values)] = 0

    return np.clip(values, 0, 1)

In [None]:
# Experiment settings: environment id, number of episodes, RNG seed and
# the controller selector passed to get_controller().
cfg = {
    "env_name": "Eplus-5zone-hot-continuous-v1",
    "episodes": 1,
    "seed": 3,
    "cntrl": "random",
}
# Root directory under which the CSV exports are written.
PATH = "/home/data"

# TRY NOT TO MODIFY: seeding
np.random.seed(cfg["seed"])
random.seed(cfg["seed"])

# Build the environment and fetch its variable names and setpoint ranges.
env, obs_vars, act_vars, combined_ranges = get_env_info(cfg["env_name"])
print(obs_vars)

In [None]:
# Initialise controller agent.
# get_controller() selects the controller class by substring of cfg["cntrl"]
# ("5zone" / "data" pick the rule-based controllers, anything else the random
# one); the class is then instantiated on the environment.
Controller = get_controller(cfg["cntrl"])
agent = Controller(env)

In [None]:
# One independent, initially empty list per variable name. Three buffers:
# observations as returned by the wrapped env, observations read from
# env.unwrapped_observation, and the actions taken.
obs_storage = {name: [] for name in obs_vars}
org_obs_storage = {name: [] for name in obs_vars}
act_storage = {name: [] for name in act_vars}

# Number of tracked variables in each buffer.
len(org_obs_storage), len(obs_storage), len(act_storage)

In [None]:
# Roll out the controller for the configured number of episodes, recording
# at every step the observation the agent saw (both views) and its action.
for episode in range(1, cfg["episodes"] + 1):
    next_obs, _ = env.reset(seed=cfg["seed"])
    terminated = truncated = False
    # A Gymnasium episode ends when either flag is set. Checking only
    # `terminated` keeps stepping an environment that signalled its end
    # through `truncated`.
    while not (terminated or truncated):
        # Snapshot the current observation before stepping, so each stored
        # row pairs an observation with the action chosen for it.
        unwrapped_obs = env.unwrapped_observation
        org_obs_to_store = list(unwrapped_obs.copy())
        obs_to_store = list(next_obs.copy())
        if cfg["cntrl"] in ["5zone", "data"]:
            # These controllers are given the observation from
            # env.unwrapped_observation rather than the wrapped one.
            action = agent.act(unwrapped_obs)
        else:
            action = agent.act()

        next_obs, reward, terminated, truncated, info = env.step(action)

        # The storage dicts were built from obs_vars / act_vars, so zipping
        # their keys with the value lists relies on matching order.
        for k, org_o_value in zip(org_obs_storage, org_obs_to_store):
            org_obs_storage[k].append(org_o_value)
        for k, o_value in zip(obs_storage, obs_to_store):
            obs_storage[k].append(o_value)
        for k, a_value in zip(act_storage, action):
            act_storage[k].append(a_value)

In [None]:
# Turn each column-name -> list buffer into a DataFrame (one row per step).
org_obs_df = pd.DataFrame(org_obs_storage)
obs_df = pd.DataFrame(obs_storage)
act_df = pd.DataFrame(act_storage)

# The three frames should have the same number of rows.
org_obs_df.shape, obs_df.shape, act_df.shape

In [None]:
# Preview the first rows of the observations read from env.unwrapped_observation.
org_obs_df.head()

In [None]:
# Preview the first rows of the observations returned by the wrapped environment.
obs_df.head()

In [None]:

# Place the unwrapped observations and the recorded actions side by side,
# aligned on the shared row index, then summarise every column.
org_data_df = pd.concat([org_obs_df, act_df], axis="columns")
org_data_df.describe()

In [None]:
# Sub-directory name: the third dash-separated token of the environment id.
# NOTE(review): for "Eplus-5zone-hot-continuous-v1" this is "hot", not
# "5zone" - confirm that is the intended folder.
env_name = cfg["env_name"].split("-")[2]

# Export the unnormalised observation/action table; the target directory
# must already exist.
original_csv_path = f"{PATH}/{env_name}/original_" + cfg["cntrl"] + "_data.csv"
org_data_df.to_csv(original_csv_path, index=False)

In [None]:
# Preview the first rows of the actions produced by the controller.
act_df.head()

In [None]:
# Wrapped-env observations plus actions, joined column-wise.
data_df = pd.concat([obs_df, act_df], axis=1)

# Only the last two columns are considered for rescaling, and only when a
# range is known for them; those are mapped to [-1, 1] via normalise().
for column in data_df.columns[-2:]:
    if column not in combined_ranges:
        continue
    data_df[column] = normalise(data_df[column], combined_ranges[column], action=True)


In [None]:
# Summary statistics of the combined observation/action table after the action columns were rescaled.
data_df.describe()

In [None]:
# Preview the first rows of the combined observation/action table.
data_df.head()

In [None]:
# Export the combined observation/action table next to the raw export
# (relies on `env_name` computed in the earlier export cell).
norm_csv_path = f"{PATH}/{env_name}/norm_" + cfg["cntrl"] + "_data.csv"
data_df.to_csv(norm_csv_path, index=False)