In [1]:
import findspark
findspark.init()
findspark.find() 
from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier  

from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator 

In [2]:
spark = SparkSession.builder.appName("Classification with Spark").getOrCreate() #create a session

In [3]:
dataset = spark.read.csv("diabetes.csv",header=True) # read csv

In [4]:
dataset.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|  31|                   0.248| 26|      1|


In [5]:
dataset.printSchema() # datatype 

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



In [6]:
from pyspark.sql.functions import col
new_data = dataset.select(*(col(c).cast("float").alias(c) for c in dataset.columns))

In [7]:
new_data.printSchema()

root
 |-- Pregnancies: float (nullable = true)
 |-- Glucose: float (nullable = true)
 |-- BloodPressure: float (nullable = true)
 |-- SkinThickness: float (nullable = true)
 |-- Insulin: float (nullable = true)
 |-- BMI: float (nullable = true)
 |-- DiabetesPedigreeFunction: float (nullable = true)
 |-- Age: float (nullable = true)
 |-- Outcome: float (nullable = true)



In [8]:
from pyspark.sql.functions import col, count, isnan, when
#checking for null ir nan type values in our columns
new_data.select([count(when(col(c).isNull(), c)).alias(c) for c in new_data.columns]).show()

+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|          0|      0|            0|            0|      0|  0|                       0|  0|      0|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



In [9]:
cols=new_data.columns
cols.remove("Outcome") # remove outcome 
assembler = VectorAssembler(inputCols=cols,outputCol="features")

# Now let us use the transform method to transform our dataset
data=assembler.transform(new_data)

data.select("features",'Outcome').show(truncate=False) # truncate shows all features 

+-----------------------------------------------------------------------+-------+
|features                                                               |Outcome|
+-----------------------------------------------------------------------+-------+
|[6.0,148.0,72.0,35.0,0.0,33.599998474121094,0.6269999742507935,50.0]   |1.0    |
|[1.0,85.0,66.0,29.0,0.0,26.600000381469727,0.35100001096725464,31.0]   |0.0    |
|[8.0,183.0,64.0,0.0,0.0,23.299999237060547,0.671999990940094,32.0]     |1.0    |
|[1.0,89.0,66.0,23.0,94.0,28.100000381469727,0.16699999570846558,21.0]  |0.0    |
|[0.0,137.0,40.0,35.0,168.0,43.099998474121094,2.2880001068115234,33.0] |1.0    |
|[5.0,116.0,74.0,0.0,0.0,25.600000381469727,0.20100000500679016,30.0]   |0.0    |
|[3.0,78.0,50.0,32.0,88.0,31.0,0.24799999594688416,26.0]                |1.0    |
|[10.0,115.0,0.0,0.0,0.0,35.29999923706055,0.1340000033378601,29.0]     |0.0    |
|[2.0,197.0,70.0,45.0,543.0,30.5,0.15800000727176666,53.0]              |1.0    |
|[8.0,125.0,96.0

In [10]:
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
data=standardscaler.fit(data).transform(data)
# scaled features 

In [16]:
data.select("features",'Outcome','Scaled_features').show(truncate=False)

+-----------------------------------------------------------------------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                               |Outcome|Scaled_features                                                                                                                                          |
+-----------------------------------------------------------------------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|[6.0,148.0,72.0,35.0,0.0,33.599998474121094,0.6269999742507935,50.0]   |1.0    |[1.7806383732194306,4.628960915766174,3.7198138711154307,2.1940523222807116,0.0,4.261709202425419,1.8923810993699686,4.251616970894646]                  |
|[1.0,85.0,66.0,29.0,0.0,26.600000381469727,0.3510000109

In [17]:
assembled_data = data.select("Scaled_features","Outcome")
assembled_data.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|[1.78063837321943...|    1.0|
|[0.29677306220323...|    0.0|
|[2.37418449762590...|    1.0|
|[0.29677306220323...|    0.0|
|[0.0,4.2849165233...|    1.0|
|[1.48386531101619...|    0.0|
|[0.89031918660971...|    1.0|
|[2.96773062203238...|    0.0|
|[0.59354612440647...|    1.0|
|[2.37418449762590...|    1.0|
|[1.18709224881295...|    0.0|
|[2.96773062203238...|    1.0|
|[2.96773062203238...|    0.0|
|[0.29677306220323...|    1.0|
|[1.48386531101619...|    1.0|
|[2.07741143542266...|    1.0|
|[0.0,3.6906580274...|    1.0|
|[2.07741143542266...|    1.0|
|[0.29677306220323...|    0.0|
|[0.29677306220323...|    1.0|
+--------------------+-------+
only showing top 20 rows



In [18]:
train, test = assembled_data.randomSplit([0.7, 0.3])

In [19]:
train.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|(8,[0,1,6,7],[0.5...|    0.0|
|(8,[0,1,6,7],[0.8...|    0.0|
|(8,[0,1,6,7],[2.0...|    0.0|
|(8,[0,1,6,7],[2.9...|    1.0|
|(8,[1,5,6,7],[2.2...|    0.0|
|(8,[1,5,6,7],[3.6...|    0.0|
|(8,[1,5,6,7],[4.0...|    1.0|
|(8,[1,5,6,7],[4.4...|    1.0|
|(8,[1,5,6,7],[4.5...|    1.0|
|(8,[1,6,7],[2.940...|    0.0|
|[0.0,1.7827754878...|    0.0|
|[0.0,2.0955431172...|    0.0|
|[0.0,2.6272480873...|    0.0|
|[0.0,2.6272480873...|    0.0|
|[0.0,2.8461854279...|    0.0|
|[0.0,2.8461854279...|    0.0|
|[0.0,2.9087389538...|    0.0|
|[0.0,2.9087389538...|    0.0|
|[0.0,2.9087389538...|    0.0|
|[0.0,2.9712924797...|    0.0|
+--------------------+-------+
only showing top 20 rows



In [20]:
test.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|(8,[0,1,6,7],[0.5...|    0.0|
|(8,[0,1,6,7],[1.7...|    0.0|
|(8,[1,5,6,7],[3.0...|    0.0|
|(8,[1,5,6,7],[3.7...|    1.0|
|(8,[1,5,6,7],[4.3...|    1.0|
|(8,[1,5,6,7],[5.2...|    1.0|
|[0.0,2.3144804578...|    0.0|
|[0.0,2.4395875096...|    0.0|
|[0.0,2.6898016132...|    0.0|
|[0.0,2.9400157167...|    0.0|
|[0.0,3.0338460056...|    0.0|
|[0.0,3.1902298203...|    0.0|
|[0.0,3.1902298203...|    0.0|
|[0.0,3.1902298203...|    0.0|
|[0.0,3.2527833462...|    1.0|
|[0.0,3.2840601091...|    0.0|
|[0.0,3.2840601091...|    0.0|
|[0.0,3.3153368721...|    0.0|
|[0.0,3.3466136350...|    0.0|
|[0.0,3.3466136350...|    0.0|
+--------------------+-------+
only showing top 20 rows



## Logistic Regression

In [21]:
log_reg = LogisticRegression(labelCol="Outcome", featuresCol="Scaled_features",maxIter=40)
model=log_reg.fit(train)

#maxIter - how many iteration model has to perfrom to mimimize loss

In [22]:
prediction_test=model.transform(test)

In [23]:
prediction_test.show() # pyspark also gives probability 

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[0.5...|    0.0|[6.11522027913501...|[0.99779587975603...|       0.0|
|(8,[0,1,6,7],[1.7...|    0.0|[4.21007342669834...|[0.98537188024978...|       0.0|
|(8,[1,5,6,7],[3.0...|    0.0|[2.58774290337636...|[0.93006855564488...|       0.0|
|(8,[1,5,6,7],[3.7...|    1.0|[1.20495209874492...|[0.76940456264114...|       0.0|
|(8,[1,5,6,7],[4.3...|    1.0|[-0.7405481213005...|[0.32288429635456...|       1.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-1.2330653685001...|[0.22564536491400...|       1.0|
|[0.0,2.3144804578...|    0.0|[3.58535884161566...|[0.97302131402817...|       0.0|
|[0.0,2.4395875096...|    0.0|[2.59384736548992...|[0.93046455533136...|       0.0|
|[0.0,2.6898016132...|    0.0|[2.32868106226885...|[0.91122470012498...|    

In [24]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [25]:
# Compute raw scores on the test set
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [26]:
predictionAndLabels.collect()

[Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=1.0, prediction=1.0),
 Row(Outcome=1.0, prediction=1.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, pr

In [27]:
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 0.7410714285714285


In [28]:
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_LR = evaluator.evaluate(prediction_test)
print ("Accuracy = " ,accuracy_LR) # Accuracy 

Accuracy =  0.7459016393442623


## NaiveBayes

In [29]:
naive_bayes = NaiveBayes(featuresCol='Scaled_features',labelCol='Outcome',smoothing=1.0)  # smooth categorical data variables to get rid of 0 probability   

In [30]:
model = naive_bayes.fit(train) 

In [31]:
# select example rows to display.
prediction_test = model.transform(test)

In [32]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[0.5...|    0.0|[-10.252403111445...|[0.58726226499928...|       0.0|
|(8,[0,1,6,7],[1.7...|    0.0|[-17.053073767618...|[0.50279848574526...|       0.0|
|(8,[1,5,6,7],[3.0...|    0.0|[-15.831029606240...|[0.61333668157596...|       0.0|
|(8,[1,5,6,7],[3.7...|    1.0|[-17.716163242553...|[0.61676608015817...|       0.0|
|(8,[1,5,6,7],[4.3...|    1.0|[-25.854127812175...|[0.55821965526428...|       0.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-26.620742469296...|[0.54680510024776...|       0.0|
|[0.0,2.3144804578...|    0.0|[-22.249255486569...|[0.70736265910557...|       0.0|
|[0.0,2.4395875096...|    0.0|[-31.605893169364...|[0.76633663427380...|       0.0|
|[0.0,2.6898016132...|    0.0|[-28.565977001673...|[0.75676602488264...|    

In [33]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
|    1.0|       0.0|
|    1.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [34]:
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [37]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_NB = evaluator.evaluate(prediction_test)

In [38]:
print ("Accuracy",accuracy_NB)

Accuracy 0.6557377049180327


In [36]:
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 0.7422969187675071


## GBTClassifier

In [40]:
gradient_boost_class = GBTClassifier(labelCol="Outcome", featuresCol="Scaled_features")

In [41]:
model = gradient_boost_class.fit(train)

In [42]:
prediction_test = model.transform(test)

In [43]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[0.5...|    0.0|[1.55863695793772...|[0.95759968021031...|       0.0|
|(8,[0,1,6,7],[1.7...|    0.0|[0.99002182484175...|[0.87868581511282...|       0.0|
|(8,[1,5,6,7],[3.0...|    0.0|[1.55863695793772...|[0.95759968021031...|       0.0|
|(8,[1,5,6,7],[3.7...|    1.0|[1.53526807478471...|[0.95566089630662...|       0.0|
|(8,[1,5,6,7],[4.3...|    1.0|[-1.5968356571591...|[0.03940457825293...|       1.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-1.2762595968437...|[0.07225742587878...|       1.0|
|[0.0,2.3144804578...|    0.0|[1.58749557551826...|[0.95988222728447...|       0.0|
|[0.0,2.4395875096...|    0.0|[1.00853424486459...|[0.88257754431220...|       0.0|
|[0.0,2.6898016132...|    0.0|[1.19504048179240...|[0.91606779340716...|    

In [44]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [45]:
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [46]:
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 0.7704551433833202


In [47]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator( labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_GBT = evaluator.evaluate(prediction_test)

In [48]:
print ("Accuracy",accuracy_GBT)

Accuracy 0.7745901639344263


## RandomForestClassifier

In [50]:
random_forest_classifier = RandomForestClassifier(labelCol="Outcome", featuresCol="Scaled_features", numTrees=40)

In [51]:
model = random_forest_classifier.fit(train)

In [52]:
prediction_test = model.transform(test)

In [53]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[0.5...|    0.0|[38.8824078667804...|[0.97206019666951...|       0.0|
|(8,[0,1,6,7],[1.7...|    0.0|[32.8351333622286...|[0.82087833405571...|       0.0|
|(8,[1,5,6,7],[3.0...|    0.0|[38.9395507239233...|[0.97348876809808...|       0.0|
|(8,[1,5,6,7],[3.7...|    1.0|[31.4405975586591...|[0.78601493896647...|       0.0|
|(8,[1,5,6,7],[4.3...|    1.0|[16.7418583308680...|[0.41854645827170...|       1.0|
|(8,[1,5,6,7],[5.2...|    1.0|[9.36797262221872...|[0.23419931555546...|       1.0|
|[0.0,2.3144804578...|    0.0|[38.7637490072629...|[0.96909372518157...|       0.0|
|[0.0,2.4395875096...|    0.0|[34.9461539190584...|[0.87365384797646...|       0.0|
|[0.0,2.6898016132...|    0.0|[34.1633588747655...|[0.85408397186913...|    

In [54]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [56]:
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [57]:
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 0.7836484983314794


In [58]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator( labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_RF= evaluator.evaluate(prediction_test)

In [59]:
print ("Accuracy",accuracy_RF)

Accuracy 0.7786885245901639


In [60]:
print("Accuracy of GBT : ",accuracy_GBT)
print("Accuracy of LR : ",accuracy_LR)
print("Accuracy of NB : ",accuracy_NB)
print("Accuracy of RF : ",accuracy_RF)



Accuracy of GBT :  0.7745901639344263
Accuracy of LR :  0.7459016393442623
Accuracy of NB :  0.6557377049180327
Accuracy of RF :  0.7786885245901639
