<a href="https://colab.research.google.com/github/sharwinbobde/siamese-nn-oneshot-reproduction/blob/validation-and-fix/notebooks/test-bench/siamese-colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Reproducing Omniglot experiment in the Siamese NNs for One Shot Recognition Paper

In this notebook we reproduce Table 1 in the original 
[Siamese NN Paper](https://www.cs.cmu.edu/~rsalakhu/papers/oneshot1.pdf)

[Original MSc Thesis](http://www.cs.toronto.edu/~gkoch/files/msc-thesis.pdf).

We start from this [code](https://github.com/sorenbouma/keras-oneshot) implemented in Keras and try to translate it to use the PyTorch library


# Running the experiment on Google Colab

First import the libraries necessary to run and define our Siamese NN implementation


In [0]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torchsummary import summary

from torch.utils import data
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image


## Definition of the dataset class that will hold our examples

We define a special dataset that will hold our samples and labels. The dataset has certain functionality that varies depending on the parameters used:

1. Create a dataset based on input files
2. Create a dataset given another dataset
3. Create a dataset given the numpy arrays of the data and the labels
4. Define extra transformations: 
  - In case no transformations are given the images are just transformed into Tensors and normalized
  - In case we provide some affine transformations the dataset outputs random transformed images instead of the originals. If we want to transform the dataset we need to record and save these modified images and then put them in another dataset with transformations turned off so the images are consistent across runs.

In [0]:
class SiameseDataset(data.Dataset):
    """Dataset of image pairs with a same/different label per pair.

    The samples are indexed as ``data[index, 0]`` / ``data[index, 1]`` (the two
    images of a pair) and ``labels[index]``. The dataset can be built in three
    ways, checked in this order:

    1. ``dataset``: reuse the arrays and transform pipeline of another dataset.
    2. ``data`` / ``labels``: wrap in-memory arrays; the default pipeline
       (``ToTensor`` + ``Normalize``) is always used in this mode.
    3. ``data_path`` / ``labels_path``: load both arrays with ``np.load``.

    Args:
        data_path: path of the ``.npy`` file with the image pairs.
        labels_path: path of the ``.npy`` file with the labels.
        transform: optional transform used instead of the default pipeline
            (only honoured when loading from files).
        dataset: another dataset whose ``data``, ``labels`` and ``transforms``
            are reused.
        data: array of image pairs.
        labels: array of labels.
        mean: mean used for normalization; computed from the data when omitted.
        std: standard deviation used for normalization; computed from the data
            when omitted.
        transform_data: when True, ``__getitem__`` converts the images to PIL
            images before applying the transforms and returns numpy arrays
            (used to generate augmented data); when False it returns the
            float32 samples passed through the transforms (used for training).
    """
    def __init__(self, data_path=None, labels_path=None,
                 transform=None, dataset: data.Dataset = None,
                 data: np.ndarray = None, labels: np.ndarray = None,
                 mean: float = None, std: float = None,
                 transform_data=False):
        self.transform_data = transform_data

        # If we're given another dataset, just take that
        if dataset is not None:
            self.data = dataset.data
            self.labels = dataset.labels
            self.transforms = dataset.transforms
            # Carry the normalization statistics along when the source has
            # them, so code that reads .mean / .std keeps working on the copy.
            for attr in ("mean", "std", "normalize"):
                if hasattr(dataset, attr):
                    setattr(self, attr, getattr(dataset, attr))
            return

        loaded_from_file = data is None
        if loaded_from_file:
            # Load the data and labels from disk
            self.data = np.load(data_path)
            self.labels = np.load(labels_path)
        else:
            # The data and labels were passed as arrays
            self.data = data
            self.labels = labels

        # Use the given statistics (e.g. the train-set ones for a test set)
        # and compute only whatever is missing.
        self.mean, self.std = self._resolve_stats(self.data, mean, std)

        # Normalize by default!
        self.normalize = transforms.Normalize(mean=(self.mean,),
                                              std=(self.std,))

        if loaded_from_file and transform is not None:
            # If we're given transforms it means that we're trying to apply
            # the affine transformations
            self.transforms = transforms.Compose([transform])
        else:
            if loaded_from_file:
                print("Using the default transformations")
            # Just return the normalized tensor
            self.transforms = transforms.Compose([
                transforms.ToTensor(),
                self.normalize
            ])

    @staticmethod
    def _resolve_stats(samples, mean, std):
        """Return ``(mean, std)``, computing from ``samples`` any that is None."""
        if mean is None:
            mean = np.mean(samples)
        if std is None:
            std = np.std(samples)
        return mean, std

    def __len__(self):
        """Return the number of pairs in the dataset."""
        return len(self.data)

    def get_images(self, index):
        """Return the pair at ``index`` as two PIL images plus its label."""
        _x1 = self.data[index, 0, :, :]
        _x2 = self.data[index, 1, :, :]
        label = self.labels[index]
        return Image.fromarray(_x1), Image.fromarray(_x2), label

    def __getitem__(self, index):
        """Return the pair at ``index`` and its label
        (0 or 1 depending on whether both images show the same
        or a different letter)."""
        _x1 = self.data[index, 0, :, :]
        _x2 = self.data[index, 1, :, :]
        label = self.labels[index]

        if self.transform_data:
            # Convert to PIL Images so we can transform them with affine
            # transforms. Just needed to generate the augmented dataset.
            _x1 = Image.fromarray(_x1)
            _x2 = Image.fromarray(_x2)
            return np.array(self.transforms(_x1)), np.array(self.transforms(_x2)), label

        # Training / evaluation: hand float32 samples to the transform
        # pipeline (ToTensor + normalization by default).
        _x1 = _x1.astype(np.float32)
        _x2 = _x2.astype(np.float32)
        label = label.astype(np.float32)
        return self.transforms(_x1), self.transforms(_x2), label
    
# Some easy functions to visualize the data 
def show_pair(x1, x2, lab):
    """Plot two dataset samples next to each other and print whether
    the pair is labelled 'same' (label 1) or 'different'."""
    _, axes = plt.subplots(1, 2, sharey= True)
    # squeeze() drops singleton dimensions before plotting
    for axis, sample in zip(axes, (x1, x2)):
        axis.imshow(sample.squeeze())
    plt.show()
    verdict = 'same' if lab == 1 else 'different'
    print(verdict)
    
def show_image_pair(i1, i2, lab):
    """Plot two images next to each other and print whether
    the pair is labelled 'same' (label 1) or 'different'."""
    _, axes = plt.subplots(1, 2, sharey= True)
    for axis, picture in zip(axes, (i1, i2)):
        axis.imshow(picture)
    plt.show()
    verdict = 'same' if lab == 1 else 'different'
    print(verdict)
    

### Set up the folder in Google Drive and define the data path

In [0]:
# Mount Google Drive under /content/drive/ and list the project folder
# (the `!ls` line is IPython/Colab shell syntax, not plain Python).
import os
from google.colab import drive
drive.mount('/content/drive/')
!ls "/content/drive/My Drive/Deep Learning Q3"

# Change the current directory to the path so it's more comfortable to work:
# the relative paths used in later cells ("datasets/...", "saved_models/...")
# are resolved against this folder.
path = "/content/drive/My Drive/Deep Learning Q3"
os.chdir(path)

## Define the workflow for Dataset Augmentation

In case we wish to augment a given dataset we just need to pass said dataset to this function. It will take care of:
1. Creating 8x affine transformations of the original images
2. Computing the mean and std of the resulting dataset
3. Returning a concatenated version of the original dataset and the one with the transformed images

In [0]:
# In case we want to create affine transformations...
import gc


def augment_dataset(d: SiameseDataset) -> SiameseDataset:
  """Augment a dataset with 8 transformed copies of itself.

  The dataset is read 8 times through a data loader; every pass is expected
  to yield freshly transformed pairs (i.e. ``d`` was created with
  ``transform_data=True`` and a random transform). The collected samples are
  appended to the original ``d.data`` and the labels are tiled 9 times.

  The mean and standard deviation of the combined data are pooled from the
  per-batch mean/variance and from ``d.mean`` / ``d.std`` of the original data.
  The pooling goes through the second moment E[x^2], so differences between
  batch means are accounted for (a weighted average of the batch stds is not
  the std of the union).

  Args:
      d: source dataset; must expose ``data``, ``labels``, ``mean`` and ``std``.

  Returns:
      A new SiameseDataset with 9x as many pairs as ``d``.

  Raises:
      ValueError: if ``d`` is empty.
  """
  if len(d) == 0:
    raise ValueError("Cannot augment an empty dataset")

  # Create a data loader of the dataset; read in batches of 15000
  loader = data.DataLoader(d, batch_size=15000)

  # Transformed batches are collected here and concatenated once at the end
  # instead of re-copying the growing array on every batch.
  chunks = []
  n_rows = 0

  # Running totals (over individual pixel values) for the pooled statistics
  count = 0
  total = 0.0
  total_sq = 0.0

  for j in range(8):
      gc.collect()
      print("starting with round ",j)
      for i, (x1, x2, _) in enumerate(loader):
          print(i)
          x1 = np.expand_dims(x1, 1)
          x2 = np.expand_dims(x2, 1)
          # concatenate the arrays by their second axis -> (batch, 2, H, W)
          _data = np.concatenate((x1,x2), axis = 1)

          _mean = np.mean(_data, dtype=np.float64)
          _var = np.var(_data, dtype=np.float64)
          count += _data.size
          total += _mean * _data.size
          # E[x^2] = Var[x] + E[x]^2
          total_sq += (_var + _mean ** 2) * _data.size

          chunks.append(_data)
          n_rows += len(_data)

      print(f'Size of the datasets -> {(n_rows,) + chunks[-1].shape[1:]}')

  _altered = np.concatenate(chunks, axis = 0)
  del chunks

  # Now create a new dataset with the newly defined data
  # Concatenate the original dataset with the new one
  all_data = np.concatenate((d.data, _altered), axis = 0)
  labels = np.tile(d.labels, 9)

  # Fold in the statistics of the original dataset
  n_orig = d.data.size
  count += n_orig
  total += d.mean * n_orig
  total_sq += (d.std ** 2 + d.mean ** 2) * n_orig

  mean = total / count
  # Clamp at 0 to guard against tiny negative values from rounding
  std = float(np.sqrt(max(total_sq / count - mean ** 2, 0.0)))

  return SiameseDataset(data = all_data, labels = labels, mean = mean, std = std)

## Load the different datasets from the files

We load the training, validation and test data from their respective files.

In case we want to transform the dataset we create the training dataset with the affine transformations and `transform_data=True` so that it outputs altered images, and call the function defined above.

(With larger datasets, calculating the mean and standard deviation when creating them can cause an OOM error. For that reason we approximate them by passing in the mean and standard deviation of another training dataset, so that step is omitted.)



In [0]:
# define the paths of the training data
# (relative to the current working directory)
train_data_path = "datasets/trainX_150k.npy"
train_labels_path = "datasets/trainY_150k.npy"

# Affine transformations to be done on the train data
# NOTE(review): newer torchvision releases appear to have renamed `fillcolor`
# to `fill` -- confirm against the installed version before re-running.
affine = transforms.RandomAffine(degrees = (-10,10), 
                                 translate=(0.1,0.1),
                                 scale = (0.8, 1.2),
                                 shear = (-0.3, 0.3), 
                                 fillcolor=255)

# Create the dataset without affine transformations
# The statistics are passed explicitly so they are not recomputed on load:
# Mean of the 270k dataset 236.26801215923408
# std of the 270k dataset 66.52629108400019
train_d = SiameseDataset(train_data_path, train_labels_path, 
                         mean = 236.26801215923408, std = 66.52629108400019)


# In case we want to augment the dataset we would run the two lines 
# below instead:
# train_d = SiameseDataset(train_data_path, train_labels_path, 
#                          transform_data=True, transform=affine)
# train_d = augment_dataset(train_d)


# Print the statistics of the dataset (mean first, then std)
print(f"Mean of {train_d.mean} and {train_d.std}")
print("Loaded data")

In [0]:
# Load the validation data
valid_data_path = "datasets/validationX.npy"
valid_labels_path = "datasets/validationY.npy"

# Normalize with the statistics of the training set: the network must see
# validation inputs on the same scale as the inputs it was trained on.
# Passing them in also skips recomputing mean/std over the whole array.
valid_d = SiameseDataset(valid_data_path, valid_labels_path,
                         mean = train_d.mean, std = train_d.std)

# Print the statistics used for normalization (mean first, then std)
print("Loaded validation set with shape ",valid_d.data.shape)
print(f"Mean of {valid_d.mean} and {valid_d.std}")

In [0]:
# Load the test data
test_data_path = "datasets/testX.npy"
test_labels_path = "datasets/testY.npy"

# Normalize with the statistics of the training set: the network must see
# test inputs on the same scale as the inputs it was trained on.
# Passing them in also skips recomputing mean/std over the whole array.
test_d = SiameseDataset(test_data_path, test_labels_path,
                        mean = train_d.mean, std = train_d.std)

# Print the statistics used for normalization (mean first, then std)
print("Loaded test set with shape ",test_d.data.shape)
print(f"Mean of {test_d.mean} and {test_d.std}")

-------------------------------------
## Definition of the network architecture

In [0]:
# This is the CNN that will be used within the Siamese Network 
# to calculate the similarity score.
# It outputs a flat 4096-dimensional representation of the image that will be used 
# later in the output layer of the whole Siamese Network
class ConvNet(nn.Module):
  """Convolutional encoder shared by both branches of the siamese network.

  Maps a batch of single-channel images to 4096-dimensional feature vectors
  squashed into (0, 1) by a sigmoid. The fully connected layer expects the
  last convolution to produce 256 maps of 6x6.
  """
  def __init__(self):
    super().__init__()
    # Layers are created in the same order as they are applied
    self.conv1 = nn.Conv2d(1, 64, 10)
    self.pool = nn.MaxPool2d(2, 2)
    self.conv2 = nn.Conv2d(64, 128, 7)
    self.conv3 = nn.Conv2d(128, 128, 4)
    self.conv4 = nn.Conv2d(128, 256, 4)
    self.fc1 = nn.Linear(256 * 6 * 6, 4096)

  def forward(self, x):
    """Return the sigmoid-activated feature vectors for the images ``x``."""
    features = x
    # The first three convolutions are each followed by ReLU and max-pooling
    for conv in (self.conv1, self.conv2, self.conv3):
      features = self.pool(F.relu(conv(features)))
    # The last convolution is not pooled
    features = F.relu(self.conv4(features))
    # Flatten to (batch, 256*6*6) for the fully connected layer
    flat = features.view(-1, self.fc1.in_features)
    return torch.sigmoid(self.fc1(flat))


# Siamese network that wraps the convnet and performs the computations 
# of the final layer so we have a similarity score as an output
class SiameseNet(nn.Module):
  """Siamese network: one shared encoder applied to both images of a pair,
  followed by a linear layer on the element-wise L1 distance."""
  def __init__(self, net):
    super().__init__()
    # Encoder shared by both branches (same weights for both inputs)
    self.convnet = net
    # Maps the 4096-dimensional distance vector to a single logit
    self.prediction_layer = nn.Linear(4096, 1)

  def forward(self, x1, x2):
    """Return the similarity logit for every pair ``(x1[i], x2[i])``.

    No sigmoid is applied here: it is folded into the BCE-with-logits loss
    for numerical stability.
    """
    # Running the same module twice is equivalent to two weight-tied networks
    embeddings = [self.convnet(images) for images in (x1, x2)]
    distance = self.calculate_l1_distance(*embeddings)
    return self.prediction_layer(distance)

  def calculate_l1_distance(self, h1, h2):
    """Return the element-wise absolute difference between ``h1`` and ``h2``."""
    return (h1 - h2).abs()


torch.manual_seed(12)

# How to initialize the weights according to the paper
def weights_init(model):
  """Initialize one module in place; meant to be used with ``Module.apply``.

  - ``nn.Conv2d``: weights ~ Normal(0, 1e-2), bias ~ Normal(0.5, 1e-2)
  - ``nn.Linear``: weights ~ Normal(0, 0.2), bias ~ Normal(0.5, 1e-2)

  Any other module type is left untouched, and layers created with
  ``bias=False`` only get their weights initialized.
  """
  # Kept from the original code: this only reseeds NumPy's global RNG.
  # The draws below come from torch's RNG (seeded above).
  np.random.seed(12)
  if isinstance(model, nn.Conv2d):
    weight_std = 1e-2
  elif isinstance(model, nn.Linear):
    weight_std = 0.2
  else:
    return
  nn.init.normal_(model.weight, mean = 0.0, std = weight_std)
  # Layers built with bias=False have no bias tensor to initialize
  if model.bias is not None:
    nn.init.normal_(model.bias, mean = 0.5, std = 1e-2)



### Create the Siamese Network and Initialize weights according to specifications
- Conv layers: 
  - Weights: Normal(0, 1e-2)
  - Bias: Normal(0.5, 1e-2)
- Linear layers: 
  - Weights: Normal(0, 0.2)
  - Bias: Normal(0.5, 1e-2)

In [0]:
# Create the siamese network and initialize the weights
conv = ConvNet()
siamese = SiameseNet(conv)
# apply() calls weights_init on every submodule, so both the conv layers of
# the encoder and the linear layers get the custom initialization
siamese.apply(weights_init)

# Send the network to the GPU if available
# (`device` is also used by the later cells to move the batches)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") 
siamese.to(device)

---
## Hyperparameter Setting 

We use the Adam optimizer with different weight decays for the internal Convnet layers and for the final output layer from the siamese network

We use a constant small LR of 3e-4 for the entire network

In [0]:
import torch.optim as optim

# Learning rate shared by both parameter groups, and the default weight decay
learning_rate = 3e-4
regularization = 2e-4

# Two parameter groups: the encoder uses the default weight decay given
# below (`regularization`), the prediction layer overrides it with 1e-3.
optimizer = optim.Adam(
    [
     {"params": siamese.convnet.parameters()},
     {"params": siamese.prediction_layer.parameters(), "weight_decay": 1e-3}
    ],
    lr = learning_rate,
    weight_decay = regularization
)

# Number of epochs; read as a global by train()
n_epochs = 200

### Define the Loss (CrossEntropy) and the Adam optimizer

We set two different weight decay rates as done in the keras code, as it certainly shows really good results this way, as well as a fixed (could be reduced in the future) learning rate of 3e-4 using the Adam Optimizer

We choose BCEWithLogits to improve the numerical stability of the network compared to plain BCE, since it makes use of the log-sum-exp trick and thereby avoids underflow.

In [0]:
# Binary cross-entropy applied to raw logits (SiameseNet.forward applies no
# sigmoid); read as a global by train()
criterion = nn.BCEWithLogitsLoss()

---------------------------------
## Training and Validation

Definition of the train and validation loops. We perform basic operations and print the results of the forward passes every N iterations to track the performance.

We choose to validate every round and save the best model yet to disk so it can be loaded again for the validation loop

In [0]:
def train(model, train_data_loader, validate_data_loader,
          model_save_path, checkpoint_path,
          validate_every = 10 , save_every = 50):
  """Train ``model`` and keep the best-scoring weights on disk.

  Relies on the notebook globals ``n_epochs``, ``optimizer``, ``criterion``
  and ``device``, and on the ``validate`` function.

  Args:
      model: network called as ``model(X1, X2)``.
      train_data_loader: iterable of ``(X1, X2, y)`` training batches.
      validate_data_loader: loader handed to ``validate``.
      model_save_path: file that receives the state dict of the model with
          the best validation accuracy so far.
      checkpoint_path: file that receives periodic checkpoints.
      validate_every: validate every this many epochs (starting at epoch 0).
      save_every: write a checkpoint every this many epochs (epoch 0 excluded).

  Returns:
      The best validation accuracy (in percent) seen during training.
  """
  import os

  # Create the target folders up front: otherwise the first torch.save would
  # only fail after a full epoch of training.
  for target in (model_save_path, checkpoint_path):
    folder = os.path.dirname(target)
    if folder:
      os.makedirs(folder, exist_ok=True)

  best_accuracy = 0

  for epoch in range(n_epochs):
    running_loss = 0.0

    # set network to learning mode (validate() switches it to eval mode)
    model.train()

    for i, (X1, X2, y) in enumerate(train_data_loader):
      # send to gpu and reshape inputs
      X1 = X1.to(device).view(-1, 1, 105, 105)
      X2 = X2.to(device).view(-1, 1, 105, 105)
      y = y.to(device).view(-1, 1)

      # make gradients zero before forward prop
      optimizer.zero_grad()

      # forward prop and loss
      outputs = model(X1, X2)
      loss = criterion(outputs, y)

      # backprop and gradient descent step
      loss.backward()
      optimizer.step()

      # print statistics: mean loss over the batches of this epoch so far
      running_loss += loss.item()
      if i % 50 == 0:
        print('[%d, %5d] loss: %.3f lr: %.5f' %
                  (epoch + 1, i + 1, running_loss / (i+1), optimizer.param_groups[0]['lr']))

    # every `validate_every` epochs,
    # get metrics from validation set
    if epoch % validate_every == 0:
      accuracy = validate(model, validate_data_loader)

      # if accuracy is the highest so far, save the model
      if accuracy > best_accuracy:
        print("Saving best model")
        best_accuracy = accuracy
        torch.save(model.state_dict(), model_save_path)

    # save model every `save_every` epochs
    if epoch > 0 and epoch % save_every == 0:
      torch.save(model.state_dict(), checkpoint_path)

  return best_accuracy


## Validation Loop

In this validation loop we loop through the validation set and calculate the accuracy of the model.

Validation runs as often as the training loop requests through `validate_every`. For a thorough evaluation we can use `validate_every=1`.

In [0]:
def validate(model, data_loader):
  """Compute the pair-classification accuracy of ``model``.

  The batches are moved to the device the model's parameters live on (CPU if
  the model has no parameters), so the function does not depend on a global
  ``device``.

  Args:
      model: network called as ``model(x1, x2)`` that returns one logit per pair.
      data_loader: iterable of ``(x1, x2, y)`` batches; the images are viewed
          as ``(-1, 1, 105, 105)`` and the labels as ``(-1, 1)``.

  Returns:
      The accuracy in percent, or 0.0 when the loader yields no samples.
  """
  # set network to validation mode
  model.eval()
  print("Validating model!")

  try:
    target_device = next(model.parameters()).device
  except StopIteration:
    target_device = torch.device("cpu")

  correct = 0
  total = 0
  with torch.no_grad():
    for x1, x2, y in data_loader:
      # Send data to device with the appropriate view
      x1 = x1.to(target_device).view(-1, 1, 105, 105)
      x2 = x2.to(target_device).view(-1, 1, 105, 105)
      y = y.to(target_device).view(-1, 1)

      # forward prop
      outputs = model(x1, x2)
      # Translate the outputs to 0 or 1
      predicted = torch.round(torch.sigmoid(outputs))

      total += y.size(0)
      correct += (predicted == y).sum().item()

  # An empty loader would otherwise end in a division by zero
  if total == 0:
    print("No validation samples were provided")
    return 0.0

  accuracy = 100*correct/total
  print("Accuracy of the network on the val set %.3f %%" % accuracy)
  return accuracy

---
## Running the Experiment

In [0]:
# Define where the model will be saved to persistent storage
# (paths are relative to the current working directory)
model_save_path = os.path.join("saved_models", "best.th")
checkpoint_path = os.path.join("saved_models", "checkpoint.th")

# Create the data loaders from the training and validation set
# NOTE(review): shuffling the validation loader should not change the
# accuracy, which is a sum over all samples.
train_loader = data.DataLoader(train_d, batch_size=128, shuffle=True, pin_memory=True, num_workers=4)
val_loader = data.DataLoader(valid_d, shuffle=True, batch_size=128, pin_memory=True, num_workers=4)

# Train the model, validating after every epoch
train(siamese, train_loader, val_loader, model_save_path, checkpoint_path, validate_every=1) 

# Validating on the Test Set

In [0]:
# load best model for testing
# NOTE(review): this loads "best270k.th", not the "best.th" file written by
# the training cell -- confirm this is the intended checkpoint.
model_save_path = os.path.join("saved_models", "best270k.th")
siamese.load_state_dict(torch.load(model_save_path, map_location=device))
# create data loader for test set
test_loader = data.DataLoader(test_d, shuffle=True, batch_size=128, pin_memory=True, num_workers=4)

# Here we can validate on the val_loader or test_loader
# NOTE(review): as written this evaluates on val_loader; test_loader is built
# above but not used. Pass test_loader to report test-set accuracy.
validate(siamese, val_loader)

# One-Shot task

Below we show the special structures and tasks needed to conduct the oneshot task.
1. We define a oneshot dataset that returns ready-made batches or subset for the one-shot task
2. We define the loop that iterates through all the subsets of the dataset and checks the one-shot accuracy



In [0]:
# Create a specific oneshot dataset since the dimensionality
# of the data is a bit special and different from the traditional
# siamese dataset
class OneShotDataset(data.Dataset):
  """Dataset of one-shot tasks.

  ``data`` is indexed as ``data[task, candidate, 0 or 1]``: every task holds
  ``set_size`` image pairs, and ``labels[task]`` holds the labels of that task.
  The set size and image size are read from the data instead of being
  hard-coded, so sets other than 20 x 105 x 105 work as well.

  Args:
      data_path: path of the ``.npy`` file with the tasks.
      labels_path: path of the ``.npy`` file with the labels.
      mean: mean used for normalization; computed from the data when omitted.
      std: standard deviation used for normalization; computed from the data
          when omitted.
  """
  def __init__(self, data_path, labels_path, mean = None, std = None):
    # Data is n_tasks x set_size x 2 x H x W
    # We use the set size as a batch size
    self.data = np.load(data_path)
    self.labels = np.load(labels_path)

    # Compute whichever statistic was not given (each one independently, so a
    # supplied value is never discarded and a missing one is never left None)
    self.mean = np.mean(self.data) if mean is None else mean
    self.std = np.std(self.data) if std is None else std

    # Define the transformations
    self.normalize = transforms.Normalize(mean = (self.mean,),
                                          std = (self.std, ))
    self.transforms = transforms.Compose([
                                          self.normalize
    ])

  def __len__(self):
    """Return the number of one-shot tasks."""
    return len(self.data)

  def __getitem__(self, index):
    """Return ``(x1, x2, labels)`` for the task at ``index``.

    ``x1`` and ``x2`` are float32 tensors of shape (set_size, H, W) holding
    the first and second image of every pair, passed through the transforms;
    ``labels`` is the float32 label tensor of the task.
    """
    batches = self.data[index]
    set_size = batches.shape[0]
    height, width = batches.shape[2:4]

    def _side(which):
      # Select one image of every pair as a float32 tensor
      images = batches[:, which].reshape(set_size, height, width)
      return torch.Tensor(images.astype(np.float32))

    labels = torch.Tensor(self.labels[index].astype(np.float32))
    return self.transforms(_side(0)), self.transforms(_side(1)), labels

  def get_images(self, index):
    """Return the first image of the first pair, the second image of every
    pair (as a list of PIL images) and the labels of the task at ``index``."""
    batches = self.data[index]
    x1 = batches[0, 0]
    x2 = batches[:, 1]
    label = self.labels[index]
    return Image.fromarray(x1), [Image.fromarray(i) for i in x2], label

# Visualize the images returned by the loader
def show_oneshot_pair(x1, x2, lab):
    """Plot a normalized pair from the one-shot loader side by side.

    The normalization is undone with the statistics of the global
    ``oneshot_d`` before plotting. ``lab`` is accepted but not used.
    """
    stats = oneshot_d
    # Undo the normalization, then convert both tensors to numpy
    restored = [sample * stats.std + stats.mean for sample in (x1, x2)]
    arrays = [sample.numpy() for sample in restored]
    _, axes = plt.subplots(1, 2, sharey= True)
    for axis, picture in zip(axes, arrays):
        axis.imshow(picture.squeeze())
    plt.show()


In [0]:
# Test the argmax of the network output is the same 
# to the one of the labels
def oneshot_test(model, data_loader):
  """Compute the one-shot accuracy of ``model``.

  Every batch of the loader is one task: a set of candidate pairs of which the
  highest-scoring one is taken as the prediction. A task counts as correct
  when that pair is also the argmax of the labels.

  The batches are moved to the device the model's parameters live on (CPU if
  the model has no parameters). The raw logits are compared directly: the
  sigmoid is monotonic, so it cannot change the ranking, but it can saturate
  and create ties between confident scores.

  Args:
      model: network called as ``model(x1, x2)`` that returns one logit per pair.
      data_loader: iterable of ``(x1, x2, y)`` tasks; the images are viewed as
          ``(-1, 1, 105, 105)`` and the labels as ``(-1, 1)``.

  Returns:
      The accuracy in percent, or 0.0 when the loader yields no tasks.
  """
  model.eval()

  print("Oneshot testing!")

  try:
    target_device = next(model.parameters()).device
  except StopIteration:
    target_device = torch.device("cpu")

  correct = 0
  total = 0
  with torch.no_grad():
    for x1, x2, y in data_loader:
      # Send data to device with the appropriate view
      x1 = x1.to(target_device).view(-1, 1, 105, 105)
      x2 = x2.to(target_device).view(-1, 1, 105, 105)
      y = y.to(target_device).view(-1, 1)

      # forward prop: one similarity logit per candidate pair
      scores = model(x1, x2)

      # Check accuracy for this task
      total += 1
      if torch.argmax(scores).item() == torch.argmax(y).item():
        correct += 1

  # An empty loader would otherwise end in a division by zero
  if total == 0:
    print("No oneshot tasks were provided")
    return 0.0

  accuracy = 100*correct/total
  print("Accuracy of the network on the oneshot set %.3f %%" % accuracy)
  return accuracy


In [0]:
# Load the dataset from storage
oneshot_data_path = "datasets/X_oneshot.npy"
oneshot_labels_path = "datasets/Y_oneshot.npy"
# Normalize with the statistics of the training set: the network must see
# one-shot inputs on the same scale as the inputs it was trained on.
oneshot_d = OneShotDataset(oneshot_data_path, oneshot_labels_path,
                           mean = train_d.mean, std = train_d.std)

# Print the statistics used for normalization (mean first, then std)
print("Loaded oneshot set with shape ",oneshot_d.data.shape)
print(f"Mean of {oneshot_d.mean} and {oneshot_d.std}")

# load best model for testing
model_save_path = os.path.join("saved_models", "best270k.th")
siamese.load_state_dict(torch.load(model_save_path, map_location=device))

# create the loader: every dataset item is already a full task, so one item
# per batch and no shuffling
oneshot_loader = data.DataLoader(oneshot_d, shuffle=False, batch_size=1, pin_memory=True)

# test the output
oneshot_test(siamese, oneshot_loader)

## Some code to visualize all the examples of the task

Shows all the pairs of images considered inside a batch and their similarity score

In [0]:
# Take the third task from the loader. The built-in next() is used because
# iterators are only guaranteed to implement __next__, not a .next() method.
a = iter(oneshot_loader)
next(a)
next(a)
x1, x2, y = next(a)

print(x1.shape)
# Appropriate view
x1 = x1.view(-1, 1, 105, 105)
x2 = x2.view(-1, 1, 105, 105)
y = y.view(-1,1)

# Score every pair on whatever device the notebook selected (works on
# CPU-only machines too); no gradients are needed for visualization.
with torch.no_grad():
  outputs = torch.sigmoid(siamese(x1.to(device), x2.to(device)))

# Show every candidate pair of the task with its similarity score
for i in range(x1.shape[0]):
  show_oneshot_pair(x1[i], x2[i], y[i])
  print(outputs[i].item())
