# Learning Objective
Understand how self-supervised learning can utilize unlabeled data to make deep learning model generalize better.

# Task (30 points)
Implement a contrastive loss function (marked by #TODO in this notebook).

This contrastive loss function can be defined by yourself. You can consider using [SimCLR](https://arxiv.org/abs/2002.05709). You do not need to use [MoCo](https://arxiv.org/abs/1911.05722), as the implementation is slightly more complex.

Training a model with a proper contrastive loss function should make sure if two random transformations of the same image should still be similar in their latent space, while transformations of different images should be very different. There are many ways to implement this function. You can test your own ideas first before searching GitHub source code for inspiration.

In [1]:
# Load library
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import CIFAR10
from torchvision.models import resnet

# Recommend to use GPU as it will take quite some time if you only use cpu

use_gpu = True
if use_gpu and torch.cuda.is_available():
  device = torch.device("cuda")

In [2]:
# This cell is to prepare data loaders

class CIFAR10Pair(CIFAR10):
    """This class is a subclass of torch.utils.data.Dataset; read its documentation if your have questions
    For each image in the CIFAR10 dataset, return two random transformations of itself
    """
    def __getitem__(self, index):
        img = self.data[index]
        img = Image.fromarray(img)

        if self.transform is not None:
            # im_1 and im_2 are two versions of img and should be similar to each other in latent space
            im_1 = self.transform(img)
            im_2 = self.transform(img)

        return im_1, im_2

batch_size = 512
# Random transform a image into two versions; used for contrastive learning
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(32),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomApply([transforms.ColorJitter(0.4, 0.4, 0.4, 0.1)], p=0.8),
    transforms.RandomGrayscale(p=0.2),
    transforms.ToTensor(),
    transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010])])
train_data = CIFAR10Pair(root='data', train=True, transform=train_transform, download=True)
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=16, pin_memory=True, drop_last=True)

# No random transformation; used for conventional supervised learning
test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010])])

memory_data = CIFAR10(root='data', train=True, transform=test_transform)
memory_loader = DataLoader(memory_data, batch_size=batch_size, shuffle=False, num_workers=16, pin_memory=True)

test_data = CIFAR10(root='data', train=False, transform=test_transform)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=16, pin_memory=True)

Files already downloaded and verified




In [3]:
class FeatureExtractor(nn.Module):
  """
  Use ResNet18 as the feature extractor
  Comparing with ImageNet ResNet:
  (i) replaces conv1 with kernel=3, stride=1
  (ii) removes pool1
  """
  def __init__(self, feature_dim=128):
    super().__init__()
    net = resnet.resnet18(num_classes=feature_dim)
    self.net = []
    for name, module in net.named_children():
      if name == 'conv1':
          module = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
      if isinstance(module, nn.MaxPool2d):
          continue
      if isinstance(module, nn.Linear):
          self.net.append(nn.Flatten(1))
      self.net.append(module)
    self.net = nn.Sequential(*self.net)

  def forward(self, x):
    x = self.net(x)
    return x

In [4]:
def train_model(feature_extractor, classifier, optimizer, num_epochs=100):
  """Train the model using only ~1% of labeled training data
  Args:
    feature_extractor: as defined above
    classifier: a simple nn.Linear layer
    optimizer: see code below for details
  """
  train_loss_history = []
  train_acc_history = []
  test_loss_history = []
  test_acc_history = []
  print_every = max(1, num_epochs // 20)
  for epoch in range(num_epochs):
    total_loss = 0
    total_acc = 0
    cnt = 0
    for i, (x_batch, y_batch) in enumerate(memory_loader):
      if i == 1: # Using roughly 1% of labeled training data because len(memory_loader) == 98
        break    # This training process only use the first batch, that is 1/98 entire training dataset.
      x_batch = x_batch.to(device)
      y_batch = y_batch.to(device)
      y_pred = classifier(feature_extractor(x_batch))
      loss = F.cross_entropy(y_pred, y_batch)
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
      total_loss += loss.item() * len(x_batch)
      total_acc += (y_pred.argmax(1) == y_batch).float().sum().item()
      cnt += len(x_batch)
    train_loss_history.append(total_loss / cnt)
    train_acc_history.append(total_acc / cnt)
    with torch.no_grad():
      total_loss = 0
      total_acc = 0
      for x_batch, y_batch in test_loader:
        x_batch = x_batch.to(device)
        y_batch = y_batch.to(device)
        y_pred = classifier(feature_extractor(x_batch))
        loss = F.cross_entropy(y_pred, y_batch)
        total_loss += loss.item() * len(x_batch)
        total_acc += (y_pred.argmax(1) == y_batch).float().sum().item()
      test_loss_history.append(total_loss / len(test_loader.dataset))
      test_acc_history.append(total_acc / len(test_loader.dataset))
    if epoch % print_every == 0:
      print(f'Epoch {epoch}: train_loss={train_loss_history[-1]:.2f}, train_acc={train_acc_history[-1]:.2f}, '
      f'test_loss={test_loss_history[-1]:.2f}, test_acc={test_acc_history[-1]:.2f}')
  plt.plot(train_loss_history, 'ro--', label='train_loss')
  plt.plot(test_loss_history, 'go--', label='test_loss')
  plt.legend()
  plt.show()

  plt.plot(train_acc_history, 'ro--', label='train_acc')
  plt.plot(test_acc_history, 'go--', label='test_acc')
  plt.legend()
  plt.show()

In [5]:
# Without self-supeversied learning, model does not generalize well. The test accuray is less than 30%
feature_dim = 256
num_classes = 10
feature_extractor = FeatureExtractor(feature_dim).to(device)
classifier = nn.Linear(feature_dim, num_classes).to(device)
optimizer = torch.optim.Adam(list(classifier.parameters()) + list(feature_extractor.parameters()),
                            lr=5e-3, weight_decay=1e-4)
#train_model(feature_extractor, classifier, optimizer)

In [15]:
##TODO
# This is what you need to implement
def contrast_loss(feature_extractor, x0, x1):
  """Given a feature extractor that transforms a batch of images to their latent representations,
  Compute the contrastive loss

  Refer to the next cell: x0 and x1 are a batch of transformed images from train_loader
  Suppose batch_size = 512, then both x0 and x1 have shape (512, 3, 32, 32);
  feature_extractor(x0) and feature_extractor(x1) with both return a matrix of shape (batch_size, feature_dim)

  x0[i], x1[i] (i=0,1,2,3...,511) are two random transformations of the i th image;
  therefore they should be similar in the latent space (i.e., the output of feature_extractor)

  In contrast, x0[i], x1[j] (i != j) should be different

  """
  y0 = feature_extractor(x0)
  y1 = feature_extractor(x1)
  #########################################TODO##########################################################
  ## Write your code here; it doesn't need to be too complex; the sample solution only has two lines, but that's just one of many solutions
  ## As long as you can significantly increase test accuracy using self-supervised learning, it means your implmentation of this loss function is working
  # issue with the final output value remaining the same value
  #implemtation from simclr github repsoitory
  z_i = F.normalize(y0, dim=1)
  z_j = F.normalize(y1, dim=1)

  # Concatenate the feature vectors
  z = torch.cat([z_i, z_j], dim=0)

  # Compute the cosine similarity matrix
  similarity_matrix = torch.mm(z, z.t()) / .5

  # Create a mask to exclude similarity with itself
  mask = torch.eye(len(z), dtype=torch.bool)

  # Calculate the numerator (exp(similarity between positive pairs))
  exp_sim_pos = torch.exp(similarity_matrix[~mask].view(len(z), -1))

  # Calculate the denominator (sum of exp(similarity between negative pairs))
  exp_sim_neg = torch.sum(torch.exp(similarity_matrix[~mask].view(len(z), -1)), dim=1)


  loss = torch.mean(-torch.log(exp_sim_pos / exp_sim_neg[:, None]))
  #my implemetation below

  #temp = 1
  #cos_sim = F.cosine_similarity(y0, y1,0)
  #print(cos_sim)

  #exp_cos_sim = torch.exp(cos_sim / temp)

  #sum_exp_sim = torch.sum(exp_cos_sim, axis=0) - exp_cos_sim

  #loss = torch.mean(-torch.log(exp_cos_sim / sum_exp_sim[:, None]))
  #print(loss)
  #######################################################################################################
  return loss

In [16]:
# With contrastive loss defined above, you train perferm self-supervised learning without using the actual labels in the training set
feature_extractor = FeatureExtractor(feature_dim).to(device)
optimizer = torch.optim.Adam(feature_extractor.parameters(), lr=1e-3)
train_loss_history = []
num_epochs = 20
print_every = max(1, num_epochs // 20)
for epoch in range(num_epochs):
  total_loss = 0
  for x0, x1 in train_loader:
    x0 = x0.to(device)
    x1 = x1.to(device)
    loss = contrast_loss(feature_extractor, x0, x1)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    total_loss += loss.item() * len(x0)
  train_loss_history.append(total_loss / len(train_loader.dataset))
  if epoch % print_every == 0:
    print(f'Epoch {epoch}: train_loss={train_loss_history[-1]:.2f}')
plt.plot(train_loss_history, 'ro--', label='train_loss')
plt.legend()
plt.show()



Epoch 0: train_loss=6.88


KeyboardInterrupt: ignored

In [17]:
# Freeze the parameters of the feature extractor after self-supervised learning
# Only train the last linear layer
# You should find out the model generalize much better than that without self-supervised learning
# Note here model generalization refers to the test accuracy;
# You can compare the test accuracy here and the previous one without self-supervised learning
feature_extractor.requires_grad_(False)
classifier = nn.Linear(feature_dim, num_classes).to(device)
optimizer = torch.optim.Adam(classifier.parameters(), lr=5e-3, weight_decay=1e-4)
train_model(feature_extractor, classifier, optimizer)

Epoch 0: train_loss=3.71, train_acc=0.10, test_loss=3.19, test_acc=0.10
Epoch 5: train_loss=3.93, train_acc=0.13, test_loss=4.45, test_acc=0.10


Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x7a46be56a5f0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/dataloader.py", line 1478, in __del__
    self._shutdown_workers()
  File "/usr/local/lib/python3.10/dist-packages/torch/utils/data/dataloader.py", line 1442, in _shutdown_workers
    w.join(timeout=_utils.MP_STATUS_CHECK_INTERVAL)
  File "/usr/lib/python3.10/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.10/multiprocessing/popen_fork.py", line 40, in wait
    if not wait([self.sentinel], timeout):
  File "/usr/lib/python3.10/multiprocessing/connection.py", line 931, in wait
    ready = selector.select(timeout)
  File "/usr/lib/python3.10/selectors.py", line 416, in select
    fd_event_list = self._selector.poll(timeout)
KeyboardInterrupt: 


KeyboardInterrupt: ignored