# Overview of pipeline

The purpose of this notebook is to generate entity resolution results for a variety of combinations of Spark NLP for Healthcare models.

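The pipeline accepts a dataframe of campaign URLs and fund descriptions. It runs every requested combination of NER model, entity-resolution input, and entity-resolution model. It outputs two long-format tables, one for NER results and one for entity-resolution results, with columns identifying the model components that produced each row. Both tables are exported as CSV files.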





# Import license keys

In [None]:
from google.colab import files
import json

# Upload the JSON file holding the license keys / versions used below.
uploaded_keys = files.upload()

# files.upload() returns {original_filename: file_bytes}. Parse the returned
# bytes directly instead of re-opening the file by its original name: when a
# file of that name already exists, Colab saves the new upload under another
# name (e.g. "name (2).csv"), so open(original_name) would read a stale copy.
if not uploaded_keys:
  raise ValueError('no license key file was uploaded')
license_keys = json.loads(next(iter(uploaded_keys.values())))

Saving keys.json to keys.json


# Install and import dependencies

In [None]:
import os
import csv
import io
import pandas as pd
import numpy as np
import copy


secret = license_keys['SECRET']
os.environ['SPARK_NLP_LICENSE'] = license_keys['SPARK_NLP_LICENSE']
os.environ['AWS_ACCESS_KEY_ID'] = license_keys['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = license_keys['AWS_SECRET_ACCESS_KEY']
sparknlp_version = license_keys["PUBLIC_VERSION"]
jsl_version = license_keys["JSL_VERSION"]

print ('SparkNLP Version:', sparknlp_version)
print ('SparkNLP-JSL Version:', jsl_version)

# Install Java
! apt-get update -qq
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
! java -version

# Install pyspark
! pip install --ignore-installed -q pyspark==2.4.4

# Install Spark NLP
! pip install --ignore-installed spark-nlp==$sparknlp_version
! python -m pip install --upgrade spark-nlp-jsl==$jsl_version --extra-index-url https://pypi.johnsnowlabs.com/$secret

os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ['PATH'] = os.environ['JAVA_HOME'] + "/bin:" + os.environ['PATH']

from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *

import sparknlp
from sparknlp.annotator import *
from sparknlp_jsl.annotator import *
from sparknlp.base import *
import sparknlp_jsl

spark = sparknlp_jsl.start(secret)

SparkNLP Version: 2.6.4
SparkNLP-JSL Version: 2.7.1
openjdk version "11.0.9.1" 2020-11-04
OpenJDK Runtime Environment (build 11.0.9.1+1-Ubuntu-0ubuntu1.18.04)
OpenJDK 64-Bit Server VM (build 11.0.9.1+1-Ubuntu-0ubuntu1.18.04, mixed mode, sharing)
[K     |████████████████████████████████| 215.7MB 63kB/s 
[K     |████████████████████████████████| 204kB 40.1MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
Collecting spark-nlp==2.6.4
[?25l  Downloading https://files.pythonhosted.org/packages/d9/26/f7a6ac12339d2f1ed271c46c16705665620059e4559f323695925f3c63b4/spark_nlp-2.6.4-py2.py3-none-any.whl (129kB)
[K     |████████████████████████████████| 133kB 6.4MB/s 
[?25hInstalling collected packages: spark-nlp
Successfully installed spark-nlp-2.6.4
Looking in indexes: https://pypi.org/simple, https://pypi.johnsnowlabs.com/<REDACTED-SECRET>
Collecting spark-nlp-jsl==2.7.1
  Downloading https://pypi.johnsnowlabs.com/<REDACTED-SECRET>

# Define pipeline elements

In [None]:
# Shared front end: raw text -> 'document' -> 'sentence' -> 'token'.
# (RunNER builds its own DocumentAssembler for the real input column; this
# module-level one reads a column named 'text'.)
document_assembler = (
  DocumentAssembler()
  .setInputCol('text')
  .setOutputCol('document')
)

sentence_detector = (
  SentenceDetector()
  .setInputCols(['document'])
  .setOutputCol('sentence')
)

tokenizer = (
  Tokenizer()
  .setInputCols(['sentence'])
  .setOutputCol('token')
)

#-------------------------------------------------------------------------------
#embeddings
#-------------------------------------------------------------------------------
# Word embeddings that feed the NER models. Both write the same 'embeddings'
# column, so only one of them belongs in any given pipeline.
# Both read 'sentence' (not 'document') so the embeddings line up with the
# sentence-split tokens, matching the tokenizer and the NER/converter inputs.
word_embeddings_healthcare = WordEmbeddingsModel.pretrained("embeddings_healthcare_100d", "en", "clinical/models")\
  .setInputCols(["sentence", "token"])\
  .setOutputCol("embeddings")

word_embeddings_clinical = WordEmbeddingsModel.pretrained("embeddings_clinical", "en", "clinical/models")\
  .setInputCols(["sentence", "token"])\
  .setOutputCol("embeddings")
#-------------------------------------------------------------------------------


#-------------------------------------------------------------------------------
#NER
#-------------------------------------------------------------------------------
# Pretrained NER taggers. All four read sentence/token/embeddings and write the
# same 'ner' column, so only one of them belongs in any given pipeline.
ner_healthcare = (
  NerDLModel.pretrained("ner_healthcare", "en", "clinical/models")
  .setInputCols(["sentence", "token", "embeddings"])
  .setOutputCol("ner")
)

ner_clinical = (
  NerDLModel.pretrained("ner_clinical", "en", "clinical/models")
  .setInputCols(["sentence", "token", "embeddings"])
  .setOutputCol("ner")
)

ner_diseases = (
  NerDLModel.pretrained("ner_diseases", "en", "clinical/models")
  .setInputCols(["sentence", "token", "embeddings"])
  .setOutputCol("ner")
)

ner_jsl = (
  NerDLModel.pretrained("ner_jsl", "en", "clinical/models")
  .setInputCols(["sentence", "token", "embeddings"])
  .setOutputCol("ner")
)
#-------------------------------------------------------------------------------

# Converters group the 'ner' tags into 'ner_chunk' chunks, keeping only one
# whitelisted label each. They share the output column, so only one belongs
# in any given pipeline.
ner_converter_problem = (
  NerConverter()
  .setInputCols(["sentence", "token", "ner"])
  .setOutputCol("ner_chunk")
  .setWhiteList(['PROBLEM'])
)

ner_converter_diseases = (
  NerConverter()
  .setInputCols(["sentence", "token", "ner"])
  .setOutputCol("ner_chunk")
  .setWhiteList(['Disease'])
)

ner_converter_diagnosis = (
  NerConverter()
  .setInputCols(["sentence", "token", "ner"])
  .setOutputCol("ner_chunk")
  .setWhiteList(['Diagnosis'])
)

# Combine each ner_chunk with the word embeddings into 'chunk_embeddings',
# the input of the chunk-level entity resolvers.
chunk_embeddings = (
  ChunkEmbeddings()
  .setInputCols(["ner_chunk", "embeddings"])
  .setOutputCol("chunk_embeddings")
)
    
# Chunk-level entity resolvers. All read token + chunk_embeddings, write the
# 'entity' column and use Euclidean distance; because they share the output
# column, at most one belongs in a pipeline.
athena = (
  ChunkEntityResolverModel.pretrained("chunkresolve_athena_conditions_healthcare", "en", "clinical/models")
  .setInputCols(["token", "chunk_embeddings"])
  .setOutputCol("entity")
  .setDistanceFunction("EUCLIDEAN")
)

icd10 = (
  ChunkEntityResolverModel.pretrained("chunkresolve_icd10cm_clinical", "en", "clinical/models")
  .setInputCols(["token", "chunk_embeddings"])
  .setOutputCol("entity")
  .setDistanceFunction("EUCLIDEAN")
)

snomed = (
  ChunkEntityResolverModel.pretrained("chunkresolve_snomed_findings_clinical", "en", "clinical/models")
  .setInputCols(["token", "chunk_embeddings"])
  .setOutputCol("entity")
  .setDistanceFunction("EUCLIDEAN")
)

#------------------------
# sentence embeddings
#-------------------------
# Sentence-level resolution path: turn each ner_chunk into its own document,
# embed it with the sbiobert sentence model, then resolve it with the
# sbiobertresolve_icd10cm model.

c2doc = (
  Chunk2Doc()
  .setInputCols("ner_chunk")
  .setOutputCol("ner_chunk_doc")
)

sbert_embedder = (
  BertSentenceEmbeddings
  .pretrained("sbiobert_base_cased_mli", 'en', 'clinical/models')
  .setInputCols(["ner_chunk_doc"])
  .setOutputCol("sbert_embeddings")
)

icd10cm_resolver = (
  SentenceEntityResolverModel.pretrained("sbiobertresolve_icd10cm", "en", "clinical/models")
  .setInputCols(["ner_chunk", "sbert_embeddings"])
  .setOutputCol("entity")
  .setDistanceFunction("EUCLIDEAN")
)

embeddings_healthcare_100d download started this may take some time.
Approximate size to download 475.8 MB
[OK!]
embeddings_clinical download started this may take some time.
Approximate size to download 1.6 GB
[OK!]
ner_healthcare download started this may take some time.
Approximate size to download 13.4 MB
[OK!]
ner_clinical download started this may take some time.
Approximate size to download 13.8 MB
[OK!]
ner_diseases download started this may take some time.
Approximate size to download 13.7 MB
[OK!]
ner_jsl download started this may take some time.
Approximate size to download 14 MB
[OK!]
chunkresolve_athena_conditions_healthcare download started this may take some time.
Approximate size to download 180.7 MB
[OK!]
chunkresolve_icd10cm_clinical download started this may take some time.
Approximate size to download 166.3 MB
[OK!]
chunkresolve_snomed_findings_clinical download started this may take some time.
Approximate size to download 162.6 MB
[OK!]
sbiobert_base_cased_mli down

# Define functions

In [None]:
def RemoveStopwords(chunks, stopwords=None):
  '''
  Remove stopwords from each chunk string.

  input:
    1. iterable of strings (NER chunk texts)
    2. optional iterable of stopwords (matched case-insensitively); defaults
       to a small list of articles, demonstratives and possessives
  output: list of strings, one per input chunk, with stopwords removed and
          whitespace runs collapsed to single spaces (a chunk made only of
          stopwords becomes '')
  '''
  if stopwords is None:
    stopwords = ('a', 'an', 'the', 'this', 'that', 'these', 'his', 'her', 'their')
  # Build the lookup set once, not once per chunk; lowercase it because each
  # word is lowercased before the membership test.
  stopword_set = frozenset(word.lower() for word in stopwords)

  results = []
  for string in chunks:
    kept = [word for word in string.split() if word.lower() not in stopword_set]
    results.append(' '.join(kept))
  return results

In [None]:
def ChunksToNewDoc(chunks):
  '''
  Join chunk strings into a single pseudo-document.

  input: iterable of strings
  output: one string with the chunks separated by " and " ('' if empty)
  '''
  separator = ' and '
  new_doc = separator.join(chunks)
  return new_doc

In [None]:
def GetChunksAndSentences(lofd):
  '''
  Slim first-pass NER records down to what is needed to rebuild documents.

  input: list of dicts with keys 'url', 'fund_description', 'sentence'
         (a sized collection of sentence annotations) and 'ner_chunk' (chunk
         annotations supporting chunk['result'] and chunk.metadata['sentence'])
  output: list of dicts with keys 'url', 'fund_description', 'num_sentences'
          and 'ner_chunk' (list of {'result': str, 'sentence': int})
  '''
  def slim_chunk(chunk):
    # The sentence index comes back from metadata as text; store it as int.
    return {'result': chunk['result'], 'sentence': int(chunk.metadata['sentence'])}

  return [
    {
      'url': record['url'],
      'fund_description': record['fund_description'],
      'num_sentences': len(record['sentence']),
      'ner_chunk': [slim_chunk(chunk) for chunk in record['ner_chunk']],
    }
    for record in lofd
  ]

In [None]:
def PrepareNewDoc(lofd):
  '''
  Build a condensed document per record from its NER chunks.

  input: list of dicts, each with a 'ner_chunk' list of dicts holding a
         'result' string (as produced by GetChunksAndSentences)
  output: pandas dataframe with every input field plus
            'chunks'  - the chunk texts with stopwords removed
            'new_doc' - those chunks joined with " and " ('' if no chunks)
  '''
  r = []
  for d in lofd:
    # Deep copy so the caller's records (and their nested lists) are untouched.
    dic = copy.deepcopy(d)
    # Clean and join once, after collecting every chunk of the record. Setting
    # 'new_doc' unconditionally means records without chunks get '' and the
    # column exists even when no record has any chunk.
    dic['chunks'] = RemoveStopwords([chunk['result'] for chunk in d['ner_chunk']])
    dic['new_doc'] = ChunksToNewDoc(dic['chunks'])
    r.append(dic)
  return pd.DataFrame(r)

In [None]:
def RunER(ner_result, entity_resolution_model):
  '''
  Run an entity resolution (ER) pipeline over NER output.

  input:
    1. spark dataframe of NER results (as returned by RunNER); needs 'url',
       'fund_description' and 'ner_chunk', plus 'token' and 'embeddings' for
       the chunkresolve_* models
    2. string naming the ER model; one of
       'chunkresolve_athena_conditions_healthcare',
       'chunkresolve_icd10cm_clinical',
       'chunkresolve_snomed_findings_clinical',
       'sbiobertresolve_icd10cm'
  output: pandas dataframe with nested results, columns
          url, fund_description, ner_chunk, entity
  raises: ValueError if entity_resolution_model is not one of the above
  '''
  # Chunk resolvers consume chunk embeddings built from the NER word
  # embeddings; the sentence resolver re-embeds each chunk as a document.
  stages_by_model = {
    'chunkresolve_athena_conditions_healthcare': [chunk_embeddings, athena],
    'chunkresolve_icd10cm_clinical': [chunk_embeddings, icd10],
    'chunkresolve_snomed_findings_clinical': [chunk_embeddings, snomed],
    'sbiobertresolve_icd10cm': [c2doc, sbert_embedder, icd10cm_resolver],
  }
  if entity_resolution_model not in stages_by_model:
    # Without this check an unknown name left the pipeline unbound and failed
    # with an unhelpful UnboundLocalError.
    raise ValueError(
      'unknown entity_resolution_model {0!r}; expected one of {1}'.format(
        entity_resolution_model, sorted(stages_by_model)))

  pipe = Pipeline(stages=stages_by_model[entity_resolution_model])

  # None of these stages is trained here, so fitting on a one-row placeholder
  # frame only assembles the PipelineModel.
  empty_df = spark.createDataFrame([['']]).toDF('fund_description')
  pipe_model = pipe.fit(empty_df)
  results = pipe_model.transform(ner_result)

  return results.toPandas()[['url', 'fund_description', 'ner_chunk', 'entity']]

In [None]:
def RunNER(feed, ner_model, first_column):
  '''
  Run a named-entity recognition (NER) pipeline over feed data.

  input:
    1. pandas dataframe of feed data, col_1 = url, col_2 = fund_description
    2. string naming the NER model; one of 'ner_healthcare', 'ner_clinical',
       'ner_diseases', 'ner_jsl'
    3. string naming the text column to annotate
  output: spark dataframe with the feed columns plus the document, sentence,
          token, embeddings, ner and ner_chunk annotation columns
  raises: ValueError if ner_model is not one of the above
  '''
  # Each model is paired with the embeddings it runs on and a converter that
  # keeps the label it is meant to extract.
  stages_by_model = {
    'ner_healthcare': (word_embeddings_healthcare, ner_healthcare, ner_converter_problem),
    'ner_clinical': (word_embeddings_clinical, ner_clinical, ner_converter_problem),
    'ner_diseases': (word_embeddings_clinical, ner_diseases, ner_converter_diseases),
    'ner_jsl': (word_embeddings_clinical, ner_jsl, ner_converter_diagnosis),
  }
  if ner_model not in stages_by_model:
    # Without this check an unknown name silently returned None, which only
    # failed later, far from the cause.
    raise ValueError(
      'unknown ner_model {0!r}; expected one of {1}'.format(
        ner_model, sorted(stages_by_model)))
  embeddings, ner_tagger, converter = stages_by_model[ner_model]

  # One-row placeholder used only to fit (assemble) the pretrained pipeline.
  empty_df = spark.createDataFrame([['']]).toDF(first_column)

  #load feed data into df
  df = spark.createDataFrame(feed)

  # A fresh assembler so the pipeline reads `first_column` rather than the
  # module-level assembler's 'text' column.
  document_assembler = DocumentAssembler() \
    .setInputCol(first_column) \
    .setOutputCol('document')

  pipeline = Pipeline(stages=[
    document_assembler,
    sentence_detector,
    tokenizer,
    embeddings,
    ner_tagger,
    converter])

  return pipeline.fit(empty_df).transform(df)

In [None]:
def RunModel(feed, combos):
  '''
  Run one combination of NER and entity resolution (ER) components.

  input:
    1. pandas dataframe of feed data, col_1 = url, col_2 = fund_description
    2. list of strings specifying a combination of model components; the
       first three items are the NER model, the ER input and the ER model.
       An ER input of 'original' resolves chunks found in the original text;
       any other value resolves chunks from a second NER pass over a document
       rebuilt from the first pass's chunks.
  output: long-format pandas dataframe (RunER output plus the columns
          entity_resolution_input, ner_model, entity_resolution_model)
  '''
  ner_model, entity_resolution_input, entity_resolution_model = combos[0], combos[1], combos[2]

  # First NER pass over the original fund descriptions.
  first_pass = RunNER(feed, ner_model, 'fund_description')

  if entity_resolution_input == 'original':
    er_input = first_pass
  else:
    # Condense each description to its stopword-stripped chunks joined with
    # " and ", then run the same NER model over that condensed text.
    first_pass_records = (
      first_pass.toPandas()[['url', 'fund_description', 'sentence', 'ner_chunk']]
      .to_dict('records')
    )
    condensed = PrepareNewDoc(GetChunksAndSentences(first_pass_records))
    # Records without chunks may carry NaN; treat them as empty documents.
    condensed = condensed.fillna('')
    second_feed = condensed[['url', 'new_doc']].rename(columns={'new_doc': 'fund_description'})
    er_input = RunNER(second_feed, ner_model, 'fund_description')

  er_result = RunER(er_input, entity_resolution_model)

  # Tag every row with the components that produced it.
  row_count = len(er_result)
  er_result['entity_resolution_input'] = [entity_resolution_input] * row_count
  er_result['ner_model'] = [ner_model] * row_count
  er_result['entity_resolution_model'] = [entity_resolution_model] * row_count

  return er_result

In [None]:
def GetCombos(combinations):
  '''
  Turn the model-combination table into a list of per-row value lists.

  input: pandas dataframe of model combinations, one combination per row
  output: list of lists, one per row in positional order, each holding that
          row's values (column order preserved)
  '''
  return [combinations.iloc[position].tolist()
          for position in range(len(combinations))]

In [None]:
def _is_missing_annotation(value):
  '''Return True when an exploded annotation cell holds no annotation.'''
  # DataFrame.explode turns an empty list into NaN, and a null Spark array may
  # arrive from toPandas() as None. An identity check against np.nan misses
  # None, and misses any NaN that is not the np.nan singleton.
  return value is None or (isinstance(value, float) and value != value)


def _annotation_values(annotations, key, from_metadata=False):
  '''Read `key` from each annotation (or its metadata); NaN where missing.'''
  values = []
  for annotation in annotations:
    if _is_missing_annotation(annotation):
      values.append(np.nan)
    elif from_metadata:
      values.append(annotation.metadata[key])
    else:
      values.append(annotation[key])
  return values


def Main(feed, combinations):
  '''
  Run every model combination over the feed and return long-format tables.

  input:
    1. pandas dataframe of feed data, col_1 = url, col_2 = fund_description
    2. pandas dataframe of model combinations, one row per combination whose
       first three columns are the NER model, ER input and ER model
  output: tuple (ner, er) of long-format pandas dataframes with one row per
          NER chunk / resolved entity; rows that had no annotations keep NaN
          in the extracted fields
  raises: ValueError if combinations has no rows
  '''
  #get list of lists of combinations of model components
  combos = GetCombos(combinations)
  if not combos:
    raise ValueError('combinations has no rows; nothing to run')

  #Run model for each combination of model components
  r = []
  for n, combo in enumerate(combos, start=1):
    print('starting combo #{0} of {1}'.format(n, len(combos)))
    r.append(RunModel(feed, combo))

  #concatenate results
  output = pd.concat(r, ignore_index=True)

  # Split into NER and ER tables, then explode so each annotation is a row.
  ner = output.drop(['entity'], axis=1).explode('ner_chunk')
  er = output.drop(['ner_chunk'], axis=1).explode('entity')

  chunks = ner['ner_chunk']
  ner['ner_term'] = _annotation_values(chunks, 'result')
  ner['sentence'] = _annotation_values(chunks, 'sentence', from_metadata=True)
  ner['start_char'] = _annotation_values(chunks, 'begin')
  ner['end_char'] = _annotation_values(chunks, 'end')
  ner = ner.drop(['ner_chunk'], axis=1)

  entities = er['entity']
  er['target_text'] = _annotation_values(entities, 'target_text', from_metadata=True)
  er['sentence'] = _annotation_values(entities, 'sentence', from_metadata=True)
  er['start_char'] = _annotation_values(entities, 'begin')
  er['end_char'] = _annotation_values(entities, 'end')
  er['resolved_text'] = _annotation_values(entities, 'resolved_text', from_metadata=True)
  er['entity_code'] = _annotation_values(entities, 'result')
  er['confidence'] = _annotation_values(entities, 'confidence', from_metadata=True)
  er['distance'] = _annotation_values(entities, 'distance', from_metadata=True)
  er = er.drop(['entity'], axis=1)

  #return results
  return ner, er

# Import data

Feed Data

In [None]:
# Upload the feed CSV (url, fund_description) and parse it from the returned bytes.
uploaded = files.upload()
feed_csv_bytes = uploaded['example_51-100.csv']
feed = pd.read_csv(io.BytesIO(feed_csv_bytes))

Saving example_51-100.csv to example_51-100.csv


Model combinations

In [None]:
# Upload the model-combinations CSV and parse it from the returned bytes.
uploaded = files.upload()
combinations_csv_bytes = uploaded['model_combinations.csv']
combinations = pd.read_csv(io.BytesIO(combinations_csv_bytes))

Saving model_combinations.csv to model_combinations (2).csv


# Execute functions

In [None]:
%%time
# Smoke-test run: only the first 3 feed rows and the combinations from row 9
# onward (9 combinations in the recorded output). Widen these slices for a
# full run.
ner, er = Main(feed.iloc[:3], combinations.iloc[9:])

starting combo #1 of 9
starting combo #2 of 9
starting combo #3 of 9
starting combo #4 of 9
starting combo #5 of 9
starting combo #6 of 9
starting combo #7 of 9
starting combo #8 of 9
starting combo #9 of 9
CPU times: user 2.12 s, sys: 351 ms, total: 2.47 s
Wall time: 45 s


In [None]:
# (rows, columns) of the long-format entity-resolution table returned by Main.
er.shape

(60, 13)

# Export data

In [None]:
from google.colab import drive
# Mount Google Drive so results can be written under /content/drive.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Export both long-format tables returned by Main. No notebook-level `df`
# exists (the only `df` is local to RunNER), so write `ner` and `er` instead.
export_dir = '/content/drive/My Drive/Crowdfunding'
os.makedirs(export_dir, exist_ok=True)
ner.to_csv(os.path.join(export_dir, 'ex_51-100_ner.csv'), index=False)
er.to_csv(os.path.join(export_dir, 'ex_51-100_er.csv'), index=False)