In [32]:
import pyomo.environ as pe
import pandas as pd
import numpy as np
from datetime import timedelta
from scipy.stats import t

In [33]:
#---------------------------
# Load Market Data
# ---------------------------
market_file = r"C:\Users\USER\Desktop\job applying\spot_balancing_2025Q1.csv"
market_df = pd.read_csv(market_file, parse_dates=[0])
if market_df.columns[0].lower() not in ['date', 'timestamp', 'time']:
    market_df.rename(columns={market_df.columns[0]: 'date'}, inplace=True)
market_df.sort_values('date', inplace=True)
market_df['date'] = pd.to_datetime(market_df['date'])
# Convert market data datetime from EET to UTC
market_df['date'] = market_df['date'].dt.tz_localize('EET').dt.tz_convert('UTC')
market_df.set_index('date', inplace=True)

# Ensure required columns exist:
# 'LT_mfrr_SA_up_activ', 'LT_mfrr_SA_down_activ', 'LT_up_sa_cbmp', 'LT_down_sa_cbmp'


In [34]:
# ---------------------------
# SingleMarketSolver Class
# ---------------------------
class SingleMarketSolver:
    def __init__(self, data, initial_soc=1.0):
        """
        data: pandas DataFrame indexed by datetime (for one optimization horizon, e.g. one day)
              Must include the following columns:
                  - LT_mfrr_SA_up_activ: cleared mFRR upwards activations (MW)
                  - LT_mfrr_SA_down_activ: cleared mFRR downwards activations (MW)
                  - LT_up_sa_cbmp: cleared marginal price for upwards activations (EUR/MWh)
                  - LT_down_sa_cbmp: cleared marginal price for downwards activations (EUR/MWh)
        initial_soc: initial state-of-charge in MWh (default 1.0, i.e. 50% of 2 MWh capacity)
        """
        self.data = data.copy()
        self.initial_soc = initial_soc
        self.model = None
        self.schedule = None
        self.profit = None
        self.solve_model()
    
    def solve_model(self):
        # --- Clean missing price values ---
        for col in ['LT_up_sa_cbmp', 'LT_down_sa_cbmp']:
            mean_val = self.data[col].mean()
            if pd.isna(mean_val):
                mean_val = 0.0  
            self.data[col] = self.data[col].fillna(mean_val)
        
        # Create a Pyomo ConcreteModel
        model = pe.ConcreteModel()
        N = len(self.data.index)
        model.T = pe.RangeSet(0, N - 1)
        # Battery parameters and time interval
        interval_duration = 0.25   # hours (15-minute intervals)
        capacity = 2.0             # MWh
        
        # For daily cycle constraint: maximum discharged energy per day = 2 cycles * capacity = 4 MWh.
        times = self.data.index
        day_map = {i: times[i].date() for i in range(N)}
        
        # Convert market parameters so that keys are integer indices.
        act_up = {i: (1 if x >= 1 else 0) for i, x in enumerate(self.data['LT_mfrr_SA_up_activ'])}
        act_down = {i: (1 if x >= 1 else 0) for i, x in enumerate(self.data['LT_mfrr_SA_down_activ'])}
        price_up = {i: x for i, x in enumerate(self.data['LT_up_sa_cbmp'])}
        price_down = {i: x for i, x in enumerate(self.data['LT_down_sa_cbmp'])}
        
        model.act_up = pe.Param(model.T, initialize=act_up)
        model.act_down = pe.Param(model.T, initialize=act_down)
        model.price_up = pe.Param(model.T, initialize=price_up, within=pe.Reals)
        model.price_down = pe.Param(model.T, initialize=price_down, within=pe.Reals)
        
        # Decision variables:
        model.charge = pe.Var(model.T, within=pe.Binary)
        model.discharge = pe.Var(model.T, within=pe.Binary)
        model.E = pe.Var(model.T, within=pe.NonNegativeReals, bounds=(0, capacity))
        
        # Initial SoC constraint:
        def init_soc_rule(m):
            return m.E[0] == self.initial_soc
        model.init_soc = pe.Constraint(rule=init_soc_rule)
        
        # Energy dynamics constraint:
        def energy_dynamics_rule(m, t):
            if t < N - 1:
                return m.E[t+1] == m.E[t] + interval_duration*(m.charge[t] - m.discharge[t])
            else:
                return pe.Constraint.Skip
        model.energy_dyn = pe.Constraint(model.T, rule=energy_dynamics_rule)
        
        # Power constraint: no simultaneous charging and discharging.
        def power_constraint_rule(m, t):
            return m.charge[t] + m.discharge[t] <= 1
        model.power_cons = pe.Constraint(model.T, rule=power_constraint_rule)
        
        # Activation constraints: battery can only charge/discharge if the activation occurred.
        def charge_activation_rule(m, t):
            return m.charge[t] <= m.act_down[t]
        model.charge_act = pe.Constraint(model.T, rule=charge_activation_rule)
        
        def discharge_activation_rule(m, t):
            return m.discharge[t] <= m.act_up[t]
        model.discharge_act = pe.Constraint(model.T, rule=discharge_activation_rule)
        
        # Daily cycle constraint: total discharged energy per day <= 4 MWh.
        days = sorted(set(day_map.values()))
        model.Days = pe.Set(initialize=days)
        def daily_cycle_rule(m, d):
            return sum(m.discharge[t]*interval_duration for t in m.T if day_map[t] == d) <= 4.0
        model.daily_cycle = pe.Constraint(model.Days, rule=daily_cycle_rule)
        
        # Objective: maximize profit = sum over t of (discharge revenue - charging cost)*energy moved.
        def objective_rule(m):
            return sum((m.discharge[t]*m.price_up[t] - m.charge[t]*m.price_down[t]) * interval_duration for t in m.T)
        model.obj = pe.Objective(rule=objective_rule, sense=pe.maximize)
        
        # Solve the model using GLPK 
        solver = pe.SolverFactory('glpk', executable=r"C:\Users\USER\Desktop\job applying\glpk-4.65\w64\glpsol.exe", validate=False)
        solver.options['mipgap'] = 0.01
        results = solver.solve(model, tee=False)
        print("Solver status:", results.solver.status, results.solver.termination_condition)
        
        self.model = model
        
        # Extract schedule into a DataFrame.
        sched = []
        for t in model.T:
            rec = {
                'datetime': self.data.index[t],
                'charge': pe.value(model.charge[t]),
                'discharge': pe.value(model.discharge[t]),
                'SoC': pe.value(model.E[t])
            }
            sched.append(rec)
        self.schedule = pd.DataFrame(sched)
        self.profit = pe.value(model.obj)
    
    def get_profit_per_MWh(self):
        total_discharged = sum(pe.value(self.model.discharge[t])*0.25 for t in self.model.T)
        if total_discharged > 0:
            return self.profit / total_discharged
        else:
            return 0.0

In [35]:
# ---------------------------
# Rolling Horizon & Profit CI Calculation
# ---------------------------
def optimize_over_horizon(data, horizon_hours=24):
    """
    Runs SingleMarketSolver optimization over a rolling horizon (e.g., daily) and returns:
      - Mean profit per MWh (EUR/MWh)
      - 95% confidence interval for profit per MWh
      - Array of daily profit per MWh values.
    """
    daily_profit_per_MWh = []
    unique_days = sorted(set(data.index.date))
    for d in unique_days:
        # Create a tz-aware start timestamp in UTC
        start = pd.Timestamp(d, tz='UTC')
        end = start + pd.Timedelta(hours=horizon_hours)
        data_horizon = data.loc[(data.index >= start) & (data.index < end)]
        if len(data_horizon) == 0:
            continue
        solver = SingleMarketSolver(data_horizon, initial_soc=1.0)
        daily_profit_per_MWh.append(solver.get_profit_per_MWh())
    daily_profit_per_MWh = np.array(daily_profit_per_MWh)
    mean_profit = np.mean(daily_profit_per_MWh)
    std_profit = np.std(daily_profit_per_MWh, ddof=1)
    n = len(daily_profit_per_MWh)
    t_val = t.ppf(0.975, df=n-1)
    ci_lower = mean_profit - t_val * std_profit / np.sqrt(n)
    ci_upper = mean_profit + t_val * std_profit / np.sqrt(n)
    return mean_profit, (ci_lower, ci_upper), daily_profit_per_MWh


In [36]:
# ---------------------------
# Cycle Depth Calculation
# ---------------------------
def calculate_average_cycle_depth(schedule, capacity=2.0):
    """
    Suggestion for "small depth of cycle" metric:
    Detect local extrema in the SoC profile, compute the absolute differences between consecutive extrema,
    and return the average depth as a percentage of battery capacity.
    """
    soc = schedule['SoC'].values
    if len(soc) < 3:
        return 0
    extrema = []
    for i in range(1, len(soc)-1):
        if (soc[i] > soc[i-1] and soc[i] > soc[i+1]) or (soc[i] < soc[i-1] and soc[i] < soc[i+1]):
            extrema.append(soc[i])
    if len(extrema) < 2:
        return 0
    cycle_depths = [abs(extrema[i+1] - extrema[i]) for i in range(len(extrema)-1)]
    avg_depth = np.mean(cycle_depths)
    return avg_depth / capacity * 100


In [37]:
# ---------------------------
# Example Usage
# ---------------------------
# Optimize over a 24-hour horizon and compute the profit confidence interval:
mean_profit, (ci_lower, ci_upper), daily_profits = optimize_over_horizon(market_df, horizon_hours=24)
print("Mean profit per MWh (EUR/MWh):", mean_profit)
print("95% Confidence Interval:", (ci_lower, ci_upper))

# Calculate average cycle depth for a specific day (e.g., January 7, 2025)
solver_day = SingleMarketSolver(market_df.loc[(market_df.index >= '2025-01-07') & (market_df.index < '2025-01-08')])
avg_cycle_depth = calculate_average_cycle_depth(solver_day.schedule)
print("Average cycle depth (% of capacity):", avg_cycle_depth)

# Save the schedule of the last SingleMarketSolver instance to CSV.
solver_day.schedule.to_csv("BESS_schedule_output.csv", index=False)

Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver status: ok optimal
Solver statu