# Lab 03 - Spark

## Information

| Student name       | ID       |
|--------------------|----------|
| Nguyễn Thiên Phúc  | 20127681 |
| Lê Nguyễn Nhật Phú | 20127275 |
| Võ Hiền Hải Thuận  | 20127344 |


## Note

**`Restart Kernel` and `Run all cells` before submitting on Moodle or your work will be counted as 0!**


## Assignment

### Import libraries

In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
import scipy
import os
os.environ["SPARK_HOME"] = "C:\\Spark\\"
os.environ["HADOOP_HOME"] = "C:\\Hadoop\\"
from pyspark.python.pyspark.shell import spark

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/

Using Python version 3.10.4 (tags/v3.10.4:9d38120, Mar 23 2022 23:13:41)
Spark context Web UI available at http://Phuc-Nguyen:4040
Spark context available as 'sc' (master = local[*], app id = local-1682064970227).
SparkSession available as 'spark'.


### Requirement 1

**Problem 1**: Decision Tree

In [None]:
data = spark.read.csv('Absenteeism_at_work.csv', header=True, inferSchema=True)

feature_columns = ['Month of absence', 'Reason for absence', 'Distance from Residence to Work', 'Body mass index']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

(training_data, testing_data) = data.randomSplit([0.7, 0.3], seed=100)

dt = DecisionTreeClassifier(labelCol='target', featuresCol='features', maxDepth=3)

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[assembler, dt])

model = pipeline.fit(training_data)

# Make predictions.
predictions = model.transform(testing_data)

evaluator = MulticlassClassificationEvaluator(labelCol='target', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

accuracy = evaluator.evaluate(predictions)

y_true = predictions.select(['target']).collect()
y_pred = predictions.select(['prediction']).collect()

confusionmatrix = confusion_matrix(y_true, y_pred)

precision = precision_score(y_true, y_pred, average='micro')

recall = recall_score(y_true, y_pred, average='micro')

treeModel = model.stages[2]
# summary only
print(treeModel)
print("Decision Tree - Test Accuracy = %g" % (accuracy))
print("Decision Tree - Test Error = %g" % (1.0 - accuracy))

print("The Confusion Matrix for Decision Tree Model is :\n" + str(confusionmatrix))

print("The precision score for Decision Tree Model is: " + str(precision))

print("The recall score for Decision Tree Model is: " + str(recall))

**Problem 2**: Naive Bayes

In [None]:
# your code goes here
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# import numpy
# Load training data
from pyspark.ml.linalg import SparseVector
# from pyspark.python.pyspark.shell import spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = spark.read.load("Absenteeism_at_work.csv", format="csv", header=True, delimiter=",")
data = data.withColumn("MOA", data["Month of absence"] - 0).withColumn("label", data['Seasons'] - 0). \
    withColumn("ROA", data["Reason for absence"] - 0). \
    withColumn("distance", data["Distance from Residence to Work"] - 0). \
    withColumn("BMI", data["Body mass index"] - 0)

assem = VectorAssembler(inputCols=["label", "MOA"], outputCol='features')

data = assem.transform(data)
# Split the data into train and test
splits = data.randomSplit([0.7, 0.3], 1000)
train = splits[0]
test = splits[1]

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train)

# select example rows to display.
predictions = model.transform(test)

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")

y_true = data.select("BMI").rdd.flatMap(lambda x: x).collect()
y_pred = data.select("ROA").rdd.flatMap(lambda x: x).collect()


accuracy = evaluator.evaluate(predictions)

confusionmatrix = confusion_matrix(y_true, y_pred)

precision = precision_score(y_true, y_pred, average='micro')

recall = recall_score(y_true, y_pred, average='micro')


print("Naive Bayes - Test set accuracy = " + str(accuracy))

print("The Confusion Matrix for Naive Bayes Model is :\n" + str(confusionmatrix))

print("The precision score for Naive Bayes Model is: " + str(precision))

print("The recall score for Naive Bayes Model is: " + str(recall))

**Problem 3**: Random Forest

In [None]:
# your code goes here
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
data = spark.read.load("Absenteeism_at_work.csv", format="csv", header=True, delimiter=",")

data = data.withColumn("MOA", data["Month of absence"] - 0).withColumn("label", data['Transportation expense'] - 0). \
    withColumn("ROA", data["Reason for absence"] - 0). \
    withColumn("distance", data["Distance from Residence to Work"] - 0). \
    withColumn("BMI", data["Body mass index"] - 0)

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.

assem = VectorAssembler(inputCols=["label", "distance"], outputCol='features')

data = assem.transform(data)

labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

y_true = data.select("BMI").rdd.flatMap(lambda x: x).collect()
y_pred = data.select("ROA").rdd.flatMap(lambda x: x).collect()

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

confusionmatrix = confusion_matrix(y_true, y_pred)

precision = precision_score(y_true, y_pred, average='micro')

recall = recall_score(y_true, y_pred, average='micro')

rfModel = model.stages[2]
print(rfModel)  # summary only
print("Random Forest - Test Accuracy = %g" % (accuracy))
print("Random Forest - Test Error = %g" % (1.0 - accuracy))

print("The Confusion Matrix for Random Forest Model is :\n" + str(confusionmatrix))

print("The precision score for Random Forest Model is: " + str(precision))

print("The recall score for Random Forest Model is: " + str(recall))


### Requirement 2

**Dataset**:

- Dataset 1: ...

- Dataset 2: ...

**Implementation**

In [None]:
# your code goes here