In [17]:
import pyspark
from pyspark.sql.functions import when, lit
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
spark = pyspark.sql.SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

In [18]:
df = spark.read.csv("diabetes.csv", header=True)

In [19]:
df.toPandas()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
0,6,148,72,35,0,33.6,0.627,50,1
1,1,85,66,29,0,26.6,0.351,31,0
2,8,183,64,0,0,23.3,0.672,32,1
3,1,89,66,23,94,28.1,0.167,21,0
4,0,137,40,35,168,43.1,2.288,33,1
...,...,...,...,...,...,...,...,...,...
763,10,101,76,48,180,32.9,0.171,63,0
764,2,122,70,27,0,36.8,0.34,27,0
765,5,121,72,23,112,26.2,0.245,30,0
766,1,126,60,0,0,30.1,0.349,47,1


In [20]:
df.select("Pregnancies", "Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI", "DiabetesPedigreeFunction", "Age", "Outcome").describe().show()

+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|summary|       Pregnancies|          Glucose|     BloodPressure|     SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------------+------------------+------------------+
|  count|               768|              768|               768|               768|               768|               768|                     768|               768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|20.536458333333332| 79.79947916666667|31.992578124999977|      0.4718763020833327|33.240885416666664|0.3489583333333333|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|15.95

In [21]:
# Check if there are duplicates
new_df = df.dropDuplicates()
new_df = df.exceptAll(new_df)
new_df.toPandas()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome


In [22]:
# Check if there are null values
df_with_nulls = df.filter(df.Pregnancies.isNull() & df.Glucose.isNull() & df.BloodPressure.isNull() & df.SkinThickness.isNull() & df.Insulin.isNull() & df.BMI.isNull() & df.DiabetesPedigreeFunction.isNull() & df.Age.isNull() & df.Outcome.isNull()).show()


+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



In [23]:
df.describe()

DataFrame[summary: string, Pregnancies: string, Glucose: string, BloodPressure: string, SkinThickness: string, Insulin: string, BMI: string, DiabetesPedigreeFunction: string, Age: string, Outcome: string]

In [24]:
# Convert data types
df = df.withColumn("Glucose", df.Glucose.cast(IntegerType()))
df = df.withColumn("BloodPressure", df.BloodPressure.cast(IntegerType()))
df = df.withColumn("SkinThickness", df.SkinThickness.cast(IntegerType()))
df = df.withColumn("Insulin", df.Insulin.cast(IntegerType()))
df = df.withColumn("BMI", df.BMI.cast(FloatType()))
df = df.withColumn("DiabetesPedigreeFunction", df.DiabetesPedigreeFunction.cast(FloatType()))
df = df.withColumn("Age", df.Age.cast(IntegerType()))
df = df.withColumn("Outcome", df.Outcome.cast(IntegerType()))

In [25]:
# Fix zero values
df = df.withColumn("Glucose", when(df.Glucose == 0, lit(df.groupBy().mean("Glucose").take(1)[0][0])).otherwise(df.Glucose))
df = df.withColumn("Insulin", when(df.Insulin == 0, lit(df.groupBy().mean("Insulin").take(1)[0][0])).otherwise(df.Insulin))
df = df.withColumn("BloodPressure", when(df.BloodPressure == 0, lit(df.groupBy().mean("BloodPressure").take(1)[0][0])).otherwise(df.BloodPressure))
df = df.withColumn("SkinThickness", when(df.SkinThickness == 0, lit(df.groupBy().mean("SkinThickness").take(1)[0][0])).otherwise(df.SkinThickness))
df = df.withColumn("BMI", when(df.BMI == 0, lit(df.groupBy().mean("BMI").take(1)[0][0])).otherwise(df.BMI))
df.toPandas()


Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome
0,6,148.0,72.0,35.000000,79.799479,33.599998,0.627,50,1
1,1,85.0,66.0,29.000000,79.799479,26.600000,0.351,31,0
2,8,183.0,64.0,20.536458,79.799479,23.299999,0.672,32,1
3,1,89.0,66.0,23.000000,94.000000,28.100000,0.167,21,0
4,0,137.0,40.0,35.000000,168.000000,43.099998,2.288,33,1
...,...,...,...,...,...,...,...,...,...
763,10,101.0,76.0,48.000000,180.000000,32.900002,0.171,63,0
764,2,122.0,70.0,27.000000,79.799479,36.799999,0.340,27,0
765,5,121.0,72.0,23.000000,112.000000,26.200001,0.245,30,0
766,1,126.0,60.0,20.536458,79.799479,30.100000,0.349,47,1


In [26]:
df = df.withColumn("Pregnancies", df.Pregnancies.cast(IntegerType()))
df = df.withColumn("Glucose", df.Glucose.cast(IntegerType()))
df = df.withColumn("Glucose", df.Glucose.cast(FloatType()))
df = df.withColumn("BloodPressure", df.BloodPressure.cast(FloatType()))
df = df.withColumn("SkinThickness", df.SkinThickness.cast(FloatType()))
df = df.withColumn("Insulin", df.Insulin.cast(FloatType()))
df = df.withColumn("BMI", df.BMI.cast(FloatType()))
df = df.withColumn("DiabetesPedigreeFunction", df.DiabetesPedigreeFunction.cast(FloatType()))
df = df.withColumn("Age", df.Age.cast(IntegerType()))
df = df.withColumn("Outcome", df.Outcome.cast(IntegerType()))

In [27]:
df.select("Pregnancies", "Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI", "DiabetesPedigreeFunction", "Age", "Outcome").describe().show()

+-------+------------------+----------------+------------------+-----------------+------------------+------------------+------------------------+------------------+------------------+
|summary|       Pregnancies|         Glucose|     BloodPressure|    SkinThickness|           Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+------------------+----------------+------------------+-----------------+------------------+------------------+------------------------+------------------+------------------+
|  count|               768|             768|               768|              768|               768|               768|                     768|               768|               768|
|   mean|3.8450520833333335|    121.67578125| 72.25480651855469|26.60647940884034|118.66016179323196|32.450805174807705|      0.4718763029280429|33.240885416666664|0.3489583333333333|
| stddev|  3.36957806269887|30.4362515773519|12.115931598253848|9.63124059219339

In [28]:
# Select features
features = ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin',
       'BMI', 'DiabetesPedigreeFunction', 'Age']
assembler = VectorAssembler(inputCols=features, outputCol="features")
df = assembler.transform(df)
df.show()


+-----------+-------+-------------+-------------+--------+---------+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness| Insulin|      BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+--------+---------+------------------------+---+-------+--------------------+
|          6|  148.0|         72.0|         35.0|79.79948|     33.6|                   0.627| 50|      1|[6.0,148.0,72.0,3...|
|          1|   85.0|         66.0|         29.0|79.79948|     26.6|                   0.351| 31|      0|[1.0,85.0,66.0,29...|
|          8|  183.0|         64.0|    20.536459|79.79948|     23.3|                   0.672| 32|      1|[8.0,183.0,64.0,2...|
|          1|   89.0|         66.0|         23.0|    94.0|     28.1|                   0.167| 21|      0|[1.0,89.0,66.0,23...|
|          0|  137.0|         40.0|         35.0|   168.0|     43.1|                   2.288| 33|      1|[0.0,1

In [29]:
# Split to train and test datasets
train, test = df.randomSplit([0.7, 0.3], seed=2023)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))


Training Dataset Count: 522
Test Dataset Count: 246


In [30]:
# Run classification model
rf = RandomForestClassifier(featuresCol="features", labelCol="Outcome")
rfModel = rf.fit(train)
predictions = rfModel.transform(test)
predictions.select("Pregnancies", "Glucose", "BloodPressure", "SkinThickness", "Insulin", "BMI", "DiabetesPedigreeFunction", "Age", "Outcome", "rawPrediction", "prediction", "probability").show()

+-----------+-------+-------------+-------------+--------+----+------------------------+---+-------+--------------------+----------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness| Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|       rawPrediction|prediction|         probability|
+-----------+-------+-------------+-------------+--------+----+------------------------+---+-------+--------------------+----------+--------------------+
|          0|   84.0|         64.0|         22.0|    66.0|35.8|                   0.545| 21|      0|[16.6616657749355...|       0.0|[0.83308328874677...|
|          0|   91.0|         80.0|    20.536459|79.79948|32.4|                   0.601| 27|      0|[15.3568195084238...|       0.0|[0.76784097542119...|
|          0|   93.0|         60.0|         25.0|    92.0|28.7|                   0.532| 22|      0|[18.7369341340645...|       0.0|[0.93684670670322...|
|          0|   95.0|         85.0|         25.0|    36.0|37.4|             

In [31]:
predictions.select("Outcome", "prediction").show(10)


+-------+----------+
|Outcome|prediction|
+-------+----------+
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
+-------+----------+
only showing top 10 rows



In [32]:
# Evaluate classification accuracy
evaluator = BinaryClassificationEvaluator(labelCol="Outcome", rawPredictionCol="rawPrediction")
accurracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % accurracy)
print("Test Error = %s" % (1.0 - accurracy))


Accuracy = 0.8481779880257216
Test Error = 0.15182201197427836
