# Install Dependencies

In [None]:
!pip install pyspark



# Run a SparkSession

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark").getOrCreate()

# Clone Diabetes Dataset

In [None]:
! git clone https://github.com/education454/diabetes_dataset

fatal: destination path 'diabetes_dataset' already exists and is not an empty directory.


In [None]:
! ls diabetes_dataset

diabetes.csv  new_test.csv


In [None]:
df = spark.read.csv('/content/diabetes_dataset/diabetes.csv',header=True,inferSchema=True)

In [None]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|      0|25.6|                   0.294| 28|      0|


In [None]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [None]:
df.groupby('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [None]:
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

# Cleaning Data

In [None]:
for col in df.columns:
  print(col+":",df[df[col].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [None]:
def count_zeros():
  columns_list =['Glucose','BloodPressure','SkinThickness','Insulin','BMI']
  for i in columns_list:
    print(i+":",df[df[i]==0].count())

In [None]:
count_zeros()

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [None]:
from pyspark.sql.functions import *
for i in df.columns[1:6]:
  data = df.agg({i:'mean'}).first()[0]
  print("Mean value for {} is {}".format(i,int(data)))
  df = df.withColumn(i,when(df[i]==0,int(data)).otherwise(df[i]))

Mean value for Glucose is 121
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 80
Mean value for BMI is 32


In [None]:
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|     SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|              2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|           121.969|           72.2505|            26.665|          118.494|32.640999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|30.533214334373536|11.970354817098098|10.0542189

In [None]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
|          0|    173|           78|           32|    265|46.5|                   1.159| 58|      0|
|          4|     99|           72|           17|     80|25.6|                   0.294| 28|      0|


# Correlation

In [None]:
for col in df.columns:
  print("correlation to outcome for {} is {}".format(col,df.stat.corr('Outcome',col)))

correlation to outcome for Pregnancies is 0.22443699263363961
correlation to outcome for Glucose is 0.48796646527321064
correlation to outcome for BloodPressure is 0.17171333286446713
correlation to outcome for SkinThickness is 0.1659010662889893
correlation to outcome for Insulin is 0.1711763270226193
correlation to outcome for BMI is 0.2827927569760082
correlation to outcome for DiabetesPedigreeFunction is 0.1554590791569403
correlation to outcome for Age is 0.23650924717620253
correlation to outcome for Outcome is 1.0


# Feature Selection

In [None]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['Pregnancies','Glucose','BloodPressure','SkinThickness','Insulin','BMI','DiabetesPedigreeFunction','Age'],outputCol='features')
output_data = assembler.transform(df)

In [None]:
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
output_data.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
|          0|    173|           

# Build & Train Model

In [None]:
from pyspark.ml.classification import LogisticRegression
final_data = output_data.select('features','Outcome')

In [None]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [None]:
train , test = final_data.randomSplit([0.7,0.3])
models = LogisticRegression(labelCol='Outcome')
model = models.fit(train)

In [None]:
summary = model.summary

In [None]:
summary.predictions.describe().show()

+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|               1419|               1419|
|   mean|0.34460887949260044|0.26990838618745594|
| stddev| 0.4754081150241662|0.44406848358919887|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



# Evaluation & Test Model

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)

In [None]:
predictions.predictions.show(20)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|      0|[3.97692625790894...|[0.98160168016472...|       0.0|
|[0.0,73.0,69.0,20...|      0|[4.19276613879647...|[0.98512030353284...|       0.0|
|[0.0,73.0,69.0,20...|      0|[4.19276613879647...|[0.98512030353284...|       0.0|
|[0.0,74.0,52.0,10...|      0|[3.62467033451346...|[0.97403430637816...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.47890263441681...|[0.92264951806551...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.40071363776683...|[0.91688170576830...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.40804215218578...|[0.91743850565431...|       0.0|
|[0.0,93.0,60.0,20...|      0|[2.40804215218578...|[0.91743850565431...|       0.0|
|[0.0,93.0,60.0,25...|      0|[2.76413737990466...|[0.94070682791988...|    

In [None]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.8402816527168858

In [None]:
print("Test Error = %g" % (1.0 - evaluator.evaluate(model.transform(test))))

Test Error = 0.173292


In [None]:
type(predictions.predictions)

pyspark.sql.dataframe.DataFrame

In [None]:
from sklearn.metrics import classification_report, confusion_matrix

y_true = predictions.predictions.select(['Outcome']).collect()
y_pred = predictions.predictions.select(['prediction']).collect()

print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true,y_pred))



              precision    recall  f1-score   support

           0       0.78      0.87      0.82       361
           1       0.72      0.57      0.63       206

    accuracy                           0.76       567
   macro avg       0.75      0.72      0.73       567
weighted avg       0.76      0.76      0.75       567

[[315  46]
 [ 89 117]]


In [None]:
model.save("model")

In [None]:
from pyspark.ml.classification import LogisticRegressionModel
model = LogisticRegressionModel.load('model')