In [11]:
import sys, os
sys.path.insert(0, os.path.abspath("../../../"))
sys.path.append("enflow/examples/energy_community")

from dataclasses import dataclass
import typing as t 

import gymnasium as gym
import energydatamodel as edm
import enflow as ef

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import ScalarFormatter, FixedLocator, MultipleLocator

from pyomo.environ import AbstractModel,Set,Param,Var,Objective,Constraint,SolverFactory
from pyomo.environ import NonNegativeReals, PositiveReals, inequality
from pyomo.environ import value
solver_path = "/opt/homebrew/Cellar/cbc/2.10.7_1/bin/cbc"
solver_path = "C:/Solvers/Cbc-releases.2.10.10-w64-msvc17-md/bin/cbc.exe"

from gse.gse import create_model, instantiate_model, solve_model, get_results
from gse.pyomo_input import create_static_input, create_dynamic_input

In [2]:
# Import data
df_load = pd.read_csv("./data/load.csv", index_col=0, parse_dates=True)
df_pv = pd.read_csv("./data/pv.csv", index_col=0, parse_dates=True)
df_price = pd.read_csv("./data/grid.csv", index_col=0, parse_dates=True)
df_battery = pd.read_csv("./data/battery.csv", index_col=0)

In [3]:
# What to do next? 
# 0. Create energy assets
# 1. Create the state space and action space classes
# 2. Create the objective function class. This should take in the state space and action space classes
# 3. Create the environment class
# 4. Create the problem class
# 5. Create the agent class
# 6. Create the instatiate the experiment class
# 7. Run the experiment
# 8. Get the results and save the results

In [4]:
# Create the energy assets
all_houses = df_load.columns
houses_with_pv = df_pv.columns
houses_with_battery = df_battery.columns

houses = []
for house_name in all_houses:
    house = edm.House(name=house_name, timeseries=edm.TimeSeries(df=df_load, column_name=house_name, filename="load.csv"))

    if house_name in houses_with_pv:
        pvsystem = edm.PVSystem(timeseries=edm.TimeSeries(df=df_pv, column_name=house_name, filename="pv.csv"))
        house.add_assets(pvsystem)

    if house_name in houses_with_battery:
        battery = edm.Battery(capacity_kwh=df_battery.loc["capacity", house_name],
                              min_soc_kwh=df_battery.loc["min soc", house_name],
                              max_charge_kw=df_battery.loc["charging power", house_name],
                              max_discharge_kw=df_battery.loc["discharging power", house_name],
                              charge_efficiency=df_battery.loc["charging efficiency", house_name],
                              discharge_efficiency=df_battery.loc["discharging efficiency", house_name])
        house.add_assets(battery)

    houses.append(house)

energycommunity = edm.EnergyCommunity(assets=houses)

In [5]:
# Should the time series be nd.arrays, pd.DataFrames or edm.TimeSeries?
@dataclass
class SingleHouseState: 
    time_range_index: pd.DatetimeIndex
    demand_timeseries: np.ndarray
    price_timeseries: np.ndarray
    pv_timeseries: t.Optional[np.ndarray] = None
    initial_battery_soc: t.Optional[float] = None
    final_battery_soc: t.Optional[float] = None

In [6]:
class SingleHouseEnvironment(gym.Env):

    def __init__(self, house, df_price, initial_battery_soc=None, final_battery_soc=None):
        self.house = house
        self.df_price = df_price
        self.initial_battery_soc = initial_battery_soc
        self.final_battery_soc = final_battery_soc

        self.state = None
        self.action = None
        self.reward = None
        self.done = False
        self.info = {}

    def step(self, action):
        # In this case, we will not calculate a next state.
        # We should check that the solution is feasible. Or should it be an Action method to check for feasibility?
        # Then we should return some sort of flag telling if the solution is feasible or not.
        
        timedelta = 0.25 # To derive it based on data freq
        psell = np.zeros(len(self.state.time_range_index), dtype = float)
        pbuy = np.zeros(len(self.state.time_range_index), dtype = float)
        soc = np.zeros(len(self.state.time_range_index), dtype = float)
        battery_charge = np.zeros(len(self.state.time_range_index), dtype = float)
        battery_discharge = np.zeros(len(self.state.time_range_index), dtype = float)
        load = np.zeros(len(self.state.time_range_index), dtype = float)
        pv = np.zeros(len(self.state.time_range_index), dtype = float)
       
        if self.house.has_demand():
            load = np.array(df_load[self.house.assets[0].timeseries.column_name]) # Needs to be done in a better way
        if self.house.has_pvsystem():
            pv = np.array(df_pv[self.house.assets[0].timeseries.column_name]) # Needs to be done in a better way
        if self.house.has_battery():
            # dir(self.house)
            
            # Check action length
            # if (len(self.state.time_range_index) != len(action.battery_charge))or(len(self.state.time_range_index) != len(action.battery_discharge)):
            #     print('Not Feasible - Different Length')

            # # Check battery charging power feasibility
            # if any(action.battery_charge > self.house.get_batteries()[0].max_charge_kw):
            #     print('Not Feasible - battery_charge')
 
            # # Check battery discharging power feasibility
            # if any(action.battery_discharge > self.house.get_batteries()[0].max_discharge_kw):
            #     print('Not Feasible - battery_discharge')    
            
            for i in range(len(env.state.time_range_index)):

                if i == 0:
                    previous_soc = self.state.initial_battery_soc
                else:
                    previous_soc = soc[i-1]
                    
                battery_charge[i] = action.battery_charge[i]
                battery_discharge[i] = action.battery_discharge[i]
                  
                if battery_charge[i] > self.house.get_batteries()[0].max_charge_kw:
                    battery_charge[i] = self.house.get_batteries()[0].max_charge_kw
                    print(f'Battery charge in step {i} is limited to max_charge_kw')
                if battery_discharge[i] > self.house.get_batteries()[0].max_discharge_kw:
                    battery_discharge[i] = self.house.get_batteries()[0].max_discharge_kw 
                    print(f'Battery discharge in step {i} is limited to max_discharge_kw')
                
                soc[i] = previous_soc + self.house.get_batteries()[0].charge_efficiency*battery_charge[i]*timedelta - (1/self.house.get_batteries()[0].discharge_efficiency)*battery_discharge[i]*timedelta

                
                # Check battery soc limits
                if soc[i] > self.house.get_batteries()[0].capacity_kwh:
                    soc[i] = self.house.get_batteries()[0].capacity_kwh
                    battery_charge[i] = (soc[i] - previous_soc)/(self.house.get_batteries()[0].charge_efficiency*timedelta)
                    
                if soc[i] < self.house.get_batteries()[0].min_soc_kwh:
                    soc[i] = self.house.get_batteries()[0].min_soc_kwh
                    battery_discharge[i] = (previous_soc - soc[i])*self.house.get_batteries()[0].discharge_efficiency/timedelta   
        
        # Calculate power trading quantities
        trade = pv-load+battery_discharge-battery_charge
        psell[trade>0] = trade[trade>0]
        pbuy[trade<0] = -trade[trade<0]
        
        # Calculate cost
        cost_grid = (np.array(self.df_price['community fee'])+np.array(self.df_price['grid fee']))*(pbuy + psell)*timedelta
        cost_energy = np.array(self.df_price['market rate'])*pbuy*timedelta - np.array(self.df_price['feedin tariff'])*psell*timedelta
    
        return psell, pbuy, soc, battery_charge, battery_discharge, cost_grid, cost_energy
    
    

    
    def reset(self):
        self.state = SingleHouseState(time_range_index=house.timeseries.df.index,
                                      demand_timeseries=house.timeseries,
                                      price_timeseries=self.df_price)
        
        if self.house.has_pvsystem():
            pvsystem = house.get_pvsystems()[0]
            self.state.pv_timeseries = pvsystem.timeseries
        
        if self.house.has_battery():
            self.state.initial_battery_soc = self.initial_battery_soc
            self.state.final_battery_soc = self.final_battery_soc

        return self.state

    def render(self, psell, pbuy, soc, battery_charge, battery_discharge, cost_grid, cost_energy): # Include quantities into state

        load = np.zeros(len(self.state.time_range_index), dtype = float)
        pv = np.zeros(len(self.state.time_range_index), dtype = float)
        self.df_price
       
        if self.house.has_demand():
            load = np.array(df_load[self.house.assets[0].timeseries.column_name]) # Needs to be done in a better way
        if self.house.has_pvsystem():
            pv = np.array(df_pv[self.house.assets[0].timeseries.column_name]) # Needs to be done in a better way

        
        linewidth = 1.5
        fontsize = 12

        # Share both X and Y axes with all subplots
        fig, (ax1, ax2, ax3, ax4, ax5, ax6) = plt.subplots(6, 1, sharex='all', sharey='none', figsize=(15, 12))
        ax1.plot(df_load[self.house.assets[0].timeseries.column_name].index, load, linestyle='solid', color='blue', linewidth=linewidth)
        ax1.set_ylabel('Load [kW]', fontsize=fontsize)
        ax1.set_title(f'House {self.house.assets[0].timeseries.column_name}')
        # ax1.set_yticks(list(range(0,10,1)))
        ax1.grid(color='gray', axis = 'both', visible = True)
 
        ax2.plot(df_pv[self.house.assets[0].timeseries.column_name].index, pv, linestyle='solid', color='orange', linewidth=linewidth)
        ax2.set_ylabel('PV [kW]', fontsize=fontsize)
        ax2.grid(color='gray', axis = 'both', visible = True)
        
        ax3.plot(df_pv[self.house.assets[0].timeseries.column_name].index, soc, linestyle='solid', color='red', linewidth=linewidth)
        ax3.set_ylabel('Battery SOC [kWk]', fontsize=fontsize)
        ax3.grid(color='gray', axis = 'both', visible = True)
        
        ax4.plot(df_pv[self.house.assets[0].timeseries.column_name].index, pbuy, linestyle='solid', color='brown', linewidth=linewidth)
        ax4.set_ylabel('Grid import [kW]', fontsize=fontsize)
        ax4.grid(color='gray', axis = 'both', visible = True)
        
        ax5.plot(df_pv[self.house.assets[0].timeseries.column_name].index, psell, linestyle='solid', color='green', linewidth=linewidth)
        ax5.set_ylabel('Grid export [kW]', fontsize=fontsize)
        ax5.grid(color='gray', axis = 'both', visible = True)
        
        ax6.plot(df_pv[self.house.assets[0].timeseries.column_name].index, cost_grid+cost_energy, linestyle='solid', color='black', linewidth=linewidth)
        ax6.set_ylabel('Cost [EUR]', fontsize=fontsize)
        ax6.grid(color='gray', axis = 'both', visible = True)
        
        plt.tick_params(axis='x', rotation=0)
        fig.align_labels()
        
        return fig

    def close(self):
        # Close the environment
        pass

In [7]:
@dataclass
class Action:
    # Somehow we want to constriain the action space using bounds. 
    # Maybe use the gym spaces class? Box then add attribute for index datatime
    # Should action include a method to check for feasibility?
    time_range_index: pd.DatetimeIndex
    power_buy: np.ndarray
    power_sell: np.ndarray
    battery_soc: t.Optional[np.ndarray] = None
    battery_charge: t.Optional[np.ndarray] = None
    battery_discharge: t.Optional[np.ndarray] = None  


In [8]:
# Instead of an Agent, this could also be an Optimizer? Doesn't matter.
class SingleHouseAgent:
    def __init__(self, house):
        self.house = house

        self.abstract_model = create_model()
        self.static_data = create_static_input(self.house)

    def action(self, state):
        
        # Create model data
        dynamic_data = create_dynamic_input(state, self.house)
        model_data = {None: {**self.static_data, **dynamic_data}}
        # Instantiate model with data
        model_instance = self.abstract_model.create_instance(model_data)
        # Solve model
        optimizer = SolverFactory("cbc", executable=solver_path)
        optimizer.solve(model_instance, tee=True, keepfiles=False)
        # Convert results to action
        power_buy = value(model_instance.PL1_BUY[:,house.name])
        power_sell = value(model_instance.PL1_SELL[:,house.name])
        battery_charge = value(model_instance.B_IN[:,house.name])
        battery_discharge = value(model_instance.B_OUT[:,house.name])
        battery_soc = value(model_instance.B_SOC[:,house.name])

        action = Action(time_range_index=state.time_range_index,
                        power_buy=power_buy, 
                        power_sell=power_sell,
                        battery_charge=battery_charge, 
                        battery_discharge=battery_discharge, 
                        battery_soc=battery_soc)

        return action
        

In [10]:
house = energycommunity.assets[0]

env = SingleHouseEnvironment(house=house, df_price=df_price, initial_battery_soc=0, final_battery_soc=0)
agent = SingleHouseAgent(house=house)

initial_state = env.reset()
action = agent.action(initial_state)

psell, pbuy, soc, battery_charge, battery_discharge, cost_grid, cost_energy = env.step(action)

executable for solver cbc. File with
name=/opt/homebrew/Cellar/cbc/2.10.7_1/bin/cbc either does not exist or it is
not executable. To skip this validation, call set_executable with
validate=False.
Traceback (most recent call last):
  File "C:\Users\dimili\virtualenvs\basic\Lib\site-packages\pyomo\opt\base\solvers.py", line 151, in __call__
    opt = self._cls[_name](**kwds)
          ^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\dimili\virtualenvs\basic\Lib\site-packages\pyomo\solvers\plugins\solvers\CBCplugin.py", line 90, in __init__
    super(CBCSHELL, self).__init__(**kwds)
  File "C:\Users\dimili\virtualenvs\basic\Lib\site-packages\pyomo\opt\solver\shellcmd.py", line 67, in __init__
    self.set_executable(name=executable, validate=validate)
  File "C:\Users\dimili\virtualenvs\basic\Lib\site-packages\pyomo\opt\solver\shellcmd.py", line 116, in set_executable
    raise ValueError(
ValueError: Failed to set executable for solver cbc. File with name=/opt/homebrew/Cellar/cbc/2.10.7_1/bin/

RuntimeError: Attempting to use an unavailable solver.

The SolverFactory was unable to create the solver "cbc"
and returned an UnknownSolver object.  This error is raised at the point
where the UnknownSolver object was used as if it were valid (by calling
method "solve").

The original solver was created with the following parameters:
	executable: /opt/homebrew/Cellar/cbc/2.10.7_1/bin/cbc
	type: cbc
	_args: ()
	options: {}

In [9]:
fig = env.render(psell, pbuy, soc, battery_charge, battery_discharge, cost_grid, cost_energy)

NameError: name 'env' is not defined

In [None]:
from dataclasses import dataclass, fields
import gymnasium as gym
from gymnasium import spaces

@dataclass
class MyDataClass:
    name: spaces.Box(0, 1, shape=(2,))
    age: spaces.Box(0, 1, shape=(2,))
    is_student: spaces.Box(0, 1, shape=(2,))

def dataclass_to_dict(dc):
    return {field.name: field.type for field in fields(dc)}

# Convert the dataclass structure to a dictionary
dc_dict = dataclass_to_dict(MyDataClass)

s = spaces.Dict(dc_dict)