This notebook shows how to refactor and train a [SentenceTransformer](https://www.sbert.net/) model architecture using [NVIDIA NeMo](https://docs.nvidia.com/deeplearning/nemo/user-guide/docs/en/main/starthere/intro.html).

Install all necessary packages

In [None]:
"""
You can either run this notebook locally (if you have all the dependencies and a GPU) or on Google Colab.

Instructions for setting up Colab are as follows:
1. Open a new Python 3 notebook.
2. Import this notebook from GitHub (File -> Upload Notebook -> "GITHUB" tab -> copy/paste GitHub URL)
3. Connect to an instance with a GPU (Runtime -> Change runtime type -> select "GPU" for hardware accelerator)
4. Run this cell to set up dependencies.
"""
# If you're using Google Colab and not running locally, run this cell.

## Install dependencies
!pip install wget
!apt-get install sox libsndfile1 ffmpeg
!pip install text-unidecode

## Install NeMo
BRANCH = 'r1.20.0'
!python -m pip install git+https://github.com/NVIDIA/NeMo.git@$BRANCH#egg=nemo_toolkit[all]

## Install Huggingface PEFT
!pip install peft

Import all necessary libraries

In [None]:
from transformers import AutoTokenizer, AutoModel
from sentence_transformers.losses import BatchSemiHardTripletLoss
from peft import get_peft_config, PeftModel, PeftConfig, get_peft_model, LoraConfig, TaskType

import torch
from torch import nn, Tensor
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.optim import AdamW

import nemo
from nemo.core.classes import NeuralModule, ModelPT, typecheck
from nemo.core.neural_types import *
from nemo.core.config import Config
import pytorch_lightning as ptl
from omegaconf import OmegaConf, MISSING
from nemo.core.classes.common import PretrainedModelInfo
from nemo.core import Dataset

import os
from sklearn.preprocessing import LabelEncoder
import pandas as pd
import copy

If running on Google Colab, we load the training data as a Pandas dataframe from Google Drive. Otherwise, we assume the CSV files are in the current folder.

In [None]:
if 'google.colab' in str(get_ipython()):
  from google.colab import drive
  drive.mount('/content/gdrive')
  csv_path = '/content/gdrive/My Drive/Colab Data'
else:
  csv_path = '.'

df_sample = pd.read_csv(os.path.join(csv_path, 'df_sample_train.csv'))
df_sample

Unnamed: 0,news,category
0,The Hiking Trip That Helped Me Reconnect With ...,HEALTHY LIVING
1,Michelle Obama Wants You To Drink More Water B...,WELLNESS
2,9 Sins Against Superfoods You've probably seen...,WELLNESS
3,Being Tall Might Up Your Risk For This Cancer ...,HEALTHY LIVING
4,Why Is It So Hard to Forgive Yourself? It won'...,HEALTHY LIVING
...,...,...
12050,Provigil: Narcolepsy Drug Being Taken By Peopl...,WELLNESS
12051,Debunking The Holiday Suicide Myth,HEALTHY LIVING
12052,Here's The Real Truth About Mental Health It's...,HEALTHY LIVING
12053,Tamiflu Approved By FDA For Infants With New F...,WELLNESS


Here we create a custom NeMo *Dataset* class that prepares the inputs for the model

In [None]:
class NewsCategoryDataset(Dataset):
    def __init__(self, csv_path, tokenizer):
        """
        Args:
        csv_path (string): Path to the CSV file with 'news' and 'category'.
        tokenizer (AutoTokenizer): Tokenizer from Hugging Face's Transformers library.
        """
        # Load the dataset from the CSV file
        self.data = pd.read_csv(csv_path)

        # Initialize the tokenizer
        self.tokenizer = tokenizer

        # Initialize and fit the label encoder
        self.label_encoder = LabelEncoder()
        self.labels = self.label_encoder.fit_transform(self.data['category'])

    def __len__(self):
        """Returns the size of the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """
        Args:
        idx (int): The index of the sample to fetch.

        Returns:
        dict: Dictionary containing the tokenized 'news' and its corresponding label-encoded 'category'.
        """
        # Fetch the news text from the dataset
        news_text = self.data.loc[idx, 'news']

        # Tokenize the news text
        tokens = self.tokenizer(news_text, padding='max_length', truncation=True, return_tensors='pt')

        # Fetch the label-encoded category for this news
        label = self.labels[idx]

        return tokens['input_ids'].squeeze(0), tokens['token_type_ids'].squeeze(0), tokens['attention_mask'].squeeze(0), label

    @property
    def output_types(self):
      return {
          'input_ids': NeuralType(('B', 'T'), EmbeddedTextType()),
          'token_type_ids': NeuralType(('B', 'T'), IntType(), optional=True),
          'attention_mask': NeuralType(('B', 'T'), MaskType()),
          'label': NeuralType(('B',), LabelsType())
      }
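
To make the label-encoding step concrete, the following plain-Python sketch mimics what `LabelEncoder.fit_transform` does to the `category` column: the distinct categories are sorted and each one is mapped to its index. The helper `fit_label_encoder` and the four sample categories are hypothetical and only serve as an illustration; the dataset class itself relies on scikit-learn.

```python
def fit_label_encoder(categories):
    """Map each distinct category to an integer id, in sorted order."""
    classes = sorted(set(categories))
    class_to_id = {name: i for i, name in enumerate(classes)}
    return classes, class_to_id


# Hypothetical sample of the 'category' column
categories = ["HEALTHY LIVING", "WELLNESS", "WELLNESS", "HEALTHY LIVING"]

classes, class_to_id = fit_label_encoder(categories)
labels = [class_to_id[c] for c in categories]

print(classes)  # ['HEALTHY LIVING', 'WELLNESS']
print(labels)   # [0, 1, 1, 0]
```

Note that the ids depend on the categories present in the file that was used for fitting, so the training, validation, and test CSVs should contain the same set of categories for the labels to be comparable.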

We create all NeMo *NeuralModule* classes that will be part of our model. Notice that we are using LoRA to fine-tune the encoder layer.

In [None]:
# Encoder Layer
class EncoderLayer(NeuralModule):
    def __init__(self, model_name='sentence-transformers/all-MiniLM-L6-v2', **kwargs):
        super().__init__(**kwargs)

        self.encoder = AutoModel.from_pretrained(model_name)
        lora_target_modules = [f'encoder.layer.{n}.attention.self.query' for n in range(6)]
        lora_target_modules = lora_target_modules + [f'encoder.layer.{n}.attention.self.key' for n in range(6)]
        lora_target_modules = lora_target_modules + [f'encoder.layer.{n}.attention.self.value' for n in range(6)]
        peft_config = LoraConfig(
            task_type=TaskType.FEATURE_EXTRACTION,
            inference_mode=False,
            r=16,
            lora_alpha=16,
            lora_dropout=0.1,
            bias='all',
            target_modules=lora_target_modules,
        )
        self.encoder = get_peft_model(self.encoder, peft_config)

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

    @typecheck()
    def forward(self, **encoded_inputs):
        return self.encoder(input_ids=encoded_inputs['input_ids'],
                            token_type_ids=encoded_inputs['token_type_ids'],
                            attention_mask=encoded_inputs['attention_mask'])

    @property
    def input_types(self):
        return {
            'input_ids': NeuralType(('B', 'T'), EmbeddedTextType()),
            'token_type_ids': NeuralType(('B', 'T'), IntType(), optional=True),
            'attention_mask': NeuralType(('B', 'T'), MaskType())
        }

    @property
    def output_types(self):
        return {'last_hidden_states': NeuralType(('B', 'T', 'D'), EncodedRepresentation())}

# Pooling Layer
class PoolingLayer(NeuralModule):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @typecheck()
    def forward(self, **pooling_inputs):
        last_hidden_states = pooling_inputs['last_hidden_states']
        attention_mask = pooling_inputs['attention_mask']
        input_mask_expanded = attention_mask.unsqueeze(-1).expand(last_hidden_states.size()).float()
        return torch.sum(last_hidden_states * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

    @property
    def input_types(self):
        return {
            'last_hidden_states': NeuralType(('B', 'T', 'D'), EncodedRepresentation()),
            'attention_mask': NeuralType(('B', 'T'), MaskType())
        }

    @property
    def output_types(self):
        return {'sentence_embeddings': NeuralType(('B', 'D'), EncodedRepresentation())}

# Normalization Layer
class NormLayer(NeuralModule):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @typecheck()
    def forward(self, sentence_embeddings):
        return F.normalize(sentence_embeddings, p=2, dim=1)

    @property
    def input_types(self):
        return {'sentence_embeddings': NeuralType(('B', 'D'), EncodedRepresentation())}

    @property
    def output_types(self):
        return {'sentence_embeddings': NeuralType(('B', 'D'), EncodedRepresentation())}

# Loss Function
class TripletLoss(BatchSemiHardTripletLoss, NeuralModule):
    def __init__(self):
        super(TripletLoss, self).__init__(None)

    @typecheck()
    def forward(self, **loss_inputs):
        return self.batch_semi_hard_triplet_loss(loss_inputs['labels'], loss_inputs['sentence_embeddings'])

    @property
    def input_types(self):
        return {
            'sentence_embeddings': NeuralType(('B', 'D'), EncodedRepresentation()),
            'labels': NeuralType(('B',), LabelsType())
        }

    @property
    def output_types(self):
        return {'loss': NeuralType((), LossType())}
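
The pooling and normalization modules can be illustrated without PyTorch. The sketch below, written for a single sentence with plain Python lists, averages only the token vectors whose attention-mask entry is 1 and then rescales the result to unit length. The function names and the toy vectors are made up for this example; the modules above do the same thing on batched tensors.

```python
import math


def mean_pool(token_embeddings, attention_mask):
    """Average the token vectors of one sentence, skipping padded positions."""
    dim = len(token_embeddings[0])
    summed = [0.0] * dim
    for vector, keep in zip(token_embeddings, attention_mask):
        if keep:
            summed = [s + v for s, v in zip(summed, vector)]
    # Same guard against division by zero as torch.clamp(..., min=1e-9)
    count = max(sum(attention_mask), 1e-9)
    return [s / count for s in summed]


def l2_normalize(vector, eps=1e-12):
    """Scale a vector to unit Euclidean length."""
    norm = math.sqrt(sum(v * v for v in vector))
    return [v / max(norm, eps) for v in vector]


# Three token vectors; the last one is padding and must not affect the result
tokens = [[1.0, 2.0], [3.0, 6.0], [100.0, 100.0]]
mask = [1, 1, 0]

pooled = mean_pool(tokens, mask)
unit = l2_normalize(pooled)

print(pooled)  # [2.0, 4.0]
```

Masking matters here because the dataset pads every sentence to the maximum length, so an unmasked average would be dominated by padding tokens.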

Next we create the NeMo *ModelPT* class, which combines the *NeuralModule* classes defined above and includes everything needed to train and test the model with PyTorch Lightning.

In [None]:
# Sentence Transformer Model
class SentenceTransformer(ModelPT):
    def __init__(self, cfg: OmegaConf, trainer: ptl.Trainer = None):
      super().__init__(cfg=cfg, trainer=trainer)

      self.encoder_layer = self.from_config_dict(self.cfg.encoder)
      self.pooling_layer = self.from_config_dict(self.cfg.pooling)
      self.normalization_layer = self.from_config_dict(self.cfg.normalization)

      self.loss = TripletLoss()

    # Encode the tokens, mean-pool them using the attention mask, then L2-normalize.
    def forward(self, **encoded_inputs):
      outputs = self.encoder_layer(input_ids=encoded_inputs['input_ids'],
                                   token_type_ids=encoded_inputs['token_type_ids'],
                                   attention_mask=encoded_inputs['attention_mask'])
      sentence_embeddings = self.pooling_layer(last_hidden_states=outputs[0],
                                               attention_mask=encoded_inputs['attention_mask'])
      norm_embeddings = self.normalization_layer(sentence_embeddings=sentence_embeddings)
      return norm_embeddings

    @property
    def input_types(self):
      return {
          'input_ids': NeuralType(('B', 'T'), EmbeddedTextType()),
          'token_type_ids': NeuralType(('B', 'T'), IntType(), optional=True),
          'attention_mask': NeuralType(('B', 'T'), MaskType())
      }

    @property
    def output_types(self):
      return {'sentence_embeddings': NeuralType(('B', 'D'), EncodedRepresentation())}

    @classmethod
    def list_available_models(cls) -> PretrainedModelInfo:
      return None

    def _setup_data_loader(self, cfg):
      # EncoderLayer() is instantiated here only to obtain its tokenizer, so it
      # uses the class's default model_name and loads that checkpoint a second time.
      dataset = NewsCategoryDataset(csv_path=cfg.csv_path, tokenizer=EncoderLayer().tokenizer)
      return DataLoader(
          dataset=dataset,
          batch_size=cfg.batch_size,
          shuffle=cfg.shuffle,
          collate_fn=dataset.collate_fn,  # <-- this is necessary for type checking
          pin_memory=cfg.pin_memory if 'pin_memory' in cfg else False,
          num_workers=cfg.num_workers if 'num_workers' in cfg else 0
      )

    def setup_training_data(self, train_data_config: OmegaConf):
      self._train_dl = self._setup_data_loader(train_data_config)

    def setup_validation_data(self, val_data_config: OmegaConf):
      self._validation_dl = self._setup_data_loader(val_data_config)

    def setup_test_data(self, test_data_config: OmegaConf):
      self._test_dl = self._setup_data_loader(test_data_config)

    def configure_optimizers(self):
      no_decay = ['bias', 'LayerNorm.weight']
      # torch.optim.AdamW reads the per-group key 'weight_decay'; other keys are not used.
      optimizer_grouped_parameters = [
          {'params': [p for n, p in self.named_parameters() if not any(nd in n for nd in no_decay)],
          'weight_decay': self.cfg.optim.weight_decay},
          {'params': [p for n, p in self.named_parameters() if any(nd in n for nd in no_decay)],
          'weight_decay': 0.0}
      ]
      optimizer = AdamW(optimizer_grouped_parameters, lr=self.cfg.optim.learning_rate, eps=self.cfg.optim.adam_epsilon)
      return optimizer

    def step_(self, split, batch, batch_idx=None):
      # Each batch is a sequence: (input_ids, token_type_ids, attention_mask, label)
      input_ids, token_type_ids, attention_mask, labels = batch
      sentence_embeddings = self(input_ids=input_ids, token_type_ids=token_type_ids, attention_mask=attention_mask)
      loss = self.loss(labels=labels, sentence_embeddings=sentence_embeddings)
      key = 'loss' if split == 'train' else f"{split}_loss"
      self.log(key, loss)
      return {key: loss}

    def training_step(self, *args, **kwargs):
      return self.step_('train', *args, **kwargs)

    def validation_step(self, *args, **kwargs):
      return self.step_('val', *args, **kwargs)

    def test_step(self, *args, **kwargs):
      return self.step_('test', *args, **kwargs)

    def multi_validation_epoch_end(self, outputs, dataloader_idx: int = 0):
      val_loss_mean = torch.stack([x['val_loss'] for x in outputs]).mean()
      return {'val_loss': val_loss_mean}

    def multi_test_epoch_end(self, outputs, dataloader_idx: int = 0):
      test_loss_mean = torch.stack([x['test_loss'] for x in outputs]).mean()
      return {'test_loss': test_loss_mean}
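
The grouping logic in `configure_optimizers` only looks at parameter names. A minimal sketch, using a hypothetical helper `split_for_weight_decay` and a handful of BERT-style parameter names chosen for illustration, shows which names end up in the group without weight decay.

```python
def split_for_weight_decay(param_names, no_decay=("bias", "LayerNorm.weight")):
    """Split parameter names into (decayed, not_decayed) by substring match."""
    decayed, not_decayed = [], []
    for name in param_names:
        if any(marker in name for marker in no_decay):
            not_decayed.append(name)
        else:
            decayed.append(name)
    return decayed, not_decayed


# Hypothetical parameter names following the usual BERT naming scheme
names = [
    "encoder.layer.0.attention.self.query.weight",
    "encoder.layer.0.attention.self.query.bias",
    "encoder.layer.0.attention.output.LayerNorm.weight",
    "encoder.layer.0.attention.output.LayerNorm.bias",
]

decayed, not_decayed = split_for_weight_decay(names)
print(decayed)           # ['encoder.layer.0.attention.self.query.weight']
print(len(not_decayed))  # 3
```

Biases and LayerNorm weights are commonly excluded from weight decay; the match is a plain substring test, so it depends on the naming used by the underlying model.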

Here we define a simple set of configurations that we will use to instantiate, train, and test the model, using *omegaconf*

In [None]:
# utility function for building the class path
def get_class_path(cls):
  return f'{cls.__module__}.{cls.__name__}'

common_config = OmegaConf.create({
    'model_name': MISSING,
    'csv_path_train': MISSING,
    'csv_path_val': MISSING,
    'csv_path_test': MISSING
})

encoder_config = OmegaConf.create({
    '_target_': get_class_path(EncoderLayer),
    'model_name': '${model.model_name}'
})

pooling_config = OmegaConf.create({
    '_target_': get_class_path(PoolingLayer)
})

normalization_config = OmegaConf.create({
    '_target_': get_class_path(NormLayer)
})

optim_config = OmegaConf.create({
    'learning_rate': 1e-5,
    'adam_epsilon': 1e-8,
    'weight_decay': 0.2

})

train_ds_config = OmegaConf.create({
    'csv_path': '${model.csv_path_train}',
    'shuffle': True,
    'batch_size': 32
})

val_ds_config = OmegaConf.create({
    'csv_path': '${model.csv_path_val}',
    'shuffle': False,
    'batch_size': 4
})

test_ds_config = OmegaConf.create({
    'csv_path': '${model.csv_path_test}',
    'shuffle': False,
    'batch_size': 4
})

# create the model config with the common config first
model_config = OmegaConf.create({
    'model': common_config
})

# then attach the sub-module configs
model_config.model.encoder = encoder_config
model_config.model.pooling = pooling_config
model_config.model.normalization = normalization_config
model_config.model.optim = optim_config
model_config.model.train_ds = train_ds_config
model_config.model.val_ds = val_ds_config
model_config.model.test_ds = test_ds_config
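
The `_target_` entries are plain dotted paths, so they can be turned back into classes with the standard library. The sketch below pairs `get_class_path` with a hypothetical `resolve_class_path` helper to show the round trip on `fractions.Fraction`. As we understand it, NeMo's `from_config_dict` performs an equivalent lookup and then instantiates the class with the remaining config keys, so this is only an illustration of the idea, not NeMo's implementation.

```python
import importlib
from fractions import Fraction


def get_class_path(cls):
    """Build the dotted path that is stored under '_target_'."""
    return f"{cls.__module__}.{cls.__name__}"


def resolve_class_path(path):
    """Hypothetical inverse of get_class_path: import the module, fetch the class."""
    module_name, _, class_name = path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)


path = get_class_path(Fraction)
print(path)  # fractions.Fraction

cls = resolve_class_path(path)
half = cls(1, 2)  # instantiate the resolved class with its own arguments
print(half)  # 1/2
```

Classes defined inside a notebook live in the `__main__` module, which is why the paths in this configuration start with `__main__.`.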

In [None]:
print(OmegaConf.to_yaml(model_config))

model:
  model_name: ???
  csv_path_train: ???
  csv_path_val: ???
  csv_path_test: ???
  encoder:
    _target_: __main__.EncoderLayer
    model_name: ${model.model_name}
  pooling:
    _target_: __main__.PoolingLayer
  normalization:
    _target_: __main__.NormLayer
  optim:
    learning_rate: 1.0e-05
    adam_epsilon: 1.0e-08
    weight_decay: 0.2
  train_ds:
    csv_path: ${model.csv_path_train}
    shuffle: true
    batch_size: 32
  val_ds:
    csv_path: ${model.csv_path_val}
    shuffle: false
    batch_size: 4
  test_ds:
    csv_path: ${model.csv_path_test}
    shuffle: false
    batch_size: 4



Here we define a PyTorch Lightning *Trainer*

In [None]:
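if torch.cuda.is_available():
  accelerator = 'gpu'
  precision = 16  # 16-bit mixed precision on the GPU
else:
  accelerator = 'cpu'
  precision = 32  # fall back to full precision when no GPU is available

trainer = ptl.Trainer(devices=1, accelerator=accelerator, precision=precision, max_epochs=3)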

INFO:pytorch_lightning.utilities.rank_zero:Using 16bit None Automatic Mixed Precision (AMP)
INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs


We instantiate the model with the configuration defined earlier, filling in the dataset paths and overriding some of the training parameters.

In [None]:
cfg = copy.deepcopy(model_config)
cfg.model.model_name = "sentence-transformers/all-MiniLM-L6-v2"
# Reuse csv_path from the data-loading cell so the notebook works both on Colab and locally
cfg.model.csv_path_train = os.path.join(csv_path, 'df_sample_train.csv')
cfg.model.csv_path_val = os.path.join(csv_path, 'df_sample_val.csv')
cfg.model.csv_path_test = os.path.join(csv_path, 'df_sample_test.csv')
cfg.model.optim.learning_rate = 5e-4
cfg.model.train_ds.batch_size = 64

model = SentenceTransformer(cfg.model, trainer=trainer)

Print the model architecture

In [None]:
model

SentenceTransformer(
  (encoder_layer): EncoderLayer(
    (encoder): PeftModelForFeatureExtraction(
      (base_model): LoraModel(
        (model): BertModel(
          (embeddings): BertEmbeddings(
            (word_embeddings): Embedding(30522, 384, padding_idx=0)
            (position_embeddings): Embedding(512, 384)
            (token_type_embeddings): Embedding(2, 384)
            (LayerNorm): LayerNorm((384,), eps=1e-12, elementwise_affine=True)
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (encoder): BertEncoder(
            (layer): ModuleList(
              (0-5): 6 x BertLayer(
                (attention): BertAttention(
                  (self): BertSelfAttention(
                    (query): Linear(
                      in_features=384, out_features=384, bias=True
                      (lora_dropout): ModuleDict(
                        (default): Dropout(p=0.1, inplace=False)
                      )
                      (lora_A): ModuleDict(


Check the number of trainable parameters after applying LoRA to the encoder self-attention layers

In [None]:
model.encoder_layer.encoder.print_trainable_parameters()

trainable params: 247,296 || all params: 22,934,400 || trainable%: 1.0782754290498116
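
The reported count can be reproduced by hand. The sketch below assumes the standard BERT layout for this checkpoint: hidden size 384 and 6 layers (both visible in the printed architecture), plus an intermediate feed-forward size of 1536 and a pooler layer, which are assumptions because the printout above is truncated. With `bias='all'`, PEFT also marks every bias parameter as trainable, so those are counted in addition to the LoRA matrices.

```python
hidden = 384          # from the printed architecture
layers = 6            # from the printed architecture
intermediate = 1536   # assumption: feed-forward size of this checkpoint
rank = 16             # LoRA r from the LoraConfig

# Each adapted Linear layer gets A (rank x in) and B (out x rank)
lora_per_module = rank * hidden + hidden * rank
lora_params = lora_per_module * 3 * layers  # query, key, value in every layer

bias_per_layer = (
    4 * hidden      # query, key, value, attention output dense
    + hidden        # attention LayerNorm
    + intermediate  # feed-forward intermediate dense
    + hidden        # feed-forward output dense
    + hidden        # output LayerNorm
)
# Assumption: embeddings LayerNorm bias plus pooler dense bias
bias_params = bias_per_layer * layers + 2 * hidden

trainable = lora_params + bias_params
all_params = 22_934_400  # total reported by print_trainable_parameters()

print(lora_params)                             # 221184
print(bias_params)                             # 26112
print(trainable)                               # 247296
print(round(100 * trainable / all_params, 4))  # 1.0783
```

Under these assumptions the LoRA matrices account for about 221K of the trainable parameters and the biases for the remaining 26K.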


Train the model

In [None]:
trainer.fit(model)

INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:pytorch_lightning.callbacks.model_summary:
  | Name                | Type         | Params
-----------------------------------------------------
0 | encoder_layer       | EncoderLayer | 22.9 M
1 | pooling_layer       | PoolingLayer | 0     
2 | normalization_layer | NormLayer    | 0     
3 | loss                | TripletLoss  | 0     
-----------------------------------------------------
247 K     Trainable params
22.7 M    Non-trainable params
22.9 M    Total params
45.869    Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

      rank_zero_warn("One of given dataloaders is None and it will be skipped.")
    


Training: 0it [00:00, ?it/s]

INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=3` reached.


Evaluate the model on the test dataset

In [None]:
trainer.test(model)

INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

[{'test_loss': 4.773415565490723}]
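
To give some intuition for the number above, here is a simplified, plain-Python sketch of a batch semi-hard triplet loss. It is not the `sentence_transformers` implementation: for every anchor-positive pair it picks the closest negative that is still farther away than the positive (falling back to the farthest negative when none exists) and averages `max(d_ap - d_an + margin, 0)`. The function names, the toy embeddings, and the margin of 1.0 are assumptions made for this example.

```python
import math


def euclidean(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def semi_hard_triplet_loss(embeddings, labels, margin=1.0):
    """Simplified batch semi-hard triplet loss over all anchor-positive pairs."""
    losses = []
    n = len(labels)
    for a in range(n):
        # Distances from the anchor to every sample with a different label
        negatives = [euclidean(embeddings[a], embeddings[k])
                     for k in range(n) if labels[k] != labels[a]]
        if not negatives:
            continue
        for p in range(n):
            if p == a or labels[p] != labels[a]:
                continue
            d_ap = euclidean(embeddings[a], embeddings[p])
            # Semi-hard negatives are farther from the anchor than the positive
            semi_hard = [d for d in negatives if d > d_ap]
            d_an = min(semi_hard) if semi_hard else max(negatives)
            losses.append(max(d_ap - d_an + margin, 0.0))
    return sum(losses) / len(losses) if losses else 0.0


# Toy batch: two samples per class
embeddings = [[0.0, 0.0], [0.0, 1.0], [3.0, 0.0], [0.0, 4.0]]
labels = [0, 0, 1, 1]

loss = semi_hard_triplet_loss(embeddings, labels, margin=1.0)
print(loss)
```

Because the model's embeddings are L2-normalized, pairwise Euclidean distances lie between 0 and 2, so each unclipped term differs from the margin by at most 2.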

Save the model to disk using the serialization provided by NeMo, then restore it into a new model instance

In [None]:
model.save_to('./nemo_sentence_transformer.nemo')

In [None]:
temp_model = SentenceTransformer.restore_from('./nemo_sentence_transformer.nemo')

[NeMo W 2023-08-21 00:28:20 modelPT:161] If you intend to do training or fine-tuning, please call the ModelPT.setup_training_data() method and provide a valid configuration file to setup the train data loader.
    Train config : 
    csv_path: /content/gdrive/My Drive/Colab Data/df_sample_train.csv
    shuffle: true
    batch_size: 64
    
[NeMo W 2023-08-21 00:28:20 modelPT:174] Please call the ModelPT.setup_test_data() or ModelPT.setup_multiple_test_data() method and provide a valid configuration file to setup the test data loader(s).
    Test config : 
    csv_path: /content/gdrive/My Drive/Colab Data/df_sample_test.csv
    shuffle: false
    batch_size: 4
    


[NeMo I 2023-08-21 00:28:21 save_restore_connector:249] Model SentenceTransformer was successfully restored from /content/nemo_sentence_transformer.nemo.


Set up the test data loader for the restored model and evaluate it again on the test dataset

In [None]:
temp_model.setup_multiple_test_data(temp_model.cfg.test_ds)
trainer.test(temp_model)

INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Testing: 0it [00:00, ?it/s]

[{'test_loss': 4.773415565490723}]