In [26]:
import os
import numpy as np
import keras
import matplotlib.pyplot as plt
import shutil
from keras.models import Sequential
from keras.layers import LSTM, GRU, Dense, Dropout, Layer, Bidirectional, Conv2D, Flatten, BatchNormalization
from keras.layers import RepeatVector, concatenate
from keras.layers import TimeDistributed
from keras.callbacks import EarlyStopping
from keras.utils.vis_utils import plot_model
from keras.optimizers import Adam
from sklearn.ensemble import IsolationForest
import keras.backend as K
import tensorflow as tf

In [27]:
autoencoder = tf.keras.models.load_model(r'C:\\Users\\ricky\\FYP\\RNNAE_public\\RNN_autoencoder_model')
print(autoencoder.summary)

<bound method Model.summary of <tensorflow.python.keras.engine.functional.Functional object at 0x000002564DC01A60>>


In [28]:
encoder = tf.keras.models.load_model(r'C:\\Users\\ricky\\FYP\\RNNAE_public\\RNN_encoder_model')
print(encoder.summary)

<bound method Model.summary of <tensorflow.python.keras.engine.functional.Functional object at 0x00000256306C0190>>


In [29]:
os.chdir(r'C:\\Users\\ricky\\FYP\\RNNAE_public\\RNN_npy')

def import_data(x):

    x = np.load(f'{x}.npy', allow_pickle=True)
    x = np.asarray(x).astype('float32')

    return x

input = import_data('input')
input_train = import_data('input_train')
input_test = import_data('input_test')
type_train = import_data('type_train')
type_test = import_data('type_test')
mask_train = import_data('mask_train')
mask_test = import_data('mask_test')

In [30]:
os.chdir(r'C:\\Users\\ricky\\FYP\\RNNAE_public')

data_GP = np.load('data_GP.npy', allow_pickle=True)
data_GP = np.array(data_GP)

In [39]:
def rnnae_test(autoencoder, input_tmp, mask_tmp):

    yhat = autoencoder.predict(x=[input_tmp, mask_tmp], verbose=1)
    yhat_loss = autoencoder.evaluate(x=[input_tmp, mask_tmp], y=None, verbose=1)

    return yhat

def latent_space_demo(encoder, input_tmp):

    latent_space = encoder.predict(input_tmp, verbose=1)

    os.chdir('C:\\Users\\ricky\\FYP\\RNNAE_public\\RNN_latent_space_graph')

    for i in range(latent_space.shape[1] - 1):
        for j in range(latent_space.shape[1] - 1 - i):
            fig = plt.figure(figsize=(6, 6))
            plt.grid()
            plt.scatter(latent_space[:,i], latent_space[:,i+j+1], s=8)
            plt.title(f'id {i} vs id {i+j+1}.pdf')
            plt.savefig(f'id_{i}_vs_id_{i+j+1}.pdf')
            plt.close()

    return latent_space

def isolation_forest(latent_space, split):

    clf = IsolationForest(n_estimators=500)
    clf.fit(latent_space)
    anomaly = clf.score_samples(latent_space)
    anomaly_id = np.argsort(anomaly)

    for i, ano in enumerate(anomaly_id):
        #shutil.copy(f'/home/ricky/RNNAE/GP_graph/{data_GP[i+split][-1]}.pdf', f'/home/ricky/RNNAE/RNN_anomaly_graph/')
        shutil.copy(f'C:\\Users\\ricky\\FYP\\RNNAE_public\\GP_graph\\{data_GP[ano+split][-1]}.pdf', f'C:\\Users\\ricky\\FYP\\RNNAE_public\\RNN_anomaly_graph\\{i}_{data_GP[ano+split][-1]}.pdf')

    return anomaly

def reconstruction_graph(input_tmp, yhat, split, filters=['u', 'g', 'i']):

    color1 = ['darkviolet', 'seagreen', 'crimson', 'maroon']
    color2 = ['darkmagenta', 'darkgreen', 'firebrick', 'darkred']

    for i in range(input_tmp.shape[0]):

        os.chdir(r'C:\\Users\\ricky\\FYP\\RNNAE_public\\RNN_reconstruction_graph')

        isExist = os.path.exists(f'.\\{data_GP[i][-1]}')

        if not isExist:
            os.makedirs(f'.\\{data_GP[i][-1]}')
            os.chdir(f'.\\{data_GP[i][-1]}')

        for j, filter in enumerate(filters):
            fig = plt.figure(figsize=(12, 8))
            ax = fig.add_subplot(1, 1, 1)

            plt.gca().invert_yaxis()

            # And a corresponding grid
            ax.grid(which='major', alpha=0.8)
            ax.grid(which='minor', alpha=0.3)

            plt.xlabel('Timestep', fontsize=15)
            plt.ylabel('Absolute Magnitude', fontsize=15)

            plt.xlim(-50, 185)

            plt.title(f'{data_GP[i+split][-1]}, {data_GP[i+split][-2]}, {filter}')

            #plt.errorbar(data_GP[i+split][0], data_GP[i+split][j+1], y_err=data_GP[i+split][j+4], fmt='v')

            plt.scatter(input_tmp[i,:,0], input_tmp[i,:,j+1], s=2, marker='o', color=color1[j], label=f'test data'.format('o'))
            plt.scatter(input_tmp[i,:,0], yhat[i,:,j], s=12, marker='X', color=color2[j], label=f'reconstruction'.format('x'))
            
            plt.legend()

            plt.savefig(f'.\\{data_GP[i][-1]}_{filter}_band.pdf')

            plt.close()

    return

In [40]:
yhat = rnnae_test(autoencoder, input_test[0], mask_test)
latent_space = latent_space_demo(encoder, input_train[0])
anomalies = isolation_forest(latent_space, 0)
reconstruction_graph(input_test[0], yhat, int(0.8*(data_GP.shape[0])))

