# **Momentum Contrast (MoCo) Self-Supervised Learning for Feature Extraction from Images**

**Momentum Contrast (MoCo)** is a **self-supervised learning** framework that can learn visual representations from unlabeled images.

**MoCo** is built around two concepts that differentiate it from earlier self-supervised learning methods:

**Dynamic Dictionary**: MoCo maintains a queue-based dictionary of encoded keys. Each new mini-batch of keys is enqueued and the oldest keys are dequeued, so the number of negative samples is decoupled from the batch size. This gives the contrastive loss a large set of negatives without requiring large batches.

**Momentum Encoder**: The framework uses two encoders, a query encoder and a key encoder. The query encoder is updated through standard backpropagation, while the key encoder is updated as a slowly moving average of the query encoder's weights. Because the key encoder changes slowly, keys that were encoded at different training steps stay consistent with one another, which stabilizes learning.
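To make the two concepts concrete, here is a minimal sketch in plain Python. It is an illustration only, not the implementation used by the library later in this notebook. The helper `momentum_update`, the toy weights, and the tuple "keys" are all hypothetical stand-ins. The update rule k <- m * k + (1 - m) * q follows the MoCo paper.

```python
from collections import deque


def momentum_update(query_params, key_params, m=0.99):
    """Key encoder update: k <- m * k + (1 - m) * q, applied per parameter."""
    return [m * k + (1.0 - m) * q for q, k in zip(query_params, key_params)]


# Momentum encoder: the key weights trail the query weights smoothly.
query_weights = [1.0, -2.0]  # hypothetical weights, held fixed for the demo
key_weights = [0.0, 0.0]
for _ in range(100):
    key_weights = momentum_update(query_weights, key_weights, m=0.99)

# Dynamic dictionary: a fixed-size FIFO queue of encoded keys.
queue = deque(maxlen=6)  # toy capacity of 6 keys
for step in range(4):
    # Stand-ins for encoded key vectors: (training step, index in batch).
    batch_keys = [(step, i) for i in range(2)]
    # Enqueue the newest batch; the oldest keys are dropped once the queue is full.
    queue.extend(batch_keys)

print(key_weights)  # after 100 updates the keys have moved only part of the way
print(list(queue))  # only the three most recent batches remain
```

With m close to 1 the key weights move only a small fraction of the way toward the query weights at each step, and the queue always holds the most recent batches of keys.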

In **this notebook**, we apply the MoCo framework to the **CIFAR-10** dataset (available from Kaggle: https://www.kaggle.com/swaroopkml/cifar10-pngs-in-folders), a collection of 60,000 32x32 color images in 10 classes, with 6,000 images per class. We ignore the CIFAR-10 class labels during pre-training and treat it as an unlabeled dataset. Our goal is to show how MoCo can learn useful visual representations from unlabeled data. These representations (i.e., features) can then be used in downstream tasks such as classification, which we demonstrate by training a linear classifier on the MoCo features.

### **Data download and unzip**


Get the CIFAR-10 dataset from Kaggle:
https://www.kaggle.com/swaroopkml/cifar10-pngs-in-folders


In [5]:
! ls sample_data

anscombe.json		     california_housing_train.csv  mnist_train_small.csv
california_housing_test.csv  mnist_test.csv		   README.md


In [6]:
# ! mkdir data_cifar10

In [7]:
# ! ls data_cifar10

In [31]:
# ! unzip "./data_cifar10/cifar10_c.zip"

In [11]:
! ls

cifar10  cifar10.zip  sample_data


In [12]:
# ! unzip "./cifar10.zip"

In [15]:
! ls cifar10/cifar10

test  train


In [30]:
# ! ls cifar10/cifar10/train/deer

### **MoCo Self-Supervised Learning on CIFAR-10**

### **Installs**

In [1]:
# ! pip install pytorch-lightning
# ! pip install lightly

### **Imports**

In [2]:
%matplotlib inline

import numpy as np
import pandas as pd

import copy

In [3]:
import torch
import torch.nn as nn

import torchvision

import pytorch_lightning as pl

In [16]:
from lightly.data import LightlyDataset
from lightly.loss import NTXentLoss
from lightly.models import ResNetGenerator
from lightly.models.modules.heads import MoCoProjectionHead
from lightly.models.utils import (
    batch_shuffle,
    batch_unshuffle,
    deactivate_requires_grad,
    update_momentum,
)
from lightly.transforms import MoCoV2Transform, utils

### **Initializations**

In [30]:
num_workers = 8
batch_size = 512
memory_bank_size = 4096
seed = 1
max_epochs = 5  # 5 to check the execution flow; start with 100 for actual results

In [18]:
! ls cifar10/cifar10

test  train


In [19]:
path_to_train = "./cifar10/cifar10/train/"
path_to_test = "./cifar10/cifar10/test/"

### **Configure augmentations and define loaders**

In [20]:
# disable blur because we're working with tiny images
transform = MoCoV2Transform(
    input_size=32,
    gaussian_blur=0.0,
)

In [21]:
# Augmentations typically used to train on cifar-10
train_classifier_transforms = torchvision.transforms.Compose(
    [
        torchvision.transforms.RandomCrop(32, padding=4),
        torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(
            mean=utils.IMAGENET_NORMALIZE["mean"],
            std=utils.IMAGENET_NORMALIZE["std"],
        ),
    ]
)
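As a rough illustration of what the two light augmentations above do, the following sketch applies a padded random crop and a random horizontal flip to a tiny 4x4 "image" stored as nested lists. The functions `random_crop_with_padding` and `random_horizontal_flip` are hypothetical helpers written for this example. They mimic the zero-padding default of `RandomCrop` but are not the torchvision implementation.

```python
import random


def random_crop_with_padding(image, size, padding, rng):
    """Zero-pad a 2D image on all sides, then cut a random size x size window."""
    height, width = len(image), len(image[0])
    padded = [[0] * (width + 2 * padding) for _ in range(height + 2 * padding)]
    for r in range(height):
        for c in range(width):
            padded[r + padding][c + padding] = image[r][c]
    top = rng.randint(0, height + 2 * padding - size)
    left = rng.randint(0, width + 2 * padding - size)
    return [row[left:left + size] for row in padded[top:top + size]]


def random_horizontal_flip(image, rng, p=0.5):
    """Mirror the image left to right with probability p."""
    return [row[::-1] for row in image] if rng.random() < p else image


rng = random.Random(0)
# Tiny 4x4 stand-in for a 32x32 image; pixel values 1..16 so padding (0) is visible.
image = [[r * 4 + c + 1 for c in range(4)] for r in range(4)]
view = random_horizontal_flip(random_crop_with_padding(image, 4, 1, rng), rng)
for row in view:
    print(row)
```

The output keeps the input size, and zeros appear wherever the crop window extends into the padding, which is the small translation effect this augmentation is meant to provide.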


In [22]:
# No additional augmentations for the test set
test_transforms = torchvision.transforms.Compose(
    [
        torchvision.transforms.Resize((32, 32)),
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize(
            mean=utils.IMAGENET_NORMALIZE["mean"],
            std=utils.IMAGENET_NORMALIZE["std"],
        ),
    ]
)
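The `Normalize` step above standardizes each color channel with ImageNet statistics. A small worked example, assuming the usual ImageNet mean and standard deviation values (which `utils.IMAGENET_NORMALIZE` is expected to hold) and using a hypothetical helper `normalize_pixel`:

```python
# ImageNet channel statistics (RGB). Assumed to match utils.IMAGENET_NORMALIZE.
IMAGENET_MEAN = (0.485, 0.456, 0.406)
IMAGENET_STD = (0.229, 0.224, 0.225)


def normalize_pixel(rgb_uint8):
    """Map one 8-bit RGB pixel to normalized values, mimicking ToTensor + Normalize."""
    scaled = [c / 255.0 for c in rgb_uint8]  # ToTensor scales pixel values to [0, 1]
    return [(x - m) / s for x, m, s in zip(scaled, IMAGENET_MEAN, IMAGENET_STD)]


white = normalize_pixel((255, 255, 255))
black = normalize_pixel((0, 0, 0))
print(white)
print(black)
```

After normalization the pixel values are roughly centered around zero, with white mapping to positive values and black to negative values in every channel.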

In [23]:
# Use the moco augmentations for training moco
dataset_train_moco = LightlyDataset(input_dir=path_to_train, transform=transform)

In [24]:
# We will train a linear classifier on top of the pre-trained MoCo backbone.
# For this we use light augmentations (random crop and horizontal flip) rather
# than the MoCo augmentations, which are very strong and usually reduce the
# accuracy of models that are not trained contrastively. The linear layer is
# trained with cross-entropy loss using the labels provided by the dataset.

dataset_train_classifier = LightlyDataset(
    input_dir=path_to_train, transform=train_classifier_transforms
)

In [25]:
dataset_test = LightlyDataset(input_dir=path_to_test, transform=test_transforms)

### **Dataloaders**

Create the dataloaders to load and preprocess the data.

In [26]:
dataloader_train_moco = torch.utils.data.DataLoader(
    dataset_train_moco,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,
    num_workers=num_workers,
)

dataloader_train_classifier = torch.utils.data.DataLoader(
    dataset_train_classifier,
    batch_size=batch_size,
    shuffle=True,
    drop_last=True,
    num_workers=num_workers,
)

dataloader_test = torch.utils.data.DataLoader(
    dataset_test,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
    num_workers=num_workers,
)
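The loader settings above determine how many optimization steps each epoch contains. The following arithmetic sketch assumes the standard CIFAR-10 split of 50,000 training and 10,000 test images. The folder contents are not verified in this notebook, so treat those two numbers as assumptions.

```python
import math

batch_size = 512         # from the notebook
memory_bank_size = 4096  # from the notebook
num_train = 50_000       # assumption: standard CIFAR-10 training split
num_test = 10_000        # assumption: standard CIFAR-10 test split

# drop_last=True discards the final partial batch of the training loaders.
train_steps = num_train // batch_size
dropped_images = num_train - train_steps * batch_size

# drop_last=False keeps the final partial batch of the test loader.
test_steps = math.ceil(num_test / batch_size)

# Number of full mini-batches of keys that fit in the memory bank.
batches_in_bank = memory_bank_size // batch_size

print(train_steps, dropped_images, test_steps, batches_in_bank)
```

Under these assumptions each training epoch runs 97 steps and skips 336 images, and the memory bank holds the keys of the 8 most recent batches.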



### **Define the MoCo Model using PyTorch-Lightning**

In [27]:
class MocoModel(pl.LightningModule):
    def __init__(self):
        super().__init__()

        # create a ResNet backbone and remove the classification head
        resnet = ResNetGenerator("resnet-18", 1, num_splits=8)
        self.backbone = nn.Sequential(
            *list(resnet.children())[:-1],
            nn.AdaptiveAvgPool2d(1),
        )

        # create a moco model based on ResNet
        self.projection_head = MoCoProjectionHead(512, 512, 128)
        self.backbone_momentum = copy.deepcopy(self.backbone)
        self.projection_head_momentum = copy.deepcopy(self.projection_head)
        deactivate_requires_grad(self.backbone_momentum)
        deactivate_requires_grad(self.projection_head_momentum)

        # create our loss with the optional memory bank
        self.criterion = NTXentLoss(
            temperature=0.1, memory_bank_size=(memory_bank_size, 128)
        )

    def training_step(self, batch, batch_idx):
        (x_q, x_k), _, _ = batch

        # update momentum
        update_momentum(self.backbone, self.backbone_momentum, 0.99)
        update_momentum(self.projection_head, self.projection_head_momentum, 0.99)

        # get queries
        q = self.backbone(x_q).flatten(start_dim=1)
        q = self.projection_head(q)

        # get keys
        k, shuffle = batch_shuffle(x_k)
        k = self.backbone_momentum(k).flatten(start_dim=1)
        k = self.projection_head_momentum(k)
        k = batch_unshuffle(k, shuffle)

        loss = self.criterion(q, k)
        self.log("train_loss_ssl", loss)
        return loss

    def on_train_epoch_end(self):
        self.custom_histogram_weights()

    # We provide a helper method to log weights in tensorboard
    # which is useful for debugging.
    def custom_histogram_weights(self):
        for name, params in self.named_parameters():
            self.logger.experiment.add_histogram(name, params, self.current_epoch)

    def configure_optimizers(self):
        optim = torch.optim.SGD(
            self.parameters(),
            lr=6e-2,
            momentum=0.9,
            weight_decay=5e-4,
        )
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, max_epochs)
        return [optim], [scheduler]
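The contrastive objective used in the training step can be illustrated for a single query. The sketch below computes an InfoNCE-style loss in plain Python: the negative log-softmax of the positive logit over one positive key and a set of negative keys. The function `info_nce` and the 2-d toy vectors are hypothetical. This is not the `NTXentLoss` implementation, only the underlying idea with the same temperature of 0.1.

```python
import math


def l2_normalize(v):
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v]


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def info_nce(query, positive_key, negative_keys, temperature=0.1):
    """Contrastive loss for one query: -log softmax of the positive logit."""
    q = l2_normalize(query)
    keys = [l2_normalize(positive_key)] + [l2_normalize(n) for n in negative_keys]
    logits = [dot(q, k) / temperature for k in keys]  # the positive is at index 0
    max_logit = max(logits)  # subtract the max for numerical stability
    log_sum_exp = max_logit + math.log(sum(math.exp(l - max_logit) for l in logits))
    return log_sum_exp - logits[0]


negatives = [[0.0, 1.0], [-1.0, 0.0]]  # toy stand-ins for keys from the queue
loss_aligned = info_nce([1.0, 0.0], [1.0, 0.1], negatives)
loss_misaligned = info_nce([1.0, 0.0], [0.0, 1.0], negatives)
print(loss_aligned, loss_misaligned)
```

The loss is small when the query is close to its positive key and far from the negatives, and it grows when the positive key is no more similar to the query than the negatives are.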

In [31]:
#  Instantiate the model and train it using the lightning trainer.

model = MocoModel()
trainer = pl.Trainer(max_epochs=max_epochs, devices=1, accelerator="gpu")

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


In [32]:
trainer.fit(model, dataloader_train_moco)

INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name                     | Type               | Params
----------------------------------------------------------------
0 | backbone                 | Sequential         | 11.2 M
1 | projection_head          | MoCoProjectionHead | 328 K 
2 | backbone_momentum        | Sequential         | 11.2 M
3 | projection_head_momentum | MoCoProjectionHead | 328 K 
4 | criterion                | NTXentLoss         | 0     
----------------------------------------------------------------
11.5 M    Trainable params
11.5 M    Non-trainable params
23.0 M    Total params
91.977    Total estimated model params size (MB)


INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=5` reached.
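The parameter counts in the model summary can be checked by hand. The sketch below assumes the projection head consists of two linear layers with biases, `Linear(512, 512)` followed by `Linear(512, 128)`, with a parameter-free activation in between. This assumption is consistent with the 328 K reported above, but the layer layout is not printed in this notebook. The helper `linear_params` is hypothetical.

```python
def linear_params(in_features, out_features, bias=True):
    """Number of trainable parameters in a fully connected layer."""
    return in_features * out_features + (out_features if bias else 0)


# Assumption: projection head = Linear(512, 512) -> ReLU -> Linear(512, 128).
projection_head_params = linear_params(512, 512) + linear_params(512, 128)

# The linear classifier defined later in the notebook is nn.Linear(512, 10).
linear_classifier_params = linear_params(512, 10)

print(projection_head_params, linear_classifier_params)
```

The backbone (11.2 M) plus the projection head gives the 11.5 M trainable parameters in the summary. The momentum copies account for the equally large non-trainable part.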


### **Build a Linear Classifier Using Extracted MoCo Features**

In [33]:
class Classifier(pl.LightningModule):
    def __init__(self, backbone):
        super().__init__()
        # use the pretrained ResNet backbone
        self.backbone = backbone

        # freeze the backbone
        deactivate_requires_grad(backbone)

        # create a linear layer for our downstream classification model
        self.fc = nn.Linear(512, 10)

        self.criterion = nn.CrossEntropyLoss()
        self.validation_step_outputs = []

    def forward(self, x):
        y_hat = self.backbone(x).flatten(start_dim=1)
        y_hat = self.fc(y_hat)
        return y_hat

    def training_step(self, batch, batch_idx):
        x, y, _ = batch
        y_hat = self.forward(x)
        loss = self.criterion(y_hat, y)
        self.log("train_loss_fc", loss)
        return loss

    def on_train_epoch_end(self):
        self.custom_histogram_weights()

    # We provide a helper method to log weights in tensorboard
    # which is useful for debugging.
    def custom_histogram_weights(self):
        for name, params in self.named_parameters():
            self.logger.experiment.add_histogram(name, params, self.current_epoch)

    def validation_step(self, batch, batch_idx):
        x, y, _ = batch
        y_hat = self.forward(x)
        y_hat = torch.nn.functional.softmax(y_hat, dim=1)

        # calculate number of correct predictions
        _, predicted = torch.max(y_hat, 1)
        num = predicted.shape[0]
        correct = (predicted == y).float().sum()
        self.validation_step_outputs.append((num, correct))
        return num, correct

    def on_validation_epoch_end(self):
        # calculate and log top1 accuracy
        if self.validation_step_outputs:
            total_num = 0
            total_correct = 0
            for num, correct in self.validation_step_outputs:
                total_num += num
                total_correct += correct
            acc = total_correct / total_num
            self.log("val_acc", acc, on_epoch=True, prog_bar=True)
            self.validation_step_outputs.clear()

    def configure_optimizers(self):
        optim = torch.optim.SGD(self.fc.parameters(), lr=30.0)
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optim, max_epochs)
        return [optim], [scheduler]
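Both optimizers in this notebook use cosine annealing. As a sketch of what that schedule looks like for the linear classifier (initial learning rate 30.0, 5 epochs), the closed-form expression can be evaluated directly. The function `cosine_annealing_lr` is a hypothetical helper that assumes a minimum learning rate of 0 and one scheduler step per epoch.

```python
import math


def cosine_annealing_lr(base_lr, epoch, t_max, eta_min=0.0):
    """Closed-form cosine annealing from base_lr down to eta_min over t_max epochs."""
    return eta_min + (base_lr - eta_min) * (1 + math.cos(math.pi * epoch / t_max)) / 2


max_epochs = 5
schedule = [cosine_annealing_lr(30.0, epoch, max_epochs) for epoch in range(max_epochs + 1)]
print([round(lr, 3) for lr in schedule])
```

The learning rate starts at its initial value, decreases slowly at first, then faster, and reaches the minimum at the final epoch. If `max_epochs` is raised to 100, the same curve is stretched over the longer run.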

In [34]:
model.eval()

MocoModel(
  (backbone): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): SplitBatchNorm(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): SplitBatchNorm(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): SplitBatchNorm(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (shortcut): Sequential()
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): SplitBatchNorm(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
   

In [35]:
classifier = Classifier(model.backbone)
trainer = pl.Trainer(max_epochs=max_epochs, devices=1, accelerator="gpu")

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


In [36]:
trainer.fit(classifier, dataloader_train_classifier, dataloader_test)

INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name      | Type             | Params
-----------------------------------------------
0 | backbone  | Sequential       | 11.2 M
1 | fc        | Linear           | 5.1 K 
2 | criterion | CrossEntropyLoss | 0     
-----------------------------------------------
5.1 K     Trainable params
11.2 M    Non-trainable params
11.2 M    Total params
44.696    Total estimated model params size (MB)


INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=5` reached.


### **Conclusion**

In this Jupyter Notebook, we have implemented the Momentum Contrast (MoCo) framework and applied it to the CIFAR-10 dataset. MoCo uses a dynamic dictionary and a momentum encoder to learn visual representations from unlabeled images.

The MoCo framework can learn from large datasets without requiring extensive labeled data, which is a significant advantage in many practical scenarios.

We trained the MoCo self-supervised feature extraction model and the linear classifier for only 5 epochs, in order to walk through the workflow in a short time. Future studies will train for more epochs and quantify under which conditions, with regard to dataset size and relative class imbalance, MoCo can be employed most effectively.

### **References**

* He, Kaiming, Haoqi Fan, Yuxin Wu, Saining Xie, and Ross Girshick. **"Momentum contrast for unsupervised visual representation learning."** In Proceedings of the IEEE/CVF conference on computer vision and pattern recognition, pp. 9729-9738. 2020. (MoCo v1)

* https://docs.lightly.ai/self-supervised-learning/examples/moco.html

* Chen, Xinlei, Haoqi Fan, Ross Girshick, and Kaiming He. **"Improved baselines with momentum contrastive learning."** arXiv preprint arXiv:2003.04297 (2020). (MoCo v2)

* Chen, Xinlei, Saining Xie, and Kaiming He. **"An empirical study of training self-supervised vision transformers."** In Proceedings of the IEEE/CVF international conference on computer vision, pp. 9640-9649. 2021. (MoCo v3)

