## Purpose: train and evaluate a custom assertion model for the `cancer_imaging_findings` entity
Assertion label – entity pairs used:
- probability high - cancer imaging findings
- probability medium - cancer imaging findings
- probability low - cancer imaging findings
- probability uncertain - cancer imaging findings

In [4]:
# Create the working sub-folders this notebook reads from or writes to.
# os.makedirs(..., exist_ok=True) is idempotent, so this cell can stay enabled and be
# re-run safely (a one-off `!mkdir` had to be uncommented once and commented out again).
import os

for _subfolder in (
    "tf_graphs",         # graph_folder passed to AssertionDLApproach.setGraphFolder
    "saved_models",      # trained assertion models
    "assertion_output",  # per-chunk test-set predictions (CSV)
    "assertion_result",  # model performance summary (CSV)
    "inference",         # inference input (samples.csv)
    "training_logs",     # AssertionDLApproach.setOutputLogsPath
    os.path.join("inference", "display_result"),  # HTML visualisations + annotation CSV
):
    os.makedirs(_subfolder, exist_ok=True)

### Note: Before running this notebook, please configure the following paths

In [None]:
# Local copy of the Spark NLP clinical word-embeddings model (embeddings_clinical).
# Point this at the downloaded model, or use WordEmbeddingsModel.pretrained(...)
# during training to fetch it online instead.
embeddings_clinical_local_path = (
    r"path\to\sparknlp_pretrained\embeddings_clinical_en_2.4.0_2.4_1580237286004"
)

In [None]:
# Spark NLP for Healthcare licence key files (this notebook uses v3.4.2).
# Online key: needs an internet connection.
sparknlp_licence_key = r"..\sparknlp_licence_key\yourkey.json"
# Offline key: for an air-gapped environment (used by the session cell below).
sparknlp_airgap_licence_key = r"..\sparknlp_licence_key\yourairgapkey.json"

In [None]:
## configure folder path
# Local import: this config cell runs before the main imports cell.
import os

data_folder = "dataset"
# os.path.join instead of a hard-coded "\\" separator, so the path also resolves
# outside Windows (on Windows the result is unchanged: "dataset\\02csv").
train_folder = os.path.join(data_folder, "02csv")
dataset_name = "train4522"

## Import Libraries

Note: Requires Spark NLP and Spark NLP for Healthcare (licensed version) packages to be installed

In [None]:
import json, os, re, sparknlp, sparknlp_jsl, datetime, time
import pandas as pd
import numpy as np

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.training import CoNLL
from sparknlp_jsl.annotator import *
from sparknlp_jsl.training import tf_graph
from sparknlp_display import AssertionVisualizer, NerVisualizer 

from sklearn.metrics import classification_report, accuracy_score, confusion_matrix, f1_score
from sklearn.model_selection import train_test_split

In [None]:
# Python executables PySpark should use for workers and the driver; echo each value back.
for _env_name, _env_value in (("PYSPARK_PYTHON", "python"), ("PYSPARK_DRIVER_PYTHON", "jupyter")):
    os.environ[_env_name] = _env_value
    print(os.environ[_env_name])

Note: Requires Spark NLP for Healthcare (licensed version) license key

### Start Spark Session (Offline)

In [None]:
# Offline-Load airgap license key
# The licence JSON's key/value pairs (SECRET, JSL_VERSION, PUBLIC_VERSION, ...) are
# exported both as notebook variables and as environment variables.
with open(sparknlp_airgap_licence_key) as f:
    airgap_license_keys = json.load(f)

# At notebook top level locals() is globals(); use globals() so the intent
# (define SECRET, JSL_VERSION, ... as notebook variables) is explicit.
globals().update(airgap_license_keys)
os.environ.update(airgap_license_keys)

# Check the licence was loaded WITHOUT printing SECRET: notebook output is often
# shared or committed, and `!echo $SECRET` exposed the secret in clear text.
print("SECRET set:", bool(os.environ.get("SECRET")))
print("JSL_VERSION:", os.environ.get("JSL_VERSION"))
print("PUBLIC_VERSION:", os.environ.get("PUBLIC_VERSION"))

os.environ['PYSPARK_PYTHON'] = 'python'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
print(os.environ['PYSPARK_PYTHON'])
print(os.environ['PYSPARK_DRIVER_PYTHON'])

# Start Spark Session with Custom Params (OFFLINE)
def start_spark_session(SECRET=None, jsl_version=None, jar_dir=r"d:\content"):
    """Create (or reuse) a local SparkSession with Spark NLP 3.4.2 and Spark NLP for Healthcare.

    Parameters
    ----------
    SECRET : str, optional
        Licence secret. Not used when building the session (the healthcare jar is
        loaded from ``jar_dir``); accepted so existing ``start(SECRET)`` calls work.
    jsl_version : str, optional
        Spark NLP for Healthcare version used in the jar file name. Defaults to the
        ``JSL_VERSION`` environment variable exported from the licence JSON.
    jar_dir : str, optional
        Local folder holding ``spark-nlp-jsl-<jsl_version>.jar`` and
        ``spark-nlp_2.12-3.4.2.jar``.

    Returns
    -------
    pyspark.sql.SparkSession
        ``getOrCreate()`` returns the running session if one already exists.

    Raises
    ------
    KeyError
        If ``jsl_version`` is not given and ``JSL_VERSION`` is not set.
    """
    if jsl_version is None:
        jsl_version = os.environ["JSL_VERSION"]

    # Raw strings: "\c" and "\s" in a normal (f-)string are invalid escape sequences
    # (DeprecationWarning / SyntaxWarning); the raw form yields the same path text.
    local_jars = ",".join([
        rf"{jar_dir}\spark-nlp-jsl-{jsl_version}.jar",
        rf"{jar_dir}\spark-nlp_2.12-3.4.2.jar",
    ])

    builder = (
        SparkSession.builder
        .appName("Spark NLP Licensed radio_assertion")
        .master("local[16]")
        .config("spark.driver.memory", "16G")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.kryoserializer.buffer.max", "2000M")
        .config("spark.driver.maxResultSize", "4000M")
        # NOTE(review): spark.jars.packages is resolved through Ivy/Maven at start-up,
        # which may fail in an air-gapped environment; the same jar is also supplied
        # locally via spark.jars — confirm whether this line is needed offline.
        .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.4.2")
        .config("spark.jars", local_jars)
    )
    return builder.getOrCreate()


# Backward-compatible alias. The training cell later rebinds `start` to a time string
# (start = time.ctime()), so the session below is created via the explicit name,
# which keeps this cell re-runnable after training.
start = start_spark_session

print("Spark NLP Version :", sparknlp.version())
print("Spark NLP_JSL Version :", sparknlp_jsl.version())

spark = start_spark_session(SECRET)

spark

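### Start Spark Session (Online)

_No cells are provided for the online session; it would use the online licence key (`sparknlp_licence_key`) configured above instead of the air-gapped key._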

## Import train csv

In [None]:
### Import train csv
# One CSV holds both splits; its `dataset` column marks each row as train or test.
traintest_csv_path = os.path.join(train_folder, "assertion_traintest.csv")
processed_df = pd.read_csv(traintest_csv_path)

In [None]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

# Explicit schema for the pandas -> Spark conversion, as (column name, Spark type).
# Every field is nullable; `start`/`end` are the character offsets of `target`.
_schema_columns = [
    ("text", StringType()),
    ("target", StringType()),
    ("label", StringType()),
    ("start", IntegerType()),
    ("end", IntegerType()),
    ("doc_title", StringType()),
    ("dataset", StringType()),
]
dfSchema = StructType([StructField(col_name, col_type, True) for col_name, col_type in _schema_columns])

In [None]:
# pandas -> Spark using the explicit schema; the count shows how many rows were loaded.
assertion_df = spark.createDataFrame(data=processed_df, schema=dfSchema)
assertion_df.count()

In [None]:
assertion_df.show(n=3, truncate=100)  # three sample rows, long cells cut at 100 chars

In [None]:
# Split on the pre-assigned `dataset` column (no random split), so the train/test
# partition is fixed by the input CSV.
training_data = assertion_df.where("dataset='train'")
test_data = assertion_df.where("dataset='test'")

#===================================================
total_rows = assertion_df.count()
train_rows = training_data.count()
test_rows = test_data.count()
print("total row count:", total_rows)
print("Training Dataset Count: " + str(train_rows))
print("Test Dataset Count: " + str(test_rows))
# Rows whose `dataset` is neither 'train' nor 'test' (or is null) are silently excluded
# by the two filters above; make that visible.
if train_rows + test_rows != total_rows:
    print(f"WARNING: {total_rows - train_rows - test_rows} rows are in neither the train nor the test split")

# Per-label row counts, saved with the model results later. Stored as a plain
# {label: count} dict rather than a list of Spark Row objects so the results CSV is readable.
trainset_count = {row["label"]: row["count"] for row in training_data.groupby('label').count().collect()}
testset_count = {row["label"]: row["count"] for row in test_data.groupby('label').count().collect()}

In [None]:
# Training-split label distribution, most frequent label first.
training_data.groupBy('label').count().sort(F.col('count').desc()).show(truncate=False)

In [None]:
# Test-split label distribution, most frequent label first.
test_data.groupBy('label').count().sort(F.col('count').desc()).show(truncate=False)

## ------------------- START OF TRAINING --------------------

## Training Pipeline

In [None]:
# Feature stages shared by test-set featurisation and training:
# text -> document -> target chunk -> tokens -> clinical word embeddings.
document = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Turn the gold `target` string into a chunk annotation, positioned by the `start`
# column read as a character offset (StartColByTokenIndex=False). FailOnMissing=False
# presumably keeps rows whose target cannot be located instead of failing, and
# LowerCase=True presumably matches case-insensitively — confirm in the Spark NLP docs.
chunk = (
    Doc2Chunk()
    .setInputCols("document")
    .setOutputCol("chunk")
    .setChunkCol("target")
    .setStartCol("start")
    .setStartColByTokenIndex(False)
    .setFailOnMissing(False)
    .setLowerCase(True)
)

token = (
    Tokenizer()
    .setInputCols(['document'])
    .setOutputCol('token')
)

clinical_embeddings = (
    WordEmbeddingsModel.load(embeddings_clinical_local_path)
    .setInputCols(["document", "token"])
    .setOutputCol("embeddings")
)

clinical_assertion_pipeline = Pipeline(stages=[document, chunk, token, clinical_embeddings])

In [None]:
# Featurise the test split; the result is written to parquet and passed to the
# assertion trainer via setTestDataset.
feature_pipeline_model = clinical_assertion_pipeline.fit(test_data)
assertion_test_data = feature_pipeline_model.transform(test_data)

In [None]:
# Columns after featurisation: the input columns plus document, chunk, token and embeddings.
assertion_test_data.columns

In [None]:
# Featurisation only adds columns, so this should equal the test_data row count.
assertion_test_data.count()

In [None]:
# Optional: uncomment to peek at the first featurised row.
#assertion_test_data.head()

In [None]:
# Persist the featurised test set; AssertionDLApproach.setTestDataset reads it back as parquet.
# mode("overwrite") makes this cell re-runnable: the default save mode raises if the path exists.
testfile = 'assertion_test_data.parquet'
assertion_test_data.write.mode("overwrite").parquet(testfile)

In [None]:
# Create Output Dataframe: one row per evaluated model, appended after evaluation.
_output_columns = [
    'assertion_model', 'ner_model',
    'trainset_count', 'testset_count',
    'epoch', 'learning_rate', 'batch_size',
    'start_time', 'end_time', 'duration',
    'overall_accuracy', 'class_accuracy',
    'classification_report', 'confusion_matrix',
]
output_df = pd.DataFrame(columns=_output_columns)

## Start Training 

In [None]:
# training parameters
# Earlier experiments also tried epoch = 5 or 15, batch_size = 8 and learning_rate = 0.002.
epoch, batch_size, learning_rate = 20, 16, 0.001

In [None]:
#======================================
# start training
#======================================
# Training start time as a readable string (`start`, stored in the results table) and as
# epoch seconds (`start2`, used for `duration`).
# NOTE(review): this rebinds `start`, the name of the SparkSession helper from the session cell.
start = time.ctime()
start2 = time.time()
print('start time for training: ', start)
print('...setup training pipeline')

graph_folder = "./tf_graphs"
scope_window = [15, 10]
# Alternative: .setGraphFile(f"{graph_folder}/blstm_34_32_30_200_4.pb")

# 03-apr-2023: added setIncludeConfidence(True). setRandomSeed is not available in
# v3.4.2, so training runs are not seeded.
assertionStatus = (
    AssertionDLApproach()
    .setLabelCol("label")
    .setInputCols("document", "chunk", "embeddings")
    .setOutputCol("assertion")
    .setBatchSize(batch_size)
    .setDropout(0.1)
    .setLearningRate(learning_rate)
    .setEpochs(epoch)
    .setValidationSplit(0.1)  # presumably 10% of training rows held out for validation
    .setStartCol("start")
    .setEndCol("end")
    .setMaxSentLen(250)
    .setIncludeConfidence(True)
    .setEnableOutputLogs(True)
    .setOutputLogsPath('training_logs/')
    .setGraphFolder(graph_folder)
    # Featurised test set written to `testfile` in an earlier cell.
    .setTestDataset(path=testfile, read_as='SPARK', options={'format': 'parquet'})
    .setScopeWindow(scope_window)  # presumably (left, right) token window around the chunk — confirm
)

clinical_assertion_pipeline = Pipeline(stages=[
    document, chunk, token, clinical_embeddings, assertionStatus,
])


In [None]:
%%time
# Fit the feature stages plus AssertionDLApproach on the training split;
# `assertion_model` is the fitted pipeline (its last stage is the trained assertion model).
assertion_model = clinical_assertion_pipeline.fit(training_data)

print('...training completed')
# Training end time: readable (`done`) and epoch seconds (`done2`); `duration` is in seconds.
done = time.ctime()
done2 = time.time()
duration = done2-start2
print('end time for training: ', done)
#======================================
# end training
# Reference wall time: 5min 47s for a 10-epoch run (the current `epoch` setting may differ)
#======================================

## save model to disk

In [None]:
# save model
# Folder name encodes the run: assertion_model_<epoch>_<batch_size>_<learning_rate>_<timestamp>_<dataset>.
# Only the last pipeline stage (the trained assertion model) is saved; the feature
# stages are rebuilt when the model is loaded for evaluation/inference.
_run_timestamp = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
assertion_model_name = f"assertion_model_{epoch}_{batch_size}_{learning_rate}_{_run_timestamp}_{dataset_name}"
print('...save models to folder: ./saved_models/' + assertion_model_name)
trained_assertion_stage = assertion_model.stages[-1]
trained_assertion_stage.write().overwrite().save('./saved_models/' + assertion_model_name)

In [None]:
# Spark NLP's built-in evaluation of the trained model fails under v3.4.2 (the version used here); it needs Spark NLP 4.x.
# Workaround: annotate the test set with a LightPipeline and compute the metrics from those predictions (below).

## Model Evaluation using lightpipeline

In [None]:
# Name of the trained NER model folder (relative to the working directory),
# loaded with MedicalNerModel.load.
radio_ner_model = (
    "clinical_embeddings_5_8_0.001_u0.4o1_train4522"
)

In [None]:
# loading
# NER + assertion pipeline used to score the test set:
# text -> document -> sentences -> tokens -> clinical embeddings -> NER tags
# -> cancer_imaging_findings chunks -> assertion label per chunk.
# NOTE: this rebinds `document` and `token` (also stages of the training pipeline) to
# differently configured annotators; re-run the training cells from the top before retraining.
document = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

sentence = (
    SentenceDetector()
    .setInputCols(['document'])
    .setOutputCol('sentences')
)

token = (
    Tokenizer()
    .setInputCols(['sentences'])
    .setOutputCol('tokens')
)

# `load` is a classmethod: call it on the class rather than on a throwaway
# WordEmbeddingsModel() instance.
words_embedder = (
    WordEmbeddingsModel.load(embeddings_clinical_local_path)
    .setInputCols(["sentences", "tokens"])
    .setOutputCol("embeddings")
)

radio_ner_tagger = (
    MedicalNerModel.load(radio_ner_model)
    .setInputCols(["sentences", "tokens", "embeddings"])
    .setOutputCol("ner_tags")
)

# Keep only cancer_imaging_findings chunks, the entity the assertion model was trained for.
converter = (
    NerConverter()
    .setInputCols(["sentences", "tokens", "ner_tags"])
    .setOutputCol("ner_span")
    .setWhiteList(["cancer_imaging_findings"])
)

## add radio assertion model (saved by the training section under ./saved_models/)
radiology_assertion = (
    AssertionDLModel.load('./saved_models/' + assertion_model_name)
    .setInputCols(["sentences", "ner_span", "embeddings"])
    .setOutputCol("assertion")
)

ner_assertion_pipeline = Pipeline(stages=[
    document,
    sentence,
    token,
    words_embedder,
    radio_ner_tagger,
    converter,
    radiology_assertion,
])

# None of these stages learns from data here, so fitting on a one-row empty DataFrame
# just assembles the PipelineModel; LightPipeline then annotates plain Python strings.
empty_data = spark.createDataFrame([['']]).toDF("text")
ner_assertion_model = ner_assertion_pipeline.fit(empty_data)
lmodel = LightPipeline(ner_assertion_model)

In [None]:
#======================================
# model evaluation
#======================================
print('...evaluate model')

# Bring the test split into pandas for row-by-row LightPipeline annotation;
# reset_index() turns the row position into an `index` column kept in the prediction CSV.
test_df = test_data.toPandas().reset_index()
test_df.head()

In [None]:
test_df.groupby("label").count()

In [None]:
# Number of non-null test texts.
test_df['text'].count()

In [None]:
from sparknlp_display import NerVisualizer
visualiser = NerVisualizer()

# Test row to inspect (rows 942 and 948 were also examined).
i = 21
# fullAnnotate returns one result dict per input text; take the only one.
ppres = lmodel.fullAnnotate(test_df.loc[i, 'text'])[0]
visualiser.display(ppres, document_col='document', label_col='ner_span')

In [None]:
# Overlay each NER chunk of the inspected row with its predicted assertion label.
assertion_vis = AssertionVisualizer()
assertion_vis.display(ppres, 'ner_span', 'assertion')

In [None]:
# Raw assertion annotations (result = predicted label; metadata holds the confidence).
ppres['assertion']

In [None]:
# Raw NER chunk annotations (metadata holds the entity label).
ppres['ner_span']

In [None]:
# Confidence of the first assertion; raises IndexError when the row has no detected chunk.
ppres['assertion'][0].metadata['confidence']

In [None]:
# 03-apr-2023 add confidence to output, for manuscript
# Tabulate the inspected row: one line per detected NER chunk, paired (by position) with
# its assertion prediction and confidence, plus the row's gold target and label.
# The lists use *_list names so this cell does not overwrite `chunk`, the Doc2Chunk
# annotator referenced by the training pipeline cells.
chunk_list, entity_list, status_list, confidence_list = [], [], [], []
for ner_chunk, assertion in zip(ppres['ner_span'], ppres['assertion']):
    chunk_list.append(ner_chunk.result)
    entity_list.append(ner_chunk.metadata['entity'])
    status_list.append(assertion.result)
    confidence_list.append(assertion.metadata['confidence'])

temp_df = pd.DataFrame({
    'index': test_df['index'].loc[i],
    'text': test_df['text'].loc[i],
    'target': test_df['target'].loc[i],
    'label': test_df['label'].loc[i],
    'chunk': chunk_list,
    'entity': entity_list,
    'assertion_prediction': status_list,
    'confidence': confidence_list,
})
temp_df['entity_index'] = temp_df.index
temp_df


In [None]:
# 03-apr-2023 add confidence to output, for manuscript
# get performance on test set
#####################################
PRED_COLUMNS = ['index', 'text', 'target', 'label', 'chunk', 'entity',
                'assertion_prediction', 'confidence', 'entity_index']


def _chunk_prediction_rows(row_idx, annotation):
    """Return one record per NER chunk found in test_df row `row_idx`.

    Each record carries the row's gold `target`/`label` plus the chunk's predicted
    assertion and confidence; chunks and assertions are paired by position.
    """
    gold = {
        'index': test_df['index'].loc[row_idx],
        'text': test_df['text'].loc[row_idx],
        'target': test_df['target'].loc[row_idx],
        'label': test_df['label'].loc[row_idx],
    }
    return [
        {
            **gold,
            'chunk': ner_chunk.result,
            'entity': ner_chunk.metadata['entity'],
            'assertion_prediction': assertion.result,
            'confidence': assertion.metadata['confidence'],
            'entity_index': entity_index,
        }
        for entity_index, (ner_chunk, assertion)
        in enumerate(zip(annotation['ner_span'], annotation['assertion']))
    ]


def _per_class_accuracy(cm, cm_labels, classes):
    """Per-class score TP / (TP + FP + FN) read from confusion matrix `cm`.

    `cm_labels` is the label of each row (true) / column (predicted) of `cm`. Each class
    is located by label, so predicted-only labels cannot shift the row/column lookup.
    """
    label_positions = list(cm_labels)
    scores = {}
    for cls in classes:
        idx = label_positions.index(cls)
        true_positives = cm[idx, idx]
        false_negatives = cm[idx, :].sum() - true_positives
        false_positives = cm[:, idx].sum() - true_positives
        # 03-jul-2023: true negatives deliberately ignored; TP/(TP+FP+FN) is the
        # formula used throughout the manuscript.
        scores[cls] = true_positives / (true_positives + false_positives + false_negatives)
    return scores


# Annotate every test row. Records are collected in a list and turned into a DataFrame
# once: DataFrame.append was removed in pandas 2.0 and is quadratic when used in a loop.
pred_records = []
n_test_rows = len(test_df)
for row_idx in range(n_test_rows):
    row_text = test_df['text'].loc[row_idx]
    if pd.isna(row_text):
        continue  # nothing to annotate
    ppres = lmodel.fullAnnotate(row_text)[0]
    pred_records.extend(_chunk_prediction_rows(row_idx, ppres))
    if (row_idx + 1) % 100 == 0 or row_idx + 1 == n_test_rows:
        print(f"annotated {row_idx + 1}/{n_test_rows} test rows")

preds_df = pd.DataFrame(pred_records, columns=PRED_COLUMNS)

#save prediction to csv, output where target = chunk
# Only the chunk matching the annotated target has a gold label; other NER chunks in the
# same text are dropped. NOTE(review): exact, case-sensitive string comparison.
preds_df = preds_df[preds_df['target'] == preds_df['chunk']]
columns = ['index', 'text', 'target', 'label', 'entity', 'assertion_prediction', 'confidence']

filename = assertion_model_name + "_predictions_wconfidence.csv"
preds_df.to_csv(os.path.join("./assertion_output", filename), columns=columns, index=False)

# get performance metrics
y_true = preds_df['label']
y_pred = preds_df['assertion_prediction']
accuracy = accuracy_score(y_true, y_pred)
print("accuracy: ", accuracy)
micro_f1 = f1_score(y_true, y_pred, average="micro")
print("micro_f1: ", micro_f1)

classes = np.unique(y_true)
report = classification_report(y_true, y_pred, digits=4, labels=classes)
print(report)

# Explicit label order = sorted union of true and predicted labels (sklearn's default
# order, so the printed matrix is unchanged), kept so rows/columns can be found by label.
cm_labels = np.unique(np.concatenate([y_true.to_numpy(), y_pred.to_numpy()]))
cm = confusion_matrix(y_true, y_pred, labels=cm_labels)
print(cm)

# get per class accuracy
# https://stackoverflow.com/questions/39770376/scikit-learn-get-accuracy-scores-for-each-class
per_class_accuracies = _per_class_accuracy(cm, cm_labels, classes)

# Combine class accuracies to classification report
report_dict = classification_report(y_true, y_pred, digits=4, labels=classes, output_dict=True)
classification_report_df = pd.DataFrame(report_dict).transpose()
per_class_accuracies_df = pd.DataFrame.from_dict(per_class_accuracies, orient='index', columns=['class_accuracy'])
combine_report_df = pd.concat([per_class_accuracies_df, classification_report_df], axis=1)

# save performance to csv: one row per evaluated model is appended to output_df.
# `start`/`done`/`duration` are the training timestamps from the training cells.
to_append = [assertion_model_name, radio_ner_model, trainset_count, testset_count, epoch, learning_rate,
             batch_size, start, done, duration, accuracy, per_class_accuracies_df, report, cm]
output_df.loc[len(output_df)] = to_append
filename = os.path.join("./assertion_result", assertion_model_name + ".csv")
output_df.to_csv(filename, header=True)
print(50*'-')
print("<<<Model Performance saved!>>>")
print(50*'-')
print(50*'-')

In [None]:
# all labels
# Report over every label found in y_true or y_pred (sklearn's default label set).
print(classification_report(y_true,y_pred, digits=4))

In [None]:
# 2 labels
# Report restricted to the probability_high and probability_medium labels.
print(classification_report(y_true,y_pred, digits=4, labels=["probability_high","probability_medium"]))

In [None]:
# Confusion matrix: rows = true labels, columns = predicted labels, in sklearn's sorted label order.
cm = confusion_matrix(y_true,y_pred)
print(cm)

## ------------------- END OF TRAINING--------------------

## ------------------- MODEL INFERENCE --------------------

# 4. Test Data Prediction

In [None]:
# loading
# NER + assertion pipeline for inference on new text:
# text -> document -> sentences -> tokens -> clinical embeddings -> NER tags
# -> cancer_imaging_findings chunks -> assertion label per chunk.
# NOTE: rebinds `document` and `token` (also stages of the training pipeline).
document = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

sentence = (
    SentenceDetector()
    .setInputCols(['document'])
    .setOutputCol('sentences')
)

token = (
    Tokenizer()
    .setInputCols(['sentences'])
    .setOutputCol('tokens')
)

# `load` is a classmethod: call it on the class rather than on a throwaway
# WordEmbeddingsModel() instance.
words_embedder = (
    WordEmbeddingsModel.load(embeddings_clinical_local_path)
    .setInputCols(["sentences", "tokens"])
    .setOutputCol("embeddings")
)

radio_ner_tagger = (
    MedicalNerModel.load(radio_ner_model)
    .setInputCols(["sentences", "tokens", "embeddings"])
    .setOutputCol("ner_tags")
)

# Keep only cancer_imaging_findings chunks, the entity the assertion model was trained for.
converter = (
    NerConverter()
    .setInputCols(["sentences", "tokens", "ner_tags"])
    .setOutputCol("ner_span")
    .setWhiteList(["cancer_imaging_findings"])
)

## add radio assertion model (same path the training section saved it to; no double slash)
radiology_assertion = (
    AssertionDLModel.load('./saved_models/' + assertion_model_name)
    .setInputCols(["sentences", "ner_span", "embeddings"])
    .setOutputCol("assertion")
)

ner_assertion_pipeline = Pipeline(stages=[
    document,
    sentence,
    token,
    words_embedder,
    radio_ner_tagger,
    converter,
    radiology_assertion,
])

# None of these stages learns from data here, so fitting on a one-row empty DataFrame
# just assembles the PipelineModel; LightPipeline then annotates plain Python strings.
empty_data = spark.createDataFrame([['']]).toDF("text")
ner_assertion_model = ner_assertion_pipeline.fit(empty_data)
lmodel = LightPipeline(ner_assertion_model)

In [None]:
## sample

In [None]:
# site of mets
# Placeholder: replace the middle line with the report text to annotate.
mtext1 = "\nyour sample text\n"

In [None]:
# Wrap the sample text in a one-row Spark DataFrame with the `text` column the pipeline reads.
text = mtext1
sample_data = spark.createDataFrame([[text]]).toDF("text")
sample_data.show(truncate=False)
sample_data.dtypes

In [None]:
# Run the full NER + assertion pipeline on the sample and list each detected chunk with its entity label.
# NOTE(review): `entities['0']` / `entities['1']` assume arrays_zip names the struct fields by
# position; some Spark versions name them after the source columns instead — confirm for your version.
preds = ner_assertion_model.transform(sample_data)

preds.select(F.explode(F.arrays_zip("ner_span.result","ner_span.metadata")).alias("entities")) \
.select(F.expr("entities['0']").alias("chunk"),
        F.expr("entities['1'].entity").alias("entity")).show(50,truncate=False)

In [None]:
# One row per detected chunk: chunk text, NER label, sentence index and assertion label.
# NerConverter stores the sentence index under the metadata key "sentence" (singular,
# independent of the input column being named "sentences"); "sentences" is not a key
# it emits, so that lookup produced an empty sent_id.
# NOTE(review): `cols['0']`, `cols['1']`, `cols['2']` assume arrays_zip names struct
# fields by position; some Spark versions use the source column names — confirm.
preds.select(F.explode(F.arrays_zip(preds.ner_span.result, 
                                     preds.ner_span.metadata, 
                                     preds.assertion.result)).alias("cols")) \
      .select(F.expr("cols['0']").alias("chunks"),
              F.expr("cols['1']['entity']").alias("ner_label"),
              F.expr("cols['1']['sentence']").alias("sent_id"),
              F.expr("cols['2']").alias("assertion")).show(50,truncate=False)

## LightPipeline / Visualisation

In [None]:
#! mkdir display_result

In [None]:
# LightPipeline annotation of the sample; [0] is the result dict for the single input text.
ppres = lmodel.fullAnnotate(text)[0]
# Annotation columns available in the result.
ppres.keys()

In [None]:
# NerVisualizer is already imported in the imports cell; the re-import is harmless.
from sparknlp_display import NerVisualizer
visualiser = NerVisualizer()
visualiser.display(ppres, label_col='ner_span', document_col='document')

In [None]:
# Overlay each NER chunk of the sample with its predicted assertion label.
assertion_vis = AssertionVisualizer()
assertion_vis.display(ppres, 'ner_span', 'assertion')

## Get prediction with sample.csv

In [None]:
# Change the column names accordingly to suit your dataset.
report_columns = ['sn_report_number', 'report_date', 'findings', 'conclusion']
df_text = pd.read_csv("./inference/samples.csv", usecols=report_columns)
# Non-null values per column.
df_text.count()

In [None]:
# Preview the first two reports.
df_text.head(2)

In [None]:
# check for null text
# Missing values per column; the annotation loop reads the `conclusion` column.
df_text.isnull().sum()

In [None]:
# fill null
# Missing conclusions become empty strings so every report can still be annotated.
df_text = df_text.fillna({'conclusion': ''})

In [None]:
# save the visualisation to html file for review
# save the annotation to csv for review
# For every report: save an HTML assertion visualisation and collect one record per
# detected chunk. Uses `lmodel` and `assertion_vis` from the cells above.
display_dir = "./inference/display_result/"
os.makedirs(display_dir, exist_ok=True)

annotation_records = []
# len(df_text), not a non-null count of sn_report_number: a count would end the loop
# early whenever a report number is missing, skipping the last rows.
for row_idx in range(len(df_text)):
    report_number = df_text['sn_report_number'].loc[row_idx]
    ppres = lmodel.fullAnnotate(df_text['conclusion'].loc[row_idx])[0]
    # f-string rather than "+": read_csv may infer report numbers as numbers, and
    # str + int raises TypeError.
    assertion_vis.display(ppres, 'ner_span', 'assertion',
                          save_path=display_dir + f"{report_number}_report.html")
    # Local names avoid overwriting `chunk`, the Doc2Chunk annotator of the training pipeline.
    for entity_index, (ner_chunk, assertion) in enumerate(zip(ppres['ner_span'], ppres['assertion'])):
        annotation_records.append({
            'sn_report_number': report_number,
            'report_date': df_text['report_date'].loc[row_idx],
            'entity_index': entity_index,
            'entity': ner_chunk.metadata['entity'],
            'chunk': ner_chunk.result,
            'assertion_status': assertion.result,
        })

# Build the DataFrame once (DataFrame.append was removed in pandas 2.0).
columns = ['sn_report_number', 'report_date', 'entity_index', 'entity', 'chunk', 'assertion_status']
annotation_df = pd.DataFrame(annotation_records, columns=columns)
annotation_df.to_csv(display_dir + "sample_ner_assertion.csv", columns=columns, index=False)
print(f"annotated {len(df_text)} reports, {len(annotation_df)} chunks")