# In this notebook, we are going to create a model for detecting anomalies using the autoencoder method.

make the dataset and train the model on it

In [1]:
# import the lib
import tensorflow as tf
from tensorflow.keras import layers, models
import numpy as np
from sklearn.model_selection import train_test_split

In [2]:
# Generate random normal data
normal_data = np.random.normal(0, 1, size=(1000, 32))
normal_data.shape

(1000, 32)

In [3]:
# set the train data and the train labels
train_data = normal_data
train_labels = np.array(len(normal_data)*[0])  # make 0 array as length same as the nrmal data length

In [4]:
train_labels[:10]

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

In [5]:
# split the data set to the train and the test set
x_train, x_test, y_train, y_test = train_test_split(train_data, train_labels, test_size=0.2, random_state=42)

In [6]:
# test the x_train shape
x_train.shape

(800, 32)

In [7]:
# set the imput shape
input_dim = x_train.shape[1]

# set the encoder
encoder = models.Sequential([
    layers.Input(shape=(input_dim,)),
    layers.Dense(64, activation='relu'),
    layers.Dense(32, activation='relu'),
    layers.Dense(16, activation='relu')
])

# set the decoder
decoder = models.Sequential([
    layers.Input(shape=(16,)),
    layers.Dense(32, activation='relu'),
    layers.Dense(64, activation='relu'),
    layers.Dense(input_dim, activation='sigmoid')
])

# set the autoencoder usisng the encoder and the decoder
autoencoder = models.Sequential([
    encoder,
    decoder
])

# compile the auto encoder model
autoencoder.compile(optimizer='adam', loss='mean_squared_error')


In [8]:
# set the early stop callback
from tensorflow.keras.callbacks import EarlyStopping
early_stop = EarlyStopping(monitor = "val_loss", patience = 10, restore_best_weights = True)


# fit the auto encoder model
autoencoder.fit(x_train,
                x_train,
                epochs= 200,
                batch_size=128,
                validation_data=(x_test, x_test),
                callbacks = early_stop)


Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78

<keras.callbacks.History at 0x7fd4694f6230>

In [9]:
# evaluate the autoencoder mdoel
autoencoder.evaluate(x_test[y_test == 0], x_test[y_test == 0])



0.8836498260498047

Make another dataset and detect the anomalys

In [82]:
# now lets make a anomaly and the normaly (wat we are trainded for) data set for seprate the anomaly data

normal_data_2  = np.random.normal(0, 1, size = (1000, 32))
anomaly_data_2 = np.random.normal(7, 2, size = (300, 32))

In [83]:
# mix the anomal and the normal data

data = np.vstack([normal_data_2, anomaly_data_2])
data.shape

(1300, 32)

In [84]:
# set the labels
# lets difine as normal data as 0 and anomaly data 1
labels = np.array([0]*len(normal_data_2) + [1]*len(anomaly_data_2))

In [85]:

# Calculate reconstruction errors for all data
reconstructed_data = autoencoder.predict(data)
mse = np.mean(np.power(data - reconstructed_data, 2), axis=1)

print(len(mse))

1300


In [86]:
# Set a threshold for anomaly detection (you can adjust this threshold)
threshold = 1.7  # this is we can adjust how we want

# Classify anomalies based on the threshold
predictions = (mse > threshold).astype(int)
predictions_2 = [x for x in predictions if x == 1]

print(len(predictions))
print(len(predictions_2))

1300
301


In [87]:
print(predictions[800:])


[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 

We can see that the model has successfully identified the anomalies correctly. Therefore, our model can predict the anomalies in that dataset accurately using the 1.7 threshold.

end