In [1]:
import pandas as pd
import numpy as np

from tensorflow import keras
from tensorflow.python.keras.layers import Input, Dense,RepeatVector, TimeDistributed, Dense, Dropout, LSTM
from tensorflow.python.keras.models import Sequential
import tensorflow as tf
import matplotlib.pyplot as plt
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

from sklearn.metrics import confusion_matrix, recall_score, accuracy_score, precision_score
from sklearn.metrics import confusion_matrix, recall_score, accuracy_score, precision_score


In [2]:
def conv_array(array):
    """Collect the items of an iterable into a single NumPy array.

    Parameters
    ----------
    array : iterable
        Any iterable; its items are taken in iteration order.

    Returns
    -------
    numpy.ndarray
        The result of ``np.asarray`` applied to the list of items.
    """
    return np.asarray(list(array))

def SSIMLoss(y_true, y_pred):
    """Return ``1 - mean(SSIM(y_true, y_pred))`` as a scalar loss.

    ``tf.image.ssim`` is called with ``1.0`` as its third argument.
    NOTE(review): that argument is the assumed dynamic range of the inputs,
    so this presumably expects data scaled to [0, 1] - confirm.
    """
    ssim_values = tf.image.ssim(y_true, y_pred, 1.0)
    mean_ssim = tf.reduce_mean(ssim_values)
    return 1 - mean_ssim

In [3]:
# Load the pickled signal table.
# NOTE(review): unpickling can execute arbitrary code - only load trusted files.
data = pd.read_pickle("./DataSignal.pkl")

# Keep only model "F40" rows whose status is "OK" or "NOK".
is_labelled = (data['Status'] == "OK") | (data['Status'] == "NOK")
is_f40 = data['Model'] == "F40"
df_f40 = data.loc[is_labelled & is_f40]

# Split the filtered rows by status.
df_OK = df_f40.loc[df_f40['Status'] == "OK"]
df_NOK = df_f40.loc[df_f40['Status'] == "NOK"]

# 'Acc' signals: OK rows become x_train (label 1), NOK rows become x_test (label 0).
x_train = conv_array(df_OK['Acc'])
x_test = conv_array(df_NOK['Acc'])
y_train = np.asarray([1] * len(df_OK))
y_test = np.asarray([0] * len(df_NOK))



In [4]:
# Sanity-check the array shapes, one per line.
for _arr in (x_train, y_train, x_test, y_test):
    print(_arr.shape)



(2313, 3500)
(2313,)
(141, 3500)
(141,)


In [5]:
# Feature matrix built from the 'Pos' column of the filtered F40 rows.
pos = conv_array(df_f40['Pos'])

# Integer-encode the status strings. LabelEncoder orders classes by sorted
# value, so with only "NOK"/"OK" present: "NOK" -> 0, "OK" -> 1.
lb = LabelEncoder()
y = lb.fit_transform(conv_array(df_f40['Status']))

# Hold out 33% of the samples. The classes are heavily imbalanced, so the split
# is stratified on y to keep the OK/NOK ratio the same in both partitions.
# (train_test_split is already imported in the notebook's import cell.)
X_t, X_te, y_t, y_te = train_test_split(
    pos, y, test_size=0.33, random_state=42, stratify=y
)

In [6]:
# --- Training schedule ---
nb_epoch = 50
batch_size = 10

# --- Layer widths ---
input_dim = 3500                    # width of the model's input layer
encoding_dim = 1000
hidden_dim_1 = encoding_dim // 2    # half of encoding_dim
hidden_dim_2 = 90

# NOTE(review): despite its name, this value is passed to the L2 activity
# regularizer of the first encoder layer, not to the optimizer.
learning_rate = 1e-7

In [7]:
# Input layer
input_layer = tf.keras.layers.Input(shape=(input_dim,))

# Encoder: input_dim -> encoding_dim -> hidden_dim_1 -> hidden_dim_2.
# Layers are created and applied in the listed order.
encoder = input_layer
for _layer in (
    tf.keras.layers.Dense(
        encoding_dim,
        activation="tanh",
        # `learning_rate` serves as the L2 activity-penalty factor here.
        activity_regularizer=tf.keras.regularizers.l2(learning_rate),
    ),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(hidden_dim_1, activation='relu'),
    tf.keras.layers.Dense(hidden_dim_2, activation=tf.nn.leaky_relu),
):
    encoder = _layer(encoder)

# Decoder: hidden_dim_2 -> hidden_dim_1 -> encoding_dim -> input_dim.
# NOTE(review): the tanh output bounds reconstructions to [-1, 1];
# confirm the input signals are scaled to that range.
decoder = encoder
for _layer in (
    tf.keras.layers.Dense(hidden_dim_1, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(encoding_dim, activation='relu'),
    tf.keras.layers.Dense(input_dim, activation='tanh'),
):
    decoder = _layer(decoder)

# Autoencoder: maps the input to its reconstruction.
autoencoder = tf.keras.Model(inputs=input_layer, outputs=decoder)
autoencoder.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 3500)]            0         
_________________________________________________________________
dense (Dense)                (None, 1000)              3501000   
_________________________________________________________________
dropout (Dropout)            (None, 1000)              0         
_________________________________________________________________
dense_1 (Dense)              (None, 500)               500500    
_________________________________________________________________
dense_2 (Dense)              (None, 90)                45090     
_________________________________________________________________
dense_3 (Dense)              (None, 500)               45500     
_________________________________________________________________
dropout_1 (Dropout)          (None, 500)               0     

In [8]:
# The autoencoder reconstructs real-valued signals, so classification
# 'accuracy' carries no meaning here; report mean absolute error next to
# the MSE training loss instead.
autoencoder.compile(
    optimizer='adam',
    loss='mse',
    metrics=['mae'],
)

In [None]:
# Train the autoencoder to reproduce its own input; the held-out split is
# used for validation loss. The epoch count comes from the `nb_epoch`
# constant rather than a second hard-coded literal.
# NOTE(review): X_t is drawn from df_f40 and therefore contains both OK and
# NOK samples - for anomaly detection, consider fitting on OK samples only.
history = autoencoder.fit(
    X_t, X_t,
    epochs=nb_epoch,
    batch_size=batch_size,
    validation_data=(X_te, X_te),
    shuffle=True,
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50

In [None]:
# Training vs. validation loss per epoch, read from the model's fit history.
loss_curves = autoencoder.history.history

fig, ax = plt.subplots()
ax.plot(loss_curves['loss'], linewidth=2, label='Train')
ax.plot(loss_curves['val_loss'], linewidth=2, label='Valid')
ax.legend(loc='upper right')
ax.set_title('Model loss')
ax.set_ylabel('Loss')
ax.set_xlabel('Epoch')
plt.show()

The loss function shows the repetitive behavior of the input data.

In [None]:
# Reconstruct the held-out samples and score each one by the mean squared
# difference between the sample and its reconstruction.
test_x_predictions = autoencoder.predict(X_te)
mse = ((X_te - test_x_predictions) ** 2).mean(axis=1)

error_df = pd.DataFrame({
    'Reconstruction_error': mse,
    'True_class': y_te,
})

# Show the rows whose true class is 0.
error_df[error_df['True_class'] == 0]

# New Section

In [None]:
# Reconstruction-error cutoff: samples above it are flagged as anomalous.
threshold_fixed = 0.0004

# LABELS[i] names encoded class i: 0 = "NOK", 1 = "OK".
LABELS = ["NOK", "OK"]

# A high reconstruction error marks an anomaly, i.e. class 0 ("NOK");
# samples at or below the threshold are predicted as class 1 ("OK").
# (The previous mapping assigned 1 to high-error samples, which inverted the
# predictions relative to the True_class encoding.)
pred_y = [0 if e > threshold_fixed else 1 for e in error_df.Reconstruction_error.values]
error_df['pred'] = pred_y

# Pin the label order so the matrix is always 2x2 and lines up with LABELS,
# even if one class is absent from the predictions.
conf_matrix = confusion_matrix(error_df.True_class, pred_y, labels=[0, 1])
plt.figure(figsize=(4, 4))
sns.heatmap(conf_matrix, xticklabels=LABELS, yticklabels=LABELS, annot=True, fmt="d");
plt.title("Confusion matrix")
plt.ylabel('True class')
plt.xlabel('Predicted class')
plt.show()

# Print accuracy, recall and precision. Recall and precision use
# scikit-learn's default positive class, label 1 ("OK").
print(" Accuracy: ",accuracy_score(error_df['True_class'], error_df['pred']))
print(" Recall: ",recall_score(error_df['True_class'], error_df['pred']))
print(" Precision: ",precision_score(error_df['True_class'], error_df['pred']))

In [None]:
# Scatter the per-sample reconstruction error, one series per true class,
# with the decision threshold drawn as a horizontal line.
groups = error_df.groupby('True_class')
fig, ax = plt.subplots()
for class_id, class_rows in groups:
    series_label = "Normal" if class_id == 1 else "Anomaly"
    ax.plot(
        class_rows.index,
        class_rows.Reconstruction_error,
        marker='o',
        ms=3.5,
        linestyle='',
        label=series_label,
    )

# Span the threshold line across the x-range the scatter currently occupies.
x_left, x_right = ax.get_xlim()
ax.hlines(threshold_fixed, x_left, x_right, colors="r", zorder=100, label='Threshold')
ax.legend()
ax.set_title("Reconstruction error for normal and abnormal data")
ax.set_ylabel("Reconstruction error")
ax.set_xlabel("Data point index")
plt.show();