# Initial Setup

Includes importing pandas.

In [1]:
# Spark bootstrap, originally taken from Infosys Labs.
# This section must be included at the beginning of each new notebook. Remember to change the app name.
import os

import findspark

# Locate the Spark installation. SPARK_HOME, when set, overrides the default path
# (if you're using VirtualBox, the default should be '/home/user/spark-2.1.1-bin-hadoop2.7').
SPARK_HOME = os.environ.get('SPARK_HOME', '/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
findspark.init(SPARK_HOME)

# pyspark can only be imported after findspark.init() has put it on sys.path.
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName('missing').getOrCreate()

# Data: UCI Machine Learning Repository, "Diabetes 130-US hospitals for years 1999-2008"
# http://archive.ics.uci.edu/ml/datasets/Diabetes+130-US+hospitals+for+years+1999-2008
df = spark.read.csv('dataset/diabetic_data3.csv', header=True, inferSchema=True)

# Step 2: Data Exploration

Data Schema

In [2]:
# Show every column name together with the type Spark inferred from the CSV.
df.printSchema()

root
 |-- encounter_id: integer (nullable = true)
 |-- patient_nbr: integer (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- weight: string (nullable = true)
 |-- admission_type_id: integer (nullable = true)
 |-- discharge_disposition_id: integer (nullable = true)
 |-- admission_source_id: integer (nullable = true)
 |-- time_in_hospital: integer (nullable = true)
 |-- payer_code: string (nullable = true)
 |-- medical_specialty: string (nullable = true)
 |-- num_lab_procedures: integer (nullable = true)
 |-- num_procedures: integer (nullable = true)
 |-- num_medications: integer (nullable = true)
 |-- number_outpatient: integer (nullable = true)
 |-- number_emergency: integer (nullable = true)
 |-- number_inpatient: integer (nullable = true)
 |-- diag_1: string (nullable = true)
 |-- diag_2: string (nullable = true)
 |-- diag_3: string (nullable = true)
 |-- number_diagnoses: integer (nullable = true)
 |-

Data Sample

In [3]:
# Bring the first five rows to the driver and render them as a pandas table.
first_rows = df.take(5)
pd.DataFrame(first_rows, columns=df.columns)

Unnamed: 0,encounter_id,patient_nbr,race,gender,age,weight,admission_type_id,discharge_disposition_id,admission_source_id,time_in_hospital,...,citoglipton,insulin,glyburide-metformin,glipizide-metformin,glimepiride-pioglitazone,metformin-rosiglitazone,metformin-pioglitazone,change,diabetesMed,readmitted
0,2278392,8222157,Caucasian,Female,[0-10),,6,25,1,1,...,No,No,No,No,No,No,No,No,No,NO
1,149190,55629189,Caucasian,Female,[10-20),,1,1,7,3,...,No,Up,No,No,No,No,No,Ch,Yes,>30
2,64410,86047875,AfricanAmerican,Female,[20-30),,1,1,7,2,...,No,No,No,No,No,No,No,No,Yes,NO
3,500364,82442376,Caucasian,Male,[30-40),,1,1,7,2,...,No,Up,No,No,No,No,No,Ch,Yes,NO
4,16680,42519267,Caucasian,Male,[40-50),,1,1,7,1,...,No,Steady,No,No,No,No,No,Ch,Yes,NO


Summary

In [4]:
# Summary statistics (count/mean/stddev/min/max) with one row per column.
df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
encounter_id,101766,1.652016456229782E8,1.0264029598345788E8,12522,443867222
patient_nbr,101766,5.4330400694947235E7,3.869635934653421E7,135,189502619
race,101766,,,AfricanAmerican,Other
gender,101766,,,Female,Unknown/Invalid
age,101766,,,[0-10),[90-100)
weight,101766,,,>200,[75-100)
admission_type_id,101766,2.024006053102215,1.4454028297561132,1,8
discharge_disposition_id,101766,3.7156417664052825,5.280165509299248,1,28
admission_source_id,101766,5.754436648782501,4.064080834283912,1,25


In [5]:
# Category counts for the basic demographic attributes, including how much of
# `weight` is recorded as 'NA'.
print('General Patient Stats')
for column in ('race', 'gender', 'weight'):
    df.groupBy(column).count().orderBy(column).show()

General Patiet Stats
+---------------+-----+
|           race|count|
+---------------+-----+
|AfricanAmerican|19210|
|          Asian|  641|
|      Caucasian|76099|
|       Hispanic| 2037|
|             NA| 2273|
|          Other| 1506|
+---------------+-----+

+---------------+-----+
|         gender|count|
+---------------+-----+
|         Female|54708|
|           Male|47055|
|Unknown/Invalid|    3|
+---------------+-----+

+---------+-----+
|   weight|count|
+---------+-----+
|     >200|    3|
|       NA|98569|
|   [0-25)|   48|
|[100-125)|  625|
|[125-150)|  145|
|[150-175)|   35|
|[175-200)|   11|
|  [25-50)|   97|
|  [50-75)|  897|
| [75-100)| 1336|
+---------+-----+



# Step 3: Preparation and transformation

Missing Data Point

#### 3.1 Data Selection
Remove unwanted attributes

In [6]:
# Compare distinct patients with distinct encounters (one patient can have several encounters).
uni_p, uni_e = (df.select(key).distinct().count() for key in ('patient_nbr', 'encounter_id'))
print(uni_p, 'unique patients out of the', uni_e, 'records in dataset')

71518 unique patients out of the 101766 records in dataset


In [7]:
# Drop identifiers and coded columns that are not used as model features.
# `weight` goes too: 98569 of the 101766 rows have weight 'NA' (see the counts above).
unused_columns = [
    'encounter_id', 'patient_nbr',
    'admission_type_id', 'discharge_disposition_id', 'admission_source_id',
    'payer_code', 'medical_specialty',
    'diag_1', 'diag_2', 'diag_3',
    'weight',
]
ds_df = df.drop(*unused_columns)
ds_df.printSchema()

root
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- time_in_hospital: integer (nullable = true)
 |-- num_lab_procedures: integer (nullable = true)
 |-- num_procedures: integer (nullable = true)
 |-- num_medications: integer (nullable = true)
 |-- number_outpatient: integer (nullable = true)
 |-- number_emergency: integer (nullable = true)
 |-- number_inpatient: integer (nullable = true)
 |-- number_diagnoses: integer (nullable = true)
 |-- max_glu_serum: string (nullable = true)
 |-- A1Cresult: string (nullable = true)
 |-- metformin: string (nullable = true)
 |-- repaglinide: string (nullable = true)
 |-- nateglinide: string (nullable = true)
 |-- chlorpropamide: string (nullable = true)
 |-- glimepiride: string (nullable = true)
 |-- acetohexamide: string (nullable = true)
 |-- glipizide: string (nullable = true)
 |-- glyburide: string (nullable = true)
 |-- tolbutamide: string (nullable = true)
 |-- pioglitazone: str

#### Data Cleaning, Integration and Reformatting
Outliers and extremes are kept, as they provide insight.

In [8]:
# Import only what is needed: a wildcard import from pyspark.sql.functions would
# shadow Python builtins such as sum, min, max, abs and round for the rest of the notebook.
from pyspark.sql.functions import regexp_replace

# Drop rows containing null values.
ds_df = ds_df.na.drop()

# Some missing values are encoded as literal strings rather than nulls,
# so they must be filtered out explicitly.
ds_df = ds_df.filter("gender != 'Unknown/Invalid'")
ds_df = ds_df.filter("race != 'NA'")

# Remove ages outside the definition of premature mortality (30 <= age < 70).
premature_mortality_ages = ['[30-40)', '[40-50)', '[50-60)', '[60-70)']
ds_df = ds_df.filter(ds_df['age'].isin(*premature_mortality_ages))

# Make `change` read Yes/No like `diabetesMed`. The anchors ensure only the exact
# value 'Ch' is rewritten. `readmitted` keeps its original categories.
ds_df = ds_df.withColumn('change', regexp_replace('change', '^Ch$', 'Yes'))

# Give the two kept columns more descriptive names.
ds_df = (ds_df
         .withColumnRenamed('max_glu_serum', 'blood_sugar_test')
         .withColumnRenamed('diabetesMed', 'diabetes_med_given'))

# Drop the per-medication columns. Only the aggregate `change` and
# `diabetes_med_given` flags are kept as medication features.
medication_columns = [
    'metformin', 'repaglinide', 'nateglinide', 'chlorpropamide', 'glimepiride',
    'acetohexamide', 'glipizide', 'glyburide', 'tolbutamide', 'pioglitazone',
    'rosiglitazone', 'acarbose', 'miglitol', 'troglitazone', 'tolazamide',
    'examide', 'citoglipton', 'insulin',
    'glyburide-metformin', 'glipizide-metformin', 'glimepiride-pioglitazone',
    'metformin-rosiglitazone', 'metformin-pioglitazone',
]
ds_df = ds_df.drop(*medication_columns)

In [9]:
# Report how many encounters survived cleaning, then show the final schema.
kept_rows = ds_df.count()
total_rows = df.count()
print(kept_rows, 'out of', total_rows, 'remain after cleaning')
ds_df.printSchema()

52047 out of 101766 remain after cleaning
root
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- time_in_hospital: integer (nullable = true)
 |-- num_lab_procedures: integer (nullable = true)
 |-- num_procedures: integer (nullable = true)
 |-- num_medications: integer (nullable = true)
 |-- number_outpatient: integer (nullable = true)
 |-- number_emergency: integer (nullable = true)
 |-- number_inpatient: integer (nullable = true)
 |-- number_diagnoses: integer (nullable = true)
 |-- blood_sugar_test: string (nullable = true)
 |-- A1Cresult: string (nullable = true)
 |-- change: string (nullable = true)
 |-- diabetes_med_given: string (nullable = true)
 |-- readmitted: string (nullable = true)



ID mapping of specific admission_type_id

In [10]:
# ID-to-description mapping published with the dataset (UCI Machine Learning Repository):
# http://archive.ics.uci.edu/ml/datasets/Diabetes+130-US+hospitals+for+years+1999-2008
# NOTE(review): the output below suggests this CSV stacks several mapping tables
# (admission type, discharge disposition, ...) in one file, so only the first table
# matches the header Spark reads. It is displayed for reference only.
mapping_path = 'dataset/diabetic_data_IDs_mapping.csv'
map_df = spark.read.csv(mapping_path, header=True, inferSchema=True)

map_df.printSchema()
map_df.show()

root
 |-- admission_type_id: string (nullable = true)
 |-- description: string (nullable = true)

+--------------------+--------------------+
|   admission_type_id|         description|
+--------------------+--------------------+
|                   1|           Emergency|
|                   2|              Urgent|
|                   3|            Elective|
|                   4|             Newborn|
|                   5|       Not Available|
|                   6|                NULL|
|                   7|       Trauma Center|
|                   8|          Not Mapped|
|                null|                null|
|discharge_disposi...|         description|
|                   1|  Discharged to home|
|                   2|Discharged/transf...|
|                   3|Discharged/transf...|
|                   4|Discharged/transf...|
|                   5|Discharged/transf...|
|                   6|Discharged/transf...|
|                   7|            Left AMA|
|                   8|

## Building training and testing set

### Converting columns with nominal values

In [11]:
# Import the relevant packages.
from pyspark.ml.feature import (VectorAssembler, VectorIndexer, OneHotEncoder, StringIndexer)

# StringIndexer maps each distinct category of a string column to a numeric index
# (by default the most frequent category gets 0.0). `readmitted` becomes the label.
race_indexer = StringIndexer(inputCol='race', outputCol='raceIndex')
gender_indexer = StringIndexer(inputCol='gender', outputCol='genderIndex')
age_indexer = StringIndexer(inputCol='age', outputCol='ageIndex')
blood_sugar_test_indexer = StringIndexer(inputCol='blood_sugar_test', outputCol='blood_sugar_testIndex')
A1Cresult_indexer = StringIndexer(inputCol='A1Cresult', outputCol='A1CresultIndex')
change_indexer = StringIndexer(inputCol='change', outputCol='changeIndex')
diabetes_med_given_indexer = StringIndexer(inputCol='diabetes_med_given', outputCol='diabetes_med_givenIndex')
readmitted_indexer = StringIndexer(inputCol='readmitted', outputCol='label')

# OneHotEncoder turns each numeric category index into a sparse 0/1 vector, so models
# do not treat category indices as ordered magnitudes. Each encoder must read the
# numeric *Index column produced by its indexer, not the raw string column.
race_encoder = OneHotEncoder(inputCol='raceIndex', outputCol='raceVec')
gender_encoder = OneHotEncoder(inputCol='genderIndex', outputCol='genderVec')
age_encoder = OneHotEncoder(inputCol='ageIndex', outputCol='ageVec')
blood_sugar_test_encoder = OneHotEncoder(inputCol='blood_sugar_testIndex', outputCol='blood_sugar_testVec')
A1Cresult_encoder = OneHotEncoder(inputCol='A1CresultIndex', outputCol='A1CresultVec')
change_encoder = OneHotEncoder(inputCol='changeIndex', outputCol='changeVec')
diabetes_med_given_encoder = OneHotEncoder(inputCol='diabetes_med_givenIndex', outputCol='diabetes_med_givenVec')

# Combine the encoded categorical vectors and the numeric columns into the single
# `features` vector column that Spark ML estimators expect.
assembler = VectorAssembler(inputCols=['raceVec', 'genderVec', 'ageVec', 'blood_sugar_testVec',
                                       'A1CresultVec', 'changeVec', 'diabetes_med_givenVec',
                                       'time_in_hospital', 'num_lab_procedures', 'num_procedures',
                                       'num_medications', 'number_outpatient', 'number_emergency',
                                       'number_inpatient', 'number_diagnoses'],
                            outputCol="features")



### Pipelines

In [None]:
from pyspark.ml import Pipeline

# Stage order matters: every encoder consumes an *Index column, so the indexers are
# listed first, then the encoders, and finally the assembler that builds `features`.
index_stages = [race_indexer, gender_indexer, age_indexer, blood_sugar_test_indexer,
                A1Cresult_indexer, change_indexer, diabetes_med_given_indexer,
                readmitted_indexer]
encode_stages = [race_encoder, gender_encoder, age_encoder, blood_sugar_test_encoder,
                 A1Cresult_encoder, change_encoder, diabetes_med_given_encoder]
pipeline = Pipeline(stages=index_stages + encode_stages + [assembler])

# Fit on the cleaned data, then apply every stage to it.
pipeline_model = pipeline.fit(ds_df)

# Keep only the two columns the classifiers need.
pipe_df = pipeline_model.transform(ds_df).select('label', 'features')

## Train and Test Split

The train/test splits for all iterations are created in this section.

#### Iteration 1

In [None]:
# Split the pipeline output 70/30. A fixed seed keeps the split (and every metric
# computed from it) reproducible across kernel restarts.
train_data1, test_data1 = pipe_df.randomSplit([0.7, 0.3], seed=42)
print("Training Dataset Count: " + str(train_data1.count()))
print("Test Dataset Count: " + str(test_data1.count()))

#### Iteration 2

In [None]:
# Split the pipeline output 75/25. A fixed seed keeps the split (and every metric
# computed from it) reproducible across kernel restarts.
train_data2, test_data2 = pipe_df.randomSplit([0.75, 0.25], seed=42)
print("Training Dataset Count: " + str(train_data2.count()))
print("Test Dataset Count: " + str(test_data2.count()))

#### Iteration 3

In [None]:
# Split the pipeline output 80/20. A fixed seed keeps the split (and every metric
# computed from it) reproducible across kernel restarts.
train_data3, test_data3 = pipe_df.randomSplit([0.8, 0.2], seed=42)
print("Training Dataset Count: " + str(train_data3.count()))
print("Test Dataset Count: " + str(test_data3.count()))

## Algorithm 1 - Logistic Regression Model

#### Iteration 1

In [None]:
from pyspark.ml.classification import (BinaryLogisticRegressionTrainingSummary,
                                       LogisticRegression)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import matplotlib.pyplot as plt
import numpy as np

# Instantiate the model. elasticNetParam=0 makes regParam a pure L2 penalty.
lr_model = LogisticRegression(featuresCol='features', labelCol='label',
                              maxIter=100, regParam=0.1, elasticNetParam=0)

# Fit the model.
lr_model = lr_model.fit(train_data1)

# Score the held-out test split.
results = lr_model.transform(test_data1)

print('Model Evaluation')

# Test-set accuracy works for any number of label classes.
test_accuracy = MulticlassClassificationEvaluator(
    labelCol='label', metricName='accuracy').evaluate(results)
print('Test Accuracy: ' + str(test_accuracy))

# Visualise the coefficients, sorted from lowest to highest. coefficientMatrix exists for
# both binomial and multinomial models; `coefficients` raises for multinomial ones.
beta = np.sort(lr_model.coefficientMatrix.toArray().ravel())
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

# The ROC and precision-recall curves below come from the TRAINING summary and
# are only available for binary (two-class) logistic regression.
training_summary = lr_model.summary if lr_model.hasSummary else None
if isinstance(training_summary, BinaryLogisticRegressionTrainingSummary):
    # Convert the DataFrame to a Pandas DataFrame and plot TPR against FPR.
    ROC = training_summary.roc.toPandas()
    plt.plot(ROC['FPR'], ROC['TPR'])
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.title('ROC Curve')
    plt.show()

    # Print the AUC statistic.
    print('Area Under the Curve: ' + str(training_summary.areaUnderROC))

    print('Precision and Recall')
    pr = training_summary.pr.toPandas()
    plt.plot(pr['recall'], pr['precision'])
    plt.ylabel('Precision')
    plt.xlabel('Recall')
    plt.title('Precision-Recall Curve')
    plt.show()
else:
    print('ROC / precision-recall summary skipped: only available for binary logistic regression')

#### Iteration 2

In [None]:
from pyspark.ml.classification import (BinaryLogisticRegressionTrainingSummary,
                                       LogisticRegression)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import matplotlib.pyplot as plt
import numpy as np

# Instantiate the model. elasticNetParam=.73 mixes L1 (73%) and L2 (27%) penalties.
lr_model = LogisticRegression(featuresCol='features', labelCol='label',
                              maxIter=200, regParam=0.12, elasticNetParam=.73)

# Fit the model.
lr_model = lr_model.fit(train_data2)

# Score the held-out test split.
results = lr_model.transform(test_data2)

print('Model Evaluation')

# Test-set accuracy works for any number of label classes.
test_accuracy = MulticlassClassificationEvaluator(
    labelCol='label', metricName='accuracy').evaluate(results)
print('Test Accuracy: ' + str(test_accuracy))

# Visualise the coefficients, sorted from lowest to highest. coefficientMatrix exists for
# both binomial and multinomial models; `coefficients` raises for multinomial ones.
beta = np.sort(lr_model.coefficientMatrix.toArray().ravel())
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

# The ROC and precision-recall curves below come from the TRAINING summary and
# are only available for binary (two-class) logistic regression.
training_summary = lr_model.summary if lr_model.hasSummary else None
if isinstance(training_summary, BinaryLogisticRegressionTrainingSummary):
    # Convert the DataFrame to a Pandas DataFrame and plot TPR against FPR.
    ROC = training_summary.roc.toPandas()
    plt.plot(ROC['FPR'], ROC['TPR'])
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.title('ROC Curve')
    plt.show()

    # Print the AUC statistic.
    print('Area Under the Curve: ' + str(training_summary.areaUnderROC))

    print('Precision and Recall')
    pr = training_summary.pr.toPandas()
    plt.plot(pr['recall'], pr['precision'])
    plt.ylabel('Precision')
    plt.xlabel('Recall')
    plt.title('Precision-Recall Curve')
    plt.show()
else:
    print('ROC / precision-recall summary skipped: only available for binary logistic regression')

#### Iteration 3

In [None]:
from pyspark.ml.classification import (BinaryLogisticRegressionTrainingSummary,
                                       LogisticRegression)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import matplotlib.pyplot as plt
import numpy as np

# Instantiate the model. elasticNetParam=0.1 is a mostly-L2 penalty with 10% L1.
lr_model = LogisticRegression(featuresCol='features', labelCol='label',
                              maxIter=150, regParam=0.34, elasticNetParam=0.1)

# Fit the model.
lr_model = lr_model.fit(train_data3)

# Score the held-out test split.
results = lr_model.transform(test_data3)

print('Model Evaluation')

# Test-set accuracy works for any number of label classes.
test_accuracy = MulticlassClassificationEvaluator(
    labelCol='label', metricName='accuracy').evaluate(results)
print('Test Accuracy: ' + str(test_accuracy))

# Visualise the coefficients, sorted from lowest to highest. coefficientMatrix exists for
# both binomial and multinomial models; `coefficients` raises for multinomial ones.
beta = np.sort(lr_model.coefficientMatrix.toArray().ravel())
plt.plot(beta)
plt.ylabel('Beta Coefficients')
plt.show()

# The ROC and precision-recall curves below come from the TRAINING summary and
# are only available for binary (two-class) logistic regression.
training_summary = lr_model.summary if lr_model.hasSummary else None
if isinstance(training_summary, BinaryLogisticRegressionTrainingSummary):
    # Convert the DataFrame to a Pandas DataFrame and plot TPR against FPR.
    ROC = training_summary.roc.toPandas()
    plt.plot(ROC['FPR'], ROC['TPR'])
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.title('ROC Curve')
    plt.show()

    # Print the AUC statistic.
    print('Area Under the Curve: ' + str(training_summary.areaUnderROC))

    print('Precision and Recall')
    pr = training_summary.pr.toPandas()
    plt.plot(pr['recall'], pr['precision'])
    plt.ylabel('Precision')
    plt.xlabel('Recall')
    plt.title('Precision-Recall Curve')
    plt.show()
else:
    print('ROC / precision-recall summary skipped: only available for binary logistic regression')

## Decision Tree, Random Forest and Gradient-Boosted Tree Classification

#### Iteration 1

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import (DecisionTreeClassifier, GBTClassifier,
                                       RandomForestClassifier)

# All three tree-based classifiers read the same label/feature columns from pipe_df.
shared_cols = {'labelCol': 'label', 'featuresCol': 'features'}
dt = DecisionTreeClassifier(**shared_cols)
rf = RandomForestClassifier(**shared_cols)
# NOTE: Spark's GBTClassifier supports binary labels only.
gb = GBTClassifier(**shared_cols)

In [None]:
# Fit each tree-based classifier, in order, on the first training split.
# NOTE(review): gb.fit will raise if `label` has more than two classes
# (GBTClassifier is binary-only); confirm how many `readmitted` categories remain.
dt_model, rf_model, gb_model = (estimator.fit(train_data1) for estimator in (dt, rf, gb))

In [None]:
# Score the first held-out split with each fitted model.
dt_predictions, rf_predictions, gb_predictions = (
    fitted.transform(test_data1) for fitted in (dt_model, rf_model, gb_model))

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate against the indexed `readmitted` column the pipeline writes to `label`.
# The previous 'PrivateIndex' column does not exist in pipe_df.
# NOTE(review): area under ROC (the evaluator's default metric) is only meaningful for a
# two-class label; confirm how many `readmitted` categories remain after cleaning.
binary_evaluator = BinaryClassificationEvaluator(labelCol='label')

print('Decision Tree:', binary_evaluator.evaluate(dt_predictions))
print('Random Forest:', binary_evaluator.evaluate(rf_predictions))
print('Gradient-boosted Trees:', binary_evaluator.evaluate(gb_predictions))

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy: fraction of test rows whose predicted class equals `label`.
multi_evaluator = MulticlassClassificationEvaluator(metricName='accuracy', labelCol='label')
dt_accuracy = multi_evaluator.evaluate(dt_predictions)
print('Decision Tree Accu:', dt_accuracy)


In [None]:
# Confirm which metric the evaluator reports.
multi_evaluator.getOrDefault(multi_evaluator.metricName)


In [None]:
# Accuracy of the two ensemble models on the same test split.
for caption, predictions in (('Random Forest Accu:', rf_predictions),
                             ('Gradient-boosted Trees Accu:', gb_predictions)):
    print(caption, multi_evaluator.evaluate(predictions))

#### Iteration 2

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Tree-based classifiers on the second (75/25) split.
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features')
rf = RandomForestClassifier(labelCol='label', featuresCol='features')
gb = GBTClassifier(labelCol='label', featuresCol='features')

dt_model = dt.fit(train_data2)
rf_model = rf.fit(train_data2)

dt_predictions = dt_model.transform(test_data2)
rf_predictions = rf_model.transform(test_data2)

multi_evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
print('Decision Tree Accu:', multi_evaluator.evaluate(dt_predictions))
print('Random Forest Accu:', multi_evaluator.evaluate(rf_predictions))

# GBTClassifier only supports binary labels; fitting it on a label with more
# classes raises, so it is skipped in that case instead of aborting the cell.
n_label_classes = train_data2.select('label').distinct().count()
if n_label_classes <= 2:
    gb_model = gb.fit(train_data2)
    gb_predictions = gb_model.transform(test_data2)
    print('Gradient-boosted Trees Accu:', multi_evaluator.evaluate(gb_predictions))
else:
    gb_model = gb_predictions = None
    print('Gradient-boosted Trees skipped: label has', n_label_classes,
          'classes but GBTClassifier is binary-only')

#### Iteration 3

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Tree-based classifiers on the third (80/20) split.
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features')
rf = RandomForestClassifier(labelCol='label', featuresCol='features')
gb = GBTClassifier(labelCol='label', featuresCol='features')

dt_model = dt.fit(train_data3)
rf_model = rf.fit(train_data3)

dt_predictions = dt_model.transform(test_data3)
rf_predictions = rf_model.transform(test_data3)

multi_evaluator = MulticlassClassificationEvaluator(labelCol='label', metricName='accuracy')
print('Decision Tree Accu:', multi_evaluator.evaluate(dt_predictions))
print('Random Forest Accu:', multi_evaluator.evaluate(rf_predictions))

# GBTClassifier only supports binary labels; fitting it on a label with more
# classes raises, so it is skipped in that case instead of aborting the cell.
n_label_classes = train_data3.select('label').distinct().count()
if n_label_classes <= 2:
    gb_model = gb.fit(train_data3)
    gb_predictions = gb_model.transform(test_data3)
    print('Gradient-boosted Trees Accu:', multi_evaluator.evaluate(gb_predictions))
else:
    gb_model = gb_predictions = None
    print('Gradient-boosted Trees skipped: label has', n_label_classes,
          'classes but GBTClassifier is binary-only')