In [None]:
import conllu
import tqdm
import torch
import json
from sklearn.metrics import confusion_matrix, classification_report
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch.nn import functional
from functools import partial
from transformers import BertModel
from torch import nn
from transformers import BertTokenizer
import os
import pandas as pd
from utils.get_parl_corpus_token_data import ParlamentaryCorpus
from seqeval.scheme import IOB2
from seqeval.metrics import classification_report as cr
from seqeval.metrics import performance_measure
from seqeval.scheme import IOB2
from seqeval.metrics import performance_measure



In [None]:
""" Could tidy up this one to just use device instead of calling gpu=true etc."""
# `gpu` is kept as a plain flag because later cells pass it on to
# collate_fn / predict; `device` is what the model is moved to.
if torch.cuda.is_available():
    gpu = True
    print("Using GPU")
else:
    gpu = False

# Follow the availability check instead of always asking for CUDA, so that the
# notebook still runs (slowly) on a machine without a GPU.
device = torch.device("cuda" if gpu else "cpu")
print(device)

In [None]:
## Conllu stuff
def filter_tags(x):
    """Identity hook for NER tags: returns the given tag unmodified."""
    return x

def convert_to_list_dict(path, file):
    """Parse one CoNLL-U file into a list of per-sentence dictionaries.

    Args:
        path: Format template for the file location, e.g. "all_conllu/{0}.conllu".
        file: Value substituted into ``path``; also shown in the progress message.

    Returns:
        A list with one dict per sentence holding the keys ``idx``, ``text``,
        ``tokens``, ``lemmas``, ``pos_tags`` and ``ner_tags``. The sentence text
        and the token forms are lower-cased; lemmas and tags are kept as parsed.
    """
    path = path.format(file)
    lst = []
    with open(path, encoding="UTF-8") as infile:
        for sent in conllu.parse_incr(infile):
            lst.append({
                "idx": sent.metadata["sent_id"],
                "text": sent.metadata["text"].lower(),
                "tokens": [token["form"].lower() for token in sent],
                "lemmas": [token["lemma"] for token in sent],
                "pos_tags": [token["upos"] for token in sent],
                # A token whose MISC value is None (empty column) is treated as
                # having no "name" entry, i.e. it is tagged "O" instead of crashing.
                "ner_tags": [
                    filter_tags((token["misc"] or {}).get("name", "O")) for token in sent
                ],
            })
    print("Converting {} to list of dictionaries\n     {} elements converted..".format(file, len(lst)))
    return lst

In [None]:
# https://github.com/ltgoslo/NorBERT/blob/main/benchmarking/experiments/dataset.py



class CoNLLDataset(Dataset):
    """Token / NER-label pairs exposed as a torch ``Dataset``.

    Args:
        x_tokens: Iterable of sentences, each an iterable of tokens.
        y_labels: Iterable of label sequences, parallel to ``x_tokens``.
        ner_vocab: Ordered sequence of labels. A label's position is its class
            index. It must contain '@UNK' if labels outside the vocabulary can
            occur in ``y_labels``.

    Raises:
        TypeError: If ``ner_vocab`` is not supplied.
    """

    def __init__(self, x_tokens, y_labels, ner_vocab=None):
        if ner_vocab is None:
            # The vocabulary is passed in (not derived from the labels) so that the
            # label -> index mapping has a fixed, reproducible order.
            raise TypeError("ner_vocab must be an ordered sequence of labels, got None")

        self.tokens = [list(entry) for entry in x_tokens]
        self.ner_labels = [list(entry) for entry in y_labels]

        self.ner_vocab = ner_vocab
        self.ner_indexer = {label: position for position, label in enumerate(self.ner_vocab)}

    def __getitem__(self, index):
        """Return ``(tokens, label_ids)`` where ``label_ids`` is a LongTensor."""
        indexer = self.ner_indexer
        # Membership is tested on the dict (O(1)) rather than on the vocab list;
        # '@UNK' is only looked up when a label is actually unknown.
        label_ids = [
            indexer[label] if label in indexer else indexer['@UNK']
            for label in self.ner_labels[index]
        ]
        return self.tokens[index], torch.LongTensor(label_ids)

    def __len__(self):
        """Number of sentences in the dataset."""
        return len(self.tokens)



In [None]:
""" Dynamic padding. Takes the longest sentence in batch and pads other sentences to its length (if im not mistaken)"""
# Function borrowed from https://github.com/ltgoslo/NorBERT/blob/main/benchmarking/experiments/bert_ner.py

def collate_fn(batch, gpu=False):
    """Collate ``(tokens, label_tensor)`` pairs into one batch.

    The token lists are returned untouched. The label tensors are right-padded
    with -1 to the length of the longest one in the batch and stacked into a
    single 2-D tensor, which is moved to CUDA when ``gpu`` is true.
    """
    sentences = []
    label_rows = []
    for tokens, labels in batch:
        sentences.append(tokens)
        label_rows.append(labels)

    width = max([row.size(0) for row in label_rows])
    # https://pytorch.org/docs/stable/generated/torch.nn.functional.pad.html
    padded_rows = [
        functional.pad(row, (0, width - row.size(0)), value=-1) for row in label_rows
    ]
    y = torch.stack(padded_rows)
    if gpu:
        y = y.to("cuda")
    return sentences, y

In [None]:
class Bert(nn.Module):
    """Pretrained BERT encoder followed by a linear layer with one output per NER label."""

    def __init__(self, ner_vocab, model_path=None, freeze=False):
        """Load the encoder from ``model_path`` and size the head to ``len(ner_vocab)``.

        When ``freeze`` is true the encoder parameters are excluded from
        gradient updates, leaving only the linear layer trainable.
        """
        super().__init__()
        self._bert = BertModel.from_pretrained(model_path)
        self._linear = nn.Linear(self._bert.config.hidden_size, len(ner_vocab))

        if freeze:
            # Freezing bert layer
            for weight in self._bert.parameters():
                weight.requires_grad = False

    def forward(self, batch, mask):
        """Return label scores for the positions listed in ``mask``.

        ``batch`` must provide "input_ids" and "attention_mask"; ``mask`` holds,
        per sentence, the positions to keep (as produced by ``build_mask``).
        """
        encoded = self._bert(
            input_ids=batch["input_ids"], attention_mask=batch["attention_mask"]
        )
        hidden = encoded.last_hidden_state
        # Indexing every sentence with every sentence's positions and then taking
        # the diagonal keeps only "sentence i indexed by its own positions".
        # Presumably hidden is (batch, seq, hidden) and mask (batch, words) - TODO confirm.
        # https://pytorch.org/docs/stable/generated/torch.diagonal.html
        own_positions = hidden[:, mask].diagonal()
        # https://pytorch.org/docs/stable/generated/torch.permute.html
        pooler = own_positions.permute(2, 0, 1)
        return self._linear(pooler)

In [None]:
# https://github.com/ltgoslo/NorBERT/blob/main/benchmarking/experiments/bert_ner.py 

def build_mask(tokenizer, ids):
    """Collect, per sentence, the positions of the tokens that start a word.

    A position is dropped when its token is one of
    ``tokenizer.all_special_tokens[1:]`` or a "##" continuation piece. The first
    entry of ``all_special_tokens`` is deliberately not skipped (presumably the
    unknown-word token - verify for the tokenizer in use).

    Args:
        tokenizer: Object providing ``all_special_tokens``,
            ``convert_ids_to_tokens`` and ``pad``.
        ids: Iterable of token-id sequences, one per sentence.

    Returns:
        The "input_ids" entry of ``tokenizer.pad`` applied to the position lists
        (requested with ``return_tensors="pt"``).
    """
    # Read the special tokens once: the lookup does not change inside the loop,
    # and a set makes the per-token membership test constant time.
    skipped_specials = set(tokenizer.all_special_tokens[1:])

    mask = []
    for sentence_ids in ids:
        sentence = tokenizer.convert_ids_to_tokens(sentence_ids)
        mask.append([
            n
            for n, token in enumerate(sentence)
            if token not in skipped_specials and not token.startswith("##")  # ## masked
        ])

    mask = tokenizer.pad({"input_ids": mask}, return_tensors="pt")["input_ids"]
    return mask

In [None]:
def predict(input_data, tokenizer, model, gpu=False):
    """Predict one label index per word for a batch of pre-split sentences.

    Args:
        input_data: Batch of sentences, each already split into words.
        tokenizer: Callable tokenizer, also handed to ``build_mask``.
        model: Callable taking ``(encoded_batch, mask)`` and returning label scores.
        gpu: Move the encoded batch to CUDA before running the model.

    Returns:
        Tensor of predicted label indices (argmax over the label dimension).
    """
    input_data = tokenizer(
        input_data, is_split_into_words=True, return_tensors="pt", padding=True
    )
    if gpu:
        input_data = input_data.to("cuda")
    batch_mask = build_mask(tokenizer, input_data["input_ids"])
    # Only the argmax is returned, so no autograd graph is needed; skipping it
    # saves memory and time during evaluation.
    with torch.no_grad():
        scores = model(input_data, batch_mask)
    y_pred = scores.permute(0, 2, 1).argmax(dim=1)
    return y_pred

In [None]:
def predict_test(test_set, ner_vocab, tokenizer, model):
    """Predict labels for every batch in ``test_set`` and return them as one flat list.

    The model is switched to eval mode first. Only the first sentence of each
    batch is read, and its predicted indices are mapped to labels through
    ``ner_vocab``. The module-level ``gpu`` flag is forwarded to ``predict``.
    """
    model.eval()
    predicted_labels = []
    for x, _gold in tqdm.tqdm(test_set):
        first_sentence = predict(x, tokenizer, model, gpu=gpu)[0]
        batch_labels = [ner_vocab[element] for element in first_sentence]
        predicted_labels.extend(batch_labels)
    return predicted_labels

In [None]:
def is_freeze(freeze, model):
    """Create the Adam optimiser for ``model`` and return it with its learning rate.

    The learning rate is 0.001 when ``freeze`` is true and 2e-5 otherwise.
    """
    lr = 0.001 if freeze else 2e-5
    optimiser = torch.optim.Adam(model.parameters(), lr=lr)
    return optimiser, lr

In [None]:
def load_parl_corpus(rootdir_parl_corpus, lower=False):
    """Load every "normalized_token_data.json" file found below a root directory.

    Args:
        rootdir_parl_corpus: Directory that is walked recursively.
        lower: When equal to ``True``, each file is loaded a second time with
            ``load_data(lower=True)``.

    Returns:
        ``(corpora_normal_cap, corpora_lower)``: two lists of loaded corpora in
        the order the files were found. The second list stays empty unless
        ``lower`` was requested.
    """
    paths = [
        os.path.join(subdir, file)
        for subdir, _dirs, files in os.walk(rootdir_parl_corpus)
        for file in files
        if "normalized_token_data.json" in file
    ]

    corpora_normal_cap = []
    for path in paths:
        corpus = ParlamentaryCorpus(path).load_data()
        corpora_normal_cap.append(corpus)
        # Print the key and the source file of every entry whose value is an empty list.
        for k, v in corpus.items():
            if v == []:
                print(k)
                print(path)

    corpora_lower = []
    # Equality (not truthiness) is kept on purpose so that non-bool arguments
    # behave exactly as before.
    if lower == True:
        for path in paths:
            corpora_lower.append(ParlamentaryCorpus(path).load_data(lower=True))

    return corpora_normal_cap, corpora_lower

In [None]:
def parl_sentences_only(dictionary_corpus):
    """Flatten a list of dictionaries into one list of their values, in order."""
    return [value for diction in dictionary_corpus for value in diction.values()]

In [None]:
""" Provides dummy y-data so CoNLLDataset class can be used """
""" Not elegant but works perfectly fine :)                 """

def make_dummy_y(test_corpus):
    """Return one list of "O" labels per sentence, as long as that sentence."""
    return [["O"] * len(sentence) for sentence in test_corpus]

In [None]:
def binary_parl_from_iob2(array):
    """Map each tag to 0 if it is "O" and to 1 otherwise."""
    return [0 if tag == "O" else 1 for tag in array]

In [None]:
def gold_labels_unlabled_data(unlabeled):
    """Derive binary labels from capitalisation.

    Args:
        unlabeled: Iterable of sentences, each an iterable of string tokens.

    Returns:
        One list per sentence with 1 for every token whose first character is
        upper-case and 0 otherwise. Empty tokens are labelled 0.
    """
    # token[:1] (instead of token[0]) yields "" for an empty token, so such a
    # token is labelled 0 rather than raising IndexError.
    return [
        [1 if token[:1].isupper() else 0 for token in sentence]
        for sentence in unlabeled
    ]

In [None]:
def transform_to_binary(set):
    """Return 0 for every "O" label and 1 for every other label, in input order."""
    return [0 if label == "O" else 1 for label in set]

In [None]:
"""Making list of lists preds, same len as NorNE gold labels"""
""" test_dataset_ner_labels should be test_dataset.ner_labels """

def split_list_preds(preds, test_dataset_ner_labels):
    """Cut the flat ``preds`` list into consecutive chunks.

    Chunk ``k`` has the same length as ``test_dataset_ner_labels[k]``.
    """
    chunks = []
    offset = 0
    for gold_sentence in test_dataset_ner_labels:
        size = len(gold_sentence)
        chunks.append(preds[offset:offset + size])
        offset += size
    return chunks

In [None]:
# Assumes that it is the same vocab in dev and test
from itertools import chain
def get_labels(train_split, add_UNK=True):
    """Build the ordered NER label vocabulary from a training split.

    Args:
        train_split: Iterable of sentence dicts that have a "ner_tags" list.
        add_UNK: Append the "@UNK" label.

    Returns:
        Labels containing "-" first, ordered by the part after the first "-"
        and then by the part before it (e.g. "B-LOC", "I-LOC", "B-PER", ...),
        followed by the remaining labels in sorted order, with "@UNK" last
        when requested.
    """
    # sorted(set(...)) rather than list(set(sorted(...))): the latter discards
    # the sort, leaving the order of hyphen-less labels to string hashing.
    unique_labels = sorted({tag for sentence in train_split for tag in sentence["ner_tags"]})

    if add_UNK:
        unique_labels.append("@UNK")

    prefixed = [label for label in unique_labels if "-" in label]
    plain = [label for label in unique_labels if "-" not in label]
    prefixed.sort(key=lambda x: (x.split("-")[1], x.split("-")[0]))
    return prefixed + plain

In [None]:
""" Norne Data loading stuff"""

# Location template and base names of the NorNE CoNLL-U files.
path = "all_conllu/{0}.conllu"
file_list = [
    "no_bokmaal-ud-dev",
    "no_bokmaal-ud-test",
    "no_bokmaal-ud-train",
    "no_nynorsk-ud-dev",
    "no_nynorsk-ud-test",
    "no_nynorsk-ud-train",
]

# Positions 2 and 5 of file_list are the two "-train" files.
train_split_no, train_split_ny = [
    convert_to_list_dict(path, file_list[position]) for position in (2, 5)
]

# The label vocabulary is derived from both training splits together.
train_split = [*train_split_no, *train_split_ny]
ner_vocab = get_labels(train_split)
print("Success!")





In [None]:
"""LOADING NPSC SAMPLES """


import csv

# Two-column sample file: first column -> left_col, second column -> right_col.
with open('200_annoterte_setninger.csv', newline='', encoding="UTF-8") as csvfile:
    csvreader = csv.reader(csvfile, delimiter=',')
    left_col = []
    right_col = []
    for row in csvreader:
        left_col.append(row[0])
        right_col.append(row[1])

x, y = [], []

# newline="" lets the csv module do its own newline handling, as its
# documentation requires for file objects (the first file already did this).
with open("parl_annotation_comma_fixed 100 sents.csv", "r", newline="", encoding="UTF-8") as f_input:
    reader = csv.reader(f_input, delimiter=",")
    for i, row in enumerate(reader):
        # Rows alternate: even-numbered rows go to x, odd-numbered rows to y.
        if i%2:
            y.append(row)
        else:
            x.append(row)


def create_sublists(lst):
    """Split ``lst`` into runs of consecutive non-empty-string elements.

    Every '' element acts as a separator; separators never produce empty runs.
    """
    sublists = []
    current = []
    for element in lst:
        if element != '':
            current.append(element)
        elif current:
            sublists.append(current)
            current = []

    # Flush the last run when the input does not end with a separator.
    if current:
        sublists.append(current)

    return sublists

# Group both columns of the 200-sentence file into runs separated by empty cells.
x200, y200 = create_sublists(left_col), create_sublists(right_col)


def lower_list_of_lists(lists):
    """Return a new list of lists with every word lower-cased."""
    lowered = []
    for sublist in lists:
        lowered.append([word.lower() for word in sublist])
    return lowered


In [None]:
# Visual check: print every label row that was read into `y`.
for i in y:
    print(i)

In [None]:
# Lower-cased copy of the sample sentences in `x`.
test=lower_list_of_lists(x)

In [None]:
# Flatten the per-sentence label rows in `y` into a single list.
flat_list = [item for sublist in y for item in sublist]


In [None]:
# Wrap the lower-cased sample sentences and their labels for evaluation.
# batch_size=1 matters: predict_test reads only the first sentence of each batch.
test_dataset = CoNLLDataset(test, y, ner_vocab)
test_loader = DataLoader(
    test_dataset,
    batch_size=1,
    shuffle=False,
    collate_fn=partial(collate_fn, gpu=gpu),
)

# Gold labels for test_dataset, flattened across sentences.
gold_labels = [
    label
    for sentence_labels in test_dataset.ner_labels
    for label in sentence_labels
]


In [None]:

def count_misclassification(gold_sentences_samples_splits, sample_sentence_preds_BINARY_splits, sample_sentences_lower, sample_sentences):
    """Tally, per lower-cased word, how often it was misclassified.

    Args:
        gold_sentences_samples_splits: Gold binary labels, one list per sentence.
        sample_sentence_preds_BINARY_splits: Predicted binary labels, same layout.
        sample_sentences_lower: Lower-cased tokens per sentence (used as keys).
        sample_sentences: Sentences stored alongside each counted word.

    Returns:
        ``(false_negatives, false_positives)``. Each maps a word to
        ``{"Count": n, "Sentence": [...]}`` with one sentence entry per
        occurrence. A mismatch with prediction 0 is a false negative, with
        prediction 1 a false positive; other predicted values are ignored.
    """
    false_negatives = {}
    false_positives = {}

    def record(bucket, word, sentence):
        # First occurrence creates the entry, later ones update it in place.
        if word not in bucket.keys():
            bucket[word] = {"Count": 1, "Sentence": [sentence]}
        else:
            bucket[word]["Count"] = bucket[word]["Count"] + 1
            bucket[word]["Sentence"].append(sentence)

    sentence_pairs = zip(gold_sentences_samples_splits, sample_sentence_preds_BINARY_splits)
    for i, (gold, pred) in enumerate(sentence_pairs):
        if gold != pred:
            for word_index, (word_g, word_p) in enumerate(zip(gold, pred)):
                if word_g != word_p:
                    if word_p == 0:
                        bucket = false_negatives
                    elif word_p == 1:
                        bucket = false_positives
                    else:
                        continue
                    record(bucket, sample_sentences_lower[i][word_index], sample_sentences[i])

    return false_negatives, false_positives


In [None]:


# Evaluate every fine-tuned checkpoint of each BERT variant on the NPSC sample
# sentences and append the resulting reports to per-model CSV files.
#
# The absolute Windows locations stay the defaults (relative paths were
# unreliable on the original machine), but they can be overridden through the
# TRAINED_MODELS_ROOT / MBERT_LOCAL_PATH environment variables.
TRAINED_MODELS_ROOT = os.environ.get(
    "TRAINED_MODELS_ROOT",
    r"C:\Users\Aarne\OneDrive - University of Bergen\Dokumenter\MSTR-PY\TRAIN ALL BERTS\trained",
)
MBERT_LOCAL_PATH = os.environ.get(
    "MBERT_LOCAL_PATH", r"C:\Users\Aarne\bert-base-multilingual-cased"
)

# Pretrained model (hub id or local directory) -> directory with its fine-tuned checkpoints.
model_types = {
    "ltgoslo/norbert": os.path.join(TRAINED_MODELS_ROOT, "norBERT"),
    "ltgoslo/norbert2": os.path.join(TRAINED_MODELS_ROOT, "norBERT2"),
    "NbAiLab/nb-bert-base": os.path.join(TRAINED_MODELS_ROOT, "NB-BERT"),
    "saattrupdan/nbailab-base-ner-scandi": os.path.join(TRAINED_MODELS_ROOT, "scandi_ner"),
    MBERT_LOCAL_PATH: os.path.join(TRAINED_MODELS_ROOT, "mBERT"),
}


def _model_display_name(model_type):
    """Return the last component of a hub id or local path, for either separator.

    Without this, a local Windows path (which has no "/") would be used whole
    as the output directory and inside every file name.
    """
    return model_type.replace("\\", "/").rstrip("/").rsplit("/", 1)[-1]


def _append_report(directory, file_name, frame, checkpoint_path):
    """Append a seed header line followed by ``frame`` as CSV to directory/file_name."""
    target = os.path.join(directory, file_name)
    with open(target, "a") as f:
        f.write("\n")
        # NOTE(review): [3:-4] drops the first three characters of the file name
        # and the ".csv" suffix; kept as-is so headers match earlier runs -
        # confirm the intended prefix.
        f.write(f"{file_name[3:-4]}_seed_{checkpoint_path.rsplit('_')[-1]}")
        f.write("\n")
    frame.to_csv(target, mode="a")


# The gold labels are the same for every checkpoint, so binarise them once.
gold_binary = transform_to_binary(gold_labels)

# ner_vocab[:-2] leaves out the last two labels, ner_vocab[:-1] only the last one.
labels_no_O = ner_vocab[:-2]
labels_with_O = ner_vocab[:-1]

for model_type, trained_models_path in model_types.items():
    tokenizer = BertTokenizer.from_pretrained(model_type, do_basic_tokenize=False)
    model_type_name = _model_display_name(model_type)
    final_directory = os.path.join(os.getcwd(), model_type_name)

    for subdir, dirs, files in os.walk(trained_models_path):
        for file in files:
            # Every file without ".json" in its name is treated as a checkpoint.
            if ".json" in file:
                continue

            bert_train_path = os.path.join(subdir, file)
            bert_model = Bert(ner_vocab, model_type, freeze=False).to(device)
            # map_location lets a checkpoint saved on another device load on this one.
            bert_model.load_state_dict(torch.load(bert_train_path, map_location=device))

            y_pred_norne = predict_test(test_loader, ner_vocab, tokenizer, bert_model)

            # Transformations
            preds_binary = transform_to_binary(y_pred_norne)
            split_preds = split_list_preds(y_pred_norne, test_dataset.ner_labels)

            # Classification reports
            norne_cr_df_all_labels = pd.DataFrame(
                classification_report(gold_labels, y_pred_norne, labels=labels_no_O, digits=5, output_dict=True)
            ).transpose()
            norne_cr_df_IOB2 = pd.DataFrame(
                cr(test_dataset.ner_labels, split_preds, mode='strict', scheme=IOB2, digits=4, output_dict=True)
            ).transpose()
            norne_cr_df_binary = pd.DataFrame(
                classification_report(gold_binary, preds_binary, digits=5, output_dict=True)
            ).transpose()

            # Confusion matrices
            norne_cf_df_no_O = pd.DataFrame(
                confusion_matrix(gold_labels, y_pred_norne, labels=labels_no_O),
                index=labels_no_O,
                columns=labels_no_O,
            )
            norne_cf_df_O = pd.DataFrame(
                confusion_matrix(gold_labels, y_pred_norne, labels=labels_with_O),
                index=labels_with_O,
                columns=labels_with_O,
            )
            norne_cf_df_binary = pd.DataFrame(confusion_matrix(gold_binary, preds_binary))

            reports = {
                f"NSPC_SAMPLES_CR_all_labels_{model_type_name}.csv": norne_cr_df_all_labels,
                f"NSPC_SAMPLES_CR_IOB2_{model_type_name}.csv": norne_cr_df_IOB2,
                f"NSPC_SAMPLES_CR_binary_{model_type_name}.csv": norne_cr_df_binary,
                f"NSPC_SAMPLES_CF_MATRIX_NO_O_{model_type_name}.csv": norne_cf_df_no_O,
                f"NSPC_SAMPLES_CF_MATRIX_O_{model_type_name}.csv": norne_cf_df_O,
                f"NSPC_SAMPLES_CF_MATRIX_binary_{model_type_name}.csv": norne_cf_df_binary,
            }

            # The output directory is only created once there is something to write.
            os.makedirs(final_directory, exist_ok=True)
            for file_name, frame in reports.items():
                _append_report(final_directory, file_name, frame, bert_train_path)








In [33]:
# Export the lower-cased sample as "<word> <label> " lines, with a blank line
# after every sentence.
with open("sample_NPSC_for_NSRFpp.bmes", 'w', encoding='utf-8') as f:
    for sentence, sentence_labels in zip(test, y):
        token_lines = [f"{word} {label} \n" for word, label in zip(sentence, sentence_labels)]
        f.writelines(token_lines)
        f.write('\n')