# Big Data Examination
# Roll No. - DS5B-2121
## Question 1

Considering left as dependent variable in HR dataset, split the dataset according to your last digit of roll no. (Example: if your roll no is ending with 0, the ratio will be 70, 30; if your roll no is ending with 1, the ratio will be 71, 29; if your roll no is ending with 2, the ratio will be 72, 28; if your roll no is ending with 3, the ratio will be 73, 27 etc.). Determine the accuracy of the model.

### Importing PySpark Library
PySpark is the Python interface for Apache Spark. It lets us write Spark applications using Python APIs.

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
Collecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=8b46831582fe33020b51646e24a86fc72a74a84b94240776aece0e2d97207751
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


## Importing Library, Creating Session and Reading Data

In [None]:
from pyspark.sql import SparkSession
session = SparkSession.builder.appName("HR_comma_Dataset").getOrCreate()
data = session.read.csv("HR comma.csv", header = True, inferSchema = True)
# getOrCreate() returns the active SparkSession or starts a new one.
# header=True treats the first row as column names; inferSchema=True lets Spark infer each column's type.

In [None]:
data.show(10)

+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
|satisfaction_level|last_evaluation|number_project|average_montly_hours|time_spend_company|Work_accident|left|promotion_last_5years|sales|salary|
+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
|              0.38|           0.53|             2|                 157|                 3|            0|   1|                    0|sales|   low|
|               0.8|           0.86|             5|                 262|                 6|            0|   1|                    0|sales|medium|
|              0.11|           0.88|             7|                 272|                 4|            0|   1|                    0|sales|medium|
|              0.72|           0.87|             5|                 223|                 5|            0|   1|              

## Check Null Values in columns

In [None]:
from pyspark.sql.functions import isnan, when, count, col
data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
|satisfaction_level|last_evaluation|number_project|average_montly_hours|time_spend_company|Work_accident|left|promotion_last_5years|sales|salary|
+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
|                 0|              0|             0|                   0|                 0|            0|   0|                    0|    0|     0|
+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----+------+
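
The check above counts, per column, the cells that are either NaN or null. As a rough plain-Python illustration of the same counting logic (the `count_missing` helper and the sample rows are hypothetical, not part of the notebook or of PySpark):

```python
import math

def count_missing(rows, columns):
    """Count cells per column that are None or NaN (hypothetical helper)."""
    counts = {name: 0 for name in columns}
    for row in rows:
        for name in columns:
            value = row.get(name)
            is_nan = isinstance(value, float) and math.isnan(value)
            if value is None or is_nan:
                counts[name] += 1
    return counts

# Made-up sample rows, not taken from the HR dataset
sample_rows = [
    {"satisfaction_level": 0.38, "salary": "low"},
    {"satisfaction_level": float("nan"), "salary": None},
    {"satisfaction_level": 0.72, "salary": "medium"},
]
missing = count_missing(sample_rows, ["satisfaction_level", "salary"])
print(missing)
```

A column whose count is 0, as for every column in the output above, has no missing cells.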



## There are no null values in the dataset, so we move on to exploratory data analysis

### Showing the Data

In [None]:
data.columns

['satisfaction_level',
 'last_evaluation',
 'number_project',
 'average_montly_hours',
 'time_spend_company',
 'Work_accident',
 'left',
 'promotion_last_5years',
 'sales',
 'salary']

In [None]:
data.printSchema()

root
 |-- satisfaction_level: double (nullable = true)
 |-- last_evaluation: double (nullable = true)
 |-- number_project: integer (nullable = true)
 |-- average_montly_hours: integer (nullable = true)
 |-- time_spend_company: integer (nullable = true)
 |-- Work_accident: integer (nullable = true)
 |-- left: integer (nullable = true)
 |-- promotion_last_5years: integer (nullable = true)
 |-- sales: string (nullable = true)
 |-- salary: string (nullable = true)



## Importing Vector Assembler, String Indexer and One Hot Encoder

In [None]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
# StringIndexer maps each string column to a column of numeric indices that Spark treats as categorical.
str_idx = StringIndexer(inputCols = ['sales','salary'], outputCols = ["newsales", "newsalary"])

## Applying OneHotEncoder to convert the category indices into 0/1 vectors

In [None]:
# OneHotEncoder converts each categorical index into a binary (0/1) vector.
one_hot = OneHotEncoder(inputCols = ["newsales","newsalary"], outputCols = ["newsales_onehot","newsalary_onehot"])
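
To make the two encoding steps concrete, here is a minimal plain-Python sketch of what they do to a single column. It assumes the default settings: `StringIndexer` assigns index 0 to the most frequent label, and `OneHotEncoder` drops the last category so that it becomes an all-zero vector. The helpers `string_index` and `one_hot_vector` and the sample values are hypothetical and only mimic that behaviour; they are not Spark APIs.

```python
from collections import Counter

def string_index(values):
    """Map labels to indices by descending frequency (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def one_hot_vector(index, num_categories, drop_last=True):
    """Return a 0/1 list; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector

# Made-up salary values for illustration
salaries = ["low", "medium", "medium", "low", "high", "low"]
mapping = string_index(salaries)
encoded = [one_hot_vector(mapping[s], len(mapping)) for s in salaries]
print(mapping)
print(encoded)
```

Dropping the last category keeps the encoded columns from being perfectly collinear, which matters for linear models such as logistic regression.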

In [None]:
# VectorAssembler is a feature transformer that combines multiple columns into a single vector column.
# PySpark ML models expect one features column and one label column,
# but we have several independent variables, so we use VectorAssembler to pack them
# into a single feature vector.
vec_ass = VectorAssembler(inputCols = ['satisfaction_level','last_evaluation','number_project','average_montly_hours','time_spend_company','Work_accident','promotion_last_5years','newsales_onehot','newsalary_onehot'], outputCol = "all_features")
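
As a rough picture of what the assembler produces for one row, the sketch below flattens scalar columns and vector columns into a single list in the order the input columns are given. The `assemble` helper and the sample row are hypothetical; real Spark output is a `Vector`, often stored in sparse form.

```python
def assemble(row, input_cols):
    """Concatenate scalar and list-valued columns into one flat feature list."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)
        else:
            features.append(float(value))
    return features

# Illustrative row; the one-hot values are made up
row = {
    "satisfaction_level": 0.38,
    "number_project": 2,
    "newsalary_onehot": [1.0, 0.0],
}
all_features = assemble(row, ["satisfaction_level", "number_project", "newsalary_onehot"])
print(all_features)
```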

In [None]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol= "all_features", labelCol = "left")

## Creating Pipeline
It's like deciding the order in which the steps are executed.

In [None]:
from pyspark.ml import Pipeline
mypipeline = Pipeline(stages = [str_idx, one_hot, vec_ass, lr])
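
The idea behind a pipeline can be sketched in plain Python: stages that need to learn something are fitted on the training data in order, and the fitted stages are later replayed on new data. The classes and functions below (`AddConstant`, `MeanCenterer`, `fit_pipeline`, `transform_pipeline`) are hypothetical stand-ins, not Spark classes.

```python
class AddConstant:
    """Transformer: nothing to learn, only `transform`."""
    def __init__(self, amount):
        self.amount = amount

    def transform(self, rows):
        return [value + self.amount for value in rows]

class MeanCenterer:
    """Estimator: `fit` learns the mean and returns a fitted transformer."""
    def fit(self, rows):
        mean = sum(rows) / len(rows)
        return AddConstant(-mean)

def fit_pipeline(stages, rows):
    """Fit estimators in order, passing each stage's output to the next."""
    fitted = []
    for stage in stages:
        model = stage.fit(rows) if hasattr(stage, "fit") else stage
        rows = model.transform(rows)
        fitted.append(model)
    return fitted

def transform_pipeline(fitted, rows):
    """Apply the already-fitted stages to new data."""
    for model in fitted:
        rows = model.transform(rows)
    return rows

train = [1.0, 2.0, 3.0]
test = [4.0]
fitted = fit_pipeline([AddConstant(1.0), MeanCenterer()], train)
centered_test = transform_pipeline(fitted, test)
print(centered_test)
```

Note that the test value is centered with the mean learned from the training rows, not with its own mean.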

## Splitting the Dataset
As my roll no. is DS5B-2121 (last digit 1), I will split the data 0.71 / 0.29.

In [None]:
training, test = data.randomSplit([0.71, 0.29])
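
The rule from the question (last digit d gives a 70 + d / 30 - d split) can be written as a small helper. This is only a sketch; `split_weights` is a hypothetical name, and it assumes the roll number ends in a digit.

```python
def split_weights(roll_no):
    """Map the last digit d of a roll number to weights (70 + d, 30 - d) / 100."""
    last_digit = int(roll_no[-1])
    train_pct = 70 + last_digit
    return [train_pct / 100, (100 - train_pct) / 100]

weights = split_weights("DS5B-2121")
print(weights)
```

The resulting list could be passed to `data.randomSplit(weights)`. `randomSplit` also accepts an optional `seed` argument, which would make the split repeatable between runs; the sizes of the two parts are only approximately in the requested ratio.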

## Building the Model

In [None]:
lr_model = mypipeline.fit(training)

## Applying the Model to the Test Data
The model has learned patterns from the training data; these are now applied to the test data.

In [None]:
result = lr_model.transform(test)

## Showing the Result

In [None]:
result.show(4, truncate = False)

+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----------+------+--------+---------+---------------+----------------+--------------------------------------------------------+----------------------------------------+----------------------------------------+----------+
|satisfaction_level|last_evaluation|number_project|average_montly_hours|time_spend_company|Work_accident|left|promotion_last_5years|sales      |salary|newsales|newsalary|newsales_onehot|newsalary_onehot|all_features                                            |rawPrediction                           |probability                             |prediction|
+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----------+------+--------+---------+---------------+----------------+--------------------------------------------------------+--------------------------------

## Evaluating the Logistic Regression Model using MulticlassClassificationEvaluator
The dependent variable `left` takes only two values (0 and 1), so BinaryClassificationEvaluator would also apply, but it reports only areaUnderROC and areaUnderPR. Since the question asks for accuracy, we use MulticlassClassificationEvaluator instead.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Metrics to report for the predictions on the test data
evaluation = ["f1","accuracy","weightedPrecision","weightedRecall", "weightedTruePositiveRate", "weightedFalsePositiveRate", "weightedFMeasure", "truePositiveRateByLabel", "falsePositiveRateByLabel", "precisionByLabel","recallByLabel", "fMeasureByLabel", "logLoss","hammingLoss"]
for metric in evaluation:
  # The *ByLabel metrics are reported for the evaluator's default metricLabel (0.0)
  evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="left", metricName=metric)
  print(metric, ":", evaluator.evaluate(result))

f1 : 0.7779568868738669
accuracy : 0.801658604008293
weightedPrecision : 0.7820065880658045
weightedRecall : 0.8016586040082929
weightedTruePositiveRate : 0.8016586040082929
weightedFalsePositiveRate : 0.5166086426447655
weightedFMeasure : 0.7779568868738669
truePositiveRateByLabel : 0.9422208847427024
falsePositiveRateByLabel : 0.6571709233791748
precisionByLabel : 0.8239473684210527
recallByLabel : 0.9422208847427024
fMeasureByLabel : 0.8791239646216482
logLoss : 0.4152158353029982
hammingLoss : 0.19834139599170697
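
Since the question asks for accuracy, it may help to see how that figure relates to the others. Accuracy is the fraction of test rows whose prediction equals the label, and for a single-label problem the Hamming loss is simply the fraction that were misclassified, i.e. 1 minus accuracy. The sketch below uses a made-up toy example and then checks that relation against the two values printed above; the helper names are hypothetical.

```python
def accuracy(labels, predictions):
    """Fraction of predictions that match the true labels."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def hamming_loss(labels, predictions):
    """Fraction of predictions that do not match the true labels."""
    return 1.0 - accuracy(labels, predictions)

# Toy labels and predictions, not taken from the HR dataset
labels = [1, 0, 0, 1, 0]
predictions = [1, 0, 1, 0, 0]
toy_accuracy = accuracy(labels, predictions)      # 3 of 5 correct
toy_hamming = hamming_loss(labels, predictions)

# Values copied from the evaluator output above
reported_accuracy = 0.801658604008293
reported_hamming_loss = 0.19834139599170697
gap = abs((1.0 - reported_accuracy) - reported_hamming_loss)
print(toy_accuracy, toy_hamming, gap)
```

The same output also shows weightedRecall equal to accuracy, which is expected because recall weighted by class frequency reduces to the overall fraction of correct predictions.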


# Building the Decision Tree Classifier Model

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
dtc = DecisionTreeClassifier(featuresCol= "all_features", labelCol = "left")

## Creating Pipeline

In [None]:
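# Build a separate pipeline that ends in the decision tree; reusing mypipeline would refit logistic regression.
# The outputs recorded below came from a run that reused mypipeline, which is why they match the logistic regression results.
dtc_pipeline = Pipeline(stages = [str_idx, one_hot, vec_ass, dtc])
dtc_model = dtc_pipeline.fit(training)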

In [None]:
result2 = dtc_model.transform(test)

## Showing the Transformed Test Data

In [None]:
result2.show(4, truncate = False)

+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----------+------+--------+---------+---------------+----------------+--------------------------------------------------------+----------------------------------------+----------------------------------------+----------+
|satisfaction_level|last_evaluation|number_project|average_montly_hours|time_spend_company|Work_accident|left|promotion_last_5years|sales      |salary|newsales|newsalary|newsales_onehot|newsalary_onehot|all_features                                            |rawPrediction                           |probability                             |prediction|
+------------------+---------------+--------------+--------------------+------------------+-------------+----+---------------------+-----------+------+--------+---------+---------------+----------------+--------------------------------------------------------+--------------------------------

## Evaluation of Decision Tree Classifier Model

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Metrics to report for the predictions on the test data
evaluation = ["f1","accuracy","weightedPrecision","weightedRecall", "weightedTruePositiveRate", "weightedFalsePositiveRate", "weightedFMeasure", "truePositiveRateByLabel", "falsePositiveRateByLabel", "precisionByLabel","recallByLabel", "fMeasureByLabel", "logLoss","hammingLoss"]
for metric in evaluation:
  # The *ByLabel metrics are reported for the evaluator's default metricLabel (0.0)
  evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="left", metricName=metric)
  print(metric, ":", evaluator.evaluate(result2))

f1 : 0.7779568868738669
accuracy : 0.801658604008293
weightedPrecision : 0.7820065880658045
weightedRecall : 0.8016586040082929
weightedTruePositiveRate : 0.8016586040082929
weightedFalsePositiveRate : 0.5166086426447655
weightedFMeasure : 0.7779568868738669
truePositiveRateByLabel : 0.9422208847427024
falsePositiveRateByLabel : 0.6571709233791748
precisionByLabel : 0.8239473684210527
recallByLabel : 0.9422208847427024
fMeasureByLabel : 0.8791239646216482
logLoss : 0.4152158353029982
hammingLoss : 0.19834139599170697
