In [14]:
# install pyspark
! pip install pyspark



In [2]:
# create Spark session. If it already exists then Get it or Create a new one.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("spark").getOrCreate()

In [3]:
! git clone https://github.com/education454/diabetes_dataset

Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 6, done.[K
remote: Counting objects: 100% (6/6), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (6/6), 13.02 KiB | 4.34 MiB/s, done.


In [4]:
! ls diabetes_dataset

diabetes.csv  new_test.csv


In [40]:
# Create dataframe in spark.
# Panda dataframe are Mutable, meaning they can be changed.
# Spark Dataframe are immutable, meaning meaning once created, a Spark DataFrame cannot be directly modified. Transformations create new DataFrames.
df = spark.read.csv('/content/diabetes_dataset/diabetes.csv', header=True, inferSchema=True)

In [6]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [7]:
# Print the dataframe's schema
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [9]:
# in df, count gives # of rows and df.columns gives the # of columns
print((df.count(), len(df.columns)))

(2000, 9)


In [10]:
df.groupBy('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [12]:
# describe() gives the details of each column like count, mean, stddeviation, min and max values
df.describe().show()


+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

In [15]:
# find null values
for col in df.columns:
  print (col+ ":", df[df[col].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [17]:
# For all the columns find total number of 0 entries
def count_zeros():
  columns_list = ['Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI']
  for i in columns_list:
    print(i+":", df[df[i]==0].count())
count_zeros()

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [21]:
# Find mean values for all the columns
from pyspark.sql.functions import *
for i in df.columns[1:6]:
  data = df.agg({i:'mean'}).first()[0]
  print("Mean value for {} is {}".format(i,int(data)))
  df = df.withColumn(i,when(df[i]==0,int(data)).otherwise(df[i]))

Mean value for Glucose is 121
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 80
Mean value for BMI is 32


In [22]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


In [23]:
for col in df.columns:
  print("Coorelation to the final Outcome for {} is {}".format(col,df.stat.corr('Outcome',col)))

Coorelation to the final Outcome for Pregnancies is 0.22443699263363961
Coorelation to the final Outcome for Glucose is 0.48796646527321064
Coorelation to the final Outcome for BloodPressure is 0.17171333286446713
Coorelation to the final Outcome for SkinThickness is 0.1659010662889893
Coorelation to the final Outcome for Insulin is 0.1711763270226193
Coorelation to the final Outcome for BMI is 0.2827927569760082
Coorelation to the final Outcome for DiabetesPedigreeFunction is 0.1554590791569403
Coorelation to the final Outcome for Age is 0.23650924717620253
Coorelation to the final Outcome for Outcome is 1.0


In [25]:
# Create a Vector assembler by comn=bining all the columns together as one data point
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI','DiabetesPedigreeFunction', 'Age'],outputCol='features')

In [26]:
output_data = assembler.transform(df)

In [27]:
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [28]:
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

In [29]:
# Logistic Regression classification based on sigmoid function to output a probability, which is then used to assign the data point to one of two classes
from pyspark.ml.classification import LogisticRegression
final_data = output_data.select('features', 'Outcome')

In [30]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [None]:
#create training and test data by spliting in 70% of training data and 30% of test data
train, test = final_data.randomSplit([0.7, 0.3])

In [33]:
models = LogisticRegression(labelCol='Outcome')
model = models.fit(train)
summary=model.summary
summary.predictions.describe().show()


+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|               1406|               1406|
|   mean|0.34210526315789475|0.26600284495021337|
| stddev|0.47458344231130917|0.44202295876720005|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



In [39]:
# Evaluation and Test Model
# Binary Classification Model which give results on 1/0, true/false, spam/not spam format. Just 2 possible outputs
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)

In [36]:
predictions.predictions.show(20)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|      0|[4.44071325709558...|[0.98834979908657...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.58328034254764...|[0.92977774842224...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.77462066363675...|[0.94128886481978...|       0.0|
|[0.0,86.0,68.0,32...|      0|[2.77381323657615...|[0.94124422715709...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.33632171395749...|[0.91184084593944...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.47501710639359...|[0.92237176230597...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.47501710639359...|[0.92237176230597...|       0.0|
|[0.0,93.0,60.0,25...|      0|[2.83797153081988...|[0.94469357507297...|       0.0|
|[0.0,93.0,100.0,3...|      0|[1.33107949314506...|[0.79101913954958...|    

In [37]:
# use Binary classification evaluator to evaluate the model. It shows 82.33% accuracy
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.8233278318823786

In [38]:
# save the model for future use
model.save("model")

In [None]:
from pyspark.ml.classification import LogisticRegressionModel
# load the logistic regression model created for use
model = LogisticRegressionModel.load('model')