In [9]:
import numpy as np
from ss_problem import SSProblem, SecureStateReconstruct
from scipy import linalg

n = 4
m = 2
p = 9
s = 5
# dtsys_a = np.random.normal(3,3,(n,n))
# eig_vals = linalg.eigvals(dtsys_a)
# this is slow for large matrices
# while(not all(np.isreal(eig_vals))):
#     dtsys_a = np.random.normal(3,3,(n,n))
#     eig_vals = linalg.eigvals(dtsys_a)
# by construction
# np.random.seed(100)
rng = np.random.default_rng(seed=100)
eig_vals = 8*rng.random((n,))+2 # eig value in [-10,-2] and [2,10]
eig_vals = np.multiply(np.sign(rng.random((n,)) - 1.0/2),eig_vals)
diag_eig = np.diag(eig_vals)
while True: 
    P = rng.random((n, n))
    for i in range(n):
        pi = P[:,i:i+1]
        pi_nom = pi / (np.linalg.norm(pi) + 1e-10)
        P[:,i:i+1] = pi_nom
    try:
        Pinv = np.linalg.inv(P)
        P_det = np.linalg.det(P)
        if P_det < 0.01:
            raise np.linalg.LinAlgError
        break
    except np.linalg.LinAlgError:
        pass

# choose an orthogonal basis

# Q = rng.random((n, n))
# P,_ = linalg.qr(Q)
# Pinv = P.transpose()

Pinv = np.linalg.inv(P)
dtsys_a = P@diag_eig@Pinv
# print(f'Ac eig_vals: {eig_vals}')
# print(f'dtsys_a: {dtsys_a}')
assert all(np.isreal(eig_vals))

dtsys_b = np.random.normal(1,1,(n,m))
dtsys_c = np.random.normal(1,1,(p,n)) # if given one sensor redundancy
dtsys_d = np.zeros((p,m))


# define input output data
init_state1 = np.random.normal(2,2,(n,1))
init_state2 = 2*init_state1
# u_seq = np.array([[1,1],[1,1],[1,1],[0,0]])
# assume sensors 1,3,5 are under attack
# sensor_initial_states = [init_state2,init_state2,init_state1,init_state1,
#                             init_state1,init_state1,init_state1,init_state1]
sensor_initial_states = [init_state2 if i < s else init_state1 for i in range(p)]

u_seq, tilde_y_his, noise_level = SSProblem.generate_attack_measurement(dtsys_a, dtsys_b, dtsys_c, dtsys_d,sensor_initial_states,
                                                                        s = s,is_noisy = False, noise_level=0.0001,u_seq = None)

# construct a problem instance
ss_problem = SSProblem(dtsys_a, dtsys_b, dtsys_c, dtsys_d, tilde_y_his, 
                        attack_sensor_count=s,input_sequence=u_seq,measurement_noise_level= noise_level )
# print(f'A: \n {ss_problem.A},  \n B:\n  {ss_problem.B}, \n C: \n {ss_problem.C}, \n D:\n {ss_problem.D}')

# ssr_solution = SecureStateReconstruct(ss_problem,possible_comb=comb1)
ssr_solution = SecureStateReconstruct(ss_problem)

soi = SecureStateReconstruct.compute_sparse_observability(ss_problem.A,ss_problem.C)
# eigenvalue observability index
eoi = SecureStateReconstruct.compute_eigenvalue_observability(ss_problem.A,ss_problem.C)
print(f'The problem have a sparse observability index {soi}, eigenvalue observability index: {eoi}, attacked sensor count: {s}')
# eigenspace related property
unique_eigvals, generalized_eigenspace_list, am_list, gm_list = SecureStateReconstruct.compute_sys_eigenproperty(ss_problem.A)
# print(f'Eigen properties. . \n generalized_unique_eigvals

# online operations 
# decompose ssr to sub_ssr problems
def to_test_code1(is_print = True):
    subprob_a, subprob_c, subprob_y = ssr_solution.generate_subssr_data()
    subspace_states_attacked_sensors_list = []
    for j in range(subprob_a.shape[2]):
        sub_a = subprob_a[:,:,j]
        sub_c = subprob_c
        sub_y_his = subprob_y[:,:,j]

        subproblem = SSProblem(sub_a, dtsys_b, sub_c, dtsys_d,sub_y_his, 
                                attack_sensor_count=s,is_sub_ssr=True)
        sub_solution = SecureStateReconstruct(subproblem)
        states, attacked_sensors_list = sub_solution.solve_initial_state_subssr(
                        eoi, generalized_eigenspace_list[j],error_bound = 1
            )

        if states is None:
            print(f'-------------solving subspace state x{j} fails---------------')
            return None
    
        states = states.transpose() # to zip
        subspace_states_attacked_sensors = list(zip(states,attacked_sensors_list))
        subspace_states_attacked_sensors_list.append(subspace_states_attacked_sensors) # [..., [xj,[0,1,2]], ...]

    full_state_list,corresp_sensors = sub_solution.compose_subspace_states(subspace_states_attacked_sensors_list)
    if full_state_list is None:
        print('--------------composition of subspace state fails---------------')
        for i in range(n):
            print(f'subspace_states_attacked_sensors_list entry{i}: {subspace_states_attacked_sensors_list[i]}')
        return None
    
    state_ind = 0
    if is_print:
        for full_state in full_state_list:
            print('--------------------------------------------------------------------')
            print(f'Estimated state {state_ind} - initial state 1 is \n {full_state.reshape(-1,1)-init_state1}')
            print(f'Estimated state {state_ind} - initial state 2 is \n {full_state.reshape(-1,1)-init_state2}')
            state_ind = state_ind +1
    return None

def to_test_code2(is_print = True):
    possible_states,corresp_sensors, _ = ssr_solution.solve_initial_state(error_bound = 1e-3)
    state_ind = 0
    if is_print:
        for full_state in possible_states.transpose():
            print('--------------------------------------------------------------------')
            print(f'Estimated state {state_ind} - initial state 1 is \n {full_state.reshape(-1,1)-init_state1}')
            print(f'Estimated state {state_ind} - initial state 2 is \n {full_state.reshape(-1,1)-init_state2}')
            state_ind = state_ind +1

print('------------------- decomposition approach ----------------')
to_test_code1()

The problem have a sparse observability index 8, eigenvalue observability index: 8, attacked sensor count: 5
------------------- decomposition approach ----------------
--------------------------------------------------------------------
Estimated state 0 - initial state 1 is 
 [[ 1.81679266]
 [ 3.18041834]
 [-0.27832965]
 [ 4.29587386]]
Estimated state 0 - initial state 2 is 
 [[-6.23579366e-09]
 [ 1.58674283e-08]
 [ 8.56909832e-09]
 [ 1.82841067e-08]]
--------------------------------------------------------------------
Estimated state 1 - initial state 1 is 
 [[ 1.32261957e-09]
 [-2.85984481e-09]
 [-1.38094203e-09]
 [-6.01899064e-09]]
Estimated state 1 - initial state 2 is 
 [[-1.81679267]
 [-3.18041833]
 [ 0.27832965]
 [-4.29587384]]


In [None]:
print('------------------- decomposition approach ----------------')
to_test_code1()


In [None]:
print('------------------- brute-force approach ----------------')
to_test_code2()


In [None]:
import timeit

execution_time = timeit.timeit(lambda : to_test_code1(is_print=False), number=1000)

print(f'execution time by subspace decomposition: {execution_time}s')
execution_time = timeit.timeit(lambda : to_test_code2(is_print=False), number=1000)

print(f'execution time by brute-force approach:   {execution_time}s')

In [None]:
import numpy as np
from ss_problem import SSProblem, SecureStateReconstruct
from scipy import linalg
import numpy as np

np.random.seed(100)
TS = 0.05
s  = 0
Ac = np.matrix(
                [
                    # [0, 0, 1, 0],
                    # [0, 0, 0, 1],
                    # [0, 0, 0, 0],
                    # [0, 0, 0, 0]
                    [0, 1],  # x xdot
                    [0, 0],
                ])
Bc = np.matrix(
                [
                    # [0, 0],
                    # [0, 0],
                    # [1, 0],
                    # [0, 1]
                    [1],
                    [0],
                ]
            )
Cc = np.matrix(
                [
                    # [1, 0, 0, 0],
                    # [1, 0, 0, 0],
                    # [0, 1, 0, 0],
                    # [0, 1, 0, 0],
                    # [0, 0, 1, 0],
                    # [0, 0, 1, 0],
                    # [0, 0, 0, 1],
                    # [0, 0, 0, 1],
                    [1, 0],  # x1 xdot1 x2 xdot2
                    [0, 1],
                    [1, 0],
                    [0, 1],
                ]
            )

Dc = np.matrix(
                [
                    # [0, 0],
                    # [0, 0],
                    # [0, 0],
                    # [0, 0],
                    # [0, 0],
                    # [0, 0],
                    # [0, 0],
                    # [0, 0]
                    [0],
                    [0],
                    [0],
                    [0],
                ]
            )

dtsys_a, dtsys_b, dtsys_c, dtsys_d = SSProblem.convert_ct_to_dt(Ac,Bc,Cc,Dc,TS)
# define input output data
init_state1 = np.array([[1.],[2.]])
# u_seq = np.array([[1,1],[1,1],[1,1],[0,0]])
# assume sensors 1,3,5 are under attack
sensor_initial_states = [init_state1,init_state1,init_state1,init_state1]
u_seq, tilde_y_his, noise_level = SSProblem.generate_attack_measurement(dtsys_a, dtsys_b, dtsys_c, dtsys_d,sensor_initial_states,
                                                                        s = s,is_noisy = False, noise_level=0.001,u_seq = None)
# u_seq = np.random.normal(3,2.5,size=(2,1))
# tilde_y_his = np.random.normal(10,4,size=(2,4))

noise_level=0.001
ss_problem = SSProblem(dtsys_a, dtsys_b, dtsys_c, dtsys_d, tilde_y_his, 
                        attack_sensor_count=s,input_sequence=u_seq,measurement_noise_level= noise_level )
ssr_solution = SecureStateReconstruct(ss_problem)
possible_initial_states,corresp_sensors, _ = ssr_solution.solve_initial_state(error_bound = 1000)

possible_states,corresp_sensors, _ = ssr_solution.solve(error_bound = 1000)
print(f'u_seq:{u_seq}')
print(f'tilde_y_his: {tilde_y_his}')
print(f'possible_initial states: {possible_initial_states}')
print(f'possible states: {possible_states}')



In [None]:
print('-------------------------------------')
self = ss_problem
print(f'dtsys_a: {ss_problem.A}, \n dtsys_b: {self.B}, \n dtsys_c:{self.C} ')
print(f'output_sequence:{self.tilde_y_his}')
print(f'input_sequence:{self.u_seq}')
print(f'observation matrix: {ssr_solution.obser[:,:,0]}')
print(f'observation matrix: {ssr_solution.obser[:,:,1]}')
print(f'clean output sequence" {ssr_solution.y_his}')
print(f'estimated state: {possible_states}')

print()

In [None]:
import numpy as np
from ss_problem import SSProblem, SecureStateReconstruct
from scipy import linalg
import control as ct

# def to_test_code():
# randomize system to test 
init_state1 = np.array([[1.],[1.],[1.],[1.]])
dtsys_a = np.array( [[ 7.09221296,  4.50145174,  6.45752449,  3.09727833],
 [ 6.9494097,   3.62746914,  3.57307166, -0.82619306],
 [11.04056775,  3.08802872,  3.86302024,  5.47661976],
 [ 5.27031611,  5.20731065,  2.81388349, -4.12382917]])
dtsys_c = np.random.normal(1,1,size=[8,4]) # if given enough sensor redundancy

eig_vals,eig_vecs = linalg.eig(dtsys_a)
I0 = np.eye(dtsys_a.shape[0])
# while(not all(np.isreal(eig_vals))):
#     dtsys_a = np.random.normal(3,3,size=[4,4])
#     eig_vals = linalg.eigvals(dtsys_a)
print(f'eig_vals: {eig_vals}')

eigenspaces = [eig_vecs[:,i:i+1] for i in range(eig_vecs.shape[1])]
r = len(eigenspaces)
# print(eigenspaces)
# print(f'test eigenvector: {linalg.norm((dtsys_a- eig_vals[0]*I0)@eigenspaces[0])}')

projections = []
x_components = []
for j in range(r):
    proj = SecureStateReconstruct.construct_proj(eigenspaces,j)
    xcomponent = proj@init_state1
    x_components.append(xcomponent)
    projections.append(proj)

x_recover  = np.sum(x_components,axis=0)
print(f'x:{init_state1}, x components: {x_components}, recovered: {x_recover}')


In [None]:
for i in range(dtsys_c.shape[0]):
    obsrv_spaces_i = []
    obsrv_projs_i = []
    Ci = dtsys_c[i:i+1,:]
    obsrv_matrix = ct.obsv(dtsys_a,Ci)
    Yi = obsrv_matrix@init_state1
    for j in range(r):
        obsrv_space_ij = obsrv_matrix@eigenspaces[j]
        obsrv_spaces_i.append(obsrv_space_ij)
    
    Y_i_list = []
    for j in range(r):
        proj_obsrv_ij = SecureStateReconstruct.construct_proj(obsrv_spaces_i,j)
        Y_ij = proj_obsrv_ij@Yi
        obsrv_projs_i.append(proj_obsrv_ij)
        Y_i_list.append(Y_ij)
    Y_ij_sum = np.sum(Y_i_list,axis= 0)
    print(f'{i}th sensor, Yi - Y_ij_sum is {linalg.norm(Yi- Y_ij_sum)}')

    # solve for state for subssr problems
    x_recovered_list = []
    for j in range(r):
        Aj = projections[j]@dtsys_a
        observ_matrix = ct.obsv(Aj,Ci)
        # x_j = linalg.pinv(observ_matrix)@Y_i_list[j]
        x_j, residuals = SecureStateReconstruct.solve_lstsq_from_subspace(observ_matrix,Y_i_list[j],eigenspaces[j]) # rank defficient, not accurate
        print(f'residuals: {residuals}')
        x_j = x_j.reshape(-1,1)
        x_recovered_list.append(x_j)

        # print(f'j = {j}, the difference btw x_components[j] and x_j is {linalg.norm(x_components[j]- x_j)}')
        err = linalg.norm(observ_matrix@x_components[j] - Y_i_list[j])
        # print(f'x_j {x_j.T}, x_components[j] {x_components[j].T}')
        # print(f'j = {j}, the error bound is {err}')
    print(f'The sum of recovered states is {np.sum(x_recovered_list,axis=0).T}')


    


In [None]:
import timeit

execution_time = timeit.timeit(to_test_code, number=1000)

print(f'execution time {execution_time}s')

In [None]:
from scipy import linalg
Ac = np.random.normal(3,3,size=[4,4])
eig_vals = linalg.eigvals(Ac)
i = 0
while(not all(np.isreal(eig_vals))):
    Ac = np.random.normal(3,3,size=[4,4])
    eig_vals = linalg.eigvals(Ac)
    i = i+1
    print(i)

print(f'eig_vals: {eig_vals}')

In [None]:
dtsys_a = np.array( [[ 7.09221296,  4.50145174,  6.45752449,  3.09727833],
 [ 6.9494097,   3.62746914,  3.57307166, -0.82619306],
 [11.04056775,  3.08802872,  3.86302024,  5.47661976],
 [ 5.27031611,  5.20731065,  2.81388349, -4.12382917]])

unique_eigvals, generalized_eigenspace_list, am_list, gm_list = SecureStateReconstruct.compute_sys_eigenproperty(dtsys_a)
print(f'gm_list: {gm_list}')
print(f'unique_eigvals:{unique_eigvals}')
print(f'generalized_eigenspace_list:{generalized_eigenspace_list}')

In [None]:
from scipy import linalg
eigval, eigvec = linalg.eig(dtsys_a)


In [None]:
a = np.array(range(24))
a = a.reshape(-1,2)
print(f'a before unvstack: {a}')
io_len = 4
a_prime = SecureStateReconstruct.unvstack(a,io_len)

print(f'a after unvstack: {a_prime[:,:,2]}')

a_colum_list = [a_prime[:,i:i+1] for i in range(a_prime.shape[1])]
a_v = np.vstack(a_colum_list)
print(f'a after unvstack then vstack: {a_v}')


In [None]:
dtsys_a = np.random.normal(3,3,size=[4,4])
b = dtsys_a@np.array([[1],[1],[1],[1]])
b.shape

In [None]:
I = np.eye(dtsys_a.shape[0])
nullspace = linalg.null_space(dtsys_a - eigval[0]*I)
linalg.norm((dtsys_a - eigval[0]*I)@eigvec[:,0])
print(f'nullspace:{nullspace}')

In [None]:
import numpy as np
from mpmath import mp, mpf, matrix, eig

# Set precision (e.g., 50 decimal places)
mp.dps = 50

def compute_high_precision_eigenspaces(matrix_A):
    # Convert matrix to mpmath matrix with higher precision
    mp_matrix = matrix([list(map(mpf, row)) for row in matrix_A])
    
    # Compute eigenvalues and eigenvectors using mpmath
    eigenvalues, eigenvectors = eig(mp_matrix)
    
    eigenspaces = {}
    
    for i, eigenvalue in enumerate(eigenvalues):
        eigenvector = eigenvectors[:, i]
        
        # Normalize the eigenvector
        norm = mp.sqrt(sum(x**2 for x in eigenvector))
        eigenvector = eigenvector / norm
        
        # Convert back to numpy arrays for convenience
        eigenspaces[float(eigenvalue)] = np.array([float(x) for x in eigenvector])
    
    return eigenspaces

# Example matrix
A = np.array([[4, -2],
              [1, 1]], dtype=float)

eigenspaces = compute_high_precision_eigenspaces(A)

for eigenvalue, eigenvector in eigenspaces.items():
    print(f"Eigenvalue: {eigenvalue}")
    print(f"Eigenvector: {eigenvector}\n")


In [None]:
import numpy as np
from scipy import linalg

dtsys_a = np.array( [[ 7.09221296,  4.50145174,  6.45752449,  3.09727833],
 [ 6.9494097,   3.62746914,  3.57307166, -0.82619306],
 [11.04056775,  3.08802872,  3.86302024,  5.47661976],
 [ 5.27031611,  5.20731065,  2.81388349, -4.12382917]])

eigval, eigvec = linalg.eig(dtsys_a)
I = np.eye(dtsys_a.shape[0])

A = dtsys_a - eigval[0]*I
nullspace = linalg.null_space(A,rcond=1e-10)


u, s, vh = linalg.svd(A, full_matrices=True)
M, N = u.shape[0], vh.shape[1]
rcond = np.finfo(s.dtype).eps * max(M, N)
tol = np.amax(s, initial=0.) * rcond
num = np.sum(s > tol, dtype=int)
# num = 2
Q = vh[num:,:].T.conj()
print(f'tol is {tol}, s is {s}, nullspace is {nullspace}, Q is {Q}')

In [None]:
vh

In [None]:
%timeit possible_states,corresp_sensors, _ = ssr_solution.solve(error_bound = 1e-2)


In [None]:
if possible_states is not None:
    for ind in range(corresp_sensors.shape[0]):
        sensors = corresp_sensors[ind,:]
        state = possible_states[:,ind]
        print(f'Identified possible states:{state} for sensors {sensors}')

In [None]:
import timeit

execution_time = timeit.timeit(lambda: ssr_solution.solve(error_bound = 1e-2), number=100)

print(execution_time)


In [None]:
att_dic

In [None]:
print(np.finfo(np.double).precision)

In [None]:
residuals_list = [1,2,3,4]
residual_min = min(residuals_list)
comb_list = residual_min<2*residual_min

In [None]:
residuals_list[0]

In [None]:
import numpy as np
from ss_problem import SSProblem, SecureStateReconstruct
from scipy import linalg
import numpy as np

TS = 0.02
s  = 0
Ac = np.matrix(
                [
                    # [0, 0, 1, 0],
                    # [0, 0, 0, 1],
                    # [0, 0, 0, 0],
                    # [0, 0, 0, 0]
                    [0, 1],  # x xdot
                    [0, 0],
                ])
Bc = np.matrix(
                [
                    # [0, 0],
                    # [0, 0],
                    # [1, 0],
                    # [0, 1]
                    [0],
                    [1],
                ]
            )
Cc = np.matrix(
                [
                    # [1, 0, 0, 0],
                    # [1, 0, 0, 0],
                    # [0, 1, 0, 0],
                    # [0, 1, 0, 0],
                    # [0, 0, 1, 0],
                    # [0, 0, 1, 0],
                    # [0, 0, 0, 1],
                    # [0, 0, 0, 1],
                    [1, 0],  # x1 xdot1 x2 xdot2
                    [0, 1],
                    [1, 0],
                    [0, 1],
                ]
            )

Dc = np.matrix(
                [
                    # [0, 0],
                    # [0, 0],
                    # [0, 0],
                    # [0, 0],
                    # [0, 0],
                    # [0, 0],
                    # [0, 0],
                    # [0, 0]
                    [0],
                    [0],
                    [0],
                    [0],
                ]
            )

dtsys_a, dtsys_b, dtsys_c, dtsys_d = SSProblem.convert_ct_to_dt(Ac,Bc,Cc,Dc,TS)
# define input output data
init_state1 = np.array([[1.],[2.]])
# u_seq = np.array([[1,1],[1,1],[1,1],[0,0]])
# assume sensors 1,3,5 are under attack
sensor_initial_states = [init_state1,init_state1,init_state1,init_state1]
# u_seq, tilde_y_his, noise_level = SSProblem.generate_attack_measurement(dtsys_a, dtsys_b, dtsys_c, dtsys_d,sensor_initial_states,
#                                                                         s = s,is_noisy = False, noise_level=0.001,u_seq = None)
u_seq = np.random.normal(3,2.5,size=(2,1))
tilde_y_his = np.random.normal(10,4,size=(2,4))

noise_level=0.001
ss_problem = SSProblem(dtsys_a, dtsys_b, dtsys_c, dtsys_d, tilde_y_his, 
                        attack_sensor_count=s,input_sequence=u_seq,measurement_noise_level= noise_level )
ssr_solution = SecureStateReconstruct(ss_problem)
possible_states,corresp_sensors, _ = ssr_solution.solve(error_bound = 100)
print(f'u_seq:{u_seq}')
print(f'tilde_y_his: {tilde_y_his}')
print(f'possible_states: {possible_states}')




In [None]:
import numpy as np
a = np.zeros((2,3))
total_state = np.zeros(shape = a.shape)
total_state