# Course 6: Advanced NLP Tasks

We'll continue pre-training a language model on CoNLL-2003 before adding a CRF on top of it in order to perform NER.


## Data and Preprocessing

We'll need to download several data files:
* [The language model's training corpus](https://github.com/Madjakul/MiNER/blob/main/data/conll/conll_train_corpus.txt)
* [The language model's validation corpus](https://github.com/Madjakul/MiNER/blob/main/data/conll/conll_dev_corpus.txt)
* [The list of labels](https://github.com/Madjakul/MiNER/blob/main/data/conll/labels.txt)
* [The NER's training corpus](https://github.com/Madjakul/MiNER/blob/main/data/conll/gold/conll_train.conll)
* [The NER's validation corpus](https://github.com/Madjakul/MiNER/blob/main/data/conll/gold/conll_dev.conll)
* [The NER's test corpus](https://github.com/Madjakul/MiNER/blob/main/data/conll/gold/conll_test.conll)

In [None]:
!pip install datasets transformers[torch] accelerate seqeval

In [None]:
import torch

# Prefer the GPU whenever CUDA is usable, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
if DEVICE == "cuda":
    # Release cached GPU memory left over from previous notebook runs.
    torch.cuda.empty_cache()

DEVICE

In [None]:
from typing import Literal

from transformers import RobertaForMaskedLM


class RoBERTa():
    """Thin wrapper around the ``roberta-base`` masked language model [1]_.

    Parameters
    ----------
    device: str,  {"cuda", "cpu"}
        Hardware on which the model is placed.

    Attributes
    ----------
    model: transformers.RobertaForMaskedLM
        The ``roberta-base`` checkpoint with its masked language modeling
        head, moved to ``device``.

    References
    ----------
    ..  [1] Yinhan Liu et al. 2019. Roberta: A robustly optimized Bert
        pretraining approach. (July 2019). Retrieved January 31, 2023 from
        https://arxiv.org/abs/1907.11692
    """

    def __init__(self, device: Literal["cuda", "cpu"]):
        # Load the pretrained checkpoint first, then move it to the device.
        checkpoint = RobertaForMaskedLM.from_pretrained("roberta-base")
        self.model = checkpoint.to(device)    # type: ignore

Look at the `add_vocab` function: why do we proceed this way?

In [None]:
import logging
from typing import List

import torch
import datasets
from transformers import AutoTokenizer, DataCollatorForLanguageModeling


class TransformerDataset():
    """Custom dataset used to pretrain Transformers checkpoints from
    **HuggingFace**.

    Parameters
    ----------
    train_corpus: List[str]
        List of training texts.
    valid_corpus: List[str]
        List of validation texts.
    max_length: int
        Maximum sequence length.
    mlm_probability: float
        Masking probability handed to the data collator.

    Attributes
    ----------
    mlm_ds: datasets.DatasetDict
        Tokenized ``"train"`` and ``"valid"`` splits, with the raw ``"text"``
        column removed.
    max_length: int
        Maximum sequence length.
    train_corpus: List[str]
        List of training sentences.
    valid_corpus: List[str]
        List of validation sentences.
    tokenizer: transformers.AutoTokenizer
        The ``roberta-base`` tokenizer.
    data_collator: transformers.DataCollatorForLanguageModeling
        Data collator configured with ``mlm_probability`` that returns
        PyTorch tensors.
    """

    def __init__(
        self, train_corpus: List[str], valid_corpus: List[str],
        max_length: int, mlm_probability: float
    ):
        self.mlm_ds = None
        self.max_length = max_length
        self.train_corpus = train_corpus
        self.valid_corpus = valid_corpus
        print("Using roberta-base tokenizer")
        self.tokenizer = AutoTokenizer.from_pretrained(
            "roberta-base",
        )
        self._build_mlm_dataset()
        self.data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm_probability=mlm_probability,
            return_tensors="pt"
        )

    def _build_mlm_dataset(self):
        """Tokenizes both corpora and stores the result in ``mlm_ds``."""
        ds = datasets.DatasetDict({
            "train": datasets.Dataset.from_dict({"text": self.train_corpus}),
            "valid": datasets.Dataset.from_dict({"text": self.valid_corpus})
        })
        tokenized = ds.map(self._tokenize, batched=True)
        # ``remove_columns`` returns a new object instead of working in
        # place: the result has to be kept, otherwise the raw text column
        # silently stays in the dataset.
        self.mlm_ds = tokenized.remove_columns(["text"])

    def _tokenize(self, batch):
        """Tokenizes the ``"text"`` field of a batch, truncating and padding
        every sequence to ``max_length``.
        """
        return self.tokenizer(
            batch["text"],
            truncation=True,
            max_length=self.max_length,
            padding="max_length",
            return_special_tokens_mask=True,
            return_tensors="pt"
        )

    def add_vocab(self, corpus: List[str], lm: RoBERTa):
        """Adds new tokens to a pretrained LLM. The embedding of the added
        tokens are initialized using the mean of the already existing tokens
        plus some noise in order to avoid diverging too much from the initial
        distributions, thus converging faster during pretraining [1]_.

        Parameters
        ----------
        corpus: ``list``
            List of whitespace-separated texts; every whitespace-delimited
            token missing from the tokenizer's vocabulary is added.
        lm: miner.modules.RoBERTa
            Pretrained large language model. Its embedding matrix is resized
            and modified in place.

        References
        ----------
        ..  [1] Hewitt John. 2021. Initializing new word embeddings for
            pretrained language models. (2021). Retrieved April 24, 2023 from
            https://nlp.stanford.edu/~johnhew/vocab-expansion.html
        """
        new_tokens = {token for text in corpus for token in text.split()}
        new_tokens -= set(self.tokenizer.vocab.keys())  # New tokens don't already exist
        print( f"Adding {len(new_tokens)} new tokens to the vocabulary")
        # Sorting makes the token -> id assignment reproducible: iterating a
        # set of strings follows the (randomised) hash order.
        num_added = self.tokenizer.add_tokens(sorted(new_tokens))
        print("Resizing the Language model")
        lm.model.resize_token_embeddings(len(self.tokenizer))
        if num_added == 0:
            # Nothing to initialise; ``[:-0]`` would also select no row.
            return
        # Computing the distribution of the pre-existing embeddings
        params = lm.model.state_dict()
        embeddings_key = "roberta.embeddings.word_embeddings.weight"
        embeddings = params[embeddings_key]
        # NOTE(review): assumes the model and the tokenizer had the same
        # vocabulary size before the call, so that the freshly added rows are
        # exactly the last ``num_added`` ones - confirm for other checkpoints.
        pre_expansion_embeddings = embeddings[:-num_added, :]
        mu = torch.mean(pre_expansion_embeddings, dim=0)
        n = pre_expansion_embeddings.size(0)
        centered = pre_expansion_embeddings - mu
        sigma = (centered.T @ centered) / n
        dist = torch.distributions.multivariate_normal.MultivariateNormal(
            mu,
            covariance_matrix=1e-5*sigma
        )
        # Loading the new embeddings in the model: one sample per added token
        embeddings[-num_added:, :] = dist.sample((num_added,))
        lm.model.load_state_dict(params)

What is the relation between the batch size and gradient accumulation?

In [None]:
from typing import Union
from transformers import TrainingArguments, Trainer


class TransformerTrainer():
    """Wrapper for the transformers ``Trainer`` class to perform domain-
    specific MLM [1]_ before adding the NER head [2]_.

    Parameters
    ----------
    lm: miner.modules.RoBERTa
        Language model checkpoint from **HuggingFace**.
    lm_path: str
        Directory used as the trainer's output directory and where the
        trained language model is saved.
    lm_dataset: miner.utils.data.TransformerDataset
        Object providing the tokenized ``"train"`` and ``"valid"`` splits and
        the data collator.
    per_device_train_batch_size: int
        Training batch size.
    seed: int
        Value used for both ``seed`` and ``data_seed`` of the training
        arguments, for replicability.
    per_device_eval_batch_size: int
        Validation batch size.
    max_steps: int
        Number of training steps, forwarded as ``max_steps``. It is also used
        to derive the ``num_train_epochs`` value.
    gradient_accumulation_steps: int
        For how many steps the gradient is accumulated. The same value is
        used for ``eval_accumulation_steps``.
    wandb: bool
        ``True`` to report to Weights & Biases, ``False`` to disable
        reporting.

    Attributes
    ----------
    lm_path: str
        Directory where the trained language model is saved.
    training_args: transformers.TrainingArguments
        Stores the hyperparameters to pretrain the large language model.
    trainer: transformers.Trainer
        Stores the datasets to perform MLM and feed the large language model
        with.

    References
    ----------
    ..  [1] Jacob Devlin, Ming-Wei Chang, Kenton Lee, and Kristina Toutanova.
        2019. Bert: Pre-training of deep bidirectional Transformers for
        language understanding. (May 2019). Retrieved January 31, 2023 from
        https://arxiv.org/abs/1810.04805v2
    ..  [2] Suchin Gururangan et al. 2020. Don't stop pretraining: Adapt
        language models to domains and tasks. (May 2020). Retrieved January 31,
        2023 from https://arxiv.org/abs/2004.10964
    """

    def __init__(
        self, lm: RoBERTa, lm_path: str,
        lm_dataset: TransformerDataset, per_device_train_batch_size: int,
        seed: int, per_device_eval_batch_size: int, max_steps: int,
        gradient_accumulation_steps: int, wandb: bool
    ):
        self.lm_path = lm_path
        train_split = lm_dataset.mlm_ds["train"]    # type: ignore
        valid_split = lm_dataset.mlm_ds["valid"]    # type: ignore

        # Number of batches needed to see the training split once.
        # NOTE(review): per the transformers documentation a positive
        # ``max_steps`` takes precedence over ``num_train_epochs`` - confirm.
        batches_per_epoch = len(train_split) / per_device_train_batch_size
        if wandb:
            reporter = "wandb"
        else:
            reporter = "none"

        hyperparameters = dict(
            output_dir=lm_path,
            overwrite_output_dir=True,
            do_train=True,
            do_eval=True,
            evaluation_strategy="epoch",
            eval_accumulation_steps=gradient_accumulation_steps,
            per_device_train_batch_size=per_device_train_batch_size,
            per_device_eval_batch_size=per_device_eval_batch_size,
            gradient_accumulation_steps=gradient_accumulation_steps,
            max_steps=max_steps,
            num_train_epochs=max_steps / batches_per_epoch,
            logging_strategy="epoch",
            save_strategy="epoch",
            seed=seed,
            data_seed=seed,
            log_level="error",
            report_to=reporter,
            load_best_model_at_end=True,
            metric_for_best_model="eval_loss",
            greater_is_better=False,
            save_total_limit=1,
        )
        self.training_args = TrainingArguments(**hyperparameters)
        self.trainer = Trainer(
            model=lm.model,
            args=self.training_args,
            data_collator=lm_dataset.data_collator,
            train_dataset=train_split,
            eval_dataset=valid_split,
        )

    def train(self):
        """Runs the MLM training loop, then saves the model to ``lm_path``.
        """
        self.trainer.train()
        self.trainer.save_model(self.lm_path)

Complete the hyperparameters to pretrain the LM.

In [None]:
if __name__=="__main__":
    # Exercise cell: the values marked "Complete" are intentionally left blank
    # and must be filled in before this cell can run.
    train_corpus_path = "" # Complete: path to the LM training corpus
    val_corpus_path = "" # Complete: path to the LM validation corpus
    max_length = 256 # Can be modified
    mlm_probability = 0.15 # Can be modified
    lm_path = "./lm" # Can be modified
    lm_train_batch_size =  # Complete
    max_steps =  1 # Can be modified
    lm_accumulation_steps =  # Complete

    # Both corpora are read as plain text, one sample per line.
    print(f"Loading training data from {train_corpus_path}")
    with open(train_corpus_path, "r", encoding="utf-8") as f:
        train_corpus = f.read().splitlines()
    print(f"Loading validation data from {val_corpus_path}")
    with open(val_corpus_path, "r", encoding="utf-8") as f:
        val_corpus = f.read().splitlines()

    print("Using  RoBERTa checkpoint as language model")
    lm = RoBERTa(DEVICE)

    print("Building the dataset...")
    lm_dataset = TransformerDataset(
        train_corpus=train_corpus,
        valid_corpus=val_corpus,
        max_length=max_length,
        mlm_probability=mlm_probability,
    )

    # Extend the tokenizer and the model's embeddings with the whitespace
    # tokens of the training corpus that are missing from the vocabulary.
    lm_dataset.add_vocab(train_corpus, lm)

    print("*** Training ***")
    # The training batch size is reused as the evaluation batch size.
    lm_trainer = TransformerTrainer(
        lm=lm,
        lm_path=lm_path,
        lm_dataset=lm_dataset,
        per_device_train_batch_size=lm_train_batch_size,
        seed=0,
        per_device_eval_batch_size=lm_train_batch_size,
        max_steps=max_steps,
        gradient_accumulation_steps=lm_accumulation_steps,
        wandb=False
    )
    lm_trainer.train()
    print("=== Done ===")

## CRF

In [None]:
# Tag id marking a token without annotation: every tag is allowed for it.
UNLABELED_INDEX = -1
# Score assigned to forbidden tags/transitions inside the CRF.
IMPOSSIBLE_SCORE = -100


def create_possible_tag_masks(num_tags: int, tags: torch.LongTensor):
    """Builds, for every token, a 0/1 vector flagging the tags it may take.

    A labelled token gets a one-hot vector for its tag; a token whose tag is
    ``UNLABELED_INDEX`` gets a vector full of ones.

    Parameters
    ----------
    num_tags: int
        Number of different tags in the dataset.
    tags: torch.LongTensor
        Target labels. (batch_size, sequence_length). Left untouched.

    Returns
    -------
    masks: torch.ByteTensor
        Mask-like tensor indicating the allowed label(s).
        (batch_size, sequence_length, num_tags).
    """
    unlabeled = tags.eq(UNLABELED_INDEX)
    # Unlabelled positions need a valid index for the scatter below; the
    # whole row is overwritten with ones afterwards anyway.
    scatter_index = tags.masked_fill(unlabeled, 0).unsqueeze(2)
    masks = torch.zeros(
        (tags.size(0), tags.size(1), num_tags),
        dtype=torch.uint8,
        device=tags.device,
    )
    masks.scatter_(2, scatter_index, 1)
    masks.masked_fill_(unlabeled.unsqueeze(2), 1)   # (batch_size, sequence_length, num_tags)
    return masks    # type: ignore

### The LogSumExp trick

In [None]:
def log_sum_exp(tensor: torch.Tensor, dim=-1, keepdim=False):
    """Computes ``log(sum(exp(tensor)))`` along ``dim`` in a numerically
    stable way, for the forward algorithm.

    The maximum along ``dim`` is subtracted before exponentiating and added
    back afterwards, so that ``exp`` never overflows.

    Parameters
    ----------
    tensor: torch.Tensor
        Input tensor.
    dim: int
        Dimension to reduce. Default is -1, the last dimension.
    keepdim: bool
        If ``True``, the reduced dimension is kept with size 1.

    Returns
    -------
    torch.Tensor
        Log-sum-exp of the input along ``dim``.
    """
    max_score, _ = tensor.max(dim, keepdim=True)
    # A non-finite maximum (every entry is -inf, or one entry is +inf) would
    # give ``inf - inf = nan`` below: do not shift in that case.
    shift = torch.where(
        torch.isfinite(max_score), max_score, torch.zeros_like(max_score)
    )
    result = shift + (tensor - shift).exp().sum(dim, keepdim=True).log()
    if keepdim:
        return result
    return result.squeeze(dim)

Explain the `__init__` function: what is the `transitions` variable?

In [None]:
from abc import abstractmethod
from typing import Optional, Literal

import torch
import torch.nn as nn


class BaseCRF(nn.Module):
    """Abstract base class for the conditional random field (CRF) [1]_ .

    Parameters
    ----------
    num_tags: int
        Number of possible tags (counting the padding one if needed).
    padding_idx: int, optional
        Padding index. When given, every transition from or to this tag is
        initialized to ``IMPOSSIBLE_SCORE``.
    device: str, {"cpu", "cuda"}
        Wether to do computation on GPU or CPU.

    Attributes
    ----------
    num_tags: int
        Number of possible tags (counting the padding one if needed).
    start_transitions: torch.nn.Parameter
        Begining scores of the transition matrix. Initialized with values
        sampled from a uniform distribution in [-1; 1]. (num_tags).
    device: str, {"cpu", "cuda"}
        Wether to do computation on GPU or CPU.
    end_transitions: torch.nn.Parameter
        Ending scores of the transition matrix. Initialized with values
        sampled from a uniform distribution in [-1; 1]. (num_tags).
    transitions: torch.nn.Parameter
        Transition matrix, indexed as ``[previous_tag, next_tag]``.
        Initialized using xavier [2]_'s uniform method.
        (num_tags, num_tags).

    References
    ----------
    ..  [1] Lafferty, John, Andrew McCallum, and Fernando CN Pereira.
            "Conditional random fields: Probabilistic models for segmenting and
            labeling sequence data." (2001).
    ..  [2] Glorot, Xavier, and Yoshua Bengio. "Understanding the difficulty of
            training deep feedforward neural networks." Proceedings of the
            thirteenth international conference on artificial intelligence and
            statistics. JMLR Workshop and Conference Proceedings, 2010.
    """
    def __init__(
        self, num_tags: int, device: Literal["cpu", "cuda"],
        padding_idx: Optional[int]=None
    ):
        super(BaseCRF, self).__init__()
        self.device = device
        self.num_tags = num_tags
        self.start_transitions = nn.Parameter(
            nn.init.uniform_(
                torch.empty(num_tags, device=self.device), -1., 1.
            )
        )
        self.end_transitions = nn.Parameter(
            nn.init.uniform_(
                torch.empty(num_tags, device=self.device), -1., 1.
            )
        )
        # ``xavier_uniform_`` fills the whole tensor in place, so it has to
        # run BEFORE the padding constraints are written; the other way round
        # the constraints are overwritten by the random initialisation.
        init_transition = nn.init.xavier_uniform_(
            torch.empty(num_tags, num_tags, device=self.device)
        )
        if padding_idx is not None:
            init_transition[:, padding_idx] = IMPOSSIBLE_SCORE
            init_transition[padding_idx, :] = IMPOSSIBLE_SCORE
        self.transitions = nn.Parameter(init_transition)

    @abstractmethod
    def forward(
        self, emissions: torch.FloatTensor, tags: torch.LongTensor,
        mask: Optional[torch.ByteTensor]=None
    ):
        raise NotImplementedError()

    def _forward_algorithm(
        self, emissions: torch.FloatTensor, mask: torch.ByteTensor,
        reverse_direction: bool=False
    ):
        """Computes the logarithm of the unary/emission scores of each token
        plus their transition score. Despite its name, this function is used to
        compute the `forward-backward algorithm <https://en.wikipedia.org/wiki/Forward%E2%80%93backward_algorithm>`__.

        Parameters
        ----------
        emissions: torch.FloatTensor
            Unary/emission score of each tokens.
            (batch_size, sequence_length, num_tags).
        mask: torch.ByteTensor
            Masked used to to discard subwords, special tokens or padding from
            being added to the log-probability. (batch_size, sequence_length).
        reverse_direction: bool, default=False
            ``True`` if you want to use the backward algorithm. ``False`` if
            you want to use the forward algorithm.

        Returns
        -------
        torch.FloatTensor
            Log-scores for each token. (sequence_length, batch_size, num_tags).
        """
        batch_size, sequence_length, num_tags = emissions.data.shape
        broadcast_emissions = \
            emissions.transpose(0, 1).unsqueeze(2).contiguous()             # (sequence_length, batch_size, 1, num_tags)
        mask = mask.float().transpose(0, 1).contiguous()                    # (sequence_length, batch_size) # type: ignore
        broadcast_transitions = self.transitions.unsqueeze(0)               # (1, num_tags, num_tags)
        sequence_iter = range(1, sequence_length)
        # backward algorithm
        if reverse_direction:
            # Transpose transitions matrix and emissions
            broadcast_transitions = broadcast_transitions.transpose(1, 2)   # (1, num_tags, num_tags)
            broadcast_emissions = broadcast_emissions.transpose(2, 3)       # (sequence_length, batch_size, num_tags, 1)
            sequence_iter = reversed(sequence_iter)
            # It is beta
            log_proba = [self.end_transitions.expand(batch_size, num_tags)] # [(batch_size, num_tags)]
        # forward algorithm
        else:
            # It is alpha
            log_proba = [                                                   # [(batch_size, num_tags)]
                emissions.transpose(0, 1).contiguous()[0]
                + self.start_transitions.unsqueeze(0)
            ]
        for i in sequence_iter:
            # Broadcast log probability
            broadcast_log_proba = log_proba[-1].unsqueeze(2)                # (batch_size, num_tags, 1)
            # Add all scores
            # inner: (batch_size, num_tags, num_tags)
            # broadcast_log_proba:   (batch_size, num_tags, 1)
            # broadcast_transitions: (1, num_tags, num_tags)
            # broadcast_emissions:   (batch_size, 1, num_tags) when going
            #                        forward, (batch_size, num_tags, 1) when
            #                        going backward
            inner = (
                broadcast_log_proba
                + broadcast_transitions
                + broadcast_emissions[i]
            )
            # Append log proba: masked steps carry the previous value over
            log_proba.append(
                torch.logsumexp(inner, dim=1) * mask[i].unsqueeze(1)
                + log_proba[-1] * (1 - mask[i]).unsqueeze(1)
            )
        if reverse_direction:
            # Betas were accumulated from the last step to the first one
            log_proba.reverse()
        return torch.stack(log_proba)                                       # type: ignore

    def marginal_probabilities(
        self, emissions: torch.FloatTensor,
        mask: Optional[torch.ByteTensor]=None
    ):
        """Computes the marginal probability of each token to belong to a given
        class.

        Parameters
        ----------
        emissions: torch.FloatTensor
            Unary/emission score of each tokens.
            (batch_size, sequence_length, num_tags).
        mask: torch.ByteTensor, optional
            Masked used to to discard subwords, special tokens or padding from
            being added to the log-probability. (batch_size, sequence_length).
            Defaults to a mask full of ones.

        Returns
        -------
        torch.FloatTensor
            Marginal probability of each token to belong to a given class.
            (sequence_length, batch_size, num_tags).
        """
        if mask is None:
            batch_size, sequence_length, _ = emissions.data.shape
            # Built on the emissions' device so the default still works after
            # the module has been moved with ``.to(...)``.
            mask = torch.ones(                  # type: ignore
                [batch_size, sequence_length],
                dtype=torch.uint8,
                device=emissions.device
            )
        alpha = self._forward_algorithm(        # (sequence_length, batch_size, num_tags)
            emissions,
            mask,                               # type: ignore
            reverse_direction=False
        )
        beta = self._forward_algorithm(         # (sequence_length, batch_size, num_tags)
            emissions,
            mask,                               # type: ignore
            reverse_direction=True
        )
        # Log-partition: last alpha plus the end transition scores
        z = torch.logsumexp(                    # (batch_size)
            alpha[alpha.size(0) - 1] + self.end_transitions,
            dim=1
        )
        proba = alpha + beta - z.view(1, -1, 1) # (sequence_length, batch_size, num_tags)
        return torch.exp(proba)                 # (sequence_length, batch_size, num_tags) # type: ignore

    def viterbi_decode(
        self, emissions: torch.Tensor, mask: Optional[torch.ByteTensor]=None
    ):
        """
        Dynamically computes the best sequence of tags.

        NOTE(review): a backpointer is recorded for every step, masked or
        not, and the backtrack consumes the first ``mask.sum() - 1`` of them.
        The decoded path therefore looks consistent only when the unmasked
        tokens form a prefix of the sequence - confirm before relying on it
        with masks that have holes.

        Parameters
        ----------
        emissions: torch.FloatTensor
            Unary/emission score of each tokens.
            (batch_size, sequence_length, num_tags).
        mask: torch.ByteTensor, optional
            Masked used to to discard subwords, special tokens or padding from
            being added to the log-probability. (batch_size, sequence_length).
            Defaults to a mask full of ones.

        Returns
        -------
        best_tags_list: List[List[int]]
            Best sequence of tags for each sequence in the batch; the i-th
            list holds ``mask[i].sum()`` tags.
        """
        batch_size, sequence_length, _ = emissions.shape
        if mask is None:
            mask = torch.ones(                                                  # type: ignore
                [batch_size, sequence_length],
                dtype=torch.uint8,
                device=emissions.device
            )
        emissions = emissions.transpose(0, 1).contiguous()
        mask = mask.transpose(0, 1).contiguous()                                # type: ignore
        # Start transition and first emission score
        score = self.start_transitions + emissions[0]
        history = []
        for i in range(1, sequence_length):
            broadcast_score = score.unsqueeze(2)
            broadcast_emissions = emissions[i].unsqueeze(1)
            next_score = \
                broadcast_score + self.transitions + broadcast_emissions
            # Best previous tag (and its score) for every current tag
            next_score, indices = next_score.max(dim=1)
            score = torch.where(mask[i].unsqueeze(1) == 1, next_score, score)   # type: ignore
            history.append(indices)
        # Add end transition score
        score += self.end_transitions
        # Compute the best path
        seq_ends = mask.sum(dim=0) - 1                                          # type: ignore
        best_tags_list = []
        for i in range(batch_size):
            _, best_last_tag = score[i].max(dim=0)
            best_tags = [best_last_tag.item()]
            # Follow the backpointers from the last kept step to the first
            for hist in reversed(history[:seq_ends[i]]):
                best_last_tag = hist[i][best_tags[-1]]
                best_tags.append(best_last_tag.item())
            best_tags.reverse()
            best_tags_list.append(best_tags)
        return best_tags_list

This CRF layer is slightly modified: it is a fuzzy (partial) CRF, but the logic remains the same.

In [None]:
from typing import Optional, Literal


class PartialCRF(BaseCRF):
    """Partial/Fuzzy Conditional random field.

    The loss marginalizes over every tag sequence compatible with the
    (possibly unlabelled) targets instead of scoring a single gold sequence.
    """

    def __init__(
        self, num_tags: int, device: Literal["cpu", "cuda"],
        padding_idx: Optional[int]=None
    ):
        super().__init__(num_tags, device, padding_idx)

    def _numerator_score(
        self, emissions: torch.FloatTensor, mask: torch.ByteTensor,
        possible_tags: torch.ByteTensor,
    ):
        """
        Computes the log of the emission/unary score plus the transition score
        for the whole sequence, restricted to the tags allowed by
        ``possible_tags``.

        Parameters
        ----------
        emissions: torch.FloatTensor
            Unary/emission score of each tokens.
            (batch_size, sequence_length, num_tags). Left untouched.
        mask: torch.ByteTensor
            Masked used to to discard subwords, special tokens or padding from
            being added to the log-probability. (batch_size, sequence_length).
        possible_tags: torch.ByteTensor
            Mask corresponding to the target label(s).
            (batch_size, sequence_length, num_tags).

        Returns
        -------
        torch.FloatTensor
            Log probability of the emission/unary score plus the transition
            score for the whole sequence. (batch_size,)
        """
        batch_size, sequence_length, num_tags = emissions.data.shape
        emissions = emissions.transpose(0, 1).contiguous()                  # type: ignore
        mask = mask.float().transpose(0, 1).contiguous()                    # type: ignore
        possible_tags = possible_tags.float().transpose(0, 1)               # type: ignore

        # Start transition score and first emission
        first_possible_tag = possible_tags[0]
        alpha = (self.start_transitions + emissions[0]).masked_fill(        # (batch_size, num_tags)
            first_possible_tag == 0, IMPOSSIBLE_SCORE
        )

        for i in range(1, sequence_length):
            current_possible_tags = possible_tags[i-1]                      # (batch_size, num_tags)
            next_possible_tags = possible_tags[i]                           # (batch_size, num_tags)

            # Emissions scores. ``masked_fill`` works on a copy:
            # ``contiguous()`` above may hand back the caller's own storage
            # (e.g. with a batch size of 1), in which case an in-place write
            # would corrupt the emissions reused for the denominator.
            emissions_score = emissions[i].masked_fill(
                next_possible_tags == 0, IMPOSSIBLE_SCORE
            ).unsqueeze(1)                                                  # (batch_size, 1, num_tags)

            # Transition scores: forbid leaving an impossible tag (rows) and
            # reaching an impossible tag (columns)
            transition_scores = self.transitions.unsqueeze(0).expand(
                batch_size, num_tags, num_tags
            ).clone()
            transition_scores[(current_possible_tags == 0)] = IMPOSSIBLE_SCORE
            transition_scores.transpose(1, 2)[(next_possible_tags == 0)] = \
                IMPOSSIBLE_SCORE

            # Broadcast alpha
            broadcast_alpha = alpha.unsqueeze(2)

            # Add all scores
            inner = broadcast_alpha + emissions_score + transition_scores   # (batch_size, num_tags, num_tags)
            # Masked steps carry the previous alpha over
            alpha = (
                torch.logsumexp(inner, 1) * mask[i].unsqueeze(1)
                + alpha * (1 - mask[i]).unsqueeze(1)
            )

        # Add end transition score, restricted to the tags allowed at the
        # position ``mask.sum() - 1`` of every sequence
        last_tag_indexes = mask.sum(0).long() - 1
        end_transitions = (
            self.end_transitions.expand(batch_size, num_tags)
            * possible_tags.transpose(0, 1).view(
                sequence_length * batch_size, num_tags
            )[
                last_tag_indexes
                + torch.arange(batch_size, device=possible_tags.device)
                * sequence_length
            ]
        )
        end_transitions[(end_transitions == 0)] = IMPOSSIBLE_SCORE
        stops = alpha + end_transitions                                     # (batch_size, num_tags)
        return torch.logsumexp(stops, 1)                                    # (batch_size,) # type: ignore

    def _denominator_score(
        self, emissions: torch.FloatTensor, mask: torch.ByteTensor,
    ):
        """
        Computes the log-partition score for the whole sequence.

        Parameters
        ----------
        emissions: torch.FloatTensor
            Unary/emission score of each tokens.
            (batch_size, sequence_length, num_tags).
        mask: torch.ByteTensor
            Masked used to to discard subwords, special tokens or padding from
            being added to the log-probability. (batch_size, sequence_length).

        Returns
        -------
        torch.FloatTensor
            Log-partition score. (batch_size,)
        """
        _, sequence_length, num_tags = emissions.data.shape
        emissions = emissions.transpose(0, 1).contiguous()                  # type: ignore
        mask = mask.float().transpose(0, 1).contiguous()                    # type: ignore
        # Start transition score and first emissions score
        alpha = self.start_transitions.view(1, num_tags) + emissions[0]

        for i in range(1, sequence_length):
            emissions_score = emissions[i].unsqueeze(1)                     # (batch_size, 1, num_tags)
            transition_scores = self.transitions.unsqueeze(0)               # (1, num_tags, num_tags)
            broadcast_alpha = alpha.unsqueeze(2)                            # (batch_size, num_tags, 1)
            inner = broadcast_alpha + emissions_score + transition_scores   # (batch_size, num_tags, num_tags)
            # Masked steps carry the previous alpha over
            alpha = (
                torch.logsumexp(inner, 1) * mask[i].unsqueeze(1)
                + alpha * (1 - mask[i]).unsqueeze(1)
            )

        # Add end transition score
        stops = alpha + self.end_transitions.unsqueeze(0)
        return torch.logsumexp(stops, 1)                                    # (batch_size,) # type: ignore

    def forward(
        self, emissions: torch.FloatTensor, tags: torch.LongTensor,
        mask: Optional[torch.ByteTensor]=None
    ):
        """Performs the forward pass.

        Parameters
        ----------
        emissions: torch.FloatTensor
            Unary/emission score of each tokens.
            (batch_size, sequence_length, num_tags).
        tags: torch.LongTensor
            Tensor containing the target labels. (batch_size, sequence_length).
        mask: torch.ByteTensor, optional
            Masked used to to discard subwords, special tokens or padding from
            being added to the log-probability. (batch_size, sequence_length).
            Defaults to a mask full of ones.

        Returns
        -------
        torch.FloatTensor
            Mean of the negative log-likelihoods over the mini-batch (scalar).
        """
        if mask is None:
            # The signature advertises an optional mask: keep every token
            # rather than failing on ``None.float()`` further down.
            mask = torch.ones(                                              # type: ignore
                emissions.shape[:2],
                dtype=torch.uint8,
                device=emissions.device
            )
        possible_tags = create_possible_tag_masks(self.num_tags, tags)      # (batch_size, sequence_length, num_tags)
        gold_score = self._numerator_score(emissions, mask, possible_tags)  # (batch_size,) # type: ignore
        forward_score = self._denominator_score(emissions, mask)            # (batch_size,) # type: ignore
        nll = forward_score - gold_score                                    # (batch_size,)
        return torch.mean(nll)                                              # Mean instead of sum # type: ignore

In [None]:
# Toy problem: random emission scores of shape
# (batch_size, sequence_length, num_tags) drawn from a standard normal.
batch_size = 2
sequence_length = 9
num_tags = 5
emissions = torch.randn(batch_size, sequence_length, num_tags)
emissions

In [None]:
# Random target tag for every token, drawn among the `num_tags` tag ids
# (tied to `num_tags` instead of a hard-coded 5, so both stay in sync).
tags = torch.randint(0, num_tags, (batch_size, sequence_length))
tags

In [None]:
# Random 0/1 mask of shape (batch_size, sequence_length): each entry is a
# Bernoulli draw whose probability is itself sampled uniformly between 0 and 1.
mask = torch.bernoulli(torch.empty(batch_size, sequence_length).uniform_(0, 1)).byte()
mask

Compute the best sequence of tags for the emissions scores above.

In [None]:
# CRF over `num_tags` tags, on CPU, without a padding tag.
crf = PartialCRF(num_tags, device="cpu")
# Complete

Explain the output: what does each dimension represent, and what do the numbers inside mean?

In [None]:
# Marginals for the random emissions above, using the random mask.
crf.marginal_probabilities(emissions, mask)

Compute the loss of the emission scores obtained above.

In [None]:
# Complete

## NER Model

First, we need to build a class to feed the NER model with the data during training.

In [None]:
import math
import random
from typing import Literal, List

import torch
from torch.utils.data import Dataset
import transformers
from transformers import AutoTokenizer, RobertaForMaskedLM


class PartialNERDataset(Dataset):
    """Custom Dataset used to train the partial NER.

    Each item is a pair ``(x, y)``: ``x`` is the tokenized sentence and ``y``
    the token-level label ids. Both are returned **without** a batch axis so
    that the default ``DataLoader`` collation produces tensors of shape
    (batch_size, max_length).

    Parameters
    ----------
    device: str, {"cpu", "cuda"}
        Device where the computations are performed.
    max_length: int
        Maximum sequence length.
    iterable_corpus: List[List[str]]
        Corpus containing lists of segmented texts.
    labels: List[str]
        List of possible labels.
    iterable_labels: List[List[str]]
        Corpus containing lists of labels mapped to the text at the same index
        in ``iterable_corpus``.
    lm_path: str
        Path to a **transformers** pre-trained language model.

    Attributes
    ----------
    device: str, {"cpu", "cuda"}
        Device where the computations are performed.
    max_length: int
        Maximum sequence length.
    iterable_corpus: List[List[str]]
        Corpus containing lists of segmented texts.
    iterable_labels: List[List[str]]
        Corpus containing lists of labels mapped to the text at the same index
        in ``iterable_corpus``.
    label2idx: Dict[str, int]
        Maps each string label to its position in ``labels``. The partial
        labels "B-UNK" and "I-UNK" are additionally mapped to -1.
    tokenizer: AutoTokenizer
        "roberta-base" tokenizer from **transformers**, loaded with
        ``add_prefix_space=True``.
    lm: RobertaForMaskedLM
        Masked language model loaded from ``lm_path`` and moved to ``device``.
        Intended for language augmentation; none of the methods of this class
        use it.
    """

    def __init__(
        self, device: Literal["cpu", "cuda"],
        max_length: int, iterable_corpus: List[List[str]], labels: List[str],
        iterable_labels: List[List[str]], lm_path: str
    ):
        self.iterable_corpus = iterable_corpus
        self.iterable_labels = iterable_labels
        self.label2idx = {label: idx for idx, label in enumerate(labels)}
        # Unknown (partially annotated) tokens share the sentinel id -1.
        self.label2idx["B-UNK"] = -1
        self.label2idx["I-UNK"] = -1
        self.tokenizer = AutoTokenizer.from_pretrained(
            "roberta-base", add_prefix_space=True
        )
        self.lm = RobertaForMaskedLM.from_pretrained(lm_path).to(device)
        self.max_length = max_length
        self.device = device

    def __getitem__(self, idx):
        """Returns the example stored at position ``idx``.

        Parameters
        ----------
        idx: int
            Index of the example in ``self.iterable_corpus``.

        Returns
        -------
        x: transformers.BatchEncoding
            Tokenized text; every tensor has shape (max_length,).
        y: torch.Tensor
            Token-level label ids, int64, shape (max_length,).
        """
        encoding = self.tokenize(idx)
        # Align the labels first, while ``encoding`` is still the tokenizer's
        # original output (``word_ids()`` is read from it).
        label_ids = self.align_labels(encoding, self.iterable_labels[idx])
        y = torch.tensor(label_ids, dtype=torch.int64, device=self.device)
        # ``return_tensors="pt"`` yields (1, max_length) tensors for a single
        # sentence. Left as is, the DataLoader stacks them into
        # (batch_size, 1, max_length), which the transformer rejects, so the
        # singleton axis is dropped here.
        x = transformers.BatchEncoding(
            {name: tensor.squeeze(0) for name, tensor in encoding.items()}
        )
        return x, y

    def __len__(self):
        """Returns the number of segmented texts in the corpus."""
        return len(self.iterable_corpus)

    def tokenize(self, idx: int):
        """Tokenizes a segmented text from ``self.iterable_corpus`` at a given
        index.

        The text is padded and truncated to ``self.max_length``.

        Parameters
        ----------
        idx: int
            Index of the segmented text to tokenize.

        Returns
        -------
        inputs: transformers.BatchEncoding
            Tokenized text, moved to ``self.device``. The tensors keep the
            leading axis of size 1 added by ``return_tensors="pt"``.
        """
        inputs = self.tokenizer(
            self.iterable_corpus[idx],
            is_split_into_words=True,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
        return inputs.to(self.device)

    def align_labels(
        self, inputs: transformers.BatchEncoding, labels: List[str]
    ) -> List[int]:
        """Align the sub-words with labels. Every sub-word is given the label
        of the word it comes from. Tokens that do not map to any word (their
        word id is ``None``) are given the "O" label.

        Parameters
        ----------
        inputs: transformers.BatchEncoding
            Tokenized text, as returned by ``tokenize``.
        labels: List[str]
            Word-level labels of the original text.

        Returns
        -------
        label_ids: List[int]
            Token-level labels of the tokenized text.
        """
        word_ids = inputs.word_ids()    # type: ignore
        return [
            self.label2idx["O"] if word_idx is None
            else self.label2idx[labels[word_idx]]
            for word_idx in word_ids
        ]


We will plug the CRF layer on top of the language model in order to perform NER.

In [None]:
import logging
from typing import Optional, Literal

import torch
import torch.nn as nn
import transformers
from transformers import RobertaModel


class PartialNER(nn.Module):
    """Partial Named Entity Recognizer (NER) Model.

    This class defines a Partial NER model for named entity recognition tasks.
    It extends the PyTorch ``nn.Module`` class and integrates with the
    **Hugging Face** ``transformers`` library for handling pre-trained language
    models. A RoBERTa encoder produces contextual token representations, a
    linear layer turns them into per-label emission scores, and a partial CRF
    scores or decodes label sequences from those emissions.

    Parameters
    ----------
    lm_path: str
        The path or identifier of a pre-trained language model checkpoint.
    num_labels: int
        The number of unique labels or tags for NER.
    device: str, {"cpu", "cuda"}
        The device on which the model will be instantiated ("cpu" or "cuda").
    dropout: float
        The dropout probability applied to the encoder's hidden states before
        the linear layer.
    padding_idx : Optional[int], optional
        The padding index forwarded to the CRF layer. If None, the CRF's
        default is used.

    Attributes
    ----------
    device: str
        The device on which the model is instantiated.
    transformer: transformers.RobertaModel
        The pre-trained transformer model used for feature extraction.
    linear_dropout: nn.Dropout
        The dropout layer applied to the input of the linear layer.
    fc: nn.Linear
        The linear layer mapping features to label scores.
    crf: PartialCRF
        The partial conditional random field layer for structured prediction.
    """

    def __init__(
        self, lm_path: str, num_labels: int, device: Literal["cpu", "cuda"],
        dropout: float, padding_idx: Optional[int]=None
    ):
        super().__init__()
        self.device = device
        logging.info(f"Loading LM checkpoint from {lm_path}")
        self.transformer = RobertaModel.from_pretrained(lm_path)
        self.linear_dropout = nn.Dropout(dropout)
        # Take the encoder width from the checkpoint's configuration instead
        # of hard-coding 768, so checkpoints of other sizes work as well.
        hidden_size = self.transformer.config.hidden_size
        self.fc = nn.Linear(hidden_size, num_labels)
        self.crf = PartialCRF(
            num_tags=num_labels,
            device=device,
            padding_idx=padding_idx
        )
        # Put the encoder and the linear head on the requested device; the
        # constructor stored ``device`` but left them wherever they were
        # created.
        self.to(device)

    def _emissions(self, inputs: transformers.BatchEncoding):
        """Computes the emission scores fed to the CRF.

        Parameters
        ----------
        inputs: transformers.BatchEncoding
            Sentences tokenized with ``transformers``.

        Returns
        -------
        torch.FloatTensor
            Output of the linear layer applied to the (dropped-out) last
            hidden state of the encoder; its last dimension is the number of
            labels.
        """
        hidden_states = self.transformer(**inputs).last_hidden_state
        return self.fc(self.linear_dropout(hidden_states))

    def forward(
        self, inputs: transformers.BatchEncoding,
        outputs: torch.LongTensor,
    ):
        """Performs the forward pass.

        Parameters
        ----------
        inputs: transformers.BatchEncoding
            Original sentence, tokenized with ``transformers``.
        outputs: torch.LongTensor
            Tensor of true label ids, aligned with the tokens of ``inputs``.

        Returns
        -------
        torch.FloatTensor
            Loss returned by the CRF layer for the emissions of ``inputs``.
        """
        # The tokenizer's attention mask doubles as the CRF mask.
        return self.crf(
            emissions=self._emissions(inputs),
            tags=outputs,
            mask=inputs["attention_mask"],
        )

    @torch.inference_mode()
    def viterbi_decode(self, inputs: transformers.BatchEncoding):
        """Computes the most likely label sequence.

        ``torch.inference_mode`` only disables gradient tracking; it does not
        switch off dropout. Call ``model.eval()`` before decoding to get
        deterministic predictions.

        Parameters
        ----------
        inputs: transformers.BatchEncoding
            Input sentence tokenized with ``transformers``.

        Returns
        -------
        outputs: List[List[int]]
            Most likely tag sequence of each input in the batch.
        """
        return self.crf.viterbi_decode(
            self._emissions(inputs),
            mask=inputs["attention_mask"]
        )

Use the `PartialNERDataset` class to pass training examples through the NER model. There is no need to train: simply output the loss at each pass to check that your pipeline works.

In [None]:
import re


def read_conll(path: str):
    """Reads a `conll` file and returns a tuple containing the list of tokens
    per doc and the list of tags per doc.

    Every non-blank line must hold exactly one token and one tag separated by
    a tab. Documents are separated by one or more blank lines (a line holding
    only whitespace, such as a lone tab, counts as blank). An empty file
    yields two empty lists.

    Parameters
    ----------
    path: str
        Path to the conll file.

    Returns
    -------
    token_docs: List[List[str]]
        List of tokens per document.
    tag_docs: List[List[str]]
        List of labels per document.

    Raises
    ------
    ValueError
        If a non-blank line is not of the form ``token<TAB>tag``.
    """
    with open(path, "r", encoding="utf-8") as f:
        raw_text = f.read().strip()

    token_docs = []
    tag_docs = []
    tokens = []
    tags = []
    for line in raw_text.split("\n"):
        if not line.strip():
            # Blank line: close the current document, if any. Consecutive
            # blank lines therefore never create empty documents.
            if tokens:
                token_docs.append(tokens)
                tag_docs.append(tags)
                tokens, tags = [], []
            continue
        fields = line.split("\t")
        if len(fields) != 2:
            raise ValueError(
                f"Expected 'token<TAB>tag' but got {line!r} in {path}"
            )
        tokens.append(fields[0])
        tags.append(fields[1])
    # The file does not end with a blank line once stripped, so the last
    # document is still pending here.
    if tokens:
        token_docs.append(tokens)
        tag_docs.append(tags)
    return token_docs, tag_docs

In [None]:
from torch.utils.data import DataLoader

# Load the gold NER training set as parallel lists of tokens and tags.
iterable_corpus, iterable_labels = read_conll("./conll_train.conll")

# The label inventory is stored one label per line.
with open("./labels.txt", "r", encoding="utf-8") as f:
  labels = f.read().splitlines()

# Dataset producing (tokenized sentence, label ids) pairs on the CPU.
dataset = PartialNERDataset(
    device="cpu",
    max_length=256,
    iterable_corpus=iterable_corpus,
    labels=labels,
    iterable_labels=iterable_labels,
    lm_path="roberta-base"
)

# Small shuffled mini-batches are enough to smoke-test the pipeline.
tmp_dataloader = DataLoader(
    dataset,
    batch_size=2,
    shuffle=True
)

# NER model with one output per label read from labels.txt.
ner = PartialNER(
    lm_path="roberta-base",
    num_labels=len(labels),
    device="cpu",
    dropout=0.1
)

# Push every mini-batch through the model and print its inputs and its loss.
# No optimisation step is taken.
# NOTE(review): this walks the whole dataset; breaking after a few batches
# would be enough for a smoke test.
for x, y in tmp_dataloader:
  print(x)
  print(ner(x, y))

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


{'input_ids': tensor([[[    0,   772,   439,    66,   321,     4,  2546,   715,   795,    23,
           8301,     4,  2546,   479,     2,     1,     1,     1,     1,     1,
              1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
              1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
              1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
              1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
              1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
              1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
              1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
              1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
              1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
              1,     1,     1,     1,     1,     1,     1,     1,     1,     1,
              1,     1,   

ValueError: too many values to unpack (expected 2)