## Setup

In [50]:
import azureml.core
from azureml.core import Workspace

# Load the workspace from the saved config file
ws = Workspace.from_config()
print('Ready to use Azure ML {} to work with {}'.format(azureml.core.VERSION, ws.name))

Ready to use Azure ML 1.43.0 to work with azureml_edsp


In [51]:
import pyspark.pandas as ps
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

In [52]:
spark = SparkSession \
        .builder \
        .appName('Employee Retention') \
        .getOrCreate()

In [53]:
# Load data
data = spark.read.csv('data/Sample_IssueDataset.csv', \
                      header="true", \
                      inferSchema="true")

# Temporarily convert to pandas-on-spark DataFrame for
# better appearance and print a sample of rows
data.pandas_api().sample(frac=0.0002, random_state=135)

Unnamed: 0,Activity on Company Forums,EmployeeLeft,Hired through SMTP,National Origin (code),Negative Review in Past 5 Years,"Survey, Relative, Attitude toward Peers","Survey, Relative, Peer's Average Attitude toward Environment","Survey, Relative, Peer's Average Attitude toward Resources","Survey, Relative, Peer's Average Attitude toward WorkType","Survey, Relative, Peer's Average Attitude toward Workload","Survey, Relative, Peer's Average Review of Employee",University
3949,37.0,0.0,0,4,1,3,3,5,1,2,1,Smolensk Humanitarian University
6814,32.0,1.0,0,6,1,2,3,5,3,4,4,Universitas Pasundan
9153,45.0,1.0,0,3,0,1,2,1,1,5,6,University of Commerce Luigi Bocconi


In [54]:
data.printSchema()

root
 |-- Activity on Company Forums: double (nullable = true)
 |-- EmployeeLeft: double (nullable = true)
 |-- Hired through SMTP: integer (nullable = true)
 |-- National Origin (code): integer (nullable = true)
 |-- Negative Review in Past 5 Years: integer (nullable = true)
 |-- Survey, Relative, Attitude toward Peers: integer (nullable = true)
 |-- Survey, Relative, Peer's Average Attitude toward Environment: integer (nullable = true)
 |-- Survey, Relative, Peer's Average Attitude toward Resources: integer (nullable = true)
 |-- Survey, Relative, Peer's Average Attitude toward WorkType: integer (nullable = true)
 |-- Survey, Relative, Peer's Average Attitude toward Workload: integer (nullable = true)
 |-- Survey, Relative, Peer's Average Review of Employee: integer (nullable = true)
 |-- University: string (nullable = true)



In [55]:
data.select("University").distinct().show()

+--------------------+
|          University|
+--------------------+
|Universitas Neger...|
|Kyrgyz National U...|
|  Americanos College|
|     Rice University|
|Universitas Pasundan|
|University of Com...|
|Smolensk Humanita...|
+--------------------+



In [56]:
train_df, test_df = data.randomSplit([0.8, 0.2], seed=123)

## Handle categorical feature(s)

For decision tree and random forests, we should _not_ one-hot-encode our variables. Categorical features may end up taking a back seat relative to the continuous features. 

In [57]:
# Return all column names and their data types as a list
train_df.dtypes

[('Activity on Company Forums', 'double'),
 ('EmployeeLeft', 'double'),
 ('Hired through SMTP', 'int'),
 ('National Origin (code)', 'int'),
 ('Negative Review in Past 5 Years', 'int'),
 ('Survey, Relative, Attitude toward Peers', 'int'),
 ("Survey, Relative, Peer's Average Attitude toward Environment", 'int'),
 ("Survey, Relative, Peer's Average Attitude toward Resources", 'int'),
 ("Survey, Relative, Peer's Average Attitude toward WorkType", 'int'),
 ("Survey, Relative, Peer's Average Attitude toward Workload", 'int'),
 ("Survey, Relative, Peer's Average Review of Employee", 'int'),
 ('University', 'string')]

In [58]:
# Identify and store fields which are of type "string"
categorical_cols = [field for (field, dataType) in train_df.dtypes if dataType=="string"]
categorical_cols

['University']

In [59]:
# Append "Index" to the string-type column name(s)
index_output_cols = [x + "Index" for x in categorical_cols]
index_output_cols

['UniversityIndex']

In [60]:
from pyspark.ml.feature import StringIndexer

string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")

Combine the numeric and indexed categorical columns.

In [61]:
# Filter only for numeric columns and exlude our label, "EmployeeLeft"
numeric_cols = [field for (field, dataType) in train_df.dtypes if ((dataType=="double" or dataType=="int") & (field != "EmployeeLeft"))]
numeric_cols

['Activity on Company Forums',
 'Hired through SMTP',
 'National Origin (code)',
 'Negative Review in Past 5 Years',
 'Survey, Relative, Attitude toward Peers',
 "Survey, Relative, Peer's Average Attitude toward Environment",
 "Survey, Relative, Peer's Average Attitude toward Resources",
 "Survey, Relative, Peer's Average Attitude toward WorkType",
 "Survey, Relative, Peer's Average Attitude toward Workload",
 "Survey, Relative, Peer's Average Review of Employee"]

In [62]:
# Combine StringIndexer output with the numeric columns
assembler_inputs = index_output_cols + numeric_cols
assembler_inputs

['UniversityIndex',
 'Activity on Company Forums',
 'Hired through SMTP',
 'National Origin (code)',
 'Negative Review in Past 5 Years',
 'Survey, Relative, Attitude toward Peers',
 "Survey, Relative, Peer's Average Attitude toward Environment",
 "Survey, Relative, Peer's Average Attitude toward Resources",
 "Survey, Relative, Peer's Average Attitude toward WorkType",
 "Survey, Relative, Peer's Average Attitude toward Workload",
 "Survey, Relative, Peer's Average Review of Employee"]

In [63]:
from pyspark.ml.feature import VectorAssembler

# Combine all numeric and categorical inputs
vec_assembler = VectorAssembler(inputCols = assembler_inputs, outputCol="features")

## Building the decision tree with DecisionTreeClassifier and default parameters

In [73]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol="EmployeeLeft")

## Fit our pipeline

In [74]:
from pyspark.ml import Pipeline

# Combine stages into a pipeline
stages = [string_indexer, vec_assembler, dt]
pipeline = Pipeline(stages=stages)

In [75]:
# Perform the fit
pipeline_model = pipeline.fit(train_df)

## Retrieve fitted decision tree model and observe feature importance scores

In [76]:
dt_model = pipeline_model.stages[-1]
dt_model

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_42a7b1895c78, depth=5, numNodes=43, numClasses=2, numFeatures=11

In [77]:
dt_model.featureImportances

SparseVector(11, {0: 0.0011, 3: 0.3658, 5: 0.1062, 6: 0.34, 8: 0.1869})

## Interpret feature importances

In [78]:
import pandas as pd

features_df = pd.DataFrame(list(zip(vec_assembler.getInputCols(), dt_model.featureImportances)), columns=["feature", "importance"])
features_df

Unnamed: 0,feature,importance
0,UniversityIndex,0.001089
1,Activity on Company Forums,0.0
2,Hired through SMTP,0.0
3,National Origin (code),0.365821
4,Negative Review in Past 5 Years,0.0
5,"Survey, Relative, Attitude toward Peers",0.106237
6,"Survey, Relative, Peer's Average Attitude towa...",0.339959
7,"Survey, Relative, Peer's Average Attitude towa...",0.0
8,"Survey, Relative, Peer's Average Attitude towa...",0.186894
9,"Survey, Relative, Peer's Average Attitude towa...",0.0


In [79]:
features_df.sort_values("importance", ascending=False)

Unnamed: 0,feature,importance
3,National Origin (code),0.365821
6,"Survey, Relative, Peer's Average Attitude towa...",0.339959
8,"Survey, Relative, Peer's Average Attitude towa...",0.186894
5,"Survey, Relative, Attitude toward Peers",0.106237
0,UniversityIndex,0.001089
1,Activity on Company Forums,0.0
2,Hired through SMTP,0.0
4,Negative Review in Past 5 Years,0.0
7,"Survey, Relative, Peer's Average Attitude towa...",0.0
9,"Survey, Relative, Peer's Average Attitude towa...",0.0


## Apply our model to a test set

In [83]:
pred_df = pipeline_model.transform(test_df)
pred_df

DataFrame[Activity on Company Forums: double, EmployeeLeft: double, Hired through SMTP: int, National Origin (code): int, Negative Review in Past 5 Years: int, Survey, Relative, Attitude toward Peers: int, Survey, Relative, Peer's Average Attitude toward Environment: int, Survey, Relative, Peer's Average Attitude toward Resources: int, Survey, Relative, Peer's Average Attitude toward WorkType: int, Survey, Relative, Peer's Average Attitude toward Workload: int, Survey, Relative, Peer's Average Review of Employee: int, University: string, UniversityIndex: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [81]:
pred_df.select("features", "EmployeeLeft", "prediction").orderBy("EmployeeLeft", ascending=False).show()

+--------------------+------------+----------+
|            features|EmployeeLeft|prediction|
+--------------------+------------+----------+
|[1.0,13.0,0.0,4.0...|         1.0|       1.0|
|[1.0,15.0,1.0,4.0...|         1.0|       1.0|
|[6.0,13.0,0.0,4.0...|         1.0|       0.0|
|[0.0,6.0,0.0,2.0,...|         1.0|       0.0|
|[1.0,13.0,1.0,5.0...|         1.0|       1.0|
|[2.0,14.0,1.0,3.0...|         1.0|       1.0|
|[5.0,15.0,1.0,4.0...|         1.0|       1.0|
|[2.0,14.0,1.0,3.0...|         1.0|       1.0|
|[2.0,9.0,1.0,4.0,...|         1.0|       1.0|
|[0.0,14.0,1.0,4.0...|         1.0|       1.0|
|[0.0,10.0,1.0,6.0...|         1.0|       1.0|
|[5.0,15.0,0.0,3.0...|         1.0|       0.0|
|[2.0,11.0,1.0,3.0...|         1.0|       0.0|
|[2.0,15.0,0.0,4.0...|         1.0|       1.0|
|[1.0,12.0,1.0,6.0...|         1.0|       1.0|
|[5.0,15.0,0.0,4.0...|         1.0|       0.0|
|[1.0,13.0,0.0,4.0...|         1.0|       0.0|
|[3.0,15.0,0.0,4.0...|         1.0|       1.0|
|[2.0,13.0,1.

In [85]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator=MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="EmployeeLeft")
acc = evaluator.evaluate(pred_df)
 
print("Prediction accuracy: ", acc)

Prediction accuracy:  0.6758899921093504


![image.png](attachment:23725e52-8276-464a-8077-ea1f02bfeefe.png)

# Bootstrap aggregating (bagging)

![image.png](attachment:9fa6e816-e8be-4df9-a7b1-3846305cb7c1.png)

For classification, there is no such thing as "average". Therefore, we simply take a majority vote of the predictions.

# Random forests

![image.png](attachment:f3acb581-9fa5-438f-8698-3292600652d1.png)

Similar to bootstrapping, but we take only a _subset_ of features.

![image.png](attachment:b2918886-b4d8-48d7-9141-5dd95fa70fd9.png)

## Tune random forest models with **grid search** and **cross-validation**

In [86]:
from pyspark.ml.classification import RandomForestClassifier

In [87]:
rf = RandomForestClassifier(labelCol="EmployeeLeft")

In [88]:
stages_rf = [string_indexer, vec_assembler, rf]
pipeline_rf = Pipeline(stages=stages_rf)

In [89]:
rf.explainParams()

"bootstrap: Whether bootstrap samples are used when building trees. (default: True)\ncacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)\ncheckpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)\nfeatureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the fe

As we can see above, there are many hyperparameters that we could tune, and it would take a long time to configure manually. Instead, we can use the Spark `ParamGridBuilder`.

In [108]:
from pyspark.ml.tuning import ParamGridBuilder

param_grid = (ParamGridBuilder() \
              .addGrid(rf.maxDepth, [2, 10]) \
              .addGrid(rf.numTrees, [5, 20]) \
              .build()
             )

Next, use 3-fold cross-validation to identify the optimal hyperparameters. 

In [109]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator

evaluator = RegressionEvaluator(labelCol="EmployeeLeft", predictionCol="prediction")

cv = CrossValidator(estimator=pipeline_rf, evaluator = evaluator, estimatorParamMaps=param_grid,
                    numFolds=3, seed=123
                   )

How many models are we training right now? 

In [110]:
cv_model = cv.fit(train_df)

                                                                                

## Train faster

In [111]:
cv = CrossValidator(estimator=rf, evaluator = evaluator, estimatorParamMaps=param_grid,
                    numFolds=3, seed=123
                   )

In [112]:
stages_with_cv = [string_indexer, vec_assembler, cv]
pipeline = Pipeline(stages=stages_with_cv)

pipeline_model = pipeline.fit(train_df)

Look at the model with the best hyperparameter configuration.

In [113]:
list(zip(cv_model.getEstimatorParamMaps(), cv_model.avgMetrics))

[({Param(parent='RandomForestClassifier_67e1504bb3e4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestClassifier_67e1504bb3e4', name='numTrees', doc='Number of trees to train (>= 1).'): 5},
  0.5959289790464691),
 ({Param(parent='RandomForestClassifier_67e1504bb3e4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 2,
   Param(parent='RandomForestClassifier_67e1504bb3e4', name='numTrees', doc='Number of trees to train (>= 1).'): 20},
  0.5922969448196834),
 ({Param(parent='RandomForestClassifier_67e1504bb3e4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 10,
   Param(parent='RandomForestClassifier_67e1504bb3e4

In [114]:
pred_df = pipeline_model.transform(test_df)

evaluator_rf=MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="EmployeeLeft")
acc_rf = evaluator.evaluate(pred_df)
 
print("Prediction accuracy: ", acc_rf)

Prediction accuracy:  0.5394369833192332


# Gradient boosted decision trees