<a href="https://colab.research.google.com/github/parul6695/Diabetes-Prediction-using-Machine-Learning-in-Apache-Spark/blob/master/diabetes_prediction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install the dependencies needed to run **PySpark**

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark==2.4.4

Collecting pyspark==2.4.4
  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
     |████████████████████████████████| 215.7MB 61kB/s
Collecting py4j==0.10.7
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
     |████████████████████████████████| 204kB 35.2MB/s
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130388 sha256=db4605b4c86922c376879c8a74a2b2b0388d5fac5e6f79d1c00d1222bfc4ded5
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4

Setting the JAVA_HOME Environment Variable

In [7]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"


Run a SparkSession

In [8]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("spark").getOrCreate()

Clone Diabetes Dataset

In [4]:
!git clone https://github.com/education454/diabetes_dataset

Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 3, done.
remote: Counting objects: 100% (3/3), done.
remote: Compressing objects: 100% (2/2), done.
remote: Total 3 (delta 0), reused 0 (delta 0), pack-reused 0
Unpacking objects: 100% (3/3), done.


In [6]:
!ls diabetes_dataset

diabetes.csv


Setting "header=True" makes Spark treat the first line of the file as the column names. If it is set to False, the first line is read as an ordinary data record and the columns get default names. Setting "inferSchema=True" makes Spark infer each column's data type from the data; without it, every column is read as a string.

In [9]:
df=spark.read.csv("/content/diabetes_dataset/diabetes.csv",header=True,inferSchema=True)
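The effect of the two options can be illustrated without Spark. The following is a minimal pure-Python sketch, not Spark's implementation: `read_rows` and `infer_type` are hypothetical helpers, and the two-column sample is made up for illustration. It shows why reading the header line as data forces a numeric column to be typed as a string.

```python
import csv
import io

# Tiny made-up sample in the same shape as diabetes.csv (assumption, not the real file).
SAMPLE = "Glucose,BMI\n138,33.6\n84,38.2\n"

def read_rows(text, header):
    """Split CSV text into column names and records, mimicking the header option."""
    rows = list(csv.reader(io.StringIO(text)))
    if header:
        return rows[0], rows[1:]
    # Without a header, Spark names columns _c0, _c1, ... and keeps line 1 as data.
    names = ["_c{}".format(i) for i in range(len(rows[0]))]
    return names, rows

def infer_type(values):
    """Pick the narrowest type that parses every value, mimicking inferSchema."""
    for caster, name in ((int, "integer"), (float, "double")):
        try:
            for value in values:
                caster(value)
            return name
        except ValueError:
            continue
    return "string"

names, records = read_rows(SAMPLE, header=True)
print(names, [infer_type([r[i] for r in records]) for i in range(len(names))])

names, records = read_rows(SAMPLE, header=False)
print(names, [infer_type([r[i] for r in records]) for i in range(len(names))])
```

With the header consumed, the columns come out as integer and double; with the header left in the data, both fall back to string.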

In [10]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


printSchema(): prints each column's name and data type

In [10]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [26]:
print(("Rows=",df.count(),"Columns=",len(df.columns)))

('Rows=', 2000, 'Columns=', 9)


Explore the dataset to see how many patients are diabetic. Diabetic patients are represented by 1 in the Outcome column.

In [30]:
df.groupBy("Outcome").count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



Use "describe" to compute summary statistics for the numerical columns in our dataset.

In [32]:
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

From this we observe that a value of 0 is valid for Pregnancies, but not for the other numerical columns such as Glucose, BloodPressure, SkinThickness, Insulin and BMI. Let's replace these invalid zeros with the column mean, since they may otherwise lead to a poor machine learning model. These columns might also contain null values, so we check for those first.

**Let's do Data Cleaning**

In [11]:
#checking count of null values in each column
for col in df.columns:
  print(col+":",df[df[col].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


From the above, we observe that there are no null values in any column, so let's count the zero values in the 5 columns that can't be zero in real life.

In [12]:
#define a function to count number of zeroes in 5 columns:Glucose,BloodPressure,SkinThickness,Insulin,BMI.
def count_zeros():
  col=["Glucose","BloodPressure","SkinThickness","Insulin","BMI"]
  for i in col:
    print(i+":",df[df[i]==0].count())
count_zeros()

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [22]:
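#finding mean for each column
#import only what is used; a wildcard import shadows Python builtins such as sum and max
from pyspark.sql.functions import when
for i in df.columns[1:6]:  #Glucose, BloodPressure, SkinThickness, Insulin, BMI
  #note: this mean is taken over all rows, including the zeros being replaced
  data=df.agg({i:"mean"}).first()[0]
  print("Mean for {} is {}".format(i,data))
  #withColumn returns a new dataframe; replace 0s with the mean, truncated to an integer by int()
  df=df.withColumn(i,when(df[i]==0,int(data)).otherwise(df[i]))
#checking dataframe if values replaced
df.show()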

Mean for Glucose is 121.969
Mean for BloodPressure is 72.2505
Mean for SkinThickness is 26.665
Mean for Insulin is 118.494
Mean for BMI is 32.640999999999984
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|       
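The cell above computes each mean over all rows, so the zeros being replaced pull the replacement value down (for example, Insulin zeros become 80, the truncated overall mean). A common alternative is to average only the non-zero values. The sketch below shows that variant in plain Python; `impute_zeros_with_mean` is a hypothetical helper, and the sample list is just the Insulin values from the first rows of the original df.show() output.

```python
def impute_zeros_with_mean(values):
    """Replace zeros with the mean of the non-zero values (zeros treated as missing)."""
    non_zero = [v for v in values if v != 0]
    if not non_zero:
        # Nothing valid to average; return the data unchanged.
        return list(values)
    mean = sum(non_zero) / len(non_zero)
    return [mean if v == 0 else v for v in values]

# Insulin values from the first seven rows shown earlier (0 marks a missing reading).
insulin = [0, 125, 0, 250, 480, 265, 0]
print(impute_zeros_with_mean(insulin))
```

In PySpark the same idea would mean filtering out the zero rows before aggregating; whether that is preferable here depends on how the missing readings are distributed.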

Correlation: Now we will find the correlation between each input variable and the output variable. A positive correlation means that as the input variable increases, the output variable tends to increase as well; a negative correlation means the opposite.

In [23]:
for col in df.columns:
  print("Corelation to outcome for {} is {} ".format(col,df.stat.corr('Outcome',col)))

Corelation to outcome for Pregnancies is 0.22443699263363961 
Corelation to outcome for Glucose is 0.48796646527321064 
Corelation to outcome for BloodPressure is 0.17171333286446713 
Corelation to outcome for SkinThickness is 0.1659010662889893 
Corelation to outcome for Insulin is 0.1711763270226193 
Corelation to outcome for BMI is 0.2827927569760082 
Corelation to outcome for DiabetesPedigreeFunction is 0.1554590791569403 
Corelation to outcome for Age is 0.23650924717620253 
Corelation to outcome for Outcome is 1.0 
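For reference, df.stat.corr computes the Pearson correlation coefficient by default. The following plain-Python sketch shows the formula on a handful of values; `pearson` is a hypothetical helper, and the sample lists are the Glucose and Outcome values from the first rows shown earlier, far too few to reproduce the figures above.

```python
import math

def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Glucose and Outcome from the first seven rows of the dataset (illustration only).
glucose = [138, 84, 145, 135, 139, 173, 99]
outcome = [1, 0, 1, 1, 0, 0, 0]
print(pearson(glucose, outcome))
```

The result always lies between -1 and 1; a value of 1.0, as for Outcome against itself, means a perfect linear relationship.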


From the above we observe that no single input variable is very strongly correlated with the outcome (the highest is Glucose at about 0.49), so we keep all of them. To train an ML model on them, the inputs must be combined into a single vector column, and for that we import VectorAssembler. VectorAssembler is a feature transformer that merges multiple columns into a single vector column.

In [27]:
from pyspark.ml.feature import VectorAssembler
assembler=VectorAssembler(inputCols=["Pregnancies","Glucose","BloodPressure","SkinThickness","Insulin","BMI","DiabetesPedigreeFunction","Age"],outputCol='features')
outputfeatures=assembler.transform(df)
outputfeatures.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [28]:
outputfeatures.show()


+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           
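Conceptually, the assembler reads the listed columns of each row in order and packs them into one numeric vector. A minimal plain-Python sketch of that step, assuming a row is represented as a dict (`assemble` is a hypothetical helper, not part of PySpark):

```python
FEATURE_COLS = ["Pregnancies", "Glucose", "BloodPressure", "SkinThickness",
                "Insulin", "BMI", "DiabetesPedigreeFunction", "Age"]

def assemble(row, input_cols):
    """Collect the chosen columns of one row, in order, as a list of floats."""
    return [float(row[name]) for name in input_cols]

# First row of the cleaned dataframe shown above.
row = {"Pregnancies": 2, "Glucose": 138, "BloodPressure": 62, "SkinThickness": 35,
       "Insulin": 80, "BMI": 33.6, "DiabetesPedigreeFunction": 0.127, "Age": 47,
       "Outcome": 1}
features = assemble(row, FEATURE_COLS)
print(features)
```

The label column (Outcome) is deliberately left out of the vector, matching the inputCols passed to VectorAssembler.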

Now we apply a logistic regression model to our data

In [30]:
from pyspark.ml.classification import LogisticRegression
#selecting only relevant columns to train our model
finalData=outputfeatures.select("features","Outcome")
finalData.printSchema()
train,test=finalData.randomSplit([0.7,0.3])
#creating model
models=LogisticRegression(labelCol="Outcome")
model=models.fit(train)

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



Let us look at summary statistics (mean, standard deviation and so on) of the labels and predictions on the training data.

In [31]:
summary=model.summary
summary.predictions.describe().show()

+-------+-------------------+------------------+
|summary|            Outcome|        prediction|
+-------+-------------------+------------------+
|  count|               1414|              1414|
|   mean| 0.3338048090523338| 0.260961810466761|
| stddev|0.47173778666566724|0.4393145048929266|
|    min|                0.0|               0.0|
|    max|                1.0|               1.0|
+-------+-------------------+------------------+



**Evaluation and Test Model**: we will use BinaryClassificationEvaluator to measure the performance of our model on the test set. By default, BinaryClassificationEvaluator reports the area under the ROC curve.

In [35]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
newpredictions=model.evaluate(test)
newpredictions.predictions.show()

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|      0|[3.90733428296241...|[0.98030182047910...|       0.0|
|[0.0,67.0,76.0,20...|      0|[2.41163604084024...|[0.91771031701682...|       0.0|
|[0.0,73.0,69.0,20...|      0|[4.31146907857720...|[0.98676372005437...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.76137492694367...|[0.97727660963669...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.76137492694367...|[0.97727660963669...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.59625521007304...|[0.93062018213674...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.59625521007304...|[0.93062018213674...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.70401782403069...|[0.93726331066765...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.26339603991380...|[0.90579980292783...|    
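For binary logistic regression, the first entry of the probability column is the logistic (sigmoid) function applied to the first entry of rawPrediction. The short sketch below checks this against the first row of the table; the input is the truncated value as displayed, so agreement is only expected to several decimal places.

```python
import math

def sigmoid(z):
    """Logistic function: maps a raw score (log-odds) to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

# First rawPrediction entry of the first row shown above (truncated in the display).
raw_class0 = 3.90733428296241
prob_class0 = sigmoid(raw_class0)
print(prob_class0)  # compare with the first probability entry in the same row
```

Because this probability for class 0 is above 0.5, the prediction column for that row is 0.0.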

In [36]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.826329331046312
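The area under the ROC curve can be read as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it that way in plain Python on made-up scores; `roc_auc` is a hypothetical helper, and Spark's evaluator builds the ROC curve differently, so this illustrates the metric rather than its implementation.

```python
def roc_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Made-up labels and scores for illustration.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(roc_auc(labels, scores))
```

A value of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, which puts the result above at a reasonably good level.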

In [37]:
model.save("LRModel")

Load the saved LRModel to use it again

In [38]:
from pyspark.ml.classification import LogisticRegressionModel
model = LogisticRegressionModel.load('LRModel')