In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession


create a sparksession

In [3]:
spark = SparkSession.builder.appName('Diabetes').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/02/05 16:34:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/05 16:34:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/02/05 16:34:53 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/02/05 16:34:53 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [4]:
spark

Load and explore the dataset

In [5]:
diabetes_df = spark.read.csv('diabetes.csv',header = True,inferSchema = True)

                                                                                

In [6]:
diabetes_df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [7]:
# Print the schema
diabetes_df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [8]:
# get the dimensions of the data
print((diabetes_df.count(),len(diabetes_df.columns)))

(768, 9)


In [9]:
# Check the columsn of the data
diabetes_df.columns

['Pregnancies',
 'Glucose',
 'BloodPressure',
 'SkinThickness',
 'Insulin',
 'BMI',
 'DiabetesPedigreeFunction',
 'Age',
 'Outcome']

In [10]:
#Count the total number of diabetes and non-diabetic classes
diabetes_df.groupby('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  268|
|      0|  500|
+-------+-----+



In [11]:
# get the summary statistics
diabetes_df.describe().show()

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|summary|       Pregnancies|          Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|  count|               768|              768|               768|               768|               768|               768|                     768|               768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|20.536458333333332| 79.79947916666667|31.992578124999977|      0.4718763020833327|33.240885416666664|0.3489583333333333|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|15.95

In [12]:
# Check for null values
for i in diabetes_df.columns:
    print(i+ ":" , diabetes_df[diabetes_df[i].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [13]:
# Look for the unneccesary values present in the data
def count_zeros():
    columns_list = ['Glucose','BloodPressure','SkinThickness','Insulin','BMI']
    for i in columns_list:
        print(i+ " : " ,diabetes_df[diabetes_df[i]==0].count())

In [14]:
count_zeros()

Glucose :  5
BloodPressure :  35
SkinThickness :  227
Insulin :  374
BMI :  11


In [15]:
# Calculate and replace the unneccesary values with the mean of the column
from pyspark.sql.functions import *
for i in diabetes_df.columns[1:6]:
    data = diabetes_df.agg({i:'mean'}).first()[0]
    print('The mean value for {} is {} '.format(i,int(data)))
    diabetes_df = diabetes_df.withColumn(i,when(diabetes_df[i]==0,int(data)).otherwise(diabetes_df[i]))

The mean value for Glucose is 120 
The mean value for BloodPressure is 69 
The mean value for SkinThickness is 20 
The mean value for Insulin is 79 
The mean value for BMI is 31 


In [16]:
# Display the data
diabetes_df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|     79|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|     79|26.6|                   0.351| 31|      0|
|          8|    183|           64|           20|     79|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|           20|     79|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [17]:
# Correlation Analysis
for col in diabetes_df.columns:
    print(" The Correlation to Outcome for {} is {} ".format(col,diabetes_df.stat.corr("Outcome",col)))

 The Correlation to Outcome for Pregnancies is 0.22189815303398638 
 The Correlation to Outcome for Glucose is 0.49288410274882094 
 The Correlation to Outcome for BloodPressure is 0.16287909949861834 
 The Correlation to Outcome for SkinThickness is 0.171856814176564 
 The Correlation to Outcome for Insulin is 0.17869558803050842 
 The Correlation to Outcome for BMI is 0.31289043493401536 
 The Correlation to Outcome for DiabetesPedigreeFunction is 0.17384406565296007 
 The Correlation to Outcome for Age is 0.23835598302719757 
 The Correlation to Outcome for Outcome is 1.0 


In [19]:
# Feature selection
from pyspark.ml.feature import VectorAssembler
feature_assembler = VectorAssembler(inputCols = ['Pregnancies',
 'Glucose',
 'BloodPressure',
 'SkinThickness',
 'Insulin',
 'BMI',
 'DiabetesPedigreeFunction',
 'Age'],outputCol = 'features')

In [22]:
output_data = feature_assembler.transform(diabetes_df)
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          6|    148|           72|           35|     79|33.6|                   0.627| 50|      1|[6.0,148.0,72.0,3...|
|          1|     85|           66|           29|     79|26.6|                   0.351| 31|      0|[1.0,85.0,66.0,29...|
|          8|    183|           64|           20|     79|23.3|                   0.672| 32|      1|[8.0,183.0,64.0,2...|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|[1.0,89.0,66.0,23...|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|[0.0,137.0,40.0,3...|
|          5|    116|           

In [23]:
#Print the schema
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



Machine Learning


In [24]:
# import Logistic regression and create final dataframe
from pyspark.ml.classification import LogisticRegression
final_data = output_data.select('features','Outcome')

In [25]:
# print the Schema
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [26]:
# Split the dataset into test and train
train,test = final_data.randomSplit([0.75,0.25])

In [27]:
# Build and train the model
models = LogisticRegression(labelCol = 'Outcome')
model = models.fit(train)

[Stage 80:>                                                         (0 + 1) / 1]                                                                                

23/02/05 16:51:56 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
23/02/05 16:51:56 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


In [29]:
# Coefficients
print("Coefficients : ",model.coefficients)

Coefficients :  [0.132799058218365,0.03957911904211467,-0.016170703359743594,0.00862808941216656,-0.0021931465975932782,0.08048938191459552,0.8643868743327963,0.01761577489644412]


In [30]:
# Intercept
print("Intercept : ",model.intercept)

Intercept :  -8.539733824956494


In [28]:
# Summary of the model
summary = model.summary
summary.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|                600|                600|
|   mean|0.34833333333333333|               0.26|
| stddev|0.47683978706315305|0.43900023007207967|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [32]:
# print accuracy
print("The accuracy : ",summary.accuracy)

The accuracy :  0.7783333333333333


In [35]:
# Evaluate the model
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)

In [36]:
# Display predictions
predictions.predictions.show()

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,78.0,88.0,29...|      0|[2.99796233734555...|[0.95248198709127...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.62940864615918...|[0.93273045479119...|       0.0|
|[0.0,93.0,60.0,25...|      0|[2.65773908148315...|[0.93448638546640...|       0.0|
|[0.0,95.0,80.0,45...|      0|[2.20575899244606...|[0.90076547954559...|       0.0|
|[0.0,97.0,64.0,36...|      0|[1.72315198234839...|[0.84853438622524...|       0.0|
|[0.0,99.0,69.0,20...|      0|[3.11940488978313...|[0.95768611882616...|       0.0|
|[0.0,100.0,70.0,2...|      0|[2.23405496131886...|[0.90326624650584...|       0.0|
|[0.0,100.0,88.0,6...|      0|[0.58387230890674...|[0.64195793699241...|       0.0|
|[0.0,102.0,52.0,2...|      0|[2.88660011525437...|[0.94718004368028...|    

In [37]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol = 'rawPrediction',labelCol = 'Outcome')
evaluator.evaluate(model.transform(test))

0.8575649199191415

In [38]:
# Save the model
model.save('model')

                                                                                