# NLP Visualization Tests

This notebook tests the visualization utilities for NLP as provided by `trulens.visualizations.NLP`.

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import sys

# ! pip uninstall -y trulens

# Use this if running this notebook from within its place in the truera repository.
sys.path.insert(0, "..")

import pandas as pd
import numpy as np
import random
from dataclasses import dataclass
from pathlib import Path
import torch
from torch import Tensor
from torch import nn
import re
from typing import Iterable, List

from trulens.nn.models import get_model_wrapper
from trulens.visualizations import NLP

In [3]:
max_tokens = 32

class ToyTokenizer:
    def __init__(self, max_tokens: int = 8):
        self.max_tokens = max_tokens

        self.pad_token = '[PAD]'
        self.mask_token = '[MASK]'
        self.sep_token = '[SEP]'
        self.cls_token = '[SEP]'
        self.unk_token = '[UNK]'

        self.vocab = {
            self.unk_token: 0,
            self.pad_token: 1,
            self.mask_token: 2,
            self.sep_token: 3,
            "neutral": 4,
            "good": 5,
            "bad": 6
        }
        
        self.ids = {i: k for k, i in self.vocab.items()}

        self.pad_token_id = self.vocab[self.pad_token]
        self.cls_token_id = self.vocab[self.cls_token]
        self.sep_token_id = self.vocab[self.sep_token]
        self.unk_token_id = self.vocab[self.unk_token]

        self.pat = re.compile(r"\w+")

    def decode(self, token_id: int) -> str:
        return self.ids[token_id]

    def id_of_token(self, token: str) -> str:
        if token in self.vocab:
            return self.vocab[token]
        else:
            return self.vocab[self.unk_token]

    def split_sentence_into_words(self, sentence: str) -> List[str]:
        tokens = []
        for token in re.findall(self.pat, sentence):
            tokens.append(token)
        return tokens

    def _ids_of_tokens(self, tokens: Iterable[str]) -> Iterable[int]:
        return list(map(self.id_of_token, tokens))

    def tokenize(self, texts: Iterable[str]):
        tokenization = []
        masks = []

        # track longest sentence length in number of tokens
        n_tokens = 0

        for sentence in texts:
            tokens = [self.sep_token_id] + self._ids_of_tokens(self.split_sentence_into_words(sentence)) + [self.cls_token_id]

            # crop to max_tokens
            tokens = tokens[0:self.max_tokens]

            tokenization.append(tokens)
            
            if len(tokens) > n_tokens:
                n_tokens = len(tokens)

        # pad to max length input
        for tokens in tokenization:
            mask = [1] * len(tokens)
            while len(tokens) < n_tokens:
                tokens.append(self.pad_token_id)
                mask.append(0)
            masks.append(mask)

        tokenization = torch.tensor(tokenization)
        masks = torch.tensor(masks)

        return dict(input_ids=tokenization, attention_mask=masks)

    
tokenizer = ToyTokenizer(max_tokens=max_tokens)

In [4]:
# TODO: Test the model to make sure it is 100% accurate on its task.
def generate_dataset(n, l):
    """Generate random sentiment sentences and their labels."""

    ret = []
    cls = []
    for i in range(n):
        sent = []
        while len(sent) < l:
            r = random.random()

            word = "neutral"

            if r > 0.9 and len(sent) > 0:
                continue
            elif r > 0.8:
                word = "good"
            elif r > 0.7:
                word = "bad"

            sent.append(word)

        ret.append(" ".join(sent))

        gt = 0 # neutral
        if "good" in sent and "bad" in sent:
            gt = 3 # confused
        elif "good" in sent:
            gt = 1 # positive
        elif "bad" in sent:
            gt = 2 # negative

        cls.append(gt)

    return pd.DataFrame(dict(sentence=ret, sentiment=cls))

In [5]:
dataset = generate_dataset(1000, max_tokens-1)
sentences = dataset['sentence'].to_numpy()
sentiments = dataset['sentiment'].to_numpy()

In [27]:
@dataclass
class Outputs:
    logits: Tensor = None
    probits: Tensor = None

class Sentiment(torch.nn.Module):
    def set_parameters(self) -> None:
        """Set model parameters as per fixed specification."""

        Wi = torch.zeros_like(self.lstm.weight_ih_l0)
        bi = torch.zeros_like(self.lstm.bias_ih_l0)
        Wh = torch.zeros_like(self.lstm.weight_hh_l0)
        bh = torch.zeros_like(self.lstm.bias_hh_l0)

        big = 8.0 # Multipliers to help dealing with LSTM sigmoids.
        half = 4.0 # Intention here is that sigmoid((x*big) - half) is ~0 if x is ~0; and
                    # ~1 when x is >~ 1.

        S_NEUTRAL = 0
        S_GOOD = 1
        S_BAD = 2
        S_GOOD_TO_BAD = 3
        S_BAD_TO_GOOD = 4
        W_UNKNOWN = 0
        W_NEUTRAL = tokenizer.vocab['neutral']
        W_GOOD = tokenizer.vocab['good']
        W_BAD = tokenizer.vocab['bad']

        hs = self.hidden_size

        # make sure c gate is always big
        bi[0:hs*3] = big*10.0
        bh[0:hs*3] = big*10.0

        # o gate weights:
        Wi[3*hs+S_GOOD,    W_GOOD]    = big # read good word
        Wi[3*hs+S_BAD,     W_BAD]     = big # read bad word
        Wh[3*hs+S_NEUTRAL, S_NEUTRAL] = big * 10.0 # keep prior neutral, good, bad states
        Wh[3*hs+S_GOOD,    S_GOOD]    = big * 10.0 # 
        Wh[3*hs+S_BAD,     S_BAD]     = big * 10.0 #
        bi[3*hs:4*hs] = -half * 1.5 # sigmoid will be 0 unless one of good/bad words was read
        # bh[3*hs:4*hs] = -big * 10.0

        # set "good to bad" confused if prior was good, and input was bad
        Wh[3*hs+S_GOOD_TO_BAD, S_GOOD]        = big      # (prior state was good
        Wi[3*hs+S_GOOD_TO_BAD, W_BAD]         = big      #  and input was bad)
        Wh[3*hs+S_GOOD_TO_BAD, S_GOOD_TO_BAD] = 10.0*big # or (was already in this confused state)
        bh[3*hs+S_GOOD_TO_BAD] = -(half*1.85) # Want at least 2 of first two to fire, or just the last one to fire.

        # set "bad to good" confused if prior was bad, and input was good
        Wh[3*hs+S_BAD_TO_GOOD, S_BAD]         = big      # (prior state was bad
        Wi[3*hs+S_BAD_TO_GOOD, W_GOOD]        = big      #  and input was good)
        Wh[3*hs+S_BAD_TO_GOOD, S_BAD_TO_GOOD] = 10.0*big # or (was already confused)
        bh[3*hs+S_BAD_TO_GOOD] = -(half*1.85) #

        self.lstm.weight_hh_l0 = nn.Parameter(Wh)
        self.lstm.bias_hh_l0 = nn.Parameter(bh)
        self.lstm.weight_ih_l0 = nn.Parameter(Wi)
        self.lstm.bias_ih_l0 = nn.Parameter(bi)

        self.embedding.weight = nn.Parameter(torch.eye(self.emb_size))

        self.logits.weight = nn.Parameter(torch.tensor(
            [
                [10.0, 0.0,  0.0,  0.0,  0.0],
                [0.0, 20.0,  0.0,  0.0,  0.0],
                [0.0,  0.0, 20.0,  0.0,  0.0],
                [0.0,  0.0,  0.0, 30.0, 30.0]
            ]
        ))

    def __init__(self):
        super().__init__()

        self.labels = ['0', '+', '-', '?']

        self.emb_size = 7 # len(tokanizer.vocab)

        self.hidden_size = 5
        # 5 states, one for neutral, one for positive, one for negative, and two for confused. Requiring two
        # confused states for simplicity of the model; it is easier to encode semantics of confusion based on 
        # which initial positive/negative state was set first.

        # Identity embedding, each vocab word has its own dimension where its presence is encoded.
        self.embedding = nn.Embedding(
            padding_idx=0, 
            embedding_dim=self.emb_size, 
            num_embeddings=self.emb_size
        )

        self.lstm = nn.LSTM(
            input_size=self.emb_size,
            hidden_size=self.hidden_size,
            num_layers=1,
            batch_first=True
        )

        # Linear layer to combine the two types of confused state and weight things so that
        # confused outweighs positive and negative, while positive and negative outweigh neutral 
        # if more than one of these states is set.
        self.logits = torch.nn.Linear(
            in_features=self.hidden_size,
            out_features=4,
            bias=False
        )

        # Finally add a softmax for classification.
        self.softmax = torch.nn.Softmax(dim=1)

    def forward(self,
        inputs_embeds: torch.Tensor = None,
        attention_mask: torch.Tensor = None, 
        input_ids: torch.Tensor = None, 
        token_type_ids: torch.Tensor = None
    ) -> torch.Tensor:
        # Signature and some functionality imitiates huggingface models.

        if input_ids is not None:
            batch_size = input_ids.shape[0]
            device = input_ids.device
        else:
            batch_size = inputs_embeds.shape[0]
            device = inputs_embeds.device

        h0 = torch.zeros(1, batch_size, self.hidden_size).to(device)
        h0[:,:,0] = 10.0 # initial state is neutral
        c0 = 10.0 * torch.ones(1, batch_size, self.hidden_size).to(device)

        if inputs_embeds is None:
            embeds = self.embedding(input_ids)
        else:
            embeds = inputs_embeds

        _, (hn, _) = self.lstm(embeds, (h0, c0))

        logits = self.logits(hn)[0]

        probits = self.softmax(logits)

        # outputs also imitate hugging face
        return Outputs(logits=logits, probits=probits)

model = Sentiment()
model.set_parameters()

In [28]:
scores = model(**tokenizer.tokenize(sentences)).logits.detach().numpy()
preds = np.argmax(scores, axis=1)
acc = (preds == sentiments).mean()
print(f"accuracy={100.0*acc:0.3f}%")

wrongs = np.argwhere(preds != sentiments)
for t, p, gt in zip(sentences[wrongs], preds[wrongs], sentiments[wrongs]):
    print (model.labels[p[0]], model.labels[gt[0]], t, tokenizer.tokenize(t))

accuracy=100.000%


In [29]:
wrapper = get_model_wrapper(model, input_shape=(None, tokenizer.max_tokens), device="cpu")

INFO: Detected pytorch backend for <class '__main__.Sentiment'>.
INFO: Using backend Backend.PYTORCH.
INFO: If this seems incorrect, you can force the correct backend by passing the `backend` parameter directly into your get_model_wrapper call.
DEBUG: Input dtype was not passed in. Defaulting to `torch.float32`.


In [30]:
wrapper.print_layer_names()

'embedding':	Embedding(7, 7, padding_idx=0)
'lstm':	LSTM(7, 5, batch_first=True)
'logits':	Linear(in_features=5, out_features=4, bias=False)
'softmax':	Softmax(dim=1)


In [31]:
texts=['good', "bad", "nothing", "good and bad"]

In [32]:
# Minimal usage provides only tokenize and input_accessor if neccessary.

V = NLP(
    tokenize=lambda sentences: tokenizer.tokenize(sentences),
    input_accessor=lambda x: x['input_ids'],
)
V.tokens(texts=texts)

In [33]:
# Test hidden tokens

V = NLP(
    tokenize=lambda sentences: tokenizer.tokenize(sentences),
    input_accessor=lambda x: x['input_ids'],
    hidden_tokens=set([tokenizer.pad_token_id])
    # do not display these tokens
)
V.tokens(texts=texts)

In [34]:
# Test decode to show readable representations of token id's.

V = NLP(
    decode=lambda x: tokenizer.decode(x),
    tokenize=lambda sentences: tokenizer.tokenize(sentences),
    input_accessor=lambda x: x['input_ids'],
    hidden_tokens=set([tokenizer.pad_token_id])
)
V.tokens(texts=texts)

In [35]:
# Show token id's alongside readable forms.

V.tokens(texts=texts, show_id=True)

In [36]:
# Also show pre-tokenized text.

V.tokens(texts=texts, show_id=True, show_text=True)

In [37]:
# Test model outputs.

V = NLP(
    wrapper=wrapper,
    # labels=model.labels,
    decode=lambda x: tokenizer.decode(x),
    tokenize=lambda sentences: tokenizer.tokenize(sentences),
    # huggingface models can take as input the keyword args as per produced by their tokenizers.

    input_accessor=lambda x: x['input_ids'],
    # for huggingface models, input/token ids are under input_ids key in the input dictionary

    output_accessor=lambda x: x.logits,
    # and logits under 'logits' key in the output dictionary

    hidden_tokens=set([tokenizer.pad_token_id])
    # do not display these tokens
)
V.tokens(texts=texts)

In [38]:
# Test model outputs with labels.

V = NLP(
    wrapper=wrapper,
    labels=model.labels,
    decode=lambda x: tokenizer.decode(x),
    tokenize=lambda sentences: tokenizer.tokenize(sentences),
    # huggingface models can take as input the keyword args as per produced by their tokenizers.

    input_accessor=lambda x: x['input_ids'],
    # for huggingface models, input/token ids are under input_ids key in the input dictionary

    output_accessor=lambda x: x.logits,
    # and logits under 'logits' key in the output dictionary

    hidden_tokens=set([tokenizer.pad_token_id])
    # do not display these tokens
)
V.tokens(texts=texts)

In [39]:
# Test attributions; various QoI, point DoI.

from trulens.nn.distributions import PointDoi, GaussianDoi, LinearDoi
from trulens.nn.attribution import InternalInfluence, IntegratedGradients, Cut, OutputCut
from trulens.nn.quantities import MaxClassQoI, ClassQoI, ComparativeQoI

common_args = dict(doi=PointDoi(Cut('embedding')), model=wrapper, cuts=(Cut('embedding'), OutputCut(accessor=lambda o: o.logits)))

attributors = [
    InternalInfluence(
        qoi=MaxClassQoI(),
        **common_args
    ),
    InternalInfluence(
        qoi=ClassQoI(1),
        **common_args
    ),
    InternalInfluence(
        qoi=ClassQoI(3),
        **common_args
    ),
    InternalInfluence(
        qoi=ComparativeQoI(1,2),
        **common_args
    )  
]

for infl in attributors:

    V = NLP(
        wrapper=wrapper,
        labels=model.labels,
        decode=lambda x: tokenizer.decode(x),
        tokenize=lambda sentences: tokenizer.tokenize(sentences),
        # huggingface models can take as input the keyword args as per produced by their tokenizers.

        input_accessor=lambda x: x['input_ids'],
        # for huggingface models, input/token ids are under input_ids key in the input dictionary

        output_accessor=lambda x: x.logits,
        # and logits under 'logits' key in the output dictionary

        hidden_tokens=set([tokenizer.pad_token_id])
        # do not display these tokens
    )

    display(V.tokens(texts=texts, attributor=infl))

In [40]:
common_args = dict(qoi=MaxClassQoI(), model=wrapper, cuts=(Cut('embedding'), OutputCut(accessor=lambda o: o.logits)))

attributors = [
    InternalInfluence(
        doi=PointDoi(Cut('embedding')),
        **common_args
    ),
    InternalInfluence(
        doi=GaussianDoi(var=0.1, resolution=10, cut=Cut('embedding')),
        **common_args
    ),
    InternalInfluence(
        doi=LinearDoi(resolution=10, cut=Cut('embedding')),
        **common_args
    ),
]

for infl in attributors:

    V = NLP(
        wrapper=wrapper,
        labels=model.labels,
        decode=lambda x: tokenizer.decode(x),
        tokenize=lambda sentences: tokenizer.tokenize(sentences),
        # huggingface models can take as input the keyword args as per produced by their tokenizers.

        input_accessor=lambda x: x['input_ids'],
        # for huggingface models, input/token ids are under input_ids key in the input dictionary

        output_accessor=lambda x: x.logits,
        # and logits under 'logits' key in the output dictionary

        hidden_tokens=set([tokenizer.pad_token_id])
        # do not display these tokens
    )

    display(V.tokens(texts=texts, attributor=infl))

In [41]:
# Test stability pairs.

V = NLP(
    wrapper=wrapper,
    labels=model.labels,
    decode=lambda x: tokenizer.decode(x),
    tokenize=lambda sentences: tokenizer.tokenize(sentences),
    # huggingface models can take as input the keyword args as per produced by their tokenizers.

    input_accessor=lambda x: x['input_ids'],
    # for huggingface models, input/token ids are under input_ids key in the input dictionary

    output_accessor=lambda x: x.logits,
    # and logits under 'logits' key in the output dictionary

    hidden_tokens=set([tokenizer.pad_token_id])
    # do not display these tokens
)

texts1 = ['this is good', "good good bad good"]
texts2 = ['this is bad', "good bad bad bad"]

V.tokens_stability(texts1=texts1, texts2=texts2)

In [42]:
# Test doi enumeration.

common_args = dict(return_grads=True, return_doi=True, qoi=MaxClassQoI(), model=wrapper, cuts=(Cut('embedding'), OutputCut(accessor=lambda o: o.logits)))

attributors = [
    InternalInfluence(
        doi=PointDoi(Cut('embedding')),
        **common_args
    ),
    InternalInfluence(
        doi=GaussianDoi(var=0.4, resolution=20, cut=Cut('embedding')),
        **common_args
    ),
    InternalInfluence(
        doi=LinearDoi(resolution=20, cut=Cut('embedding')),
        **common_args
    ),
]

for infl in attributors:

    V = NLP(
        wrapper=wrapper,
        labels=model.labels,
        decode=lambda x: tokenizer.decode(x),
        tokenize=lambda sentences: tokenizer.tokenize(sentences),
        embedder=model.embedding,
        embeddings=list(model.embedding.parameters())[0].detach(),
        # huggingface models can take as input the keyword args as per produced by their tokenizers.

        input_accessor=lambda x: x['input_ids'],
        # for huggingface models, input/token ids are under input_ids key in the input dictionary

        output_accessor=lambda x: x.logits,
        # and logits under 'logits' key in the output dictionary

        hidden_tokens=set([tokenizer.pad_token_id])
        # do not display these tokens
    )

    display(f"doi={infl.doi}")
    display(V.tokens(texts=texts, attributor=infl, show_doi=True))

'doi=PointDoi(_cut=Cut(name=embedding,accessor=None,anchor=out))'

'doi=GaussianDoi(_cut=Cut(name=embedding,accessor=None,anchor=out),_var=0.4,_resolution=20)'

'doi=LinearDoi(_cut=Cut(name=embedding,accessor=None,anchor=out),_baseline=None,_resolution=20)'

In [43]:
# Test doi enumeration with token stability pairs.

common_args = dict(
    return_grads=True, 
    return_doi=True, 
    qoi=MaxClassQoI(), 
    model=wrapper, 
    cuts=(Cut('embedding'), OutputCut(accessor=lambda o: o.logits))
)

attributors = [
    InternalInfluence(
        doi=PointDoi(Cut('embedding')),
        **common_args
    ),
    InternalInfluence(
        doi=GaussianDoi(var=0.1, resolution=10, cut=Cut('embedding')),
        **common_args
    ),
    InternalInfluence(
        doi=LinearDoi(resolution=10, cut=Cut('embedding')),
        **common_args
    ),
]

for infl in attributors:

    V = NLP(
        wrapper=wrapper,
        labels=model.labels,
        decode=lambda x: tokenizer.decode(x),
        tokenize=lambda sentences: tokenizer.tokenize(sentences),
        embedder=model.embedding,
        embeddings=list(model.embedding.parameters())[0].detach(),
        # huggingface models can take as input the keyword args as per produced by their tokenizers.

        input_accessor=lambda x: x['input_ids'],
        # for huggingface models, input/token ids are under input_ids key in the input dictionary

        output_accessor=lambda x: x.logits,
        # and logits under 'logits' key in the output dictionary

        hidden_tokens=set([tokenizer.pad_token_id])
        # do not display these tokens
    )

    display(V.tokens_stability(texts1=texts1, texts2=texts2, attributor=infl, show_doi=True))

In [48]:
# Test IG baselines.

from trulens.utils.nlp import token_baseline_swap
from trulens.utils.typing import ModelInputs

inputs_swap_baseline_ids, inputs_swap_baseline_embeddings = token_baseline_swap(
    
    token_pairs = [(
        tokenizer.vocab['good'],
        tokenizer.vocab['bad']
    )],

    input_accessor=lambda x: x.kwargs['input_ids'],

    ids_to_embeddings=model.embedding
    # Callable to produce embeddings from token ids.
)

from trulens.utils.nlp import token_baseline

inputs_baseline_ids, inputs_baseline_embeddings = token_baseline(
    keep_tokens=set([tokenizer.cls_token_id, tokenizer.sep_token_id]),
    # Which tokens to preserve.

    replacement_token=tokenizer.pad_token_id,
    # What to replace tokens with.

    input_accessor=lambda x: x.kwargs['input_ids'],
    # input_accessor = lambda x: x, 

    ids_to_embeddings=model.embedding
    # Callable to produce embeddings from token ids.
)

common_args = dict(
    return_grads=True, 
    return_doi=True, 
    qoi=ClassQoI(1), 
    model=wrapper, 
    cuts=(Cut('embedding'), OutputCut(accessor=lambda o: o.logits))
)

attributors = [InternalInfluence(
        doi=LinearDoi(resolution=20, cut=Cut('embedding'), baseline=baseline),
        **common_args
    ) for baseline in [
        None,
        inputs_baseline_embeddings,
        inputs_swap_baseline_embeddings
    ]]

for infl in attributors:

    V = NLP(
        wrapper=wrapper,
        labels=model.labels,
        decode=lambda x: tokenizer.decode(x),
        tokenize=lambda sentences: tokenizer.tokenize(sentences),
        embedder=model.embedding,
        embeddings=list(model.embedding.parameters())[0].detach(),
        # huggingface models can take as input the keyword args as per produced by their tokenizers.

        input_accessor=lambda x: x['input_ids'],
        # for huggingface models, input/token ids are under input_ids key in the input dictionary

        output_accessor=lambda x: x.logits,
        # and logits under 'logits' key in the output dictionary

        hidden_tokens=set([tokenizer.pad_token_id])
        # do not display these tokens
    )
    display(f"baseline={infl.doi.baseline}")

    display(V.tokens(texts=texts, attributor=infl, show_doi=True))

'baseline=None'

'baseline=<function token_baseline.<locals>.base_embeddings at 0x7fd757cd3290>'

'baseline=<function token_baseline_swap.<locals>.base_embeddings at 0x7fd757cd3e60>'