In [None]:
import pandas as pd
import numpy as np
from scipy.optimize import minimize
from scipy.io import savemat
import datetime



Please confirm/update LOAD_DEFINITIONS in the script with your specific load details, especially for Golf Carts and Security Lights.
Optimizing for 8 loads over 174 timesteps...
Total variables: 1392
Optimization failed: STOP: TOTAL NO. of f AND g EVALUATIONS EXCEEDS LIMIT
Results saved to simulated_load_profiles.csv
Results saved to simulated_load_profiles.mat

--- Next Steps ---
1. Review the generated CSV and MAT files.
2. Import the .mat file into MATLAB/Simulink. Each load profile can be used as a time-varying input signal (0 to 1) multiplying its rated power.
3. Compare the 'simulated_total_kW' (sum of optimized_profiles * power_kW for each load) against your original 'measured_power'. Plot them to see the fit.
4. If the fit is not good, or if load profiles seem unrealistic:
   - Adjust the 'DAILY_HOURS_PENALTY'.
   - Re-verify load power ratings and estimated daily hours/windows.
   - Consider if any loads have interdependencies not yet captured.
   - The optimization problem (especially for many variables over a long period) can be complex. 'maxiter' might need adjustment.

In [None]:
# --- 1. Configuration & Data Loading ---
def load_and_prepare_data(csv_path, start_time_str, end_time_str):
    """Read the measured-demand CSV and keep the samples inside a time window.

    The CSV must provide an 'X-Data' timestamp column and a
    'TOTDemand(kWh/h)Value[kWh/h]' demand column. Demand values may be
    numeric or text using a decimal comma (e.g. "1,5").

    Args:
        csv_path: Path or file-like object accepted by ``pd.read_csv``.
        start_time_str: Inclusive window start, parseable by ``pd.to_datetime``.
        end_time_str: Inclusive window end, parseable by ``pd.to_datetime``.

    Returns:
        Tuple ``(measured_power, time_index, df)``: the demand values as a
        NumPy float array, the matching 'X-Data' Series, and the filtered
        DataFrame (index reset, with an added 'TOT_kW' float column).

    Raises:
        ValueError: If no row falls inside the window, or a demand cell
            cannot be parsed as a number.
        KeyError: If one of the required columns is missing.
    """
    demand_column = 'TOTDemand(kWh/h)Value[kWh/h]'

    def _to_float(value):
        # Text cells may use a decimal comma; numeric cells pass through and
        # missing cells stay missing.
        if isinstance(value, str):
            return float(value.replace(',', '.'))
        return np.nan if pd.isna(value) else float(value)

    df = pd.read_csv(csv_path)
    df['X-Data'] = pd.to_datetime(df['X-Data'])

    raw_demand = df[demand_column]
    if pd.api.types.is_numeric_dtype(raw_demand):
        df['TOT_kW'] = raw_demand.astype(float)
    else:
        # Covers object columns as well as pandas string dtypes. Converting
        # element-wise keeps numeric cells of a mixed-type column instead of
        # letting the .str accessor turn them into NaN.
        df['TOT_kW'] = raw_demand.map(_to_float).astype(float)

    # Filter by the specified date range (both ends inclusive)
    start_time = pd.to_datetime(start_time_str)
    end_time = pd.to_datetime(end_time_str)
    in_window = (df['X-Data'] >= start_time) & (df['X-Data'] <= end_time)
    df = df[in_window].reset_index(drop=True)

    if df.empty:
        raise ValueError("No data found in the specified time range. Check your CSV and time range.")

    measured_power = df['TOT_kW'].values
    time_index = df['X-Data']
    return measured_power, time_index, df

# Load catalogue (Golf Cart and Security Lights values still await confirmation).
# Every entry is a dict with these keys:
#   name                   - label used for the exported column / MAT variable
#   power_kW               - rated power of the load
#   estimated_daily_hours  - expected running hours per day (used by the penalty term)
#   always_on              - True for 24/7 loads; such loads are exempt from the
#                            running-window restriction and the daily-hours penalty
#   window_start_h / window_end_h (optional) - allowed running window as
#                            hour-of-day floats, e.g. 8.0 = 08:00, 17.0 = 17:00;
#                            start > end denotes an overnight window
#
# Placeholder values - replace with the final figures.
LOAD_DEFINITIONS = [
    dict(name='Houses_Total', power_kW=3.0, estimated_daily_hours=24, always_on=True),
    dict(name='Borehole_Pump1', power_kW=1.25, estimated_daily_hours=5, always_on=False),
    dict(name='Borehole_Pump2', power_kW=2.0, estimated_daily_hours=6, always_on=False),
    dict(name='Security_Fence', power_kW=5.0, estimated_daily_hours=24, always_on=True),
    dict(name='Cold_Room', power_kW=2.5, estimated_daily_hours=10, always_on=False),
    dict(
        name='Cruncher_Mixers_Block',
        power_kW=45.0,
        estimated_daily_hours=6,
        always_on=False,
        window_start_h=8.0,
        window_end_h=17.0,
    ),
    # TODO: confirm Security_Lights window/hours (example below: 6 PM to 6 AM)
    dict(
        name='Security_Lights_Total',
        power_kW=3.0,
        estimated_daily_hours=12,
        always_on=False,
        window_start_h=18.0,
        window_end_h=6.0,
    ),
    dict(
        name='Electrical_Tools',
        power_kW=1.5,
        estimated_daily_hours=8,
        always_on=False,
        window_start_h=8.0,
        window_end_h=17.0,
    ),
    # TODO: add Golf_Carts_Charging_Total once power, hours and window are known.
    # Example entry (8 PM to 4 AM):
    # dict(name='Golf_Carts_Total', power_kW=7.68, estimated_daily_hours=4,
    #      always_on=False, window_start_h=20.0, window_end_h=4.0),
]
# Drop any entry whose name still carries a TODO marker.
LOAD_DEFINITIONS = list(filter(lambda entry: 'TODO' not in entry['name'], LOAD_DEFINITIONS))




In [None]:
# --- 2. Optimization Setup ---
def create_optimization_vars_and_bounds(loads, time_index):
    """Build the initial guess and box bounds for the flattened usage matrix.

    Variables are laid out load-major: variable ``i * num_timesteps + t`` is
    the usage fraction of load ``i`` at timestep ``t``.

    A load that is not 'always_on' and defines both 'window_start_h' and
    'window_end_h' may only run inside that window; variables outside it are
    pinned with bounds (0, 0). A window with start <= end is treated as
    ``start <= hour < end``; otherwise it wraps past midnight
    (``hour >= start or hour < end``). Hour-of-day is taken from the hour and
    minute of each timestamp (seconds are ignored).

    Args:
        loads: List of load definition dicts (see LOAD_DEFINITIONS).
        time_index: Timestamps of the samples (datetime Series or DatetimeIndex).

    Returns:
        Tuple ``(X0, bounds)``: ``X0`` is a float array holding 0.1 for every
        free variable and 0.0 for every pinned one, so the initial guess
        always satisfies the bounds; ``bounds`` is a list of ``(low, high)``
        tuples, one per variable.
    """
    num_loads = len(loads)
    num_timesteps = len(time_index)

    # Hour-of-day depends only on the timestamps, so compute it once for all loads.
    stamps = pd.DatetimeIndex(time_index)
    hour_of_day = np.asarray(stamps.hour + stamps.minute / 60.0, dtype=float)

    # allowed[i, t] is True when load i may run at timestep t.
    allowed = np.ones((num_loads, num_timesteps), dtype=bool)
    for i, load in enumerate(loads):
        start_h = load.get('window_start_h')
        end_h = load.get('window_end_h')
        if load['always_on'] or start_h is None or end_h is None:
            continue
        if start_h <= end_h:  # Daytime window
            allowed[i] = (hour_of_day >= start_h) & (hour_of_day < end_h)
        else:  # Overnight window (e.g., 18:00 to 06:00)
            allowed[i] = (hour_of_day >= start_h) | (hour_of_day < end_h)

    allowed_flat = allowed.ravel()
    bounds = [(0, 1) if is_free else (0, 0) for is_free in allowed_flat]

    # Initial guess: 0.1 where the load may run, 0 where it is pinned off.
    X0 = np.where(allowed_flat, 0.1, 0.0)
    return X0, bounds



In [None]:
def objective_function(X_flat, loads, measured_power, time_index, daily_hours_penalty_weight):
    """Cost of a candidate set of load profiles.

    The cost is the sum of squared differences between the simulated total
    power and ``measured_power``, plus ``daily_hours_penalty_weight`` times a
    penalty on running hours. For every load that is not 'always_on' and has
    'estimated_daily_hours' > 0, and for every calendar date present in
    ``time_index``, the penalty adds
    ``(hours run on that date - estimated_daily_hours) ** 2``.

    Note that each calendar date is compared against the full
    'estimated_daily_hours', even when only part of that date is present in
    ``time_index``.

    Args:
        X_flat: 1-D array of usage fractions, load-major
            (reshaped to ``(len(loads), len(measured_power))``).
        loads: List of load definition dicts.
        measured_power: 1-D array of measured total power per timestep.
        time_index: Datetime Series aligned with ``measured_power``.
        daily_hours_penalty_weight: Multiplier applied to the hours penalty.

    Returns:
        The scalar cost.

    Raises:
        IndexError: If a load needs the daily-hours penalty but ``time_index``
            has fewer than two samples (the step length cannot be derived).
    """
    num_loads = len(loads)
    num_timesteps = len(measured_power)
    X = X_flat.reshape((num_loads, num_timesteps))

    load_powers = np.array([load['power_kW'] for load in loads]).reshape(-1, 1)  # Column vector
    simulated_power_at_t = np.sum(X * load_powers, axis=0)

    # Primary objective: Minimize squared error
    error = np.sum((simulated_power_at_t - measured_power) ** 2)

    # Secondary objective: Penalize deviation from estimated daily hours
    penalty = 0
    penalised_loads = [
        (i, load['estimated_daily_hours'])
        for i, load in enumerate(loads)
        if not load['always_on'] and load['estimated_daily_hours'] > 0
    ]

    if penalised_loads:
        # The step length and per-day masks depend only on the timestamps, so
        # build them once instead of once per load and day. They are skipped
        # entirely when nothing is penalised, so a single-sample series is
        # still usable in that case.
        time_step_duration_hours = (time_index.iloc[1] - time_index.iloc[0]).total_seconds() / 3600.0
        sample_dates = time_index.dt.date
        day_masks = [(sample_dates == day).to_numpy() for day in sample_dates.unique()]

        for i, estimated_hours in penalised_loads:
            for day_mask in day_masks:
                # Sum of fractional usage for load i on this day
                actual_hours_on_day = np.sum(X[i, day_mask] * time_step_duration_hours)
                penalty += (actual_hours_on_day - estimated_hours) ** 2

    return error + daily_hours_penalty_weight * penalty



In [None]:
# --- 3. Run Optimization ---
def run_optimization(loads, measured_power, time_index, daily_hours_penalty_weight=0.1,
                     maxiter=200, maxfun=None):
    """Fit per-load usage profiles to the measured total power with L-BFGS-B.

    Args:
        loads: List of load definition dicts.
        measured_power: 1-D array of measured total power per timestep.
        time_index: Datetime Series aligned with ``measured_power``.
        daily_hours_penalty_weight: Weight of the daily-hours penalty passed
            to ``objective_function``.
        maxiter: Maximum number of L-BFGS-B iterations.
        maxfun: Optional cap on objective evaluations. ``None`` keeps SciPy's
            default. No analytic gradient is supplied, so SciPy estimates it
            by finite differences, and each gradient costs roughly one
            evaluation per free variable. Raise this value if the solver stops
            on the evaluation limit long before ``maxiter`` is reached.

    Returns:
        Array of shape ``(len(loads), len(time_index))`` with usage fractions.
        If the solver stops without converging (e.g. iteration or evaluation
        limit), the best iterate it reached is returned rather than the
        initial guess, so the work already done is not thrown away.
    """
    X0, bounds = create_optimization_vars_and_bounds(loads, time_index)
    profile_shape = (len(loads), len(time_index))

    print(f"Optimizing for {len(loads)} loads over {len(time_index)} timesteps...")
    print(f"Total variables: {len(X0)}")

    # Check if there's any actual work for the optimizer
    if not any(b != (0, 0) for b in bounds):
        print("Warning: All variable bounds are (0,0). No optimization possible with current settings.")
        return np.zeros_like(X0).reshape(profile_shape)

    solver_options = {'disp': True, 'maxiter': maxiter}
    if maxfun is not None:
        solver_options['maxfun'] = maxfun

    result = minimize(
        objective_function,
        X0,
        args=(loads, measured_power, time_index, daily_hours_penalty_weight),
        method='L-BFGS-B',  # Good for bound constraints
        bounds=bounds,
        options=solver_options,
    )

    if not result.success:
        # Hitting a limit is not a crash: result.x still holds the last
        # iterate, which is a better answer than the flat starting guess.
        print(f"Optimization failed: {result.message}")
        print("Returning the best iterate found so far instead of the initial guess.")

    return result.x.reshape(profile_shape)



In [None]:
# --- 4. Process and Export Results ---
def export_results(X_optimized, loads, time_index, base_filename="simulated_load_profiles"):
    """Write the optimized profiles to '<base_filename>.csv' and '<base_filename>.mat'.

    The CSV has a 'Timestamp' column followed by one column per load. The MAT
    file holds 'timestamps' (as strings), one variable per load name, and
    'simulated_total_kW' (sum over loads of profile * rated power).

    Args:
        X_optimized: Array of shape (number of loads, number of timesteps).
        loads: List of load definition dicts ('name' and 'power_kW' are read).
        time_index: Datetime Series aligned with the columns of X_optimized.
        base_filename: Output path without extension.
    """
    names = [entry['name'] for entry in loads]

    # --- CSV: timestamps first, then loads as columns (hence the transpose) ---
    csv_filename = f"{base_filename}.csv"
    table = pd.DataFrame(X_optimized.T, columns=names)
    table.insert(0, 'Timestamp', time_index)
    table.to_csv(csv_filename, index=False)
    print(f"Results saved to {csv_filename}")

    # --- MAT: datetimes are stored as strings, one array per load ---
    mat_payload = {'timestamps': time_index.astype(str).tolist()}
    mat_payload.update({name: X_optimized[row, :] for row, name in enumerate(names)})

    # Total simulated power, for comparison against the measurement
    rated_kw = np.array([entry['power_kW'] for entry in loads]).reshape(-1, 1)
    mat_payload['simulated_total_kW'] = np.sum(X_optimized * rated_kw, axis=0)

    mat_filename = f"{base_filename}.mat"
    savemat(mat_filename, mat_payload)
    print(f"Results saved to {mat_filename}")



In [None]:
# --- Main Execution ---
if __name__ == '__main__':
    # Driver: load the measured data, fit the load profiles, export them.
    #
    # The CSV must cover the whole window selected below; if the measurements
    # are spread over several files, combine them into one file first.

    # ---- User inputs ----
    CSV_FILE_PATH = r"C:\Users\hp\Desktop\REG108S\Project\MeasuredDataUpdated.csv" # <--- USER: REPLACE THIS
    # Window as currently set: 09:30 to 23:55 on the same day (2025-04-30).
    DATA_START_TIME = '2025-04-30 09:30:00'
    DATA_END_TIME = '2025-04-30 23:55:00'

    # Weight of the daily-hours penalty in the objective; tune based on results.
    DAILY_HOURS_PENALTY = 0.1

    # Guidance printed after a successful run, one entry per output line.
    NEXT_STEP_LINES = (
        "\n--- Next Steps ---",
        "1. Review the generated CSV and MAT files.",
        "2. Import the .mat file into MATLAB/Simulink. Each load profile can be used as a time-varying input signal (0 to 1) multiplying its rated power.",
        "3. Compare the 'simulated_total_kW' (sum of optimized_profiles * power_kW for each load) against your original 'measured_power'. Plot them to see the fit.",
        "4. If the fit is not good, or if load profiles seem unrealistic:",
        "   - Adjust the 'DAILY_HOURS_PENALTY'.",
        "   - Re-verify load power ratings and estimated daily hours/windows.",
        "   - Consider if any loads have interdependencies not yet captured.",
        "   - The optimization problem (especially for many variables over a long period) can be complex. 'maxiter' might need adjustment.",
    )

    print("Please confirm/update LOAD_DEFINITIONS in the script with your specific load details, especially for Golf Carts and Security Lights.")

    try:
        measured_power, time_index, _ = load_and_prepare_data(CSV_FILE_PATH, DATA_START_TIME, DATA_END_TIME)

        # Only loads that carry a rated power can take part in the fit.
        final_loads = list(filter(lambda ld: 'power_kW' in ld, LOAD_DEFINITIONS))
        if len(final_loads) == 0:
            raise ValueError("LOAD_DEFINITIONS is empty or not correctly populated.")

        optimized_profiles = run_optimization(final_loads, measured_power, time_index, DAILY_HOURS_PENALTY)
        export_results(optimized_profiles, final_loads, time_index)

        for next_step_line in NEXT_STEP_LINES:
            print(next_step_line)

    except FileNotFoundError:
        print(f"ERROR: The data file '{CSV_FILE_PATH}' was not found. Please update CSV_FILE_PATH in the script.")
    except ValueError as ve:
        print(f"ERROR: {ve}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")