In [3]:
import random
import string

import torch
from torch.utils.data import Dataset
from transformers import RobertaConfig, RobertaForMaskedLM, RobertaTokenizer
from transformers.modeling_outputs import MaskedLMOutput


# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = "cpu"

print(torch.cuda.is_available())
print(torch.cuda.device_count())
print(device)

True
1
cpu


In [4]:
EOS = "eos"
CHAR_PAD = "char_pad"
UNK = "unk"
NGRAM_PAD = "ngram_pad"
MASK = "mask"

CHAR_TOKENS: list[str] = list(string.printable) + [EOS, CHAR_PAD, UNK, NGRAM_PAD, MASK]
NGRAM_SIZE: int = 3
HIDDEN_SIZE: int = 768
MAX_SEQ_LEN = 16
PROB_MASK = 0.15


num_chars = len(CHAR_TOKENS)
char_to_idx = {c: i for i, c in enumerate(CHAR_TOKENS)}


def tokenize(seq: str):
    seq = list(seq)
    # Pad such that len(seq) is divisible by NGRAM_SIZE
    if len(seq) % NGRAM_SIZE > 0:
        seq += [CHAR_PAD] * (NGRAM_SIZE - (len(seq) % NGRAM_SIZE))
    seq += [EOS] * NGRAM_SIZE
    return torch.tensor([char_to_idx[c] for c in seq])


def collate(tokenized_seqs: list[torch.tensor], masking_probability: float = 0.15):
    """Pad short seqs, truncate long seqs."""
    tokenized_seqs = [tokenize(x)[: MAX_SEQ_LEN - 1] for x in example_data]
    max_len = max(x.shape[-1] for x in tokenized_seqs)
    labels = torch.full(
        size=[len(tokenized_seqs), max_len],
        fill_value=char_to_idx[NGRAM_PAD],
        dtype=torch.long,
    )
    attention_mask = torch.ones_like(labels)
    for i, x in enumerate(tokenized_seqs):
        labels[i, 0 : len(x)] = x
        attention_mask[i, len(x) :] = 0
    # Masking
    masked_labels = labels.clone().detach()
    for row_idx in range(masked_labels.shape[0]):
        for ngram_idx in range(0, masked_labels.shape[1], NGRAM_SIZE):
            if random.random() < masking_probability:
                masked_labels[
                    row_idx, ngram_idx : ngram_idx + NGRAM_SIZE
                ] = char_to_idx[MASK]
    return labels, masked_labels, attention_mask


example_data = ["Hi..", "This is a second sentence."]
example_x_batch, masked_labels, attention_mask = collate(
    [tokenize(s) for s in example_data]
)
example_x_batch.shape
print(example_x_batch)
print(masked_labels)
print(attention_mask)

tensor([[ 43,  18,  75,  75, 101, 101, 100, 100, 100, 103, 103, 103, 103, 103,
         103],
        [ 55,  17,  18,  28,  94,  18,  28,  94,  10,  94,  28,  14,  12,  24,
          23]])
tensor([[ 43,  18,  75,  75, 101, 101, 104, 104, 104, 103, 103, 103, 104, 104,
         104],
        [ 55,  17,  18,  28,  94,  18,  28,  94,  10,  94,  28,  14,  12,  24,
          23]])
tensor([[1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0],
        [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]])


In [5]:
class EsperantoDataset(Dataset):
    def __init__(self, evaluate: bool = False):
        self.examples = [random.choices(string.printable, 10) for _ in range(1000)]

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, i):
        # We’ll pad at the batch level.
        return tokenize(self.examples[i])

In [36]:
class CharModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        # An embedding table for each slot in the the ngram, (e.g. 0, 1, 2 for a NGRAM_SIZE=3).
        self.ngram_embedding_tables = [
            torch.nn.Embedding(
                num_embeddings=num_chars,
                embedding_dim=HIDDEN_SIZE,
                padding_idx=char_to_idx[NGRAM_PAD],
            )
            for _ in range(NGRAM_SIZE)
        ]
        self.language_model = RobertaForMaskedLM(
            config=RobertaConfig(
                vocab_size=2,  # won't use
                hidden_size=HIDDEN_SIZE,  # default 768
                max_position_embeddings=514,
                num_attention_heads=12,
                num_hidden_layers=6,
                type_vocab_size=1,
                attention_probs_dropout_prob=0,
                hidden_dropout_prob=0,
            )
        )
        # To map from the lm embeddings back to the chars
        self.ngram_prediction_heads = [
            torch.nn.Linear(HIDDEN_SIZE, num_chars) for _ in range(NGRAM_SIZE)
        ]

    def forward(self, x_batch):
        input_embeddings = self.get_input_embeddings(x_batch)
        lm_embeddings = self.language_model.roberta.forward(
            inputs_embeds=input_embeddings,
            attention_mask=attention_mask[:, ::NGRAM_SIZE],
        ).last_hidden_state
        logits = self.get_predicted_char_logits(lm_embeddings)
        return logits, lm_embeddings, input_embeddings

    def get_loss(self, logits, labels):
        return torch.nn.functional.cross_entropy(
            logits.reshape(-1, num_chars), labels.reshape(-1)
        )

    def get_input_embeddings(self, x_batch: torch.tensor):
        x_batch = example_x_batch
        result = []
        for ngram_slot_idx in range(NGRAM_SIZE):
            ngram_slot_embeddings = self.ngram_embedding_tables[ngram_slot_idx](
                x_batch[:, ngram_slot_idx::NGRAM_SIZE]
            )
            result.append(ngram_slot_embeddings)
        result = torch.stack(result).sum(dim=0)
        return result

    def get_predicted_char_logits(self, xbatch_lm_embeddings: torch.tensor):
        """Map from the lm embeddings back to the chars"""
        result = []
        for ngram_slot_idx in range(NGRAM_SIZE):
            predicted_char = self.ngram_prediction_heads[ngram_slot_idx](
                xbatch_lm_embeddings
            )
            result.append(predicted_char)
        result = torch.concatenate(result, dim=1)
        return result

In [38]:
model = CharModel()
logits = model(example_x_batch)[0]
loss = model.get_loss(logits, example_x_batch)

print(logits.shape)
print(example_x_batch.shape)
print(loss)

torch.Size([2, 15, 105])
torch.Size([2, 15])
tensor(4.7036, grad_fn=<NllLossBackward0>)


In [11]:
# TODO: find out how to use custom model with huggingface trainer
# TODO: dataset + dataloader