<a href="https://colab.research.google.com/github/ucl-casa-ce/casa0018/blob/main/Week7/anomaly_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Set Up

First set up the necessary Python imports and set up Tensor Board which provides a visual output of the training process.

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Input, LSTM, Dense, RepeatVector, TimeDistributed
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.utils import timeseries_dataset_from_array
import matplotlib.pyplot as plt


# Create Data

Create sine function data. We’ll use the NumPy linspace to generate x values ranging between 0 and 200*2*Pi (200 cycles) and NumPy sine function to generate sine values to the corresponding x. We subdivide the range into 10000 data points.

We then add some noise to the data. Finally, we visualize the data.

In [None]:
x = np.linspace(0, 200*2*np.pi, 10001) # in radians, 200 cycles, 50 data points per cycle
y = np.sin(x)
y += 0.05*np.random.randn(len(y)) #Add some noise to the data to make it more realistic

# Inject anomalies (random spikes)
num_anomalies = 10
anomaly_indices = np.random.choice(2000, num_anomalies, replace=False)
y[anomaly_indices+8000] += np.random.uniform(-5, 5, num_anomalies)  # Add large spikes

# Plot our data
plt.figure(figsize=(12,8))
plt.plot(range(len(y)), y)
plt.show()


Divide the data set into windows of data

In [None]:
sequence_length = 50 # sequence length - the length of the training window

data = []
for i in range(len(y) - sequence_length):
    data.append(y[i : i + sequence_length])

Y = np.array(data).reshape(-1, sequence_length, 1)  # Reshape for LSTM
print(len(Y))

# Create a model

The code to create a LSTM is similar to that for earlier NNs you have already seen.

The variable (n_features) defined stands for the number of features in the training data i.e., as we are dealing with univariate data we’ll only have one feature whereas if we are using multivariate data containing multiple features then we must specify the count of features in our data.

In [None]:
n_features = 1
latent_dim = 50
batch_size = 16

# Encoder
encoder = Sequential()
encoder.add(LSTM(latent_dim, activation="relu", return_sequences=False, input_shape=(sequence_length, n_features)))
encoder.add(RepeatVector(sequence_length)) # Repeat for decoder input

# Decoder
decoder = Sequential()
decoder.add(LSTM(latent_dim, activation="relu", return_sequences=True)) # Pass the output shape of the encoder
decoder.add(TimeDistributed(Dense(1)))

# Autoencoder
autoencoder = Sequential([encoder, decoder])
autoencoder.compile(loss='mse', optimizer='adam')
autoencoder.summary()

#Train
Train the autoencoder on 80% 0f the data. Set aside 20% for testing

In [None]:
# ==== Train Autoencoder on Normal Data ====
Y_train = Y[:8000]  # Train on 80% of data

Y_test = Y[8000:]

autoencoder.fit(Y_train, Y_train, epochs=20, batch_size=batch_size, validation_split=0.1)


# Comparing Predictions Using The Test Data

Now let's compare the predictions of the LSTM model using our test data set.


In [None]:

# ==== Anomaly Detection ====
Y_pred = autoencoder.predict(Y_test)  # Reconstruct sequences
reconstruction_errors = np.mean(np.abs(Y_test - Y_pred), axis=(1, 2))  # Compute reconstruction error per sequence

# Set Anomaly Threshold (e.g., 95th percentile)
threshold = np.percentile(reconstruction_errors, 95)
anomalies = reconstruction_errors > threshold  # Flag anomalies


# ==== Plot Results ====
plt.figure(figsize=(10, 4))
plt.plot(range(len(y[8000:])), y[8000:], label="Original Data")

# Extract the first element of each sequence in Y_pred
Y_pred_first_element = Y_pred[:, 0, 0]  # Shape will be (1951,)

# Plot the first element of each predicted sequence against the original data
plt.plot(range(len(Y_pred_first_element)), Y_pred_first_element, label="Reconstructed Data")
#last_sequence_pred = Y_pred[-1, :, 0]  # Shape will be (50,)
#plt.plot(range(1951,2001), last_sequence_pred, color='orange')

plt.scatter(np.where(anomalies)[0] + sequence_length, y[8000 + sequence_length:][anomalies], color="red", label="Anomalies")

plt.legend()
plt.xlabel("Time Step")
plt.ylabel("Value")
plt.title("Anomaly Detection in Sinusoidal Time Series")
plt.show()
