# Energy Environment Implementation

This notebook contains the implementation of the energy environment used in the multi-agent reinforcement learning (MARL) system. The environment simulates a residential energy system with solar panels, a grid connection, and battery storage.

In [None]:
import gymnasium as gym
import numpy as np
from typing import Dict, List, Tuple, Any
import pypsa
import os
import sys
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime
import logging

# Configure logging to suppress PyPSA INFO messages
logging.getLogger('pypsa').setLevel(logging.WARNING)

# Resolve the project root (the directory two levels above the one containing
# this file) and append it to sys.path so the absolute ``src.`` import below
# can be resolved.
# NOTE(review): ``__file__`` is normally undefined when code runs as a
# notebook cell -- confirm this is executed as a module, or set the root
# explicitly.
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(project_root)

# Project-local helper; must be imported after the sys.path tweak above.
from src.utils.network_visualization import NetworkVisualizer

class EnergyResilienceEnv(gym.Env):
    """Gymnasium environment for simulating energy resilience in critical infrastructure.

    The environment is intended for a multi-agent system in which each agent
    controls a different energy source. One joint action dictionary (solar
    usage, battery charge/discharge, grid usage) is applied per 15-minute
    step, and an episode covers 96 steps (24 hours).
    """

## Environment Initialization

The `__init__` method sets up the environment with all necessary components:

In [None]:
def __init__(self, config: Dict[str, Any]):
    """Build the environment state, power network, spaces and bookkeeping.

    Args:
        config: Environment settings. Recognised keys (all optional):
            ``'n_agents'`` (default 3), ``'viz_dir'`` (output directory,
            default ``'visualizations'``) and ``'solar_data_path'`` (CSV with
            ``DATE_TIME`` and ``AC_POWER`` columns; defaults to the plant
            data file shipped under ``data/``).

    Raises:
        ValueError: If the solar data has no positive ``AC_POWER`` value, so
            no finite scaling factor can be derived from it.
    """
    super().__init__()

    # Environment configuration
    self.config = config
    self.n_agents = config.get('n_agents', 3)
    self.time_step = 0
    self.max_steps = 96  # 24 hours * 4 (15-minute intervals)
    self.blackout = False
    self.episode_count = 0

    # Load solar generation data. The path is configurable so the environment
    # does not depend on the process working directory; the default is the
    # location that used to be hard-coded.
    solar_data_path = config.get(
        'solar_data_path',
        'data/archive (2)/Plant_1_Generation_Data.csv',
    )
    self.solar_data = pd.read_csv(solar_data_path)
    self.solar_data['DATE_TIME'] = pd.to_datetime(self.solar_data['DATE_TIME'], format='%d-%m-%Y %H:%M')

    # Scale plant output down so its peak equals 5.0 (residential installation),
    # then multiply by 0.25 to convert a 15-minute power reading into energy.
    peak_ac_power = self.solar_data['AC_POWER'].max()
    if not peak_ac_power > 0:
        # Also catches NaN (empty data); dividing would give inf/NaN silently.
        raise ValueError(
            f"Solar data at {solar_data_path!r} has no positive AC_POWER values"
        )
    self.solar_scaling_factor = 5.0 / peak_ac_power
    self.solar_data['AC_POWER_kWh'] = self.solar_data['AC_POWER'] * self.solar_scaling_factor * 0.25

    # Solar investment costs, amortised per day over the lifespan
    self.solar_installation_cost = 15000
    self.solar_lifespan_years = 25
    self.daily_solar_cost = self.solar_installation_cost / (self.solar_lifespan_years * 365)

    # Battery configuration (episodes start at 60 % charge)
    self.battery_capacity = 12.5
    self.battery_charge = self.battery_capacity * 0.6
    self.battery_reserve_threshold = 0.2 * self.battery_capacity
    self.battery_efficiency = 0.9
    self.battery_investment_cost = 0.001
    self.battery_power_rating = 5

    # Solar tracking
    self.daily_solar_generated = 0
    self.solar_energy_storage = 0
    self.available_solar = 0

    # Solar cost and limits
    self.solar_cost_per_kwh = 0.02
    self.solar_maintenance_cost = 0.005

    # Grid pricing (in $/kWh); peak hours are 16:00 up to, not including, 21:00
    self.peak_price = 0.58672
    self.off_peak_price = 0.46366
    self.peak_hours = set(range(16, 21))

    # Initialize network and components. Both helpers must run before the
    # spaces are built, because the spaces read attributes set here.
    self.network = pypsa.Network()
    self._initialize_network()
    self._initialize_pricing_and_demand()

    # Initialize visualizer
    self.visualizer = NetworkVisualizer(self.network)

    # Create output directory
    self.viz_dir = config.get('viz_dir', 'visualizations')
    os.makedirs(self.viz_dir, exist_ok=True)

    # Define spaces
    self.observation_space = self._get_observation_space()
    self.action_space = self._get_action_space()

    # Metrics tracking (one entry appended per step)
    self.metrics = {
        'energy_cost': [],
        'resilience_score': [],
        'reliability_score': [],
    }

    # Communication channel between agents
    self.communication_channel = {
        'resilience_agent': None,
        'cost_agent': None,
    }

    # Per-episode cumulative tracking
    self.cumulative_energy = {
        'solar': 0.0,
        'grid': 0.0,
        'battery': 0.0,
    }
    self.cumulative_costs = {
        'solar': 0.0,
        'grid': 0.0,
        'battery': 0.0,
        'penalty': 0.0,
    }

## Network Initialization

The `_initialize_network` method sets up the power network with all necessary components:

In [None]:
def _initialize_network(self):
    """Populate ``self.network`` with the fixed grid / solar / battery / load topology.

    Components are registered in this order: four buses, two generators, one
    storage unit, one load, four lines and three transformers.
    """
    register = self.network.add

    # Buses: every bus shares the same nominal voltage.
    for bus_name in ('grid_bus', 'solar_bus', 'battery_bus', 'load_bus'):
        register('Bus', bus_name, v_nom=1.0)

    # Generators: (name, bus, nominal power, marginal cost).
    generator_specs = (
        ('grid_generator', 'grid_bus', 100, 0.5),
        ('solar_generator', 'solar_bus', 5, 0.02),
    )
    for gen_name, gen_bus, rating, cost in generator_specs:
        register('Generator', gen_name, bus=gen_bus, p_nom=rating, marginal_cost=cost)

    # Battery storage and the single aggregated load.
    register(
        'StorageUnit',
        'battery',
        bus='battery_bus',
        p_nom=5,
        max_hours=2.5,
        efficiency_store=0.9,
        efficiency_dispatch=0.9,
    )
    register('Load', 'load', bus='load_bus', p_set=10)

    # Lines: (name, from bus, to bus, apparent power limit).
    line_specs = (
        ('grid_to_load', 'grid_bus', 'load_bus', 100),
        ('solar_to_load', 'solar_bus', 'load_bus', 5),
        ('battery_to_load', 'battery_bus', 'load_bus', 5),
        ('solar_to_battery', 'solar_bus', 'battery_bus', 5),
    )
    for line_name, start_bus, end_bus, limit in line_specs:
        register('Line', line_name, bus0=start_bus, bus1=end_bus, x=0.1, r=0.01, s_nom=limit)

    # Transformers: each source bus is also tied to the load bus.
    transformer_specs = (
        ('grid_transformer', 'grid_bus', 100),
        ('solar_transformer', 'solar_bus', 5),
        ('battery_transformer', 'battery_bus', 5),
    )
    for trafo_name, source_bus, limit in transformer_specs:
        register('Transformer', trafo_name, bus0=source_bus, bus1='load_bus', x=0.1, s_nom=limit)

## Pricing and Demand Initialization

The `_initialize_pricing_and_demand` method sets up the pricing structure and demand profiles:

In [None]:
def _initialize_pricing_and_demand(self):
    """Set the per-period price, demand and solar-potential tables.

    Each table is keyed by the three day periods. The demand profile is the
    base profile multiplied by one random factor drawn from NumPy's global
    random generator, uniformly between 0.8 and 1.2; the same factor is
    applied to every period.
    """
    day_periods = ('morning', 'afternoon', 'evening')

    # Grid price per period ($/kWh).
    self.pricing = dict(zip(day_periods, (0.28, 0.15, 0.47)))

    # Baseline demand per period, before the random scaling below.
    self.base_demand_profile = dict(zip(day_periods, (10, 8, 12)))

    # One shared scaling factor for the whole profile.
    scale = np.random.uniform(0.8, 1.2)
    self.demand_profile = {
        name: level * scale
        for name, level in self.base_demand_profile.items()
    }

    # Solar generation potential (kWh) per period.
    self.solar_potential = dict(zip(day_periods, (4.0, 6.0, 2.0)))

## Observation and Action Spaces

These methods define the observation and action spaces for the environment:

In [None]:
def _get_observation_space(self) -> gym.spaces.Dict:
    """Build the observation space: one single-element float32 box per signal.

    The upper bound for ``solar_generation`` is at least 5 and is widened to
    the largest value in ``self.solar_potential`` when that table exists. The
    previous fixed bound of 5 was lower than the afternoon potential (6.0), so
    afternoon observations fell outside the declared space.

    Returns:
        A ``gym.spaces.Dict`` with keys ``solar_generation``,
        ``battery_charge``, ``grid_price``, ``demand``, ``time_of_day`` and
        ``blackout``.
    """
    solar_high = 5.0
    # Fall back to the nominal bound if the potential table is not set yet.
    solar_potential = getattr(self, 'solar_potential', None)
    if solar_potential:
        solar_high = max(solar_high, float(max(solar_potential.values())))

    return gym.spaces.Dict({
        'solar_generation': gym.spaces.Box(low=0, high=solar_high, shape=(1,), dtype=np.float32),
        'battery_charge': gym.spaces.Box(low=0, high=self.battery_capacity, shape=(1,), dtype=np.float32),
        'grid_price': gym.spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32),
        'demand': gym.spaces.Box(low=0, high=20, shape=(1,), dtype=np.float32),
        # The hour can reach 24 on the final step (96 * 15 // 60).
        'time_of_day': gym.spaces.Box(low=0, high=24, shape=(1,), dtype=np.float32),
        'blackout': gym.spaces.Box(low=0, high=1, shape=(1,), dtype=np.float32)
    })

def _get_action_space(self) -> gym.spaces.Dict:
    """Build the action space: three single-element float32 boxes capped at 1.

    ``solar_usage`` and ``grid_usage`` range over [0, 1]; ``battery_charge``
    ranges over [-1, 1], where ``_apply_actions`` treats positive values as
    charging and negative values as discharging.
    """
    lower_bounds = {
        'solar_usage': 0,
        'battery_charge': -1,
        'grid_usage': 0,
    }
    action_boxes = {}
    for action_name, lower in lower_bounds.items():
        action_boxes[action_name] = gym.spaces.Box(low=lower, high=1, shape=(1,), dtype=np.float32)
    return gym.spaces.Dict(action_boxes)

## Environment Reset

The `reset` method initializes the environment for a new episode:

In [None]:
def reset(self, seed=None, options=None) -> Tuple[Dict[str, np.ndarray], Dict[str, Any]]:
    """Start a new episode and return the first observation.

    Args:
        seed: Forwarded to the base-class ``reset``.
        options: Accepted for interface compatibility; not used here.

    Returns:
        ``(observation, info)`` where ``info`` holds the time step, blackout
        flag, battery charge and available solar after the reset.
    """
    super().reset(seed=seed)

    # Episode bookkeeping
    self.episode_count += 1
    self.time_step = 0
    self.blackout = False

    # Battery goes back to 60 % of capacity
    self.battery_charge = self.battery_capacity * 0.6

    # Solar trackers all restart from zero
    self.daily_solar_generated = 0
    self.solar_energy_storage = 0
    self.available_solar = 0

    # Fresh cumulative counters and per-step metric histories
    energy_sources = ('solar', 'grid', 'battery')
    self.cumulative_energy = dict.fromkeys(energy_sources, 0.0)
    self.cumulative_costs = dict.fromkeys(energy_sources + ('penalty',), 0.0)
    self.metrics = {
        metric_name: []
        for metric_name in ('energy_cost', 'resilience_score', 'reliability_score')
    }

    first_observation = self._get_observation()
    reset_info = dict(
        time_step=self.time_step,
        blackout=self.blackout,
        battery_charge=self.battery_charge,
        solar_generation=self.available_solar,
    )
    return first_observation, reset_info

## Action Application

The `_apply_actions` method processes the actions taken by the agents:

In [None]:
def _apply_actions(self, actions: Dict[str, np.ndarray]) -> None:
    """Dispatch solar, battery and grid for the current step and update state.

    Supply is allocated against the step's demand in the order solar,
    battery discharge, grid. Each source can only cover what is still unmet,
    so the grid draw is never negative and never re-supplies demand that
    solar already covered. The blackout check compares the energy actually
    delivered with the full original demand, so battery output is counted
    exactly once. Cumulative battery energy records the actual (clamped)
    charge or discharge amount rather than the commanded one.

    Args:
        actions: Mapping with keys ``'solar_usage'``, ``'battery_charge'``
            and ``'grid_usage'``; element ``[0]`` of each value is used.
            Values are clamped to [0, 1], [-1, 1] and [0, 1] respectively.

    Side effects:
        Updates ``available_solar``, ``battery_charge``,
        ``cumulative_energy`` and, when less than 90 % of demand is met,
        sets ``blackout`` and adds 100 to ``cumulative_costs['penalty']``.
    """
    # Extract actions and clamp them to the declared action-space bounds.
    solar_usage = min(max(float(actions['solar_usage'][0]), 0.0), 1.0)
    battery_action = min(max(float(actions['battery_charge'][0]), -1.0), 1.0)
    grid_usage = min(max(float(actions['grid_usage'][0]), 0.0), 1.0)

    # Keep the original demand for the blackout check; track what is left
    # to cover separately.
    demand = self._get_current_demand()
    unmet_demand = demand

    # Solar: use the requested share of this step's generation, capped by demand.
    self.available_solar = self._get_solar_generation()
    solar_used = min(self.available_solar * solar_usage, unmet_demand)
    self.available_solar -= solar_used
    unmet_demand -= solar_used

    # Battery. NOTE(review): the power rating is applied as an energy amount
    # per step without a 15-minute conversion -- confirm the intended units.
    battery_throughput = 0.0  # energy actually moved into or out of the battery
    battery_delivered = 0.0   # energy that reached the load after losses
    if battery_action > 0:  # Charging from leftover solar only
        charge_amount = min(
            battery_action * self.battery_power_rating,
            self.battery_capacity - self.battery_charge,
            self.available_solar
        )
        charge_amount = max(charge_amount, 0.0)
        self.battery_charge += charge_amount * self.battery_efficiency
        self.available_solar -= charge_amount
        battery_throughput = charge_amount
    elif battery_action < 0:  # Discharging
        discharge_amount = min(
            -battery_action * self.battery_power_rating,
            self.battery_charge
        )
        self.battery_charge -= discharge_amount
        battery_delivered = discharge_amount * self.battery_efficiency
        battery_throughput = discharge_amount
        # Surplus discharge is not carried over; demand cannot go negative.
        unmet_demand = max(unmet_demand - battery_delivered, 0.0)

    # Grid covers at most what is still unmet (max grid power assumed 10 kW).
    grid_used = min(grid_usage * 10, unmet_demand)

    # Update cumulative energy with the amounts actually used.
    self.cumulative_energy['solar'] += solar_used
    self.cumulative_energy['battery'] += battery_throughput
    self.cumulative_energy['grid'] += grid_used

    # Blackout: at least 90 % of the original demand must be met.
    total_supply = solar_used + battery_delivered + grid_used
    if total_supply < demand * 0.9:
        self.blackout = True
        self.cumulative_costs['penalty'] += 100  # Penalty for blackout

## Environment Step

The `step` method advances the environment by one time step:

In [None]:
    def step(self, actions: Dict[str, np.ndarray]) -> Tuple[Dict[str, np.ndarray], float, bool, bool, Dict[str, Any]]:
        """Advance the environment by one 15-minute step.

        Args:
            actions: Joint action dictionary passed to ``_apply_actions``.

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` becomes true once ``time_step`` reaches
            ``max_steps``; ``truncated`` is always false. The cumulative
            dictionaries in ``info`` are snapshots taken at this step, so
            later steps do not change an ``info`` the caller has kept.
        """
        # Apply actions
        self._apply_actions(actions)

        # Advance time before the reward and observation are computed, so
        # both are evaluated at the new time step.
        self.time_step += 1

        # Calculate reward
        reward = self._calculate_reward()

        # Check termination conditions
        terminated = self.time_step >= self.max_steps
        truncated = False

        # Get observation
        observation = self._get_observation()

        # Prepare info; copy the running totals instead of exposing the live
        # dictionaries that the next step will mutate.
        info = {
            'time_step': self.time_step,
            'blackout': self.blackout,
            'battery_charge': self.battery_charge,
            'solar_generation': self.available_solar,
            'cumulative_energy': dict(self.cumulative_energy),
            'cumulative_costs': dict(self.cumulative_costs)
        }

        return observation, reward, terminated, truncated, info

## Reward Calculation

The `_calculate_reward` method computes the reward for the current state:

In [None]:
    def _calculate_reward(self) -> float:
        """Score the current state and append the step's metrics.

        The reward is ``resilience + reliability - total_cost / 100``, where
        ``total_cost`` prices the cumulative energy of each source and adds
        the cumulative penalty, ``resilience`` is one minus cost per unit of
        cumulative energy, and ``reliability`` is 0 once a blackout is
        flagged and 1 otherwise.

        Returns:
            The scalar reward for this step.
        """
        # NOTE(review): all cumulative grid energy is priced at the tariff of
        # the current hour, and the cumulative penalty is included on every
        # step -- confirm this is the intended cost model.
        energy = self.cumulative_energy
        hour_of_day = (self.time_step * 15) // 60  # 15-minute steps -> hours

        if hour_of_day in self.peak_hours:
            grid_rate = self.peak_price
        else:
            grid_rate = self.off_peak_price

        # Cost per source, then the overall total including penalties.
        solar_cost = energy['solar'] * self.solar_cost_per_kwh
        grid_cost = energy['grid'] * grid_rate
        battery_cost = energy['battery'] * self.battery_investment_cost
        total_cost = solar_cost + grid_cost + battery_cost + self.cumulative_costs['penalty']

        # The small epsilon keeps the ratio defined before any energy is used.
        energy_total = energy['solar'] + energy['grid'] + energy['battery'] + 1e-6
        resilience_score = 1.0 - (total_cost / energy_total)
        reliability_score = 1.0 if not self.blackout else 0.0

        # Record this step's metrics.
        step_metrics = (
            ('energy_cost', total_cost),
            ('resilience_score', resilience_score),
            ('reliability_score', reliability_score),
        )
        for metric_name, metric_value in step_metrics:
            self.metrics[metric_name].append(metric_value)

        # Cost is scaled down by 100 so it does not dominate the two scores.
        return resilience_score + reliability_score - (total_cost / 100)

## Observation Generation

The `_get_observation` method generates the current observation:

In [None]:
    def _get_observation(self) -> Dict[str, np.ndarray]:
        """Assemble the observation dictionary for the current time step.

        Returns:
            A dict of single-element float32 arrays with keys
            ``solar_generation``, ``battery_charge``, ``grid_price``,
            ``demand``, ``time_of_day`` and ``blackout`` (1.0 when a blackout
            is flagged, otherwise 0.0).
        """
        hour_of_day = (self.time_step * 15) // 60  # 15-minute steps -> hours

        demand_now = self._get_current_demand()
        if hour_of_day in self.peak_hours:
            price_now = self.peak_price
        else:
            price_now = self.off_peak_price
        solar_now = self._get_solar_generation()

        readings = {
            'solar_generation': solar_now,
            'battery_charge': self.battery_charge,
            'grid_price': price_now,
            'demand': demand_now,
            'time_of_day': hour_of_day,
            'blackout': 1.0 if self.blackout else 0.0,
        }
        # Wrap every scalar as a length-1 float32 array.
        return {
            key: np.array([value], dtype=np.float32)
            for key, value in readings.items()
        }
        
    def _get_current_demand(self) -> float:
        """Return the demand-profile entry for the current time step.

        Hours 6-11 use the ``'morning'`` entry, hours 12-17 use
        ``'afternoon'`` and every other hour uses ``'evening'``.
        """
        hour_of_day = (self.time_step * 15) // 60
        if hour_of_day < 6 or hour_of_day >= 18:
            period = 'evening'
        elif hour_of_day < 12:
            period = 'morning'
        else:
            period = 'afternoon'
        return self.demand_profile[period]
            
    def _get_solar_generation(self) -> float:
        """Return the solar-potential entry for the current time step.

        Hours 6-11 use the ``'morning'`` entry, hours 12-17 use
        ``'afternoon'`` and every other hour (including overnight) uses
        ``'evening'``.
        """
        hour_of_day = (self.time_step * 15) // 60
        if hour_of_day < 6 or hour_of_day >= 18:
            period = 'evening'
        elif hour_of_day < 12:
            period = 'morning'
        else:
            period = 'afternoon'
        return self.solar_potential[period]