In [248]:

from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier

from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [249]:
from pyspark.ml.classification import LogisticRegression, NaiveBayes, GBTClassifier, RandomForestClassifier
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [250]:
spark = SparkSession.builder.appName("pyspark classification session").getOrCreate()

In [251]:
ds = spark.read.csv("diabetes.csv",header=True)

In [252]:
ds.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



In [253]:
ds.show(5)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
only showing top 5 rows



In [254]:
from pyspark.sql.functions import col,count,isnan,when
newDs = ds.select(*(col(c).cast("float") for c in ds.columns))

In [255]:
newDs.show(3)

+-----------+-------+-------------+-------------+-------+----+------------------------+----+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction| Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+----+-------+
|        6.0|  148.0|         72.0|         35.0|    0.0|33.6|                   0.627|50.0|    1.0|
|        1.0|   85.0|         66.0|         29.0|    0.0|26.6|                   0.351|31.0|    0.0|
|        8.0|  183.0|         64.0|          0.0|    0.0|23.3|                   0.672|32.0|    1.0|
+-----------+-------+-------------+-------------+-------+----+------------------------+----+-------+
only showing top 3 rows



In [256]:
newDs.printSchema()

root
 |-- Pregnancies: float (nullable = true)
 |-- Glucose: float (nullable = true)
 |-- BloodPressure: float (nullable = true)
 |-- SkinThickness: float (nullable = true)
 |-- Insulin: float (nullable = true)
 |-- BMI: float (nullable = true)
 |-- DiabetesPedigreeFunction: float (nullable = true)
 |-- Age: float (nullable = true)
 |-- Outcome: float (nullable = true)



In [257]:
newDs.select([count(when(col(c).isNull(),c)).alias(c) for c in newDs.columns]).show()

+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|          0|      0|            0|            0|      0|  0|                       0|  0|      0|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



In [258]:
cols = newDs.columns
cols.remove("Outcome")

In [259]:
assambler = VectorAssembler(inputCols=cols,outputCol="features")
data = assambler.transform(newDs)
data.select("features", "outcome").show(truncate=False)

+-----------------------------------------------------------------------+-------+
|features                                                               |outcome|
+-----------------------------------------------------------------------+-------+
|[6.0,148.0,72.0,35.0,0.0,33.599998474121094,0.6269999742507935,50.0]   |1.0    |
|[1.0,85.0,66.0,29.0,0.0,26.600000381469727,0.35100001096725464,31.0]   |0.0    |
|[8.0,183.0,64.0,0.0,0.0,23.299999237060547,0.671999990940094,32.0]     |1.0    |
|[1.0,89.0,66.0,23.0,94.0,28.100000381469727,0.16699999570846558,21.0]  |0.0    |
|[0.0,137.0,40.0,35.0,168.0,43.099998474121094,2.2880001068115234,33.0] |1.0    |
|[5.0,116.0,74.0,0.0,0.0,25.600000381469727,0.20100000500679016,30.0]   |0.0    |
|[3.0,78.0,50.0,32.0,88.0,31.0,0.24799999594688416,26.0]                |1.0    |
|[10.0,115.0,0.0,0.0,0.0,35.29999923706055,0.1340000033378601,29.0]     |0.0    |
|[2.0,197.0,70.0,45.0,543.0,30.5,0.15800000727176666,53.0]              |1.0    |
|[8.0,125.0,96.0

In [260]:

standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
data=standardscaler.fit(data).transform(data)

In [261]:
data.select("features",'Outcome','Scaled_features').show(truncate=False)

+-----------------------------------------------------------------------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                               |Outcome|Scaled_features                                                                                                                                          |
+-----------------------------------------------------------------------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|[6.0,148.0,72.0,35.0,0.0,33.599998474121094,0.6269999742507935,50.0]   |1.0    |[1.7806383732194306,4.628960915766174,3.7198138711154307,2.1940523222807116,0.0,4.261709202425419,1.8923810993699686,4.251616970894646]                  |
|[1.0,85.0,66.0,29.0,0.0,26.600000381469727,0.3510000109

logistic regression 



In [262]:

assembled_data = data.select("Scaled_features","Outcome")
assembled_data.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|[1.78063837321943...|    1.0|
|[0.29677306220323...|    0.0|
|[2.37418449762590...|    1.0|
|[0.29677306220323...|    0.0|
|[0.0,4.2849165233...|    1.0|
|[1.48386531101619...|    0.0|
|[0.89031918660971...|    1.0|
|[2.96773062203238...|    0.0|
|[0.59354612440647...|    1.0|
|[2.37418449762590...|    1.0|
|[1.18709224881295...|    0.0|
|[2.96773062203238...|    1.0|
|[2.96773062203238...|    0.0|
|[0.29677306220323...|    1.0|
|[1.48386531101619...|    1.0|
|[2.07741143542266...|    1.0|
|[0.0,3.6906580274...|    1.0|
|[2.07741143542266...|    1.0|
|[0.29677306220323...|    0.0|
|[0.29677306220323...|    1.0|
+--------------------+-------+
only showing top 20 rows



In [263]:
train, test = assembled_data.randomSplit([0.7, 0.3])

In [264]:
#logistic regression

In [265]:
log_reg = LogisticRegression(labelCol="Outcome", featuresCol="Scaled_features",maxIter=40)
model=log_reg.fit(train)

In [266]:
prediction_test=model.transform(test)

In [267]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[2.9...|    1.0|[3.06911346834986...|[0.95560057373080...|       0.0|
|(8,[1,5,6,7],[2.2...|    0.0|[3.52568193595628...|[0.97140973110466...|       0.0|
|(8,[1,5,6,7],[4.3...|    1.0|[-0.8324047805670...|[0.30313683220081...|       1.0|
|[0.0,1.7827754878...|    0.0|[3.76548232458450...|[0.97736764418283...|       0.0|
|[0.0,2.4395875096...|    0.0|[2.65553366799045...|[0.93435123711860...|       0.0|
|[0.0,2.6272480873...|    0.0|[2.50470060491401...|[0.92447069356480...|       0.0|
|[0.0,2.6898016132...|    0.0|[2.32593059291758...|[0.91100195117843...|       0.0|
|[0.0,2.8461854279...|    0.0|[2.33372669396515...|[0.91163201669612...|       0.0|
|[0.0,2.9087389538...|    0.0|[2.71042033308713...|[0.93763873100133...|    

In [268]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    1.0|       0.0|
|    0.0|       0.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [269]:
# Compute raw scores on the test set
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd
predictionAndLabels.collect()

[Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=1.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, pr

In [270]:
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_LR = evaluator.evaluate(prediction_test)
print ("Accuracy = " ,(accuracy_LR*100))

Accuracy =  75.56561085972851


In [271]:
#NaiveBayes

In [272]:
naive_bayes = NaiveBayes(featuresCol='Scaled_features',labelCol='Outcome',smoothing=1.0)


In [273]:
# select example rows to display.
prediction_test = model.transform(test)

In [274]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[2.9...|    1.0|[3.06911346834986...|[0.95560057373080...|       0.0|
|(8,[1,5,6,7],[2.2...|    0.0|[3.52568193595628...|[0.97140973110466...|       0.0|
|(8,[1,5,6,7],[4.3...|    1.0|[-0.8324047805670...|[0.30313683220081...|       1.0|
|[0.0,1.7827754878...|    0.0|[3.76548232458450...|[0.97736764418283...|       0.0|
|[0.0,2.4395875096...|    0.0|[2.65553366799045...|[0.93435123711860...|       0.0|
|[0.0,2.6272480873...|    0.0|[2.50470060491401...|[0.92447069356480...|       0.0|
|[0.0,2.6898016132...|    0.0|[2.32593059291758...|[0.91100195117843...|       0.0|
|[0.0,2.8461854279...|    0.0|[2.33372669396515...|[0.91163201669612...|       0.0|
|[0.0,2.9087389538...|    0.0|[2.71042033308713...|[0.93763873100133...|    

In [275]:

prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    1.0|       0.0|
|    0.0|       0.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [276]:
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [277]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_NB = evaluator.evaluate(prediction_test)

In [278]:
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_LR = evaluator.evaluate(prediction_test)
print ("Accuracy = " ,(accuracy_LR*100))

Accuracy =  75.56561085972851


In [279]:
#RandomForestClassifier

In [280]:
random_forest_classifier = RandomForestClassifier(labelCol="Outcome", featuresCol="Scaled_features", numTrees=40)


In [281]:
model = random_forest_classifier.fit(train)


In [282]:
prediction_test = model.transform(test)

In [283]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[2.9...|    1.0|[32.2441722965139...|[0.80610430741284...|       0.0|
|(8,[1,5,6,7],[2.2...|    0.0|[38.8446835814312...|[0.97111708953578...|       0.0|
|(8,[1,5,6,7],[4.3...|    1.0|[20.0102903070982...|[0.50025725767745...|       0.0|
|[0.0,1.7827754878...|    0.0|[38.0135559420029...|[0.95033889855007...|       0.0|
|[0.0,2.4395875096...|    0.0|[33.9692389329879...|[0.84923097332469...|       0.0|
|[0.0,2.6272480873...|    0.0|[33.5457429490376...|[0.83864357372594...|       0.0|
|[0.0,2.6898016132...|    0.0|[35.1827065185715...|[0.87956766296428...|       0.0|
|[0.0,2.8461854279...|    0.0|[33.2578478371879...|[0.83144619592969...|       0.0|
|[0.0,2.9087389538...|    0.0|[37.9724252651216...|[0.94931063162804...|    

In [284]:
prediction_test.select("Outcome","prediction").show(5)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    1.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 5 rows



In [285]:

predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [286]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator( labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_RF= evaluator.evaluate(prediction_test)

In [287]:
print ("Accuracy",accuracy_RF)

Accuracy 0.7330316742081447


In [288]:
#print("Accuracy of GBT : ",accuracy_GBT)
print("Accuracy of LR : ",accuracy_LR)
print("Accuracy of NB : ",accuracy_NB)
print("Accuracy of RF : ",accuracy_RF)

Accuracy of LR :  0.755656108597285
Accuracy of NB :  0.755656108597285
Accuracy of RF :  0.7330316742081447
