<a href="https://colab.research.google.com/github/russell-ai/SparkNLP-CustomNER/blob/main/1_Pipeline_Implementation_with_Chunk_Merger.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

![JohnSnowLabs](https://nlp.johnsnowlabs.com/assets/images/logo.png)

# **Interview Task**
[Running a Spark NLP Healthcare Pipeline and Training a Custom NER Model](https://docs.google.com/document/d/1l_SpYGAlVGAEe9x-b8avgvKipCXetdap2ttc4UKreO4/edit?tab=t.0)  
## **PART-I Pipeline Implementation:**


## 1. Set Up Spark NLP for Healthcare

In [None]:
import json
import os

from google.colab import files

# Ask for the license file only when it is not already in the working directory.
if 'spark_jsl.json' not in os.listdir():
    uploaded_files = files.upload()
    # files.upload() is keyed by uploaded filename; normalise the first one's name.
    os.rename(list(uploaded_files)[0], 'spark_jsl.json')

with open('spark_jsl.json') as license_file:
    license_keys = json.load(license_file)

# Expose every license entry (SECRET, JSL_VERSION, PUBLIC_VERSION, ...) as a
# notebook variable and as an environment variable. At notebook top level
# globals() is the same namespace the original locals() call updated.
globals().update(license_keys)
os.environ.update(license_keys)

In [None]:
# Show which entries the license file provides (key names only, not their values).
license_keys.keys()

In [None]:
# Spark NLP for Healthcare version; installed below as spark-nlp-jsl==$JSL_VERSION.
license_keys['JSL_VERSION']

In [None]:
# Open-source Spark NLP version; installed below as spark-nlp==$PUBLIC_VERSION.
license_keys['PUBLIC_VERSION']

In [None]:
# Installing pyspark and spark-nlp
# $PUBLIC_VERSION, $JSL_VERSION and $SECRET are filled in from the license
# values loaded from spark_jsl.json in the first cell.
! pip install --upgrade -q pyspark==3.4.1 spark-nlp==$PUBLIC_VERSION


# Installing Spark NLP Healthcare
# The licensed package comes from John Snow Labs' index; the secret is part of that URL.
! pip install --upgrade -q spark-nlp-jsl==$JSL_VERSION  --extra-index-url https://pypi.johnsnowlabs.com/$SECRET

# Installing Spark NLP Display Library for visualization
# NOTE(review): unpinned, unlike the two installs above; consider pinning for reproducibility.
! pip install -q spark-nlp-display

In [None]:
import json
import os

import sparknlp
import sparknlp_jsl

from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp_jsl.annotator import *
from sparknlp_jsl.pipeline_tracer import PipelineTracer
from sparknlp_jsl.pipeline_output_parser import PipelineOutputParser

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline,PipelineModel

import pandas as pd
# Show up to 200 characters per cell when displaying pandas frames.
pd.options.display.max_colwidth = 200

import warnings
warnings.filterwarnings('ignore')

# Spark session settings passed to sparknlp_jsl.start below.
params = {
    "spark.driver.memory": "16G",
    "spark.kryoserializer.buffer.max": "2000M",
    "spark.driver.maxResultSize": "2000M",
}

print(f"Spark NLP Version : {sparknlp.version()}")
print(f"Spark NLP_JSL Version : {sparknlp_jsl.version()}")

# Start a licensed Spark session and display it as the cell output.
spark = sparknlp_jsl.start(license_keys['SECRET'], params=params)

spark

In [None]:
from sparknlp_jsl.pretrained import InternalResourceDownloader

# Print the English clinical / posology NER models available to this license.
ner_models = InternalResourceDownloader.returnPrivateModels("MedicalNerModel")
wanted_prefixes = ("ner_clinical", "ner_posology")
for model_name, model_lang, model_version in ner_models:
    if model_lang != "en":
        continue
    if model_name.startswith(wanted_prefixes):
        print(model_name)

## 2. Dataset Selection

In [None]:
# mt_samples dataset from John Snow Labs
! wget -q https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Healthcare/data/mt_samples_10.csv

In [None]:
# Load the sample notes; multiLine allows quoted text fields to span several lines.
mt_samples_df = (
    spark.read
    .option("header", True)
    .option("multiLine", True)
    .csv("mt_samples_10.csv")
)

In [None]:
# Column names and types inferred from the CSV header.
mt_samples_df.printSchema()

In [None]:
# First rows of the dataset, each field truncated to 100 characters.
mt_samples_df.show(truncate=100)

In [None]:
# Print one full note so its length and formatting are visible.
first_row = mt_samples_df.take(1)[0]
print(first_row['text'])

## 3. NER Pipeline Execution:

### Pipeline

In [None]:
# ---- Text pre-processing: raw text -> document -> sentences -> tokens --------
document_assembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

sentence_detector = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
)

tokenizer = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

# Clinical word embeddings shared by all three NER models below.
embeddings = (
    WordEmbeddingsModel.pretrained("embeddings_clinical", "en", "clinical/models")
    .setInputCols(["sentence", "token"])
    .setOutputCol("embeddings")
)

# ---- NER models: each writes its own token-level tag column ------------------
ner_clinical = (
    MedicalNerModel.pretrained("ner_clinical", "en", "clinical/models")
    .setInputCols(["sentence", "token", "embeddings"])
    .setOutputCol("ner_clinical")
)

ner_posology_greedy = (
    MedicalNerModel.pretrained("ner_posology_greedy", "en", "clinical/models")
    .setInputCols(["sentence", "token", "embeddings"])
    .setOutputCol("ner_posology")
)

ner_deid_generic = (
    MedicalNerModel.pretrained("ner_deid_generic_augmented", "en", "clinical/models")
    .setInputCols(["sentence", "token", "embeddings"])
    .setOutputCol("ner_deid")
)

# ---- Tags -> chunks. Clinical keeps every label, posology keeps only DRUG,
# ---- and deid keeps only NAME and DATE.
ner_conv_clinical = (
    NerConverterInternal()
    .setInputCols(["sentence", "token", "ner_clinical"])
    .setOutputCol("ner_chunk_clinical")
)

ner_conv_posology = (
    NerConverterInternal()
    .setInputCols(["sentence", "token", "ner_posology"])
    .setOutputCol("ner_chunk_posology")
    .setWhiteList(["DRUG"])
)

ner_conv_deid = (
    NerConverterInternal()
    .setInputCols(["sentence", "token", "ner_deid"])
    .setOutputCol("ner_chunk_deid")
    .setWhiteList(["NAME", "DATE"])
)

# ---- Merge the three chunk streams into a single "merged_chunks" column -------
# NOTE(review): with setMergeOverlapping(True), confirm in the ChunkMergeApproach
# docs which source/label is kept when chunks from different sources overlap.
chunk_merger = (
    ChunkMergeApproach()
    .setInputCols(["ner_chunk_clinical", "ner_chunk_posology", "ner_chunk_deid"])
    .setOutputCol("merged_chunks")
    .setMergeOverlapping(True)
)

# Stage order matters: every stage reads columns written by earlier stages.
pipeline_stages = [
    document_assembler, sentence_detector, tokenizer, embeddings,
    ner_clinical, ner_posology_greedy, ner_deid_generic,
    ner_conv_clinical, ner_conv_posology, ner_conv_deid,
    chunk_merger,
]
pipeline = Pipeline(stages=pipeline_stages)

### Test pipeline

In [None]:
# Fitting on a single empty-text row turns the Pipeline into a PipelineModel;
# presumably nothing is learned here because the NER stages are pretrained.
empty_df = spark.createDataFrame([("",)], ["text"])
model = pipeline.fit(empty_df)

In [None]:
# Run the fitted pipeline on one synthetic sentence.
text = "Patient John Doe was prescribed 500mg of Aspirin on 2023-05-15 for his chronic pain."
test_df = spark.createDataFrame([(text,)]).toDF("text")
results = model.transform(test_df)

# Display one row per merged chunk with its offsets and entity label, so the
# effect of the chunk merger (which labels survived) can be checked directly.
(
    results
    .select(F.explode("merged_chunks").alias("c"))
    .select(
        F.col("c.begin").alias("begin"),
        F.col("c.end").alias("end"),
        F.col("c.result").alias("chunk"),
        F.col("c.metadata").getItem("entity").alias("entity"),
    )
    .show(truncate=False)
)

### Extract **ner_clinical** predictions from the mt_samples dataset via the pipeline

#### Prepare input texts

In [None]:
# Pull every note to the driver as a plain Python string (dataset is small).
texts = [row['text'] for row in mt_samples_df.select("text").collect()]
print(len(texts))
print(type(texts), type(texts[0]))

In [None]:
# Preview the first three notes, each under a centred header.
for idx, text in enumerate(texts[:3]):
    print(f"Text {idx + 1}:".center(100, '-'))
    print(text)

#### Use **PipelineTracer** to get structured output

In [None]:
# Build a dedicated ner_clinical-only PipelineModel from the annotators defined in
# the pipeline section. The previous code referenced `light_pipeline_model` /
# `light_model` before anything in this notebook defined them.
light_pipeline_model = Pipeline(
    stages=[
        document_assembler,
        sentence_detector,
        tokenizer,
        embeddings,
        ner_clinical,
        ner_conv_clinical,
    ]
).fit(empty_df)

# LightPipeline annotates plain Python strings (used with fullAnnotate below).
light_model = LightPipeline(light_pipeline_model)

# PipelineTracer inspects the model's stages and produces the column mapping that
# PipelineOutputParser uses to turn fullAnnotate() output into structured dicts.
pipeline_tracer = PipelineTracer(light_pipeline_model)

column_maps = pipeline_tracer.createParserDictionary()
column_maps.update({"document_identifier": "ner_pipeline"})
pipeline_parser = PipelineOutputParser(column_maps)

#### Getting predictions

In [None]:
from pyspark.sql.functions import monotonically_increasing_id
import pandas as pd

# One record per extracted entity, and one record per input text.
all_results = []
all_texts = []

# Annotate each text separately; text_id is the text's position in `texts`.
for idx, text in enumerate(texts):
    light_result = light_model.fullAnnotate([text])
    result = pipeline_parser.run(light_result)

    # Only one document was sent, so only result['result'][0] is read.
    for entity in result['result'][0]['entities']:
        all_results.append({
            'text_id': idx,
            'begin': entity['begin'],
            'end': entity['end'],
            'chunk': entity['chunk'],
            'entity': entity['ner_label']
        })

    all_texts.append({'text_id': idx, 'text': text})

# Explicit columns keep the schema stable even when no entity was found;
# otherwise result_df would have no 'entity' column and later cells would fail.
result_df = pd.DataFrame(all_results, columns=['text_id', 'begin', 'end', 'chunk', 'entity'])
text_df = pd.DataFrame(all_texts, columns=['text_id', 'text'])

In [None]:
# First ten extracted entities (one row per chunk).
result_df.head(10)

In [None]:
# How many chunks were extracted for each entity label.
entity_counts = result_df.entity.value_counts()
print(entity_counts)

#### **Save** the NER results and the corresponding texts as CSV files.

In [None]:
# Relative paths: this matches the posology cell below and works outside Colab.
# (In Colab the working directory is presumably /content, so the files end up
# where the previous absolute paths pointed.)
result_df.to_csv("ner_clinical_mtsamples_ner_results_for_conll.csv", index=False)
text_df.to_csv("mtsamples_texts.csv", index=False)

### Test ner_posology pipeline

In [None]:
# Three-sentence sample for the posology model, one sentence per line (kept verbatim).
text = (
    "The patient was prescribed 1 capsule of Parol with meals .\n"
    "He was seen by the endocrinology service and she was discharged on 40 units of insulin glargine at night , 12 units of insulin lispro with meals , and metformin 1000 mg two times a day .\n"
    "It was determined that all SGLT2 inhibitors should be discontinued indefinitely fro 3 months ."
)

In [None]:
def get_pipeline_model(embeddings_name, ner_model_name):
    """Build and fit a single-model clinical NER pipeline.

    Args:
        embeddings_name: name of the pretrained word embeddings in "clinical/models".
        ner_model_name: name of the pretrained MedicalNerModel in "clinical/models".

    Returns:
        A fitted PipelineModel whose "ner_chunk" column holds the chunks
        produced by `ner_model_name` (no label whitelist is applied).
    """
    stages = [
        DocumentAssembler().setInputCol("text").setOutputCol("document"),
        SentenceDetector().setInputCols(["document"]).setOutputCol("sentence"),
        Tokenizer().setInputCols(["sentence"]).setOutputCol("token"),
        WordEmbeddingsModel.pretrained(embeddings_name, "en", "clinical/models")
            .setInputCols(["sentence", "token"])
            .setOutputCol("embeddings"),
        MedicalNerModel.pretrained(ner_model_name, "en", "clinical/models")
            .setInputCols(["sentence", "token", "embeddings"])
            .setOutputCol("ner"),
        NerConverterInternal()
            .setInputCols(["sentence", "token", "ner"])
            .setOutputCol("ner_chunk"),
    ]
    # Fitting on one empty-text row only materialises the PipelineModel.
    empty_text_df = spark.createDataFrame([[""]]).toDF("text")
    return Pipeline(stages=stages).fit(empty_text_df)


# Distinct names so the `embeddings` annotator defined earlier is not shadowed.
EMBEDDINGS_NAME = 'embeddings_clinical'
POSOLOGY_MODEL_NAME = 'ner_posology'

light_pipeline_model = get_pipeline_model(EMBEDDINGS_NAME, POSOLOGY_MODEL_NAME)
light_model = LightPipeline(light_pipeline_model)
light_result = light_model.fullAnnotate(text)

# Rebuild the output parser for *this* pipeline; the mt_samples extraction
# below uses `light_model` together with `pipeline_parser`.
posology_column_maps = PipelineTracer(light_pipeline_model).createParserDictionary()
posology_column_maps.update({"document_identifier": "ner_posology"})
pipeline_parser = PipelineOutputParser(posology_column_maps)

In [None]:
# pprint was used here without being imported anywhere in the notebook.
from pprint import pprint

pprint(light_result)

### Extract **ner_posology** predictions from the mt_samples dataset via the pipeline

In [None]:
# Input texts for pipeline, each paired with a text_id.
# Ids are row positions (0, 1, 2, ...), the same scheme as the ner_clinical
# extraction above, which also writes mtsamples_texts.csv. The ids must agree.
# monotonically_increasing_id() only guarantees increasing, unique ids; they are
# not necessarily consecutive across partitions.
texts = [
    {'text_id': idx, 'text': row['text']}
    for idx, row in enumerate(mt_samples_df.select("text").collect())
]

In [None]:
# Inspect the first input record (its text and text_id).
texts[0]

In [None]:
# One record per extracted entity, and one record per input text.
# NOTE(review): assumes `light_model` and `pipeline_parser` were both built from
# the ner_posology pipeline; verify before running this cell.
all_results = []
all_texts = []

for row in texts:
    text_id = int(row['text_id'])
    text = row['text']

    # Save the text and its id.
    all_texts.append({'text_id': text_id, 'text': text})

    # Annotate a single document and parse it; only result['result'][0] is read.
    light_result = light_model.fullAnnotate([text])
    result = pipeline_parser.run(light_result)

    for entity in result['result'][0]['entities']:
        all_results.append({
            'text_id': text_id,
            'begin': entity['begin'],
            'end': entity['end'],
            'chunk': entity['chunk'],
            'entity': entity['ner_label']
        })

# Explicit columns keep the schema stable even when no entity was found, so
# result_df["entity"] below never raises KeyError.
result_df = pd.DataFrame(all_results, columns=['text_id', 'begin', 'end', 'chunk', 'entity'])
texts_df = pd.DataFrame(all_texts, columns=['text_id', 'text'])

# Save as CSV files (plain strings: the former f-strings had no placeholders).
result_df.to_csv("ner_posology_mtsamples_ner_results.csv", index=False)
texts_df.to_csv("mtsamples_texts.csv", index=False)

In [None]:
# Full text of the first note in the saved texts table.
texts_df.iloc[0]['text']

In [None]:
# First ten extracted posology entities (one row per chunk).
result_df.head(10)

In [None]:
# Number of extracted chunks per posology label.
result_df["entity"].value_counts()

---  
*`R.Caliskan`*
