In [None]:
#Imports
import numpy as np
from numpy import random as ran

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as T
from torchvision import transforms
import torchvision.models as models

import time

from PIL import Image

In [None]:
def swap(train):
    """Randomly swap columns 1 and 2 of each row and append a 0/1 label column.

    For every row a label is drawn uniformly from {0, 1}. Rows labelled 0 get
    their entries in columns 1 and 2 exchanged; rows labelled 1 are kept as is.

    Args:
        train: 2-D NumPy array with at least three columns.

    Returns:
        A new array with one extra trailing column holding the label, cast to
        the dtype of ``train`` (so labels are '0'/'1' for a string array).
        The input array is left untouched.
    """
    num_rows = train.shape[0]
    y = ran.choice(a=[0, 1], size=(num_rows, 1))

    # Work on a copy so the caller's array is not modified as a side effect.
    swapped = np.array(train)
    flip = y[:, 0] == 0
    swapped[flip, 1] = train[flip, 2]
    swapped[flip, 2] = train[flip, 1]

    # np.hstack takes ONE sequence of arrays; passing the arrays as two
    # positional arguments raises a TypeError. The explicit cast keeps the
    # result dtype equal to the input dtype instead of relying on promotion.
    return np.hstack((swapped, y.astype(swapped.dtype)))

In [None]:
def get_ima(train, i, image_dir='../Data/food/'):
    """Load the three images named in row ``i`` as resized RGB tensors.

    Args:
        train: 2-D array whose first three columns hold image ids (file names
            without the '.jpg' extension). Any further columns, such as an
            appended label column, are ignored.
        i: Row index of the triplet to load.
        image_dir: Directory prefix (with trailing separator) that contains
            the '<id>.jpg' files.

    Returns:
        Tuple ``(A_ten, B_ten, C_ten)`` of tensors produced by ``T.ToTensor``
        after resizing each image to 250x350.
    """
    # Take only the three id columns: unpacking the whole row fails as soon
    # as the array carries an extra (label) column.
    A, B, C = train[i, :3]

    resize = T.Resize(size=(250, 350))
    to_tensor = T.ToTensor()

    def load(name):
        # convert() returns a fully loaded copy, so the underlying file can
        # be closed right away instead of waiting for garbage collection.
        with Image.open(image_dir + str(name) + '.jpg') as raw:
            rgb = raw.convert('RGB')
        return to_tensor(resize(rgb))

    return load(A), load(B), load(C)


In [None]:
def image_to_tensor(img):
    """Load '../Data/food/<img>.jpg' as a resized, normalized RGB tensor.

    Args:
        img: Image id; it is converted with ``str`` and used as the file name
            without extension.

    Returns:
        Tensor obtained by ``ToTensor`` -> ``Resize((250, 350))`` ->
        ``Normalize`` with the mean/std constants below.
    """
    filename = '../Data/food/' + str(img) + '.jpg'

    # Force three channels so the 3-value Normalize below always matches,
    # and close the file as soon as the pixel data has been copied.
    with Image.open(filename) as raw:
        image = raw.convert('RGB')

    to_tensor = transforms.ToTensor()
    tensor = to_tensor(image)
    resize = transforms.Resize((250,350))
    tensor = resize(tensor)
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                     std=[0.229, 0.224, 0.225])
    tensor = normalize(tensor)

    # The original fell off the end here and therefore returned None.
    return tensor

In [None]:
#data
fname = '../Data/'
food = fname + 'food/'
train = np.loadtxt(fname + "train_triplets.txt", dtype=str)
#test = np.loadtxt(fname + "test_triplets.txt", dtype=str)

# swap() returns ONE array (the triplets with the label as last column),
# not a (labels, triplets) pair, so the label column is sliced out here.
train = swap(train)
y = train[:, 3]
i = 1000

# Pass only the three image-id columns so the row unpacks into A, B, C.
A,B,C = get_ima(train[:, :3],i)

print(A.shape)
print(B.shape)
print(C.shape)
print(y.shape)
print('Hello World!')

In [None]:
def get_batch(idx, train, batch_size=64):
    """Build the image tensor for batch number ``idx`` of ``train``.

    Args:
        idx: Zero-based batch index; the batch covers rows
            ``idx*batch_size`` up to ``(idx+1)*batch_size`` (exclusive).
        train: Full 2-D array of triplets; the first three columns of each
            row are passed to ``image_to_tensor``.
        batch_size: Number of triplets per batch (default 64).

    Returns:
        Float tensor of shape ``(n, 3, 3, 250, 350)`` where ``n`` is
        ``batch_size`` or, for the last batch, the number of rows left.
    """
    start = idx * batch_size
    # Slicing clips at the end of the array, so the final, shorter batch no
    # longer raises IndexError the way fixed 64-row indexing did.
    rows = train[start:start + batch_size]

    batch = torch.empty((len(rows), 3, 3, 250, 350))

    for row_pos, row in enumerate(rows):
        for col in range(3):
            batch[row_pos, col] = image_to_tensor(row[col])

    return batch

In [None]:
vgg16 = models.vgg16(pretrained=True)

class Net(nn.Module):
    """VGG16-based scorer for image triplets.

    Each of the three images goes through the shared VGG16 feature extractor
    and average pool; the three pooled feature maps are concatenated and fed
    to the VGG16 classifier, whose first and last layers are replaced so it
    accepts the concatenated features and emits a single raw score.
    """

    def __init__(self):
        super(Net, self).__init__()
        self.features = vgg16.features
        self.avgpool = vgg16.avgpool

        # Build a fresh Sequential instead of editing vgg16.classifier in
        # place, so the module-level vgg16 object is not altered by Net().
        layers = list(vgg16.classifier.children())
        # 75264 inputs = 3 images x 512 x 7 x 7 pooled VGG16 features.
        layers[0] = nn.Linear(75264,4096,bias=True)
        # Single output unit; no activation follows, so this is a raw logit.
        layers[6] = nn.Linear(4096,1,bias=True)
        self.classifier = nn.Sequential(*layers)

    # x represents our data
    def forward(self, x):
        """Score a batch of triplets.

        Args:
            x: Tensor of shape (batch, 3, channels, H, W). A single triplet
               of shape (3, channels, H, W) is accepted as well.

        Returns:
            Tensor of shape (batch, 1), or (1,) for a single triplet.
        """
        single = x.dim() == 4
        if single:
            x = x.unsqueeze(0)

        # Index the TRIPLET axis (dim 1). Indexing dim 0 would pick whole
        # samples out of the batch instead of the a/b/c image of each sample.
        pooled = [self.avgpool(self.features(x[:, k])) for k in range(3)]

        # Concatenate per sample and keep the batch axis when flattening;
        # flattening everything would merge all samples into one vector.
        merged = torch.flatten(torch.cat(pooled, dim=1), start_dim=1)

        out = self.classifier(merged)
        return out.squeeze(0) if single else out

model = Net().cuda()

Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /root/.cache/torch/hub/checkpoints/vgg16-397923af.pth


  0%|          | 0.00/528M [00:00<?, ?B/s]

In [None]:
# The network ends in a plain nn.Linear, so its outputs are unbounded logits.
# BCELoss requires probabilities in [0, 1]; BCEWithLogitsLoss applies the
# sigmoid internally and is the numerically stable choice for raw logits.
loss_fn = torch.nn.BCEWithLogitsLoss()

In [None]:
# define utility functions to compute classification accuracy and
# perform evaluation / testing

def accuracy(logits: torch.Tensor, label: torch.Tensor) -> torch.Tensor:
  """Compute the fraction of correctly classified samples.

  Two layouts are supported:
    * multi-class: ``logits`` and ``label`` are (batch, classes); the
      prediction and the target are the argmax over the last axis.
    * binary: ``logits`` is (batch,) or (batch, 1) holding one raw logit per
      sample; a sample is predicted positive when its logit is > 0 and the
      target is positive when ``label`` is > 0.5.

  Returns:
    0-dim float tensor in [0, 1].
  """
  if logits.dim() == 1 or logits.shape[-1] == 1:
    # With a single output unit argmax is always 0, which would report 100%
    # accuracy regardless of the predictions; threshold the logit instead.
    predicted = logits.reshape(-1) > 0
    target = label.reshape(-1) > 0.5
    correct_label = predicted == target
  else:
    correct_label = torch.argmax(logits, dim=-1) == torch.argmax(label, dim=-1)
  assert correct_label.shape == (logits.shape[0],)
  acc = torch.mean(correct_label.float())
  assert 0. <= acc <= 1.
  return acc

def evaluate(model: torch.nn.Module, loader=None) -> torch.Tensor:
  """Compute the sample-weighted average accuracy of ``model`` over a loader.

  Args:
    model: Network to evaluate; it is switched to eval mode and left there.
    loader: Iterable yielding ``(x_batch, y_label)`` tensor pairs. When
      omitted, the module-level ``test_loader`` is used.
      NOTE(review): no visible cell defines ``test_loader`` - confirm it is
      created before this function is called without ``loader``.

  Returns:
    Average of ``accuracy`` over all samples, weighted by batch size.
  """
  if loader is None:
    loader = test_loader

  model.eval()  # bring the model into eval mode
  acc_cum = 0.0
  num_eval_samples = 0
  with torch.no_grad():
    for x_batch_test, y_label_test in loader:
      x_batch_test, y_label_test = x_batch_test.cuda(), y_label_test.cuda()
      batch_size = x_batch_test.shape[0]
      num_eval_samples += batch_size
      # Weight by batch size so a shorter final batch does not skew the mean.
      acc_cum += accuracy(model(x_batch_test), y_label_test) * batch_size
  avg_acc = acc_cum / num_eval_samples
  assert 0 <= avg_acc <= 1
  return avg_acc

In [None]:
# Setup the optimizer (adaptive learning rate method)
optim = torch.optim.Adam(model.parameters(), lr=1e-3)

fname = '../Data/'
food = fname + 'food/'
train = np.loadtxt(fname + "train_triplets.txt", dtype=str)
train = swap(train)

batch_size = 64

for epoch in range(50):
    # reset statistics trackers
    train_loss_cum = 0.0
    acc_cum = 0.0
    num_samples_epoch = 0
    t = time.time()
    np.random.shuffle(train)
    X = train[:,:3]
    # The label column comes out of a string array; convert it to float32 so
    # it can become a tensor with the same dtype as the model output.
    y = train[:,3:].astype(np.float32)

    # Ceil division: Python 3 "/" yields a float (range() rejects it), and a
    # blanket "+ 1" would add an empty batch when the size is a multiple of 64.
    num_batches = (train.shape[0] + batch_size - 1) // batch_size

    # Go once through the training dataset (-> epoch)
    for i in range(num_batches):

        # get_batch applies the idx*64 offset itself, so it must receive the
        # full array; handing it a pre-sliced batch offsets twice.
        x_batch = get_batch(i,X)
        y_batch = torch.from_numpy(y[i*batch_size:(i+1)*batch_size])

        # zero grads and put model into train mode
        optim.zero_grad()
        model.train()

        # move data to GPU
        x_batch, y_batch = x_batch.cuda(), y_batch.cuda()

        # forward pass
        logits = model(x_batch)
        loss = loss_fn(logits, y_batch)

        # backward pass and gradient step
        loss.backward()
        optim.step()

        # keep track of train stats as plain floats; accumulating the loss
        # tensor itself would keep every batch's autograd graph alive
        num_samples_batch = x_batch.shape[0]
        num_samples_epoch += num_samples_batch
        train_loss_cum += loss.item() * num_samples_batch
        acc_cum += accuracy(logits.detach(), y_batch).item() * num_samples_batch

    # average the accumulated statistics
    avg_train_loss = train_loss_cum / num_samples_epoch
    avg_acc = acc_cum / num_samples_epoch
    # NOTE(review): evaluate() reads a test_loader that no visible cell
    # defines - confirm it exists before running this cell.
    test_acc = evaluate(model)
    epoch_duration = time.time() - t

    # print some infos (the per-sample average loss, not the running sum)
    print(f'Epoch {epoch} | Train loss: {avg_train_loss:.4f} | '
          f' Train accuracy: {avg_acc:.4f} | Test accuracy: {test_acc.item():.4f} |'
          f' Duration {epoch_duration:.2f} sec')

    # save checkpoint of model
    if epoch % 5 == 0 and epoch > 0:
        save_path = f'model_epoch_{epoch}.pt'
        torch.save({'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optim.state_dict()},
                   save_path)
        print(f'Saved model checkpoint to {save_path}')