In [None]:
# Mount Google Drive: the CSV files and saved weights used below are read from /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install transformers
!pip install tensorflow-addons
!pip install datasets
!pip install --upgrade accelerate

In [None]:
import warnings
warnings.filterwarnings("ignore")

import numpy as np 
import pandas as pd 
import os
import ast
import spacy
import random
import itertools
import matplotlib.pyplot as plt
from typing import List, Tuple

import tensorflow as tf
import tensorflow_addons as tfa

from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from transformers import AutoTokenizer, AutoConfig, TFAutoModel

from transformers import DataCollatorForLanguageModeling
from transformers import DebertaTokenizer, DebertaForMaskedLM
from transformers import Trainer, TrainingArguments
from datasets import Dataset

# Load the dataset and hold out the TEST split with a fixed seed

In [None]:
# Read the annotation table; 'annotation' and 'location' are stored as
# stringified Python literals, so parse them back into objects.
train = pd.read_csv('/content/drive/MyDrive/TAR - projekt/TAR-dataset/train.csv')
for literal_col in ('annotation', 'location'):
    train[literal_col] = train[literal_col].apply(ast.literal_eval)
print(f"train.shape: {train.shape}")

features = pd.read_csv('/content/drive/MyDrive/TAR - projekt/TAR-dataset/features.csv')
print(f"features.shape: {features.shape}")

patient_notes = pd.read_csv('/content/drive/MyDrive/TAR - projekt/TAR-dataset/patient_notes.csv')
print(f"patient_notes.shape: {patient_notes.shape}")

# Left-join the feature table and the patient-note table onto every annotation row.
train = (
    train
    .merge(features, on=['feature_num', 'case_num'], how='left')
    .merge(patient_notes, on=['pn_num', 'case_num'], how='left')
)
# Number of entries in each parsed annotation list.
train['annotation_length'] = train['annotation'].apply(len)
print(f"train.shape: {train.shape}")

train.shape: (14300, 6)
features.shape: (143, 3)
patient_notes.shape: (42146, 3)
train.shape: (14300, 9)


In [None]:
seed=42
# Keep only the columns consumed by create_data, then hold out 15% as the test split.
# NOTE: `train` is overwritten here, so re-running this cell alone re-splits the
# already reduced frame; re-run the loading cell first.
model_columns = ['pn_history', 'pn_num', 'feature_text','annotation_length', 'location']
train, test = train_test_split(train[model_columns], test_size=0.15, random_state=seed)

# Functions for data processing

In [None]:
# Sequence length (in tokens) that every tokenizer call below pads to; also the
# shape of the model's Input layers.
MAX_LEN = 512

# These functions are from:
# https://www.kaggle.com/yasufuminakama/nbme-deberta-base-baseline-train
# https://www.kaggle.com/code/ammarnassanalhajali/nbme-fine-tuning-deberta-tensorflow

def prepare_location(locations: List[str]) -> List[Tuple[int, int]]:
    """
    Parse location strings into a flat list of (start, end) character spans.

    Each element of `locations` holds one or more spans separated by ';',
    and each span is written as two whitespace-separated integers, e.g.
    ``["1 5;10 15", "20 25"]`` -> ``[(1, 5), (10, 15), (20, 25)]``.
    Blank fragments (such as the one left by a trailing ';') are skipped.

    Args:
        locations: iterable of location strings for one row.

    Returns:
        List of ``(start, end)`` integer tuples in input order.

    Raises:
        ValueError: if a bound is not an integer literal.
        IndexError: if a non-blank fragment contains fewer than two numbers.
    """
    location_tuple_list = []
    for location in locations:
        for fragment in location.split(';'):
            bounds = fragment.split()
            if not bounds:
                # Nothing between two separators; there is no span to record.
                continue
            start, end = int(bounds[0]), int(bounds[1])
            location_tuple_list.append((start, end))

    return location_tuple_list

def prepare_input(pn_history: str, feature_text: str) -> Tuple[np.ndarray, np.ndarray]:
    """
    Tokenize a (patient note, feature text) pair with the module-level
    `tokenizer` and return fixed-length model inputs.

    Args:
        pn_history: patient note text (first sequence of the pair).
        feature_text: feature description (second sequence of the pair).

    Returns:
        Tuple ``(input_ids, attention_mask)`` of numpy arrays, each of
        length MAX_LEN.
    """
    tokens = tokenizer(
        pn_history,
        feature_text,
        max_length=MAX_LEN,
        padding="max_length",
        # Padding alone does not shorten over-long inputs. Without truncation a
        # long note would yield more than MAX_LEN ids and break the stacking of
        # samples into one array. Only the note is cut; the feature text is kept.
        truncation="only_first",
        add_special_tokens=True,
    )

    return np.array(tokens['input_ids']), np.array(tokens["attention_mask"])
    
def prepare_labels(pn_history, annotation_length, location_list):
    """
    Build the per-token label vector for one patient note: zeros (no entity)
    with ones over the tokens covered by the annotated character spans.

    Args:
        pn_history: patient note text; tokenized on its own with the
            module-level `tokenizer` to obtain character offsets.
        annotation_length: number of annotations for this row; 0 yields an
            all-zero vector and `location_list` is not parsed.
        location_list: location strings accepted by `prepare_location`.

    Returns:
        1-D numpy float array with one entry per token of the padded
        (and, if necessary, truncated) note.
    """
    tokenized = tokenizer(
        pn_history,
        add_special_tokens=True,
        max_length=MAX_LEN,
        padding="max_length",
        # Keep the label vector at MAX_LEN even for over-long notes; padding
        # alone never shortens a sequence.
        truncation=True,
        return_offsets_mapping=True
    )
    offset_mapping = tokenized["offset_mapping"]
    label = np.zeros(len(offset_mapping))
    if annotation_length != 0:
        for start, end in prepare_location(location_list):
            start_idx, end_idx = -1, -1
            for idx, (tok_start, tok_end) in enumerate(offset_mapping):
                # One position before the first token that begins after `start`.
                if start_idx == -1 and start < tok_start:
                    start_idx = idx - 1
                # One position past the first token that ends at or after `end`.
                if end_idx == -1 and end <= tok_end:
                    end_idx = idx + 1
                if start_idx != -1 and end_idx != -1:
                    # Both bounds are fixed; neither condition can fire again.
                    break
            if start_idx == -1:
                # No token begins after `start`: fall back to end_idx, which
                # makes the slice below empty.
                start_idx = end_idx
            if start_idx != -1 and end_idx != -1:
                label[start_idx:end_idx] = 1

    return label

def create_data(dataframe: pd.DataFrame,train=True):
    """
    Tokenize every row of `dataframe` and, when `train` is true, build the
    matching label vectors.

    Returns a tuple ``(inputs, labels)`` where `inputs` is a dict with the
    keys "input_ids" and "attention_mask" (one array per row each) and
    `labels` is a list of label vectors (empty when `train` is false).
    """
    notes = dataframe["pn_history"].values        # all patient notes as an array
    feature_texts = dataframe["feature_text"].values  # all feature texts as an array
    if train:
        # Label construction additionally needs the annotation count and the spans.
        annotation_lengths = dataframe['annotation_length'].values
        locations = dataframe['location'].values

    input_ids, attention_mask, labels = [], [], []
    for row, (note, feature) in enumerate(zip(notes, feature_texts)):
        ids, mask = prepare_input(note, feature)
        input_ids.append(ids)
        attention_mask.append(mask)
        if train:
            labels.append(prepare_labels(note, annotation_lengths[row], locations[row]))

    return {"input_ids":input_ids,"attention_mask":attention_mask}, labels

# **Metrics**

In [None]:
class F1Score(tf.keras.metrics.Metric):
    """
    Keras metric that delegates to a micro-averaged ``tfa.metrics.F1Score``
    (threshold 0.5). Targets and predictions are reshaped to
    ``(-1, MAX_LEN)`` before being handed to the wrapped metric;
    ``sample_weight`` is accepted but not used.
    """

    def __init__(self, name='f1', **kwargs):
        super().__init__(name=name, **kwargs)
        self.f1 = tfa.metrics.F1Score(num_classes=2, average='micro', threshold=0.50)

    def update_state(self, y_true, y_pred, sample_weight=None):
        flat_shape = (-1, MAX_LEN)
        self.f1.update_state(
            tf.reshape(y_true, flat_shape),
            tf.reshape(y_pred, flat_shape),
        )

    def reset_state(self):
        self.f1.reset_state()

    def result(self):
        return self.f1.result()

class Precision(tf.keras.metrics.Metric):
    """
    Keras metric that delegates to ``tf.keras.metrics.Precision``. Targets
    and predictions are reshaped to ``(-1, MAX_LEN)`` before being handed to
    the wrapped metric; ``sample_weight`` is accepted but not used.
    """

    def __init__(self, name='precision', **kwargs):
        super().__init__(name=name, **kwargs)
        self.precision = tf.keras.metrics.Precision()

    def update_state(self, y_true, y_pred, sample_weight=None):
        flat_shape = (-1, MAX_LEN)
        self.precision.update_state(
            tf.reshape(y_true, flat_shape),
            tf.reshape(y_pred, flat_shape),
        )

    def reset_state(self):
        self.precision.reset_state()

    def result(self):
        return self.precision.result()

class Recall(tf.keras.metrics.Metric):
    """
    Keras metric that delegates to ``tf.keras.metrics.Recall``. Targets and
    predictions are reshaped to ``(-1, MAX_LEN)`` before being handed to the
    wrapped metric; ``sample_weight`` is accepted but not used.
    """

    def __init__(self, name='recall', **kwargs):
        super().__init__(name=name, **kwargs)
        self.recall = tf.keras.metrics.Recall()

    def update_state(self, y_true, y_pred, sample_weight=None):
        flat_shape = (-1, MAX_LEN)
        self.recall.update_state(
            tf.reshape(y_true, flat_shape),
            tf.reshape(y_pred, flat_shape),
        )

    def reset_state(self):
        self.recall.reset_state()

    def result(self):
        return self.recall.result()

# 1. RoBERTa-base


In [None]:
# Rebind the module-level tokenizer/config; prepare_input and prepare_labels read `tokenizer` as a global.
tokenizer = AutoTokenizer.from_pretrained('roberta-base')
config = AutoConfig.from_pretrained('roberta-base')

def create_model() -> tf.keras.Model:
    """
    Build the token-level classifier: the 'roberta-base' backbone followed by
    dropout (0.2) and a single-unit sigmoid Dense layer.

    Returns:
        A Keras model taking ``[input_tokens, attention_mask]`` as inputs.
    """
    input_tokens = tf.keras.layers.Input(shape=(MAX_LEN,), dtype=tf.int32) # 1-D input layer of length MAX_LEN for the token ids
    attention_mask = tf.keras.layers.Input(shape=(MAX_LEN,), dtype=tf.int32) # 1-D input layer for the attention mask

    backbone = TFAutoModel.from_pretrained('roberta-base', config=config)

    # First element of the backbone output (presumably the per-token hidden states - TODO confirm).
    out = backbone(input_tokens, attention_mask=attention_mask)[0]
    out = tf.keras.layers.Dropout(0.2)(out)
    out = tf.keras.layers.Dense(1, activation='sigmoid')(out)

    return tf.keras.Model(inputs=[input_tokens, attention_mask], outputs=out)

model = create_model()
# Replace the freshly initialised weights with the saved 'tuned.h5' weights from Drive.
model.load_weights('/content/drive/MyDrive/TAR - projekt/savings/roBERTa_tuned/tuned.h5')

Downloading (…)olve/main/merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

Downloading (…)/main/tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

Downloading tf_model.h5:   0%|          | 0.00/657M [00:00<?, ?B/s]

Some layers from the model checkpoint at roberta-base were not used when initializing TFRobertaModel: ['lm_head']
- This IS expected if you are initializing TFRobertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFRobertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
All the layers of TFRobertaModel were initialized from the model checkpoint at roberta-base.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFRobertaModel for predictions without further training.


In [None]:
# Tokenize the test split with the tokenizer bound in this section (create_data relies on the global `tokenizer`).
test_data, test_labels = create_data(test, train=True)

In [None]:
# Run the model on the whole test split.
preds_test = model.predict((np.asarray(test_data['input_ids']),
                            np.asarray(test_data['attention_mask']),))

# Threshold the sigmoid outputs at 0.5 and lay them out as one row of MAX_LEN
# entries per sample. The row count is inferred (-1) instead of hard-coded, so
# the cell keeps working if the split size or MAX_LEN changes.
preds_test_resh = np.reshape((preds_test > 0.50).astype(int), (-1, MAX_LEN))



In [None]:
# Instantiate fresh metric objects, feed each the same labels/predictions once,
# then read out the scalar results.
f1 = tfa.metrics.F1Score(num_classes=2, average='micro', threshold=0.50)
precision = tf.keras.metrics.Precision()
recall = tf.keras.metrics.Recall()

for metric in (f1, precision, recall):
    metric.update_state(test_labels, preds_test_resh)

result_f1 = f1.result()
result_prec = precision.result()
result_recall = recall.result()

In [None]:
# Use an explicit ':.4f' format spec: round() on the metric value still printed
# floating-point noise (e.g. 0.85589998960495) instead of four decimals.
print(f"F1 score: {result_f1.numpy():.4f}")
print(f"Precision score: {result_prec.numpy():.4f}")
print(f"Recall score: {result_recall.numpy():.4f}")

F1 score: 0.85589998960495
Precision score: 0.8493000268936157
Recall score: 0.8626999855041504


# 2. BioClinical-BERT

In [None]:
# Rebind the module-level tokenizer/config; prepare_input and prepare_labels read `tokenizer` as a global.
tokenizer = AutoTokenizer.from_pretrained("emilyalsentzer/Bio_ClinicalBERT")
config = AutoConfig.from_pretrained("emilyalsentzer/Bio_ClinicalBERT")

def create_model() -> tf.keras.Model:
    """
    Build the token-level classifier: the "emilyalsentzer/Bio_ClinicalBERT"
    backbone followed by dropout (0.2) and a single-unit sigmoid Dense layer.

    Returns:
        A Keras model taking ``[input_tokens, attention_mask]`` as inputs.
    """
    input_tokens = tf.keras.layers.Input(shape=(MAX_LEN,), dtype=tf.int32) # 1-D input layer of length MAX_LEN for the token ids
    attention_mask = tf.keras.layers.Input(shape=(MAX_LEN,), dtype=tf.int32) # 1-D input layer for the attention mask

    backbone = TFAutoModel.from_pretrained("emilyalsentzer/Bio_ClinicalBERT", config=config)

    # First element of the backbone output (presumably the per-token hidden states - TODO confirm).
    out = backbone(input_tokens, attention_mask=attention_mask)[0]
    out = tf.keras.layers.Dropout(0.2)(out)
    out = tf.keras.layers.Dense(1, activation='sigmoid')(out)

    return tf.keras.Model(inputs=[input_tokens, attention_mask], outputs=out)

model = create_model()
# Replace the freshly initialised weights with the saved 'tuned.h5' weights from Drive.
model.load_weights('/content/drive/MyDrive/TAR - projekt/savings/BioClinicalBERT/tuned.h5')

Some layers from the model checkpoint at emilyalsentzer/Bio_ClinicalBERT were not used when initializing TFBertModel: ['nsp___cls', 'mlm___cls']
- This IS expected if you are initializing TFBertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFBertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
All the layers of TFBertModel were initialized from the model checkpoint at emilyalsentzer/Bio_ClinicalBERT.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFBertModel for predictions without further training.


In [None]:
# Re-tokenize the test split: the global `tokenizer` used by create_data was rebound in this section.
test_data, test_labels = create_data(test, train=True)

In [None]:
# Run the model on the whole test split.
preds_test = model.predict((np.asarray(test_data['input_ids']),
                            np.asarray(test_data['attention_mask']),))

# Threshold the sigmoid outputs at 0.5 and lay them out as one row of MAX_LEN
# entries per sample. The row count is inferred (-1) instead of hard-coded, so
# the cell keeps working if the split size or MAX_LEN changes.
preds_test_resh = np.reshape((preds_test > 0.50).astype(int), (-1, MAX_LEN))



In [None]:
# Instantiate fresh metric objects, feed each the same labels/predictions once,
# then read out the scalar results.
f1 = tfa.metrics.F1Score(num_classes=2, average='micro', threshold=0.50)
precision = tf.keras.metrics.Precision()
recall = tf.keras.metrics.Recall()

for metric in (f1, precision, recall):
    metric.update_state(test_labels, preds_test_resh)

result_f1 = f1.result()
result_prec = precision.result()
result_recall = recall.result()

In [None]:
# Use an explicit ':.4f' format spec: round() on the metric value still printed
# floating-point noise (e.g. 0.8406999707221985) instead of four decimals.
print(f"F1 score: {result_f1.numpy():.4f}")
print(f"Precision score: {result_prec.numpy():.4f}")
print(f"Recall score: {result_recall.numpy():.4f}")

F1 score: 0.8406999707221985
Precision score: 0.8629000186920166
Recall score: 0.8197000026702881



# 3. DeBERTa - no additional pretraining (stock `microsoft/deberta-base` backbone)

In [None]:
# Rebind the module-level tokenizer/config; prepare_input and prepare_labels read `tokenizer` as a global.
tokenizer = AutoTokenizer.from_pretrained("microsoft/deberta-base")
config = AutoConfig.from_pretrained("microsoft/deberta-base")

def create_model() -> tf.keras.Model:
    """
    Build the token-level classifier: the "microsoft/deberta-base" backbone
    followed by dropout (0.2) and a single-unit sigmoid Dense layer.

    Returns:
        A Keras model taking ``[input_tokens, attention_mask]`` as inputs.
    """
    input_tokens = tf.keras.layers.Input(shape=(MAX_LEN,), dtype=tf.int32) # 1-D input layer of length MAX_LEN for the token ids
    attention_mask = tf.keras.layers.Input(shape=(MAX_LEN,), dtype=tf.int32) # 1-D input layer for the attention mask

    backbone = TFAutoModel.from_pretrained("microsoft/deberta-base", config=config)

    # First element of the backbone output (presumably the per-token hidden states - TODO confirm).
    out = backbone(input_tokens, attention_mask=attention_mask)[0]
    out = tf.keras.layers.Dropout(0.2)(out)
    out = tf.keras.layers.Dense(1, activation='sigmoid')(out)

    return tf.keras.Model(inputs=[input_tokens, attention_mask], outputs=out)

model = create_model()
# Replace the freshly initialised weights with the saved 'tuned.h5' weights from Drive.
model.load_weights('/content/drive/MyDrive/TAR - projekt/savings/deBERTA_noPretrained_remake/tuned.h5')

Downloading tf_model.h5:   0%|          | 0.00/555M [00:00<?, ?B/s]

All model checkpoint layers were used when initializing TFDebertaModel.

All the layers of TFDebertaModel were initialized from the model checkpoint at microsoft/deberta-base.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFDebertaModel for predictions without further training.


In [None]:
# Re-tokenize the test split: the global `tokenizer` used by create_data was rebound in this section.
test_data, test_labels = create_data(test, train=True)

In [None]:
# Run the model on the whole test split.
preds_test = model.predict((np.asarray(test_data['input_ids']),
                            np.asarray(test_data['attention_mask']),))

# Threshold the sigmoid outputs at 0.5 and lay them out as one row of MAX_LEN
# entries per sample. The row count is inferred (-1) instead of hard-coded, so
# the cell keeps working if the split size or MAX_LEN changes.
preds_test_resh = np.reshape((preds_test > 0.50).astype(int), (-1, MAX_LEN))



In [None]:
# Instantiate fresh metric objects, feed each the same labels/predictions once,
# then read out the scalar results.
f1 = tfa.metrics.F1Score(num_classes=2, average='micro', threshold=0.50)
precision = tf.keras.metrics.Precision()
recall = tf.keras.metrics.Recall()

for metric in (f1, precision, recall):
    metric.update_state(test_labels, preds_test_resh)

result_f1 = f1.result()
result_prec = precision.result()
result_recall = recall.result()

In [None]:
# Use an explicit ':.4f' format spec: round() on the metric value still printed
# floating-point noise (e.g. 0.864799976348877) instead of four decimals.
print(f"F1 score: {result_f1.numpy():.4f}")
print(f"Precision score: {result_prec.numpy():.4f}")
print(f"Recall score: {result_recall.numpy():.4f}")

F1 score: 0.864799976348877
Precision score: 0.8741000294685364
Recall score: 0.8557999730110168


# 4. DeBERTa - additionally pretrained backbone (checkpoint loaded from Drive)

In [None]:
# The tokenizer still comes from the hub checkpoint, while the config (and the backbone below)
# are loaded from the converted checkpoint directory on Drive.
tokenizer = AutoTokenizer.from_pretrained("microsoft/deberta-base")
config = AutoConfig.from_pretrained("/content/drive/MyDrive/TAR - projekt/savings/deberta-preTrain-converted")

def create_model() -> tf.keras.Model:
    """
    Build the token-level classifier: the DeBERTa backbone loaded from the
    'deberta-preTrain-converted' directory, followed by dropout (0.2) and a
    single-unit sigmoid Dense layer.

    Returns:
        A Keras model taking ``[input_tokens, attention_mask]`` as inputs.
    """
    input_tokens = tf.keras.layers.Input(shape=(MAX_LEN,), dtype=tf.int32) # 1-D input layer of length MAX_LEN for the token ids
    attention_mask = tf.keras.layers.Input(shape=(MAX_LEN,), dtype=tf.int32) # 1-D input layer for the attention mask

    backbone = TFAutoModel.from_pretrained("/content/drive/MyDrive/TAR - projekt/savings/deberta-preTrain-converted", config=config)

    # First element of the backbone output (presumably the per-token hidden states - TODO confirm).
    out = backbone(input_tokens, attention_mask=attention_mask)[0]
    out = tf.keras.layers.Dropout(0.2)(out)
    out = tf.keras.layers.Dense(1, activation='sigmoid')(out)

    return tf.keras.Model(inputs=[input_tokens, attention_mask], outputs=out)

model = create_model()
# Replace the freshly initialised weights with the saved 'tuned.h5' weights from Drive.
model.load_weights('/content/drive/MyDrive/TAR - projekt/savings/deBERTA_final/tuned.h5')

Some layers from the model checkpoint at /content/drive/MyDrive/TAR - projekt/savings/deberta-preTrain-converted were not used when initializing TFDebertaModel: ['cls']
- This IS expected if you are initializing TFDebertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFDebertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
All the layers of TFDebertaModel were initialized from the model checkpoint at /content/drive/MyDrive/TAR - projekt/savings/deberta-preTrain-converted.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFDebertaModel for predictions without further training.


In [None]:
# Tokenize the test split with the tokenizer bound in this section (create_data relies on the global `tokenizer`).
test_data, test_labels = create_data(test, train=True)

In [None]:
# Run the model on the whole test split.
preds_test = model.predict((np.asarray(test_data['input_ids']),
                            np.asarray(test_data['attention_mask']),))

# Threshold the sigmoid outputs at 0.5 and lay them out as one row of MAX_LEN
# entries per sample. The row count is inferred (-1) instead of hard-coded, so
# the cell keeps working if the split size or MAX_LEN changes.
preds_test_resh = np.reshape((preds_test > 0.50).astype(int), (-1, MAX_LEN))



In [None]:
# Instantiate fresh metric objects, feed each the same labels/predictions once,
# then read out the scalar results.
f1 = tfa.metrics.F1Score(num_classes=2, average='micro', threshold=0.50)
precision = tf.keras.metrics.Precision()
recall = tf.keras.metrics.Recall()

for metric in (f1, precision, recall):
    metric.update_state(test_labels, preds_test_resh)

result_f1 = f1.result()
result_prec = precision.result()
result_recall = recall.result()

In [None]:
print("PreTrained DeBERTa model\n\n")
# Use an explicit ':.4f' format spec: round() on the metric value still printed
# floating-point noise (e.g. 0.8770999908447266) instead of four decimals.
print(f"F1 score: {result_f1.numpy():.4f}")
print(f"Precision score: {result_prec.numpy():.4f}")
print(f"Recall score: {result_recall.numpy():.4f}")

PreTrained DeBERTa model


F1 score: 0.8770999908447266
Precision score: 0.8574000000953674
Recall score: 0.8978000283241272
