In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from transformers import BertTokenizer, BertForMaskedLM
from torch.distributions.categorical import Categorical


In [2]:
model_name = "NbAiLab/nb-bert-base"
tokenizer = BertTokenizer.from_pretrained(model_name)
bert_model = BertForMaskedLM.from_pretrained(model_name)
bert_model.eval()

Some weights of the model checkpoint at NbAiLab/nb-bert-base were not used when initializing BertForMaskedLM: ['cls.seq_relationship.bias', 'cls.seq_relationship.weight']
- This IS expected if you are initializing BertForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


BertForMaskedLM(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(119547, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_

In [3]:
class SentenceCompressionAgent(nn.Module):
    def __init__(self, bert_model):
        super(SentenceCompressionAgent, self).__init__()
        self.bert_model = bert_model

    def forward(self, input_ids):
        outputs = self.bert_model(input_ids)
        action_probs = torch.softmax(outputs.logits, dim=-1)
        return action_probs


In [4]:
agent = SentenceCompressionAgent(bert_model)
optimizer = optim.Adam(agent.parameters(), lr=1e-5)


In [5]:
def sample_action(action_probs):
    action_dist = Categorical(action_probs)
    action = action_dist.sample()
    return action.item()


In [19]:
def sentence_probability(sentence, tokenizer, model):
    input_ids = tokenizer.encode(sentence, return_tensors="pt")
    with torch.no_grad():
        outputs = model(input_ids, labels=input_ids)
    loss = outputs.loss.item()
    return -loss

def fitness_function(sentence, compressed_sentence, tokenizer, model):
    p_s1 = sentence_probability(sentence, tokenizer, model)
    p_s2 = sentence_probability(compressed_sentence, tokenizer, model)
    return p_s1 - p_s2

def reward_function(sentence, compressed_sentence, tokenizer, model):
    fitness = fitness_function(sentence, compressed_sentence, tokenizer, model)
    reward = -fitness  # We want to maximize the reward, which is the negation of the fitness
    return reward

# test the probability of a sentence:
sentence = "Dette er en setning"
sentence = "dette setning"
sentence = "setning"
p_s = sentence_probability(sentence, tokenizer, bert_model)
print(f"p(s) = {p_s:.4f}")

p(s) = -10.7843


: 

In [7]:
def generate_compressed_sentence(sentence, tokenizer, agent):
    input_ids = tokenizer.encode(sentence, return_tensors="pt")
    action_probs = agent(input_ids)
    compressed_tokens = []

    for token, action_prob in zip(tokenizer.tokenize(sentence), action_probs.squeeze(0)):
        action = sample_action(action_prob)
        if action == 0:  # If the action is 0, we keep the word
            compressed_tokens.append(token)

    compressed_sentence = tokenizer.convert_tokens_to_string(compressed_tokens)
    return compressed_sentence


In [10]:
# data:
path = "/Users/tollef/Downloads/git/PHD/SUMMARIZATION/grammaticality/src/data/nocola/datasets/NoCoLa_zero.txt"
with open(path, "r") as f:
    # parallel data: ERROR_SENTENCE, CORRECT_SENT, ERROR_TYPE
    data = [line.strip().split("\t") for line in f.readlines()]

In [13]:
# create a dataloder class for the parallel data:
error_sents = [line[0] for line in data][:100]
correct_sents = [line[1] for line in data][:100]

In [15]:
# progress bar with rich:
from rich.progress import track

def train_agent(agent, tokenizer, sentences, epochs=1, batch_size=32):
    for epoch in range(epochs):
        for i in track(range(0, len(sentences), batch_size)):
            batch_sentences = sentences[i: i + batch_size]

            optimizer.zero_grad()
            total_loss = 0

            for sentence in batch_sentences:
                compressed_sentence = generate_compressed_sentence(sentence, tokenizer, agent)
                reward = reward_function(sentence, compressed_sentence, tokenizer, bert_model)

                input_ids = tokenizer.encode(sentence, return_tensors="pt")
                action_probs = agent(input_ids)
                action_dist = Categorical(action_probs)
                actions = [sample_action(action_prob) for action_prob in action_probs.squeeze(0)]

train_agent(agent, tokenizer, correct_sents)

Output()

KeyboardInterrupt: 