### Install Dependencies

In [1]:
#!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=250c45feeec8dce2e38c5cea554d2c5c371f017be016f0d3e1adf7b2a770145c
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


### Run a SparkSession

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark").getOrCreate()

### Clone Diabetes Dataset

In [3]:
! git clone https://github.com/education454/diabetes_dataset

Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 6, done.[K
remote: Counting objects: 100% (6/6), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0[K
Receiving objects: 100% (6/6), 13.02 KiB | 1.86 MiB/s, done.


In [4]:
! ls diabetes_dataset

diabetes.csv  new_test.csv


In [5]:
df = spark.read.csv('/content/diabetes_dataset/diabetes.csv', header=True, inferSchema=True)

In [6]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [7]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [9]:
print((df.count(), len(df.columns)))

(2000, 9)


In [10]:
df.groupby('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [11]:
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

### Cleaning Data

In [13]:
#find for null values
for col in df.columns:
  print(col+":", df[df[col].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [14]:
# find total number of 0 values in columns: Glucose, Bloodpressure, SkinThickness, Insulin and BMI
def count_zeros():
  columns_list = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']
  for i in columns_list:
    print(i+":", df[df[i]==0].count())

In [15]:
count_zeros()

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [18]:
from pyspark.sql.functions import *
for i in df.columns[1:6]:
  data = df.agg({i: 'mean'}).first()[0]
  print("Mean value for {} is {}".format(i, int(data)))
  df = df.withColumn(i, when(df[i]==0, int(data)).otherwise(df[i]))

Mean value for Glucose is 121
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 80
Mean value for BMI is 32


In [19]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


In [20]:
for col in df.columns:
  print("correlation to outcome for {} is {}".format(col, df.stat.corr('Outcome', col)))

correlation to outcome for Pregnancies is 0.22443699263363961
correlation to outcome for Glucose is 0.48796646527321064
correlation to outcome for BloodPressure is 0.17171333286446713
correlation to outcome for SkinThickness is 0.1659010662889893
correlation to outcome for Insulin is 0.1711763270226193
correlation to outcome for BMI is 0.2827927569760082
correlation to outcome for DiabetesPedigreeFunction is 0.1554590791569403
correlation to outcome for Age is 0.23650924717620253
correlation to outcome for Outcome is 1.0


In [23]:
#convert all columns in one column "features"
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Pregnancies',
                                       'Glucose',
                                       'BloodPressure',
                                       'SkinThickness',
                                       'Insulin',
                                       'BMI',
                                       'DiabetesPedigreeFunction',
                                       'Age'], outputCol='features')
output_data = assembler.transform(df)

In [25]:
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [26]:
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

In [28]:
from pyspark.ml.classification import LogisticRegression
final_data = output_data.select('features', 'Outcome')

In [29]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [30]:
train, test = final_data.randomSplit([0.7, 0.3])
models = LogisticRegression(labelCol='Outcome')
model = models.fit(train)

In [31]:
summary = model.summary

In [32]:
summary.predictions.describe().show()

+-------+------------------+-------------------+
|summary|           Outcome|         prediction|
+-------+------------------+-------------------+
|  count|              1406|               1406|
|   mean|0.3314366998577525|0.24395448079658605|
| stddev|0.4708971507614184|0.42961839649833605|
|    min|               0.0|                0.0|
|    max|               1.0|                1.0|
+-------+------------------+-------------------+



### Evaluation & Test Model

In [33]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)

In [36]:
predictions.predictions.show(30)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,67.0,76.0,20...|      0|[2.23856234057849...|[0.90365936954836...|       0.0|
|[0.0,73.0,69.0,20...|      0|[4.07619412877912...|[0.98331130359106...|       0.0|
|[0.0,78.0,88.0,29...|      0|[2.72737596900438...|[0.93862284062239...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.47327835795519...|[0.92224717280921...|       0.0|
|[0.0,86.0,68.0,32...|      0|[2.64179687819792...|[0.93350359190717...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.26120310496104...|[0.90561252083396...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.36081769741827...|[0.91379024357165...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.36432634115091...|[0.91406624523050...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.36432634115091...|[0.91406624523050...|    

In [37]:
#model acuracy
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.8565415772008592

In [38]:
model.save("model")

In [39]:
#load saved model back
from pyspark.ml.classification import LogisticRegressionModel
model = LogisticRegressionModel.load("model")