# EnCortex - Battery Arbitrage

In [11]:
# import all the required general libraries
import os
import numpy as np
import pandas as pd
import seaborn as sns
sns.set() 
import typing as t 
import gym
from gym import spaces
import pytorch_lightning as pl
import matplotlib.pyplot as plt
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity="all"

#import the encortex library and all the required dependencies
from encortex.backend import DFBackend
from encortex.env import EnCortexEnv
from encortex.logger import get_experiment_logger
from encortex.utils.data_loaders import load_data
from encortex.data import MarketData
from encortex.contract import Contract
from encortex.decision_unit import DecisionUnit
from encortex.grid import Grid
from encortex.sources import Battery, BatteryAction

from dfdb import create_in_memory_db

from encortex.environments.battery_arbitrage_env import BatteryArbitrageScenarioEnv
from encortex.optimizers.battery_arbitrage_optimizer import DRLBattOpt, MILPBattOpt, Optimizer
from encortex.datasets.grid import CaliforniaPricesEmissionsData, UKPricesEmissionsData

### Inputs from User: 

In [2]:
#specify the type of optimization to be used:
milp_flag = True #if False use RL
solver = ["ort"] #the algorithm to be used

In [3]:
#provide optimization weights for the objectives
weight_emission = 1.0
weight_price = 1.0
weight_degradation = 0.0

In [6]:
'''
run experiments for the following battery configurations
elements in the list denote different batteries/battery configurations to be used in the scenario together (here just 1 element indicating 1 battery being used)
'''
#the battery capacity (in kWh)
storage_capacity = [10.]

#here charging and discharging efficiency (in %) taken the same/ if different take it differently 
efficiency=[1.]

#the maximum discharge (in %) percentage that can happen at a time, here 90% 
depth_of_discharge = [90.] 

#the minimum state of charge of the battery, below which the battery should not be explored
soc_minimum = [0.1]

#battery decision time steps  
timestep = [np.timedelta64("1","h")]

#whether to have degradation model in place or not for the batteries 
degradation_flag = weight_degradation > 0 

#the battery capacity reduction percentage due to degradation, below which if capacity reduces due to overuse, battery does not stay at good optimal health
min_battery_capacity_factor = [0.8] 

#the battery replacement cost (in $/kWh)
battery_cost_per_kWh = [200.] 

#after every charge-discharge cycles over a certain period, the battery capacity reduced by the reduction coefficienct 
reduction_coefficient = [0.99998] 

# the period after which battery degrades
degradation_period_in_days = [7.] 

#battery actions
action = [BatteryAction("CHARGE_IDLE_DISCHARGE","actions of the battery",spaces.Discrete(3),True,)] 

# initial state of charge of the battery to run the test experiments
# the flag initiates random initial state of charge of the battery during training runs/experiments to avoid overfitting
soc_initial = [0.5] 
test_flag = [False] 

### Instantiating components from the framework:

In [26]:
'''
instantiate battery objects into a list based on the no. of batteries/elements provided in the list of configuration parameters 
'''
batteries = []
for ele in range(len(storage_capacity)):
    battery = Battery(
        timestep=timestep,
        name="Li-Ion Battery",
        id=ele,
        description="Li-Ion Battery",
        storage_capacity=storage_capacity[ele],
        charging_efficiency=efficiency[ele],
        discharging_efficiency=efficiency[ele],
        soc_initial=soc_initial[ele],
        depth_of_discharge=depth_of_discharge[ele],
        soc_minimum=soc_minimum[ele],
        degradation_flag=degradation_flag,
        min_battery_capacity_factor=min_battery_capacity_factor[ele],
        battery_cost_per_kWh=battery_cost_per_kWh[ele],
        reduction_coefficient=reduction_coefficient[ele],
        degradation_period=degradation_period_in_days[ele],
        test_flag=test_flag[ele],
        action=action[ele],
    )
    batteries.append(battery)

22/07/06 13:19:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , forecast_carbon_emissions, forecast_prices, timestamps
 Schema: _c0, forecast_carbon_emissions, forecast_prices, timestamps
Expected: _c0 but found: 
CSV file: file:///home/rluser/vb_cortex/data/forecast_grid_data.csv
22/07/06 13:19:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , actual_carbon_emissions, actual_prices, timestamps
 Schema: _c0, actual_carbon_emissions, actual_prices, timestamps
Expected: _c0 but found: 
CSV file: file:///home/rluser/vb_cortex/data/actual_grid_data.csv


In [28]:
'''
training data is not required for MILP, if working on RL training data is required, read the data from the dataloader, 
parse it to the backend of MArketData and feed it to the Grid class to instantiate a Grid object for training
'''

'''
load forecast and actual data for training - 
The CaliforniaPricesEmissionsData takes in 3 user-specific arguments:
    1. train: A flag saying whether training/test data to load
    2. forecasts: A flag saying whether experiments are to be run on forecasts/actuals
    3. forecast_type: A string specifyng the type of forecast:
        a) noise : Adding noise to the actual values and treating that as forecasts
        b) smoothing : Smoothing the actual values and using that as forecasts
        c) yesterdays : assuming yesterday's actual data as forecasts for today's data
'''
forecast_df = UKPricesEmissionsData(train=True, forecasts=True, )
actual_df = UKPricesEmissionsData(train=True, forecasts=False)

print(forecast_df.data.head(),'\n', actual_df.data.head())

forecast_df.data[['emissions', 'prices']]  = forecast_df.data[['emissions', 'prices']].apply(lambda x: np.float32(x))
actual_df.data[['emissions', 'prices']]  = actual_df.data[['emissions', 'prices']].apply(lambda x: np.float32(x))

#parse the training data to the MarketData backend
grid_data = MarketData.parse_backend(
    len(storage_capacity) + 2,
    True,
    len(storage_capacity) + 2,
    len(storage_capacity) + 2,
    np.timedelta64("5", "m"),
    price_forecast=DFBackend(forecast_df.data['prices'], forecast_df.data['timestamps']),
    
    price_actual=DFBackend(actual_df.data['prices'], actual_df.data['timestamps']),
    carbon_emissions_forecast=DFBackend(forecast_df.data['emissions'], forecast_df.data['timestamps']),
    carbon_emissions_actual=DFBackend(actual_df.data['emissions'], actual_df.data['timestamps']),
    carbon_prices_forecast=DFBackend(forecast_df.data['prices'], forecast_df.data['timestamps']),
    carbon_prices_actual=DFBackend(actual_df.data['prices'], actual_df.data['timestamps']),
    volume_forecast=DFBackend(None, None),
    volume_actual=DFBackend(None, None),
)

#instantiate a training grid Object by feeding in the parse training data as an argument to the Grid class
grid = Grid(
    timestep,
    "Grid",
    len(storage_capacity) + 2,
    "Simple Market as a Grid",
    "*/5 * * * *",
    np.timedelta64(5, "m"),
    np.timedelta64(5, "m"),
    np.timedelta64(5, "m"),
    grid_data,
)

  rank_zero_warn(


### Creating Decision units for the problem statement:

A [decision unit](../encortex/encortex.decision_unit.rst)

In [None]:
#Formulate the decision unit (both for training and testing) by creating contracts between the grid and the battery
def creating_decision_units(batteries, grid, forecast_df):
    contracts = []
    for battery in batteries:
        contracts.append(Contract(grid,battery))
    decision_unit = DecisionUnit(contracts)
    decision_unit.generate_schedule(
        current_reference_time=np.datetime64(pd.Timestamp(forecast_df.data['timestamps'][0]))
    )
    return decision_unit

decision_unit = creating_decision_units(batteries, grid, forecast_df)

### Function to Store Results in a dataframe and then later to csv format:

In [None]:
'''
Dump results into dataframes for later visualizations - 
    1. battery_soc_list: stores the current list of state of charge values for MILP
    2. action_list : list of actions: charging/discharging/idle taken by the optimizer for a step
    3. power_list: power associated with the action taken by the optimization algorithm
    4. carbon_intensity_list: Actual carbon emission intensity values in mTCO2eq/kWh
    5. price_intensity_list: Actual price values in $
    6. reward_list: List of rewards received for taking actions in particular states
    7. carbon_savings_forecast_list : Carbon savings done due to the action taken for a particular state at a certain timestamp
    8. price_savings_forecast_list : Price savings due to the action taken for a particular state at a certain timestamp 
'''
def create_dataframe(
    battery_soc_list, 
    action_list, 
    power_list, 
    carbon_intensity_list,
    price_intensity_list,
    carbon_savings_list,
    price_savings_list,
    reward_list, 
    carbon_savings_forecast_list, 
    price_savings_forecast_list
):
    df=pd.DataFrame()
    df.insert(loc=0, column='Current_SOC', value=battery_soc_list)
    df.insert(loc=1, column='Predicted_Action', value=action_list)
    df.insert(loc=2, column='Predicted_Power_Action', value=power_list)
    df.insert(loc=3, column='Carbon_emissions', value=carbon_intensity_list)
    df.insert(loc=4, column='Carbon_savings', value=carbon_savings_list)
    df.insert(loc=5, column='Forecast_Carbon_savings', value=carbon_savings_forecast_list)
    df.insert(loc=6, column='Price_emissions', value=price_intensity_list)
    df.insert(loc=7, column='Price_savings', value=price_savings_list)
    df.insert(loc=8, column='Forecast_Price_savings', value=price_savings_forecast_list)
    df.insert(loc=9, column='Reward', value=reward_list)  
    return df

#store the results into results folder
dir = os.getcwd()
dir_path = os.path.join(dir, 'results_UK_forecasts/')

if not os.path.isdir(dir_path):
    os.mkdir(dir_path)


In [None]:
'''
Instantiate an environment object from the scenario specific environment class 
'''
if milp_flag:
    step_time_diff = np.timedelta64("1", "D")
else:
    step_time_diff = np.timedelta64("1", "h")
    
env = BatteryArbitrageScenarioEnv(
    decision_unit,
    start_time=forecast_df.data['timestamps'][0],
    timestep=np.timedelta64("1", "h"),
    step_time_difference=step_time_diff,
    horizon=np.timedelta64("1", "D"),
    seed=40,
    weight_emission=weight_emission,
    weight_degradation=weight_degradation,
    weight_price=weight_price,
    milp_flag=milp_flag,
    train_flag=True,
    exp_logger=get_experiment_logger('wandb'),
    logging_interval = 1
)


### Training Pipeline for the algorithms:

In [None]:
#setting a seed for reproducibility of experiment results:
pl.seed_everything(40)
seed = 40

In [None]:
'''
Training Pipeline for RL:
The code in this cell helps to train a RL model, but there could be issues in trying it out in the jupyter cell. 
Hence we also provide a separate training script namely training_RL.py, try that out if the jupyter cell does not work.

During testing just the load the model saved from the training script
'''
if not milp_flag:

    #instantiate an optimizer object based on the optimizer chosen, and create the model
    opt = DRLBattOpt(
        env=env,
        seed=seed,
    )
    
    print("...... Starting Training .......")
    model = opt(
        env,
        train_flag=True,
    )
    print("------Training Completed--------")
    
    #save the model into a folder
    dir = os.getcwd()
    dir_path = os.path.join(dir, 'model_checkpoints/')

    if not os.path.isdir(dir_path):
        os.mkdir(dir_path)

    opt.save(dir_path, "Trained_RL_model")

#### Mixed integer linear programming (MILP) does not require training, hence the code for the MILP section is shown directly while testing

### Testing Pipeline for the algorithms : 

In [None]:
#The test pipeline:
def testing(model, env: BatteryArbitrageScenarioEnv, opt: Optimizer, milp_flag: bool):
    
    #for logging results into csv files
    carbon_intensity_list=[]
    price_intensity_list=[]
    battery_soc_list=[]
    action_list=[]
    power_list=[]
    reward_list=[]
    carbon_savings_list=[]
    price_savings_list=[]
    carbon_savings_forecast_list=[]
    price_savings_forecast_list=[]
    
    #For testing initiate the battery with 50% charge always (initial state of charge of the battery during test experiments : 0.5)
    for batt in env.decision_unit.storage_entities:
        batt: Battery
        batt.current_soc = 0.5
        batt.test = True
        
    #Reset the environment in the beginning and get the state values
    state = env.reset()
    done = False
    net_reward = 0
    steps=0
    
    #run the episode unless done
    while not done:
        print("------------------------------------------------------")
        print("State: ", len(state))
        print("Step no : ", steps)
        print("Battery SOC : ", env.decision_unit.storage_entities[0].current_soc)
        #storing the actual values of emissions & prices in the list for logging purpose
        for grid in env.decision_unit.markets:
            grid:Grid
            carbon_intensity_list+=list(grid.data.carbon_emissions_actual[env.time, env.time+env.step_time_difference].reshape(-1))
            price_intensity_list+=list(grid.data.carbon_prices_actual[env.time, env.time+env.step_time_difference].reshape(-1))
         
        #storing the state of charge of the batteries in a list for logging purpose:
        for batt in env.decision_unit.storage_entities:
            batt: Battery
            battery_soc_list+=[batt.current_soc]
             
        if milp_flag:
            #In MILP first the values are passed as a decision variable/ Affine Expression - the train flag signifies that
            env.train_flag = True
            
            #model called to solve the objective defined in the environment based on the constraints from the framework abstractions
            model = opt(train_flag=True)
            battery_actions = opt.predict(env)
            
            #get the numeric action values as the predicted action results and hence switch off the train flag
            env.train_flag = False
            
            for batt in env.decision_unit.storage_entities:
                batt: Battery
                action_list+=list(battery_actions[batt.id]['Dt']-battery_actions[batt.id]['Ct']+1)
                power_list+=list((battery_actions[batt.id]['Dt']-battery_actions[batt.id]['Ct'])*batt.max_discharging_power)
        else:
            #In RL, since training is already done, just load the model to predict the actions based on the current state
            action = model.predict(state)[0]

            #transform the actions similar to the environment-specific action transformation for uniformity
            battery_actions = {}
            for batt in env.decision_unit.storage_entities:
                
                #storing the power values into a list for logging purpose
                power_list.append((action-1)*batt.max_discharging_power)
                
                # battery_actions[batt.id] = {}
                # battery_actions[batt.id]["time"] = env.time
                # battery_actions[batt.id]["action"] = action

            # storing actions taken into a list for logging purpose:
            action_list+=[action]
            battery_actions = action
        
        #Step to the next_state, based on the actions predicted by the optimizers, getting a reward and indicating whether the episode completed or not
        # print("Battery Actions:", battery_actions)
        next_state, reward, done, info = env.step(battery_actions)
        
        #storing the rest of the state of charges in milp of the batteries in a list for logging purpose:
        if milp_flag:
            for batt in env.decision_unit.storage_entities:
                batt: Battery
                battery_soc_list+=info[batt.id]['soc_list'][:-1]
        
        #storing reward values in a list for logging purpose:
        if milp_flag:
            reward_list+=[0]*(int(env.step_time_difference / env.timestep)-1)
        reward_list+=[reward]
        
        net_reward += reward
        state = next_state
        print("Total reward", net_reward)
        
        #storing the savings values in the lists for logging purposes
#         print(env.carbon_savings_list)
        carbon_savings_list.append(env.carbon_savings_list)
        price_savings_list.append(env.price_savings_list)
        carbon_savings_forecast_list.append(env.carbon_savings_forecast_list)
        price_savings_forecast_list.append(env.price_savings_forecast_list)
        
        steps+=1
    return net_reward, reward_list, battery_soc_list, action_list, power_list, carbon_intensity_list, price_intensity_list, carbon_savings_list[0], price_savings_list[0], carbon_savings_forecast_list[0], price_savings_forecast_list[0]

In [None]:
# Code for MILP to generate results on the training dataset: 
if milp_flag:

    # instantiate an optimizer object based on the optimizer chosen, and create the model
    opt = MILPBattOpt(
        env=env, objective=weight_price, solver=solver[0], seed=seed
    )
    model = opt(train_flag=True)

    # provide the model to the environment so as to prepare the constraints and objectives
    env.set_model(model)

    # test the MILP model
    print("-------Producing results on training set--------")
    net_rewardt, rewardlist, battery_soc_list, action_list, power_list, carbon_intensity_list, price_intensity_list, carbon_savings_list, price_savings_list, carbon_savings_forecast_list, price_savings_forecast_list = testing(
        model, env, opt, milp_flag)

    # dump the results into csv file
    traindf = create_dataframe(battery_soc_list, action_list, power_list, carbon_intensity_list, price_intensity_list,
                          carbon_savings_list, price_savings_list, rewardlist, carbon_savings_forecast_list, price_savings_forecast_list)
    traindf.to_csv(dir_path+"traindf_MILP.csv", index=False)

In [None]:
# Test the Trained RL model on the training data set first:
if not milp_flag: 

    opt = DRLBattOpt(
        env=env,
        seed=seed,
    )

    #load the model first, if trained from the python script training_RL.py 
    opt.load('model_checkpoints_UK/', 'Trained_RL_model')
    
    
    print("-------Producing results on training set--------")
    #test the RL model on the training set
    net_rewardt,rewardlist,  battery_soc_list, action_list, power_list, carbon_intensity_list, price_intensity_list, carbon_savings_list, price_savings_list, carbon_savings_forecast_list, price_savings_forecast_list = testing(opt.model, env, opt, milp_flag)

    # dump the training results into csv file
    traindf= create_dataframe(battery_soc_list, action_list, power_list, carbon_intensity_list, price_intensity_list, carbon_savings_list, price_savings_list,rewardlist,  carbon_savings_forecast_list, price_savings_forecast_list)
    traindf.to_csv(dir_path+"traindf_RL.csv", index=False)

In [None]:
#read test data having emissions & prices values of California dataset
#load forecast and actual data for testing/inference 
forecast_df = UKPricesEmissionsData(train=False, forecasts=True) 
actual_df = UKPricesEmissionsData(train=False, forecasts=False)

forecast_df.data[['emissions', 'prices']]  = forecast_df.data[['emissions', 'prices']].apply(lambda x: np.float32(x))
actual_df.data[['emissions', 'prices']]  = actual_df.data[['emissions', 'prices']].apply(lambda x: np.float32(x))

#parse the test data to the MarketData backend
grid_data = MarketData.parse_backend(
    len(storage_capacity) + 2,
    True,
    len(storage_capacity) + 2,
    len(storage_capacity) + 2,
    np.timedelta64("5", "m"),
    price_forecast=DFBackend(forecast_df.data['prices'], forecast_df.data['timestamps']),
    price_actual=DFBackend(actual_df.data['prices'], actual_df.data['timestamps']),
    carbon_emissions_forecast=DFBackend(forecast_df.data['emissions'], forecast_df.data['timestamps']),
    carbon_emissions_actual=DFBackend(actual_df.data['emissions'], actual_df.data['timestamps']),
    carbon_prices_forecast=DFBackend(forecast_df.data['prices'], forecast_df.data['timestamps']),
    carbon_prices_actual=DFBackend(actual_df.data['prices'], actual_df.data['timestamps']),
    volume_forecast=DFBackend(None, None),
    volume_actual=DFBackend(None, None),
)

#instantiate a test grid Object by feeding in the parsed test data as an argument to the Grid class
grid = Grid(
    timestep,
    "Grid",
    len(storage_capacity) + 2,
    "Simple Market as a Grid",
    "*/5 * * * *",
    np.timedelta64(5, "m"),
    np.timedelta64(5, "m"),
    np.timedelta64(5, "m"),
    grid_data,
)

#modify the training data to test data and run the inference:
env.decision_unit.markets[0] = grid
env.decision_unit.generate_schedule(
        current_reference_time=np.datetime64(pd.Timestamp(forecast_df.data['timestamps'][0]))
    )
env.start_time = forecast_df.data['timestamps'][0]
env.decision_unit.storage_entities[0].current_soc = 0.5

In [None]:
# generate test results for MILP
# Code for MILP to generate results on the training dataset: 
if milp_flag:

    # instantiate an optimizer object based on the optimizer chosen, and create the model
    opt = MILPBattOpt(
        env=env, objective=weight_price, solver=solver[0], seed=seed
    )
    model = opt(train_flag=True)

    # provide the model to the environment so as to prepare the constraints and objectives
    env.set_model(model)

    # test the MILP model
    print("-------Producing results on test set--------")
    net_rewardt, rewardlist, battery_soc_list, action_list, power_list, carbon_intensity_list, price_intensity_list, carbon_savings_list, price_savings_list, carbon_savings_forecast_list, price_savings_forecast_list = testing(
        model, env, opt, milp_flag)

    # dump the results into csv file
    testdf = create_dataframe(battery_soc_list, action_list, power_list, carbon_intensity_list, price_intensity_list,
                          carbon_savings_list, price_savings_list, rewardlist, carbon_savings_forecast_list, price_savings_forecast_list)
    testdf.to_csv(dir_path+"testdf_MILP.csv", index=False)

In [None]:
# generate results for RL test data
if not milp_flag:  
    
    #load the model first, if trained from the python script training_RL.py 
    opt.load('model_checkpoints_UK/', 'Trained_RL_model')
      
    print("....... Starting Inference on the test dataset ........")
    #test the RL model on the test set
    net_rewardt,rewardlist, battery_soc_list, action_list, power_list, carbon_intensity_list, price_intensity_list, carbon_savings_list, price_savings_list, carbon_savings_forecast_list, price_savings_forecast_list = testing(opt.model, env, opt, milp_flag)

    # dump the test results into a csv file
    testdf= create_dataframe(battery_soc_list, action_list, power_list, carbon_intensity_list, price_intensity_list, carbon_savings_list, price_savings_list, rewardlist, carbon_savings_forecast_list, price_savings_forecast_list)
    testdf.to_csv(dir_path+"testdf_RL.csv", index=False)

### Result Visualization through Streamlit:

In [None]:
#Feed the following command to run streamlit in the background that would show the results
#modify the arguments as and when required, feed the training csv file, test csv file and whether the files are solved using MILP/RL. 
import os
pid=os.system(f"python -m streamlit run streamlit_analyze.py --server.address 127.0.0.1 --server.port 8501 -- --weight_emission {weight_emission} --weight_price {weight_price} &")

In [None]:
from IPython.display import IFrame
IFrame(src='http://127.0.0.1:8501',  width=1000, height=1050)