In [1]:
# Mount Google Drive at /content/drive inside the Colab runtime.
from google.colab import drive

DRIVE_MOUNT_POINT = "/content/drive"
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Install transformers.
!pip install transformers

In [3]:
# Imports.
import pickle
import numpy as np
import torch

from transformers import (
    BertConfig,
    BertForMaskedLM,
    LineByLineTextDataset,
    Trainer,
    TrainingArguments,
    EvalPrediction
)
from sklearn.metrics import accuracy_score

# Requirements
* All sequences must be the same length.
* Read the sequences from external storage, for example Google Storage or Google Drive.
* Split the data into two sets: one for training and one for validation.
* Read the features (columns) in order to build the tokenizer.

In [4]:
# Load the pickled `columns` object that the vocabulary mapping is built from.
# Open in binary mode ("rb") for byte data; "r" is only for string data.
# NOTE(review): the path is an empty placeholder and must be filled in before running.
# NOTE(review): pickle.load can execute arbitrary code; only load files you trust.
COLUMNS_PICKLE_PATH = ""
with open(COLUMNS_PICKLE_PATH, "rb") as columns_file:
    columns = pickle.load(columns_file)

In [5]:
# Vocabulary mapping: raw id -> index.
# Change this according to your requirements.
# Every entry of `columns` gets an index starting at 1, in iteration order.
data_mapper = {int(col): position for position, col in enumerate(columns, start=1)}

# Raw id 0 keeps index 0.
data_mapper[0] = 0

# Raw ids 302..307 each take the current size of the mapping as their index,
# assigned in ascending order.
for reserved_id in range(302, 308):
    data_mapper[reserved_id] = len(data_mapper)

# Reverse lookup: index -> raw id.
inverse_data_mapper = {index: raw_id for raw_id, index in data_mapper.items()}


class Tokenizer():
    """Tokenizer class for reading and parsing input data.

    Works on sequences of integer event ids written as whitespace-separated
    text. Special-token ids are resolved once, at construction time, from the
    vocabulary dictionary.
    """

    def __init__(self, input_dict, inverse_input_dict, str_dict=None):
        """Init func.

            input_dict: A dictionary that holds event ids as keys and indexes
                as values. Must contain the keys 0, 303, 304, 305 and 306.
            inverse_input_dict: Reverse dictionary of input_dict.
            str_dict: Optional dictionary with event ids as keys and strings
                as values. When omitted, convert_tokens_to_string falls back
                to str(token).
        """
        self.dictionary = input_dict
        self.inverse_dictionary = inverse_input_dict
        self.str_dict = str_dict
        self.cls_token_ids = self.dictionary[304]
        # Alias named like the other *_token_id attributes; the original
        # (plural) spelling is kept for existing users.
        self.cls_token_id = self.cls_token_ids
        self.sep_token_id = self.dictionary[305]
        self.pad_token_id = self.dictionary[0]
        self.mask_token_id = self.dictionary[306]
        self.unk_token_id = self.dictionary[303]
        self.padding_side = "left"
        self.model_input_names = ["token_type_ids"]

        # Change these tokens according to your needs.
        # NOTE(review): pad_token is 302 while pad_token_id is looked up from
        # raw id 0; confirm which of the two is the intended padding token.
        self.mask_token = 306
        self.sep_token = 305
        self.cls_token = 304
        self.unk_token = 303
        self.pad_token = 302

    def convert_tokens_to_ids(self, input_sequence):
        """Tokens to ids.

        Looks every token up in the vocabulary; raises KeyError for a token
        that is not in it.
        """
        return [self.dictionary[token] for token in input_sequence]

    def convert_ids_to_tokens(self, input_ids):
        """Ids to tokens.

        Looks every id up in the reverse vocabulary; raises KeyError for an
        id that is not in it.
        """
        return [self.inverse_dictionary[token_id] for token_id in input_ids]

    def convert_tokens_to_string(self, tokens):
        """Tokens to string.

        Returns a list with one string per token. Uses str_dict when one was
        given (KeyError for a token missing from it), otherwise str(token).
        """
        if self.str_dict is None:
            return [str(token) for token in tokens]
        return [self.str_dict[token] for token in tokens]

    def __call__(self, sequences, add_special_tokens=True, truncation=True, max_length=64):
        """This function is called when a batch is passed to the tokenizer.

            sequences: List of sequences. (Iterable.) Each sequence is a
                string of whitespace-separated integers.
            add_special_tokens: When True, every sequence is wrapped by
                build_inputs_with_special_tokens; when False the parsed
                integers are returned as they are.
            truncation: Accepted for call compatibility. Not implemented,
                because all sequences are expected to have the same length.
            max_length: Maximum sequence length. Accepted for call
                compatibility; unused because truncation is not implemented.

        Returns a dict with the single key "input_ids" holding one list of
        ints per input sequence.

        NOTE(review): the parsed integers are used as ids directly and are
        not passed through convert_tokens_to_ids; confirm the input text
        already holds vocabulary indexes.
        """
        encoded = []
        for sequence in sequences:
            ids = self.__convert_to_int_sequence(sequence)
            if add_special_tokens:
                ids = self.build_inputs_with_special_tokens(ids)
            encoded.append(ids)

        return {"input_ids": encoded}

    def __convert_to_int_sequence(self, sequence):
        """Parse a whitespace-separated string of integers into a list of ints."""
        # split() without an argument tolerates repeated, leading and trailing
        # whitespace, which split(" ") would turn into empty, unparsable tokens.
        return [int(piece) for piece in sequence.split()]

    def tokenize(self, input_seq):
        """Return the input unchanged."""
        return input_seq

    def mask_last_element(self, sequence):
        """Overwrite the second-to-last element with the mask id, in place.

        Returns the same (mutated) sequence object. Raises IndexError when
        the sequence has fewer than two elements.
        """
        sequence[-2] = self.mask_token_id
        return sequence

    @property
    def vocab_size(self):
        """Number of entries in the vocabulary dictionary."""
        return len(self.dictionary)

    def get_vocab(self):
        """Return the vocabulary dictionary itself (not a copy)."""
        return self.dictionary

    def build_inputs_with_special_tokens(
        self, token_ids_0, token_ids_1 = None
    ):
        """Wrap one sequence, or a pair of sequences, with CLS/SEP ids.

            token_ids_0: List of ids of the first sequence.
            token_ids_1: Optional list of ids of the second sequence.

        Returns [CLS] + token_ids_0 + [SEP], followed by token_ids_1 + [SEP]
        when a second sequence is given.
        """
        cls = [self.cls_token_ids]
        sep = [self.sep_token_id]
        if token_ids_1 is None:
            return cls + token_ids_0 + sep
        return cls + token_ids_0 + sep + token_ids_1 + sep

# Create tokenizer object.
# Tokenizer.__init__ takes three dictionaries; the third, str_dict, maps every
# event id to the string used by convert_tokens_to_string. This notebook has
# no human-readable names, so each id's decimal string is used. Replace it
# with real labels if you have them.
str_dict = {event_id: str(event_id) for event_id in data_mapper}
tokenizer = Tokenizer(data_mapper, inverse_data_mapper, str_dict)

In [6]:
# Read the training dataset through the custom tokenizer.
# NOTE(review): file_path is an empty placeholder; point it at the training file.
TRAIN_FILE_PATH = ""
dataset = LineByLineTextDataset(tokenizer=tokenizer, file_path=TRAIN_FILE_PATH, block_size=128)



In [7]:
# Read the evaluation dataset through the same tokenizer.
# NOTE(review): file_path is an empty placeholder; point it at the validation file.
EVAL_FILE_PATH = ""
eval_dataset = LineByLineTextDataset(tokenizer=tokenizer, file_path=EVAL_FILE_PATH, block_size=128)



In [11]:
class DataCollator():
    """Process input on the fly.

    Callable that turns a list of examples (dicts with an "input_ids" entry)
    into a masked-language-modeling batch: a dict with "input_ids" and
    "labels" tensors.
    """

    def __init__(self, tokenizer=None, mlm_probability=.15):
        """Init func.

            tokenizer: Tokenizer that supplies the special-token ids, the
                padding settings and the vocabulary size. When None, the
                notebook-level `tokenizer` variable is looked up at call time.
            mlm_probability: Probability with which each non-special token is
                selected for prediction.
        """
        self.tokenizer = tokenizer
        self.mlm_probability = mlm_probability

    def _active_tokenizer(self):
        """Return the tokenizer given at construction, else the notebook-level one."""
        if self.tokenizer is not None:
            return self.tokenizer
        return tokenizer

    def __call__(self, examples):
        """Called when a list of examples is passed to the object.

        Returns a dict with "input_ids" (the masked inputs) and "labels".
        """
        batch = {"input_ids": self._collate_batch(examples, self._active_tokenizer())}

        batch["input_ids"], batch["labels"] = self.mask_tokens(
            batch["input_ids"],
        )
        return batch

    def _collate_batch(self, examples, tokenizer):
        """Collate `examples` into a batch, using the information in `tokenizer` for padding if necessary.

            examples: List of dicts whose "input_ids" is a tensor or a
                list/tuple of ints.
            tokenizer: Object providing pad_token_id and padding_side.

        Returns a 2-D tensor with one row per example.
        """
        # Tensorize if necessary.
        sequences = [
            ids if isinstance(ids, torch.Tensor) else torch.tensor(ids, dtype=torch.long)
            for ids in (example["input_ids"] for example in examples)
        ]

        # Same-length sequences can be stacked directly.
        length_of_first = sequences[0].size(0)
        if all(seq.size(0) == length_of_first for seq in sequences):
            return torch.stack(sequences, dim=0)

        # Otherwise pad every row to the longest sequence on the tokenizer's
        # padding side instead of returning nothing.
        max_length = max(seq.size(0) for seq in sequences)
        padded = sequences[0].new_full((len(sequences), max_length), tokenizer.pad_token_id)
        for row, seq in enumerate(sequences):
            if tokenizer.padding_side == "right":
                padded[row, : seq.size(0)] = seq
            else:
                padded[row, max_length - seq.size(0):] = seq
        return padded

    def mask_tokens(
        self, inputs: torch.Tensor, special_tokens_mask = None
    ):
        """
        Prepare masked tokens inputs/labels for masked language modeling: 80% MASK, 10% random, 10% original.

            inputs: 2-D tensor of token ids. It is modified in place.
            special_tokens_mask: Optional tensor, truthy where a position must
                never be selected. Computed with get_special_tokens_mask when
                omitted.

        Returns (inputs, labels). labels holds the original id at every
        selected position and -100 everywhere else.
        """
        tok = self._active_tokenizer()
        labels = inputs.clone()
        # We sample a few tokens in each sequence for MLM training (with probability `self.mlm_probability`)
        probability_matrix = torch.full(labels.shape, self.mlm_probability)

        if special_tokens_mask is None:
            special_tokens_mask = torch.tensor(
                [
                    self.get_special_tokens_mask(row, already_has_special_tokens=True)
                    for row in labels.tolist()
                ],
                dtype=torch.bool,
            )
        else:
            special_tokens_mask = special_tokens_mask.bool()

        probability_matrix.masked_fill_(special_tokens_mask, value=0.0)
        masked_indices = torch.bernoulli(probability_matrix).bool()
        labels[~masked_indices] = -100  # We only compute loss on masked tokens

        # 80% of the time, we replace masked input tokens with the mask id
        indices_replaced = torch.bernoulli(torch.full(labels.shape, 0.8)).bool() & masked_indices
        inputs[indices_replaced] = tok.mask_token_id

        # 10% of the time, we replace masked input tokens with random word
        # (half of the 20% that was not replaced by the mask id)
        indices_random = torch.bernoulli(torch.full(labels.shape, 0.5)).bool() & masked_indices & ~indices_replaced
        random_words = torch.randint(len(tok.dictionary), labels.shape, dtype=torch.long)
        inputs[indices_random] = random_words[indices_random]

        # The rest of the time (10% of the time) we keep the masked input tokens unchanged
        return inputs, labels

    def get_special_tokens_mask(
        self, token_ids_0, token_ids_1 = None, already_has_special_tokens = False
    ):
        """Flag the special tokens of one sequence.

            token_ids_0: List of token ids.
            token_ids_1: Unused.
            already_has_special_tokens: Unused; accepted so the method can be
                called with this keyword.

        Returns a list with 1 at every position holding a special id and 0
        elsewhere.
        """
        tok = self._active_tokenizer()
        # Take the ids from the tokenizer so they follow the vocabulary that
        # is actually in use instead of a fixed list.
        special_ids = {
            tok.cls_token_ids,
            tok.sep_token_id,
            tok.pad_token_id,
            tok.mask_token_id,
            tok.unk_token_id,
        }
        # NOTE(review): the raw pad token has its own vocabulary entry next to
        # pad_token_id; it is treated as special too. Confirm this is intended.
        pad_alias = tok.dictionary.get(tok.pad_token)
        if pad_alias is not None:
            special_ids.add(pad_alias)

        return [1 if token in special_ids else 0 for token in token_ids_0]

# Collator instance that is handed to the Trainer as its data_collator.
data_collator = DataCollator()

In [12]:
# Number of entries in the vocabulary mapping.
# NOTE(review): this value is not used below; vocab_size is hard-coded to 308.
# Confirm that the two agree for your data.
vocabulary_size = len(data_mapper)
# Used as the number of position embeddings of the model.
# NOTE(review): the tokenizer adds a CLS and a SEP id around every sequence;
# confirm 42 covers the full input length including them.
seq_len = 42

# Create config.
bert_config_kwargs = dict(
    # Embedding table sizes.
    vocab_size=308,
    max_position_embeddings=seq_len,
    position_embedding_type="absolute",
    # Encoder shape.
    hidden_size=720,
    num_hidden_layers=12,
    num_attention_heads=12,
    intermediate_size=1024,
    hidden_act="gelu",
    # Regularisation and initialisation.
    hidden_dropout_prob=.1,
    attention_probs_dropout_prob=.1,
    initializer_range=.02,
    layer_norm_eps=1e-12,
    # Runtime switches.
    gradient_checkpointing=False,
    use_cache=True,
)
config = BertConfig(**bert_config_kwargs)

# Create model.
# The model is built from the config only, so no pretrained weights are loaded.
model = BertForMaskedLM(config)

In [13]:
def make_metrics(pred: "EvalPrediction"):
    """Compute accuracy over the positions that carry a real label.

        pred: Object with `predictions` and `label_ids` attributes.
            `predictions` is either an array of per-class scores with one
            more dimension than `label_ids` (reduced with argmax over the
            last axis), an array of already decoded ids with the same shape
            as `label_ids`, or a tuple whose first element is one of those.
            `label_ids` uses -100 for positions that must be ignored.

    Returns a dict {"accuracy": float}. The accuracy is 0.0 when no position
    carries a real label.
    """
    predictions = pred.predictions
    # Some models return several outputs; the scores come first.
    if isinstance(predictions, tuple):
        predictions = predictions[0]
    predictions = np.asarray(predictions)
    labels = np.asarray(pred.label_ids)

    # Per-class scores -> predicted id per position.
    if predictions.ndim == labels.ndim + 1:
        predictions = predictions.argmax(axis=-1)

    # Only positions whose label is not the ignore value (-100) count.
    scored = labels != -100
    if not scored.any():
        return {"accuracy": 0.0}

    return {"accuracy": float((predictions[scored] == labels[scored]).mean())}

# Trainer arguments for saving, number of epochs...
# NOTE(review): output_dir is an empty placeholder; set it to the directory
# that should receive the checkpoints.
training_args = TrainingArguments(
    output_dir="",
    overwrite_output_dir=True,
    num_train_epochs=1,
    # per_gpu_train_batch_size is deprecated; per_device_train_batch_size is
    # its replacement and carries the same per-device meaning.
    per_device_train_batch_size=128,
    save_steps=10_000,
    save_total_limit=2,
)

# Create a trainer.
# NOTE(review): no evaluation schedule is set in the arguments above, so
# eval_dataset and compute_metrics presumably only take effect when
# trainer.evaluate() is called; confirm against the installed version.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=dataset,
    eval_dataset=eval_dataset,
    compute_metrics=make_metrics,
)

In [None]:
# Start training with the trainer created in the previous cell.
trainer.train()