# Building Model Predictive Controller
Keywords: model estimation, model predictive controller, load forecast


This notebook demonstrates the use of simplified gray box rc model for model estimation and model predictive controller development.

Exmaple building is a medium-sized office building with two office floors, one mechanical equipment floor, and one floor for NERSC data and computing center. The dataset only covers the two office floors. The building management system monitors and archives building-level electricity usage, HVAC and lighting system states (e.g., setpoint, temperature, flow rate, pressure), indoor environmental conditions (zone air temperature, relative humidity, CO2, indoor air quality), on-site weather (outdoor air temperature, relative humidity, solar radiation), and occupant counts, WiFi connected devices. This dataset supports multiple use cases, such as model predictive control and occupant related demand management.

The raw .zip file includes a set of .CSV files containing time-series data, one file per measurement, as available from the building management system. The processed .zip file includes a metadata .json file, a .ttl file to visualize the data according to BRICK schema, and several .csv files containing all the time-series measurements after cleaning and curation.

## 1.0 Define the data source


In [None]:
import requests
import numpy as np

def get_weather_forecast():
    """Fetch real-time outdoor temperature forecast (next 48 hours)."""
    # Placeholder API (replace with actual weather API)
    response = requests.get("https://api.weather.com/v1/forecast?location=san_diego")
    if response.status_code == 200:
        return np.array(response.json()["temperature"])
    return np.array([10.0] * 48)  # Default placeholder values

def get_electricity_price():
    """Fetch real-time electricity pricing data (next 48 hours)."""
    # Placeholder API (replace with real dynamic pricing API)
    response = requests.get("https://api.electricity-prices.com/v1/tou")
    if response.status_code == 200:
        return np.array(response.json()["prices"])
    return np.array([0.15] * 48)  # Default price (USD/kWh)

def get_co2_emission_factor():
    """Fetch real-time grid CO₂ intensity (next 48 hours)."""
    # Placeholder API (replace with real emissions API)
    response = requests.get("https://api.carbonintensity.org/v1/forecast")
    if response.status_code == 200:
        return np.array(response.json()["emission_factor"])
    return np.array([0.4] * 48)  # Default CO₂ emissions (kgCO₂/kWh)

def get_internal_loads():
    """Fetch real-time occupancy, lighting, and plug load data."""
    # Placeholder API (replace with real source)
    response = requests.get("https://api.buildingloads.com/internal")
    if response.status_code == 200:
        return response.json()
    
    return {
        "occupancy": np.random.randint(0, 10, 48),
        "lighting": np.random.randint(100, 500, 48),
        "plug_loads": np.random.randint(100, 1000, 48)
    }

In [1]:
def compute_heat_gains(occupancy, lighting_power, plug_load_power, solar_radiation, window_area, shading_factor=0.8):
    """
    Compute total internal and external heat gains.

    Parameters:
    - occupancy: People count per time step
    - lighting_power: Power consumption of lights (W)
    - plug_load_power: Power consumption of plug loads (W)
    - solar_radiation: External solar radiation (W/m2)
    - window_area: Total window area (m2)
    - shading_factor: Reduction factor for solar gain (default = 0.8)

    Returns:
    - Dictionary of heat gains
    """
    
    # Sensible heat gain from people (assumed 100W per person)
    Q_occupancy = occupancy * 100  

    # Heat gains from lighting and plug loads
    Q_lights = lighting_power  
    Q_plug_loads = plug_load_power  

    # Solar heat gain through windows
    Q_solar = solar_radiation * window_area * shading_factor  

    return {
        "Q_occupancy": Q_occupancy,
        "Q_lights": Q_lights,
        "Q_plug_loads": Q_plug_loads,
        "Q_solar": Q_solar
    }

In [3]:
def heat_pump_performance(Q_HVAC, To, COP_rated=3.5, T_rated=7):
    k = 0.05  # Degradation factor
    COP = max(1.0, COP_rated - k * (To - T_rated))
    P_HP = Q_HVAC / COP
    return COP, P_HP

def air_handling_unit(Q_HVAC, Tsupply, Treturn, airflow_rate):
    Cp = 1.005  # Specific heat capacity of air (kJ/kgK)
    rho = 1.2  # Air density (kg/m3)

    return Cp * rho * airflow_rate * (Tsupply - Treturn) + Q_HVAC

In [4]:
import gurobipy as gp
from gurobipy import GRB

def optimize_mpc(T_i, T_o, T_s, setpoint, time_horizon, params, electricity_prices, co2_emission_factors, objective):
    alpha = 0.1  # Energy cost weight
    beta = 1.0  # Comfort weight

    # Create Gurobi model
    mpc = gp.Model("MPC_Heat_Pump")
    
    # Define variables
    Ti = mpc.addVars(time_horizon, lb=18, ub=25, name="IndoorTemp")  # Temperature range
    Q_HVAC = mpc.addVars(time_horizon, lb=0, ub=5000, name="HVACHeat")  # HVAC heating
    P_HP = mpc.addVars(time_horizon, lb=0, ub=10, name="HeatPumpPower")  # Heat pump power
    
    # Define objective function
    if objective == "energy-cost":
        cost = gp.quicksum(electricity_prices[t] * P_HP[t] for t in range(time_horizon))
        mpc.setObjective(cost, GRB.MINIMIZE)
    elif objective == "peak-demand":
        peak_demand = mpc.addVar(lb=0, name="PeakDemand")
        for t in range(time_horizon):
            mpc.addConstr(P_HP[t] <= peak_demand)
        mpc.setObjective(peak_demand, GRB.MINIMIZE)
    elif objective == "carbon-emission":
        carbon_impact = gp.quicksum(co2_emission_factors[t] * P_HP[t] for t in range(time_horizon))
        mpc.setObjective(carbon_impact, GRB.MINIMIZE)
    
    # Define system dynamics constraints
    for t in range(1, time_horizon):
        mpc.addConstr(
            params["C1"] * (Ti[t] - Ti[t-1]) == (T_o[t] - Ti[t]) / params["R1"] +
            (T_s[t] - Ti[t]) / params["R2"] + Q_HVAC[t]
        )

    # Heat pump power constraints
    for t in range(time_horizon):
        mpc.addConstr(P_HP[t] == Q_HVAC[t] / 3.5)  # COP of 3.5

    # Solve optimization problem
    mpc.optimize()

    return {
        "Ti": [Ti[t].X for t in range(time_horizon)],
        "Q_HVAC": [Q_HVAC[t].X for t in range(time_horizon)],
        "P_HP": [P_HP[t].X for t in range(time_horizon)]
    }

ModuleNotFoundError: No module named 'gurobipy'

In [None]:
import numpy as np
from parameter_estimation.static_estimation import estimate_parameters as static_estimation
from parameter_estimation.adaptive_rls import AdaptiveRLSParameterTuning
from parameter_estimation.adaptive_ml import AdaptiveMLParameterTuning
from heat_gains import compute_heat_gains
from mpc_optimizer import optimize_mpc
from data_sources import get_weather_forecast, get_internal_loads, get_electricity_price, get_co2_emission_factor

# Select parameter tuning approach: "static", "adaptive-rls", or "adaptive-ml"
parameter_tuning_type = "adaptive-ml"  # Change as needed

# Select gray-box model type: "1R1C" or "2R2C"
model_type = "2R2C"

# Step 1: Fetch real-time external data
weather_forecast = get_weather_forecast()
internal_loads = get_internal_loads()
electricity_prices = get_electricity_price()
co2_emission_factors = get_co2_emission_factor()

# Define building characteristics
window_area = 50  # m² window surface

# Step 2: Prepare measured data for parameter tuning
measured_data = {
    "T_i": np.array([20.0] * 48),
    "T_o": np.array(weather_forecast["temperature"]),
    "T_s": np.array([15.0] * 48),
    "Q_HVAC": np.array([1000.0] * 48)
}

# Compute heat gains for each timestep
heat_gains = [compute_heat_gains(
    occupancy=internal_loads["occupancy"][t],
    lighting_power=internal_loads["lighting"][t],
    plug_load_power=internal_loads["plug_loads"][t],
    solar_radiation=weather_forecast["solar"][t],
    window_area=window_area
) for t in range(48)]

# Add heat gain data to measured dataset
for key in ["Q_occupancy", "Q_lights", "Q_plug_loads", "Q_solar"]:
    measured_data[key] = np.array([hg[key] for hg in heat_gains])

# Step 3: Choose Parameter Tuning Method
if parameter_tuning_type == "static":
    params = static_estimation(measured_data, model_type=model_type)
elif parameter_tuning_type == "adaptive-rls":
    rls_tuner = AdaptiveRLSParameterTuning(model_type=model_type)
    params = rls_tuner.update_parameters(measured_data)
elif parameter_tuning_type == "adaptive-ml":
    ml_tuner = AdaptiveMLParameterTuning(model_type=model_type)
    params = ml_tuner.update_parameters(measured_data)
else:
    raise ValueError("Invalid parameter tuning type selected!")

print(f"Updated Parameters ({parameter_tuning_type} - {model_type} Model):", params)

# Step 4: Run MPC Optimization
mpc_results = optimize_mpc(
    T_i=measured_data["T_i"],
    T_o=measured_data["T_o"],
    T_s=measured_data["T_s"],
    setpoint=22,
    time_horizon=48,
    params=params,
    electricity_prices=electricity_prices,
    co2_emission_factors=co2_emission_factors,
    objective="energy-cost"
)

# Output MPC results
print("Optimized Indoor Temperatures:", mpc_results["Ti"])
print("Optimized HVAC Heating Power:", mpc_results["Q_HVAC"])
print("Optimized Heat Pump Power:", mpc_results["P_HP"])