# Hospital Readmission Prediction - Decision Tree Classification
#### This data set is made available by the Center for Clinical and Translational Research, Virginia Commonwealth University. It contains data about 10 years of clinical care at 130 US Hospitals. Each row represents a single patient. The columns include the characteristics of deidentified diabetes patients. This is a binary classification task to predict whether a diabetes patient is readmitted to the hospital within 30 days of their discharge (1=Yes, 0=No). This is an important performance metric for hospitals as they try to minimize these types of readmissions.

### Common Imports and Data load

In [0]:
# Source file location and format
file_location = "/FileStore/tables/healthcare.csv"
file_type = "csv"

# CSV reader settings. Schema inference is switched off, so the reader does
# not attempt to detect numeric column types.
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# Configure the reader in a single call. These options are meant for CSV
# input; for other file types they are ignored.
reader = spark.read.format(file_type).options(
    inferSchema=infer_schema,
    header=first_row_is_header,
    sep=delimiter,
)
df = reader.load(file_location)

display(df)

race,gender,age,admission_type,discharge_disposition,admission_source,time_in_hospital,payer_code,medical_specialty,num_lab_procedures,num_procedures,num_medications,number_outpatient,number_emergency,number_inpatient,diag_1,diag_2,diag_3,number_diagnoses,max_glu_serum,A1Cresult,insulin,change,diabetesMed,readmitted
Other,Female,70-80,2,3,1,14,,InternalMedicine,32,3,15,0,0,0,486,404,428,9,,,No,No,No,1
Caucasian,Female,80-90,1,3,5,4,MC,,44,0,15,0,0,0,38,438,599,9,,,Steady,Ch,Yes,0
AfricanAmerican,Male,50-60,5,1,1,6,HM,,29,1,15,0,0,0,296,585,428,9,,,Up,Ch,Yes,1
Caucasian,Female,50-60,1,1,6,3,HM,InternalMedicine,47,0,10,0,0,0,250.02,401,493,4,,>8,No,Ch,Yes,0
AfricanAmerican,Female,40-50,3,1,1,4,UN,Hematology,92,0,15,0,0,0,486,287,595,7,,>7,No,No,No,0
Caucasian,Male,50-60,3,18,1,4,,,43,4,30,0,0,0,414,411,496,8,,,Steady,Ch,Yes,0
Caucasian,Male,50-60,1,6,7,8,MC,Emergency/Trauma,60,2,21,0,0,0,491,486,344,8,,,Down,Ch,Yes,0
Caucasian,Male,70-80,3,1,1,2,MC,InternalMedicine,43,0,12,0,0,1,785,V45,786,7,,,No,No,Yes,1
Caucasian,Female,70-80,5,3,17,5,MC,Family/GeneralPractice,17,1,8,0,0,1,428,403,707,8,>200,,No,No,Yes,0
Caucasian,Female,40-50,2,1,7,3,BC,Surgery-General,50,1,23,1,0,10,38,250.4,403,9,,,Up,Ch,Yes,0


### Data Pre-processing

In [0]:
from pyspark.sql.functions import expr

In [0]:
# total_visits = outpatient + emergency + inpatient visit counts
cols_list_1 = ['number_outpatient', 'number_emergency', 'number_inpatient']
# total_procedures = lab procedures + other procedures
cols_list_2 = ['num_lab_procedures', 'num_procedures']

# Each derived column is a SQL addition expression built by joining the
# source column names with '+'.
for new_column, source_columns in (('total_visits', cols_list_1),
                                   ('total_procedures', cols_list_2)):
    expression = '+'.join(source_columns)
    df = df.withColumn(new_column, expr(expression))

display(df)

race,gender,age,admission_type,discharge_disposition,admission_source,time_in_hospital,payer_code,medical_specialty,num_lab_procedures,num_procedures,num_medications,number_outpatient,number_emergency,number_inpatient,diag_1,diag_2,diag_3,number_diagnoses,max_glu_serum,A1Cresult,insulin,change,diabetesMed,readmitted,total_visits,total_procedures
Other,Female,70-80,2,3,1,14,,InternalMedicine,32,3,15,0,0,0,486,404,428,9,,,No,No,No,1,0.0,35.0
Caucasian,Female,80-90,1,3,5,4,MC,,44,0,15,0,0,0,38,438,599,9,,,Steady,Ch,Yes,0,0.0,44.0
AfricanAmerican,Male,50-60,5,1,1,6,HM,,29,1,15,0,0,0,296,585,428,9,,,Up,Ch,Yes,1,0.0,30.0
Caucasian,Female,50-60,1,1,6,3,HM,InternalMedicine,47,0,10,0,0,0,250.02,401,493,4,,>8,No,Ch,Yes,0,0.0,47.0
AfricanAmerican,Female,40-50,3,1,1,4,UN,Hematology,92,0,15,0,0,0,486,287,595,7,,>7,No,No,No,0,0.0,92.0
Caucasian,Male,50-60,3,18,1,4,,,43,4,30,0,0,0,414,411,496,8,,,Steady,Ch,Yes,0,0.0,47.0
Caucasian,Male,50-60,1,6,7,8,MC,Emergency/Trauma,60,2,21,0,0,0,491,486,344,8,,,Down,Ch,Yes,0,0.0,62.0
Caucasian,Male,70-80,3,1,1,2,MC,InternalMedicine,43,0,12,0,0,1,785,V45,786,7,,,No,No,Yes,1,1.0,43.0
Caucasian,Female,70-80,5,3,17,5,MC,Family/GeneralPractice,17,1,8,0,0,1,428,403,707,8,>200,,No,No,Yes,0,1.0,18.0
Caucasian,Female,40-50,2,1,7,3,BC,Surgery-General,50,1,23,1,0,10,38,250.4,403,9,,,Up,Ch,Yes,0,11.0,51.0


In [0]:
# Keep only the columns used downstream: the chosen predictor columns plus
# the `readmitted` label.
selected_columns = [
    'race', 'gender', 'age', 'admission_type', 'discharge_disposition',
    'admission_source', 'time_in_hospital', 'num_medications', 'number_diagnoses',
    'max_glu_serum', 'A1Cresult', 'insulin', 'change', 'diabetesMed', 'readmitted',
    'total_visits', 'total_procedures',
]
data = df.select(selected_columns)

In [0]:
# Group rows by length of stay and sum that same column within each group.
# NOTE(review): summing the grouping key yields (value x number of rows) per
# group; if a frequency table was intended, a count may be what is wanted - confirm.
stay_groups = data.groupBy("time_in_hospital")
df_time_in_hospital = stay_groups.agg({'time_in_hospital': "sum"})
display(df_time_in_hospital)

time_in_hospital,sum(time_in_hospital)
7,3696.0
11,1452.0
3,4476.0
8,3528.0
5,4345.0
6,3828.0
9,2592.0
1,1073.0
10,2240.0
4,4732.0


In [0]:
# Group rows by medication count and sum that same column within each group.
# NOTE(review): summing the grouping key yields (value x number of rows) per
# group; if a frequency table was intended, a count may be what is wanted - confirm.
medication_groups = data.groupBy("num_medications")
df_num_medications = medication_groups.agg({'num_medications': "sum"})
display(df_num_medications)

num_medications,sum(num_medications)
7,1925.0
51,306.0
15,7815.0
54,54.0
11,5192.0
29,2523.0
42,504.0
64,64.0
3,201.0
30,2010.0


In [0]:
from pyspark.sql.types import IntegerType

In [0]:
# Columns holding integer codes/counts (and the label) that must be numeric
# before they can be used as model inputs.
integer_columns = [
    "discharge_disposition", "admission_source", "time_in_hospital",
    "admission_type", "num_medications", "number_diagnoses", "readmitted",
]

# Cast first. With ANSI mode off (the long-standing Spark default) a value that
# cannot be parsed as an integer becomes null instead of raising an error.
for column_name in integer_columns:
    data = data.withColumn(column_name, data[column_name].cast(IntegerType()))

# Drop incomplete rows only AFTER casting, so nulls introduced by a failed cast
# are removed together with the nulls that were already present. Dropping
# before the cast would let such nulls reach the label / feature columns.
# NOTE(review): dropna() removes a row if ANY selected column is null; the
# displayed sample shows max_glu_serum and A1Cresult empty on most rows, so
# this may discard a large share of the data - confirm that is intended.
data = data.dropna()

In [0]:
# 70/30 train-test split; the fixed seed keeps the split repeatable.
split_weights = [0.7, 0.3]
train_data, test_data = data.randomSplit(split_weights, seed=20)

In [0]:
# Importing sparkml libraries required

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler,StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
def _build_indexer(column):
    """Return a StringIndexer that encodes `column` into `<column>_index`.

    `handleInvalid='keep'` is set so that invalid values are kept rather than
    causing an error or being skipped.
    """
    return StringIndexer(inputCol=column, outputCol=column + '_index', handleInvalid='keep')


# One indexer per categorical column, turning its string labels into numeric indices.
race_indexer = _build_indexer('race')
gender_indexer = _build_indexer('gender')
age_indexer = _build_indexer('age')
max_glu_serum_indexer = _build_indexer('max_glu_serum')
A1Cresult_indexer = _build_indexer('A1Cresult')
insulin_indexer = _build_indexer('insulin')
change_indexer = _build_indexer('change')
diabetesMed_indexer = _build_indexer('diabetesMed')


In [0]:
# Columns combined into the single `features` vector consumed by the models:
# the indexed categorical columns plus the numeric columns.
# NOTE(review): `admission_type` is selected and cast to integer earlier in the
# notebook but is not listed here - confirm whether the omission is intentional.
feature_columns = [
    'race_index', 'gender_index', 'age_index', 'discharge_disposition',
    'admission_source', 'time_in_hospital', 'num_medications', 'number_diagnoses',
    'max_glu_serum_index', 'A1Cresult_index', 'insulin_index', 'change_index',
    'diabetesMed_index', 'total_visits', 'total_procedures',
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

### Decision Tree

In [0]:
# Decision Tree classifier predicting `readmitted` from the assembled
# `features` vector. maxDepth and maxBins are set to control overfitting.
dt_model = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='readmitted',
    maxDepth=5,
    maxBins=100,
)

In [0]:
# Chain the categorical indexers, the feature assembler and the classifier.
# The stages are applied one after another, in the order listed.
dt_stages = [
    race_indexer, gender_indexer, age_indexer, max_glu_serum_indexer,
    A1Cresult_indexer, insulin_indexer, change_indexer, diabetesMed_indexer,
    assembler, dt_model,
]
pipe = Pipeline(stages=dt_stages)

In [0]:
# Fit every pipeline stage (indexers, assembler, decision tree) on the training split.
fit_model = pipe.fit(dataset=train_data)

In [0]:
# Apply the fitted pipeline to the held-out split; the resulting DataFrame
# carries the model's `prediction` column.
results = fit_model.transform(dataset=test_data)

In [0]:
# Accuracy of the decision tree on the test split: `prediction` vs. the `readmitted` label.
ACC_evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="readmitted",
    predictionCol="prediction",
)
accuracy = ACC_evaluator.evaluate(results)

dt_report = "The accuracy of the decision tree classifier is {}"
print(dt_report.format(accuracy))

The accuracy of the decision tree classifier is 0.5964982093115798


### Gradient-Boosted Tree

In [0]:
#GBTClassification

from pyspark.ml.classification import GBTClassifier

In [0]:
# Gradient-boosted tree classifier with the same depth/bin limits as the
# decision tree, reading the assembled `features` vector.
gbt_model = GBTClassifier(
    featuresCol='features',
    labelCol='readmitted',
    maxDepth=5,
    maxBins=100,
)

In [0]:
# Chain the categorical indexers, the feature assembler and the GBT classifier.
# The stages are applied one after another, in the order listed.
gbt_stages = [
    race_indexer, gender_indexer, age_indexer, max_glu_serum_indexer,
    A1Cresult_indexer, insulin_indexer, change_indexer, diabetesMed_indexer,
    assembler, gbt_model,
]
pipe_gbt = Pipeline(stages=gbt_stages)

In [0]:
# Fit the GBT pipeline on the training split.
fit_model_gbt = pipe_gbt.fit(dataset=train_data)

In [0]:
# Score the held-out split with the fitted GBT pipeline.
results_gbt = fit_model_gbt.transform(dataset=test_data)

In [0]:
# Accuracy of the GBT model on the test split: `prediction` vs. the `readmitted` label.
ACC_evaluator_gbt = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="readmitted",
    predictionCol="prediction",
)
accuracy_gbt = ACC_evaluator_gbt.evaluate(results_gbt)

gbt_report = "The accuracy of the Gradient-Boosted tree classifier is {}"
print(gbt_report.format(accuracy_gbt))

The accuracy of the Gradient-Boosted tree classifier is 0.6032630322323915


### Random Forest

In [0]:
from pyspark.ml.classification import RandomForestClassifier

In [0]:
# Random Forest classifier: 20 trees, fixed seed so repeated runs build the same forest.
rf_model = RandomForestClassifier(
    featuresCol="features",
    labelCol="readmitted",
    numTrees=20,
    seed=42,
)

In [0]:
# Chain the categorical indexers, the feature assembler and the Random Forest.
# The stages are applied one after another, in the order listed.
rf_stages = [
    race_indexer, gender_indexer, age_indexer, max_glu_serum_indexer,
    A1Cresult_indexer, insulin_indexer, change_indexer, diabetesMed_indexer,
    assembler, rf_model,
]
pipe_rf = Pipeline(stages=rf_stages)

In [0]:
# Fit the Random Forest pipeline on the training split.
fit_model_rf = pipe_rf.fit(dataset=train_data)

In [0]:
# Score the held-out split with the fitted Random Forest pipeline.
results_rf = fit_model_rf.transform(dataset=test_data)

In [0]:
# Accuracy of the Random Forest on the test split: `prediction` vs. the `readmitted` label.
ACC_evaluator_rf = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="readmitted",
    predictionCol="prediction",
)
accuracy_rf = ACC_evaluator_rf.evaluate(results_rf)

rf_report = "The accuracy of the model is {}"
print(rf_report.format(accuracy_rf))

The accuracy of the model is 0.6000795861520095


### Linear SVC

In [0]:
from pyspark.ml.classification import LinearSVC

In [0]:
# Linear support vector classifier on the assembled `features` vector,
# capped at 100 optimizer iterations.
# NOTE(review): the feature vector contains StringIndexer codes, which a linear
# model treats as ordered magnitudes; one-hot encoding may be more appropriate - confirm.
svm_model = LinearSVC(
    featuresCol='features',
    labelCol='readmitted',
    maxIter=100,
)

In [0]:
# Chain the categorical indexers, the feature assembler and the linear SVC.
# The stages are applied one after another, in the order listed.
svm_stages = [
    race_indexer, gender_indexer, age_indexer, max_glu_serum_indexer,
    A1Cresult_indexer, insulin_indexer, change_indexer, diabetesMed_indexer,
    assembler, svm_model,
]
pipe_svm = Pipeline(stages=svm_stages)

In [0]:
# Fit the linear SVC pipeline on the training split.
fit_model_svm = pipe_svm.fit(dataset=train_data)

In [0]:
# Score the held-out split with the fitted linear SVC pipeline.
results_svm = fit_model_svm.transform(dataset=test_data)

In [0]:
# Accuracy of the linear SVC on the test split: `prediction` vs. the `readmitted` label.
ACC_evaluator_svm = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="readmitted",
    predictionCol="prediction",
)
accuracy_svm = ACC_evaluator_svm.evaluate(results_svm)

svm_report = "The accuracy of the linear svm classifier is {}"
print(svm_report.format(accuracy_svm))

The accuracy of the linear svm classifier is 0.5937126939912455


### Logistic Regression

In [0]:
#Logistic Regression
from pyspark.ml.classification import LogisticRegression

In [0]:
# Logistic regression on the assembled `features` vector, otherwise using
# the estimator's default settings.
# NOTE(review): the feature vector contains StringIndexer codes, which a linear
# model treats as ordered magnitudes; one-hot encoding may be more appropriate - confirm.
lr_model = LogisticRegression(
    featuresCol='features',
    labelCol='readmitted',
)

In [0]:
# Chain the categorical indexers, the feature assembler and logistic regression.
# The stages are applied one after another, in the order listed.
lr_stages = [
    race_indexer, gender_indexer, age_indexer, max_glu_serum_indexer,
    A1Cresult_indexer, insulin_indexer, change_indexer, diabetesMed_indexer,
    assembler, lr_model,
]
pipe_lr = Pipeline(stages=lr_stages)

In [0]:
# Fit the logistic regression pipeline on the training split.
fit_model_lr = pipe_lr.fit(dataset=train_data)

In [0]:
# Score the held-out split with the fitted logistic regression pipeline.
results_lr = fit_model_lr.transform(dataset=test_data)

In [0]:
# Accuracy of logistic regression on the test split: `prediction` vs. the `readmitted` label.
ACC_evaluator_lr = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="readmitted",
    predictionCol="prediction",
)
accuracy_lr = ACC_evaluator_lr.evaluate(results_lr)

lr_report = "The accuracy of the Logistic regression classifier is {}"
print(lr_report.format(accuracy_lr))

The accuracy of the Logistic regression classifier is 0.6008754476721051
