# Dog Embeddings Extractor

## Accessing the Dog Breed Recognition dataset

I have created a directory called "dog-breed-recognition". Inside it, the "dogs" directory holds the dataset itself. Only the samples in the "train" directory are used for training.

In [1]:
from google.colab import drive
drive.mount('/content/drive/')
root = '/content/drive/My Drive/Colab Notebooks/dog-breed-recognition'

Mounted at /content/drive/


## Importing basic Python libraries

In [2]:
import os
import sys
import tqdm
import random
import copy

from PIL import Image
import numpy as np
import cv2
import matplotlib.pyplot as plt

## Importing PyTorch library

For GPU usage, go to "Edit > Notebook Settings" and make sure the hardware accelerator is set to GPU.

In [3]:
import torch
import torchvision
from torchvision import transforms

# Creating a PyTorch device, so that inputs, outputs and models are moved to
#   the available GPU (falling back to the CPU otherwise)
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

## Splitting dataset into training, validation and test

Given a split ratio for these three sets, the instances of each class (dog breed) are randomly distributed among them.

`dataset_labels[<PHASE>]` is a list of `(class_index, instance_index)` pairs, where:
- `class_index` is the index of the dog breed;
- `instance_index` is the index of the instance within the file list of that dog breed.
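
As a minimal sketch of the per-class split arithmetic, the snippet below splits a hypothetical class of 13 images with the ratios 0.7/0.15/0.15. The helper name `split_one_class`, the seed and the toy numbers are illustrative assumptions, not part of the notebook. Because each split size is truncated with `int`, some instances of a class may end up in no split at all.

```python
import random

def split_one_class(class_index, n_instances, split_ratio, seed=0):
    """Split the instance indexes of one class into train/val/test lists."""
    train_ratio, val_ratio, test_ratio = split_ratio
    labels = [(class_index, i) for i in range(n_instances)]
    # A local RNG keeps the sketch reproducible (an assumption of this example)
    random.Random(seed).shuffle(labels)
    # Split sizes are truncated towards zero
    train_l = int(n_instances * train_ratio)
    val_l = int(n_instances * val_ratio)
    test_l = int(n_instances * test_ratio)
    return {
        'train': labels[:train_l],
        'val': labels[train_l:train_l + val_l],
        'test': labels[train_l + val_l:train_l + val_l + test_l],
    }

splits = split_one_class(class_index=0, n_instances=13,
                         split_ratio=[0.7, 0.15, 0.15])
sizes = {phase: len(items) for phase, items in splits.items()}
print(sizes)
```

Here 13 * 0.7 = 9.1 and 13 * 0.15 = 1.95 are truncated to 9 and 1, so two of the 13 instances are left out of every split.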

In [4]:
def get_dataset_split_labels(dataset_path, split_ratio):
  '''
  Splits the instances of each class into training, validation and test sets.

  Parameters
  ----------
  dataset_path : str
    root of the dataset
  split_ratio : list<float>
    ratios for the dataset splitting

  Returns
  -------
  dataset_labels : dict
    list of instance labels for each set
  '''

  # Attribute `split_ratio` to each set individually
  train_ratio, val_ratio, test_ratio = split_ratio

  # `dataset_labels` encodes the list of instance labels for each set
  dataset_labels = { 'train': [], 'val': [], 'test': [] }

  # `dataset_path` contains one directory per class (dog breed), so listing it
  #   gives the dog breeds present in the dataset (`classes`)
  classes = sorted(os.listdir(dataset_path))
  
  # Iterate through each existing class (`curr_class`) and its index (`i_class`)
  #   (the dataset splitting is done for each class individually)
  for i_class, curr_class in enumerate(classes):

    # `class_path` appends the root of the dataset (`dataset_path`) to the
    #   current class' directory name    
    class_path = os.path.join(dataset_path, curr_class)
    
    # `instances` lists the image filenames in the current class directory
    instances = sorted(os.listdir(class_path))
    
    # `n_instances` is the number of instances of the current class
    n_instances = len(instances)

    # `labels` encodes a list of pairs of class index and instance index, which
    #   will be used when loading the dataset later
    labels = [(i_class, i_instance) for i_instance in range(n_instances)]
    
    # Shuffle the labels so that the split below is random
    random.shuffle(labels)

    # Calculate the number of instances of each split for the current class
    train_l = int(n_instances * train_ratio)
    val_l = int(n_instances * val_ratio)
    test_l = int(n_instances * test_ratio)

    # Slice the current labels list (`labels`) according to the number of
    #   instances of each split, producing the list of labels of each split
    #   (these three sets are disjoint)
    curr_train_labels = labels[:train_l]
    curr_val_labels = labels[train_l:train_l + val_l]
    curr_test_labels = labels[train_l + val_l:train_l + val_l + test_l]
    
    # Append the current labels lists to the final lists covering all classes
    dataset_labels['train'] += curr_train_labels
    dataset_labels['val'] += curr_val_labels
    dataset_labels['test'] += curr_test_labels

  return dataset_labels

## Creating the dataset loader

To read an entry from the dataset given an index:
- `class_index` and `instance_index` are obtained from the previously generated labels;
- The image path (`img_path`) is looked up;
- The image is read and converted to RGB (`img`), in case the original image has a transparency channel (which is not used) or is in grayscale;
- The network input (`x`) is generated by preprocessing the image. The preprocessing depends on the current phase (training, validation or test), since the training phase applies data augmentation.

This dataset loader provides samples suited to a triplet loss such as Triplet Margin Loss. Each sample consists of:
- An anchor image, which is the image referred to by the accessed index;
- A positive image, which is another image of the same class (dog breed) as the anchor image;
- A negative image, which is an image of a different class from the anchor image.
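
The triplet selection described above can be sketched in plain Python on index pairs alone, without loading any image. The function name `sample_triplet` and the toy file names are illustrative assumptions; the notebook's own implementation is the `ImageDataset` class.

```python
import random

def sample_triplet(instances_by_class, anc_class, anc_instance, rng=random):
    """Return (anchor, positive, negative) as (class_index, instance_index) pairs."""
    # Positive: same class, any instance other than the anchor itself
    pos_instance = rng.choice(
        [i for i in range(len(instances_by_class[anc_class]))
         if i != anc_instance])
    # Negative: any instance of a randomly chosen different class
    neg_class = rng.choice(
        [c for c in range(len(instances_by_class)) if c != anc_class])
    neg_instance = rng.randrange(len(instances_by_class[neg_class]))
    return ((anc_class, anc_instance),
            (anc_class, pos_instance),
            (neg_class, neg_instance))

# Hypothetical file names: three breeds with a few images each
toy_instances = [
    ['beagle_0.jpg', 'beagle_1.jpg', 'beagle_2.jpg'],
    ['pug_0.jpg', 'pug_1.jpg'],
    ['husky_0.jpg', 'husky_1.jpg', 'husky_2.jpg', 'husky_3.jpg'],
]
anc, pos, neg = sample_triplet(toy_instances, anc_class=1, anc_instance=0)
print(anc, pos, neg)
```

Note that a class with a single image has no valid positive, so this sketch (like any sampler built on the same rule) would fail on such a class.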

In [5]:
class ImageDataset(torch.utils.data.Dataset):
  """
  A class to read the dataset instances.

  Attributes
  ----------
  labels : list<tuple<int,int>>
    list of dataset instances that can be accessed
  transform : torchvision.transforms
    input preprocessing pipeline
  n_classes : int
    number of existing classes (dog breeds) on the dataset
  instances_path : list<list<str>>
    Path to the instances of the dataset presented in `labels`

  Data descriptors
  ----------------
  __getitem__
    Gets the model's triplet inputs from a dataset instance's index.

  __len__
    Gets the number of samples presented in the dataset.
  """

  def __init__(self, dataset_path, labels, transform):
    '''
    Constructs all the attributes for the dataset object.

    Parameters
    ----------
    dataset_path : str
      root of the dataset
    labels : list<tuple<int,int>>
      list of dataset instances that can be accessed
    transform : torchvision.transforms
      input preprocessing pipeline
    '''

    self.labels = labels
    self.transform = transform

    # `dataset_path` contains one directory per class (dog breed), so listing
    #   it gives the dog breeds present in the dataset (`classes`)
    classes = sorted(os.listdir(dataset_path))
    self.n_classes = len(classes)

    # `classes_path` appends the root of the dataset (`dataset_path`) to the
    #   directory name of all classes (`classes`)
    classes_path = [os.path.join(dataset_path, c) for c in classes]
    self.instances_path = [[os.path.join(class_path, instance)
        for instance in sorted(os.listdir(class_path))]
      for class_path in classes_path]

  def __getitem__(self, index):
    '''
    Gets the model's triplet inputs from a dataset instance's index.

    Parameters
    ----------
    index : int
      index of the instance to be accessed

    Returns
    -------
    anc_x : torch.Tensor
      tensor holding the preprocessed anchor sample
    pos_x : torch.Tensor
      tensor holding the preprocessed positive sample
    neg_x : torch.Tensor
      tensor holding the preprocessed negative sample
    '''

    # Access the indexes of the class (`anc_class_index`) and instance
    #   (`anc_instance_index`) of the anchor image, present in the labels
    anc_class_index, anc_instance_index = self.labels[index]
    
    # Access the indexes of the class (`pos_class_index`) and instance
    #   (`pos_instance_index`) of the positive image
    # As the positive image is from the same dog breed as the anchor image,
    #   they have the same class index
    pos_class_index = anc_class_index

    # As the positive image and the anchor image are not the same, a different
    #   image from the same dog breed is randomly selected (from the whole class
    #   directory, not only from the instances listed in `self.labels`)
    pos_instance_index = random.choice([instance_index
        for instance_index in range(len(self.instances_path[pos_class_index]))
        if instance_index != anc_instance_index])
    
    # Access the indexes of the class (`neg_class_index`) and instance
    #   (`neg_instance_index`) of the negative image
    # As the negative image is from a different dog breed than the anchor image,
    #   a random image from a randomly chosen different dog breed is selected
    neg_class_index = random.choice([class_index
        for class_index in range(self.n_classes)
        if class_index != anc_class_index])
    neg_instance_index = random.choice(
        range(len(self.instances_path[neg_class_index])))
    
    # `[type]_img_path` is the filepath of the image identified by
    #   (`[type]_class_index`, `[type]_instance_index`)
    anc_img_path = self.instances_path[anc_class_index][anc_instance_index]
    pos_img_path = self.instances_path[pos_class_index][pos_instance_index]
    neg_img_path = self.instances_path[neg_class_index][neg_instance_index]
    
    # Read image (`[type]_img`) and convert to red-green-blue channels (RGB),
    #   ensuring the inputs will have 3 channels
    anc_img = Image.open(anc_img_path).convert('RGB')
    pos_img = Image.open(pos_img_path).convert('RGB')
    neg_img = Image.open(neg_img_path).convert('RGB')

    # `[type]_x` refers to the image when the preprocessing pipeline
    #   (`self.transform`) is applied to the image (`[type]_img`)
    anc_x = self.transform(anc_img)
    pos_x = self.transform(pos_img)
    neg_x = self.transform(neg_img)

    return anc_x, pos_x, neg_x

  def __len__(self):
    '''
    Gets the number of samples presented in the dataset.
    
    Returns
    -------
    l : int
      the length of the dataset
    '''
    
    l = len(self.labels)
    
    return l

## Creating the CNN model architecture

The model shall have a fixed input size with 3 channels (corresponding to the red, green and blue channels). Also, the model shall output a vector with a size of `n_embeddings`, representing the embeddings from the input image.

A ResNet50-based model is used; its last layer (a fully connected layer) is replaced by another fully connected layer whose output size corresponds to `n_embeddings`.

In [6]:
def embedder_model(n_embeddings):
  '''
  Generates a new CNN ResNet50-based model.

  Parameters
  ----------
  n_embeddings : int
    number of embeddings to be outputted

  Returns
  -------
  x : torch.nn
    the model
  '''

  # First, `x` is a new ResNet50 CNN model, containing pre-trained weights from
  #   ImageNet
  x = torchvision.models.resnet50(pretrained=True)
  
  # Change the final fully connected layer so that the output size matches the
  #   desired `n_embeddings` size. Also, apply sigmoid function, as suggested by
  #   the Lossless Triplet Loss approach
  x.fc = torch.nn.Sequential(
      torch.nn.Linear(2048, n_embeddings),
      torch.nn.Sigmoid())

  return x

## Lossless Triplet Loss

Based on https://towardsdatascience.com/lossless-triplet-loss-7e932f990b24
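
Written out for a single triplet, the loss implemented in the cell below reads as follows. The symbols are chosen here for illustration and the formula is read off the code rather than quoted from the article: $f_a$, $f_p$ and $f_n$ are the anchor, positive and negative embeddings, $N$ is `n_embeddings`, $\beta$ is `beta` (set to $N$), $\epsilon$ is `epsilon` and $d$ is the squared Euclidean distance.

```latex
d(u, v) = \sum_{i=1}^{N} (u_i - v_i)^2

\mathcal{L}(f_a, f_p, f_n) =
  -\ln\!\left(1 - \frac{d(f_a, f_p)}{\beta} + \epsilon\right)
  -\ln\!\left(1 - \frac{N - d(f_a, f_n)}{\beta} + \epsilon\right)
```

Since the sigmoid keeps every embedding component between 0 and 1, $d$ is at most $N$, so with $\beta = N$ both logarithm arguments stay positive. The batch loss is the mean of $\mathcal{L}$ over the triplets in the batch.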

In [7]:
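class LosslessTripletLoss(torch.nn.Module):
  '''
  Lossless Triplet Loss for embeddings whose components lie between 0 and 1,
  as produced by the sigmoid at the end of the embedder model.
  '''

  def __init__(self, n_embeddings):
    super(LosslessTripletLoss, self).__init__()
    # Small constant that keeps the logarithm arguments strictly positive
    self.epsilon = 1e-8
    # Embedding size, which bounds the squared distance between two embeddings
    self.n_embeddings = n_embeddings
    # Scaling factor applied to the distances inside the logarithms
    self.beta = n_embeddings
      
  def get_distance(self, x1, x2):
    # Squared Euclidean distance for each pair of embeddings in the batch
    dist = (x1 - x2).pow(2).sum(1)

    return dist

  def forward(self, anc, pos, neg):
    anc_pos_dist = self.get_distance(anc, pos)
    anc_neg_dist = self.get_distance(anc, neg)

    # Penalize large anchor-positive distances and small anchor-negative ones
    anc_pos_dist = -torch.log(-torch.div(anc_pos_dist, self.beta) + 1 + self.epsilon)
    anc_neg_dist = -torch.log(-torch.div((self.n_embeddings - anc_neg_dist), self.beta) + 1 + self.epsilon)

    loss = anc_neg_dist + anc_pos_dist

    # Average the loss over the batch
    loss = loss.mean()

    return loss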

## Model training and validation algorithm

In [7]:
def train(model, criterion, optimizer, n_epochs):
  '''
  Trains the model.
  
  Parameters
  ----------
  model : torch.nn
    the model to be trained
  criterion : torch.nn
    the model's loss metric
  optimizer : torch.optim
    optimization algorithm
  n_epochs : int
    number of iterations of the training

  Returns
  -------
  model : torch.nn
    the trained model
  best_acc : float
    the validation accuracy of the best model
  best_loss : float
    the validation loss of the best model
  '''
  
  # Keep track of the best achieved accuracy, loss and the corresponding model
  #   weights
  best_weights = copy.deepcopy(model.state_dict())
  best_acc = 0.0
  best_loss = float('inf')

  # The model iterates a number of times (`n_epochs`)
  for epoch in range(n_epochs):
    
    # Each epoch includes the model weights tuning phase (indicated by the
    #   `train` flag) and the model validation (indicated by the `val` flag)
    for phase in ['train', 'val']:
      
      # Change the model mode (training or validation)
      if phase == 'train':
        model.train()
      else:
        model.eval()

      # `epoch_loss` computes the loss sum (according to the used `criterion`)
      #   of the model iteratively as batches are read
      epoch_loss = 0.0
      
      # `epoch_acc` counts the triplets in which the anchor-positive embeddings
      #   distance is smaller, by a margin, than the anchor-negative embeddings
      #   distance
      epoch_acc = 0.0
      
      # `n_seen_samples` computes the number of triplet input samples that were
      #   already read in the current epoch-phase
      n_seen_samples = 0

      # Using tqdm to iteratively keep track on the number of iterated batches
      #   on the console
      dataloader = tqdm.tqdm(dataloaders[phase], total=len(dataloaders[phase]),
          position=0, leave=True)
      
      # The dataloader of the current phase is iterated, in order to
      #   access all triplets of anchor-positive-negative inputs, denoted by
      #   `(anc_x, pos_x, neg_x)`
      for anc_x, pos_x, neg_x in dataloader:

        # `curr_batch_size` computes the number of samples in the current batch
        #   (this may vary when the current batch is the last one)
        curr_batch_size = anc_x.shape[0]
        
        # Increment the number of seen samples on the current epoch-phase
        n_seen_samples += curr_batch_size

        # Reset current gradients
        optimizer.zero_grad()

        # Pass the triplet inputs tensors to the used device (GPU or CPU)
        anc_x = anc_x.to(device)
        pos_x = pos_x.to(device)
        neg_x = neg_x.to(device)

        # Use the model to calculate the embeddings of the triplet inputs
        #   (`anc_y`, `pos_y` and `neg_y`)
        anc_y = model(anc_x)
        pos_y = model(pos_x)
        neg_y = model(neg_x)

        # Calculate the loss of the current batch (according to the `criterion`
        #   used) with respect to the triplet outputs `(anc_y, pos_y, neg_y)`
        loss = criterion(anc_y, pos_y, neg_y)

        # Update the model weights, if the current phase is `train` (if the 
        #   current phase is `val`, the model weights are not changed)
        if phase == 'train':
          loss.backward()
          optimizer.step()

        # Add the current batch's loss to `epoch_loss`
        epoch_loss += loss.item() * curr_batch_size

        # Compute the distance between anchor and positive embeddings of the
        #   current batch (`anc_pos_dists`)
        anc_pos_dists = (anc_y - pos_y).pow(2).sum(1)
        
        # Compute the distance between anchor and negative embeddings of the
        #   current batch (`anc_neg_dists`)
        anc_neg_dists = (anc_y - neg_y).pow(2).sum(1)
        
        # Add to `epoch_acc` the number of triplets whose anchor-positive
        #   distance plus a margin of 1 is smaller than the anchor-negative one
        epoch_acc += torch.sum(anc_pos_dists + 1 < anc_neg_dists).item()
        
        # Calculate the current epoch-phase's average loss and correct
        #   anchor-positive and anchor-negative embeddings distance rate
        #   (`curr_loss` and `curr_acc`)
        curr_loss = epoch_loss / n_seen_samples
        curr_acc = epoch_acc / n_seen_samples

        # Iteratively print on console the number of iterated batches, as well
        #   as the current loss and accuracy
        dataloader.set_postfix(Epoch='%s/%s' % (epoch+1, n_epochs),
            Loss=curr_loss, Acc=curr_acc, refresh=True)
      
      # Calculate the final loss and accuracy of the current epoch
      epoch_loss /= len(datasets[phase])
      epoch_acc /= len(datasets[phase])
    
      # Save new weights if the best loss is achieved in the `val` phase
      if phase == 'val' and epoch_loss < best_loss:
        best_acc = epoch_acc
        best_loss = epoch_loss
        best_weights = copy.deepcopy(model.state_dict())
  
  # Apply best weights to the model
  model.load_state_dict(best_weights)

  return model, best_acc, best_loss
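
The accuracy tracked above counts a triplet as correct only when the anchor-negative squared distance exceeds the anchor-positive squared distance by more than 1. A minimal plain-Python sketch of that rule follows; the helper names and toy embeddings are illustrative assumptions, and the notebook itself computes this on batched tensors.

```python
def squared_distance(u, v):
    """Sum of squared component differences between two embeddings."""
    return sum((a - b) ** 2 for a, b in zip(u, v))

def is_correct_triplet(anc, pos, neg, margin=1.0):
    """True when the negative is farther than the positive by more than `margin`."""
    return squared_distance(anc, pos) + margin < squared_distance(anc, neg)

# Toy 3-dimensional embeddings (hypothetical values)
anchor = [0.0, 0.0, 0.0]
positive = [0.5, 0.0, 0.0]       # squared distance to the anchor: 0.25
near_negative = [1.0, 0.0, 0.0]  # squared distance to the anchor: 1.0
far_negative = [1.0, 1.0, 0.0]   # squared distance to the anchor: 2.0

print(is_correct_triplet(anchor, positive, near_negative))  # False
print(is_correct_triplet(anchor, positive, far_negative))   # True
```

With this rule a triplet whose negative is farther away than the positive, but by less than the margin, still counts as an error.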

## Model testing algorithm

In [8]:
def test(model):
  '''
  Tests the model on an unseen set of samples.

  Parameters
  ----------
  model : torch.nn
    the model to be evaluated

  Returns
  -------
  acc : float
    the final accuracy of the model
  '''

  # Set the model mode to `eval`, as the model weights are not updated
  model.eval()

  # Keep track of the current accuracy (`acc`)
  acc = 0.0
  
  # `n_seen_samples` computes the number of triplet input samples that were
  #   already read
  n_seen_samples = 0

  # Using tqdm to iteratively keep track on the number of iterated batches on
  #   the console
  dataloader = tqdm.tqdm(dataloaders['test'], total=len(dataloaders['test']),
      position=0, leave=True)
  
  # The dataloader of the test phase is iterated, in order to
  #   access all triplets of anchor-positive-negative inputs, denoted by
  #   `(anc_x, pos_x, neg_x)`
  for anc_x, pos_x, neg_x in dataloader:
  
    # `curr_batch_size` computes the number of samples in the current batch
    #   (this may vary when the current batch is the last one)
    curr_batch_size = anc_x.shape[0]
  
    # Increment the number of seen samples
    n_seen_samples += curr_batch_size

    # Pass the triplet inputs tensors to the used device (GPU or CPU)
    anc_x = anc_x.to(device)
    pos_x = pos_x.to(device)
    neg_x = neg_x.to(device)

    # Use the model to calculate the embeddings of the triplet inputs
    #   (`anc_y`, `pos_y` and `neg_y`)
    anc_y = model(anc_x)
    pos_y = model(pos_x)
    neg_y = model(neg_x)

    # Compute the distance between anchor and positive embeddings of the
    #   current batch (`anc_pos_dists`)
    anc_pos_dists = (anc_y - pos_y).pow(2).sum(1)
  
    # Compute the distance between anchor and negative embeddings of the
    #   current batch (`anc_neg_dists`)
    anc_neg_dists = (anc_y - neg_y).pow(2).sum(1)
  
    # Add to `acc` the number of triplets whose anchor-positive distance plus
    #   a margin of 1 is smaller than the anchor-negative one
    acc += torch.sum(anc_pos_dists + 1 < anc_neg_dists).item()

    # Calculate the current epoch-phase's correct anchor-positive and
    #   anchor-negative embeddings distance rate (`curr_acc`)    
    curr_acc = acc / n_seen_samples

    # Iteratively print on console the number of iterated batches, as well as
    #   the current accuracy
    dataloader.set_postfix(Acc=curr_acc, refresh=True)

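  # Calculate the final accuracy of the test (the result must be assigned back
  #   to `acc`, otherwise the raw count of correct triplets is returned)
  acc /= len(datasets['test'])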

  return acc

## Sets of preprocessing operations

One preprocessing pipeline is defined for each phase (training, validation and test):

- For the training phase, a random region of each sample is cropped and resized to a fixed (224,224) size, randomly flipped horizontally, converted to a torch.Tensor and normalized with per-channel RGB mean and standard deviation values;

- For the validation phase, samples are center-cropped to a fixed (224,224) size, converted to a torch.Tensor and normalized with the same RGB mean and standard deviation values;

- For the testing phase, samples are resized so that their shorter side is 256 pixels, center-cropped to a fixed (224,224) size, converted to a torch.Tensor and normalized with the same RGB mean and standard deviation values.
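
The normalization step maps each channel value `v`, already scaled to the range [0, 1] by `ToTensor`, to `(v - mean) / std`. Below is a small plain-Python sketch for a single pixel, using the same mean and standard deviation values as the pipelines; the helper name `normalize_pixel` is an illustrative assumption.

```python
MEAN = [0.485, 0.456, 0.406]
STD = [0.229, 0.224, 0.225]

def normalize_pixel(rgb):
    """Normalize one RGB pixel whose channels are already scaled to [0, 1]."""
    return [(v - m) / s for v, m, s in zip(rgb, MEAN, STD)]

# A pixel equal to the channel means maps to zero in every channel
print(normalize_pixel(MEAN))  # [0.0, 0.0, 0.0]

# A pure white pixel lands a bit more than two standard deviations above the mean
white = normalize_pixel([1.0, 1.0, 1.0])
print(white)
```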

In [9]:
dataset_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
    'val': transforms.Compose([
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]),
    'test': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])]) }

## Setting up training parameters

In [10]:
# Path containing the dog breed training dataset
dataset_path = os.path.join(root, 'dogs', 'train')

# Split ratio of the training, validation and test portions (these portions sum
#   up to 1.0)
split_ratio = [0.7, 0.15, 0.15]

# Get dataset labels, split for each phase
dataset_labels = get_dataset_split_labels(dataset_path, split_ratio)

# Creating PyTorch dataset instance for each phase (training, validation and
#   test)
datasets = {
    'train': ImageDataset(dataset_path, dataset_labels['train'],
        dataset_transforms['train']),
    'val': ImageDataset(dataset_path, dataset_labels['val'],
        dataset_transforms['val']),
    'test': ImageDataset(dataset_path, dataset_labels['test'],
        dataset_transforms['test']) }

# Number of embeddings to be outputted by the model
n_embeddings = 128

# Number of epochs for the model to be trained
n_epochs = 20

# Batch size for each phase
batch_size = 16

# Number of workers for multiprocessing the data loading
n_workers = 8

# Instantiate CNN embeddings extractor model
model = embedder_model(n_embeddings)
model = torch.jit.script(model).to(device)

# Use Triplet Margin loss
criterion = torch.nn.TripletMarginLoss()
# Use Lossless Triplet loss
# criterion = LosslessTripletLoss(n_embeddings)

# Use SGD as the optimizer
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

# Create data loaders for the training, validation and testing steps
#   (Also shuffling samples for unbiased performance)
dataloaders = {
    'train': torch.utils.data.DataLoader(datasets['train'],
        batch_size=batch_size, num_workers=n_workers, shuffle=True),
    'val': torch.utils.data.DataLoader(datasets['val'], batch_size=batch_size,
        num_workers=n_workers, shuffle=True),
    'test': torch.utils.data.DataLoader(datasets['test'], batch_size=batch_size,
        num_workers=n_workers, shuffle=True) }

# Path where to save the trained model
trained_model_ckpt_path = os.path.join(root, 'models', 'embedder.pth')

Downloading: "https://download.pytorch.org/models/resnet50-19c8e357.pth" to /root/.cache/torch/hub/checkpoints/resnet50-19c8e357.pth



## Executing training

In [12]:
# Generate a trained model (`trained_model`), as well as its accuracy
#   (`val_acc`) and loss (`val_loss`)
trained_model, val_acc, val_loss = train(model, criterion, optimizer, n_epochs)

# Save model to `trained_model_ckpt_path`
torch.save({
    'state_dict': trained_model.state_dict(),
    'acc': val_acc,
    'loss': val_loss,
    'n_embeddings': n_embeddings }, trained_model_ckpt_path)

100%|██████████| 757/757 [21:15<00:00,  1.68s/it, Acc=0.701, Epoch=1/20, Loss=0.364]
100%|██████████| 160/160 [01:26<00:00,  1.85it/s, Acc=0.923, Epoch=1/20, Loss=0.14]
100%|██████████| 757/757 [13:45<00:00,  1.09s/it, Acc=0.869, Epoch=2/20, Loss=0.233]
100%|██████████| 160/160 [01:00<00:00,  2.65it/s, Acc=0.924, Epoch=2/20, Loss=0.138]
100%|██████████| 757/757 [13:44<00:00,  1.09s/it, Acc=0.888, Epoch=3/20, Loss=0.209]
100%|██████████| 160/160 [01:00<00:00,  2.64it/s, Acc=0.934, Epoch=3/20, Loss=0.126]
100%|██████████| 757/757 [13:44<00:00,  1.09s/it, Acc=0.891, Epoch=4/20, Loss=0.202]
100%|██████████| 160/160 [01:00<00:00,  2.65it/s, Acc=0.949, Epoch=4/20, Loss=0.106]
100%|██████████| 757/757 [13:44<00:00,  1.09s/it, Acc=0.898, Epoch=5/20, Loss=0.193]
100%|██████████| 160/160 [01:00<00:00,  2.64it/s, Acc=0.946, Epoch=5/20, Loss=0.109]
100%|██████████| 757/757 [13:45<00:00,  1.09s/it, Acc=0.901, Epoch=6/20, Loss=0.189]
100%|██████████| 160/160 [01:00<00:00,  2.64it/s, Acc=0.957, Epoch

## Executing test

In [11]:
# Instantiating the architecture of the model
trained_model = embedder_model(n_embeddings)

# Load weights from the trained model
trained_model.load_state_dict(torch.load(trained_model_ckpt_path)['state_dict'])
trained_model.eval()
trained_model = torch.jit.script(trained_model).to(device)

# Perform testing and getting test accuracy (`test_acc`)
test_acc = test(trained_model)

100%|██████████| 160/160 [06:30<00:00,  2.44s/it, Acc=0.976]
