#Diabetes Prediction with Pyspark MLib

In [8]:
! pip install pyspark



In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("diabetes").getOrCreate()

In [13]:

! git clone https://github.com/education454/diabetes_dataset

fatal: destination path 'diabetes_dataset' already exists and is not an empty directory.


In [18]:
df = spark.read.csv('/content/diabetes_dataset/diabetes.csv', header = True, inferSchema = True)

In [19]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [20]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [21]:
print(df.count(), len(df.columns))

2000 9


In [22]:
df.groupby('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [26]:
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

#Cleaning Data


In [28]:
for col in df.columns:
  print(col+":", df[df[col].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [31]:
def count_zeros():
  columns_list = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']
  for i in columns_list:
    print(i+":", df[df[i]==0].count())

count_zeros()

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [35]:
from pyspark.sql.functions import *

for i in df.columns[1:6]:
  data = df.agg({i:'mean'}).first()[0]
  print("Mean value for {} is {}".format(i, int(data)))
  df = df.withColumn(i, when(df[i]==0, int(data)).otherwise(df[i]))

Mean value for Glucose is 121
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 80
Mean value for BMI is 32


In [36]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


In [42]:
for col in df.columns:
  print("Correlation to outcome for {} is {}".format(col, df.stat.corr('Outcome', col)))

Correlation to outcome for Pregnancies is 0.22443699263363961
Correlation to outcome for Glucose is 0.48796646527321064
Correlation to outcome for BloodPressure is 0.17171333286446713
Correlation to outcome for SkinThickness is 0.1659010662889893
Correlation to outcome for Insulin is 0.1711763270226193
Correlation to outcome for BMI is 0.2827927569760082
Correlation to outcome for DiabetesPedigreeFunction is 0.1554590791569403
Correlation to outcome for Age is 0.23650924717620253
Correlation to outcome for Outcome is 1.0


In [50]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols = ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age'], outputCol = 'features')

In [51]:
output_data = assembler.transform(df)

In [52]:
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [53]:
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

#Logistic Regression

In [54]:
from pyspark.ml.classification import LogisticRegression

final_data = output_data.select('features', 'Outcome')


In [55]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [56]:
train, test = final_data.randomSplit([0.7, 0.3])
models = LogisticRegression(labelCol = 'Outcome')
model = models.fit(train)

In [57]:
summary = model.summary

In [59]:
summary.predictions.describe().show()

+-------+------------------+-------------------+
|summary|           Outcome|         prediction|
+-------+------------------+-------------------+
|  count|              1375|               1375|
|   mean|0.3367272727272727|0.25163636363636366|
| stddev|0.4727626944923763|0.43411123060789014|
|    min|               0.0|                0.0|
|    max|               1.0|                1.0|
+-------+------------------+-------------------+



In [60]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.evaluate(test)

In [62]:
predictions.predictions.show(40)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|      0|[3.55998567769527...|[0.97234719257979...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.51901518973606...|[0.97122399341637...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.40259109645552...|[0.91702467428512...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.48379538036849...|[0.92299797926507...|       0.0|
|[0.0,86.0,68.0,32...|      0|[2.59508329463728...|[0.93054447774590...|       0.0|
|[0.0,86.0,68.0,32...|      0|[2.59508329463728...|[0.93054447774590...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.17528912166685...|[0.89800841430991...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.37048501385193...|[0.91454877168765...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.37048501385193...|[0.91454877168765...|    

In [63]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol= 'rawPrediction', labelCol = 'Outcome')
evaluator.evaluate(model.transform(test))

0.859392500336007

In [64]:
model.save("Diabetes_Prediction_Model")

In [65]:
from pyspark.ml.classification import LogisticRegressionModel
model_diabetes_prediction = LogisticRegressionModel.load('Diabetes_Prediction_Model')
