In [1]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkConf
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

### Dataset description

In [2]:
#loading the dataset
df = spark.read.csv('/home/nayana/Desktop/diabetic_data.csv', inferSchema='true', header='true')

In [3]:
df.head()

Row(encounter_id=2278392, patient_nbr=8222157, race='Caucasian', gender='Female', age='[0-10)', weight='?', admission_type_id=6, discharge_disposition_id=25, admission_source_id=1, time_in_hospital=1, payer_code='?', medical_specialty='Pediatrics-Endocrinology', num_lab_procedures=41, num_procedures=0, num_medications=1, number_outpatient=0, number_emergency=0, number_inpatient=0, diag_1='250.83', diag_2='?', diag_3='?', number_diagnoses=1, max_glu_serum='None', A1Cresult='None', metformin='No', repaglinide='No', nateglinide='No', chlorpropamide='No', glimepiride='No', acetohexamide='No', glipizide='No', glyburide='No', tolbutamide='No', pioglitazone='No', rosiglitazone='No', acarbose='No', miglitol='No', troglitazone='No', tolazamide='No', examide='No', citoglipton='No', insulin='No', glyburide-metformin='No', glipizide-metformin='No', glimepiride-pioglitazone='No', metformin-rosiglitazone='No', metformin-pioglitazone='No', change='No', diabetesMed='No', readmitted='NO')

In [4]:
df.count()

101766

In [5]:
df.columns

['encounter_id',
 'patient_nbr',
 'race',
 'gender',
 'age',
 'weight',
 'admission_type_id',
 'discharge_disposition_id',
 'admission_source_id',
 'time_in_hospital',
 'payer_code',
 'medical_specialty',
 'num_lab_procedures',
 'num_procedures',
 'num_medications',
 'number_outpatient',
 'number_emergency',
 'number_inpatient',
 'diag_1',
 'diag_2',
 'diag_3',
 'number_diagnoses',
 'max_glu_serum',
 'A1Cresult',
 'metformin',
 'repaglinide',
 'nateglinide',
 'chlorpropamide',
 'glimepiride',
 'acetohexamide',
 'glipizide',
 'glyburide',
 'tolbutamide',
 'pioglitazone',
 'rosiglitazone',
 'acarbose',
 'miglitol',
 'troglitazone',
 'tolazamide',
 'examide',
 'citoglipton',
 'insulin',
 'glyburide-metformin',
 'glipizide-metformin',
 'glimepiride-pioglitazone',
 'metformin-rosiglitazone',
 'metformin-pioglitazone',
 'change',
 'diabetesMed',
 'readmitted']

### Dataset pre-processing
#### 1. Age Feature :  
The values of age are in the form of [0–10),[10–20),[20–30) and are ordinal.  The age feature is must be converted to categorical by converting it into numerical values.

In [6]:
df.groupBy('age').count().show()

+--------+-----+
|     age|count|
+--------+-----+
| [70-80)|26068|
|[90-100)| 2793|
| [40-50)| 9685|
| [10-20)|  691|
| [20-30)| 1657|
| [30-40)| 3775|
|  [0-10)|  161|
| [80-90)|17197|
| [50-60)|17256|
| [60-70)|22483|
+--------+-----+



In [7]:
df = df.replace(['[0-10)','[10-20)','[20-30)','[30-40)','[40-50)','[50-60)','[60-70)','[70-80)','[80-90)','[90-100)'],
               ['5','15','25','35','45','55','65','75','85','95'], 'age')

In [8]:
df.groupBy('age').count().show()

+---+-----+
|age|count|
+---+-----+
| 15|  691|
| 85|17197|
| 35| 3775|
|  5|  161|
| 75|26068|
| 55|17256|
| 95| 2793|
| 25| 1657|
| 65|22483|
| 45| 9685|
+---+-----+



#### 2. Dealing with Duplicate Records :  
We found out that, for some patient there are number of entries in the dataset these duplicate entries are not helpful in solving the task hence we removed these entries.

In [9]:
df.select('patient_nbr').count()

101766

In [10]:
df = df.dropDuplicates(['patient_nbr'])
df = df.na.drop()
df.select('patient_nbr').count()

71518

#### 3. Transformation,mapping and Merging into fewer categories :
Some of the transformation like UP to 10, Down to -10, similarly some of the similar values are merged into single value like in admission source id [2, 3] to 1, [5, 6, 10, 22, 25] to 4. We also map the values in the redmitted to 1 and 0 for binary classification. The values >30 as well as <30 will be mapped to 1 (meaning readmitted) and 0 means not readmitted 

In [11]:
df.select('discharge_disposition_id').show()

+------------------------+
|discharge_disposition_id|
+------------------------+
|                      18|
|                       1|
|                       3|
|                      25|
|                       1|
|                       1|
|                      23|
|                       1|
|                       1|
|                       1|
|                       1|
|                       6|
|                       1|
|                       1|
|                       1|
|                       1|
|                       1|
|                       1|
|                      18|
|                       6|
+------------------------+
only showing top 20 rows



In [12]:
print(type(df))
df=df.toPandas()
print(type(df))
df.head()

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,encounter_id,patient_nbr,race,gender,age,weight,admission_type_id,discharge_disposition_id,admission_source_id,time_in_hospital,...,citoglipton,insulin,glyburide-metformin,glipizide-metformin,glimepiride-pioglitazone,metformin-rosiglitazone,metformin-pioglitazone,change,diabetesMed,readmitted
0,42462216,113652,Caucasian,Female,55,?,2,18,4,5,...,No,No,No,No,No,No,No,No,Yes,NO
1,100131384,132318,AfricanAmerican,Male,35,?,1,1,6,4,...,No,Steady,No,No,No,No,No,Ch,Yes,NO
2,69445656,244629,Caucasian,Female,75,?,1,3,7,4,...,No,No,No,No,No,No,No,Ch,Yes,NO
3,4476318,300825,Caucasian,Male,75,?,6,25,7,5,...,No,No,No,No,No,No,No,No,Yes,>30
4,36236244,308619,Caucasian,Female,75,?,2,1,1,2,...,No,Steady,No,No,No,No,No,No,Yes,>30


In [13]:
df['discharge_disposition_id'] = df['discharge_disposition_id'].apply(lambda x : 1 if int(x) in [6, 8, 9, 13] 
                                                                           else ( 2 if int(x) in [3, 4, 5, 14, 22, 23, 24]
                                                                           else ( 10 if int(x) in [12, 15, 16, 17]
                                                                           else ( 11 if int(x) in [19, 20, 21]
                                                                           else ( 18 if int(x) in [25, 26] 
                                                                           else int(x) )))))

df = df[~df.discharge_disposition_id.isin([11,13,14,19,20,21])]

df['admission_type_id'] = df['admission_type_id'].apply(lambda x : 1 if int(x) in [2, 7]
                                                            else ( 5 if int(x) in [6, 8]
                                                            else int(x) ))

df['admission_source_id'] = df['admission_source_id'].apply(lambda x : 1 if int(x) in [2, 3]
                                                            else ( 4 if int(x) in [5, 6, 10, 22, 25]
                                                            else ( 9 if int(x) in [15, 17, 20, 21]
                                                            else ( 11 if int(x) in [13, 14]
                                                            else int(x) ))))

for col in ["metformin", "repaglinide", "nateglinide", "chlorpropamide", "glimepiride", "acetohexamide", "glipizide", "glyburide", "tolbutamide", "pioglitazone", "rosiglitazone", "acarbose", "miglitol", "troglitazone", "tolazamide", "examide", "citoglipton", "insulin", "glyburide-metformin", "glipizide-metformin", "glimepiride-pioglitazone", "metformin-rosiglitazone", "metformin-pioglitazone"]:
    df[col] = df[col].apply(lambda x : 10 if x == 'Up' 
                                              else ( -10 if x == 'Down'                                                          
                                              else ( 0 if x == 'Steady'
                                              else  -20)))


df['change'] = df['change'].apply(lambda x : 1 if x == 'Ch'
                                                 else -1)


df['diabetesMed'] = df['diabetesMed'].apply(lambda x : -1 if x == 'No'
                                                else 1)


df['max_glu_serum'] = df['max_glu_serum'].apply(lambda x : 200 if x == '>200' 
                                                            else ( 300 if x == '>300'                                                          
                                                            else ( 100 if x == 'Norm'
                                                            else  0)))

df['A1Cresult'] = df['A1Cresult'].apply(lambda x : 7 if x == '>7' 
                                                         else (8 if  x == '>8'                                                        
                                                         else ( 5 if x == 'Norm'
                                                         else  0)))

df['readmitted'] = df['readmitted'].apply(lambda x : 1 if x == '>30'
                                               else (1 if x == '<30'
                                               else 0))

df['race'] = df['race'].apply(lambda x : 'Unknown' if x == '?'
                                                   else x)


In [14]:
df=spark.createDataFrame(data=df)
print(type(df))
print(len(df.columns))

<class 'pyspark.sql.dataframe.DataFrame'>
50


#### 4. Removing unwanted columns 
Removing columns dont provide any value to predicition

In [15]:
df = df.drop('weight')
df = df.drop('encounter_id', 'patient_nbr', 'payer_code')
df = df.drop('glimepiride', 'acetohexamide', 'tolbutamide')
df = df.drop('pioglitazone', 'rosiglitazone', 'acarbose', 'miglitol')
df = df.drop('troglitazone', 'examide', 'citoglipton')
df = df.drop('glyburide-metformin', 'glipizide-metformin')
df = df.drop('glimepiride-pioglitazone', 'metformin-rosiglitazone')
df = df.drop('metformin-pioglitazone')
print(len(df.columns))

31


In [16]:
df.columns

['race',
 'gender',
 'age',
 'admission_type_id',
 'discharge_disposition_id',
 'admission_source_id',
 'time_in_hospital',
 'medical_specialty',
 'num_lab_procedures',
 'num_procedures',
 'num_medications',
 'number_outpatient',
 'number_emergency',
 'number_inpatient',
 'diag_1',
 'diag_2',
 'diag_3',
 'number_diagnoses',
 'max_glu_serum',
 'A1Cresult',
 'metformin',
 'repaglinide',
 'nateglinide',
 'chlorpropamide',
 'glipizide',
 'glyburide',
 'tolazamide',
 'insulin',
 'change',
 'diabetesMed',
 'readmitted']

In [17]:
df.select('readmitted').show()

+----------+
|readmitted|
+----------+
|         0|
|         0|
|         0|
|         1|
|         1|
|         1|
|         0|
|         0|
|         0|
|         0|
|         0|
|         1|
|         0|
|         0|
|         1|
|         0|
|         0|
|         0|
|         0|
|         1|
+----------+
only showing top 20 rows



In [18]:
df.select('readmitted').distinct().show()

+----------+
|readmitted|
+----------+
|         0|
|         1|
+----------+



### Preparing the ML Classifier

#### 1. Converting categorical columns features to numerical

In [19]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import VectorIndexer
from pyspark.sql.functions import col, explode, array, lit
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [20]:
df.printSchema()

root
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- admission_type_id: long (nullable = true)
 |-- discharge_disposition_id: long (nullable = true)
 |-- admission_source_id: long (nullable = true)
 |-- time_in_hospital: long (nullable = true)
 |-- medical_specialty: string (nullable = true)
 |-- num_lab_procedures: long (nullable = true)
 |-- num_procedures: long (nullable = true)
 |-- num_medications: long (nullable = true)
 |-- number_outpatient: long (nullable = true)
 |-- number_emergency: long (nullable = true)
 |-- number_inpatient: long (nullable = true)
 |-- diag_1: string (nullable = true)
 |-- diag_2: string (nullable = true)
 |-- diag_3: string (nullable = true)
 |-- number_diagnoses: long (nullable = true)
 |-- max_glu_serum: long (nullable = true)
 |-- A1Cresult: long (nullable = true)
 |-- metformin: long (nullable = true)
 |-- repaglinide: long (nullable = true)
 |-- nateglinide: long (nullable = true)
 |

In [21]:
print("The categorical columns are :")
catCols=['race','gender','age','medical_specialty','diag_1','diag_2','diag_3']
print(catCols)
print(len(catCols))

The categorical columns are :
['race', 'gender', 'age', 'medical_specialty', 'diag_1', 'diag_2', 'diag_3']
7


In [22]:
steps=[]
for categoryCol in catCols:
    stringIdx=StringIndexer(inputCol=categoryCol, outputCol=categoryCol + "Index").setHandleInvalid("skip")
    encoder = OneHotEncoder(inputCols=[categoryCol + "Index"], outputCols=[categoryCol + "classVec"])
    steps += [stringIdx,encoder]
   
print(len(steps))
steps

14


[StringIndexer_076a54378422,
 OneHotEncoder_8fb7fe5096e6,
 StringIndexer_e4d90ae8c8b7,
 OneHotEncoder_8e1671fe1bd3,
 StringIndexer_e80817b07d3f,
 OneHotEncoder_75c727030beb,
 StringIndexer_6012266ce94e,
 OneHotEncoder_92030f8fe0b1,
 StringIndexer_105e2a2f3381,
 OneHotEncoder_a382b411b1e3,
 StringIndexer_ca3b2f48de53,
 OneHotEncoder_88a648429df1,
 StringIndexer_e79130209d7d,
 OneHotEncoder_d41e62df1b6b]

In [23]:
numberCols=['admission_type_id','discharge_disposition_id','admission_source_id','time_in_hospital',
            'num_lab_procedures','num_procedures','num_medications','number_outpatient','number_emergency',
            'number_inpatient','number_diagnoses','max_glu_serum','A1Cresult','metformin','repaglinide',
            'nateglinide','chlorpropamide','glipizide','glyburide','tolazamide','insulin','change',
            'diabetesMed','readmitted']
print("The count of number cols {}".format(len(numberCols)))
assInputs=[c + "classVec" for c in catCols] + numberCols
print(assInputs)
asmblr= VectorAssembler(inputCols=assInputs, outputCol='features')
print("\n")
steps +=[asmblr]
print(steps)
print(len(steps))

The count of number cols 24
['raceclassVec', 'genderclassVec', 'ageclassVec', 'medical_specialtyclassVec', 'diag_1classVec', 'diag_2classVec', 'diag_3classVec', 'admission_type_id', 'discharge_disposition_id', 'admission_source_id', 'time_in_hospital', 'num_lab_procedures', 'num_procedures', 'num_medications', 'number_outpatient', 'number_emergency', 'number_inpatient', 'number_diagnoses', 'max_glu_serum', 'A1Cresult', 'metformin', 'repaglinide', 'nateglinide', 'chlorpropamide', 'glipizide', 'glyburide', 'tolazamide', 'insulin', 'change', 'diabetesMed', 'readmitted']


[StringIndexer_076a54378422, OneHotEncoder_8fb7fe5096e6, StringIndexer_e4d90ae8c8b7, OneHotEncoder_8e1671fe1bd3, StringIndexer_e80817b07d3f, OneHotEncoder_75c727030beb, StringIndexer_6012266ce94e, OneHotEncoder_92030f8fe0b1, StringIndexer_105e2a2f3381, OneHotEncoder_a382b411b1e3, StringIndexer_ca3b2f48de53, OneHotEncoder_88a648429df1, StringIndexer_e79130209d7d, OneHotEncoder_d41e62df1b6b, VectorAssembler_4f114b780946]
1

In [24]:
pipl = Pipeline(stages=steps)
piplModel = pipl.fit(df)
dataset = piplModel.transform(df)
featureIndexer = VectorIndexer(inputCol="features", \
                               outputCol="indexedFeatures", \
                               maxCategories=8).fit(dataset)
featureIndexer.transform(dataset).show(5, True)

+---------------+------+---+-----------------+------------------------+-------------------+----------------+--------------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+---------+---------+----------+-------+------+-----------+----------+---------+-------------+-----------+--------------+--------+-------------+----------------------+-------------------------+-----------+----------------+-----------+-----------------+-----------+----------------+--------------------+--------------------+
|           race|gender|age|admission_type_id|discharge_disposition_id|admission_source_id|time_in_hospital|   medical_specialty|num_lab_procedures|num_procedures|num_medications|number_outpatient|number_emergency|number_inpatient|diag_1|diag_2|diag_3|number_diagnoses|max_glu_serum|A1Cresult|metformin|repaglinide|nateglinide|chlorpro

In [25]:
dataset.select('features').take(1)

[Row(features=SparseVector(2286, {0: 1.0, 5: 1.0, 9: 1.0, 18: 1.0, 90: 1.0, 783: 1.0, 1523: 1.0, 2262: 1.0, 2263: 18.0, 2264: 4.0, 2265: 5.0, 2266: 44.0, 2268: 11.0, 2272: 8.0, 2274: 8.0, 2276: -20.0, 2277: -20.0, 2278: -20.0, 2279: -20.0, 2280: -20.0, 2281: -20.0, 2282: -20.0, 2283: -1.0, 2284: 1.0}))]

In [26]:
dataset.select('readmitted').distinct().show()

+----------+
|readmitted|
+----------+
|         0|
|         1|
+----------+



In [27]:
colsSelected = ['readmitted', 'features']
data = dataset.select(colsSelected)
data.printSchema()

root
 |-- readmitted: long (nullable = true)
 |-- features: vector (nullable = true)



In [28]:
data.groupBy('readmitted').count().show()

+----------+-----+
|readmitted|count|
+----------+-----+
|         0|41901|
|         1|28533|
+----------+-----+



After the feature engineering, we select the labels(readmitted) and its features of 24 columns as our dataset. In this dataset, we split into training and testing for ML.

In [29]:
(train, test) = data.randomSplit([0.7,0.3])
print(train.count())
print(test.count())

49348
21086


### Logistic Regression

In [30]:
logReg = LogisticRegression(featuresCol ='features', labelCol = 'readmitted', maxIter = 5)
logRegModel = logReg.fit(train)

In [31]:
predictions = logRegModel.transform(test)
predictions.select('features','readmitted', 'prediction').show(10)

+--------------------+----------+----------+
|            features|readmitted|prediction|
+--------------------+----------+----------+
|(2286,[0,5,7,16,8...|         0|       0.0|
|(2286,[0,5,7,16,8...|         0|       0.0|
|(2286,[0,5,7,16,8...|         0|       0.0|
|(2286,[0,5,7,16,8...|         0|       0.0|
|(2286,[0,5,7,16,8...|         0|       0.0|
|(2286,[0,5,7,16,8...|         0|       1.0|
|(2286,[0,5,7,16,8...|         0|       0.0|
|(2286,[0,5,7,16,8...|         0|       0.0|
|(2286,[0,5,7,16,8...|         0|       0.0|
|(2286,[0,5,7,16,8...|         0|       0.0|
+--------------------+----------+----------+
only showing top 10 rows



In [32]:
predictions.show(5)

+----------+--------------------+--------------------+--------------------+----------+
|readmitted|            features|       rawPrediction|         probability|prediction|
+----------+--------------------+--------------------+--------------------+----------+
|         0|(2286,[0,5,7,16,8...|[3.81504414457149...|[0.97843840565406...|       0.0|
|         0|(2286,[0,5,7,16,8...|[3.29597199823587...|[0.96429036778233...|       0.0|
|         0|(2286,[0,5,7,16,8...|[4.71940391154253...|[0.99115837727803...|       0.0|
|         0|(2286,[0,5,7,16,8...|[1.69173978205874...|[0.84445282114395...|       0.0|
|         0|(2286,[0,5,7,16,8...|[0.48798335803085...|[0.61963124809649...|       0.0|
+----------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [33]:
evaluator = BinaryClassificationEvaluator(labelCol="readmitted", rawPredictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy of LR =", accuracy)

Accuracy of LR = 0.974942377133431
