# Utility Functions

This notebook contains utility functions which I found useful for my research. Here's a quick summary:

### ML
1. Accuracy in PyTorch
2. Precision, Recall, F1 in PyTorch
3. Setting seeds
4. Reducing TensorBoard training repeats with `tb-reducer`

### NLP
1. Gradient Clipping
2. BERT Word Embeddings

In [1]:
import torch
import torch.nn as nn

## ML

### Accuracy in PyTorch
Uses PyTorch instead of sklearn so that tensors do not have to be moved from the GPU to the CPU.

In [2]:
def get_accuracy(y_true, y_prob):
    y_pred = y_prob.argmax(1)
    correct = (y_pred == y_true).type(torch.float).sum().item()
    return correct / len(y_true)

In [3]:
from sklearn.metrics import accuracy_score

y_prob = torch.randn(3, 2)
y_prob = nn.Softmax(dim=-1)(y_prob)
y_true=torch.empty(3, dtype=torch.long).random_(2)
my_acc = get_accuracy(y_true, y_prob)

y_pred = y_prob.argmax(1)
exp_acc = accuracy_score(y_true, y_pred)
assert my_acc == exp_acc
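
When accuracy is tracked over a whole epoch, one option is to keep the running count of correct predictions as a tensor and convert it to a Python number only once at the end. The sketch below illustrates this under assumptions: `count_correct` and the random `batches` list are hypothetical stand-ins for a real model and data loader, and in practice `correct` would be created on the same device as the model outputs.

```python
import torch


def count_correct(y_true: torch.Tensor, y_score: torch.Tensor) -> torch.Tensor:
    """Return the number of correct predictions as a 0-dim tensor."""
    return (y_score.argmax(dim=1) == y_true).sum()


# Hypothetical mini-batches of (labels, class scores); stand-ins for a data loader.
torch.manual_seed(0)
batches = [(torch.randint(0, 2, (8,)), torch.randn(8, 2)) for _ in range(3)]

# In practice, create this on the same device as the model outputs.
correct = torch.zeros((), dtype=torch.long)
total = 0
for y_true, y_score in batches:
    correct += count_correct(y_true, y_score)  # stays a tensor inside the loop
    total += len(y_true)

epoch_accuracy = correct.item() / total  # single conversion at the end
```

Since `argmax` is unaffected by softmax, raw logits can be passed in place of probabilities.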

### Precision, Recall, F1 in PyTorch
Uses PyTorch instead of sklearn to avoid moving tensors from the GPU to the CPU. In my experiments, this gave an estimated (eyeballed) average speedup of about 1 s/it.

Full credits to [this stackoverflow post](https://stackoverflow.com/questions/62265351/measuring-f1-score-for-multiclass-classification-natively-in-pytorch). I've simply exposed precision and recall in addition to f1.

In [5]:
from typing import Tuple

class PRFScore:
    """
    Class for precision, recall, f1 scores in Pytorch.
    """

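    def __init__(self, average: str = 'macro', pos_label: int = 1):
        """
        Init.

        Args:
            average: averaging method, one of 'micro', 'macro', 'weighted', 'binary'
            pos_label: label treated as the positive class when average is 'binary'
        """
        # None is rejected because per-class (unaveraged) scores are not implemented in __call__
        if average not in ['micro', 'macro', 'weighted', 'binary']:
            raise ValueError(f"Unsupported value for average: {average!r}")
        self.average = average
        self.pos_label = pos_label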

    @staticmethod
    def calc_f1_micro(predictions: torch.Tensor, labels: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Calculate micro-averaged precision, recall and f1.

        For single-label classification all three are equal to the accuracy.

        Args:
            predictions: tensor with predictions
            labels: tensor with original labels

        Returns:
            precision, recall and f1 (identical values)
        """
        true_positive = torch.eq(labels, predictions).sum().float()
        # micro f1 = micro precision = micro recall = accuracy
        f1_score = torch.div(true_positive, len(labels))
        return f1_score, f1_score, f1_score

    @staticmethod
    def calc_prf_count_for_label(labels: torch.Tensor, predictions: torch.Tensor, label_id: int) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Calculate precision, recall, f1 and true count for the label

        Args:
            labels: tensor with original labels
            predictions: tensor with predictions
            label_id: id of current label

        Returns:
            precision, recall, f1 and true count for the label
        """
        # label count
        true_count = torch.eq(labels, label_id).sum()

        # true positives: labels equal to prediction and to label_id
        true_positive = torch.logical_and(torch.eq(labels, predictions),
                                          torch.eq(labels, label_id)).sum().float()
        # precision for label
        precision = torch.div(true_positive, torch.eq(predictions, label_id).sum().float())
        # replace nan values with 0
        precision = torch.where(torch.isnan(precision),
                                torch.zeros_like(precision).type_as(true_positive),
                                precision)

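        # recall for label
        recall = torch.div(true_positive, true_count)
        # replace nan values with 0 (label_id never occurs in labels)
        recall = torch.where(torch.isnan(recall), torch.zeros_like(recall), recall)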
        # f1
        f1 = 2 * precision * recall / (precision + recall)
        # replace nan values with 0
        f1 = torch.where(torch.isnan(f1), torch.zeros_like(f1).type_as(true_positive), f1)
        return precision, recall, f1, true_count

    def __call__(self, labels: torch.Tensor, predictions: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Calculate precision, recall and f1 based on the averaging method defined in init.

        Args:
            labels: 1-d tensor with original labels
            predictions: 1-d tensor with predictions

        Returns:
            precision, recall and f1 score
        """
        assert labels.dim() == 1, "Flatten labels first!"
        assert predictions.dim() == 1, "Flatten predictions first!"

        # simpler calculation for micro
        if self.average == 'micro':
            return self.calc_f1_micro(predictions, labels)
        if self.average == 'binary':
            p, r, f1, _ = self.calc_prf_count_for_label(labels, predictions, self.pos_label)
            return p, r, f1

        # assumes labels are encoded as consecutive integers 0..K-1
        num_classes = len(labels.unique())
        scores = torch.zeros(3, device=labels.device)
        for label_id in range(num_classes):
            p, r, f1, true_count = self.calc_prf_count_for_label(labels, predictions, label_id)
            # stack keeps the values on the labels' device instead of copying them to the CPU
            label_scores = torch.stack([p, r, f1])

            if self.average == 'weighted':
                scores += label_scores * true_count
            elif self.average == 'macro':
                scores += label_scores

        if self.average == 'weighted':
            scores = scores / len(labels)
        elif self.average == 'macro':
            scores = scores / num_classes

        return scores[0], scores[1], scores[2]

In [9]:
from sklearn.metrics import f1_score, precision_recall_fscore_support
import numpy as np
for _ in range(10):
    labels = torch.randint(0, 10, (4096, 100)).flatten()
    predictions = torch.randint(0, 10, (4096, 100)).flatten()

    for av in ['macro', 'weighted', 'micro']:
        my_p, my_r, my_f1 = PRFScore(av)(labels, predictions)

        p, r, f1, _ = precision_recall_fscore_support(labels, predictions, average=av)
        e_f1 = f1_score(labels, predictions, average=av)
        assert np.isclose(my_p.item(), p)
        assert np.isclose(my_r.item(), r)
        assert np.isclose(my_f1.item(), f1)
        assert np.isclose(my_f1.item(), e_f1)

    labels = torch.randint(0, 2, (4096, 100)).flatten()
    predictions = torch.randint(0, 2, (4096, 100)).flatten()
    prf_metric = PRFScore("binary")
    my_p, my_r, my_f1 = prf_metric(labels, predictions)

    p, r, f1, _ = precision_recall_fscore_support(labels, predictions, average="binary")
    assert np.isclose(my_p.item(), p)
    assert np.isclose(my_r.item(), r)
    assert np.isclose(my_f1.item(), f1)
print("tests passed!")
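
As a cross-check on the per-label loop in `PRFScore`, the same macro scores can also be read off a confusion matrix in one vectorized pass. This is only a sketch of an alternative, not the approach from the linked post: `confusion_matrix` and `macro_prf` are hypothetical helper names, labels are assumed to be integers in `0..num_classes-1`, and the macro average here divides by `num_classes` rather than by the number of labels that actually occur.

```python
import torch


def confusion_matrix(labels: torch.Tensor, predictions: torch.Tensor, num_classes: int) -> torch.Tensor:
    """Rows index the true class, columns the predicted class."""
    flat = labels * num_classes + predictions
    return torch.bincount(flat, minlength=num_classes ** 2).reshape(num_classes, num_classes)


def macro_prf(labels: torch.Tensor, predictions: torch.Tensor, num_classes: int):
    cm = confusion_matrix(labels, predictions, num_classes).float()
    tp = cm.diag()
    predicted = cm.sum(dim=0)  # column sums: how often each class was predicted
    actual = cm.sum(dim=1)     # row sums: how often each class occurs in labels
    # 0/0 yields nan, which is mapped to 0 as in PRFScore
    precision = torch.nan_to_num(tp / predicted)
    recall = torch.nan_to_num(tp / actual)
    f1 = torch.nan_to_num(2 * precision * recall / (precision + recall))
    return precision.mean(), recall.mean(), f1.mean()


labels = torch.tensor([0, 0, 1, 1, 2, 2])
predictions = torch.tensor([0, 1, 1, 1, 2, 0])
p, r, f1 = macro_prf(labels, predictions, num_classes=3)
```

For this toy input the per-class precisions are 1/2, 2/3 and 1, and the per-class recalls are 1/2, 1 and 1/2, so the macro averages come out at roughly 0.722 and 0.667.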

### Setting Seed

In [12]:
import random

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

In [13]:
set_seed(100)
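
If GPU runs also need to be repeatable, a slightly extended variant could seed the CUDA generators explicitly and switch cuDNN to deterministic mode. This is a sketch under assumptions: `set_seed_everywhere` and its `deterministic_cudnn` flag are hypothetical names, and deterministic cuDNN can slow training down and still does not guarantee bit-identical results for every operation.

```python
import random

import numpy as np
import torch


def set_seed_everywhere(seed: int, deterministic_cudnn: bool = True) -> None:
    """Seed Python, NumPy and PyTorch; optionally make cuDNN deterministic."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
    if deterministic_cudnn:
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False


# Re-seeding reproduces the same random draws.
set_seed_everywhere(100)
first = torch.rand(3)
set_seed_everywhere(100)
second = torch.rand(3)
```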

### Reduce tensorboard training repeats with [`tb-reducer`](https://github.com/janosh/tensorboard-reducer)


```bash
#!/usr/bin/env bash
set -e
ipd=out/movies_320/fr/supervised_cat
opd=$ipd

# rm -rf $opd
tb-reducer -i "$ipd/*" -o $opd/ -r mean --lax-steps -f --lax-tags
```
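
Conceptually, a `mean` reduction averages each logged scalar across the repeated runs, step by step. The NumPy sketch below only illustrates that idea on made-up numbers. It does not use `tb-reducer`'s API or reflect its implementation, and the `runs` dictionary is hypothetical.

```python
import numpy as np

# Hypothetical loss curves: one value per logged step for three repeats.
runs = {
    "seed_0": [1.00, 0.80, 0.60],
    "seed_1": [1.20, 0.70, 0.50],
    "seed_2": [1.10, 0.90, 0.40],
}

stacked = np.array(list(runs.values()))  # shape: (n_runs, n_steps)
mean_curve = stacked.mean(axis=0)        # per-step mean across repeats
std_curve = stacked.std(axis=0)          # per-step spread across repeats
```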

## NLP

### Gradient Clipping
Useful for RNNs, which are known to suffer from exploding gradients.

In [None]:
torch.nn.utils.clip_grad_norm_(model.parameters(), 1)

In [None]:
loss.backward()
# clip after the backward pass and before the optimizer step
torch.nn.utils.clip_grad_norm_(model.parameters(), 1)
optimizer.step()
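
The two cells above rely on an existing `model`, `loss` and `optimizer`. A self-contained sketch of one training step, assuming a toy `nn.RNN`, random inputs and an artificially scaled loss (all hypothetical), shows where the clipping call sits and what it does to the gradient norm:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.RNN(input_size=4, hidden_size=8, batch_first=True)  # toy model
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
max_norm = 1.0

x = torch.randn(2, 5, 4)                # (batch, time, features)
output, _ = model(x)
loss = output.pow(2).sum() * 1000       # scaled up to provoke large gradients

optimizer.zero_grad()
loss.backward()
# clip_grad_norm_ rescales the gradients in place and returns the total norm
# measured before clipping
norm_before = torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
# total 2-norm over all parameter gradients after clipping
norm_after = torch.norm(torch.stack([p.grad.norm() for p in model.parameters()]))
optimizer.step()
```

Logging `norm_before` during training is a cheap way to see how often clipping actually kicks in.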

### BERT Word Embeddings
Extends [this tutorial](https://mccormickml.com/2019/05/14/BERT-word-embeddings-tutorial/), which extracts wordpiece embeddings, to produce embeddings for whole (whitespace-separated) words.

In [19]:
from typing import Dict, Tuple
from torch import Tensor
import nltk

def get_token_embeddings(string, tokenizer, embedding_model, merge_strategy = "average"):
    """ Retrieves token embeddings by merging wordpiece embeddings according to merge_strategy.
    A wordpiece is assigned to a token if its character span lies within the token's character span. """
    def _merge_embeddings(wp_e, stack):
        if merge_strategy == "average": 
            t_e = torch.mean(wp_e[stack], 0)
        elif merge_strategy == "first":
            t_e = wp_e[stack[0]]
        else:
            raise NotImplementedError
        return t_e


    inputs = tokenizer(string, truncation=True, return_tensors="pt", add_special_tokens = False)
    wp_e = get_wordpiece_embeddings(inputs, embedding_model)

    ws_tokenizer = nltk.tokenize.WhitespaceTokenizer()
    token_spans = ws_tokenizer.span_tokenize(string)

    # merging wordpiece embeddings
    result = []
    stack = []  # initialise stack with idx

    t_span = next(token_spans)
    for i in range(len(wp_e)):
        wp_span = inputs.token_to_chars(i)
        if is_subspan(wp_span, t_span):
            stack.append(i)
        else:
            # the current token is complete: merge its wordpieces and move on to the next token
            t_e = _merge_embeddings(wp_e, stack)
            result.append(t_e)
            # if an error is thrown here, sth is wrong as every wp should be a subspan of some token
            t_span = next(token_spans)

            assert is_subspan(wp_span, t_span)
            stack = [i]  # restart the stack with the current idx
    
    # clear remaining accumulated tensors
    t_e = _merge_embeddings(wp_e, stack)
    result.append(t_e)

    result = torch.stack(result, dim = 0)
    assert len(result) == len(string.split()), f"{len(result)} != {len(string.split())}"
    return result

def is_subspan(subspan: Tuple[int, int], span: Tuple[int, int]) -> bool:
    assert len(subspan) == 2
    assert len(span) == 2
    return subspan[0] >= span[0] and subspan[1] <= span[1]

def get_model_device(model):
    return next(model.parameters()).device

def get_wordpiece_embeddings(inputs: Dict, model, layer_merge_strategy: str = "cat") -> Tensor:
    """ Based on https://mccormickml.com/2019/05/14/BERT-word-embeddings-tutorial/#2-input-formatting """
    with torch.no_grad():
        inputs = {k: v.to(get_model_device(model)) for k, v in inputs.items()}
        outputs = model(**inputs)
        hidden_states = outputs[2]  # 13x1xtx768

    # stack list of tensors 
    wordpiece_embeddings = torch.stack(hidden_states[-4:], dim = 0)  # 4x1xtx768

    # remove batch dimension
    wordpiece_embeddings = torch.squeeze(wordpiece_embeddings, dim = 1)  # 4xtx768

    # order by wordpiece tokens
    wordpiece_embeddings = wordpiece_embeddings.permute(1, 0, 2)  # tx4x768

    if layer_merge_strategy == "second-to-last":
        wordpiece_embeddings = wordpiece_embeddings[:, -2]
    elif layer_merge_strategy == "weighted_sum":
        # note: despite the name, this is a plain (unweighted) sum over the last four layers
        wordpiece_embeddings = wordpiece_embeddings.sum(dim=1)  # tx768
    elif layer_merge_strategy == "cat":  # best results according to BERT's paper
        s = wordpiece_embeddings.size()
        wordpiece_embeddings = wordpiece_embeddings.reshape(s[0], s[1] * s[2])  # tx3072
    else:
        raise NotImplementedError
    
    return wordpiece_embeddings
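
The span-based merging in `get_token_embeddings` can be tried out without loading a model. The toy sketch below uses made-up character spans and 2-d vectors (all hypothetical, as is the helper name `merge_by_char_span`) and averages the wordpieces that fall inside each whitespace token, which corresponds to the `"average"` merge strategy:

```python
import re

import torch


def merge_by_char_span(text, wp_spans, wp_embeddings):
    """Average the wordpiece vectors whose character span lies inside each whitespace token."""
    token_spans = [m.span() for m in re.finditer(r"\S+", text)]
    merged = []
    for start, end in token_spans:
        # indices of the wordpieces contained in the current token
        idx = [i for i, (s, e) in enumerate(wp_spans) if s >= start and e <= end]
        merged.append(wp_embeddings[idx].mean(dim=0))
    return torch.stack(merged)


text = "river banks"
# Hypothetical wordpieces "river", "bank", "##s" with their character spans in `text`.
wp_spans = [(0, 5), (6, 10), (10, 11)]
wp_embeddings = torch.tensor([[1.0, 1.0], [2.0, 0.0], [4.0, 2.0]])

token_embeddings = merge_by_char_span(text, wp_spans, wp_embeddings)
# token_embeddings[0] is the "river" vector, token_embeddings[1] the mean of "bank" and "##s"
```

Unlike the single-pass stack in `get_token_embeddings`, this version rescans all wordpieces for every token, which is simpler to read but quadratic in the worst case.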

In [20]:
def test_get_wordpiece_embeddings():
    """ Tests for contextual embeddings. Follows https://mccormickml.com/2019/05/14/BERT-word-embeddings-tutorial/#2-input-formatting. """
    from transformers import AutoModel
    from transformers import AutoTokenizer

    from scipy.spatial.distance import cosine

    # model_name = 'bert-base-uncased'
    model_name = 'bert-base-multilingual-cased'
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name, output_hidden_states = True)
    model.eval()
    
    # english
    en = "After stealing money from the bank vault, the bank robber was seen fishing on the Mississippi river bank."
    inputs = tokenizer(en, return_tensors="pt", truncation=True, add_special_tokens=False)
    we_en = get_wordpiece_embeddings(inputs, model)

    tokens = tokenizer.tokenize(en, truncation=True, add_special_tokens=False)
    assert len(we_en) == len(tokens)
    ids = [i for i, x in enumerate(tokens) if x == "bank"]

    same_bank = 1 - cosine(we_en[ids[0]], we_en[ids[1]])
    diff_bank = 1 - cosine(we_en[ids[0]], we_en[ids[2]])
    print(f"diff_bank vs same_bank for en: {diff_bank} vs {same_bank}")
    assert same_bank > diff_bank

def test_get_token_embeddings():
    """ Tests for contextual embeddings. Follows https://mccormickml.com/2019/05/14/BERT-word-embeddings-tutorial/#2-input-formatting. """
    from transformers import AutoModel
    from transformers import AutoTokenizer

    from scipy.spatial.distance import cosine

    # model_name = 'bert-base-uncased'
    model_name = 'bert-base-multilingual-cased'
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name, output_hidden_states = True)
    model.eval()
    
    # english
    en = "After stealing money from the bank vault, the bank robber was seen fishing on the Mississippi river bank."
    t_en = get_token_embeddings(en, tokenizer, model, merge_strategy="first")

    ids = [i for i, x in enumerate(en.split()) if "bank" in x]

    same_bank = 1 - cosine(t_en[ids[0]], t_en[ids[1]])  # cosine similarity
    diff_bank = 1 - cosine(t_en[ids[0]], t_en[ids[2]])
    print(f"diff_bank vs same_bank for en: {diff_bank} vs {same_bank}")
    assert same_bank > diff_bank

    inputs = tokenizer(en, return_tensors="pt", truncation=True, add_special_tokens=False)
    we_en = get_wordpiece_embeddings(inputs, model)

    tokens = tokenizer.tokenize(en, truncation=True, add_special_tokens=False)
    assert len(we_en) == len(tokens)
    we_ids = [i for i, x in enumerate(tokens) if x == "bank"]
    # token and wordpiece embeddings should be same since "bank" is not split further.
    assert torch.equal(t_en[ids[0]], we_en[we_ids[0]])
    assert torch.equal(t_en[ids[1]], we_en[we_ids[1]])
    assert torch.equal(t_en[ids[2]], we_en[we_ids[2]])

In [21]:
test_get_wordpiece_embeddings()
test_get_token_embeddings()
print("Tests passed!")

Some weights of the model checkpoint at bert-base-multilingual-cased were not used when initializing BertModel: ['cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


diff_bank vs same_bank for en: 0.8148452043533325 vs 0.9431322813034058


Some weights of the model checkpoint at bert-base-multilingual-cased were not used when initializing BertModel: ['cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


diff_bank vs same_bank for en: 0.8148452043533325 vs 0.9431322813034058
Tests passed!
