In [1]:
import os
import time
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import matplotlib.ticker as ticker
import seaborn as sns

from torch_geometric.loader import DataLoader
from CustomDataset import CustomDataset

%config InlineBackend.figure_format = 'png2x'
plt.rcParams['font.size'] = 12
plt.rcParams['font.family'] = 'Times New Roman'
plt.rcParams['mathtext.fontset'] = 'stix'  # 'stix' is similar to Times New Roman
# If you want to use LaTeX to render all text (for more consistency)
plt.rcParams['text.usetex'] = True

plt.rcParams['axes.linewidth'] = 0.5    # Axis line width
plt.rcParams['lines.linewidth'] = 1    # Line width
plt.rcParams['xtick.major.width'] = 0.5  # Set x major tick width to 2 points
plt.rcParams['ytick.major.width'] = 0.5
plt.rcParams['xtick.major.size'] = 3
plt.rcParams['ytick.major.size'] = 3
plt.rcParams['lines.markersize'] = 3
colors = plt.rcParams['axes.prop_cycle'].by_key()['color']

In [2]:
additional_colors = [
    '#191970',  # Midnight Blue
    (0.498, 1.0, 0.831),  # Aqua Marine
    '#228B22',  # Forest Green
    '#FF7F50',  # Coral
    (0.855, 0.647, 0.125),  # Golden Rod
    '#FF1493',  # Deep Pink
    (0.901, 0.901, 0.980),  # Lavender
    '#A0522D',  # Sienna
    (0.0, 0.501, 0.501),  # Teal
    '#DA70D6'   # Orchid
]

colors += additional_colors

In [3]:
# Number of time steps
nt = 12

# Number of testing data
test_size = 200

# Number of branches
num_branches = 186

# Power flow limit
file_path = f'../../data/IEEE_Case118/branch_params/PF_max_category1.csv'
PF_max_category1 = pd.read_csv(file_path, header=None, index_col=None).to_numpy().astype(bool).flatten()
file_path = f'../../data/IEEE_Case118/branch_params/PF_max_category2.csv'
PF_max_category2 = pd.read_csv(file_path, header=None, index_col=None).to_numpy().astype(bool).flatten()
file_path = f'../../data/IEEE_Case118/branch_params/PF_max_category3.csv'
PF_max_category3 = pd.read_csv(file_path, header=None, index_col=None).to_numpy().astype(bool).flatten()

# Set line flow threshold
epsilon = 0.95

PF_max1 = 200
PF_max2 = 300
PF_max3 = 500

PF_threshold = np.zeros((num_branches, nt))
PF_threshold[PF_max_category1, :] = epsilon * PF_max1
PF_threshold[PF_max_category2, :] = epsilon * PF_max2
PF_threshold[PF_max_category3, :] = epsilon * PF_max3

In [4]:
# Matrix to store results
branch_flow_true = []
branch_flow_pred = []

over_load_true = []
over_load_pred = []

# Read PF file
for i in range(test_size):
    # Ground truth
    file_path = f'../../res/IEEE_Case118/model_evaluation/PF/true_corr/true_corr_{i+1}.csv'
    temp = pd.read_csv(file_path, header=None, index_col=None).to_numpy()
    branch_flow_true.append(temp.copy())

    # Prediction
    file_path = f'../../res/IEEE_Case118/model_evaluation/PF/pred_corr/pred_corr_{i+1}.csv'
    temp = pd.read_csv(file_path, header=None, index_col=None).to_numpy()
    branch_flow_pred.append(temp.copy())

# Convert to numpy array
branch_flow_true = np.abs(np.array(branch_flow_true))
branch_flow_pred = np.abs(np.array(branch_flow_pred))

over_load_true = branch_flow_true >= PF_threshold
over_load_pred = branch_flow_pred >= PF_threshold

In [None]:
# Probability of heavying loading
prob_true = over_load_true.mean(axis=0)
prob_pred = over_load_pred.mean(axis=0)

# Get sorted branch index according to mean probability of ground truth
sorted_idx = np.argsort(prob_true.T.mean(axis=0))

# Select six important branches
num_top_branches = 9
top_branch_idx = sorted_idx[-num_top_branches:]

# Sort top branch index
# top_branch_idx = np.sort(top_branch_idx)

top_branch_idx

## All branches

In [None]:
fig = plt.figure(figsize=(10, 3))
ax1 = fig.add_subplot(211)
sns.heatmap(prob_true.T, ax=ax1, vmin=0, vmax=1, xticklabels=[], yticklabels=[])
ax1.set_title('Probability of branch heavy loading')
ax1.set_xlabel('Branches')
ax1.set_ylabel('Time steps')

ax2 = fig.add_subplot(212)
sns.heatmap(prob_pred.T, ax=ax2, vmin=0, vmax=1, xticklabels=[], yticklabels=[])
ax2.set_xlabel('Branches')
ax2.set_ylabel('Time steps')

plt.tight_layout()
plt.show()
plt.close('all')

## Significant branches only

## Failure mode 1 probability: 

$$\mathcal{P}^I_o(t) = p\left( \Gamma_t \right)$$

In [None]:
# Plot probability for individual branch
fig = plt.figure(figsize=(12, 1.75))

x = np.arange(1, 13)

ax = fig.add_subplot(141)
ax.plot(x, prob_true[top_branch_idx[-1]], color='red', marker='^', label='MILP')
ax.plot(x, prob_pred[top_branch_idx[-1]], color='blue', marker='^', label='GNN')
ax.set_xlim((0, 13))
ax.set_xticks([2, 4, 6, 8, 10, 12])
ax.set_xlabel('Time step $t$ (h)')
ax.set_ylim((-0.1, 1.1))
ax.set_ylabel('$\mathcal{P}^I_o(t)$')
ax.legend(loc='lower left', ncols=2, frameon=False)

ax = fig.add_subplot(142)
ax.plot(x, prob_true[top_branch_idx[-2]], color='red', marker='^', label='MILP')
ax.plot(x, prob_pred[top_branch_idx[-2]], color='blue', marker='^', label='GNN')
ax.set_xlim((0, 13))
ax.set_xticks([2, 4, 6, 8, 10, 12])
ax.set_xlabel('Time step $t$ (h)')
ax.set_ylim((-0.1, 1.1))
ax.set_ylabel('$\mathcal{P}^I_o(t)$')

ax = fig.add_subplot(143)
ax.plot(x, prob_true[top_branch_idx[-3]], color='red', marker='^', label='MILP')
ax.plot(x, prob_pred[top_branch_idx[-3]], color='blue', marker='^', label='GNN')
ax.set_xlim((0, 13))
ax.set_xticks([2, 4, 6, 8, 10, 12])
ax.set_xlabel('Time step $t$ (h)')
ax.set_ylim((-0.1, 1.1))
ax.set_ylabel('$\mathcal{P}^I_o(t)$')
# ax.legend(loc='upper right', ncols=2, frameon=False)

ax = fig.add_subplot(144)
ax.plot(x, prob_true[top_branch_idx[-4]], color='red', marker='^', label='MILP')
ax.plot(x, prob_pred[top_branch_idx[-4]], color='blue', marker='^', label='GNN')
ax.set_xlim((0, 13))
ax.set_xticks([2, 4, 6, 8, 10, 12])
ax.set_xlabel('Time step $t$ (h)')
ax.set_ylim((-0.1, 1.1))
ax.set_ylabel('$\mathcal{P}^I_o(t)$')
# ax.legend(loc='upper right', ncols=2, frameon=False)


plt.tight_layout()
plt.show()
plt.close('all')

In [None]:
# fig = plt.figure(figsize=(3, 4))
fig, ax = plt.subplots(1, 1, figsize=(3, 2.5))

x = np.arange(1, 13)

for i in range(1, 5):
    ax.plot(x, prob_true[top_branch_idx[-i]], color=colors[i-1], marker='^', clip_on=False)
    ax.plot(x, prob_pred[top_branch_idx[-i]], color=colors[i-1], marker='^', linestyle='dashed')

# ax.spines['left'].set_position(('outward', 5))
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
# ax.spines['left'].set_visible(False)
ax.set_xlim((0.5, 13))
ax.set_ylim((0, 1.0))
ax.spines['bottom'].set_visible(True)
ax.set_xticks(np.arange(1, 15, 2))
ax.set_xlabel('Time step $t$ (h)')
ax.set_ylabel('$\mathcal{P}^I_o(t)$')

plt.tight_layout()
plt.show()
# fig.savefig(f'../../submission/118_overloading_standalone_reliability.png', bbox_inches='tight', dpi=300)
plt.close('all')

## Failure mode 2 probability:

$$\mathcal{P}^{I+}_o(t) = p\left( \bigcup_{t'=t+1}^{t+\Delta T} \Gamma_{t'} \, \vert \, \Gamma_t \right)$$

In [9]:
# Number of time steps
T = 12
delta_T = 2

num_sig_branches = 4

cond_prob_true = np.zeros((num_sig_branches, T))
cond_prob_pred = np.zeros((num_sig_branches, T))

# Calculate conditional probability ground truth
for i in range(num_sig_branches):
    for j in range(T-delta_T):
        idx = over_load_true[:, sorted_idx[-(i+1)], j] == True
        num = (over_load_true[idx, sorted_idx[-(i+1)], j+1:j+1+delta_T] == True).sum(axis=1)
        cond_prob_true[i, j] = (num >= 1).sum() / idx.sum()

# Calculate conditional probability prediction
for i in range(num_sig_branches):
    for j in range(T-delta_T):
        idx = over_load_pred[:, sorted_idx[-(i+1)], j] == True
        num = (over_load_pred[idx, sorted_idx[-(i+1)], j+1:j+1+delta_T] == True).sum(axis=1)
        cond_prob_pred[i, j] = (num >= 1).sum() / idx.sum()

In [None]:
# Plot probability for individual branch
fig = plt.figure(figsize=(12, 1.75))

x = np.arange(1, 11)

ax = fig.add_subplot(141)
ax.plot(x, cond_prob_true[0][:-delta_T], color='red', marker='^', label='MILP')
ax.plot(x, cond_prob_pred[0][:-delta_T], color='blue', marker='^', label='GNN')
ax.set_xlim((0, 13))
ax.set_xticks([2, 4, 6, 8, 10, 12])
ax.set_xlabel('Time step $t$ (h)')
ax.set_ylim((-0.1, 1.1))
ax.set_ylabel('$\mathcal{P}^{I+}_o(t)$')
ax.legend(loc='lower left', ncols=2, frameon=False)

ax = fig.add_subplot(142)
ax.plot(x, cond_prob_true[1][:-delta_T], color='red', marker='^', label='MILP')
ax.plot(x, cond_prob_pred[1][:-delta_T], color='blue', marker='^', label='GNN')
ax.set_xlim((0, 13))
ax.set_xticks([2, 4, 6, 8, 10, 12])
ax.set_xlabel('Time step $t$ (h)')
ax.set_ylim((-0.1, 1.1))
ax.set_ylabel('$\mathcal{P}^{I+}_o(t)$')

ax = fig.add_subplot(143)
ax.plot(x, cond_prob_true[2][:-delta_T], color='red', marker='^', label='MILP')
ax.plot(x, cond_prob_pred[2][:-delta_T], color='blue', marker='^', label='GNN')
ax.set_xlim((0, 13))
ax.set_xticks([2, 4, 6, 8, 10, 12])
ax.set_xlabel('Time step $t$ (h)')
ax.set_ylim((-0.1, 1.1))
ax.set_ylabel('$\mathcal{P}^{I+}_o(t)$')
# ax.legend(loc='upper right', ncols=2, frameon=False)

ax = fig.add_subplot(144)
ax.plot(x, cond_prob_true[3][:-delta_T], color='red', marker='^', label='MILP')
ax.plot(x, cond_prob_pred[3][:-delta_T], color='blue', marker='^', label='GNN')
ax.set_xlim((0, 13))
ax.set_xticks([2, 4, 6, 8, 10, 12])
ax.set_xlabel('Time step $t$ (h)')
ax.set_ylim((-0.1, 1.1))
ax.set_ylabel('$\mathcal{P}^{I+}_o(t)$')
# ax.legend(loc='upper right', ncols=2, frameon=False)

plt.tight_layout()
plt.show()
plt.close('all')

In [None]:
# fig = plt.figure(figsize=(3, 4))
fig, ax = plt.subplots(1, 1, figsize=(3, 2.5))

x = np.arange(1, 11)

for i in range(4):
    ax.plot(x, cond_prob_true[i, :-2], color=colors[i], marker='^', clip_on=False)
    ax.plot(x, cond_prob_pred[i, :-2], color=colors[i], marker='^', linestyle='dashed')

# ax.spines['left'].set_position(('outward', 5))
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
# ax.spines['left'].set_visible(False)
ax.set_xlim((0.5, 11.5))
ax.set_ylim((0, 1.0))
ax.spines['bottom'].set_visible(True)
ax.set_xticks(np.arange(1, 13, 2))
ax.set_xlabel('Time step $t$ (h)')
ax.set_ylabel('$\mathcal{P}^{I+}_o(t)$')

plt.tight_layout()
plt.show()
# fig.savefig(f'../../submission/118_overloading_progressive_reliability.png', bbox_inches='tight', dpi=300)
plt.close('all')

## Faliure mode 3 probability

$$\mathcal{P}^{II}_o(t) = \left[ p\left( \Gamma_t^j \, \vert \, \Gamma_t^i \right) \right]_{i, j}$$

In [12]:
# List to store results
mode3_prob_true = np.zeros((nt, num_sig_branches, num_sig_branches))
mode3_prob_pred = np.zeros((nt, num_sig_branches, num_sig_branches))

# Calculate conditional probability ground truth
for i in range(nt):
    for j in range(num_sig_branches):
        for k in range(num_sig_branches):
            idx = over_load_true[:, sorted_idx[-(j+1)], i] == True  
            mode3_prob_true[i, j, k] = (over_load_true[idx, sorted_idx[-(k+1)], i] == True).sum() / idx.sum()

# Calculate conditional probability prediction
for i in range(nt):
    for j in range(num_sig_branches):
        for k in range(num_sig_branches):
            idx = over_load_pred[:, sorted_idx[-(j+1)], i] == True  
            mode3_prob_pred[i, j, k] = (over_load_pred[idx, sorted_idx[-(k+1)], i] == True).sum() / idx.sum()

In [None]:
## Save matrix plot
fig = plt.figure(figsize=(9, 5))

for i in range(nt):
    ax = fig.add_subplot(3, 4, i+1)
    ax.set_title(f'$t = {i+1} \, h$', fontsize=10)
    sns.heatmap(mode3_prob_true[i], ax=ax, vmin=0, vmax=1, 
                cbar_kws={"ticks":[0, 0.5, 1]},
                xticklabels=['i', 'ii', 'iii', 'iv'], 
                yticklabels=['i', 'ii', 'iii', 'iv'],
                linewidths=1)

plt.tight_layout()
plt.show()
# fig.savefig('../../res/IEEE_Case118/reliability_risk/branch_overloading/mode3_prob_true.png', dpi=300)
plt.close('all')

In [None]:
## Save matrix plot
fig = plt.figure(figsize=(9, 5))

for i in range(nt):
    ax = fig.add_subplot(3, 4, i+1)
    ax.set_title(f'$t = {i+1} \, h$', fontsize=10)
    sns.heatmap(mode3_prob_pred[i], ax=ax, vmin=0, vmax=1, 
                cbar_kws={"ticks":[0, 0.5, 1]},
                xticklabels=['i', 'ii', 'iii', 'iv'], 
                yticklabels=['i', 'ii', 'iii', 'iv'],
                linewidths=1)

plt.tight_layout()
plt.show()
# fig.savefig('../../res/IEEE_Case118/reliability_risk/branch_overloading/mode3_prob_pred.png', dpi=300)
plt.close('all')

## Failure mode 4 probability

$$\mathcal{P}^{II+}_o(t) = \left[ p\left(\bigcup_{t'=t+1}^{t+\Delta T} \Gamma_{t'}^j \, \vert \, \Gamma_t^i \right) \right]_{i, j}$$

## Risk analysis

#### Failure model 1

In [15]:
# Define cost function
Co = 1 # Cost of overloading, $5/MW

# Calculate the amount of overloading 
branch_overflow_true = np.abs(branch_flow_true - PF_threshold)
branch_overflow_pred = np.abs(branch_flow_pred - PF_threshold)

## Risk associated with overloading at time t
cost_true = branch_overflow_true * Co
cost_pred = branch_overflow_pred * Co
risk_true = cost_true.mean(axis=0)[top_branch_idx[:4]].sum(axis=0)
risk_pred = cost_pred.mean(axis=0)[top_branch_idx[:4]].sum(axis=0)

#### Failure mode 2

In [16]:
cond_risk_true = np.zeros((num_branches, T))
cond_risk_pred = np.zeros((num_branches, T))

# Calculate conditional probability ground truth
for i in range(num_branches):
    for j in range(T-delta_T):
        idx = over_load_true[:, sorted_idx[-(i+1)], j] == True
        cond_cost_true = branch_overflow_true[:, sorted_idx[-(i+1)], j+1:j+1+delta_T] * Co
        cond_risk_true[i, j] = cond_cost_true.mean(axis=0).sum()
cond_risk_true = cond_risk_true[top_branch_idx[:4]].sum(axis=0)

# Calculate conditional probability prediction
for i in range(num_branches):
    for j in range(T-delta_T):
        idx = over_load_true[:, sorted_idx[-(i+1)], j] == True
        cond_cost_pred = branch_overflow_pred[:, sorted_idx[-(i+1)], j+1:j+1+delta_T] * Co
        cond_risk_pred[i, j] = cond_cost_pred.mean(axis=0).sum()
cond_risk_pred = cond_risk_pred[top_branch_idx[:4]].sum(axis=0)

#### Overall risk

In [17]:
overall_risk_true = risk_true + cond_risk_true
overall_risk_pred = risk_pred + cond_risk_pred

In [None]:
# Plot overall risk
x = np.arange(1, 11)

fig = plt.figure(figsize=(4, 1.75))
ax = fig.add_subplot(111)
ax.plot(x, overall_risk_true[:-delta_T]/1e3, color='red', marker='^', label='MILP')
ax.plot(x, overall_risk_pred[:-delta_T]/1e3, color='blue', marker='^', label='GNN')
ax.set_xlabel('Time step $t$ (h)')
ax.set_xlim((0, 11))
ax.set_xticks([1, 3, 5, 7, 9])
ax.set_ylim((1, 2))
ax.set_ylabel('$\Re_o(t)$ ($\$10^4$)')
ax.legend(ncols=2, frameon=False)

plt.tight_layout()
plt.show()
plt.close('all')


In [None]:
# fig = plt.figure(figsize=(3, 4))
fig, ax = plt.subplots(1, 1, figsize=(3, 2.5))

x = np.arange(1, 11)

ax.plot([], [], color='black', marker='s', label='MILP')
ax.plot([], [], color='black', marker='s', label='GNN', linestyle='dashed')

ax.plot(x, overall_risk_true[:-delta_T]/1e3, color='red', marker='^', clip_on=False)
ax.plot(x, overall_risk_pred[:-delta_T]/1e3, color='blue', marker='^', linestyle='dashed')

# ax.spines['left'].set_position(('outward', 5))
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
# ax.spines['left'].set_visible(False)
ax.set_xlim((0.5, 11))
ax.set_ylim((0, 2))
ax.spines['bottom'].set_visible(True)
ax.set_xticks(np.arange(1, 13, 2))
ax.set_xlabel('Time step $t$ (h)')
ax.set_ylabel('$\Re_o(t)$, $\\times 10^3$')
# ax.legend(loc='lower right', frameon=False, ncols=2, fontsize=10)

plt.tight_layout()
plt.show()
fig.savefig(f'../../submission/118_overloading_overall_risk.png', bbox_inches='tight', dpi=300)
plt.close('all')