## **Install Dependencies**

In [2]:
! pip install pyspark # ! is for command line script in Colab

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/89/db/e18cfd78e408de957821ec5ca56de1250645b05f8523d169803d8df35a64/pyspark-3.1.2.tar.gz (212.4MB)
[K     |████████████████████████████████| 212.4MB 68kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 18.1MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=34d688116368ed93174884e2c345136c27b6e2d032b8dde2e9d7ef77f5efe51b
  Stored in directory: /root/.cache/pip/wheels/40/1b/2c/30f43be2627857ab80062bef1527c0128f7b4070b6b2d02139
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


## **Run SparkSession**

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark").getOrCreate()

## **Clone Diabetes Repo**

In [4]:
! git clone https://github.com/education454/diabetes_dataset

Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 6, done.[K
remote: Counting objects: 100% (6/6), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (6/6), done.


In [5]:
! ls diabetes_dataset

diabetes.csv  new_test.csv


In [6]:
df = spark.read.csv("/content/diabetes_dataset/diabetes.csv", header=True, inferSchema=True)

In [7]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [8]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [9]:
print(df.count(),len(df.columns)) # Rows and Columns

2000 9


In [10]:
df.groupby('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [11]:
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

## **Cleaning Data**

In [12]:
# Find for Null Values
for col in df.columns:
  print(col+":", df[df[col].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [14]:
def count_zeros():
  columns_list = ["Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI"]
  for i in columns_list:
    print(i+":", df[df[i]==0].count())

count_zeros() # Total number of zero values in respective columns

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [18]:
from pyspark.sql.functions import *
# Find mean value
for i in df.columns[1:6]:
  data = df.agg({i:"mean"}).first()[0]
  print("Mean Value for {} is {}".format(i, int(data)))

  # Update values which are zero to mean values
  df = df.withColumn(i, when(df[i]==0, int(data)).otherwise(df[i]))

Mean Value for Glucose is 121
Mean Value for BloodPressure is 69
Mean Value for SkinThickness is 20
Mean Value for Insulin is 80
Mean Value for BMI is 32


In [19]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


### **Finding Co-relation between Input Values**

In [20]:
for col in df.columns:
  print("Corelation to outcome variable for {} is {}".format(col, df.stat.corr("Outcome", col)))

Corelation to outcome variable for Pregnancies is 0.22443699263363961
Corelation to outcome variable for Glucose is 0.48796646527321064
Corelation to outcome variable for BloodPressure is 0.17171333286446713
Corelation to outcome variable for SkinThickness is 0.1659010662889893
Corelation to outcome variable for Insulin is 0.1711763270226193
Corelation to outcome variable for BMI is 0.2827927569760082
Corelation to outcome variable for DiabetesPedigreeFunction is 0.1554590791569403
Corelation to outcome variable for Age is 0.23650924717620253
Corelation to outcome variable for Outcome is 1.0


In [21]:
from pyspark.ml.feature import VectorAssembler # Merges different columns in a single vector column
assembler = VectorAssembler(inputCols=["Pregnancies", "Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI", "DiabetesPedigreeFunction", "Age"], outputCol="features")

In [22]:
output_data = assembler.transform(df)

In [23]:
output_data

DataFrame[Pregnancies: int, Glucose: int, BloodPressure: int, SkinThickness: int, Insulin: int, BMI: double, DiabetesPedigreeFunction: double, Age: int, Outcome: int, features: vector]

In [24]:
output_data.printSchema

<bound method DataFrame.printSchema of DataFrame[Pregnancies: int, Glucose: int, BloodPressure: int, SkinThickness: int, Insulin: int, BMI: double, DiabetesPedigreeFunction: double, Age: int, Outcome: int, features: vector]>

In [25]:
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [26]:
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

## **Training ML Model using Logistic Regression**

In [27]:
from pyspark.ml.classification import LogisticRegression
final_data = output_data.select("features", "Outcome")
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [29]:
train, test = final_data.randomSplit([0.6, 0.4])
models = LogisticRegression(labelCol="Outcome")
model = models.fit(train)

In [30]:
summary = model.summary
summary.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|               1190|               1190|
|   mean| 0.3352941176470588|0.25210084033613445|
| stddev|0.47229166540046863|0.43440140643979414|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



## **Evaluation & Testing of Model**

In [31]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)

In [32]:
predictions.predictions.show(10)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,67.0,76.0,20...|      0|[2.67787833914568...|[0.93570860662752...|       0.0|
|[0.0,67.0,76.0,20...|      0|[2.67787833914568...|[0.93570860662752...|       0.0|
|[0.0,73.0,69.0,20...|      0|[4.32257574221833...|[0.98690800355989...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.72281490802774...|[0.97640436122882...|       0.0|
|[0.0,78.0,88.0,29...|      0|[2.70014820078138...|[0.93703538836572...|       0.0|
|[0.0,78.0,88.0,29...|      0|[2.70014820078138...|[0.93703538836572...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.33523862466574...|[0.91175374066746...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.33523862466574...|[0.91175374066746...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.41769732166085...|[0.91816689598209...|    

In [33]:
predictions.predictions.show(50)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,67.0,76.0,20...|      0|[2.67787833914568...|[0.93570860662752...|       0.0|
|[0.0,67.0,76.0,20...|      0|[2.67787833914568...|[0.93570860662752...|       0.0|
|[0.0,73.0,69.0,20...|      0|[4.32257574221833...|[0.98690800355989...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.72281490802774...|[0.97640436122882...|       0.0|
|[0.0,78.0,88.0,29...|      0|[2.70014820078138...|[0.93703538836572...|       0.0|
|[0.0,78.0,88.0,29...|      0|[2.70014820078138...|[0.93703538836572...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.33523862466574...|[0.91175374066746...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.33523862466574...|[0.91175374066746...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.41769732166085...|[0.91816689598209...|    

In [35]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="Outcome")
evaluator.evaluate(model.transform(test))

0.8290726817042591

In [36]:
model.save("DiabetesPredictorModel")

In [37]:
from pyspark.ml.classification import LogisticRegressionModel
DiabetesPredictorModel = LogisticRegressionModel.load("DiabetesPredictorModel")