# Contrastive Loss (SimCLR)


In this session, we are going to implement the SimCLR loss function (https://arxiv.org/abs/2002.05709).

This follows the InfoNCE loss, i.e., uses two different augmented versions of the same image as positive pair and the other images in the batch as negative samples, and the batch construction of the N-pair-mc loss.

prima si calcola in maniera matriciale le feature, poi matrice di similitudine


Con lo scorso lab abbiamo ottenuto y e y', quello che fa SimCLR è aggiungere g() ovvero un projector. Quindi non avremo l'Identity() finale, ma un MLP. Tipicamente MLP ha questa forma: Linear, BatchNorm, ReLU, Linear, BatchNorm.

Generare MLP da cui escono le proiezioni delle uscite della backbone. Su queste z e z' andremo a fare la Contrastive Loss.
Andremo a fare poi forward e backprop.

In [1]:
import os
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

from torchvision.io import read_image

import torchvision
import torchvision.models as models
import torchvision.transforms as transforms

from PIL import Image

In [2]:
class CustomImageDataset(Dataset):
    def __init__(self, data, targets = None, transform=None, target_transform=None): # valori di default
        self.imgs = data
        self.targets = targets
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.imgs)

    def __getitem__(self, idx):
        img = self.imgs[idx]
        if isinstance(img, str):
          image = read_image(img)
        else:
          image = Image.fromarray(img.astype('uint8'), 'RGB')
        if self.transform: # arriva qui con una PIL image
            image1 = self.transform(image) # fa due trasformazioni
            image2 = self.transform(image)
        else:
            image1 = image
            image2 = image
        return image1, image2


class Identity(torch.nn.Module):
  def forward(self, x):
    return x


class SiameseNet(nn.Module):
    def __init__(self, backbone):
        super().__init__()
        self.encoder1 = backbone
        self.encoder1.fc = Identity()

        self.encoder2 = backbone
        self.encoder2.fc = Identity()


    def forward(self, x1, x2, return_dict = True):
        x1 = self.encoder1(x1)
        x2 = self.encoder2(x2)
        return torch.cat((x1, x2), dim = 0)

In [3]:
backbone = models.resnet18()

backbone.fc = nn.Identity()

model = SiameseNet(backbone)

Versione 1 (più efficiente)

In [4]:
class ContrastiveLoss(nn.Module):
    def __init__(self, temperature=0.07):
        super().__init__()
        self.temperature = temperature
        self.criterion = torch.nn.CrossEntropyLoss()

    def forward(self, features):

        # torch.cat((x1, x2), dim=0) # copiare output della siamese
        # normalize features to later compute cosine distance/similarity btw them
        features = F.normalize(features, dim=1)
        # compute the similarity matrix btw features
        # (consider that feature are normalized! so the cosine similarity is ...)

        # Calcola la matrice di similarità coseno
        similarity_matrix = torch.matmul(features, features.T) # prodotto matricale element wise # spesso compare come feature @ features.T

        # andiamo a costruire un array di label di numeri da 0 a 63, fa un arange da 0 a bs, va a prendersi idx

        labels = torch.cat([torch.arange(features.shape[0]//2) for i in range(2)], dim=0) # print
        labels = (labels.unsqueeze(0) == labels.unsqueeze(1)).float() # la unsqueeze aumenta la dimensione di un tensore, qui fa 128,1, modifica la shape # print

        mask = torch.eye(labels.shape[0], dtype = torch.bool)
        labels = labels[~mask].view(labels.shape[0], -1)
        similarity_matrix = similarity_matrix[~mask].view(similarity_matrix.shape[0], -1)
        # assert similarity_matrix.shape == labels.shape

        # select and combine multiple positives
        positives = similarity_matrix[labels.bool()].view(labels.shape[0], -1)

        # select only the negatives
        negatives = similarity_matrix[~labels.bool()].view(similarity_matrix.shape[0], -1) # che effetto ha view? è simile al re-shape, si fa a riforzare la shape di un tensore
        # questo reshape che dimensione ha? 8,


        #print(similarity_matrix)
        #print(similarity_matrix[:, 0].shape)

        # moltiplicazione element wise (elemento dim 512)

        # alla positive do una ground truth (0)
        # sim_mat
        # for
        # si prende la positive pair e la metto in posizione 0, tutti gli altri (2n-1) li metto dalla posizione 1 alla 2n, qui ho tutti i negative
        # i logit sono gli 0, gli altri sono gli elementi bassi, 0 alti


        # create the logits tensor where:
        #   - in the first position there is the similarity of the positive pair
        #   - in the other 2N-1 positions there are the similarity w negatives
        # the shape of the tensor need to be 2Nx2N-1, with N is the batch size
        logits = torch.cat([positives, negatives], dim = 1)
        logits = logits / self.temperature


        # to compute the contrastive loss using the CE loss, we just need to
        # specify where is the similarity of the positive pair in the logits tensor
        # since we put in the first position we create a gt of all zeros
        # N.B.: this is just one of the possible implementation!
        gt = torch.zeros(logits.shape[0], dtype=torch.long) # ground truth
        return self.criterion(logits, gt)

In SimCLR abbiamo la temperature che è un iperparametro.
Per ogni feature andiamo a calcolarci la similitudine in maniera matriciale. Prima le normalizziamo e poi si fa la matrice di similitudine (che è la moltiplicazione element wise delle features tra di loro). Una volta fatto questo, si va distinguere la positive pair rispetto alla negative pair. infine si va a calcolare la cross-entropy loss della positive rispetto ai negative. Per farlo, possiamo fare così: una volta calcolate le similitudini delle positive e delle negative pair, alla positive diamo un gt, una label, che è 0. Conviene fare così: una volta calcolata la matrice di similitudine, ci conviene mettere, dato ogni elemento (quindi con un for), per ogni elemento andiamo a prendere la positive pair e andiamo a metterla in un'altra matrice in prima posizione 0. Tutti gli altri, che sono le 2N - 1, li mettete dalla posizione 1 fino alla 2N. Lo facciamo per ogni elemento.

Alla fine, per calcolare la loss finale, facciamo un cross-entropy. (ha un tensore, logits) Nella prima posizione mettiamo la similitudine della positive pair e nelle altre 2N-1 mettere le similitudini con i negative. Alla fine avremo una matrice che ha come dimensioni 2N x 2N - 1. Quindi nella matrice dei logits tolgo l'elemento stesso, metto in prima posizione l'elemento 0 e dopo metto tutti gli altri. Il risultato è che logits sarà una matrice 2N x 2N - 1

VERSIONE 2 - BASATA SU TENSORI

In [5]:
class ContrastiveLoss(nn.Module):
    def __init__(self, temperature=0.07):
        super().__init__()
        self.temperature = temperature
        self.criterion = torch.nn.CrossEntropyLoss()

    def forward(self, features): # IMPLEMENTAZIONE PIù SEMPLICE
        features = F.normalize(features, dim=1) # dimensione 2n x 512 delle features
        similarity_matrix = torch.matmul(features, features.T) # prodotto matricale element wise # spesso compare come feature @ features.T

        print(similarity_matrix[0])

        batch_size = features.shape[0]//2
        logits = torch.zeros(2*batch_size, 2*batch_size-1)

        # nella prima metà del batch = positive + 64, negative - 64

        for idx, val in enumerate(similarity_matrix): # id elemento con cui stiamo facendo
          row = torch.zeros(2*batch_size-1) # row è di 0

          pos_idx = idx + batch_size if idx < batch_size else idx - batch_size # per trovare l'indice del positive sample bisogna fare idx + batch size nel caso in cui l'indice sia pi+ piccolo del batch size # nell'altro caso io farò idx - bs
          # metto nella prima posizione di row
          row[0] = val[pos_idx]
          row[1:] = torch.tensor([v for i, v in enumerate(val) if i!=idx and i!=pos_idx]) # list of compression
          # prendo 126 elementi che sono in val che non sono nè idx nè pos_idx

          '''
          STESSA COSA DI FARE:

          negatives = []
          for i, v in enumerate(val):
            if i!=idx and i! = pos_idx:
              negatives.append(v)
            row[1:] = torch.tensor(negatives)
          '''

          logits[idx] = row

        logits = logits / self.temperature # formula

        gt = torch.zeros(logits.shape[0], dtype=torch.long) # positive in first position # cross entropy

        return self.criterion(logits, gt)

Let's now use the Dataset which creates the two augmented views for each image and the Siamese Network from the past lab session [1](https://colab.research.google.com/drive/1NJwAFbRiD4MdwWf__6P2Lm0xYk_DNdVu?usp=sharing) and [2](https://colab.research.google.com/drive/1AMkh0q8L5nJScx7v6cMWoK336zqOqDY6?usp=sharing) and create a training loop

In [None]:
data = torchvision.datasets.CIFAR10(root='./data', train=True, download=True) # dataset

color_jitter = transforms.ColorJitter(0.8, 0.8, 0.8, 0.2) # si distorce il colore con una certa probabilità tutti i canali


transform = transforms.Compose([transforms.RandomResizedCrop(size=32),
                                  transforms.RandomHorizontalFlip(),
                                  transforms.RandomApply([color_jitter], p=0.8),
                                  transforms.RandomGrayscale(p=0.2),
                                  transforms.GaussianBlur(kernel_size=int(0.1 * 32)),
                                  transforms.ToTensor()])

trainset = CustomImageDataset(data.data, transform = transform)
dataloader = DataLoader(trainset, batch_size=64, shuffle=True)

model = SiameseNet(models.resnet18())
optimizer = optim.Adam(model.parameters())
criterion = ContrastiveLoss()

for idx, data in enumerate(dataloader):
    view1, view2 = data

    optimizer.zero_grad()
    features = model(view1, view2)
    loss = criterion(features)
    # tensore su cui fare la bp
    loss.backward()
    optimizer.step()

    print(f"batch {idx} loss {loss.item()}")
    print()

    if idx == 3:
        break

# sistemare