In [1]:
# default_exp fastbert

In [2]:
#hide
from nbdev.showdoc import *

# FastBert

> fast.ai API customization for separateable sequence Bert model

<img src="data/fastbert.jpg">

In [3]:
# export
# pytorch
import torch

# transformers
from transformers import PreTrainedModel, PreTrainedTokenizer, PretrainedConfig
from transformers import BertForSequenceClassification, BertTokenizer, BertConfig

# fast.ai
from fastai import *
from fastai.text import *

In [4]:
# To use one of the other Hugging Face models, see
# https://www.kaggle.com/maroberti/fastai-with-transformers-bert-roberta
# According to that reference this notebook works for them as well;
# only the customized model section needs attention.

model_class = BertForSequenceClassification
tokenizer_class = BertTokenizer
config_class = BertConfig

## Bert + fast.ai

The customization follows the work of https://www.kaggle.com/maroberti/fastai-with-transformers-bert-roberta. I recommend reading that pipeline first, if you have not done so already. Wherever we do additional work, we explain the steps.

First we take the implementations from former colleagues and customize them so that the tokenizer can handle our special needs.

Namely, we have to tokenize a List[sequence] element, where each sequence is a string, and return a List[tokens] that respects the separated format.

The required format is:

\begin{equation*}
[CLS] + tokens(seq_1) + tokens(seq_2) + [SEP] + tokens(seq_3) + [SEP]
\end{equation*}

which is then truncated to the `max_len` parameter of the tokenizer wrapper.

In [5]:
# export
class TransformersBaseTokenizer(BaseTokenizer):
    """Wrapper around PreTrainedTokenizer to be compatible with fast.ai.

    `pretrained_tokenizer` is the Hugging Face tokenizer used for the actual
    word-piece tokenization, `model_type` is stored as-is, and `max_len` is the
    maximum number of tokens (special tokens included) returned per item.
    Extra keyword arguments are accepted and ignored.
    """
    def __init__(self, pretrained_tokenizer: PreTrainedTokenizer, model_type = 'bert', max_len=64, **kwargs):
        self._pretrained_tokenizer = pretrained_tokenizer
        self.max_seq_len = max_len
        self.model_type = model_type

    def __call__(self, *args, **kwargs): 
        # `SeqTokenizer` builds its tokenizer with `tok_func(lang)`; returning
        # `self` lets an already constructed instance be passed as `tok_func`.
        return self

    def tokenizer(self, t:List[str]) -> List[str]:
        """Tokenize the three turns in `t`, add the special tokens and limit the length.

        The result has the form
        `[CLS] + tokens(t[0]) + tokens(t[1]) + [SEP] + tokens(t[2]) + [SEP]`
        and contains at most `max_seq_len` tokens. `t` must hold at least three
        strings; anything after the third one is tokenized but not used.

        When the sequence has to be shortened, the last kept position is
        replaced by `[SEP]` so the result still ends with the terminating
        separator (a plain slice would silently drop it).
        NOTE: if the first two turns alone exceed the budget, the `[SEP]`
        between the segments is still lost.
        """
        CLS = self._pretrained_tokenizer.cls_token
        SEP = self._pretrained_tokenizer.sep_token
        turns = [self._pretrained_tokenizer.tokenize(turn) for turn in t]
        tokens = [CLS] + turns[0] + turns[1] + [SEP] + turns[2] + [SEP]

        limit = self.max_seq_len
        if len(tokens) <= limit:
            return tokens
        if limit < 2:
            # No room for content plus a terminator: fall back to a plain cut.
            return tokens[:limit]
        return tokens[:limit - 1] + [SEP]

In the Tokenizer class we just change the type annotations from str to List[str].

In [6]:
# export
class SeqTokenizer(Tokenizer):
    "Combine pre/post rules with a tokenizer function to tokenize multi-turn items, optionally with multiprocessing."
    def __init__(self, tok_func:Callable=SpacyTokenizer, lang:str='en', pre_rules:ListRules=None,
                 post_rules:ListRules=None, special_cases:Collection[str]=None, n_cpus:int=None):
        self.tok_func = tok_func
        self.lang = lang
        # Every optional argument falls back to the matching entry in `defaults`.
        self.pre_rules = ifnone(pre_rules, defaults.text_pre_rules)
        self.post_rules = ifnone(post_rules, defaults.text_post_rules)
        if special_cases is None:
            special_cases = defaults.text_spec_tok
        self.special_cases = special_cases
        self.n_cpus = ifnone(n_cpus, defaults.cpus)

    def process_text(self, t:List[str], tok:BaseTokenizer) -> List[str]:
        "Apply the pre rules to item `t`, tokenize it with `tok`, then apply the post rules."
        item = t
        for pre_rule in self.pre_rules:
            item = pre_rule(item)
        tokens = tok.tokenizer(item)
        for post_rule in self.post_rules:
            tokens = post_rule(tokens)
        return tokens

    def _process_all_1(self, texts:Collection[List[str]]) -> List[List[str]]:
        "Tokenize every item of `texts` within the current process."
        tokenizer = self.tok_func(self.lang)
        if self.special_cases:
            tokenizer.add_special_cases(self.special_cases)
        processed = []
        for item in texts:
            processed.append(self.process_text(item, tokenizer))
        return processed

    def process_all(self, texts:Collection[List[str]]) -> List[List[str]]:
        "Tokenize every item of `texts`, spreading the work over `n_cpus` processes when more than one is configured."
        if self.n_cpus > 1:
            with ProcessPoolExecutor(self.n_cpus) as pool:
                chunks = partition_by_cores(texts, self.n_cpus)
                # Each worker returns a list of token lists; concatenate them in order.
                return sum(pool.map(self._process_all_1, chunks), [])
        return self._process_all_1(texts)

Now the challenge is to make the TokenizeProcessor class able to use the new input form of List[sequence]. 

In fast.ai the TokenizeProcessor class concatenates a list of str and then runs the tokenizer on the whole text. So we built a customized version of the processor. Here we changed the class functions to use our customized tokenizer and removed the concatenation function (\_join\_texts).

It would be more elegant to use a customized \_join\_texts function that concatenates the sentences via a special token. One problem with that approach is that the BertTokenizer cannot distinguish between text and special tokens inside the text, so we have to add the special tokens after tokenization.

In [7]:
# export 
class SeqTokenizeProcessor(TokenizeProcessor):
    """`PreProcessor` that tokenizes the multi-turn items in `ds`.

    Each item is a list of strings that is handed to the tokenizer as a whole.
    `mark_fields`, `include_bos` and `include_eos` are stored but not used by
    the methods defined in this class.
    """
    def __init__(self, ds:ItemList=None, tokenizer:Tokenizer=None, chunksize:int=10000, 
                 mark_fields:bool=False, include_bos:bool=True, include_eos:bool=False):
        self.tokenizer,self.chunksize,self.mark_fields = ifnone(tokenizer, Tokenizer()),chunksize,mark_fields
        self.include_bos, self.include_eos = include_bos, include_eos

    def process_one(self, item):
        """Tokenize a single item and return its token list.

        `item` may be one multi-turn item (a sequence of strings) or, for
        backward compatibility, that item already wrapped in a one-element
        list. `_process_all_1` iterates over a batch, so a bare item has to be
        wrapped first; otherwise each turn would be treated as its own item.
        """
        if len(item) > 0 and isinstance(item[0], str):
            item = [item]
        return self.tokenizer._process_all_1(item)[0]

    def process(self, ds):
        "Tokenize `ds.items` in chunks of `chunksize` and replace them with the token lists."
        tokens = []
        for i in progress_bar(range(0,len(ds),self.chunksize), leave=False):
            tokens += self.tokenizer.process_all(ds.items[i:i+self.chunksize])
        ds.items = tokens

In [8]:
# setting it up 
# pretrained models can be shown by
# model_class.pretrained_model_archive_map.keys()
pretrained_model_name = 'bert-large-uncased-whole-word-masking'

transformer_tokenizer = tokenizer_class.from_pretrained(pretrained_model_name)
# `model_type` is never assigned in this notebook, so the bare name could only
# resolve through the star imports (presumably to an unrelated fastai helper of
# the same name - TODO confirm). Pass the intended model type explicitly.
transformer_base_tokenizer = TransformersBaseTokenizer(pretrained_tokenizer = transformer_tokenizer, model_type = 'bert')
# Empty rule lists: the default fast.ai rules are not applied to the list-of-turns items.
fastai_tokenizer = SeqTokenizer(tok_func = transformer_base_tokenizer, pre_rules=[], post_rules=[])
tokenize_processor = SeqTokenizeProcessor(tokenizer=fastai_tokenizer, include_bos=False, include_eos=False)

In [9]:
# testing of the TokenizeProcessor subclass
test_item_1 = ['turn one','turn two','turn three.']
test_item_2 = ['turn four', 'turn five', 'turn six.']
test_tokens_1 = ['[CLS]', 'turn', 'one', 'turn', 'two', '[SEP]', 
               'turn', 'three', '.', '[SEP]']
test_tokens_2 = ['[CLS]', 'turn', 'four', 'turn', 'five', '[SEP]', 
               'turn', 'six', '.', '[SEP]']

test_items = ItemList(items = [test_item_1, test_item_2])

# No try/except here: the former `except e:` referenced an undefined name, so
# any real failure was replaced by a NameError. Let the original error surface.
tokenize_processor.process(test_items)

assert tokenize_processor.process_one([test_item_1]) == test_tokens_1 
assert test_items[0] == test_tokens_1
assert test_items[1] == test_tokens_2

Now that the tokenizer works the way we want, we can move on to the next problem. We need not only the input_ids of the sentences but also the attention_mask and the token_type_ids.

Looking at the model setup, we can use a simple trick: compute both with utility functions in the forward pass. Hence we derive the two masks from an input_ids batch on the fly.

In [10]:
# export
def segment(input_ids, sep_id=102, pad_id=0):
    """Util function deriving the BERT `token_type_ids` from `input_ids`.

    Along the last dimension every position before the first `sep_id` gets 0;
    every position from the first `sep_id` onwards gets 1, except positions
    holding `pad_id`, which stay 0. Rows without any `sep_id` are all 0.
    NOTE: the first separator itself is labelled 1, which is what the
    notebook's test for this function expects.

    Args:
        input_ids: integer tensor of token ids, e.g. of shape (batch, seq_len).
        sep_id: id of the separator token (102 is the id used for `[SEP]` here).
        pad_id: id of the padding token.

    Returns:
        A new CPU tensor with the same shape and dtype as `input_ids`.
        `input_ids` itself is not modified.
    """
    # Computed on a CPU copy, as before, so the result's device does not change for callers.
    ids = input_ids.detach().cpu()
    # Running count of separators: > 0 from the first separator onwards.
    seen_sep = (ids == sep_id).to(torch.long).cumsum(dim=-1) > 0
    not_pad = ids != pad_id
    return (seen_sep & not_pad).to(ids.dtype)

In [11]:
# Check `segment` on a small hand-made batch.
# 101 is the CLS token id and 102 the SEP token id; the batch has the
# form of the output produced by the processor built before.
tokens_batch = torch.tensor([
    [101, 2735, 102, 2737, 121, 4243, 1001],
    [101, 219, 102, 2482, 1239, 1234, 102],
    [101, 419, 102, 4202, 102, 0, 0],
])

# Expected token_type_ids for the three rows above.
segs_batch = torch.tensor([
    [0, 0, 1, 1, 1, 1, 1],
    [0, 0, 1, 1, 1, 1, 1],
    [0, 0, 1, 1, 1, 0, 0],
])

seg_ids = segment(tokens_batch)
assert torch.equal(seg_ids, segs_batch)

Now as we have a function that can retrieve the token_type_ids of our inputs on the fly, we can use it in the forward pass of our model.

attention_mask is a one liner as we will see.

In [12]:
# export
class CustomTransformerModel(nn.Module):
    """Custom transformer model for fast.ai.

    Wraps `transformer_model` so that the forward pass only needs `input_ids`;
    the attention mask and the token type ids are derived from them.
    """
    def __init__(self, transformer_model: PreTrainedModel):
        super().__init__()
        self.transformer = transformer_model

    def forward(self, input_ids):
        "Run the wrapped transformer on `input_ids` and return the first element of its output (the logits)."
        ids_type = input_ids.type()

        # Every non-zero id is attended to; id 0 is treated as padding.
        attention_mask = (input_ids != 0).type(ids_type)
        # Token type ids are computed on the fly and cast to the type of `input_ids`.
        segmentation_mask = segment(input_ids).type(ids_type)

        outputs = self.transformer(input_ids,
                                   attention_mask=attention_mask,
                                   token_type_ids=segmentation_mask)
        return outputs[0]

In [13]:
# testing the CustomTransformerModel 
config = config_class.from_pretrained(pretrained_model_name)
config.num_labels = 4

transformer_model = model_class.from_pretrained(pretrained_model_name, config = config)
custom_transformer_model = CustomTransformerModel(transformer_model = transformer_model)

custom_transformer_model.eval()
# No try/except here: the former `except e:` referenced an undefined name and
# would have hidden the real error behind a NameError. The module is called
# directly (instead of `.forward`) and without gradient tracking, since only
# the output shape is checked.
with torch.no_grad():
    logits_batch = custom_transformer_model(tokens_batch)

assert logits_batch.shape == torch.Size([tokens_batch.shape[0], 4])

As shown in the other solutions, we customize the numericalizer as well. Here we are lucky, because the different input form imposes no special requirements.

In [14]:
# export
class TransformersVocab(Vocab):
    "`Vocab` that delegates the token <-> id conversion to a `PreTrainedTokenizer`."
    def __init__(self, tokenizer: PreTrainedTokenizer):
        # The parent is initialised with an empty `itos`; lookups go through `tokenizer`.
        super().__init__(itos = [])
        self.tokenizer = tokenizer
    
    def numericalize(self, t:Collection[str]) -> List[int]:
        "Map the tokens in `t` to their ids."
        ids = self.tokenizer.convert_tokens_to_ids(t)
        return ids

    def textify(self, nums:Collection[int], sep=' ') -> List[str]:
        "Map the ids in `nums` to their tokens, joined with `sep` into one string unless `sep` is None."
        ids = np.array(nums).tolist()
        tokens = self.tokenizer.convert_ids_to_tokens(ids)
        if sep is None:
            return tokens
        return sep.join(tokens)

The customization work is done here. If you want to see an example of how to use it, go to the 01_task3 notebook, where we apply it to SemEval-2019 Task 3.

In [15]:
# Run the nbdev export; the cell output lists each notebook it converted.
from nbdev.export import *
notebook2script()

Converted 00_fastbert.ipynb.
Converted 01_task3.ipynb.
Converted index.ipynb.
