In [1]:

from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier

from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Pyspark mllib packages are obsolete (there were used for RDD) and Pyspark.ml packages are used for new type (SQL,and all)

In [2]:
spark = SparkSession.builder.appName("Classification with Spark").getOrCreate()

24/09/27 01:44:27 WARN Utils: Your hostname, wtc12 resolves to a loopback address: 127.0.1.1; using 10.64.26.83 instead (on interface enp3s0)
24/09/27 01:44:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/27 01:44:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
dataset = spark.read.csv("/spark_tutorial/diabetes.csv",header=True)

In [4]:
dataset.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|  31|                   0.248| 26|      1|


In [5]:
dataset.printSchema()

root
 |-- Pregnancies: string (nullable = true)
 |-- Glucose: string (nullable = true)
 |-- BloodPressure: string (nullable = true)
 |-- SkinThickness: string (nullable = true)
 |-- Insulin: string (nullable = true)
 |-- BMI: string (nullable = true)
 |-- DiabetesPedigreeFunction: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Outcome: string (nullable = true)



In [6]:
from pyspark.sql.functions import col
new_data = dataset.select(*(col(c).cast("float").alias(c) for c in dataset.columns))

In [7]:
new_data.printSchema()

root
 |-- Pregnancies: float (nullable = true)
 |-- Glucose: float (nullable = true)
 |-- BloodPressure: float (nullable = true)
 |-- SkinThickness: float (nullable = true)
 |-- Insulin: float (nullable = true)
 |-- BMI: float (nullable = true)
 |-- DiabetesPedigreeFunction: float (nullable = true)
 |-- Age: float (nullable = true)
 |-- Outcome: float (nullable = true)



In [8]:
from pyspark.sql.functions import col, count, isnan, when
#checking for null ir nan type values in our columns
new_data.select([count(when(col(c).isNull(), c)).alias(c) for c in new_data.columns]).show()

+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|          0|      0|            0|            0|      0|  0|                       0|  0|      0|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



In [9]:
cols=new_data.columns
cols.remove("Outcome")
assembler = VectorAssembler(inputCols=cols,outputCol="features")

# Now let us use the transform method to transform our dataset
data=assembler.transform(new_data)

data.select("features",'Outcome').show(truncate=False)

+-----------------------------------------------------------------------+-------+
|features                                                               |Outcome|
+-----------------------------------------------------------------------+-------+
|[6.0,148.0,72.0,35.0,0.0,33.599998474121094,0.6269999742507935,50.0]   |1.0    |
|[1.0,85.0,66.0,29.0,0.0,26.600000381469727,0.35100001096725464,31.0]   |0.0    |
|[8.0,183.0,64.0,0.0,0.0,23.299999237060547,0.671999990940094,32.0]     |1.0    |
|[1.0,89.0,66.0,23.0,94.0,28.100000381469727,0.16699999570846558,21.0]  |0.0    |
|[0.0,137.0,40.0,35.0,168.0,43.099998474121094,2.2880001068115234,33.0] |1.0    |
|[5.0,116.0,74.0,0.0,0.0,25.600000381469727,0.20100000500679016,30.0]   |0.0    |
|[3.0,78.0,50.0,32.0,88.0,31.0,0.24799999594688416,26.0]                |1.0    |
|[10.0,115.0,0.0,0.0,0.0,35.29999923706055,0.1340000033378601,29.0]     |0.0    |
|[2.0,197.0,70.0,45.0,543.0,30.5,0.15800000727176666,53.0]              |1.0    |
|[8.0,125.0,96.0

In [10]:
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
data=standardscaler.fit(data).transform(data)


In [11]:
data.select("features",'Outcome','Scaled_features').show(truncate=False)

+-----------------------------------------------------------------------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                               |Outcome|Scaled_features                                                                                                                                          |
+-----------------------------------------------------------------------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------+
|[6.0,148.0,72.0,35.0,0.0,33.599998474121094,0.6269999742507935,50.0]   |1.0    |[1.7806383732194306,4.628960915766174,3.7198138711154307,2.1940523222807116,0.0,4.261709202425419,1.8923810993699686,4.251616970894646]                  |
|[1.0,85.0,66.0,29.0,0.0,26.600000381469727,0.3510000109

In [12]:
assembled_data = data.select("Scaled_features","Outcome")
assembled_data.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|[1.78063837321943...|    1.0|
|[0.29677306220323...|    0.0|
|[2.37418449762590...|    1.0|
|[0.29677306220323...|    0.0|
|[0.0,4.2849165233...|    1.0|
|[1.48386531101619...|    0.0|
|[0.89031918660971...|    1.0|
|[2.96773062203238...|    0.0|
|[0.59354612440647...|    1.0|
|[2.37418449762590...|    1.0|
|[1.18709224881295...|    0.0|
|[2.96773062203238...|    1.0|
|[2.96773062203238...|    0.0|
|[0.29677306220323...|    1.0|
|[1.48386531101619...|    1.0|
|[2.07741143542266...|    1.0|
|[0.0,3.6906580274...|    1.0|
|[2.07741143542266...|    1.0|
|[0.29677306220323...|    0.0|
|[0.29677306220323...|    1.0|
+--------------------+-------+
only showing top 20 rows



In [13]:
train, test = assembled_data.randomSplit([0.7, 0.3])

In [14]:
train.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|(8,[0,1,6,7],[0.5...|    0.0|
|(8,[0,1,6,7],[0.5...|    0.0|
|(8,[0,1,6,7],[0.8...|    0.0|
|(8,[0,1,6,7],[2.9...|    1.0|
|(8,[1,5,6,7],[2.2...|    0.0|
|(8,[1,5,6,7],[3.0...|    0.0|
|(8,[1,5,6,7],[3.6...|    0.0|
|(8,[1,5,6,7],[4.4...|    1.0|
|[0.0,2.0955431172...|    0.0|
|[0.0,2.3144804578...|    0.0|
|[0.0,2.4395875096...|    0.0|
|[0.0,2.6272480873...|    0.0|
|[0.0,2.6272480873...|    0.0|
|[0.0,2.6898016132...|    0.0|
|[0.0,2.8461854279...|    0.0|
|[0.0,2.8461854279...|    0.0|
|[0.0,2.9087389538...|    0.0|
|[0.0,2.9087389538...|    0.0|
|[0.0,2.9087389538...|    0.0|
|[0.0,2.9712924797...|    0.0|
+--------------------+-------+
only showing top 20 rows



In [15]:
test.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|(8,[0,1,6,7],[1.7...|    0.0|
|(8,[0,1,6,7],[2.0...|    0.0|
|(8,[1,5,6,7],[3.7...|    1.0|
|(8,[1,5,6,7],[4.0...|    1.0|
|(8,[1,5,6,7],[4.3...|    1.0|
|(8,[1,5,6,7],[4.5...|    1.0|
|(8,[1,5,6,7],[5.2...|    1.0|
|(8,[1,6,7],[2.940...|    0.0|
|[0.0,1.7827754878...|    0.0|
|[0.0,2.9400157167...|    0.0|
|[0.0,2.9712924797...|    0.0|
|[0.0,3.0338460056...|    0.0|
|[0.0,3.1276762944...|    0.0|
|[0.0,3.1276762944...|    0.0|
|[0.0,3.1902298203...|    0.0|
|[0.0,3.2527833462...|    0.0|
|[0.0,3.2527833462...|    1.0|
|[0.0,3.2840601091...|    0.0|
|[0.0,3.3466136350...|    1.0|
|[0.0,3.3466136350...|    0.0|
+--------------------+-------+
only showing top 20 rows



## Logistic Regression

In [16]:
log_reg = LogisticRegression(labelCol="Outcome", featuresCol="Scaled_features",maxIter=40)
model=log_reg.fit(train)

In [17]:
prediction_test=model.transform(test)

In [18]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[1.7...|    0.0|[2.74165641975509...|[0.93944040256016...|       0.0|
|(8,[0,1,6,7],[2.0...|    0.0|[2.76085518148179...|[0.94052349009294...|       0.0|
|(8,[1,5,6,7],[3.7...|    1.0|[1.43667634760414...|[0.80793943710572...|       0.0|
|(8,[1,5,6,7],[4.0...|    1.0|[0.16180101843823...|[0.54036223750615...|       0.0|
|(8,[1,5,6,7],[4.3...|    1.0|[-0.3119008919723...|[0.42265082064164...|       1.0|
|(8,[1,5,6,7],[4.5...|    1.0|[-0.7793653845149...|[0.31445667634262...|       1.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-0.9275504118156...|[0.28342194699589...|       1.0|
|(8,[1,6,7],[2.940...|    0.0|[4.32619672599891...|[0.98695470643648...|       0.0|
|[0.0,1.7827754878...|    0.0|[3.86118564328458...|[0.97939064812363...|    

In [19]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
|    1.0|       0.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [20]:
# Compute raw scores on the test set (as there is no feature to take rocScore so we convert data frame to RDD)
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [21]:
predictionAndLabels.collect()

[Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=1.0, prediction=1.0),
 Row(Outcome=1.0, prediction=1.0),
 Row(Outcome=1.0, prediction=1.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=1.0, prediction=0.0),
 Row(Outcome=0.0, prediction=0.0),
 Row(Outcome=0.0, pr

In [22]:
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)



Area under ROC = 0.7707631874298542


In [23]:
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_LR = evaluator.evaluate(prediction_test)
print ("Accuracy = " ,accuracy_LR)

Accuracy =  0.7763157894736842


## NaiveBayes

In [24]:
naive_bayes = NaiveBayes(featuresCol='Scaled_features',labelCol='Outcome',smoothing=1.0) #smoothing is to avoid 0 probablity

In [25]:
model = naive_bayes.fit(train) 

In [26]:
# select example rows to display.
prediction_test = model.transform(test)

In [27]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[1.7...|    0.0|[-17.019785097966...|[0.50027939738187...|       0.0|
|(8,[0,1,6,7],[2.0...|    0.0|[-18.017597775385...|[0.47664061422733...|       1.0|
|(8,[1,5,6,7],[3.7...|    1.0|[-17.648466121336...|[0.67635070763632...|       0.0|
|(8,[1,5,6,7],[4.0...|    1.0|[-21.673723585045...|[0.68520756437847...|       0.0|
|(8,[1,5,6,7],[4.3...|    1.0|[-25.794538096507...|[0.63523650163303...|       0.0|
|(8,[1,5,6,7],[4.5...|    1.0|[-26.226674928292...|[0.66607864934766...|       0.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-26.559631006772...|[0.62320306107208...|       0.0|
|(8,[1,6,7],[2.940...|    0.0|[-11.223504694432...|[0.62189563329256...|       0.0|
|[0.0,1.7827754878...|    0.0|[-28.966544566134...|[0.75914572922253...|    

In [28]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    0.0|       1.0|
|    1.0|       0.0|
|    1.0|       0.0|
|    1.0|       0.0|
|    1.0|       0.0|
|    1.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [29]:
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [30]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_NB = evaluator.evaluate(prediction_test)

In [31]:
print ("Accuracy",accuracy_NB)

Accuracy 0.6403508771929824


In [32]:
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)



Area under ROC = 0.6215962441314553


## GBTClassifier

In [33]:
gradient_boost_class = GBTClassifier(labelCol="Outcome", featuresCol="Scaled_features")

In [34]:
model = gradient_boost_class.fit(train)

In [35]:
prediction_test = model.transform(test)

In [36]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[1.7...|    0.0|[1.39577907205248...|[0.94221793370386...|       0.0|
|(8,[0,1,6,7],[2.0...|    0.0|[-0.8292013154256...|[0.15997654130140...|       1.0|
|(8,[1,5,6,7],[3.7...|    1.0|[1.48062351049074...|[0.95079237014616...|       0.0|
|(8,[1,5,6,7],[4.0...|    1.0|[-1.2689195637823...|[0.07324772363447...|       1.0|
|(8,[1,5,6,7],[4.3...|    1.0|[-1.0783241201371...|[0.10371160239755...|       1.0|
|(8,[1,5,6,7],[4.5...|    1.0|[-0.3373360517403...|[0.33745146936192...|       1.0|
|(8,[1,5,6,7],[5.2...|    1.0|[-1.3459638224350...|[0.06345137102943...|       1.0|
|(8,[1,6,7],[2.940...|    0.0|[1.54382176867492...|[0.95638016580551...|       0.0|
|[0.0,1.7827754878...|    0.0|[-0.0347950513347...|[0.48260949197005...|    

In [37]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    0.0|       1.0|
|    1.0|       0.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       1.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [38]:
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [39]:
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 0.7078887195121951


In [40]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator( labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_GBT = evaluator.evaluate(prediction_test)

In [41]:
print ("Accuracy",accuracy_GBT)

Accuracy 0.7236842105263158


## RandomForestClassifier

In [42]:
random_forest_classifier = RandomForestClassifier(labelCol="Outcome", featuresCol="Scaled_features", numTrees=40)

In [43]:
model = random_forest_classifier.fit(train)

In [44]:
prediction_test = model.transform(test)

In [45]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[1.7...|    0.0|[35.2061375332894...|[0.88015343833223...|       0.0|
|(8,[0,1,6,7],[2.0...|    0.0|[24.2248124898267...|[0.60562031224566...|       0.0|
|(8,[1,5,6,7],[3.7...|    1.0|[35.0557799921333...|[0.87639449980333...|       0.0|
|(8,[1,5,6,7],[4.0...|    1.0|[23.3801758287784...|[0.58450439571946...|       0.0|
|(8,[1,5,6,7],[4.3...|    1.0|[23.6343895783498...|[0.59085973945874...|       0.0|
|(8,[1,5,6,7],[4.5...|    1.0|[15.8202812737204...|[0.39550703184301...|       1.0|
|(8,[1,5,6,7],[5.2...|    1.0|[8.83425099282291...|[0.22085627482057...|       1.0|
|(8,[1,6,7],[2.940...|    0.0|[38.0745040401652...|[0.95186260100413...|       0.0|
|[0.0,1.7827754878...|    0.0|[34.5812908799117...|[0.86453227199779...|    

In [46]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|    0.0|       0.0|
|    0.0|       0.0|
|    1.0|       0.0|
|    1.0|       0.0|
|    1.0|       0.0|
|    1.0|       1.0|
|    1.0|       1.0|
|    0.0|       0.0|
|    0.0|       0.0|
|    0.0|       0.0|
+-------+----------+
only showing top 10 rows



In [47]:
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [48]:
metrics = BinaryClassificationMetrics(predictionAndLabels)

# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 0.7534978624174116


In [49]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator( labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_RF= evaluator.evaluate(prediction_test)

In [50]:
print ("Accuracy",accuracy_RF)

Accuracy 0.7587719298245614


In [51]:
print("Accuracy of GBT : ",accuracy_GBT)
print("Accuracy of LR : ",accuracy_LR)
print("Accuracy of NB : ",accuracy_NB)
print("Accuracy of RF : ",accuracy_RF)



Accuracy of GBT :  0.7236842105263158
Accuracy of LR :  0.7763157894736842
Accuracy of NB :  0.6403508771929824
Accuracy of RF :  0.7587719298245614


In [52]:
spark.stop()