# In [None]:
# a. Import required libraries
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

# b. Upload / access the dataset
# MNIST serves as the anomaly-detection data set for this example:
# digits 0-4 play the role of "normal" samples, digits 5-9 the "anomalies".
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# Cast to float32 and divide by 255 to normalize the pixel values.
x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0

# Flatten every image into one feature vector (one row per sample).
x_train = x_train.reshape(len(x_train), -1)
x_test = x_test.reshape(len(x_test), -1)

# The autoencoder is trained on "normal" digits only (labels 0-4).
normal_train_mask = y_train <= 4
x_train_normal = x_train[normal_train_mask]

# Hold out 20% of the normal samples for validation; the fixed seed keeps
# the split repeatable between runs.
x_train_normal, x_val_normal = train_test_split(
    x_train_normal,
    test_size=0.2,
    random_state=42,
)

# c. Encoder converts it into latent representation
# The autoencoder is assembled layer by layer with the Keras functional API.
encoding_dim = 32  # Width of the bottleneck (latent) layer

# One input unit per flattened feature
input_layer = Input(shape=(x_train_normal.shape[1],))

# Encoder: 128 -> 64 -> encoding_dim, ReLU throughout
encoder = input_layer
for units in (128, 64):
    encoder = Dense(units, activation='relu')(encoder)
encoder_output = Dense(encoding_dim, activation='relu')(encoder)

# d. Decoder networks convert it back to the original input
# Decoder: 64 -> 128 -> input width; the final sigmoid bounds outputs to (0, 1)
decoder = encoder_output
for units in (64, 128):
    decoder = Dense(units, activation='relu')(decoder)
decoder_output = Dense(
    x_train_normal.shape[1],
    activation='sigmoid',
)(decoder)

# End-to-end model: input -> reconstruction
autoencoder = Model(inputs=input_layer, outputs=decoder_output)

# Encoder-only model sharing the same layers, exposing the latent code
encoder_model = Model(inputs=input_layer, outputs=encoder_output)

# e. Compile the models with Optimizer, Loss, and Evaluation Metrics
# Reconstruction is a regression problem, so the classification metric
# 'accuracy' carries no useful meaning here. Mean absolute error is tracked
# instead as a second, more interpretable view of reconstruction quality
# alongside the MSE loss.
autoencoder.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='mse',
    metrics=['mae'],
)

# Train the autoencoder on normal data only: the input doubles as the target,
# and the held-out normal samples provide the validation loss per epoch.
history = autoencoder.fit(
    x_train_normal, x_train_normal,
    epochs=50,
    batch_size=64,
    validation_data=(x_val_normal, x_val_normal),
    verbose=2
)

# Evaluate the model on test data (to check for reconstruction loss)
# Split the test set by label: 0-4 are normal, everything else is anomalous.
is_normal_test = y_test <= 4
x_test_normal = x_test[is_normal_test]
x_test_anomalous = x_test[~is_normal_test]

# Reconstruction error on the normal test samples
reconstructions = autoencoder.predict(x_test_normal)
mse_normal = mean_squared_error(y_true=x_test_normal, y_pred=reconstructions)

# Reconstruction error on the anomalous test samples
reconstructions_anomalous = autoencoder.predict(x_test_anomalous)
mse_anomalous = mean_squared_error(
    y_true=x_test_anomalous,
    y_pred=reconstructions_anomalous,
)

print(f"Mean Squared Error for Normal Test Data: {mse_normal:.4f}")
print(f"Mean Squared Error for Anomalous Test Data: {mse_anomalous:.4f}")

# Plot training and validation loss
# One curve per recorded loss series, drawn in the same order as recorded.
loss_curves = (
    ('loss', 'Training Loss'),
    ('val_loss', 'Validation Loss'),
)
for history_key, curve_label in loss_curves:
    plt.plot(history.history[history_key], label=curve_label)

plt.title('Training and Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

# Detect anomalies using a threshold on reconstruction error
# The threshold is calibrated on the held-out *normal* validation samples
# rather than on the test set, so no test information leaks into the detector.
# Taking the 95th percentile of per-sample errors means roughly 5% of normal
# data is expected to be flagged (the accepted false-positive rate).
val_reconstruction_errors = np.mean(
    np.square(x_val_normal - autoencoder.predict(x_val_normal)),
    axis=1,
)
threshold = np.percentile(val_reconstruction_errors, 95)

# Predict anomalies in the test set: per-sample MSE compared to the threshold
reconstruction_errors = np.mean(
    np.square(x_test - autoencoder.predict(x_test)),
    axis=1,
)
anomalies = reconstruction_errors > threshold

# Score the detector against the known labels (digits 5-9 are anomalies)
true_anomalies = y_test > 4
true_positive_count = int(np.sum(anomalies & true_anomalies))
detected_count = int(np.sum(anomalies))
actual_count = int(np.sum(true_anomalies))
# Guard the ratios so an empty detection/anomaly set cannot divide by zero.
precision = true_positive_count / detected_count if detected_count else 0.0
recall = true_positive_count / actual_count if actual_count else 0.0

# Display the proportion of anomalies detected
print("Anomaly Detection Results")
print(f"Total samples: {x_test.shape[0]}")
print(f"Detected anomalies: {np.sum(anomalies)}")
print(f"Threshold (95th percentile of validation error): {threshold:.4f}")
print(f"Actual anomalies: {actual_count}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
