In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import RandomForestClassifier

from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score

from pyspark.python.pyspark.shell import spark

import scipy
import os
import numpy as np

from pyspark.ml.linalg import SparseVector
from pyspark.sql import SparkSession

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.3
      /_/

Using Python version 3.8.8 (default, Apr 13 2021 15:08:03)
Spark context Web UI available at http://LAPTOP-1IU0R79J:4040
Spark context available as 'sc' (master = local[*], app id = local-1651860213766).
SparkSession available as 'spark'.


# Decision Tree

In [3]:
# Đọc dữ liệu và tạo ra các cột mới
data_df = spark.read.load("Absenteeism_at_work.csv", format="csv", header=True, delimiter=";",inferSchema =True)
data_df = data_df.withColumnRenamed('Absenteeism time in hours','label')
data_df.limit(5).toPandas()

Unnamed: 0,ID,Reason for absence,Month of absence,Day of the week,Seasons,Transportation expense,Distance from Residence to Work,Service time,Age,Work load Average/day,...,Disciplinary failure,Education,Son,Social drinker,Social smoker,Pet,Weight,Height,Body mass index,label
0,11,26,7,3,1,289,36,13,33,239.554,...,0,1,2,1,0,1,90,172,30,4
1,36,0,7,3,1,118,13,18,50,239.554,...,1,1,1,1,0,0,98,178,31,0
2,3,23,7,4,1,179,51,18,38,239.554,...,0,1,0,1,0,0,89,170,31,2
3,7,7,7,5,1,279,5,14,39,239.554,...,0,1,2,1,1,0,68,168,24,4
4,11,23,7,5,1,289,36,13,33,239.554,...,0,1,2,1,0,1,90,172,30,2


In [4]:
# Tạo cột vector đặc trưng từ cột label và distance
assem = VectorAssembler(inputCols=data_df.columns[:-1], outputCol='features')
data = assem.transform(data_df)

# Đánh chỉ mục cho cột label, sau đó fit trên toàn bộ dữ liệu
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Chia tập dữ liệu train test với tỉ lệ 70-30
(trainingData, testData) = data.randomSplit([0.7, 0.3],seed=1234)

# Thiết lập mô hình cây quyết định 
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features")

# Thiết lập Pipeline với các bước labelIndexer, featureIndexer và dt
pipeline = Pipeline(stages=[labelIndexer, dt])

# Fit model với tập train
model = pipeline.fit(trainingData)

# Thực hiện dự đoán
predictions = model.transform(testData)

# In ra các dòng kết quả dự đoán
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

y_pred = predictions.select("prediction").rdd.flatMap(lambda x: x).collect()
y_true = predictions.select("indexedLabel").rdd.flatMap(lambda x: x).collect()

confusionmatrix = confusion_matrix(y_true, y_pred)

precision = precision_score(y_true, y_pred, average='micro')

recall = recall_score(y_true, y_pred, average='micro')
treeModel = model.stages[1]
# summary only
print(treeModel)
print("Decision Tree - Test Accuracy = %g" % (accuracy))
print("Decision Tree - Test Error = %g" % (1.0 - accuracy))

print("The Confusion Matrix for Decision Tree Model is :\n" + str(confusionmatrix))

print("The precision score for Decision Tree Model is: " + str(precision))

print("The recall score for Decision Tree Model is: " + str(recall))

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       5.0|         5.0|[1.0,0.0,3.0,5.0,...|
|       0.0|         0.0|[1.0,1.0,5.0,2.0,...|
|       0.0|         3.0|[1.0,13.0,1.0,3.0...|
|       0.0|         4.0|[1.0,26.0,10.0,2....|
|       5.0|         5.0|[2.0,0.0,6.0,2.0,...|
+----------+------------+--------------------+
only showing top 5 rows

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_d2919ce8e2b1, depth=5, numNodes=41, numClasses=19, numFeatures=20
Decision Tree - Test Accuracy = 0.418182
Decision Tree - Test Error = 0.581818
The Confusion Matrix for Decision Tree Model is :
[[42  8  8  8  2  0  0  0  0  0  0  0  0  0  0]
 [ 4 20 11  6  0  0  0  0  0  0  0  0  0  0  0]
 [ 4 11 10  5  6  0  0  0  0  0  0  0  0  0  0]
 [ 4  8  2  5  0  0  0  0  0  0  0  0  0  0  0]
 [ 1  4  5  4  1  0  0  0  0  0  0  0  0  0  0]
 [ 0  0  0  0  0 14  0  0  0  0  0  0  0  0  0]
 [ 5  1  

# Naive Bayes

In [5]:

spark = SparkSession.builder.getOrCreate()
data = spark.read.load("Absenteeism_at_work.csv", format="csv", header=True, delimiter=";")
data = data.withColumn("MOA", data["Month of absence"] - 0).withColumn("label", data['Seasons'] - 0). \
    withColumn("ROA", data["Reason for absence"] - 0). \
    withColumn("distance", data["Distance from Residence to Work"] - 0). \
    withColumn("BMI", data["Body mass index"] - 0)

assem = VectorAssembler(inputCols=["label", "MOA"], outputCol='features')

data = assem.transform(data)
# Split the data into train and test
splits = data.randomSplit([0.7, 0.3], 1000)
train = splits[0]
test = splits[1]

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train)

# select example rows to display.
predictions = model.transform(test)

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")

y_true = data.select("BMI").rdd.flatMap(lambda x: x).collect()
y_pred = data.select("ROA").rdd.flatMap(lambda x: x).collect()


accuracy = evaluator.evaluate(predictions)

confusionmatrix = confusion_matrix(y_true, y_pred)

precision = precision_score(y_true, y_pred, average='micro')

recall = recall_score(y_true, y_pred, average='micro')


print("Naive Bayes - Test set accuracy = " + str(accuracy))

print("The Confusion Matrix for Naive Bayes Model is :\n" + str(confusionmatrix))

print("The precision score for Naive Bayes Model is: " + str(precision))

print("The recall score for Naive Bayes Model is: " + str(recall))

Naive Bayes - Test set accuracy = 0.061224489795918366
The Confusion Matrix for Naive Bayes Model is :
[[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [2 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [5 0 0 ... 0 0 0]]
The precision score for Naive Bayes Model is: 0.02972972972972973
The recall score for Naive Bayes Model is: 0.02972972972972973


# RandomForrest

In [6]:
data = spark.read.load("Absenteeism_at_work.csv", format="csv", header=True, delimiter=";")
data = data.withColumn("MOA", data["Month of absence"] - 0).withColumn("label", data['Transportation expense'] - 0). \
    withColumn("ROA", data["Reason for absence"] - 0). \
    withColumn("distance", data["Distance from Residence to Work"] - 0). \
    withColumn("BMI", data["Body mass index"] - 0)

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.

assem = VectorAssembler(inputCols=["label", "distance"], outputCol='features')

data = assem.transform(data)

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

y_true = data.select("BMI").rdd.flatMap(lambda x: x).collect()
y_pred = data.select("ROA").rdd.flatMap(lambda x: x).collect()

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

confusionmatrix = confusion_matrix(y_true, y_pred)

precision = precision_score(y_true, y_pred, average='micro')

recall = recall_score(y_true, y_pred, average='micro')

rfModel = model.stages[2]
print(rfModel)  # summary only
print("Random Forest - Test Accuracy = %g" % (accuracy))
print("Random Forest - Test Error = %g" % (1.0 - accuracy))

print("The Confusion Matrix for Random Forest Model is :\n" + str(confusionmatrix))

print("The precision score for Random Forest Model is: " + str(precision))

print("The recall score for Random Forest Model is: " + str(recall))

+--------------+-----+------------+
|predictedLabel|label|    features|
+--------------+-----+------------+
|         235.0|235.0|[235.0,11.0]|
|         235.0|235.0|[235.0,11.0]|
|         235.0|235.0|[235.0,11.0]|
|         235.0|235.0|[235.0,11.0]|
|         235.0|235.0|[235.0,11.0]|
+--------------+-----+------------+
only showing top 5 rows

RandomForestClassificationModel: uid=RandomForestClassifier_dfe76c412858, numTrees=10, numClasses=24, numFeatures=2
Random Forest - Test Accuracy = 0.909524
Random Forest - Test Error = 0.0904762
The Confusion Matrix for Random Forest Model is :
[[0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 ...
 [2 0 0 ... 0 0 0]
 [0 0 0 ... 0 0 0]
 [5 0 0 ... 0 0 0]]
The precision score for Random Forest Model is: 0.02972972972972973
The recall score for Random Forest Model is: 0.02972972972972973
