# Task 3
This serves as a template which will guide you through the implementation of this task. It is advised to first read the whole template and get a sense of the overall structure of the code before trying to fill in any of the TODO gaps.
This is the Jupyter notebook version of the template. For the Python script version, please refer to the file `template_solution.py`.

First, we import necessary libraries:

In [1]:
import numpy as np
from torchvision import transforms
from torch.utils.data import DataLoader, TensorDataset
import os
import torch
from torchvision import transforms
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision.models import resnet50, ResNet50_Weights
from torch.optim import Adam
import torch.optim.lr_scheduler as lr_scheduler

In [2]:
# Run on the first CUDA GPU when PyTorch can see one, otherwise on the CPU.
# To force CPU execution, replace the assignment below with
# device = torch.device("cpu")
# A model and every tensor passed to it must live on the same device.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

# Shared loader settings (not referenced by the other cells visible here).
global_batch_size, global_num_workers = 128, 16

In [3]:
def generate_embeddings():
    """Embed every image under ``dataset/`` with a pretrained ResNet-50.

    The classification head of the network is replaced by an empty
    ``nn.Sequential`` so that the model returns its pooled features instead
    of class scores. Images are processed in their ``ImageFolder`` order, in
    batches of 50, and each batch is stored transposed, i.e. as
    ``(feature_dim, batch_size)``. The list of per-batch arrays is written to
    ``embeddings.npy``; ``np.save`` turns that list into a single array, so
    every batch is expected to have the same size.

    The model and the image batches are moved to the module-level ``device``
    so a GPU is used when one is available.
    """
    train_transforms = transforms.Compose([
        transforms.ToTensor(),
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    train_dataset = datasets.ImageFolder(root="dataset/", transform=train_transforms)
    # shuffle=False keeps the batch order aligned with train_dataset.samples.
    train_loader = DataLoader(dataset=train_dataset,
                              batch_size=50,
                              shuffle=False,
                              num_workers=8)

    model = resnet50(weights=ResNet50_Weights.DEFAULT)
    # Drop the final fully connected layer: forward() now yields the features.
    model.fc = nn.Sequential()
    for param in model.parameters():
        param.requires_grad = False
    model.to(device)
    model.eval()

    embeddings = []
    # Pure inference: make sure autograd records nothing.
    with torch.no_grad():
        for features, _ in train_loader:
            batch_embeddings = model(features.to(device))
            embeddings.append(batch_embeddings.T.cpu().numpy())

    np.save('embeddings.npy', embeddings)

In [4]:
def get_data(file, train=True):
    """Build feature and label arrays from a triplet file and cached embeddings.

    Args:
        file: Path to a text file with one triplet per line; the three image
            names are separated by whitespace. Blank lines are ignored.
        train: When true, every triplet ``(a, b, c)`` additionally yields the
            swapped sample ``(a, c, b)`` with label 0.

    Returns:
        A tuple ``(X, y)``. Each row of ``X`` is the concatenation of the three
        embeddings of one (possibly swapped) triplet. ``y`` is a column vector
        holding 1 for triplets in their original order and 0 for swapped ones.

    Raises:
        KeyError: If a triplet names an image without a stored embedding.

    ``embeddings.npy`` is expected in the layout written by
    ``generate_embeddings``: ``(num_batches, feature_dim, batch_size)``, with
    images in ``ImageFolder`` order.
    """
    with open(file) as f:
        # A blank (e.g. trailing) line carries no triplet; skip it.
        triplets = [line.split() for line in f if line.strip()]

    # The dataset is only used to recover the image order of the embeddings.
    train_dataset = datasets.ImageFolder(root="dataset/", transform=None)
    # Bare image id: strip the directory (either path separator) and extension.
    filenames = [os.path.splitext(os.path.basename(path))[0]
                 for path, _ in train_dataset.samples]

    # TODO: Normalize the embeddings
    embeddings = np.load('embeddings.npy')
    # (num_batches, feature_dim, batch_size) -> (num_batches, batch_size, feature_dim)
    embeddings = np.swapaxes(embeddings, 1, 2)
    # Flatten the batch structure so that row k belongs to filenames[k],
    # whatever batch size / batch count the file was generated with.
    embeddings = embeddings.reshape(-1, embeddings.shape[-1])
    file_to_embedding = dict(zip(filenames, embeddings))

    X = []
    y = []
    for names in triplets:
        emb = [file_to_embedding[name] for name in names]
        X.append(np.hstack([emb[0], emb[1], emb[2]]))
        y.append(1)
        if train:
            # Swapping the last two images turns the sample into a negative one.
            X.append(np.hstack([emb[0], emb[2], emb[1]]))
            y.append(0)
    X = np.vstack(X)
    y = np.array(y).reshape(-1, 1)
    return X, y

Hint: adjust `batch_size` and `num_workers` to your PC configuration so that you don't run out of memory (VRAM when running on the GPU, RAM when running on the CPU).

In [7]:
def create_loader_from_np(X, y=None, train=True, batch_size=16, shuffle=True, num_workers=8):
    """Wrap NumPy arrays in a PyTorch ``DataLoader``.

    Args:
        X: Feature array; converted to a float tensor.
        y: Label array; only read (and converted to float) when ``train`` is true.
        train: Whether the dataset yields ``(X, y)`` pairs or ``X`` alone.
        batch_size: Number of samples per batch.
        shuffle: Whether the loader reshuffles the samples.
        num_workers: Number of loader worker processes.

    Returns:
        A ``DataLoader`` with ``pin_memory=True`` over the resulting
        ``TensorDataset``.
    """
    tensors = [torch.from_numpy(X).type(torch.float)]
    if train:
        tensors.append(torch.from_numpy(y).type(torch.float))

    return DataLoader(
        dataset=TensorDataset(*tensors),
        batch_size=batch_size,
        shuffle=shuffle,
        pin_memory=True,
        num_workers=num_workers,
    )


TODO: Define a model. The basic structure is defined here, but you need to fill in the details.

In [16]:
class Net(nn.Module):
    """Fully connected network mapping a 6144-dimensional input to one score.

    Four hidden linear layers (128, 32, 16 and 8 units) with ReLU activations
    are followed by a linear output layer without activation.
    """

    def __init__(self):
        """Create the five linear layers ``fc1`` to ``fc5``."""
        super().__init__()
        # Hidden layers: 6144 -> 128 -> 32 -> 16 -> 8
        self.fc1 = nn.Linear(in_features=6144, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=32)
        self.fc3 = nn.Linear(in_features=32, out_features=16)
        self.fc4 = nn.Linear(in_features=16, out_features=8)
        # Output layer: 8 -> 1
        self.fc5 = nn.Linear(in_features=8, out_features=1)

    def forward(self, x):
        """Return the raw (non-activated) output of ``fc5`` for the batch ``x``."""
        for hidden_layer in (self.fc1, self.fc2, self.fc3, self.fc4):
            x = F.relu(hidden_layer(x))
        return self.fc5(x)

In [17]:
def test_model(model, test_loader, output_file="results_task3_24.txt"):
    """Predict a 0/1 label for every test sample and write the labels to a file.

    Args:
        model: Module returning one score per sample.
        test_loader: Iterable whose items are one-element sequences ``[X]``
            holding a batch of inputs.
        output_file: Destination text file; one integer label per line.
            Defaults to the path the function originally always wrote to.

    A sample is labelled 1 when its score is at least 0.5 and 0 otherwise
    (a NaN score therefore maps to 0 instead of breaking the integer format).
    """
    model.eval()
    predictions = []
    with torch.no_grad():
        for [X] in test_loader:
            scores = model(X)
            # Threshold into integer labels and convert explicitly to NumPy;
            # .cpu() keeps this working when the scores live on a GPU.
            labels = (scores >= 0.5).to(torch.int64)
            predictions.append(labels.cpu().numpy())
    predictions = np.vstack(predictions)
    np.savetxt(output_file, predictions, fmt='%i')

In [18]:
def train_model(train_loader):
    """Train a fresh ``Net`` on the batches of ``train_loader`` and return it.

    Args:
        train_loader: Iterable yielding ``(X, y)`` batches; ``y`` must have the
            same shape as the model output, since the loss is ``nn.MSELoss``.

    Returns:
        The trained ``Net`` instance.

    Training runs for 5 epochs of plain SGD (initial learning rate 0.01). A
    ``LinearLR`` schedule, stepped once per epoch, scales the learning rate
    from factor 1.0 down to 0.7 over those epochs.
    """
    model = Net()
    # Make the training mode explicit (test_model switches the model to eval).
    model.train()
    n_epochs = 5
    criterion = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01)
    scheduler = lr_scheduler.LinearLR(optimizer, start_factor=1.0, end_factor=0.7, total_iters=n_epochs)

    for _ in range(n_epochs):
        for X, y in train_loader:
            optimizer.zero_grad()
            loss = criterion(model(X), y)
            loss.backward()
            optimizer.step()
        # The schedule is defined per epoch, not per batch.
        scheduler.step()
    return model

In [19]:
# Paths of the triplet files that are passed to get_data().
TRAIN_TRIPLETS = "train_triplets.txt"
TEST_TRIPLETS = "test_triplets.txt"
# Uncomment to (re)create embeddings.npy, which get_data() loads:
#generate_embeddings()

In [20]:
# Training data: build the arrays, hand them to a loader, then drop the local
# references, since the loader's dataset holds the tensors from here on.
train_features, train_labels = get_data(TRAIN_TRIPLETS)
train_loader = create_loader_from_np(train_features, train_labels, train=True, batch_size=128)
del train_features, train_labels

# Test data: same procedure without the swapped samples; the labels returned
# for the test triplets are not needed. Order is preserved (shuffle=False) so
# the predictions line up with the rows of the test file.
test_features, _ = get_data(TEST_TRIPLETS, train=False)
test_loader = create_loader_from_np(test_features, train=False, batch_size=2048, shuffle=False)
del test_features, _

# Fit the model on the training loader ...
model = train_model(train_loader)

# ... and write its predictions for the test triplets to disk.
test_model(model, test_loader)