## Notebook Roadmap
- Synthetic data + environment encapsulation
- Baseline controller plus KPIs
- Lightweight DQN agent and training loop
- Evaluation utilities and investor-friendly plots

In [None]:
# Imports and global configuration
from __future__ import annotations

import math
import random
from collections import deque
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn

plt.style.use("seaborn-v0_8-colorblind")
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
np.set_printoptions(precision=3, suppress=True)
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

TIME_STEPS = 96  # 15-minute resolution
DELTA_T_HOURS = 0.25

@dataclass
class APVConfig:
    pv_kwp: float = 100.0
    bess_kwh: float = 200.0
    bess_kw: float = 100.0
    bess_efficiency: float = 0.92
    pump_kw: float = 30.0
    num_pumps: int = 2
    soil_target: float = 0.6
    soil_capacity: float = 1.0
    soil_min: float = 0.2
    water_per_kw: float = 1.6  # m3 delivered per kWh

@dataclass
class TariffConfig:
    import_price: float = 0.12  # $/kWh
    export_price: float = 0.05  # $/kWh credit

@dataclass
class RewardWeights:
    energy_cost: float = 1.0
    soil_stress: float = 4.5
    bess_cycle: float = 0.04
    irrigation_shortfall: float = 18.0

APV_CFG = APVConfig()
TARIFF = TariffConfig()
REWARD_W = RewardWeights()

In [None]:
# Synthetic weather, load, and irrigation profiles
def generate_synthetic_day(seed: int, steps: int = TIME_STEPS) -> Dict[str, np.ndarray]:
    rng = np.random.default_rng(seed)
    hours = np.arange(steps) * DELTA_T_HOURS
    solar_core = np.clip(np.sin((hours - 6) / 12 * math.pi), 0.0, 1.0)
    cloud_noise = rng.normal(0.0, 0.12, steps)
    irradiance = np.clip(solar_core + cloud_noise, 0.0, 1.0)
    pv_kw = irradiance * APV_CFG.pv_kwp * 0.9
    temp_c = 24 + 8 * irradiance + rng.normal(0.0, 1.5, steps)
    temp_norm = np.clip((temp_c - 18) / 20, 0.0, 1.0)
    base_load = 18 + 4 * np.sin((hours - 3) / 8) + rng.normal(0.0, 1.5, steps)
    evening_bump = 6 * np.exp(-((hours - 19) ** 2) / 18)
    base_load_kw = np.clip(base_load + evening_bump, 12, 40)
    irrigation_target_m3 = rng.uniform(480, 640)
    return {
        "irradiance": irradiance,
        "pv_kw": pv_kw,
        "temperature_norm": temp_norm,
        "base_load_kw": base_load_kw,
        "irrigation_target_m3": irrigation_target_m3,
        "hours": hours,
    }

In [None]:
# Agrivoltaic environment with reward shaping
class AgrivoltaicEnv:
    def __init__(self, apv_cfg: APVConfig = APV_CFG, tariff: TariffConfig = TARIFF, reward_weights: RewardWeights = REWARD_W):
        self.cfg = apv_cfg
        self.tariff = tariff
        self.reward_weights = reward_weights
        self.delta_t = DELTA_T_HOURS
        self.max_steps = TIME_STEPS
        self.state_dim = 8
        self.action_space = 5
        self.day: Dict[str, np.ndarray] | None = None
        self._step_log: List[Dict[str, float]] = []
        self.episode_metrics: Dict[str, float] = {}
        self.episode_trace: List[Dict[str, float]] = []

    def reset(self, seed: int | None = None) -> np.ndarray:
        if seed is None:
            seed = random.randint(0, 10_000)
        self.day = generate_synthetic_day(seed, self.max_steps)
        self.step_idx = 0
        self.soc = 0.5 * self.cfg.bess_kwh
        self._last_soc = self.soc
        self.soil = self.cfg.soil_target
        self.remaining_irrigation = 1.0
        self.daily_target = float(self.day["irrigation_target_m3"])
        self.cumulative_cost = 0.0
        self.cumulative_stress_steps = 0
        self.bess_throughput = 0.0
        self._episode_reward = 0.0
        self.episode_metrics = {}
        self._step_log = []
        self.episode_trace = []
        return self._get_obs()

    def _get_obs(self) -> np.ndarray:
        idx = min(self.step_idx, self.max_steps - 1)
        pv_norm = float(self.day["pv_kw"][idx] / self.cfg.pv_kwp)
        base_norm = float(self.day["base_load_kw"][idx] / 40.0)
        obs = np.array([
            idx / self.max_steps,
            self.soc / self.cfg.bess_kwh,
            self.soil,
            pv_norm,
            base_norm,
            float(self.day["irradiance"][idx]),
            float(self.day["temperature_norm"][idx]),
            self.remaining_irrigation,
        ], dtype=np.float32)
        return obs

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, Dict[str, float]]:
        assert self.day is not None, "Call reset() before step()."
        pv_kw = float(self.day["pv_kw"][self.step_idx])
        base_kw = float(self.day["base_load_kw"][self.step_idx])
        irradiance = float(self.day["irradiance"][self.step_idx])
        temp_norm = float(self.day["temperature_norm"][self.step_idx])

        pump_kw, irrig_m3 = self._pump_dispatch(action)
        irrig_fraction = self._update_soil(irrig_m3, irradiance)
        residual_kw = base_kw + pump_kw - pv_kw
        bess_kw = self._bess_dispatch(action, residual_kw)
        net_grid_kw = residual_kw - bess_kw
        step_cost = self._settle_cost(net_grid_kw)
        soil_penalty = abs(self.soil - self.cfg.soil_target)
        delta_soc = abs(self.soc - self._last_soc)
        reward = -(self.reward_weights.energy_cost * step_cost +
                   self.reward_weights.soil_stress * soil_penalty +
                   self.reward_weights.bess_cycle * delta_soc)
        self._episode_reward += reward
        self.cumulative_cost += step_cost
        self.bess_throughput += delta_soc
        if self.soil < self.cfg.soil_target:
            self.cumulative_stress_steps += 1
        self._record_step(action, pump_kw, bess_kw, net_grid_kw, pv_kw, base_kw, temp_norm, irrig_fraction, reward)
        self._last_soc = self.soc

        self.step_idx += 1
        done = self.step_idx >= self.max_steps
        if done:
            shortfall_penalty = self.reward_weights.irrigation_shortfall * self.remaining_irrigation
            reward -= shortfall_penalty
            self._episode_reward -= shortfall_penalty
            self._finalize_metrics()
            next_obs = np.zeros(self.state_dim, dtype=np.float32)
        else:
            next_obs = self._get_obs()

        info = {
            "net_grid_kw": net_grid_kw,
            "step_cost": step_cost,
            "soil": self.soil,
            "soc": self.soc / self.cfg.bess_kwh,
            "remaining_irrigation": self.remaining_irrigation,
        }
        return next_obs, float(reward), done, info

    def _settle_cost(self, net_grid_kw: float) -> float:
        energy_kwh = net_grid_kw * self.delta_t
        if energy_kwh >= 0:
            return energy_kwh * self.tariff.import_price
        return energy_kwh * self.tariff.export_price

    def _pump_dispatch(self, action: int) -> Tuple[float, float]:
        if action in (0, 4):
            pumps = 0
        elif action == 1:
            pumps = 1
        else:
            pumps = self.cfg.num_pumps
        pump_kw = pumps * self.cfg.pump_kw
        energy_kwh = pump_kw * self.delta_t
        irrig_m3 = energy_kwh * self.cfg.water_per_kw
        return pump_kw, irrig_m3

    def _update_soil(self, irrig_m3: float, irradiance: float) -> float:
        if self.daily_target <= 0:
            return 0.0
        irrig_fraction = irrig_m3 / self.daily_target
        self.remaining_irrigation = max(0.0, self.remaining_irrigation - irrig_fraction)
        soil_gain = 0.8 * irrig_fraction
        evap = 0.01 + 0.05 * irradiance
        self.soil = float(np.clip(self.soil + soil_gain - evap, self.cfg.soil_min, self.cfg.soil_capacity))
        return irrig_fraction

    def _bess_dispatch(self, action: int, residual_kw: float) -> float:
        if action == 3 and residual_kw > 0:
            request = min(residual_kw, self.cfg.bess_kw)
        elif action == 4 and residual_kw < 0:
            request = max(residual_kw, -self.cfg.bess_kw)
        else:
            request = 0.0
        return self._apply_bess(request)

    def _apply_bess(self, requested_kw: float) -> float:
        if requested_kw >= 0:
            max_discharge_kw = min(self.cfg.bess_kw, self.soc / self.delta_t)
            actual_kw = min(requested_kw, max_discharge_kw)
            energy = actual_kw * self.delta_t
            self.soc -= energy
            return actual_kw
        max_charge_kw = min(self.cfg.bess_kw, (self.cfg.bess_kwh - self.soc) / (self.cfg.bess_efficiency * self.delta_t))
        charge_kw = min(-requested_kw, max_charge_kw)
        energy = charge_kw * self.delta_t * self.cfg.bess_efficiency
        self.soc += energy
        return -charge_kw

    def _record_step(self, action: int, pump_kw: float, bess_kw: float, grid_kw: float, pv_kw: float,
                     base_kw: float, temp_norm: float, irrig_fraction: float, reward: float) -> None:
        hour = float(self.day["hours"][self.step_idx])
        self._step_log.append({
            "hour": hour,
            "action": action,
            "pv_kw": pv_kw,
            "base_kw": base_kw,
            "pump_kw": pump_kw,
            "bess_kw": bess_kw,
            "grid_kw": grid_kw,
            "soc": self.soc / self.cfg.bess_kwh,
            "soil": self.soil,
            "remaining_irrig": self.remaining_irrigation,
            "temp_norm": temp_norm,
            "irradiance": float(self.day["irradiance"][self.step_idx]),
            "irrig_frac": irrig_fraction,
            "reward": reward,
        })

    def _finalize_metrics(self) -> None:
        imports = sum(max(entry["grid_kw"], 0.0) for entry in self._step_log) * self.delta_t
        exports = -sum(min(entry["grid_kw"], 0.0) for entry in self._step_log) * self.delta_t
        self.episode_metrics = {
            "total_cost": self.cumulative_cost,
            "stress_hours": self.cumulative_stress_steps * self.delta_t,
            "irrigation_shortfall": self.remaining_irrigation,
            "bess_cycles": self.bess_throughput / (2 * self.cfg.bess_kwh),
            "grid_import_kwh": imports,
            "grid_export_kwh": exports,
            "episode_reward": self._episode_reward,
        }
        self.episode_trace = list(self._step_log)

In [None]:
# Rule-based policy for benchmarking
def rule_based_policy(state: np.ndarray) -> int:
    time_norm, soc, soil, pv_norm, base_norm, irr, temp, remaining = state.tolist()
    if soil < 0.5 and pv_norm > 0.5:
        return 2 if soc > 0.4 else 1
    if remaining > 0.2 and soc > 0.6 and time_norm > 0.4:
        return 3
    if pv_norm > base_norm and soc < 0.9:
        return 4
    if soil < 0.45:
        return 1
    return 0

def random_policy(_: np.ndarray) -> int:
    return random.randint(0, 4)

In [None]:
# Lightweight DQN components
class ReplayBuffer:
    def __init__(self, capacity: int = 20000):
        self.buffer = deque(maxlen=capacity)

    def __len__(self) -> int:
        return len(self.buffer)

    def add(self, state: np.ndarray, action: int, reward: float, next_state: np.ndarray, done: bool) -> None:
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size: int):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = map(np.array, zip(*batch))
        return states, actions, rewards, next_states, dones

class QNetwork(nn.Module):
    def __init__(self, state_dim: int, action_dim: int, hidden_dim: int = 64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)

class DQNAgent:
    def __init__(self, state_dim: int, action_dim: int, hidden_dim: int = 64,
                 lr: float = 1e-3, gamma: float = 0.99, epsilon_start: float = 1.0,
                 epsilon_end: float = 0.05, epsilon_decay: float = 0.995,
                 batch_size: int = 128, target_update: int = 10, buffer_size: int = 25000):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.batch_size = batch_size
        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        self.target_update = target_update
        self.replay = ReplayBuffer(buffer_size)
        self.q_net = QNetwork(state_dim, action_dim, hidden_dim).to(DEVICE)
        self.target_net = QNetwork(state_dim, action_dim, hidden_dim).to(DEVICE)
        self.target_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=lr)
        self.learn_steps = 0

    def select_action(self, state: np.ndarray, explore: bool = True) -> int:
        if explore and random.random() < self.epsilon:
            return random.randrange(self.action_dim)
        state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            q_values = self.q_net(state_tensor)
        return int(torch.argmax(q_values, dim=1).item())

    def remember(self, state, action, reward, next_state, done) -> None:
        self.replay.add(state, action, reward, next_state, done)

    def learn(self) -> float:
        if len(self.replay) < self.batch_size:
            return 0.0
        states, actions, rewards, next_states, dones = self.replay.sample(self.batch_size)
        states_t = torch.from_numpy(states).float().to(DEVICE)
        actions_t = torch.from_numpy(actions).long().to(DEVICE)
        rewards_t = torch.from_numpy(rewards).float().to(DEVICE)
        next_states_t = torch.from_numpy(next_states).float().to(DEVICE)
        dones_t = torch.from_numpy(dones.astype(np.float32)).to(DEVICE)

        q_values = self.q_net(states_t).gather(1, actions_t.unsqueeze(1)).squeeze(1)
        next_q = self.target_net(next_states_t).max(1)[0]
        targets = rewards_t + self.gamma * (1 - dones_t) * next_q.detach()
        loss = F.mse_loss(q_values, targets)
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.q_net.parameters(), 1.0)
        self.optimizer.step()
        self.learn_steps += 1
        return float(loss.item())

    def update_target(self) -> None:
        self.target_net.load_state_dict(self.q_net.state_dict())

    def schedule_epsilon(self) -> None:
        self.epsilon = max(self.epsilon_end, self.epsilon * self.epsilon_decay)

    def save(self, path: str) -> None:
        torch.save(self.q_net.state_dict(), path)

    def load(self, path: str) -> None:
        self.q_net.load_state_dict(torch.load(path, map_location=DEVICE))
        self.update_target()

In [None]:
# Training helper
def train_dqn(num_days: int = 220, seed_offset: int = 0) -> Tuple[DQNAgent, List[float], List[float]]:
    env = AgrivoltaicEnv()
    agent = DQNAgent(env.state_dim, env.action_space)
    reward_history: List[float] = []
    loss_history: List[float] = []
    for episode in range(num_days):
        state = env.reset(seed=seed_offset + episode)
        done = False
        ep_reward = 0.0
        ep_losses: List[float] = []
        while not done:
            action = agent.select_action(state, explore=True)
            next_state, reward, done, _ = env.step(action)
            agent.remember(state, action, reward, next_state, done)
            loss = agent.learn()
            if loss:
                ep_losses.append(loss)
            state = next_state
            ep_reward += reward
        agent.schedule_epsilon()
        if (episode + 1) % agent.target_update == 0:
            agent.update_target()
        reward_history.append(ep_reward)
        loss_history.append(float(np.mean(ep_losses)) if ep_losses else 0.0)
    return agent, reward_history, loss_history

In [None]:
# Evaluation utilities and plotting
def evaluate_policy(env: AgrivoltaicEnv, policy_fn: Callable[[np.ndarray], int], days: int = 20,
                    seed_start: int = 1000) -> Tuple[List[Dict[str, float]], List[List[Dict[str, float]]]]:
    metrics: List[Dict[str, float]] = []
    traces: List[List[Dict[str, float]]] = []
    for day in range(days):
        state = env.reset(seed=seed_start + day)
        done = False
        while not done:
            action = policy_fn(state)
            state, _, done, _ = env.step(action)
        metrics.append(env.episode_metrics)
        traces.append(env.episode_trace)
    return metrics, traces

def summarize_metrics(metrics: List[Dict[str, float]]) -> Dict[str, float]:
    if not metrics:
        return {}
    summary: Dict[str, float] = {}
    for key in metrics[0]:
        values = np.array([m[key] for m in metrics], dtype=np.float32)
        summary[key] = float(np.mean(values))
    return summary

def print_kpis(label: str, metrics: List[Dict[str, float]]) -> None:
    summary = summarize_metrics(metrics)
    if not summary:
        print(f"No metrics for {label}.")
        return
    print(f"\n--- {label} KPIs ---")
    print(f"Mean daily cost: ${summary['total_cost']:.2f}")
    print(f"Stress hours: {summary['stress_hours']:.2f} h")
    print(f"Irrigation shortfall: {summary['irrigation_shortfall']*100:.1f}% of target")
    print(f"BESS cycles: {summary['bess_cycles']:.3f} equiv/day")
    print(f"Import/Export: {summary['grid_import_kwh']:.1f} / {summary['grid_export_kwh']:.1f} kWh")
    print(f"Episode reward: {summary['episode_reward']:.2f}")

def plot_training(rewards: List[float], losses: List[float]) -> None:
    fig, ax1 = plt.subplots(figsize=(8, 4))
    ax1.plot(rewards, color="tab:blue", label="Episode reward")
    ax1.set_xlabel("Episode")
    ax1.set_ylabel("Reward", color="tab:blue")
    ax2 = ax1.twinx()
    ax2.plot(losses, color="tab:orange", alpha=0.6, label="Loss")
    ax2.set_ylabel("Loss", color="tab:orange")
    ax1.grid(True, alpha=0.3)
    plt.title("Training progress")
    fig.tight_layout()
    plt.show()

def plot_day(trace: List[Dict[str, float]], title: str) -> None:
    if not trace:
        print("Empty trace")
        return
    hours = [row["hour"] for row in trace]
    pv = [row["pv_kw"] for row in trace]
    base = [row["base_kw"] for row in trace]
    pump = [row["pump_kw"] for row in trace]
    bess = [row["bess_kw"] for row in trace]
    grid = [row["grid_kw"] for row in trace]
    soc = [row["soc"] for row in trace]
    soil = [row["soil"] for row in trace]
    fig, ax1 = plt.subplots(figsize=(9, 5))
    ax1.plot(hours, pv, label="PV kW")
    ax1.plot(hours, base, label="Base load kW")
    ax1.step(hours, pump, where="post", label="Pump kW")
    ax1.plot(hours, grid, label="Grid kW")
    ax1.plot(hours, bess, label="BESS kW")
    ax1.set_ylabel("Power (kW)")
    ax1.set_xlabel("Hour")
    ax1.legend(loc="upper left", ncol=2)
    ax2 = ax1.twinx()
    ax2.plot(hours, soc, color="tab:green", label="SOC")
    ax2.plot(hours, soil, color="tab:brown", label="Soil")
    ax2.set_ylabel("State (p.u.)")
    ax2.axhline(APV_CFG.soil_target, color="tab:brown", linestyle="--", alpha=0.4)
    plt.title(title)
    fig.tight_layout()
    plt.show()

In [None]:
# Train agent and benchmark vs. rule-based policy
agent, rewards, losses = train_dqn(num_days=220, seed_offset=0)
plot_training(rewards, losses)

eval_env = AgrivoltaicEnv()
baseline_metrics, baseline_traces = evaluate_policy(eval_env, rule_based_policy, days=20, seed_start=2000)
rl_policy = lambda state: agent.select_action(state, explore=False)
rl_metrics, rl_traces = evaluate_policy(eval_env, rl_policy, days=20, seed_start=3000)

print_kpis("Rule-based", baseline_metrics)
print_kpis("DQN", rl_metrics)

In [None]:
# Visualize representative baseline vs RL day
if baseline_traces:
    plot_day(baseline_traces[0], "Rule-based day")
if rl_traces:
    plot_day(rl_traces[0], "DQN day")