In [19]:
# %%
import numpy as np
from torchvision import transforms
from torchvision.models import resnet50, ResNet50_Weights
from torch.utils.data import DataLoader, TensorDataset
import os
import torch
from torchvision import transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
from IPython.display import clear_output

# %%
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")


# %%
def generate_embeddings():
    """
    Transform, resize and normalize the images and then use a pretrained model to extract
    the embeddings.
    """
    # TODO: define a transform to pre-process the images for ResNet50
    train_transforms = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    # Load the images and apply the pre-process transform
    train_dataset = datasets.ImageFolder(root="dataset/", transform=train_transforms)
    # Hint: adjust batch_size and num_workers to your PC configuration, so that you don't
    # run out of memory
    train_loader = DataLoader(dataset=train_dataset,
                              batch_size=64,
                              shuffle=False,
                              pin_memory=True, num_workers=4)

    # TODO: define a model for extraction of the embeddings (Hint: load a pretrained model,
    #  more info here: https://pytorch.org/vision/stable/models.html)
    weights = ResNet50_Weights.DEFAULT
    model = resnet50(weights=weights)

    for p in model.parameters():
        p.requires_grad = False
    model.to(device)

    model.eval()  # put model in evaluation mode in case it uses dropout and batch normalization layers

    embedding_size = 2048
    num_images = len(train_dataset)
    embeddings = []
    # embeddings = np.zeros((num_images, embedding_size))

    # TODO: Use the model to extract the embeddings. Hint: remove the last layers of the
    # model to access the embeddings the model generates.

    # Remove last layer
    model.fc = torch.nn.Sequential()

    # Pass the data to the model to get the embedding
    for idx, (data_batch, target) in enumerate(train_loader):
        output_batch = model(data_batch).cpu().detach().numpy()  # get embedding per batch and convert to numpy array
        # iterate over batch and write each embedding to embeddings
        for output in output_batch:  # iterates over first dimension of tensor of shape (batch_size, 2048, 1, 1)
            embeddings.append(np.squeeze(output))  # remove superfluouse dimensions by squeezig and save embedding
        print("Image embedding #{} done!".format(idx))

    embeddings = np.vstack(embeddings)

    np.save('dataset/embeddings.npy', embeddings)


# %%
def get_data(file, train=True):
    """
    Load the triplets from the file and generate the features and labels.

    input: file: string, the path to the file containing the triplets
          train: boolean, whether the data is for training or testing

    output: X: numpy array, the features
            y: numpy array, the labels
    """
    triplets = []
    with open(file) as f:
        for line in f:
            triplets.append(line)

    # generate training data from triplets
    train_dataset = datasets.ImageFolder(root="dataset/",
                                         transform=None)
    filenames = [s[0].split('/')[-1].replace('.jpg', '') for s in train_dataset.samples]
    embeddings = np.load('dataset/embeddings.npy')

    # TODO: Normalize the embeddings across the dataset
    """
    # Calculate mean and standard deviation along each dimension
    mean = np.mean(embeddings, axis=0)
    std = np.std(embeddings, axis=0)
    # Normalize the embeddings by subtracting the mean and dividing by the standard deviation
    normalized_embeddings = (embeddings - mean) / std
    """

    # Calculate L2 norm for each embedding vector
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    # Normalize the embeddings by dividing by the L2 norm
    normalized_embeddings = embeddings / norms

    """
    # Find largest embedding
    embedding_norms = np.zeros((embeddings.shape[0]))
    for i, embedding in enumerate(embeddings):
        embedding_norm = np.linalg.norm(embedding) # calculate norm of embedding
        embedding_norms[i] = embedding_norm # remember norm
    max_norm = embedding_norms.max() # find largest norm
    # Normalize embeddings, s.t. largest embedding vector has size 1
    normalized_embeddings = embeddings/max_norm
    """

    # print(normalized_embeddings[0,0:20])

    file_to_embedding = {}
    for i in range(len(filenames)):
        file_to_embedding[filenames[i]] = normalized_embeddings[i]
    X = []
    y = []
    # use the individual embeddings to generate the features and labels for triplets
    for t in triplets:
        for a in t.split():
            if a not in file_to_embedding:
                print(f"Key {a} not found in file_to_embedding")
        emb = [file_to_embedding[a] for a in t.split()]

    emb = [file_to_embedding[a] for a in t.split()]
    X.append(np.hstack([emb[0], emb[1], emb[2]]))
    y.append(1)
    """
    # Generating negative samples (data augmentation)
    if train:
        X.append(np.hstack([emb[0], emb[2], emb[1]]))
        y.append(0)
    """
    X = np.vstack(X)
    y = np.hstack(y)
    return X, y


# %%
# Hint: adjust batch_size and num_workers to your PC configuration, so that you don't run out of memory
def create_loader_from_np(X, y=None, train=True, batch_size=64, shuffle=True, num_workers=4):
    """
    Create a torch.utils.data.DataLoader object from numpy arrays containing the data.

    input: X: numpy array, the features
           y: numpy array, the labels

    output: loader: torch.data.util.DataLoader, the object containing the data
    """
    if train:
        dataset = TensorDataset(torch.from_numpy(X).type(torch.float),
                                torch.from_numpy(y).type(torch.long))
    else:
        dataset = TensorDataset(torch.from_numpy(X).type(torch.float))
    loader = DataLoader(dataset=dataset,
                        batch_size=batch_size,
                        shuffle=shuffle,
                        pin_memory=True, num_workers=num_workers)
    return loader


# %%
# TODO: define a model. Here, the basic structure is defined, but you need to fill in the details
class Net(nn.Module):
    """
    The model class, which defines our classifier.
    """

    def __init__(self):
        """
        The constructor of the model.
        """
        super().__init__()
        self.input_layer = nn.Linear(2048, 1024)
        # self.hidden_layer = nn.Linear(1024, 1024)
        self.output_layer = nn.Linear(1024, 256)

    def forward(self, x):
        """
        The forward pass of the model.

        input: x: torch.Tensor, the input to the model

        output: x: torch.Tensor, the output of the model
        """
        x = self.input_layer(x)
        x = F.relu(x)
        # x = self.hidden_layer(x)
        # x = F.relu(x)
        x = self.output_layer(x)
        return x


# %%
def get_accuracy(emb_image1, emb_image2, emb_image3, y, verbose=False):
    """
    output: accuracy of this batch
    """
    # calculate the distances between the embeddings in this batch
    dist12 = torch.cdist(emb_image1.unsqueeze(dim=1).flatten(2), emb_image2.unsqueeze(dim=1).flatten(2)).squeeze()
    dist13 = torch.cdist(emb_image1.unsqueeze(dim=1).flatten(2), emb_image3.unsqueeze(dim=1).flatten(2)).squeeze()

    # check which distance is larger
    diff = dist13 - dist12  # positive if distance between image 1 and 2 is smaller than distance between image 1 and 3
    predicted = (diff > 0).long()  # 1 if diff is positive, 0 otherwise

    # convert to numpy
    predicted = predicted.cpu().detach().numpy()
    y = y.cpu().detach().numpy()

    if verbose:
        print(y)
        print(predicted)
        print(y == predicted)
    num_correct = np.sum((y == predicted).astype(int))
    num_tot = len(y)

    return num_correct / num_tot


# %%
def train_model(train_loader):
    """
    The training procedure of the model; it accepts the training data, defines the model
    and then trains it.

    input: train_loader: torch.data.util.DataLoader, the object containing the training data

    output: model: torch.nn.Module, the trained model
    """
    model = Net()
    model.train()
    model.to(device)
    n_epochs = 1

    loss_train_progress = []
    loss_val_progress = []
    accuracy_train_progress = []
    accuracy_val_progress = []

    # TODO: define a loss function, optimizer and proceed with training. Hint: use the part
    # of the training data as a validation split. After each epoch, compute the loss on the
    # validation split and print it out. This enables you to see how your model is performing
    # on the validation data before submitting the results on the server. After choosing the
    # best model, train it on the whole training data.
    optimizer = torch.optim.Adam(params=model.parameters(), lr=0.005)
    triplet_loss = torch.nn.TripletMarginLoss(margin=1, p=2)

    for epoch in range(n_epochs):
        print("Epoch {} has started.".format(epoch))
        total_train_loss, total_val_loss, count_train, count_val = 0, 0, 0, 0
        train_accuracies = []
        val_accuracies = []

        for i, [X, y] in enumerate(train_loader):
            """
            # Test validation split
            X_val = X[50:,:]
            y_val = y[50:]

            X = X[:50,:]
            y = y[:50]
            """

            # Test set
            # create embedding and apply triplet loss
            anchor = model(X[:, :2048])
            positive = model(X[:, 2048:4096])
            negative = model(X[:, 4096:6144])
            loss = triplet_loss(anchor, positive, negative)

            # keep track of progress
            total_train_loss += loss
            count_train += len(y)
            train_accuracies.append(get_accuracy(anchor, positive, negative, y))

            # optimize model
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            """
            # Validation set
            val_anchor = model(X_val[:,:2048])
            val_positive = model(X_val[:,2048:4096])
            val_negative = model(X_val[:,4096:6144])
            val_loss = triplet_loss(val_anchor, val_positive, val_negative)
            total_val_loss += val_loss
            count_val += len(y_val)
            val_accuracies.append(get_accuracy(val_anchor, val_positive, val_negative, y_val))
            """
        # average accuracies over this epoch
        train_accuracy = np.average(np.hstack(train_accuracies))
        # val_accuracy = np.average(np.hstack(val_accuracies))

        # Clear the previous output
        # clear_output(wait=True)

        # calculate and print progress
        total_train_loss = total_train_loss.item() / count_train
        loss_train_progress.append(total_train_loss)
        print("Training loss in epoch {} is: {}".format(epoch, total_train_loss))

        accuracy_train_progress.append(train_accuracy)
        print("Training accuracy in epoch {} is: {}".format(epoch, train_accuracy))
        """
        total_val_loss = total_val_loss.item()/count_val
        loss_val_progress.append(total_val_loss)
        print("Validation loss in epoch {} is: {}".format(epoch, total_val_loss))

        accuracy_val_progress.append(val_accuracy)
        print("Validation accuracy in epoch {} is: {}".format(epoch, val_accuracy))
        """
        """
        # Create an empty plot
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 4))
        ax1.set_title('Loss')
        ax2.set_title('Accuracy')
        ax1.set_xlabel('Epochs')
        ax1.set_ylabel('Loss')
        ax2.set_xlabel('Epochs')
        ax2.set_ylabel('Accuracy')

        # Clear the previous plot
        #ax1.cla()
        #ax2.cla()

        # Plot the updated data points
        ax1.plot(loss_train_progress, color='red', label='Training Loss')
        #ax1.plot(loss_val_progress, color='blue', label='Validation Loss')
        ax2.plot(accuracy_train_progress, color='red', label='Training Accuracy')
        #ax2.plot(accuracy_val_progress, color='blue', label='Validation Accuracy')
        # Add legend
        ax1.legend()
        ax2.legend()
        # Show the plot without blocking
        plt.tight_layout()
        plt.show(block=False)
        """

    return model


# %%
def test_model(model, loader):
    """
    The testing procedure of the model; it accepts the testing data and the trained model and
    then tests the model on it.

    input: model: torch.nn.Module, the trained model
           loader: torch.data.util.DataLoader, the object containing the testing data

    output: None, the function saves the predictions to a results.txt file
    """
    model.eval()
    predictions = []
    # Iterate over the test data
    with torch.no_grad():  # We don't need to compute gradients for testing
        for [x_batch] in loader:
            x_batch = x_batch.to(device)

            # predict embeddings
            emb_image1 = model(x_batch[:, :2048])
            emb_image2 = model(x_batch[:, 2048:4096])
            emb_image3 = model(x_batch[:, 4096:6144])
            # print(emb_image1[0])

            # calculate the distances between the embeddings in this batch
            dist12 = torch.cdist(emb_image1.unsqueeze(dim=1).flatten(2),
                                 emb_image2.unsqueeze(dim=1).flatten(2)).squeeze()
            dist13 = torch.cdist(emb_image1.unsqueeze(dim=1).flatten(2),
                                 emb_image3.unsqueeze(dim=1).flatten(2)).squeeze()
            # print(dist12.shape)
            # print(dist13.shape)

            # check which distance is larger
            diff = dist13 - dist12  # positive if distance between image 1 and 2 is smaller than distance between image 1 and 3
            predicted = (diff > 0).long()  # 1 if diff is positive, 0 otherwise
            # print(diff)
            # print(predicted.shape)

            predictions.append(predicted)

        predictions = np.hstack(predictions)
    np.savetxt("results.txt", predictions, fmt='%i')


# %%
# Main function. You don't have to change this
if __name__ == '__main__':
    TRAIN_TRIPLETS = 'train_triplets.txt'
    TEST_TRIPLETS = 'test_triplets.txt'

    # generate embedding for each image in the dataset
    if (os.path.exists('dataset/embeddings.npy') == False):
        generate_embeddings()

    # load the training and testing data
    print("Fetching data...")

    X, y = get_data(TRAIN_TRIPLETS)
    X_test, _ = get_data(TEST_TRIPLETS, train=False)

    # Create data loaders for the training and testing data
    print("Creating data loaders...")
    train_loader = create_loader_from_np(X, y, train=True, batch_size=64)
    test_loader = create_loader_from_np(X_test, train=False, batch_size=2048, shuffle=False)

    # define a model and train it
    print("Training model...")
    model = train_model(train_loader)

    # test the model on the test data
    print("Testing the model...")
    test_model(model, test_loader)
    print("Results saved to results.txt")



Image embedding #0 done!
Image embedding #1 done!
Image embedding #2 done!
Image embedding #3 done!
Image embedding #4 done!
Image embedding #5 done!
Image embedding #6 done!
Image embedding #7 done!
Image embedding #8 done!
Image embedding #9 done!
Image embedding #10 done!
Image embedding #11 done!
Image embedding #12 done!
Image embedding #13 done!
Image embedding #14 done!
Image embedding #15 done!
Image embedding #16 done!
Image embedding #17 done!
Image embedding #18 done!
Image embedding #19 done!
Image embedding #20 done!
Image embedding #21 done!
Image embedding #22 done!
Image embedding #23 done!
Image embedding #24 done!
Image embedding #25 done!
Image embedding #26 done!
Image embedding #27 done!
Image embedding #28 done!
Image embedding #29 done!
Image embedding #30 done!
Image embedding #31 done!
Image embedding #32 done!
Image embedding #33 done!
Image embedding #34 done!
Image embedding #35 done!
Image embedding #36 done!
Image embedding #37 done!
Image embedding #38 do

KeyError: '02461'