# Microgrid Dynamic Pricing with Advantage Actor-Critic

This notebook builds a minimal microgrid simulator, then trains an advantage actor-critic (A2C) policy to choose demand response actions that balance bill savings and occupant comfort. The workflow mirrors the standalone report in the repository README.

## Load data and dependencies

We use a small household-level consumption sample with four features per timestamp: average temperature, energy use, household size, and peak-hour share. Actions represent a normalized load-shedding ratio between 0 (no reduction) and 1 (full reduction).

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pandas as pd

# Load the household consumption sample; the path is resolved relative to the
# current working directory.
df = pd.read_csv("household_energy_consumption.csv")
# Preview the first rows (shown as the cell output when run in a notebook).
df.head()


## Environment definition
The environment tracks a seven-step episode sampled from the time series. Observations include normalized temperature, consumption, household size, and the derived grid price. The reward subtracts quadratic discomfort from the money saved after shedding an action ratio $a$ from the baseline load $c$ when the price is $p$:
$$r = (c p) - (c (1 - a) p) - c a^2 \frac{T}{10}$$
Episodes terminate after seven transitions to keep training iterations fast.

In [None]:
class MicrogridEnv(gym.Env):
    """Fixed-length demand-response episodes replayed from a consumption table.

    Each episode walks ``episode_duration`` consecutive rows of the data,
    starting from a randomly chosen row. The single continuous action is a
    load-shedding ratio in ``[0, 1]``. The reward for a step is the money
    saved by shedding minus a quadratic discomfort penalty scaled by
    temperature.

    Observations are ``[temp, consumption, household_size, grid_price]`` where
    the first three entries are divided by their column maximum and the price
    is left unscaled. ``reset`` and ``step`` return observations in the same
    normalized form.
    """

    def __init__(self, data):
        """Build the environment from a DataFrame.

        Args:
            data: DataFrame containing the columns ``Avg_Temperature_C``,
                ``Energy_Consumption_kWh``, ``Household_Size`` and
                ``Peak_Hours_Usage_kWh``.

        Raises:
            ValueError: if ``data`` has too few rows to sample one episode.
        """
        super().__init__()
        self.data_matrix = data[[
            'Avg_Temperature_C',
            'Energy_Consumption_kWh',
            'Household_Size',
            'Peak_Hours_Usage_kWh'
        ]].to_numpy(dtype=np.float32)
        # Small epsilon keeps the normalization safe if a column is all zeros.
        self.max_vals = self.data_matrix.max(axis=0) + 1e-5
        self.total_data_len = len(self.data_matrix)
        self.episode_duration = 7
        if self.total_data_len <= self.episode_duration:
            raise ValueError(
                f"data must contain more than {self.episode_duration} rows, "
                f"got {self.total_data_len}"
            )
        self.action_space = spaces.Box(low=0.0, high=1.0, shape=(1,), dtype=np.float32)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(4,), dtype=np.float32)

    def reset(self, seed=None, options=None):
        """Start a new episode at a random row and return ``(observation, info)``.

        Args:
            seed: optional seed for the environment's random generator. Pass
                it once for reproducibility; leaving it as ``None`` on later
                calls continues the same random stream.
            options: unused, accepted for Gymnasium API compatibility.
        """
        super().reset(seed=seed)
        self.steps_taken = 0
        # Sample from the environment's own generator so that ``seed`` actually
        # controls which window is drawn (the global NumPy RNG ignores it).
        self.current_step_idx = int(
            self.np_random.integers(0, self.total_data_len - self.episode_duration)
        )
        self._load_data()
        return self._get_obs(), {}

    def step(self, action):
        """Apply a shedding ratio and advance one row.

        Args:
            action: single-element array or tensor exposing ``.item()``; its
                value is used as the shedding ratio.

        Returns:
            ``(observation, reward, terminated, truncated, info)``. The
            terminal observation is all zeros; ``truncated`` is always False.
        """
        shed_factor = float(action.item())
        actual_cost = self.consumption * (1 - shed_factor) * self.grid_price
        money_saved = (self.consumption * self.grid_price) - actual_cost
        temp_factor = self.temp / 10.0
        discomfort = self.consumption * (shed_factor ** 2) * temp_factor
        raw_reward = float(money_saved - discomfort)
        self.steps_taken += 1
        self.current_step_idx += 1
        terminated = self.steps_taken >= self.episode_duration
        if not terminated:
            self._load_data()
            # Use the same normalization as reset() so the policy always sees
            # features on one consistent scale.
            new_observation = self._get_obs()
        else:
            new_observation = np.zeros(4, dtype=np.float32)
        return new_observation, raw_reward, terminated, False, {}

    def _load_data(self):
        """Cache the current row's fields and derive the grid price from peak usage."""
        row = self.data_matrix[self.current_step_idx]
        self.temp = row[0]
        self.consumption = row[1]
        self.household_size = row[2]
        self.peak_usage = row[3]
        self.grid_price = 0.12 + (0.10 * self.peak_usage)

    def _get_obs(self):
        """Return the current observation with the first three features max-scaled."""
        obs = np.array([self.temp, self.consumption, self.household_size, self.grid_price], dtype=np.float32)
        obs[:3] = obs[:3] / self.max_vals[:3]
        return obs


## Advantage actor-critic model
The policy outputs a Gaussian mean $\mu$ bounded to $(0, 1)$ through a sigmoid, and a standard deviation $\sigma$ squashed into $(0.01, 0.31)$ by a scaled and shifted sigmoid. The critic estimates the state value $V(s)$. The loss combines the policy gradient with a squared value error, and gradient clipping is used for stability.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class ActorCriticNetwork(nn.Module):
    """Actor-critic network with a shared two-layer trunk.

    The trunk feeds three linear heads: a policy mean, a policy standard
    deviation, and a scalar state value.
    """

    def __init__(self, input_dim, action_dim):
        """Create the trunk (input_dim -> 128 -> 64) and the three heads."""
        super().__init__()
        wide, narrow = 128, 64
        # Shared feature extractor.
        self.fc1 = nn.Linear(input_dim, wide)
        self.fc2 = nn.Linear(wide, narrow)
        # Policy heads (mean and spread) followed by the value head.
        self.actor_mu = nn.Linear(narrow, action_dim)
        self.actor_sig = nn.Linear(narrow, action_dim)
        self.critic = nn.Linear(narrow, 1)

    def forward(self, state):
        """Return ``(mean, std, value)`` for a batch of states.

        The mean is passed through a sigmoid; the standard deviation is a
        sigmoid scaled by 0.3 and shifted by 0.01 so it never reaches zero.
        """
        features = state
        for layer in (self.fc1, self.fc2):
            features = F.relu(layer(features))
        mean = torch.sigmoid(self.actor_mu(features))
        spread = 0.01 + 0.3 * torch.sigmoid(self.actor_sig(features))
        value = self.critic(features)
        return mean, spread, value


## Train the A2C agent
Each episode samples a random seven-step window. For every transition, the algorithm draws an action from $\mathcal{N}(\mu, \sigma)$, computes a one-step return target, and updates both actor and critic. Rewards log the combined savings and discomfort signal.

In [None]:
# --- Setup: environment, network, optimizer and bookkeeping ---
env = MicrogridEnv(df)
input_dim = 4
action_dim = 1
model = ActorCriticNetwork(input_dim=input_dim, action_dim=action_dim)
gamma = 0.99
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
all_rewards = []
num_episodes = 2500

# --- One-step advantage actor-critic: update after every transition ---
for episode in range(num_episodes):
    state, _ = env.reset()
    state = torch.from_numpy(state).float().unsqueeze(0)
    episode_reward = 0.0
    terminated = False
    while not terminated:
        mu, sigma, value = model(state)
        normal_dist = torch.distributions.Normal(mu, sigma)
        # Keep the sampled action inside the environment's [0, 1] action range.
        action = torch.clamp(normal_dist.sample(), 0.0, 1.0)
        next_state_np, reward, terminated, _, _ = env.step(action)
        # Plain Python float avoids mixing NumPy scalars into tensor arithmetic.
        reward = float(reward)
        next_state = torch.from_numpy(next_state_np).float().unsqueeze(0)
        episode_reward += reward

        # The TD target is treated as a constant: the bootstrap value must not
        # receive gradients, otherwise the critic chases its own prediction.
        with torch.no_grad():
            if terminated:
                target = torch.full_like(value, reward)
            else:
                _, _, next_value = model(next_state)
                target = reward + gamma * next_value

        advantage = target - value
        critic_loss = advantage.pow(2)
        actor_loss = -(normal_dist.log_prob(action) * advantage.detach())
        # Reduce to a scalar so backward() has an unambiguous loss.
        total_loss = (actor_loss + critic_loss).sum()

        optimizer.zero_grad()
        total_loss.backward()
        # Clip AFTER backward so the freshly computed gradients are the ones
        # being bounded before the optimizer step.
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)
        optimizer.step()
        state = next_state
    all_rewards.append(episode_reward)
    if episode % 10 == 0:
        print(f"Episode {episode}: Reward {episode_reward:.2f}")

import matplotlib.pyplot as plt

# Draw the per-episode total reward on the current axes.
reward_axes = plt.gca()
reward_axes.plot(all_rewards)
reward_axes.set_title("A2C Learning Curve")
reward_axes.set_xlabel("Episode")
reward_axes.set_ylabel("Total Reward")
plt.show()


## Validate pricing and shedding behavior
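The last 49 rows of the dataset are replayed to show how the trained policy reacts to evolving prices. These rows are not strictly held out, because training windows are sampled from the whole series. Actions follow the learned mean without exploration, and the plots highlight price sensitivity and the temperature-action relationship.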

In [None]:
# Replay the tail of the dataset through the trained policy, acting greedily
# (policy mean, no sampling) and recording price, action and temperature.
model.eval()
num_validation_steps = 49
# Guard against datasets shorter than the validation window: a negative start
# index would silently wrap around instead of selecting the tail.
test_start_idx = max(env.total_data_len - num_validation_steps, 0)
test_data = env.data_matrix[test_start_idx:]
prices = []
actions = []
temps = []
print("Running validation simulation...")
with torch.no_grad():
    for state_row in test_data:
        # NOTE(review): these price coefficients (0.15, 0.20) differ from the
        # ones MicrogridEnv._load_data uses (0.12, 0.10) -- confirm that the
        # shifted tariff is intentional for validation.
        price = 0.15 + (0.20 * state_row[3])
        obs = np.array([state_row[0], state_row[1], state_row[2], price], dtype=np.float32)
        # Scale the first three features by the environment's column maxima.
        obs[:3] = obs[:3] / env.max_vals[:3]
        current_state = torch.from_numpy(obs).float().unsqueeze(0)
        mu, _, _ = model(current_state)
        prices.append(price)
        actions.append(mu.item())
        temps.append(state_row[0])

import matplotlib.pyplot as plt
# Figure 1: grid price (left axis, red) against the policy's shedding ratio
# (right axis, blue) over the validation steps.
fig, ax1 = plt.subplots(figsize=(12, 6))
ax1.set_xlabel("Validation step")
ax1.set_ylabel("Grid price ($/kWh)", color="tab:red")
ax1.plot(prices, color="tab:red", linewidth=2, label="Grid price")
ax1.tick_params(axis="y", labelcolor="tab:red")
# Second y-axis sharing the same x-axis, used for the shedding ratio.
ax2 = ax1.twinx()
ax2.set_ylabel("Load shedding", color="tab:blue")
ax2.fill_between(range(len(actions)), actions, color="tab:blue", alpha=0.3, label="Shedding")
ax2.plot(actions, color="tab:blue", linewidth=1)
ax2.tick_params(axis="y", labelcolor="tab:blue")
# Fix the shedding axis to the full [0, 1] range.
ax2.set_ylim(0, 1.0)
plt.title("A2C response to dynamic pricing")
fig.tight_layout()
plt.show()
# Figure 2: scatter of shedding ratio against temperature.
plt.figure(figsize=(8, 6))
plt.scatter(temps, actions, alpha=0.5, c="orange")
plt.xlabel("Temperature (C)")
plt.ylabel("Load shedding")
plt.title("Temperature impact on shedding")
plt.grid(True)
plt.show()
