# Import packages and load dataset

In [None]:
from datasets import load_dataset
import nltk as nltk
from nltk.tag import pos_tag
from nltk.tag import CRFTagger
import numpy as np
import re, unicodedata
# Fetch the NLTK resource named 'averaged_perceptron_tagger'.
# NOTE(review): presumably the model behind `pos_tag` (imported above and used by
# the POS-based tagger further down) — confirm the resource name matches the
# installed NLTK version.
nltk.download('averaged_perceptron_tagger')
# Load the "tner/bionlp2004" dataset, passing ./data_cache as its cache directory.
dataset = load_dataset(
    "tner/bionlp2004", 
    cache_dir='./data_cache'
)

# Report how many splits were loaded, followed by the dataset's own representation.
print('Our dataset is a dictionary that has {} splits: \n\n{}'.format(len(dataset),dataset))

# Formatting the dataset splits

In [None]:

# For every split keep two parallel lists: the token sequence of each example
# and its tag sequence, with each tag converted to a string.
train_sentences_ner = [example['tokens'] for example in dataset['train']]
train_labels_ner = [list(map(str, example['tags'])) for example in dataset['train']]

val_sentences_ner = [example['tokens'] for example in dataset['validation']]
val_labels_ner = [list(map(str, example['tags'])) for example in dataset['validation']]

test_sentences_ner = [example['tokens'] for example in dataset['test']]
test_labels_ner = [list(map(str, example['tags'])) for example in dataset['test']]

In [None]:
# Report how many sentences each split contains.
print('Number of training sentences = {}'.format(len(train_sentences_ner)))
print('Number of validation sentences = {}'.format(len(val_sentences_ner)))
print('Number of test sentences = {}'.format(len(test_sentences_ner)))

In [None]:
# Show one training example (index 101): its tokens, then its tag sequence.
print('An instance from the training set looks like this: \n\n{}'.format(train_sentences_ner[101]))
print('Corresponding label: \n\n{}'.format(train_labels_ner[101]))

In [None]:
# Distinct tag values found across all training sentences. The count and the
# values are printed separately so each message matches what it shows.
unique_labels = np.unique(np.concatenate(train_labels_ner))
print('Number of unique labels: {}'.format(len(unique_labels)))
print('Unique labels: {}'.format(unique_labels))

In [None]:
# Tag name -> integer id. The ids are simply the positions in the tuple below.

all_labels = {
    tag_name: tag_id
    for tag_id, tag_name in enumerate((
        "O",
        "B-DNA",
        "I-DNA",
        "B-protein",
        "I-protein",
        "B-cell_type",
        "I-cell_type",
        "B-cell_line",
        "I-cell_line",
        "B-RNA",
        "I-RNA",
    ))
}

# Reverse lookup: integer id -> tag name.
mapping = dict(zip(all_labels.values(), all_labels.keys()))
print(mapping)

In [None]:
# Decode the numeric tag ids into tag names via `mapping` and pair every token
# with its tag, giving one list of (token, tag) tuples per sentence.
train_set = [
    list(zip(tokens, [mapping[int(tag_id)] for tag_id in tag_ids]))
    for tokens, tag_ids in zip(train_sentences_ner, train_labels_ner)
]

# Validation split: bare token sequences (tagger input), decoded tag sequences,
# and the combined (token, tag) sentences used as the gold standard.
val_tokens = list(val_sentences_ner)
val_tags = [[mapping[int(tag_id)] for tag_id in tag_ids] for tag_ids in val_labels_ner]
val_set = [list(zip(tokens, tags)) for tokens, tags in zip(val_sentences_ner, val_tags)]

# Test split, built the same way as the validation split.
test_tokens = list(test_sentences_ner)
test_tags = [[mapping[int(tag_id)] for tag_id in tag_ids] for tag_ids in test_labels_ner]
test_set = [list(zip(tokens, tags)) for tokens, tags in zip(test_sentences_ner, test_tags)]

# Show the first tagged validation sentence as a sanity check.
print(val_set[0])

# Create plain CRF model and train it

In [None]:
# Baseline: an unmodified CRFTagger trained on the (token, tag) sentences in train_set.
# NOTE(review): the second argument is presumably the path the trained model is
# written to — confirm against the NLTK CRFTagger.train documentation.
model = CRFTagger(verbose= True)
model.train(train_set,'model.crf.my_tagger')

# Predict on validation set

In [None]:
# Tag the validation token sequences with the baseline model; the result is
# scored against val_set further down.
predicted_tags = model.tag_sents(val_tokens)

# Create functions for fetching the F1 scores

In [None]:
# Create DF to hold F1 scores
import pandas as pd
import numpy as np
from copy import copy
# Scores table: one row per model, one column per entity class plus the macro average.
# NOTE(review): several column labels carry a trailing space; they are kept as-is
# because the test-set table created later uses the same labels.
df = pd.DataFrame(columns=['DNA','protein ','cell_type ','cell_line ','RNA ', 'Macro Avg.'])


def append_to_scores_table(scores, model_name):
    """Add (or overwrite) the row ``model_name`` in the global scores table.

    Args:
        scores: per-class F1 scores, one per class column, in column order.
        model_name: row label to store the scores under.

    The 'Macro Avg.' cell is filled with a placeholder 0; ``append_macro_avg``
    sets the real value afterwards.
    """
    # Build a new row rather than appending to ``scores`` so the caller's list
    # is left untouched.
    df.loc[model_name] = list(scores) + [0]


def append_macro_avg(score, model_name):
    """Store ``score`` in the 'Macro Avg.' cell of the existing row ``model_name``.

    Raises:
        KeyError: if the table has no row named ``model_name``.
    """
    if model_name not in df.index:
        # A single .loc assignment would silently create a new row for an
        # unknown label; keep failing loudly instead.
        raise KeyError(model_name)
    # One .loc call with both labels writes into the table itself. Chained
    # indexing (df.loc[row][col] = ...) may assign to a temporary copy.
    df.loc[model_name, 'Macro Avg.'] = score

In [None]:
def span_fill(index, token, label, spans, start, id_):
    """Update ``spans`` with the entity that the token at ``index`` belongs to.

    ``spans`` maps an entity type to a list of ``(start, end, id_)`` tuples,
    where ``end`` is exclusive and ``id_`` identifies the sentence. The
    function is meant to be called once per token, in sentence order:

    * a ``B-<type>`` label opens a new one-token span of that type;
    * an ``I-<type>`` label extends the most recent span of that type when
      that span belongs to the same sentence and ends exactly at ``index``;
    * any other label (e.g. ``O``), or an ``I-`` label with nothing to
      continue, leaves ``spans`` unchanged.

    Args:
        index: position of the token inside its sentence.
        token: the token text (unused; kept for interface compatibility).
        label: BIO label of the token.
        spans: dict updated in place.
        start: unused; kept for interface compatibility. All state needed to
            extend a span is read from ``spans`` itself, because a value
            assigned to a parameter would not be visible to the next call.
        id_: identifier of the sentence the token belongs to.
    """
    if label.startswith('B-'):
        spans.setdefault(label[2:], []).append((index, index + 1, id_))
    elif label.startswith('I-'):
        same_type_spans = spans.get(label[2:])
        if same_type_spans:
            span_start, span_end, span_id = same_type_spans[-1]
            # Only a span that stops right before this token, in this very
            # sentence, can be continued by an I- tag.
            if span_id == id_ and span_end == index:
                same_type_spans[-1] = (span_start, index + 1, id_)

In [None]:
def score_printer(named_entity_types, true_spans, predicted_spans, F1_score_for_each_class, model_name):
    """Print per-class and macro-averaged F1 and record them in the scores table.

    Args:
        named_entity_types: iterable of entity type names to score.
        true_spans: dict mapping entity type -> collection of gold spans.
        predicted_spans: dict mapping entity type -> collection of predicted
            spans. A type that is missing here counts as "nothing predicted".
        F1_score_for_each_class: list that receives one F1 value per entity
            type, appended in iteration order (modified in place).
        model_name: row label passed to ``append_to_scores_table`` and
            ``append_macro_avg``.

    A span counts as correct only when an identical span appears on both sides.
    Precision, recall and F1 are each defined as 0 when their denominator is 0.

    NOTE(review): the scores are handed to ``append_to_scores_table`` in the
    iteration order of ``named_entity_types`` — confirm that this order
    matches the column order of the scores table.
    """
    for named_entity_type in named_entity_types:
        # Sets make every membership test O(1) instead of a list scan.
        gold = set(true_spans.get(named_entity_type, ()))
        predicted = set(predicted_spans.get(named_entity_type, ()))

        true_positive = len(gold & predicted)
        false_negative = len(gold - predicted)   # gold spans the model missed
        false_positive = len(predicted - gold)   # predicted spans that are not gold

        if true_positive + false_negative == 0:
            recall = 0
        else:
            recall = true_positive / float(true_positive + false_negative)

        if true_positive + false_positive == 0:
            precision = 0
        else:
            precision = true_positive / float(false_positive + true_positive)

        if recall + precision == 0:
            F1 = 0
        else:
            F1 = 2 * precision * recall / (precision + recall)

        F1_score_for_each_class.append(F1)
        print('F1 score for Class: {} = {}'.format(named_entity_type, F1))

    # Take the mean before the list is handed to append_to_scores_table, so the
    # average is computed over the class scores only.
    macro_avrg = np.mean(F1_score_for_each_class)
    print('Macro averaged F1 score for all classes: {}'.format(macro_avrg))
    append_to_scores_table(F1_score_for_each_class, model_name)
    append_macro_avg(macro_avrg, model_name)

In [None]:

def get_spans(tagged_sentences):
    """Build the span dictionary for a collection of tagged sentences.

    Args:
        tagged_sentences: iterable of sentences, each an iterable of
            ``(token, label)`` pairs.

    Returns:
        The dict that ``span_fill`` populated; ``span_fill`` is called once
        per token with the token's position and the index of its sentence.
    """
    collected = {}
    for sentence_id, tagged_sentence in enumerate(tagged_sentences):
        for position, (token, label) in enumerate(tagged_sentence):
            # NOTE(review): -1 is passed as `start` for every token; nothing
            # is carried over from one span_fill call to the next here.
            span_fill(position, token, label, collected, -1, sentence_id)
    return collected

def get_f1_scores(test_sents, test_sents_with_pred, model_name):
    """Score predicted sentences against gold sentences and record the result.

    Args:
        test_sents: gold sentences as lists of ``(token, label)`` pairs.
        test_sents_with_pred: the same sentences with predicted labels.
        model_name: label under which ``score_printer`` records the scores.

    Spans are extracted from both inputs with ``get_spans``. The entity types
    that get scored are the keys of the gold span dictionary.
    """
    gold_spans = get_spans(test_sents)
    score_printer(
        gold_spans.keys(),
        gold_spans,
        get_spans(test_sents_with_pred),
        [],  # receives the per-class F1 scores
        model_name,
    )
    

# Get F1 scores for validation set on plain model

In [None]:
# Score the baseline predictions against the validation gold sentences; the
# results are recorded in df under the row label 'Plain model'.
get_f1_scores(val_set, predicted_tags, 'Plain model')

In [None]:
# Display the scores table as the cell output.
df

# Create new version of the model that uses previous and next words as additional features

In [None]:
class Current_next_previous_word_CRFTagger(CRFTagger):
    """CRFTagger whose features also include the current, next and previous word."""

    def _get_features(self, toks, i):
        """Return the parent class's features for ``toks[i]`` plus word-context features.

        Added, in this order: the current word, the next word (when ``toks[i]``
        is not the last token) and the previous word (when ``i > 0``).
        """
        current_word = toks[i]
        features = super()._get_features(toks, i)

        features.append("CURRENT_WORD" + current_word)

        has_next_word = i + 1 < len(toks)
        if has_next_word:
            features.append("NEXT_WORD_" + toks[i + 1])

        has_previous_word = i > 0
        if has_previous_word:
            features.append("PREVIOUS_WORD_" + toks[i - 1])

        return features

# Train the new model

In [None]:
# Train the word-context tagger on the same training sentences as the baseline,
# using a separate model file name.
multi_word_model = Current_next_previous_word_CRFTagger(verbose=True)
multi_word_model.train(train_set, 'model.crf.next_previous_word_CRFTagger')

# Get F1 score for validation set predictions on the new model

In [None]:
# Tag the validation sentences with the word-context tagger, record its scores
# under 'Prev-Next-WRD-Model', then display the table.
predicted_tags = multi_word_model.tag_sents(val_tokens)
get_f1_scores(val_set, predicted_tags,'Prev-Next-WRD-Model')
df

# Create a third model that uses part-of-speech tags in addition to all the other features

In [None]:
class POSBasedTagger(Current_next_previous_word_CRFTagger):
    """Word-context tagger that additionally uses each token's POS tag as a feature."""

    # Token sequence whose POS tags are currently cached; None until the first call.
    _tokens = None

    def _get_features(self, toks, i):
        """Return the parent class's features for ``toks[i]`` plus its POS tag.

        ``pos_tag`` is run on the whole token sequence, and the result is
        reused for as long as calls keep passing an equal sequence.
        """
        features = super()._get_features(toks, i)

        sequence_changed = toks != self._tokens
        if sequence_changed:
            # Re-tag only when a different token sequence arrives.
            self._pos_tagged_toks = pos_tag(toks)
            self._tokens = toks

        pos_label = self._pos_tagged_toks[i][1]
        features.append(pos_label)
        return features

# Instantiate the model and train it

In [None]:
# Train the POS-based tagger on the same training sentences, using its own model file name.
POSmodel = POSBasedTagger(verbose=True)
POSmodel.train(train_set, 'model.crf.POS_Based_Tagger')

# Predict on validation set and get F1 scores

In [None]:
# Tag the validation sentences with the POS-based tagger, record its scores
# under 'POS_model', then display the table.
predicted_tags = POSmodel.tag_sents(val_tokens)
get_f1_scores(val_set, predicted_tags,'POS_model')
df

# Use the best-performing model to predict on the unseen test data and check how well it generalises

In [None]:
# Rebind df to a fresh, empty scores table so that it only holds test-set results.
# The scoring helpers write into the global df, so they will fill this new table.
df = pd.DataFrame(columns=['DNA','protein ','cell_type ','cell_line ','RNA ', 'Macro Avg.'])
# multi_word_model (the word-context tagger) is treated as the best performer on
# the validation split, so it is the one evaluated on the unseen test split to
# see how well it generalises.
predicted_tags = multi_word_model.tag_sents(test_tokens)
# Score against the gold test sentences and display the table.
get_f1_scores(test_set, predicted_tags,'Prev-Next-WRD-Model')
df