# Chapter 10: Stochastic Model Predictive Control for Renewable Energy Allocation

## 10.1 Introduction to SMPC for Energy Systems

Renewable energy sources like solar and wind power are increasingly important in our electrical grid, but they present unique challenges. Unlike traditional power plants, their output fluctuates with weather conditions in ways we can predict only probabilistically. At the same time, modern power demands from data centers and electric vehicle (EV) charging stations require reliable power with different priorities and constraints.

In this chapter, we'll explore how Stochastic Model Predictive Control (SMPC) provides a framework for optimally allocating renewable power under uncertainty. We'll start with simple examples and gradually build toward a complete implementation that could be used in real-world renewable energy management.

## 10.2 Basic Probability Concepts for Energy Forecasting

Before diving into SMPC, let's establish some fundamentals about representing uncertainty in energy systems. Suppose we have a small solar array that powers a data center. On a given day, the power output might depend on cloud cover, which we'll simplify into three states: "clear," "partly cloudy," and "overcast."

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
import pymc as pm

# Solar power output (kW) based on weather condition
weather_power = {
    'clear': 100,
    'partly_cloudy': 60,
    'overcast': 20
}

# Prior probabilities of weather conditions
weather_probs = {
    'clear': 0.3,
    'partly_cloudy': 0.5,
    'overcast': 0.2
}

If we know nothing else about tomorrow's weather, our expected power output would be:

In [None]:
expected_power = sum(weather_power[condition] * prob 
                    for condition, prob in weather_probs.items())
print(f"Expected power output: {expected_power} kW")

This gives us a baseline expectation, but single-point forecasts aren't enough for robust planning. We need to model the full distribution of possible outcomes.

### Representing Forecast Uncertainty

Weather forecasts are probabilistic. If the forecast says "60% chance of clouds tomorrow," we can update our belief about tomorrow's weather condition:

In [None]:
def update_weather_probability(prior, forecast_cloudy_prob):
    """Update weather probabilities based on forecast."""
    # Simple likelihood model based on forecast accuracy
    likelihood = {
        'clear': 1 - forecast_cloudy_prob,
        'partly_cloudy': forecast_cloudy_prob * 0.7,  # More likely to be partly cloudy
        'overcast': forecast_cloudy_prob * 0.3        # Less likely to be overcast
    }
    
    # Calculate posterior using Bayes' rule
    posterior = {}
    total = 0
    for condition in prior:
        posterior[condition] = prior[condition] * likelihood[condition]
        total += posterior[condition]
    
    # Normalize
    for condition in posterior:
        posterior[condition] /= total
        
    return posterior

# Update with forecast of 60% chance of clouds
updated_probs = update_weather_probability(weather_probs, 0.6)
print("Updated weather probabilities:")
for condition, prob in updated_probs.items():
    print(f"  {condition}: {prob:.2f}")

updated_expected_power = sum(weather_power[condition] * prob 
                            for condition, prob in updated_probs.items())
print(f"Updated expected power: {updated_expected_power:.1f} kW")

This simple Bayesian update gives us a more informed power prediction. But in real systems, we need to forecast power not just for a single moment but over an entire day or week.

## 10.3 Modeling Time Series for Renewable Generation

Renewable energy output varies throughout the day in patterns that depend on both deterministic factors (like the sun's position) and stochastic factors (like cloud cover). Let's model solar output as a time series with uncertainty.

In [None]:
def solar_output_model(hour, weather_condition, max_power=100):
    """Model solar output based on hour of day and weather."""
    # Deterministic component - daily pattern
    if 6 <= hour <= 18:  # Daylight hours
        base_output = max_power * np.sin(np.pi * (hour - 6) / 12)
    else:
        base_output = 0
    
    # Apply weather factor
    weather_factor = {
        'clear': 1.0,
        'partly_cloudy': 0.6,
        'overcast': 0.2
    }
    
    return base_output * weather_factor[weather_condition]

# Generate a day's worth of output under different weather scenarios
hours = range(24)
clear_output = [solar_output_model(h, 'clear') for h in hours]
partly_cloudy_output = [solar_output_model(h, 'partly_cloudy') for h in hours]
overcast_output = [solar_output_model(h, 'overcast') for h in hours]

plt.figure(figsize=(10, 6))
plt.plot(hours, clear_output, label='Clear')
plt.plot(hours, partly_cloudy_output, label='Partly Cloudy')
plt.plot(hours, overcast_output, label='Overcast')
plt.xlabel('Hour of Day')
plt.ylabel('Power Output (kW)')
plt.title('Solar Power Output by Weather Condition')
plt.legend()
plt.grid(True)
plt.show()

Real weather isn't constant throughout the day. Let's simulate changing conditions using a simple Markov model:

In [None]:
def simulate_weather_sequence(initial_condition, hours, transition_matrix):
    """Simulate weather changes using a Markov model."""
    conditions = ['clear', 'partly_cloudy', 'overcast']
    condition_idx = conditions.index(initial_condition)
    sequence = [initial_condition]
    
    for _ in range(1, hours):
        # Transition to next state based on probabilities
        next_condition_idx = np.random.choice(3, p=transition_matrix[condition_idx])
        condition_idx = next_condition_idx
        sequence.append(conditions[condition_idx])
    
    return sequence

# Weather transition probabilities (rows: from, columns: to)
# [clear, partly_cloudy, overcast]
transition_matrix = np.array([
    [0.7, 0.25, 0.05],  # From clear
    [0.2, 0.6, 0.2],    # From partly cloudy
    [0.05, 0.35, 0.6]   # From overcast
])

# Simulate a day starting with clear weather
np.random.seed(42)  # For reproducibility
weather_sequence = simulate_weather_sequence('clear', 24, transition_matrix)

# Calculate power output based on simulated weather
simulated_output = [solar_output_model(h, w) for h, w in zip(hours, weather_sequence)]

plt.figure(figsize=(10, 6))
plt.plot(hours, simulated_output)
plt.xlabel('Hour of Day')
plt.ylabel('Power Output (kW)')
plt.title('Simulated Solar Power Output with Changing Weather')
plt.grid(True)
plt.show()

Each time we run this simulation, we get a different possible outcome. This gives us a way to generate scenarios for our stochastic optimization.

## 10.4 Basics of Model Predictive Control

Model Predictive Control (MPC) is an optimization-based control strategy that forecasts system behavior over a receding horizon. At each time step:

1. The controller predicts future states over a finite horizon
2. It optimizes control actions to minimize a cost function
3. It implements only the first control action
4. At the next time step, it repeats the process with updated information

For our renewable energy allocation problem, the "control actions" are decisions about how much power to allocate to each consumer (data center vs. EV charging).

Let's start with a simple deterministic version before adding stochasticity:

In [None]:
def simple_mpc(current_state, forecast, horizon, cost_func):
    """Simple Model Predictive Control optimization."""
    # For simplicity, we're using a dummy optimization
    # In reality, we would use a proper optimizer like scipy.optimize or cvxpy
    
    best_cost = float('inf')
    best_action = None
    
    # Try different possible actions
    for action in possible_actions:
        # Simulate future states based on this action
        future_states = simulate_future(current_state, action, forecast, horizon)
        
        # Calculate cost
        cost = cost_func(future_states, action)
        
        if cost < best_cost:
            best_cost = cost
            best_action = action
    
    return best_action

This is just a conceptual framework. Let's implement a concrete example for our energy allocation problem.

## 10.5 A Simple Deterministic Energy Allocation

Consider a system with a single renewable source (solar) and two loads: a data center (high priority) and an EV charging station (flexible). We'll first solve this deterministically.

In [None]:
def allocate_power_deterministic(available_power, datacenter_demand, ev_demand):
    """
    Allocate available power to datacenter (priority) and EV charging.
    Returns the allocation to each in a tuple (datacenter, ev).
    """
    # First satisfy datacenter demand
    datacenter_allocation = min(available_power, datacenter_demand)
    remaining_power = available_power - datacenter_allocation
    
    # Allocate remaining power to EV charging
    ev_allocation = min(remaining_power, ev_demand)
    
    return (datacenter_allocation, ev_allocation)

# Example usage
available_power = 80  # kW
datacenter_demand = 50  # kW
ev_demand = 40  # kW

allocation = allocate_power_deterministic(available_power, datacenter_demand, ev_demand)
print(f"Power allocation: Datacenter = {allocation[0]} kW, EV Charging = {allocation[1]} kW")

This simple function prioritizes the data center, but it doesn't consider future time steps. Let's expand to a multi-period deterministic MPC approach:

In [None]:
def deterministic_mpc(power_forecast, datacenter_demand_forecast, 
                      ev_demand_forecast, horizon, ev_battery_initial=0, 
                      ev_battery_capacity=100):
    """
    Multi-period deterministic MPC for power allocation.
    
    Args:
        power_forecast: List of forecasted available power for each period
        datacenter_demand_forecast: List of forecasted datacenter demand
        ev_demand_forecast: List of forecasted EV charging demand
        horizon: Number of periods to consider
        ev_battery_initial: Initial EV battery charge (kWh)
        ev_battery_capacity: Maximum EV battery capacity (kWh)
    
    Returns:
        List of allocations for each period
    """
    allocations = []
    ev_battery = ev_battery_initial
    
    for t in range(horizon):
        available = power_forecast[t]
        dc_demand = datacenter_demand_forecast[t]
        ev_demand = min(ev_demand_forecast[t], ev_battery_capacity - ev_battery)
        
        # Basic allocation prioritizing datacenter
        dc_allocation = min(available, dc_demand)
        remaining = available - dc_allocation
        
        # For EV, we could be smarter about when to charge
        # For now, just charge whenever possible
        ev_allocation = min(remaining, ev_demand)
        ev_battery += ev_allocation  # Update battery state
        
        allocations.append((dc_allocation, ev_allocation))
    
    return allocations

# Example with a 6-hour horizon
power_forecast = [80, 90, 60, 30, 10, 0]  # Available solar power (kW)
dc_demand = [50, 55, 60, 50, 45, 40]      # Datacenter demand (kW)
ev_demand = [30, 30, 30, 30, 30, 30]      # Maximum EV charging rate (kW)

allocations = deterministic_mpc(power_forecast, dc_demand, ev_demand, 6)

print("Hour | Available | DC Demand | DC Alloc | EV Alloc | Unmet DC")
print("-" * 60)
for t, (dc_alloc, ev_alloc) in enumerate(allocations):
    unmet = dc_demand[t] - dc_alloc
    print(f"{t:4d} | {power_forecast[t]:9d} | {dc_demand[t]:9d} | {dc_alloc:8.1f} | {ev_alloc:8.1f} | {unmet:8.1f}")

This approach works when we have perfect forecasts, but real renewable generation is uncertain. Let's introduce stochasticity.

## 10.6 Introducing Uncertainty: Stochastic MPC Basics

In Stochastic MPC, we account for uncertainties in our forecasts. Instead of a single forecast trajectory, we consider multiple scenarios or a probability distribution.

A common approach is scenario-based SMPC, where we:

1. Generate multiple possible future scenarios
2. Optimize across all scenarios, considering their probabilities
3. Implement a control action that performs well across scenarios

Let's implement a simple scenario-based SMPC for our energy allocation problem:

In [None]:
def generate_solar_scenarios(base_forecast, num_scenarios, uncertainty=0.2):
    """
    Generate multiple solar power scenarios based on uncertainty.
    
    Args:
        base_forecast: Base power forecast
        num_scenarios: Number of scenarios to generate
        uncertainty: Standard deviation as fraction of forecast
    
    Returns:
        List of scenario forecasts
    """
    scenarios = []
    horizon = len(base_forecast)
    
    for _ in range(num_scenarios):
        # Generate noise
        noise = np.random.normal(0, 1, horizon)
        
        # Apply noise to base forecast
        scenario = [max(0, base * (1 + uncertainty * n)) 
                   for base, n in zip(base_forecast, noise)]
        scenarios.append(scenario)
    
    return scenarios

def stochastic_mpc_simple(base_power_forecast, dc_demand_forecast,
                          ev_demand_forecast, horizon, num_scenarios=10):
    """
    Simple scenario-based stochastic MPC.
    
    Args:
        base_power_forecast: Base forecast for available power
        dc_demand_forecast: Datacenter demand forecast
        ev_demand_forecast: EV charging demand forecast
        horizon: Number of periods
        num_scenarios: Number of scenarios to consider
    
    Returns:
        First-period allocation decision
    """
    # Generate scenarios
    power_scenarios = generate_solar_scenarios(base_power_forecast, num_scenarios)
    
    # Evaluate each scenario
    scenario_allocations = []
    for scenario in power_scenarios:
        # Run deterministic MPC for this scenario
        allocs = deterministic_mpc(scenario, dc_demand_forecast, 
                                   ev_demand_forecast, horizon)
        scenario_allocations.append(allocs[0])  # First-period allocation
    
    # Average first-period allocations across scenarios
    dc_allocs = [alloc[0] for alloc in scenario_allocations]
    ev_allocs = [alloc[1] for alloc in scenario_allocations]
    
    avg_dc_alloc = sum(dc_allocs) / num_scenarios
    avg_ev_alloc = sum(ev_allocs) / num_scenarios
    
    return (avg_dc_alloc, avg_ev_alloc)

# Example usage
np.random.seed(42)
base_forecast = [80, 90, 60, 30, 10, 0]
dc_demand = [50, 55, 60, 50, 45, 40]
ev_demand = [30, 30, 30, 30, 30, 30]

first_decision = stochastic_mpc_simple(base_forecast, dc_demand, ev_demand, 6)
print(f"First-period allocation: Datacenter = {first_decision[0]:.1f} kW, EV = {first_decision[1]:.1f} kW")

This simple approach has limitations. It doesn't consider:
1. The probability of each scenario
2. The cost of unmet demand
3. The ability to adapt future decisions based on new information

Let's address these in a more comprehensive SMPC formulation.

## 10.7 Advanced SMPC: Scenario Trees and Recourse

In practical SMPC, we often use scenario trees to represent how uncertainty evolves over time. A scenario tree branches at each time step, representing different possible futures.

Let's implement a more advanced SMPC using this approach:

In [None]:
import networkx as nx

def build_scenario_tree(base_forecast, branching_factor, horizon, uncertainty=0.2):
    """
    Build a scenario tree for solar power forecasts.
    
    Args:
        base_forecast: Base forecast values
        branching_factor: Number of branches at each node
        horizon: Planning horizon
        uncertainty: Uncertainty parameter
    
    Returns:
        NetworkX DiGraph representing the scenario tree
    """
    G = nx.DiGraph()
    
    # Add root node
    G.add_node((0, 0), power=base_forecast[0], probability=1.0)
    
    # Generate levels of the tree
    for t in range(1, horizon):
        # For each node in the previous level
        for parent in [n for n in G.nodes if n[0] == t-1]:
            parent_power = G.nodes[parent]['power']
            parent_prob = G.nodes[parent]['probability']
            
            # Create branches
            branch_probs = np.ones(branching_factor) / branching_factor
            
            for b in range(branching_factor):
                # Create uncertainty factor based on branch
                if branching_factor == 1:
                    factor = 1.0  # No branching
                elif branching_factor == 2:
                    factor = 1.0 + uncertainty * (b*2 - 1)  # Low/high
                else:
                    # Evenly distributed factors
                    factor = 1.0 + uncertainty * (2*b/(branching_factor-1) - 1)
                
                # Calculate power for this node
                node_power = max(0, base_forecast[t] * factor)
                
                # Add node and edge
                node = (t, parent[1]*branching_factor + b)
                G.add_node(node, power=node_power, probability=parent_prob*branch_probs[b])
                G.add_edge(parent, node)
    
    return G

def optimize_over_tree(tree, dc_demand, ev_demand, dc_penalty=10, ev_penalty=1):
    """
    Optimize power allocation over a scenario tree.
    
    Args:
        tree: Scenario tree as NetworkX DiGraph
        dc_demand: List of datacenter demands
        ev_demand: List of EV charging demands
        dc_penalty: Penalty for unmet datacenter demand
        ev_penalty: Penalty for unmet EV demand
    
    Returns:
        Dictionary of optimal allocations for each node
    """
    # For simplicity, we'll use a greedy approach
    # In practice, you would use mathematical programming
    
    allocations = {}
    
    # Process nodes by time (level in tree)
    horizon = max(n[0] for n in tree.nodes) + 1
    
    for t in range(horizon):
        for node in [n for n in tree.nodes if n[0] == t]:
            available = tree.nodes[node]['power']
            node_dc_demand = dc_demand[t]
            node_ev_demand = ev_demand[t]
            
            # Prioritize based on penalty
            if dc_penalty > ev_penalty:
                # Datacenter first
                dc_alloc = min(available, node_dc_demand)
                remaining = available - dc_alloc
                ev_alloc = min(remaining, node_ev_demand)
            else:
                # EV first
                ev_alloc = min(available, node_ev_demand)
                remaining = available - ev_alloc
                dc_alloc = min(remaining, node_dc_demand)
            
            allocations[node] = (dc_alloc, ev_alloc)
    
    return allocations

def compute_expected_cost(tree, allocations, dc_demand, ev_demand, dc_penalty, ev_penalty):
    """Calculate expected cost over the scenario tree."""
    total_cost = 0
    
    for node in tree.nodes:
        t = node[0]
        prob = tree.nodes[node]['probability']
        dc_alloc, ev_alloc = allocations[node]
        
        # Calculate unmet demand
        dc_unmet = max(0, dc_demand[t] - dc_alloc)
        ev_unmet = max(0, ev_demand[t] - ev_alloc)
        
        # Calculate cost for this node
        node_cost = dc_unmet * dc_penalty + ev_unmet * ev_penalty
        total_cost += node_cost * prob
    
    return total_cost

# Example usage
np.random.seed(42)
base_forecast = [80, 90, 60, 30, 10, 0]
dc_demand = [50, 55, 60, 50, 45, 40]
ev_demand = [30, 30, 30, 30, 30, 30]

# Build a simple scenario tree with two branches per node
tree = build_scenario_tree(base_forecast, branching_factor=2, horizon=3)
allocations = optimize_over_tree(tree, dc_demand, ev_demand)

# Get first-stage decision (root node)
root_allocation = allocations[(0, 0)]
print(f"First-stage allocation: Datacenter = {root_allocation[0]:.1f} kW, EV = {root_allocation[1]:.1f} kW")

# Calculate expected cost
cost = compute_expected_cost(tree, allocations, dc_demand, ev_demand, dc_penalty=10, ev_penalty=1)
print(f"Expected cost: {cost:.2f}")

## 10.8 Real-world SMPC Implementation

For a more realistic implementation, we'll use a proper optimization library. Let's use `cvxpy` to formulate and solve our SMPC problem:

In [None]:
import cvxpy as cp

def smpc_with_cvxpy(power_scenarios, scenario_probs, dc_demand, ev_demand, 
                    dc_penalty=10, ev_penalty=1, ev_battery_capacity=100):
    """
    Solve SMPC problem with CVXPY.
    
    Args:
        power_scenarios: List of power forecast scenarios
        scenario_probs: Probabilities of each scenario
        dc_demand: Datacenter demand forecast
        ev_demand: EV charging demand forecast
        dc_penalty: Penalty for unmet datacenter demand
        ev_penalty: Penalty for unmet EV demand
        ev_battery_capacity: Maximum EV battery capacity
    
    Returns:
        Optimal first-stage decisions
    """
    num_scenarios = len(power_scenarios)
    horizon = len(power_scenarios[0])
    
    # Decision variables
    dc_allocation = {}
    ev_allocation = {}
    ev_battery = {}
    dc_unmet = {}
    ev_unmet = {}
    
    for s in range(num_scenarios):
        for t in range(horizon):
            dc_allocation[s, t] = cp.Variable(nonneg=True)
            ev_allocation[s, t] = cp.Variable(nonneg=True)
            ev_battery[s, t] = cp.Variable(nonneg=True)
            dc_unmet[s, t] = cp.Variable(nonneg=True)
            ev_unmet[s, t] = cp.Variable(nonneg=True)
    
    # Non-anticipativity constraints (first stage decisions must be the same)
    constraints = []
    for s in range(1, num_scenarios):
        constraints.append(dc_allocation[s, 0] == dc_allocation[0, 0])
        constraints.append(ev_allocation[s, 0] == ev_allocation[0, 0])
    
    # Power constraints and dynamics
    for s in range(num_scenarios):
        for t in range(horizon):
            # Power allocation can't exceed available power
            constraints.append(dc_allocation[s, t] + ev_allocation[s, t] <= power_scenarios[s][t])
            
            # Unmet demand definition
            constraints.append(dc_unmet[s, t] >= dc_demand[t] - dc_allocation[s, t])
            constraints.append(ev_unmet[s, t] >= ev_demand[t] - ev_allocation[s, t])
            
            # Battery dynamics
            if t == 0:
                constraints.append(ev_battery[s, t] == ev_allocation[s, t])
            else:
                constraints.append(ev_battery[s, t] == ev_battery[s, t-1] + ev_allocation[s, t])
            
            # Battery capacity constraint
            constraints.append(ev_battery[s, t] <= ev_battery_capacity)
    
    # Objective: minimize expected cost
    objective_terms = []
    for s in range(num_scenarios):
        for t in range(horizon):
            objective_terms.append(scenario_probs[s] * (dc_penalty * dc_unmet[s, t] + 
                                                        ev_penalty * ev_unmet[s, t]))
    
    objective = cp.Minimize(sum(objective_terms))
    
    # Solve the problem
    problem = cp.Problem(objective, constraints)
    problem.solve()
    
    if problem.status != 'optimal':
        print(f"Warning: Problem status is {problem.status}")
    
    # Extract first-stage decisions
    return (dc_allocation[0, 0].value, ev_allocation[0, 0].value)

# Example with multiple scenarios
np.random.seed(42)
base_forecast = [80, 90, 60, 30, 10, 0]
dc_demand = [50, 55, 60, 50, 45, 40]
ev_demand = [30, 30, 30, 30, 30, 30]

# Generate scenarios
num_scenarios = 5
scenarios = generate_solar_scenarios(base_forecast, num_scenarios)
scenario_probs = [1/num_scenarios] * num_scenarios  # Equal probability

first_decision = smpc_with_cvxpy(scenarios, scenario_probs, dc_demand, ev_demand)
print(f"Optimal first-stage decision: Datacenter = {first_decision[0]:.1f} kW, EV = {first_decision[1]:.1f} kW")

## 10.9 Professional Example: Grid-Connected Microgrid

Let's conclude with a comprehensive example that might be used in a professional context: a grid-connected microgrid with solar generation, battery storage, and both data center and EV charging loads.

In [None]:
import cvxpy as cp
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

class RenewablePowerSMPC:
    """
    SMPC controller for renewable power allocation in a microgrid.
    """
    
    def __init__(self, horizon=24, num_scenarios=10, time_step_minutes=60):
        """Initialize the SMPC controller."""
        self.horizon = horizon
        self.num_scenarios = num_scenarios
        self.time_step = time_step_minutes / 60.0  # Convert to hours
        
        # System parameters
        self.battery_capacity = 500.0  # kWh
        self.battery_max_charge_rate = 100.0  # kW
        self.battery_max_discharge_rate = 100.0  # kW
        self.battery_efficiency = 0.9  # 90% round-trip efficiency
        
        # Cost parameters
        self.grid_import_price = 0.15  # $/kWh
        self.grid_export_price = 0.05  # $/kWh
        self.datacenter_unmet_penalty = 10.0  # $/kWh
        self.ev_unmet_penalty = 2.0  # $/kWh
        
        # Maximum grid power
        self.max_grid_power = 200.0  # kW
    
    def generate_solar_scenarios(self, base_forecast, uncertainty_profile):
        """Generate solar power scenarios based on forecast uncertainty."""
        scenarios = []
        for _ in range(self.num_scenarios):
            # Generate correlated noise
            rho = 0.7  # Temporal correlation
            noise = np.zeros(self.horizon)
            noise[0] = np.random.normal(0, 1)
            
            for t in range(1, self.horizon):
                noise[t] = rho * noise[t-1] + np.sqrt(1 - rho**2) * np.random.normal(0, 1)
            
            # Apply time-of-day dependent uncertainty
            scenario = []
            for t in range(self.horizon):
                # Apply noise scaled by uncertainty
                uncertainty = uncertainty_profile[t]
                perturbed = base_forecast[t] * (1 + uncertainty * noise[t])
                # Ensure non-negative power
                scenario.append(max(0, perturbed))
            
            scenarios.append(scenario)
        
        return scenarios
    
    def solve(self, current_time, solar_forecast, dc_demand_forecast, 
              ev_demand_forecast, battery_soc, uncertainty_profile):
        """
        Solve the SMPC optimization problem.
        
        Args:
            current_time: Current time
            solar_forecast: Base solar power forecast
            dc_demand_forecast: Datacenter demand forecast
            ev_demand_forecast: EV charging demand forecast
            battery_soc: Current battery state of charge (0-1)
            uncertainty_profile: Forecast uncertainty by hour
            
        Returns:
            Dictionary with optimal decisions for the current time step
        """
        # Generate scenarios
        solar_scenarios = self.generate_solar_scenarios(solar_forecast, uncertainty_profile)
        scenario_probs = [1.0 / self.num_scenarios] * self.num_scenarios
        
        # Current battery energy
        initial_battery = battery_soc * self.battery_capacity
        
        # Variables
        dc_power = {}
        ev_power = {}
        grid_import = {}
        grid_export = {}
        battery_charge = {}
        battery_discharge = {}
        battery_energy = {}
        dc_unmet = {}
        ev_unmet = {}
        
        for s in range(self.num_scenarios):
            for t in range(self.horizon):
                dc_power[s, t] = cp.Variable(nonneg=True)
                ev_power[s, t] = cp.Variable(nonneg=True)
                grid_import[s, t] = cp.Variable(nonneg=True)
                grid_export[s, t] = cp.Variable(nonneg=True)
                battery_charge[s, t] = cp.Variable(nonneg=True)
                battery_discharge[s, t] = cp.Variable(nonneg=True)
                battery_energy[s, t] = cp.Variable(nonneg=True)
                dc_unmet[s, t] = cp.Variable(nonneg=True)
                ev_unmet[s, t] = cp.Variable(nonneg=True)
        
        # Constraints
        constraints = []
        
        # Non-anticipativity constraints (first-stage decisions must be the same)
        for s in range(1, self.num_scenarios):
            constraints.append(dc_power[s, 0] == dc_power[0, 0])
            constraints.append(ev_power[s, 0] == ev_power[0, 0])
            constraints.append(grid_import[s, 0] == grid_import[0, 0])
            constraints.append(grid_export[s, 0] == grid_export[0, 0])
            constraints.append(battery_charge[s, 0] == battery_charge[0, 0])
            constraints.append(battery_discharge[s, 0] == battery_discharge[0, 0])
        
        # Power balance and other constraints
        for s in range(self.num_scenarios):
            for t in range(self.horizon):
                # Power balance
                constraints.append(
                    solar_scenarios[s][t] + grid_import[s, t] + battery_discharge[s, t] == 
                    dc_power[s, t] + ev_power[s, t] + grid_export[s, t] + battery_charge[s, t]
                )
                
                # Grid limits
                constraints.append(grid_import[s, t] <= self.max_grid_power)
                constraints.append(grid_export[s, t] <= self.max_grid_power)
                
                # Battery limits
                constraints.append(battery_charge[s, t] <= self.battery_max_charge_rate)
                constraints.append(battery_discharge[s, t] <= self.battery_max_discharge_rate)
                
                # Cannot charge and discharge simultaneously
                # (This is a non-convex constraint, we'd need binary variables in practice)
                
                # Battery dynamics
                if t == 0:
                    constraints.append(
                        battery_energy[s, t] == initial_battery + 
                        battery_charge[s, t] * self.battery_efficiency * self.time_step - 
                        battery_discharge[s, t] * self.time_step / self.battery_efficiency
                    )
                else:
                    constraints.append(
                        battery_energy[s, t] == battery_energy[s, t-1] + 
                        battery_charge[s, t] * self.battery_efficiency * self.time_step - 
                        battery_discharge[s, t] * self.time_step / self.battery_efficiency
                    )
                
                # Battery capacity constraints
                constraints.append(battery_energy[s, t] <= self.battery_capacity)
                constraints.append(battery_energy[s, t] >= 0.1 * self.battery_capacity)  # Min SOC 10%
                
                # Load constraints
                constraints.append(dc_power[s, t] + dc_unmet[s, t] >= dc_demand_forecast[t])
                constraints.append(ev_power[s, t] + ev_unmet[s, t] >= ev_demand_forecast[t])
        
        # Objective function
        objective_terms = []
        for s in range(self.num_scenarios):
            for t in range(self.horizon):
                # Cost components
                grid_cost = (grid_import[s, t] * self.grid_import_price - 
                             grid_export[s, t] * self.grid_export_price) * self.time_step
                penalty_cost = (dc_unmet[s, t] * self.datacenter_unmet_penalty + 
                               ev_unmet[s, t] * self.ev_unmet_penalty) * self.time_step
                
                # Add to objective with scenario probability
                objective_terms.append(scenario_probs[s] * (grid_cost + penalty_cost))
        
        # Create and solve the problem
        objective = cp.Minimize(sum(objective_terms))
        problem = cp.Problem(objective, constraints)
        
        try:
            problem.solve()
            
            if problem.status != "optimal":
                print(f"Warning: Problem status is {problem.status}")
                return None
            
            # Extract first-stage decisions
            return {
                'datacenter_power': dc_power[0, 0].value,
                'ev_power': ev_power[0, 0].value,
                'grid_import': grid_import[0, 0].value,
                'grid_export': grid_export[0, 0].value,
                'battery_charge': battery_charge[0, 0].value,
                'battery_discharge': battery_discharge[0, 0].value,
                'expected_cost': problem.value,
                'battery_trajectory': [battery_energy[0, t].value for t in range(self.horizon)]
            }
            
        except Exception as e:
            print(f"Error solving optimization: {e}")
            return None

# Example usage with realistic data
def run_simulation():
    """Run a simulation of the microgrid over a day."""
    # Create controller
    controller = RenewablePowerSMPC(horizon=24, num_scenarios=10)
    
    # Time parameters
    start_time = datetime(2023, 7, 15, 0, 0)  # Summer day
    
    # Create realistic solar profile (summer day)
    hours = np.arange(24)
    solar_capacity = 400  # kW
    solar_profile = solar_capacity * np.maximum(0, np.sin(np.pi * (hours - 6) / 12))
    
    # Forecast uncertainty increases with forecast horizon
    base_uncertainty = 0.1  # 10% uncertainty
    uncertainty_profile = [base_uncertainty * (1 + 0.1 * t) for t in range(24)]
    
    # Datacenter has relatively constant load
    dc_base_load = 150  # kW
    dc_profile = dc_base_load + 20 * np.sin(np.pi * hours / 12)
    
    # EV charging peaks in morning and evening
    ev_capacity = 100  # kW
    morning_peak = np.exp(-0.5 * ((hours - 8) / 2) ** 2) * ev_capacity
    evening_peak = np.exp(-0.5 * ((hours - 18) / 2) ** 2) * ev_capacity
    ev_profile = morning_peak + evening_peak
    
    # Initial conditions
    battery_soc = 0.5  # 50% charged
    
    # Run simulation
    results = []
    actual_battery = battery_soc * controller.battery_capacity
    
    for hour in range(24):
        current_time = start_time + timedelta(hours=hour)
        
        # Forecast from current time onwards
        solar_forecast = solar_profile[hour:].tolist() + solar_profile[:hour].tolist()
        dc_forecast = dc_profile[hour:].tolist() + dc_profile[:hour].tolist()
        ev_forecast = ev_profile[hour:].tolist() + ev_profile[:hour].tolist()
        
        # Get uncertainty profile
        uncertainty = uncertainty_profile[:]
        
        # Solve SMPC
        decision = controller.solve(current_time, solar_forecast, dc_forecast, 
                                    ev_forecast, battery_soc, uncertainty)
        
        if decision is None:
            print(f"Failed to find solution at hour {hour}")
            continue
        
        # Simulate actual conditions (using base forecast for simplicity)
        actual_solar = solar_profile[hour]
        actual_dc = dc_profile[hour]
        actual_ev = ev_profile[hour]
        
        # Implement the decisions
        dc_power = min(decision['datacenter_power'], actual_dc)
        ev_power = min(decision['ev_power'], actual_ev)
        grid_import = decision['grid_import']
        grid_export = decision['grid_export']
        battery_charge = decision['battery_charge']
        battery_discharge = decision['battery_discharge']
        
        # Update battery state
        actual_battery += (battery_charge * controller.battery_efficiency - 
                          battery_discharge / controller.battery_efficiency) * controller.time_step
        actual_battery = min(max(actual_battery, 0), controller.battery_capacity)
        battery_soc = actual_battery / controller.battery_capacity
        
        # Calculate power balance
        power_balance = (actual_solar + grid_import + battery_discharge - 
                         dc_power - ev_power - grid_export - battery_charge)
        
        # Record results
        results.append({
            'hour': hour,
            'time': current_time,
            'solar': actual_solar,
            'datacenter_demand': actual_dc,
            'ev_demand': actual_ev,
            'datacenter_power': dc_power,
            'ev_power': ev_power,
            'grid_import': grid_import,
            'grid_export': grid_export,
            'battery_charge': battery_charge,
            'battery_discharge': battery_discharge,
            'battery_soc': battery_soc,
            'power_balance': power_balance,
            'expected_cost': decision['expected_cost']
        })
    
    return pd.DataFrame(results)

# Run simulation and plot results
results_df = run_simulation()

# Plot the results
fig, axs = plt.subplots(3, 1, figsize=(12, 15), sharex=True)

# Power sources and loads
axs[0].plot(results_df['hour'], results_df['solar'], 'y-', label='Solar Generation')
axs[0].plot(results_df['hour'], results_df['grid_import'], 'r-', label='Grid Import')
axs[0].plot(results_df['hour'], results_df['battery_discharge'], 'g-', label='Battery Discharge')
axs[0].plot(results_df['hour'], -results_df['datacenter_power'], 'b-', label='Datacenter Load')
axs[0].plot(results_df['hour'], -results_df['ev_power'], 'c-', label='EV Charging Load')
axs[0].plot(results_df['hour'], -results_df['grid_export'], 'm-', label='Grid Export')
axs[0].plot(results_df['hour'], -results_df['battery_charge'], 'k-', label='Battery Charge')
axs[0].set_ylabel('Power (kW)')
axs[0].set_title('Power Sources and Loads')
axs[0].grid(True)
axs[0].legend()

# Demands vs. allocations
axs[1].plot(results_df['hour'], results_df['datacenter_demand'], 'b--', label='Datacenter Demand')
axs[1].plot(results_df['hour'], results_df['datacenter_power'], 'b-', label='Datacenter Allocation')
axs[1].plot(results_df['hour'], results_df['ev_demand'], 'c--', label='EV Demand')
axs[1].plot(results_df['hour'], results_df['ev_power'], 'c-', label='EV Allocation')
axs[1].set_ylabel('Power (kW)')
axs[1].set_title('Demands vs. Allocations')
axs[1].grid(True)
axs[1].legend()

# Battery state of charge
axs[2].plot(results_df['hour'], results_df['battery_soc'] * 100, 'g-')
axs[2].set_xlabel('Hour of Day')
axs[2].set_ylabel('Battery SoC (%)')
axs[2].set_title('Battery State of Charge')
axs[2].grid(True)

plt.tight_layout()
plt.show()

# Calculate total costs
total_import_cost = sum(results_df['grid_import'] * controller.grid_import_price * controller.time_step)
total_export_revenue = sum(results_df['grid_export'] * controller.grid_export_price * controller.time_step)
net_grid_cost = total_import_cost - total_export_revenue

print(f"Summary Statistics:")
print(f"Total solar generation: {sum(results_df['solar'] * controller.time_step):.1f} kWh")
print(f"Total datacenter consumption: {sum(results_df['datacenter_power'] * controller.time_step):.1f} kWh")
print(f"Total EV charging: {sum(results_df['ev_power'] * controller.time_step):.1f} kWh")
print(f"Total grid import: {sum(results_df['grid_import'] * controller.time_step):.1f} kWh")
print(f"Total grid export: {sum(results_df['grid_export'] * controller.time_step):.1f} kWh")
print(f"Net grid cost: ${net_grid_cost:.2f}")

## 10.10 Key Takeaways and Extensions

In this chapter, we've explored how Stochastic Model Predictive Control can be applied to renewable energy allocation problems. Key takeaways include:

1. Renewable energy introduces uncertainty that must be explicitly modeled
2. SMPC provides a framework for making decisions under uncertainty
3. Scenario-based approaches allow us to consider multiple possible futures
4. The receding horizon principle allows us to adapt as new information becomes available
5. Properly formulated SMPC can balance competing objectives (reliability vs. cost)

Possible extensions to the models we've developed include:

- **More sophisticated uncertainty models**: Using weather forecasts, historical data, and machine learning to generate better scenarios
- **Distributionally robust SMPC**: Optimizing for the worst-case distribution within a set
- **Multi-stage SMPC**: Explicitly modeling decision points and information revelation
- **Chance-constrained SMPC**: Ensuring constraints are satisfied with a specified probability
- **Learning-based SMPC**: Combining reinforcement learning with SMPC for adaptive policies

As renewable energy continues to grow, stochastic optimization methods like SMPC will become increasingly important for managing the inherent variability and uncertainty in these systems.

## 10.11 Exercises

1. Modify the simple SMPC example to include more realistic solar forecasting by using historical solar data.
2. Implement a chance-constrained version of the microgrid example that ensures the datacenter has power with 99% probability.
3. Extend the microgrid model to include multiple renewable sources (e.g., wind and solar) with different uncertainty profiles.
4. Compare the performance of deterministic MPC versus SMPC for a week of operation with varying weather conditions.
5. Develop a multi-stage SMPC formulation that explicitly accounts for when new forecast information becomes available.