### AIT 614 - Big Data Essentials <br>
#### Lab 5.2 - Binary Classification using Logistic Regression
<hr>
<b>Machine Learning using Spark MLlib on Databricks</b><br>

Course Section #: AIT614-Sect2<br>
Student's Full Name: Khanh Nguyen<br>

In [0]:
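# Load the CSV, treating the first row as the header and letting Spark infer column types
dataset = spark.read.format("csv").option("header", "true").option("inferSchema", "true") \
    .load("dbfs:/FileStore/shared_uploads/money4khanh@gmail.com/EmployeeAttrition.csv")
dataset.printSchema()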

In [0]:
# Randomly split the data into training (roughly 75%) and test (roughly 25%) sets; the fixed seed makes the split reproducible.
trainDF, testDF = dataset.randomSplit([0.75, 0.25], seed=2022)
print(trainDF.cache().count())  # Cache the training set because it is accessed multiple times
print(testDF.count())
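randomSplit() does not produce an exact 75/25 split. Spark normalizes the weights if they do not sum to 1. Conceptually, each row then falls into exactly one split according to a uniform random draw compared against the cumulative weight boundaries, so the split sizes vary slightly around the target proportions.

The plain-Python sketch below imitates that idea. It is not Spark's implementation, so the same seed will not reproduce Spark's split. The helper name `random_split` and the 1,000 example rows are hypothetical.

```python
import random

def random_split(rows, weights, seed):
    """Sketch of a weighted random split: each row lands in exactly one split."""
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights, e.g. [0.75, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one uniform draw in [0, 1) per row
        # First split whose upper bound exceeds the draw; the last split catches rounding
        idx = next((i for i, b in enumerate(bounds) if u < b), len(bounds) - 1)
        splits[idx].append(row)
    return splits

train, test = random_split(list(range(1000)), [0.75, 0.25], seed=2022)
print(len(train), len(test))  # close to 750 and 250, but usually not exact
```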


In [0]:
# Display summary statistics (count, mean, stddev, min, quartiles, max) for HourlyRate using summary()
display(trainDF.select("HourlyRate").summary())

summary,HourlyRate
count,1113.0
mean,65.73135669362084
stddev,20.276032996495964
min,30.0
25%,48.0
50%,66.0
75%,84.0
max,100.0


In [0]:
# Count the employees at each Education level in the training set using groupBy(), most frequent first
display(trainDF
        .groupBy("Education")
        .count()
        .sort("count", ascending=False))

Education,count
3,448
4,298
2,215
1,114
5,38


# Feature preprocessing
##### Convert categorical variables to numeric
Some machine learning algorithms, such as linear and logistic regression, require numeric features. The EmployeeAttrition dataset includes categorical features such as Department, EducationField, Gender, JobRole, and MaritalStatus.

The following code block illustrates how to use StringIndexer and OneHotEncoder to convert categorical variables into a set of numeric variables that only take on values 0 and 1.

StringIndexer converts a column of string values to a column of label indexes. For example, it might convert the values "red", "blue", and "green" to 0, 1, and 2.
OneHotEncoder maps a column of category indices to a column of binary vectors, with at most one "1" in each row that indicates the category index for that row.
One-hot encoding in Spark is a two-step process: StringIndexer first maps each string category to a numeric index, and OneHotEncoder then converts those indices into binary vectors. The following code block defines the StringIndexer and OneHotEncoder but does not apply them to any data yet.
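The two steps can be made concrete with a plain-Python sketch of what they compute. This is not the Spark API.

The sketch assumes Spark's default settings:
- StringIndexer uses `stringOrderType="frequencyDesc"`, so the most frequent label gets index 0.0.
- OneHotEncoder uses `dropLast=True`, so a column with k categories becomes a vector of length k - 1, and the last category is encoded as all zeros.

The sample MaritalStatus values are made up.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct string to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """Return a 0/1 list with at most one 1; with drop_last the last category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if int(index) < size:
        vec[int(index)] = 1.0
    return vec

marital = ["Married", "Single", "Married", "Divorced", "Single", "Married"]
mapping = fit_string_indexer(marital)
print(mapping)  # {'Married': 0.0, 'Single': 1.0, 'Divorced': 2.0}
for value in ["Married", "Single", "Divorced"]:
    print(value, one_hot(mapping[value], len(mapping)))
# Married [1.0, 0.0]
# Single [0.0, 1.0]
# Divorced [0.0, 0.0]
```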

In [0]:
#Convert categorical variables to numeric
from pyspark.ml.feature import StringIndexer, OneHotEncoder
 
categoricalCols = ["Department", "EducationField", "Gender", "JobRole", "MaritalStatus"]
 
# These two objects are estimators: calling .fit() on them returns models (transformers) that we later use to transform the dataset.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols]) 
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols]) 
 
# The label column ("Attrition") is also a string column with two possible values, "Yes" and "No".
# Convert it to a numeric label (0.0 or 1.0) using StringIndexer; by default the more frequent value gets index 0.0.
labelToIndex = StringIndexer(inputCol="Attrition", outputCol="label")

In this notebook, we'll build a pipeline that combines all of our feature engineering and modeling steps. Before doing that, it helps to understand how estimators and transformers work, taking the stringIndexer estimator created in the previous code block as an example.

Calling its .fit() method returns a StringIndexerModel, which is a transformer that you can then use to transform the dataset.

The .transform() method of StringIndexerModel returns a new DataFrame with the new index columns appended.
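The fit/transform contract described above can be illustrated without Spark. In the hypothetical classes below:
- `MiniStringIndexer` plays the estimator. Its `fit()` learns the label-to-index mapping from data and returns a model.
- `MiniStringIndexerModel` plays the transformer. Its `transform()` returns new rows with an index column appended and leaves the input unchanged.

The class names and the list-of-dicts stand-in for a DataFrame are assumptions made for illustration only.

```python
from collections import Counter

class MiniStringIndexerModel:
    """Transformer: applies a learned mapping without modifying the input rows."""
    def __init__(self, input_col, output_col, mapping):
        self.input_col, self.output_col, self.mapping = input_col, output_col, mapping

    def transform(self, rows):
        # Build new dicts with the index column appended, like a new DataFrame
        return [{**row, self.output_col: self.mapping[row[self.input_col]]} for row in rows]

class MiniStringIndexer:
    """Estimator: fit() learns from data and returns a transformer."""
    def __init__(self, input_col, output_col):
        self.input_col, self.output_col = input_col, output_col

    def fit(self, rows):
        counts = Counter(row[self.input_col] for row in rows)
        ordered = sorted(counts, key=lambda v: (-counts[v], v))  # most frequent first
        mapping = {v: float(i) for i, v in enumerate(ordered)}
        return MiniStringIndexerModel(self.input_col, self.output_col, mapping)

train = [{"Gender": "Male"}, {"Gender": "Female"}, {"Gender": "Male"}]
model = MiniStringIndexer("Gender", "GenderIndex").fit(train)
print(model.transform(train))
```

Keeping fitting and transforming separate lets the mapping learned from the training data be reused unchanged on the test data.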

### Combine all feature columns into a single feature vector <br>
Most MLlib algorithms require a single features column as input. Each row in this column contains a vector of data points corresponding to the set of features used for prediction.

MLlib provides the VectorAssembler transformer to create a single vector column from a list of columns.

The following code block illustrates how to use VectorAssembler.
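Conceptually, VectorAssembler concatenates its input columns in the order listed in inputCols. The plain-Python sketch below is not the Spark API, and its row values are made up. It assembles one-hot vectors and numbers into one vector, then shows the sparse (size, indices, values) form that appears in the predictions output later in this notebook.

That output can be checked against this idea. There, the first numeric column, Age, sits at index 18 (for example, the value 18.0 in the first row), so the one-hot columns fill slots 0 to 17. With dropLast=True, 18 slots is consistent with Department, EducationField, Gender, JobRole and MaritalStatus having 3, 6, 2, 9 and 3 categories (2 + 5 + 1 + 8 + 2 = 18). Adding the 15 numeric columns gives the vector length of 33.

```python
def assemble(parts):
    """Concatenate scalars and lists (e.g. one-hot vectors) into one dense feature list."""
    features = []
    for part in parts:
        if isinstance(part, list):
            features.extend(part)
        else:
            features.append(float(part))
    return features

def to_sparse(dense):
    """Return (size, indices, values), keeping only the non-zero entries."""
    indices = [i for i, x in enumerate(dense) if x != 0.0]
    return len(dense), indices, [dense[i] for i in indices]

# Hypothetical row: two one-hot vectors followed by two numeric columns
department_ohe = [1.0, 0.0]  # 3 categories with dropLast -> length 2
gender_ohe = [0.0]           # 2 categories with dropLast -> length 1
age, hourly_rate = 18, 97
dense = assemble([department_ohe, gender_ohe, age, hourly_rate])
print(dense)             # [1.0, 0.0, 0.0, 18.0, 97.0]
print(to_sparse(dense))  # (5, [0, 3, 4], [1.0, 18.0, 97.0])
```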

In [0]:
# Combine the one-hot encoded categorical columns and the numeric columns into a single feature vector
from pyspark.ml.feature import VectorAssembler
 
# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ["Age", "DailyRate", "Education", "DistanceFromHome", "HourlyRate", "JobInvolvement", "JobLevel", "JobSatisfaction", "MonthlyIncome", "YearsAtCompany", "YearsInCurrentRole", "YearsWithCurrManager", "NumCompaniesWorked", "PerformanceRating", "EnvironmentSatisfaction"]
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

In [0]:
#Define the model: Logistic Regression model
from pyspark.ml.classification import LogisticRegression
 
lr = LogisticRegression(featuresCol="features", labelCol="label", regParam=1.0)

#### Build the pipeline
A Pipeline is an ordered list of transformers and estimators. You can define a pipeline to automate the transformations applied to a dataset and make them repeatable. In this step, we define the pipeline, fit it on the training dataset, and then apply the resulting pipeline model to the test dataset.

Similar to what we saw with StringIndexer, a Pipeline is an estimator. The pipeline.fit() method returns a PipelineModel, which is a transformer.
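How Pipeline.fit() chains its stages can be sketched in plain Python, following the behavior described in the Spark ML guide:
- Stages run in order.
- Each estimator is fit on the output of the stages before it and replaced by its fitted model.
- Transformers are used as-is.
- The resulting pipeline model applies every fitted stage's transform() in sequence.

The classes `AddOne` and `MeanCenterer` are hypothetical toy stages, not MLlib classes.

```python
class AddOne:
    """Toy transformer: adds 1 to every value."""
    def transform(self, data):
        return [x + 1 for x in data]

class MeanCentererModel:
    """Fitted toy model: subtracts the mean learned during fit()."""
    def __init__(self, mean):
        self.mean = mean

    def transform(self, data):
        return [x - self.mean for x in data]

class MeanCenterer:
    """Toy estimator: fit() learns the mean and returns a transformer."""
    def fit(self, data):
        return MeanCentererModel(sum(data) / len(data))

class MiniPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, data):
        for stage in self.stages:  # apply each fitted stage in order
            data = stage.transform(data)
        return data

class MiniPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, data):
        fitted = []
        for i, stage in enumerate(self.stages):
            # Estimators are fit on the data produced by earlier stages; transformers pass through
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            fitted.append(model)
            if i < len(self.stages) - 1:  # only later stages need the transformed data
                data = model.transform(data)
        return MiniPipelineModel(fitted)

train, test = [1.0, 2.0, 3.0], [10.0]
model = MiniPipeline([AddOne(), MeanCenterer()]).fit(train)
print(model.transform(test))  # [8.0]: (10 + 1) - 3.0, the mean of the transformed training data
```

The mean is learned from the training data only and then reused on the test data, which is the same reason the notebook fits the pipeline on trainDF and only transforms testDF.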

In [0]:
# Build the pipeline, fit it on the training data, and apply the fitted pipeline model to the test dataset.
from pyspark.ml import Pipeline
 
# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, labelToIndex, vecAssembler, lr])
 
# Fit the pipeline on the training data; fit() returns a PipelineModel.
pipelineModel = pipeline.fit(trainDF)
 
# Apply the pipeline model to the test dataset.
predDF = pipelineModel.transform(testDF)

In [0]:
# Display the predictions from the model.
# The features column is a sparse vector, which is common after one-hot encoding because most entries are 0.
display(predDF.select("features", "label", "prediction", "probability"))

features,label,prediction,probability
"Map(vectorType -> sparse, length -> 33, indices -> List(0, 2, 10, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 30, 31, 32), values -> List(1.0, 1.0, 1.0, 1.0, 18.0, 1124.0, 3.0, 1.0, 97.0, 3.0, 1.0, 4.0, 1611.0, 1.0, 3.0, 4.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7996543691044878, 0.20034563089551216))"
"Map(vectorType -> sparse, length -> 33, indices -> List(0, 3, 8, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 30, 31, 32), values -> List(1.0, 1.0, 1.0, 1.0, 18.0, 1431.0, 3.0, 14.0, 33.0, 3.0, 1.0, 3.0, 1514.0, 1.0, 3.0, 2.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7925034791632489, 0.20749652083675107))"
"Map(vectorType -> sparse, length -> 33, indices -> List(0, 2, 7, 10, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 30, 31, 32), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 18.0, 230.0, 3.0, 3.0, 54.0, 3.0, 1.0, 3.0, 1420.0, 1.0, 3.0, 3.0))",1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7759404455596545, 0.22405955444034553))"
"Map(vectorType -> sparse, length -> 33, indices -> List(0, 3, 10, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 30, 31, 32), values -> List(1.0, 1.0, 1.0, 1.0, 19.0, 1181.0, 1.0, 3.0, 79.0, 3.0, 1.0, 2.0, 1483.0, 1.0, 1.0, 3.0, 2.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7767990784271404, 0.22320092157285965))"
"Map(vectorType -> sparse, length -> 33, indices -> List(1, 5, 15, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 31, 32), values -> List(1.0, 1.0, 1.0, 1.0, 19.0, 602.0, 1.0, 1.0, 100.0, 1.0, 1.0, 1.0, 2325.0, 4.0, 3.0))",1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.6737609881760861, 0.3262390118239139))"
"Map(vectorType -> sparse, length -> 33, indices -> List(1, 2, 7, 15, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 20.0, 727.0, 1.0, 9.0, 54.0, 3.0, 1.0, 1.0, 2728.0, 2.0, 2.0, 2.0, 1.0, 3.0, 4.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7326111850842356, 0.2673888149157644))"
"Map(vectorType -> sparse, length -> 33, indices -> List(0, 2, 7, 10, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 20.0, 805.0, 3.0, 3.0, 87.0, 2.0, 1.0, 3.0, 3033.0, 2.0, 2.0, 2.0, 1.0, 3.0, 1.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7701170803758531, 0.22988291962414686))"
"Map(vectorType -> sparse, length -> 33, indices -> List(0, 3, 7, 8, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 29, 30, 31, 32), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 20.0, 1362.0, 1.0, 10.0, 32.0, 3.0, 1.0, 3.0, 1009.0, 1.0, 1.0, 1.0, 3.0, 4.0))",1.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7980758738573589, 0.2019241261426411))"
"Map(vectorType -> sparse, length -> 33, indices -> List(0, 2, 7, 8, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 30, 31, 32), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 21.0, 391.0, 2.0, 15.0, 96.0, 3.0, 1.0, 4.0, 1232.0, 1.0, 3.0, 3.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7873006842333691, 0.21269931576663093))"
"Map(vectorType -> sparse, length -> 33, indices -> List(1, 3, 7, 15, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 21.0, 501.0, 1.0, 5.0, 58.0, 3.0, 1.0, 1.0, 2380.0, 2.0, 2.0, 2.0, 1.0, 3.0, 3.0))",0.0,0.0,"Map(vectorType -> dense, length -> 2, values -> List(0.7310293427700916, 0.2689706572299084))"


In [0]:
# Plot the ROC curve of the fitted logistic regression model (the last pipeline stage) on the test data
display(pipelineModel.stages[-1], predDF.drop("prediction", "rawPrediction", "probability"), "ROC")

False Positive Rate,True Positive Rate,Threshold
0.0,0.0,0.2842722097233867
0.0111111111111111,0.0,0.2842722097233867
0.0222222222222222,0.0,0.2535727923674259
0.0222222222222222,0.0714285714285714,0.249795484529782
0.0333333333333333,0.0714285714285714,0.238380660629755
0.0333333333333333,0.1428571428571428,0.2360436294028838
0.0444444444444444,0.1428571428571428,0.2254168727774774
0.0555555555555555,0.1428571428571428,0.2243295122702755
0.0666666666666666,0.1428571428571428,0.2229348683002681
0.0666666666666666,0.2142857142857142,0.2221059916012158
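Each row of the ROC output above is one (false positive rate, true positive rate) pair. The pair comes from using a score threshold as the cutoff for predicting the positive class (label 1.0).

The plain-Python sketch below does two things:
- It computes such points by sweeping the threshold over the distinct scores, from highest to lowest.
- It computes the area under the curve with the trapezoidal rule.

It illustrates the idea only. BinaryClassificationEvaluator may down-sample scores into bins (its numBins parameter), and the scores and labels here are made up.

```python
def roc_points(scores, labels):
    """Return (FPR, TPR) points, sweeping the threshold over distinct scores, high to low."""
    pos = sum(labels)
    neg = len(labels) - pos
    points = [(0.0, 0.0)]
    for t in sorted(set(scores), reverse=True):
        # Predict positive when score >= t
        tp = sum(1 for s, y in zip(scores, labels) if s >= t and y == 1)
        fp = sum(1 for s, y in zip(scores, labels) if s >= t and y == 0)
        points.append((fp / neg, tp / pos))
    return points

def auc(points):
    """Area under the ROC curve using the trapezoidal rule."""
    return sum((x2 - x1) * (y1 + y2) / 2 for (x1, y1), (x2, y2) in zip(points, points[1:]))

# Hypothetical probabilities of the positive class and true labels
scores = [0.9, 0.8, 0.7, 0.6, 0.55, 0.4]
labels = [1, 0, 1, 1, 0, 0]
pts = roc_points(scores, labels)
print(pts)
print(auc(pts))  # about 0.778 (= 7/9)
```

The 7/9 matches the other common reading of AUC: it is the fraction of (positive, negative) pairs in which the positive row has the higher score. Here, 7 of the 9 such pairs are ranked correctly.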


In [0]:
# To evaluate the model, use BinaryClassificationEvaluator to compute the area under the ROC curve
# and MulticlassClassificationEvaluator to compute the accuracy.
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
 
bcEvaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
print(f"Area under ROC curve: {bcEvaluator.evaluate(predDF)}")
 
mcEvaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print(f"Accuracy: {mcEvaluator.evaluate(predDF)}")
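With metricName="accuracy", MulticlassClassificationEvaluator reports the fraction of rows whose prediction equals the label. For a binary LogisticRegressionModel, the prediction is 1.0 when the probability of class 1 exceeds the model's threshold (0.5 by default). For example, every class-1 probability in the sample prediction rows above is below 0.5, so each of those rows is predicted 0.0.

The plain-Python sketch below applies that rule to hypothetical probabilities and computes accuracy. It is an illustration, not the evaluator's code.

```python
def predict(prob_class1, threshold=0.5):
    """Binary decision rule: predict 1.0 only when P(class 1) exceeds the threshold."""
    return 1.0 if prob_class1 > threshold else 0.0

def accuracy(predictions, labels):
    """Fraction of rows where the prediction matches the label."""
    correct = sum(1 for p, y in zip(predictions, labels) if p == y)
    return correct / len(labels)

# Hypothetical class-1 probabilities and labels (1.0 is the positive class)
probs = [0.20, 0.21, 0.22, 0.33, 0.62, 0.27]
labels = [0.0, 0.0, 1.0, 1.0, 1.0, 0.0]
preds = [predict(p) for p in probs]
print(preds)                    # [0.0, 0.0, 0.0, 0.0, 1.0, 0.0]
print(accuracy(preds, labels))  # 0.6666666666666666
```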

#### Hyperparameter tuning
MLlib provides methods to facilitate hyperparameter tuning and cross validation.
For hyperparameter tuning, ParamGridBuilder lets you define a grid search over a set of model hyperparameters.

For cross validation, CrossValidator lets you specify an estimator (the pipeline to apply to the input dataset), an evaluator, a grid space of hyperparameters, and the number of folds to use for cross validation.

Use ParamGridBuilder and CrossValidator to tune the model. This example uses three values for regParam and three for elasticNetParam, for a total of 3 x 3 = 9 hyperparameter combinations for CrossValidator to examine.
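The grid arithmetic, and the amount of training it implies, can be checked with a short sketch. CrossValidator fits one model per parameter combination per fold, then refits the best combination on the full training set. With 9 combinations and the 3 folds used in this notebook, that is 9 x 3 = 27 fits plus one final fit.

The sketch uses `itertools.product` as a stand-in for ParamGridBuilder; it is not the MLlib API.

```python
from itertools import product

reg_params = [0.01, 0.5, 2.0]
elastic_net_params = [0.0, 0.5, 1.0]

# Every combination of the two hyperparameters, like ParamGridBuilder().addGrid(...).build()
grid = [{"regParam": r, "elasticNetParam": a} for r, a in product(reg_params, elastic_net_params)]

num_folds = 3
fits_during_cv = len(grid) * num_folds  # one model per combination per fold
total_fits = fits_during_cv + 1         # plus a refit of the best combination on all training data

print(len(grid))    # 9
print(grid[0])      # {'regParam': 0.01, 'elasticNetParam': 0.0}
print(total_fits)   # 28
```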

In [0]:
#Use ParamGridBuilder() and CrossValidator() to tune the model
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
 
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())
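For reference, here is a simplified form of the regularized objective given in the Spark ML guide for logistic regression with elastic-net regularization. The symbols map to the notebook's parameters as follows:
- $\lambda$ is regParam.
- $\alpha$ is elasticNetParam.

In this form the labels are encoded as $y_i \in \{-1, +1\}$ rather than 0/1, and the intercept is omitted for brevity. Setting $\alpha = 0$ gives a pure L2 (ridge) penalty, $\alpha = 1$ gives a pure L1 (lasso) penalty, and values in between mix the two. The first model in this notebook used $\lambda = 1.0$ with the default elasticNetParam of 0.0, that is, a pure L2 penalty.

$$
\min_{\mathbf{w}} \; \frac{1}{n}\sum_{i=1}^{n} \log\!\left(1 + e^{-y_i \mathbf{w}^{\top}\mathbf{x}_i}\right) + \lambda\left(\alpha \lVert \mathbf{w} \rVert_1 + \frac{1-\alpha}{2}\lVert \mathbf{w} \rVert_2^2\right)
$$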

Whenever you run CrossValidator in MLlib, Databricks automatically tracks all of the runs using MLflow, so you can use the MLflow UI to compare how each model performed.

In this example, we use the pipeline created earlier as the estimator.

In [0]:
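# Create a 3-fold CrossValidator that scores each parameter combination with bcEvaluator (area under ROC)
# and fits up to 4 models in parallel
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=bcEvaluator, numFolds=3, parallelism=4)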
 
# Run cross-validation. This step can take a few minutes; it returns a CrossValidatorModel whose bestModel is the pipeline refit with the best parameters.
cvModel = cv.fit(trainDF)
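What cv.fit() does with the grid can be sketched in plain Python:
1. Split the training rows into k folds.
2. For each parameter combination, train on k - 1 folds and score on the held-out fold, once per fold.
3. Average the k scores for each combination.
4. Keep the combination with the best average. For area under ROC, larger is better.

The final refit of the winning combination on all training data is not shown. The `train_and_score` function and its toy scoring rule are hypothetical stand-ins for fitting the pipeline and calling bcEvaluator. Spark also assigns rows to folds randomly rather than round-robin as done here.

```python
def k_fold_indices(n, k):
    """Assign row positions to k folds round-robin (Spark assigns them randomly)."""
    return [list(range(i, n, k)) for i in range(k)]

def cross_validate(rows, param_grid, k, train_and_score):
    """Return (best_params, average validation score per parameter combination)."""
    folds = k_fold_indices(len(rows), k)
    averages = []
    for params in param_grid:
        scores = []
        for held_out in folds:
            held = set(held_out)
            train = [r for i, r in enumerate(rows) if i not in held]
            valid = [rows[i] for i in held_out]
            scores.append(train_and_score(params, train, valid))
        averages.append(sum(scores) / k)
    best = max(range(len(param_grid)), key=lambda i: averages[i])  # larger metric is better
    return param_grid[best], averages

# Hypothetical scorer: ignores the data and pretends a smaller regParam scores higher
def train_and_score(params, train, valid):
    return 0.8 - 0.05 * params["regParam"]

grid = [{"regParam": r} for r in [0.01, 0.5, 2.0]]
best, avgs = cross_validate(list(range(30)), grid, k=3, train_and_score=train_and_score)
print(best)  # {'regParam': 0.01}
```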

#MLlib will automatically track trials in MLflow. After your tuning fit() call has completed, view the MLflow UI to see logged runs.

In [0]:
#Make predictions and evaluate model performance
# Use the best model identified by the cross-validation to make predictions on the test dataset
cvPredDF = cvModel.transform(testDF)
 
# Evaluate the model's performance based on area under the ROC curve and accuracy 
print(f"Area under ROC curve: {bcEvaluator.evaluate(cvPredDF)}")
print(f"Accuracy: {mcEvaluator.evaluate(cvPredDF)}")

In [0]:
# Use SQL commands: register the cross-validated predictions (cvPredDF from the previous cell)
# as a temporary view named finalPredictions
cvPredDF.createOrReplaceTempView("finalPredictions")
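createOrReplaceTempView() registers the DataFrame under a name that SQL queries in the same SparkSession can reference, which is what the `%sql` cells rely on.

To show what the first query computes, the sketch below runs the same GROUP BY against a small, made-up predictions table using Python's built-in `sqlite3` module. sqlite3 is only a stand-in here, not Spark SQL. The sketch also adds `prediction` to the ORDER BY so the printed order is deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical rows standing in for the finalPredictions view
conn.execute("CREATE TABLE finalPredictions (JobRole TEXT, prediction REAL)")
conn.executemany(
    "INSERT INTO finalPredictions VALUES (?, ?)",
    [("Manager", 0.0), ("Sales Executive", 0.0), ("Sales Executive", 1.0),
     ("Sales Executive", 0.0), ("Manager", 0.0)],
)
rows = conn.execute(
    """SELECT JobRole, prediction, count(*) AS count
       FROM finalPredictions
       GROUP BY JobRole, prediction
       ORDER BY JobRole, prediction"""
).fetchall()
for row in rows:
    print(row)
# ('Manager', 0.0, 2)
# ('Sales Executive', 0.0, 2)
# ('Sales Executive', 1.0, 1)
conn.close()
```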

In [0]:
%sql
SELECT JobRole, prediction, count(*) AS count 
FROM finalPredictions
GROUP BY JobRole, prediction
ORDER BY JobRole


JobRole,prediction,count
Healthcare Representative,0.0,42
Human Resources,0.0,10
Laboratory Technician,0.0,46
Laboratory Technician,1.0,5
Manager,0.0,28
Manufacturing Director,0.0,45
Research Director,0.0,14
Research Scientist,0.0,57
Sales Executive,0.0,88
Sales Executive,1.0,4


In [0]:
%sql
SELECT Age, prediction, count(*) AS count
FROM finalPredictions
GROUP BY Age, prediction
ORDER BY Age

Age,prediction,count
18,0.0,3
19,1.0,1
19,0.0,1
20,1.0,2
20,0.0,1
21,0.0,1
21,1.0,2
22,0.0,6
22,1.0,2
23,0.0,5


### Citation/Reference <br>
Databricks. Getting started with MLlib - binary classification example. Retrieved on April 6, 2022 from https://docs.microsoft.com/en-us/azure/databricks/_static/notebooks/getting-started/get-started-with-mllib-dbr7.html <br>

Apache Spark. (2022). Class StringIndexerModel. Retrieved on April 6, 2022 from https://spark.apache.org/docs/latest/api/java/org/apache/spark/ml/feature/StringIndexerModel.html