Installing Collab and Pyspark

In [9]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

In [4]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/8e/b0/bf9020b56492281b9c9d8aae8f44ff51e1bc91b3ef5a884385cb4e389a40/pyspark-3.0.0.tar.gz (204.7MB)
[K     |████████████████████████████████| 204.7MB 63kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 32.0MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044182 sha256=e68abe277c770784c5c519d3efc6fa84eaa95dd54ea8afd487f1a0854eb6acde
  Stored in directory: /root/.cache/pip/wheels/57/27/4d/ddacf7143f8d5b76c45c61ee2e43d9f8492fc5a8e78ebd7d37
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.0


In [13]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"

In [14]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

**Clone Dataset**

In [15]:
! git clone https://github.com/education454/diabetes_dataset

Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 3, done.[K
remote: Counting objects:  33% (1/3)[Kremote: Counting objects:  66% (2/3)[Kremote: Counting objects: 100% (3/3)[Kremote: Counting objects: 100% (3/3), done.[K
remote: Compressing objects:  50% (1/2)[Kremote: Compressing objects: 100% (2/2)[Kremote: Compressing objects: 100% (2/2), done.[K
remote: Total 3 (delta 0), reused 0 (delta 0), pack-reused 0[K
Unpacking objects:  33% (1/3)   Unpacking objects:  66% (2/3)   Unpacking objects: 100% (3/3)   Unpacking objects: 100% (3/3), done.


In [16]:
! ls diabetes_dataset

diabetes.csv


In [17]:
df = spark.read.csv('/content/diabetes_dataset/diabetes.csv',header=True, inferSchema= True)

In [18]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [19]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [20]:
print(df.count(),len(df.columns))

2000 9


In [21]:
df.groupby('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [23]:
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

In [24]:
#find null values

In [25]:
for col in df.columns:
  print(col+":",df[df[col].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [27]:
#find total number of zero
def count_zeros():
  columns_list=['Glucose','BloodPressure','SkinThickness','Insulin','BMI']
  for i in columns_list:
    print(i+":",df[df[i]==0].count())

In [28]:
count_zeros()

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [32]:
from pyspark.sql.functions import *
for i in df.columns[1:6]:
  data = df.agg({i:'mean'}).first()[0]
  print("Mean value for {} is {}".format(i,int(data)))

  df = df.withColumn(i,when(df[i]==0,int(data)).otherwise(df[i]))

Mean value for Glucose is 121
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 80
Mean value for BMI is 32


**Training the model**

In [33]:
#correlation

In [34]:
for col in df.columns:
  print("correlation to outcome for {} is {}".format(col,df.stat.corr('Outcome',col)))

correlation to outcome for Pregnancies is 0.22443699263363961
correlation to outcome for Glucose is 0.48796646527321064
correlation to outcome for BloodPressure is 0.17171333286446713
correlation to outcome for SkinThickness is 0.1659010662889893
correlation to outcome for Insulin is 0.1711763270226193
correlation to outcome for BMI is 0.2827927569760082
correlation to outcome for DiabetesPedigreeFunction is 0.1554590791569403
correlation to outcome for Age is 0.23650924717620253
correlation to outcome for Outcome is 1.0


In [38]:
from pyspark.ml.feature import VectorAssembler
assembler= VectorAssembler(inputCols=['Pregnancies','Glucose','BloodPressure','SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age'],outputCol='features')
output_data = assembler.transform(df)

In [40]:
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

Logistic Regression for binary classification

In [41]:
from pyspark.ml.classification import LogisticRegression
final_data = output_data.select('features','Outcome')

In [42]:
train,test = final_data.randomSplit([0.7,0.3])
models = LogisticRegression(labelCol='Outcome')
model =models.fit(train)

In [43]:
summary = model.summary

In [44]:
summary.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|               1382|               1382|
|   mean|0.33357452966714907|0.24746743849493488|
| stddev|0.47166040218745575|0.43169683226232247|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [47]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)
predictions.predictions.show(40)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|      0|[3.45186898954827...|[0.96928682905027...|       0.0|
|[0.0,67.0,76.0,20...|      0|[2.43618551928781...|[0.91954533726187...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.62649456307626...|[0.97408040393626...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.62649456307626...|[0.97408040393626...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.21055323434184...|[0.90119319996469...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.33136896563246...|[0.91144189583672...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.44101697559371...|[0.91990205289925...|       0.0|
|[0.0,93.0,60.0,25...|      0|[2.66025731891078...|[0.93464038736166...|       0.0|
|[0.0,94.0,69.0,20...|      0|[2.56896285567250...|[0.92883717252827...|    

In [48]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.8480558551399213

In [49]:
model.save("Diabestes")

In [50]:
from pyspark.ml.classification import LogisticRegressionModel
Diabestes= LogisticRegressionModel.load('Diabestes')