# Self-Supervised Learning <a id="top"></a>

---
## Table of Contents

* [Self-Supervised Learning Overview](#ssl_overview)
    * [Core Concepts](#core_concepts)
    * [How It Works](#how_it_works)
* [Self-Supervised Learning Tutorial](#ssl_tutorial)
    * [Imports](#imports)
    * [Dataset Preparation](#dataset_prep)
    * [Model Architecture](#model_architecture)
    * [Projection Head](#projection_head)
    * [Simple Contrastive Learning of Representations](#simclr)
    * [Contrastive Loss Function](#constrastive_loss)
    * [Init the Model](#init_model)
    * [Training Loop](#training_loop)
* [Downstream Task (Image Classification)](#img_classification)
    * [Setup](#setup)
    * [Feature Extraction](#feature_extraction)
    * [Linear Classifier](#linear_classifier)
    * [Init the Classifier](#init_classifier)
    * [Train the Classifier](#train_classifier)
    * [Evaluate the Classifier](#eval_classifier) 

# Self-Supervised Learning Overview <a class="anchor" id="ssl_overview"></a>

Self-supervised learning (SSL) applies unsupervised learning to tasks that conventionally require supervised learning. SSL has gained considerable interest in recent years for its ability to learn from unlabeled data, reduce annotation costs, and produce transferable representations.

Instead of relying on labeled datasets to understand semantic meanings, self-supervised models generate implicit labels from unstructured data. This enables the model to extract meaningful features from the data, allowing it to learn useful representations even without explicit labels.

SSL is particularly useful in fields like computer vision and natural language processing (NLP), where obtaining large amounts of labeled data can be challenging (e.g., for anomaly detection).

A core technique in self-supervised learning is contrastive learning, which maximizes the similarity between representations of similar data points and minimizes the similarity between dissimilar ones. Imagine showing your model two images: one of a cat and another of a dog. Contrastive learning encourages the model to create representations where the cat image's representation is closer to another cat image's representation than it is to the dog image's representation.
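
To make the cat-and-dog intuition concrete, here is a minimal sketch that compares embeddings with cosine similarity. The three vectors (`cat_a`, `cat_b`, `dog`) are hand-picked assumptions for illustration and are not produced by any model in this tutorial.

```python
import numpy as np

def cosine_similarity(u, v):
    """Cosine of the angle between two vectors (1.0 means same direction)."""
    return float(np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v)))

# Hypothetical embeddings, chosen by hand for illustration only
cat_a = np.array([0.9, 0.1, 0.2])
cat_b = np.array([0.8, 0.2, 0.1])
dog = np.array([0.1, 0.9, 0.3])

sim_cat_cat = cosine_similarity(cat_a, cat_b)
sim_cat_dog = cosine_similarity(cat_a, dog)
print(f"cat vs cat: {sim_cat_cat:.3f}, cat vs dog: {sim_cat_dog:.3f}")
```

A contrastive objective tries to push a trained encoder toward this kind of geometry: a high score for the cat pair and a low score for the cat and dog pair.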

## How It Works <a class="anchor" id="how_it_works"></a>

- In supervised learning, ground truth labels are directly provided by human experts.
- In self-supervised learning, tasks are designed such that “ground truth” can be inferred from unlabeled data.
- SSL tasks fall into two categories:
  - Pretext Tasks: Train AI systems to learn meaningful representations of unstructured data. These learned representations can be subsequently used in downstream tasks.
  - Downstream Tasks: Reuse pre-trained models on new tasks, a technique known as "transfer learning".
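
As an illustration of how "ground truth" can be inferred from unlabeled data, the sketch below builds a rotation-prediction pretext task: each image is rotated by a random multiple of 90 degrees, and the rotation index becomes the label. Rotation prediction is a swapped-in example and is not the pretext task used later in this tutorial; the helper name `make_rotation_batch` and the random stand-in images are assumptions.

```python
import numpy as np

def make_rotation_batch(images, rng):
    """Rotate each image by a random multiple of 90 degrees.

    Returns the rotated images and the rotation index (0..3), which acts as
    a label derived from the data itself.
    """
    ks = rng.integers(0, 4, size=len(images))
    rotated = [np.rot90(img, k=int(k), axes=(0, 1)) for img, k in zip(images, ks)]
    return np.stack(rotated), ks

rng = np.random.default_rng(0)
unlabeled = rng.random((8, 32, 32, 3))  # stand-in for 8 unlabeled square RGB images
inputs, pseudo_labels = make_rotation_batch(unlabeled, rng)
print(inputs.shape, pseudo_labels)
```

No human annotation is involved: a model trained to predict `pseudo_labels` from `inputs` has to pick up on object orientation, which is one way a pretext task can yield representations that are reusable downstream.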

# Self-Supervised Learning Tutorial <a class="anchor" id="ssl_tutorial"></a>

## Imports <a class="anchor" id="imports"></a>

In [1]:
import time

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

In [2]:
# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

## Dataset Preparation <a class="anchor" id="dataset_prep"></a>

For this tutorial, we'll use the CIFAR-10 dataset. You can download and load it using torchvision.

In [3]:
# Data Augmentation
class Cutout:
    def __init__(self, n_holes, length):
        self.n_holes = n_holes
        self.length = length

    def __call__(self, img):
        h, w = img.size(1), img.size(2)
        mask = np.ones((h, w), np.float32)

        for n in range(self.n_holes):
            y = np.random.randint(h)
            x = np.random.randint(w)

            y1 = int(max(0, y - self.length // 2))
            y2 = int(min(h, y + self.length // 2))
            x1 = int(max(0, x - self.length // 2))
            x2 = int(min(w, x + self.length // 2))

            mask[y1:y2, x1:x2] = 0

        mask = torch.from_numpy(mask)
        mask = mask.expand_as(img)

        return img * mask

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomResizedCrop(32),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomAffine(degrees=10, translate=(0.1, 0.1)),
    transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.2),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    Cutout(n_holes=1, length=16)  # Introduce holes in images
])

In [4]:
batch_size = 128
num_workers = 16

# Load CIFAR-10 Train Dataset
train_dataset = datasets.CIFAR10(root='./data', train=True, 
                                 download=True, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, 
                          shuffle=True, num_workers=num_workers)

# Load CIFAR-10 Test Dataset
test_dataset = datasets.CIFAR10(root='./data', train=False, 
                                download=True, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size, 
                         shuffle=False, num_workers=num_workers)

Files already downloaded and verified
Files already downloaded and verified
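
The loaders above return a single augmented view per image. SimCLR, as usually described, contrasts two independently augmented views of the same image, so one option is to wrap the transform so that it is applied twice. The sketch below is an assumption about how that could be done; `TwoViewTransform` and `toy_augment` are hypothetical names and are not part of torchvision or of this notebook.

```python
import random

class TwoViewTransform:
    """Apply the same stochastic transform twice to get a positive pair."""

    def __init__(self, base_transform):
        self.base_transform = base_transform

    def __call__(self, img):
        # Two independent calls yield two different random augmentations
        return self.base_transform(img), self.base_transform(img)

def toy_augment(x):
    """Stand-in for a random augmentation pipeline (adds random noise to a number)."""
    return x + random.random()

random.seed(0)
view_1, view_2 = TwoViewTransform(toy_augment)(10.0)
print(view_1, view_2)
```

Under this assumption, passing `TwoViewTransform(transform)` as the dataset's `transform` would make each sample a pair of views, and the training loop would need to unpack both views before computing the loss.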


## Model Architecture <a class="anchor" id="model_architecture"></a>

Define a simple convolutional neural network (CNN) as our base encoder.

In [5]:
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2),
            # Add additional convolutional layers
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2)
            )

    def forward(self, x):
        return self.encoder(x)
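
The flattened size of the encoder output matters later, because the projection head needs it as its input dimension. The arithmetic can be sketched as follows; the helper `conv_pool_output_size` is a hypothetical name, and it assumes stride-1 convolutions followed by non-overlapping max pooling, as in the encoder above.

```python
def conv_pool_output_size(size, num_blocks, kernel=3, padding=1, pool=2):
    """Spatial size after repeated Conv2d (stride 1) + MaxPool2d blocks."""
    for _ in range(num_blocks):
        size = size + 2 * padding - kernel + 1  # stride-1 convolution
        size = size // pool                     # max pooling with stride equal to its kernel
    return size

side = conv_pool_output_size(32, num_blocks=4)  # CIFAR-10 images are 32 x 32
flat_dim = 512 * side * side                    # 512 channels after the last conv layer
print(side, flat_dim)  # 2 2048
```

Each 3 x 3 convolution with padding 1 preserves the spatial size, and each pooling layer halves it, so a 32 x 32 input shrinks to 2 x 2 after four blocks.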

## Projection Head <a class="anchor" id="projection_head"></a>

Add a projection head to project the encoded features into a lower-dimensional space.

In [6]:
class ProjectionHead(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(ProjectionHead, self).__init__()
        self.projection_head = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        return self.projection_head(x)

## Simple Contrastive Learning of Representations <a class="anchor" id="simclr"></a>

Combine the encoder and projection head into the SimCLR model.

In [7]:
class SimCLR(nn.Module):
    def __init__(self, encoder, projection_head):
        super(SimCLR, self).__init__()
        self.encoder = encoder
        self.projection_head = projection_head

    def forward(self, x):
        features = self.encoder(x)
        features = features.view(features.size(0), -1)  # Flatten the features
        projections = self.projection_head(features)
        return features, projections

## Contrastive Loss Function <a class="anchor" id="constrastive_loss"></a>

Define the contrastive loss function.

In [8]:
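class ContrastiveLoss(nn.Module):
    def __init__(self, temperature=0.5):
        super(ContrastiveLoss, self).__init__()
        self.temperature = temperature

    def forward(self, features, projections):
        # Note: the loss is computed on the encoder features;
        # `projections` is accepted but not used here.
        bs = features.size(0)
        features = F.normalize(features, dim=1)
        # Pairwise cosine similarities within the batch, scaled by temperature
        similarity_matrix = torch.matmul(features, features.T) / self.temperature
        # Each sample's target is its own index (the diagonal of the matrix).
        # Targets are created on the features' device rather than via .cuda(),
        # so the loss also runs on CPU.
        targets = torch.arange(bs, device=features.device)
        loss = F.cross_entropy(similarity_matrix, targets)
        return loss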
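
The loss above compares each sample only with itself and the rest of the batch. For comparison, here is a minimal sketch of the NT-Xent loss that SimCLR is usually described with, which takes the projections of two augmented views per image. It is not the loss used to produce the results in this notebook; the function name `nt_xent_loss` and the random test tensors are assumptions.

```python
import torch
import torch.nn.functional as F

def nt_xent_loss(z1, z2, temperature=0.5):
    """NT-Xent loss for two batches of projections of shape (N, D).

    Row i of z1 and row i of z2 are assumed to come from the same image.
    """
    n = z1.size(0)
    z = F.normalize(torch.cat([z1, z2], dim=0), dim=1)  # (2N, D), unit norm
    sim = z @ z.T / temperature                          # (2N, 2N) scaled cosine similarities
    # A sample must not act as its own positive or negative: mask the diagonal
    self_mask = torch.eye(2 * n, dtype=torch.bool, device=z.device)
    sim = sim.masked_fill(self_mask, float("-inf"))
    # The positive for row i is row i + N, and vice versa
    targets = torch.cat([torch.arange(n, 2 * n), torch.arange(0, n)]).to(z.device)
    return F.cross_entropy(sim, targets)

torch.manual_seed(0)
z1 = torch.randn(8, 32)
z2 = z1 + 0.05 * torch.randn(8, 32)  # nearly identical second view
loss_aligned = nt_xent_loss(z1, z2)
loss_random = nt_xent_loss(z1, torch.randn(8, 32))
print(loss_aligned.item(), loss_random.item())
```

The aligned pair should score a lower loss than the unrelated pair, since the positives sit close together while the other samples in the batch act as negatives.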

## Init the Model <a class="anchor" id="init_model"></a>

In [9]:
# Instantiate the model
encoder = Encoder().to(device)
# The encoder outputs 512 channels at 2 x 2 resolution, so the flattened input size is 512 * 2 * 2 = 2048
projection_head = ProjectionHead(2048, 256, 128).to(device)
model = SimCLR(encoder, projection_head).to(device)

# Hyperparameter Tuning (Experiment with different learning rates and epochs)
learning_rate = 0.0005  # Adjust based on experimentation
num_epochs = 1000  # Adjust based on experimentation

# Define optimizer and loss function
criterion = ContrastiveLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

## Training Loop <a class="anchor" id="training_loop"></a>

Define the training loop

In [10]:
start = time.time()

for epoch in range(num_epochs):
    total_loss = 0
    for batch in train_loader:
        images, _ = batch
        images = images.to(device)
        features, projections = model(images)
        loss = criterion(features, projections)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        
    # Print information every 50 epochs or at the last epoch
    if (epoch + 1) % 50 == 0 or epoch == num_epochs - 1:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss/len(train_loader):.4f}")

end = time.time()
print(f"\nTraining took {(end - start)/60} mins")

Epoch [50/1000], Loss: 4.6771
Epoch [100/1000], Loss: 4.8399
Epoch [150/1000], Loss: 4.8239
Epoch [200/1000], Loss: 4.7703
Epoch [250/1000], Loss: 4.7547
Epoch [300/1000], Loss: 4.6278
Epoch [350/1000], Loss: 4.8477
Epoch [400/1000], Loss: 4.7877
Epoch [450/1000], Loss: 4.7393
Epoch [500/1000], Loss: 4.8495
Epoch [550/1000], Loss: 4.8394
Epoch [600/1000], Loss: 4.8507
Epoch [650/1000], Loss: 4.8476
Epoch [700/1000], Loss: 4.8485
Epoch [750/1000], Loss: 4.8497
Epoch [800/1000], Loss: 4.8508
Epoch [850/1000], Loss: 4.8508
Epoch [900/1000], Loss: 4.8508
Epoch [950/1000], Loss: 4.8508
Epoch [1000/1000], Loss: 4.8508

Training took 86.49322359959284 mins
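
One possible reading of the plateau at 4.8508: cross-entropy over n equally likely classes equals ln(n), so a similarity matrix whose entries are all equal would give ln(128) on full batches and ln(80) on the final, smaller batch. The sketch below averages those values the same way the training loop does. It assumes the standard 50,000-image CIFAR-10 training split; it is a consistency check, not a diagnosis of the run.

```python
import math

dataset_size = 50_000  # CIFAR-10 training images
batch_size = 128

# 390 full batches, plus a final batch with the 80 leftover images
full_batches, last_batch = divmod(dataset_size, batch_size)

# Cross-entropy over n equally likely classes is ln(n)
uniform_losses = [math.log(batch_size)] * full_batches
if last_batch:
    uniform_losses.append(math.log(last_batch))

mean_uniform_loss = sum(uniform_losses) / len(uniform_losses)
print(f"{mean_uniform_loss:.4f}")  # 4.8508
```

The match with the logged value suggests that, by the later epochs, the loss could no longer distinguish samples within a batch, which would be consistent with the features having collapsed to a near-constant vector.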


# Downstream Task (Image Classification) <a class="anchor" id="img_classification"></a>

The goal is to train a simple classifier on top of the frozen encoder of the SimCLR model.

## Setup <a class="anchor" id="setup"></a>

In [11]:
# Load your trained SimCLR model and set it to evaluation mode
model.eval()
model.to(device)

SimCLR(
  (encoder): Encoder(
    (encoder): Sequential(
      (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (1): ReLU(inplace=True)
      (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (4): ReLU(inplace=True)
      (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (6): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (7): ReLU(inplace=True)
      (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
      (9): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (10): ReLU(inplace=True)
      (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
  )
  (projection_head): ProjectionHead(
    (projection_head): Sequential(
      (0): Linear(in_features=2048, out_features=256, bias=True)
      (1): ReLU(in

## Feature Extraction <a class="anchor" id="feature_extraction"></a>

In [12]:
# # Extract features from the test dataset using the encoder of SimCLR
# def extract_features(data_loader, model):
#   features = []
#   labels = []
#   for images, targets in data_loader:
#     with torch.no_grad():
#       features_batch, _ = model(images.to(device))
#       features.append(features_batch)
#       labels.append(targets)
#   return torch.cat(features, dim=0), torch.cat(labels, dim=0)

# # Extract features from the test dataset
# test_features, test_labels = extract_features(test_loader, model)
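
Since the cell above is commented out, here is a minimal runnable sketch of the same idea: pass images through a frozen encoder, flatten the outputs, and fit a single linear layer on top. The `toy_encoder` and `toy_loader` are stand-ins introduced so the sketch runs on its own; in the tutorial they would be replaced by the trained `model.encoder` and the CIFAR-10 loaders.

```python
import torch
import torch.nn as nn

@torch.no_grad()
def extract_features(encoder, loader, device):
    """Run a frozen encoder over a loader and return flattened features and labels."""
    encoder.eval()
    feats, labels = [], []
    for images, targets in loader:
        out = encoder(images.to(device))
        feats.append(out.flatten(start_dim=1).cpu())
        labels.append(targets)
    return torch.cat(feats), torch.cat(labels)

# Stand-ins so the sketch is self-contained (assumptions, not the tutorial's objects)
device = torch.device("cpu")
toy_encoder = nn.Sequential(nn.Conv2d(3, 8, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2))
toy_loader = [(torch.randn(4, 3, 32, 32), torch.randint(0, 10, (4,))) for _ in range(2)]

features, labels = extract_features(toy_encoder, toy_loader, device)
probe = nn.Linear(features.size(1), 10)  # a single linear layer on frozen features
logits = probe(features)
print(features.shape, logits.shape)
```

Training only `probe` on the extracted features is the usual "linear evaluation" setup: the encoder weights stay fixed, so the accuracy reflects the quality of the learned representation.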

## Linear Classifier <a class="anchor" id="linear_classifier"></a>

In [None]:
# Define the classifier (despite its name, this is a small MLP with two hidden layers)
class LinearClassifier(nn.Module):
    def __init__(self, input_dim, num_classes):
        super(LinearClassifier, self).__init__()
        self.fc1 = nn.Linear(input_dim, 512)  # First hidden layer
        self.relu = nn.ReLU(inplace=True)
        self.dropout1 = nn.Dropout(p=0.2)  # Dropout layer for regularization
        self.fc2 = nn.Linear(512, 256)  # Second hidden layer
        self.dropout2 = nn.Dropout(p=0.2)
        self.fc3 = nn.Linear(256, num_classes)  # Output layer

    def forward(self, x):
        x = self.dropout1(self.relu(self.fc1(x)))
        x = self.dropout2(self.relu(self.fc2(x)))
        x = self.fc3(x)
        return x

## Init the Classifier <a class="anchor" id="init_classifier"></a>

In [13]:
# Initialize the classifier (input_dim = 192 * 4 * 4 = 3072, which matches a flattened 3 x 32 x 32 image)
classifier = LinearClassifier(input_dim=192 * 4 * 4, 
                              num_classes=10).to(device)

# Hyperparameter Tuning (Experiment with different learning rates and epochs)
learning_rate = 0.0001  # Adjust based on experimentation
num_epochs = 100  # Adjust based on experimentation

# Define optimizer and loss function
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(classifier.parameters(), lr=learning_rate, 
                             weight_decay=0.001)

## Train the Classifier <a class="anchor" id="train_classifier"></a>

In [14]:
start = time.time()

# Train the linear classifier
for epoch in range(num_epochs):
    classifier.train()
    for features, labels in train_loader:
        # train_loader yields images, so `features` holds raw pixels flattened to
        # 3 * 32 * 32 = 3072 values; the SimCLR encoder is not applied in this loop
        features = features.view(features.size(0), -1).to(device)
        # print(f"Feature Shape: {features.shape}")
        labels = labels.to(device)
        outputs = classifier(features)
        loss = criterion(outputs, labels)
    
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Print information every 5 epochs or at the last epoch
    if (epoch + 1) % 5 == 0 or epoch == num_epochs - 1:
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

end = time.time()
print(f"\nTraining took {(end - start)/60} mins")

Epoch [5/100], Loss: 1.9865
Epoch [10/100], Loss: 2.0728
Epoch [15/100], Loss: 1.9549
Epoch [20/100], Loss: 1.9938
Epoch [25/100], Loss: 1.7308
Epoch [30/100], Loss: 1.9159
Epoch [35/100], Loss: 1.9260
Epoch [40/100], Loss: 1.9332
Epoch [45/100], Loss: 1.7531
Epoch [50/100], Loss: 1.9723
Epoch [55/100], Loss: 2.0345
Epoch [60/100], Loss: 1.9939
Epoch [65/100], Loss: 1.9165
Epoch [70/100], Loss: 1.8173
Epoch [75/100], Loss: 1.8681
Epoch [80/100], Loss: 2.0231
Epoch [85/100], Loss: 1.8092
Epoch [90/100], Loss: 1.9763
Epoch [95/100], Loss: 1.8566
Epoch [100/100], Loss: 1.9622

Training took 8.280885032812755 mins


## Evaluate the Classifier <a class="anchor" id="eval_classifier"></a>

In [15]:
classifier.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images = images.view(images.size(0), -1).to(device)
        labels = labels.to(device)
        outputs = classifier(images)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Accuracy on the test set: {(100 * correct / total):.2f}%")

Accuracy on the test set: 33.28%


## Notes

This tutorial provides a basic implementation of contrastive learning in the style of SimCLR. To go further, experiment with the hyperparameters, try other datasets, or explore different augmentation strategies and encoder architectures.

**[Go to Top](#top)**