In [None]:
# Mount Google Drive at /content/drive (needs the google.colab package, i.e. a Colab runtime).
# NOTE(review): no cell visible in this notebook reads or writes under the mount point — confirm it is still needed.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
%pip install -qU transformers evaluate peft seqeval sentencepiece accelerate datasets
import pandas as pd
import regex as re
import numpy as np
from tqdm import tqdm
import string
from collections import defaultdict

In [None]:
from datasets import load_dataset

# HiNER with the collapsed tag set. To work with the fine-grained variant,
# load 'cfilt/HiNER-original' and use the commented label list further down.
hiner = load_dataset('cfilt/HiNER-collapsed')
# hiner = load_dataset('cfilt/HiNER-original')

# The position of a label in this list is the integer id this notebook uses
# when it decodes `ner_tags`.
label_list = [
    "B-LOCATION",
    "B-ORGANIZATION",
    "B-PERSON",
    "I-LOCATION",
    "I-ORGANIZATION",
    "I-PERSON",
    "O",
]

# Label list for 'cfilt/HiNER-original':
# label_list = ["B-FESTIVAL","B-GAME","B-LANGUAGE","B-LITERATURE","B-LOCATION","B-MISC","B-NUMEX","B-ORGANIZATION",
#               "B-PERSON","B-RELIGION","B-TIMEX","I-FESTIVAL","I-GAME","I-LANGUAGE","I-LITERATURE","I-LOCATION",
#               "I-MISC","I-NUMEX","I-ORGANIZATION","I-PERSON","I-RELIGION","I-TIMEX","O"]

# Two-way lookup between integer ids and label names.
id2label = dict(enumerate(label_list))
label2id = {name: idx for idx, name in id2label.items()}

Downloading builder script:   0%|          | 0.00/3.08k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/7.06k [00:00<?, ?B/s]

Downloading data files:   0%|          | 0/3 [00:00<?, ?it/s]

Downloading data:   0%|          | 0.00/50.3M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/7.20M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/14.4M [00:00<?, ?B/s]

Extracting data files:   0%|          | 0/3 [00:00<?, ?it/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

In [None]:
# Word -> integer id lookup built from the training split. Id 0 is reserved
# for the out-of-vocabulary placeholder.
vocab = {"<OOV>": 0}

for sentence in tqdm(iter(hiner['train']["tokens"])):
    for word in sentence:
        # A word seen before keeps its id; a new word receives the next free
        # id, which equals the vocabulary size at that moment.
        vocab.setdefault(word, len(vocab))

# Number of ids handed out, i.e. the final vocabulary size.
vocab_count = len(vocab)

75827it [00:00, 81602.83it/s]


In [None]:
def get_data(data, vocab):
    """Flatten a dataset split into one list of ``(token, tag_name)`` pairs.

    Sentence boundaries are not kept: the pairs of all sentences are
    concatenated in order.

    Args:
        data: Mapping with parallel ``"tokens"`` and ``"ner_tags"`` entries,
            each a sequence of per-sentence sequences.
        vocab: Container of known words; a token not found in it is replaced
            by ``"<OOV>"``.

    Returns:
        list[tuple[str, str]]: ``(token or "<OOV>", label name)`` pairs, with
        tag ids translated through the module-level ``id2label``.
    """
    pairs = []
    for sentence_tokens, sentence_tags in tqdm(zip(data["tokens"], data["ner_tags"])):
        pairs.extend(
            (token if token in vocab else "<OOV>", id2label[tag_id])
            for token, tag_id in zip(sentence_tokens, sentence_tags)
        )
    return pairs

In [None]:
# Flatten the training split into (token, tag-name) pairs.
# `vocab` was built from this same split, so no training token is replaced by "<OOV>" here.
train = get_data(hiner["train"], vocab)
# validation = get_data(hiner["validation"], vocab)
# test = get_data(hiner["test"], vocab)

75827it [00:01, 59816.07it/s]


In [None]:
import numpy as np

import evaluate
# Load the seqeval metric once at module level; compute_metrics() below calls seqeval.compute().
seqeval = evaluate.load("seqeval")

def compute_metrics(labels, predictions):
    """Score predicted tag sequences against gold tag ids with seqeval.

    Args:
        labels: Per-sentence sequences of integer ids; each id is translated
            to its name through the module-level ``label_list``.
        predictions: Per-sentence sequences of label names, handed to seqeval
            unchanged.

    Returns:
        dict: The overall ``precision``, ``recall``, ``f1`` and ``accuracy``
        values reported by ``seqeval.compute`` (called with
        ``zero_division=1``).
    """
    references = [[label_list[tag_id] for tag_id in sentence] for sentence in labels]
    scores = seqeval.compute(predictions=predictions, references=references, zero_division=1)

    # (key in the returned dict, key in the seqeval result)
    wanted = (
        ("precision", "overall_precision"),
        ("recall", "overall_recall"),
        ("f1", "overall_f1"),
        ("accuracy", "overall_accuracy"),
    )
    return {short: scores[full] for short, full in wanted}

In [None]:
def create_dictionaries(training_corpus, vocab):
    """Collect the raw counts needed for the emission and transition tables.

    Args:
        training_corpus: Iterable of ``(word, tag)`` pairs, read once in order.
        vocab: Not used by this function; kept so the call signature stays
            the same.

    Returns:
        tuple: ``(emission_counts, transition_counts, tag_counts)``, three
        ``defaultdict(int)`` objects keyed by ``(tag, word)``,
        ``(previous_tag, tag)`` and ``tag`` respectively.
    """
    tag_counts = defaultdict(int)
    transition_counts = defaultdict(int)
    emission_counts = defaultdict(int)

    # The first pair is counted as a transition out of 'O'. After that the
    # previous tag is simply the tag of the preceding pair in the stream.
    previous = 'O'
    for word, tag in tqdm(training_corpus):
        tag_counts[tag] += 1
        emission_counts[tag, word] += 1
        transition_counts[previous, tag] += 1
        previous = tag

    return emission_counts, transition_counts, tag_counts

In [None]:
# Count (tag, word) emissions, (previous tag, tag) transitions and per-tag totals over the flattened training pairs.
emission_counts, transition_counts, tag_counts = create_dictionaries(train, vocab)

100%|██████████| 1544838/1544838 [00:01<00:00, 827749.79it/s]


In [None]:
def predict_pos(test, emission_counts, vocab, label_list):
    """Tag each word with the label it was counted with most often.

    Args:
        test: Iterable of sentences, each an iterable of words.
        emission_counts: Mapping from ``(label, word)`` to a count.
        vocab: Container of known words; a word not found in it is looked up
            as ``"<OOV>"``.
        label_list: Candidate labels. When several labels share the highest
            count, the one that comes first in this list wins.

    Returns:
        list[list[str]]: One predicted label per word, per sentence. A word
        with no positive count for any label is tagged ``'O'``.
    """
    result = []
    for sentence in tqdm(test):
        predicted = []
        for word in sentence:
            lookup = word if word in vocab else "<OOV>"
            counts = [emission_counts.get((label, lookup), 0) for label in label_list]
            top = max(counts, default=0)
            # list.index returns the first position of the maximum, which
            # keeps the "earliest label wins a tie" behaviour.
            predicted.append(label_list[counts.index(top)] if top > 0 else 'O')
        result.append(predicted)
    return result

In [None]:
# Baseline: tag every test token with the label it was most often counted with in training,
# then score the predictions against the gold tag ids with seqeval.
result = predict_pos(hiner["test"]['tokens'], emission_counts, vocab, label_list)
reply = compute_metrics(hiner["test"]['ner_tags'], result)
reply

100%|██████████| 21657/21657 [00:01<00:00, 16808.29it/s]


{'precision': 0.7838487358239489,
 'recall': 0.7457953815336936,
 'f1': 0.7643487270860972,
 'accuracy': 0.9507043685996027}

In [None]:
# Append the emission-baseline test scores to the `original` results table.
# The row is built in a new dict so `reply` itself is left untouched and
# re-running this cell cannot wrap already-wrapped values a second time.
row = {**reply, "model": "emission", "split": "test"}

# The cell that first creates `original` sits later in the notebook, so on a
# fresh kernel the name does not exist yet; create the table on demand.
try:
    original
except NameError:
    original = pd.DataFrame()

original = pd.concat([original, pd.DataFrame([row])], ignore_index=True)
original

Unnamed: 0,precision,recall,f1,accuracy,model,split
0,0.824051,0.870495,0.846636,0.970821,emission,train
1,0.789533,0.744853,0.766542,0.950415,emission,validation
2,0.783849,0.745795,0.764349,0.950704,emission,test


In [None]:
# Create the results table only when it does not exist yet, so that running
# this cell after rows have been appended does not discard them.
# Use `del original` first to start over on purpose.
try:
    original
except NameError:
    original = pd.DataFrame()

In [None]:
def create_transition_matrix(alpha, label_list, tag_counts, transition_counts):
    """Build the additively smoothed tag-to-tag transition matrix.

    ``A[i, j] = (count(label_i -> label_j) + alpha)
                / (count(label_i) + alpha * num_tags)``

    Args:
        alpha: Smoothing constant added to every transition count.
        label_list: Labels; their order fixes the row and column order.
        tag_counts: Mapping from label to its total count. It is read with
            plain indexing, so a ``defaultdict`` gains an entry for every
            label it did not hold yet.
        transition_counts: Mapping from ``(previous_label, label)`` to count;
            missing pairs count as 0.

    Returns:
        numpy.ndarray: Array of shape ``(num_tags, num_tags)``.
    """
    num_tags = len(label_list)
    A = np.zeros((num_tags, num_tags))
    for row, prev_label in enumerate(label_list):
        for col, next_label in enumerate(label_list):
            pair_count = transition_counts.get((prev_label, next_label), 0)
            prev_total = tag_counts[prev_label]
            A[row, col] = (pair_count + alpha) / (prev_total + alpha * num_tags)
    return A

def create_emission_matrix(alpha, label_list, tag_counts, emission_counts, vocab):
    """Build the additively smoothed tag-to-word emission matrix.

    ``B[i, j] = (count(label_i, word_j) + alpha)
                / (count(label_i) + alpha * num_words)``

    The number of rows is taken from ``label_list``, the same sequence the
    rows are addressed through. A label that is absent from ``tag_counts``
    is treated as having count 0 and therefore gets a uniform row whenever
    ``alpha > 0``.

    Args:
        alpha: Smoothing constant added to every emission count.
        label_list: Labels; row ``i`` belongs to ``label_list[i]``.
        tag_counts: Mapping from label to its total count. It is only read,
            never modified.
        emission_counts: Mapping from ``(label, word)`` to count; missing
            pairs count as 0.
        vocab: Sequence of words; column ``j`` belongs to ``vocab[j]``.

    Returns:
        numpy.ndarray: Array of shape ``(len(label_list), len(vocab))``.

    Raises:
        ZeroDivisionError: If ``alpha`` is 0 and a label has a zero count.
    """
    num_tags = len(label_list)
    num_words = len(vocab)
    B = np.zeros((num_tags, num_words))
    for i, label in enumerate(label_list):
        # The denominator depends on the label only, so compute it once per row.
        denominator = tag_counts.get(label, 0) + alpha * num_words
        for j, word in enumerate(vocab):
            B[i, j] = (emission_counts.get((label, word), 0) + alpha) / denominator
    return B

In [None]:
# Build the smoothed probability tables used by the Viterbi decoder; both calls pass 1e-15 as `alpha`.
A = create_transition_matrix(1e-15, label_list, tag_counts, transition_counts)
# list(vocab) yields the words in insertion order, which is the order their ids were assigned in.
B = create_emission_matrix(1e-15, label_list, tag_counts, emission_counts, list(vocab))

# Viterbi

In [None]:
def initialize(A, B, input, label_list, tag_counts, vocab):
    """Allocate the Viterbi tables and fill the column for the first word.

    The start of a sentence is modelled as coming from the ``"O"`` label:
    ``best_probs[i, 0] = log A[O, i] + log B[i, first_word]``, which is
    ``-inf`` when either probability is 0.

    The number of rows comes from ``label_list``, the sequence that the
    transition matrix and the decoded output are indexed by, so it no longer
    depends on how many keys ``tag_counts`` happens to hold.

    Args:
        A: Transition matrix, indexed ``[previous_label, label]``.
        B: Emission matrix, indexed ``[label, word_id]``.
        input: Non-empty sequence of words, all present in ``vocab``.
        label_list: Labels; must contain ``"O"``.
        tag_counts: Not used; kept so the call signature stays the same.
        vocab: Mapping from word to its column in ``B``.

    Returns:
        tuple: ``(best_probs, best_paths)``. ``best_probs`` is a float array
        and ``best_paths`` an int array, both zero-filled with shape
        ``(num_tags, len(input))`` apart from ``best_probs[:, 0]``.

    Raises:
        IndexError: If ``input`` is empty.
        ValueError: If ``"O"`` is not in ``label_list``.
    """
    num_tags = len(label_list)
    best_probs = np.zeros((num_tags, len(input)))
    best_paths = np.zeros((num_tags, len(input)), dtype=int)
    s_idx = label_list.index("O")
    first_word = vocab[input[0]]
    # log(0) is the intended -inf here, so silence the divide warning.
    with np.errstate(divide="ignore"):
        best_probs[:, 0] = np.log(A[s_idx, :num_tags]) + np.log(B[:num_tags, first_word])
    return best_probs, best_paths

def viterbi_forward(A, B, input, best_probs, best_paths, vocab):
    """Fill the Viterbi tables for every word after the first one.

    For position ``i`` and label ``j`` the score of coming from label ``k``
    is ``best_probs[k, i-1] + log A[k, j] + log B[j, word_i]``. The best
    score is stored in ``best_probs[j, i]`` and the ``k`` that achieved it in
    ``best_paths[j, i]``. On a tie the smallest ``k`` is kept. When every
    candidate is ``-inf`` (for example a zero emission probability) the
    back-pointer is 0 and the score is ``-inf``, rather than an attempt to
    store ``None`` in the integer table.

    Args:
        A: Transition matrix, indexed ``[previous_label, label]``.
        B: Emission matrix, indexed ``[label, word_id]``.
        input: Sequence of words, all present in ``vocab``.
        best_probs: Float table with column 0 already filled; modified in
            place.
        best_paths: Int table of the same shape; modified in place.
        vocab: Mapping from word to its column in ``B``.

    Returns:
        tuple: The same ``(best_probs, best_paths)`` objects that were
        passed in.
    """
    num_tags = best_probs.shape[0]
    # log(0) is the intended -inf, so silence the divide warning.
    with np.errstate(divide="ignore"):
        # The transition log-probabilities are the same at every position.
        log_A = np.log(A[:num_tags, :num_tags])
    for i in range(1, len(input)):
        word_idx = vocab[input[i]]
        with np.errstate(divide="ignore"):
            log_emit = np.log(B[:num_tags, word_idx])
        # scores[k, j]: arrive at label j for this word from label k.
        scores = (best_probs[:, i - 1][:, None] + log_A) + log_emit[None, :]
        # argmax returns the first maximum, i.e. the smallest k on a tie.
        best_paths[:, i] = np.argmax(scores, axis=0)
        best_probs[:, i] = scores.max(axis=0)
    return best_probs, best_paths

def viterbi_backward(best_probs, best_paths, label_list):
    """Recover the best label sequence by following the back-pointers.

    Args:
        best_probs: Score table of shape ``(num_tags, num_words)``; only its
            last column is read, to pick the final label.
        best_paths: Back-pointer table of the same shape; ``best_paths[j, i]``
            is the label index at position ``i - 1`` on the best path that
            ends in label ``j`` at position ``i``.
        label_list: Labels, indexed by the values held in the tables.

    Returns:
        list[str]: One label name per column, in sentence order.
    """
    num_words = best_paths.shape[1]

    # Start from the highest-scoring label of the last word and collect the
    # labels back to front.
    current = np.argmax(best_probs[:, -1])
    reversed_labels = [label_list[current]]
    for position in reversed(range(1, num_words)):
        current = best_paths[current, position]
        reversed_labels.append(label_list[current])

    return reversed_labels[::-1]

In [None]:
def viterbi(data, A, B, label_list, tag_counts, vocab):
    """Decode every sentence in ``data`` with the Viterbi algorithm.

    Args:
        data: Iterable of sentences. A sentence is either a sequence of words
            or a string, which is split on whitespace.
        A: Transition matrix, passed on to the Viterbi steps.
        B: Emission matrix, passed on to the Viterbi steps.
        label_list: Labels used to name the decoded indices.
        tag_counts: Passed on to ``initialize``.
        vocab: Mapping from word to id; a word not found in it is replaced by
            ``"<OOV>"`` before decoding.

    Returns:
        tuple: ``(inputs, outputs)``. ``inputs`` holds the sentences as they
        were decoded (after the ``"<OOV>"`` replacement) and ``outputs`` the
        predicted label names, one list per sentence. An empty sentence
        yields an empty list in both.
    """
    inputs = []
    outputs = []
    for sentence in tqdm(data):
        if isinstance(sentence, str):
            sentence = sentence.split()
        sentence = [word if word in vocab else "<OOV>" for word in sentence]
        inputs.append(sentence)

        # Nothing to decode; the table set-up reads the first word and would
        # fail on an empty sentence.
        if not sentence:
            outputs.append([])
            continue

        best_probs, best_paths = initialize(A, B, sentence, label_list, tag_counts, vocab)
        best_probs, best_paths = viterbi_forward(A, B, sentence, best_probs, best_paths, vocab)
        outputs.append(viterbi_backward(best_probs, best_paths, label_list))
    return inputs, outputs

In [None]:
# Decode the test split with Viterbi; `ans` is the (inputs, outputs) pair, so ans[1] holds the predicted tag sequences.
ans =  viterbi(hiner["test"]["tokens"], A, B, label_list, tag_counts, vocab)
reply = compute_metrics(hiner["test"]["ner_tags"],ans[1])

100%|██████████| 21657/21657 [01:13<00:00, 294.39it/s]


In [None]:
# Append the Viterbi test scores to the `original` results table.
# The row is built in a new dict so `reply` itself is left untouched and
# re-running this cell cannot wrap already-wrapped values a second time.
row = {**reply, "model": "viterbi", "split": "test"}

original = pd.concat([original, pd.DataFrame([row])], ignore_index=True)
original

Unnamed: 0,precision,recall,f1,accuracy,model,split
0,0.824051,0.870495,0.846636,0.970821,emission,train
1,0.789533,0.744853,0.766542,0.950415,emission,validation
2,0.783849,0.745795,0.764349,0.950704,emission,test
3,0.919479,0.915018,0.917243,0.982069,viterbi,train
4,0.860013,0.83922,0.84949,0.962522,viterbi,validation
5,0.855622,0.84115,0.848324,0.962123,viterbi,test


In [None]:
# Save the results table as a workbook. `index=False` is the documented way to
# leave the row index out of the sheet.
original.to_excel("collapseddd.xlsx", index=False)