### Installing packages

In [None]:
!pip install lightning transformers sentencepiece protobuf evaluate seqeval tqdm

### Importing packages

In [2]:
import re
import json
import torch
from tqdm import tqdm
import evaluate
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl
from transformers import AdamW, MT5ForConditionalGeneration, AutoModelForSeq2SeqLM, AutoModel, AutoTokenizer
from sklearn.model_selection import train_test_split
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning import Trainer
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score

### Pytorch dataset class
Pytorch dataset class to manage and format our custom dataset

In [3]:
class NERDataset(Dataset):
  """Seq2seq NER dataset built from annotated paper titles.

  Every record of ``data_json`` is read through its ``'title'`` entry and its
  ``'annotations'`` list, whose items provide ``'label'`` and ``'text'``.
  The title is the model input and the target is the annotations serialised
  as ``Tag:Value;Tag:Value...``.
  """

  def __init__(self, data_json, tokenizer, max_len=512):
    """Keep the records and configure the tokenizer length limit.

    Args:
      data_json: Indexable collection of annotated records.
      tokenizer: Callable tokenizer; it is called with ``return_tensors="pt"``
        and must return ``'input_ids'`` and ``'attention_mask'``.
      max_len: Length that source and target are padded/truncated to.
    """
    self.data_json = data_json
    self.tokenizer = tokenizer
    self.max_len = max_len
    #Note: these two assignments modify the tokenizer object owned by the caller
    self.tokenizer.max_length = max_len
    self.tokenizer.model_max_length = max_len

  def __len__(self):
    """Return the number of records."""
    return len(self.data_json)

  def _encode(self, text):
    """Tokenize ``text`` padded/truncated to ``max_len`` as PyTorch tensors."""
    return self.tokenizer(text, max_length=self.max_len, padding="max_length", truncation=True,
                          return_attention_mask=True, add_special_tokens=True, return_tensors="pt")

  def __getitem__(self, index):
    """Build one training sample.

    Returns:
      dict with the raw ``text`` and ``label`` strings plus the flattened
      ``input_ids``, ``attention_mask`` and ``label_encoding`` tensors.
    """
    record = self.data_json[index]

    #Extracting the paper title
    text = record['title']

    #Label is formatted as: [Tag:Value;Tag:Value...]
    label = ';'.join([':'.join([annotation['label'], annotation['text']]) for annotation in record['annotations']])

    #Tokenizing the title and the formatted label with model's predefined tokenizer
    source_encoding = self._encode(text)
    target_encoding = self._encode(label)

    #The padding id is taken from the tokenizer instead of assuming it is 0;
    #0 stays as fallback for tokenizers that do not expose pad_token_id
    pad_token_id = getattr(self.tokenizer, 'pad_token_id', None)
    if pad_token_id is None:
      pad_token_id = 0

    #Replacing padded positions with -100 so that pytorch can ignore them while training
    target_ids = target_encoding['input_ids']
    label_encoding = target_ids.masked_fill(target_ids == pad_token_id, -100)

    return dict(
        text = text,
        label = label,
        input_ids = source_encoding['input_ids'].flatten(),
        attention_mask = source_encoding['attention_mask'].flatten(),
        label_encoding = label_encoding.flatten()
    )

In [4]:
class NERDatasetGPT(Dataset):
  """Seq2seq NER dataset built from GPT fine-tuning records.

  Every record of ``data_json`` is read through its ``'prompt'`` and
  ``'completion'`` strings. The model input is the text found between triple
  backticks in the prompt; the target is the completion rewritten as
  ``Tag:Value;Tag:Value...``.
  """

  def __init__(self, data_json, tokenizer, max_len=512):
    """Keep the records and configure the tokenizer length limit.

    Args:
      data_json: Indexable collection of prompt/completion records.
      tokenizer: Callable tokenizer; it is called with ``return_tensors="pt"``
        and must return ``'input_ids'`` and ``'attention_mask'``.
      max_len: Length that source and target are padded/truncated to.
    """
    self.data_json = data_json
    self.tokenizer = tokenizer
    self.max_len = max_len
    #Note: these two assignments modify the tokenizer object owned by the caller
    self.tokenizer.max_length = max_len
    self.tokenizer.model_max_length = max_len

  def __len__(self):
    """Return the number of records."""
    return len(self.data_json)

  def _encode(self, text):
    """Tokenize ``text`` padded/truncated to ``max_len`` as PyTorch tensors."""
    return self.tokenizer(text, max_length=self.max_len, padding="max_length", truncation=True,
                          return_attention_mask=True, add_special_tokens=True, return_tensors="pt")

  def __getitem__(self, index):
    """Build one training sample.

    Returns:
      dict with the raw ``text`` and ``label`` strings plus the flattened
      ``input_ids``, ``attention_mask`` and ``label_encoding`` tensors.
    """
    record = self.data_json[index]

    #Extracting the paper title: the first span wrapped in triple backticks
    #(empty string when the prompt has no such span)
    text_group = re.search(r'```(.+?)```', record['prompt'])
    text = text_group.group(1) if text_group else ''

    #Label is formatted as: [Tag:Value;Tag:Value...]
    #The completion is stripped of surrounding spaces, '#', braces and quotes,
    #then "key: value" pairs separated by commas are rewritten as "key:value;..."
    label = re.sub(': ', ':', record['completion'].strip(" #{}'"))
    label = re.sub("'", '', label)
    label = ';'.join([annot.strip() for annot in label.split(',')])

    #Tokenizing the title and the formatted label with model's predefined tokenizer
    source_encoding = self._encode(text)
    target_encoding = self._encode(label)

    #The padding id is taken from the tokenizer instead of assuming it is 0;
    #0 stays as fallback for tokenizers that do not expose pad_token_id
    pad_token_id = getattr(self.tokenizer, 'pad_token_id', None)
    if pad_token_id is None:
      pad_token_id = 0

    #Replacing padded positions with -100 so that pytorch can ignore them while training
    target_ids = target_encoding['input_ids']
    label_encoding = target_ids.masked_fill(target_ids == pad_token_id, -100)

    return dict(
        text = text,
        label = label,
        input_ids = source_encoding['input_ids'].flatten(),
        attention_mask = source_encoding['attention_mask'].flatten(),
        label_encoding = label_encoding.flatten()
    )

### Pytorch Lightning Data Module
A class that encapsulates all the steps needed to process data

In [5]:
class NERDataModule(pl.LightningDataModule):
  """Lightning data module serving the train and test datasets as DataLoaders.

  The test dataset backs both the validation and the test loader.
  """

  def __init__(self, batch_size, train_dataset, test_dataset, num_workers = 2):
    """Store the datasets and the loader settings.

    Args:
      batch_size: Batch size of the training loader.
      train_dataset: Dataset used by ``train_dataloader``.
      test_dataset: Dataset used by ``val_dataloader`` and ``test_dataloader``.
      num_workers: Worker count passed to every DataLoader.
    """
    super().__init__()
    self.train_dataset, self.test_dataset = train_dataset, test_dataset
    self.batch_size = batch_size
    self.num_workers = num_workers
    self.prepare_data_per_node = True

  def _single_sample_loader(self):
    """Loader over the test dataset yielding one sample per batch."""
    return DataLoader(self.test_dataset, batch_size=1, num_workers=self.num_workers)

  def train_dataloader(self):
    """Shuffled loader over the training dataset."""
    loader_options = dict(batch_size=self.batch_size, shuffle=True, num_workers=self.num_workers)
    return DataLoader(self.train_dataset, **loader_options)

  def val_dataloader(self):
    """Validation loader (test dataset, batch size 1)."""
    return self._single_sample_loader()

  def test_dataloader(self):
    """Test loader (test dataset, batch size 1)."""
    return self._single_sample_loader()

### Pytorch Lightning Module
A pytorch lightning encapsulation to manage, train and test our models

In [6]:
class NERModel(pl.LightningModule):
  """Lightning wrapper that trains, validates and tests a pretrained seq2seq model."""

  def __init__(self, model, model_name, learning_rate):
    """Load the pretrained weights.

    Args:
      model: Model class exposing ``from_pretrained``.
      model_name: Checkpoint name passed to ``from_pretrained``.
      learning_rate: Learning rate handed to the optimizer.
    """
    super().__init__()
    self.model = model.from_pretrained(model_name)
    self.learning_rate = learning_rate

  def forward(self, input_ids, attention_mask, labels):
    """Run the wrapped model; the returned output is read through ``.loss`` by the steps."""
    return self.model(input_ids = input_ids, attention_mask = attention_mask, labels = labels)

  def _shared_step(self, batch, log_name):
    """Compute the loss of ``batch`` and log it under ``log_name``."""
    loss = self(batch['input_ids'], batch['attention_mask'], batch['label_encoding']).loss
    self.log(log_name, loss, prog_bar=True, logger=True)
    return loss

  def training_step(self, batch, batch_idx):
    """Return the training loss, logged as 'Training Loss'."""
    return self._shared_step(batch, 'Training Loss')

  def validation_step(self, batch, batch_idx):
    """Return the validation loss, logged as 'Validation Loss'."""
    return self._shared_step(batch, 'Validation Loss')

  def test_step(self, batch, batch_idx):
    """Return the test loss, logged as 'Test Loss'."""
    return self._shared_step(batch, 'Test Loss')

  def configure_optimizers(self):
    """Create the AdamW optimizer over all parameters."""
    #torch's AdamW replaces the deprecated transformers.AdamW; weight_decay=0.0
    #keeps the default of the implementation that was used before
    return torch.optim.AdamW(self.parameters(), lr=self.learning_rate, weight_decay=0.0)

### Function to train the model of given type

In [7]:
def train_model(train_dataset, test_dataset, params):
  """Fine-tune a seq2seq NER model and return it.

  Args:
    train_dataset: Training records; prompt/completion records when
      ``params['gpt_train_data']`` is truthy, title/annotations records otherwise.
    test_dataset: Title/annotations records used for validation and testing.
    params: Settings dict; reads ``model``, ``model_name``, ``gpt_train_data``,
      ``batch_size``, ``num_workers``, ``learning_rate``, ``epochs``,
      ``accumulate_grad_batches`` and, optionally, ``max_length``.

  Returns:
    The trained ``NERModel``.
  """
  #Initializing model specific tokenizer
  tokenizer = AutoTokenizer.from_pretrained(params['model_name'])

  #Using the configured sequence length so training matches the length used at
  #evaluation time; 512 (the dataset default) is kept when the key is absent
  max_len = params.get('max_length', 512)

  #Initializing the train and test dataset
  train_dataset_class = NERDatasetGPT if params['gpt_train_data'] else NERDataset
  NER_train = train_dataset_class(train_dataset, tokenizer, max_len)
  NER_test = NERDataset(test_dataset, tokenizer, max_len)

  #Encapsulating the train and test dataset with data module
  NER_datamodule = NERDataModule(params['batch_size'], NER_train, NER_test, params['num_workers'])

  #Initializing the NER model of specified type with parameters
  model = NERModel(params['model'], params['model_name'], params['learning_rate'])

  #Initializing the pytorch lightning trainer object to train our NER model
  trainer = Trainer(max_epochs=params['epochs'], enable_progress_bar=True, accumulate_grad_batches=params['accumulate_grad_batches'])

  #Training the model on the dataset
  trainer.fit(model, datamodule=NER_datamodule)

  #returning the trained NER model
  return model

### Function to convert annotations into NER's IOB format

In [8]:
def generate_NER_label(input_text, target):
  """Convert a ``Tag:Value;Tag:Value...`` string into word-level IOB labels.

  The title is stripped of ``:``, ``“`` and ``”`` and split on single spaces.
  Every word of an annotation value that occurs in the title is labelled
  ``B-<Tag>`` (first matched word of the annotation) or ``I-<Tag>`` (following
  matched words); all other positions stay ``'O'``.

  Args:
    input_text: The paper title.
    target: Serialised annotations, e.g. ``'Obj:dark matter;Meth:simulation'``.

  Returns:
    A list with one label per title word.

  Note:
    A word is always matched at its first occurrence in the title, so a
    repeated word can only be labelled at its first position.
  """
  #The same cleanup is applied to the title and to the annotation values so
  #that their words compare equal
  cleanup_pattern = r'[:“”]'

  #Splitting the input title on spaces
  title_words = re.sub(cleanup_pattern, '', input_text).split(' ')

  #Initializing a list with 'O'
  target_label = ['O'] * len(title_words)

  #Without a tag separator there is nothing to annotate; a single annotation
  #(no ';') is valid and must still be processed
  if ':' not in target:
    return target_label

  #Iterating over all the annotations
  for tar in target.split(';'):
    #Splitting on the first ':' only, so a value that contains ':' stays whole
    tag, separator, value = tar.partition(':')
    tag = tag.strip()

    #Checking if both tag and value are present
    if not separator or not tag:
      continue

    IOB_prefix = 'B-'

    #Iterating over all words in the annotation value
    for word in re.sub(cleanup_pattern, '', value).strip().split(' '):
      #Words that do not occur in the title are skipped
      if not word or word not in title_words:
        continue

      #Replacing the IOB value at the word index
      target_label[title_words.index(word)] = IOB_prefix + tag
      IOB_prefix = 'I-'

  return target_label

### Function to evaluate the model on the test dataset

In [9]:
def compute_metric(model, dataset, params):
  """Evaluate a trained NER model with seqeval on title/annotations records.

  Args:
    model: Trained wrapper whose ``model`` attribute is the generative model.
    dataset: Iterable of records providing ``'title'`` and ``'annotations'``
      (each annotation providing ``'label'`` and ``'text'``).
    params: Settings dict; reads ``model_name`` and ``max_length``.

  Returns:
    Tuple ``(true_labels, predictions, seqeval_val)``: the two lists of IOB
    label sequences and the result of the seqeval computation.
  """
  #Putting the model in the evaluation mode
  model.model.eval()

  #Initializing model specific tokenizer
  tokenizer = AutoTokenizer.from_pretrained(params['model_name'])

  #Inputs are moved to wherever the model weights live, so evaluation also
  #works when the caller has placed the model on an accelerator
  device = next(model.model.parameters()).device

  #Initializing lists to store true labels and predictions
  true_labels = []
  predictions = []

  for sample in tqdm(dataset):
    #Tokenizing the title with model's predefined tokenizer
    source_encoding = tokenizer(sample['title'], max_length=params['max_length'], padding="max_length", truncation=True,
                                return_attention_mask=True, add_special_tokens=True, return_tensors="pt")

    #Generating the model's prediction on the input title
    pred_ids = model.model.generate(input_ids = source_encoding['input_ids'].to(device),
                                    attention_mask = source_encoding['attention_mask'].to(device),
                                    max_length = params['max_length'])

    #Serialising the reference annotations as Tag:Value;Tag:Value...
    reference = ';'.join([':'.join([annotation['label'], annotation['text']]) for annotation in sample['annotations']])

    #Only one title is generated per call, so only the first sequence is decoded
    predicted = tokenizer.decode(pred_ids[0], skip_special_tokens=True, clean_up_tokenization_spaces=True)

    #Storing the true labels and model's prediction
    true_labels.append(generate_NER_label(sample['title'], reference))
    predictions.append(generate_NER_label(sample['title'], predicted))

  #Evaluating the predictions
  seqeval = evaluate.load('seqeval')
  seqeval_val = seqeval.compute(predictions= predictions, references=true_labels)
  return true_labels, predictions, seqeval_val

### Model's training parameters

In [10]:
#Settings shared by train_model and compute_metric
params = {
    #Model class and checkpoint to fine-tune
    'model': MT5ForConditionalGeneration,
    'model_name': 'google/mt5-small',
    #Fraction of the data held out by train_test_split
    'test_size': 0.1,
    #Optimisation settings
    'batch_size': 2,
    'learning_rate': 3e-4,
    'epochs': 5,
    'accumulate_grad_batches': 4,
    #Maximum token length used when tokenizing and generating
    'max_length': 512,
    'num_workers': 2,
    #True when the training records are in the prompt/completion format
    'gpt_train_data': True,
}

### Reading the dataset and doing train test splits

#### Reading the file with complete astrophysics dataset

In [11]:
#JSON text is UTF-8, so it is decoded explicitly instead of with the platform default encoding
with open('./astrophysics_entity_dataset.json', 'r', encoding='utf-8') as json_data:
    dataset = json.load(json_data)
#Random hold-out split of the complete dataset
#NOTE(review): no random_state is passed, so the split differs between runs
train_dataset, test_dataset = train_test_split(dataset, test_size=params['test_size'])

#### Reading the files containing the train and test data used for GPT finetuning

In [13]:
#JSON text is UTF-8, so both files are decoded explicitly instead of with the platform default encoding
#These assignments replace any train/test split created earlier
with open('./gpt3_finetuning_data.json', 'r', encoding='utf-8') as gpt3_json_data:
  train_dataset = json.load(gpt3_json_data)
with open('./1500titles_human-annotators.json', 'r', encoding='utf-8') as ha_json_data:
  test_dataset = json.load(ha_json_data)

In [41]:
#Rebuilding the training set from the complete dataset, leaving out every record whose id is in the test set
#NOTE(review): the resulting records come from `dataset`; confirm params['gpt_train_data'] matches their format
test_ids = [td['id'] for td in test_dataset]
#Set lookup keeps the filter linear; test_ids itself stays a list
test_id_lookup = set(test_ids)
train_dataset = [annotation for annotation in dataset if annotation['id'] not in test_id_lookup]

### MT5 NER model

In [None]:
#Fine-tuning the model configured in params and keeping the trained model returned by train_model
mt5_model = train_model(train_dataset, test_dataset, params)

In [None]:
#Evaluating the trained model on the test records: true IOB labels, predicted IOB labels and the seqeval result
mt5_true, mt5_pred, mt5_metric_result = compute_metric(mt5_model, test_dataset, params)
#Bare expression so the notebook displays the metric result
mt5_metric_result

### FLAN-T5 NER model

In [12]:
#Switching the shared settings to the FLAN-T5 model class and checkpoint
params.update(model=AutoModelForSeq2SeqLM, model_name='google/flan-t5-small')

In [None]:
#Fine-tuning the model configured in params and keeping the trained model returned by train_model
flant5_model = train_model(train_dataset, test_dataset, params)

In [None]:
#Evaluating the trained model on the test records: true IOB labels, predicted IOB labels and the seqeval result
flant5_true, flant5_pred, flant5_metric_result = compute_metric(flant5_model, test_dataset, params)
#Bare expression so the notebook displays the metric result
flant5_metric_result