<a href="https://colab.research.google.com/github/stellaevat/ontology-mapping/blob/main/colabs/custom_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pronto transformers[torch] datasets evaluate \
&& pip install accelerate -U \
&& wget -O doid.obo https://gla-my.sharepoint.com/:u:/g/personal/2526934t_student_gla_ac_uk/EfUC_RdrfZdOsOrtmNATjuoBPDaIkSTUMyxJXyO2KKC6yw?download=1 \
&& wget -O ncit.obo https://gla-my.sharepoint.com/:u:/g/personal/2526934t_student_gla_ac_uk/ETmaJIC0fAlItdsp8WQxS_wBzKN_6x08EZrtsOxVnbzvSg?download=1

In [2]:
import os
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

In [3]:
import random
import pronto
import evaluate
import torch
import torch.nn.functional as F
import numpy as np
from tqdm import tqdm
from sklearn.model_selection import train_test_split
from datasets import Dataset, DatasetDict
from transformers import RobertaModel, AutoTokenizer, AutoModel, BioGptTokenizer, BioGptModel, AutoModelForSequenceClassification, DataCollatorWithPadding, TrainingArguments, Trainer
from transformers.modeling_outputs import SequenceClassifierOutput
from scipy.special import softmax

In [4]:
ncit = pronto.Ontology("ncit.obo")
doid = pronto.Ontology("doid.obo")

In [5]:
# Get subsumptions from CSV file to a dictionary

def get_mappings_from_file(filename):
  mappings = {}
  with open(filename) as f:
    for line in f:
      source_id, target_id = line.strip().split(',')
      mappings[source_id] = target_id
  return mappings

In [6]:
equiv_doid2ncit = get_mappings_from_file("equiv_doid2ncit.csv")
equiv_ncit2doid = get_mappings_from_file("equiv_ncit2doid.csv")
subs_doid2ncit = get_mappings_from_file("subs_doid2ncit.csv")
subs_ncit2doid = get_mappings_from_file("subs_ncit2doid.csv")
neg_subs_doid2ncit = get_mappings_from_file("neg_subs_doid2ncit.csv")
neg_subs_ncit2doid = get_mappings_from_file("neg_subs_ncit2doid.csv")

# Convert relations to sentences

> Currently considering parents, children & siblings for conceptual reasons, but could also take 'n-hop' appraoch, e.g. 1-hop only with parents and children, or 2-hop to include grandparents, grandchildren and siblings.

> How do I incorporate the desired mapping for training? Should I incorporate both all this AND target info, or too much? Could be SELF + desired relatives instead, or SELF + PARENT + DESIRED PARENT, etc.

In [7]:
entity_markers = ["[SUB]", "[/SUB]", "[SUP]", "[/SUP]"]
sep_token = "[SEP]"
cls_token = "[CLS]"

In [8]:
# Create sentence from the given entity, containing its direct parents & siblings

def get_sentence(entity_id, onto):
  sub_in, sub_out, sup_in, sup_out = entity_markers

  subsumer = onto.get_term(entity_id)
  supersumers = list(subsumer.superclasses(distance=1, with_self=False))

  sentence = [sub_in, subsumer.name, sub_out]
  for supersumer in supersumers:
    sentence.extend([sup_in, supersumer.name, sup_out])

  return "".join(sentence)

In [9]:
# Create sentence from the given source entity, containing its mapping's parent & siblings

def get_combined_sentence(source_id, target_id, source_onto, target_onto):
  sub_in, sub_out, sup_in, sup_out = entity_markers

  if source_id not in source_onto.terms() or target_id not in target_onto.terms():
    return

  subsumer = source_onto.get_term(source_id)
  equivalent = target_onto.get_term(target_id)
  parents = list(equivalent.superclasses(distance=1, with_self=False))

  if len(parents) != 1:
    return
  supersumer = parents[0]

  #sentence = "".join([sub_in, subsumer.name, sub_out, sup_in, supersumer.name, sup_out])
  sentence = (sep_token + cls_token).join([subsumer.name, supersumer.name])
  return sentence

In [10]:
source_id = "DOID:0014667"
tokenizer = AutoTokenizer.from_pretrained("microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract")
# tokenizer.add_tokens(entity_markers)

sentence = get_combined_sentence(source_id, subs_doid2ncit[source_id], doid, ncit)
tokenized = tokenizer(sentence)

print(sentence)
tokenizer.convert_ids_to_tokens(tokenized['input_ids'])

disease of metabolism[SEP][CLS]Non-Neoplastic Disorder


['[CLS]',
 'disease',
 'of',
 'metabolism',
 '[SEP]',
 '[CLS]',
 'non',
 '-',
 'neoplastic',
 'disorder',
 '[SEP]']

# Train end-to-end BERT model

In [11]:
def generate_labelled_samples(subs, negsubs, source_onto, target_onto):
  random.seed(3)
  samples = []
  labels = []

  pairs = list(subs.items()) + list(negsubs.items())
  zeros_and_ones = [0] * len(negsubs) + [1] * len(subs)
  labelled_pairs = list(zip(zeros_and_ones, pairs))
  random.shuffle(labelled_pairs)

  for label, (source_id, target_id) in tqdm(labelled_pairs):
    sample_sentence = get_combined_sentence(source_id, target_id, source_onto, target_onto)
    if sample_sentence:
      samples.append(sample_sentence)
      labels.append(label)

  print()
  return samples, labels

In [12]:
samples, labels = generate_labelled_samples(subs_doid2ncit, neg_subs_doid2ncit, doid, ncit)
print("Samples: ", len(samples))
print("Labels: ", len(labels))

100%|██████████| 3766/3766 [00:00<00:00, 17238.94it/s]


Samples:  2775
Labels:  2775





In [13]:
X = np.array(samples)
y = np.array(labels)
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size=0.2, random_state=3)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.25, random_state=3)

In [14]:
id2label = {0:"not_subsumption", 1:"subsumption"}
label2id = {label:ind for (ind, label) in id2label.items()}

dataset_train = Dataset.from_dict({'sample':X_train, 'label':y_train})
dataset_val = Dataset.from_dict({'sample':X_val, 'label':y_val})
dataset_test = Dataset.from_dict({'sample':X_test, 'label':y_test})
dataset = DatasetDict({'train':dataset_train,'val':dataset_val,'test':dataset_test})

In [15]:
def compute_metrics(eval_pred):
  accuracy = evaluate.load("accuracy")
  macro_metrics = [evaluate.load("precision"), evaluate.load('recall'), evaluate.load('f1')]

  predictions, labels = eval_pred
  if predictions.shape[1] == 1:
    predictions = np.where(predictions.squeeze() >= 0, 1, 0)
  else:
    predictions = np.argmax(predictions, axis=1)

  metric_dict = accuracy.compute(predictions=predictions, references=labels)
  for metric in macro_metrics:
    metric_dict.update(metric.compute(predictions=predictions, references=labels, average='macro'))
  return metric_dict

In [16]:
class BiEncoderModel(torch.nn.Module):
  def __init__(self, model_name, num_labels, id2label=None, label2id=None, token_embeddings_size=None, hidden_layer=-1):
    super().__init__()
    self.base_model = AutoModel.from_pretrained(model_name)
    if token_embeddings_size:
      self.base_model.resize_token_embeddings(token_embeddings_size)
    self.base_model.config.pad_token_id = tokenizer.pad_token_id

    self.num_labels = num_labels
    self.hidden_layer = hidden_layer

    self.linear = torch.nn.Linear(768, num_labels)
    self.dropout = torch.nn.Dropout(0.1)

  def forward(self, input_ids=None, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, labels=None):

    outputs = self.base_model(
      input_ids,
      attention_mask=attention_mask,
      token_type_ids=token_type_ids,
      position_ids=position_ids,
      head_mask=head_mask,
      inputs_embeds=inputs_embeds,
    )

    pooled_output = outputs[1]
    pooled_output = self.dropout(pooled_output)
    logits = self.linear(pooled_output)

    if labels is not None:
      if self.num_labels == 1:
        loss_fct = torch.nn.BCEWithLogitsLoss()
        loss = loss_fct(logits.view(-1), labels.view(-1).float())
      else:
        loss_fct = torch.nn.CrossEntropyLoss()
        loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

    return SequenceClassifierOutput(loss=loss, logits=logits)

In [17]:
def prepare_model(dataset, id2label, label2id, entity_markers=entity_markers, learning_rate=1e-4, epochs=5, batch_size=64):
  pretrained = "microsoft/BiomedNLP-PubMedBERT-base-uncased-abstract"

  tokenizer = AutoTokenizer.from_pretrained(pretrained)
  # tokenizer.add_tokens(entity_markers)
  tokenizer.add_special_tokens({'pad_token': '[PAD]'})

  preprocess_tokenize = lambda examples: tokenizer(examples["sample"], padding="longest")
  tokenized_data = dataset.map(preprocess_tokenize, batched=True)
  data_collator = DataCollatorWithPadding(tokenizer=tokenizer, return_tensors="pt")
  model = BiEncoderModel(pretrained, num_labels=1, id2label=id2label, label2id=label2id, token_embeddings_size=len(tokenizer))

  training_args = TrainingArguments(
    output_dir="testing",
    evaluation_strategy="epoch",
    learning_rate=learning_rate,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    num_train_epochs=epochs,
    logging_steps=1
  )

  trainer = Trainer(
      model=model,
      args=training_args,
      train_dataset=tokenized_data['train'],
      eval_dataset=tokenized_data['val'],
      tokenizer=tokenizer,
      data_collator=data_collator,
      compute_metrics=compute_metrics,
  )

  return trainer, tokenized_data

In [18]:
trainer, tokenized_data = prepare_model(dataset, id2label, label2id)
trainer.train()

Map:   0%|          | 0/1665 [00:00<?, ? examples/s]

Map:   0%|          | 0/555 [00:00<?, ? examples/s]

Map:   0%|          | 0/555 [00:00<?, ? examples/s]

You're using a BertTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


Epoch,Training Loss,Validation Loss,Accuracy,Precision,Recall,F1
1,0.7852,0.14398,0.962162,0.968436,0.952475,0.959349
2,0.018,0.135625,0.967568,0.961348,0.972799,0.966165
3,0.0029,0.083461,0.978378,0.976368,0.978029,0.977184
4,0.0025,0.098864,0.981982,0.979433,0.982724,0.981019
5,0.0026,0.104971,0.976577,0.974847,0.975682,0.975261


TrainOutput(global_step=135, training_loss=0.10684824447423495, metrics={'train_runtime': 85.7427, 'train_samples_per_second': 97.093, 'train_steps_per_second': 1.574, 'total_flos': 0.0, 'train_loss': 0.10684824447423495, 'epoch': 5.0})

In [19]:
predictions, label_ids, metrics = trainer.predict(tokenized_data['val'])
labels_predicted = [id2label[prediction] for prediction in np.argmax(predictions, axis=1)]
for metric in metrics.items():
  print(metric)

('test_loss', 0.10497114807367325)
('test_accuracy', 0.9765765765765766)
('test_precision', 0.9748472058541399)
('test_recall', 0.9756815748290915)
('test_f1', 0.9752606800827044)
('test_runtime', 4.814)
('test_samples_per_second', 115.289)
('test_steps_per_second', 1.87)
