PySpark Project on ML Classification

In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=93b3bebe53d2f3c9897253b99d7fbb179c346335d93a9cc8c6f11ee24b545f13
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


Importing the Required Modules

In [33]:
from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import RandomForestClassifier

from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

Creating the Spark Session for the SQL DataFrame

In [34]:
spark = SparkSession.builder.appName("Diabetes Classification").master("local").getOrCreate()

Importing the Diabetes Dataset

In [35]:
df = spark.read.csv("diabetes.csv", header = True, inferSchema=True)

In [36]:
df.show()

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          6|    148|           72|           35|      0|33.6|                   0.627| 50|      1|
|          1|     85|           66|           29|      0|26.6|                   0.351| 31|      0|
|          8|    183|           64|            0|      0|23.3|                   0.672| 32|      1|
|          1|     89|           66|           23|     94|28.1|                   0.167| 21|      0|
|          0|    137|           40|           35|    168|43.1|                   2.288| 33|      1|
|          5|    116|           74|            0|      0|25.6|                   0.201| 30|      0|
|          3|     78|           50|           32|     88|31.0|                   0.248| 26|      1|


In [37]:
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [38]:
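from pyspark.sql.functions import col
# Cast every column to float, keeping the column names.
# Note: new_data is not used by the later cells, which keep working on df.
new_data = df.select(*(col(c).cast("float").alias(c) for c in df.columns))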

Checking for null values in our columns (isnan is imported but not applied, so NaN values are not counted)

In [39]:
from pyspark.sql.functions import col, count, isnan, when
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin|BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+
|          0|      0|            0|            0|      0|  0|                       0|  0|      0|
+-----------+-------+-------------+-------------+-------+---+------------------------+---+-------+



Applying the Vector Assembler method on our Dataset

In [40]:
cols=df.columns
cols.remove("Outcome")
assembler = VectorAssembler(inputCols=cols,outputCol="features")
data=assembler.transform(df)
data.select("features",'Outcome').show(truncate=False)

+-------------------------------------------+-------+
|features                                   |Outcome|
+-------------------------------------------+-------+
|[6.0,148.0,72.0,35.0,0.0,33.6,0.627,50.0]  |1      |
|[1.0,85.0,66.0,29.0,0.0,26.6,0.351,31.0]   |0      |
|[8.0,183.0,64.0,0.0,0.0,23.3,0.672,32.0]   |1      |
|[1.0,89.0,66.0,23.0,94.0,28.1,0.167,21.0]  |0      |
|[0.0,137.0,40.0,35.0,168.0,43.1,2.288,33.0]|1      |
|[5.0,116.0,74.0,0.0,0.0,25.6,0.201,30.0]   |0      |
|[3.0,78.0,50.0,32.0,88.0,31.0,0.248,26.0]  |1      |
|[10.0,115.0,0.0,0.0,0.0,35.3,0.134,29.0]   |0      |
|[2.0,197.0,70.0,45.0,543.0,30.5,0.158,53.0]|1      |
|[8.0,125.0,96.0,0.0,0.0,0.0,0.232,54.0]    |1      |
|[4.0,110.0,92.0,0.0,0.0,37.6,0.191,30.0]   |0      |
|[10.0,168.0,74.0,0.0,0.0,38.0,0.537,34.0]  |1      |
|[10.0,139.0,80.0,0.0,0.0,27.1,1.441,57.0]  |0      |
|[1.0,189.0,60.0,23.0,846.0,30.1,0.398,59.0]|1      |
|[5.0,166.0,72.0,19.0,175.0,25.8,0.587,51.0]|1      |
|[7.0,100.0,0.0,0.0,0.0,30.0

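Applying the StandardScaler method on our dataset. With PySpark's default settings (withStd=True, withMean=False), each feature is divided by its standard deviation without being centered, so the scaled values are not confined to a fixed range such as -3 to +3; several values in the output below exceed 4.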

In [41]:
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
data=standardscaler.fit(data).transform(data)

In [42]:
data.select("features",'Outcome','Scaled_features').show(truncate=False)

+-------------------------------------------+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                   |Outcome|Scaled_features                                                                                                                                         |
+-------------------------------------------+-------+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|[6.0,148.0,72.0,35.0,0.0,33.6,0.627,50.0]  |1      |[1.7806383732194306,4.628960915766174,3.7198138711154307,2.1940523222807116,0.0,4.261709381170972,1.8923811872495484,4.251616970894646]                 |
|[1.0,85.0,66.0,29.0,0.0,26.6,0.351,31.0]   |0      |[0.29677306220323846,2.658524850271114,3.4098293818558116,1.8179290670325896,0.0,3.3738532600936866,1.0593712866420917,
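
In the output above, the first Pregnancies value 6.0 becomes about 1.78, which implies a column standard deviation of roughly 3.37. The sketch below reproduces that kind of scaling in plain Python. It is a conceptual illustration rather than Spark's implementation, and the toy column and the helper name scale_to_unit_std are made up for the example.

```python
import statistics


def scale_to_unit_std(column):
    """Divide each value by the column's sample standard deviation (no centering)."""
    std = statistics.stdev(column)  # sample (n - 1) standard deviation
    return [value / std for value in column]


# Hypothetical toy column, not values from the diabetes data.
toy = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]
scaled = scale_to_unit_std(toy)

# After scaling, the column's standard deviation is 1 (up to rounding),
# but the values keep their sign and are not limited to any fixed range.
print(statistics.stdev(scaled))
print(min(scaled), max(scaled))
```

Because the mean is not subtracted, zeros stay zeros, which is why the scaled vectors in this notebook still contain 0.0 entries.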

In [43]:
assembled_data = data.select("Scaled_features","Outcome")
assembled_data.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|[1.78063837321943...|      1|
|[0.29677306220323...|      0|
|[2.37418449762590...|      1|
|[0.29677306220323...|      0|
|[0.0,4.2849165233...|      1|
|[1.48386531101619...|      0|
|[0.89031918660971...|      1|
|[2.96773062203238...|      0|
|[0.59354612440647...|      1|
|[2.37418449762590...|      1|
|[1.18709224881295...|      0|
|[2.96773062203238...|      1|
|[2.96773062203238...|      0|
|[0.29677306220323...|      1|
|[1.48386531101619...|      1|
|[2.07741143542266...|      1|
|[0.0,3.6906580274...|      1|
|[2.07741143542266...|      1|
|[0.29677306220323...|      0|
|[0.29677306220323...|      1|
+--------------------+-------+
only showing top 20 rows



Splitting the dataset into train and test sets with a 70/30 ratio

In [44]:
train, test = assembled_data.randomSplit([0.7, 0.3])

In [45]:
train.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|(8,[0,1,6,7],[0.5...|      0|
|(8,[0,1,6,7],[0.8...|      0|
|(8,[0,1,6,7],[1.7...|      0|
|(8,[0,1,6,7],[2.0...|      0|
|(8,[1,5,6,7],[3.0...|      0|
|(8,[1,5,6,7],[3.6...|      0|
|(8,[1,5,6,7],[3.7...|      1|
|(8,[1,5,6,7],[4.3...|      1|
|(8,[1,5,6,7],[4.4...|      1|
|(8,[1,6,7],[2.940...|      0|
|[0.0,1.7827754878...|      0|
|[0.0,2.0955431172...|      0|
|[0.0,2.3144804578...|      0|
|[0.0,2.6272480873...|      0|
|[0.0,2.6272480873...|      0|
|[0.0,2.6898016132...|      0|
|[0.0,2.8461854279...|      0|
|[0.0,2.8461854279...|      0|
|[0.0,2.9087389538...|      0|
|[0.0,2.9087389538...|      0|
+--------------------+-------+
only showing top 20 rows



In [46]:
train.count()

538

Training Dataset has 538 Rows

In [47]:
test.show()

+--------------------+-------+
|     Scaled_features|Outcome|
+--------------------+-------+
|(8,[0,1,6,7],[0.5...|      0|
|(8,[0,1,6,7],[2.9...|      1|
|(8,[1,5,6,7],[2.2...|      0|
|(8,[1,5,6,7],[4.0...|      1|
|(8,[1,5,6,7],[4.5...|      1|
|(8,[1,5,6,7],[5.2...|      1|
|[0.0,2.4395875096...|      0|
|[0.0,3.0338460056...|      0|
|[0.0,3.1902298203...|      0|
|[0.0,3.1902298203...|      0|
|[0.0,3.2840601091...|      0|
|[0.0,3.2840601091...|      0|
|[0.0,3.5655509756...|      0|
|[0.0,3.6593812644...|      0|
|[0.0,3.7219347903...|      0|
|[0.0,3.7532115533...|      0|
|[0.0,3.9095953680...|      0|
|[0.0,3.9408721309...|      0|
|[0.0,3.9408721309...|      0|
|[0.0,4.0034256568...|      1|
+--------------------+-------+
only showing top 20 rows



In [48]:
test.count()

230

Test Dataset has 230 rows

Applying the Logistic Regression Algorithm

LOGISTIC REGRESSION

In [49]:
log_reg = LogisticRegression(labelCol="Outcome", featuresCol="Scaled_features",maxIter=40)
model=log_reg.fit(train)

In [50]:
prediction_test=model.transform(test)

In [51]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[0.5...|      0|[4.46915042614835...|[0.98867273170317...|       0.0|
|(8,[0,1,6,7],[2.9...|      1|[2.52770923822832...|[0.92606165458711...|       0.0|
|(8,[1,5,6,7],[2.2...|      0|[3.26401594081078...|[0.96317350261805...|       0.0|
|(8,[1,5,6,7],[4.0...|      1|[-0.2383777043663...|[0.44068617863428...|       1.0|
|(8,[1,5,6,7],[4.5...|      1|[-1.1848116846418...|[0.23418814573015...|       1.0|
|(8,[1,5,6,7],[5.2...|      1|[-1.1206438359115...|[0.24589187813055...|       1.0|
|[0.0,2.4395875096...|      0|[2.44911285582019...|[0.92049655146820...|       0.0|
|[0.0,3.0338460056...|      0|[1.48727822238109...|[0.81566939715901...|       0.0|
|[0.0,3.1902298203...|      0|[1.12939396512428...|[0.75572704016523...|    

In [52]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|      0|       0.0|
|      1|       0.0|
|      0|       0.0|
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
+-------+----------+
only showing top 10 rows



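Accuracy can be computed with either the DataFrame-based ML module or the RDD-based MLlib module. The MLlib metrics classes take an RDD as input, so we convert the label and prediction columns to an RDD here; the accuracy reported later is computed with the ML module's MulticlassClassificationEvaluator.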

In [65]:
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [66]:
predictionAndLabels.collect()

[Row(Outcome=0, prediction=0.0),
 Row(Outcome=1, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=1, prediction=1.0),
 Row(Outcome=1, prediction=1.0),
 Row(Outcome=1, prediction=1.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=1, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=1, prediction=0.0),
 Row(Outcome=0, prediction=0.0),
 Row(Outcome=0, prediction=1.0),
 Row(Outcome=0, prediction=1.0),
 Row(Outcome=1, prediction=1.0),
 Row(Outcome=1, prediction=1.0),
 Row(Outcome=1, prediction=1.0),
 Row(Outcome=1, prediction=1.0),
 Row(Outcome=1, prediction=1.0),
 Row(Outco

In [69]:
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_LR = evaluator.evaluate(prediction_test)
print ("Accuracy = " ,accuracy_LR)

Accuracy =  0.8043478260869565


The Accuracy of our Logistic Regression Model is around 80%
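
Accuracy is the fraction of test rows whose prediction equals the true label. The sketch below recomputes it by hand for the ten (Outcome, prediction) pairs printed earlier; the helper name accuracy is made up for the example. The last line is a consistency check that assumes 185 correct rows, a count inferred from 0.8043 x 230 rather than printed by the notebook.

```python
def accuracy(labels, predictions):
    """Fraction of rows whose prediction equals the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if float(y) == float(p))
    return correct / len(labels)


# The first ten (Outcome, prediction) pairs shown by prediction_test above.
labels = [0, 1, 0, 1, 1, 1, 0, 0, 0, 0]
predictions = [0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0]
print(accuracy(labels, predictions))  # 9 of 10 rows match -> 0.9

# Assumed count: 185 correct out of the 230 test rows matches the reported value.
print(185 / 230)
```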

RANDOM FOREST CLASSIFIER

In [70]:
random_forest_classifier = RandomForestClassifier(labelCol="Outcome", featuresCol="Scaled_features", numTrees=40)

In [71]:
model = random_forest_classifier.fit(train)

In [72]:
prediction_test = model.transform(test)

In [73]:
prediction_test.show()

+--------------------+-------+--------------------+--------------------+----------+
|     Scaled_features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|(8,[0,1,6,7],[0.5...|      0|[38.4793988996654...|[0.96198497249163...|       0.0|
|(8,[0,1,6,7],[2.9...|      1|[31.1025992160118...|[0.77756498040029...|       0.0|
|(8,[1,5,6,7],[2.2...|      0|[38.3980173178952...|[0.95995043294738...|       0.0|
|(8,[1,5,6,7],[4.0...|      1|[19.1759591722805...|[0.47939897930701...|       1.0|
|(8,[1,5,6,7],[4.5...|      1|[12.5417953109145...|[0.31354488277286...|       1.0|
|(8,[1,5,6,7],[5.2...|      1|[11.2578682531015...|[0.28144670632753...|       1.0|
|[0.0,2.4395875096...|      0|[35.7813219342886...|[0.89453304835721...|       0.0|
|[0.0,3.0338460056...|      0|[31.6414208052347...|[0.79103552013086...|       0.0|
|[0.0,3.1902298203...|      0|[30.0463563138277...|[0.75115890784569...|    

In [74]:
prediction_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|      0|       0.0|
|      1|       0.0|
|      0|       0.0|
|      1|       1.0|
|      1|       1.0|
|      1|       1.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
+-------+----------+
only showing top 10 rows



Converting the predictions to an RDD (the evaluator below works on the DataFrame and does not use it)

In [75]:
predictionAndLabels = prediction_test.select("Outcome","prediction").rdd

In [77]:
# Compare the prediction column with the true label (Outcome) and compute test accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="Outcome", predictionCol="prediction", metricName="accuracy")
accuracy_RF = evaluator.evaluate(prediction_test)

In [78]:
print(accuracy_RF)

0.782608695652174


The Accuracy of Random Forest Classifier is around 78%

COMPARISON

In [79]:
print("Accuracy of LR : ",accuracy_LR)
print("Accuracy of RF : ",accuracy_RF)


Accuracy of LR :  0.8043478260869565
Accuracy of RF :  0.782608695652174
