In [None]:
!pip install --quiet numpy scipy scikit-learn lightning transformers datasets pytorch-crf

# Sentence Classification

The following are the steps to build a prototype for Sentence Classification using Pytorch Lightning and Hugging Face Transformers. There is a Dataset module as well as a LightningModule.

## Dataset for Sentence Classification

This is the prototype for a dataset class for Sentence Classification. 

I am not quite convinced that the tokenizer should be part of this, but this way we do not have to store the whole padded data at once (unlike the version shown in [Lightning's documentation](https://lightning.ai/docs/pytorch/stable/notebooks/lightning_examples/text-transformers.html)).

As a drawback, in this version we cannot really benefit from the **Fast Tokenizers**: there's a warning that using a Fast Tokenizer and padding afterwards with a collator function is slower than processing the whole batch in a single tokenizer call. However, with that approach we would have to tokenize/pad everything in advance, which can be memory-consuming.

The dataset covers only a single split. It needs to be wrapped inside a LightningDataModule to have each split in its own DataLoader.

In [None]:
import csv
import logging

from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, DataCollatorWithPadding, AutoConfig, AutoModelForSequenceClassification
from typing import Dict, List, Optional, Tuple

In [None]:
# Logger for this notebook's cells, named after the current module
logger = logging.getLogger(__name__)

# Some of the training inputs are larger than the csv module's field size limit
# (131072 according to the error discussed in the link below), so the limit is
# raised to 256 * 1024. This is a hackish solution: the proper fix would be
# limiting the size of the columns in the file.
# More info here: https://stackoverflow.com/questions/54042406/error-field-larger-than-field-limit-131072
csv.field_size_limit(256 * 1024);

In [None]:
# File with the sentence pairs, loaded below by SentenceClassificationDataset
DATASET = "../data/casimedicos/dev_relations.tsv"
# Name of the pretrained checkpoint used for both the tokenizer and the model
MODEL = "distilbert-base-uncased"

In [None]:
class SentenceClassificationDataset(Dataset):
    """
    Map-style dataset of labelled sentence pairs for sentence classification.

    Every record has three columns: the label (optionally prefixed with
    ``__label__``), the first text and the second text of the pair. The
    records are read from a delimited file with ``csv.reader`` or given
    directly as an already parsed list. Tokenization is done lazily, one
    item at a time, in ``__getitem__``; padding is left to the collate
    function of the ``DataLoader``.

    Parameters
    ----------
    tokenizer_model_or_path:
        Name or path given to ``AutoTokenizer.from_pretrained``.
    path_to_dataset:
        Path to the delimited file with the records. Ignored (with a
        warning) when ``dataset`` is also given.
    dataset:
        Already parsed records as ``(label, text, text_pair)`` sequences.
    labels:
        Mapping from label name to label id. When omitted it is built from
        the sorted set of labels found in this split.
    delimiter:
        Column delimiter of the file.
    quotechar:
        Quote character of the file.

    Raises
    ------
    ValueError
        If neither ``path_to_dataset`` nor ``dataset`` is given.
    """
    LABEL_PREFIX = '__label__'

    def __init__(self,
                 tokenizer_model_or_path: str,
                 path_to_dataset: Optional[str] = None,
                 dataset: Optional[List[Tuple[str, str, str]]] = None,
                 labels: Optional[Dict[str, int]] = None,
                 delimiter: str = '\t',
                 quotechar: str = '"'):
        if path_to_dataset is not None and dataset is not None:
            logger.warning("Both path and dataset were provided. Ignoring the path, using the parsed dataset.")
            path_to_dataset = None
        elif path_to_dataset is None and dataset is None:
            raise ValueError("Provide either path to a file or a dataset as a list of tuples")

        if path_to_dataset is not None:
            # newline='' lets the csv module handle newlines embedded in quoted fields
            with open(path_to_dataset, "rt", newline='', encoding='utf-8') as fh:
                csv_reader = csv.reader(fh, delimiter=delimiter, quotechar=quotechar)
                dataset = list(csv_reader)

        # The prefix is removed only when present, so records whose labels were
        # already cleaned can be passed as well
        target = [self._strip_label_prefix(d[0], self.LABEL_PREFIX) for d in dataset]

        # TODO: This is a problem if we have 3 different splits and not all labels appear in all splits
        self.labels = labels if labels is not None else {lbl: idx for idx, lbl in enumerate(sorted(set(target)))}
        self.dataset = [
            {
                "text": d[1],
                "text_pair": d[2]
            }
            for d in dataset
        ]
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_model_or_path, use_fast=True)
        self.target = [self.labels[t] for t in target]

    @staticmethod
    def _strip_label_prefix(raw_label: str, prefix: str = '__label__') -> str:
        """
        Return ``raw_label`` without its leading ``prefix`` (if it has one).

        ``str.lstrip`` must not be used for this: it removes any leading
        characters found in its argument, so it would also eat the beginning
        of labels such as ``attack`` or ``label``.
        """
        if prefix and raw_label.startswith(prefix):
            return raw_label[len(prefix):]
        return raw_label

    def __len__(self):
        """Return the number of records in the split."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """
        Tokenize (with truncation) the pair at position ``idx`` and return
        the tokenizer output with the label id added under ``'label'``.
        """
        data = self.dataset[idx]

        # WARNING: This does not work with a range of values (if idx is a range instead of a single value)
        tokenized_data = self.tokenizer(**data, truncation=True)
        tokenized_data['label'] = self.target[idx]

        return tokenized_data

As you can see, we can now use the SentenceClassificationDataset class as is in the DataLoader, together with a DataCollatorWithPadding: the DataLoader pads each batch and yields batches that can be fed directly to the Transformer model.

In [None]:
dataset = SentenceClassificationDataset(MODEL, DATASET)
# The items come unpadded from the dataset: the collator pads each batch
# using the same tokenizer the dataset tokenizes with
loader = DataLoader(
    dataset,
    collate_fn=DataCollatorWithPadding(dataset.tokenizer),
    drop_last=False,
    shuffle=True,
    batch_size=32
)

There's no need for extra steps: the loss for a batch of data is already implemented within the model, so we can directly use it as the loss in the `LightningModule`'s `training_step`.

## Lightning Sentence Classification Model

We follow with the prototype implementation for a Lightning Module in charge of finetuning a transformer.

In [None]:
import lightning.pytorch as pl
import torch

from abc import ABCMeta
from transformers import AdamW, AutoConfig, AutoModelForSequenceClassification, get_linear_schedule_with_warmup
from typing import Dict

First define a base class for the Module (it will be reused for the case of the Sequence Tagging Module).

In [None]:
class BaseTransformerModule(pl.LightningModule, metaclass=ABCMeta):
    """
    Abstract Base Class for a Transformer Module.

    It saves the hyperparameters, builds the Hugging Face configuration for
    the given labels and sets up the optimizer and the learning rate
    scheduler. Subclasses create the network to train as submodules of the
    module (e.g. ``self.model``) and implement the training logic.

    Parameters
    ----------
    model_name_or_path:
        Name or path given to ``AutoConfig.from_pretrained``.
    id2label:
        Mapping from label id to label name. Its size is used as the
        number of labels of the configuration.
    label2id:
        Mapping from label name to label id.
    learning_rate:
        Learning rate of the AdamW optimizer.
    weight_decay:
        Weight decay applied to the parameters that are neither biases nor
        layer normalization weights.
    adam_epsilon:
        Epsilon of the AdamW optimizer.
    warmup_steps:
        Number of warmup steps of the linear scheduler.
    """
    def __init__(self,
                 model_name_or_path: str,
                 id2label: Dict[int, str],
                 label2id: Dict[str, int],
                 learning_rate: float = 5e-5,
                 weight_decay: float = 0.0,
                 adam_epsilon: float = 1e-8,
                 warmup_steps: int = 0):
        super().__init__()
        self.save_hyperparameters()

        self.config = AutoConfig.from_pretrained(model_name_or_path,
                                                 num_labels=len(id2label),
                                                 id2label=id2label,
                                                 label2id=label2id)

    def configure_optimizers(self):
        """
        Method to prepare optimizer and scheduler (linear warmup and decay).
        """
        # Parameters whose name contains any of these are excluded from weight
        # decay. "layer_norm.weight" covers the models that don't name their
        # layer normalization "LayerNorm" (e.g. DistilBERT's "sa_layer_norm").
        no_decay = ("bias", "LayerNorm.weight", "layer_norm.weight")

        # Go through the parameters of the whole module and not only the ones
        # of `self.model`: otherwise the layers a subclass defines next to the
        # transformer (e.g. a RNN or CRF head) would never be optimized.
        decay_params = []
        no_decay_params = []
        for name, param in self.named_parameters():
            if any(nd in name for nd in no_decay):
                no_decay_params.append(param)
            else:
                decay_params.append(param)

        optimizer_grouped_parameters = [
            {"params": decay_params, "weight_decay": self.hparams.weight_decay},
            {"params": no_decay_params, "weight_decay": 0.0}
        ]
        # PyTorch's implementation replaces the deprecated `transformers.AdamW`
        optimizer = torch.optim.AdamW(optimizer_grouped_parameters,
                                      lr=self.hparams.learning_rate,
                                      eps=self.hparams.adam_epsilon)

        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=self.hparams.warmup_steps,
            num_training_steps=self.trainer.estimated_stepping_batches
        )
        # The scheduler is stepped after every optimization step
        scheduler = {"scheduler": scheduler, "interval": "step", "frequency": 1}

        return [optimizer], [scheduler]

Now for the real Sentence Classification Transformer Module.

In [None]:
class SentenceClassificationTransformerModule(BaseTransformerModule):
    """
    Lightning module to finetune a transformer for sentence classification.

    The network is the ``AutoModelForSequenceClassification`` loaded from
    ``model_name_or_path`` with the configuration built by the base class.
    The parameters are the same as the ones of ``BaseTransformerModule``
    and are forwarded to it unchanged.
    """
    def __init__(self,
                 model_name_or_path: str,
                 id2label: Dict[int, str],
                 label2id: Dict[str, int],
                 learning_rate: float = 5e-5,
                 weight_decay: float = 0.0,
                 adam_epsilon: float = 1e-8,
                 warmup_steps: int = 0):
        super().__init__(model_name_or_path=model_name_or_path,
                         id2label=id2label,
                         label2id=label2id,
                         learning_rate=learning_rate,
                         weight_decay=weight_decay,
                         adam_epsilon=adam_epsilon,
                         warmup_steps=warmup_steps)

        self.model = AutoModelForSequenceClassification.from_pretrained(model_name_or_path, config=self.config)

    def forward(self, **inputs):
        """Run the classification model on the given keyword inputs."""
        return self.model(**inputs)

    def training_step(self, batch, batch_idx):
        """
        Return the loss the model computes for the batch (presumably the
        batch must carry the labels for the model to compute it).
        """
        return self(**batch).loss

Finally, with the Sentence Classification Module and the dataloader we can train a model using Lightning.

In [None]:
# The dataset holds the label -> id mapping; the id -> label one is its inverse
model = SentenceClassificationTransformerModule(
    model_name_or_path=MODEL,
    label2id=dataset.labels,
    id2label={idx: name for name, idx in dataset.labels.items()}
)
# A single epoch on one device, with the accelerator picked automatically
trainer = pl.Trainer(devices=1, accelerator="auto", max_epochs=1)
trainer.fit(model, loader)

# Sequence Tagging

Next are the prototypes for creating a Sequence Tagging module, both dataset and Lightning Module as well

## Dataset for Sequence Tagging

In [None]:
import logging

from itertools import chain
from operator import itemgetter
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, DataCollatorForTokenClassification, AutoConfig
from typing import Dict, List, Optional, Tuple

In [None]:
# Logger for the sequence tagging cells (defined again here, presumably so
# the notebook can be run starting from this section)
logger = logging.getLogger(__name__)

In [None]:
# File with the tagged tokens, loaded below by SequenceTaggingDataset
DATASET = "../data/casimedicos/dev_revisited.conll"
# Name of the pretrained checkpoint used for both the tokenizer and the model
MODEL = "distilbert-base-uncased"

In [None]:
class SequenceTaggingDataset(Dataset):
    """
    Map-style dataset of pre-tokenized sentences for sequence tagging.

    The sentences are read from a column-based (CoNLL-like) file, with one
    token per line, the columns separated by ``delimiter`` and the
    sentences separated by blank lines. Alternatively, already parsed
    sentences can be given as a list of dictionaries with the keys
    ``"tokens"`` and ``"labels"`` (two lists of strings of equal length).
    Tokenization and label alignment are done lazily, one sentence at a
    time, in ``__getitem__``.

    Parameters
    ----------
    tokenizer_model_or_path:
        Name or path given to ``AutoTokenizer.from_pretrained``.
    path_to_dataset:
        Path to the column-based file. Ignored (with a warning) when
        ``dataset`` is also given.
    dataset:
        Already parsed sentences, in the format described above.
    labels:
        Mapping from label name to label id. When omitted it is built from
        the sorted set of labels found in this split. The mapping is
        copied, so the given dictionary is never modified.
    delimiter:
        Column delimiter of the file.
    token_position:
        Index (0-based) of the column with the token.
    label_position:
        Index (0-based) of the column with the label.
    use_extension_label:
        If true, the labels ``X`` and ``PAD`` are added to the mapping (with
        the next free ids) and ``X`` is assigned to the special tokens. If
        false, ``PAD`` is mapped to -100 and assigned to the special tokens.
    copy_label_to_subtoken:
        If true, every subtoken of a word gets the label of the word. If
        false, only the first subtoken gets it and the rest get ``X`` or
        ``PAD`` (depending on ``use_extension_label``).

    Raises
    ------
    ValueError
        If neither ``path_to_dataset`` nor ``dataset`` is given.
    """
    def __init__(self,
                 tokenizer_model_or_path: str,
                 path_to_dataset: Optional[str] = None,
                 dataset: Optional[List[Dict[str, List[str]]]] = None,
                 labels: Optional[Dict[str, int]] = None,
                 delimiter: str = '\t',
                 token_position: int = 1,
                 label_position: int = 4,
                 use_extension_label: bool = True,
                 copy_label_to_subtoken: bool = True):
        if path_to_dataset is not None and dataset is not None:
            logger.warning("Both path and dataset were provided. Ignoring the path, using the parsed dataset.")
            path_to_dataset = None
        elif path_to_dataset is None and dataset is None:
            raise ValueError("Provide either path to a file or a dataset as a list of sentences")

        if path_to_dataset is not None:
            sentences = self._load_conll_sentences(path_to_dataset, delimiter, token_position, label_position)
        else:
            # Already parsed sentences: used as they are
            sentences = dataset

        # TODO: What happens if not all the labels are present? This might be better to do outside here
        if labels is None:
            found_labels = set(chain.from_iterable(map(itemgetter('labels'), sentences)))
            self.labels = {lbl: idx for idx, lbl in enumerate(sorted(found_labels))}
        else:
            # Work on a copy: the special labels are added below and the
            # dictionary of the caller must not be modified
            self.labels = dict(labels)

        self.use_extension_label = use_extension_label
        self.copy_label_to_subtoken = copy_label_to_subtoken
        if use_extension_label:
            if 'X' not in self.labels:
                # Add the extended label
                self.labels['X'] = len(self.labels)
            if 'PAD' not in self.labels:
                # Add the pad label
                self.labels['PAD'] = len(self.labels)
        elif 'PAD' not in self.labels:
            self.labels['PAD'] = -100

        self.dataset = [
            {
                "tokens": sentence['tokens'],
                "labels": [self.labels[l] for l in sentence['labels']]
            } for sentence in sentences
        ]
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_model_or_path, use_fast=True)

    def _load_conll_sentences(self, path_to_dataset, delimiter='\t', token_position=1, label_position=4):
        """
        Read the sentences of a column-based file.

        Returns a list with one dictionary per sentence, each of them with
        the keys ``"tokens"`` and ``"labels"`` (lists of strings). Lines with
        less than 2 columns are taken as sentence boundaries.
        """
        sentences = []
        sentence_tokens = []
        sentence_labels = []
        with open(path_to_dataset, 'rt', encoding='utf-8') as fh:
            for line in fh:
                columns = line.strip().split(delimiter)

                if len(columns) >= 2:
                    sentence_tokens.append(columns[token_position])
                    sentence_labels.append(columns[label_position])
                elif sentence_tokens:
                    # We have the end of a sentence. Nothing is stored when there
                    # are no tokens, which happens after a paragraph change
                    # (there are 2 blank lines)
                    sentences.append({
                        "tokens": sentence_tokens,
                        "labels": sentence_labels
                    })
                    sentence_tokens = []
                    sentence_labels = []

        if sentence_tokens:
            # The file didn't end with a blank line: keep the last sentence too
            sentences.append({
                "tokens": sentence_tokens,
                "labels": sentence_labels
            })
        return sentences

    def _tokenize_and_align_labels(self, sentence):
        """
        Tokenize the words of ``sentence`` (with truncation) and return the
        tokenizer output with one label id per subtoken under ``'labels'``.
        """
        tokenized_sentence = self.tokenizer(sentence["tokens"], truncation=True, is_split_into_words=True)

        # Label for the positions without a label of their own. Depending on
        # configuration it is the extension label X or a special PAD label equal
        # to -100 (in case of using regular transformers and not CRF, which
        # requires all labels to be accounted for).
        filler_label = self.labels['X'] if self.use_extension_label else self.labels['PAD']

        sentence_labels = []
        previous_wid = None
        for wid in tokenized_sentence.word_ids():
            if wid is None:
                # Special tokens ([CLS], [SEP], <s>, etc) don't have an assigned label
                sentence_labels.append(filler_label)
            elif wid != previous_wid or self.copy_label_to_subtoken:
                # The first subtoken of a word always uses the label of the word.
                # The other subtokens replicate it only if configured to do so.
                # WARNING: This duplicates the "B-" type labels, but using another
                # configuration to avoid that is outside this scope since it needs
                # comparable experimentation as to whether it is useful or not
                sentence_labels.append(sentence["labels"][wid])
            else:
                sentence_labels.append(filler_label)
            previous_wid = wid

        tokenized_sentence['labels'] = sentence_labels

        assert len(tokenized_sentence['input_ids']) == len(tokenized_sentence['labels'])

        return tokenized_sentence

    def __len__(self):
        """Return the number of sentences in the split."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return the tokenized sentence at position ``idx`` with its aligned labels."""
        return self._tokenize_and_align_labels(self.dataset[idx])

In [None]:
dataset = SequenceTaggingDataset(MODEL, DATASET)
# The collator pads every batch, filling the labels with the id of the PAD label
loader = DataLoader(
    dataset,
    collate_fn=DataCollatorForTokenClassification(dataset.tokenizer, label_pad_token_id=dataset.labels['PAD']),
    shuffle=False,
    batch_size=32
)

## Sequence Tagging with CRF Module

In [None]:
import lightning.pytorch as pl
import torch
import torch.nn as nn

from abc import ABCMeta
from torchcrf import CRF
from transformers import AdamW, AutoConfig, AutoModel, get_linear_schedule_with_warmup
from typing import Dict

First define a base class for the Module (it is the same as the previous one, just copying it to be able to start loading from here).

In [None]:
class BaseTransformerModule(pl.LightningModule, metaclass=ABCMeta):
    """
    Abstract Base Class for a Transformer Module.

    It saves the hyperparameters, builds the Hugging Face configuration for
    the given labels and sets up the optimizer and the learning rate
    scheduler. Subclasses create the network to train as submodules of the
    module (e.g. ``self.model``) and implement the training logic.

    Parameters
    ----------
    model_name_or_path:
        Name or path given to ``AutoConfig.from_pretrained``.
    id2label:
        Mapping from label id to label name. Its size is used as the
        number of labels of the configuration.
    label2id:
        Mapping from label name to label id.
    learning_rate:
        Learning rate of the AdamW optimizer.
    weight_decay:
        Weight decay applied to the parameters that are neither biases nor
        layer normalization weights.
    adam_epsilon:
        Epsilon of the AdamW optimizer.
    warmup_steps:
        Number of warmup steps of the linear scheduler.
    """
    def __init__(self,
                 model_name_or_path: str,
                 id2label: Dict[int, str],
                 label2id: Dict[str, int],
                 learning_rate: float = 5e-5,
                 weight_decay: float = 0.0,
                 adam_epsilon: float = 1e-8,
                 warmup_steps: int = 0):
        super().__init__()
        self.save_hyperparameters()

        self.config = AutoConfig.from_pretrained(model_name_or_path,
                                                 num_labels=len(id2label),
                                                 id2label=id2label,
                                                 label2id=label2id)

    def configure_optimizers(self):
        """
        Method to prepare optimizer and scheduler (linear warmup and decay).
        """
        # Parameters whose name contains any of these are excluded from weight
        # decay. "layer_norm.weight" covers the models that don't name their
        # layer normalization "LayerNorm" (e.g. DistilBERT's "sa_layer_norm").
        no_decay = ("bias", "LayerNorm.weight", "layer_norm.weight")

        # Go through the parameters of the whole module and not only the ones
        # of `self.model`: otherwise the layers a subclass defines next to the
        # transformer (e.g. a RNN or CRF head) would never be optimized.
        decay_params = []
        no_decay_params = []
        for name, param in self.named_parameters():
            if any(nd in name for nd in no_decay):
                no_decay_params.append(param)
            else:
                decay_params.append(param)

        optimizer_grouped_parameters = [
            {"params": decay_params, "weight_decay": self.hparams.weight_decay},
            {"params": no_decay_params, "weight_decay": 0.0}
        ]
        # PyTorch's implementation replaces the deprecated `transformers.AdamW`
        optimizer = torch.optim.AdamW(optimizer_grouped_parameters,
                                      lr=self.hparams.learning_rate,
                                      eps=self.hparams.adam_epsilon)

        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=self.hparams.warmup_steps,
            num_training_steps=self.trainer.estimated_stepping_batches
        )
        # The scheduler is stepped after every optimization step
        scheduler = {"scheduler": scheduler, "interval": "step", "frequency": 1}

        return [optimizer], [scheduler]

In [None]:
class SequenceTaggingTransformerModule(BaseTransformerModule):
    """
    Lightning module for sequence tagging: a transformer encoder followed
    by a bidirectional GRU, a linear layer (producing the emission scores
    of each label) and a CRF.

    Parameters
    ----------
    model_name_or_path, id2label, label2id, learning_rate, weight_decay,
    adam_epsilon, warmup_steps:
        Forwarded unchanged to ``BaseTransformerModule``.
    masked_label:
        Label id of the positions that must be left out of the loss (they
        are masked out when calling the CRF in ``training_step``).
    """
    def __init__(self,
                 model_name_or_path: str,
                 id2label: Dict[int, str],
                 label2id: Dict[str, int],
                 learning_rate: float = 5e-5,
                 weight_decay: float = 0.0,
                 adam_epsilon: float = 1e-8,
                 warmup_steps: int = 0,
                 masked_label: int = -100):
        super().__init__(model_name_or_path, id2label, label2id,
                         learning_rate, weight_decay, adam_epsilon,
                         warmup_steps)

        self.model = AutoModel.from_pretrained(
            model_name_or_path,
            config=self.config
        )
        self.rnn = nn.GRU(self.config.hidden_size, self.config.hidden_size, batch_first=True, bidirectional=True)
        self.crf = CRF(self.config.num_labels, batch_first=True)
        # The GRU is bidirectional, so its output is twice the hidden size
        self.linear = nn.Linear(2 * self.config.hidden_size, self.config.num_labels)

    def _compute_emissions(self, **inputs):
        """Return the emission scores: encoder -> GRU -> linear layer."""
        outputs = self.model(**inputs)
        rnn_out, _ = self.rnn(outputs[0])
        return self.linear(rnn_out)

    def forward(self, **inputs):
        """
        Return the tuple ``(path, emissions)``: the label sequences decoded
        by the CRF (as a ``LongTensor``) and the emission scores.
        """
        emissions = self._compute_emissions(**inputs)
        # NOTE: decoded without a mask, so all the positions (padding included) get a label
        path = torch.LongTensor(self.crf.decode(emissions))

        return path, emissions

    def training_step(self, batch, batch_idx):
        """
        Return the negative of the value the CRF computes for the labels of
        the batch, leaving out the positions labelled with ``masked_label``.
        """
        labels = batch.pop('labels')
        # Only the emissions are needed for the loss: decoding the best path
        # on every training step would be wasted work
        emissions = self._compute_emissions(**batch)
        mask = (labels != self.hparams.masked_label).to(torch.uint8)

        return -self.crf(emissions, labels, mask=mask)

In [None]:
# The dataset holds the label -> id mapping; the id -> label one is its inverse.
# The positions labelled as PAD are the ones masked out of the loss.
model = SequenceTaggingTransformerModule(
    model_name_or_path=MODEL,
    label2id=dataset.labels,
    id2label={idx: name for name, idx in dataset.labels.items()},
    masked_label=dataset.labels['PAD']
)
# A single epoch on one device, with the accelerator picked automatically
trainer = pl.Trainer(devices=1, accelerator="auto", max_epochs=1)
trainer.fit(model, loader)