# Power Flow Neural Network Training with Pandapower

In [25]:
import pandapower as pp
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split, TensorDataset
from torch.utils.tensorboard import SummaryWriter # for pytorch visualization

from sklearn.preprocessing import StandardScaler # normalize input features and target values
from sklearn.model_selection import ParameterGrid # for hyperparameter tuning

import pickle
import pandas as pd

### 1. Import Data Set

In [26]:
data = np.load('./vector_data_solution_prediction.npy')
print(data.shape)

(1000, 23)


### 2. Create Deep Neural Network

In [27]:
class DeepNN(nn.Module):
    def __init__(self, input_size, hidden_layers, output_size):
        super(DeepNN, self).__init__()
        layers = []
        prev_size = input_size
        for hidden_size in hidden_layers:
            layers.append(nn.Linear(prev_size, hidden_size))
            layers.append(nn.ReLU())
            prev_size = hidden_size
        layers.append(nn.Linear(prev_size, output_size))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)

### 3. Training Pipeline

In [29]:
# Training setup
EPOCHS = 1000
BATCH_SIZE = 32

# add dataset here
features = torch.tensor(data[:,:12], dtype=torch.float32)
labels = torch.tensor(data[:,16:20], dtype=torch.float32)
dataset = TensorDataset(features, labels)

# dataloaders
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

# model, loss, optimizer
input_size = len(dataset[0][0])
output_size = len(dataset[0][1])
model = DeepNN(input_size=input_size, hidden_layers=[64,64], output_size=output_size)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# training loop
writer = SummaryWriter()
best_val_loss = np.inf

for epoch in range(EPOCHS):
    model.train()
    for X,Y in train_loader:
        outputs = model(X)
        loss = criterion(outputs, Y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X,Y in val_loader:
            outputs = model(X)
            loss = criterion(outputs, Y)
            val_loss += loss.item()
    val_loss /= len(val_loader)

    writer.add_scalar('Loss/train', loss.item(), epoch)
    writer.add_scalar('Loss/val', val_loss, epoch)

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), './saved_models/best_model.pth')
        print(f"------------- BEST MODEL - Epoch {epoch} - Val Loss {val_loss:.6f} -------------")

    if epoch % 10 == 0:
        print(f"Epoch {epoch:03}, Loss: {loss.item():.6f}, Val Loss: {val_loss:.6f}")

writer.close()

------------- BEST MODEL - Epoch 0 - Val Loss 0.215076 -------------
Epoch 000, Loss: 0.253147, Val Loss: 0.215076
------------- BEST MODEL - Epoch 1 - Val Loss 0.098882 -------------
------------- BEST MODEL - Epoch 2 - Val Loss 0.086900 -------------
------------- BEST MODEL - Epoch 3 - Val Loss 0.082290 -------------
------------- BEST MODEL - Epoch 4 - Val Loss 0.077584 -------------
------------- BEST MODEL - Epoch 5 - Val Loss 0.071921 -------------
------------- BEST MODEL - Epoch 6 - Val Loss 0.066185 -------------
------------- BEST MODEL - Epoch 7 - Val Loss 0.061007 -------------
------------- BEST MODEL - Epoch 8 - Val Loss 0.057656 -------------
------------- BEST MODEL - Epoch 9 - Val Loss 0.048979 -------------
------------- BEST MODEL - Epoch 10 - Val Loss 0.044682 -------------
Epoch 010, Loss: 0.054665, Val Loss: 0.044682
------------- BEST MODEL - Epoch 11 - Val Loss 0.039062 -------------
------------- BEST MODEL - Epoch 12 - Val Loss 0.032405 -------------
--------

### 4. Usage Example

In [30]:
class SimpleTwoBus:
    def __init__(self, V_ext, P, Q, G, B, V_init, theta_init):
        '''This class creates a simple 2-bus network.'''
        self.V_ext = V_ext
        self.P = P
        self.Q = Q
        self.G = G
        self.B = B
        self.V_init = V_init
        self.theta_init = theta_init
        self.net = pp.create_empty_network()
        self.create_two_bus_grid()

    def create_two_bus_grid(self):
        # Create two buses with initialized voltage and angle
        bus1 = pp.create_bus(self.net, vn_kv=20.0, name="Bus 1")
        bus2 = pp.create_bus(self.net, vn_kv=0.4, name="Bus 2")
    
        # Initialize voltage and angle for buses
        self.net.bus.loc[bus1, 'vm_pu'] = self.V_init[0]
        self.net.bus.loc[bus1, 'va_degree'] = self.theta_init[0]
        self.net.bus.loc[bus2, 'vm_pu'] = self.V_init[1]
        self.net.bus.loc[bus2, 'va_degree'] = self.theta_init[1]
    
        # create a line between the two buses
        pp.create_line_from_parameters(
            self.net,
            from_bus=0,
            to_bus=1,
            length_km=1.0,
            r_ohm_per_km=1/self.G,
            x_ohm_per_km=1/self.B,
            c_nf_per_km=0.0,
            g_us_per_km=0.0,
            max_i_ka=100.0,
        )

        # Create a transformer between the two buses
        # pp.create_transformer(self.net, bus1, bus2, std_type="0.25 MVA 20/0.4 kV")
    
        # Create a load at bus 2 with specified P and Q
        pp.create_load(self.net, bus2, p_mw=self.P, q_mvar=self.Q, name="Load")
    
        # Create an external grid connection at bus 1 with specified G and B
        pp.create_ext_grid(self.net, bus1, vm_pu=self.V_ext, name="Grid Connection")

In [48]:
VM_PU_RANGE = [0.9, 1.1]  #
P_MW_RANGE = [0.0, 0.2]  #
Q_MVAR_RANGE = [0.0, 0.1]  #
G_RANGE = [80, 120]  #
B_RANGE = [0.01, 0.2]  #
INIT_VM_PU_MIN = 0.9  #
INIT_VM_PU_MAX = 1.1  #
INIT_THETA_MAX = -1
INIT_THETA_MIN = 1

# net = pp.networks.example_simple()
V_ext = np.random.uniform(VM_PU_RANGE[0], VM_PU_RANGE[1])
P = np.random.uniform(P_MW_RANGE[0], P_MW_RANGE[1])
Q = np.random.uniform(Q_MVAR_RANGE[0], Q_MVAR_RANGE[1])
G = np.random.uniform(G_RANGE[0], G_RANGE[1])  # Short-circuit power in MVA
B = np.random.uniform(B_RANGE[0], B_RANGE[1])  # Short-circuit impedance

V_init = [
    np.random.uniform(INIT_VM_PU_MIN, INIT_VM_PU_MAX),
    np.random.uniform(INIT_VM_PU_MIN, INIT_VM_PU_MAX),
]
theta_init = [
    np.random.uniform(INIT_THETA_MIN, INIT_THETA_MAX),
    np.random.uniform(INIT_THETA_MIN, INIT_THETA_MAX),
]

Net = SimpleTwoBus(V_ext,P,Q,G,B,V_init,theta_init)
net = Net.net
pp.runpp(net, max_iteration=1, tolerance_mva=np.inf)

# Prepare input
Ybus = net._ppc["internal"]["Ybus"].toarray()
S = net._ppc["internal"]["Sbus"]
input_tensor = torch.FloatTensor(np.concatenate([
    S.real, 
    S.imag,
    Ybus.real.flatten(), 
    Ybus.imag.flatten(),
]))


# Load the best model
best_model = DeepNN(input_size=input_size, hidden_layers=[64,64], output_size=output_size)
best_model.load_state_dict(torch.load('./saved_models/best_model.pth'))
best_model.eval()

# Get prediction
with torch.no_grad():
    output = model(input_tensor)

# Split prediction into voltage magnitudes and angles
n_buses = len(net.bus)
V_mag_pred = output[:n_buses].numpy()
V_ang_pred = output[n_buses:].numpy()

# Run power flow to ensure internal data is available
# pp.runpp(net, calculate_voltage_angles=True)
pp.runpp(net,
         init="auto",
         init_vm_pu=V_mag_pred,
         init_va_degree=V_ang_pred,
         max_iteration=50,
         tolerance_mva=1e-5)

# Get reference values from the network
V_mag_ref = net.res_bus.vm_pu.values
V_ang_ref = net.res_bus.va_degree.values

print("Predicted voltage magnitudes:", V_mag_pred)
print("Predicted voltage angles:", V_ang_pred)
print("Reference voltage magnitudes:", V_mag_ref)
print("Reference voltage angles:", V_ang_ref)
print("Iterations:", net._ppc["iterations"])

  best_model.load_state_dict(torch.load('./saved_models/best_model.pth'))


Predicted voltage magnitudes: [1.0298587 1.0330721]
Predicted voltage angles: [-0.00131004 -0.07823545]
Reference voltage magnitudes: [0.92181296 0.92167052]
Reference voltage angles: [ 0.         -0.10363299]
Iterations: 3


### 6. Additional


In [32]:
# def evaluate_model(model, val_loader, criterion):
#     model.eval()
#     val_loss = 0
#     with torch.no_grad():
#         for batch in val_loader:
#             batch_inputs = batch['input']
#             batch_targets = batch['output']
#             outputs = model(batch_inputs)
#             loss = criterion(outputs, batch_targets)
#             val_loss += loss.item()
#     val_loss /= len(val_loader)
#     return val_loss

# def hyperparameter_tuning(base_network, param_grid):
#     best_model = None
#     best_loss = float('inf')
#     dataset = PowerFlowDataset(base_network)
#     train_size = int(0.8 * len(dataset))
#     val_size = len(dataset) - train_size
#     _, val_dataset = random_split(dataset, [train_size, val_size])
#     val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

#     for params in ParameterGrid(param_grid):
#         print(f"Training with parameters: {params}")
#         model = train_power_flow_model(base_network, num_epochs=params['num_epochs'], batch_size=params['batch_size'])
#         val_loss = evaluate_model(model, val_loader, nn.MSELoss())
#         if val_loss < best_loss:
#             best_loss = val_loss
#             best_model = model
#     return best_model

# base_network = pp.networks.example_simple()
# param_grid = {
#     'num_epochs': [50, 100],
#     'batch_size': [16, 32],
#     'hidden_size': [256, 512]
# }
# best_model = hyperparameter_tuning(base_network, param_grid)
# # ...