In [None]:
!pip install transformers==4.10.1 --quiet

In [None]:
# Pinned to the 1.x line this notebook is written against: it relies on
# Trainer(checkpoint_callback=..., gpus=...) and the training_epoch_end hook,
# which later Lightning releases removed.
!pip install "pytorch_lightning>=1.4,<1.7"

In [None]:
!pip install "torchmetrics<0.7"

In [None]:
#testing gpu presence
# Ask PyTorch (the framework used for training below) instead of importing
# TensorFlow just for this probe; TensorFlow claims GPU memory by default once it
# initializes the device, which then competes with the PyTorch model.
import torch
# Evaluates to the device name, or an empty string when no CUDA device is visible.
torch.cuda.get_device_name(0) if torch.cuda.is_available() else ""

In [None]:
import torch
# Release cached GPU memory that PyTorch's allocator holds but is not currently using.
torch.cuda.empty_cache()

In [None]:

#imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import json
from tqdm.auto import tqdm

#pytorch
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizerFast as BertTokenizer, BertModel as BM, AdamW as Adam

#pytorch scheduler
from torch.optim.lr_scheduler import ExponentialLR

#pytorch lightning - lots of features like model checkpoints, logging, metrics, etc
import transformers
import pytorch_lightning as pl
#from torchmetrics import Accuracy
from torchmetrics.functional import auroc 
#going to create a confusion matrix
from sklearn.metrics import multilabel_confusion_matrix
from seqeval.metrics import classification_report as classification_report_seqeval

In [None]:
#IMP Params
# Number of passes over the training data (passed to the Trainer as max_epochs).
EPOCHS = 4
# Batch size used by the train/validation/test DataLoaders.
BATCH_SIZE = 16
# Sequence length every text is padded/truncated to by the tokenizer.
MAX_TOKEN_NUM = 256

In [None]:
#creating a training and testing dataset
#loading data from file; the context managers close each handle as soon as it is parsed
with open('news_blobs_labeled.json') as labeledFile:
  labeledData = json.load(labeledFile)
with open('news_blobs_test.json') as testingFile:
  testingData = json.load(testingFile)
labeledSize = len(labeledData)
testingSize = len(testingData)
print("Labeled data shape:", labeledSize)
print("Testing data shape: ", testingSize)


Labeled data shape: 738
Testing data shape:  81


In [None]:
#conversion into dataframes - training + validation
# One row per labeled record: the label dict becomes the label columns,
# the text becomes a single "text" column placed first.
tdLabels = pd.DataFrame([record['labels'] for record in labeledData])
tdText = pd.DataFrame([{"text": record['text']} for record in labeledData])
totaldf = pd.concat([tdText, tdLabels], axis=1)
totaldf.head()

In [None]:
# Label names: every column of totaldf except the leading "text" column.
COLUMNS = list(totaldf.columns)[1:]
COLUMNS

In [None]:
#use of validation data will help with accuracy prediction
from sklearn.model_selection import train_test_split
# A fixed seed keeps the 90/10 split identical across kernel restarts, so the
# validation metrics further down are comparable between runs.
traindf, valdf = train_test_split(totaldf, test_size=0.1, random_state=42)
traindf.shape, valdf.shape

In [None]:
#what is the distribution of the labeled data given
# Number of positive examples per label over the full labeled set.
samples = {col: sum(totaldf[col]) for col in COLUMNS}
samples

In [None]:
#we may want pre-process the training dataset to counter the imbalance
# Training rows flagged with the "OTHER" label.
other_df = traindf.loc[traindf["OTHER"] == 1]
other_df.head()

In [None]:
#reduce the number of clinical trial alone ones
SELECT_COLUMNS = ['REGULATORY', 'COLLAB', "FINANCING", "PRESENTATION"]
# Rows labeled CLINICAL_TRIAL that carry none of the SELECT_COLUMNS labels.
clinic_df = traindf[
  (traindf['CLINICAL_TRIAL'] == 1) & (traindf[SELECT_COLUMNS].sum(axis=1) == 0)
]
clinic_df.head()

In [None]:
#training samples without the other label
# Rows carrying at least one of the SELECT_COLUMNS labels.
non_other_df = traindf.loc[traindf[SELECT_COLUMNS].sum(axis=1) > 0]
non_other_df.head(40)

In [None]:
#sample accordingly to avoid biased training data
print(other_df.shape, non_other_df.shape)
minSize = min(other_df.shape[0], non_other_df.shape[0])
# DataFrame.sample raises ValueError when asked for more rows than exist
# (sampling without replacement), so cap the clinical-only sample at what is available.
clinicSize = min(50, clinic_df.shape[0])
# Seeded so the rebalanced training set is the same on every run.
traindf = pd.concat([
  non_other_df.sample(minSize, random_state=42),
  clinic_df.sample(clinicSize, random_state=42),
  other_df.sample(minSize, random_state=42),
])

In [None]:
#removing the "other" column
# Use the columns= keyword: passing the axis positionally (drop("OTHER", 1)) is
# deprecated and rejected by pandas 2.
if "OTHER" in traindf.columns:
  traindf = traindf.drop(columns="OTHER")
if "OTHER" in valdf.columns:
  valdf = valdf.drop(columns="OTHER")
traindf.head()

In [None]:
traindf.head(50)

In [None]:
#labels without "other": drop it from the label list (in place) so COLUMNS
#matches the label columns left in the train/validation frames
if "OTHER" in COLUMNS:
    COLUMNS.remove("OTHER")

In [None]:
#conversion into dataframes - testing
# The test records contribute only their text; no label columns are built here.
testdf = pd.DataFrame([{"text": record['text']} for record in testingData])
testdf.head()

In [None]:
#initial look into the data
print('Research text sample:', traindf['text'].iloc[0])

In [None]:
#clean titles
# str.strip()/str.lower() return new strings, so the cleaned values have to be
# stored; looping and calling them without assignment left every frame unchanged.
# assign() builds a new frame, which also avoids writing into a slice of totaldf.
# NOTE(review): lowercasing is what the original loops attempted; confirm it suits
# the casing expected by the tokenizer checkpoint before relying on it.
traindf = traindf.assign(text=traindf['text'].str.strip().str.lower())
valdf = valdf.assign(text=valdf['text'].str.strip().str.lower())
testdf = testdf.assign(text=testdf['text'].str.strip().str.lower())

In [None]:
#before processing data, create tokenizer instance
'''
BERT is a transformers model pretrained on a large corpus of English data in a self-supervised fashion.
'''
# General-purpose checkpoint name; not referenced by any other visible cell.
BERT_MODEL_NAME_0 = 'bert-base-cased'
# Checkpoint actually used: the tokenizer below and the encoder loaded later
# are both created from this name.
BERT_MODEL_NAME = "emilyalsentzer/Bio_ClinicalBERT"
tokenizer = BertTokenizer.from_pretrained(BERT_MODEL_NAME)

In [None]:
#creating a pytorch dataset class
class MedicalDataset(Dataset):
  """Map-style dataset that tokenizes one text row and returns its label vector.

  Args:
    data: DataFrame with a ``text`` column plus one column per label.
    tokenizer: object exposing ``encode_plus`` (e.g. a BERT tokenizer).
    max_token_len: length every text is padded/truncated to. ``None`` (default)
      falls back to the notebook-level ``MAX_TOKEN_NUM`` at access time.
    label_columns: names of the label columns. ``None`` (default) falls back to
      the notebook-level ``COLUMNS`` at access time.
  """

  def __init__(self, data, tokenizer, max_token_len=None, label_columns=None):
    super().__init__()
    self.tokenizer = tokenizer
    self.data = data
    self.dataLen = len(self.data)
    self.max_token_len = max_token_len
    self.label_columns = label_columns

  def __len__(self):
    """Number of rows in the wrapped DataFrame (captured at construction)."""
    return self.dataLen

  def __getitem__(self, index: int):
    """Return the raw text, its encoding and its float label vector for row ``index``."""
    data_row = self.data.iloc[index]
    medical_text = data_row['text']

    # Resolve the defaults lazily so later changes to the notebook globals are honoured.
    max_len = MAX_TOKEN_NUM if self.max_token_len is None else self.max_token_len
    columns = list(COLUMNS if self.label_columns is None else self.label_columns)

    # Convert through NumPy instead of handing the Series to torch.FloatTensor:
    # the row is an object-dtype Series, and iterating it positionally depends on
    # pandas' integer-key fallback on a string-labelled index.
    label_values = data_row[columns].to_numpy(dtype="float32")

    #creating an encoding
    encoding = self.tokenizer.encode_plus(
      medical_text,
      add_special_tokens=True,
      max_length=max_len,
      return_token_type_ids=False,
      padding="max_length",
      truncation=True,
      return_attention_mask=True,
      return_tensors='pt',
    )

    return dict(
      medical_text=medical_text,
      # encode_plus returns (1, max_len) tensors; flatten to (max_len,) so the
      # DataLoader can stack samples into a batch.
      input_ids=encoding["input_ids"].flatten(),
      attention_mask=encoding["attention_mask"].flatten(),
      labels=torch.tensor(label_values, dtype=torch.float32)
    )

In [None]:
class MedicalTextDataModule(pl.LightningDataModule):
  """Lightning data module bundling the train, validation and test datasets.

  Args:
    train_df: DataFrame used for training.
    val_df: DataFrame used for validation.
    test_df: DataFrame used for testing.
    tokenizer: tokenizer handed to every ``MedicalDataset``.
  """

  #three datasets - training, testing, validation dataset
  def __init__(self, train_df, val_df, test_df, tokenizer):
    super().__init__()
    self.train_df = train_df
    self.test_df = test_df
    self.val_df = val_df
    self.tokenizer = tokenizer

  #datasets setup - training, validation, testing
  def setup(self, stage=None):
    """Wrap each DataFrame in a ``MedicalDataset``; ``stage`` is accepted but not used."""
    self.train_dataset = MedicalDataset(self.train_df, self.tokenizer)
    self.val_dataset = MedicalDataset(self.val_df, self.tokenizer)
    self.test_dataset = MedicalDataset(self.test_df, self.tokenizer)

  #The num_workers attribute tells the data loader instance how many sub-processes to use for data loading.
  def train_dataloader(self):
    """Shuffled loader over the training set."""
    return DataLoader(
      self.train_dataset,
      batch_size=BATCH_SIZE,
      shuffle=True,
      num_workers=2)

  def val_dataloader(self):
    """Loader over the validation set, in a fixed order."""
    return DataLoader(
      self.val_dataset,
      batch_size=BATCH_SIZE,
      # Evaluation data is not shuffled: order does not affect the loss, and a
      # stable order keeps per-batch validation output comparable across epochs.
      shuffle=False,
      num_workers=2)

  def test_dataloader(self):
    """Loader over the test set, in a fixed order."""
    return DataLoader(
      self.test_dataset,
      batch_size=BATCH_SIZE,
      num_workers=2)

In [None]:
#load a pre-trained BERT model - transfer learning
# return_dict=True makes the encoder return an output object; the classifier
# defined below reads its .pooler_output attribute.
bert_model = BM.from_pretrained(BERT_MODEL_NAME, return_dict=True)

In [None]:
#create a data module with all datasets in place
# Argument order is (train_df, val_df, test_df, tokenizer): valdf is passed as both
# the validation and the test frame, so the module's test loader iterates
# validation data (testdf is built without label columns).
med_data_module = MedicalTextDataModule(traindf, valdf, valdf, tokenizer)

In [None]:
class MedicalTextLabelClassifier(pl.LightningModule):
  """Multi-label text classifier: a BERT encoder followed by one linear layer.

  Args:
    bert_model: encoder whose output exposes ``pooler_output`` and whose
      ``config.hidden_size`` sizes the classification head.
    n_classes: number of independent labels to score.
    criterion: loss called as ``criterion(probabilities, labels)``.
    n_training_steps: stored on the instance; no method of this class reads it.
  """

  def __init__(self, bert_model, n_classes, criterion, n_training_steps=None):
    super().__init__()
    #pre-trained bert_model
    self.bert = bert_model
    #classifier linear model
    self.classifier = nn.Linear(self.bert.config.hidden_size, n_classes)
    #how many times step is called
    self.n_training_steps = n_training_steps
    self.criterion = criterion

  #the forward function enables us to define how the model goes from input to output
  def forward(self, input_ids, attention_mask, labels=None):
    """Return ``(loss, probabilities)``; ``loss`` is ``0`` when ``labels`` is None."""
    output = self.bert(input_ids, attention_mask=attention_mask)
    #last layer to a classifier output
    output = self.classifier(output.pooler_output)
    #our activation function: independent per-label probabilities
    output = torch.sigmoid(output)
    loss = 0
    if labels is not None:
        loss = self.criterion(output, labels)
    return loss, output

  def _shared_step(self, batch):
    """Run the forward pass on one batch and return ``(loss, outputs, labels)``."""
    #attention masks - binary masks to hide the padded indices
    labels = batch["labels"]
    loss, outputs = self(batch["input_ids"], batch["attention_mask"], labels)
    return loss, outputs, labels

  def training_step(self, batch, batch_idx):
    """Compute the training loss; also hands predictions/labels to the epoch-end hook."""
    loss, outputs, labels = self._shared_step(batch)
    self.log("train_loss", loss, prog_bar=True)
    # Print on every 5000th batch. The bare `batch_idx % 5000` test was truthy for
    # every batch EXCEPT multiples of 5000, i.e. it printed almost every step.
    if batch_idx % 5000 == 0:
      print({"loss": loss})
    # Detach the predictions: only "loss" needs its graph for backpropagation, and
    # keeping graph-attached outputs for the whole epoch holds on to memory.
    return {"loss": loss, "predictions": outputs.detach(), "labels": labels}

  def validation_step(self, batch, batch_idx):
    """Compute and log the validation loss for one batch."""
    loss, _, _ = self._shared_step(batch)
    # Without logging, the validation loss was computed and then never surfaced.
    self.log("val_loss", loss, prog_bar=True)
    return loss

  def test_step(self, batch, batch_idx):
    """Compute and log the test loss for one batch."""
    loss, _, _ = self._shared_step(batch)
    self.log("test_loss", loss, prog_bar=True)
    return loss

  def training_epoch_end(self, outputs):
    """Gather the epoch's labels and predictions from the training_step outputs.

    The gathered tensors are not used further here; this is the place to compute
    epoch-level metrics from them.
    """
    if not outputs:
      return
    #collecting the labels and predictions from the output
    labels = torch.cat([out["labels"].detach().cpu() for out in outputs]).int()
    predictions = torch.cat([out["predictions"].detach().cpu() for out in outputs])

  def configure_optimizers(self):
    """Return the optimizer and an exponentially decaying learning-rate schedule."""
    optimizer = Adam(self.parameters(), lr=1e-5)

    #the scheduler enables us to adjust the learning rate
    scheduler = ExponentialLR(optimizer, gamma=0.9)

    return dict(
      optimizer=optimizer,
      lr_scheduler=dict(
        scheduler=scheduler,
        # Decay once per epoch. Stepping gamma=0.9 on every batch multiplies the
        # rate by 0.9**k after k batches (about 2.7e-5 of the start value after
        # 100 batches), which effectively stops learning within the first epochs.
        interval='epoch'
      )
    )

In [None]:
#criterion - measuring the Binary Cross Entropy between the target and the input probabilities
# BCELoss expects probabilities, not logits; the classifier's forward applies
# torch.sigmoid before calling the criterion.
criterion = nn.BCELoss()

In [None]:
#training
#calculating number of train steps: batches per epoch (rounded up, since the
#train loader keeps the last partial batch) times the number of epochs.
#BATCH_SIZE * EPOCHS is not a step count.
steps_per_epoch = (len(traindf) + BATCH_SIZE - 1) // BATCH_SIZE
total_training_steps = steps_per_epoch * EPOCHS

# n_training_steps is stored on the model; the classifier does not read it.
model = MedicalTextLabelClassifier(
  bert_model,
  n_classes=len(COLUMNS),
  criterion=criterion,
  n_training_steps=total_training_steps
)

#lightning module training configs
trainer = pl.Trainer(
  logger=None,
  checkpoint_callback=None,
  callbacks=None,
  # Fall back to CPU when no CUDA device is present instead of failing at start-up.
  gpus=1 if torch.cuda.is_available() else 0,
  max_epochs=EPOCHS
)

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs


In [None]:
#clear memory
import torch, gc

# Run Python's garbage collector first so unreachable tensors are freed, then
# hand PyTorch's cached-but-unused GPU blocks back to the driver.
gc.collect()
torch.cuda.empty_cache()

In [None]:
#fitting 
trainer.fit(model, med_data_module)

In [None]:
#evaluation mode
# eval() switches layers such as dropout to inference behaviour.
model.eval()
# freeze() is the LightningModule helper that freezes all parameters for inference.
model.freeze()

In [None]:
# Inference below runs on the CPU, one validation sample at a time.
device = torch.device('cpu')
trained_model = model.to(device)
#evaluation
val_dataset = MedicalDataset(valdf, tokenizer)

predictions, labels = [], []

#wrap the iterable with tqdm
for item in tqdm(val_dataset):
  # Each sample is a 1-D tensor; add a leading batch dimension of size 1.
  _, prediction = trained_model(
    item["input_ids"].unsqueeze(0).to(device),
    item["attention_mask"].unsqueeze(0).to(device)
  )
  predictions.append(prediction.flatten())
  labels.append(item["labels"].int())

# One row per validation sample, one column per label.
predictions = torch.stack(predictions).detach().cpu()
labels = torch.stack(labels).detach().cpu()

  0%|          | 0/74 [00:00<?, ?it/s]

In [None]:
# Probability at or above which a label counts as predicted.
CUTOFF = 0.5
#fscore, precision, recall, etc
#true labels, vs predictions - assigned an upper and lower bound
y_true = labels.numpy()
y_pred = predictions.numpy()
# Binarize: 1 where the probability reaches CUTOFF, 0 otherwise.
y_pred = np.where(y_pred >= CUTOFF, 1, 0)


In [None]:
#confusion matrices
cm = multilabel_confusion_matrix(y_true, y_pred)
cm

In [None]:
#accuracy
accDict = dict()
for i, label in enumerate(COLUMNS):
  # sklearn lays each per-label matrix out as [[tn, fp], [fn, tp]], so the
  # true negatives sit at [0][0] and the true positives at [1][1].
  tn, fp, fn, tp = cm[i].ravel()
  accDict[label] = (tp + tn) / (tp + tn + fp + fn)
accDict
  

In [None]:
print("AUROC per label")
# Column i of predictions/labels belongs to COLUMNS[i].
for i in range(len(COLUMNS)):
  name = COLUMNS[i]
  tag_auroc = auroc(predictions[:, i], labels[:, i], pos_label=1)
  print(f"{name}: {tag_auroc}")

In [None]:
def create_test_encodings(text):
  """Tokenize ``text`` for inference with the same settings the dataset class uses.

  Args:
    text: raw string to encode.

  Returns:
    The tokenizer's encoding holding ``input_ids`` and ``attention_mask`` as
    PyTorch tensors with a leading batch dimension of 1.
  """
  encoding = tokenizer.encode_plus(
        text,
        add_special_tokens=True,
        max_length=MAX_TOKEN_NUM,
        return_token_type_ids=False,
        padding="max_length",
        # Without truncation, texts longer than MAX_TOKEN_NUM keep their full
        # length, which differs from the training-time encoding and can exceed
        # what the encoder accepts.
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
      )
  return encoding

In [None]:
#testing
def run_tests(path, cutoff=None):
    """Label every text in ``testdf`` with the trained model and write a CSV.

    Args:
      path: destination of the CSV file.
      cutoff: probability below which a label is set to 0. ``None`` (default)
        keeps the existing threshold of ``CUTOFF - 0.1``.

    Returns:
      None. The CSV has one column per entry of ``COLUMNS``, then ``OTHER``
      (1 only when no label fired) and the source ``text``.
    """
    if cutoff is None:
      cutoff = CUTOFF - 0.1

    results = []
    #extract the text of the test news blobs
    for test_text in testdf['text']:
      #create an encoding
      encoding = create_test_encodings(test_text)
      #use our trained model; no gradients are needed for inference
      with torch.no_grad():
        _, test_prediction = trained_model(encoding["input_ids"], encoding["attention_mask"])
      scores = test_prediction.flatten().cpu().numpy()

      predDict = {
        label: 0 if score < cutoff else 1
        for label, score in zip(COLUMNS, scores)
      }
      # "OTHER" is the fallback when none of the modelled labels fired.
      predDict["OTHER"] = 0 if any(predDict.values()) else 1
      predDict['text'] = test_text
      results.append(predDict)

    resultsDF = pd.DataFrame(results)
    resultsDF.to_csv(path, index=False)
    return
    

In [None]:
run_tests("text_classification.csv")