In [57]:

from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("spark").getOrCreate()

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from sklearn.metrics import classification_report, confusion_matrix
from pyspark.ml.classification import LogisticRegressionModel

spark

In [58]:
df = (
        spark.read.format('csv')
        .options(header = True, inferSchema = True )
        .load('data/diabetes.csv')
)

df.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



In [59]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [60]:
df.groupby('Outcome').count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



In [61]:
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

# Cleaning Data

In [62]:
for column in df.columns:
  print(column+":",df[df[column].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


In [63]:
def count_zeros():
  columns_list =['Glucose','BloodPressure','SkinThickness','Insulin','BMI']
  for i in columns_list:
    print(i+":",df[df[i]==0].count())

count_zeros()    

Glucose: 13
BloodPressure: 90
SkinThickness: 573
Insulin: 956
BMI: 28


In [64]:
for column in df.columns[1:6]:
  data = df.agg({column:'mean'}).first()[0]
  print(f"Mean value for {column} is {int(data)}")
  df = df.withColumn(column,F.when(df[column]==0,int(data)).otherwise(df[column]))

Mean value for Glucose is 121
Mean value for BloodPressure is 69
Mean value for SkinThickness is 20
Mean value for Insulin is 80
Mean value for BMI is 32


In [65]:
df.describe().show()

+-------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|     SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|              2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|           121.969|           72.2505|            26.665|          118.494|32.640999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|30.533214334373536|11.970354817098098|10.0542189

In [66]:
df.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



# Correlation

In [67]:
for column in df.columns:
  print(f"correlation to outcome for {column} is {df.stat.corr('Outcome',column)}")

correlation to outcome for Pregnancies is 0.22443699263363961
correlation to outcome for Glucose is 0.48796646527321064
correlation to outcome for BloodPressure is 0.17171333286446713
correlation to outcome for SkinThickness is 0.1659010662889893
correlation to outcome for Insulin is 0.1711763270226193
correlation to outcome for BMI is 0.2827927569760082
correlation to outcome for DiabetesPedigreeFunction is 0.1554590791569403
correlation to outcome for Age is 0.23650924717620253
correlation to outcome for Outcome is 1.0


# Feature Selection

In [68]:
assembler = VectorAssembler(
    inputCols=[
        'Pregnancies',
        'Glucose',
        'BloodPressure',
        'SkinThickness',
        'Insulin',
        'BMI',
        'DiabetesPedigreeFunction',
        'Age'
        ],
    outputCol='features')
output_data = assembler.transform(df)

output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [69]:
output_data.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|[0.0,135.0,68.0,4...|
|          1|    139|           62|           41|    480|40.7|                   0.536| 21|      0|[1.0,139.0,62.0,4...|
+-----------+-------+-----------

# Build & Train Model

In [70]:
final_data = output_data.select('features','Outcome')

In [71]:
final_data.printSchema()

root
 |-- features: vector (nullable = true)
 |-- Outcome: integer (nullable = true)



In [72]:
train , test = final_data.randomSplit([0.7,0.3])
models = LogisticRegression(labelCol='Outcome')
model = models.fit(train)

In [73]:
summary = model.summary

In [82]:
summary.predictions.describe().show()



+-------+-------------------+-------------------+
|summary|            Outcome|         prediction|
+-------+-------------------+-------------------+
|  count|               1382|               1382|
|   mean| 0.3321273516642547|0.24819102749638206|
| stddev|0.47114689406817134| 0.4321196075477067|
|    min|                0.0|                0.0|
|    max|                1.0|                1.0|
+-------+-------------------+-------------------+



# Evaluation & Test Model

In [75]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions = model.evaluate(test)

In [76]:
predictions.predictions.show(20)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,78.0,88.0,29...|      0|[2.87867508863847...|[0.94678214650030...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.61646419505095...|[0.93191369996150...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.85410726967892...|[0.94553060486042...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.85410726967892...|[0.94553060486042...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.40257474508149...|[0.91702343009370...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.50292968107612...|[0.92434694647353...|       0.0|
|[0.0,94.0,69.0,20...|      0|[2.76258231441719...|[0.94062003080037...|       0.0|
|[0.0,94.0,70.0,27...|      0|[1.93344946248917...|[0.87363073186593...|       0.0|
|[0.0,94.0,70.0,27...|      0|[1.93344946248917...|[0.87363073186593...|    

In [77]:
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.8271303364433134

In [78]:
print("Test Error = %g" % (1.0 - evaluator.evaluate(model.transform(test))))

Test Error = 0.17287


In [79]:
type(predictions.predictions)

pyspark.sql.dataframe.DataFrame

In [80]:
y_true = predictions.predictions.select(['Outcome']).collect()
y_pred = predictions.predictions.select(['prediction']).collect()

print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true,y_pred))



              precision    recall  f1-score   support

           0       0.77      0.87      0.82       393
           1       0.70      0.55      0.61       225

    accuracy                           0.75       618
   macro avg       0.74      0.71      0.72       618
weighted avg       0.75      0.75      0.74       618

[[341  52]
 [102 123]]


In [81]:
model.save("data/diabete-model")

Py4JJavaError: An error occurred while calling o1228.save.
: java.io.IOException: Path data/diabete-model already exists. To overwrite it, please use write.overwrite().save(path) for Scala and use write().overwrite().save(path) for Java and Python.
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:683)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
model = LogisticRegressionModel.load('data/diabete-model')

In [None]:
type(model)

pyspark.ml.classification.LogisticRegressionModel