In [None]:
try:
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import requests
    import json
except ImportError:
    %pip install pandas numpy matplotlib requests json
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import requests
    import json

# ==========================================
# 1. THE "PLANT" (Simulation Environment)
# ==========================================

## 1. The "Plant" (Simulation Environment)

The simulation runs in discrete time steps $t$ where $\Delta t = 1 \text{ hour}$.

### A. Scenario Data Generation
The simulation environment generates time series data for Solar Photovoltaic (PV) generation ($P_{PV,t}$), Household Load ($P_{Load,t}$), and electricity prices ($C_{Buy,t}$, $C_{Sell,t}$) for a total time $T=72$ hours.

$$
\begin{align*}
P_{PV,t} &= \max\left(0, 5 \sin\left(\frac{2\pi (t-6)}{24}\right) - \eta_{cloud,t}\right) \\
\eta_{cloud,t} &\sim 0.3 \cdot \text{Weibull}(\text{shape}=0.5) \\
P_{Load,t} &= \max\left(0.5, 2 + \cos\left(\frac{4\pi (t-18)}{24}\right) + 0.8 \cos\left(\frac{2\pi (t-14)}{24}\right)\right) \\
C_{Buy,t} &= 0.20 + 0.3 \cos\left(\frac{2\pi (t-18)}{24}\right) \\
C_{Sell,t} &= \min(C_{Buy,t}, 0.10) - 0.1
\end{align*}
$$

### B. Environment Dynamics (`SmartHomeEnv.step`)

**Parameters:**
* Battery Capacity: $E_{Bat}^{\max} = 10 \text{ kWh}$
* Max Power: $P^{\max} = 3 \text{ kW}$
* Initial SoC: $E_{Bat,0} = 5 \text{ kWh}$
* Controller's Requested Action: $A_t$ (positive is charge, negative is discharge).

**1. Power Limits (Controller Action Clipping):**
$$P_{Bat,t}^{\text{clipped}} = \text{clip}(A_t, -P^{\max}, P^{\max})$$

**2. Capacity Limits (Actual Battery Power $P_{Bat,t}$):**
$$
P_{Bat,t} = \begin{cases}
\min\left(P_{Bat,t}^{\text{clipped}}, E_{Bat}^{\max} - E_{Bat,t}\right) & \text{if } P_{Bat,t}^{\text{clipped}} > 0 \quad \text{(Charging)} \\
\max\left(P_{Bat,t}^{\text{clipped}}, -E_{Bat,t}\right) & \text{if } P_{Bat,t}^{\text{clipped}} \leq 0 \quad \text{(Discharging)}
\end{cases}
$$

**3. State Update (SoC):**
$$E_{Bat,t+1} = E_{Bat,t} + P_{Bat,t} \cdot \Delta t \quad \text{where } \Delta t = 1$$

**4. Grid Exchange Power ($P_{Grid,t}$):**
$$P_{Grid,t} = P_{Load,t} - P_{PV,t} + P_{Bat,t}$$

**5. Step Cost ($C_{t}$):**
$$
C_{t} = \begin{cases}
P_{Grid,t} \cdot C_{Buy,t} & \text{if } P_{Grid,t} > 0 \quad \text{(Buying)} \\
P_{Grid,t} \cdot C_{Sell,t} & \text{if } P_{Grid,t} \leq 0 \quad \text{(Selling)}
\end{cases}
$$

## 2. The Controllers

### A. Residual Charge Controller
**Net Load:** $P_{NetLoad,t} = P_{Load,t} - P_{PV,t}$

**Action ($A_t$):**
$$A_t = -\text{clip}(P_{NetLoad,t}, -P^{\max}, P^{\max})$$

### B. Cost Optimized Residual Controller
**Action ($A_t$):**
$$
A_t = \begin{cases}
\min\left(P^{\max}, -P_{NetLoad,t}\right) & \text{if } P_{NetLoad,t} < 0 \quad \text{(Surplus: Charge)} \\
-\min\left(P^{\max}, P_{NetLoad,t}\right) & \text{if } P_{NetLoad,t} \geq 0 \text{ and } C_{Buy,t} > 0.20 \quad \text{(Deficit, High Price: Discharge)} \\
0 & \text{otherwise} \quad \text{(Deficit, Low Price: Hold)}
\end{cases}
$$

### C. Self-Sufficiency Reward Function
**Reward ($R_{t}^{\text{Self-Suff}}$):**
$$R_{t}^{\text{Self-Suff}} = -|P_{Grid,t}|$$

### D. Urbs MPC Controller (Perfect Foresight Optimization)

**Optimization Problem:**
$$\min \sum_{t=0}^{T_H-1} C_t \quad \text{subject to:}$$

**1. Dynamics and Balance:**
$$
\begin{align*}
P_{Grid,t} &= P_{Load,t} - P_{PV,t} + \hat{P}_{Bat,t} \\
E_{Bat,t+1} &= E_{Bat,t} + \hat{P}_{Bat,t}
\end{align*}
$$

**2. Constraints:**
$$
\begin{align*}
E_{Bat,0} &= E_{Bat}^{\text{initial}} \\
0 \leq E_{Bat,t} &\leq E_{Bat}^{\max} \\
-P^{\max} \leq \hat{P}_{Bat,t} &\leq P^{\max}
\end{align*}
$$

**3. MPC Closed-Loop Execution (Tracking):**
Let $\hat{P}_{Bat,\tau}$ be the optimal planned actions from the initial run.
$$\hat{E}_{Bat,t+1}^{\text{target}} = E_{Bat}^{\text{initial}} + \sum_{\tau=0}^{t} \hat{P}_{Bat,\tau}$$
The requested action $A_t$ sent to the environment is:
$$A_t = \hat{E}_{Bat,t+1}^{\text{target}} - E_{Bat,t}$$

### Mathematical Model: The "Plant" & Environment Dynamics

The simulation runs in discrete time steps $t$ where $\Delta t = 1 \text{ hour}$.

**A. Scenario Data Generation**
The environment generates time series data for Solar PV ($P_{PV,t}$), Load ($P_{Load,t}$), and prices ($C_{Buy,t}$, $C_{Sell,t}$). The load profile is designed to roughly match the standard load profile in Germany.

$$
\begin{align*}
P_{PV,t} &= \max\left(0, 5 \sin\left(\frac{2\pi (t-6)}{24}\right) - \eta_{cloud,t}\right), \quad \eta_{cloud,t} \sim 0.3 \cdot \text{Weibull}(0.5) \\
P_{Load,t} &= \max\left(0.5, 2 + \cos\left(\frac{4\pi (t-18)}{24}\right) + 0.8 \cos\left(\frac{2\pi (t-14)}{24}\right)\right) \\
C_{Buy,t} &= 0.20 + 0.3 \cos\left(\frac{2\pi (t-18)}{24}\right) \\
C_{Sell,t} &= \min(C_{Buy,t}, 0.10) - 0.1
\end{align*}
$$

**B. Battery & Grid Dynamics**
Given a requested action $A_t$ (positive=charge, negative=discharge):

1.  **Power Limits:** $P_{Bat,t}^{\text{clipped}} = \text{clip}(A_t, -P^{\max}, P^{\max})$
2.  **Capacity Constraints:**
    $$
    P_{Bat,t} = \begin{cases}
    \min\left(P_{Bat,t}^{\text{clipped}}, E_{Bat}^{\max} - E_{Bat,t}\right) & \text{if } P_{Bat,t}^{\text{clipped}} > 0 \\
    \max\left(P_{Bat,t}^{\text{clipped}}, -E_{Bat,t}\right) & \text{if } P_{Bat,t}^{\text{clipped}} \leq 0
    \end{cases}
    $$
3.  **SoC Update:** $E_{Bat,t+1} = E_{Bat,t} + P_{Bat,t} \cdot \Delta t$
4.  **Grid Balance:** $P_{Grid,t} = P_{Load,t} - P_{PV,t} + P_{Bat,t}$
5.  **Cost:** $C_{t} = P_{Grid,t} \cdot C_{Buy,t}$ if $P_{Grid,t} > 0$ (buying), otherwise $C_{t} = P_{Grid,t} \cdot C_{Sell,t}$ (selling).

In [None]:
class SmartHomeEnv:
    """Hourly simulation of a household with PV, load, a battery and a grid connection.

    The scenario (solar, load, buy price, sell price) is generated once in the
    constructor and stored in ``self.data``. ``step`` then consumes one row per
    call while enforcing the battery's power and capacity limits.

    Args:
        battery_capacity (float): Battery capacity in kWh.
        max_power (float): Maximum charge/discharge power in kW.
        seed (int): Seed passed to ``np.random.seed`` before the scenario is generated.
        initial_soc (float): State of charge in kWh at construction and after
            ``reset``. Clamped into ``[0, battery_capacity]``.
        hours (int): Number of hourly rows in the generated scenario.
    """

    def __init__(self, battery_capacity=10.0, max_power=3, seed=42,
                 initial_soc=5, hours=72):
        self.battery_capacity = battery_capacity # kWh
        self.max_power = max_power # kW
        # Keep the starting SoC physically meaningful: a value outside
        # [0, capacity] would make the capacity limits in step() misbehave.
        self.initial_soc = min(max(initial_soc, 0), battery_capacity) # kWh
        self.soc = self.initial_soc
        self.time_step = 0

        # Set a fixed seed for reproducibility.
        # NOTE: this reseeds NumPy's *global* generator (a process-wide side
        # effect); kept so existing results stay reproducible.
        np.random.seed(seed)

        # Internal Data Generation (The "Real World")
        self.data = self._generate_scenario_data(hours=hours)

    def _generate_scenario_data(self, hours):
        """Build the scenario DataFrame with columns solar, load, price, sell_price.

        Args:
            hours (int): Number of hourly rows to generate.

        Returns:
            pd.DataFrame: One row per hour, indexed 0..hours-1.
        """
        t = np.arange(hours)
        # Solar: sine peaking at t = 12 (mod 24), minus Weibull-distributed
        # "cloud" noise, floored at zero.
        solar = np.maximum(0, 5 * np.sin(2 * np.pi * (t - 6) / 24))
        solar = np.maximum(0, solar - 0.3 * np.random.weibull(0.5, size=hours))

        # Load: sum of a 12 h and a 24 h cosine (morning/evening peaks),
        # floored at 0.5.
        load = 2 + np.cos(4 * np.pi * (t - 18) / 24) + \
                0.8 * np.cos(2 * np.pi * (t - 14) / 24)
        load = np.maximum(0.5, load)

        # Buy price: daily cosine, highest at t = 18 (mod 24).
        price = 0.20 + 0.3 * np.cos(2 * np.pi * (t - 18) / 24)

        # Sell price: min(price, 0.10) - 0.1, which is always <= 0.
        # NOTE(review): with this formula exporting never earns revenue (and
        # costs money when the buy price is below 0.10). The previous comment
        # described a fixed 0.10 EUR/kWh feed-in tariff -- confirm which is intended.
        sell_price = np.minimum(price, 0.10 * np.ones(hours)) - 0.1

        return pd.DataFrame({'solar': solar, 'load': load, 'price': price, 'sell_price': sell_price})

    def reset(self):
        """Restore the initial SoC and rewind to the first time step.

        The scenario data is not regenerated.

        Returns:
            pd.Series: The first row of the scenario.
        """
        self.soc = self.initial_soc
        self.time_step = 0
        return self.data.iloc[0]

    def step(self, action_kw):
        """
        Executes one time step.
        Args:
            action_kw (float): Desired battery power (+ Charge, - Discharge)
        Returns:
            observation (Series): The NEXT state (load, solar, price), or None
                when the episode is over
            cost (float): The cost incurred this step (negative values are revenue)
            done (bool): Is simulation over?
            info (dict): Debug info (post-step SoC, grid power, actual battery
                power and the scenario values used for this step)
        Raises:
            IndexError: If called after the episode has ended without reset().
        """
        if self.time_step >= len(self.data):
            # Same exception type pandas raised here before, but with an
            # actionable message.
            raise IndexError("step() called after the episode ended; call reset() first")

        current_data = self.data.iloc[self.time_step]

        # --- 1. Apply Physics Constraints (The "Real" Battery) ---
        # A. Power Limits
        power = np.clip(action_kw, -self.max_power, self.max_power)

        # B. Capacity Limits
        if power > 0: # Charging: cannot exceed the remaining headroom
            max_charge = self.battery_capacity - self.soc
            power = min(power, max_charge)
        else: # Discharging: cannot take out more than is stored
            max_discharge = self.soc
            power = max(power, -max_discharge) # (power is negative)

        # --- 2. Update State ---
        self.soc += power # Simple energy bucket model (1 hour timestep)

        # --- 3. Calculate Cost ---
        # Grid Balance: Load + Charge = Solar + Discharge + Grid
        # Grid = (Load - Solar) + Power
        net_load = current_data['load'] - current_data['solar']
        grid_kw = net_load + power

        if grid_kw > 0: # Importing from the grid
            cost = grid_kw * current_data['price']
        else: # Exporting to the grid (or exactly balanced)
            cost = grid_kw * current_data['sell_price']

        # --- 4. Prepare Next Step ---
        self.time_step += 1
        done = self.time_step >= len(self.data)

        next_obs = None
        if not done:
            next_obs = self.data.iloc[self.time_step]

        info = {
            'soc': self.soc,
            'grid_kw': grid_kw,
            'battery_action_actual': power,
            'load': current_data['load'],
            'solar': current_data['solar'],
            'price': current_data['price'],
            'sell_price': current_data['sell_price']
        }

        return next_obs, cost, done, info

    def get_forecast(self, horizon=24):
        """Returns the data for the next N hours (for MPC).

        The slice starts at the current time step and is truncated at the end
        of the scenario, so it may contain fewer than ``horizon`` rows.
        """
        start = self.time_step
        end = min(start + horizon, len(self.data))
        return self.data.iloc[start:end]

The gymnasium wrapper just makes the environment usable for reinforcement learning by aligning it with the gymnasium framework.

In [None]:
# --- Define the Gymnasium Environment Wrapper ---
try:
    from pathlib import Path
    import gymnasium as gym
    from gymnasium import spaces
    from stable_baselines3 import PPO
    from stable_baselines3.common.env_checker import check_env
except ImportError:
    %pip install stable-baselines3 gymnasium shimmy
    from pathlib import Path
    import gymnasium as gym
    from gymnasium import spaces
    from stable_baselines3 import PPO
    from stable_baselines3.common.env_checker import check_env

class HEMSGymEnv(gym.Env):
    """
    Gymnasium adapter around SmartHomeEnv so Stable Baselines3 can train on it.

    Actions are a single value in [-1, 1] (fraction of the battery's max power);
    observations are the vector [solar, load, price, sell_price, soc].
    The reward is the negative step cost.
    """
    def __init__(self):
        super().__init__()
        self.env = SmartHomeEnv()

        # Normalised action; step() rescales it to [-max_power, max_power].
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)

        # Unbounded box: the values are bounded in practice, but declaring
        # +/-inf sidesteps bound violations.
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(5,), dtype=np.float32)

    def _as_observation(self, row):
        """Pack a scenario row plus the current SoC into a float32 vector."""
        return np.array(
            [row['solar'], row['load'], row['price'], row['sell_price'], self.env.soc],
            dtype=np.float32,
        )

    def reset(self, seed=None, options=None):
        """Reset the wrapped environment and return (observation, info)."""
        super().reset(seed=seed)
        self.current_obs = self._as_observation(self.env.reset())
        return self.current_obs, {}

    def step(self, action):
        """Apply a normalised action and return the Gymnasium 5-tuple."""
        # [-1, 1] -> [-max_power, max_power]
        action_kw = float(action[0]) * self.env.max_power
        next_row, cost, done, info = self.env.step(action_kw)

        # On the final step the wrapped env yields no next row, so the previous
        # observation is returned unchanged.
        if not done:
            self.current_obs = self._as_observation(next_row)

        # Minimising cost == maximising its negative; the episode is never truncated.
        return self.current_obs, -cost, done, False, info

### Reward Function: Self-Sufficiency
Modifies the standard cost-minimization objective to maximize energy independence by penalizing grid exchange.

**Reward:**
$$R_{t}^{\text{Self-Suff}} = -|P_{Grid,t}|$$

In [None]:
class HEMSSelfsufficientGymEnv(HEMSGymEnv):
    """HEMSGymEnv variant whose reward penalises any grid exchange.

    The cost-based reward of the parent is replaced by ``-|grid_kw|``, so both
    importing from and exporting to the grid are penalised equally.
    """
    def step(self, action):
        """Step the parent environment and swap in the self-sufficiency reward."""
        self.current_obs, _, terminated, truncated, info = super().step(action)

        # The earlier solar-utilisation reward was always overwritten by this
        # line and therefore never had any effect; it has been removed.
        reward = -abs(info['grid_kw'])
        return self.current_obs, reward, terminated, truncated, info


# --- Train the Agents ---
# Maps each saved-model filename to the Gym environment class paired with it.
envs = {
    "ppo_hems_cost_model.zip": HEMSGymEnv,
    "ppo_hems_selfsufficient_model.zip": HEMSSelfsufficientGymEnv,
    }

In [None]:
for rl_model_path, GymEnv in envs.items():
    rl_model_path = Path(rl_model_path)

    # A previously saved model is reused as-is; delete the file to retrain.
    if rl_model_path.exists():
        print(f"Trained Model exists at {rl_model_path}")
        continue

    print(f"Training RL Agent {rl_model_path}... this might take a minute.")
    train_env = GymEnv()
    # MlpPolicy (multi-layer perceptron) fits the flat vector observations.
    rl_model = PPO(
        "MlpPolicy",
        train_env,
        verbose=0,
        learning_rate=0.0003,
        n_steps=2048,
    )
    rl_model.learn(total_timesteps=50000)
    rl_model.save(rl_model_path)
    print("Training Complete. Model saved.")

# ==========================================
# 2. THE CONTROLLERS
# ==========================================

In [None]:
class Controller:
    """Common base for all controllers.

    Caches the battery capacity (``bat_cap``) and power limit (``max_p``) of the
    given environment and declares the ``get_action`` interface.
    """
    def __init__(self, env):
        self.bat_cap, self.max_p = env.battery_capacity, env.max_power

    def get_action(self, observation, current_soc):
        """Return the requested battery power; subclasses must override this."""
        raise NotImplementedError

In [None]:
class BasicController(Controller):
    """Always request full power: charge on solar surplus, discharge otherwise."""

    def get_action(self, observation, current_soc):
        """Return +max_p when solar exceeds load, else -max_p."""
        net_load = observation['load'] - observation['solar']
        return self.max_p if net_load < 0 else -self.max_p

### Controller: Residual Charge
This controller stores excess solar energy and discharges only to cover deficits.

**Logic:**
Let $P_{NetLoad,t} = P_{Load,t} - P_{PV,t}$.

$$
A_t = \begin{cases}
\min(P^{\max}, -P_{NetLoad,t}) & \text{if } P_{NetLoad,t} < 0 \quad \text{(Charge surplus)} \\
-\min(P^{\max}, P_{NetLoad,t}) & \text{if } P_{NetLoad,t} \geq 0 \quad \text{(Discharge for load)}
\end{cases}
$$

In [1]:
class ResidualChargeController(Controller):
    """Charge with the solar surplus; discharge to cover the remaining load."""

    def get_action(self, observation, current_soc):
        """Return the battery power that offsets the net load, capped at max_p."""
        net_load = observation['load'] - observation['solar']
        # The magnitude is the same in both cases; only the sign differs.
        magnitude = min(self.max_p, abs(net_load))
        return magnitude if net_load < 0 else -magnitude

NameError: name 'Controller' is not defined

In [None]:
class CostOptimizedBasicController(Controller):
    """Full-power charging on solar surplus; full-power discharging only above a price threshold."""
    def __init__(self, env, price_threshold=0.20):
        super().__init__(env)
        self.price_threshold = price_threshold

    def get_action(self, observation, current_soc):
        """Return +max_p on surplus, -max_p on deficit at high price, else 0.0."""
        net_load = observation['load'] - observation['solar']
        current_price = observation['price']

        if net_load < 0:
            return self.max_p
        if current_price > self.price_threshold:
            return -self.max_p
        return 0.0

### Controller: Cost-Optimized Residual
This controller behaves like the Residual controller but only discharges when electricity prices are high ($> 0.20$).

**Logic:**
$$
A_t = \begin{cases}
\min\left(P^{\max}, -P_{NetLoad,t}\right) & \text{if } P_{NetLoad,t} < 0 \\
-\min\left(P^{\max}, P_{NetLoad,t}\right) & \text{if } P_{NetLoad,t} \geq 0 \text{ AND } C_{Buy,t} > 0.20 \\
0 & \text{otherwise (Hold)}
\end{cases}
$$

In [None]:
class CostOptimizedResidualController(Controller):
    """Store the solar surplus; cover the residual load from the battery only when the price is high."""
    def __init__(self, env, price_threshold=0.20):
        super().__init__(env)
        self.price_threshold = price_threshold

    def get_action(self, observation, current_soc):
        """Return the capped surplus (charge), the capped deficit (discharge), or 0.0 (hold)."""
        net_load = observation['load'] - observation['solar']
        current_price = observation['price']

        if net_load < 0:
            # Surplus: charge with whatever exceeds the load.
            return min(self.max_p, -net_load)
        if current_price > self.price_threshold:
            # Deficit at a high price: serve the load from the battery.
            return -min(self.max_p, net_load)
        # Deficit at a low price: keep the stored energy for later.
        return 0.0

### Controller: Urbs MPC (Perfect Foresight)
Uses an external optimization solver (Urbs) to plan the optimal battery trajectory $\hat{P}_{Bat}$ over a horizon $T_H$.

**Optimization Problem:**
$$
\min \sum_{t=0}^{T_H-1} C_t \quad \text{s.t.} \quad E_{Bat,t+1} = E_{Bat,t} + \hat{P}_{Bat,t}, \quad 0 \leq E_{Bat,t} \leq E_{Bat}^{\max}
$$

**Closed-Loop Tracking:**
To prevent drift, the controller tracks the planned State of Charge ($\text{SoC}$).
Target SoC: $\hat{E}_{Bat,t+1}^{\text{target}} = E_{Bat}^{\text{initial}} + \sum_{\tau=0}^{t} \hat{P}_{Bat,\tau}$
Action: $A_t = \hat{E}_{Bat,t+1}^{\text{target}} - E_{Bat,t}$

### D. Urbs MPC Controller (Perfect Foresight Optimization)

The Urbs-based Model Predictive Controller (MPC) solves a full-horizon optimization problem to find the optimal battery power trajectory $\hat{P}_{Bat,t}$ that minimizes total cost, assuming perfect foresight of load, solar, and prices over the horizon $T_H$.

**Optimization Problem:**
$$\min \sum_{t=0}^{T_H-1} C_t \quad \text{subject to:}$$

**1. Dynamics and Balance:**
$$
\begin{align*}
P_{Grid,t} &= P_{Load,t} - P_{PV,t} + \hat{P}_{Bat,t} \\
E_{Bat,t+1} &= E_{Bat,t} + \hat{P}_{Bat,t}
\end{align*}
$$

**2. Constraints:**
$$
\begin{align*}
E_{Bat,0} &= E_{Bat}^{\text{initial}} \\
0 \leq E_{Bat,t} &\leq E_{Bat}^{\max} \\
-P^{\max} \leq \hat{P}_{Bat,t} &\leq P^{\max}
\end{align*}
$$

**3. MPC Closed-Loop Execution:**
Instead of applying the first optimal action $\hat{P}_{Bat,0}$ directly (as in a standard MPC), the controller uses a tracking approach: it calculates the action $A_t$ required to move the *actual* current SoC ($E_{Bat,t}$) to the planned optimal SoC ($\hat{E}_{Bat,t+1}^{\text{target}}$) for the next step.

Let $\hat{P}_{Bat,\tau}$ be the optimal planned actions from the initial run.
$$\hat{E}_{Bat,t+1}^{\text{target}} = E_{Bat}^{\text{initial}} + \sum_{\tau=0}^{t} \hat{P}_{Bat,\tau}$$
The requested action $A_t$ sent to the environment is:
$$A_t = \hat{E}_{Bat,t+1}^{\text{target}} - E_{Bat,t}$$

In [None]:
class UrbsMPCController(Controller):
    """MPC controller using Urbs for optimization with perfect foresight.

    The optimization is solved once, on the first ``get_action`` call, by
    POSTing a JSON model to ``url``. Afterwards the controller tracks the
    planned state-of-charge trajectory instead of replaying the planned
    actions, so deviations between plan and reality do not accumulate.

    Args:
        env: The simulated environment; used for its battery parameters,
            current ``time_step`` and ``get_forecast``.
        url (str): Endpoint of the optimization service.
    """
    def __init__(self, env, url="http://localhost:5000/simulate"):
        super().__init__(env)
        self.url = url
        self.plan = None
        self.env = env # Need access to env for full forecast
        self.horizon = 10000
        self.plan_initial_soc = None
        # Environment time step at which the plan was computed; plan[0]
        # belongs to that step, not necessarily to step 0.
        self.plan_start_step = 0

    def get_action(self, observation, current_soc):
        """Return the battery power needed to reach the planned SoC for this step.

        Args:
            observation: Current scenario row (unused; the plan is based on the
                environment's forecast).
            current_soc (float): Actual state of charge before this step.

        Returns:
            float: Requested battery power (+ charge, - discharge); 0.0 when the
            current step lies outside the plan.
        """
        # Plan once at the beginning (Perfect Foresight)
        if self.plan is None:
            self.plan_start_step = self.env.time_step
            self.plan = self._run_optimization(horizon=self.horizon, initial_soc=current_soc)
            self.plan_initial_soc = current_soc

        # The forecast behind the plan starts at the planning step, so the plan
        # must be indexed relative to it rather than by the absolute time step.
        idx = self.env.time_step - self.plan_start_step
        if 0 <= idx < len(self.plan):
            # Closed-Loop execution (SoC Tracking) to prevent drift:
            # Target SoC = Initial + Sum of all planned actions up to and including this step
            target_soc_end = self.plan_initial_soc + np.sum(self.plan[:idx + 1])

            # Action required to get from current_soc to target_soc_end
            return target_soc_end - current_soc
        return 0.0

    def _run_optimization(self, horizon=10000, initial_soc=0.0):
        """Solve the battery schedule for the forecast window via the Urbs service.

        Args:
            horizon (int): Maximum number of forecast steps to optimize over.
            initial_soc (float): State of charge at the start of the window.

        Returns:
            np.ndarray: Planned net battery power per step (+ charge, - discharge).
            All zeros when fewer than two forecast steps remain.

        Raises:
            requests.RequestException: If the HTTP request fails or returns an
                error status. Failures are deliberately not swallowed.
            KeyError: If the response lacks the expected result structure.
        """
        # 1. Get Full Forecast (from the current time step onwards)
        full_data = self.env.get_forecast(horizon=horizon)
        timesteps = len(full_data)

        if timesteps < 2:
            return np.zeros(timesteps)

        # 2. Construct JSON Payload
        # Normalize Solar: Urbs SupIm is usually a profile.
        # We'll set installed capacity to max(solar) and profile to solar/max.
        solar_profile = full_data['solar'].values
        max_solar = solar_profile.max()
        if max_solar == 0: max_solar = 1.0  # avoid division by zero for an all-dark window
        norm_solar = (solar_profile / max_solar).tolist()

        load_profile = full_data['load'].tolist()
        price_profile = full_data['price'].tolist()
        sell_price_profile = full_data['sell_price'].tolist()

        # NOTE(review): the field names and constants below follow the schema
        # expected by the Urbs service; verify against its documentation
        # before changing any of them.
        payload ={
            "site": {
                "Main": {
                    "area": 100,
                    "process": {
                        "Purchase": {
                            "wacc": 0,
                            "cap-lo": 0,
                            "cap-up": 1000,
                            "fix-cost": 0,
                            "inst-cap": 1000,
                            "inv-cost": 0,
                            "max-grad": "inf",
                            "var-cost": 0,
                            "commodity": {
                                "Elec": {
                                    "ratio": 1,
                                    "Direction": "Out",
                                    "ratio-min": 1
                                },
                                "Elec buy": {
                                    "ratio": 1,
                                    "Direction": "In",
                                    "ratio-min": 1
                                }
                            },
                            "description": "Buy electricity from the utility grid",
                            "depreciation": 50,
                            "min-fraction": 0
                        },
                        "Feed-in": {
                            "wacc": 0,
                            "cap-lo": 0,
                            "cap-up": 1000,
                            "fix-cost": 0,
                            "inst-cap": 1000,
                            "inv-cost": 0,
                            "max-grad": "inf",
                            "var-cost": 0,
                            "commodity": {
                                "Elec": {
                                    "ratio": 1,
                                    "Direction": "In",
                                    "ratio-min": 1
                                },
                                "Elec sell": {
                                    "ratio": 1,
                                    "Direction": "Out",
                                    "ratio-min": 1
                                }
                            },
                            "description": "Sell electricity to the utility grid",
                            "depreciation": 50,
                            "min-fraction": 0
                        },
                        "Photovoltaics": {
                            "wacc": 0.07,
                            "cap-lo": max_solar,
                            "cap-up": max_solar,
                            "fix-cost": 0,
                            "inst-cap": max_solar,
                            "inv-cost": 0,
                            "max-grad": "inf",
                            "var-cost": 0,
                            "commodity": {
                                "Elec": {
                                    "ratio": 1,
                                    "Direction": "Out",
                                    "ratio-min": 1
                                },
                                "Solar": {
                                    "ratio": 1,
                                    "Direction": "In",
                                    "ratio-min": 1
                                }
                            },
                            "description": "Generates electricity from sun",
                            "area-per-cap": 5,
                            "depreciation": 25,
                            "min-fraction": 0
                        }
                    },
                    "commodity": {
                        "Elec": {
                            "Type": "Demand",
                            "unitC": "kWh",
                            "unitR": "kW",
                            "demand": load_profile,
                            "storage": {
                                "Lead-Acid Battery": {
                                    # Initial SoC is sent as a fraction of capacity.
                                    "init": initial_soc / self.bat_cap if self.bat_cap > 0 else 0,
                                    "wacc": 0.007,
                                    "eff-in": 1,
                                    "eff-out": 1,
                                    "cap-lo-c": self.bat_cap,
                                    "cap-lo-p": self.max_p,
                                    "cap-up-c": self.bat_cap,
                                    "cap-up-p": self.max_p,
                                    "discharge": 0,
                                    "fix-cost-c": 0,
                                    "fix-cost-p": 0,
                                    "inst-cap-c": self.bat_cap,
                                    "inst-cap-p": self.max_p,
                                    "inv-cost-c": 0,
                                    "inv-cost-p": 0,
                                    "var-cost-c": 0,
                                    "var-cost-p": 0,
                                    "description": "Lead-Acid battery",
                                    "depreciation": 5
                                }
                            }
                        },
                        "Solar": {
                            "Type": "SupIm",
                            "supim": norm_solar,
                            "unitC": "kWh",
                            "unitR": "kW"
                        },
                        "Elec buy": {
                            "max": "inf",
                            "Type": "Buy",
                            "price": 0.1,
                            "unitC": "kWh",
                            "unitR": "kW",
                            "maxperhour": "inf"
                        },
                        "Elec sell": {
                            "max": "inf",
                            "Type": "Sell",
                            "price": 0.0,
                            "unitC": "kWh",
                            "unitR": "kW",
                            "maxperhour": "inf"
                        }
                    }
                }
            },
            "global": {
                "CO2 limit": 150000000,
                "Cost limit": 35000000000
            },
            "c_timesteps": timesteps,
            "buysellprice": {
                "Elec buy": price_profile,
                "Elec sell": sell_price_profile
            }
        }

        # 3. Send Request
        # Save payload to JSON file (overwritten on every optimization run)
        with open('adg_payload.urbs', 'w') as f:
            json.dump(payload, f, indent=2)
        response = requests.post(self.url, json=payload, timeout=60)
        response.raise_for_status()
        result = response.json()

        # 4. Parse Result
        # result['data']['results']['Main']['Elec']['storage']['Stored'] (Charge)
        # result['data']['results']['Main']['Elec']['storage']['Retrieved'] (Discharge)
        storage_res = result['data']['results']['Main']['Elec']['storage']
        charge = np.array(storage_res['Stored'])
        discharge = np.array(storage_res['Retrieved'])

        # Net action: Charge - Discharge
        # Note: SmartHomeEnv expects positive for Charge, negative for Discharge.
        actions = charge - discharge
        return actions

In [None]:
class LimitedURBSController(UrbsMPCController):
    """Receding-horizon variant: re-optimizes at every step and applies only the first planned action."""
    def __init__(self, env, url="http://localhost:5000/simulate"):
        super().__init__(env, url)
        self.horizon = 1000

    def get_action(self, observation, current_soc):
        """Re-run the optimization from the current SoC and return its first action (0.0 if empty)."""
        planned = self._run_optimization(horizon=self.horizon, initial_soc=current_soc)
        # Everything after the first entry is discarded; the next call re-plans.
        return planned[0] if len(planned) > 0 else 0.0

In [None]:
class SB3RLCostController(Controller):
    """
    Controller backed by the saved PPO model that was trained on the cost reward.
    """
    def __init__(self, env):
        super().__init__(env)
        self.model = PPO.load("ppo_hems_cost_model.zip")

    def get_action(self, observation, current_soc):
        """Query the policy and convert its normalised output to kW."""
        # Same feature order as the training environment:
        # [solar, load, price, sell_price, soc]
        features = [observation[key] for key in ('solar', 'load', 'price', 'sell_price')]
        features.append(current_soc)
        obs_vector = np.array(features, dtype=np.float32)

        # deterministic=True: evaluate without exploration noise
        action, _ = self.model.predict(obs_vector, deterministic=True)

        # The policy output is a fraction of max power.
        return action[0] * self.max_p

In [None]:
class SB3RLSelfSufficientController(SB3RLCostController):
    """
    Controller that uses the pre-trained Stable Baselines3 model with self sufficiency optimization.

    Inherits get_action from SB3RLCostController; only the loaded model differs.
    """
    def __init__(self, env):
        # super(SB3RLCostController, self) starts the MRO lookup *after*
        # SB3RLCostController, so its __init__ (which would load the cost
        # model) is skipped and the next __init__ in the MRO runs instead.
        super(SB3RLCostController, self).__init__(env)
        self.model = PPO.load("ppo_hems_selfsufficient_model.zip")


# ==========================================
# 3. MAIN LOOP (The "Gym" Loop)
# ==========================================

In [None]:
def run_experiment(Controller):
    """Run one full episode of a fresh SmartHomeEnv under the given controller.

    Args:
        Controller: Controller class (or factory) called as ``Controller(env)``.

    Returns:
        pd.DataFrame: One row per step with the environment's info dict plus
        ``action_requested`` and ``cost``. The ``soc`` column holds the state
        of charge at the *start* of each step.
    """
    env = SmartHomeEnv()
    agent = Controller(env)
    history = []

    obs = env.reset()
    print("Starting Simulation...")

    finished = False
    while not finished:
        # The controller sees the SoC before the step; the environment clips
        # whatever it requests to the physical limits.
        soc_before = env.soc
        requested = agent.get_action(obs, soc_before)

        obs, step_cost, finished, step_info = env.step(requested)

        # Replace the post-step SoC reported by the environment with the
        # pre-step value, and log the request and the resulting cost.
        step_info.update(action_requested=requested, cost=step_cost, soc=soc_before)
        history.append(step_info)

    return pd.DataFrame(history)

# ==========================================
# 4. VISUALIZATION
# ==========================================

In [None]:
def plot_controller_performance(df, controller_name, total_cost):
    """Print the total cost of a run and draw a two-panel summary figure.

    Args:
        df: Per-step results; the columns ``load``, ``solar``,
            ``battery_action_actual`` and ``soc`` are read, and ``df.index``
            supplies the bar positions.
        controller_name: Label used in the printed line and in both titles.
        total_cost: Number printed with two decimals.

    Returns:
        None. The figure is shown via ``plt.show()``.
    """
    print(f"Total Cost for {controller_name}: €{total_cost:.2f}")

    _, (power_ax, soc_ax) = plt.subplots(2, 1, figsize=(10, 8), sharex=True)

    # Upper panel: load and solar as lines, battery power as bars.
    line_specs = (
        ('load', 'k--', 'Load'),
        ('solar', 'orange', 'Solar'),
    )
    for column, fmt, legend_label in line_specs:
        power_ax.plot(df[column], fmt, label=legend_label, alpha=0.5)
    power_ax.bar(
        df.index,
        df['battery_action_actual'],
        color='green',
        alpha=0.3,
        label='Battery Flow',
    )
    power_ax.legend()
    power_ax.set(title=f"{controller_name}: Power Flows", ylabel="kW")

    # Lower panel: battery state of charge over the run.
    soc_ax.plot(df['soc'], 'g-', linewidth=2)
    soc_ax.set(
        title=f"{controller_name}: Battery State of Charge",
        ylabel="kWh",
        xlabel="Hour",
    )
    soc_ax.grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

# ==========================================
# 5. COMPARISON
# ==========================================

In [None]:
# Controllers to compare: display name -> controller class.
# Commented-out entries are excluded from the run; uncomment to include them.
controllers = {
    # "Basic": BasicController,
    "ResidualCharge": ResidualChargeController,
    # "CostOptimizedBasic": CostOptimizedBasicController,
    "CostOptimizedResidual": CostOptimizedResidualController,
    "LimitedURBS": LimitedURBSController,
    "SB3RLCost": SB3RLCostController,
    # "SB3RLSelfSufficient": SB3RLSelfSufficientController,
    # "UrbsMPC": UrbsMPCController
}

# One full simulation per controller; each value is the DataFrame returned by
# run_experiment.
all_results = {}
for name, controller in controllers.items():
    all_results[name] = run_experiment(controller)


fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8), sharex=True)

# The scenario curves are drawn once, taken from the first run's log.
# NOTE(review): assumes solar/load/price columns are the same for every
# controller -- confirm against SmartHomeEnv.
scenario_df = next(iter(all_results.values()))
ax1.plot(scenario_df['solar'], label='Solar')
ax1.plot(scenario_df['load'], 'k--', label='Load')
ax1.plot(scenario_df['price'], label='Price')
ax1.plot(scenario_df['sell_price'], label='Sell Price')

# Per-controller artists only: everything that does not depend on the
# controller is configured once after the loop.
for name, df in all_results.items():
    total_cost = df['cost'].sum()
    ax1.bar(df.index, df['battery_action_actual'], alpha=0.3, label=f'{name} \n (Cost: {total_cost:.2f}€)')
    ax2.plot(df['soc'], '--', linewidth=2, label=f"{name}")

# Ax1: Physics
ax1.set_title("Power Flows")
ax1.set_ylabel("kW")
# Legends are built after all labelled artists exist, so one call suffices.
ax1.legend(bbox_to_anchor=(1, 1), loc='upper left')

# Ax2: Battery State
ax2.set_title("Battery State of Charge")
ax2.set_ylabel("kWh")
ax2.set_xlabel("Hour")
ax2.grid(True, alpha=0.3)
ax2.legend(bbox_to_anchor=(1, 1), loc='upper left')

plt.tight_layout()
plt.show()

In [None]:
class LimitedURBS2Controller(UrbsMPCController):
    """MPC controller built on ``UrbsMPCController`` with the horizon fixed at 2.

    Per the original description the optimization uses perfect foresight over a
    limited horizon; that property comes from the base class and is not visible
    in this subclass.
    """

    def __init__(self, env, url="http://localhost:5000/simulate"):
        """Initialise the base controller with ``env`` and ``url``, then pin the horizon."""
        super().__init__(env, url)
        self.horizon = 2

    def get_action(self, observation, current_soc):
        """Re-plan from ``current_soc`` and return the first planned action.

        ``observation`` is accepted for interface compatibility but not read.
        Returns ``0.0`` when the optimization yields an empty plan.
        """
        plan = self._run_optimization(initial_soc=current_soc, horizon=self.horizon)
        # Receding horizon: only the first step of the plan is ever executed.
        return plan[0] if len(plan) > 0 else 0.0

In [None]:
class Limited3URBSController(UrbsMPCController):
    """MPC controller built on ``UrbsMPCController`` with the horizon fixed at 3.

    Per the original description the optimization uses perfect foresight over a
    limited horizon; that property comes from the base class and is not visible
    in this subclass.
    """

    def __init__(self, env, url="http://localhost:5000/simulate"):
        """Initialise the base controller with ``env`` and ``url``, then pin the horizon."""
        super().__init__(env, url)
        self.horizon = 3

    def get_action(self, observation, current_soc):
        """Re-plan from ``current_soc`` and return the first planned action.

        ``observation`` is accepted for interface compatibility but not read.
        Returns ``0.0`` when the optimization yields an empty plan.
        """
        plan = self._run_optimization(initial_soc=current_soc, horizon=self.horizon)
        # Receding horizon: only the first step of the plan is ever executed.
        return plan[0] if len(plan) > 0 else 0.0

In [None]:
class LimitedURBS5Controller(UrbsMPCController):
    """MPC controller built on ``UrbsMPCController`` with the horizon fixed at 5.

    Per the original description the optimization uses perfect foresight over a
    limited horizon; that property comes from the base class and is not visible
    in this subclass.
    """

    def __init__(self, env, url="http://localhost:5000/simulate"):
        """Initialise the base controller with ``env`` and ``url``, then pin the horizon."""
        super().__init__(env, url)
        self.horizon = 5

    def get_action(self, observation, current_soc):
        """Re-plan from ``current_soc`` and return the first planned action.

        ``observation`` is accepted for interface compatibility but not read.
        Returns ``0.0`` when the optimization yields an empty plan.
        """
        plan = self._run_optimization(initial_soc=current_soc, horizon=self.horizon)
        # Receding horizon: only the first step of the plan is ever executed.
        return plan[0] if len(plan) > 0 else 0.0

In [None]:
class LimitedURBS10Controller(UrbsMPCController):
    """MPC controller built on ``UrbsMPCController`` with the horizon fixed at 10.

    Per the original description the optimization uses perfect foresight over a
    limited horizon; that property comes from the base class and is not visible
    in this subclass.
    """

    def __init__(self, env, url="http://localhost:5000/simulate"):
        """Initialise the base controller with ``env`` and ``url``, then pin the horizon."""
        super().__init__(env, url)
        self.horizon = 10

    def get_action(self, observation, current_soc):
        """Re-plan from ``current_soc`` and return the first planned action.

        ``observation`` is accepted for interface compatibility but not read.
        Returns ``0.0`` when the optimization yields an empty plan.
        """
        plan = self._run_optimization(initial_soc=current_soc, horizon=self.horizon)
        # Receding horizon: only the first step of the plan is ever executed.
        return plan[0] if len(plan) > 0 else 0.0

In [None]:
class LimitedURBS24Controller(UrbsMPCController):
    """MPC controller built on ``UrbsMPCController`` with the horizon fixed at 24.

    Per the original description the optimization uses perfect foresight over a
    limited horizon; that property comes from the base class and is not visible
    in this subclass.
    """

    def __init__(self, env, url="http://localhost:5000/simulate"):
        """Initialise the base controller with ``env`` and ``url``, then pin the horizon."""
        super().__init__(env, url)
        self.horizon = 24

    def get_action(self, observation, current_soc):
        """Re-plan from ``current_soc`` and return the first planned action.

        ``observation`` is accepted for interface compatibility but not read.
        Returns ``0.0`` when the optimization yields an empty plan.
        """
        plan = self._run_optimization(initial_soc=current_soc, horizon=self.horizon)
        # Receding horizon: only the first step of the plan is ever executed.
        return plan[0] if len(plan) > 0 else 0.0

In [None]:
class LimitedURBS72Controller(UrbsMPCController):
    """MPC controller built on ``UrbsMPCController`` with the horizon fixed at 72.

    Per the original description the optimization uses perfect foresight over a
    limited horizon; that property comes from the base class and is not visible
    in this subclass.
    """

    def __init__(self, env, url="http://localhost:5000/simulate"):
        """Initialise the base controller with ``env`` and ``url``, then pin the horizon."""
        super().__init__(env, url)
        self.horizon = 72

    def get_action(self, observation, current_soc):
        """Re-plan from ``current_soc`` and return the first planned action.

        ``observation`` is accepted for interface compatibility but not read.
        Returns ``0.0`` when the optimization yields an empty plan.
        """
        plan = self._run_optimization(initial_soc=current_soc, horizon=self.horizon)
        # Receding horizon: only the first step of the plan is ever executed.
        return plan[0] if len(plan) > 0 else 0.0

In [None]:
# Horizon sweep: display name -> Urbs MPC controller class with a fixed horizon.
controllers = {
    "Limit 2": LimitedURBS2Controller,
    "Limit 3": Limited3URBSController,
    "Limit 5": LimitedURBS5Controller,
    "Limit 10": LimitedURBS10Controller,
    "Limit 24": LimitedURBS24Controller,
    "Limit 72": LimitedURBS72Controller,
}

# One full simulation per controller; each value is the DataFrame returned by
# run_experiment.
all_results = {}
for name, controller in controllers.items():
    all_results[name] = run_experiment(controller)


fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8), sharex=True)

# The scenario curves are drawn once, taken from the first run's log.
# NOTE(review): assumes solar/load/price columns are the same for every
# controller -- confirm against SmartHomeEnv.
scenario_df = next(iter(all_results.values()))
ax1.plot(scenario_df['solar'], label='Solar')
ax1.plot(scenario_df['load'], 'k--', label='Load')
ax1.plot(scenario_df['price'], label='Price')
ax1.plot(scenario_df['sell_price'], label='Sell Price')

# Per-controller artists only: everything that does not depend on the
# controller is configured once after the loop.
for name, df in all_results.items():
    total_cost = df['cost'].sum()
    ax1.bar(df.index, df['battery_action_actual'], alpha=0.3, label=f'{name} \n (Cost: {total_cost:.2f}€)')
    ax2.plot(df['soc'], '--', linewidth=2, label=f"{name}")

# Ax1: Physics
ax1.set_title("Power Flows")
ax1.set_ylabel("kW")
# Legends are built after all labelled artists exist, so one call suffices.
ax1.legend(bbox_to_anchor=(1, 1), loc='upper left')

# Ax2: Battery State
ax2.set_title("Battery State of Charge")
ax2.set_ylabel("kWh")
ax2.set_xlabel("Hour")
ax2.grid(True, alpha=0.3)
ax2.legend(bbox_to_anchor=(1, 1), loc='upper left')

plt.tight_layout()
plt.show()