<a href="https://colab.research.google.com/github/rezamohammadloo/Surface_anomaly_detection_using_unsupervised_algorithms/blob/main/_Surface_Defect_Detection_Unsupervised.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Hi, this is a project that has been done to implement some unsupervised algorhimgs for "Anomaly Detection" or "Surface Anomaly Detection" problem on an image dataset with image sizes: 128*128*3.
4 different methodes are implemented for this purpose and the result of them are evaluated by AUROC and AP techniques to identify the best one.
The first algorithm is a VAE (Varitional Autoencoer)
the second one is a regular autoencoder.
the third one is OCSVM(One-Class Support Vector Machine)
and finally the last algorithm is Isolation Forest.
Befor designing and trainin model some of the main libararies and packages have been implemented to use and some functions and variables that are in common between methodes are defined.
Let's dive in!

In [2]:
# Mounting Google Drive to access the dataset stored in my Google Drive account
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [3]:
# Importing necessary libraries for building and training models
import os  # For operating system dependent functionality
import numpy as np  # For numerical operations and array manipulations
import tensorflow as tf  # For deep learning model creation and training
import matplotlib.pyplot as plt  # For plotting and visualizing data
from sklearn.svm import OneClassSVM  # For One-Class SVM, a traditional anomaly detection method
from sklearn.pipeline import Pipeline  # For creating pipelines that streamline preprocessing and model training
from tensorflow.keras.models import Model  # For building Keras models
from sklearn.ensemble import IsolationForest  # For Isolation Forest, another traditional anomaly detection method
from sklearn.preprocessing import StandardScaler  # For standardizing features by removing the mean and scaling to unit variance
from sklearn.model_selection import GridSearchCV  # For performing hyperparameter tuning using grid search
from tensorflow.keras.callbacks import EarlyStopping  # For stopping training early if the model stops improving
from sklearn.model_selection import train_test_split  # For splitting data into training and testing sets
from tensorflow.keras import layers, models, backend as K  # For creating layers, models, and accessing Keras backend
from sklearn.metrics import roc_auc_score, average_precision_score  # For evaluating model performance using ROC AUC and average precision
from tensorflow.keras.preprocessing.image import img_to_array, load_img  # For converting images to arrays and loading images
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D  # For creating convolutional autoencoders

#0.General Functions and parameters
There are some finctions and varaiables that will be used in some or all of the methods.


In [4]:
# Define a function to load images from a specified directories
def load_images_from_directory(directory, target_size=(128, 128)):
    images = []
    for filename in os.listdir(directory):
        if filename.endswith('.png') or filename.endswith('.jpg'):
            img = load_img(os.path.join(directory, filename), target_size=target_size)
            img_array = img_to_array(img) #/ 255.0  #Deviding by 255 in canceled here because we don't need to do it for third and furth method and we'll do it in the first and second method by hand.
            images.append(img_array)
    return np.array(images)

In [5]:
# Directories
# Specify the directories containing normal, anomaly, and mask images
normal_dir  = '/content/gdrive/MyDrive/glass-defect-sample/good'
anomaly_dir = '/content/gdrive/MyDrive/glass-defect-sample/anomaly'
mask_dir    = '/content/gdrive/MyDrive/glass-defect-sample/mask'

In [6]:
# Flatten images to vectors, it will be used for third and furth methodes
def flatten_images(images):
    return images.reshape(images.shape[0], -1)

In [7]:
# Define a function to compute reconstruction errors.
def compute_reconstruction_error(original, reconstructed):
    return np.mean(np.abs(original - reconstructed), axis=(1, 2, 3))

In [8]:
# Load data
# Load the normal, anomaly, and mask images from their respective directories
normal_images  = load_images_from_directory(normal_dir)
anomaly_images = load_images_from_directory(anomaly_dir)
mask_images    = load_images_from_directory( mask_dir )

# Print the shape of the loaded images
print("Normal Images Shape:", normal_images.shape)
print("Anomaly Images Shape:", anomaly_images.shape)
print("Mask Images Shape:", mask_images.shape)

Normal Images Shape: (157, 128, 128, 3)
Anomaly Images Shape: (40, 128, 128, 3)
Mask Images Shape: (40, 128, 128, 3)


#1.Varational Autoencoder

First we normalize loaded images to make the most use out of them.

In [9]:
#Normalizing their values to [0, 1]
normal_images  =  normal_images / 255.0
anomaly_images =  anomaly_images / 255.0
mask_images    =  mask_images / 255.0

Now it's time to design the architecture of VAE.

In [10]:
latent_dim = 32 #Dimensionality of the latent space for the Variational Autoencoder (VAE)

# Encoder model definition
def encoder_model(input_shape):
    # Define input layer
    inputs = layers.Input(shape=input_shape)

    # Convolutional layers with ReLU activation and same padding
    x = layers.Conv2D(32, (3, 3), activation='relu', padding='same')(inputs)
    x = layers.MaxPooling2D((2, 2), padding='same')(x)

    x = layers.Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = layers.MaxPooling2D((2, 2), padding='same')(x)

    x = layers.Conv2D(128, (3, 3), activation='relu', padding='same')(x)
    x = layers.MaxPooling2D((2, 2), padding='same')(x)

    # Flatten the output and pass through dense layers to get z_mean and z_log_var
    x = layers.Flatten()(x)
    x = layers.Dense(128, activation='relu')(x)
    z_mean = layers.Dense(latent_dim)(x)      # Mean of the latent space
    z_log_var = layers.Dense(latent_dim)(x)   # Log variance of the latent space

    # Define and return the encoder model
    return models.Model(inputs, [z_mean, z_log_var], name='encoder')

# Sampling function definition for the VAE
def sampling(args):
    # Retrieve z_mean and z_log_var from arguments
    z_mean, z_log_var = args

    # Determine batch size and dimensionality of z_mean
    batch = K.shape(z_mean)[0]
    dim = K.int_shape(z_mean)[1]

    # Sample epsilon from normal distribution
    epsilon = K.random_normal(shape=(batch, dim))

    # Reparameterization trick: sample from the learned distribution
    return z_mean + K.exp(0.5 * z_log_var) * epsilon

# Decoder model definition
def decoder_model(latent_dim, input_shape):
    # Define input layer for latent space
    latent_inputs = layers.Input(shape=(latent_dim,))

    # Dense layer to reshape and feed into convolutional layers
    x = layers.Dense((input_shape[0] // 8) * (input_shape[1] // 8) * 128, activation='relu')(latent_inputs)
    x = layers.Reshape((input_shape[0] // 8, input_shape[1] // 8, 128))(x)

    # Convolutional transpose layers for upsampling and decoding
    x = layers.Conv2DTranspose(128, (3, 3), activation='relu', padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x)

    x = layers.Conv2DTranspose(64, (3, 3), activation='relu', padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x)

    x = layers.Conv2DTranspose(32, (3, 3), activation='relu', padding='same')(x)
    x = layers.UpSampling2D((2, 2))(x)

    # Output layer with sigmoid activation for image reconstruction
    outputs = layers.Conv2DTranspose(3, (3, 3), activation='sigmoid', padding='same')(x)

    # Define and return the decoder model
    return models.Model(latent_inputs, outputs, name='decoder')

# Define input shape for the VAE
input_shape = (128, 128, 3)

# Build the encoder model
encoder = encoder_model(input_shape)
z_mean, z_log_var = encoder.output

# Use the sampling function to obtain latent space representation
z = layers.Lambda(sampling, output_shape=(latent_dim,), name='z')([z_mean, z_log_var])

# Build the decoder model using the latent space representation
decoder = decoder_model(latent_dim, input_shape)

# Connect encoder input to decoder output to create VAE model
outputs = decoder(z)

# Define the VAE model with encoder input and decoder output
vae = models.Model(encoder.input, outputs, name='vae')


For traing this model we need to combine two different loss funcions: "binary cross entropy" and "KL-divergence".

In [11]:
# Calculate the reconstruction loss using binary cross-entropy
reconstruction_loss = tf.keras.losses.binary_crossentropy(K.flatten(encoder.input), K.flatten(outputs))
# Scale the reconstruction loss based on the input shape
reconstruction_loss *= input_shape[0] * input_shape[1] * input_shape[2]

# Calculate the KL divergence loss
kl_loss = 1 + z_log_var - K.square(z_mean) - K.exp(z_log_var)
kl_loss = K.sum(kl_loss, axis=-1)
kl_loss *= -0.5

# Calculate the total VAE loss by combining the reconstruction and KL divergence losses
vae_loss = K.mean(reconstruction_loss + kl_loss)

# Add the VAE loss to the model
vae.add_loss(vae_loss)

# Compile the VAE model with the Adam optimizer
vae.compile(optimizer='adam')

In [12]:
# Define the history object to store training metrics
history = vae.fit(
    # Train the VAE model on normal images with normal images as both input and target
    normal_images, normal_images,
    epochs=500,  # Number of epochs for training
    batch_size=8,  # Batch size for training
    #validation_split=0.2,  # Fraction of training data to use for validation
    callbacks=EarlyStopping(  # Early stopping callback to prevent overfitting
        monitor="loss",  # Monitor validation loss
        verbose=1,  # Verbosity mode (1: progress bar, 0: silent)
        patience=6,  # Number of epochs with no improvement after which training will be stopped
        mode='min',  # Direction of improvement (minimize validation loss)
        restore_best_weights=True  # Restore the model weights from the epoch with the best validation loss
    )
)


Epoch 1/500
Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500
Epoch 6/500
Epoch 7/500
Epoch 8/500
Epoch 9/500
Epoch 10/500
Epoch 11/500
Epoch 12/500
Epoch 13/500
Epoch 14/500
Epoch 15/500
Epoch 16/500
Epoch 17/500
Epoch 18/500
Epoch 19/500
Epoch 20/500
Epoch 21/500
Epoch 22/500
Epoch 23/500
Epoch 24/500
Epoch 25/500
Epoch 26/500
Epoch 27/500
Epoch 28/500
Epoch 29/500
Epoch 30/500
Epoch 31/500
Epoch 32/500
Epoch 33/500
Epoch 34/500
Epoch 34: early stopping


Reconstructing images using designed model in order to determice threshold

In [13]:
# Reconstruct normal and anomaly images using the trained VAE model
reconstructed_normal  = vae.predict(normal_images)
reconstructed_anomaly = vae.predict(anomaly_images)

# Compute reconstruction errors for normal and anomaly images
normal_errors  = compute_reconstruction_error(normal_images,  reconstructed_normal)
anomaly_errors = compute_reconstruction_error(anomaly_images, reconstructed_anomaly)

# Print mean reconstruction errors for normal and anomaly images
print("Mean  reconstruction  error  for normal  images :",np.mean(normal_errors))
print("Mean reconstruction error for anomalous images:", np.mean(anomaly_errors))

Mean  reconstruction  error  for normal  images : 0.021452293
Mean reconstruction error for anomalous images: 0.09324009


In [15]:
# Combine the reconstruction errors and true labels
all_errors = np.concatenate([normal_errors, anomaly_errors])
# Define a threshold to classify anomalies based on reconstruction errors
threshold = np.mean(all_errors)  # Adjust this threshold based on your specific use case

true_labels = np.concatenate([np.zeros(len(normal_errors)), np.ones(len(anomaly_errors))])

# Predict labels based on the threshold
predicted_labels = (all_errors > threshold).astype(int)

# Calculate AUC-ROC
vae_aucroc = roc_auc_score(true_labels, all_errors)
print(f"auc_roc for VAE: {vae_aucroc:0.4f}")

# Calculate Average Precision (AP)
vae_ap = average_precision_score(true_labels, all_errors)
print(f"AP for VAE: {average_precision}")

auc_roc for VAE: 0.9239
AP for VAE: 0.787688091850614


This is the result for the first model,not so bad, but not great either, let's go the next method.

#2.Regular Autoencoder


In [None]:
#Loading and normalizing images
normal_images  = load_images_from_directory(normal_dir) / 255.0
anomaly_images = load_images_from_directory(anomaly_dir) / 255.0
mask_images    = load_images_from_directory( mask_dir ) / 255.0


In [None]:
# Define a function to build an autoencoder model for image reconstruction.

def build_autoencoder(input_shape):
    input_img = Input(shape=input_shape)  # Input layer for images of shape input_shape

    # Encoder layers
    x = Conv2D(32, (3, 3), activation='relu', padding='same')(input_img)  # Convolutional layer with 32 filters, ReLU activation, and same padding
    x = MaxPooling2D((2, 2), padding='same')(x)  # Max pooling layer with pool size (2, 2) and same padding
    x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)  # Convolutional layer with 64 filters, ReLU activation, and same padding
    x = MaxPooling2D((2, 2), padding='same')(x)  # Max pooling layer with pool size (2, 2) and same padding

    # Decoder layers
    x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)  # Convolutional layer with 64 filters, ReLU activation, and same padding
    x = UpSampling2D((2, 2))(x)  # Up-sampling layer with up-sampling size (2, 2)
    x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)  # Convolutional layer with 32 filters, ReLU activation, and same padding
    x = UpSampling2D((2, 2))(x)  # Up-sampling layer with up-sampling size (2, 2)
    decoded = Conv2D(3, (3, 3), activation='sigmoid', padding='same')(x)  # Convolutional layer with 3 filters, sigmoid activation, and same padding

    # Define the autoencoder model
    autoencoder = Model(input_img, decoded)  # Create a model instance mapping input_img to decoded
    autoencoder.compile(optimizer='adam', loss='binary_crossentropy')  # Compile the model with Adam optimizer and binary cross-entropy loss

    return autoencoder  # Return the constructed autoencoder model

# Build the autoencoder model with input shape (128, 128, 3)
autoencoder = build_autoencoder(input_shape=(128, 128, 3))

# Print the summary of the autoencoder model architecture
autoencoder.summary()  # Display the model layers, shapes, and number of parameters


Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_10 (InputLayer)       [(None, 128, 128, 3)]     0         
                                                                 
 conv2d_15 (Conv2D)          (None, 128, 128, 32)      896       
                                                                 
 max_pooling2d_15 (MaxPooli  (None, 64, 64, 32)        0         
 ng2D)                                                           
                                                                 
 conv2d_16 (Conv2D)          (None, 64, 64, 64)        18496     
                                                                 
 max_pooling2d_16 (MaxPooli  (None, 32, 32, 64)        0         
 ng2D)                                                           
                                                                 
 conv2d_17 (Conv2D)          (None, 32, 32, 64)        36928 

In [None]:
# Fit the autoencoder model on normal_images for image reconstruction.

history = autoencoder.fit(normal_images, normal_images,  # Train the model on normal_images for reconstruction
                          epochs=500,  # Number of training epochs
                          batch_size=8,  # Batch size for each iteration
                          verbose=1,  # Verbosity mode (1: progress bar, 0: silent)
                          callbacks=EarlyStopping(  # Early stopping callback to prevent overfitting
                              monitor="loss",  # Monitor training loss
                              verbose=1,  # Verbosity mode (1: progress bar, 0: silent)
                              patience=6,  # Number of epochs with no improvement after which training will be stopped
                              mode='min',  # Direction of improvement (minimize training loss)
                              restore_best_weights=True  # Restore the model weights from the epoch with the best training loss
                          )
                         )


Epoch 1/500
Epoch 2/500
Epoch 3/500
Epoch 4/500
Epoch 5/500
Epoch 6/500
Epoch 7/500
Epoch 8/500
Epoch 9/500
Epoch 10/500
Epoch 11/500
Epoch 12/500
Epoch 13/500
Epoch 14/500
Epoch 15/500
Epoch 16/500
Epoch 17/500
Epoch 18/500
Epoch 19/500
Epoch 20/500
Epoch 21/500
Epoch 22/500
Epoch 23/500
Epoch 24/500
Epoch 25/500
Epoch 26/500
Epoch 27/500
Epoch 28/500
Epoch 29/500
Epoch 30/500
Epoch 31/500
Epoch 32/500
Epoch 33/500
Epoch 34/500
Epoch 35/500
Epoch 36/500
Epoch 37/500
Epoch 38/500
Epoch 39/500
Epoch 40/500
Epoch 41/500
Epoch 42/500
Epoch 43/500
Epoch 44/500
Epoch 45/500
Epoch 46/500
Epoch 47/500
Epoch 48/500
Epoch 49/500
Epoch 50/500
Epoch 51/500
Epoch 51: early stopping


In [None]:
# Extract the loss and validation loss from the training history
loss = history.history['loss']

# Plot the loss and validation loss
epochs = range(1, len(loss) + 1)

plt.figure(figsize=(10, 6))
plt.plot(epochs, loss, 'b', label='Training loss')  # Plot training loss
plt.xlabel('Epochs')  # Set x-axis label
plt.ylabel('Loss')  # Set y-axis label
plt.legend()  # Display legend
plt.show()  # Show plot

In [None]:

# Predict on anomaly images
reconstructed_anomalies = autoencoder.predict(anomaly_images)

# Calculate reconstruction error
reconstruction_errors = calculate_reconstruction_error(anomaly_images, reconstructed_anomalies)

# Threshold for anomaly detection (this can be tuned)
threshold = np.percentile(reconstruction_errors, 95)

# Detect anomalies
anomalies_detected = reconstruction_errors > threshold

In [None]:
# Predict on both good and anomaly images
reconstructed_good = autoencoder.predict(normal_images)
reconstructed_anomalies = autoencoder.predict(anomaly_images)

# Calculate reconstruction error for both
good_errors = calculate_reconstruction_error(normal_images, reconstructed_good)
anomaly_errors = calculate_reconstruction_error(anomaly_images, reconstructed_anomalies)

# Combine errors and labels
all_errors = np.concatenate([good_errors, anomaly_errors])
all_labels = np.concatenate([np.zeros(len(good_errors)), np.ones(len(anomaly_errors))])

# Compute AUROC
auroc = roc_auc_score(all_labels, all_errors)
print(f'AUROC: {auroc:.4f}')

# Compute Average Precision (AP)
ap = average_precision_score(all_labels, all_errors)
print(f'Average Precision (AP): {ap:.4f}')

In [None]:
# Predict on both good and anomaly images
reconstructed_normal = autoencoder.predict(normal_images)
reconstructed_anomalies = autoencoder.predict(anomaly_images)

# Calculate reconstruction error
normal_reconstruction_errors = calculate_reconstruction_error(normal_images, reconstructed_normal)
anomaly_reconstruction_errors = calculate_reconstruction_error(anomaly_images, reconstructed_anomalies)
reconstruction_errors = (np.mean(anomaly_reconstruction_errors) + np.mean(normal_reconstruction_errors))/2

all_errors = np.concatenate([normal_reconstruction_errors, anomaly_reconstruction_errors])
all_labels = np.concatenate([np.zeros(len(good_errors)), np.ones(len(anomaly_errors))])

# Threshold for anomaly detection (this can be tuned)
threshold = np.percentile(reconstruction_errors, 20)
# threshold = np.mean(reconstruction_errors)

predicted_labels = (all_errors > threshold).astype(int)
# Detect anomalies

# Compute AUROC
auroc = roc_auc_score(all_labels, predicted_labels)
print(f'AUROC: {auroc:.4f}')

# Compute Average Precision (AP)
ap = average_precision_score(all_labels, predicted_labels)
print(f'Average Precision (AP): {ap:.4f}')

#One-class SVM

In [None]:
# Flatten the images
X_flat = normal_images.reshape(normal_images.shape[0], -1)
# Scale the data
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_flat)
# Train One-Class SVM
ocsvm = OneClassSVM(kernel='rbf', nu=0.1)  # Adjust parameters as needed
ocsvm.fit(X_scaled)

anomaly_scaled = anomaly_images.reshape(anomaly_images.shape[0], -1)
anomaly_scaled = scaler.transform(anomaly_scaled)


# Predict images
all_images = np.concatenate((X_scaled, anomaly_scaled), axis=0)
all_labels = np.concatenate((np.ones(X_scaled.shape[0]) , -1*np.ones(anomaly_scaled.shape[0])))

all_prediction = ocsvm.predict(all_images)

auc_roc = roc_auc_score(all_labels, all_prediction)
print(f"AUC-ROC: {auc_roc}")

# Calculate Average Precision (AP)
average_precision = average_precision_score(all_labels, all_prediction)
print(f"Average Precision (AP): {average_precision}")



AUC-ROC: 0.9522292993630573
Average Precision (AP): 0.9806007307058101


#Isonation Forest

In [None]:
X_good = flatten_images(normal_images)
X_anomaly = flatten_images(anomaly_images)

In [None]:
# Scale the data
scaler = StandardScaler()
X_good = scaler.fit_transform(X_good)
X_anomaly = scaler.transform(X_anomaly)

In [None]:
# Define the parameter grid
# param_grid = {
#     'n_estimators': [100, 200, 300],
#     'max_samples': [128, 256, 512, 'auto'],
#     'contamination': [0.01, 0.05, 0.1],
#     'max_features': [0.5, 0.75, 1.0],
#     'bootstrap': [True, False]
# }

param_grid = {
    'n_estimators': [100],
    'max_samples': ['auto'],
    'contamination': [0.01, 0.05, 0.1],
    'max_features': [0.5, 0.75, 1.0],
    'bootstrap': [False]
}

# Initialize the Isolation Forest model
isolation_forest = IsolationForest(random_state=42)

# Initialize the GridSearchCV object
grid_search = GridSearchCV(estimator=isolation_forest, param_grid=param_grid, scoring='roc_auc', cv=5, n_jobs=-1)

grid_search.fit(X_good)

# Print the best parameters and the corresponding score
print("Best parameters found: ", grid_search.best_params_)
print("Best AUCROC score: ", grid_search.best_score_)


all_images = np.concatenate((X_good, X_anomaly), axis=0)
all_labels = np.concatenate((np.ones(X_good.shape[0]) , -1*np.ones(X_anomaly.shape[0])))

predictions = isolation_forest.predict(all_images)

In [None]:
# Compute AUCROC
aucroc = roc_auc_score(predictions, all_labels)

# Compute AP
ap = average_precision_score(predictions, all_labels)

print(f'AUCROC: {aucroc}')
print(f'AP: {ap}')