# **Adversarial Learning Attacks on Autoencoder-based Models for Nonlinear Dynamical Systems**

Marco Ledda, *Student Member, IEEE*

Alessandro Giua, *Fellow, IEEE*  

Mauro Franceschelli, *Senior Member, IEEE*

**Abstract:**
The use of learning-based methods in sys-
tems and control is gaining significant interest from the
research community. These methods are flexible and pow-
erful when input/output data is abundant. Despite their
potential, the reliability of these estimation methods can
raise concerns, especially in adversarial settings. Unlike
in the fields of computer vision and image recognition,
which have widely explored adversarial vulnerabilities, ad-
versarial learning in the context of control systems re-
mains an open research topic, largely unexplored. This
paper presents novel adversarial attack techniques for
autoencoder-based models used for system identification.
We propose two scenarios of attack in which we generate
a control input designed to maximize the output error of
the autoencoder, thereby degrading system performance.
Through numerical experiments on benchmark systems,
we provide a counterexample demonstrating the impact
of the proposed attacks. The findings highlight the impor-
tance of developing formal guarantees to enhance the relia-
bility and security of learning-based control systems. Also,
the methods can be used to certify unreliability of models
of the identified systems for certain input sequences.

## Setup

In [None]:
!pip install tensorflow==2.9.1
!pip install keras==2.9.0

In [None]:
# Importazioni dei moduli
import sys
import time
import random as rn
import numpy as np
from numpy import sqrt, cosh, tanh
from enum import Enum, unique
import tensorflow as tf
import keras
from keras import backend as K
from keras.layers import Input, Dense
from tensorflow.keras import layers, losses
from keras.models import Model
from keras.regularizers import Regularizer, l1, l2
from keras.constraints import *
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint
from keras.layers import Lambda
from scipy import optimize
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import warnings
from functools import partial
from enum import Enum, unique
import scipy.io.matlab
from scipy import optimize
from scipy.integrate import solve_ivp

In [None]:
print(tf.__version__)
print(keras.__version__)

2.9.1
2.9.0


## Dummy Model

In [None]:
class DummyModel:
    def __init__(self,nonLinearInputChar=False, useExternalDataset=False):
        print("***Warning:dummyModel***")

    def stateMap(self, xk, u):
        print("***Warning:dummyModel***")
        return 0

    def outputMap(self, xk):
        print("***Warning:dummyModel***")
        return 0

    def systemDynamics(self, dim, flag=True, U=-1):
        print("***Warning:dummyModel***")
        return 0, 0

    def loop(self, x_k, duk):
        print("***Warning:dummyModel***")
        return 0, 0

    def setU(self, U, U_val):
        print("***Warning:dummyModel***")

    def prepareDataset(self, sizeT, sizeV):
        print("***Warning:dummyModel***")
        return 0, 0, 0, 0

## L21 Regularizer

In [None]:
class l21(Regularizer):
    """
    Regularizer for L21 regularization.
    Arguments:
        C: Float; L21 regularization factor.
    """

    def __init__(self, C=0.0, a=0, b=0, bias=0.000):
        self.a = a
        self.b = b
        C = K.cast_to_floatx(C)
        self.C = (bias + C) * np.square(np.concatenate([0 +
                                                        a -
                                                        np.array(range(0,a)),
                                                        0 +
                                                        b-np.array(range(0,b))
                                                        ]))
        print("****Squared weigthing enabled****")
        print(self.C)


    def __call__(self, x):
        print(x)
        w = K.sum(K.abs(x), 1)
        print(str(w))
        w = w[0 : self.a + self.b];
        print(w * self.C)
        return K.sum(w * self.C)


    def get_config(self):
        return {'C': float(self.l1)}


## Two Tanks Model

In [None]:
class TwoTanks:
    def __init__(self, nonLinearInputChar=False):
        self.nonLinearInputChar = nonLinearInputChar
        self.stateSize = 2
        self.input_size = 1
        self.inputStateLinearity = True
        self.outputLinearity = True
        self.exponent = 0.5

    def stateMap(self, x, u):
        if self.nonLinearInputChar:
            x1 = x[0] - .5 * sqrt(x[0]) + .4 * np.sign(u[0]) * (u[0])**2
        else:
            x1 = x[0] - .5 * sqrt(x[0]) + .4 * u[0];

        x2 = x[1] + .2 * sqrt(x[0]) - .3 * sqrt(x[1]);
        x = np.zeros((self.stateSize, 1))

        if  x1 > 0:
            x[0] = x1
        else:
            x[0] = 0

        if x2 > 0:
            x[1] = x2
        else:
            x[1] = 0

        return x

    def outputMap(self, xk):
        return np.array(xk[1])

    def systemDynamics(self, dim, flag=True):
        x_k = np.ones((self.stateSize, 1))
        y_n = np.zeros((dim, 1))
        u_n = np.zeros((dim, self.input_size))
        noise = np.random.normal(1, 1.0, size=(self.input_size, dim))

        if  flag:
            print('a')
        else:
            print('b')

        u = np.array([0.0])

        for i in range(0, dim):
            if i % 10000 == 0:
                print('.',  end='')

            u[0] = noise[0][int(i/5)]
            y_n[i] = self.outputMap(x_k)*1
            x_k = self.stateMap(x_k, np.reshape(u, (self.input_size, 1))) * 1
            u_n[i] = u

        return y_n, u_n


    def loop(self, x_k, duk):
        y_n = np.array([])

        for i in range(0, len(duk)):
            u = np.reshape(np.array(duk[i]),(1, 1))
            u = u * self.stdU + self.meanU
            temp = self.outputMap(x_k)
            temp = (temp - self.meanY) / self.stdY
            y_n = np.append(y_n, temp)
            x_k = self.stateMap(x_k, u)

        return y_n*1, x_k*1


    def prepareDataset(self, sizeT, sizeV):
        y_n, u_n = self.systemDynamics(sizeT, True)
        y_Vn, u_Vn = self.systemDynamics(sizeV, True)
        self.meanY = np.mean(y_n)
        self.meanU = np.mean(u_n)
        self.stdY = np.std(y_n)
        self.stdU = np.std(u_n)
        y_n = (y_n-self.meanY) / self.stdY + np.random.normal(0, 0.02, (sizeT, 1))
        y_Vn = (y_Vn - self.meanY) / self.stdY + np.random.normal(0, 0.02, (sizeV, 1))
        u_n=(u_n-self.meanU)/self.stdU+np.random.normal(0,0.02,(sizeT,1))
        u_Vn = (u_Vn - self.meanU) / self.stdU + np.random.normal(0, 0.02, (sizeV, 1))
        print(y_n.shape)
        print(u_n.shape)
        print(y_Vn.shape)
        print(u_Vn.shape)
        return np.reshape(u_n, (sizeT, 1)), np.reshape(y_n, (sizeT, 1)),\
                    np.reshape(u_Vn, (sizeV, 1)), np.reshape(y_Vn, (sizeV, 1))



## Hammerstein Weiner Model

In [None]:
# It's actually the hammerstein-wiener! The class is called "LinearSystem" as the the I/O non linearities are a later addition.
class LinearSystem:
    def __init__(self,nonLinearInputChar=True,satValue=1000,sigma=0.02):
        self.nonLinearInputChar=nonLinearInputChar
        self.U=-1;
        self.sigma=sigma;
        self.satValue=satValue
        self.flag=True
        self.stateSize=5
        self.input_size=1
        self.A=np.array([[0.7555,   -0.1991,   0.00000 ,  0.00000  ,0*-0.00909],
                           [0.25000,   0.00000,   0.00000,   0.00000,   0*0.03290],
                           [0.00000,  -0.00000,   0.00000,   0.00000,   0*0.29013],
                           [0.00000,   0.00000,   .00000,   0.00000,  0*-1.05376],
                           [0.00000,   0.00000,   0.00000,   .00000,   0*1.69967]]).T
#        self.B=-np.array([[0.71985,   0.57661,  1.68733,  2.14341,   1.00000]]).T
        self.B=np.array([[-0.5,   0.,  0, 0,   0]]).T
        self.C=np.array([[  0.6993 ,  -0.4427,  0,   0,   0 ,]])
#

    def stateMap(self,xk,u):

#        if sum(abs(u-np.clip(u,-1000,1000)))>0.01:
#            print('.',end='')
#            pass

        u=np.clip(u,-self.satValue,self.satValue)

#        if u<-.6 and self.nonLinearInputChar:
#            u=u*1.2
        if u>0 and self.nonLinearInputChar:
            u=np.sqrt(u)
            print('.',end='')
#        print(u)

        xk=np.dot(self.A,xk)+np.dot(self.B,u)
        return xk


    def outputMap(self,xk):
        #print(np.dot(self.C,xk))
        y=np.dot(self.C,xk)
        return y+5*np.sin(y)*int(self.nonLinearInputChar)

    def systemDynamics(self,dim, flag=True,U=-1):
        x_k=np.ones((self.stateSize,1))

        y_n=np.zeros((dim,1))
        u_n=np.zeros((dim,self.input_size))
        #u_n=0
        noise=np.random.normal(0,1,size=(self.input_size,dim))
        if  flag:
            print('a')
        else:
            print('b')
        u=np.array([ 0.0])

        for i in range(0,dim):
            if i%10000==0:
                print('.',  end='')
            u[0]=noise[0][int(i/7)]
            uSat=np.clip(u[0],-self.satValue,self.satValue)

            y_n[i]=self.outputMap(x_k)*1
            x_k=self.stateMap(x_k,np.reshape(uSat,(self.input_size,1))) *1
            #u+=np.random.normal(0,0.05,size=(self.input_size))
            u_n[i]=u[0]
#            u_n[i]=uSat

        return y_n,u_n


    def loop(self,x_k,duk):
        y_n=np.array([])
        x_k=np.reshape(x_k,(self.stateSize,1))
        for i in range(0,len(duk)):
            u=np.array(duk[i])+np.random.normal(0,self.sigma,(1,1))*.0
            u=u*self.stdU+self.meanU
#            print(u)
            temp=self.outputMap(x_k);
#            print(temp)
            temp=(temp-self.meanY)/self.stdY
            y_n=np.append(y_n,temp)+np.random.normal(0,self.sigma,(1,1))*.0

            #print(temp)
            x_k=self.stateMap(x_k,u)*1

        return y_n[0]*1,x_k


    def prepareDataset(self,sizeT,sizeV):
        y_n,u_n=self.systemDynamics(sizeT,True)
        y_Vn,u_Vn=self.systemDynamics(sizeV,True)
        self.meanY=np.mean(y_n)
        self.meanU=np.mean(u_n)
        self.stdY=np.std(y_n)
        self.stdU=np.std(u_n)
        y_n=(y_n-self.meanY)/self.stdY+np.random.normal(0,self.sigma,(sizeT,1))
        y_Vn=(y_Vn-self.meanY)/self.stdY+np.random.normal(0,self.sigma,(sizeV,1))
        u_n=(u_n-self.meanU)/self.stdU+np.random.normal(0,self.sigma,(sizeT,1))
        u_Vn=(u_Vn-self.meanU)/self.stdU+np.random.normal(0,self.sigma,(sizeV,1))
        print(y_n.shape)
        print(u_n.shape)
        print(y_Vn.shape)
        print(u_Vn.shape)
        return np.reshape(u_n,(sizeT,1)),np.reshape(y_n,(sizeT,1)),\
                    np.reshape(u_Vn,(sizeV,1)),np.reshape(y_Vn,(sizeV,1))

## ANN Model


In [None]:
class datasetLoadUtility:
    def __init__(self):
        pass

    def loadDatasetFromMATfile(self, filename='-1'):
        dataset = scipy.io.loadmat(filename)
        U, Y, U_val, Y_val= [dataset.get(x) for x in ['U', 'Y', 'U_val', 'Y_val']]
        return U, Y, U_val, Y_val

    def loadFieldFromMATFile(selfself, filename, fields):
        dataset = scipy.io.loadmat(filename)
        return [dataset.get(x) for x in fields]

# AutoEncoder Class
class AdvAutoencoder:
    batch_size = 24
    stateSize = -1
    nonlinearity = ''
    n_neurons = -1
    validation_split = -1
    n_a = -1
    n_b = -1
    freeloading = 10


    def __init__(self, nonlinearity='relu', n_neurons=30, n_layer=3,
                 fitHorizon=5, useGroupLasso=True, stateReduction=False,
                 validation_split=0.05, stateSize=-1, strideLen=10,
                 outputWindowLen=2, affineStruct=True,
                 regularizerWeight=0.0005):
        self.nonlinearity = nonlinearity
        self.outputWindowLen = outputWindowLen
        self.stateSize = stateSize
        self.n_neurons = n_neurons
        self.stateReduction = stateReduction
        self.validation_split = validation_split
        self.strideLen = strideLen
        self.n_layer = n_layer
        self.model = None
        self.affineStruct = affineStruct
        self.MaxRange = fitHorizon
        self.regularizerWeight = regularizerWeight
        self.kernel_regularizer = l2(self.regularizerWeight)
        self.useGroupLasso = useGroupLasso
        self.shuffledIndexes = None
        self.constraintOnInputHiddenLayer = None

        if useGroupLasso and regularizerWeight > 0.0:
            if stateReduction:
                self.inputLayerRegularizer = l21(self.regularizerWeight, a=0,
                                                 b=stateSize)
            else:
                self.inputLayerRegularizer = l21(self.regularizerWeight,
                                                 a=strideLen, b=strideLen)
            self.kernel_regularizer = l2(0.0001)
            print("=> if GroupLasso is used,  l2 regularizer is set to 0.0001 in all bridge, encoder, and decoder")
            # print("=> if GroupLasso is used,  l2 regularizer is set to the same weigth in all bridge, encoder, and decoder")
        else:
            self.inputLayerRegularizer = self.kernel_regularizer


    def mean_pred(y_true, y_pred):
        return keras.losses.mae(y_pred * 0, y_pred)
        #return keras.losses.mae(y_true[:5], y_pred)

    def setDataset(self, U, Y, U_val, Y_val):
        if U is not None:
            self.U = U.copy()
            self.N_U = U.shape[1]

        if Y is not None:
            self.Y = Y.copy()
            self.N_Y = Y.shape[1]

        if U_val is not None:
            self.U_val = U_val.copy()

        if Y_val is not None:
            self.Y_val = Y_val.copy()

    def encoderNetwork(self, future=0):
        inputs_U = Input(shape=((self.strideLen) * self.N_U,))
        inputs_Y = Input(shape=((self.strideLen) * self.N_Y,))
        inputConcat = keras.layers.concatenate([inputs_Y,inputs_U],
                                               name='concatIk')
        iKR = self.kernel_regularizer

        if self.useGroupLasso and (not self.stateReduction):
            iKR = self.inputLayerRegularizer

        x = Dense(kernel_regularizer = iKR,
                  kernel_constraint = self.constraintOnInputHiddenLayer,
                  units = self.n_neurons, activation = self.nonlinearity,
                  name = 'enc0' + str(future))(inputConcat)

        for i in range(0, self.n_layer-1):
            x = Dense(use_bias = True,
                      kernel_regularizer = self.kernel_regularizer,
                      units = self.n_neurons,
                      activation = self.nonlinearity,
                      name = 'enc' + str(i+1) + str(future))(x)

        # x is the output of the Encoder Network
        x = Dense(kernel_regularizer = self.kernel_regularizer,
                  units = self.stateSize, activation = "linear",
                  name = 'encf' + str(future))(x)

        ann = Model(inputs=[inputs_Y, inputs_U], outputs=[x])

        return ann

    def decoderNetwork(self, future=0):
        inputs_state = Input(shape=(self.stateSize,))
        iKR = self.kernel_regularizer

        if self.useGroupLasso and self.stateReduction:
            iKR = self.inputLayerRegularizer

        x = Dense(kernel_regularizer = iKR,
                  kernel_constraint = self.constraintOnInputHiddenLayer,
                  units = self.n_neurons, activation = self.nonlinearity,
                  name = 'dec0' + str(future))(inputs_state)

        for i in range(0, self.n_layer-1):
            x = Dense(use_bias=True,
                      kernel_regularizer=self.kernel_regularizer,
                      units=self.n_neurons, activation=self.nonlinearity,
                      name='dec'+str(i+1)+str(future))(x)

        if self.affineStruct:
            x = Dense(units = self.outputWindowLen*self.stateSize,
                     activation="linear",name='decf'+str(future))(x)
            x = keras.layers.Reshape((self.outputWindowLen,self.stateSize))(x)
            out = keras.layers.dot([x,inputs_state],axes=-1)
        else:
            out = Dense(kernel_regularizer = self.kernel_regularizer,
                        units = self.outputWindowLen*self.N_Y,
                        activation="linear", name='decf'+str(future))(x)
            x = out

        ann = Model(inputs=[inputs_state], outputs=[out, x])
        return ann

    def bridgeNetwork(self, future=0):
        inputs_state = Input(shape=(self.stateSize,), name='inputState')

        # inputs_novelU to attack
        inputs_novelU = Input(shape=(self.N_U,), name='novelInput')

        inputConcat = keras.layers.concatenate([inputs_state, inputs_novelU])
        iKR = self.kernel_regularizer

        if self.useGroupLasso and self.stateReduction:
            iKR = self.inputLayerRegularizer
            print('Using group lasso on state')

        x = Dense(kernel_regularizer=iKR,
                kernel_constraint=self.constraintOnInputHiddenLayer,
                units=self.n_neurons, activation=self.nonlinearity,
                name='bridge0' + str(future))(inputConcat)

        for i in range(0,self.n_layer-1):
            x = Dense(
                    use_bias=True,
                    kernel_regularizer=self.kernel_regularizer,
                    units=self.n_neurons, activation=self.nonlinearity,
                    name='bridge'+str(i+1) + str(future))(x)

        bias = Dense(
                    kernel_regularizer=self.kernel_regularizer,
                    units=self.stateSize, activation="linear",
                    name='bridgeBias' + str(future))(x)

        if self.affineStruct:
            ABunshape = Dense(units=self.stateSize*(self.stateSize+self.N_U),
                      activation="linear", name='bridgef' + str(future))(x)
            AB = keras.layers.Reshape((self.stateSize,self.stateSize+self.N_U))(ABunshape)
            out = keras.layers.dot([AB,inputConcat],axes=-1)
            out = keras.layers.add([bias,out])
            ann = Model(inputs=[inputs_novelU,inputs_state], outputs=[out,AB,bias])
        else:
            out = bias
            ann = Model(inputs=[inputs_novelU,inputs_state], outputs=[out,x,bias])

        return ann

    def ANNModel(self):
        inputs_Y = Input(shape=((self.strideLen+self.MaxRange) * self.N_Y,),
                         name="input_y")
        inputs_U = Input(shape=((self.strideLen + self.MaxRange) * self.N_U,),
                         name="input_u")

        strideLen = self.strideLen
        bridgeNetwork = self.bridgeNetwork()  # Bottleneck layer
        convEncoder = self.encoderNetwork()   # Encoder
        outputEncoder = self.decoderNetwork() # Decoder
        predictionErrorCollection = []
        forwardErrorCollection = []
        forwardedPredictedErrorCollection = []
        predictedOKCollection = []          # O_k
        stateKCollection = []               # x_k
        MaxRange = self.MaxRange
        forwardedState = None

        for k in range(0, MaxRange):
            IYk = keras.layers.Lambda(lambda x:x[:, k : strideLen + k])(inputs_Y)
            IUk = keras.layers.Lambda(lambda x:x[:, k : strideLen + k])(inputs_U)
            ITargetk = keras.layers.Lambda(lambda x:x[:, strideLen + k - self.outputWindowLen + 1 : strideLen + k + 1])(inputs_Y)
            novelIUk = keras.layers.Lambda(lambda x:x[:, strideLen + k : strideLen + k + 1])(inputs_U)
            #print(ITargetk, 'dd')
            #print(novelIUk)
            # Add founded xk and Ok to corresponding collections
            stateK = convEncoder([IYk, IUk])
            predictedOK = outputEncoder(stateK)[0]
            predictedOKCollection += [predictedOK]
            stateKCollection+=[stateK]
            predictionErrork=keras.layers.subtract([predictedOK,ITargetk],name='oneStepDecoderError'+str(k))
            predictionErrork=keras.layers.Lambda(lambda x:K.abs(x))(predictionErrork)
            predictionErrorCollection+=[predictionErrork]
            if not(forwardedState is None):
                forwardedStateN = [bridgeNetwork([novelIUk, stateK])[0]] # x_k+1

                for thisF in forwardedState:
                    forwardErrork = keras.layers.subtract([stateK, thisF])
                    forwardErrork = keras.layers.Lambda(lambda x:K.abs(x))(forwardErrork)
                    forwardErrorCollection += [forwardErrork]
                    forwardedPredictedOutputK = outputEncoder(thisF)[0] # y_k+1
                    forwardedPredictedErrork = keras.layers.subtract([forwardedPredictedOutputK,ITargetk])
                    forwardedPredictedErrorCollection += [forwardedPredictedErrork]
                    forwardedStateN += [bridgeNetwork([novelIUk,thisF])[0]]

                forwardedState = forwardedStateN
            else:
                forwardedState = [bridgeNetwork([novelIUk, stateK])[0]]

        oneStepAheadPredictionError = keras.layers.concatenate(predictionErrorCollection, name='oneStepDecoderError')

        if len(forwardedPredictedErrorCollection) > 1:
            forwardedPredictedError = keras.layers.concatenate(forwardedPredictedErrorCollection,name='multiStep_decodeError')
        else:
            forwardedPredictedError = keras.layers.Lambda(lambda x:K.abs(x), name='multiStep_decodeError')(forwardedPredictedErrorCollection[0])

        if len(forwardErrorCollection) > 1:
            forwardError = keras.layers.concatenate(forwardErrorCollection, name='forwardError')
        else:
            forwardError = keras.layers.Lambda(lambda x:K.abs(x), name='forwardError')(forwardErrorCollection[0])

        ann = Model(inputs=[inputs_Y,inputs_U], outputs=[predictedOKCollection[0], stateKCollection[0], oneStepAheadPredictionError, forwardedPredictedError, forwardError])
        return ann, convEncoder, outputEncoder, bridgeNetwork


    def computeGradients(self, train_stateVector=None, train_inputVector=None, index=0):
        if (train_inputVector is None) or (train_stateVector is None):
            train_inputVector, train_outputVector = self.prepareDataset(self.U, self.Y)

        self.sess = K.get_session()
        gr = self.sess.run(self.gradientState, feed_dict={self.model.input[0]: train_stateVector,self.model.input[1]: train_inputVector})
        return gr


    def prepareDataset(self, U=None, Y=None):
        if U is None: U = self.U
        if Y is None: Y = self.Y
        pad = self.MaxRange - 2
        strideLen = self.strideLen + pad
        print(strideLen)
        lenDS = U.shape[0]
        inputVector = np.zeros((lenDS-2, self.N_U * (strideLen + 2)))
        outputVector = np.zeros((lenDS-2, self.N_Y * (strideLen + 2)))
        offset = self.strideLen + 1 + pad

        for i in range(offset, lenDS):
            regressor_StateInputs = np.ravel(U[i-strideLen-1 : i+1])
            regressor_StateOutputs = np.ravel(Y[i-strideLen-1 : i+1])

            inputVector[i-offset] = regressor_StateInputs.copy()
            outputVector[i-offset] = regressor_StateOutputs.copy()

        return inputVector[:i-offset+1].copy(), outputVector[:i-offset+1].copy()


    def trainModel(self, shuffled:bool =True):
        tmp = self.privateTrainModel(shuffled, None, kFPE=.0, kAEPrediction=10, kForward=.3) # gamma, alpha, beta
        tmp = self.privateTrainModel(shuffled, tmp, 1, 0., 10) # gamma, alpha, beta


    def privateTrainModel(self, shuffled:bool =True, tmp=None, kFPE=1, kAEPrediction=1, kForward=1):
        inputVector, outputVector = self.prepareDataset()
        optimizer = keras.optimizers.Adam(learning_rate=0.002, amsgrad=True, clipvalue=0.5)

        if not (tmp is None):
            model = self.model
            convEncoder=self.convEncoder
            outputEncoder=self.outputEncoder
            bridgeNetwork=self.bridgeNetwork
            model.set_weights(tmp)
        else:
            model, convEncoder, outputEncoder, bridgeNetwork = self.ANNModel()

        if self.shuffledIndexes is None:
            self.shuffledIndexes=np.random.permutation(list(range(0,np.shape(outputVector)[0])))

        shuffledIndexes=self.shuffledIndexes

        model.compile(optimizer=optimizer,
                      loss_weights={'multiStep_decodeError':kFPE,
                                    'oneStepDecoderError':kAEPrediction,
                                    'forwardError':kForward},
                      loss={"multiStep_decodeError": AdvAutoencoder.mean_pred,
                            "oneStepDecoderError": AdvAutoencoder.mean_pred,
                            'forwardError': AdvAutoencoder.mean_pred},)

        model.fit({'input_y':outputVector[shuffledIndexes,:], 'input_u':inputVector[shuffledIndexes,:]},
                  {'multiStep_decodeError':outputVector[:,0:5]*0,'oneStepDecoderError':outputVector[:,0:5]*0,
                   'forwardError':outputVector[:,0:5]*0
                   },
                  epochs=150,
                  verbose=1,
                  validation_split=self.validation_split, shuffle=shuffled,
                  batch_size=self.batch_size,
                  callbacks=[ReduceLROnPlateau(factor=0.3, min_delta=0.001, patience=3),
                             EarlyStopping(patience=8, min_delta=0.001, monitor='val_loss'),
                             ], )
        self.model = model
        self.convEncoder = convEncoder
        self.outputEncoder = outputEncoder
        self.bridgeNetwork = bridgeNetwork
        tmp = model.get_weights()
        return tmp


    def getModel(self):
        return self.model


    def evaluateNetwork(self, U_val, Y_val):
        train_stateVector, train_inputVector, train_outputVector = self.prepareDataset(U_val,Y_val)
        t = time.time()
        fitted_Y = self.model.predict([train_stateVector, train_inputVector])
        elapsed = time.time() - t

        return fitted_Y, train_outputVector, elapsed


    def validateModel(self, plot:bool =True):
        fitted_Y, train_outputVector, elapsed = self.evaluateNetwork(self.U_val, self.Y_val)
        fitted_Y = fitted_Y[0]
        if plot:
            plt.figure(figsize=(7, 7))
            plt.plot(fitted_Y)
            plt.plot(train_outputVector)
            plt.show()
        return fitted_Y, train_outputVector, elapsed

# Main


## Training

In [None]:
# Configurazione di matplotlib
plt.rcParams["figure.figsize"] = [8, 6]
plt.rcParams["lines.linewidth"] = 2
plt.rcParams["figure.dpi"] = 100
plt.rcParams["font.size"] = 14
#plt.rcParams["text.usetex"] = True

# Inizializzazione del seed
np.random.seed(1)

# Definizione delle classi e delle funzioni
from enum import Enum

class systemSelectorEnum(Enum):
    @staticmethod
    def loadFromDataset(filename, nonLinearInputChar=False):
        dynamicModel = DummyModel()
        dsLoading = datasetLoadUtility()
        Uvero, Yvero, UV, YV = dsLoading.loadDatasetFromMATfile(filename)
        numel = Uvero.shape[0]
        numelV = UV.shape[0]
        u_n = np.reshape(Uvero.T[0], (numel, 1))
        y_n = np.reshape(Yvero.T[0], (numel, 1))
        u_Vn = np.reshape(UV.T[0], (numelV, 1))
        y_Vn = np.reshape(YV.T[0], (numelV, 1))
        meanY = np.mean(y_n)
        meanU = np.mean(u_n)
        stdY = np.std(y_n)
        stdU = np.std(u_n)
        y_n = (y_n - meanY) / stdY
        y_Vn = (y_Vn - meanY) / stdY
        u_n = (u_n - meanU) / stdU
        u_Vn = (u_Vn - meanU) / stdU
        return dynamicModel, u_n, y_n, u_Vn, y_Vn

    @staticmethod
    def MAGNETOdataset():
        return systemSelectorEnum.loadFromDataset('Magneto.mat')

    @staticmethod
    def INVERSEPENDULUMdataset():
        return systemSelectorEnum.loadFromDataset('InversePendulum.mat')

    @staticmethod
    def INVERSEPENDULUM(nonLinearInputChar=False):
        dynamicModel = InvertedPendulum()
        U, Y, U_val, Y_val = dynamicModel.prepareDataset(20000, 1000)
        return dynamicModel, U, Y, U_val, Y_val

    @staticmethod
    def TANKSdataset():
        return systemSelectorEnum.loadFromDataset('TwoTanksMatlab.mat')

    @staticmethod
    def SILVERBOXdataset():
        return systemSelectorEnum.loadFromDataset('Silverbox.mat')

    @staticmethod
    def TWOTANKS(nonLinearInputChar=False):
        dynamicModel = TwoTanks(nonLinearInputChar=Option.nonLinearInputChar)
        U, Y, U_val, Y_val = dynamicModel.prepareDataset(20000, 1000)
        return dynamicModel, U, Y, U_val, Y_val

    @staticmethod
    def BILINEAR(nonLinearInputChar=False):  # It's actually the hammerstein-wiener! But the old name stuck
        dynamicModel = LinearSystem(nonLinearInputChar=Option.nonLinearInputChar)
        U, Y, U_val, Y_val = dynamicModel.prepareDataset(10000, 1000)
        return dynamicModel, U, Y, U_val, Y_val


class Options:
    def __init__(self):
        self.nonLinearInputChar = True
        self.dynamicalSystemSelector = systemSelectorEnum.TWOTANKS
        # Estrae il nome del selettore di sistema dinamico per rappresentazioni sotto forma di stringa
        self.stringDynamicalSystemSelector = str(self.dynamicalSystemSelector).replace('<function systemSelectorEnum.', '').split(' at ')[0]
        self.affineStruct = True
        self.openLoopStartingPoint = 15
        self.horizon = 5  # per Model Predictive Control (MPC)
        self.TRsteps = 1
        self.fitHorizon = 5  # 5 o 2, è +1 rispetto al paper
        self.n_a = 10  # n_a = n_b
        self.useGroupLasso = False
        self.stateReduction = True
        self.regularizerWeight = 0.0001
        self.closedLoopSim = True
        self.enablePlot = True
        self.stateSize = 6


def prepareMatrices(uSequence, x0):
    logY = []
    logX = []
    uSequence = np.array(uSequence)

    for u in uSequence:
        u = np.reshape(u, (1, 1))
        x0 = model.bridgeNetwork.predict([u, x0])
        y = model.outputEncoder.predict([x0[0]])
        logY.append(y)
        logX.append(x0)
        x0 = x0[0]

    return logX, logY


def costFunction(uSequence, r, um1, logAB, logC, x0):
    logY = []
    uSequence = np.array(uSequence)
    um1 = np.array(um1)
    i = 0

    for u in uSequence:
        asda = np.concatenate([x0.ravel(), u.ravel()])
        asda = np.reshape(asda, (Option.stateSize + 1, 1))
        x0 = np.dot(logAB[i][1], asda) + np.reshape(logAB[i][2], (Option.stateSize, 1))
        y = np.dot(logC[i][1], x0)
        logY.append(y[0][-1])
        i += 1

    logY = np.array(logY)
    cost = 0.001 * np.sum(np.square(uSequence)) + \
           0.01 * np.sum(np.square(uSequence[1:] - uSequence[:-1])) + \
           0.01 * np.sum(np.square(uSequence[0] - um1)) + \
           np.sum(np.square(logY - r))

    return cost

def evaluateFeatureImportance():
    if not Option.stateReduction:
        w = model.convEncoder.get_layer('enc00').get_weights()
        ax = plt.figure(figsize=[8, 2]).gca()
        ax.xaxis.set_major_locator(MaxNLocator(integer=True))
        neuronsCount = np.sum(np.abs(w[0]) > 1e-3, axis=1)
        windowsLen = int(len(neuronsCount) / 2)
        yAxis = list(range(0, windowsLen))[::-1]
        print(neuronsCount, 'encoder=>')
        plt.title('encoder')
        plt.step(yAxis, neuronsCount[:windowsLen], where='mid')
        plt.step(yAxis, neuronsCount[windowsLen:], where='mid')
        plt.tight_layout()
    else:
        w1 = model.bridgeNetwork.get_layer('bridge00').get_weights()
        w = model.outputEncoder.get_layer('dec00').get_weights()
        neuronsCount = np.sum(np.abs(w1[0][:-1]) > 1e-3, axis=1)
        yAxis = list(range(0, len(neuronsCount)))
        print(neuronsCount, 'bridge=>')
        plt.figure(figsize=[8, 2])
        plt.title('bridge')
        plt.step(yAxis, neuronsCount, where='mid')
        plt.tight_layout()
        neuronsCount = np.sum(np.abs(w[0]) > 1e-3, axis=1)
        print(neuronsCount, 'decoder=>')
        yAxis = list(range(0, len(neuronsCount)))
        ax = plt.figure(figsize=[8, 2]).gca()
        ax.xaxis.set_major_locator(MaxNLocator(integer=True))
        plt.title('decoder')
        plt.step(yAxis, neuronsCount, where='mid')
        plt.tight_layout()

# These functions are used to generate plots for the paper
def prettyPrintStatsUseNA(aOutput, aInput):
    aOutput = np.array(aOutput)
    aInput = np.array(aInput)
    xAxis = range(0,aInput.shape[1])[::-1]
    ax = plt.figure(figsize=[8,2]).gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    #Notice: they are inverted with respect to the output of the feature importance function
    lineOutput ,= plt.step(xAxis,aOutput.T,where='mid')
    lineInput ,= plt.step(xAxis,aInput.T,where='mid')
    plt.legend([lineOutput,lineInput],['y_k', 'u_k'])
    plt.grid()
    plt.xlabel('time-step~delay')
    plt.tight_layout()


def prettyPrintStatsUseNX(ADecoder, aBridge):
    aBridge = np.array(aBridge)
    ADecoder = np.array(ADecoder)
    xAxis = range(1,ADecoder.shape[1]+1)[::-1]
    ax = plt.figure(figsize=[8,2]).gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    lineBridge ,= plt.step(xAxis, aBridge.T, where='mid')
    lineDecoder ,= plt.step(xAxis, ADecoder.T, where='mid')
    plt.legend([lineBridge,lineDecoder],['bridge','decoder'])
    plt.grid()
    plt.xlabel('state~component')
    plt.tight_layout()

warnings.filterwarnings("ignore")


# Parsing dei parametri e inizializzazione
Option = Options()

Option.fitHorizon = 5
Option.dynamicalSystemSelector = systemSelectorEnum.TWOTANKS
Option.nonLinearInputChar = True
Option.stateSize = 6
Option.n_a = 10
Option.affineStruct = False
Option.useGroupLasso = False
Option.regularizerWeight = 0.0001

# Generazione del sistema dinamico e apprendimento del modello
simulatedSystem, U_n, Y_n, U_Vn, Y_Vn = Option.dynamicalSystemSelector()

model = AdvAutoencoder(affineStruct=Option.affineStruct,
                      useGroupLasso=Option.useGroupLasso,
                      stateReduction=Option.stateReduction,
                      fitHorizon=Option.fitHorizon,
                      strideLen=Option.n_a, outputWindowLen=2, n_layer=3,
                      n_neurons=30,
                      regularizerWeight=Option.regularizerWeight,
                      stateSize=Option.stateSize)

model.setDataset(U_n.copy(), Y_n.copy(), U_Vn.copy(), Y_Vn.copy())

inputU, inputY = model.prepareDataset()

# Train the model
model.trainModel()
predictedLeft, stateLeft, oneStepAheadPredictionError, forwardedPredictedError, forwardError = model.model.predict([inputY, inputU])

## Clean Validation

In [None]:
def openLoopValidation(validationOnMultiHarmonic=True, reset=-1, YTrue=None, U_Vn=None):
    openLoopStartingPoint = Option.openLoopStartingPoint
    pastY = np.zeros((model.strideLen,1))
    pastU = np.zeros((model.strideLen,1))

    if YTrue is None:
        x0RealSystem = np.zeros((simulatedSystem.stateSize,))

    x0 = model.convEncoder.predict([pastY.T,pastU.T])

    logY = []
    logU = []
    logYR = []
    finalRange=1000

    if not(YTrue is None):
        finalRange = YTrue.shape[0]

    for i in range(0,finalRange):
        u = 0.5 * np.array([[np.sin(i/(20+0.01*i))]]) + 0.5

        if not validationOnMultiHarmonic:
            u = [U_Vn[i, 0]]
            print(u)


        if YTrue is None:
            y_kReal, x0RealSystem_ = simulatedSystem.loop(x0RealSystem, u)
            x0RealSystem = np.reshape(x0RealSystem_, (simulatedSystem.stateSize,))
        else:
            y_kReal = YTrue[i]
            u = [U_Vn[i]]


        pastU = np.reshape(np.append(pastU, u)[1:],(model.strideLen, 1))
        pastY = np.reshape(np.append(pastY, y_kReal)[1:],(model.strideLen, 1))

        if i < openLoopStartingPoint or (i % reset == 0 and reset > 0):
            x0 = model.convEncoder.predict([pastY.T,pastU.T])
            print('*',end='')
        else:
            x0 = model.bridgeNetwork.predict([np.reshape(u, (1,1)), x0])[0]

        y = model.outputEncoder.predict([x0])[0]

        if i >= openLoopStartingPoint:
            logY+=[y[0][-2]]
            logYR+=[y_kReal[0]]
            logU+=[u[0]]

        print('.',end='')

        pass
    print('\n')
    logY = np.array(logY)
    logYR = np.array(logYR)
    a = np.linalg.norm(np.array(logY)-np.array(logYR))
    b = np.linalg.norm(np.mean(np.array(logYR))-np.array(logYR))
    fit = 1 - (a / b)
    NRMSE = 1 - np.sqrt(np.mean(np.square(np.array(logY)-np.array(logYR))))/(np.max(logYR)-np.min(logYR))
    fit = np.max([0, fit])
    NRMSE = np.max([0, NRMSE])
    print('fit: ',fit)
    print('NRMSE: ',NRMSE)
    if Option.enablePlot:
        plt.figure(1)
        plt.title('Inverse pendulum Open-loop validation - BFR = '+str(fit))
        y = plt.plot(logY)
        yr = plt.plot(logYR)
        plt.legend(['y estimated', 'y real'])
        plt.grid()
        plt.show()

    return fit, NRMSE, logY, logYR


validationOnMultiHarmonic=[False]
reset=[10]

for r in reset:
    for voM in validationOnMultiHarmonic:
        start = time.time()
        YtrueToPass = None

        if 'dataset' in Option.stringDynamicalSystemSelector:
            YtrueToPass = Y_Vn.copy()

        fit, NRMSE, logY, logYR = openLoopValidation(validationOnMultiHarmonic=voM, reset=r, YTrue=YtrueToPass, U_Vn=U_Vn.copy())

        end= time.time()

        print('elapsed time in simulation:',end-start)
        print("validationOnMultiHarmonic:",voM,end=' ')
        print("reset every:",r,end=' ')
        print('fit: ',fit,' NRMSE: ',NRMSE)

## Adversarial Attack

In [None]:
class AdversarialAttack():
    def __init__(self, model, epsilon=0.1, iterations=10):
        """
        Initialize the adversarial attack class with the model and its components.

        :param model: The overall model that includes encoder, bridge, and decoder.
        :param epsilon: The limit for the perturbation magnitude.
        :param iterations: The number of iterations for the attack.
        """
        self.model = model
        self._epsilon = epsilon
        self._iterations = iterations
        self.gradient_history = []  # Store gradient history for plotting

    @property
    def epsilon(self):
        return self._epsilon

    @property
    def iterations(self):
        return self._iterations

    def run_output(self, uk, x0, yk_true):
        """
        Returns the crafted adversarial input to deceive the output of the
        autoencoder.

        Parameters:
        - uk: The original input to the network f.
        - x0: Latent representation of the current state.
        - yk_true: True output of the real dynamical system.

        Returns:
        - uk_adv: Crafted adversarial input.
        """
        uk = tf.convert_to_tensor(uk, dtype=tf.float32)
        yk_true = tf.convert_to_tensor(yk_true, dtype=tf.float32)
        x0 = tf.convert_to_tensor(x0, dtype=tf.float32)

        uk_adv = uk

        for _ in range(self._iterations):
            with tf.GradientTape() as tape:
                tape.watch(uk_adv)
                next_state_pred = self.model.bridgeNetwork([uk_adv, x0])[0]
                yk_pred = self.model.outputEncoder([next_state_pred])[0]
                yk_pred = yk_pred[0][-2]
                loss = tf.keras.losses.mean_absolute_error(yk_true, yk_pred)

            # Calculate gradients of loss w.r.t. adversarial_u
            gradients = tape.gradient(loss, uk_adv)

            # Update adversarial_u by moving in the direction that maximizes the loss
            uk_adv = uk_adv + self.epsilon * tf.sign(gradients)
            #print(f"Loss: {loss}, Gradient: {gradients}, uk_adv: {uk_adv}")

        return uk_adv, yk_pred

### Evasion

In [None]:
def openLoopValidation(validationOnMultiHarmonic=True, reset=-1, YTrue=None,
                       U_Vn=None, attack_params=None):
    openLoopStartingPoint = Option.openLoopStartingPoint
    pastY = np.zeros((model.strideLen, 1))
    pastU = np.zeros((model.strideLen, 1))

    if YTrue is None:
        x0RealSystem = np.zeros((simulatedSystem.stateSize,))

    x0 = model.convEncoder.predict([pastY.T, pastU.T])
    logY, logU, logYR, logUAdv = [], [], [], []
    finalRange = 1000

    if not(YTrue is None):
        finalRange = YTrue.shape[0]

    attack = AdversarialAttack(model, epsilon=attack_params["epsilon"],
                               iterations=attack_params["iterations"])

    for i in range(0, finalRange):
        u = 0.5 * np.array([[np.sin(i / (20 + 0.01 * i))]]) + 0.5

        if not validationOnMultiHarmonic:
            u = [U_Vn[i]]

        if YTrue is None:
            y_kReal, x0RealSystem_ = simulatedSystem.loop(x0RealSystem, u)
            x0RealSystem = np.reshape(x0RealSystem_, (simulatedSystem.stateSize,))
        else:
            y_kReal = YTrue[i]
            u = [U_Vn[i]]

        uk_adv, yk_pred = attack.run_output(u, x0, y_kReal)

        pastU = np.reshape(np.append(pastU, uk_adv)[1:],(model.strideLen, 1))
        pastY = np.reshape(np.append(pastY, y_kReal)[1:],(model.strideLen, 1))

        if i < openLoopStartingPoint or (i % reset == 0 and reset > 0):
            x0 = model.convEncoder.predict([pastY.T, pastU.T])
            print('*', end='')
        else:
            # Prediction of the next state based on the bridge network
            x0 = model.bridgeNetwork.predict([uk_adv, x0])[0]

        y = model.outputEncoder.predict([x0])[0]

        if i >= openLoopStartingPoint:
            logY += [y[0][-2]]
            logYR += [y_kReal[0]]
            logU += [u[0]]
            logUAdv += [uk_adv[0]]

        print('.', end='')

    print('\n')
    logY = np.array(logY)
    logYR = np.array(logYR)
    logU = np.array(logU)
    logUAdv = np.array(logUAdv)

    a = np.linalg.norm(np.array(logY) - np.array(logYR))
    b = np.linalg.norm(np.mean(np.array(logYR)) - np.array(logYR))
    fit = 1 - (a / b)
    NRMSE = 1 - np.sqrt(np.mean(np.square(np.array(logY) - np.array(logYR)))) / (np.max(logYR) - np.min(logYR))
    fit = np.max([0, fit])
    NRMSE = np.max([0, NRMSE])
    print('fit: ', fit)
    print('NRMSE: ', NRMSE)

    if Option.enablePlot:
        # Plot real output
        plt.figure(1, figsize=(12, 10), dpi=1000)
        plt.xlabel("Iterations", fontsize=32)
        plt.ylabel(r"$y$", fontsize=32)
        plt.xticks(fontsize=28)
        plt.yticks(fontsize=28)
        plt.plot(logYR, color='orange')
        plt.savefig("output_real.pdf", dpi=1000)
        plt.show()

        # Plot estimated output and difference between real and estimated output
        plt.figure(2, figsize=(12,10), dpi=1000)
        plt.xlabel("Iterations", fontsize=32)
        plt.ylabel(r"$\hat{y}$", fontsize=32)
        plt.xticks(fontsize=28)
        plt.yticks(fontsize=28)
        plt.plot(logY)
        plt.savefig("output_estimation.pdf", dpi=1000)
        plt.show()

        # 1) Plot difference between logYR and logY
        plt.figure(3, figsize=(12,10), dpi=1000)
        diff_logY = logYR - logY
        plt.xlabel("Iterations", fontsize=32)
        plt.xticks(fontsize=28)
        plt.yticks(fontsize=28)
        plt.plot(diff_logY)
        plt.savefig("output_difference.pdf", dpi=1000)
        plt.show()

        # 2) Plot original input sequence logU
        plt.figure(4, figsize=(12,10), dpi=1000)
        plt.xlabel("Iterations", fontsize=32)
        #plt.ylabel(r"$u$", fontsize=32)
        plt.plot(logU)
        plt.xticks(fontsize=28)
        plt.yticks(fontsize=28)
        plt.savefig("input_original.pdf", dpi=1000)
        plt.show()

        # 3) Plot adversarial input sequence logU_adv
        plt.figure(5, figsize=(12,10), dpi=1000)
        plt.xlabel("Iterations", fontsize=32)
        #plt.ylabel(r"$\Tilde{u}$", fontsize=32)
        plt.xticks(fontsize=28)
        plt.yticks(fontsize=28)
        plt.plot(logUAdv)
        plt.savefig("input_adversarial.pdf", dpi=1000)
        plt.show()

        # 4) Plot difference between logU and logU_adv
        plt.figure(6, figsize=(12, 10), dpi=1000)
        diff_logU = logUAdv - logU
        plt.xlabel("Iterations", fontsize=32)
        plt.xticks(fontsize=28)
        plt.yticks(fontsize=28)
        plt.ylim([-5, 5])
        plt.plot(diff_logU)
        plt.savefig("input_difference.pdf", dpi=1000)
        plt.show()


    return fit, NRMSE, logY, logYR

In [None]:
# Model Validation Under Attack Scenario 1
print("Open Loop Validation Under Attack Scenario 1")

epsilons = [0.01, 0.1, 0.3, 0.5]
n_iterations = [1, 10, 30]

# VoM is a parameter to set the nature of the input signal.
# True means setting an input signal with multi-harmonic nature.
# False means setting a casual input signal.

validation_params = dict(VoM=True, reset=10)

for e in epsilons:
    for n in n_iterations:
        attack_params = dict(epsilon=e, iterations=n)
        start = time.time()
        YtrueToPass = None

        if 'dataset' in Option.stringDynamicalSystemSelector:
            YtrueToPass = Y_Vn.copy()

        fit, NRMSE, logY, logYR = openLoopValidation(validationOnMultiHarmonic=validation_params["VoM"],
                                                        reset=validation_params["reset"], YTrue=YtrueToPass,
                                                        U_Vn=U_Vn.copy(), attack_params=attack_params)

        end = time.time()
        print('Elapsed time in simulation:', end - start)
        print("ValidationOnMultiHarmonic:", validation_params["VoM"], end=' ')
        print("Reset every:", validation_params["reset"], end=' ')
        print('Fit: ', fit, ' NRMSE: ', NRMSE)

### Poisoning

In [None]:
class PoisoningAttack():
    def __init__(self, model, pastU, pastY, u, uk_adv, y_kReal, yk_pred):
        self.model = model
        self.pastU = pastU
        self.pastY = pastY
        self.u = u
        self.uk_adv = uk_adv
        self.y_kReal = y_kReal
        self.yk_pred = yk_pred

    def poison_information_vector(self, i, poisoning_percentage):
        if poisoning_percentage == 5:
            if i % 10 == 0 and i < 500 :
                self.pastU = np.reshape(np.append(self.pastU, self.uk_adv)[1:], (self.model.strideLen, 1))
                self.pastY = np.reshape(np.append(self.pastY, self.y_kReal)[1:], (self.model.strideLen, 1))
            elif i % 10 == 0 and i >= 500:
                self.pastU = np.reshape(np.append(self.pastU, self.u)[1:], (self.model.strideLen, 1))
                self.pastY = np.reshape(np.append(self.pastY, self.yk_pred)[1:], (self.model.strideLen, 1))
            else:
                self.pastU = np.reshape(np.append(self.pastU, self.u)[1:], (self.model.strideLen, 1))
                self.pastY = np.reshape(np.append(self.pastY, self.y_kReal)[1:], (self.model.strideLen, 1))
        elif poisoning_percentage == 10:
            if i % 10 == 0:
                self.pastU = np.reshape(np.append(self.pastU, self.uk_adv)[1:], (self.model.strideLen, 1))
                self.pastY = np.reshape(np.append(self.pastY, self.yk_pred)[1:], (self.model.strideLen, 1))
            else:
                self.pastU = np.reshape(np.append(self.pastU, self.u)[1:], (self.model.strideLen, 1))
                self.pastY = np.reshape(np.append(self.pastY, self.y_kReal)[1:], (self.model.strideLen, 1))
        elif poisoning_percentage == 15:
            if i % 10 == 0:
                self.pastU = np.reshape(np.append(self.pastU, self.uk_adv)[1:], (self.model.strideLen, 1))
                self.pastY = np.reshape(np.append(self.pastY, self.yk_pred)[1:], (self.model.strideLen, 1))
                self.pastY = np.reshape(np.append(self.pastY, self.yk_pred)[1:], (self.model.strideLen, 1))
            else:
                self.pastU = np.reshape(np.append(self.pastU, self.u)[1:], (self.model.strideLen, 1))
                self.pastY = np.reshape(np.append(self.pastY, self.y_kReal)[1:], (self.model.strideLen, 1))
        elif poisoning_percentage == 20:
            if i % 10 == 0:
                if i == 0:
                    self.pastU[-2:] = self.uk_adv
                    self.pastY[-2:] = self.yk_pred
                else:
                    self.pastU = np.reshape(np.append(self.pastU, self.uk_adv)[1:], (self.model.strideLen, 1))
                    self.pastY = np.reshape(np.append(self.pastY, self.yk_pred)[1:], (self.model.strideLen, 1))
            elif i % 10 == 9:
                self.pastU[0] = self.uk_adv
                self.pastU[-1] = self.yk_pred
                self.pastY[0] = self.uk_adv
                self.pastY[-1] = self.yk_pred
        else:
            self.pastU = np.reshape(np.append(self.pastU, self.u)[1:], (self.model.strideLen, 1))
            self.pastY = np.reshape(np.append(self.pastY, self.y_kReal)[1:], (self.model.strideLen, 1))


In [None]:
def openLoopValidation(validationOnMultiHarmonic=True, reset=-1, YTrue=None,
                       U_Vn=None, attack_params=None, poisoning_percentage=5):
    openLoopStartingPoint = Option.openLoopStartingPoint
    pastY = np.zeros((model.strideLen, 1))
    pastU = np.zeros((model.strideLen, 1))

    if YTrue is None:
        x0RealSystem = np.zeros((simulatedSystem.stateSize,))

    x0 = model.convEncoder.predict([pastY.T, pastU.T])
    logY, logU, logYR, logUAdv = [], [], [], []
    finalRange = 1000

    if not(YTrue is None):
        finalRange = YTrue.shape[0]

    attack = AdversarialAttack(model, epsilon=attack_params["epsilon"],
                               iterations=attack_params["iterations"])

    poisoning_attack = PoisoningAttack(model, pastU, pastY, None, None, None, None)

    for i in range(0, finalRange):
        print(f"Iteration {i}")
        u = 0.5 * np.array([[np.sin(i / (20 + 0.01 * i))]]) + 0.5

        if not validationOnMultiHarmonic:
            u = [U_Vn[i]]

        if YTrue is None:
            y_kReal, x0RealSystem_ = simulatedSystem.loop(x0RealSystem, u)
            x0RealSystem = np.reshape(x0RealSystem_, (simulatedSystem.stateSize,))
        else:
            y_kReal = YTrue[i]
            u = [U_Vn[i]]

        uk_adv, yk_pred = attack.run_output(u, x0, y_kReal)

        poisoning_attack.u = u
        poisoning_attack.uk_adv = uk_adv
        poisoning_attack.y_kReal = y_kReal
        poisoning_attack.yk_pred = yk_pred

        poisoning_attack.poison_information_vector(i, poisoning_percentage)

        pastU = poisoning_attack.pastU
        pastY = poisoning_attack.pastY


        if i < openLoopStartingPoint or (i % reset == 0 and reset > 0):
            x0 = model.convEncoder.predict([pastY.T, pastU.T])
            print('*', end='')
        else:
            # Prediction of the next state based on the bridge network
            x0 = model.bridgeNetwork.predict([uk_adv, x0])[0]

        y = model.outputEncoder.predict([x0])[0]

        if i >= openLoopStartingPoint:
            logY += [y[0][-2]]
            logYR += [y_kReal[0]]
            logU += [u[0]]
            logUAdv += [uk_adv[0]]

        print('.', end='')

    print('\n')
    logY = np.array(logY)
    logYR = np.array(logYR)
    logU = np.array(logU)
    logUAdv = np.array(logUAdv)

    a = np.linalg.norm(np.array(logY) - np.array(logYR))
    b = np.linalg.norm(np.mean(np.array(logYR)) - np.array(logYR))
    fit = 1 - (a / b)
    NRMSE = 1 - np.sqrt(np.mean(np.square(np.array(logY) - np.array(logYR)))) / (np.max(logYR) - np.min(logYR))
    fit = np.max([0, fit])
    NRMSE = np.max([0, NRMSE])
    print('fit: ', fit)
    print('NRMSE: ', NRMSE)

    if Option.enablePlot:
        plt.figure(1, figsize=(13,10), dpi=1000)
        fit = round(fit, 4)
        plt.xlabel("Iterations", fontsize=32)
        plt.xticks(fontsize=28)
        plt.ylabel(r"$\hat{y}$, $y$", fontsize=32)
        plt.yticks(fontsize=28)
        y = plt.plot(logY)
        yr = plt.plot(logYR, linestyle="--")
        epsilon = str(attack.epsilon).replace(".", "")
        plt.savefig("plot_poisoning_"+str(poisoning_percentage)+".pdf", dpi=1000)
        plt.grid()
        plt.show()


    return fit, NRMSE, logY, logYR

In [None]:
# Model Validation Under Attack Scenario 2
print("Open Loop Validation Under Attack Scenario 2")

epsilons = [0.01]
n_iterations = [1]
poison_p = [5, 10, 15, 20]

# VoM is a parameter to set the nature of the input signal.
# True means setting an input signal with multi-harmonic nature.
# False means setting a casual input signal.
validation_params = dict(VoM=True, reset=10)

for p in poison_p:
    for e in epsilons:
        for n in n_iterations:
            attack_params = dict(epsilon=e, iterations=n)
            start = time.time()
            YtrueToPass = None

            if 'dataset' in Option.stringDynamicalSystemSelector:
                YtrueToPass = Y_Vn.copy()

            fit, NRMSE, logY, logYR = openLoopValidation(validationOnMultiHarmonic=validation_params["VoM"],
                                                            reset=validation_params["reset"], YTrue=YtrueToPass,
                                                            U_Vn=U_Vn.copy(), attack_params=attack_params, poisoning_percentage=p)

            end = time.time()
            print('Elapsed time in simulation:', end - start)
            print("ValidationOnMultiHarmonic:", validation_params["VoM"], end=' ')
            print("Reset every:", validation_params["reset"], end=' ')
            print('Fit: ', fit, ' NRMSE: ', NRMSE)