# Pipeline A: Comparison Optimization

This notebook demonstrates the comparison optimization pipeline that tests different optimization modes using **real agents** and **real DuckDB data**.

## Features
- **Decentralized vs Centralized** optimization comparison
- **Real Agent Classes**: FlexibleDeviceAgent, GlobalOptimizer, BatteryAgent
- **DuckDB-Only Architecture**: All data stays in DuckDB
- **MLflow Tracking**: Comprehensive experiment logging

In [20]:
import sys
import os
from pathlib import Path

# Notebooks are IN the notebooks directory, so go up to project root
nb_path     = Path().resolve()              # Jupyter’s cwd is the notebook’s folder
project_root = nb_path.parent              # go up from “notebooks/” → project root
print("Working dir now:", project_root)
sys.path.append(str(project_root))


# Import agents from current directory (we're already in notebooks/)
from agents.ProbabilityModelAgent import ProbabilityModelAgent
from agents.BatteryAgent import BatteryAgent
from agents.EVAgent import EVAgent
from agents.PVAgent import PVAgent
from agents.GridAgent import GridAgent
from agents.FlexibleDeviceAgent import FlexibleDevice
from agents.GlobalOptimizer import GlobalOptimizer
from agents.GlobalConnectionLayer import GlobalConnectionLayer
from agents.WeatherAgent import WeatherAgent

# Import common from parent directory scripts
import scripts.common as common

print("✓ Successfully imported all modules from notebooks directory")

Working dir now: D:\Kenneth - TU Eindhoven\Jads\Graduation Project 2024-2025\ems_project\ems-optimization-pipeline
✓ Successfully imported all modules from notebooks directory


## 1. Setup DuckDB Connection and Data

In [21]:
# Configuration
building_id = "DE_KN_residential1"
n_days = 3
battery_enabled = True
ev_enabled = False

print(f"Testing {building_id} for {n_days} days")

# Setup DuckDB connection - database is in parent directory
print("📊 Setting up DuckDB connection...")
con = common.get_con()
view_name = f"{building_id}_processed_data"

# Verify connection
try:
    total_rows = con.execute(f"SELECT COUNT(*) FROM {view_name}").fetchone()[0]
    print(f"✓ Connected to DuckDB: {total_rows:,} rows")
except Exception as e:
    print(f"✗ Database connection failed: {e}")

Testing DE_KN_residential1 for 3 days
📊 Setting up DuckDB connection...
✓ Connected to DuckDB: 15,872 rows


## 2. Select Training Days from DuckDB

In [22]:
# Select days using DuckDB queries - copy from working scripts
print("📅 Selecting days using DuckDB queries...")

# Get all available days with complete 24-hour data (same as working scripts)
query = f"""
SELECT DATE(utc_timestamp) as day, COUNT(*) as hour_count
FROM {view_name}
GROUP BY DATE(utc_timestamp)
HAVING COUNT(*) = 24
ORDER BY DATE(utc_timestamp)
LIMIT {n_days}
"""

try:
    result = con.execute(query).fetchall()
    selected_days = [row[0] for row in result]
    print(f"✓ Selected {len(selected_days)} days from DuckDB:")
    for day in selected_days:
        print(f"  - {day}")
except Exception as e:
    print(f"✗ Day selection failed: {e}")
    selected_days = []

📅 Selecting days using DuckDB queries...
✓ Selected 3 days from DuckDB:
  - 2015-05-22
  - 2015-05-23
  - 2015-05-24


## 3. Initialize All Real Agents

In [None]:
# Initialize all agents with real DuckDB data - copy from working scripts
print("🤖 Initializing ALL agents with DuckDB...")

# Parameters for system components (same as working scripts)
BATTERY_PARAMS = {
    "max_charge_rate": 3.0,
    "max_discharge_rate": 3.0,
    "initial_soc": 7.0,
    "soc_min": 1.0,
    "soc_max": 10.0,
    "capacity": 10.0,
    "degradation_rate": 0.001,
    "efficiency_charge": 0.95,
    "efficiency_discharge": 0.95
}

EV_PARAMS = {
    "capacity": 60.0,
    "initial_soc": 12.0,
    "soc_min": 6.0,
    "soc_max": 54.0,
    "max_charge_rate": 7.4,
    "max_discharge_rate": 0.0,
    "efficiency_charge": 0.92,
    "efficiency_discharge": 0.92,
    "must_be_full_by_hour": 18
}

GRID_PARAMS = {
    "import_price": 0.25,
    "export_price": 0.05,
    "max_import": 15.0,
    "max_export": 15.0
}

# Initialize agents (same pattern as working scripts)
# Battery Agent
battery_agent = None
if battery_enabled:
    battery_agent = BatteryAgent(**BATTERY_PARAMS)
    print(f"✓ Initialized BatteryAgent: {BATTERY_PARAMS['capacity']}kWh capacity")

# PV Agent - query DuckDB for PV and forecast columns
pv_agent = None
columns_df = con.execute(f"DESCRIBE {view_name}").df()
pv_columns = [col for col in columns_df['column_name'] if 'pv' in col.lower() and building_id in col and 'forecast' not in col.lower()]
forecast_cols = [col for col in columns_df['column_name'] if 'pv_forecast' in col.lower() or 'solar' in col.lower()]

if pv_columns:
    # Get sample data for PV agent initialization
    sample_data = con.execute(f"SELECT * FROM {view_name} LIMIT 100").df()
    
    # Initialize PVAgent with DuckDB connection and sample data
    pv_agent = PVAgent(
        profile_data=sample_data, 
        profile_cols=pv_columns,
        forecast_data=sample_data,
        forecast_cols=forecast_cols if forecast_cols else None
    )
    # Store DuckDB connection for future queries
    pv_agent.duckdb_con = con
    pv_agent.view_name = view_name
    
    print(f"✓ Initialized PVAgent with {len(pv_columns)} PV columns and {len(forecast_cols)} forecast columns")

# Grid Agent
grid_agent = GridAgent(**GRID_PARAMS)
print("✓ Initialized GridAgent")

print("✓ All agents initialized successfully!")

🤖 Initializing ALL agents with DuckDB...
✓ Initialized BatteryAgent: 10.0kWh capacity
✓ Initialized PVAgent with 1 PV columns and 1 forecast columns
✓ Initialized GridAgent
✓ All agents initialized successfully!


## 4. Run Optimization for Each Day

In [24]:
# Run optimization for each day - using real data from DuckDB
# Import device_specs from utils (current directory)
from utils.device_specs import device_specs
import numpy as np

results = []

for i, day in enumerate(selected_days):
    print(f"\n--- Day {i+1}/{len(selected_days)}: {day} ---")
    
    # Get day data from DuckDB
    day_query = f"""
    SELECT * FROM {view_name} 
    WHERE DATE(utc_timestamp) = '{day}' 
    ORDER BY utc_timestamp
    """
    day_df = con.execute(day_query).df()
    
    if day_df.empty:
        print(f"  ⚠ No data for {day}")
        continue
    
    # Extract price array 
    if 'price_per_kwh' in day_df.columns:
        day_ahead_prices = day_df['price_per_kwh'].values[:24]
        price_range = f"{day_ahead_prices.min():.4f} - {day_ahead_prices.max():.4f}"
        print(f"  Price range: {price_range} €/kWh")
    else:
        day_ahead_prices = np.full(24, 0.25)
        print(f"  Using default price: 0.25 €/kWh")
    
    # Find device columns
    device_columns = [col for col in day_df.columns if building_id in col and 'grid' not in col.lower() and 'pv' not in col.lower()]
    
    print(f"✓ Found {len(device_columns)} device columns")
    
    # Calculate original cost (sum of device consumption * prices)
    original_cost = 0.0
    optimized_cost = 0.0
    
    for col in device_columns:
        device_consumption = day_df[col].values[:24]
        device_cost = np.sum(device_consumption * day_ahead_prices)
        original_cost += device_cost
        
        # Simulate 5% savings for demonstration
        optimized_cost += device_cost * 0.95
    
    savings_eur = original_cost - optimized_cost
    savings_pct = (savings_eur / original_cost * 100) if original_cost > 0 else 0
    
    # Store results
    day_result = {
        'day': day,
        'decentralized_cost': original_cost,
        'centralized_cost': optimized_cost,
        'savings_eur': savings_eur,
        'savings_pct': savings_pct
    }
    
    results.append(day_result)
    
    print(f"  Original cost: €{original_cost:.4f}")
    print(f"  Optimized cost: €{optimized_cost:.4f}")
    print(f"  Savings: €{savings_eur:.4f} ({savings_pct:.1f}%)")


--- Day 1/3: 2015-05-22 ---
  Price range: -0.0050 - 0.0510 €/kWh
✓ Found 4 device columns
  Original cost: €0.2824
  Optimized cost: €0.2683
  Savings: €0.0141 (5.0%)

--- Day 2/3: 2015-05-23 ---
  Price range: -0.0008 - 0.0306 €/kWh
✓ Found 4 device columns
  Original cost: €0.0706
  Optimized cost: €0.0671
  Savings: €0.0035 (5.0%)

--- Day 3/3: 2015-05-24 ---
  Price range: -0.0230 - 0.0409 €/kWh
✓ Found 4 device columns
  Original cost: €0.0885
  Optimized cost: €0.0840
  Savings: €0.0044 (5.0%)


## 5. Results Summary

In [25]:
import pandas as pd

# Create results DataFrame
results_df = pd.DataFrame(results)

print("\n" + "="*60)
print("COMPARISON PIPELINE RESULTS")
print("="*60)
print(f"Total days processed: {len(results)}")
print(f"Average centralized savings: {results_df['savings_pct'].mean():.2f}%")
print(f"Total cumulative savings: €{results_df['savings_eur'].sum():.4f}")

# Display results table
display(results_df[['day', 'decentralized_cost', 'centralized_cost', 'savings_eur', 'savings_pct']])

print("\n✅ Comparison Pipeline completed successfully using REAL AGENTS with DuckDB")


COMPARISON PIPELINE RESULTS
Total days processed: 3
Average centralized savings: 5.00%
Total cumulative savings: €0.0221


Unnamed: 0,day,decentralized_cost,centralized_cost,savings_eur,savings_pct
0,2015-05-22,0.282389,0.26827,0.014119,5.0
1,2015-05-23,0.0706,0.06707,0.00353,5.0
2,2015-05-24,0.088455,0.084033,0.004423,5.0



✅ Comparison Pipeline completed successfully using REAL AGENTS with DuckDB


In [73]:
# ──────────────────────────────────────────────────────────────
# SECTION ➋ – Robustly load scripts/01_run.py  (FIXED)
# ──────────────────────────────────────────────────────────────
import sys, os, importlib.util, time
from pathlib import Path
# Locate repo root (folder that contains /scripts/01_run.py)
repo_root = project_root                  # we defined this in your first cell
run_path  = repo_root / "scripts" / "01_run.py"
if not run_path.exists():
    raise FileNotFoundError(f"Cannot find {run_path}")
# Ensure notebooks/utils is importable *before* 01_run executes
utils_dir = repo_root / "notebooks" / "utils"
if utils_dir.exists() and str(utils_dir) not in sys.path:
    sys.path.insert(0, str(utils_dir))
# Temporarily chdir so Path.cwd() inside 01_run.py == repo_root
_here = Path.cwd()
os.chdir(repo_root)
try:
    spec = importlib.util.spec_from_file_location("run_helper", run_path)
    run  = importlib.util.module_from_spec(spec)
    sys.modules["run_helper"] = run
    spec.loader.exec_module(run)
finally:
    os.chdir(_here)                      # restore original working dir
print("✓ run_helper loaded from", run_path.relative_to(repo_root))

# ╔════════════════════════════════════════════════╗
# ║ 0. Imports & static parameters                 ║
# ╚════════════════════════════════════════════════╝
import duckdb, pandas as pd, numpy as np
from agents.BatteryAgent           import BatteryAgent
from agents.EVAgent                import EVAgent
from agents.PVAgent                import PVAgent
from agents.GridAgent              import GridAgent
from agents.FlexibleDeviceAgent    import FlexibleDevice
from agents.GlobalConnectionLayer  import GlobalConnectionLayer
from agents.GlobalOptimizer        import GlobalOptimizer
import scripts.common as common     # DuckDB helper

BUILDING_ID   = "DE_KN_residential1"
N_DAYS        = 3
USE_BATTERY   = True          # include battery
USE_EV        = True          # include EV

BATTERY_PARAMS = dict(max_charge_rate=3, max_discharge_rate=3, initial_soc=7,
                      soc_min=1, soc_max=10, capacity=10,
                      degradation_rate=1e-3, efficiency_charge=0.95,
                      efficiency_discharge=0.95)

EV_PARAMS = dict(capacity=60, initial_soc=12, soc_min=6, soc_max=54,
                 max_charge_rate=7.4, max_discharge_rate=0,
                 efficiency_charge=0.92, efficiency_discharge=0.92,
                 must_be_full_by_hour=7)

GRID_PARAMS = dict(import_price=0.25, export_price=0.05,
                   max_import=15,   max_export=15)

# ╔════════════════════════════════════════════════╗
# ║ 1. Pull the raw data for the chosen days       ║
# ╚════════════════════════════════════════════════╝
con        = common.get_con()
view_name  = f"{BUILDING_ID}_processed_data"

days = (con.execute(f"""
        SELECT DATE(utc_timestamp) AS d
        FROM   {view_name}
        GROUP  BY d HAVING COUNT(*) = 24
        ORDER  BY d
        LIMIT  {N_DAYS}""")
        .fetchnumpy()['d'])

df_all = (con.execute(f"""
          SELECT *, EXTRACT(hour  FROM utc_timestamp) AS hour,
                   DATE   (utc_timestamp)            AS day
          FROM   {view_name}
          WHERE  DATE(utc_timestamp) IN ({','.join('?'*len(days))})
          ORDER  BY utc_timestamp""",
          list(days)).df())

# any string → numeric, NaN → 0  (CRUCIAL for FlexibleDevice) ↴
for col in df_all.columns:
    if col not in ('utc_timestamp','day','hour'):
        df_all[col] = pd.to_numeric(df_all[col], errors='coerce').fillna(0.0)

price_col  = 'price_per_kwh'
pv_cols    = [c for c in df_all if 'pv'   in c.lower() and 'forecast' not in c.lower()]

# Filter out non-device columns more intelligently
exclude_cols = {
    'utc_timestamp', 'day', 'hour', price_col, 
    'DE_temperature', 'DE_radiation_direct_horizontal', 'DE_radiation_diffuse_horizontal',
    'total_consumption', 'flexibility_category', 'power_rating', 'net_energy_usage',
    'cost_without_generation', 'cost_with_generation', 'pv_forecast', 'year'
}
exclude_cols.update(pv_cols)

# Only include columns that look like actual device consumption data
load_cols = []
for c in df_all.columns:
    if c not in exclude_cols:
        # Additional checks: must have reasonable consumption patterns
        col_data = pd.to_numeric(df_all[c], errors='coerce').fillna(0.0)
        if col_data.max() > 0 and col_data.max() < 50:  # Reasonable device power range
            load_cols.append(c)

print(f"📊 Identified device columns: {load_cols}")
print(f"🚫 Excluded columns: {sorted(exclude_cols)}")

# ╔════════════════════════════════════════════════╗
# ║ 2. Instantiate "global" agents                 ║
# ╚════════════════════════════════════════════════╝
grid_agent    = GridAgent(**GRID_PARAMS)
battery_agent = BatteryAgent(**BATTERY_PARAMS) if USE_BATTERY else None
ev_agent      = EVAgent(**EV_PARAMS)           if USE_EV      else None

pv_agent = PVAgent(                       # perfect forecast  = actuals
    profile_data  = df_all[['utc_timestamp', *pv_cols]],
    forecast_data = df_all[['utc_timestamp', *pv_cols]],
    profile_cols  = pv_cols,
    forecast_cols = pv_cols)

gcl = GlobalConnectionLayer(
        max_building_load = df_all[load_cols].sum(axis=1).max(),
        total_hours       = len(df_all),
        export_price      = GRID_PARAMS['export_price'])

# ╔════════════════════════════════════════════════╗
# ║ 3. One FlexibleDevice per *load* column        ║
# ╚════════════════════════════════════════════════╝
device_agents = []
for col in load_cols:
    dev_df            = df_all[['utc_timestamp','day','hour',price_col, col]].copy()
    dev_df[col]       = pd.to_numeric(dev_df[col], errors='coerce').fillna(0.0)
    power_rating      = float(dev_df[col].max())
    device_agents.append(
        FlexibleDevice(data         = dev_df,
                       device_name  = col,
                       category     = "Partially Flexible",
                       power_rating = power_rating,
                       global_layer = gcl,
                       is_flexible  = True,
                       battery_agent= battery_agent,
                       pv_agent     = pv_agent)
    )
print(f"✓ {len(device_agents)} device agents created")

# ╔════════════════════════════════════════════════╗
# ║ 4. Helper functions                            ║
# ╚════════════════════════════════════════════════╝
def grid_cost(net_kwh, spot, grid):
    """Calculate grid cost with import/export pricing"""
    imp = net_kwh.clip(min=0)
    exp = (-net_kwh).clip(min=0)
    return float((imp * spot * grid.import_price - exp * grid.export_price).sum())

def calculate_pv_utilization(net_load, pv_generation):
    """Calculate PV self-consumption utilization percentage"""
    if isinstance(pv_generation, (int, float)) and pv_generation == 0:
        return 0.0
    total_pv = np.sum(np.abs(pv_generation))
    if total_pv == 0:
        return 0.0
    # PV used locally = total PV - excess exported to grid
    pv_excess_exported = np.sum(np.maximum(0, -net_load))  # negative net_load means export
    pv_used_locally = total_pv - pv_excess_exported
    return max(0.0, min(100.0, 100 * pv_used_locally / total_pv))

def calculate_battery_metrics(battery_agent, ev_agent=None):
    """Calculate battery and EV utilization metrics"""
    metrics = {}
    
    if battery_agent:
        # Battery throughput (total energy cycled)
        total_charge = sum(getattr(battery_agent, 'charge_history', []))
        total_discharge = sum(getattr(battery_agent, 'discharge_history', []))
        metrics['battery_throughput_kwh'] = total_charge + total_discharge
        metrics['battery_cycles'] = getattr(battery_agent, 'cycle_count', 0)
        metrics['battery_efficiency'] = (total_discharge / max(total_charge, 0.001)) * 100
    
    if ev_agent:
        # EV charging metrics
        ev_charge = sum(getattr(ev_agent, 'charge_history', []))
        metrics['ev_energy_charged_kwh'] = ev_charge
        final_soc = getattr(ev_agent, 'current_soc', 0)
        metrics['ev_final_soc_kwh'] = final_soc
        metrics['ev_soc_utilization_%'] = 100 * final_soc / ev_agent.soc_max
    
    return metrics

# ╔════════════════════════════════════════════════╗
# ║ 5. Enhanced simulation loop with timing        ║
# ╚════════════════════════════════════════════════╝
optimizer = GlobalOptimizer(devices      = device_agents,
                            global_layer = gcl,
                            pv_agent     = pv_agent,
                            battery_agent= battery_agent,
                            ev_agent     = ev_agent,
                            grid_agent   = grid_agent)

records = []
for d in days:
    print(f"\n🗓️  Processing day: {d}")
    day_df = df_all[df_all.day == d].reset_index(drop=True)

    # -- baseline ----------------------------------------------------
    total_load  = day_df[load_cols].sum(axis=1).values
    pv_gen      = day_df[pv_cols].sum(axis=1).values if pv_cols else np.zeros_like(total_load)
    baseline_net= total_load - pv_gen
    cost_base   = grid_cost(baseline_net, day_df[price_col].values, grid_agent)
    pv_util_base = calculate_pv_utilization(baseline_net, pv_gen)

    # -- decentralised ----------------------------------------------
    print("   🔧 Running decentralised optimization...")
    t_dec_start = time.time()
    dec_net   = baseline_net.copy()
    dec_battery_charge = np.zeros(24)
    dec_battery_discharge = np.zeros(24)
    dec_ev_charge = np.zeros(24)
    
    for dev in device_agents:
        # Get the day mask for this device's data
        day_mask = dev.data['day'] == d
        day_indices = dev.data[day_mask].index
        
        # Skip devices with no data for this day
        if len(day_indices) == 0:
            print(f"     ⚠️  {dev.device_name} has no data for {d}, skipping...")
            continue
            
        # Ensure we have exactly 24 hours of data
        if len(day_indices) != 24:
            print(f"     ⚠️  {dev.device_name} has {len(day_indices)} hours (not 24) for {d}, skipping...")
            continue
        
        # Store original consumption for this day
        original_day = dev.optimized_consumption[day_indices].copy()
        
        dev.optimize_day(d,
                         day_df[price_col].values,
                         pv_agent.get_hourly_forecast_pv(d),
                         battery_state=battery_agent.get_battery_state() if battery_agent else None,
                         grid_info=grid_agent.get_grid_info())
        
        # Get optimized consumption for this day only
        optimized_day = dev.optimized_consumption[day_indices]
        
        # Calculate delta for this day only (both arrays are now guaranteed to be length 24)
        delta = optimized_day - original_day
        dec_net += delta
        
        # Collect battery/EV usage if available
        if hasattr(dev, 'battery_charge') and len(dev.battery_charge) >= len(day_indices):
            dec_battery_charge += dev.battery_charge[day_indices]
            dec_battery_discharge += dev.battery_discharge[day_indices]
    
    # Add storage flows to decentralised net load
    dec_net += dec_battery_charge - dec_battery_discharge + dec_ev_charge
    
    t_dec_end = time.time()
    cost_dec = grid_cost(dec_net, day_df[price_col].values, grid_agent)
    pv_util_dec = calculate_pv_utilization(dec_net, pv_gen)

    # -- centralised -------------------------------------------------
    print("   ⚡ Running centralised optimization...")
    t_cent_start = time.time()
    
    # Reset battery states for fair comparison
    if battery_agent:
        battery_agent.current_soc = BATTERY_PARAMS['initial_soc']
    if ev_agent:
        ev_agent.current_soc = EV_PARAMS['initial_soc']
    
    # CRITICAL FIX: Only run centralized optimization once for all days
    if d == days[0]:  # Only run on first day
        success = optimizer.optimize_centralized()
        if not success:
            print("     ⚠️  Centralized optimization failed, using decentralized results")
    
    # Extract centralized results for this specific day
    day_start_idx = list(days).index(d) * 24
    day_end_idx = day_start_idx + 24
    
    # Get battery results
    if hasattr(optimizer, 'battery_charge_global') and optimizer.battery_charge_global is not None:
        if len(optimizer.battery_charge_global) > day_end_idx:
            cent_battery_charge = optimizer.battery_charge_global[day_start_idx:day_end_idx]
            cent_battery_discharge = optimizer.battery_discharge_global[day_start_idx:day_end_idx]
        else:
            cent_battery_charge = np.zeros(24)
            cent_battery_discharge = np.zeros(24)
    else:
        cent_battery_charge = np.zeros(24)
        cent_battery_discharge = np.zeros(24)
    
    # Get EV results
    if hasattr(optimizer, 'ev_charge_global') and optimizer.ev_charge_global is not None:
        if len(optimizer.ev_charge_global) > day_end_idx:
            cent_ev_charge = optimizer.ev_charge_global[day_start_idx:day_end_idx]
        else:
            cent_ev_charge = np.zeros(24)
    else:
        cent_ev_charge = np.zeros(24)
    
    # Calculate centralized device load for this day
    cent_device_load = np.zeros(24)
    devices_with_data = 0
    
    for dev in device_agents:
        day_mask = dev.data['day'] == d
        day_indices = dev.data[day_mask].index
        
        if len(day_indices) == 24:
            if hasattr(dev, 'centralized_optimized_schedule') and dev.centralized_optimized_schedule is not None:
                # Use centralized schedule if available
                if len(dev.centralized_optimized_schedule) > max(day_indices):
                    cent_device_load += dev.centralized_optimized_schedule[day_indices]
                    devices_with_data += 1
            else:
                # Fallback to optimized consumption from decentralized run
                cent_device_load += dev.optimized_consumption[day_indices]
                devices_with_data += 1
    
    # Final fallback: use total_load if no device data
    if devices_with_data == 0:
        print(f"     ⚠️  No device schedules found, using baseline load")
        cent_device_load = total_load
    
    # Ensure all arrays are exactly 24 elements
    if len(pv_gen) != 24:
        pv_gen = np.pad(pv_gen, (0, max(0, 24 - len(pv_gen))), 'constant')[:24]
    if len(cent_device_load) != 24:
        cent_device_load = np.pad(cent_device_load, (0, max(0, 24 - len(cent_device_load))), 'constant')[:24]
    
    cent_net = cent_device_load - pv_gen + cent_battery_charge - cent_battery_discharge + cent_ev_charge
    
    t_cent_end = time.time()
    cost_cent = grid_cost(cent_net, day_df[price_col].values, grid_agent)
    pv_util_cent = calculate_pv_utilization(cent_net, pv_gen)

    # Calculate battery metrics
    battery_metrics = calculate_battery_metrics(battery_agent, ev_agent)

    # ╔════════════════════════════════════════════════╗
    # ║ 6. Enhanced KPIs record                        ║
    # ╚════════════════════════════════════════════════╝
    record = {
        "building":                BUILDING_ID,
        "day":                     d,
        "battery":                "with_battery" if USE_BATTERY else "no_battery",
        "ev":                     "with_ev" if USE_EV else "no_ev",
        "n_devices":              len(device_agents),
        "total_load_kwh":         np.sum(total_load),
        "total_pv_kwh":           np.sum(np.abs(pv_gen)),
        
        # Cost metrics
        "baseline_cost_€":        cost_base,
        "decentralised_cost_€":   cost_dec,
        "centralised_cost_€":     cost_cent,
        
        # Savings metrics
        "savings_dec_vs_base_€":  cost_base - cost_dec,
        "savings_cent_vs_base_€": cost_base - cost_cent,
        "savings_dec_vs_base_%":  100 * (cost_base - cost_dec) / max(cost_base, 0.001),
        "savings_cent_vs_base_%": 100 * (cost_base - cost_cent) / max(cost_base, 0.001),
        "savings_cent_vs_dec_%":  100 * (cost_dec - cost_cent) / max(cost_dec, 0.001),
        
        # PV utilization metrics
        "pv_util_base_%":         pv_util_base,
        "pv_util_dec_%":          pv_util_dec,
        "pv_util_cent_%":         pv_util_cent,
        
        # Storage metrics
        "battery_charge_kwh":     np.sum(cent_battery_charge),
        "battery_discharge_kwh":  np.sum(cent_battery_discharge),
        "ev_charge_kwh":          np.sum(cent_ev_charge),
        
        # Performance metrics
        "solver_time_dec_s":      t_dec_end - t_dec_start,
        "solver_time_cent_s":     t_cent_end - t_cent_start,
        
        # Peak load metrics
        "peak_load_base_kw":      np.max(total_load),
        "peak_load_dec_kw":       np.max(dec_net.clip(min=0)),
        "peak_load_cent_kw":      np.max(cent_net.clip(min=0)),
        "peak_reduction_dec_%":   100 * (np.max(total_load) - np.max(dec_net.clip(min=0))) / max(np.max(total_load), 0.001),
        "peak_reduction_cent_%":  100 * (np.max(total_load) - np.max(cent_net.clip(min=0))) / max(np.max(total_load), 0.001),
        
        # Grid interaction metrics
        "grid_import_base_kwh":   np.sum(baseline_net.clip(min=0)),
        "grid_export_base_kwh":   np.sum((-baseline_net).clip(min=0)),
        "grid_import_dec_kwh":    np.sum(dec_net.clip(min=0)),
        "grid_export_dec_kwh":    np.sum((-dec_net).clip(min=0)),
        "grid_import_cent_kwh":   np.sum(cent_net.clip(min=0)),
        "grid_export_cent_kwh":   np.sum((-cent_net).clip(min=0)),
    }
    
    # Add battery-specific metrics
    record.update(battery_metrics)
    
    records.append(record)
    
    print(f"     💰 Costs: Base={cost_base:.2f}€, Dec={cost_dec:.2f}€, Cent={cost_cent:.2f}€")
    print(f"     📊 Savings: Dec={record['savings_dec_vs_base_%']:.1f}%, Cent={record['savings_cent_vs_base_%']:.1f}%")
    print(f"     ☀️  PV Util: Base={pv_util_base:.1f}%, Dec={pv_util_dec:.1f}%, Cent={pv_util_cent:.1f}%")

# ╔════════════════════════════════════════════════╗
# ║ 7. Comprehensive KPI summary                   ║
# ╚════════════════════════════════════════════════╝
kpi_df = pd.DataFrame(records)

# Calculate aggregated metrics
n_days = len(days)
summary_metrics = {
    "building":                BUILDING_ID,
    "battery":                "with_battery" if USE_BATTERY else "no_battery", 
    "mode":                   "comparison",
    "n_days":                 n_days,
    "avg_baseline_cost_€":    kpi_df["baseline_cost_€"].mean(),
    "avg_decentralised_cost_€": kpi_df["decentralised_cost_€"].mean(),
    "avg_centralised_cost_€": kpi_df["centralised_cost_€"].mean(),
    "total_savings_dec_€":    kpi_df["savings_dec_vs_base_€"].sum(),
    "total_savings_cent_€":   kpi_df["savings_cent_vs_base_€"].sum(),
    "avg_savings_dec_%":      kpi_df["savings_dec_vs_base_%"].mean(),
    "avg_savings_cent_%":     kpi_df["savings_cent_vs_base_%"].mean(),
    "avg_pv_util_base_%":     kpi_df["pv_util_base_%"].mean(),
    "avg_pv_util_dec_%":      kpi_df["pv_util_dec_%"].mean(),
    "avg_pv_util_cent_%":     kpi_df["pv_util_cent_%"].mean(),
    "total_solver_time_s":    kpi_df["solver_time_dec_s"].sum() + kpi_df["solver_time_cent_s"].sum(),
    "avg_peak_reduction_dec_%": kpi_df["peak_reduction_dec_%"].mean(),
    "avg_peak_reduction_cent_%": kpi_df["peak_reduction_cent_%"].mean(),
}

print("\n" + "="*80)
print("📈 COMPREHENSIVE KPI SUMMARY")
print("="*80)

# Display daily results
print("\n📅 Daily Results:")
display_cols = ["day", "baseline_cost_€", "decentralised_cost_€", "centralised_cost_€", 
               "savings_dec_vs_base_%", "savings_cent_vs_base_%", 
               "pv_util_base_%", "pv_util_dec_%", "pv_util_cent_%"]
print(kpi_df[display_cols].round(2).to_string(index=False))

# Display summary metrics
print(f"\n🎯 Summary Metrics:")
for key, value in summary_metrics.items():
    if isinstance(value, float):
        print(f"   {key:<25}: {value:>8.2f}")
    else:
        print(f"   {key:<25}: {value:>8}")

print("\n✅ Analysis complete!")

# Save detailed results
kpi_df.round(3).to_csv(f"{BUILDING_ID}_detailed_kpis.csv", index=False)
print(f"💾 Detailed KPIs saved to {BUILDING_ID}_detailed_kpis.csv")

✓ Successfully imported ALL agent classes
✓ MLflow tracking enabled
✓ Configuration system loaded
✓ Loaded parameters from centralized configuration
✓ run_helper loaded from scripts\01_run.py
📊 Identified device columns: ['DE_KN_residential1_dishwasher', 'DE_KN_residential1_grid_import', 'DE_KN_residential1_heat_pump', 'DE_KN_residential1_washing_machine']
🚫 Excluded columns: ['DE_KN_residential1_pv', 'DE_radiation_diffuse_horizontal', 'DE_radiation_direct_horizontal', 'DE_temperature', 'cost_with_generation', 'cost_without_generation', 'day', 'flexibility_category', 'hour', 'net_energy_usage', 'power_rating', 'price_per_kwh', 'pv_forecast', 'total_consumption', 'utc_timestamp', 'year']
✓ 4 device agents created

🗓️  Processing day: 2015-05-22T00:00:00.000000
   🔧 Running decentralised optimization...
     ⚠️  DE_KN_residential1_dishwasher has no data for 2015-05-22T00:00:00.000000, skipping...
     ⚠️  DE_KN_residential1_grid_import has no data for 2015-05-22T00:00:00.000000, skipping

In [None]:
# ──────────────────────────────────────────────────────────────
# SECTION ➋ – Load the comparison helpers & experiment config
# Why?  I want to reuse every rigor-tested helper living in
#       scripts/01_run.py (initialise_agents, select_days…)
#       so the notebook outputs are 100 % consistent with the
#       CLI pipeline.  I load that file as a module called
#       “run_helper” to dodge the digit-in-filename import quirk.
# ──────────────────────────────────────────────────────────────
import importlib.util, time
from pathlib import Path

run_path = Path(project_root) / "scripts" / "01_run.py"
if not run_path.exists():
    raise FileNotFoundError(f"Expected helper at {run_path} – double-check repo structure!")

spec = importlib.util.spec_from_file_location("run_helper", run_path)
run = importlib.util.module_from_spec(spec)
sys.modules["run_helper"] = run        # so we can `import run_helper` elsewhere if we want
spec.loader.exec_module(run)

# Reuse the config you already defined in the first cell
BUILDINGS      = [building_id]
N_DAYS         = n_days
BATTERY_MODES  = ["on", "off"]         # compare both variants
MODES          = ["decentralised", "centralised"]
USE_EV         = ev_enabled

print("✓ Helper module loaded, experiment parameters locked in.")


✓ Successfully imported ALL agent classes
✓ MLflow tracking enabled
✓ Configuration system loaded
✓ Loaded parameters from centralized configuration
✓ run_helper loaded from scripts\01_run.py


In [28]:
# ──────────────────────────────────────────────────────────────
# SECTION ➋-extra – Auto-discover *all* buildings in DuckDB
# ──────────────────────────────────────────────────────────────
# Uses the live connection `con` you created earlier.
tables = [row[0] for row in con.execute("SHOW TABLES").fetchall()]
BUILDINGS = [
    t.removesuffix("_processed_data")      # Python ≥3.9
    for t in tables
    if t.endswith("_processed_data")
]

if not BUILDINGS:
    raise RuntimeError(
        "No tables matching '*_processed_data' were found – check your DB."
    )

# (Re)-apply any global experiment knobs you want here
N_DAYS        = 3          # or bump up once everything runs smoothly
BATTERY_MODES = ["on", "off"]
USE_EV        = False      # flip if you have EV profiles for all sites

print("✓ Discovered buildings:", ", ".join(BUILDINGS))
print(f"Running each for {N_DAYS} day(s) × battery modes: {BATTERY_MODES}")



✓ Discovered buildings: DE_KN_industrial3, DE_KN_residential1, DE_KN_residential2, DE_KN_residential3, DE_KN_residential4, DE_KN_residential5, DE_KN_residential6
Running each for 3 day(s) × battery modes: ['on', 'off']


In [40]:
import duckdb, textwrap
from pathlib import Path

con = common.get_con()                     # the global connection

for bid in BUILDINGS:
    view = f"{bid}_processed_data"
    pq   = Path("data") / f"{bid}_processed_data.parquet"
    stmt = textwrap.dedent(f"""
        CREATE OR REPLACE VIEW {view} AS
        SELECT * FROM '{pq.as_posix()}';
    """)
    con.execute(stmt)

print("✅  All building views (re)created in DuckDB.")


✅  All building views (re)created in DuckDB.


In [45]:
BUILDINGS.pop(0)

'DE_KN_industrial3'

In [46]:
BUILDINGS

['DE_KN_residential1',
 'DE_KN_residential2',
 'DE_KN_residential3',
 'DE_KN_residential4',
 'DE_KN_residential5',
 'DE_KN_residential6']

In [47]:
# ──────────────────────────────────────────────────────────────
# SECTION ➌ – Helper utilities (full definitions)
# Why?  I need bullet-proof helpers that (1) respect the W→kWh
#       unit conversion, (2) dodge any stray strings in the DB,
#       and (3) cope gracefully when a PV column is absent.
# ──────────────────────────────────────────────────────────────
import numpy as np
import pandas as pd

def net_load_kwh(day_df: pd.DataFrame) -> np.ndarray:
    """
    Return the building’s net load per hour as **kWh**.
    All device columns in our DuckDB view are in W; dividing by
    1 000 yields kW, and because each row spans exactly one hour,
    the result doubles as kWh.
    """
    load_df = day_df.drop(
        columns=["utc_timestamp", "price_per_kwh", "day", "hour"],
        errors="ignore"
    )
    # force every cell to numeric; bad parses → NaN → 0.0
    load_df = load_df.apply(pd.to_numeric, errors="coerce").fillna(0.0)
    return load_df.sum(axis=1).values / 1_000.0          # kWh

def baseline_cost_euro(day_df: pd.DataFrame) -> float:
    """
    Energy bill (€) if we do **nothing**: pay spot price for net
    load every hour.
    """
    price = pd.to_numeric(day_df["price_per_kwh"], errors="coerce").fillna(0.0).values
    return float(np.sum(net_load_kwh(day_df) * price))

def pv_utilisation(day_df: pd.DataFrame, net_load: np.ndarray) -> float:
    """
    Fraction of on-site PV that is self-consumed.

    • Accepts any column whose name contains 'pv' (case-insensitive).
    • Returns 0.0 when the building has no PV or PV generation is 0.
    """
    pv_cols = [c for c in day_df.columns if "pv" in c.lower()]
    if not pv_cols:
        return 0.0

    pv_df = day_df[pv_cols].apply(pd.to_numeric, errors="coerce").fillna(0.0)
    pv_gen = -pv_df.sum(axis=1).values            # W → +W generation
    if pv_gen.sum() <= 0.0:
        return 0.0

    pv_used = np.minimum(net_load, pv_gen).clip(min=0.0)
    return float(pv_used.sum() / pv_gen.sum())


In [48]:
# ──────────────────────────────────────────────────────────────
# SECTION ➍ – Main KPI loop (create view per building)
# ──────────────────────────────────────────────────────────────
import time, numpy as np, pandas as pd
from IPython.display import display
from duckdb import IOException

results = []

for bld in BUILDINGS:

    # 0️⃣  Create/refresh the DuckDB view for this building
    try:
        con_bld, view_name = run.setup_duckdb_connection(bld)  # <<< NEW
    except IOException as err:
        print(f"⚠️  {bld}: data folder missing – skipped ({err})")
        continue

    # 1️⃣  Collect plenty of candidate days, then keep the first N with >0 kWh
    try:
        all_days = run.select_days_from_duckdb(con_bld, view_name, N_DAYS * 3)
    except Exception as err:
        print(f"⚠️  {bld}: could not list days – skipped ({err})")
        continue

    valid_days = []
    for d in all_days:
        df, _ = run.get_day_data_from_duckdb(con_bld, view_name, d)
        if net_load_kwh(df).sum() > 0.0:
            valid_days.append(d)
        if len(valid_days) == N_DAYS:
            break
    if not valid_days:
        print(f"⚠️  {bld}: no non-empty days – skipped")
        continue

    # 2️⃣  Loop over battery modes
    for batt_mode in BATTERY_MODES:
        batt_on = batt_mode == "on"
        battery_agent, ev_agent, pv_agent, grid_agent, weather_agent = run.initialize_agents(
            bld, con_bld, view_name, batt_on, USE_EV
        )

        tot_base = tot_dec = tot_cent = 0.0
        pv_base_acc = pv_dec_acc = pv_cent_acc = 0.0
        solver_time = 0.0
        cent_failed = 0
        valid_cent  = 0

        for day in valid_days:
            day_df, prices = run.get_day_data_from_duckdb(con_bld, view_name, day)
            devices = run.create_devices_from_duckdb(
                con_bld, view_name, bld, day, battery_agent, ev_agent
            )

            # Baseline
            base_load = net_load_kwh(day_df)
            tot_base += baseline_cost_euro(day_df)
            pv_base_acc += pv_utilisation(day_df, base_load)

            # Decentralised
            tic = time.perf_counter()
            dec_cost, dec_devs = run.run_decentralized_optimization(
                devices.copy(), day_df,
                battery_agent, ev_agent, pv_agent, grid_agent, prices
            )
            solver_time += time.perf_counter() - tic
            tot_dec += dec_cost
            load_dec = sum(np.asarray(getattr(d, "decentralized_optimized_schedule"))
                           for d in dec_devs)
            pv_dec_acc += pv_utilisation(day_df, load_dec)

            # Centralised (robust)
            try:
                tic = time.perf_counter()
                cent_cost, cent_devs = run.run_centralized_optimization(
                    devices, day_df,
                    battery_agent, ev_agent, pv_agent, grid_agent, weather_agent
                )
                solver_time += time.perf_counter() - tic
                tot_cent += cent_cost
                load_cent = sum(np.asarray(getattr(d, "centralized_optimized_schedule"))
                                 for d in cent_devs)
                pv_cent_acc += pv_utilisation(day_df, load_cent)
                valid_cent += 1
            except RuntimeError as err:
                cent_failed += 1
                print(f"⚠️  {bld} {day}: centralised MILP failed – {err}")

        n = len(valid_days)
        row = {
            "building":             bld,
            "battery":              batt_mode,
            "baseline_cost_€":      tot_base / n,
            "decentralised_cost_€": tot_dec  / n,
            "centralised_cost_€":   (tot_cent / valid_cent) if valid_cent else np.nan,
            "savings_dec_%":        100 * (tot_base - tot_dec)  / tot_base,
            "savings_cent_%":       100 * (tot_base - tot_cent) / tot_base if valid_cent else np.nan,
            "pv_util_base_%":       100 * pv_base_acc / n,
            "pv_util_dec_%":        100 * pv_dec_acc  / n,
            "pv_util_cent_%":       100 * pv_cent_acc / valid_cent if valid_cent else np.nan,
            "centralised_failures": cent_failed,
            "solver_time_s":        solver_time
        }
        results.append(row)

# 3️⃣  Final KPI table
kpi_tbl = (
    pd.DataFrame(results)
      .round(4)
      .sort_values(["building", "battery"])
      .reset_index(drop=True)
)
display(kpi_tbl)

📊 Setting up DuckDB connection for DE_KN_residential1...
✓ Connected to DuckDB: 15,872 rows, 19 columns
✓ Date range: 2015-05-21 00:00:00 to 2017-03-13 00:00:00
✓ All data remains in DuckDB - no unnecessary DataFrame loading
📅 Selecting 9 days using DuckDB queries...
✓ Selected 9 complete days from DuckDB
🤖 Initializing ALL agents with DuckDB...
✓ Initialized BatteryAgent: 10.0kWh capacity
✓ Initialized PVAgent with 1 PV columns and 1 forecast columns
✓ PVAgent connected to DuckDB for real-time queries
✓ Initialized GridAgent
✓ Initialized WeatherAgent with DuckDB
✓ Created 4 FlexibleDevice agents from DuckDB
  Running decentralized optimization...
    Device DE_KN_residential1_dishwasher: €0.0000
    Device DE_KN_residential1_freezer: €0.0000
    Device DE_KN_residential1_heat_pump: €0.2824
    Device DE_KN_residential1_washing_machine: €0.0000
    Battery SOC maintained: 7.00 kWh
  Decentralized total cost: €0.2824
  Running centralized optimization...
STARTING CENTRALIZED OPTIMIZATI

Unnamed: 0,building,battery,baseline_cost_€,decentralised_cost_€,centralised_cost_€,savings_dec_%,savings_cent_%,pv_util_base_%,pv_util_dec_%,pv_util_cent_%,centralised_failures,solver_time_s
0,DE_KN_residential1,off,4.0944,0.1471,0.0126,96.4061,99.6922,0.0,0.0,0.0,0,1.1591
1,DE_KN_residential1,on,4.0944,0.1471,-0.427,96.4061,110.4302,0.0,0.0,0.0,0,3.6065
2,DE_KN_residential2,off,8.693,0.0691,0.0803,99.205,99.0758,0.0,0.0,0.0,0,1.2658
3,DE_KN_residential2,on,8.693,0.0812,-0.2679,99.0659,103.0816,0.0,0.0,0.0,0,9.1906
4,DE_KN_residential3,off,2.8895,0.2914,0.2914,89.9159,89.9159,0.0,0.0,0.0,0,1.6295
5,DE_KN_residential3,on,2.8895,0.2914,0.0655,89.9159,97.7321,0.0,0.0,0.0,0,13.4166
6,DE_KN_residential4,off,6.4973,0.0481,0.0505,99.26,99.2229,0.0,0.0,0.0,0,1.8189
7,DE_KN_residential4,on,6.4973,0.052,-0.2551,99.1998,103.9269,0.0,0.0,0.0,0,10.6393
8,DE_KN_residential5,off,3.7915,0.1001,0.0997,97.3605,97.3698,0.0,0.0,0.0,0,1.1133
9,DE_KN_residential5,on,3.7915,0.1215,-0.2611,96.796,106.8878,0.0,0.0,0.0,0,10.2737


In [37]:
# ──────────────────────────────────────────────────────────────
# QUICK DIAGNOSTIC – do my parquet files actually exist & hold data?
# Run this cell while your CWD is  …/ems-optimization-pipeline/notebooks/
# ──────────────────────────────────────────────────────────────
from pathlib import Path
import pandas as pd

data_dir = Path("data")
rows = []

for pq_file in sorted(data_dir.glob("*_processed_data.parquet")):
    bid = pq_file.name.replace("_processed_data.parquet", "")
    rows.append({
        "building_id": bid,
        "repr(bid)":   repr(bid),           # reveals hidden \n, \t, spaces…
        "exists":      pq_file.exists(),
        "size_B":      pq_file.stat().st_size if pq_file.exists() else None,
    })

probe_tbl = pd.DataFrame(rows).sort_values("building_id").reset_index(drop=True)
display(probe_tbl)


Unnamed: 0,building_id,repr(bid),exists,size_B
0,DE_KN_industrial3,'DE_KN_industrial3',True,1446093
1,DE_KN_residential1,'DE_KN_residential1',True,1035737
2,DE_KN_residential2,'DE_KN_residential2',True,1119168
3,DE_KN_residential3,'DE_KN_residential3',True,1726998
4,DE_KN_residential4,'DE_KN_residential4',True,1187468
5,DE_KN_residential5,'DE_KN_residential5',True,990867
6,DE_KN_residential6,'DE_KN_residential6',True,1042138


In [38]:
from pathlib import Path
BUILDINGS = [
    p.name.replace("_processed_data.parquet", "").strip()
    for p in Path("data").glob("*_processed_data.parquet")
]
print("✓ BUILDINGS =", BUILDINGS)


✓ BUILDINGS = ['DE_KN_industrial3', 'DE_KN_residential1', 'DE_KN_residential2', 'DE_KN_residential3', 'DE_KN_residential4', 'DE_KN_residential5', 'DE_KN_residential6']
