# Robust MTD under DC assumption

This notebook is a simple walk through of the robust MTD algorithm under the DC assumption. IEEE bus 14 system is used.

Under the DC assumption, the settings of D-FACTS devices are constant and does not depend on the load condition. 

Note: the original paper is analysed under simplified AC model (with resistence and varying load conditions being considered). The simulations in the paper are performed under both simplified AC and full AC model.

Change the settings in utils.settings

In [4]:
# import packages and functions
import numpy as np
from pypower.api import case6ww, case14,case57, opf
from utils.grid_fun import dc_grid
import copy
from scipy.stats.distributions import chi2
from scipy.optimize import minimize, NonlinearConstraint, Bounds, linprog

In [5]:
# settings: for IEEE case-14
mpc = case14() # load the model
gencost = [20,20,40,40,40]
genlimit_max = [330,140,100,100,100]
genlimit_min = [0,0,0,0,0]
flowlimit = 300*np.ones(mpc['branch'].shape[0])
name = "case14"

case = dc_grid(mpc, gencost, genlimit_max, genlimit_min, flowlimit)
sigma = 0.01            # assume 1% std of 1p.u. noise
alpha = 0.05            # FPR confidence (for the BDD)

In [6]:
# assume resistance is 0
# assume the measurement consists of [PI, PF]
b = 1/case.x  # negative susceptance
Bff = np.diag(b)
A = case.A
Ar = case.Ar
M = np.concatenate([A.T, np.eye(case.no_branch)], axis = 0)
H = M@Bff@A
Hr = M@Bff@Ar

print('shape of Hr:', Hr.shape)
print('rank of Hr:', np.linalg.matrix_rank(Hr))

no_measure = case.no_branch + case.no_bus
R = np.eye(no_measure) * sigma**2  # covariance matrix

case.alpha = alpha
case.DoF = no_measure - (case.no_bus - 1)             # degree of freedom of chi-square distribution
case.BDD_threshold = chi2.ppf(1-alpha, df = case.DoF)      # threshold for chi-square test for normalized residual
case.R = R
case.R_inv = np.linalg.inv(R)
case.R_inv_12 = np.sqrt(case.R_inv)

shape of Hr: (34, 13)
rank of Hr: 13


In [7]:
# prepare the optimisation
#dfacts_index = np.array([1,3,5,8,9,18,19])-1
dfacts_index = np.array([2,3,4,12,15,18,20])-1 
k_min = 6
col_constraint_value = 0.9998

degree_one = np.array([8]) - 1 ## buses that are not in any loop
degree_one_posi = []
for i in degree_one:
    if i < case.ref_index:
        degree_one_posi.append(i)
    elif i > case.ref_index:
        degree_one_posi.append(i-1)

col_constraint = col_constraint_value*np.ones((case.no_bus-1,))
col_constraint[degree_one_posi] = 1
ratio = 0.2  # the level of D-FACTS perturbation ratio

x_max = copy.deepcopy(case.x)*1.0001
x_min = copy.deepcopy(case.x)*0.9999
x_max[dfacts_index] = x_max[dfacts_index]*(1+ratio)
x_min[dfacts_index] = x_min[dfacts_index]*(1-ratio)

## Construct the optimisation problem

Firstly define normalised measurement matrix $H_N = R^{-\frac{1}{2}}H$. 

The normalised projection matrix becomes $P_N = H_N(H_N^TH_N)^{-1}H_N^T$ which is constant.

Let the reactance $x'$ as the decision variable. The post-MTD measurement matrix becomes $H_N'=R^{-\frac{1}{2}}MB_{ff}'A_r$ where $B'_{ff} = \text{diag}[1/x']$. The corresponding projection matrix becomes $P_N'=H_N'(H_N'^TH_N')^{-1}H_N'^T$.

Because the bus-14 system cannot be complete, we need following two-step optimisation:

- Step One: minimise Frobenius norm $\text{min}\|P_NP_N'\|_F$

- Step Two: iteratively minimise the l2 norm $\text{min}\|P_NP_N' - U_0^{1}U_0^{1T}\|_F$

both the optimisation problem are subject to D-FACTS constraints and the column constraints.

We use scipy optimisation to solve the non-convex problem. Note that it is possible to relax the objective into convex optimisation. Please refer to https://arxiv.org/abs/2204.12970 for details.

In [8]:
# construct matrix
Hr_N = case.R_inv_12 @ Hr
Pr_N = Hr_N @ np.linalg.inv(Hr_N.T@Hr_N) @ Hr_N.T
M_N = case.R_inv_12 @ M

# test on random MTD
x_mtd = copy.deepcopy(case.x)
ratio_ = -ratio + 2*ratio*np.random.rand(len(dfacts_index))
x_mtd[dfacts_index] = case.x[dfacts_index]*(1+ratio_)

Hr_mtd_N = case.R_inv_12 @ M_N @ np.diag(1/x_mtd) @ Ar
composite_matrix = np.concatenate([Pr_N, Hr_mtd_N], axis = 1)
print('rank: ', np.linalg.matrix_rank(composite_matrix))

rank:  20


In [9]:
# use scipy to construct and solve the problem

# Frobenius norm
class incomplete_fro():
    def __init__(self, Hr_N, Pr_N, M_N, Ar, x_max, x_min, col_constraint):
        self.Hr_N = Hr_N
        self.Pr_N = Pr_N
        self.M_N = M_N
        self.Ar = Ar
        self.x_max = x_max
        self.x_min = x_min
        self.nonlinear_constraint_max = col_constraint
    
    def fun_loss(self, x):
        
        # decision variable: reactance x
        b = 1/(x)
        
        Hr_mtd_N = self.M_N @ np.diag(b) @ self.Ar
        Pr_mtd_N = Hr_mtd_N @ np.linalg.inv(Hr_mtd_N.T@Hr_mtd_N) @ Hr_mtd_N.T
        
        fro_norm = np.linalg.norm(self.Pr_N @ Pr_mtd_N, ord = 'fro')
        
        return fro_norm
    
    def fun_constraint(self, x):
        # nonlinear constraints on column constraint
        
        b = 1/(x)
        
        Hr_mtd_N = self.M_N @ np.diag(b) @ self.Ar
        Pr_mtd_N = Hr_mtd_N @ np.linalg.inv(Hr_mtd_N.T@Hr_mtd_N) @ Hr_mtd_N.T
        
        con = []  
        # find the cos of each bus
        for i in range(Hr_mtd_N.shape[-1]):
            # the consine of each column of H_N
            con.append(np.linalg.norm(Pr_mtd_N @ self.Hr_N[:,i:i+1], ord = 2)/np.linalg.norm(self.Hr_N[:,i:i+1], ord = 2))
        
        return con
    
    def nonlinear_constraint(self):
        
        return NonlinearConstraint(self.fun_constraint, 0, self.nonlinear_constraint_max)
    
    def fun_bound(self):
        # boundary constraint
        
        return Bounds(self.x_min, self.x_max) # don't forget to set zero on the non D-FACTS lines
        
# l2 norm
class incomplete_l2():
    def __init__(self, Hr_N, Pr_N, M_N, Ar, x_max, x_min, col_constraint, U_k):
        # U_k is the matrix consisting the basis of the intersection subspace
        self.Hr_N = Hr_N
        self.Pr_N = Pr_N
        self.M_N = M_N
        self.Ar = Ar
        self.x_max = x_max
        self.x_min = x_min
        self.nonlinear_constraint_max = col_constraint
        
        # projector to the intersection subspace (note that U_k is orthonormal)
        self.P_k = U_k@U_k.T 
        
    def fun_loss(self, x):
        
        # decision variable: reactance x
        b = 1/(x)
        Hr_mtd_N = self.M_N @ np.diag(b) @ self.Ar
        Pr_mtd_N = Hr_mtd_N @ np.linalg.inv(Hr_mtd_N.T@Hr_mtd_N) @ Hr_mtd_N.T
        
        l2_norm = np.linalg.norm(self.Pr_N @ Pr_mtd_N - self.P_k, ord = 2)
        
        return l2_norm
    
    def fun_constraint(self, x):
        # nonlinear constraints on column constraint
        
        b = 1/(x)
        
        Hr_mtd_N = self.M_N @ np.diag(b) @ self.Ar
        Pr_mtd_N = Hr_mtd_N @ np.linalg.inv(Hr_mtd_N.T@Hr_mtd_N) @ Hr_mtd_N.T
        
        con = []  
        # find the cos of each bus
        for i in range(Hr_mtd_N.shape[-1]):
            # the consine of each column of H_N
            con.append(np.linalg.norm(Pr_mtd_N @ self.Hr_N[:,i:i+1], ord = 2)/np.linalg.norm(self.Hr_N[:,i:i+1], ord = 2))
        
        return con
    
    def nonlinear_constraint(self):
        
        return NonlinearConstraint(self.fun_constraint, 0, self.nonlinear_constraint_max)
    
    def fun_bound(self):
        # boundary constraint
        
        return Bounds(self.x_min, self.x_max) # don't forget to set zero on the non D-FACTS lines
        

In [10]:
# define some useful functions
def return_matrix(x_mtd, M_N, Ar, Hr_N):
    b_mtd = 1/x_mtd
    Hr_mtd_N = M_N @ np.diag(b_mtd) @ Ar
    composite_matrix = np.concatenate([Hr_N, Hr_mtd_N], axis = -1)   
    rank_com = np.linalg.matrix_rank(composite_matrix)
    k = 2*(case.no_bus-1) - rank_com
    Pr_mtd_N = Hr_mtd_N @ np.linalg.inv(Hr_mtd_N.T@Hr_mtd_N) @ Hr_mtd_N.T
    
    return k, b_mtd, Pr_mtd_N

In [11]:
incomplete_fro_ = incomplete_fro(Hr_N, Pr_N, M_N, Ar, x_max, x_min, col_constraint)

# summary and record
success_no = 0            ## record the successful number
loss_summary = []         ## summary on the largest non-one singular value : the solution of incomplete_l2 after convergence
b_mtd_summary = []        ## the MTD perturbation of each run
x_mtd_summary = []

while True: 
    # multiple runs to find the best solution
    x0 = x_min + (x_max-x_min)*np.random.rand(len(case.x))
    # solve the incomplete_fro problem to find the wart start using SLSQP
    results = minimize(incomplete_fro_.fun_loss, x0, method = "SLSQP", 
                        constraints = incomplete_fro_.nonlinear_constraint(), 
                        bounds = incomplete_fro_.fun_bound(), 
                        options = {'ftol': 1e-4}) 
    # verify on the solution
    x_mtd = results.x
    assert np.all(x_mtd <= x_max + 1e-6)
    assert np.all(x_mtd >= x_min - 1e-6)

    k, b_mtd, Pr_mtd_N = return_matrix(x_mtd, M_N, Ar, Hr_N)
    
    if k != k_min or results.success == False:
        continue
    
    else:
        # if the k_min is satisfied, then solve the incomplete_l2 optimization problem
        
        singular_value_non_one_pre = 100     # set as a large value to pass the initial break test
        
        # iteration
        for ite_second in range(20):
            # set the maximum iteration number of the lower problem as 50
            # do T-SVD
            U, singular_value, V_transpose = np.linalg.svd(Pr_N@Pr_mtd_N)         # the SVD can slow the optimization problem
            U_k = U[:,:k_min]                                           # the orhornormal basis of the intersection subspaces
            singular_value_non_one = singular_value[k_min]              # the largest non one singular value
            V_k = V_transpose.T[:,:k_min]                               # the orthonormal basis of the intersection subspaces 
            
            assert np.max(np.abs(U_k-V_k)) < 1e-6                       # test the difference between U_k and V_k is small
            
            # instance the incomplete_l2 optimization problem 
            incomplete_l2_ = incomplete_l2(Hr_N, Pr_N, M_N, Ar, x_max, x_min, col_constraint, U_k)
            
            x0 = x_mtd                                                  # use the previous running result as the initial point
            
            results = minimize(incomplete_l2_.fun_loss, x0, method = "SLSQP", 
                            constraints = incomplete_l2_.nonlinear_constraint(),
                            bounds = incomplete_l2_.fun_bound(),
                            options = {'ftol': 1e-4})
            
            # analysis
            x_mtd = results.x
            
            assert np.all(x_mtd <= x_max + 1e-6)
            assert np.all(x_mtd >= x_min - 1e-6)
            
            k, b_mtd, Pr_mtd_N = return_matrix(x_mtd, M_N, Ar, Hr_N)
            
            # termination condition
            if np.abs(singular_value_non_one_pre-singular_value_non_one) < 1e-7 and results.success == True:
                
                U, singular_value, V_transpose = np.linalg.svd(Pr_N@Pr_mtd_N)         
                U_k = U[:,:k_min]                                           
                singular_value_non_one = singular_value[k_min]              
                V_k = V_transpose.T[:,:k_min]                               
                loss_summary.append(singular_value_non_one)     ## record the value of largest non one singular value
                b_mtd_summary.append(b_mtd)                     ## record the mtd perturbation
                x_mtd_summary.append(x_mtd)
                
                # print(np.arccos(singular_value_non_one))        ## see the minimal non-zero principal angle                      
                break                                             ## break the incomplete_l2 loop
            
            singular_value_non_one_pre = singular_value_non_one ## update the running result
        
        if success_no == 10-1:
            break  
        
        success_no += 1

b_mtd = b_mtd_summary[np.argmin(loss_summary)]                  ## reactance corresponding to smallest loss, e.g. smallest non-one singular value
x_mtd = x_mtd_summary[np.argmin(loss_summary)]

In [12]:
non_dfacts_index = list(set(list(range(14))) - set(dfacts_index))
x_mtd[non_dfacts_index] = case.x[non_dfacts_index]
k, b_mtd, Pr_mtd_N = return_matrix(x_mtd, M_N, Ar, Hr_N)
U, singular_value, V_transpose = np.linalg.svd(Pr_N@Pr_mtd_N)
singular_value_non_one = singular_value[k_min]
print(singular_value)

np.save('dc_mtd_result/x_mtd.npy', x_mtd)
np.save('dc_mtd_result/x.npy', case.x)

[1.00000000e+00 1.00000000e+00 1.00000000e+00 1.00000000e+00
 1.00000000e+00 1.00000000e+00 9.99792170e-01 9.99534914e-01
 9.99291499e-01 9.99082593e-01 9.98424031e-01 9.98226327e-01
 9.97790994e-01 8.84230152e-16 5.24044102e-16 4.24849753e-16
 3.82642438e-16 3.02269923e-16 2.86950634e-16 2.38721674e-16
 2.00483358e-16 1.80227969e-16 1.41457494e-16 1.07402131e-16
 1.05090765e-16 9.99198303e-17 9.99198303e-17 9.99198303e-17
 9.99198303e-17 9.99198303e-17 9.99198303e-17 9.99198303e-17
 5.83826831e-17 4.65102263e-17]
