### **Data Extraction Using PySpark and Spark-NLP**

Install pyspark

In [1]:
!pip3 install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 65kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 42.1MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=a4e782544b3bb2a1eeb3f9ed122e826977c75499fde2ca6c3079c03242efb573
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


Import pyspark SparkSession,ArrayType, StructField, StructType, StringType and IntegerType

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType

Create SparkSession on the local master

In [14]:
spark = SparkSession.builder \
    .master("local") \
    .getOrCreate()

Create Parallel RDD

In [15]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
json_file_path = 'sample_data/NER.json'
rdd = sc.parallelize(json_file_path)

In [16]:
RDD = spark.read.json(json_file_path).rdd 
RDD.collect()

[Row(text="Senior Health Inc\nOffice Visit\nBounsy\n\nDOB: 1932-09-1\n\nGender: M\n\nDate Of Service: 04/3/2018\n\nProvider: Leah Marc, MD\n\nLocation: Primary Care, 55 Rainier Avenue South , Renton, WA 55555\n\nVitals and Measurements\nheart_rate - Heart rate\n\ne Value: 68bpm\ne Note:\n\nsbp - Systolic blood pressure\n\ne Value: 95mmHg\ne Note:\n\ndbp - Diastolic blood pressure\n\ne Value: 53mmHg\ne Note:\n\nspo2 - Pulse oximetry\n\ne Value: 98%\ne Note:\n\nheight - Height\n\ne Value: 67.5in\ne Note:\n\nweight - Weight\n\nValue: 144.6lbs\nNote:\n\nbmi - Body mass index\n\n \n\ne Value: 22.3kg/m?\ne Note:\n\nUnintentional weight loss\n\nDiagnosis (ICD10): Abnormal weight loss (R63.4)\n\nStatus: Active\n\nDepression, major, recurrent, moderate\n\nDiagnosis (ICD10): Major depressive disorder, recurrent, moderate (F33.1)\nStatus: Active\n\nDM (diabetes mellitus), type 2 with peripheral vascular complications\nDiagnosis (ICD10): Type 2 diabetes mellitus with diabetic peripheral angiopathy

First Create json Path then read that Json file and show the data as well as schema

In [17]:
df = spark.read.json(json_file_path)
# RDD = spark.read.json(json_file_path).rdd  
# df= spark.read.option("multiline", True).option("mode", "PERMISSIVE").json(json_file_path)
print(df.schema)
df.show()


StructType(List(StructField(text,StringType,true),StructField(title,StringType,true)))
+--------------------+--------------------+
|                text|               title|
+--------------------+--------------------+
|Senior Health Inc...|Bounsy_H78697137_...|
|Senior Health Inc...|Nancy_H68627234_2...|
|Health Primary Ca...|2018-06-15_Michel...|
|Senior Health Inc...|Geraldine_H306116...|
|Senior Health Inc...|Rita_H53014217_20...|
|Senior Health Inc...|Steve_H72654101_2...|
|Senior Health Inc...|Michael_H55890760...|
|Senior Health Inc...|Sandra_H68173617_...|
|Senior Health Inc...|Shirley_H05817212...|
|Senior Health Inc...|Maria_H53976251_2...|
+--------------------+--------------------+



**View the title column of the dataframe**

In [18]:
df.select('title').show(truncate=False)

+--------------------------------------------------------------+
|title                                                         |
+--------------------------------------------------------------+
|Bounsy_H78697137_2018-04-30_Editedregular_format              |
|Nancy_H68627234_2018-05-01_2018-05-31_Editedregular_format    |
|2018-06-15_Micheline_Editedregular_format                     |
|Geraldine_H30611625_2018-05-01_2018-05-31_Editedregular_format|
|Rita_H53014217_2018-06-01_2018-06-22_Editedregular_format     |
|Steve_H72654101_2018-06-01_2018-06-22_Editedregular_format    |
|Michael_H55890760_2018-05-01_2018-05-31_Editedregular_format  |
|Sandra_H68173617_2018-04-27_Editedregular_format              |
|Shirley_H05817212_2018-05-01_2018-05-31_Editedregular_format  |
|Maria_H53976251_2018-05-01_2018-05-31_Editedregular_format    |
+--------------------------------------------------------------+



Take a look at what is contained in the Text column of the dataframe . So we can then Extract Entities from it

In [19]:
df.select('text').show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Import spark sql functions library 

In [20]:
import pyspark.sql.functions as f

** Create Spark-NLP extractions from created Data frame**

Note: Some features have double new lines(\n\n) and that must be taken care of meanwhile a first split was carried out 

In [21]:
df.select(
        "title",
        f.split("text", "\n\n").alias("text"),
        f.posexplode(f.split("text", "\n\n")).alias("pos", "val")).drop("val").select("title", 
                                                                                    f.concat(f.lit("visit"), 
                                                                                             f.col("pos").cast("string")).alias("colname"), 
                                                                                    f.expr("text[pos]").alias("val")).groupby("title").pivot("colname").agg(f.first("val")).show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------

Loop through all columns

In [22]:
cols = list(range(166))
cols_repeat = [str(i) for i in cols]

In [23]:
cols_repeat[4]

'4'

Single out the Text column for a look at the entity of interest

In [24]:
df.select('text').take(1)[0][0]

"Senior Health Inc\nOffice Visit\nBounsy\n\nDOB: 1932-09-1\n\nGender: M\n\nDate Of Service: 04/3/2018\n\nProvider: Leah Marc, MD\n\nLocation: Primary Care, 55 Rainier Avenue South , Renton, WA 55555\n\nVitals and Measurements\nheart_rate - Heart rate\n\ne Value: 68bpm\ne Note:\n\nsbp - Systolic blood pressure\n\ne Value: 95mmHg\ne Note:\n\ndbp - Diastolic blood pressure\n\ne Value: 53mmHg\ne Note:\n\nspo2 - Pulse oximetry\n\ne Value: 98%\ne Note:\n\nheight - Height\n\ne Value: 67.5in\ne Note:\n\nweight - Weight\n\nValue: 144.6lbs\nNote:\n\nbmi - Body mass index\n\n \n\ne Value: 22.3kg/m?\ne Note:\n\nUnintentional weight loss\n\nDiagnosis (ICD10): Abnormal weight loss (R63.4)\n\nStatus: Active\n\nDepression, major, recurrent, moderate\n\nDiagnosis (ICD10): Major depressive disorder, recurrent, moderate (F33.1)\nStatus: Active\n\nDM (diabetes mellitus), type 2 with peripheral vascular complications\nDiagnosis (ICD10): Type 2 diabetes mellitus with diabetic peripheral angiopathy without g

In [25]:
df.show()

+--------------------+--------------------+
|                text|               title|
+--------------------+--------------------+
|Senior Health Inc...|Bounsy_H78697137_...|
|Senior Health Inc...|Nancy_H68627234_2...|
|Health Primary Ca...|2018-06-15_Michel...|
|Senior Health Inc...|Geraldine_H306116...|
|Senior Health Inc...|Rita_H53014217_20...|
|Senior Health Inc...|Steve_H72654101_2...|
|Senior Health Inc...|Michael_H55890760...|
|Senior Health Inc...|Sandra_H68173617_...|
|Senior Health Inc...|Shirley_H05817212...|
|Senior Health Inc...|Maria_H53976251_2...|
+--------------------+--------------------+



The double new line in the json file was appopriately dealt with after using regexp to replace with single line

In [26]:
df_new = df.withColumn('text', f.regexp_replace('text', '\n\n', '\n'))
df_new = df_new.withColumn('text', f.regexp_replace('text', '\nDOB', ' DOB'))
df_new = df_new.withColumn('text', f.regexp_replace('text', 'DOB', '\nDOB'))
df_new = df_new.withColumn('text', f.regexp_replace('text', '\nGender', ' Gender'))
df_new = df_new.withColumn('text', f.regexp_replace('text', 'Gender', '\nGender'))
df_new = df_new.withColumn('text', f.regexp_replace('text', 'cos', '\nDOB'))
df_new = df_new.withColumn('text', f.regexp_replace('text', '\n', ','))
df_new.show()

+--------------------+--------------------+
|                text|               title|
+--------------------+--------------------+
|Senior Health Inc...|Bounsy_H78697137_...|
|Senior Health Inc...|Nancy_H68627234_2...|
|Health Primary Ca...|2018-06-15_Michel...|
|Senior Health Inc...|Geraldine_H306116...|
|Senior Health Inc...|Rita_H53014217_20...|
|Senior Health Inc...|Steve_H72654101_2...|
|Senior Health Inc...|Michael_H55890760...|
|Senior Health Inc...|Sandra_H68173617_...|
|Senior Health Inc...|Shirley_H05817212...|
|Senior Health Inc...|Maria_H53976251_2...|
+--------------------+--------------------+



The new json string into object to easily view the clean Json file for entities

In [27]:
import json
df_json = df_new.toJSON()

for row in df_json.collect():
    #json string
    print(row) 

    #json object
    line = json.loads(row) 
    #print(line[some_key]) 

{"text":"Senior Health Inc,Office Visit,Bounsy ,DOB: 1932-09-1 ,Gender: M,Date Of Service: 04/3/2018,Provider: Leah Marc, MD,Location: Primary Care, 55 Rainier Avenue South , Renton, WA 55555,Vitals and Measurements,heart_rate - Heart rate,e Value: 68bpm,e Note:,sbp - Systolic blood pressure,e Value: 95mmHg,e Note:,dbp - Diastolic blood pressure,e Value: 53mmHg,e Note:,spo2 - Pulse oximetry,e Value: 98%,e Note:,height - Height,e Value: 67.5in,e Note:,weight - Weight,Value: 144.6lbs,Note:,bmi - Body mass index, ,e Value: 22.3kg/m?,e Note:,Unintentional weight loss,Diagnosis (ICD10): Abnormal weight loss (R63.4),Status: Active,Depression, major, recurrent, moderate,Diagnosis (ICD10): Major depressive disorder, recurrent, moderate (F33.1),Status: Active,DM (diabetes mellitus), type 2 with peripheral vascular complications,Diagnosis (ICD10): Type 2 diabetes mellitus with diabetic peripheral angiopathy without gangrene (E11.51),Status: Active,COPD (chronic obstructive pulmonary disease),Dia

This step then searches for entities of interest 

In [28]:
## PATIENTS NAME
# df_new.select(
#         "title",
#         f.split("text", ",").getItem(2)).show()

## Doctors Name - Provider
# df_new.select("title",f.split("text", ",").getItem(6)).show()

## Phone number
# 24 hour Phone:

## Email
# None

## Address
# df_new.select("title",f.split("text", ",").getItem(9)).show(truncate=False)

## Organization
# df_new.select("title",f.split("text", ",").getItem(0)).show()

## DOB
# df_new.select("title",f.split("text", ",").getItem(3)).show()

## Location from given Data
# df_new.select("title",f.split("text", ",").getItem(8)).show(truncate=False)

# Gender
# df_new.select("title",f.split("text", ",").getItem(4)).show()

#Phone
#df_new.select("title",f.split("text", ",").getItem(188)).show(truncate=False)

#Fax
#df_new.select("title",f.split("text", ",").getItem(189)).show(truncate=False)

#df_new.printSchema()
df_new.select("title",f.split("text", ",").getItem(188)).show(truncate=False)

+--------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+
|title                                                         |split(text, ,, -1)[188]                                                                                             |
+--------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------+
|Bounsy_H78697137_2018-04-30_Editedregular_format              |null                                                                                                                |
|Nancy_H68627234_2018-05-01_2018-05-31_Editedregular_format    | but overall good foot elevation with steps. balancing with arms                                                    |
|2018-06-15_Micheline_Editedregular_format                     |null                      

**Organization,Patient name,DOB,  Location,Gender, Doctor’s name,phone number, email address were extracted from the given data.**

There is no email address contained in the file and only one Phone Number

In [29]:
df1 = df_new.withColumn('Organization', f.split(df_new['text'], ',').getItem(0))\
            .withColumn('PatientsName', f.split(df_new['text'], ',').getItem(2))\
            .withColumn('nDOB', f.split(df_new['text'], ',').getItem(3))\
            .withColumn('Address', f.split(df_new['text'], ',').getItem(9))\
            .withColumn('nLocation',f.split(df_new['text'], ",").getItem(8))\
            .withColumn('nGender',f.split(df_new['text'], ",").getItem(4))\
            .withColumn('nDoctorsName', f.split(df_new['text'], ",").getItem(6))
cols = ("nDOB","nLocation","nGender","nDoctorsName")

df1_update = df1.withColumn('Location', f.split(df1['nLocation'], ":").getItem(1))\
                .withColumn('DOB', f.split(df1['nDOB'], ":").getItem(1))\
                .withColumn('Gender', f.split(df1['nGender'], ":").getItem(1))\
                .withColumn('DoctorsName', f.split(df1['nDoctorsName'], ":").getItem(1))\
                .drop(*cols)
df1_update.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Extracted Entity Solution. Kindly note there is no email address in Json file and there is only a single Phone Number.

In [30]:
df1_update.show()

+--------------------+--------------------+-------------------+------------+--------------------+-------------+------------+------+--------------------+
|                text|               title|       Organization|PatientsName|             Address|     Location|         DOB|Gender|         DoctorsName|
+--------------------+--------------------+-------------------+------------+--------------------+-------------+------------+------+--------------------+
|Senior Health Inc...|Bounsy_H78697137_...|  Senior Health Inc|     Bounsy | 55 Rainier Avenu...| Primary Care|  1932-09-1 |     M|           Leah Marc|
|Senior Health Inc...|Nancy_H68627234_2...|  Senior Health Inc|      Nancy | 5555 South Sable...| Primary Care|  1950-04-2 |     F|        Anibal Salsa|
|Health Primary Ca...|2018-06-15_Michel...|Health Primary Care|  Micheline |          Suite 201 |         null| 1940-11-27 |     F|           Daniel MD|
|Senior Health Inc...|Geraldine_H306116...|  Senior Health Inc| Geraldine  | 123 E

Result is returned in a rrd data frame format (standard pyspark format)

In [31]:
df1_update.rdd

MapPartitionsRDD[109] at javaToPython at NativeMethodAccessorImpl.java:0

In [32]:
df1_update.write.format("json").mode("overwrite").save('/content/sample_data/NLP_Entity.json')