# Simulation on Power System State Estimation and FDI Attacks

In [None]:
# Import packages
from pypower.api import *
from configs.config_mea_idx import define_mea_idx_noise
from configs.config import se_config, opt
from utils.class_se import SE
from utils.fdi_att import FDI
import numpy as np
import matplotlib.pyplot as plt
import random

## Test Load Generation

In [None]:
from utils.gen_data import gen_case, gen_load
from pypower.api import case14, runopf
from pypower.idx_bus import PD, QD
from pypower.idx_brch import RATE_A

case = case14()
case_mod = gen_case(case, 'case14')

print(f'The active load: {case_mod["bus"][:,PD]}')
print(f'The active power flow constraints: {case_mod["branch"][:,RATE_A]}')
print(f'The genenerator cost: {case_mod["gencost"][:,[3,4]]}')

# Test opf
result = runopf(case_mod, opt)

In [None]:
"""
Test the data generation
"""

load_active, load_reactive = gen_load(case_mod, 'case14')
load_active.shape

In [None]:
# Visualize the load condition
fig, (ax1, ax2) = plt.subplots(1,2, figsize = (20,5))
ax1.plot(load_active[:,1])
ax2.plot(load_active[:,2])

## Power System State Estimation (PSSE)
We consider different types and amount of measurements in the AC-PSSE. In the first case study, the entire measurement from both RTUs and PMUs are considered which includes.
1. "From" side active power flow: pf;
2. "To" side active power flow: pt;
3. Active power injection: pi;
4. Voltage angle: vang;
5. "From" side reactive power flow: qf;
6. "To" side reactive power flow: qt;
7. Reactive power injection: qi;
8. Voltage magnitude: vmag;

In this case, we set the FPR of BDD as 0.02, e.g. ~2% of the normal measurement can be detected as suspicious.

Firstly, let's do the state estimation with Jacobian updated (Honest State Estimation). We also print the jacobian change against the flat start.

In [None]:
# Determine the measurement, here we can the default "full" setting
mea_idx, no_mea, noise_sigma = define_mea_idx_noise(case, 'FULL')

# Instance the state estimation Class SE
se = SE(case_mod, noise_sigma=noise_sigma, idx=mea_idx, fpr = 0.02)

# Run opf to get the measurement
result = se.run_opf()

# Construct the measurement
z, z_noise, vang_ref, vmag_ref = se.construct_mea(result) # Get the measurement

# Run AC-SE
se_config['verbose'] = 1  # Default is 0
v_est = se.ac_se_pypower(z_noise, vang_ref, vmag_ref, config = se_config)

# BDD detection
residual = se.bdd_residual(z_noise, v_est)
print(f'BDD threshold: {se.bdd_threshold}')
print(f'residual: {residual}')

We can see that the Jacobian variation is not large. 

We can even run more state estimations to see the residual distributions. Note that we only run under the same load condition (Note: it may take a while for different test size.)

In [None]:
r_summary = []
for _ in range(1000):
    # Generate different noisy measurement
    z, z_noise, vang_ref, vmag_ref = se.construct_mea(result)
    # State estimation
    v_est = se.ac_se_pypower(z_noise, vang_ref, vmag_ref)
    # Residual
    residual = se.bdd_residual(z_noise, v_est)
    r_summary.append(residual)

fp_no = 0
for residual in r_summary:
    if residual >= se.bdd_threshold:
        fp_no += 1

fpr = fp_no/len(r_summary)
print(f'The false positive rate: {fpr}')   # close to fpr = 0.02
ax = plt.figure()
plt.hist(r_summary, cumulative=True, density=True, bins=50)
plt.vlines(ymin=0, ymax=1, x=se.bdd_threshold, colors='red')
plt.xlabel('Residual')
plt.ylabel('c.d.f.')
plt.title('The cdf of residual distribution')
plt.hlines(xmin=np.min(r_summary),xmax=np.max(r_summary), y=0.98, colors='red')
plt.show()

We can then repeat the procedure by Dishonest update where the Jacobian is unchanged as the flat start through the iteation

In [None]:
# Generate a basic case
case = case14()
# Determine the measurement, here we can the default "full" setting
mea_idx, no_mea, noise_sigma = define_mea_idx_noise(case, 'FULL')

# Instance the state estimation Class SE
se = SE(case, noise_sigma=noise_sigma, idx=mea_idx, fpr = 0.02)

# Run opf to get the measurement
result = se.run_opf()

# Construct the measurement
z, z_noise, vang_ref, vmag_ref = se.construct_mea(result) # Get the measurement

# Run AC-SE
se_config['verbose'] = 1       # Default is 0
v_est = se.ac_se_pypower(z_noise, vang_ref, vmag_ref, is_honest=False, config = se_config)

# BDD detection
residual = se.bdd_residual(z_noise, v_est)
print(f'BDD threshold: {se.bdd_threshold}')
print(f'residual: {residual}')

We then test the state estimation based on different measurement from various opf runs.

In [None]:
from tqdm import tqdm
r_summary = []
for i in tqdm(range(0,2000,5)):
    # Get the load
    pd = load_active[i,:]
    qd = load_reactive[i,:]
    # Run opf
    result = se.run_opf(active_load = pd, reactive_load = qd)
    # print(result['success'])
    # Generate different noisy measurement
    z, z_noise, vang_ref, vmag_ref = se.construct_mea(result)
    # State estimation
    se_config['verbose'] = 0
    se_config['is_honest'] = False
    v_est = se.ac_se_pypower(z_noise, vang_ref, vmag_ref, config = se_config)
    # Residual
    residual = se.bdd_residual(z_noise, v_est)
    r_summary.append(residual)

fp_no = 0
for residual in r_summary:
    if residual >= se.bdd_threshold:
        fp_no += 1

fpr = fp_no/len(r_summary)
print(f'The false positive rate: {fpr}')   # close to fpr = 0.02
ax = plt.figure()
plt.hist(r_summary, cumulative=True, density=True, bins=50)
plt.vlines(ymin=0, ymax=1, x=se.bdd_threshold, colors='red')
plt.xlabel('Residual')
plt.ylabel('c.d.f.')
plt.title('The cdf of residual distribution')
plt.hlines(xmin=np.min(r_summary),xmax=np.max(r_summary), y=0.98, colors='red')
plt.show()

We can see that the two methods does not vary a lot. The Dishonest state estimation can still give a similar FPR.

## False Data Injection (FDI)

### Attack on single phase angle

In [None]:
# Instance the Class FDI
fdi = FDI(case_mod, noise_sigma, mea_idx, 0.02)
# Specify the attack type
att_spec = {}
print(f'The pv and pq bus include {fdi.non_ref_index}')

# Define the attack specification file
att_spec['ang_posi'] = random.sample(fdi.non_ref_index, 1)
att_spec['ang_str'] = 0.2
att_spec['mag_posi'] = []
att_spec['mag_str'] = 0

# Do state estimation
v_est = fdi.ac_se_pypower(z_noise, vang_ref, vmag_ref)

# Generate FDI attack
v_att, ang_posi = fdi.gen_fdi_att(v_est, att_spec)

fig, (ax1, ax2) = plt.subplots(1,2, figsize=(12,4))
ax1.plot(np.angle(v_att), label = 'att ang')
ax1.plot(np.angle(v_est), label = 'est ang')
ax2.plot(np.abs(v_att), label = 'att ang')
ax2.plot(np.abs(v_est), label = 'est ang')
ax1.title.set_text('Angle')
ax2.title.set_text('Magnitude')

Test on the BDD detection result on the FDI attacks. It should give the residual smaller than the BDD threshold.

In [None]:
# Calculate the attacked measurement
z_att_est = fdi.h_x_pypower(v_att)
z_est = fdi.h_x_pypower(v_est)
z_att_noise = z_noise + z_att_est - z_est

# Do state estimation
v_att_est = fdi.ac_se_pypower(z_att_noise, vang_ref, vmag_ref)


# Find the detection residual
residual_att = fdi.bdd_residual(z_att_noise, v_att_est)
print(f'BDD threshold: {se.bdd_threshold}')
print(f'residual: {residual_att}')

Test more on the random attack (which may take a while):

In [None]:
r_summary_att = []
for _ in range(1000):
    # Generate different noisy measurement
    z, z_noise, vang_ref, vmag_ref = se.construct_mea(result)
    # Specify the attack type
    att_spec = {}
    ang_no = np.random.randint(fdi.no_bus-1)
    att_spec['ang_posi'] = random.sample(fdi.non_ref_index, ang_no)
    att_spec['ang_str'] = -0.5+np.random.rand()        #-0.5-0.4
    mag_no = np.random.randint(fdi.no_bus-1)
    att_spec['mag_posi'] = random.sample(fdi.non_ref_index, mag_no)
    att_spec['mag_str'] = -0.1+0.2*np.random.rand()    # -0.1-0.1
    
    # Do state estimation
    v_est = fdi.ac_se_pypower(z_noise, vang_ref, vmag_ref)
    # Generate FDI attack
    v_att = fdi.gen_att(v_est, att_spec)
    
    # Calculate the attacked measurement
    z_att_est = fdi.h_x_pypower(v_att)
    z_est = fdi.h_x_pypower(v_est)
    z_att_noise = z_noise + z_att_est - z_est
    # Find the detection residual
    residual_att = fdi.bdd_residual(z_att_noise, vang_ref, vmag_ref)
    r_summary_att.append(residual_att)

tp_no = 0
for residual in r_summary_att:
    if residual >= se.bdd_threshold:
        tp_no += 1

tpr = tp_no/len(r_summary_att)
print(f'The true positive rate: {tpr}')   # close to fpr = 0.02
ax = plt.figure()
plt.hist(r_summary_att, cumulative=True, density=True, bins=50)
plt.vlines(ymin=0, ymax=1, x=fdi.bdd_threshold, colors='red')
plt.xlabel('Residual')
plt.ylabel('c.d.f.')
plt.title('The cdf of residual distribution on attacks')
plt.hlines(xmin=np.min(r_summary_att),xmax=np.max(r_summary_att), y=0.98, colors='red')
plt.show()

# RTU Measurement Case
In this case, only RTU measurements are considered: e.g. no vang from the full measurement case
1. "From" side active power flow: pf;
2. "To" side active power flow: pt;
3. Active power injection: pi;
4. "From" side reactive power flow: qf;
5. "To" side reactive power flow: qt;
6. Reactive power injection: qi;
7. Voltage magnitude: vmag;

In [None]:
case = case14()
mea_idx, no_mea, noise_sigma = define_mea_idx_noise(case, 'RTU')
print(f'mea_idx: \n{mea_idx}')
se = SE(case, noise_sigma=noise_sigma, idx=mea_idx)
se.no_mea
result = runopf(case, opt)
z, z_noise, vang_ref, vmag_ref = se.construct_mea(result) # Get the measurement
print(f'measurement shape: \n{z_noise.shape}')
se_config['verbose'] = 1  # Default is 0
se.ac_se_pypower(z_noise, vang_ref, vmag_ref, config = se_config)


# Half RTU Measurement 
In this case, only from side and injection RTU measurements are considered
1. "From" side active power flow: pf;
2. Active power injection: pi;
3. "From" side reactive power flow: qf;
4. Reactive power injection: qi;

In [None]:
case = case14()
mea_idx, no_mea, noise_sigma = define_mea_idx_noise(case, 'HALF_RTU')
print(f'mea_idx: \n{mea_idx}')
se = SE(case, noise_sigma=noise_sigma, idx=mea_idx)
se.no_mea
result = runopf(case, opt)
z, z_noise, vang_ref, vmag_ref = se.construct_mea(result) # Get the measurement
print(f'measurement shape: \n{z_noise.shape}')
se_config['verbose'] = 1  # Default is 0
se.ac_se_pypower(z_noise, vang_ref, vmag_ref, config = se_config)

# A Random case
Each measurement type is randomly picked

In [None]:
case = case14()
mea_idx, no_mea, noise_sigma = define_mea_idx_noise(case, 'RANDOM')
print(f'mea_idx: \n{mea_idx}')
se = SE(case, noise_sigma=noise_sigma, idx=mea_idx)
se.no_mea
result = runopf(case, opt)
z, z_noise, vang_ref, vmag_ref = se.construct_mea(result) # Get the measurement
print(f'measurement shape: \n{z_noise.shape}')
se_config['verbose'] = 1  # Default is 0
se.ac_se_pypower(z_noise, vang_ref, vmag_ref, config = se_config)

# A Random Case which likely gives unobservability

In [None]:
case = case14()
mea_idx, no_mea, noise_sigma = define_mea_idx_noise(case, 'UNOBSERVABLE')
print(f'mea_idx: \n{mea_idx}')
se = SE(case, noise_sigma=noise_sigma, idx=mea_idx)
se.no_mea
result = runopf(case, opt)
z, z_noise, vang_ref, vmag_ref = se.construct_mea(result) # Get the measurement
print(f'measurement shape: \n{z_noise.shape}')
se_config['verbose'] = 1  # Default is 0
se.ac_se_pypower(z_noise, vang_ref, vmag_ref, config = se_config)