In [1]:
import torch
from pl_model import BertClassifier

  warn(f"Failed to load image Python extension: {e}")


In [2]:
import yaml
from argparse import ArgumentParser

params = yaml.safe_load(open("params.yaml", "r"))["trainer"]
args_list = []

for k, v in params.items():
    args_list.append(f"--{k}={v}")

parser = ArgumentParser()
parser.add_argument("--max_epochs", type=int, default=1)
parser.add_argument("--train_batch_size", type=int, default=32)
parser.add_argument("--val_batch_size", type=int, default=64)
parser.add_argument("--accelerator", type=str, default="cpu")
parser.add_argument("--filename", type=str, default=None)
parser.add_argument("--wandb_project_name", type=str, default=None)
parser = BertClassifier.add_model_specific_args(parser)
args = parser.parse_args(args_list)

In [3]:
model = BertClassifier.load_from_checkpoint('model_ckpt/best_model-v10.ckpt')
model.eval()

BertClassifier(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(28, 128, padding_idx=0)
      (position_embeddings): Embedding(64, 128)
      (token_type_embeddings): Embedding(2, 128)
      (LayerNorm): LayerNorm((128,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-5): 6 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=128, out_features=128, bias=True)
              (key): Linear(in_features=128, out_features=128, bias=True)
              (value): Linear(in_features=128, out_features=128, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=128, out_features=128, bias=True)
              (LayerNorm): LayerNorm((128,), eps=1e-12, elementwise_affine=T

In [4]:
torch.save(model, 'model.trained.pt')

In [5]:
from dataset import CustomDataset

test_dataset = CustomDataset(args.filename, split="test", max_len=args.max_len)
it = iter(test_dataset)

In [6]:
sample = next(it)

In [7]:
from collections import OrderedDict
vocab = test_dataset.tokenizer.vocab
idx2char = list(vocab.keys())

@torch.no_grad()
def predict_char(sample: dict, model: torch.nn.Module, idx2char: list, vocab: dict):
    input_ids = sample['input_ids'].unsqueeze(0)
    attention_mask = sample['attention_mask'].unsqueeze(0)
    output = torch.softmax(model(input_ids, attention_mask, torch.zeros(1, 28)), dim=-1)
    topk = output[0,sample['labels'] != -100].topk(2)
    scores = topk.values
    indices = topk.indices
    indices_n_scores = zip(indices.view(-1).tolist(), scores.view(-1).tolist())
    ordered_set = OrderedDict()

    for k, v in indices_n_scores:
        if k not in ordered_set:
            ordered_set[k] = v

    predictions = list(map(lambda x: idx2char[x], ordered_set.keys()))
    ground_truth = [idx2char[x] for x in sample['labels'][sample['labels'] != -100].tolist()]
    input = [idx2char[x] if (x != vocab['<MASK>']) else '_' for x in input_ids[input_ids != vocab['<PAD>']].tolist()]
    print(f'Input: {input}')
    print(f'Ground Truth: {ground_truth}')
    print(f'Predictions: {predictions}')

In [8]:
from ipywidgets import interact

it = iter(test_dataset)
def f(x: int):
    sample = next(it)
    predict_char(sample, model, idx2char, vocab)
    print('------')

interact(f, x=range(0, len(test_dataset), 1))    

interactive(children=(Dropdown(description='x', options=(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,…

<function __main__.f(x: int)>

In [9]:
vocab = dict(zip('abcedfghijklmnopqrstuvwxyz', range(2, 28)))
idx2char = list('__abcedfghijklmnopqrstuvwxyz')
vocab['_'] = 1

def tensorify(word, negative_samples):
    word = torch.tensor([vocab[c] for c in word])
    negative_sample = [0] * 28
    for c in negative_samples:
        negative_sample[vocab[c]] = 1
    negative_sample = torch.tensor(negative_sample).float()
    attention_mask = torch.tensor([1]*len(word))
    return word, attention_mask, negative_sample

In [10]:
word = "pred_al"
negative_samples = list("predlnamcvt")
with torch.no_grad():
    input_ids, attention_mask, negative_sample = tensorify(word, negative_samples)
    idx = list(map(lambda x: x[0], filter(lambda x: x[1] == "_", enumerate(word))))
    logits = model(
        input_ids.unsqueeze(0),
        attention_mask.unsqueeze(0),
        negative_sample.unsqueeze(0),
    )
    topk = logits.squeeze()[idx].topk(26, dim=-1)
    scores = torch.softmax(topk.values, dim=-1)
    characters = [[idx2char[xx] for xx in x] for x in topk.indices.tolist()]
    for i in range(len(scores)):
        print(list(zip(characters[i], scores[i])))

[('g', tensor(0.5295)), ('x', tensor(0.1003)), ('b', tensor(0.0660)), ('f', tensor(0.0640)), ('w', tensor(0.0614)), ('s', tensor(0.0521)), ('i', tensor(0.0335)), ('y', tensor(0.0284)), ('h', tensor(0.0109)), ('k', tensor(0.0098)), ('u', tensor(0.0080)), ('o', tensor(0.0072)), ('j', tensor(0.0057)), ('v', tensor(0.0053)), ('z', tensor(0.0044)), ('c', tensor(0.0037)), ('n', tensor(0.0033)), ('t', tensor(0.0027)), ('m', tensor(0.0025)), ('e', tensor(0.0003)), ('q', tensor(0.0002)), ('l', tensor(0.0002)), ('r', tensor(0.0002)), ('p', tensor(0.0002)), ('a', tensor(5.2482e-05)), ('d', tensor(1.6473e-05))]


In [50]:
import pdb
import random
from typing import List
from dataset import CharacterLevelTokenizer
from collections import OrderedDict, defaultdict, namedtuple


class HangmanBertPredictor:
    def __init__(self, model_file: str, word_file: str) -> None:
        self.model = torch.load(model_file, map_location='cuda')
        self.model.eval()
        with open(word_file, "r") as f:
            self.words = f.read().split("\n")

        self.tokenizer = CharacterLevelTokenizer(64)
        self.vocab = self.tokenizer.vocab
        self.idx2char = list(self.vocab.keys())

    def get_a_random_word(self):
        # random.seed(10)
        idx = random.randint(0, len(self.words) - 1)
        word = self.words[idx]
        return word

    @torch.no_grad()
    def predict_from_word(
        self, word: str, label: str, negative_sample: str
    ) -> List[str]:
        tokenized_word, _ = self.tokenizer.tokenize(word, label)
        tokenized_word = tokenized_word.unsqueeze(0).cuda()
        attention_mask = torch.ones_like(tokenized_word).cuda()
        idx = tokenized_word == self.tokenizer.vocab["<PAD>"]
        attention_mask[idx] = 0
        negative_sample_tensor = torch.tensor([0.0] * len(self.vocab))
        negative_sample_tensor[[self.vocab[c] for c in set(negative_sample)]] = 1
        negative_sample_tensor = negative_sample_tensor.unsqueeze(0).cuda()
        output = torch.softmax(
            self.model(tokenized_word, attention_mask, negative_sample_tensor), dim=-1
        ).cpu()
        idx = torch.tensor(list(map(lambda c: c != "_", word)))
        output = output[0, : len(word)]
        # print(output[~idx].shape)
        scores = output[~idx]
        indices = self.vocab.keys()
        indices_n_scores = sorted(
            zip(indices, scores.view(-1).tolist()), key=lambda x: x[1], reverse=True
        )

        ordered_set = OrderedDict()

        for k, v in indices_n_scores:
            if k not in ordered_set:
                ordered_set[k] = v

        predictions = list(ordered_set.keys())
        return predictions

    def predict_n_steps(self, b):
        rand_word = self.get_a_random_word()
        og_word = rand_word
        label = rand_word
        rand_word = ["_"] * len(label)
        labels_indexes = defaultdict(list)
        gts = set(label)
        for i, c in enumerate(label):
            labels_indexes[c].append(i)
        steps = 0
        taken_steps = []
        negative_sample = set()
        while "_" in rand_word:
            predictions = self.predict_from_word(rand_word, label, negative_sample)
            # print(rand_word, predictions)
            steps += 1
            # hit
            for c in predictions:
                # hit
                negative_sample.add(c)
                if c in gts:
                    for i in labels_indexes[c]:
                        rand_word[i] = c
                    taken_steps.append(rand_word[:])
                    gts.remove(c)
                    break
        return og_word, steps, taken_steps

In [51]:
hangman_bert_predictor = HangmanBertPredictor(
    model_file='model.trained.pt',
    word_file='words_250000_train.txt'
)

In [55]:
from tqdm import tqdm

steps_distribution = []
for i in tqdm(range(1_000)):
    og_word, steps, taken_steps = hangman_bert_predictor.predict_n_steps(None)
    steps_distribution.append(steps)

100%|██████████| 1000/1000 [00:58<00:00, 17.03it/s]


In [56]:
import polars as pl

print(pl.Series(steps_distribution).describe())

for i in range(1, 11):
    print(i/10, pl.Series(steps_distribution).quantile(i/10))

shape: (6, 2)
┌────────────┬──────────┐
│ statistic  ┆ value    │
│ ---        ┆ ---      │
│ str        ┆ f64      │
╞════════════╪══════════╡
│ min        ┆ 1.0      │
│ max        ┆ 13.0     │
│ null_count ┆ 0.0      │
│ mean       ┆ 7.406    │
│ std        ┆ 2.008282 │
│ count      ┆ 1000.0   │
└────────────┴──────────┘
0.1 5.0
0.2 6.0
0.3 6.0
0.4 7.0
0.5 7.0
0.6 8.0
0.7 8.0
0.8 9.0
0.9 10.0
1.0 13.0
