# Battery Optimization

## Setup

In [51]:
# Core
from datetime import datetime

# Installed
import pandas as pd
import numpy as np
import pulp as pl
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Local
from models import Battery

np.random.seed(42)  # The Answer to the Ultimate Question of Life, The Universe, and Everything

## Mocking data

In [52]:
# Generate 2024 datetime index with 15-minute intervals
start_date     = datetime(2024, 1, 1)
end_date       = datetime(2025, 1, 1)
datetime_index = pd.date_range(start=start_date, end=end_date, freq='15min')[:-1]  # Exclude 2025-01-01

# Create base dataframe
df_energy = pd.DataFrame(index=datetime_index)

# Add time-based features for calculations
df_energy = df_energy.assign(DayOfYear = df_energy.index.dayofyear)
df_energy = df_energy.assign(HourOfDay = df_energy.index.hour + df_energy.index.minute/60)
df_energy = df_energy.assign(DayOfWeek = df_energy.index.dayofweek)                         # 0=Monday, 6=Sunday

# Generate solar generation (HomeGeneration)
# Seasonal component: higher in summer (cos wave with peak around day 172 = June 21)
seasonal_solar = 0.5 + 0.5 * np.cos(2 * np.pi * (df_energy.DayOfYear - 172) / 365)

# Daily component: sunrise/sunset pattern (cos wave peaking at noon)
daily_solar = np.maximum(0, np.cos(2 * np.pi * (df_energy.HourOfDay - 12) / 24)) ** 2

# Scale to realistic kW values (0-8 kW peak for typical home solar)
base_solar = 8 * seasonal_solar * daily_solar

# Add weather noise (some days are cloudier)
weather_noise = np.random.normal(1, 0.3, len(df_energy))
weather_noise = np.maximum(0.2, weather_noise)            # Minimum 20% of clear sky (ensures some daily generation)

df_energy = df_energy.assign(HomeGeneration = base_solar * weather_noise)

# Generate home consumption (HomeConsumption)
# Base load (always-on devices)
base_load = 0.5  # kW

# Daily pattern: higher in morning (7-9) and evening (17-22)
morning_peak  = 2 * np.exp(-((df_energy.HourOfDay - 8) ** 2) / 2)
evening_peak  = 3 * np.exp(-((df_energy.HourOfDay - 19.5) ** 2) / 8)
daily_pattern = morning_peak + evening_peak

# Weekend effect: slightly higher consumption, shifted patterns
weekend_multiplier    = np.where(df_energy.DayOfWeek >= 5, 1.2, 1.0)
weekend_shift         = np.where(df_energy.DayOfWeek >= 5, 1.0, 0.0)  # Later wake-up on weekends
daily_pattern_weekend = daily_pattern * weekend_multiplier

# Seasonal heating (higher consumption in winter)
seasonal_heating = 2 * (0.5 + 0.5 * np.cos(2 * np.pi * (df_energy.DayOfYear - 21) / 365))  # Peak around Jan 21

# Temperature-dependent heating (kicks in more during cold hours)
heating_daily = np.where(
    (df_energy.HourOfDay < 8) | (df_energy.HourOfDay > 16),
    seasonal_heating * 1.5,
    seasonal_heating * 0.3
)

# Combine all consumption components
base_consumption = base_load + daily_pattern_weekend + heating_daily

# Add consumption noise
consumption_noise = np.random.normal(1, 0.2, len(df_energy))
consumption_noise = np.maximum(0.3, consumption_noise)

df_energy = df_energy.assign(HomeConsumption = base_consumption * consumption_noise)

# Generate price signal (PriceSignal) in €/MWh
# Base price level
base_price = 80  # €/MWh

# Seasonal component: higher in winter due to heating demand
seasonal_price = 30 * (0.3 + 0.7 * np.cos(2 * np.pi * (df_energy.DayOfYear - 21) / 365))

# Daily pattern: higher during peak hours (morning/evening)
daily_price_pattern = 25 * (morning_peak + evening_peak) / 3

# Weekend effect: generally lower prices
weekend_price_discount = np.where(df_energy.DayOfWeek >= 5, -15, 0)

# Solar correlation: negative correlation with solar generation (more solar = lower prices)
solar_effect = -20 * (df_energy.HomeGeneration / 8)  # Normalize to max generation

# Add price volatility and occasional negative prices
price_volatility = np.random.normal(0, 25, len(df_energy)) / 3

# Occasional negative price events (especially during high solar periods)
negative_price_prob = 0.02 * (df_energy.HomeGeneration / 8)  # Higher prob during high solar
negative_price_events     = np.random.random(len(df_energy)) < negative_price_prob
negative_price_adjustment = np.where(negative_price_events, np.random.uniform(-50, -120, len(df_energy)), 0)

# Combine all price components
total_price = (
    base_price + 
    seasonal_price + 
    daily_price_pattern + 
    weekend_price_discount + 
    solar_effect + 
    price_volatility + 
    negative_price_adjustment
)

df_energy = df_energy.assign(PriceSignal = total_price)


print(f"Generated {len(df_energy)} data points for 2024")
print(f"Date range: {df_energy.index[0]} to {df_energy.index[-1]}")
print(f"\nDataFrame columns: {list(df_energy.columns)}")
print(f"\nSample data preview:")
print(df_energy.head())
print(f"\nPrice signal stats (€/MWh):")
print(f"Min: {df_energy.PriceSignal.min():.1f}")
print(f"Max: {df_energy.PriceSignal.max():.1f}")  
print(f"Mean: {df_energy.PriceSignal.mean():.1f}")
print(f"Negative prices: {(df_energy.PriceSignal < 0).sum()} events")


Generated 35136 data points for 2024
Date range: 2024-01-01 00:00:00 to 2024-12-31 23:45:00

DataFrame columns: ['DayOfYear', 'HourOfDay', 'DayOfWeek', 'HomeGeneration', 'HomeConsumption', 'PriceSignal']

Sample data preview:
                     DayOfYear  HourOfDay  DayOfWeek  HomeGeneration  \
2024-01-01 00:00:00          1       0.00          0             0.0   
2024-01-01 00:15:00          1       0.25          0             0.0   
2024-01-01 00:30:00          1       0.50          0             0.0   
2024-01-01 00:45:00          1       0.75          0             0.0   
2024-01-01 01:00:00          1       1.00          0             0.0   

                     HomeConsumption  PriceSignal  
2024-01-01 00:00:00         3.035247   100.649966  
2024-01-01 00:15:00         4.198418   108.187603  
2024-01-01 00:30:00         3.015899   120.303569  
2024-01-01 00:45:00         2.898689   105.608502  
2024-01-01 01:00:00         4.112579   119.663848  

Price signal stats (€/MWh):


In [53]:
df_energy

Unnamed: 0,DayOfYear,HourOfDay,DayOfWeek,HomeGeneration,HomeConsumption,PriceSignal
2024-01-01 00:00:00,1,0.00,0,0.0,3.035247,100.649966
2024-01-01 00:15:00,1,0.25,0,0.0,4.198418,108.187603
2024-01-01 00:30:00,1,0.50,0,0.0,3.015899,120.303569
2024-01-01 00:45:00,1,0.75,0,0.0,2.898689,105.608502
2024-01-01 01:00:00,1,1.00,0,0.0,4.112579,119.663848
...,...,...,...,...,...,...
2024-12-31 22:45:00,366,22.75,1,0.0,4.329018,107.756358
2024-12-31 23:00:00,366,23.00,1,0.0,3.346362,124.013647
2024-12-31 23:15:00,366,23.25,1,0.0,4.734271,116.703751
2024-12-31 23:30:00,366,23.50,1,0.0,4.309025,115.619819


## Battery & Infra setup

In [54]:
battery = Battery(
    capacity               = 13.5,    # kWh (typical Tesla Powerwall size)
    max_charge_rate        = 5.0,     # kW
    max_discharge_rate     = 5.0,     # kW
    current_soc            = 6.75,    # kWh (50% of capacity)
    final_soc              = 6.75,    # kWh (same as current - maintain level)
    charging_efficiency    = 0.95,    # 95% efficiency when charging
    discharging_efficiency = 0.95,    # 95% efficiency when discharging
    soc_min                = 1.35,    # kWh (10% of capacity reserved)
    soc_max                = 13.5,    # kWh (same as capacity)
    grid_import_limit      = 17.25,   # kW (3x25A typical home connection)
    grid_export_limit      = 17.25,   # kW (same as import for simplicity)
)

## Optimization

### Selecting limits for optimization

In [55]:
t0 = pd.Timestamp('2024-06-01')
t1 = pd.Timestamp('2024-06-03')
interval_minutes = 15

df_energy = df_energy.loc[t0 : t1]

### Plotting simulation data

In [56]:
cols = ['HomeGeneration', 'HomeConsumption', 'PriceSignal']

df_plot = df_energy.filter(cols)

# Assuming you have your df_energy loaded
fig = make_subplots(
    rows=len(df_plot.columns), 
    cols=1,
    subplot_titles=df_plot.columns,
    shared_xaxes=True
)

for i, col in enumerate(df_plot.columns, 1):
    fig.add_trace(
        go.Scatter(x=df_plot.index, y=df_plot[col], name=col),
        row=i, col=1
    )

fig.update_layout(height=300*len(df_plot.columns))


In [62]:
# Convert interval to hours for energy calculations
interval_hours = interval_minutes / 60.0
# Number of timesteps
n_steps   = len(df_energy)
timesteps = range(n_steps)

# Create optimization problem
prob = pl.LpProblem("Battery_Optimization", pl.LpMaximize)

# Decision variables
battery_charge = pl.LpVariable.dicts(
    "BatteryCharge",
    timesteps,
    lowBound=0,
    upBound=battery.max_charge_rate
)
battery_discharge = pl.LpVariable.dicts(
    "BatteryDischarge",
    timesteps,
    lowBound=0,
    upBound=battery.max_discharge_rate
)
grid_import = pl.LpVariable.dicts(
    "GridImport",
    timesteps,
    lowBound=0,
    upBound=battery.grid_import_limit
)
grid_export = pl.LpVariable.dicts(
    "GridExport",
    timesteps,
    lowBound=0,
    upBound=battery.grid_export_limit
)
soc = pl.LpVariable.dicts(
    "SoC",
    timesteps,
    lowBound=battery.soc_min,
    upBound=battery.soc_max
)

# Objective: maximize profit
price_per_kwh = df_energy.PriceSignal / 1000
profit = pl.lpSum([
    grid_export[t] * price_per_kwh.iloc[t] * interval_hours -  # Revenue from export
    grid_import[t] * price_per_kwh.iloc[t] * interval_hours     # Cost of import
    for t in timesteps
])
prob += profit

# Constraints
for t in timesteps:
    # Power balance constraint
    prob += (
        df_energy.HomeGeneration.iloc[t] +
        battery_discharge[t] + grid_import[t]
        ==
        df_energy.HomeConsumption.iloc[t] +
        battery_charge[t] + grid_export[t]
    )

    # SoC dynamics with efficiency losses
    if t == 0:
        # Initial SoC
        prob += soc[t] == (
            battery.current_soc +
            battery_charge[t] * battery.charging_efficiency * interval_hours -
            battery_discharge[t] / battery.discharging_efficiency * interval_hours
        )
    else:
        # SoC evolution
        prob += soc[t] == (
            soc[t-1] +
            battery_charge[t] * battery.charging_efficiency * interval_hours -
            battery_discharge[t] / battery.discharging_efficiency * interval_hours
        )

# Final SoC constraint
prob += soc[n_steps - 1] == battery.final_soc

# Solve the problem
prob.solve(pl.PULP_CBC_CMD(msg=0))  # msg = 0 suppresses solver output

# Check if solution is optimal
if prob.status != pl.LpStatusOptimal:
    raise ValueError(f"Optimization failed with status: {pl.LpStatus[prob.status]}")

# Extract results and add to dataframe
df_result = df_energy.copy()
df_result = df_result.assign(
    BatteryCharge    = [battery_charge[t].varValue for t in timesteps],
    BatteryDischarge = [battery_discharge[t].varValue for t in timesteps],
    GridImport       = [grid_import[t].varValue for t in timesteps],
    GridExport       = [grid_export[t].varValue for t in timesteps],
    SoC              = [soc[t].varValue for t in timesteps]
)

# Add sanity check baseline for no-battery case
df_result = df_result.assign(BaselineGridExport=(df_result.HomeGeneration  - df_result.HomeConsumption).clip(lower=0))
df_result = df_result.assign(BaselineGridImport=(df_result.HomeConsumption - df_result.HomeGeneration ).clip(lower=0))

baseline_price_export = df_result.BaselineGridExport * df_result.PriceSignal * interval_hours / 1000
baseline_price_import = df_result.BaselineGridImport * df_result.PriceSignal * interval_hours / 1000
df_result = df_result.assign(BaselineProfit=baseline_price_export - baseline_price_import)

df_result = df_result.assign(BaselineCumulativeProfit=df_result.BaselineProfit.cumsum())

battery_price_export = df_result.GridExport * df_result.PriceSignal * interval_hours / 1000
battery_price_import = df_result.GridImport * df_result.PriceSignal * interval_hours / 1000
df_result = df_result.assign(BatteryProfit=battery_price_export - battery_price_import)

df_result = df_result.assign(BatteryCumulativeProfit=df_result.BatteryProfit.cumsum())

# Print results
battery_final  = df_result.BatteryCumulativeProfit .iloc[-1]
baseline_final = df_result.BaselineCumulativeProfit.iloc[-1]
print(f"Optimization completed successfully")
print(f"Total profit with battery:               €{battery_final:.2f}")
print(f"Total profit without battery (baseline): €{baseline_final:.2f}")
print(f"Difference:                              €{(battery_final - baseline_final):.2f}")
print(f"Final SoC: {df_result.SoC.iloc[-1]:.2f} kWh (target: {battery.final_soc} kWh)")



Optimization completed successfully
Total profit with battery:               €-0.11
Total profit without battery (baseline): €-1.75
Difference:                              €1.64
Final SoC: 6.75 kWh (target: 6.75 kWh)


### Running optimization

In [None]:
fig = make_subplots(
    rows=9, cols=1, 
    shared_xaxes=True,
    subplot_titles=(
        "Home Generation & Consumption",
        "Battery Operations", 
        "Net Grid Flow (Import+ Export-)",
        "State of Charge",
        "Price Signal",
        "Profit Comparison",
        "Baseline vs Optimized Net Grid",
        "Cumulative Profit",
        "Energy Balance Check",
    ),
    vertical_spacing=0.02
)

# Home Generation & Consumption
fig.add_trace(go.Scatter(x=df_result.index, y=df_result['HomeGeneration'], name='Home Generation'), row=1, col=1)
fig.add_trace(go.Scatter(x=df_result.index, y=df_result['HomeConsumption'], name='Home Consumption'), row=1, col=1)

# Battery Operations
fig.add_trace(go.Bar(x=df_result.index, y=df_result['BatteryCharge'], name='Battery Charge', width=300000), row=2, col=1)
fig.add_trace(go.Bar(x=df_result.index, y=-df_result['BatteryDischarge'], name='Battery Discharge', width=300000), row=2, col=1)

# Net Grid Flow with battery operation highlighting
fig.add_trace(go.Scatter(x=df_result.index, y=df_result['GridImport'] - df_result['GridExport'], name='Net Grid Flow'), row=3, col=1)

max_opacity = 0.3

# Add individual time interval highlighting based on battery operations
for i, timestamp in enumerate(df_result.index):
    charge_rate = df_result['BatteryCharge'].iloc[i]
    discharge_rate = df_result['BatteryDischarge'].iloc[i]
    
    # Calculate next timestamp for rectangle width
    if i < len(df_result.index) - 1:
        next_timestamp = df_result.index[i + 1]
    else:
        # For last interval, use same duration as previous
        duration = df_result.index[i] - df_result.index[i-1]
        next_timestamp = timestamp + duration
    
    # Charging periods (green)
    if charge_rate > 0.01:  # threshold to avoid noise
        opacity = min(max_opacity, (charge_rate / battery.max_charge_rate) * max_opacity)
        fig.add_vrect(
            x0=timestamp, x1=next_timestamp,
            fillcolor="Green", opacity=opacity,
            layer="below", line_width=0,
            row=3, col=1
        )
    
    # Discharging periods (red)  
    elif discharge_rate > 0.01:
        opacity = min(max_opacity, (discharge_rate / battery.max_discharge_rate) * max_opacity)
        fig.add_vrect(
            x0=timestamp, x1=next_timestamp,
            fillcolor="Red", opacity=opacity,
            layer="below", line_width=0,
            row=3, col=1
        )

# State of Charge
fig.add_trace(go.Scatter(x=df_result.index, y=df_result['SoC'], name='Battery SoC'), row=4, col=1)

# Price Signal
fig.add_trace(go.Scatter(x=df_result.index, y=df_result['PriceSignal'], name='Price Signal'), row=5, col=1)

# Profit Comparison
fig.add_trace(go.Scatter(x=df_result.index, y=df_result['BaselineProfit'], name='Baseline Profit'), row=6, col=1)
fig.add_trace(go.Scatter(x=df_result.index, y=df_result['BatteryProfit'], name='Battery Profit'), row=6, col=1)

# Baseline vs Optimized Net Grid
fig.add_trace(go.Scatter(x=df_result.index, y=df_result['BaselineGridImport'] - df_result['BaselineGridExport'], name='Baseline Net Grid'), row=7, col=1)
fig.add_trace(go.Scatter(x=df_result.index, y=df_result['GridImport'] - df_result['GridExport'], name='Optimized Net Grid'), row=7, col=1)

# Cumulative Profit
fig.add_trace(go.Scatter(x=df_result.index, y=df_result['BaselineCumulativeProfit'], name='Baseline Cumulative'), row=8, col=1)
fig.add_trace(go.Scatter(x=df_result.index, y=df_result['BatteryCumulativeProfit'], name='Battery Cumulative'), row=8, col=1)

# Energy Balance Check
energy_balance = df_result['HomeGeneration'] - df_result['HomeConsumption'] + df_result['BatteryDischarge'] - df_result['BatteryCharge'] - df_result['GridExport'] + df_result['GridImport']
fig.add_trace(go.Scatter(x=df_result.index, y=energy_balance, name='Energy Balance'), row=9, col=1)

fig.update_layout(
    height=2000, 
    title="Battery Optimization Physics Check",
    hovermode='x unified'
)

fig.update_xaxes(matches='x')
