Implementasi Python Spark untuk klasifikasi gejala yang di alami oleh pasien pada FHIR dataset berdasarkan observasi yang dihasilkan. Ada 4 label klasifikasi,  yaitu: ambulatory, wellness, outpatient, dan wellness. 

Ada beberapa step yang dilakukan yaitu:



1.   **Install Spark pada Google Colab**
2.   **Data preparation** 
3.   **Decision Tree**
4.   **Evaluation**

# Step 1: Installing Spark on Google Colab

In [1]:
# Checking folder
!ls

sample_data


In [2]:
# Installing JDK
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
# Getting Spark installer (check the path on spark.apache.org)
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz

In [4]:
# Checking if the file is copied
!ls

sample_data  spark-3.2.1-bin-hadoop2.7.tgz


In [5]:
# Untar the Spark installer
!tar -xvf spark-3.2.1-bin-hadoop2.7.tgz

spark-3.2.1-bin-hadoop2.7/
spark-3.2.1-bin-hadoop2.7/LICENSE
spark-3.2.1-bin-hadoop2.7/NOTICE
spark-3.2.1-bin-hadoop2.7/R/
spark-3.2.1-bin-hadoop2.7/R/lib/
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/DESCRIPTION
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/INDEX
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/Rd.rds
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/features.rds
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/hsearch.rds
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/links.rds
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/nsInfo.rds
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/Meta/package.rds
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/NAMESPACE
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/R/
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/R/SparkR
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/R/SparkR.rdb
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/R/SparkR.rdx
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/help/
spark-3.2.1-bin-hadoop2.7/R/lib/SparkR/help/An

In [6]:
# Checking the Spark folder after untar
!ls 

sample_data  spark-3.2.1-bin-hadoop2.7	spark-3.2.1-bin-hadoop2.7.tgz


In [7]:
# Installing findspark - a python library to find Spark
!pip install -q findspark

In [8]:
# Setting environment variables: Setting Java and Spark home based on the location where they are stored
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"

In [9]:
# Creating a local Spark session
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Step 2: Data Preparation

In [10]:
# Loading data observations in pandas dataframe
import pandas as pd
observation_df = pd.read_csv("https://raw.githubusercontent.com/laksmitawidya/fhir-project/master/observations.csv") 
observation_df.head()

Unnamed: 0,DATE,PATIENT,ENCOUNTER,CODE,DESCRIPTION,VALUE,UNITS,TYPE
0,2012-01-23T17:45:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e88bc3a9-007c-405e-aabc-792a38f4aa2b,8302-2,Body Height,193.3,cm,numeric
1,2012-01-23T17:45:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e88bc3a9-007c-405e-aabc-792a38f4aa2b,72514-3,Pain severity - 0-10 verbal numeric rating [Sc...,2.0,{score},numeric
2,2012-01-23T17:45:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e88bc3a9-007c-405e-aabc-792a38f4aa2b,29463-7,Body Weight,87.8,kg,numeric
3,2012-01-23T17:45:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e88bc3a9-007c-405e-aabc-792a38f4aa2b,39156-5,Body Mass Index,23.5,kg/m2,numeric
4,2012-01-23T17:45:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e88bc3a9-007c-405e-aabc-792a38f4aa2b,8462-4,Diastolic Blood Pressure,82.0,mm[Hg],numeric


In [11]:
# Loading data encounter in pandas dataframe
import pandas as pd
encounter_df = pd.read_csv("https://raw.githubusercontent.com/laksmitawidya/fhir-project/master/encounters.csv") 
encounter_df.head()

Unnamed: 0,Id,START,STOP,PATIENT,ORGANIZATION,PROVIDER,PAYER,ENCOUNTERCLASS,CODE,DESCRIPTION,BASE_ENCOUNTER_COST,TOTAL_CLAIM_COST,PAYER_COVERAGE,REASONCODE,REASONDESCRIPTION
0,d0c40d10-8d87-447e-836e-99d26ad52ea5,2010-01-23T17:45:28Z,2010-01-23T18:10:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e002090d-4e92-300e-b41e-7d1f21dee4c6,e6283e46-fd81-3611-9459-0edb1c3da357,6e2f1a2d-27bd-3701-8d08-dae202c58632,ambulatory,185345009,Encounter for symptom,129.16,129.16,54.16,10509002.0,Acute bronchitis (disorder)
1,e88bc3a9-007c-405e-aabc-792a38f4aa2b,2012-01-23T17:45:28Z,2012-01-23T18:00:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,772ee193-bb9f-30eb-9939-21e86c8e4da5,6f1d59a7-a5bd-3cf9-9671-5bad2f351c28,6e2f1a2d-27bd-3701-8d08-dae202c58632,wellness,162673000,General examination of patient (procedure),129.16,129.16,129.16,,
2,8f104aa7-4ca9-4473-885a-bba2437df588,2001-05-01T15:02:18Z,2001-05-01T15:17:18Z,1d604da9-9a81-4ba9-80c2-de3375d59b40,5d4b9df1-93ae-3bc9-b680-03249990e558,af01a385-31d3-3c77-8fdb-2867fe88df2f,b1c428d6-4f07-31e0-90f0-68ffa6ff8c76,ambulatory,185345009,Encounter for symptom,129.16,129.16,0.0,36971009.0,Sinusitis (disorder)
3,b85c339a-6076-43ed-b9d0-9cf013dec49d,2011-07-28T15:02:18Z,2011-07-28T15:17:18Z,1d604da9-9a81-4ba9-80c2-de3375d59b40,3dc9bb2d-5d66-3e61-bf9a-e234c6433577,bb17e691-262b-3546-93d5-d88e7de93246,b1c428d6-4f07-31e0-90f0-68ffa6ff8c76,wellness,162673000,General examination of patient (procedure),129.16,129.16,0.0,,
4,dae2b7cb-1316-4b78-954f-fa610a6c6d0e,2010-07-27T12:58:08Z,2010-07-27T13:28:08Z,10339b10-3cd1-4ac3-ac13-ec26728cb592,b03dba4f-892f-365c-bfd1-bfcfa7a98d5d,7ed6b84a-b847-3744-9d42-15c42297a0c2,d47b3510-2895-3b70-9897-342d681c769d,wellness,162673000,General examination of patient (procedure),129.16,129.16,129.16,,


In [12]:
# Dropping all null rows & change NaN to 0
observation_df.dropna(how="all", inplace=True)
observation_df.fillna(0)
observation_df.shape

encounter_df.dropna(how="all", inplace=True)
encounter_df.fillna(0)
encounter_df.shape

(53346, 15)

In [13]:
# Replacing carriage return and new line characters with a space
observation_df = observation_df.replace({r'\r\n': ' '}, regex=True)
observation_df.head()

encounter_df = encounter_df.replace({r'\r\n': ' '}, regex=True)
encounter_df.head()

Unnamed: 0,Id,START,STOP,PATIENT,ORGANIZATION,PROVIDER,PAYER,ENCOUNTERCLASS,CODE,DESCRIPTION,BASE_ENCOUNTER_COST,TOTAL_CLAIM_COST,PAYER_COVERAGE,REASONCODE,REASONDESCRIPTION
0,d0c40d10-8d87-447e-836e-99d26ad52ea5,2010-01-23T17:45:28Z,2010-01-23T18:10:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e002090d-4e92-300e-b41e-7d1f21dee4c6,e6283e46-fd81-3611-9459-0edb1c3da357,6e2f1a2d-27bd-3701-8d08-dae202c58632,ambulatory,185345009,Encounter for symptom,129.16,129.16,54.16,10509002.0,Acute bronchitis (disorder)
1,e88bc3a9-007c-405e-aabc-792a38f4aa2b,2012-01-23T17:45:28Z,2012-01-23T18:00:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,772ee193-bb9f-30eb-9939-21e86c8e4da5,6f1d59a7-a5bd-3cf9-9671-5bad2f351c28,6e2f1a2d-27bd-3701-8d08-dae202c58632,wellness,162673000,General examination of patient (procedure),129.16,129.16,129.16,,
2,8f104aa7-4ca9-4473-885a-bba2437df588,2001-05-01T15:02:18Z,2001-05-01T15:17:18Z,1d604da9-9a81-4ba9-80c2-de3375d59b40,5d4b9df1-93ae-3bc9-b680-03249990e558,af01a385-31d3-3c77-8fdb-2867fe88df2f,b1c428d6-4f07-31e0-90f0-68ffa6ff8c76,ambulatory,185345009,Encounter for symptom,129.16,129.16,0.0,36971009.0,Sinusitis (disorder)
3,b85c339a-6076-43ed-b9d0-9cf013dec49d,2011-07-28T15:02:18Z,2011-07-28T15:17:18Z,1d604da9-9a81-4ba9-80c2-de3375d59b40,3dc9bb2d-5d66-3e61-bf9a-e234c6433577,bb17e691-262b-3546-93d5-d88e7de93246,b1c428d6-4f07-31e0-90f0-68ffa6ff8c76,wellness,162673000,General examination of patient (procedure),129.16,129.16,0.0,,
4,dae2b7cb-1316-4b78-954f-fa610a6c6d0e,2010-07-27T12:58:08Z,2010-07-27T13:28:08Z,10339b10-3cd1-4ac3-ac13-ec26728cb592,b03dba4f-892f-365c-bfd1-bfcfa7a98d5d,7ed6b84a-b847-3744-9d42-15c42297a0c2,d47b3510-2895-3b70-9897-342d681c769d,wellness,162673000,General examination of patient (procedure),129.16,129.16,129.16,,


## Create PySpark DataFrame

In [14]:
# Converting Pandas Dataframe into Spark Dataframe
encounter_df = encounter_df.astype(str) # Converting pandas df to string first
encounter_df.dtypes
encounter_sdf = spark.createDataFrame(encounter_df)
encounter_sdf.show(10, False) # False allows us to show entire content of the columns

+------------------------------------+--------------------+--------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+--------------+---------+------------------------------------------+-------------------+----------------+--------------+-----------+----------------------------------+
|Id                                  |START               |STOP                |PATIENT                             |ORGANIZATION                        |PROVIDER                            |PAYER                               |ENCOUNTERCLASS|CODE     |DESCRIPTION                               |BASE_ENCOUNTER_COST|TOTAL_CLAIM_COST|PAYER_COVERAGE|REASONCODE |REASONDESCRIPTION                 |
+------------------------------------+--------------------+--------------------+------------------------------------+------------------------------------+------------------------------------+-----------------

In [15]:
# Converting Pandas Dataframe into Spark Dataframe
observation_df = observation_df.astype(str) # Converting pandas df to string first
observation_df.dtypes
observation_sdf = spark.createDataFrame(observation_df)
observation_sdf.show(10, False) # False allows us to show entire content of the columns

+--------------------+------------------------------------+------------------------------------+-------+-------------------------------------------------------------+-----+-------+-------+
|DATE                |PATIENT                             |ENCOUNTER                           |CODE   |DESCRIPTION                                                  |VALUE|UNITS  |TYPE   |
+--------------------+------------------------------------+------------------------------------+-------+-------------------------------------------------------------+-----+-------+-------+
|2012-01-23T17:45:28Z|034e9e3b-2def-4559-bb2a-7850888ae060|e88bc3a9-007c-405e-aabc-792a38f4aa2b|8302-2 |Body Height                                                  |193.3|cm     |numeric|
|2012-01-23T17:45:28Z|034e9e3b-2def-4559-bb2a-7850888ae060|e88bc3a9-007c-405e-aabc-792a38f4aa2b|72514-3|Pain severity - 0-10 verbal numeric rating [Score] - Reported|2.0  |{score}|numeric|
|2012-01-23T17:45:28Z|034e9e3b-2def-4559-bb2a-7850888ae

## Inner Join Query PySpark

In [16]:
fhir_sdf = observation_sdf.join(encounter_sdf,(observation_sdf.ENCOUNTER == encounter_sdf.Id) & (observation_sdf.PATIENT == encounter_sdf.PATIENT), "inner").select(observation_sdf.PATIENT, 'VALUE', 'UNITS', observation_sdf.DESCRIPTION,'ENCOUNTERCLASS', encounter_sdf.DESCRIPTION.alias("INFORMATION")).collect()
fhir_sdf 

[Row(PATIENT='01207ecd-9dff-4754-8887-4652eda231e2', VALUE='62.4', UNITS='cm', DESCRIPTION='Body Height', ENCOUNTERCLASS='wellness', INFORMATION='Well child visit (procedure)'),
 Row(PATIENT='01207ecd-9dff-4754-8887-4652eda231e2', VALUE='1.0', UNITS='{score}', DESCRIPTION='Pain severity - 0-10 verbal numeric rating [Score] - Reported', ENCOUNTERCLASS='wellness', INFORMATION='Well child visit (procedure)'),
 Row(PATIENT='01207ecd-9dff-4754-8887-4652eda231e2', VALUE='7.0', UNITS='kg', DESCRIPTION='Body Weight', ENCOUNTERCLASS='wellness', INFORMATION='Well child visit (procedure)'),
 Row(PATIENT='01207ecd-9dff-4754-8887-4652eda231e2', VALUE='80.4', UNITS='%', DESCRIPTION='Weight-for-length Per age and sex', ENCOUNTERCLASS='wellness', INFORMATION='Well child visit (procedure)'),
 Row(PATIENT='01207ecd-9dff-4754-8887-4652eda231e2', VALUE='41.1', UNITS='cm', DESCRIPTION='Head Occipital-frontal circumference', ENCOUNTERCLASS='wellness', INFORMATION='Well child visit (procedure)'),
 Row(PATIEN

In [17]:
# Columns in df
fhir_sdf = spark.createDataFrame(fhir_sdf)

In [18]:
# Schema: Datatypes associated with columns
fhir_sdf

DataFrame[PATIENT: string, VALUE: string, UNITS: string, DESCRIPTION: string, ENCOUNTERCLASS: string, INFORMATION: string]

In [19]:
# Total number of rows
fhir_sdf.select('ENCOUNTERCLASS').distinct().count()

5

## Categorical columns Transformation 

In [20]:
from pyspark.ml.feature import StringIndexer

# Create an indexer
indexer = StringIndexer(inputCol='UNITS', outputCol='unit_id')

# Indexer identifies categories in the data
indexer_model = indexer.fit(fhir_sdf)

# Indexer creates a new column with numeric index values
fhir_indexed = indexer_model.transform(fhir_sdf)
fhir_indexed = StringIndexer(inputCol='VALUE', outputCol='value_id').fit(fhir_indexed).transform(fhir_indexed)
fhir_indexed = StringIndexer(inputCol='PATIENT', outputCol='patient_id').fit(fhir_indexed).transform(fhir_indexed)
fhir_indexed = StringIndexer(inputCol='INFORMATION', outputCol='info_id').fit(fhir_indexed).transform(fhir_indexed)
fhir_indexed = StringIndexer(inputCol='ENCOUNTERCLASS', outputCol='label').fit(fhir_indexed).transform(fhir_indexed)
fhir_indexed = StringIndexer(inputCol='DESCRIPTION', outputCol='description_id').fit(fhir_indexed).transform(fhir_indexed)

In [21]:
fhir_indexed

DataFrame[PATIENT: string, VALUE: string, UNITS: string, DESCRIPTION: string, ENCOUNTERCLASS: string, INFORMATION: string, unit_id: double, value_id: double, patient_id: double, info_id: double, label: double, description_id: double]

## Assembling columns

In [28]:
from pyspark.ml.feature import VectorAssembler

# Create an assembler object
assembler = VectorAssembler(inputCols=[
    'unit_id', 
    'value_id',
    'patient_id',
    'description_id'
], outputCol='features_unnormalized')

# Consolidate predictor columns
fhir_assembled = assembler.transform(fhir_indexed)

# Check the resulting column
fhir_assembled.select('features_unnormalized').show(5, truncate=False)

+-----------------------+
|features_unnormalized  |
+-----------------------+
|[6.0,1274.0,1042.0,3.0]|
|[4.0,1.0,1042.0,0.0]   |
|[5.0,176.0,1042.0,4.0] |
|[8.0,335.0,1042.0,46.0]|
|[6.0,825.0,1042.0,45.0]|
+-----------------------+
only showing top 5 rows



## Normalization

In [29]:
from pyspark.ml.feature import Normalizer
normalizer = Normalizer().setInputCol("features_unnormalized").setOutputCol("features").setP(1.0)
fhir_normalized = normalizer.transform(fhir_assembled)
fhir_normalized.select('features').show(5, truncate=False)

+-----------------------------------------------------------------------------------+
|features                                                                           |
+-----------------------------------------------------------------------------------+
|[0.0025806451612903226,0.5479569892473118,0.4481720430107527,0.0012903225806451613]|
|[0.0038204393505253103,9.551098376313276E-4,0.9952244508118434,0.0]                |
|[0.004074979625101874,0.14343928280358598,0.8492257538712307,0.0032599837000814994]|
|[0.005590496156533892,0.23410202655485673,0.7281621243885394,0.03214535290006988]  |
|[0.0031282586027111575,0.4301355578727841,0.543274244004171,0.02346193952033368]   |
+-----------------------------------------------------------------------------------+
only showing top 5 rows



# Step 3: Decision Tree

In [24]:
# Split into training and test sets in a 80:20 ratio
fhir_train, fhir_test = fhir_normalized.randomSplit([0.8, 0.2], seed=17)

# Check that training set has around 80% of records
training_ratio = fhir_train.count() / fhir_normalized.count()
print(training_ratio)

0.7988482701775491


In [25]:
from pyspark.ml.classification import DecisionTreeClassifier

# Create a classifier object and fit to the training data
tree = DecisionTreeClassifier()
tree_model = tree.fit(fhir_train)

# Create predictions for the testing data and take a look at the predictions
prediction = tree_model.transform(fhir_test)
prediction.select('label', 'prediction', 'probability').show(5, False)

+-----+----------+------------------------------------------------------------------------------------------------------+
|label|prediction|probability                                                                                           |
+-----+----------+------------------------------------------------------------------------------------------------------+
|0.0  |0.0       |[0.858298201841177,0.08080529983653102,0.04886001892798761,0.006624795663770111,0.0054116837305342854]|
|0.0  |0.0       |[0.858298201841177,0.08080529983653102,0.04886001892798761,0.006624795663770111,0.0054116837305342854]|
|0.0  |0.0       |[0.858298201841177,0.08080529983653102,0.04886001892798761,0.006624795663770111,0.0054116837305342854]|
|0.0  |0.0       |[0.858298201841177,0.08080529983653102,0.04886001892798761,0.006624795663770111,0.0054116837305342854]|
|0.0  |0.0       |[0.858298201841177,0.08080529983653102,0.04886001892798761,0.006624795663770111,0.0054116837305342854]|
+-----+----------+------

# Step 4: Evaluation

In [26]:
# Convert data using map matching with multiclass metrics evaluation
prediction_rdd = prediction.select('label', 'prediction')
prediction_map = prediction_rdd.rdd.map(lambda pred: (pred.label, pred.prediction)).collect()

In [33]:
# Creating Spark Context
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [34]:
from pyspark.mllib.evaluation import MulticlassMetrics
prediction.groupBy('label', 'prediction').count().show()

predictionAndLabels = sc.parallelize(prediction_map)
metrics = MulticlassMetrics(predictionAndLabels)
metrics.confusionMatrix().toArray()

# Evaluation 
print("Acuracy: ", metrics.accuracy)
print("Precision: ", metrics.precision(1.0))
print("Recall: ", metrics.recall(2.0))
print("fMeasure: ", metrics.fMeasure(0.0, 2.0))

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  2.0|       0.0| 1947|
|  1.0|       1.0| 5226|
|  3.0|       2.0| 1183|
|  4.0|       2.0|   58|
|  0.0|       1.0|  860|
|  1.0|       0.0| 3699|
|  2.0|       2.0| 5203|
|  3.0|       1.0|  395|
|  2.0|       3.0|  248|
|  2.0|       1.0|  639|
|  1.0|       2.0| 2502|
|  0.0|       0.0|27522|
|  1.0|       3.0|  557|
|  0.0|       2.0| 2069|
|  4.0|       0.0|  195|
|  3.0|       3.0| 1521|
|  0.0|       3.0|   75|
|  3.0|       0.0|  216|
|  4.0|       1.0|   61|
|  4.0|       3.0|    1|
+-----+----------+-----+





Acuracy:  0.728574856488916
Precision:  0.43608144192256343
Recall:  0.47235587834770765
fMeasure:  0.8347993836522246


In [35]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

# Find weighted precision
multi_evaluator = MulticlassClassificationEvaluator()
weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: "accuracy"})
print(weighted_precision)

0.728574856488916
