# Notebook checks

In [2]:
!hostname

gpu-16


In [3]:
!nvidia-smi

Wed Aug  3 15:35:03 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 510.47.03    Driver Version: 510.47.03    CUDA Version: 11.6     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  NVIDIA A100-SXM...  On   | 00000000:01:00.0 Off |                    0 |
| N/A   25C    P0    52W / 400W |      3MiB / 40960MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
|   1  NVIDIA A100-SXM...  On   | 00000000:41:00.0 Off |                    0 |
| N/A   26C    P0    53W / 400W |      3MiB / 40960MiB |      0%      Default |
|       

# Utils

In [1]:
import torch

from torch import Tensor

# Compute device used by the tensor helpers below: the first CUDA GPU when one is available, otherwise the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')


def create_zeros_tensor(size: int) -> Tensor:
    """
        Create a Tensor filled with zeros, allocated directly on the module-level ``device``.
    :param size: int
        Size of the tensor (number of elements for an int size).
    :return:
        Tensor of zeros living on ``device``, using the current default dtype.
    """
    # Passing ``device=`` allocates on the target device at once instead of building the tensor on the CPU
    # and copying it over afterwards with ``.to(device)``.
    return torch.zeros(size, device=device)


def create_ones_tensor(size: int) -> Tensor:
    """
        Create a Tensor filled with ones, allocated directly on the module-level ``device``.
    :param size: int
        Size of the tensor (number of elements for an int size).
    :return:
        Tensor of ones living on ``device``, using the current default dtype.
    """
    # Passing ``device=`` allocates on the target device at once instead of building the tensor on the CPU
    # and copying it over afterwards with ``.to(device)``.
    return torch.ones(size, device=device)

def normalize(value, min_value, max_value):
    """
        Min-max normalize a value given the min and max value of the distribution it came from.
    Parameters
    ----------
    value: float
        Value to be normalized.
    min_value: float
        Minimum value in the distribution.
    max_value: float
        Maximum value in the distribution.

    Returns
    -------
        Normalized value: 0 when ``value == min_value`` and 1 when ``value == max_value``. When the distribution
        is constant (``max_value == min_value``) 0.0 is returned instead of dividing by zero.
    """
    value_range = max_value - min_value

    # A constant distribution has no spread to scale by; return 0.0 rather than raising / producing NaN.
    # Only scalar ranges are checked (objects without a non-empty ``shape``) so that array-like arguments keep
    # their element-wise division semantics.
    if not getattr(value_range, 'shape', ()) and value_range == 0:
        return 0.0

    return (value - min_value) / value_range

# Grid

In [2]:
import torch

from torch import Tensor
from typing import TypedDict


class GridParameters(TypedDict):
    """
        Configuration of the connection between the microgrid and the main grid.

        max_export: float
            Upper bound on the power that can be exported to the grid (kW).
        max_import: float
            Upper bound on the power that can be imported from the grid (kW).
        price_export: float
            Price applied to the energy exported to the grid (currency/kWh).
        price_import: float
            Price applied to the energy imported from the grid (currency/kWh).
        status: bool
            Whether the main grid is connected (a false value represents a black-out).
        co2: float
            Carbon footprint of the energy taken from the grid (CO2 g./kWh).
    """
    # Power limits (kW)
    max_export: float
    max_import: float
    # Energy prices (currency/kWh)
    price_export: float
    price_import: float
    # Connection state and emissions
    status: bool
    co2: float


class Grid:

    def __init__(self, parameters=None):
        """

        Class that defines the operation conditions for the energy grid.

        Parameters
        ----------
        parameters : GridParameters

            Dict of configurations with the following shape (a default configuration is used when None):

                {
                    max_export: float,
                    max_import: float,
                    price_export: float,
                    price_import: float,
                    status: bool,
                    co2: float
                }

        """

        # Check empty parameters configuration

        if parameters is None:
            parameters = {
                'max_export': 50.0,
                'max_import': 50.0,
                'price_export': 0.25,
                'price_import': 0.8,
                'status': True,
                'co2': 1.0
            }

        # Initialize the class attributes

        self.max_export = parameters['max_export']
        self.max_import = parameters['max_import']
        self.price_export = parameters['price_export']
        self.price_import = parameters['price_import']
        self.status = parameters['status']
        self.co2 = parameters['co2']

    def check_constraints(self, input_power: Tensor):
        """
            Check the grid limits to define the output power.
        Parameters
        ----------
        input_power: Tensor
            Demanded grid power (kW). Positive values are imports from the grid, negative values are exports
            to the grid.

        Returns
        -------
            output_power: Tensor
                Grid power limited to the interval [-max_export, max_import], keeping the sign of the demand.
        """

        # Imports are capped at +max_import and exports at -max_export. The export cap must be negative:
        # replacing an excessive export by +max_export would silently turn it into an import.

        return torch.clamp(input_power, min=-self.max_export, max=self.max_import)


# Battery

In [3]:
import torch
import numpy as np

from torch import Tensor
from typing import TypedDict

# Same device selection as in the Utils cell: the first CUDA GPU when one is available, otherwise the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# Process-wide setting: floating point tensors created without an explicit dtype are float64 from here on.
torch.set_default_dtype(torch.float64)


class BatteryParameters(TypedDict):
    """
        Configuration of a battery (or array of batteries).

        soc_0: float
            Initial state of charge, between 0 (empty) and 1 (full).
        capacity: int
            Energy capacity of the battery/array of batteries (kWh).
        soc_max: float
            Highest state of charge the battery is allowed to reach, between 0 and 1.
        soc_min: float
            Lowest state of charge the battery is allowed to reach, between 0 and 1.
        p_charge_max: float
            Maximum charging rate of the battery (%).
        p_discharge_max: float
            Maximum discharging rate of the battery (%).
        efficiency: float
            One-way efficiency between 0 and 1, assumed equal for charging and discharging.
        buy_price: float
            Price for using energy from the battery ($/kWh).
        sell_price: float
            Price for injecting energy into the battery (reward to the prosumers).
    """
    # State of charge and capacity
    soc_0: float
    capacity: int
    soc_max: float
    soc_min: float
    # Charge / discharge limits and losses
    p_charge_max: float
    p_discharge_max: float
    efficiency: float
    # Prices
    buy_price: float
    sell_price: float


class Battery:
    def __init__(self, params=None, batch_size: int = 1):
        """

        Representation of a Battery and the basic parameters that define its operation.

        Parameters
        ----------
            batch_size : int
                Number of batteries to represent in the case of a batch training/testing.
            params: BatteryParameters

                Dict of configurations with the following shape (a default configuration is used when None):

                {
                    soc_0: float,
                    capacity: int,
                    soc_max: float,
                    soc_min: float,
                    p_charge_max: float,
                    p_discharge_max: float,
                    efficiency: float,
                    buy_price: float,
                    sell_price: float,
                }

        """

        # Check empty parameters configuration

        if params is None:
            params = {
                'soc_0': 0.1,
                'capacity': 1.0,
                'soc_max': 0.9,
                'soc_min': 0.1,
                'p_charge_max': 0.5,
                'p_discharge_max': 0.5,
                'efficiency': 0.9,
                'buy_price': 0.6,
                'sell_price': 0.6
            }

        # Initialize the class attributes

        self.batch_size = batch_size
        self.soc_0 = params['soc_0']
        self.capacity = params['capacity']
        self.soc_max = params['soc_max']
        self.soc_min = params['soc_min']
        self.p_charge_max = params['p_charge_max']
        self.p_discharge_max = params['p_discharge_max']
        self.efficiency = params['efficiency']
        self.buy_price = params['buy_price']
        self.sell_price = params['sell_price']
        self.capacity_to_charge = None
        self.capacity_to_discharge = None
        self.soc = self.initialize_soc()

        # Initialize the capacity status

        self.compute_capacity_status()

    def initialize_soc(self):
        """
            Initialize the SoC according to the batch size: a single battery starts at soc_0, while a batch
            starts from values drawn uniformly between soc_min and soc_max.
        :return:

            init_values: Tensor:
                Tensor with the initialization values (one per battery in the batch), placed on `device`.
        """
        if self.batch_size == 1:
            initial_values = [self.soc_0]
        else:
            initial_values = [
                np.random.uniform(low=self.soc_min, high=self.soc_max) for _ in range(self.batch_size)
            ]

        return Tensor(initial_values).to(device)

    def reset_battery(self):
        """
            Reset the battery to the initialization state.
        :return:
            None
        """
        self.soc = self.initialize_soc()

        # Same final step as __init__, so the capacities are tensors (not None) right after a reset

        self.compute_capacity_status()

    def compute_capacity_status(self):
        """
           Computes the power that can be requested to fully charge and to completely discharge the battery.

           Both values are consistent with compute_new_soc: charging stores `p_charge * efficiency`, so the
           room left is divided by the efficiency; discharging removes `p_discharge / efficiency`, so the
           available energy is multiplied by the efficiency. Negative results are clipped to zero.
        Returns
        -------
            None
        """
        stored_energy = self.soc * self.capacity

        self.capacity_to_charge = torch.clamp(
            (self.soc_max * self.capacity - stored_energy) / self.efficiency,
            min=0
        )

        # Multiply (not divide) by the efficiency: dividing would allow a discharge that drives the SoC
        # below soc_min once compute_new_soc applies the discharge losses.

        self.capacity_to_discharge = torch.clamp(
            (stored_energy - self.soc_min * self.capacity) * self.efficiency,
            min=0
        )

    def check_battery_constraints(self, input_power: Tensor) -> (Tensor, Tensor):
        """
            Check the physical constrains of the battery to define the maximum power it can charge or discharge.
        Parameters
        ----------
        input_power: Tensor
            Amount of energy that is required from the battery, could be positive for charging or negative for
            discharging.

        Returns
        -------
            p_charge: Tensor
                Power of charge given the input power.
            p_discharge: Tensor
                Power of discharge given the input power.
        """

        # Split the request into its (non-negative) charge and discharge components

        requested_charge = torch.clamp(input_power, min=0)
        requested_discharge = torch.clamp(-input_power, min=0)

        # Compute the capacities to charge or discharge

        self.compute_capacity_status()

        # Check battery constraints: the limit is the smallest of the remaining capacity and the rate limit

        max_charge = torch.clamp(self.capacity_to_charge, max=self.p_charge_max)
        max_discharge = torch.clamp(self.capacity_to_discharge, max=self.p_discharge_max)

        p_charge = torch.minimum(requested_charge, max_charge)
        p_discharge = torch.minimum(requested_discharge, max_discharge)

        return p_charge, p_discharge

    def compute_new_soc(self, p_charge: Tensor, p_discharge: Tensor):
        """
            Compute the new SoC according to an instruction for charging/discharging and store it in `soc`.
        :param p_charge: Tensor indicating the power of charge for the batch battery.
        :param p_discharge: Tensor indicating the power of discharge for the batch battery.
        :return:
            None
        """
        self.soc = self.soc + (p_charge * self.efficiency - p_discharge / self.efficiency) / self.capacity


# Generator

In [4]:
import torch
from torch import Tensor
from typing import TypedDict


class GeneratorParameters(TypedDict):
    """
        Configuration of a fuel-based generator.

        rated_power: float
            Maximum rated power of the generator (kW).
        p_min: float
            Minimum operating power of the generator, relative to the rated power (%).
        p_max: float
            Maximum operating power of the generator, relative to the rated power (%).
        fuel_cost: float
            Cost of using the generator ($/kWh).
        co2: float
            Carbon footprint of the energy produced by this generator (CO2 g./kWh).
    """
    # Size and operating window
    rated_power: float
    p_min: float
    p_max: float
    # Cost and emissions
    fuel_cost: float
    co2: float


class Generator:
    def __init__(self, parameters=None):
        """

        Class that contains the characteristics of a local source of energy that uses fossil fuels.

        Parameters
        ----------
        parameters : GeneratorParameters

            Dict of configurations with the following shape (a default configuration is used when None):

                {
                    rated_power: float,
                    p_min: float,
                    p_max: float,
                    fuel_cost: float,
                    co2: float
                }

        """

        # Check empty parameters configuration

        if parameters is None:
            # p_min must be the lower bound: with the bounds inverted (p_min=0.9, p_max=0.1) every request
            # collapses to the same output regardless of the demanded rate.
            parameters = {
                'rated_power': 1.5,
                'p_min': 0.1,
                'p_max': 0.9,
                'fuel_cost': 0.5,
                'co2': 2.0
            }

        # Initialize the class attributes

        self.fuel_cost = parameters['fuel_cost']
        self.p_max = parameters['p_max']
        self.p_min = parameters['p_min']
        self.rated_power = parameters['rated_power']
        self.co2 = parameters['co2']

    def check_constraints(self, input_rate: Tensor):
        """
            Check the generator limits to define the output power.
        Parameters
        ----------
        input_rate: Tensor
            Demanded generator operating rate, expressed as a fraction of the rated power.

        Returns
        -------
            output_power: Tensor
                Generator output power (rate limited to [p_min, p_max], multiplied by the rated power).
        """

        # Limit the rate to the operating window; requests below p_min are raised to p_min.
        # NOTE: this assumes p_min <= p_max (for inverted bounds torch.clamp returns p_max).

        real_rate = torch.clamp(input_rate, min=self.p_min, max=self.p_max)

        return real_rate * self.rated_power


# Load

In [5]:
import pandas as pd

from typing import TypedDict, Literal


# Identifiers of the load profiles that can be selected for a LoadProfile.
LoadTypes = Literal[
    # Industrial consumers
    'industrial_1',
    'industrial_2',
    'industrial_3',
    # Public buildings
    'public_1',
    'public_2',
    # Residential consumers
    'residential_1',
    'residential_2',
    'residential_3',
    'residential_4',
    'residential_5',
    'residential_6',
]


class LoadParameters(TypedDict):
    """
        Configuration of a load profile.

        load_type: LoadTypes
            Kind of load to use, one of the values listed in LoadTypes.
    """
    # Name of the selected load profile
    load_type: LoadTypes


class LoadProfile:

    def __init__(self, params=None, data_path: str = 'opsd_household_data.csv'):
        """

        Represents the load of an energy user of a defined type in kW

        Parameters
        ----------
        params : LoadParameters

            Dict of configurations with the following shape (load type 'residential_1' is used when None):

            {
                load_type: LoadTypes
            }

        data_path : str

            Location of the CSV file with the sample dataset. Defaults to 'opsd_household_data.csv' in the
            current working directory.

        """

        # Check empty parameters configuration

        if params is None:
            params = LoadParameters(load_type='residential_1')

        # Initialize the class attributes

        self.load_type = params['load_type']
        self._data_path = data_path
        self._load_ts = self._get_year_data()

    def _get_year_data(self):
        """

        Get from the sample dataset the load series of the selected load type.

        Returns
        -------

            load_data: pd.Series

                Absolute difference between each sample of the `<load_type>_grid_import` column and the next
                one; the last sample, which has no successor, is set to 0.

        """
        load_data = pd.read_csv(self._data_path)

        # NOTE(review): differencing suggests the column holds cumulative readings - confirm with the dataset.

        return load_data[f'{self.load_type}_grid_import'].diff(-1).abs().fillna(0)

    def get_step_load(self, time_step: int = 0):
        """

        Get the load at the requested time step. The time step is not modified by this method; the caller is
        responsible for advancing it.

            time_step : int
                Value of the required time step to get the value from.

        Returns
        -------

            load_t: float

                Load at the requested time step in kW.

        """
        load_t = self._load_ts[time_step]

        return load_t

    # TODO: Define a function that predicts coming load from previous n-steps


# PV

In [6]:
import pvlib
import pandas as pd

from typing import TypedDict, Union
from pvlib.forecast import GFS
from pvlib.pvsystem import PVSystem, FixedMount, Array
from pvlib.location import Location
from pvlib.modelchain import ModelChain


class Coordinates(TypedDict):
    """
        Location of a PV system.

        latitude: float
            Latitude of the site.
        longitude: float
            Longitude of the site.
        name: str
            Name given to the location.
        altitude: float
            Altitude of the site.
        timezone: str
            Name of the timezone of the site (for example 'Asia/Dubai').
    """
    # Geographic position
    latitude: float
    longitude: float
    # Site description
    name: str
    altitude: float
    timezone: str


class PVCharacteristics(TypedDict):
    """
        Physical layout and hardware references of a PV installation.

        n_arrays: int
            Number of solar panel arrays.
        modules_per_string: int
            Number of solar panels in each string.
        n_strings: int
            Number of strings in each array.
        surface_tilt: int
            Tilt of each module mount, in degrees.
        surface_azimuth: int
            Azimuth of the module surface over the mount.
        solar_panel_ref: string
            Reference of the solar panel module as it appears in SAM.
        inverter_ref: string
            Reference of the inverter as it appears in SAM.
    """
    # Array layout
    n_arrays: int
    modules_per_string: int
    n_strings: int
    # Mount orientation
    surface_tilt: int
    surface_azimuth: int
    # Hardware references (SAM names)
    solar_panel_ref: str
    inverter_ref: str


class PVParameters(TypedDict):
    """
        Full configuration of a PV generation system.

        coordinates: Coordinates
            Dictionary with the location info of the PV system.
        pv_parameters: PVCharacteristics
            Dictionary with the array layout and the module/inverter references.
        year: int
            Year for the estimation.
    """
    # Where the system is installed
    coordinates: Coordinates
    # How the system is built
    pv_parameters: PVCharacteristics
    # Year the estimation refers to
    year: int


class PVGeneration:

    def __init__(self, params=None):
        """

        This represents a PV generation system located in a particular location in a defined year.

        Parameters
        ----------
        params: PVParameters

            Dict of configurations with the following shape (a default system is used when None):

            {
                coordinates: Coordinates,
                pv_parameters: PVCharacteristics,
                year: int
            }

        """
        # Check empty parameters configuration

        if params is None:
            params = PVParameters(
                coordinates=Coordinates(
                    latitude=24.4274827,
                    longitude=54.6234876,
                    name='Masdar',
                    altitude=0,
                    timezone='Asia/Dubai'
                ),
                pv_parameters=PVCharacteristics(
                    n_arrays=1,
                    modules_per_string=10,
                    n_strings=1,
                    surface_tilt=20,
                    surface_azimuth=180,
                    solar_panel_ref='Canadian_Solar_CS5P_220M___2009_',
                    inverter_ref='iPower__SHO_5_2__240V_'
                ),
                year=2022
            )

        # Initialize the class attributes

        self.coordinates = params['coordinates']
        self.pv_parameters = params['pv_parameters']
        self.year = params['year']

        # Filled in by configure_pv_system()

        self._model_chain = None
        self._weather_ts = None
        self._generation_ts = None

    def configure_pv_system(self):
        """

        Set up the solar panel configuration to be used, then load the weather data and the generation
        estimation. It must be called before any of the `get_*` methods.

        Returns
        -------
            None
        """

        n_arrays = self.pv_parameters['n_arrays']
        modules_per_string = self.pv_parameters['modules_per_string']
        n_strings = self.pv_parameters['n_strings']
        surface_tilt = self.pv_parameters['surface_tilt']
        surface_azimuth = self.pv_parameters['surface_azimuth']
        solar_panel_ref = self.pv_parameters['solar_panel_ref']
        inverter_ref = self.pv_parameters['inverter_ref']

        # Configure the solar panel and inverter specifications from SAM (Default)

        sandia_modules = pvlib.pvsystem.retrieve_sam('sandiamod')
        sapm_inverters = pvlib.pvsystem.retrieve_sam('cecinverter')

        module_params = sandia_modules[solar_panel_ref]
        inverter_params = sapm_inverters[inverter_ref]
        temp_params = pvlib.temperature.TEMPERATURE_MODEL_PARAMETERS['sapm']['open_rack_glass_glass']

        mount = FixedMount(surface_tilt=surface_tilt, surface_azimuth=surface_azimuth)

        location = Location(
            latitude=self.coordinates['latitude'],
            longitude=self.coordinates['longitude'],
            name=self.coordinates['name'],
            altitude=self.coordinates['altitude'],
            tz=self.coordinates['timezone'],
        )

        # Build PV array (every array shares the same mount and module configuration)

        pv_array = [
            Array(
                mount=mount,
                module_parameters=module_params,
                temperature_model_parameters=temp_params,
                modules_per_string=modules_per_string,
                strings=n_strings
            )
            for _ in range(n_arrays)
        ]

        # Define the system for the instance

        pv_system = PVSystem(arrays=pv_array, inverter_parameters=inverter_params)

        self._model_chain = ModelChain(system=pv_system, location=location, aoi_model='no_loss',
                                       spectral_model='no_loss')

        # Get the generation estimation and weather initialization

        self._weather_ts = self._get_tmy()
        self._generation_ts = self._get_estimate_generation()

    def _get_tmy(self):
        """

        Returns a typical meteorological year with resolution of 1 hour (8760 data points) for the PV location.
        The data is requested from PVGIS and re-indexed with hourly timestamps starting on January 1st of the
        configured year.

        Returns
        -------
            weather: DataFrame
                Contains a set with the following meteorological attributes:
                    temp_air           float64
                    relative_humidity  float64
                    ghi                float64
                    dni                float64
                    dhi                float64
                    IR(h)              float64
                    wind_speed         float64
                    wind_direction     float64
                    pressure
        """
        latitude = self.coordinates['latitude']
        longitude = self.coordinates['longitude']
        timezone = self.coordinates['timezone']

        weather = pvlib.iotools.get_pvgis_tmy(latitude, longitude, map_variables=True)[0]

        # Size the new index from the data itself: an index spanning the whole calendar year has 8784 entries
        # in a leap year and could not be assigned to the 8760 rows of a typical meteorological year.

        weather.index = pd.date_range(
            start=f'{self.year}-01-01T00:00:00', periods=len(weather), freq=pd.Timedelta(hours=1), tz=timezone
        )

        # NOTE(review): the index is localized to the configured timezone although it is named "utc_time".

        weather.index.name = "utc_time"

        return weather

    def _get_estimate_generation(self):
        """
        Returns an estimation of the ac power, with 1h resolution, for a year of the defined solar panel.
        Parameters
        ----------

        Returns
        -------
        ac: DataFrame
            Hourly AC power for the given year with the configured solar panel.
        """

        # Reuse the weather already loaded by configure_pv_system() instead of requesting it a second time

        if self._weather_ts is None:
            self._weather_ts = self._get_tmy()

        self._model_chain.run_model(self._weather_ts)

        return self._model_chain.results.ac

    def get_step_generation(self, time_step: int = 0):
        """

        Get generation in kW at the requested time step. The time step is not modified by this method.

        Parameters
        ----------
            time_step : int
                Value of the required time step to get the value from.
        Returns
        -------
            pv_power_t: float

                Output power of the PV system at the requested time step.

        """
        # The series is indexed by timestamps, so the time step has to be resolved by position

        pv_power_t = self._generation_ts.iloc[time_step]
        return pv_power_t / 1000  # Convert to kW

    def get_forecast_generation(self, start: Union[int, str], n_days_ahead: int = 7):
        """

        Forecast generation n_days_ahead from a starting date (that should be at max. 30 before current day).
        Running the forecast replaces the results held by the internal model chain.

        Parameters
        ----------
        start: date

            String with the ISO format date string.

        n_days_ahead: int

            Days ahead to consider in the prediction.

        Returns
        -------
        ac: DataFrame
            AC power computed by the model chain for the forecasted weather, resampled to 1 hour.

        """

        latitude = self.coordinates['latitude']
        longitude = self.coordinates['longitude']
        timezone = self.coordinates['timezone']

        initial_date = pd.Timestamp(start, tz=timezone)
        final_date = initial_date + pd.Timedelta(days=n_days_ahead)

        # GFS model, defaults to 0.5 degree resolution, 0.25 deg available
        # NOTE(review): `pvlib.forecast` appears to have been removed from recent pvlib releases - confirm the
        # pinned pvlib version still provides GFS.

        fx_model = GFS()
        fx_data = fx_model.get_processed_data(latitude, longitude, initial_date, final_date)
        fx_data = fx_data.resample('1h').interpolate()

        self._model_chain.run_model(fx_data)

        return self._model_chain.results.ac

    def _get_weather_value(self, column: str, time_step: int, normalized: bool):
        """

        Read one weather attribute at a time step, optionally min-max normalized over the whole series.

        Parameters
        ----------
            column : str
                Name of the column of the weather data to read.
            time_step : int
                Position of the required time step in the weather data.
            normalized : bool
                Return normalized value or not.

        Returns
        -------
            value: float
                Raw value, or the value scaled with the minimum and maximum of the column.
        """
        series = self._weather_ts[column]

        # The weather data is indexed by timestamps, so the time step has to be resolved by position

        value = series.iloc[time_step]

        if not normalized:
            return value

        # Keyword arguments keep the bounds in the order normalize() expects; passing (max, min) positionally
        # would return the inverted value (1 - normalized).

        return normalize(value, min_value=series.min(), max_value=series.max())

    def get_ghi(self, time_step: int = 0, normalized: bool = True):
        """

        Get Global Horizontal Irradiation at the requested time step.

        Parameters
        ----------
            time_step : int
                Value of the required time step to get the value from.
            normalized : bool
                Return normalized value or not.

        Returns
        -------
            ghi: float
                Global Horizontal Irradiation.
        """
        return self._get_weather_value('ghi', time_step, normalized)

    def get_dni(self, time_step: int = 0, normalized: bool = True):
        """

        Get Direct Normal Irradiation at the requested time step.

        Parameters
        ----------
            time_step : int
                Value of the required time step to get the value from.
            normalized : bool
                Return normalized value or not.

        Returns
        -------
            dni: float
                Direct Normal Irradiation.
        """
        return self._get_weather_value('dni', time_step, normalized)

    def get_dhi(self, time_step: int = 0, normalized: bool = True):
        """

        Get Diffused Horizontal Irradiation at the requested time step.

        Parameters
        ----------
            time_step : int
                Value of the required time step to get the value from.
            normalized : bool
                Return normalized value or not.

        Returns
        -------
            dhi: float
                Diffused Horizontal Irradiation.
        """
        return self._get_weather_value('dhi', time_step, normalized)

    def get_air_temperature(self, time_step: int = 0, normalized: bool = True):
        """

        Get Air Temperature at the requested time step.

        Parameters
        ----------
            time_step : int
                Value of the required time step to get the value from.
            normalized : bool
                Return normalized value or not.

        Returns
        -------
            air_temperature: float
                Air Temperature.
        """
        return self._get_weather_value('temp_air', time_step, normalized)

    def get_wind_speed(self, time_step: int = 0, normalized: bool = True):
        """

        Get Wind Speed at the requested time step.

        Parameters
        ----------
            time_step : int
                Value of the required time step to get the value from.
            normalized : bool
                Return normalized value or not.

        Returns
        -------
            wind_speed: float
                Wind Speed.
        """
        return self._get_weather_value('wind_speed', time_step, normalized)

    def get_relative_humidity(self, time_step: int = 0, normalized: bool = True):
        """

        Get Relative Humidity at the requested time step.

        Parameters
        ----------
            time_step : int
                Value of the required time step to get the value from.
            normalized : bool
                Return normalized value or not.

        Returns
        -------
            relative_humidity: float
                Relative Humidity.
        """
        return self._get_weather_value('relative_humidity', time_step, normalized)

    def get_pressure(self, time_step: int = 0, normalized: bool = True):
        """

        Get Pressure at the requested time step.

        Parameters
        ----------
            time_step : int
                Value of the required time step to get the value from.
            normalized : bool
                Return normalized value or not.

        Returns
        -------
            pressure: float
                Pressure.
        """
        return self._get_weather_value('pressure', time_step, normalized)




# Microgrid

In [7]:
import torch
import numpy as np

from typing import TypedDict
from torch import Tensor

# Same device selection as in the previous cells: the first CUDA GPU when one is available, otherwise the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
# Process-wide setting: floating point tensors created without an explicit dtype are float64 from here on.
torch.set_default_dtype(torch.float64)


class MicrogridArchitecture(TypedDict):
    """
        Flags indicating which resources are part of a microgrid.

        pv: bool
            Whether the microgrid has a PV system.
        battery: bool
            Whether the microgrid has a battery.
        generator: bool
            Whether the microgrid has a generator.
        grid: bool
            Whether the microgrid has a grid connection.
    """
    # One flag per optional resource
    pv: bool
    battery: bool
    generator: bool
    grid: bool


class MicrogridParameters(TypedDict):
    """
        Parameters of every component of a microgrid, one entry per component.

        pv: PVParameters
            Configuration of the PV system.
        load: LoadParameters
            Configuration of the load profile.
        battery: BatteryParameters
            Configuration of the battery.
        generator: GeneratorParameters
            Configuration of the generator.
        grid: GridParameters
            Configuration of the grid connection.
    """
    # Generation and demand
    pv: PVParameters
    load: LoadParameters
    # Storage, backup generation and grid connection
    battery: BatteryParameters
    generator: GeneratorParameters
    grid: GridParameters


class MicrogridAction(TypedDict):
    """
        Decision taken for one operation step of the microgrid, expressed as one flag per option.

        use_pv: bool
            Flag for using the PV system.
        use_generator: bool
            Flag for using the generator.
        use_grid: bool
            Flag for using the grid.
        charge_battery: bool
            Flag for charging the battery.
    """
    # Source selection
    use_pv: bool
    use_generator: bool
    use_grid: bool
    # Storage decision
    charge_battery: bool


class Microgrid:
    """
        Simulated microgrid made of a load profile plus optional PV, battery, generator and grid components.

        Two operation modes are exposed:

        * ``*_by_source_selection``: a discrete action selects which sources may supply the load.
        * ``*_by_setting_generator``: a continuous action sets the generator power rate and the battery
          absorbs or supplies the remaining power.
    """

    # Number of time steps after which the time index wraps around (24 hours * 365 days).
    STEPS_PER_YEAR = 8760

    # Multiplier applied to the energy that could not be supplied when computing the operation cost.
    UNMET_PENALTY_FACTOR = 1.5

    def __init__(self, batch_size: int, arch: 'MicrogridArchitecture' = None, params: 'MicrogridParameters' = None):
        """

        Parameters
        ----------
        batch_size: int
            Number of parallel runs of the current microgrid.
        arch : MicrogridArchitecture
            Dict indicating whether a microgrid contains a resource or not. When None, every resource is enabled.
        params : MicrogridParameters
            Dict of parameters for the components of the microgrid. When None, each component receives None
            as its parameters.
        """

        # Check empty parameters configuration

        if arch is None:
            arch = {
                'pv': True,
                'battery': True,
                'generator': True,
                'grid': True
            }

        # Initialize the class attributes

        self.batch_size = batch_size
        self.architecture = arch
        self._current_t = 0

        # Configure the microgrid: the load always exists, the other components depend on the architecture

        self._load = LoadProfile(params=params['load'] if params is not None else None)
        if arch['pv']:
            self._pv = PVGeneration(params=params['pv'] if params is not None else None)
            self._pv.configure_pv_system()
        if arch['battery']:
            self._battery = Battery(batch_size=batch_size, params=params['battery'] if params is not None else None)
        if arch['generator']:
            self._generator = Generator(parameters=params['generator'] if params is not None else None)
        if arch['grid']:
            self._grid = Grid(parameters=params['grid'] if params is not None else None)

    def observe_by_source_selection(self) -> np.ndarray:
        """
            Observation for the source-selection mode.

        Returns
        -------
            observation: np.ndarray
                Array ``[load, pv]`` for the current time step; ``pv`` is 0 when the microgrid has no PV system.
        """
        load_t = self._load.get_step_load(self.get_current_step())
        pv_t = 0

        if self.architecture['pv']:
            pv_t = self._pv.get_step_generation(self.get_current_step())

        return np.array([load_t, pv_t])

    def operation_by_source_selection(self, action: int) -> (np.ndarray, float):
        """
            Operate the microgrid for one time step with a discrete source selection.

            NOTE(review): this method reads grid prices and generator attributes unconditionally, so it
            currently needs both components to exist - confirm whether other architectures must be supported.

        Parameters
        ----------
            action: int
                Value between 0 and 7 read as three bits, most significant first:
                ``use_pv``, ``use_grid``, ``use_generator``.

        Returns
        -------
            observation: np.ndarray
                Observation of the time step that follows the operation.
            cost: float
                Operation cost of the time step that was just simulated.

        Raises
        ------
            ValueError
                If ``action`` cannot be encoded with three bits.
        """

        # Process the action as MicrogridAction

        action = int(action)

        if not 0 <= action <= 7:
            raise ValueError('action must be an integer between 0 and 7, got {}'.format(action))

        # Zero padded on purpose: a space padded string would shift/blank the leading flags.
        use_pv, use_grid, use_generator = (bit == '1' for bit in format(action, '03b'))

        selection: MicrogridAction = {
            'use_pv': use_pv,
            'use_grid': use_grid,
            'use_generator': use_generator,
            'charge_battery': False
        }

        load, pv = self.observe_by_source_selection()
        grid = 0.0
        generator = 0.0
        available_grid_supply = 0.0

        # Surplus could be negative (there is lack of energy) or positive (energy to export).

        surplus = pv - load

        # If there is a lack of energy we get energy from the grid, when there is excess the energy is exported.

        if self.architecture['grid'] and selection['use_grid']:

            available_grid_supply = self._grid.max_export + surplus

            # If available grid supply is negative, the grid cannot meet the load, we need extra generation
            if available_grid_supply < 0:
                grid = self._grid.max_export
            else:
                # A positive grid value means its exporting energy, a negative that its importing
                grid = -surplus

        # In the case the grid is not able to provide energy, we get the energy from the generator

        generator_max_power = self._generator.p_max * self._generator.rated_power

        if self.architecture['generator'] and selection['use_generator'] and selection['use_grid']:

            available_generator_supply = generator_max_power + available_grid_supply

            # If available generator supply is negative, there is not enough energy in the microgrid
            if available_generator_supply < 0:
                generator = generator_max_power
            # Lack of grid energy, the generator gives the extra energy.
            elif available_generator_supply < generator_max_power:
                # available_grid_supply is negative in this branch: the deficit the generator has to
                # cover is its magnitude.
                generator = -available_grid_supply

        # If the grid is not enabled, we supply the rest of energy with the generator

        if self.architecture['generator'] and selection['use_generator'] and not selection['use_grid']:

            available_generator_supply = generator_max_power + surplus

            # If available generator supply is negative, there is not enough energy in the microgrid
            if available_generator_supply < 0:
                generator = generator_max_power
            else:
                # If there is surplus, it is just discarded as there is no grid.
                generator = max(-surplus, 0)

        # Check and compute if there is unmet load

        unmet_load = load - pv - grid - generator

        # Compute grid operation cost

        cost = grid * self._grid.price_export if grid > 0 else grid * self._grid.price_import
        cost += generator * self._generator.fuel_cost
        cost += unmet_load * self.UNMET_PENALTY_FACTOR * self._grid.price_export if unmet_load > 0 else 0

        # Increase time step

        self._current_t += 1

        return self.observe_by_source_selection(), cost

    def observe_by_setting_generator(self) -> Tensor:
        """
            Observation for the generator-setting mode.

            Reads the battery SoC and the PV weather features, so it needs the battery and PV components.

        Returns
        -------
            observation: Tensor
                Tensor with one row per SoC entry and six columns:
                soc, ghi, pressure, wind_speed, air_temperature, relative_humidity.
        """

        # Get the observations

        soc = self._battery.soc
        ghi = self._pv.get_ghi(self.get_current_step())
        pressure = self._pv.get_pressure(self.get_current_step())
        wind_speed = self._pv.get_wind_speed(self.get_current_step())
        air_temperature = self._pv.get_air_temperature(self.get_current_step())
        relative_humidity = self._pv.get_relative_humidity(self.get_current_step())

        # The only observation that could change depending on the action is the SoC; the weather
        # values are repeated for every entry of the batch.

        return torch.stack((
            soc,
            create_ones_tensor(len(soc)) * ghi,
            create_ones_tensor(len(soc)) * pressure,
            create_ones_tensor(len(soc)) * wind_speed,
            create_ones_tensor(len(soc)) * air_temperature,
            create_ones_tensor(len(soc)) * relative_humidity,
        ), dim=1)

    def operation_by_setting_generator(self, power_rate: Tensor) -> (Tensor, Tensor):
        """
            Operate the microgrid for one time step by setting the generator power rate.

        Parameters
        ----------
            power_rate: Tensor
                Requested generator rate; it is passed through the generator constraints check.

        Returns
        -------
            next_state: Tensor
                Observation of the time step that follows the operation.
            cost: Tensor
                Operation cost of the time step that was just simulated.
        """

        load = self._load.get_step_load(self.get_current_step())
        pv = self._pv.get_step_generation(self.get_current_step())
        generator = self._generator.check_constraints(input_rate=power_rate)

        # Decide the interaction with the battery

        remaining_power = generator + pv - load

        p_charge, p_discharge = self._battery.check_battery_constraints(input_power=remaining_power)
        self._battery.compute_new_soc(p_charge=p_charge, p_discharge=p_discharge)

        # Check if all the energy is attended

        power_after_battery = remaining_power + p_discharge
        unattended_power = torch.maximum(-power_after_battery, create_zeros_tensor(self.batch_size))

        # Compute grid operation cost, unattended power is penalized with more expensive fuel

        cost = (generator - p_discharge + unattended_power * self.UNMET_PENALTY_FACTOR) * self._generator.fuel_cost

        # Increase time step before observing, so the returned state carries the weather of the step the
        # next action applies to (same order used by operation_by_source_selection).

        self._current_t += 1

        # Compute next state

        next_state = self.observe_by_setting_generator()

        return next_state, cost

    def get_current_step(self):
        """
            Returns the current time step, wrapped around after STEPS_PER_YEAR steps.
        Returns
        -------
            self.current_t: int
                Current microgrid time step
        """
        return self._current_t % self.STEPS_PER_YEAR

    def reset_current_step(self):
        """
            Resets the current time step.
        Returns
        -------
            None
        """
        self._current_t = 0


# Environment

In [8]:
import numpy as np

from gym import Env
from gym.spaces import Box
from torch import Tensor

# Positive infinity as a NumPy float64 scalar.
# NOTE(review): not referenced in the visible part of this cell - confirm it is still needed.
inf = np.float64('inf')


class MGSetGenerator(Env):
    """
    Gym environment wrapping a Microgrid that is operated by setting the generator power rate.
    """

    def __init__(self, mg_arch: MicrogridArchitecture, mg_params: MicrogridParameters, batch_size: int = 1):
        """
        Build the observation/action spaces and the underlying microgrid.

        Parameters
        ----------
        mg_arch : MicrogridArchitecture
            Components the simulated microgrid contains.
        mg_params : MicrogridParameters
            Parameters of the microgrid components.
        batch_size : int
            Number of parallel runs simulated by the microgrid.
        """

        # Observation space, every feature bounded to [0, 1]:
        #
        #     soc: [0,1]
        #     ghi: [0, 1064.0] normalized
        #     pressure: [98930.0, 102370.0] normalized
        #     wind_speed: [0.17, 11.57] normalized
        #     air_temperature: [10.67, 45.5] normalized
        #     relative_humidity = [11.55, 94.0] normalized

        n_features = 6

        self.observation_space = Box(
            low=np.zeros(n_features, dtype=np.float32),
            high=np.ones(n_features, dtype=np.float32),
            shape=(n_features,),
            dtype=np.float32
        )

        # Action space, a single continuous value:
        #
        #     fossil generator power: Value between 0 and 1 that indicates the rate of the nominal
        #     power it should use.

        self.action_space = Box(low=0, high=1, shape=(1,), dtype=np.float32)

        self.mg = Microgrid(batch_size=batch_size, arch=mg_arch, params=mg_params)

    def _observe(self):
        """Return the current observation of the microgrid."""
        return self.mg.observe_by_setting_generator()

    def step(self, action: Tensor):
        """
        Apply the generator power rate for one time step.

        Returns the next state, the reward (negated operation cost), a `done` flag that is always
        False and an empty info dict.
        """
        next_state, cost = self.mg.operation_by_setting_generator(power_rate=action)

        return next_state, -cost, False, {}

    def reset(self):
        """
        Rewind the microgrid time step and return `(observation, 0, False, {})`.
        """
        self.mg.reset_current_step()

        initial_state = self._observe()

        return initial_state, 0, False, {}

    def render(self, mode="human"):
        """Rendering is not implemented; only prints a notice."""
        print('Rendering not defined yet')


# A2C

In [None]:
"""

    Advantage Actor Critic (A2C) algorithm implementation

    Credits: Nicolás Cuadrado, MBZUAI, OptMLLab

"""
import os
import traceback
import numpy as np
import torch
import wandb

from gym import Env
from torch import Tensor
from torch.nn import Module, Sequential, Linear, LeakyReLU
from torch.optim import Adam
from torch.distributions import Normal
from dotenv import load_dotenv

# Make float64 the default dtype for floating point tensors created from here on.
torch.set_default_dtype(torch.float64)

# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Initialize Wandb for logging purposes

load_dotenv()

# Pass the key through untouched: wrapping it in str() would turn a missing WANDB_KEY into the
# literal key "None" instead of letting wandb fall back to its own credential lookup.
wandb.login(key=os.environ.get("WANDB_KEY"))

# Define global variables

# Small constant added to the standard deviation produced by the actor so it is never exactly zero.
zero = 1e-5


# Misc. methods

def set_all_seeds(seed):
    """
        Seed the NumPy and PyTorch (CPU and current CUDA device) random generators and ask cuDNN
        to use deterministic algorithms.
    :param seed: int
        Seed shared by every generator.
    :return:
        None
    """
    for seed_fn in (np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        seed_fn(seed)

    torch.backends.cudnn.deterministic = True


class Actor(Module):
    """
        Policy network: maps a batch of states to the mean and standard deviation of a Normal
        distribution over the action.
    """

    def __init__(self, num_inputs, num_actions, hidden_size=64):
        super().__init__()

        layers = [
            Linear(num_inputs, hidden_size),
            LeakyReLU(),
            Linear(hidden_size, hidden_size),
            LeakyReLU(),
            # Two outputs per continuous action: one for mu and one for sigma
            Linear(hidden_size, num_actions * 2),
        ]

        self.model = Sequential(*layers)

    def forward(self, state: Tensor) -> (Tensor, Tensor):
        """
            Compute the Normal distribution parameters for every state of the batch.

            Only the first two output columns are read, i.e. a single action is supported.
        :param state: Tensor
            Batch of states, one row per state.
        :return:
            Tuple (mu, sigma) with one entry per row of `state`.
        """
        raw_params = self.model(state)

        mu, log_sigma = raw_params[:, 0], raw_params[:, 1]

        # The exponential keeps the standard deviation positive; `zero` keeps it away from 0.
        return mu, log_sigma.exp() + zero


class Critic(Module):
    """
        Value network: maps a batch of states to one scalar value estimate per state.
    """

    def __init__(self, num_inputs, hidden_size=64, ):
        super().__init__()

        layers = [
            Linear(num_inputs, hidden_size),
            LeakyReLU(),
            Linear(hidden_size, hidden_size),
            LeakyReLU(),
            Linear(hidden_size, 1),
        ]

        self.model = Sequential(*layers)

    def forward(self, state: Tensor) -> Tensor:
        """
            Estimate the value of every state of the batch.
        :param state: Tensor
            Batch of states, one row per state.
        :return:
            Tensor with a single column holding the value estimates.
        """
        value = self.model(state)

        return value


class Agent:
    """
        Advantage Actor Critic agent for a batched, continuous single-action environment.

        The environment is expected to return 4-tuples from both `reset()` and `step()`, with a
        Tensor as first element (the state).
    """

    def __init__(
            self, env: Env, gamma: float = 0.99, rollout_steps: int = 5, hidden_size: int = 64,
            actor_lr: float = 1e-4, critic_lr: float = 1e-4,
    ):
        """

        Parameters
        ----------
        env : Env
            Environment the agent interacts with.
        gamma : float
            Discount factor applied to future rewards when computing the rewards-to-go.
        rollout_steps : int
            Number of environment steps sampled per training step.
        hidden_size : int
            Width of the hidden layers of the actor and critic networks.
        actor_lr : float
            Learning rate of the actor optimizer.
        critic_lr : float
            Learning rate of the critic optimizer.
        """

        # Parameter initialization

        self.env = env
        self.gamma = gamma
        self.rollout_steps = rollout_steps

        # Configure neural networks

        dim_obs = env.observation_space.shape[0]
        dim_action = env.action_space.shape[0]

        self.actor = Actor(num_inputs=dim_obs, num_actions=dim_action, hidden_size=hidden_size).to(device)
        self.critic = Critic(num_inputs=dim_obs, hidden_size=hidden_size).to(device)

        self.actor.optimizer = Adam(params=self.actor.parameters(), lr=actor_lr)
        self.critic.optimizer = Adam(params=self.critic.parameters(), lr=critic_lr)

        # Hooks into the models to collect gradients and topology

        wandb.watch(models=(self.actor, self.critic))

    def select_action(self, state: Tensor):
        """
            Sample an action from the Normal distribution parametrized by the actor.

        Returns
        -------
            action: Tensor
                Sampled action, one entry per state of the batch.
            log_prob: Tensor
                Log-probability of the sampled action under the current policy.
        """

        mu, sigma = self.actor(state)
        dist = Normal(loc=mu, scale=sigma)
        action = dist.sample()
        log_prob = dist.log_prob(action)

        return action, log_prob

    def rollout(self):
        """
            Reset the environment and sample a trajectory of `rollout_steps` steps.

        Returns
        -------
            states, rewards, log_probs: list, list, list
                Visited states, obtained rewards and action log-probabilities, one entry per step.
        """

        states, rewards, log_probs = [], [], []

        # Get the initial state by resetting the environment

        state, _, _, _ = self.env.reset()

        for _ in range(self.rollout_steps):
            # Start by appending the state to create the states trajectory

            state = state.to(device)
            states.append(state)

            # Perform action and pass to next state

            action, log_prob = self.select_action(state)
            state, reward, _, _ = self.env.step(action=action)

            rewards.append(reward)
            log_probs.append(log_prob)

        return states, rewards, log_probs

    def _rewards_to_go(self, rewards: list) -> Tensor:
        """
            Discounted sum of the rewards obtained from each step until the end of the rollout.
        """

        returns = []
        running_return = 0.0

        # Walk the trajectory backwards so each step only accumulates the rewards that follow it.
        for reward in reversed(rewards):
            running_return = reward + self.gamma * running_return
            returns.append(running_return)

        returns.reverse()

        return torch.stack(returns, 0)

    def train(self, training_steps: int = 1000):
        """
            Run `training_steps` updates; each one samples a fresh rollout and performs one
            optimization step on the actor and one on the critic.
        """

        for _ in range(training_steps):

            # Perform rollouts and sample trajectories

            states, rewards, log_probs = self.rollout()

            log_probs = torch.stack(log_probs, 0)

            # Drop only the trailing singleton produced by the critic: a bare squeeze() would also
            # drop the batch dimension when it has size 1 and silently broadcast the advantage.
            value = torch.stack([self.critic(state) for state in states], 0).squeeze(-1)

            # Causality trick: each action is only credited with the rewards that come after it

            sum_rewards = self._rewards_to_go(rewards)

            # Backpropagation to train Actor NN: sum along the trajectory, average over the batch

            advantage = sum_rewards - value.detach()
            actor_loss = -torch.mean(torch.sum(log_probs * advantage, dim=0))
            self.actor.optimizer.zero_grad()
            actor_loss.backward()
            self.actor.optimizer.step()

            # Backpropagation to train Critic NN

            critic_loss = torch.mean((value - sum_rewards) ** 2)
            self.critic.optimizer.zero_grad()
            critic_loss.backward()
            self.critic.optimizer.step()

            wandb.log({
                "rollout_avg_reward": torch.mean(sum_rewards).item(),
                "actor_loss": actor_loss.item(),
                "critic_loss": critic_loss.item()
            })


if __name__ == '__main__':

    # Experiment script: builds the microgrid environment, trains the A2C agent and evaluates the
    # resulting policy over one year of hourly steps, logging everything to Wandb.

    try:
        '''
            Define the Microgrid parameters
        '''

        exp_pv_params = PVParameters(
            coordinates=Coordinates(
                latitude=24.4274827,
                longitude=54.6234876,
                name='Masdar',
                altitude=0,
                timezone='Asia/Dubai'
            ),
            pv_parameters=PVCharacteristics(
                n_arrays=1,
                modules_per_string=10,
                n_strings=1,
                surface_tilt=20,
                surface_azimuth=180,
                solar_panel_ref='Canadian_Solar_CS5P_220M___2009_',
                inverter_ref='iPower__SHO_5_2__240V_'
            ),
            year=2022
        )
        exp_load_params = LoadParameters(load_type='residential_1')
        exp_battery_params = BatteryParameters(
            soc_0=0.1,
            soc_max=0.9,
            soc_min=0.1,
            p_charge_max=5,
            p_discharge_max=5,
            efficiency=0.9,
            capacity=50,
            sell_price=0.6,
            buy_price=0.6
        )

        exp_generator_params = GeneratorParameters(
            rated_power=2.5,
            p_max=0.9,
            p_min=0.1,
            fuel_cost=0.4,
            co2=2
        )

        exp_microgrid_arch = MicrogridArchitecture(
            pv=True,
            battery=True,
            generator=True,
            grid=False
        )

        exp_microgrid_params = MicrogridParameters(
            pv=exp_pv_params,
            load=exp_load_params,
            battery=exp_battery_params,
            generator=exp_generator_params,
            grid=None
        )

        '''
            Define the simulation parameters
        '''

        batch_size = 128
        agent_training_steps = 500
        agent_gamma = 0.99
        agent_rollout_steps = 24 * 365  # Hours * Days
        agent_actor_lr = 1e-3
        agent_critic_lr = 3e-3

        '''
            Setup all the configurations for Wandb
        '''

        wandb.init(
            project="cont-a2c-mg-set-gen",
            entity="madog",
            config={
                "batch_size": batch_size,
                "training_steps": agent_training_steps,
                "gamma": agent_gamma,
                "rollout_steps": agent_rollout_steps,
                "agent_actor_lr": agent_actor_lr,
                "agent_critic_lr": agent_critic_lr,
            }
        )

        # Define the custom x-axis metric
        wandb.define_metric("test_step")

        # Define the x-axis for the plots: (avoids an issue with Wandb step autoincrement on each log call)

        wandb.define_metric("test_reward", step_metric='test_step')
        wandb.define_metric("test_action", step_metric='test_step')

        '''
            Run the simulator
        '''

        set_all_seeds(420)

        # Instantiate the environment

        mg_env = MGSetGenerator(mg_arch=exp_microgrid_arch, mg_params=exp_microgrid_params, batch_size=batch_size)

        # Instantiate the agent; each network gets its own learning rate, matching the logged config

        agent = Agent(
            env=mg_env, gamma=agent_gamma, rollout_steps=agent_rollout_steps, critic_lr=agent_critic_lr,
            actor_lr=agent_actor_lr
        )

        # Launch the training

        agent.train(training_steps=agent_training_steps)

        # Test the trained model

        t_state, _, _, _ = agent.env.reset()

        # No gradients are needed while evaluating, so avoid building the autograd graph

        with torch.no_grad():

            for t_step in range(24 * 365):
                # Perform action and pass to next state

                t_action, _ = agent.select_action(t_state.to(device))
                t_state, t_reward, _, _ = agent.env.step(action=t_action)

                wandb.log({
                    "test_step": t_step,
                    "test_action": t_action,
                    "test_reward": t_reward
                })

    except (RuntimeError, KeyboardInterrupt):

        traceback.print_exc()

    finally:

        # Finish wandb process, also when the run stops because of an error

        wandb.finish()


[34m[1mwandb[0m: Appending key for api.wandb.ai to your netrc file: /home/nicolas.avila/.netrc
