This notebook includes the BESS optimization script using the predicted electricity prices

In [None]:
#Variables for the market data
# Alberta Internal Load (MW)
# Predicted pool price / pool price (for backtesting) ($/MWh)
# Solar generation (MW)
# Wind generation (MW)
# Supply cushion (MW) [ supply cushion = AIL - (solar + wind) ]
# Supply cushion as a percentage of AIL (%)

#Variables for the battery data
# Battery generation (MW) [vBattPower]
# Battery charging power (MW) [vCharge]
# Battery discharging power (MW) [vDischarge]
# Battery state of charge (SOC) (%) [vSOC]
# Battery charging status (charging-1 or discharging-0) [vChargeStatus]

#Constants for the battery data
# Battery Max charging power (MW) [max_charge_rate]
# Battery Max discharging power (MW) [max_discharge_rate]
# Battery initial state of charge (SOC) (%) [init_soc]
# Battery minimum state of charge (SOC) (%) [min_soc]
# Battery maximum state of charge (SOC) (%) [max_soc]
# Battery capacity (MWh) [capacity]
# Battery charging efficiency (%) [charge_eff]
# Battery discharging efficiency (%) [discharge_eff]

#Python version 3.11.2

In [69]:
import pandas as pd
from datetime import datetime, timedelta
import os
import numpy as np
import timeit
from ortools.linear_solver import pywraplp

# Set pandas to display all rows and columns
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

# Suppress pandas performance warnings
import warnings
warnings.filterwarnings('ignore', category=pd.errors.PerformanceWarning)

In [3]:
# Load the data
# df2019 = pd.read_csv('/home/kevin/Downloads/BESS/data/raw/2019/merged_df_2019_cleaned.csv')
df2020 = pd.read_csv('D:/Python Projects/git_projects/BESS/data/raw/2020/merged_df_2020_cleaned.csv')
df2021 = pd.read_csv('D:/Python Projects/git_projects/BESS/data/raw/2021/merged_df_2021_cleaned.csv')
df2022 = pd.read_csv('D:/Python Projects/git_projects/BESS/data/raw/2022/merged_df_2022_cleaned.csv')
df2023 = pd.read_csv('D:/Python Projects/git_projects/BESS/data/raw/2023/merged_df_2023_cleaned.csv')
df2024 = pd.read_csv('D:/Python Projects/git_projects/BESS/data/raw/2024/merged_df_2024_cleaned.csv')

In [52]:
# Concatenate the data
# df2019
df = pd.concat([df2020, df2021, df2022, df2023, df2024], axis=0, ignore_index=True)

In [53]:
#Convert the datetime column to datetime format and sort the dataframe by time
df['datetime_'] = pd.to_datetime(df['datetime_'])
df.sort_values(by=["datetime_"], inplace=True)

In [54]:
#Selecting only the relevant columns and timeframe for the analysis
df=df.iloc[-360:-240]
df=df[['datetime_', 'pool_price', 'alberta_internal_load', 'wind_generation', 'solar_generation']]


In [55]:
df.head()

Unnamed: 0,datetime_,pool_price,alberta_internal_load,wind_generation,solar_generation
43502,2024-12-17 00:00:00,37.38,10588.0,391.998671,0.101944
43503,2024-12-17 01:00:00,29.45,10446.0,523.500052,0.1
43504,2024-12-17 02:00:00,24.84,10380.0,641.04649,0.1
43505,2024-12-17 03:00:00,23.98,10355.0,712.877795,0.1
43506,2024-12-17 04:00:00,24.59,10375.0,624.075838,0.1


In [56]:
batt_df = {
    "max_charge_rate": [9.0],  # Example value in MW
    "max_discharge_rate": [9.0],  # Example value in MW
    "capacity": [20.0],  # Example value in MWh
    "charge_eff": [0.95],  # Example efficiency
    "discharge_eff": [0.95],  # Example efficiency
    "min_soc": [0.1],  # Minimum state of charge
    "max_soc": [0.95],  # Maximum state of charge
    "initial_soc": [0.5]  # Initial state of charge
}

grid_df = {
    "max_buy_power" : [13000], 
    "max_sell_power" : [13000], 
    "max_import_power" : [13000], 
    "max_export_power" : [13000]
}

battery_df = pd.DataFrame(batt_df)
grid_df = pd.DataFrame(grid_df)


In [57]:
#Creating the datetime index for the market dataframe in "%Y-%m-%d %H:%M:%S" format

market1DF = df.copy()
market1DF.sort_values(by=["datetime_"], inplace=True)
market1DF["time_string"] = market1DF.apply(
    lambda x: (x["datetime_"] + timedelta(seconds=0.002)).strftime("%Y-%m-%d %H:%M:%S"), axis=1)
market1DF.set_index("time_string", inplace=True)
marketDF = market1DF

In [58]:
#Checking for null values in the market dataframe
marketDF.isnull().sum()

datetime_                0
pool_price               0
alberta_internal_load    0
wind_generation          0
solar_generation         0
dtype: int64

In [59]:
#Converting all the dataframes to dictionaries
marketDict = marketDF.to_dict()
gridDict = grid_df.to_dict()
battDict = battery_df.to_dict()

In [73]:
# Calculate time interval
timeInterval = marketDF.iloc[1]['datetime_'] - marketDF.iloc[0]['datetime_']

In [74]:
timeInterval

Timedelta('0 days 01:00:00')

In [62]:
# Assign the data to the input structure
input_data = type("input", (dict,), {})()
input_data.update({
    "simData": {
        "startTime": datetime.strptime(marketDF.index[0], "%Y-%m-%d %H:%M:%S"),
        "dt": int(round(timeInterval.total_seconds())) / (60 * 60),  # in hours
        "tIndex": marketDF.shape[0]
    },
    "market": {
        key: {sub_key: sub_item for sub_key, sub_item in marketDict[key].items()}
        for key in marketDict.keys() if key != "datetime_"
    },
    "grid": {key: item[0] for key, item in gridDict.items()},
    "batt": {key: item[0] for key, item in battDict.items()}
})

In [63]:
# Create the mip solver with the CBC backend.
solver = pywraplp.Solver.CreateSolver("CBC")

inf = solver.infinity()

tIndex = input_data["simData"]["tIndex"] # number of timeslots
dt = input_data["simData"]["dt"] # time interval in hour

# Create datetime array
startTime = input_data["simData"]["startTime"].strftime("%Y-%m-%d %H:%M:%S")
tIndex = input_data["simData"]["tIndex"]
timestamp = pd.date_range(startTime, periods=tIndex, freq=str(dt * 60) + "min")
time = [timestamp[i].strftime("%Y-%m-%d %H:%M:%S") for i in range(len(timestamp))]

time_s = timeit.default_timer()

In [64]:

# Adding timeseries variables
vGrid = [solver.NumVar(lb=-inf, ub=inf, name=f"vGrid_{i}") for i in range(tIndex)]
vBattPower = [solver.NumVar(lb=-inf, ub=inf, name=f"vBattPower_{i}") for i in range(tIndex)]
vCharge = [solver.NumVar(lb=-inf, ub=0, name=f"vCharge_{i}") for i in range(tIndex)]
vDischarge = [solver.NumVar(lb=0, ub=inf, name=f"vDischarge_{i}") for i in range(tIndex)]
vChargeStatus = [solver.BoolVar(name=f"vChargeStatus_{i}") for i in range(tIndex)]
vSOC = [solver.NumVar(lb=0, ub=1, name=f"vSOC_{i}") for i in range(tIndex)]

# Adding constraints
for i in range(tIndex):
    t = time[i]

    # Grid constraints
    solver.Add(vGrid[i] == input_data["market"]["alberta_internal_load"].get(t, 0) - input_data["market"]["solar_generation"].get(t, 0) -
               input_data["market"]["wind_generation"].get(t, 0) - vBattPower[i])  
    #solver.Add(vGrid[i] <= input_data["grid"]["max_buy_power"])  # Eqn. 2
    #solver.Add(vGrid[i] >= -input_data["grid"]["max_sell_power"])  # Eqn. 2
    #solver.Add(input_data["market"]["alberta_internal_load"].get(t, 0) - input_data["market"]["solar_generation"].get(t, 0) -
    #           input_data["market"]["wind_generation"].get(t, 0) - (vDischarge[i] + vCharge[i]) <=
    #           input_data["grid"]["max_import_power"])  # Eqn. 3
    #solver.Add(input_data["market"]["alberta_internal_load"].get(t, 0) - input_data["market"]["solar_generation"].get(t, 0) -
    #           input_data["market"]["wind_generation"].get(t, 0) - (vDischarge[i] + vCharge[i]) >=
    #           -input_data["grid"]["max_export_power"])  # Eqn. 3
    # Battery constraints
    solver.Add(vBattPower[i] == vCharge[i] + vDischarge[i])  
    solver.Add(vCharge[i] >= -input_data["batt"]["max_charge_rate"] * vChargeStatus[i]) 
    solver.Add(vDischarge[i] <= input_data["batt"]["max_discharge_rate"] * (1 - vChargeStatus[i]))  
    if i == 0:
        solver.Add(vSOC[i] == input_data["batt"]["initial_soc"] - dt / input_data["batt"]["capacity"] *
                   (vCharge[i] * (input_data["batt"]["charge_eff"]) +
                    vDischarge[i] / (input_data["batt"]["discharge_eff"])))  
    else:
        solver.Add(vSOC[i] == vSOC[i-1] - dt / input_data["batt"]["capacity"] *
                   (vCharge[i] * (input_data["batt"]["charge_eff"]) +
                    vDischarge[i] / (input_data["batt"]["discharge_eff"]))) 
    solver.Add(vSOC[i] >= input_data["batt"]["min_soc"])
    solver.Add(vSOC[i] <= input_data["batt"]["max_soc"]) 



In [65]:
# Adding objective
obj = 0
#obj += sum(-[vBattPower[i] * input_data["market"]["pool_price"][time[i]] * dt for i in range(tIndex)])
for i in range(tIndex):
    t = time[i]
    pool_price = input_data["market"]["pool_price"].get(t, 0)
    #pool_price = input_data["market"]["pool_price"].get(t, 0)  # Use .get() to handle missing keys
    obj += vBattPower[i] * pool_price * dt  # Accumulate the objective function
solver.Maximize(obj)

#

status = solver.Solve()
print("Solver status:", status)

time_e = timeit.default_timer()
runTime = round(time_e - time_s, 4)

if status == solver.OPTIMAL or status == solver.FEASIBLE:
    print("Solution is found.")
    print("Number of variables =", solver.NumVariables())
    print("Number of constraints =", solver.NumConstraints())
    print("Computation time = ", runTime)
    
    # Extracting solution values
    
    objValue = round(solver.Objective().Value() / 100, 2)
    
    objValueDF = pd.DataFrame.from_dict({"obj_value": objValue}, orient="index", columns=["Total P&L of BESS Operation ($)"])
    
    result = list(zip([round(input_data["market"]["pool_price"].get(time[i], 0), 2) for i in range(tIndex)],
                      [round(vGrid[i].solution_value(), 2) for i in range(tIndex)], 
                      [round(vBattPower[i].solution_value(), 2) for i in range(tIndex)],
                      [round(vCharge[i].solution_value(), 2) for i in range(tIndex)],
                      [round(vDischarge[i].solution_value(), 2) for i in range(tIndex)],
                      [round(vSOC[i].solution_value(), 4) for i in range(tIndex)],
                      [int(vChargeStatus[i].solution_value()) for i in range(tIndex)]
                      ))
    resultDF = pd.DataFrame(result, index=timestamp, columns=["pool_price ($/MWh)","Grid Power Flow (MW)", "Battery Output (MW)", "Charging Power (MW)", "Discharging Power (MW)", "State-of-charge (SOC)", "Charge Status"])
    
else:
    print("Solution cannot be found.")

Solver status: 0
Solution is found.
Number of variables = 720
Number of constraints = 840
Computation time =  4.3118


In [66]:
objValue

55.66

In [None]:
resultDF

#Implementing Dynamic Programming for the Opitimization task

Key improvements and DP-specific features:

    SOC Discretization:

        Creates a grid of SOC states (default 100 steps)
        Handles nonlinear efficiency curves through state transitions

    Backward Induction:

        Solves recursively from final period backward
        Stores optimal actions for each state-time pair

    Action Space Exploration:

        Evaluates discrete charge/discharge rates within physical limits
        Considers efficiency losses during state transitions

    Forward Pass:

        Reconstructs optimal path using stored policy decisions
        Ensures SOC constraints are maintained throughout

In [77]:
battery_df.head()

Unnamed: 0,max_charge_rate,max_discharge_rate,capacity,charge_eff,discharge_eff,min_soc,max_soc,initial_soc
0,9.0,9.0,20.0,0.95,0.95,0.1,0.95,0.5


In [None]:

def optimize_bess_dp(marketDF, battery_df, soc_steps=100):
    """
    Optimize BESS using Dynamic Programming with SOC discretization
    
    Parameters:
        marketDF (DataFrame): Market Data with datetime, pool price, AIL and renewable generation data
        battery_df (DataFrame): Battery Data with battery parameters
        soc_steps (int): Number of SOC discretization steps
        
    Returns:
        Results Dataframe with optimal actions and SOC trajectory
    """
    
    # Time parameters
    dt = (marketDF["datetime_"].iloc[1] - marketDF["datetime_"].iloc[0]).total_seconds() / 3600
    n_periods = len(marketDF)
    
    # Price data
    prices = marketDF["pool_price"].values
    
    # SOC discretization
    soc_batt = np.linspace(battery_df['min_soc'], battery_df['max_soc'], soc_steps)
    soc_step = soc_batt[1] - soc_batt[0]
    
    # DP Table: value[t, s] = max profit at time t with SOC s
    value_table = np.full((n_periods, soc_steps), -np.inf)
    policy_table = np.zeros((n_periods, soc_steps))  # Optimal action (MW)
    
    # Initialize final state values
    value_table[-1] = 0  # No value after last period
    
    # Backward induction
    for t in reversed(range(n_periods-1)):
        current_price = prices[t]
        
        for s_idx, current_soc in enumerate(soc_batt):
            max_profit = -np.inf
            best_action = 0
            
            # Possible actions (discrete charge/discharge rates)
            max_charge = min(
                battery_df['max_charge_rate'],
                (battery_df['max_soc'] - current_soc) * battery_df['capacity'] / (dt * battery_df['charge_eff'])
            )
            
            max_discharge = min(
                battery_df['max_discharge_rate'],
                (current_soc - battery_df['min_soc']) * battery_df['capacity'] * dt * battery_df['discharge_eff']
            )
            
            # Action space discretization
            actions = np.linspace(-max_charge, max_discharge, 50)
            
            for action in actions:
                if action > 0:  # Discharging
                    next_soc = current_soc - (action * dt * battery_df['discharge_eff']) / battery_df['capacity']
                    revenue = action * current_price * dt
                elif action < 0:  # Charging
                    next_soc = current_soc - (action * dt * battery_df['charge_eff']) / battery_df['capacity']
                    revenue = action * current_price * dt
                else:
                    next_soc = current_soc
                    revenue = 0
                
                # Stay within SOC bounds
                next_soc = max(min(next_soc, battery_df['max_soc']), battery_df['min_soc'])
                
                # Find nearest SOC index for next period
                next_s_idx = np.abs(soc_batt - next_soc).argmin()
                
                # Calculate total profit
                total_value = revenue + value_table[t+1, next_s_idx]
                
                if total_value > max_profit:
                    max_profit = total_value
                    best_action = action
            
            # Update DP tables
            value_table[t, s_idx] = max_profit
            policy_table[t, s_idx] = best_action
    
    # Forward pass to get optimal path
    optimal_actions = np.zeros(n_periods)
    soc_trajectory = np.zeros(n_periods)
    current_soc = battery_df['initial_soc']
    
    for t in range(n_periods):
        current_s_idx = np.abs(soc_batt - current_soc).argmin()
        optimal_action = policy_table[t, current_s_idx]
        
        # Apply action
        if optimal_action > 0:  # Discharging
            delta_soc = (optimal_action * dt * battery_df['discharge_eff']) / battery_df['capacity']
        elif optimal_action < 0:  # Charging
            delta_soc = (optimal_action * dt * battery_df['charge_eff']) / battery_df['capacity']
        else:
            delta_soc = 0
            
        current_soc -= delta_soc
        current_soc = max(min(current_soc, battery_df['max_soc']), battery_df['min_soc'])
        
        optimal_actions[t] = optimal_action
        soc_trajectory[t] = current_soc
    
    # Create results dataframe
    results = pd.DataFrame({
        'datetime_': marketDF['datetime_'],
        'Pool Price ($/MWh)': prices,
        'BESS Action (MW)': optimal_actions,
        'SOC': soc_trajectory,
        'Charge (MW)': np.where(optimal_actions < 0, -optimal_actions, 0),
        'Discharge (MW)': np.where(optimal_actions > 0, optimal_actions, 0)
    })
    
    # Save results
    #output_path = os.path.join(output_folder, 'DP_Results.xlsx')
    #results.to_excel(output_path, index=False)
    #print(f'Results saved to {output_path}')
    return results

