In [None]:
# Mount Google Drive at /content/gdrive; the dataset archive path used in the
# next cell lives under this mount point.
from google.colab import drive
drive.mount("/content/gdrive")

Mounted at /content/gdrive


In [None]:
# Path of the MVTec anomaly-detection archive (.tar.xz) on the mounted Google Drive.
Data = '/content/gdrive/My Drive/MVTec_dataset/mvtec_anomaly_detection.tar.xz'

In [None]:
import os
import cv2
import lzma
import tarfile
import shutil
import contextlib
import numpy as np
from PIL import Image
import tensorflow as tf
from tensorflow import keras
import matplotlib.pyplot as plt
from tensorflow.keras import layers
from sklearn.metrics import f1_score, confusion_matrix
from tensorflow.keras.losses import BinaryCrossentropy
from tensorflow.keras.models import model_from_json, Model

In [None]:
# The dataset is an xz-compressed tar archive. tarfile decompresses it itself
# when opened in "r:xz" mode; everything is unpacked into ./MVTec-AD.
with tarfile.open(Data, mode='r:xz') as archive:
    archive.extractall('./MVTec-AD')

In [None]:
# Preparing the training set --- it contains only good (defect-free) images.

# Path to the MVTec-AD dataset folder
dataset_path = './MVTec-AD/'

# List of object names
object_names = [
    'bottle', 'cable', 'capsule', 'carpet', 'grid', 'hazelnut',
    'leather', 'metal_nut', 'pill', 'screw', 'tile', 'toothbrush',
    'wood', 'transistor', 'zipper'
]

# Single flat folder that receives the good images of every object category.
output_folder = './train'
os.makedirs(output_folder, exist_ok=True)

for object_name in object_names:
    object_folder = os.path.join(dataset_path, object_name, 'train', 'good')
    good_images = [f for f in os.listdir(object_folder) if f.endswith('.png')]

    for image in good_images:
        src_path = os.path.join(object_folder, image)
        # Different categories can contain identically named files. Copying
        # them under their bare name would silently overwrite earlier copies,
        # so the category name is prefixed to keep every image.
        dst_path = os.path.join(output_folder, f'{object_name}_{image}')
        shutil.copy(src_path, dst_path)

print("All good images have been collected into the 'train' folder.")

All good images have been collected into the 'train' folder.


In [None]:
# Resize the training images to 256 x 256 and store them as RGB.
train_folder = './train'

# Destination folder to store resized training images
output_folder = './train_normal_images'
os.makedirs(output_folder, exist_ok=True)

# Desired image size
target_size = (256, 256)

for filename in os.listdir(train_folder):
    if filename.endswith('.png'):
        image_path = os.path.join(train_folder, filename)
        # The context manager closes the source file handle once the pixel
        # data has been converted.
        with Image.open(image_path) as source_image:
            # Image.ANTIALIAS was removed in Pillow 10; Image.LANCZOS is the
            # same resampling filter and exists in old and new releases.
            image = source_image.convert('RGB').resize(target_size, Image.LANCZOS)
        output_path = os.path.join(output_folder, filename)
        image.save(output_path)

print("Images have been resized and converted to RGB format.")

Images have been resized and converted to RGB format.


In [None]:
# Check whether any resized image deviates from the expected 256 x 256 size.
# The success message sits in the loop's `else` branch, so it is printed only
# when the loop finishes without hitting `break` (i.e. no mismatch was found).
for filename in os.listdir('./train_normal_images/'):
    image_path = os.path.join('train_normal_images', filename)
    # Only the size is needed; close the file handle straight away.
    with Image.open(image_path) as img:
        image_size = img.size
    if image_size != (256, 256):
        print()
        print( filename, " is Out of shape!")
        break
else:
    print("All images are of same size")

All images are of same size


In [None]:
# Preprocessing images to numpy array
def load_and_preprocess_images(folder_path, target_size):
    """Load every ``.png`` file found directly in ``folder_path`` into one array.

    Args:
        folder_path: Directory to scan. Only entries whose name ends with
            ``.png`` are loaded; sub-directories are not traversed.
        target_size: Size passed to ``load_img`` as ``target_size``.

    Returns:
        ``np.array`` built from the per-image arrays, in sorted filename
        order. Pixel values are returned as produced by ``img_to_array``;
        no scaling is applied here (callers divide by 255 themselves).
        If no ``.png`` file is present the result is an empty array.
    """
    images = []
    # os.listdir returns entries in arbitrary order; sorting makes the row
    # order of the returned array reproducible between runs and machines.
    for filename in sorted(os.listdir(folder_path)):
        if filename.endswith('.png'):
            image_path = os.path.join(folder_path, filename)
            img = keras.preprocessing.image.load_img(image_path, target_size=target_size)
            img_array = keras.preprocessing.image.img_to_array(img)
            images.append(img_array)
    return np.array(images)

In [None]:
# autoencoder architecture
def build_autoencoder(input_shape, latent_dim):
    """Assemble the convolutional autoencoder as a single Keras model.

    The encoder is three Conv2D (32, 64, 128 filters) + MaxPooling2D stages;
    the decoder mirrors it with three Conv2D (128, 64, 32 filters) +
    UpSampling2D stages followed by a 3-filter sigmoid Conv2D.

    Args:
        input_shape: Shape handed to ``keras.Input``.
        latent_dim: Accepted for interface compatibility; the body does not
            use it.

    Returns:
        ``keras.Model`` mapping the input tensor to the decoder output.
    """
    relu_conv = dict(activation='relu', padding='same')
    encoder_input = keras.Input(shape=input_shape)

    # Encoder: every stage is a 3x3 convolution followed by 2x2 max pooling.
    tensor = encoder_input
    for n_filters in (32, 64, 128):
        tensor = layers.Conv2D(n_filters, (3, 3), **relu_conv)(tensor)
        tensor = layers.MaxPooling2D((2, 2), padding='same')(tensor)
    encoder_output = tensor

    # Decoder: every stage is a 3x3 convolution followed by 2x2 upsampling.
    tensor = encoder_output
    for n_filters in (128, 64, 32):
        tensor = layers.Conv2D(n_filters, (3, 3), **relu_conv)(tensor)
        tensor = layers.UpSampling2D((2, 2))(tensor)

    # Final projection to 3 channels with a sigmoid activation.
    decoder_output = layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same')(tensor)

    # Encoder and decoder share one graph, so a single Model covers both.
    return keras.Model(encoder_input, decoder_output)

In [None]:
# Shape of one input image and the latent dimension handed to the builder
input_shape = (256, 256, 3)
latent_dim = 64

# Load the resized training images at the model's spatial resolution
train_normal_data = load_and_preprocess_images('./train_normal_images/',
                                               target_size=input_shape[:2])

# Build the autoencoder, print its layer table and compile it with Adam + MSE
autoencoder = build_autoencoder(input_shape, latent_dim)
autoencoder.summary()
autoencoder.compile(optimizer='adam', loss='mean_squared_error')

# Safety net: any image whose shape differs from input_shape is resized
same_shape_images = []
for sample in train_normal_data:
    if sample.shape != input_shape:
        sample = cv2.resize(sample, input_shape[:2], interpolation=cv2.INTER_AREA)
    same_shape_images.append(sample)

# Scale pixel values from [0, 255] to [0, 1] as float32
train_normal_data = np.array(same_shape_images).astype('float32') / 255.0

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 256, 256, 3)]     0         
                                                                 
 conv2d (Conv2D)             (None, 256, 256, 32)      896       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 128, 128, 32)     0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 128, 128, 64)      18496     
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 64, 64, 64)       0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 64, 64, 128)       73856 

In [None]:
# Display the shape of the normalised training array.
# NOTE(review): the recorded output lists only 391 images -- confirm that the
# training folder really holds the good images of every object category.
train_normal_data.shape

(391, 256, 256, 3)

In [None]:
# Fit the autoencoder to reconstruct its own input, using only the normal
# (anomaly-free) images as both input and target.
epochs = 50
batch_size = 32
autoencoder.fit(
    x=train_normal_data,
    y=train_normal_data,
    batch_size=batch_size,
    epochs=epochs,
    shuffle=True,
)

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


<keras.callbacks.History at 0x7c0ab0772530>

In [None]:
# Persist the trained model in two parts: the learned weights go to an HDF5
# file, the architecture to a JSON description.
autoencoder.save_weights("autoencoder_weights.h5")

model_json = autoencoder.to_json()
with open("autoencoder_model.json", "w") as architecture_file:
    architecture_file.write(model_json)

In [None]:
  # Path to the MVTec-AD dataset folders
data_folder = './MVTec-AD/'

# Path to the test folder where all images will be combined (good + anomaly)
combined_test_folder = 'test_images'
os.makedirs(combined_test_folder, exist_ok=True)

for obj_folder in sorted(os.listdir(data_folder)):
    obj_test_folder = os.path.join(data_folder, obj_folder, 'test')
    # Skip notebook checkpoints and any root entry that is not an object
    # category with a 'test' directory (e.g. stray files in the dataset root).
    if obj_folder == '.ipynb_checkpoints' or not os.path.isdir(obj_test_folder):
        continue

    # Combine good and defect images into the combined test folder
    for subfolder in sorted(os.listdir(obj_test_folder)):
        subfolder_path = os.path.join(obj_test_folder, subfolder)
        if not os.path.isdir(subfolder_path):
            continue

        # 'good' images are normal; every other sub-folder is a defect type.
        if subfolder == 'good':
            combined_subfolder = os.path.join(combined_test_folder, 'normal')
        else:
            combined_subfolder = os.path.join(combined_test_folder, 'anomaly')

        # Ensure the combined subfolder exists
        os.makedirs(combined_subfolder, exist_ok=True)

        # Copy images from the current subfolder to the combined subfolder.
        # Identically named files in different categories or defect types
        # would overwrite each other, so both names are prefixed.
        for filename in os.listdir(subfolder_path):
            src_image_path = os.path.join(subfolder_path, filename)
            dest_image_path = os.path.join(
                combined_subfolder, f'{obj_folder}_{subfolder}_{filename}')
            shutil.copy(src_image_path, dest_image_path)

In [None]:
# Locations of the combined test images
test_data_folder = './test_images/'
normal_folder = os.path.join(test_data_folder, 'normal')
anomaly_folder = os.path.join(test_data_folder, 'anomaly')

target_size = (256, 256)

# Read both groups of test images as arrays
anomaly_test_data = load_and_preprocess_images(anomaly_folder, target_size)
normal_test_data = load_and_preprocess_images(normal_folder, target_size)

# Stack them into one array -- anomaly images first, normal images second --
# and scale the pixel values to the range [0, 1]
test_data = np.concatenate((anomaly_test_data, normal_test_data), axis=0)
test_data = test_data.astype('float32') / 255.0

# Reconstruction error per image: mean squared difference taken over the
# height, width and channel axes
test_images_reconstructed = autoencoder.predict(test_data)
reconstruction_errors = np.square(test_data - test_images_reconstructed).mean(axis=(1, 2, 3))

# Errors above this threshold are treated as anomalies
anomaly_threshold = 0.002

# Zero-anomaly classification: 0 for normal images and 1 for anomalous images
exceeds_threshold = reconstruction_errors > anomaly_threshold
predictions = exceeds_threshold.astype(int)

# Positions (within test_data) of the images flagged as anomalous
anomaly_indices = np.flatnonzero(exceeds_threshold)




In [None]:
# True labels of the test data (0 for normal, 1 for anomaly).
# test_data is the concatenation of anomaly_test_data followed by
# normal_test_data, so the labels must follow the same order: ones for the
# anomaly block first, then zeros for the normal block.
true_labels = np.concatenate((np.ones(len(anomaly_test_data)), np.zeros(len(normal_test_data))))

# Guard against labels and predictions drifting out of step.
assert len(true_labels) == len(predictions), "labels and predictions differ in length"

# F1-score
f1 = f1_score(true_labels, predictions)

# confusion matrix
conf_matrix = confusion_matrix(true_labels, predictions)

print("F1-score:", f1)
print("Confusion Matrix:")
print(conf_matrix)


F1-score: 0.5233644859813085
Confusion Matrix:
[[11 49]
 [ 2 28]]


In [None]:
# Create a new model that extracts embeddings from the bottleneck layer.
# The bottleneck is the encoder output, i.e. the last MaxPooling2D layer of
# the autoencoder. It is looked up by layer type because auto-generated layer
# names shift whenever the model is rebuilt in the same session. The
# previously hard-coded 'conv2d_6' is the seventh Conv2D -- the decoder's
# final sigmoid layer -- so it returned reconstructions, not embeddings.
pooling_layers = [layer for layer in autoencoder.layers
                  if isinstance(layer, layers.MaxPooling2D)]
if not pooling_layers:
    raise ValueError("autoencoder has no MaxPooling2D layer to use as bottleneck")
bottleneck_layer_name = pooling_layers[-1].name
encoder_model = Model(inputs=autoencoder.input, outputs=autoencoder.get_layer(bottleneck_layer_name).output)

# Obtain the embeddings from the encoder model
test_embeddings = encoder_model.predict(test_data)



In [None]:
# Register every test embedding under a synthetic key 'anomaly_type_<index>',
# where <index> is its position in test_embeddings.
semantic_embeddings_anomalies = {
    'anomaly_type_' + str(index): embedding
    for index, embedding in enumerate(test_embeddings)
}

dict_keys(['anomaly_type_0', 'anomaly_type_1', 'anomaly_type_2', 'anomaly_type_3', 'anomaly_type_4', 'anomaly_type_5', 'anomaly_type_6', 'anomaly_type_7', 'anomaly_type_8', 'anomaly_type_9', 'anomaly_type_10', 'anomaly_type_11', 'anomaly_type_12', 'anomaly_type_13', 'anomaly_type_14', 'anomaly_type_15', 'anomaly_type_16', 'anomaly_type_17', 'anomaly_type_18', 'anomaly_type_19', 'anomaly_type_20', 'anomaly_type_21', 'anomaly_type_22', 'anomaly_type_23', 'anomaly_type_24', 'anomaly_type_25', 'anomaly_type_26', 'anomaly_type_27', 'anomaly_type_28', 'anomaly_type_29', 'anomaly_type_30', 'anomaly_type_31', 'anomaly_type_32', 'anomaly_type_33', 'anomaly_type_34', 'anomaly_type_35', 'anomaly_type_36', 'anomaly_type_37', 'anomaly_type_38', 'anomaly_type_39', 'anomaly_type_40', 'anomaly_type_41', 'anomaly_type_42', 'anomaly_type_43', 'anomaly_type_44', 'anomaly_type_45', 'anomaly_type_46', 'anomaly_type_47', 'anomaly_type_48', 'anomaly_type_49', 'anomaly_type_50', 'anomaly_type_51', 'anomaly_ty

In [None]:
def similarity_score(embedding1, embedding2):
    """Return the cosine similarity of two embeddings.

    Both inputs are flattened to 1-D first, so they may have any shape as
    long as they hold the same number of elements.

    Args:
        embedding1: Array-like embedding.
        embedding2: Array-like embedding with as many elements as ``embedding1``.

    Returns:
        The dot product divided by the product of the two Euclidean norms,
        or ``0.0`` when either input has zero norm (the cosine is undefined
        there and the plain formula would yield NaN).
    """
    vector1 = np.asarray(embedding1).ravel()
    vector2 = np.asarray(embedding2).ravel()

    norm_product = np.linalg.norm(vector1) * np.linalg.norm(vector2)
    if norm_product == 0:
        # An all-zero embedding has no direction; report "no similarity"
        # instead of dividing by zero.
        return 0.0

    return np.dot(vector1, vector2) / norm_product

def zero_shot_anomaly_type_classification(anomalous_embeddings, semantic_embeddings_anomalies):
    """Assign each embedding the name of its most similar reference embedding.

    Args:
        anomalous_embeddings: Iterable of embeddings to classify.
        semantic_embeddings_anomalies: Mapping from anomaly-type name to its
            reference embedding.

    Returns:
        A list with one entry per input embedding: the key whose reference
        embedding has the highest ``similarity_score``. On ties the key seen
        first wins. The entry is ``None`` when the mapping is empty.
    """
    anomaly_types = []
    for anomaly_embedding in anomalous_embeddings:
        best_anomaly_type = None
        # Start below every possible score so that even a similarity of
        # exactly -1 still selects a type instead of leaving None.
        max_similarity = float('-inf')

        for anomaly_type, semantic_embedding in semantic_embeddings_anomalies.items():
            # Calculate the similarity score between the anomaly embedding and each semantic embedding
            similarity = similarity_score(anomaly_embedding, semantic_embedding)
            if similarity > max_similarity:
                max_similarity = similarity
                best_anomaly_type = anomaly_type

        anomaly_types.append(best_anomaly_type)

    return anomaly_types



# Perform zero-shot anomaly type classification on the embeddings of the
# images selected by anomaly_indices (reconstruction error above the threshold).
# NOTE(review): semantic_embeddings_anomalies is built from these same
# test_embeddings, so each image presumably matches its own entry -- confirm
# this is the intended reference set.
anomalous_embeddings = test_embeddings[anomaly_indices]
predicted_anomaly_types = zero_shot_anomaly_type_classification(anomalous_embeddings, semantic_embeddings_anomalies)

In [None]:

# Test images that were flagged as anomalous
anomaly_images = test_data[anomaly_indices]

# Show each flagged image in its own figure, titled with its predicted type
for position, flagged_image in enumerate(anomaly_images):
    type_label = predicted_anomaly_types[position]

    plt.imshow(flagged_image)
    plt.title(f"Predicted Anomaly Type: {type_label}")
    plt.axis('off')
    plt.show()