# Imports

In [1]:
from typing import List
import torch
import myutils
from transformers import AutoModel, AutoTokenizer

# Functions to read data

In [64]:
def read_sent(path):
    """
    Read the word column of a tab-separated .iob2 file, grouped by sentence.

    Sentences are delimited by blank lines. Lines that start with '#' and
    contain no tab are treated as comments and skipped.

    Parameters
    ----------
    path : str
        Path to the .iob2 file (read as UTF-8).

    Returns
    -------
    ents : list of list of str
        One inner list per sentence, holding the second tab-separated
        column of every token line.
    """
    ents = []
    cur_ents = []
    # `with` guarantees the handle is closed, also when a line is malformed.
    with open(path, encoding="utf-8") as handle:
        for raw in handle:
            line = raw.strip()
            if not line:
                # Blank line: sentence boundary.
                ents.append(cur_ents)
                cur_ents = []
                continue
            fields = line.split('\t')
            if line[0] == '#' and len(fields) == 1:
                continue
            cur_ents.append(fields[1])
    # Keep the final sentence when the file does not end with a blank line.
    if cur_ents:
        ents.append(cur_ents)
    return ents

def read_labels(path):
    """
    Read the label column of a tab-separated .iob2 file, grouped by sentence.

    Sentences are delimited by blank lines. Lines that start with '#' and
    contain no tab are treated as comments and skipped.

    Parameters
    ----------
    path : str
        Path to the .iob2 file (read as UTF-8).

    Returns
    -------
    ents : list of list of str
        One inner list per sentence, holding the third tab-separated
        column of every token line.
    """
    ents = []
    cur_ents = []
    # `with` guarantees the handle is closed, also when a line is malformed.
    with open(path, encoding="utf-8") as handle:
        for raw in handle:
            line = raw.strip()
            if not line:
                # Blank line: sentence boundary.
                ents.append(cur_ents)
                cur_ents = []
                continue
            fields = line.split('\t')
            if line[0] == '#' and len(fields) == 1:
                continue
            cur_ents.append(fields[2])
    # Keep the final sentence when the file does not end with a blank line.
    if cur_ents:
        ents.append(cur_ents)
    return ents

def read_index(path):
    """
    Read the index column of a tab-separated .iob2 file, grouped by sentence.

    Sentences are delimited by blank lines. Lines that start with '#' and
    contain no tab are treated as comments and skipped.

    Parameters
    ----------
    path : str
        Path to the .iob2 file (read as UTF-8).

    Returns
    -------
    ents : list of list of str
        One inner list per sentence, holding the first tab-separated
        column of every token line (kept as strings, not converted to int).
    """
    ents = []
    cur_ents = []
    # `with` guarantees the handle is closed, also when a line is malformed.
    with open(path, encoding="utf-8") as handle:
        for raw in handle:
            line = raw.strip()
            if not line:
                # Blank line: sentence boundary.
                ents.append(cur_ents)
                cur_ents = []
                continue
            fields = line.split('\t')
            if line[0] == '#' and len(fields) == 1:
                continue
            cur_ents.append(fields[0])
    # Keep the final sentence when the file does not end with a blank line.
    if cur_ents:
        ents.append(cur_ents)
    return ents

# Train, Dev and Test data

In [65]:
#Training data

# read_labels / read_sent return one inner list per sentence
training_labels = read_labels("baseline_data/en_ewt-ud-train.iob2")
training_sent = read_sent("baseline_data/en_ewt-ud-train.iob2")

# Flatten to one token-level list to be able to use myutils.
# A nested comprehension is linear; sum(list_of_lists, []) re-copies the
# accumulated list on every addition and is quadratic in the number of tokens.
train_labels = [label for sentence in training_labels for label in sentence]
train_sent = [word for sentence in training_sent for word in sentence]

In [61]:
#Evaluation data

# read_labels / read_sent return one inner list per sentence
dev_labels = read_labels("baseline_data/en_ewt-ud-dev.iob2")
dev_sent = read_sent("baseline_data/en_ewt-ud-dev.iob2")

# Flatten to one token-level list. A nested comprehension is linear, whereas
# sum(list_of_lists, []) re-copies the accumulated list on every addition.
dev_labels = [label for sentence in dev_labels for label in sentence]
dev_sent = [word for sentence in dev_sent for word in sentence]

In [54]:
#Test data
#Keeping track of indices so the model's predictions can be saved in the required .iob2 format

# read_* return one inner list per sentence
test_labels = read_labels("baseline_data/en_ewt-ud-test.iob2")
test_sent = read_sent("baseline_data/en_ewt-ud-test.iob2")
test_index = read_index("baseline_data/en_ewt-ud-test.iob2")

# Flatten to one token-level list. A nested comprehension is linear, whereas
# sum(list_of_lists, []) re-copies the accumulated list on every addition.
test_labels = [label for sentence in test_labels for label in sentence]
test_sent = [word for sentence in test_sent for word in sentence]
test_index = [index for sentence in test_index for index in sentence]

# The model

Set up for the model

In [8]:
"""
A basic classifier based on the transformers (https://github.com/huggingface/transformers) 
library. It loads a masked language model (by default distilbert), and adds a linear layer for
prediction. Example usage:

python3 bert-topic.py topic-data/train.txt topic-data/dev.txt
"""


# Fix the PyTorch random seed so repeated runs are comparable
torch.manual_seed(8446)
# Set some constants
# Hugging Face model name; passed to AutoModel.from_pretrained (via ClassModel)
# and to AutoTokenizer.from_pretrained below.
# NOTE(review): the docstring above still mentions distilbert as the default.
MLM = 'bert-base-cased'
# Batch size handed to myutils.to_batch for train, dev and test data
BATCH_SIZE = 8
# Learning rate of the Adam optimizer
LEARNING_RATE = 0.00001
# Number of passes over the training batches
EPOCHS = 10
# We have an UNK label for robustness purposes, it makes it easier to run on
# data with other labels, or without labels.
UNK = "[UNK]"
# Upper bound applied by slicing train_sent / train_labels before training.
# NOTE(review): those lists are flattened to token level earlier in the
# notebook, so this appears to cap tokens rather than sentences - confirm intent.
MAX_TRAIN_SENTS=64
# Use the first CUDA device when available, otherwise fall back to the CPU
DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"


class ClassModel(torch.nn.Module):
    """
    Transformer encoder with one linear classification layer on top.

    The encoder output at the first position of every input sequence is fed
    to the linear layer, giving one vector of label scores per sequence.
    """

    def __init__(self, nlabels: int, mlm: str):
        """
        Model for classification with transformers.

        The architecture of this model is simple, we just have a transformer
        based language model, and add one linear layer to convert its output
        to our prediction.

        Parameters
        ----------
        nlabels : int
            Vocabulary size of output space (i.e. number of labels)
        mlm : str
            Name of the transformers language model to use, can be found on:
            https://huggingface.co/models
        """
        super().__init__()

        # The transformer model to use
        self.mlm = AutoModel.from_pretrained(mlm)

        # Find the size of the output of the masked language model
        if hasattr(self.mlm.config, 'hidden_size'):
            self.mlm_out_size = self.mlm.config.hidden_size
        elif hasattr(self.mlm.config, 'dim'):
            self.mlm_out_size = self.mlm.config.dim
        else:  # if not found, guess
            self.mlm_out_size = 768

        # Create prediction layer
        self.hidden_to_label = torch.nn.Linear(self.mlm_out_size, nlabels)

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        """
        Forward pass

        Parameters
        ----------
        input : torch.Tensor
            Tensor with wordpiece indices. shape=(batch_size, max_sent_len).

        Returns
        -------
        output_scores : torch.Tensor
            Unnormalized score per label for every input sequence.
            shape=(batch_size, nlabels)
        """
        # Run transformer model on input and keep only the last layer:
        # shape=(batch_size, max_len, DIM_EMBEDDING)
        mlm_out = self.mlm(input).last_hidden_state

        # Keep only the output for the first ([CLS]) token:
        # shape=(batch_size, DIM_EMBEDDING).
        # Integer indexing removes exactly the sequence axis. A bare squeeze()
        # would also remove the batch axis for a batch of one, which breaks
        # argmax over dimension 1 in run_eval.
        first_token = mlm_out[:, 0, :]

        # Matrix multiply to get scores for each label:
        # shape=(batch_size, nlabels)
        output_scores = self.hidden_to_label(first_token)

        return output_scores

    def run_eval(self, text_batched: List[torch.Tensor], labels_batched: List[torch.Tensor]):
        """
        Run evaluation: predict and score

        Puts the model in eval mode (it is not switched back afterwards).

        Parameters
        ----------
        text_batched : List[torch.Tensor]
            list with batches of text, containing wordpiece indices.
        labels_batched : List[torch.Tensor]
            list with batches of labels (converted to ints).

        Returns
        -------
        score : float
            accuracy of the model on labels_batched given text_batched;
            0.0 when there is nothing to evaluate.
        pred_labels_list : List[torch.Tensor]
            predicted label ids, one tensor per batch.
        """
        self.eval()
        match = 0
        total = 0
        pred_labels_list = []
        # No gradients are needed for evaluation; skipping the autograd graph
        # saves memory and time.
        with torch.no_grad():
            for sents, labels in zip(text_batched, labels_batched):
                output_scores = self.forward(sents)
                pred_labels = torch.argmax(output_scores, 1)
                pred_labels_list.append(pred_labels)
                # One comparison per batch instead of one .item() per label
                match += (pred_labels == labels).sum().item()
                total += labels.numel()
        # Guard against empty input instead of raising ZeroDivisionError
        score = match / total if total else 0.0
        return score, pred_labels_list

Training the model

In [9]:
# Keep only the first MAX_TRAIN_SENTS entries of the training data
train_sent = train_sent[:MAX_TRAIN_SENTS]
train_labels = train_labels[:MAX_TRAIN_SENTS]

# Build the label <-> id lookups from the training labels only
id2label, label2id = myutils.labels2lookup(train_labels, UNK)
NLABELS = len(id2label)
print(train_labels)
print(label2id)

# Convert BIO labels to numerical labels; labels not seen in training map to UNK
train_labels = [label2id.get(label, label2id[UNK]) for label in train_labels]

dev_labels = [label2id.get(label, label2id[UNK]) for label in dev_labels]

print('tokenizing...')
tokzr = AutoTokenizer.from_pretrained(MLM)
train_tokked = myutils.tok(train_sent, tokzr)
dev_tokked = myutils.tok(dev_sent, tokzr)
PAD = tokzr.pad_token_id

print('converting to batches...')
train_text_batched, train_labels_batched = myutils.to_batch(train_tokked, train_labels, BATCH_SIZE, PAD, DEVICE)
# Note, some data is thrown away if len(text_tokked)%BATCH_SIZE!= 0
dev_text_batched, dev_labels_batched = myutils.to_batch(dev_tokked, dev_labels, BATCH_SIZE, PAD, DEVICE)

print('initializing model...')
model = ClassModel(NLABELS, MLM)
model.to(DEVICE)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
# Targets equal to 0 do not contribute to the loss.
# NOTE(review): 0 is the id of UNK in the printed label2id - confirm that
# myutils.labels2lookup always assigns it.
loss_function = torch.nn.CrossEntropyLoss(ignore_index=0, reduction='sum')

print('training...')
for epoch in range(EPOCHS):
    print('=====================')
    print('starting epoch ' + str(epoch))
    # run_eval leaves the model in eval mode, so switch back every epoch
    model.train()

    # Loop over batches, accumulating the summed loss of the epoch
    loss = 0
    for batch_text, batch_labels in zip(train_text_batched, train_labels_batched):
        optimizer.zero_grad()

        # Call the module itself rather than .forward() so that any
        # registered hooks are run as well
        output_scores = model(batch_text)
        batch_loss = loss_function(output_scores, batch_labels)
        loss += batch_loss.item()

        batch_loss.backward()

        optimizer.step()

    # run_eval returns (accuracy, list of per-batch predictions)
    dev_score = model.run_eval(dev_text_batched, dev_labels_batched)
    print('Loss: {:.2f}'.format(loss))
    print('Acc(dev): {:.2f}'.format(100*dev_score[0]))
    print()

['O', 'O', 'O', 'O', 'O', 'B-LOC', 'O', 'B-LOC', 'I-LOC', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-LOC', 'I-LOC', 'O', 'O', 'O', 'O', 'B-LOC', 'O', 'B-LOC', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O']
{'[UNK]': 0, 'O': 1, 'B-LOC': 2, 'I-LOC': 3}
tokenizing...
converting to batches...
initializing model...


Some weights of the model checkpoint at bert-base-cased were not used when initializing BertModel: ['cls.predictions.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.dense.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


training...
starting epoch 0
Loss: 57.58
Acc(dev): 94.05

starting epoch 1
Loss: 27.69
Acc(dev): 94.05

starting epoch 2
Loss: 22.78
Acc(dev): 94.06

starting epoch 3
Loss: 17.04
Acc(dev): 94.06

starting epoch 4
Loss: 13.08
Acc(dev): 94.06

starting epoch 5
Loss: 9.15
Acc(dev): 94.13

starting epoch 6
Loss: 6.88
Acc(dev): 94.28

starting epoch 7
Loss: 4.49
Acc(dev): 94.30

starting epoch 8
Loss: 2.96
Acc(dev): 94.32

starting epoch 9
Loss: 1.73
Acc(dev): 94.31



Testing the model on test data

In [10]:
# Wordpiece-tokenize the test inputs with the tokenizer used for training
test_tokked = myutils.tok(test_sent, tokzr)

# Map gold BIO tags to their integer ids; tags unseen in training become UNK
test_labels = [label2id.get(tag, label2id[UNK]) for tag in test_labels]

# Group the test inputs and gold ids into batches
test_text_batched, test_labels_batched = myutils.to_batch(
    test_tokked, test_labels, BATCH_SIZE, PAD, DEVICE
)

# Score the trained model; test_score is (accuracy, per-batch predictions)
print('evaluating on testing data...')
test_score = model.run_eval(test_text_batched, test_labels_batched)
print(f'Accuracy on test data: {100 * test_score[0]:.2f}%')

evaluating on testing data...
Accuracy on test data: 93.45%


# Save to .iob2

In [12]:
# Flatten the per-batch prediction tensors (second element of test_score) and
# translate every predicted id back to its text label through id2label,
# which was built together with label2id before training.
pred_labels = [
    id2label[int(label.item())]
    for batch_pred_labels in test_score[1]
    for label in batch_pred_labels
]

In [14]:
def save_to_iob2_file(index_list, word_list, tag_list, file_path, *, separate_sentences=False):
    """
    Write tokens as tab-separated "index, word, tag" lines to a UTF-8 file.

    Parameters
    ----------
    index_list, word_list, tag_list : iterable
        Parallel token-level sequences; writing stops at the shortest one.
    file_path : str
        Output path (overwritten if it exists).
    separate_sentences : bool, optional
        When False (default, the original behaviour) all tokens are written
        as one run followed by a single blank line. When True, a blank line
        is also written before every token whose index is 1 (except the very
        first token), so that sentences are separated.
    """
    with open(file_path, 'w', encoding='utf-8') as f:
        is_first_token = True
        for index, word, tag in zip(index_list, word_list, tag_list):
            # An index of 1 marks the start of a new sentence
            if separate_sentences and not is_first_token and str(index) == '1':
                f.write("\n")
            f.write(f"{index}\t{word}\t{tag}\n")
            is_first_token = False
        f.write("\n")  # Blank line closing the last sentence

In [None]:
# Write index / word / predicted tag for every test token to an intermediate file.
# NOTE(review): save_to_iob2_file zips its inputs, so it stops at the shortest
# list; batching is noted above to drop leftover data, so pred_labels may be
# shorter than test_index - confirm no tokens are lost at the end.
save_to_iob2_file(test_index, test_sent, pred_labels, 'wrong_format.iob2')

The file written above turned out to be in the wrong format (it has no blank line between sentences), so we read it back and save it again with a blank-line separator after every sentence.

In [None]:
def read_iob2_file(file_path):
    """
    Read a token-per-line file and regroup the tokens into sentences.

    A new sentence starts at every line whose first field is the integer 1.
    Blank lines are ignored, as are comment lines (starting with '#' and
    containing no tab), matching the other readers in this notebook.

    Parameters
    ----------
    file_path : str
        Path to the file (read as UTF-8).

    Returns
    -------
    sentences : list of list of list of str
        One list per sentence; every token is the list of its
        whitespace-separated fields.

    Raises
    ------
    ValueError
        If the first field of a token line is not an integer.
    """
    sentences = []
    new_sentence = []
    with open(file_path, "r", encoding="utf-8") as file:
        for raw_line in file:
            stripped = raw_line.strip()
            if not stripped:  # Skip empty lines
                continue
            # Skip comment lines instead of failing on int('#')
            if stripped[0] == '#' and '\t' not in stripped:
                continue
            fields = stripped.split()
            if int(fields[0]) == 1:
                if new_sentence:  # Close the previous sentence, if any
                    sentences.append(new_sentence)
                new_sentence = [fields]  # Start a new sentence
            else:
                new_sentence.append(fields)
    if new_sentence:  # Append the last sentence if it's not empty
        sentences.append(new_sentence)
    return sentences

# Read the flat intermediate file back, regrouped into one list of tokens per sentence
sentences = read_iob2_file("wrong_format.iob2")

In [None]:
def save_iob2_format(sentences, output_file):
    """
    Write sentences to a UTF-8 file, one token per line.

    The fields of a token are joined with tabs, and every sentence
    (including the last) is followed by one blank line.

    Parameters
    ----------
    sentences : iterable of iterable of iterable of str
        Sentences, each a sequence of tokens, each a sequence of fields.
    output_file : str
        Output path (overwritten if it exists).
    """
    # Explicit encoding so the output does not depend on the platform default
    with open(output_file, "w", encoding="utf-8") as file:
        for sentence in sentences:
            file.writelines("\t".join(token) + "\n" for token in sentence)
            file.write("\n")  # Sentence separator

In [None]:
# Write the regrouped sentences, each followed by a blank line, as the final predictions file
save_iob2_format(sentences, "predictions.iob2")