In [1]:
import numpy as np
from torchvision import transforms
from torch.utils.data import DataLoader, TensorDataset
import os
import torch
import torchvision
import torchvision.datasets as datasets
import torch.nn as nn
import torch.nn.functional as F
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from tqdm.auto import tqdm

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
from torchvision.models import efficientnet_b3 as ENB3
from torchvision.models import efficientnet_v2_s as ENV2S
from torchvision.models import regnet_y_128gf as RNY128
from torchvision.models import regnet_y_16gf as RNY16

In [3]:
embeddings_file_name = 'dataset/embeddings-RNY16.npy'
if(os.path.exists(embeddings_file_name) == False):
    weights = torchvision.models.RegNet_Y_16GF_Weights.IMAGENET1K_SWAG_E2E_V1
    using_embedding_model = RNY16(weights=weights)
    embedding_size = using_embedding_model.fc.in_features # may change to 'classifier[1]' depending on the model


model_file = 'RNY16-similar.pth'
result_filename = 'results-RNY16-similar.txt'

n_epochs = 10

In [4]:
def generate_embeddings(embeddings_file_name):
    """
    Transform, resize and normalize the images and then use a pretrained model to extract 
    the embeddings.
    """
    # TODO: define a transform to pre-process the images
    # train_transforms = transforms.Compose(
    #     # maybe need to resize?
    #     # [transforms.Resize((224, 224)),]
    #     # maybe normalize?
    #     # [transforms.Normalize()]
    #     [transforms.ToTensor()])
    train_dataset = datasets.ImageFolder(root="./dataset/", transform=weights.transforms())
    # Hint: adjust batch_size and num_workers to your PC configuration, so that you don't 
    # run out of memory
    train_loader = DataLoader(dataset=train_dataset,
                              batch_size=1,
                              shuffle=False,
                              pin_memory=True, num_workers=6)

    # TODO: define a model for extraction of the embeddings (Hint: load a pretrained model,
    #  more info here: https://pytorch.org/vision/stable/models.html)
    # model = nn.Module()
    model = using_embedding_model
    embeddings = []
    # embedding_size = model.fc.in_features  # Dummy variable, replace with the actual embedding size once you pick your model

    num_images = len(train_dataset)
    embeddings = np.zeros((num_images, embedding_size))
    # TODO: Use the model to extract the embeddings. Hint: remove the last layers of the 
    # model to access the embeddings the model generates.

    # remove last layer
    model = nn.Sequential(*list(model.children())[:-1])

    model.to(device)

    # extract embeddings
    for i, (images, _) in enumerate(train_loader):
        embeddings[i] = model(images.to(device)).flatten().cpu().detach().numpy()

    np.save(embeddings_file_name, embeddings)

In [5]:
# generate embedding for each image in the dataset
if(os.path.exists(embeddings_file_name) == False):
    generate_embeddings(embeddings_file_name)

In [6]:
def get_data(file, train=True):
    """
    Load the triplets from the file and generate the features and labels.

    input: file: string, the path to the file containing the triplets
          train: boolean, whether the data is for training or testing

    output: X: numpy array, the features
            y: numpy array, the labels
    """
    triplets = []
    with open(file) as f:
        for line in f:
            triplets.append(line)

    # generate training data from triplets
    train_dataset = datasets.ImageFolder(root="dataset/",
                                         transform=None)
    filenames = [s[0].split('\\')[-1].replace('.jpg', '') for s in train_dataset.samples]
    embeddings = np.load(embeddings_file_name)
    # TODO: Normalize the embeddings across the dataset
    embeddings = StandardScaler().fit_transform(embeddings)

    file_to_embedding = {}
    for i in range(len(filenames)):
        file_to_embedding[filenames[i]] = embeddings[i]
    X = []
    # use the individual embeddings to generate the features and labels for triplets
    for t in triplets:
        emb = [file_to_embedding[a] for a in t.split()]
        X.append(np.vstack([emb[0], emb[1], emb[2]]))
    X = np.array(X)
    return X

    

In [7]:
# Hint: adjust batch_size and num_workers to your PC configuration, so that you don't run out of memory
def create_loader_from_np(X, y = None, train = True, batch_size=64, shuffle=True, num_workers = 4):
    """
    Create a torch.utils.data.DataLoader object from numpy arrays containing the data.

    input: X: numpy array, the features
           y: numpy array, the labels
    
    output: loader: torch.data.util.DataLoader, the object containing the data
    """

    dataset = TensorDataset(torch.from_numpy(X).type(torch.float))
    loader = DataLoader(dataset=dataset,
                        batch_size=batch_size,
                        shuffle=shuffle,
                        pin_memory=True, num_workers=num_workers)
    return loader

In [8]:
# TODO: define a model. Here, the basic structure is defined, but you need to fill in the details
class Net(nn.Module):
    """
    The model class, which defines our classifier.
    """
    def __init__(self, embedding_size):
        """
        The constructor of the model.
        """
        super().__init__()
        self.embedding_size = embedding_size
        self.hidden_size = 1024
        self.fc1 = nn.Linear(embedding_size, self.hidden_size)
        self.fc2 = nn.Linear(self.hidden_size, 128)
        self.fc3 = nn.Linear(128, 1)

    def forward_once(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        return x
    
    def forward(self, A, B):
        output1 = self.forward_once(A)
        output2 = self.forward_once(B)
        distance = torch.abs(output1 - output2)
        distance = self.fc3(distance)
        x = F.sigmoid(distance)
        return x


In [9]:
def train_model(train_loader):
    """
    The training procedure of the model; it accepts the training data, defines the model 
    and then trains it.

    input: train_loader: torch.data.util.DataLoader, the object containing the training data
    
    output: model: torch.nn.Module, the trained model
    """
    embedding_size = train_loader.dataset.tensors[0].shape[-1]
    model = Net(embedding_size)
    model.train()
    model.to(device)
    # TODO: define a loss function, optimizer and proceed with training. Hint: use the part 
    # of the training data as a validation split. After each epoch, compute the loss on the 
    # validation split and print it out. This enables you to see how your model is performing 
    # on the validation data before submitting the results on the server. After choosing the 
    # best model, train it on the whole training data.
    loss_fun = nn.BCELoss()
    # optimizer = torch.optim.Adam(model.parameters(), lr=0.0001, weight_decay=0.0001)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=0.0001)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.3, patience=3, verbose=True)
    train_size = len(train_loader.dataset)
    valid_size = int(train_size * 0.1)
    train_size = train_size - valid_size
    train_dataset, valid_dataset = torch.utils.data.random_split(train_loader.dataset, [train_size, valid_size])
    epoch_train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True, pin_memory=True, num_workers=4)
    epoch_valid_loader = DataLoader(dataset=valid_dataset, batch_size=64, shuffle=True, pin_memory=True, num_workers=4)
    
    for epoch in tqdm(range(n_epochs)):
        train_loss = 0
        valid_loss = 0
        number_of_correct_train = 0
        number_of_correct_valid = 0
        for [x] in epoch_train_loader:
            optimizer.zero_grad()
            A, B, C = x[:, 0, :].to(device), x[:, 1, :].to(device), x[:, 2, :].to(device)
            similarity1 = model(A, B).flatten()
            similarity2 = model(A, C).flatten()
            loss = loss_fun(similarity1, 1) + loss_fun(similarity2, 0)
            train_loss += loss.item()
            number_of_correct_train += torch.sum(similarity1 > similarity2)
            loss.backward()
            optimizer.step()
        train_loss /= len(epoch_train_loader)
        
        with torch.no_grad():
            for [x] in epoch_valid_loader:
                A, B, C = x[:, 0, :].to(device), x[:, 1, :].to(device), x[:, 2, :].to(device)
                similarity1 = model(A, B).flatten()
                similarity2 = model(A, C).flatten()
                loss = loss_fun(similarity1, 1) + loss_fun(similarity2, 0)
                number_of_correct_valid += torch.sum(similarity1 > similarity2)
                valid_loss += loss.item()
            valid_loss /= len(epoch_valid_loader)

        scheduler.step(number_of_correct_valid / valid_size)
        print('Epoch: {}, Training Loss: {:.4f}, Validation Loss: {:.4f}'.format(epoch+1, train_loss, valid_loss))
        print('Training Accuracy: {:.4f}, Validation Accuracy: {:.4f}'.format(number_of_correct_train / train_size, number_of_correct_valid / valid_size))
        if (optimizer.param_groups[0]['lr'] < 1e-6):
            break
    return model

In [10]:
def test_model(model, loader, filename='results.txt'):
    """
    The testing procedure of the model; it accepts the testing data and the trained model and 
    then tests the model on it.

    input: model: torch.nn.Module, the trained model
           loader: torch.data.util.DataLoader, the object containing the testing data
        
    output: None, the function saves the predictions to a results.txt file
    """
    model.eval()
    predictions = []
    # Iterate over the test data
    with torch.no_grad(): # We don't need to compute gradients for testing
        for [x_batch] in tqdm(loader):
            A, B, C = x_batch[:, 0, :].to(device), x_batch[:, 1, :].to(device), x_batch[:, 2, :].to(device)
            similarity1 = model(A, B).flatten()
            similarity2 = model(A, C).flatten()
            predictions.append(similarity1 > similarity2)
        predictions = predictions.cpu().numpy().astype(int)
        np.savetxt(filename, predictions, fmt='%i')



    

In [11]:
def train_or_test_model(train=True):
    if train:
        # define a model and train it
        TRAIN_TRIPLETS = 'train_triplets.txt'

        # load the training and testing data
        X = get_data(TRAIN_TRIPLETS)
        print(X.shape)
        

        # Create data loaders for the training and testing data
        train_loader = create_loader_from_np(X, train = True, batch_size=64)
        model = train_model(train_loader)
        # model = train_without_valid(train_loader)
        torch.save(model.state_dict(), model_file)
    else:
            # test the model on the test data
            TEST_TRIPLETS = 'test_triplets.txt'
            X_test = get_data(TEST_TRIPLETS, train=False)
            test_loader = create_loader_from_np(X_test, train = False, batch_size=2048, shuffle=False)
            
            # load the model from the file
            model = Net(X_test.shape[-1])
            model.load_state_dict(torch.load(model_file))
            # model.to(device)
            
            test_model(model, test_loader, None, result_filename)
            print("Results saved to", result_filename)



In [12]:
train_or_test_model(train=True)

(59515, 3, 3024)


  0%|          | 0/10 [00:00<?, ?it/s]

AttributeError: 'int' object has no attribute 'size'

In [None]:
train_or_test_model(train=False)

  0%|          | 0/30 [00:00<?, ?it/s]

Results saved to results-RNY16-conv.txt
