In [1]:
### downloading the datase
!wget https://www.mydrive.ch/shares/38536/3830184030e49fe74747669442f0f282/download/420938113-1629952094/mvtec_anomaly_detection.tar.xz

--2023-07-15 04:44:50--  https://www.mydrive.ch/shares/38536/3830184030e49fe74747669442f0f282/download/420938113-1629952094/mvtec_anomaly_detection.tar.xz
Resolving www.mydrive.ch (www.mydrive.ch)... 91.214.169.64
Connecting to www.mydrive.ch (www.mydrive.ch)|91.214.169.64|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5264982680 (4.9G) [application/x-xz]
Saving to: ‘mvtec_anomaly_detection.tar.xz’


2023-07-15 04:49:36 (17.6 MB/s) - ‘mvtec_anomaly_detection.tar.xz’ saved [5264982680/5264982680]



In [2]:
!mkdir /content/data

In [3]:
## unzipping tar file
!tar xf /content/mvtec_anomaly_detection.tar.xz -C /content/data

### Few-shot Learning
- Algotithm : Simese Network (Metric based Learning).


**Import dependencies**

In [4]:
import os
import glob
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras.backend as K
from sklearn.model_selection import train_test_split

**Configurations**

In [5]:
## Configurations
class CNF:
    seed = 42
    data_folder = "/content/data"  ### replace with your training data folder
    target_shape = (224, 224)
    labels = True # using labels or not.
    shots = 5 ## Number of samples in a class


    ### dataset configs
    buffer_size = 1024
    batch_size = 32
    validation_size = 0.2 ## 20% of data from the training dataset.

    epochs = 10

    ### training hyperparameters
    learning_rate = 0.00001

In [6]:
### checking gpus
### Detecting the GPU and Setting distribution strategy based on the GPU or CPU.
gpus = tf.config.experimental.list_logical_devices('GPU')
if(len(gpus)>0):
    CNF.device = "GPU"
    strategy = tf.distribute.MirroredStrategy(devices=gpus)
    print("> Running on {} | Number of devices {}".format(CNF.device, len(gpus)))

else:
    CNF.device = "CPU"
    strategy = tf.distribute.get_strategy() ## here we are getting the default strategy.
    print("> Running on {}".format(CNF.device))

print("> Number of replicas : ", strategy.num_replicas_in_sync)

> Running on GPU | Number of devices 1
> Number of replicas :  1


### **Preparing Dataset**

    > Normal -- not an anomaly
    > abnormal -- Anomaly

**Set pairs**

    > normal - normal
    > abnormal - abnormal
    > normal - abnormal



**Seeting Constrastive Learning**
- 0 : Similar (normal - normal) / (abnormal - abnormal)
- 1 : Not Similar (normal - abnormal)

In [7]:
CNF.classes = [d for d in os.listdir(CNF.data_folder) if os.path.isdir(os.path.join(CNF.data_folder, d))]

In [8]:
dataframes = []
for class_ in CNF.classes:
  dir_class = os.path.join(CNF.data_folder, class_,)
  normal_paths = random.sample(glob.glob(os.path.join(dir_class, "train", "good","*.png")), CNF.shots)

  abnormal_defect_classes = [d for d in os.listdir(os.path.join(dir_class, "test")) if d!="good"]
  abnormal_paths = list(set([random.sample(glob.glob(os.path.join(dir_class, "test", random.sample(abnormal_defect_classes, 1)[0],"*.png")), 1)[0] for i in range(CNF.shots)]))

  ### getting pairs for the training the deep learning models.
  normal_normal = []
  for i in range(len(normal_paths)-1):
    for j in range(i+1, len(normal_paths)):
      nn = [normal_paths[i], normal_paths[j]]
      normal_normal.append(nn)

  abnormal_abnormal = []
  for i in range(len(abnormal_paths)-1):
    for j in range(i+1, len(abnormal_paths)):
      nn = [abnormal_paths[i], abnormal_paths[j]]
      abnormal_abnormal.append(nn)


  normal_abnormal = []
  for i in range(len(normal_paths)):
    for j in range(len(abnormal_paths)):
      nn = [normal_paths[i], abnormal_paths[j]]
      normal_abnormal.append(nn)


  ## defining the labels
  normal_normal_label = [0]*len(normal_normal)
  abnormal_abnormal_label = [0]*len(abnormal_abnormal)
  normal_abnormal_label = [1]*len(normal_abnormal)

  image_samples = normal_normal + abnormal_abnormal + normal_abnormal
  labels = normal_normal_label + abnormal_abnormal_label + normal_abnormal_label
  classes = [class_]*len(labels)
  ### preparing the dataset from the pairs of dataset.
  df = pd.DataFrame(image_samples, columns=["img1_path", "img2_path"])
  df["labels"] = labels
  df["classes"] = classes
  dataframes.append(df)


In [9]:
### Concatenated dataframes
anomaly_df = pd.concat(dataframes)

In [10]:
anomaly_df.head()

Unnamed: 0,img1_path,img2_path,labels,classes
0,/content/data/carpet/train/good/124.png,/content/data/carpet/train/good/238.png,0,carpet
1,/content/data/carpet/train/good/124.png,/content/data/carpet/train/good/101.png,0,carpet
2,/content/data/carpet/train/good/124.png,/content/data/carpet/train/good/031.png,0,carpet
3,/content/data/carpet/train/good/124.png,/content/data/carpet/train/good/188.png,0,carpet
4,/content/data/carpet/train/good/238.png,/content/data/carpet/train/good/101.png,0,carpet


In [11]:
### Counting the labels
anomaly_df["labels"].value_counts()

1    355
0    284
Name: labels, dtype: int64

In [12]:
### splitting the dataset
anomaly_df = anomaly_df.sample(frac = 1)
train_data, test_data = train_test_split(anomaly_df, stratify= anomaly_df["labels"])

In [13]:
print("the shape of the training data : ", train_data.shape)
print("the shape of the testing data : ", test_data.shape)

the shape of the training data :  (479, 4)
the shape of the testing data :  (160, 4)


**Data Generator**

In [14]:
def samples_labels(img1_path, img2_path, c):
  image1 = tf.io.read_file(img1_path)
  image1 = tf.io.decode_png(image1, channels=3)
  image1 = tf.keras.preprocessing.image.smart_resize(image1, size = CNF.target_shape)

  image2 = tf.io.read_file(img2_path)
  image2 = tf.io.decode_png(image2, channels=3)
  image2 = tf.keras.preprocessing.image.smart_resize(image2, size = CNF.target_shape)

  return (image1/255, image2/255), c


### generating the tensorflow data generator.
def DataLoader(df):

  dataset = tf.data.Dataset.from_tensor_slices((df["img1_path"].values, df["img2_path"].values, df["labels"].values))
  dataset = dataset.map(samples_labels)
  dataset = dataset.shuffle(CNF.buffer_size)
  dataset = dataset.batch(CNF.batch_size)
  return dataset


### Few-shot Learning
- Building a siamese Network.
    - Here we will taking a VGG16 has a twin network.

In [29]:
base_model = tf.keras.applications.ResNet50(
    include_top=False,
    weights="imagenet",
    input_shape=(CNF.target_shape[0], CNF.target_shape[1], 3)
)
base_model.trainable = False

x = tf.keras.layers.GlobalAveragePooling2D()(base_model.outputs[0])
# x = tf.keras.layers.Dense(units=256, activation="relu")(x)
# x = tf.keras.layers.BatchNormalization()(x)
output = tf.keras.layers.Dense(units=128, activation="relu")(x)

twin_model = tf.keras.Model(base_model.inputs, outputs=output, name= "Embedding")




In [30]:
input_shape = (CNF.target_shape[0], CNF.target_shape[0], 3)
input1 = tf.keras.layers.Input(input_shape, name = "image1")
input2 = tf.keras.layers.Input(input_shape, name = "image2")

In [31]:
### Compute similarity layers
class Eucidiean_distance(tf.keras.layers.Layer):
  def __init__(self,margin=1, **kwargs):
    super().__init__(**kwargs)

  def call(self, input1, input2):
    sum_square = K.sum(K.square(input1 - input2), axis=1, keepdims=True)
    d =  K.sqrt(K.maximum(sum_square, K.epsilon()))
    return d


def euclidean_distance(vects):
    x, y = vects
    sum_square = K.sum(K.square(x - y), axis=1, keepdims=True)
    return K.sqrt(K.maximum(sum_square, K.epsilon()))


def eucl_dist_output_shape(shapes):
    shape1, shape2 = shapes
    return (shape1[0], 1)




In [32]:
net1 = twin_model(input1)
net2 = twin_model(input2)

# similarites = Compute_similarity()(net1, net2)
# out_distance = tf.keras.layers.Lambda(euclidean_distance, name="output_layer", output_shape=eucl_dist_output_shape)([net1, net2])
out_distance = Eucidiean_distance(margin=1)(net1, net2)
norm = tf.keras.layers.BatchNormalization()(out_distance)
sig_out = tf.keras.layers.Dense(1, activation = "sigmoid")(norm)


In [33]:
Siemese_Network = tf.keras.Model(inputs = [input1, input2], outputs = sig_out
                                 )

In [34]:
### Summary of the siemese Network
Siemese_Network.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 image1 (InputLayer)            [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 image2 (InputLayer)            [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Embedding (Functional)         (None, 128)          23849984    ['image1[0][0]',                 
                                                                  'image2[0][0]']           

In [35]:
!pip install tensorflow-addons



In [36]:
import tensorflow_addons as tfa

### Training the Network

In [37]:
loss = tf.keras.losses.BinaryCrossentropy()
optimizer = tf.keras.optimizers.Adam(learning_rate = 0.00001)
metrics = [tf.keras.metrics.Accuracy()]
def contrastive_loss(y, preds):
  margin = 1
  y = tf.cast(y, preds.dtype)
  squaredPreds = K.square(preds)
  squaredMargin = K.square(K.maximum(margin - preds, 0))
  loss = K.mean(y * squaredPreds + (1 - y) * squaredMargin)
  return loss



### metrics
# f1_score = tfa.metrics.F1Score(num_classes=2)
precision = tf.keras.metrics.Precision()
recall = tf.keras.metrics.Recall()

def f1_score(y_true, y_preds):
  pre = precision(y_true, y_preds)
  rec = recall(y_true, y_preds)

  return 2*((pre*rec)/ (pre + rec))



## loading the data generators
train_datagenerator = DataLoader(train_data)
val_datagenerator = DataLoader(test_data)


In [38]:
Siemese_Network.compile(loss=contrastive_loss, optimizer=optimizer, metrics=["accuracy", f1_score, precision, recall])

In [39]:
Siemese_Network.fit(train_datagenerator, epochs = CNF.epochs, validation_data=val_datagenerator)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7998bf63ae30>

**Preparing the testing data**

In [40]:
dataframes = []
for class_ in CNF.classes:
  dir_class = os.path.join(CNF.data_folder, class_,)
  train_normal_paths = glob.glob(os.path.join(dir_class, "train", "good","*.png"))[:1]

  test_abnormal_paths = []
  test_normal_paths = []
  for i in os.listdir(os.path.join(dir_class, "test")):
    dir_path = os.path.join(dir_class, "test", i)
    if(os.path.isdir(dir_path)):
      if(i =="good"):
        test_normal_paths += glob.glob(os.path.join(dir_path, "*.png"))
      else:
        test_abnormal_paths += glob.glob(os.path.join(dir_path, "*.png"))


  ### getting pairs for the training the deep learning models.
  test_normal_normal = []
  for i in range(len(train_normal_paths)):
    for j in range(len(test_normal_paths)):
      nn = [train_normal_paths[i], test_normal_paths[j]]
      test_normal_normal.append(nn)

  test_normal_abnormal = []
  for i in range(len(train_normal_paths)):
    for j in range(len(test_abnormal_paths)):
      nn = [train_normal_paths[i], test_abnormal_paths[j]]
      test_normal_abnormal.append(nn)


  ## defining the labels
  test_normal_normal_label = [0]*len(test_normal_normal)

  test_normal_abnormal_label = [1]*len(test_normal_abnormal)

  image_samples = test_normal_normal + test_normal_abnormal
  labels = test_normal_normal_label + test_normal_abnormal_label
  classes = [class_]*len(labels)
  ### preparing the dataset from the pairs of dataset.
  df = pd.DataFrame(image_samples, columns=["img1_path", "img2_path"])
  df["labels"] = labels
  df["classes"] = classes
  dataframes.append(df)

In [41]:
test_df =pd.concat(dataframes)

In [42]:
test_df.head()

Unnamed: 0,img1_path,img2_path,labels,classes
0,/content/data/carpet/train/good/082.png,/content/data/carpet/test/good/026.png,0,carpet
1,/content/data/carpet/train/good/082.png,/content/data/carpet/test/good/012.png,0,carpet
2,/content/data/carpet/train/good/082.png,/content/data/carpet/test/good/024.png,0,carpet
3,/content/data/carpet/train/good/082.png,/content/data/carpet/test/good/011.png,0,carpet
4,/content/data/carpet/train/good/082.png,/content/data/carpet/test/good/014.png,0,carpet


In [43]:
eval_class = []
eval_accuracy =  []
eval_f1score = []
eval_precision = []
eval_recall = []

for c in CNF.classes:
  temp = test_df.loc[test_df["classes"]==c]
  test_load = DataLoader(temp)
  hist = Siemese_Network.evaluate(test_load)
  eval_class.append(c)
  eval_accuracy.append(hist[1])
  eval_f1score.append(hist[2])
  eval_precision.append(hist[3])
  eval_recall.append(hist[4])




In [46]:
#### evaluations
evals = pd.DataFrame()
evals["class"] = eval_class
evals["accuracy"] = eval_accuracy
evals["f1_score"] = eval_f1score
evals["precision"] = eval_precision
evals["recall"] = eval_recall

In [47]:
evals

Unnamed: 0,class,accuracy,f1_score,precision,recall
0,carpet,0.760684,0.866301,0.760684,1.0
1,metal_nut,0.669565,0.785053,0.898551,0.666667
2,hazelnut,0.527273,0.711826,0.591837,0.828571
3,capsule,0.825758,0.891173,0.825758,1.0
4,toothbrush,0.619048,0.805556,0.733333,0.733333
5,pill,0.844311,0.933003,0.844311,1.0
6,grid,0.269231,,0.0,0.0
7,bottle,0.73494,0.871617,0.753086,0.968254
8,cable,0.38,,0.428571,0.032609
9,leather,0.741935,0.827076,0.741935,1.0
