## Setup

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import os
import random
import tensorflow as tf
from pathlib import Path
from tensorflow.keras import applications
from tensorflow.keras import layers
from tensorflow.keras import losses
from tensorflow.keras import optimizers
from tensorflow.keras import metrics
from tensorflow.keras import Model
from tensorflow.keras.applications import resnet
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from keras.applications.vgg16 import VGG16
import itertools
from tensorflow.keras.layers import RandomFlip, RandomRotation, RandomZoom
from tensorflow.keras.models import Sequential
import pandas as pd
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import confusion_matrix
import seaborn as sns

### The code is based on this keras tutorial:
https://keras.io/examples/vision/siamese_network/

**The code was modified to fit our use case**

**We did not use the class proposed in the lecture for loading the dataset**

`All code that was added by us is marked in the comments of the code`

#### The datasets are loaded from the folders for the different k-values. In general, every dataset consists of:

* `anchor/undamaged` contains the rings that we will use as the anchor.
* `positive/undamaged` is also used as the positive sample (rings that look like the anchor).
* `negative/damaged` is used as the negative sample (rings that are damaged).

In [None]:
#define the different k values and define the target shape of the images
#k_values: every k names one training folder of damaged images ("damaged_k<k>")
k_values = [1,2,3,4,5,10,15,20,25,30]
#target_shape: (height, width) every image is resized to; also the input size of the network
target_shape = (224, 224)

In [None]:
#root folder of the image data, located inside the user's home directory
cache_dir = Path.home() / "./few-shot siamese"

#the undamaged images serve both as anchors and as positives
anchor_images_path = cache_dir / "undamaged"
positive_images_path = cache_dir / "undamaged"

#one folder with damaged training images per k value (kept as plain strings)
negative_images_paths = [str(cache_dir / f"damaged_k{k}") for k in k_values]
#folder with the damaged images that are used for the test dataset
test_negative_images_path = cache_dir / "damaged"

In [None]:
#show all image dataset paths in negative_images_paths
#(bare expression: it is only displayed when it is the last statement of a notebook cell)
negative_images_paths

## Preparing the data

We are going to use a `tf.data` pipeline to load the data and generate the triplets that we
need to train the Siamese network.

We'll set up the pipeline using a zipped list with anchor, positive, and negative filenames as
the source. The pipeline will load and preprocess the corresponding images. Furthermore we apply image augmentation.

In [None]:
#function to decode the image and resize it to the target shape
def preprocess_image(filename):
    """
    Read the file `filename`, decode it with the PNG decoder into three
    colour channels, convert it to float32 and resize it to `target_shape`.

    Returns the resized float32 image tensor.
    """
    raw_bytes = tf.io.read_file(filename)
    decoded = tf.image.decode_png(raw_bytes, channels=3)
    # convert_image_dtype rescales the integer pixel values to floats in [0, 1]
    as_float = tf.image.convert_image_dtype(decoded, tf.float32)
    return tf.image.resize(as_float, target_shape)

#We added this augmentation function
def preprocess_augmentation(filename):
    """
    Load the image with `preprocess_image` and apply random augmentation:
    horizontal flip, vertical flip, brightness shift (max_delta=0.2) and
    contrast change (factor between 0.8 and 1.2).

    Returns the augmented float32 image with values clipped to [0, 1].
    """
    image = preprocess_image(filename)
    image = tf.image.random_flip_left_right(image)
    image = tf.image.random_flip_up_down(image)
    image = tf.image.random_brightness(image, max_delta=0.2)
    image = tf.image.random_contrast(image, lower=0.8, upper=1.2)
    # The brightness/contrast changes can push pixel values below 0 or above 1;
    # clip so the result is still a valid [0, 1] float image.
    return tf.clip_by_value(image, 0.0, 1.0)

#apply the preprocessing including the random augmentation to all three images
def preprocess_triplets(anchor, positive, negative):
    """
    Load and augment the three images of a triplet.

    Takes the filenames of the anchor, positive and negative image and
    returns the three augmented images as a tuple in the same order.
    """
    augmented_anchor = preprocess_augmentation(anchor)
    augmented_positive = preprocess_augmentation(positive)
    augmented_negative = preprocess_augmentation(negative)
    return (augmented_anchor, augmented_positive, augmented_negative)

#We added this function to preprocess the test triplets without augmentation
def preprocess_triplets_test(anchor, positive, negative):
    """
    Load the three images of a triplet without any augmentation.

    Takes the filenames of the anchor, positive and negative image and
    returns the three loaded and resized images as a tuple in the same order.
    """
    loaded_anchor = preprocess_image(anchor)
    loaded_positive = preprocess_image(positive)
    loaded_negative = preprocess_image(negative)
    return (loaded_anchor, loaded_positive, loaded_negative)

In the next code block a data pipeline using a zipped list with an anchor, positive,
and negative image is used. The output of the pipeline
contains the same triplet with every image loaded and preprocessed.

Here all the different datasets are created and preprocessed with the functions in the cell above. We generate a train, a validation and a test dataset based on the folders in the list "negative_images_paths".

We added a test set to test our KNN classifier

In [None]:
#initialise empty lists
train_datasets = []
val_datasets = []


def _list_images(folder):
    """Return the sorted paths (as strings) of all files inside `folder`."""
    return sorted(str(os.path.join(folder, f)) for f in os.listdir(folder))


#the anchor, positive and test images do not depend on k, so they are listed only once
anchor_images = _list_images(anchor_images_path)
positive_images = _list_images(positive_images_path)

#check the image count of the anchor images
image_count = len(anchor_images)
#number of triplets that go into the training split (60/40 train/validation ratio)
train_count = round(image_count * 0.6)

#additionaly to train and val dataset we add a test dataset
test_negative_images = _list_images(test_negative_images_path)[:image_count]

#convert the file lists to datasets of filenames
#All shuffles that happen BEFORE the train/validation split use
#reshuffle_each_iteration=False. Otherwise every pass over the data draws a new
#order, so `take` and `skip` would no longer select disjoint triplets and
#validation triplets would leak into the training split.
anchor_dataset = tf.data.Dataset.from_tensor_slices(anchor_images).shuffle(
    buffer_size=4096, reshuffle_each_iteration=False
)
#the positives stay in sorted order; shuffling only the anchors pairs each
#anchor with a random undamaged image (both lists come from the same folder)
positive_dataset = tf.data.Dataset.from_tensor_slices(positive_images)
test_negative_images_dataset = tf.data.Dataset.from_tensor_slices(
    test_negative_images
).shuffle(buffer_size=4096, reshuffle_each_iteration=False)

#create the test dataset and apply same steps as on train and validation except the splitting
#the fixed shuffle order means every trained model is evaluated on the same triplets
test_dataset = tf.data.Dataset.zip(
    (anchor_dataset, positive_dataset, test_negative_images_dataset)
)
test_dataset = test_dataset.shuffle(buffer_size=1024, reshuffle_each_iteration=False)
test_dataset = test_dataset.map(preprocess_triplets_test)
test_dataset = test_dataset.batch(1, drop_remainder=False)
test_dataset = test_dataset.prefetch(8)

#loop over the propossed k values
for negative_images_path in negative_images_paths:
    #create negative images: repeat the k damaged images until there is one per anchor
    negative_images = _list_images(negative_images_path)
    negative_images_cropped = list(
        itertools.islice(itertools.cycle(negative_images), image_count)
    )

    #shuffle the negatives to have them in random order
    negative_dataset = tf.data.Dataset.from_tensor_slices(
        negative_images_cropped
    ).shuffle(buffer_size=4096, reshuffle_each_iteration=False)

    #add all seperate datasets: anchor, positive, negative in one dataset named dataset
    dataset = tf.data.Dataset.zip((anchor_dataset, positive_dataset, negative_dataset))

    #shuffle the triplets again (fixed order, see the note on the split above)
    dataset = dataset.shuffle(buffer_size=1024, reshuffle_each_iteration=False)

    #split the filename triplets into train and validation with a 60/40 ratio
    train_dataset = dataset.take(train_count)
    val_dataset = dataset.skip(train_count)

    #only the training triplets are reshuffled every epoch; this shuffles
    #filenames, so the buffer stays small
    train_dataset = train_dataset.shuffle(buffer_size=1024)

    #apply the preprocessing (with augmentation) to both splits
    #NOTE(review): the validation images are augmented too, as before - confirm this is intended
    train_dataset = train_dataset.map(preprocess_triplets)
    val_dataset = val_dataset.map(preprocess_triplets)

    #make batches of 1 and prefetch for better performance
    train_dataset = train_dataset.batch(1, drop_remainder=False).prefetch(8)
    val_dataset = val_dataset.batch(1, drop_remainder=False).prefetch(8)

    #add created datasets in a list
    train_datasets.append(train_dataset)
    val_datasets.append(val_dataset)

The visualization is same as in the keras tutorial

Here we take a look at a few examples of triplets. We can see the applied image augmentation.

In [None]:
def visualize(anchor, positive, negative):
    """Show the first triplet of the supplied batches side by side.

    A new 9x9 inch figure with one row of three axes is created; the first
    image of `anchor`, `positive` and `negative` is drawn from left to right
    with hidden axis ticks.
    """
    fig = plt.figure(figsize=(9, 9))
    axs = fig.subplots(1, 3)

    for ax, batch in zip(axs, (anchor, positive, negative)):
        ax.imshow(batch[0])
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)


#draw three example triplets from the fourth training dataset
for _ in range(3):
    visualize(*list(train_datasets[3].take(1).as_numpy_iterator())[0])

## Setting up the embedding generator model

Our Siamese Network will generate embeddings for each of the images of the
triplet. To do this, we will use a VGG16 model pretrained on ImageNet and
connect two `Dense` layers (with batch normalization in between) to it so we
can learn to separate these embeddings.

We freeze the weights of all the layers of the VGG16 model. The network was customized for our use case.
Freezing is important to avoid affecting the weights that the model has already learned.
Only the newly added layers on top of VGG16 stay trainable, so only their weights are learned
during training.

In [None]:
#make VGG16 (pretrained on ImageNet, without its classification top) the base CNN
base_cnn = VGG16(
    include_top=False, weights="imagenet", input_shape=(*target_shape, 3)
)

#freeze every layer of the base CNN so that only the new head is trained
trainable = False
for layer in base_cnn.layers:
    layer.trainable = trainable

#head on top of VGG16: flatten -> Dense(64, relu) -> batch normalization -> Dense(16)
flatten = layers.Flatten()(base_cnn.output)
dense1 = layers.Dense(64, activation="relu")(flatten)
dense1 = layers.BatchNormalization()(dense1)
output = layers.Dense(16)(dense1)

#the embedding model maps an input image to the 16-dimensional output of the head
embedding = Model(inputs=base_cnn.input, outputs=output, name="Embedding")

## Setting up the Siamese Network model

The Siamese network will receive each of the triplet images as an input,
generate the embeddings, and output the distance between the anchor and the
positive embedding, as well as the distance between the anchor and the negative
embedding.

To compute the distance, we can use a custom layer `DistanceLayer` that
returns both values as a tuple.

In [None]:
#this layer is taken from the tutorial (same behaviour)
class DistanceLayer(layers.Layer):
    """
    Computes two squared distances from three embeddings: anchor to positive
    and anchor to negative.
    """

    @staticmethod
    def _squared_distance(first, second):
        # sum of the squared differences over the last (embedding) axis
        return tf.reduce_sum(tf.square(first - second), -1)

    def call(self, anchor, positive, negative):
        """Return the tuple (anchor-positive distance, anchor-negative distance)."""
        return (
            self._squared_distance(anchor, positive),
            self._squared_distance(anchor, negative),
        )


#one input per image of the triplet
anchor_input = layers.Input(name="anchor", shape=target_shape + (3,))
positive_input = layers.Input(name="positive", shape=target_shape + (3,))
negative_input = layers.Input(name="negative", shape=target_shape + (3,))

#The data pipeline delivers float images in [0, 1] (tf.image.convert_image_dtype),
#but vgg16.preprocess_input expects pixel values in [0, 255]: it only reorders
#the channels and subtracts the ImageNet channel means without scaling.
#Therefore the inputs are scaled back to [0, 255] first.
#NOTE: embeddings computed with `embedding` outside of this network must apply
#the same scaling before preprocess_input.
distances = DistanceLayer()(
    embedding(tf.keras.applications.vgg16.preprocess_input(anchor_input * 255.0)),
    embedding(tf.keras.applications.vgg16.preprocess_input(positive_input * 255.0)),
    embedding(tf.keras.applications.vgg16.preprocess_input(negative_input * 255.0)),
)

#the network maps a triplet of images to the two distances
siamese_network = Model(
    inputs=[anchor_input, positive_input, negative_input], outputs=distances
)

## Putting everything together

We now need to implement a model with custom training loop so we can compute
the triplet loss using the three embeddings produced by the Siamese network.

Let's create a `Mean` metric instance to track the loss of the training process.

In [None]:
#this class is almost identical to the keras tutorial with exception that we added gradient clipping in train_step
class SiameseModel(Model):
    """Wraps the Siamese network with custom training and evaluation steps.

    The wrapped network returns the anchor-positive and the anchor-negative
    distance; from them the triplet loss is computed:
       L(A, P, N) = max(‖f(A) - f(P)‖² - ‖f(A) - f(N)‖² + margin, 0)
    """

    def __init__(self, siamese_network, margin=2):
        super().__init__()
        self.siamese_network = siamese_network
        self.margin = margin
        # running mean of the loss, reported as "loss" by both steps
        self.loss_tracker = metrics.Mean(name="loss")

    def call(self, inputs):
        return self.siamese_network(inputs)

    def _compute_loss(self, data):
        # the network returns the pair (anchor-positive, anchor-negative) distance
        ap_distance, an_distance = self.siamese_network(data)
        # triplet loss: distance difference plus margin, floored at zero
        return tf.maximum(ap_distance - an_distance + self.margin, 0.0)

    def _track_loss(self, loss):
        # feed the loss into the running mean and report its current value
        self.loss_tracker.update_state(loss)
        return {"loss": self.loss_tracker.result()}

    def train_step(self, data):
        # record the forward pass so the loss can be differentiated
        with tf.GradientTape() as tape:
            loss = self._compute_loss(data)

        weights = self.siamese_network.trainable_weights
        #added gradient clipping: every gradient value is limited to [-1, 1]
        clipped_gradients = [
            tf.clip_by_value(gradient, -1.0, 1.0)
            for gradient in tape.gradient(loss, weights)
        ]

        # update the weights with the optimizer given to `compile()`
        self.optimizer.apply_gradients(zip(clipped_gradients, weights))
        return self._track_loss(loss)

    def test_step(self, data):
        return self._track_loss(self._compute_loss(data))

    @property
    def metrics(self):
        # listing the tracker here lets Keras reset it automatically
        return [self.loss_tracker]

In [None]:
#we added this function to plot the triplet loss over trained epochs and save as png
def plotTraining(history,k):
    """Plot training and validation loss per epoch and save the plot as PNG.

    history: object whose `history` dict holds the 'loss' and 'val_loss'
             lists (as returned by `fit`).
    k:       value that is put into the file name 'test_1_<k>.png'.

    The current matplotlib figure is cleared after saving.
    """
    loss_values = history.history['loss']
    val_loss_values = history.history['val_loss']
    epochs = range(1, len(loss_values)+1)

    # Two separate plot calls: plt.plot(epochs, loss, val_loss) would treat the
    # third argument as a y-only series and draw it against 0..n-1, i.e.
    # shifted by one epoch relative to the training curve.
    plt.plot(epochs, loss_values, label='Training Loss')
    plt.plot(epochs, val_loss_values, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.savefig('test_1_'+str(k)+'.png')
    plt.clf()

### Train the network and KNN on the datasets and save the results in a CSV

We added a loop that trains the approach on every dataset and then classifies the resulting embeddings with a KNN. The results are written to a CSV file, and a confusion matrix is saved for every k.

In [None]:
#we set fixed margin for training
margin = 2
#create dict for output in csv
result_dict = {
    'k': [],
    'Euclidean Accuracy': [],
    'Test Accuracy': [],
}
#define the euclidean distance
euclideanDistance = np.linalg.norm


def _vgg_inputs(images):
    """Prepare [0, 1] float images for VGG16.

    The data pipeline delivers floats in [0, 1] (tf.image.convert_image_dtype),
    but vgg16.preprocess_input expects pixel values in [0, 255], so the images
    are scaled back first. Used for training and evaluation alike so both see
    identically prepared inputs.
    """
    return tf.keras.applications.vgg16.preprocess_input(images * 255.0)


def _build_siamese_network():
    """Build a fresh embedding model and the triplet network around it.

    Returns the tuple (embedding model, siamese network). The embedding model
    is a frozen ImageNet VGG16 followed by Dense(64, relu), batch normalization
    and Dense(16); the siamese network maps the three triplet images to the
    anchor-positive and anchor-negative distances.
    """
    base_cnn = VGG16(
        weights="imagenet", input_shape=target_shape + (3,), include_top=False
    )
    #freeze the pretrained layers, only the new head is trained
    for layer in base_cnn.layers:
        layer.trainable = False

    features = layers.Flatten()(base_cnn.output)
    hidden = layers.Dense(64, activation="relu")(features)
    hidden = layers.BatchNormalization()(hidden)
    embedding_output = layers.Dense(16)(hidden)
    embedding_model = Model(base_cnn.input, embedding_output, name="Embedding")

    #initialize the triplets
    triplet_inputs = [
        layers.Input(name=input_name, shape=target_shape + (3,))
        for input_name in ("anchor", "positive", "negative")
    ]
    #give triplets in distance layer class
    distances = DistanceLayer()(
        *[embedding_model(_vgg_inputs(image)) for image in triplet_inputs]
    )
    return embedding_model, Model(inputs=triplet_inputs, outputs=distances)


def _embed_triplets(embedding_model, triplets):
    """Embed every triplet of `triplets` in a single pass.

    Returns three arrays (anchor, positive, negative embeddings), each with one
    row per image. Raises ValueError when `triplets` yields nothing.
    """
    anchors, positives, negatives = [], [], []
    for anchor, positive, negative in triplets:
        anchors.append(embedding_model(_vgg_inputs(anchor), training=False).numpy())
        positives.append(embedding_model(_vgg_inputs(positive), training=False).numpy())
        negatives.append(embedding_model(_vgg_inputs(negative), training=False).numpy())
    if not anchors:
        raise ValueError("the test dataset is empty, nothing to evaluate")
    return (
        np.concatenate(anchors, axis=0),
        np.concatenate(positives, axis=0),
        np.concatenate(negatives, axis=0),
    )


#loop over the datasets in train_dataset and train the model on each
for index, train_dataset in enumerate(train_datasets):
    #clear the keras session BEFORE building the new model, so the state of the
    #previous run is released and the fresh model is not affected by the reset
    tf.keras.backend.clear_session()

    #we rebuild VGG16 and the head to reinitiate the weights after each training run and train from 0 again
    embedding, siamese_network = _build_siamese_network()

    #we added a early stopping and learning rate reduction
    callback = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=10, restore_best_weights=True)
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", patience=5, factor=0.5)

    #create, compile and train the model
    #(`shuffle` is not passed to fit: it is ignored for tf.data datasets)
    siamese_model = SiameseModel(siamese_network, margin=margin)
    siamese_model.compile(optimizer=optimizers.legacy.Adam(0.00005), weighted_metrics=[])
    history = siamese_model.fit(
        train_dataset,
        epochs=2,
        validation_data=val_datasets[index],
        callbacks=[callback, reduce_lr],
    )

    #we call the plot function
    plotTraining(history,k_values[index])

    #embed the test triplets once; the distance check and the KNN share these embeddings
    anch_emb, pos_emb, neg_emb = _embed_triplets(embedding, test_dataset)

    #euclidean distance between the anchor and the positive/undamaged embedding
    #and between the anchor and the negative/damaged embedding (one value per triplet)
    euclideanPos = euclideanDistance(pos_emb - anch_emb, axis=1)
    euclideanNeg = euclideanDistance(neg_emb - anch_emb, axis=1)

    #a triplet counts as correct when the positive is closer to the anchor than the negative
    eucCounterPositive = int(np.sum(euclideanPos < euclideanNeg))
    eucCounterNegative = len(euclideanPos) - eucCounterPositive

    """From here on we added a custom KNN classifier"""
    #create lables for KNN -> pos = 0, neg = 1
    pos_emb_label = np.zeros(len(pos_emb), dtype=int)
    neg_emb_label = np.ones(len(neg_emb), dtype=int)

    #generate the KNN data: one row per embedding
    X = np.concatenate((pos_emb, neg_emb), axis=0)
    y = np.concatenate((pos_emb_label, neg_emb_label), axis=0)

    #we split the data in train and test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)

    knn = KNeighborsClassifier(n_neighbors=2)
    #fit the KNN
    knn.fit(X_train, y_train)
    #predict the held-out embeddings
    predictions = knn.predict(X_test)

    #plot the confusion matrix and save it as png
    cm = confusion_matrix(y_test, predictions)
    heatmap_fig = plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, cmap='Reds')
    plt.xlabel('Predicted Labels')
    plt.ylabel('True Labels')
    plt.title('Confusion Matrix')
    heatmap_fig.savefig(f'confusion_matrix_k_{k_values[index]}.png')
    #close the figure so the next loss plot does not reuse its size
    plt.close(heatmap_fig)
    accuracy = knn.score(X_test, y_test)
    print(f'Test accuracy: {accuracy:.2f}')

    #write results in dict
    result_dict["k"].append(k_values[index])
    result_dict["Euclidean Accuracy"].append(eucCounterPositive / (eucCounterPositive + eucCounterNegative))
    result_dict["Test Accuracy"].append(accuracy)

#we save results to csv
result_df = pd.DataFrame.from_dict(result_dict)
csv_file = "final_results.csv"
result_df.to_csv(csv_file, index=False)