In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import torchvision

from sklearn.preprocessing import StandardScaler    
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.utils import resample

from skimage.io import imread

In [None]:
# Set seed for reproducibility
seed = 69

np.random.seed(seed)
torch.manual_seed(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Load raw training and testing data
train_raw_triplets = np.loadtxt('drive/My Drive/Colab Notebooks/train_triplets.txt', dtype='str')
testing_raw_triplets = np.loadtxt('drive/My Drive/Colab Notebooks/test_triplets.txt', dtype='str')

# Split training IDs into training and validation
valid_size = 0.1 # percentage of training set to use as validation
ID_train, ID_val = train_test_split(train_raw_triplets, test_size = valid_size) # train/val split

In [None]:
# Create new tripelts for validation: ACB
ID_val_new = ID_val.copy()
ID_val_new[:, 1] = ID_val[:, 2]
ID_val_new[:, 2] = ID_val[:, 1]

# Add labels to validation data: ABC = 1 and ACB = 0
len_data = ID_val.shape[0]
labels_1 = np.ones(len_data)
labels_0 = 0*np.ones(len_data)
# split data into 2 : half is ABC and other half is ACB
labels = np.append(labels_1[0:math.floor(len_data/2)], labels_0[math.floor(len_data/2):], axis = 0)

# all validation data file names
ID_val_all = np.append(ID_val[0:math.floor(len_data/2)], ID_val_new[math.floor(len_data/2):], axis = 0)

In [None]:
# Define data and network parameters
num_workers = 5
batch_size = 64
learning_rate = 0.001
epochs = 10

# Classes to turn data into pytorch dataset for dataloaders
## train data class
class trainID(torch.utils.data.Dataset):
    
    def __init__(self, X_ID):
        self.X_ID = X_ID
        self.X_ID_A = X_ID[:, 0]
        self.X_ID_B = X_ID[:, 1]
        self.X_ID_C = X_ID[:, 2]
        
    def __getitem__(self, index):
        return self.X_ID_A[index], self.X_ID_B[index], self.X_ID_C[index]
        
    def __len__ (self):
        return self.X_ID.shape[0]

## val data class
class valID(torch.utils.data.Dataset):
    
    def __init__(self, X_ID, y_data):
        self.X_ID = X_ID
        self.X_ID_A = X_ID[:, 0]
        self.X_ID_B = X_ID[:, 1]
        self.X_ID_C = X_ID[:, 2]
        self.y_data = y_data
        
    def __getitem__(self, index):
        return self.X_ID_A[index], self.X_ID_B[index], self.X_ID_C[index], self.y_data[index]
        
    def __len__ (self):
        return self.X_ID.shape[0]
    

## test data class
class testID(torch.utils.data.Dataset):
    
    def __init__(self, X_ID):
        self.X_ID = X_ID
        self.X_ID_A = X_ID[:, 0]
        self.X_ID_B = X_ID[:, 1]
        self.X_ID_C = X_ID[:, 2]
        
    def __getitem__(self, index):
        return self.X_ID_A[index], self.X_ID_B[index], self.X_ID_C[index]
        
    def __len__ (self):
        return self.X_ID.shape[0]


In [None]:
# Initialise dataloaders for training and validation sets
train_index = trainID(ID_train)
train_loader = DataLoader(dataset=train_index, batch_size=batch_size, shuffle=True, num_workers = num_workers, pin_memory=True)

val_index = valID(ID_val_all, labels)
val_loader = DataLoader(dataset=val_index, batch_size=128, shuffle=True, num_workers = num_workers)

# Initialise dataloaders for testing set
test_index = testID(testing_raw_triplets)
test_loader = DataLoader(dataset=test_index, batch_size=128, shuffle=False, num_workers = num_workers)

In [None]:
# Model Creation

model = torchvision.models.resnet50(pretrained=True)
print(model)

def resnet18_new(model):
    """
    Construct a ResNet-18 model.
    Args:
        pretrained (bool): If True, returns a model pre-trained on ImageNet
    """
    modified_pretrained = nn.Sequential(*list(model.children())[:-1])
    for param in modified_pretrained.parameters():
        param.requires_grad = False
    
    modified_pretrained.add_module("embedding", EmbeddingNet(model))
    return modified_pretrained
    
class TripletNet(nn.Module):
    """Triplet Network."""

    def __init__(self, embeddingnet):
        """Triplet Network Builder."""
        super(TripletNet, self).__init__()
        self.embeddingnet = embeddingnet
        #self.classifier = nn.Linear(2000, 1)

    def forward(self, a, p, n):
        """Forward pass."""
        # anchor
        embedded_A = self.embeddingnet(a)

        # positive examples
        embedded_B = self.embeddingnet(p)

        # negative examples
        embedded_C = self.embeddingnet(n)
        
        # classifier
        #classi = embedded_A + embedded_B + embedded_C
        #classi = self.classifier(classi)
        
        return embedded_A, embedded_B, embedded_C#, classi


class EmbeddingNet(nn.Module):
    """EmbeddingNet using ResNet-18."""

    def __init__(self, resnet):
        """Initialize EmbeddingNet model."""
        super(EmbeddingNet, self).__init__()

        # Everything except the last linear layer
        #self.features = nn.Sequential(*list(resnet.children())[:-1])
        num_ftrs = resnet.fc.in_features
        self.fc1 = nn.Linear(num_ftrs, 1000)
        self.fc2 = nn.Linear(1000, 512)
        
        # Batch Norm
        self.bn1 = nn.BatchNorm1d(1000)
        
        # Drop-out
        self.drop1 = nn.Dropout(p=0)

    def forward(self, out):
        """Forward pass of EmbeddingNet."""
        out = out.view(out.size(0), -1)
        out = self.drop1(F.relu(self.bn1(self.fc1(out))))
        out = self.fc2(out)

        return out

net = TripletNet(resnet18_new(model))
print(net)

In [None]:
# Load batch images from image IDs
def batch_images(image_ID, train):
    if train==True:
        transforms = [torchvision.transforms.ToPILImage(),torchvision.transforms.Resize((256,256)),
                    torchvision.transforms.RandomCrop(224),
                  torchvision.transforms.RandomHorizontalFlip(p=0.5),
                  torchvision.transforms.RandomVerticalFlip(p=0.5), 
                  torchvision.transforms.RandomRotation(degrees=90), 
                  torchvision.transforms.ToTensor(),
                  torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])] # transforms to apply 
    else:
        transforms = [torchvision.transforms.ToPILImage(),
                  torchvision.transforms.Resize((224,224)), 
                  torchvision.transforms.CenterCrop(224),
                  torchvision.transforms.ToTensor(),
                  torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])] # transforms to apply
                  
    comp_transforms = torchvision.transforms.Compose(transforms)
    batch_img = []
    batch_img = torch.tensor(batch_img)
    for i in range(len(image_ID)):
        img_path = 'drive/My Drive/Colab Notebooks/food/' + image_ID[i] +'.jpg'
        image = imread(img_path)
        image = comp_transforms(image)
        #plt.figure()
        #plt.imshow(image.permute(1, 2, 0))
        batch_img = torch.cat((batch_img, image), dim = 0)
    batch_img = batch_img.reshape(-1, 3, 224, 224)
    return batch_img

def binary_acc(A, B, C, y_true):
    d_ab = torch.norm(A-B, p=2, dim=1)
    d_ac = torch.norm(A-C, p=2, dim=1)
    diff = (d_ac-d_ab).detach().cpu().numpy()
    y_pred = np.ceil(diff.clip(0, 1))
    accuracy = accuracy_score(y_true, y_pred)
    return accuracy

In [None]:
model = net
if torch.cuda.is_available():
    model = model.cuda()
    print('cuda true')

# Specify loss function and optimiser
criterion = nn.TripletMarginLoss(margin=1.0, p=2.0, reduce=None, reduction='mean')
optimizer = optim.Adam((model.parameters()) , lr=learning_rate, weight_decay=0.001)

# quick sanity check
for A, B, C in train_loader:
    print(A)
    break
    
plt.close

In [None]:
# Train model
valid_loss_min = np.Inf # track change in validation loss
valid_accuracy_max = 0.0 # track change in validation accuracy

for epoch in range(1, epochs+1):
    
    # keep track of training and validation loss
    train_loss = 0.0
    train_accuracy = 0.0
    valid_loss = 0.0
    valid_accuracy = 0.0
    
    ###################
    # train the model #
    ###################
    print('TRAINING')
    print('wait...')
    model.train()
    train = True
    for A, B, C in train_loader:
        # retrieve batch of training images: tensor 25x3x150x224
        X_train_A = batch_images(A, train)
        X_train_B = batch_images(B, train)
        X_train_C = batch_images(C, train)

        if torch.cuda.is_available():
            X_train_A=X_train_A.cuda()
            X_train_B=X_train_B.cuda()
            X_train_C=X_train_C.cuda()
              
        # clear the gradients of all optimized variables
        optimizer.zero_grad()
        
        # forward pass: compute predicted outputs by passing inputs to the model
        embedded_A, embedded_B, embedded_C = model(X_train_A, X_train_B, X_train_C)
        
        # calculate the batch loss
        loss = criterion(embedded_A, embedded_B, embedded_C)
        
        # backward pass: compute gradient of the loss with respect to model parameters
        loss.backward()
        # perform a single optimization step (parameter update)
        optimizer.step()
        
        # update training loss
        train_loss += loss.item()
        
    # calculate average loss
    train_loss = train_loss/len(train_loader)
    print('final loss')
    print(train_loss)
    
        
    ######################    
    # validate the model #
    ######################
    print('VALIDATION')
    print('wait...')
    model.eval()
    train = False
    with torch.no_grad():
        for A, B, C, y_val in val_loader:
            # retrieve batch of training images: tensor 10x3x150x224
            X_val_A = batch_images(A, train)
            X_val_B = batch_images(B, train)
            X_val_C = batch_images(C, train)
            y_val = y_val.reshape(-1,1)

            if torch.cuda.is_available():
                X_val_A=X_val_A.cuda()
                X_val_B=X_val_B.cuda()
                X_val_C=X_val_C.cuda()

            # forward pass: compute predicted outputs by passing inputs to the model
            embedded_A, embedded_B, embedded_C = model(X_val_A, X_val_B, X_val_C)
            # calculate the batch loss
            loss = criterion(embedded_A, embedded_B, embedded_C)

            # update average validation loss 
            valid_loss += loss.item()
            # update training accuracy
            valid_accuracy += binary_acc(embedded_A, embedded_B, embedded_C, y_val)
            
    
    valid_loss = valid_loss/len(val_loader)
    valid_accuracy = valid_accuracy/len(val_loader)
    
    # print training/validation statistics 
    print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(
        epoch, train_loss, valid_loss))
    print('Epoch: {} \tValidation Acc: {:.6f}'.format(
        epoch, valid_accuracy))
    
    # save model if validation loss has decreased
    if valid_accuracy >= valid_accuracy_max:
        print('Validation acc increased ({:.6f} --> {:.6f}).  Saving model ...'.format(
        valid_accuracy_max,
        valid_accuracy))
        torch.save(model.state_dict(), 'drive/My Drive/Colab Notebooks/model_cifar_triplet_50.pt')
        valid_accuracy_max = valid_accuracy

In [None]:
# Load model with highest accuracy
if torch.cuda.is_available():
    model.load_state_dict(torch.load('drive/My Drive/Colab Notebooks/model_cifar_triplet_50.pt'))
else:
    model.load_state_dict(torch.load('drive/My Drive/Colab Notebooks/model_cifar_triplet_50.pt', map_location=torch.device('cpu')))

In [None]:
# Test model and save labels

y_pred_list = []
model.eval()
train = False
# iterate over test data
with torch.no_grad():
    for A, B, C in test_loader:
        # retrieve batch of training images: tensor 20x9x200x200
        # retrieve batch of training images: tensor 10x3x150x224
        X_test_A = batch_images(A, train)
        X_test_B = batch_images(B, train)
        X_test_C = batch_images(C, train)

        if torch.cuda.is_available():
            X_test_A=X_test_A.cuda()
            X_test_B=X_test_B.cuda()
            X_test_C=X_test_C.cuda()
        
        # forward pass: compute predicted outputs by passing inputs to the model
        embedded_A, embedded_B, embedded_C = model(X_test_A, X_test_B, X_test_C)
      
        # convert output probabilities to predicted class
        d_ab = torch.norm(embedded_A-embedded_B, p=2, dim=1)
        d_ac = torch.norm(embedded_A-embedded_C, p=2, dim=1)
        diff = (d_ac-d_ab).detach().cpu().numpy()
        y_test_pred = np.ceil(diff.clip(0, 1))
        y_pred_list.append(y_test_pred) 
        
print(y_pred_list)

In [None]:
# reshaping step
print(np.array(y_pred_list[0][:]).shape)

y_pred = []
y_pred = np.array(y_pred)
for i in range(466):
    n_batch = np.array(y_pred_list[i][:]).size
    y_pred = np.concatenate((y_pred, y_pred_list[i][:].reshape(n_batch,)), axis = 0)

output = pd.DataFrame(np.array([0]), columns=['Predicted Label'])
y_pred = (np.array(y_pred)).reshape(-1, 1)

for i in range(len(y_pred)):
    output.loc[i,'Predicted Label']=y_pred[i]

# Save the output (predicted labels for the test features)
output.to_csv('drive/My Drive/Colab Notebooks/prediction_triplet_50.csv', index=False, header=False)