In [1]:
!pip show py4j

Name: py4j
Version: 0.10.9.7
Summary: Enables Python programs to dynamically access arbitrary Java objects
Home-page: https://www.py4j.org/
Author: Barthelemy Dagenais
Author-email: barthelemy@infobart.com
License: BSD License
Location: /usr/local/lib/python3.9/dist-packages
Requires: 
Required-by: hyperopt


In [2]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# install spark
!wget -q https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
# unzip spark
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [7]:
# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"

In [8]:
import findspark
findspark.init()

In [9]:
from pyspark.sql import SparkSession

In [10]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [11]:
'''
load models
'''
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [12]:
'''
load data
load the dataset to google Drive. Then copy the link of the data file
'''
data = spark.read.format("libsvm").load("/content/dataset.txt")

In [14]:
data.select("features").show(1, False)

+-------------------------------------------------+
|features                                         |
+-------------------------------------------------+
|(4,[0,1,2,3],[-0.222222,0.5,-0.762712,-0.833333])|
+-------------------------------------------------+
only showing top 1 row



In [15]:
data.dtypes

[('label', 'double'), ('features', 'vector')]

In [16]:
'''
label indexer 
map a string column of labels to an ML column of label indices
'''
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

In [17]:
'''
class for indexing categorical feature columns in a dataset of Vector
'''
featureIndexer =VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

In [21]:
'''
split dataset to training and testing 
'''
(trainingData, testData) = data.randomSplit([0.7, 0.3])

**TASK 1**

In [18]:
# importing decesion tree classifier
from pyspark.ml.classification import DecisionTreeClassifier

In [19]:
dt = DecisionTreeClassifier( maxDepth=2,featuresCol="indexedFeatures",labelCol="indexedLabel")

In [20]:
dt_pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

In [22]:
dt_model = dt_pipeline.fit(trainingData)

In [23]:
dt_predictions = dt_model.transform(testData)

In [24]:
print(dt_model.stages[2])

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_268ec2b0d850, depth=2, numNodes=5, numClasses=3, numFeatures=4


In [25]:
dt_predictions.show(5)

+-----+--------------------+------------+--------------------+--------------+--------------------+----------+
|label|            features|indexedLabel|     indexedFeatures| rawPrediction|         probability|prediction|
+-----+--------------------+------------+--------------------+--------------+--------------------+----------+
|  0.0|(4,[0,1,2,3],[-1....|         0.0|(4,[0,1,2,3],[-1....|[37.0,0.0,1.0]|[0.97368421052631...|       0.0|
|  0.0|(4,[0,1,2,3],[0.1...|         0.0|(4,[0,1,2,3],[0.1...|[37.0,0.0,1.0]|[0.97368421052631...|       0.0|
|  0.0|(4,[0,1,2,3],[0.1...|         0.0|(4,[0,1,2,3],[0.1...|[37.0,0.0,1.0]|[0.97368421052631...|       0.0|
|  0.0|(4,[0,1,2,3],[0.1...|         0.0|(4,[0,1,2,3],[0.1...|[37.0,0.0,1.0]|[0.97368421052631...|       0.0|
|  0.0|(4,[0,1,2,3],[0.3...|         0.0|(4,[0,1,2,3],[0.3...|[37.0,0.0,1.0]|[0.97368421052631...|       0.0|
+-----+--------------------+------------+--------------------+--------------+--------------------+----------+
only showi

In [26]:
acc_evaluator_dt = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy",)
acc_dt = acc_evaluator_dt.evaluate(dt_predictions)
print("accurancy:"+str(acc_dt))

accurancy:0.8863636363636364


In [27]:
pr_evaluator_dt = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="precisionByLabel")
precision_dt = pr_evaluator_dt.evaluate(dt_predictions)
print("precision:"+str(precision_dt))

precision:0.7058823529411765


Task 1 A

In [28]:
f1_evaluator_dt = MulticlassClassificationEvaluator( labelCol='indexedLabel', predictionCol="prediction", metricName='f1')
f1 = f1_evaluator_dt.evaluate(dt_predictions)
print("F1 Score:"+str(f1))

F1 Score:0.8863636363636365


In [29]:
rec_evaluator_dt = MulticlassClassificationEvaluator(labelCol='indexedLabel', predictionCol ="prediction", metricName="recallByLabel")
rec =rec_evaluator_dt.evaluate(dt_predictions)
print("recall: "+str(rec))

recall: 1.0


Task 1 B

In [30]:
#  changed depth to 5
dt_d5 = DecisionTreeClassifier( maxDepth=5,featuresCol="indexedFeatures",labelCol="indexedLabel")

In [31]:
dt_pipeline_d5 = Pipeline(stages=[labelIndexer, featureIndexer, dt_d5])

In [32]:
dt_model_d5 = dt_pipeline_d5.fit(trainingData)

In [33]:
dt_predictions_d5 = dt_model_d5.transform(testData)

In [36]:
print(dt_model_d5.stages[2])

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_a6f86ddfe766, depth=4, numNodes=11, numClasses=3, numFeatures=4


In [37]:
dt_predictions_d5.show(5)

+-----+--------------------+------------+--------------------+--------------+-------------+----------+
|label|            features|indexedLabel|     indexedFeatures| rawPrediction|  probability|prediction|
+-----+--------------------+------------+--------------------+--------------+-------------+----------+
|  0.0|(4,[0,1,2,3],[-1....|         0.0|(4,[0,1,2,3],[-1....|[36.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.1...|         0.0|(4,[0,1,2,3],[0.1...|[36.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.1...|         0.0|(4,[0,1,2,3],[0.1...|[36.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.1...|         0.0|(4,[0,1,2,3],[0.1...|[36.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.3...|         0.0|(4,[0,1,2,3],[0.3...|[36.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
+-----+--------------------+------------+--------------------+--------------+-------------+----------+
only showing top 5 rows



1 E using 1 B

In [66]:
from sklearn.metrics import confusion_matrix
from pyspark.sql.functions import col
actual_labels = testData.select(col("label")).rdd.flatMap(lambda x: x).collect()
print("actual_labels: "+str(actual_labels))
predicted_labels = dt_predictions_d5.select(col("prediction")).rdd.flatMap(lambda x: x).collect()
print("predicted_labels: "+str(predicted_labels))

cm = confusion_matrix(actual_labels, predicted_labels)
print()
print("confusion matrix")
print(cm)

from sklearn.metrics import classification_report
print()
print("confusion report")
print(classification_report(actual_labels, predicted_labels))

actual_labels: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
predicted_labels: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 0.0, 2.0, 2.0, 0.0, 2.0, 2.0, 0.0, 0.0, 0.0, 2.0]

confusion matrix
[[12  0  0]
 [ 0 15  0]
 [ 6  0 11]]

confusion report
              precision    recall  f1-score   support

         0.0       0.67      1.00      0.80        12
         1.0       1.00      1.00      1.00        15
         2.0       1.00      0.65      0.79        17

    accuracy                           0.86        44
   macro avg       0.89      0.88      0.86        44
weighted avg       0.91      0.86      0.86        44



In [39]:
acc_evaluator_dt_d5 = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy",)
acc_dt_d5 = acc_evaluator_dt_d5.evaluate(dt_predictions_d5)
print("accurancy:"+str(acc_dt_d5))

accurancy:0.8636363636363636


In [40]:
pr_evaluator_dt_d5 = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="precisionByLabel")
precision_dt_d5 = pr_evaluator_dt_d5.evaluate(dt_predictions_d5)
print("precision:"+str(precision_dt_d5))

precision:0.6666666666666666


In [41]:
f1_evaluator_dt_d5 = MulticlassClassificationEvaluator( labelCol='indexedLabel', predictionCol="prediction", metricName='f1')
f1_d5 = f1_evaluator_dt_d5.evaluate(dt_predictions_d5)
print("F1 Score:"+str(f1_d5))

F1 Score:0.8626623376623377


In [42]:
rec_evaluator_dt_d5 = MulticlassClassificationEvaluator(labelCol='indexedLabel', predictionCol ="prediction", metricName="recallByLabel")
rec_d5 =rec_evaluator_dt_d5.evaluate(dt_predictions_d5)
print("recall: "+str(rec_d5))

recall: 1.0


1 D

In [51]:
for (trainingData2, testData2) in [data.randomSplit([0.6, 0.4]), data.randomSplit([0.7, 0.3]), data.randomSplit([0.8, 0.2])]:
  print('---------------------------------------------------')
  for depth in [2,5,10]: 
    print('for depth: '+str(depth))
    dt_t2 = DecisionTreeClassifier( maxDepth=depth,featuresCol="indexedFeatures",labelCol="indexedLabel")
    dt_pipeline_t2 = Pipeline(stages=[labelIndexer, featureIndexer, dt_t2])
    dt_model_t2 = dt_pipeline_t2.fit(trainingData2)
    dt_predictions_t2 = dt_model_t2.transform(testData2)
    print(dt_model_t2.stages[2])
  
    acc_evaluator_dt_t2 = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy",)
    acc_dt_t2 = acc_evaluator_dt.evaluate(dt_predictions_t2)
    print("accurancy:"+str(acc_dt_t2))

    pr_evaluator_dt_t2 = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="precisionByLabel")
    precision_dt_t2 = pr_evaluator_dt_t2.evaluate(dt_predictions_t2)
    print("precision:"+str(precision_dt_t2))

    f1_evaluator_dt_t2 = MulticlassClassificationEvaluator( labelCol='indexedLabel', predictionCol="prediction", metricName='f1')
    f1_t2 = f1_evaluator_dt_t2.evaluate(dt_predictions_t2)
    print("F1 Score:"+str(f1_t2))

    rec_evaluator_dt_t2 = MulticlassClassificationEvaluator(labelCol='indexedLabel', predictionCol ="prediction", metricName="recallByLabel")
    rec_t2 =rec_evaluator_dt_d5.evaluate(dt_predictions_t2)
    print("recall: "+str(rec_t2))
    print()


---------------------------------------------------
for depth: 2
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_d442d92249ba, depth=2, numNodes=5, numClasses=3, numFeatures=4
accurancy:0.9857142857142858
precision:1.0
F1 Score:0.9856858422811634
recall: 0.9523809523809523

for depth: 5
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_60750e6ad2fe, depth=5, numNodes=17, numClasses=3, numFeatures=4
accurancy:0.9857142857142858
precision:0.9545454545454546
F1 Score:0.9857284229872058
recall: 1.0

for depth: 10
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_702f056f4ef6, depth=5, numNodes=17, numClasses=3, numFeatures=4
accurancy:0.9857142857142858
precision:0.9545454545454546
F1 Score:0.9857284229872058
recall: 1.0

---------------------------------------------------
for depth: 2
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_45aa06d89044, depth=2, numNodes=5, numClasses=3, numFeatures=4
accurancy:0.9607843137254902
precision:0.90476190476

Task 2

In [67]:
from pyspark.ml.classification import RandomForestClassifier

In [68]:
rf = RandomForestClassifier(numTrees=3,featuresCol="indexedFeatures",labelCol="indexedLabel")
rf_pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])
rf_model = rf_pipeline.fit(trainingData)
rf_predictions = rf_model.transform(testData)
print(rf_model.stages[2])
rf_predictions.show(5)

RandomForestClassificationModel: uid=RandomForestClassifier_2151600f7320, numTrees=3, numClasses=3, numFeatures=4
+-----+--------------------+------------+--------------------+-------------+-------------+----------+
|label|            features|indexedLabel|     indexedFeatures|rawPrediction|  probability|prediction|
+-----+--------------------+------------+--------------------+-------------+-------------+----------+
|  0.0|(4,[0,1,2,3],[-1....|         0.0|(4,[0,1,2,3],[-1....|[3.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.1...|         0.0|(4,[0,1,2,3],[0.1...|[3.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.1...|         0.0|(4,[0,1,2,3],[0.1...|[3.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.1...|         0.0|(4,[0,1,2,3],[0.1...|[3.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
|  0.0|(4,[0,1,2,3],[0.3...|         0.0|(4,[0,1,2,3],[0.3...|[3.0,0.0,0.0]|[1.0,0.0,0.0]|       0.0|
+-----+--------------------+------------+--------------------+--------

In [None]:
acc_evaluator_rf = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy",)
acc_rf = acc_evaluator_rf.evaluate(rf_predictions)
print("accurancy:"+str(acc_rf))

In [None]:
f_evaluator_rf = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="f1")
f1_score_rf = f_evaluator_rf.evaluate(rf_predictions)
print("f1 score:"+str(f1_score_rf))

In [None]:
pr_evaluator_dt_t2 = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="precisionByLabel")
precision_dt_t2 = pr_evaluator_dt_t2.evaluate(dt_predictions_t2)
print("precision:"+str(precision_dt_t2))