# Annotating Data & Training a Custom NER Model for Social Determinants of Health (SDOH) Prediction

In this tutorial, we'll pre-annotate data using a pre-defined vocabulary (keyword matching) and upload the results as pre-annotations to NLP Lab.

We'll be using the NLP Lab module to create, configure, export and import projects with minimal code (optional).

Note: The NLP Lab module is available in Spark NLP for Healthcare 4.2.2+.

## Following are the main steps in this exercise:

### 1. Using string matching and existing off-the-shelf vocabularies, create a simple pipeline to get rudimentary results.
### 2. Upload the initial results to NLP Lab, annotate, and download annotations (optional; you can skip this part and continue with the provided annotated data).
### 3. Train an NER model on the annotated data to achieve better performance.

# 0. Initial configurations

In [0]:
import pandas as pd
import os
import json

import sparknlp
import sparknlp_jsl
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp_jsl.base import *
from sparknlp_jsl.annotator import *

from pyspark.ml import Pipeline, PipelineModel
from sparknlp.training import CoNLL

print('sparknlp.version : ',sparknlp.version())
print('sparknlp_jsl.version : ',sparknlp_jsl.version())

spark

For this exercise, we define two different layers of solutions:
1. Bronze: rudimentary results from text matching, which will be uploaded to NLP Lab and refined.
2. Silver: after the documents have been properly annotated in NLP Lab, train an NER model and get results.

Let's define these paths:

In [0]:
delta_bronze_path='/FileStore/SDOH/data/delta/bronze/'
dbutils.fs.mkdirs(delta_bronze_path)
os.environ['delta_bronze_path']=f'/dbfs{delta_bronze_path}'

delta_silver_path='/FileStore/SDOH/data/delta/silver/'
dbutils.fs.mkdirs(delta_silver_path)
os.environ['delta_silver_path']=f'/dbfs{delta_silver_path}'
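
The cell above stores each location in two forms: the DBFS path (`/FileStore/...`) that `dbutils` and Spark read, and the same location under the `/dbfs` mount that shell cells and plain Python file APIs see. A minimal sketch of that mapping follows; `to_local_path` is a hypothetical helper written for illustration, not part of any library.

```python
def to_local_path(dbfs_path: str) -> str:
    """Map a DBFS path to the local /dbfs mount path used by %sh and open()."""
    # Accept both the bare form ('/FileStore/...') and the URI form ('dbfs:/FileStore/...').
    if dbfs_path.startswith("dbfs:"):
        dbfs_path = dbfs_path[len("dbfs:"):]
    if not dbfs_path.startswith("/"):
        dbfs_path = "/" + dbfs_path
    return "/dbfs" + dbfs_path


print(to_local_path("/FileStore/SDOH/data/delta/bronze/"))
print(to_local_path("dbfs:/FileStore/SDOH/data/delta/silver/"))
```

Keeping the two forms apart matters later in the notebook: Spark readers take the DBFS form, while `wget`, `head`, and `open()` need the `/dbfs` form.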

# 1. Using string matching and existing off-the-shelf vocabularies, create a simple pipeline to get rudimentary results

First, we'll rely on existing vocabularies of keywords (e.g., "unstable housing", "lack of insurance", "substance abusers") to get preliminary results.

The vocabulary set in this exercise was generated using JSL SDOH Internal Project data and can be obtained from https://github.com/JohnSnowLabs/spark-nlp-workshop/tree/master/databricks/python/healthcare_case_studies/data/sdoh

**About the dataset:** The dataset consists of text records of medical patient files containing SDOH information. It was generated artificially using ChatGPT 3.5. The data can be downloaded from https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/databricks/python/healthcare_case_studies/data/sdoh/sdoh_sample.csv

## 1.1. Download resources and explore vocabulary

The list below contains the selected labels of the SDOH dataset.

In [0]:
label_list = [
"Housing",
"Substance_Use",
"Insurance_Status",
"Social_Exclusion",
"Violence_Or_Abuse",
"Spiritual_Beliefs",
"Financial_Status"
] 

Download the vocabulary list for each label from the JSL GitHub repo.

In [0]:

%sh
url_base="https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/databricks/python/healthcare_case_studies/data/sdoh/"
label_list=(
    "Housing"
    "Substance_Use"
    "Insurance_Status"
    "Social_Exclusion"
    "Violence_Or_Abuse"
    "Spiritual_Beliefs"
    "Financial_Status"
)

cd $delta_bronze_path
for label in "${label_list[@]}"; do
    file_url="${url_base}${label}_word_list.txt"
    wget "$file_url"
done
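
The shell loop above derives one file name per label. The same URL construction can be sketched in Python, which makes it easy to confirm the list before downloading. Nothing is fetched here, and `vocab_urls` is an illustrative name rather than something the notebook defines.

```python
# Base URL and labels copied from the cells above.
url_base = (
    "https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/"
    "databricks/python/healthcare_case_studies/data/sdoh/"
)
label_list = [
    "Housing",
    "Substance_Use",
    "Insurance_Status",
    "Social_Exclusion",
    "Violence_Or_Abuse",
    "Spiritual_Beliefs",
    "Financial_Status",
]

# One vocabulary file per label, named "<label>_word_list.txt".
vocab_urls = {label: f"{url_base}{label}_word_list.txt" for label in label_list}

for label, url in vocab_urls.items():
    print(label, "->", url)
```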


Checking the files

In [0]:
data_vocab_path = f"{delta_bronze_path}"
dbutils.fs.ls(data_vocab_path)

Taking a look at one of the vocabulary lists

In [0]:
%sh
head /dbfs/FileStore/SDOH/data/delta/bronze/Housing_word_list.txt

Now we will download the SDOH text data, which consists of medical patient files containing SDOH information. We will first pre-annotate the documents, then annotate them, and finally train an NER model.

Let's define original text data folder:

In [0]:
original_data_path='/FileStore/SDOH/data/delta/original/'
dbutils.fs.mkdirs(original_data_path)
os.environ['original_data_path']=f'/dbfs{original_data_path}'

Now download the SDOH text data

In [0]:
%sh

cd $original_data_path
wget https://github.com/JohnSnowLabs/spark-nlp-workshop/raw/master/databricks/python/healthcare_case_studies/data/sdoh/sdoh_sample.csv

Check the text data

In [0]:
%sh
head /dbfs/FileStore/SDOH/data/delta/original/sdoh_sample.csv

## 1.2. Creating a Spark NLP pipeline using textmatchers to find entities in the data

The pipeline below extracts vocabulary-based entities. We use a separate TextMatcher annotator for each label, then merge all TextMatcher outputs at the end of the pipeline using ChunkMergeApproach.

In [0]:
documentAssembler = DocumentAssembler()\
  .setInputCol("text")\
  .setOutputCol("document")

sentenceDetector = SentenceDetector()\
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer()\
  .setInputCols("sentence")\
  .setOutputCol("token")

# build a dictionary with one TextMatcher annotator per label in label_list
text_matcher_dict={}
for label in label_list:
    text_matcher_dict[label] = TextMatcher().setInputCols("sentence","token").setOutputCol(label).setEntityValue(label)\
    .setEntities(data_vocab_path+f"{label}_word_list.txt").setCaseSensitive(False).setMergeOverlapping(True)\
    .setBuildFromTokens(True)

chunk_merger = ChunkMergeApproach()\
    .setInputCols(label_list)\
    .setOutputCol("all_chunks")

pipeline =  Pipeline(
    stages=[
        documentAssembler,
        sentenceDetector,
        tokenizer,
        *text_matcher_dict.values(),
        chunk_merger
    ]
)

p_model = pipeline.fit(spark.createDataFrame([[""]]).toDF("text"))

l_model = LightPipeline(p_model)
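
To make the idea behind the pipeline concrete, here is a plain-Python sketch of case-insensitive phrase matching against a per-label vocabulary. It illustrates the concept only and is not how `TextMatcher` is implemented; `match_phrases` and `toy_vocab` are hypothetical names, and the toy phrases are not taken from the real word lists.

```python
import re


def match_phrases(text, vocab):
    """Return sorted (begin, end, matched_text, label) tuples for whole-word matches."""
    hits = []
    for label, phrases in vocab.items():
        for phrase in phrases:
            # \b keeps "shelters" from matching inside a longer word.
            pattern = r"\b" + re.escape(phrase) + r"\b"
            for m in re.finditer(pattern, text, flags=re.IGNORECASE):
                hits.append((m.start(), m.end(), m.group(0), label))
    return sorted(hits)


# Toy vocabulary, invented for this example.
toy_vocab = {
    "Housing": ["housing insecurity", "shelters"],
    "Financial_Status": ["financial problems"],
}
text = "He reports Housing Insecurity, relies on shelters, and has financial problems."

hits = match_phrases(text, toy_vocab)
for hit in hits:
    print(hit)
```

Each hit carries character offsets and a label, which is the same kind of information the merged `all_chunks` column holds for every matched chunk.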

Let's check our pipeline with a text input.

In [0]:
from sparknlp_display import NerVisualizer

result = l_model.fullAnnotate(" a 55-year-old veteran who has been experiencing housing insecurity since losing his job as a factory worker. Despite his service to his country, Carlos has been unable to secure stable housing, and has been forced to rely on shelters and transitional housing programs. He has financial problems.")

displayHTML(NerVisualizer().display(result[0], 'all_chunks', return_html=True))

In [0]:
result

Now read all the SDOH text data as a Spark DataFrame

In [0]:
data = spark.read.csv("dbfs:/FileStore/SDOH/data/delta/original/sdoh_sample.csv", sep=',', multiLine=True, header=True).withColumnRenamed('_c0',"text")
print(data.count())
data.show(3)

Transform the DataFrame using the pre-annotation pipeline

In [0]:
results = p_model.transform(data).collect()

## 1.3. Analyzing Results

The basic text-matching approach works in general, but it has the following challenges:
1. Lack of context, leading to too many false positives.
2. Less meaningful and incomplete chunks.
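
The first challenge can be reproduced in a few lines. In the sketch below, a single keyword fires on an affirmed mention, a negated mention, and an incidental mention alike, because string matching has no notion of context. The sentences are invented for illustration and are not drawn from the dataset.

```python
import re

keyword = "drug abuse"  # illustrative vocabulary entry

# Invented sentences: affirmed, negated, and incidental mentions.
sentences = [
    "He has a history of drug abuse.",
    "She denies any drug abuse.",
    "Pamphlets on drug abuse were available in the waiting room.",
]

pattern = re.compile(r"\b" + re.escape(keyword) + r"\b", re.IGNORECASE)
matches = [s for s in sentences if pattern.search(s)]

print(len(matches), "of", len(sentences), "sentences matched")
```

Only the first sentence describes the patient's own substance use, yet all three match. This is the kind of error that manual correction and a trained NER model are meant to reduce.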

Let's check some of the results.

In [0]:
from sparknlp_display import NerVisualizer

displayHTML(NerVisualizer().display(results[55], 'all_chunks', return_html=True))

In [0]:
# Checking on a piece of text
text = """a 55-year-old veteran who has been experiencing housing insecurity since losing his job as a factory worker. Despite his service to his country, Carlos has been unable to secure stable housing, and has been forced to rely on shelters and transitional housing programs. He has financial problems. He has a history of drug abuse """

results_single = l_model.fullAnnotate(text)[0]

from sparknlp_display import NerVisualizer

displayHTML(NerVisualizer().display(results_single, 'all_chunks', return_html=True))

# 2. Upload pre-annotations to the NLP Lab

Now, we can use these results as pre-annotations and upload them to the NLP Lab. Pre-annotations help reduce manual annotation time as the annotator does not need to annotate everything, but rather make corrections.

For this exercise, we are using John Snow Labs' NLP Lab tool.

The NLP Lab is a stand-alone web interface designed to be used and installed inside any organization's environment to protect data privacy. It can be easily installed on a single VM.

More details and instructions can be found here: https://nlp.johnsnowlabs.com/docs/en/alab/install#aws-marketplace. 

While the tasks and pre-annotations can be uploaded directly via web interface as well, we are leveraging the API module for convenience.

## 2.1. Generate Pre-annotations using the Annotation Lab Module.

Initialize Alab module

In [0]:
from sparknlp_jsl.alab import AnnotationLab

alab = AnnotationLab()

In [0]:
# NOTE: "all_results" takes the pipeline output on the sample docs (here, `results`).
pre_annotations, summary = alab.generate_preannotations(all_results = results, document_column = 'document', ner_columns = ['all_chunks'])
pre_annotations[:10]

**What is the summary?** 

While a user would know all the entities coming out of the NLP pipeline, listing all of them manually is laborious.
The summary object helps identify how many types of entities, assertions, and relations are present.
This saves the user from listing all labels individually; we'll use this object while setting project configuration.

In [0]:
summary
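
As a rough illustration of what such a summary saves you from doing by hand, the sketch below collects the distinct labels from a list of chunks. The three keys mirror the ones this notebook reads from `summary` when configuring the project; the real object may contain more than this, and `summarize_labels` and `toy_chunks` are hypothetical.

```python
def summarize_labels(chunks):
    """Collect the distinct entity labels found in a list of (text, label) chunks."""
    return {
        "ner_labels": sorted({label for _, label in chunks}),
        "assertion_labels": [],  # the matcher pipeline has no assertion stage
        "re_labels": [],  # the matcher pipeline has no relation extraction stage
    }


# Toy chunks, invented for this example.
toy_chunks = [
    ("housing insecurity", "Housing"),
    ("shelters", "Housing"),
    ("financial problems", "Financial_Status"),
]

toy_summary = summarize_labels(toy_chunks)
print(toy_summary["ner_labels"])
```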

## 2.2. Set NLP Lab credentials and Create a New Project

<font color=#FF0000>**Note: If you don't have credentials for the NLP Lab, we have provided the annotations for ~300 tasks. Jump to section 3.2 to directly download the exported JSON file, and start training**</font>.

In [0]:

# # Set Credentials
# username=''
# password=''
# client_secret="" # see https://nlp.johnsnowlabs.com/docs/en/alab/api#get-client-secret
# annotationlab_url="" # your alab instance URL (could even be internal URL if deployed on-prem).

# alab.set_credentials(

#   # required: username
#   username=username,

#   # required: password
#   password=password,

#   # required: secret for your alab instance (every alab installation has a different secret)
#   client_secret=client_secret, 

#   # required: http(s) url for your NLP Lab
#   annotationlab_url=annotationlab_url
# )

Create a new project "sdoh" at NLP Lab.

In [0]:
# alab.create_project('sdoh')

## 2.3. Set project configuration (NER label tags, assertion classes, and relation tags)

In [0]:
# # set configuration

# ## either manually define labels:
# # alab.set_project_config(
# #   project_name = 'sdoh',
# #   ner_labels = ["Housing", "Substance_Use", "Insurance_Status", "Social_Exclusion", "Violence_Or_Abuse", "Spiritual_Beliefs", "Financial_Status"]
# # )

# # OR use the summary object which already has all details

# alab.set_project_config(
#   project_name = 'sdoh',
#   ner_labels = summary['ner_labels'],
#   assertion_labels = summary['assertion_labels'],
#   relations_labels = summary['re_labels']
# )

## 2.4. Upload pre-annotations to the newly created project.

You can upload all the tasks and annotate them. For demo purposes, we are only uploading 5 tasks.

In [0]:
# # Upload documents to Alab

# alab.upload_preannotations(
#   project_name = 'sdoh',
#   preannotations = pre_annotations[:5]) # testing with 5 annotations

## 2.5. Annotate documents on NLP Lab and make necessary corrections

**2.5.1 The first step for annotations is developing and adhering to guidelines, which are crucial for controlling the flow of annotations and avoiding confusion between entity types.**

>**An example Annotation Guideline (AG) is available [here](http://www.universalner.org/guidelines/).**

**2.5.2 Once annotation guidelines have been finalized, annotations can be started.**

**Since we have already uploaded pre-annotations to the NLP Lab, we can get started.**

1.       Go to the Projects -> sdoh -> tasks

2.       Select the first task

3.       Click Edit -> select NER type -> select corresponding text, as defined in Annotation Guidelines.

4.       Click Save -> Submit.

5.       Go to the next task, until all tasks are completed.

In the tasks overview, you should see all 5 tasks submitted.

# 3. Train NER model
Now, we can train an NER model using the annotations performed on the NLP Lab

## 3.1. Import the annotations from NLP Lab and save them as a JSON file.

First, we will download the annotation info from the NLP Lab project as a JSON file. This JSON file includes the URL of the annotations zip file, which we will then extract.

In [0]:
# exported_json = alab.get_annotations(
#                 project_name = 'sdoh', 
#                 output_name='result',
#                 save_dir=f"/dbfs{delta_silver_path}")


"exported_json" includes download link for the annotation zip file.

In [0]:
# exported_json

Save download_link and the file name as environment variables to be used in the command shell.

In [0]:
# os.environ['download_link'] = exported_json["download_link"]
# os.environ['export_filename']  = exported_json["download_link"].split("/")[-1]

We will download and unzip the zip file mentioned in the exported_json.

In [0]:
# %sh
# cd $delta_silver_path
# wget -q $download_link
# unzip -o $export_filename

In [0]:
# print (delta_silver_path)
# dbutils.fs.ls(delta_silver_path)

## 3.2. Convert the JSON file to CoNLL format for training an NER model

<font color=#FF0000>**Note: For demo purposes, we are downloading the annotated tasks. This is the same data as the NLP Lab annotations above.**</font>

In [0]:
%sh

cd $delta_silver_path
wget https://github.com/JohnSnowLabs/spark-nlp-workshop/raw/master/databricks/python/healthcare_case_studies/data/sdoh/sdoh_sample_export.json


In [0]:
print (delta_silver_path)
dbutils.fs.ls(delta_silver_path)

Initialize the AnnotationLab module and create CoNLL data from the annotation JSON file.

In [0]:

from sparknlp_jsl.alab import AnnotationLab

alab = AnnotationLab()

alab.get_conll_data(spark, f"/dbfs{delta_silver_path}sdoh_sample_export.json", output_name='conll_demo', save_dir=f"/dbfs{delta_silver_path}")


In [0]:
dbutils.fs.ls(delta_silver_path)
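
The core of a JSON-to-CoNLL conversion is turning character-level annotation spans into one BIO tag per token. The sketch below shows that step with whitespace tokenization. It is a simplified illustration under stated assumptions, not what `get_conll_data` does internally; `spans_to_bio` is a hypothetical helper, and only the token and tag columns are shown, whereas CoNLL files usually carry additional columns.

```python
def spans_to_bio(text, spans):
    """Whitespace-tokenize `text` and tag each token with a BIO label.

    `spans` is a list of (begin, end, label) character offsets, end exclusive.
    """
    rows, pos = [], 0
    for token in text.split():
        begin = text.index(token, pos)  # locate the token at or after the cursor
        end = begin + len(token)
        pos = end
        tag = "O"
        for s_begin, s_end, label in spans:
            if begin >= s_begin and end <= s_end:
                # First token of a span gets B-, the rest get I-.
                tag = ("B-" if begin == s_begin else "I-") + label
                break
        rows.append((token, tag))
    return rows


text = "He relies on transitional housing programs ."
phrase = "transitional housing programs"
start = text.index(phrase)
spans = [(start, start + len(phrase), "Housing")]  # invented annotation

rows = spans_to_bio(text, spans)
for token, tag in rows:
    print(token, tag)
```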

## 3.3. Train NER Model

Load the CoNLL data to be used for NER training.

In [0]:
conll_data = CoNLL().readDataset(spark, f"{delta_silver_path}conll_demo.conll")
conll_data.show(3)

Look at label distribution

In [0]:
from pyspark.sql import functions as F

conll_data.select(F.explode(F.arrays_zip(conll_data.token.result,
                                         conll_data.label.result)).alias("cols")) \
          .select(F.expr("cols['0']").alias("token"),
                  F.expr("cols['1']").alias("ground_truth"))\
          .groupBy('ground_truth')\
          .count()\
          .orderBy('count', ascending=False)\
          .show(100,truncate=False)
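
The query above counts tokens per tag, so the `O` tag and multi-token entities weigh heavily in the totals. Counting only `B-` tags gives the number of entities per label instead, since every chunk starts with exactly one `B-` tag. A small sketch on invented tags, with `count_entities` as a hypothetical helper:

```python
from collections import Counter


def count_entities(tags):
    """Count entities per label by counting B- tags."""
    return Counter(tag[2:] for tag in tags if tag.startswith("B-"))


# Toy tag sequence, invented for this example.
toy_tags = ["O", "B-Housing", "I-Housing", "O", "B-Substance_Use", "O", "B-Housing"]

print(Counter(toy_tags).most_common())  # token-level counts
print(count_entities(toy_tags))         # entity-level counts
```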



Select Embeddings

In [0]:
clinical_embeddings = WordEmbeddingsModel.pretrained('embeddings_clinical', "en", "clinical/models")\
    .setInputCols(["sentence", "token"])\
    .setOutputCol("embeddings")

Graph Builder to automatically generate a TensorFlow graph for training.

In [0]:
graph_folder_path = "/dbfs/ner/medical_ner_graphs"

ner_graph_builder = TFGraphBuilder()\
    .setModelName("ner_dl")\
    .setInputCols(["sentence", "token", "embeddings"]) \
    .setLabelColumn("label")\
    .setGraphFolder(graph_folder_path)\
    .setGraphFile("auto")\
    .setHiddenUnitsNumber(20)\
    .setIsLicensed(True) # False -> if you want to use TFGraphBuilder with NerDLApproach

Below is the NER model training approach. We define all hyperparameters within this step. At the end, we define our training pipeline.

In [0]:
nerTagger = MedicalNerApproach()\
  .setInputCols(["sentence", "token", "embeddings"])\
  .setLabelColumn("label")\
  .setOutputCol("ner")\
  .setMaxEpochs(21)\
  .setBatchSize(32)\
  .setRandomSeed(0)\
  .setVerbose(1)\
  .setLr(0.001)\
  .setEvaluationLogExtended(True) \
  .setEnableOutputLogs(True)\
  .setOutputLogsPath('dbfs:/ner/ner_logs')\
  .setUseBestModel(True)\
  .setGraphFolder('dbfs:/ner/medical_ner_graphs')\
  .setLogPrefix("sdoh_demo")\
  .setValidationSplit(0.2)
  # .setEnableMemoryOptimizer(True) #>> If you have limited memory and a large conll file, you can set this True to train batch by batch       

ner_pipeline = Pipeline(stages=[
          clinical_embeddings,
          ner_graph_builder,
          nerTagger
 ])
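
The `setValidationSplit(0.2)` setting above holds out a fifth of the training data for validation. The sketch below shows the general idea of a seeded hold-out split in plain Python. How the library performs the split internally may differ, and `train_validation_split` is a hypothetical helper.

```python
import random


def train_validation_split(rows, validation_split=0.2, seed=0):
    """Shuffle rows with a fixed seed and hold out a fraction for validation."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_val = int(len(shuffled) * validation_split)
    return shuffled[n_val:], shuffled[:n_val]


train, val = train_validation_split(range(100))
print(len(train), len(val))
```

Fixing the seed keeps the held-out set the same across runs, so scores from different hyperparameter settings remain comparable.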

Start NER training

In [0]:
model = ner_pipeline.fit(conll_data)

Check logs for loss and scores during batches.

In [0]:
%sh
ls -l /dbfs/ner/ner_logs/sdoh_demo*

In [0]:
sample_log_file_name = dbutils.fs.ls("/ner/ner_logs/")[0].name
with open(f'/dbfs/ner/ner_logs/{sample_log_file_name}', 'r') as f_:
  lines = ''.join(f_)
print (lines)

## 3.4. Save the model to disk, load from the disk and test on the new model

Save the trained NER model at "silver_path"

In [0]:
model.stages[-1].write().overwrite().save(delta_silver_path+'ner_model')

Now build a prediction pipeline using the trained and saved NER model. We will load the saved model from the "silver_path".

In [0]:
documentAssembler = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

sentenceDetector = SentenceDetectorDLModel().pretrained('sentence_detector_dl_healthcare', 'en', 'clinical/models')\
    .setInputCols(["document"])\
    .setOutputCol("sentence")

tokenizer = Tokenizer()\
    .setInputCols(["sentence"])\
    .setOutputCol("token")

embeddings = WordEmbeddingsModel.pretrained('embeddings_clinical', 'en', 'clinical/models') \
    .setInputCols("sentence", "token") \
    .setOutputCol("embeddings")

#load the trained ner model
ner_model =MedicalNerModel().load(delta_silver_path+'ner_model')\
    .setInputCols(["sentence", "token", "embeddings"])\
    .setOutputCol("ner_tags")

# convert the token-level NER tags into entity chunks
ner_chunk = NerConverterInternal()\
    .setInputCols(['sentence', 'token', 'ner_tags']) \
    .setOutputCol('ner_chunk')


pipeline=Pipeline(stages=[
    documentAssembler,
    sentenceDetector,
    tokenizer,
    embeddings,
    ner_model,
    ner_chunk,
])

empty_data = spark.createDataFrame([[""]]).toDF("text")
model = pipeline.fit(empty_data)
light_model = LightPipeline(model)
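
The converter stage in this pipeline turns token-level tags into entity chunks. The sketch below shows the general idea of merging BIO tags in plain Python. It is an illustration of the concept, not the implementation of `NerConverterInternal`; the tokens and tags are invented rather than model output, and `bio_to_chunks` is a hypothetical helper.

```python
def bio_to_chunks(tokens, tags):
    """Merge BIO-tagged tokens into (chunk_text, label) pairs."""
    chunks, current, label = [], [], None
    for token, tag in zip(tokens, tags):
        if tag.startswith("B-") or (tag.startswith("I-") and tag[2:] != label):
            # A new entity starts: flush any chunk in progress.
            if current:
                chunks.append((" ".join(current), label))
            current, label = [token], tag[2:]
        elif tag.startswith("I-"):
            current.append(token)  # continue the open chunk
        else:
            # "O" closes any open chunk.
            if current:
                chunks.append((" ".join(current), label))
            current, label = [], None
    if current:
        chunks.append((" ".join(current), label))
    return chunks


tokens = ["She", "has", "no", "health", "insurance", "and", "uses", "temporary", "shelters"]
tags = ["O", "O", "B-Insurance_Status", "I-Insurance_Status", "I-Insurance_Status",
        "O", "O", "B-Housing", "I-Housing"]

chunks = bio_to_chunks(tokens, tags)
print(chunks)
```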

Let's check the new model on a piece of sample text.

In [0]:
# Checking on a piece of text
text = """A 48-year-old woman presented at the community health clinic, highlighting a range of health concerns that were intricately connected to various social determinants of health.

Her housing situation revealed instability, as she navigated temporary shelters due to financial constraints. This financial stress further impacted her ability to access adequate nutrition and consistent healthcare.

The patient disclosed occasional substance use as a coping mechanism for her challenges, expressing a desire to explore healthier ways of managing stress.

Having no health insurance amplified her worries about affording medical care and essential medications, adding to her emotional distress.

Her social interactions were affected by feelings of exclusion arising from her housing instability and financial struggles. Additionally, the patient shared a history of emotional abuse, contributing to her overall stress levels.

In contrast, the patient's spiritual beliefs as Christianity acted as a source of strength and emotional support, offering her solace during difficult times.

The patient reported symptoms of anxiety and insomnia, warranting a comprehensive approach to her health management. Recommendations encompassed counseling for substance use, anxiety, and emotional support. Moreover, resources were provided to explore affordable healthcare services and potential insurance options."""

results_single = light_model.fullAnnotate(text)[0]

from sparknlp_display import NerVisualizer

displayHTML(NerVisualizer().display(results_single, 'ner_chunk', return_html=True))

The model was trained on the labels `Housing, Substance_Use, Insurance_Status, Social_Exclusion, Violence_Or_Abuse, Spiritual_Beliefs, Financial_Status`, and we extracted these entities from a patient's file, as seen above.

## License

Copyright / License info of the notebook. Copyright [2022] the Notebook Authors.  The source in this notebook is provided subject to the [Apache 2.0 License](https://spdx.org/licenses/Apache-2.0.html).  All included or referenced third party libraries are subject to the licenses set forth below.
|Library Name|Library License|Library License URL|Library Source URL|
| :-: | :-:| :-: | :-:|
|Pandas |BSD 3-Clause License| <https://github.com/pandas-dev/pandas/blob/master/LICENSE> | <https://github.com/pandas-dev/pandas>|
|Numpy |BSD 3-Clause License| <https://github.com/numpy/numpy/blob/main/LICENSE.txt> | <https://github.com/numpy/numpy>|
|Apache Spark |Apache License 2.0| <https://github.com/apache/spark/blob/master/LICENSE> | <https://github.com/apache/spark/tree/master/python/pyspark>|
|Requests|Apache License 2.0|<https://github.com/psf/requests/blob/main/LICENSE>|<https://github.com/psf/requests>|
|Spark NLP Display|Apache License 2.0|<https://github.com/JohnSnowLabs/spark-nlp-display/blob/main/LICENSE>|<https://github.com/JohnSnowLabs/spark-nlp-display>|
|Spark NLP |Apache License 2.0| <https://github.com/JohnSnowLabs/spark-nlp/blob/master/LICENSE> | <https://github.com/JohnSnowLabs/spark-nlp>|
|Spark NLP for Healthcare|[Proprietary license - John Snow Labs Inc.](https://www.johnsnowlabs.com/spark-nlp-health/) |NA|NA|

|Author|
|-|
|Databricks Inc.|
|John Snow Labs Inc.|

## Disclaimers

Databricks Inc. (“Databricks”) does not dispense medical, diagnosis, or treatment advice. This Solution Accelerator (“tool”) is for informational purposes only and may not be used as a substitute for professional medical advice, treatment, or diagnosis. This tool may not be used within Databricks to process Protected Health Information (“PHI”) as defined in the Health Insurance Portability and Accountability Act of 1996, unless you have executed with Databricks a contract that allows for processing PHI, an accompanying Business Associate Agreement (BAA), and are running this notebook within a HIPAA Account.  Please note that if you run this notebook within Azure Databricks, your contract with Microsoft applies.

The job configuration is written in the RUNME notebook in json format. The cost associated with running the accelerator is the user's responsibility.