We want to create a database of N powergrids based on either IEEE 14, 118 or rteXX. Using these already existing networks we add small perturbation on the power P and reactive power Q (Normal, Uniform, indicator ...) on buses that are generators or loads. The goal is to then create a data base of N powergrids stored as npy files. 

- We want to parallelize on cpu
- We want to incorporate non constant power


In [3]:
#python 3.10
#!pip install 'pandapower[all]'
#!pip install matplotlib
#!pip install numba

In [None]:
import pandapower as pp
import numpy as np
from tqdm import tqdm
import copy
import multiprocessing as mp
import os
from functools import partial
import time
import math

os.getcwd()


'/home/pikoglu/Luxembourg/PowerFlow'

In [None]:

def generate_single_grid(base_case, sample_id, rng_seed, perturbation_factor=0.1):

    local_rng = np.random.default_rng(rng_seed + sample_id)
    
    net_sample = copy.deepcopy(base_case)
    
    # generator perturbation (only on active power)
    if len(net_sample.gen) > 0:
        net_sample.gen["p_mw"] = local_rng.normal(
            net_sample.gen["p_mw"].values,
            perturbation_factor * np.abs(net_sample.gen["p_mw"].values)
        )
    
    # load perturbation (active and reactive power)
    if len(net_sample.load) > 0:
        net_sample.load["p_mw"] = local_rng.normal(
            net_sample.load["p_mw"].values,
            perturbation_factor * np.abs(net_sample.load["p_mw"].values)
        )
        net_sample.load["q_mvar"] = local_rng.normal(
            net_sample.load["q_mvar"].values,
            perturbation_factor * np.abs(net_sample.load["q_mvar"].values)
        )
    
    # solving power flow using the Newton-Raphson method and checking convergence
    try:
        pp.runpp(net_sample)
        if net_sample["converged"]:
            
            bus_data = net_sample.res_bus[["vm_pu", "p_mw", "q_mvar"]].values
            line_data = net_sample.res_line[["p_from_mw", "q_from_mvar", "p_to_mw", "q_to_mvar"]].values
            return {
                "sample_id": sample_id,
                "converged": True,
                "bus_data": bus_data,
                "line_data": line_data
            }
        else:
            print(f"Sample {sample_id}: Power flow did not converge.")
            return {"sample_id": sample_id, "converged": False}
    except pp.LoadflowNotConverged:
        print(f"Sample {sample_id}: LoadflowNotConverged exception.")
        return {"sample_id": sample_id, "converged": False}
    except Exception as e:
        print(f"Sample {sample_id}: Unexpected error: {str(e)}")
        return {"sample_id": sample_id, "converged": False}

In [None]:
def save_result(result, case_name, output_dir="data"):
    """Save the results of a single simulation if it converged"""
    if not result["converged"]:
        return False
    
    sample_id = result["sample_id"]

    os.makedirs(output_dir, exist_ok=True)
    
    np.save(f"{output_dir}/case{case_name}_sample{sample_id}_bus.npy", result["bus_data"])
    np.save(f"{output_dir}/case{case_name}_sample{sample_id}_line.npy", result["line_data"])
    
    return True

In [None]:
def generate_powergrids_parallel(base_case, n_samples, seed, perturbation_factor=0.1, n_processes=None):
    
    # max cores usage
    if n_processes is None:
        n_processes = mp.cpu_count()
    
    
    # partial function with fixed arguments to pass the base_case and perturbation_factor to the worker function
    worker_func = partial(generate_single_grid, base_case, perturbation_factor=perturbation_factor, rng_seed=seed)
    
    sample_ids = list(range(n_samples))
    
    # run  in parallel
    with mp.Pool(processes=n_processes) as pool:
        results = list(tqdm(pool.imap(worker_func, sample_ids), total=n_samples))
    
    return results


In [None]:
def main():
    # Select base case
    case_name = "6470rte"  # or "118" or your rteXX case
    
    if case_name == "14":
        base_net = pp.networks.case14()
    elif case_name == "118":
        base_net = pp.networks.case118()
    elif case_name == "6470rte":
        base_net = pp.networks.case6470rte()
    else:
        raise ValueError(f"Unsupported case name: {case_name}")
    
    seed = 42
    n_samples = 1000
    output_dir = "data"
    perturbation_factor = 0.1
    
    
    results = generate_powergrids_parallel(
        base_net, 
        n_samples, 
        seed,
        perturbation_factor=perturbation_factor
    )
    
    successful_saves = 0
    for result in results:
        if save_result(result, case_name, output_dir):
            successful_saves += 1
    


In [9]:
if __name__ == "__main__":
    t_start = time.time()
    main()
    t_end = time.time()
    print(f"Total execution time for the script: {t_end - t_start:.2f} seconds")

Starting parallel generation with 16 processes for 1000 samples


100%|██████████| 1000/1000 [00:37<00:00, 26.50it/s]


Generated and saved 1000 out of 1000 samples.
Total execution time: 38.25 seconds


Adding solar panel

In [None]:
def solar_power_function(time_hour, max_power, random_factor=0.2):
    """
    simulate solar power output based on time of day (0-23 hours)
    return  power in MW with some randomness !!!
    """
    # the solar power curve is  zero at night, peak at noon
    if 6 <= time_hour <= 18:  # there is sunlight 

        # sine curve approcimation for solar power output
        base_output = max_power * math.sin(math.pi * (time_hour - 6) / 12)

        # randomness for cloud and things like that 
        randomness = np.random.uniform(1.0 - random_factor, 1.0 + random_factor)
        return max(0, base_output * randomness)  # Ensure non-negative
    else:
        return 0.0  # No solar power at night

In [None]:
def generate_single_grid_with_solar(base_case, sample_id, time_hour, rng_seed, 
                                   perturbation_factor=0.1, solar_penetration=0.5, 
                                   max_solar_power=2.0):
    """Generate a single perturbed powergrid with solar generation at a specific time"""
    local_rng = np.random.default_rng(rng_seed + sample_id)
    
    net_sample = copy.deepcopy(base_case)
    
    if len(net_sample.gen) > 0:
        net_sample.gen["p_mw"] = local_rng.normal(
            net_sample.gen["p_mw"].values,
            perturbation_factor * np.abs(net_sample.gen["p_mw"].values)
        )
    
    if len(net_sample.load) > 0:
        net_sample.load["p_mw"] = local_rng.normal(
            net_sample.load["p_mw"].values,
            perturbation_factor * np.abs(net_sample.load["p_mw"].values)
        )
        net_sample.load["q_mvar"] = local_rng.normal(
            net_sample.load["q_mvar"].values,
            perturbation_factor * np.abs(net_sample.load["q_mvar"].values)
        )
    
    num_buses = len(net_sample.bus)
    
    # decide which buses will have solar panels (randomly select a percentage of buses)
    num_solar_buses = max(1, int(num_buses * solar_penetration))
    solar_buses = local_rng.choice(net_sample.bus.index, size=num_solar_buses, replace=False)
    
    # calculate the solar power output for each selected bus
    for bus in solar_buses:
        #random max power capacity for this solar installation
        solar_capacity = local_rng.uniform(0.5, max_solar_power)
        
        # power output based on time of day
        p_solar = solar_power_function(time_hour, solar_capacity, random_factor=0.2)
        
        # static generator representing solar panel
        pp.create_sgen(
            net_sample, 
            bus=bus, 
            p_mw=p_solar, 
            q_mvar=0,  # Typically solar inverters operate at unity power factor
            name=f"Solar_{bus}", 
            type="PV"
        )
    
    # Run power flow
    try:
        pp.runpp(net_sample)
        if net_sample["converged"]:
            # Extract and return the data we want to save
            bus_data = net_sample.res_bus[["vm_pu", "p_mw", "q_mvar"]].values
            line_data = net_sample.res_line[["p_from_mw", "q_from_mvar", "p_to_mw", "q_to_mvar"]].values
            
            # Extract solar generation data
            solar_data = None
            if len(net_sample.sgen) > 0:
                solar_data = {
                    "bus": net_sample.sgen["bus"].values,
                    "p_mw": net_sample.sgen["p_mw"].values,
                    "q_mvar": net_sample.sgen["q_mvar"].values
                }
            
            return {
                "sample_id": sample_id,
                "time_hour": time_hour,
                "converged": True,
                "bus_data": bus_data,
                "line_data": line_data,
                "solar_data": solar_data
            }
        else:
            print(f"Sample {sample_id}, Time {time_hour}h: Power flow did not converge.")
            return {"sample_id": sample_id, "time_hour": time_hour, "converged": False}
    except pp.LoadflowNotConverged:
        print(f"Sample {sample_id}, Time {time_hour}h: LoadflowNotConverged exception.")
        return {"sample_id": sample_id, "time_hour": time_hour, "converged": False}
    except Exception as e:
        print(f"Sample {sample_id}, Time {time_hour}h: Unexpected error: {str(e)}")
        return {"sample_id": sample_id, "time_hour": time_hour, "converged": False} 

In [None]:
def save_result_with_time(result, case_name, output_dir="data"):
    """Save the results of a single simulation with time if it converged"""
    if not result["converged"]:
        return False
    
    sample_id = result["sample_id"]
    time_hour = result["time_hour"]
    
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Save bus and line data with time indicator
    np.save(f"{output_dir}/case{case_name}_sample{sample_id}_bus_T_{time_hour:02d}.npy", result["bus_data"])
    np.save(f"{output_dir}/case{case_name}_sample{sample_id}_line_T_{time_hour:02d}.npy", result["line_data"])
    
    # Save solar data if available
    if result["solar_data"] is not None:
        np.savez(f"{output_dir}/case{case_name}_sample{sample_id}_solar_T_{time_hour:02d}.npz", 
                bus=result["solar_data"]["bus"], 
                p_mw=result["solar_data"]["p_mw"], 
                q_mvar=result["solar_data"]["q_mvar"])
    
    return True