# Employee Attrition Analysis

In [1]:
import findspark
findspark.init("/home/ubuntu/spark-3.2.1-bin-hadoop2.7")
import pyspark
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
import json
import re
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.metrics import roc_curve, auc
from graphviz import Digraph
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, mean, stddev, abs, when, expr, lit, rand, log
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, Bucketizer, QuantileDiscretizer, VectorAssembler, ChiSqSelector
from pyspark.ml.classification import DecisionTreeClassifier,LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from anytree import Node, RenderTree
# from pyspark.ml.classification import DecisionTreeClassificationModel
# Build (or reuse) the notebook's Spark session with explicit driver/executor memory settings.
spark = (
    SparkSession.builder
    .appName("jhon232_Employee_Attrition_Analysis")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "15g")
    .getOrCreate()
)
# Turn on eager evaluation so DataFrames render when displayed in a cell.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/10 07:51:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Data Understanding

### Read Data

In [2]:
# Load both CSV extracts: the first row holds the column names and Spark infers the column types.
csv_options = {"header": True, "inferSchema": True}
df_demographic = spark.read.csv("demographic_data.csv", **csv_options)
df_job_related = spark.read.csv("job_related_data.csv", **csv_options)

In [3]:
# inner join two datasets on EmployeeNumber
# (an inner join keeps only employees present in both files; EmployeeNumber appears once in the result)
df = df_demographic.join(df_job_related, on="EmployeeNumber",how="inner")
display(df)

23/10/10 07:51:12 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


EmployeeNumber,Age,DistanceFromHome,Education,EducationField,Gender,MaritalStatus,Over18,Attrition,BusinessTravel,DailyRate,Department,EmployeeCount,EnvironmentSatisfaction,HourlyRate,JobInvolvement,JobLevel,JobRole,JobSatisfaction,MonthlyIncome,MonthlyRate,NumCompaniesWorked,OverTime,PercentSalaryHike,PerformanceRating,RelationshipSatisfaction,StandardHours,StockOptionLevel,TotalWorkingYears,TrainingTimesLastYear,WorkLifeBalance,YearsAtCompany,YearsInCurrentRole,YearsSinceLastPromotion,YearsWithCurrManager
1,41,1,2,Life Sciences,Female,Single,Y,Yes,Travel_Rarely,1102,Sales,1,2,94,3,2,Sales Executive,4,5993,19479,8,Yes,11,3,1,80,0,8,0,1,6,4,0,5
2,49,8,1,Life Sciences,Male,Married,Y,No,Travel_Frequently,279,Research & Develo...,1,3,61,2,2,Research Scientist,2,5130,24907,1,No,23,4,4,80,1,10,3,3,10,7,1,7
4,37,2,2,Other,Male,Single,Y,Yes,Travel_Rarely,1373,Research & Develo...,1,4,92,2,1,Laboratory Techni...,3,2090,2396,6,Yes,15,3,2,80,0,7,3,3,0,0,0,0
5,33,3,4,Life Sciences,Female,Married,Y,No,Travel_Frequently,1392,Research & Develo...,1,4,56,3,1,Research Scientist,3,2909,23159,1,Yes,11,3,3,80,0,8,3,3,8,7,3,0
7,27,2,1,Medical,Male,Married,Y,No,Travel_Rarely,591,Research & Develo...,1,1,40,3,1,Laboratory Techni...,2,3468,16632,9,No,12,3,4,80,1,6,3,3,2,2,2,2
8,32,2,2,Life Sciences,Male,Single,Y,No,Travel_Frequently,1005,Research & Develo...,1,4,79,3,1,Laboratory Techni...,4,3068,11864,0,No,13,3,3,80,0,8,2,2,7,7,3,6
10,59,3,3,Medical,Female,Married,Y,No,Travel_Rarely,1324,Research & Develo...,1,3,81,4,1,Laboratory Techni...,1,2670,9964,4,Yes,20,4,1,80,3,12,3,2,1,0,0,0
11,30,24,1,Life Sciences,Male,Divorced,Y,No,Travel_Rarely,1358,Research & Develo...,1,4,67,3,1,Laboratory Techni...,3,2693,13335,1,No,22,4,2,80,1,1,2,3,1,0,0,0
12,38,23,3,Life Sciences,Male,Single,Y,No,Travel_Frequently,216,Research & Develo...,1,4,44,2,3,Manufacturing Dir...,3,9526,8787,0,No,21,4,2,80,0,10,2,3,9,7,1,8
13,36,27,3,Medical,Male,Married,Y,No,Travel_Rarely,1299,Research & Develo...,1,3,94,3,2,Healthcare Repres...,3,5237,16577,6,No,13,3,2,80,2,17,3,2,7,7,7,7


In [4]:
# df_demographic.show()

In [5]:
# df_demographic.printSchema()

In [6]:
# df_demographic.count()

In [7]:
# df_job_related.show()

In [8]:
# df_job_related.printSchema()

In [9]:
# df_job_related.count()

### Describe Data

In [10]:
# Column names and inferred types of the joined dataset.
df.printSchema()

root
 |-- EmployeeNumber: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- DistanceFromHome: integer (nullable = true)
 |-- Education: integer (nullable = true)
 |-- EducationField: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- Over18: string (nullable = true)
 |-- Attrition: string (nullable = true)
 |-- BusinessTravel: string (nullable = true)
 |-- DailyRate: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- EmployeeCount: integer (nullable = true)
 |-- EnvironmentSatisfaction: integer (nullable = true)
 |-- HourlyRate: integer (nullable = true)
 |-- JobInvolvement: integer (nullable = true)
 |-- JobLevel: integer (nullable = true)
 |-- JobRole: string (nullable = true)
 |-- JobSatisfaction: integer (nullable = true)
 |-- MonthlyIncome: integer (nullable = true)
 |-- MonthlyRate: integer (nullable = true)
 |-- NumCompaniesWorked: integer (nullable = true)
 |-- OverTime: string 

In [11]:
# Collect the joined data to pandas to list per-column non-null counts and dtypes.
df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1472 entries, 0 to 1471
Data columns (total 35 columns):
 #   Column                    Non-Null Count  Dtype  
---  ------                    --------------  -----  
 0   EmployeeNumber            1472 non-null   int32  
 1   Age                       1472 non-null   int32  
 2   DistanceFromHome          1472 non-null   int32  
 3   Education                 1472 non-null   int32  
 4   EducationField            1472 non-null   object 
 5   Gender                    1472 non-null   object 
 6   MaritalStatus             1472 non-null   object 
 7   Over18                    1472 non-null   object 
 8   Attrition                 1472 non-null   object 
 9   BusinessTravel            1472 non-null   object 
 10  DailyRate                 1472 non-null   int32  
 11  Department                1472 non-null   object 
 12  EmployeeCount             1472 non-null   int32  
 13  EnvironmentSatisfaction   1472 non-null   int32  
 14  HourlyRa

In [12]:
# Summary statistics for every column (numeric and categorical), transposed to one row per column.
df_describe = df.toPandas()
display(df_describe.describe(include="all").transpose())

Unnamed: 0,count,unique,top,freq,mean,std,min,25%,50%,75%,max
EmployeeNumber,1472.0,,,,1024.880435,602.819651,1.0,490.25,1020.5,1556.25,2069.0
Age,1472.0,,,,36.91712,9.131634,18.0,30.0,36.0,43.0,60.0
DistanceFromHome,1472.0,,,,9.199728,8.118986,1.0,2.0,7.0,14.0,29.0
Education,1472.0,,,,683.085598,26096.050436,-1.0,2.0,3.0,4.0,1001221.0
EducationField,1472.0,6.0,Life Sciences,607.0,,,,,,,
Gender,1472.0,2.0,Male,882.0,,,,,,,
MaritalStatus,1472.0,3.0,Married,673.0,,,,,,,
Over18,1472.0,1.0,Y,1472.0,,,,,,,
Attrition,1472.0,2.0,No,1234.0,,,,,,,
BusinessTravel,1472.0,4.0,Travel_Rarely,1042.0,,,,,,,


### Explore Data

In [13]:
# Encode every string column as a numeric index so that mean / standard deviation
# based statistics can be computed on all columns.
columns_to_convert = [
    "EducationField", "Gender", "MaritalStatus", "Attrition", "BusinessTravel",
    "Department", "JobRole", "Over18", "OverTime",
]

# One StringIndexer per column, each writing to "<column>_index".
indexers = []
for col_name in columns_to_convert:
    indexers.append(
        StringIndexer(inputCol=col_name, outputCol=col_name + "_index", handleInvalid="skip")
    )
stages = indexers

pipeline = Pipeline(stages=stages)
model = pipeline.fit(df)
df_indexed = model.transform(df)

# Replace each original string column by its index, keeping the original column name.
for col_name in columns_to_convert:
    df_indexed = df_indexed.drop(col_name).withColumnRenamed(col_name + "_index", col_name)

In [None]:
# valid records, outliers, extremes, missing values

# define a function to detect outliers
def detect_outliers_std(data, threshold=3):
    """Count the rows of a single-column DataFrame whose absolute z-score exceeds `threshold`.

    Args:
        data: Spark DataFrame; only its first column is examined.
        threshold: number of standard deviations from the mean beyond which a value is counted.

    Returns:
        int: number of rows with abs((value - mean) / stddev) > threshold.
        0 when the mean or standard deviation is undefined, or the standard deviation
        is zero (constant column), because no value can then deviate from the mean.
    """
    column_name = data.columns[0]
    stats = data.agg(
        mean(col(column_name)).alias("mean"),
        stddev(col(column_name)).alias("std"),
    ).collect()[0]
    mean_col = stats["mean"]
    std_col = stats["std"]

    # Constant or empty columns have a zero / null standard deviation; dividing by it
    # would make every z-score undefined, so answer directly instead of running the filter.
    if mean_col is None or not std_col:
        return 0

    z_score_col = (col(column_name) - mean_col) / std_col
    # `abs` is the column-wise pyspark.sql.functions.abs imported at the top of the notebook.
    return data.filter(abs(z_score_col) > threshold).count()

# define a function to detect extremes based on outliers
def detect_extremes_std(data, threshold=5):
    """Count extreme values: the same z-score rule as `detect_outliers_std`, with a wider default threshold."""
    extremes_count = detect_outliers_std(data, threshold)
    return extremes_count

# define a function to count missing values
def count_missing_values(data):
    """Return how many rows hold a null in the first column of `data`."""
    first_column = data.columns[0]
    is_missing = col(first_column).isNull()
    return data.filter(is_missing).count()

# count the number of outliers, extremes and missing values
def count_outliers_extremes_missing_values(data,outlier_threshold,extreme_threshold):
    """Build a per-column data-quality summary.

    Args:
        data: Spark DataFrame whose columns are all numeric (z-scores are computed on each).
        outlier_threshold: z-score cut-off passed to `detect_outliers_std`.
        extreme_threshold: z-score cut-off passed to `detect_extremes_std`.

    Returns:
        pandas.DataFrame with one row per column of `data` and the columns
        "Variable", "Valid Records", "Outliers", "Extremes", "Missing Values".
    """
    summary_columns = ["Variable", "Valid Records", "Outliers", "Extremes", "Missing Values"]
    records = []

    for column in data.columns:
        selected_data = data.select(column)
        records.append(
            {
                "Variable": column,
                "Valid Records": selected_data.filter(col(column).isNotNull()).count(),
                "Outliers": detect_outliers_std(selected_data, outlier_threshold),
                "Extremes": detect_extremes_std(selected_data, extreme_threshold),
                "Missing Values": count_missing_values(selected_data),
            }
        )

    # The records are already on the driver, so build the pandas frame directly instead of
    # round-tripping through a Spark DataFrame. Passing `columns` keeps the layout stable
    # even when `data` has no columns.
    return pd.DataFrame(records, columns=summary_columns)

# z-score cut-offs: more than 3 standard deviations from the mean counts as an outlier,
# more than 5 as an extreme. Both names are reused by later cleaning cells.
outlier_threshold = 3.0
extreme_threshold = 5.0

# Summarise valid / outlier / extreme / missing counts for every (index-encoded) column.
results_df = count_outliers_extremes_missing_values(df_indexed,outlier_threshold,extreme_threshold)
display(results_df)

In [None]:
# Histogram of Age for Attrition
df_age = df.select("Age", "Attrition").toPandas()

# range() excludes its end value, so these edges run 18, 23, ..., 63. The last edge must lie
# above the oldest age (60 in this dataset) because values outside the outermost edges
# are silently left out of the histogram.
bins = range(18, 64, 5)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

# first subplot (Attrition: Yes)
counts1, edges1, _ = ax1.hist(df_age[df_age["Attrition"] == "Yes"]["Age"], bins=bins, orientation='horizontal', color='lightblue', edgecolor='black')
# Bars are horizontal: age bins run along the y-axis and counts along the x-axis.
ax1.set_xlabel('Count')
ax1.set_ylabel('Age')
ax1.set_title('Histogram of Age for Attrition: Yes')

# second subplot (Attrition: No)
counts2, edges2, _ = ax2.hist(df_age[df_age["Attrition"] == "No"]["Age"], bins=bins, orientation='horizontal', edgecolor='black')
ax2.set_xlabel('Count')
ax2.set_ylabel('')
ax2.set_title('Histogram of Age for Attrition: No')

plt.tight_layout()
plt.show()

In [None]:
# Histogram of Monthly Income for Attrition
df_income = df.select("MonthlyIncome", "Attrition").toPandas()
plt.figure(figsize=(10, 5))
# range() excludes its end value: an end of 20001 makes 20000 the last edge, so the
# 19500-20000 band gets its own bar instead of being left out of the plot.
bins = range(1000, 20001, 500)
plt.hist(df_income[df_income["Attrition"] == "Yes"]["MonthlyIncome"], bins=bins, alpha=0.5, label='Attrition: Yes', color='red')
plt.hist(df_income[df_income["Attrition"] == "No"]["MonthlyIncome"], bins=bins, alpha=0.5, label='Attrition: No', color='blue')
plt.xlabel('Monthly Income')
plt.ylabel('Count')
plt.title('Histogram of Monthly Income vs. Attrition')
plt.legend()
plt.show()

In [None]:
# Distribution of Over Time for Attrition
df_overtime = df.select("OverTime", "Attrition").toPandas()
sns.set(style="whitegrid")
plt.figure(figsize=(8, 6))
# Sorted unique values give a deterministic bar and legend order.
sns.countplot(
    data=df_overtime,
    x="OverTime",
    hue="Attrition",
    palette="Set2",
    order=np.unique(df_overtime["OverTime"]),
    hue_order=np.unique(df_overtime["Attrition"]),
)
plt.xlabel("OverTime")
plt.ylabel("Count")
plt.title("Distribution of OverTime for Attrition")
plt.legend(title="Attrition", loc="upper right")
plt.show()

In [None]:
# Histogram of Years at Company for Attrition
df_years_at_company = df.select("YearsAtCompany", "Attrition").toPandas()
plt.figure(figsize=(10, 5))
# range() excludes its end value. Edges 0..41 give every whole year from 0 to 40 its own
# bar; with a last edge of 39, years 38 and 39 would share one bar and 40 would be left out.
bins = range(0, 42, 1)
plt.hist(df_years_at_company[df_years_at_company["Attrition"] == "Yes"]["YearsAtCompany"], bins=bins, alpha=0.5, label="Attrition: Yes", color="red")
plt.hist(df_years_at_company[df_years_at_company["Attrition"] == "No"]["YearsAtCompany"], bins=bins, alpha=0.5, label="Attrition: No", color="blue")
plt.xlabel("Years At Company")
plt.ylabel("Count")
plt.title("Histogram of Years At Company vs. Attrition")
plt.legend()
plt.show()

In [None]:
# Histogram of Total Working Years for Attrition
df_total_working_years = df.select("TotalWorkingYears", "Attrition").toPandas()
plt.figure(figsize=(10, 5))
# range() excludes its end value. Edges 0..41 give every whole year from 0 to 40 its own
# bar; with a last edge of 39, years 38 and 39 would share one bar and 40 would be left out.
bins = range(0, 42, 1)
total_working_years = df_total_working_years['TotalWorkingYears']
attrition = df_total_working_years['Attrition']
plt.hist(total_working_years[attrition == "Yes"], bins=bins, alpha=0.5, label="Attrition: Yes", color="red")
plt.hist(total_working_years[attrition == "No"], bins=bins, alpha=0.5, label="Attrition: No", color="blue")
plt.xlabel("Total Working Years")
plt.ylabel("Count")
plt.title("Histogram of Total Working Years vs. Attrition")
plt.legend()
plt.show()

In [None]:
# Distribution of Marital Status for Attrition
df_marital_status = df.select("MaritalStatus", "Attrition").toPandas()
attrition = df_marital_status['Attrition']
sns.set(style="whitegrid")
plt.figure(figsize=(8, 6))
# Stacked horizontal bars: one bar per marital status, split by attrition outcome.
sns.histplot(
    data=df_marital_status,
    y="MaritalStatus",
    hue="Attrition",
    palette="Set2",
    multiple="stack",
    bins=100,
    hue_order=np.unique(attrition),
)
plt.xlabel("Count")
plt.ylabel("Marital Status")
plt.title("Distribution of Marital Status for Attrition")
plt.show()

In [None]:
# Histogram of Job Role for Attrition
df_job_role = df.select("JobRole", "Attrition").toPandas()
sns.set_style("white")
unique_job_roles = sorted(df_job_role["JobRole"].unique())

# Create a mapping from original job roles to alphabetical order
job_role_to_alphabetical = {job: idx for idx, job in enumerate(unique_job_roles)}

# Encode each employee's role as its alphabetical position, separately per attrition outcome
job_role_yes = df_job_role[df_job_role["Attrition"] == "Yes"]["JobRole"].map(job_role_to_alphabetical)
job_role_no = df_job_role[df_job_role["Attrition"] == "No"]["JobRole"].map(job_role_to_alphabetical)
# Shared y-limit so both panels are directly comparable
max_count = max(job_role_yes.value_counts().max(), job_role_no.value_counts().max())

# One unit-wide bin centred on each role index (edges -0.5, 0.5, ...), so bar k sits exactly
# on tick k in both panels, even when a panel has no employees in the first or last role.
role_bins = np.arange(len(unique_job_roles) + 1) - 0.5

plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
counts1, edges1, _ = plt.hist(job_role_no, bins=role_bins, color="lightblue", edgecolor="black")
plt.xticks(range(len(unique_job_roles)), unique_job_roles, rotation=90)
plt.xlabel("Job Role")
plt.ylabel("Count")
plt.ylim(0, max_count)
plt.title("Histogram of Job Role for Attrition: No")

plt.subplot(1, 2, 2)
counts2, edges2, _ = plt.hist(job_role_yes, bins=role_bins, edgecolor="black")
plt.xticks(range(len(unique_job_roles)), unique_job_roles, rotation=90)
plt.xlabel("Job Role")
plt.ylim(0, max_count)
plt.title("Histogram of Job Role for Attrition: Yes")

plt.tight_layout()
plt.show()

In [None]:
# Distribution of Attrition
df_attrition = df.select("Attrition").toPandas()
sns.set(style="whitegrid")
plt.figure(figsize=(8, 6))
# Order the bars by frequency, most common class first.
ax = sns.countplot(
    data=df_attrition,
    y="Attrition",
    palette="Set2",
    order=df_attrition["Attrition"].value_counts().index,
)
total = len(df_attrition)
# Label each horizontal bar with its share of all employees, just right of the bar end.
for bar in ax.patches:
    share_label = f"{100 * bar.get_width() / total:.1f}%"
    label_x = bar.get_x() + bar.get_width() + 0.02
    label_y = bar.get_y() + bar.get_height() / 2
    ax.annotate(share_label, (label_x, label_y), fontsize=12, va="center")

plt.xlabel("Count")
plt.ylabel("Attrition")
plt.title("Distribution of Attrition")
plt.show()

In [None]:
# Distribution of Business Travel for Attrition
df_business_travel = df.select("Attrition","BusinessTravel").toPandas()
sns.set(style="whitegrid")
plt.figure(figsize=(8, 6))
# Stacked horizontal bars: one bar per travel category, split by attrition outcome.
sns.histplot(
    data=df_business_travel,
    y="BusinessTravel",
    hue="Attrition",
    palette="Set2",
    multiple="stack",
    hue_order=np.unique(df_business_travel["Attrition"]),
)
plt.xlabel("Count")
plt.ylabel("Business Travel")
plt.title("Distribution of Business Travel for Attrition")
plt.show()

In [None]:
# Heatmap of Correlation Matrix
df_overview = df.toPandas()
sns.set(style="white")
# Correlation is only defined for numeric columns. Selecting them explicitly keeps this cell
# working on pandas versions where DataFrame.corr() no longer drops string columns silently.
numeric_overview = df_overview.select_dtypes(include="number")
df_corr = numeric_overview.corr().sort_index(axis = 0).sort_index(axis = 1).round(3)
plt.figure(figsize = (24,16))
heatmap = sns.heatmap(df_corr, annot=True, cmap=sns.cubehelix_palette(start=.5, rot=-.5, as_cmap=True), linewidths=.5, annot_kws = {"size": 10} )
heatmap.set_xticklabels(heatmap.get_xticklabels(), size=12)
heatmap.set_yticklabels(heatmap.get_yticklabels(), size=12)
plt.title("Correlation Matrix")
plt.show()

### Verify Data Quality

In [None]:
# Missing Data
# Rows without a usable MonthlyIncome. The column is inferred as integer (see the schema
# printed earlier), so the null check is presumably what matches; the empty-string comparison
# looks like an extra safeguard -- TODO confirm it is still needed.
missing_value_rows = df.filter((df["MonthlyIncome"].isNull()) | (df["MonthlyIncome"] == ""))
display(missing_value_rows)

In [None]:
# Outliers and Extreme Values
# Boxplots for Outliers
# One boxplot per tenure-related column, grouped by attrition outcome, on a 3x2 grid.
tenure_columns = ["TotalWorkingYears", "YearsAtCompany", "YearsInCurrentRole", "YearsSinceLastPromotion", "YearsWithCurrManager"]
df_outliers_extremes = df.select("Attrition", *tenure_columns).toPandas()
sns.set(style='white')
plt.figure(figsize = (20,12))
for panel_index, column_name in enumerate(tenure_columns, start=1):
    plt.subplot(3, 2, panel_index)
    sns.boxplot(x=df_outliers_extremes["Attrition"], y=df_outliers_extremes[column_name], data=df_outliers_extremes)
plt.show()

In [None]:
# Extremes in “Education”
# Education is expected to be a code from 1 to 5; list every row holding any other value.
education_allowed_values = [1, 2, 3, 4, 5]
df_extremes = df.filter(~df["Education"].isin(education_allowed_values))
display(df_extremes)

## Data Preparation

### Select Data

In [None]:
# number of columns in original dataset (baseline for the column selection below)
len(df.columns)

In [None]:
# number of columns after selecting

# Columns excluded from the analysis.
columns_to_remove = ["EmployeeCount", "EmployeeNumber", "StandardHours", "Over18", "HourlyRate", "DailyRate", "MonthlyRate"]

# Keep the remaining columns in their original order. The loop variable is deliberately not
# called `col`, to avoid confusion with the imported pyspark `col` function.
columns_to_keep = [name for name in df.columns if name not in columns_to_remove]
df_select = df.select(*columns_to_keep)

len(df_select.columns)

### Clean Data

In [None]:
# Remove Extreme Values In Education
# Keep only rows whose Education code is one of the allowed values.
education_allowed_values = [1, 2, 3, 4, 5]
df_clean = df_select.filter(df_select["Education"].isin(education_allowed_values))
df_clean.filter(~df_clean["Education"].isin(education_allowed_values)).show() # sanity check: no out-of-range Education values remain after cleaning

In [None]:
# unique values of BusinessTravel in original dataset
# (used to spot inconsistent spellings of the same category)
df.select("BusinessTravel").distinct()

In [None]:
# Fix code consistency in BusinessTravel
# Map the hyphenated spelling "Travel-Rarely" onto "Travel_Rarely"; all other values pass through unchanged.
df_clean = df_clean.withColumn("BusinessTravel", when(col("BusinessTravel") == "Travel-Rarely", "Travel_Rarely").otherwise(col("BusinessTravel")))
# Display the distinct values again to confirm the fix.
df_clean.select("BusinessTravel").distinct()

In [None]:
# Missing Monthly Income Value in Original Dataset
# (same filter as in the data-quality check: null or empty MonthlyIncome)
display(df.filter((df["MonthlyIncome"].isNull()) | (df["MonthlyIncome"] == "")))

In [None]:
# Replace Missing Monthly Income Value with Median
# percentile_approx(..., 0.5) is an approximate median computed over df_clean.
median_income = df_clean.selectExpr("percentile_approx(MonthlyIncome, 0.5) as median").collect()[0]["median"]
# Rows with a null/empty income receive the median; every other row keeps its value.
df_clean = df_clean.withColumn("MonthlyIncome", when((col("MonthlyIncome").isNull()) | (col("MonthlyIncome") == ""), median_income).otherwise(col("MonthlyIncome")))

# No missing value after cleaning
display(df_clean.filter((df_clean["MonthlyIncome"].isNull()) | (df_clean["MonthlyIncome"] == "")))

In [None]:
# show the updated data
# NOTE(review): the filter condition is built from columns of `df` but applied to `df_clean`.
# Presumably the intent is to show the cleaned rows whose income was originally missing --
# confirm that Spark resolves this cross-DataFrame reference as intended.
display(df_clean.filter((df["MonthlyIncome"].isNull()) | (df["MonthlyIncome"] == "")))

In [None]:
def median_imputation(data,columns_with_outliers_extremes,outlier_threshold,extreme_threshold):
    """Replace outlying values in the given columns by the column's (approximate) median.

    For each column, a value is replaced when it lies more than `outlier_threshold` or more
    than `extreme_threshold` standard deviations away from the median.

    NOTE(review): `detect_outliers_std` measures distance from the mean, whereas this function
    measures it from the median -- confirm that the difference is intended.

    Args:
        data: Spark DataFrame to clean.
        columns_with_outliers_extremes: names of numeric columns to process.
        outlier_threshold: width of the outlier band, in standard deviations.
        extreme_threshold: width of the extreme band, in standard deviations.

    Returns:
        A new DataFrame with the listed columns imputed; columns whose median or standard
        deviation cannot be computed (e.g. no non-null values) are left unchanged.
    """
    for column in columns_with_outliers_extremes:
        stats = data.select(
            expr(f"percentile_approx({column}, 0.5) as median"),
            expr(f"stddev({column}) as std_dev"),
        ).collect()[0]
        median = stats["median"]
        std_dev = stats["std_dev"]

        # Without both statistics no limits can be derived (and the arithmetic below
        # would raise a TypeError on None), so skip the column.
        if median is None or std_dev is None:
            continue

        upper_limit_outlier = median + outlier_threshold * std_dev
        lower_limit_outlier = median - outlier_threshold * std_dev

        upper_limit_extreme = median + extreme_threshold * std_dev
        lower_limit_extreme = median - extreme_threshold * std_dev

        value = col(column)
        beyond_outlier_limits = (value > upper_limit_outlier) | (value < lower_limit_outlier)
        beyond_extreme_limits = (value > upper_limit_extreme) | (value < lower_limit_extreme)

        # A single pass covers both rules: a value replaced under one rule becomes the
        # median, which the other rule would never replace with anything else.
        data = data.withColumn(column, when(beyond_outlier_limits | beyond_extreme_limits, median).otherwise(value))

    return data

# Tenure-related columns to impute in this pass.
columns_with_outliers_extremes = ['TotalWorkingYears', 'YearsAtCompany', 'YearsInCurrentRole', 'YearsSinceLastPromotion', 'YearsWithCurrManager']

# First time for median imputation
# NOTE: this reassigns df_clean, so re-running the cell imputes again on already-imputed data.
df_clean = median_imputation(df_clean,columns_with_outliers_extremes,outlier_threshold,extreme_threshold)

# Re-count outliers / extremes to see which columns still need another pass.
results_df = count_outliers_extremes_missing_values(df_clean,outlier_threshold,extreme_threshold)
display(results_df)

In [None]:
# Columns selected for a second pass (presumably those still flagged in the previous summary).
columns_with_outliers_extremes = ['TotalWorkingYears', 'YearsAtCompany', 'YearsSinceLastPromotion']

# Second time for median imputation
# NOTE: this reassigns df_clean, so re-running the cell imputes again on already-imputed data.
df_clean = median_imputation(df_clean,columns_with_outliers_extremes,outlier_threshold,extreme_threshold)

# Re-count outliers / extremes after this pass.
results_df = count_outliers_extremes_missing_values(df_clean,outlier_threshold,extreme_threshold)
display(results_df)

In [None]:
# Column selected for a third pass (presumably still flagged in the previous summary).
columns_with_outliers_extremes = ['YearsAtCompany']

# Third time for median imputation
# NOTE: this reassigns df_clean, so re-running the cell imputes again on already-imputed data.
df_clean = median_imputation(df_clean,columns_with_outliers_extremes,outlier_threshold,extreme_threshold)

# Re-count outliers / extremes after this pass.
results_df = count_outliers_extremes_missing_values(df_clean,outlier_threshold,extreme_threshold)
display(results_df)

In [None]:
# Column selected for a fourth pass (presumably still flagged in the previous summary).
columns_with_outliers_extremes = ['YearsAtCompany']

# Fourth time for median imputation
# NOTE: this reassigns df_clean, so re-running the cell imputes again on already-imputed data.
df_clean = median_imputation(df_clean,columns_with_outliers_extremes,outlier_threshold,extreme_threshold)

# Final re-count of outliers / extremes.
results_df = count_outliers_extremes_missing_values(df_clean,outlier_threshold,extreme_threshold)
display(results_df)

### Construct Data

In [None]:
# Age Group
age_splits = [0, 25, 35, 50, float("inf")]
age_bucketizer = Bucketizer(splits=age_splits, inputCol="Age", outputCol="AgeGroup")
df_construct = age_bucketizer.transform(df_clean)
df_construct = df_construct.withColumn("AgeRange",lit(""))
age_groups = ["Youth", "Adult", "Middle", "Senior"]
# Bucketizer buckets are [lower, upper): age 25 belongs to the second bucket, 35 to the third.
# Deriving the display labels from the splits keeps them in step with the real boundaries:
# "0-24", "25-34", "35-49", "50+".
age_ranges = [
    f"{lower}+" if upper == float("inf") else f"{lower}-{upper - 1}"
    for lower, upper in zip(age_splits[:-1], age_splits[1:])
]
for i, (group, age_range) in enumerate(zip(age_groups, age_ranges)):
    # Replace bucket index i by its group name, then attach the matching range label.
    df_construct = df_construct.withColumn("AgeGroup", when(col("AgeGroup") == i, group).otherwise(col("AgeGroup")))
    df_construct = df_construct.withColumn("AgeRange", when(col("AgeGroup") == group, age_range).otherwise(col("AgeRange")))

# AgeRange exists only for this overview table and is dropped again afterwards.
display(df_construct.select("AgeGroup", "AgeRange").distinct())
df_construct = df_construct.drop("AgeRange")

# Income Group
# Split MonthlyIncome into four quantile-based buckets.
income_discretizer = QuantileDiscretizer(numBuckets=4, inputCol="MonthlyIncome", outputCol="IncomeGroup", relativeError=0.01)
income_discretizer_model = income_discretizer.fit(df_construct)
df_construct = income_discretizer_model.transform(df_construct)
income_splits = income_discretizer_model.getSplits()
df_construct = df_construct.withColumn("IncomeRange",lit(""))
income_groups = ["Below Average", "Average", "Above Average", "High"]
max_income = df_construct.agg(F.max("MonthlyIncome")).collect()[0][0]
# Display labels: from 0 up to the first inner split, between inner splits, and up to the observed maximum.
range_starts = [0, income_splits[1], income_splits[2], income_splits[3]]
range_ends = [income_splits[1], income_splits[2], income_splits[3], max_income]
income_ranges = [f"{start}-{end}" for start, end in zip(range_starts, range_ends)]
for i, (group, income_range) in enumerate(zip(income_groups, income_ranges)):
    # Replace bucket index i by its group name, then attach the matching range label.
    df_construct = df_construct.withColumn("IncomeGroup", when(col("IncomeGroup") == i, group).otherwise(col("IncomeGroup")))
    df_construct = df_construct.withColumn("IncomeRange", when(col("IncomeGroup") == group, income_range).otherwise(col("IncomeRange")))

# IncomeRange exists only for this overview table and is dropped again afterwards.
display(df_construct.select("IncomeGroup", "IncomeRange").distinct())
df_construct = df_construct.drop("IncomeRange")

display(df_construct)

### Format Data as Required

In [None]:
# Convert the categorical (string) columns, including the constructed AgeGroup and
# IncomeGroup, to numeric indices.
columns_to_convert = [
    "EducationField", "Gender", "MaritalStatus", "Attrition", "BusinessTravel",
    "Department", "JobRole", "OverTime", "AgeGroup", "IncomeGroup",
]

# One StringIndexer per column, each writing to "<column>_index".
indexers = []
for col_name in columns_to_convert:
    indexers.append(
        StringIndexer(inputCol=col_name, outputCol=col_name + "_index", handleInvalid="skip")
    )
stages = indexers

pipeline = Pipeline(stages=stages)
model = pipeline.fit(df_construct)
df_construct = model.transform(df_construct)

# Replace each original string column by its index, keeping the original column name.
for col_name in columns_to_convert:
    df_construct = df_construct.drop(col_name).withColumnRenamed(col_name + "_index", col_name)

## Data Transformation

### Reduce Data

In [None]:
# First feature selection -- rank the top-29 features based on the chi-squared test
df_transformation = df_construct

# Every column except the label is a candidate feature.
feature_columns = [col_name for col_name in df_transformation.columns if col_name != "Attrition"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_transformation = assembler.transform(df_transformation)

pandas_df = df_transformation.select("Attrition", "features").toPandas()

X = pandas_df["features"]
y = pandas_df["Attrition"]

# Turn the column of Spark vectors into a dense 2-D numpy array (rows x features).
X = np.vstack(X.apply(lambda x: x.toArray()))

# Never request more features than exist; otherwise SelectKBest rejects k and the
# summary table below is built from columns of unequal length.
top_k = min(29, len(feature_columns))
k_best = SelectKBest(score_func=chi2, k=top_k)
X_transform = k_best.fit_transform(X, y)

# Indices of the highest-scoring features, best first.
selected_indices = (-k_best.scores_).argsort()[:top_k]
selected_features = [feature_columns[i] for i in selected_indices]
selected_scores = k_best.scores_[selected_indices]
selected_rankings = range(1, len(selected_indices) + 1)

selected_feature_info_df_first = pd.DataFrame({"Feature": selected_features,
                                              "Score": selected_scores,
                                              "Ranking": selected_rankings})

print("Selected Features Information:")
print(selected_feature_info_df_first)

In [None]:
# The assembled vector was only needed for scoring; remove it so later cells can assemble afresh.
df_transformation = df_transformation.drop("features")

In [None]:
# Columns discarded after the first feature ranking.
columns_to_remove_transformation_first = ["AgeGroup", "IncomeGroup"]

# Drop all listed columns in a single call.
df_transformation = df_transformation.drop(*columns_to_remove_transformation_first)

df_transformation.printSchema()

In [None]:
# balancing data
# Attrition is index-encoded at this point; 0 is treated as the majority class, 1 as the minority.
majority_class = df_transformation.where(col("Attrition") == 0)
minority_class = df_transformation.where(col("Attrition") == 1)

minority_count = minority_class.count()
majority_count = majority_class.count()
# Whole number of minority copies needed to approach the majority size.
balance_ratio = int(majority_count / minority_count)

# Oversample by stacking complete copies of the minority rows (duplicates, not synthetic rows).
oversampled_minority_class = minority_class
extra_copies = balance_ratio - 1
while extra_copies > 0:
    oversampled_minority_class = oversampled_minority_class.union(minority_class)
    extra_copies -= 1

df_balanced = majority_class.union(oversampled_minority_class)

display(df_balanced)

In [None]:
# Total number of rows after oversampling.
df_balanced.count()

In [None]:
# Class sizes after oversampling, shown as a bar chart.
distribution_data = (
    df_balanced
    .groupBy('Attrition')
    .count()
    .toPandas()
)

sns.set(style='whitegrid')
plt.figure(figsize=(8, 6))
sns.barplot(
    data=distribution_data,
    x='Attrition',
    y='count',
    palette='Set2',
)
plt.xlabel('Attrition')
plt.ylabel('Count')
plt.title('Distribution of Attrition')
plt.show()

In [None]:
# Second feature selection -- rank the top-27 features of the balanced data (chi-squared test)
# Every column except the label is a candidate feature.
feature_columns = [col_name for col_name in df_balanced.columns if col_name != "Attrition"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# NOTE: this adds "features" to df_balanced itself; the column is dropped again in a later cell.
df_balanced = assembler.transform(df_balanced)

pandas_df = df_balanced.select("Attrition", "features").toPandas()

X = pandas_df["features"]
y = pandas_df["Attrition"]

# Turn the column of Spark vectors into a dense 2-D numpy array (rows x features).
X = np.vstack(X.apply(lambda x: x.toArray()))

# Never request more features than exist; otherwise SelectKBest rejects k and the
# summary table below is built from columns of unequal length.
top_k = min(27, len(feature_columns))
k_best = SelectKBest(score_func=chi2, k=top_k)
X_transform = k_best.fit_transform(X, y)

# Indices of the highest-scoring features, best first.
selected_indices = (-k_best.scores_).argsort()[:top_k]
selected_features = [feature_columns[i] for i in selected_indices]
selected_scores = k_best.scores_[selected_indices]
selected_rankings = range(1, len(selected_indices) + 1)

# NOTE(review): despite the "_first" suffix this holds the second ranking and overwrites
# the table produced by the first feature selection.
selected_feature_info_df_first = pd.DataFrame({"Feature": selected_features,
                                              "Score": selected_scores,
                                              "Ranking": selected_rankings})

print("Selected Features Information:")
print(selected_feature_info_df_first)

In [None]:
# Columns discarded after the second feature ranking.
columns_to_remove_transformation_second = ["PercentSalaryHike", "MaritalStatus", "PerformanceRating", "BusinessTravel"]

# Drop all listed columns in a single call.
df_balanced = df_balanced.drop(*columns_to_remove_transformation_second)

df_balanced.printSchema()

In [None]:
# Remove the assembled vector added during the second feature selection.
df_balanced = df_balanced.drop("features")

### Project Data

In [None]:
# Transform of Monthly Income
monthly_income_col = 'MonthlyIncome'

# Add the logarithm of the income as a separate column; df_balanced itself is left untouched.
df_balanced_log = df_balanced.withColumn("log_monthly_income", log(col(monthly_income_col)))

pandas_df = df_balanced_log.select(monthly_income_col, "log_monthly_income").toPandas()

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

# Side-by-side histograms: raw income on the left, log-transformed income on the right.
panels = [
    (ax1, monthly_income_col, 'b', 'MonthlyIncome', 'Histogram of MonthlyIncome'),
    (ax2, 'log_monthly_income', 'r', 'The Log of MonthlyIncome', 'Histogram of The Log of MonthlyIncome'),
]
for axis, column_name, colour, x_label, title in panels:
    axis.hist(pandas_df[column_name], bins=20, color=colour, alpha=0.7)
    axis.set_xlabel(x_label)
    axis.set_ylabel('Frequency')
    axis.set_title(title)

plt.tight_layout()
plt.show()

## Data-mining Algorithm(s) Selection

In [None]:
# feature selection -- rank the top-23 remaining features (chi-squared test)
df_dma = df_balanced

# Every column except the label is a candidate feature.
feature_columns = [col_name for col_name in df_dma.columns if col_name != "Attrition"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_dma = assembler.transform(df_dma)

pandas_df = df_dma.select("Attrition", "features").toPandas()

X = pandas_df["features"]
y = pandas_df["Attrition"]

# Turn the column of Spark vectors into a dense 2-D numpy array (rows x features).
X = np.vstack(X.apply(lambda x: x.toArray()))

# Never request more features than exist; otherwise SelectKBest rejects k and the
# summary table below is built from columns of unequal length.
top_k = min(23, len(feature_columns))
k_best = SelectKBest(score_func=chi2, k=top_k)
X_transform = k_best.fit_transform(X, y)

# Indices of the highest-scoring features, best first.
selected_indices = (-k_best.scores_).argsort()[:top_k]
selected_features = [feature_columns[i] for i in selected_indices]
selected_scores = k_best.scores_[selected_indices]
selected_rankings = range(1, len(selected_indices) + 1)

# NOTE(review): despite the "_first" suffix this holds the latest ranking and overwrites
# the tables produced by the earlier feature selections.
selected_feature_info_df_first = pd.DataFrame({"Feature": selected_features,
                                              "Score": selected_scores,
                                              "Ranking": selected_rankings})

print("Selected Features Information:")
print(selected_feature_info_df_first)

In [None]:
# Remove the assembled vector; the modelling cell assembles its own "features" column.
df_dma = df_dma.drop("features")

In [None]:
# save temporary dataset
# Written below the current working directory; mode="overwrite" replaces any previous snapshot.
df_dma.write.parquet(os.getcwd() + '/temporary_dataset/df_dma.parquet', mode="overwrite")

In [None]:
# df_dma = spark.read.parquet(os.getcwd() + '/temporary_dataset/df_dma.parquet')
# display(df_dma)

In [None]:
#CART
# Every column except the label feeds the decision tree.
feature_columns = [col_name for col_name in df_dma.columns if col_name != "Attrition"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_dma_model = assembler.transform(df_dma)

# Split the dataset into train and test (70 / 30, fixed seed for reproducibility)
# NOTE(review): df_dma was balanced earlier by duplicating minority rows, so identical rows
# can land on both sides of this split; the metrics reported afterwards may be optimistic --
# confirm whether the split should happen before oversampling.
train_data, test_data = df_dma_model.randomSplit([0.7, 0.3], seed=42)

dt_classifier = DecisionTreeClassifier(labelCol="Attrition", featuresCol="features", seed=42)
model = dt_classifier.fit(train_data)
predictions = model.transform(test_data)

# Pair each feature name with its importance in the fitted tree, most important first.
feature_importance = model.featureImportances
feature_importance_df = pd.DataFrame(
    list(zip(feature_columns, feature_importance.toArray())),
    columns=["Feature", "Importance"],
).sort_values(by="Importance", ascending=False)

print("Ranked Feature Importance:")
print(feature_importance_df)

In [None]:
# evaluate the model

# Accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="Attrition", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Confusion Matrix
# (counts per combination of actual label and predicted label)
conf_matrix = predictions.groupBy("Attrition", "prediction").count()
print("Confusion Matrix:")
conf_matrix.show()

# Area under the ROC curve
# (`evaluator` is rebound here; the accuracy evaluator above is no longer reachable by name)
evaluator = BinaryClassificationEvaluator(labelCol="Attrition", metricName="areaUnderROC")
area_under_roc = evaluator.evaluate(predictions)
print(f"Area under ROC: {area_under_roc}")

In [None]:
# Text description of the fitted decision tree.
tree = model.toDebugString

# Graphviz graph (PNG output) that parse_tree_lines fills in, and the description split into lines.
dot = Digraph(format='png')
lines = tree.split('\n')

def parse_tree_lines(lines, parent_node=None):
    for line in lines:
        if line.strip():
            if "DecisionTreeClassificationModel" in line:
                node_id = "Root"
                node_label = "Root"
            else:
                parts = line.strip().split(" ")
                node_id = parts[0].strip()
                node_label = " ".join(parts[1:]).strip()
            
            dot.node(node_id, label=node_label)
            
            if parent_node is not None:
                dot.edge(parent_node, node_id)
            
            if "->" in node_label:
                child_node_info = node_label.split("->")[1].strip()
                parse_tree_lines([child_node_info], node_id)

parse_tree_lines(lines)

In [None]:
def parse_debug_string_lines(lines):
    """Turn stripped tree debug-string lines into a nested list of node dicts.

    Lines are consumed from the front of ``lines`` (the list is mutated in
    place, which is how the recursive calls share their position).

    * A line starting with "If" opens a branch: the text after the first word,
      with parentheses removed, becomes the node name, and the lines that
      follow are parsed recursively as its children. If the next unconsumed
      line starts with "Else", it is handled the same way as a sibling node.
    * A line starting with "Else" that is not paired with a preceding "If" at
      this level ends the current level and is left for the caller.
    * Any other line becomes a leaf ``{'name': line}``.

    Args:
        lines: list of already-stripped strings; emptied as it is parsed.

    Returns:
        A list of dicts with a 'name' key and, for branches, a 'children' list.
    """
    block = []
    while lines:
        head = lines[0]
        if head.startswith('If'):
            bl = ' '.join(lines.pop(0).split()[1:]).replace('(', '').replace(')', '')
            block.append({'name': bl, 'children': parse_debug_string_lines(lines)})
            # The recursive call may have consumed every remaining line (for
            # example a truncated dump), so check before peeking at lines[0].
            if lines and lines[0].startswith('Else'):
                be = ' '.join(lines.pop(0).split()[1:]).replace('(', '').replace(')', '')
                block.append({'name': be, 'children': parse_debug_string_lines(lines)})
        elif head.startswith('Else'):
            # Belongs to an enclosing "If"; let the caller consume it.
            break
        else:
            block.append({'name': lines.pop(0)})

    return block

def debug_str_to_json(debug_string):
    """Build a nested dict describing the tree found in ``debug_string``.

    Lines are stripped and collected up to (not including) the first blank
    line. The first collected line is skipped and the rest are handed to
    ``parse_debug_string_lines``; the result is wrapped in a node named "Root".
    """
    stripped_lines = []
    for raw_line in debug_string.splitlines():
        text = raw_line.strip()
        if not text:
            break
        stripped_lines.append(text)
    return {'name': 'Root', 'children': parse_debug_string_lines(stripped_lines[1:])}

In [None]:
dict_tree_json = debug_str_to_json(tree)

# Attribute metadata attached to the assembled "features" column, grouped by
# attribute type; each entry carries the vector index and the column name.
f_type_to_flist_dict = df_dma_model.schema['features'].metadata["ml_attr"]["attrs"]

# Flatten every group into a single {vector index: column name} lookup.
f_index_to_name_dict = {
    attr['idx']: attr['name']
    for attr_group in f_type_to_flist_dict.values()
    for attr in attr_group
}

In [None]:
# Nested-dict representation of the decision tree to be rendered.
json_tree = dict_tree_json


def json_to_tree(json_data, parent=None):
    """Recursively convert a nested node dict into anytree ``Node`` objects.

    A name that starts with "feature <index> " has that prefix replaced by the
    column name from ``f_index_to_name_dict`` when the index is known. Children
    listed under the "children" key are attached beneath the created node.

    Returns the ``Node`` created for ``json_data``.
    """
    node_name = json_data["name"]

    matched = re.match(r'^feature (\d+) ', node_name)
    if matched:
        feature_index = int(matched.group(1))
        if feature_index in f_index_to_name_dict:
            node_name = node_name.replace(
                f"feature {feature_index}",
                f_index_to_name_dict[feature_index],
            )

    node = Node(node_name, parent=parent)
    for child in json_data.get("children", ()):
        json_to_tree(child, parent=node)
    return node

root = json_to_tree(json_tree)

# Print the tree as indented text.
for pre, fill, node in RenderTree(root):
    print(f"{pre}{node.name}")

from anytree.exporter import UniqueDotExporter

# Create the output folder first so the export does not depend on a directory
# that was made by hand outside the notebook.
decision_tree_image_dir = os.getcwd() + "/decision_tree_image"
os.makedirs(decision_tree_image_dir, exist_ok=True)
UniqueDotExporter(root).to_picture(decision_tree_image_dir + "/decision_tree.png")


In [None]:
# Remove any "features" column from df_dma before the cells below assemble
# their own feature vectors from it.
df_dma = df_dma.drop("features")

In [None]:
# Logistic regression on the same feature columns, assembled into "features_lr".
assembler_lr = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features_lr",
)
df_dma_model_lr = assembler_lr.transform(df_dma)

# 70/30 train/test split with a fixed seed.
train_data_lr, test_data_lr = df_dma_model_lr.randomSplit([0.7, 0.3], seed=42)

lr = LogisticRegression(
    featuresCol="features_lr",
    labelCol="Attrition",
)
model_lr = lr.fit(train_data_lr)
predictions_lr = model_lr.transform(test_data_lr)

In [None]:
# Evaluate the logistic regression predictions on the test split.

# Accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="Attrition",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions_lr)
print(f"Accuracy: {accuracy}")

# Confusion matrix, expressed as row counts per (label, prediction) pair
print("Confusion Matrix:")
conf_matrix = predictions_lr.groupBy("Attrition", "prediction").count()
conf_matrix.show()

# Area under the ROC curve
evaluator = BinaryClassificationEvaluator(
    labelCol="Attrition",
    metricName="areaUnderROC",
)
area_under_roc = evaluator.evaluate(predictions_lr)
print(f"Area under ROC: {area_under_roc}")

In [None]:
# Random forest (100 trees) on the same feature columns, assembled into "features_rf".
assembler_rf = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features_rf",
)
df_dma_model_rf = assembler_rf.transform(df_dma)

# 70/30 train/test split with a fixed seed.
train_data_rf, test_data_rf = df_dma_model_rf.randomSplit([0.7, 0.3], seed=42)

rf = RandomForestClassifier(
    featuresCol="features_rf",
    labelCol="Attrition",
    numTrees=100,
    seed=42,
)
model_rf = rf.fit(train_data_rf)
predictions_rf = model_rf.transform(test_data_rf)

In [None]:
# Evaluate the random forest predictions on the test split.

# Accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="Attrition",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions_rf)
print(f"Accuracy: {accuracy}")

# Confusion matrix, expressed as row counts per (label, prediction) pair
print("Confusion Matrix:")
conf_matrix = predictions_rf.groupBy("Attrition", "prediction").count()
conf_matrix.show()

# Area under the ROC curve
evaluator = BinaryClassificationEvaluator(
    labelCol="Attrition",
    metricName="areaUnderROC",
)
area_under_roc = evaluator.evaluate(predictions_rf)
print(f"Area under ROC: {area_under_roc}")

In [None]:
# Index the listed columns with StringIndexer so they hold numeric codes,
# for use when computing statistics.
columns_to_convert = ["EducationField", "Gender","Attrition", "Department", "JobRole", "OverTime"]

# One indexer per column, each writing to "<column>_index"; rows with values
# the indexer treats as invalid are skipped.
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index", handleInvalid="skip")
    for name in columns_to_convert
]
stages = indexers
pipeline = Pipeline(stages=stages)

# NOTE(review): this rebinds the notebook-level name `model` to the fitted pipeline.
model = pipeline.fit(df_dma)
df_dma_numeric = model.transform(df_dma)

# Replace each original column by its indexed counterpart under the original name.
for col_name in columns_to_convert:
    df_dma_numeric = (
        df_dma_numeric
        .drop(col_name)
        .withColumnRenamed(col_name + "_index", col_name)
    )

In [None]:
# Use chi-square feature selection with 5-fold cross-validation to decide how
# many features the CART model should use.
feature_columns = [col_name for col_name in df_dma_numeric.columns if col_name != "Attrition"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

selector = ChiSqSelector(featuresCol="features", outputCol="selectedFeatures", labelCol="Attrition")

# The tree must train on the selector's output column. Reading "features"
# would bypass the selection entirely, so every numTopFeatures candidate
# would score the same model and the grid search would be meaningless.
clf = DecisionTreeClassifier(labelCol="Attrition", featuresCol="selectedFeatures", seed=42)

pipeline = Pipeline(stages=[assembler, selector, clf])

# Candidate numbers of top-ranked features to keep.
paramGrid = ParamGridBuilder().addGrid(selector.numTopFeatures, [5, 10, 15, 20, 25]).build()

evaluator = MulticlassClassificationEvaluator(labelCol="Attrition", metricName="accuracy")

# Create cross-validator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=42)

cvModel = crossval.fit(df_dma_numeric)

bestModel = cvModel.bestModel

# Stage 1 of the fitted pipeline is the selector model; its selectedFeatures
# are indices into the assembled "features" vector.
selectedIndices = bestModel.stages[1].selectedFeatures

In [None]:
# Report the winning selector setting together with the cross-validation
# metric averaged over folds for each candidate in the grid.
bestK = bestModel.stages[1].getNumTopFeatures()
avgMetrics = cvModel.avgMetrics

print(f"Best Features (k={bestK}): {selectedIndices}")
print(f"Average Metrics: {avgMetrics}")

In [None]:
# Translate the selector's chosen vector indices into column names.
# The indices are read from the fitted model rather than hard-coded, and are
# resolved against the assembler's input columns (stage 0 of the pipeline),
# not df_dma_numeric.columns, which also contains the "Attrition" label.
# Assumes every assembler input is a scalar column, so vector index i
# corresponds to input column i -- TODO confirm.
best_feature_indices = sorted(bestModel.stages[1].selectedFeatures)
assembler_input_columns = bestModel.stages[0].getInputCols()
selected_features = [assembler_input_columns[i] for i in best_feature_indices]

print("Best Features Column Names:")
print(selected_features)

In [None]:
# Feature selection: rank the feature columns by their chi-square score
# against "Attrition" using scikit-learn's SelectKBest.
feature_columns = [col_name for col_name in df_dma.columns if col_name != "Attrition"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_dma_select = assembler.transform(df_dma)

# Bring label and assembled vectors to the driver as pandas objects.
pandas_df = df_dma_select.select("Attrition", "features").toPandas()

X = pandas_df["features"]
y = pandas_df["Attrition"]

# Stack the per-row Spark vectors into a 2-D NumPy array.
X = np.vstack(X.apply(lambda x: x.toArray()))

# Keep at most 23 features, but never more than the matrix actually has, so
# the ranking table below always has columns of matching length.
n_selected = min(23, X.shape[1])

k_best = SelectKBest(score_func=chi2, k=n_selected)
X_transform = k_best.fit_transform(X, y)

# Indices of the highest-scoring features, best first.
selected_indices = (-k_best.scores_).argsort()[:n_selected]
selected_features = [feature_columns[i] for i in selected_indices]
selected_scores = k_best.scores_[selected_indices]
selected_rankings = range(1, n_selected + 1)

selected_feature_info_df_first = pd.DataFrame({"Feature": selected_features,
                                              "Score": selected_scores,
                                              "Ranking": selected_rankings})

print("Selected Features Information:")
print(selected_feature_info_df_first)

In [None]:
# Columns excluded from the modelling dataset.
columns_to_remove = ["JobInvolvement", "TrainingTimesLastYear", "JobRole", "YearsSinceLastPromotion", "RelationshipSatisfaction", "WorkLifeBalance","Gender", "Education"]

# Drop them all in one call; df_dma itself is left untouched.
df_dma_remove = df_dma.drop(*columns_to_remove)

df_dma_remove.printSchema()

In [None]:
# Checkpoint df_dma_remove as Parquet under the working directory, replacing
# any previous copy at the same location.
df_dma_remove_parquet_path = os.getcwd() + '/temporary_dataset/df_dma_remove.parquet'
df_dma_remove.write.parquet(df_dma_remove_parquet_path, mode="overwrite")

In [None]:
# df_dma_remove = spark.read.parquet(os.getcwd() + '/temporary_dataset/df_dma_remove.parquet')
# display(df_dma_remove)

## Data Mining

In [None]:
# Modelling dataset: the reduced frame produced by the column-removal step.
df_dm = df_dma_remove

# Every column except the label "Attrition" is a feature.
feature_columns = [name for name in df_dm.columns if name != "Attrition"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_dm_model = assembler.transform(df_dm)

# 80/20 train/test split with a fixed seed.
train_data, test_data = df_dm_model.randomSplit([0.8, 0.2], seed=42)

dt_classifier = DecisionTreeClassifier(
    labelCol="Attrition",
    featuresCol="features",
    seed=42,
)
model = dt_classifier.fit(train_data)
predictions = model.transform(test_data)

# Pair each input column with its importance value and rank them, highest first.
feature_importance = model.featureImportances
importance_pairs = list(zip(feature_columns, feature_importance.toArray()))
feature_importance_df = pd.DataFrame(
    importance_pairs, columns=["Feature", "Importance"]
).sort_values(by="Importance", ascending=False)

print("Ranked Feature Importance:")
print(feature_importance_df)

In [None]:
# Additional column removed from the modelling dataset before refitting.
columns_to_remove = ["YearsWithCurrManager"]

df_dm = df_dm.drop(*columns_to_remove)

df_dm.printSchema()

In [None]:
# Refit the decision tree on the current df_dm columns (label excluded).
feature_columns = [name for name in df_dm.columns if name != "Attrition"]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_dm_model = assembler.transform(df_dm)

# 80/20 train/test split with a fixed seed.
train_data, test_data = df_dm_model.randomSplit([0.8, 0.2], seed=42)

dt_classifier = DecisionTreeClassifier(
    labelCol="Attrition",
    featuresCol="features",
    seed=42,
)
model = dt_classifier.fit(train_data)
predictions = model.transform(test_data)

# Pair each input column with its importance value and rank them, highest first.
feature_importance = model.featureImportances
importance_pairs = list(zip(feature_columns, feature_importance.toArray()))
feature_importance_df = pd.DataFrame(
    importance_pairs, columns=["Feature", "Importance"]
).sort_values(by="Importance", ascending=False)

print("Ranked Feature Importance:")
print(feature_importance_df)

In [None]:
# Evaluate the refitted decision tree's predictions on the test split.

# Accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="Attrition",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

# Confusion matrix, expressed as row counts per (label, prediction) pair
print("Confusion Matrix:")
conf_matrix = predictions.groupBy("Attrition", "prediction").count()
conf_matrix.show()

# Area under the ROC curve
evaluator = BinaryClassificationEvaluator(
    labelCol="Attrition",
    metricName="areaUnderROC",
)
area_under_roc = evaluator.evaluate(predictions)
print(f"Area under ROC: {area_under_roc}")

In [None]:
# Textual dump of the fitted tree; the following cells parse `tree`.
tree = model.toDebugString

dot = Digraph(format='png')
lines = tree.split('\n')


def parse_tree_lines(lines, parent_node=None):
    """Add one node per non-blank line to the notebook-level ``dot`` graph.

    The line containing "DecisionTreeClassificationModel" becomes the node
    "Root". Every other line uses its first space-separated token as the node
    id and the rest of the line as the label. When ``parent_node`` is given,
    an edge from it to the node is added. A label containing "->" triggers a
    recursive call on the segment that follows the first "->".
    """
    for raw_line in lines:
        stripped = raw_line.strip()
        if not stripped:
            continue

        if "DecisionTreeClassificationModel" in raw_line:
            node_id = node_label = "Root"
        else:
            first_token, _, remainder = stripped.partition(" ")
            node_id = first_token.strip()
            node_label = remainder.strip()

        dot.node(node_id, label=node_label)

        if parent_node is not None:
            dot.edge(parent_node, node_id)

        if "->" in node_label:
            child_node_info = node_label.split("->")[1].strip()
            parse_tree_lines([child_node_info], node_id)


# NOTE(review): `dot` is populated here but not rendered in this cell.
parse_tree_lines(lines)

In [None]:
def parse_debug_string_lines(lines):
    """Turn stripped tree debug-string lines into a nested list of node dicts.

    Lines are consumed from the front of ``lines`` (the list is mutated in
    place, which is how the recursive calls share their position).

    * A line starting with "If" opens a branch: the text after the first word,
      with parentheses removed, becomes the node name, and the lines that
      follow are parsed recursively as its children. If the next unconsumed
      line starts with "Else", it is handled the same way as a sibling node.
    * A line starting with "Else" that is not paired with a preceding "If" at
      this level ends the current level and is left for the caller.
    * Any other line becomes a leaf ``{'name': line}``.

    Args:
        lines: list of already-stripped strings; emptied as it is parsed.

    Returns:
        A list of dicts with a 'name' key and, for branches, a 'children' list.
    """
    block = []
    while lines:
        head = lines[0]
        if head.startswith('If'):
            bl = ' '.join(lines.pop(0).split()[1:]).replace('(', '').replace(')', '')
            block.append({'name': bl, 'children': parse_debug_string_lines(lines)})
            # The recursive call may have consumed every remaining line (for
            # example a truncated dump), so check before peeking at lines[0].
            if lines and lines[0].startswith('Else'):
                be = ' '.join(lines.pop(0).split()[1:]).replace('(', '').replace(')', '')
                block.append({'name': be, 'children': parse_debug_string_lines(lines)})
        elif head.startswith('Else'):
            # Belongs to an enclosing "If"; let the caller consume it.
            break
        else:
            block.append({'name': lines.pop(0)})

    return block

def debug_str_to_json(debug_string):
    """Build a nested dict describing the tree found in ``debug_string``.

    Lines are stripped and collected up to (not including) the first blank
    line. The first collected line is skipped and the rest are handed to
    ``parse_debug_string_lines``; the result is wrapped in a node named "Root".
    """
    stripped_lines = []
    for raw_line in debug_string.splitlines():
        text = raw_line.strip()
        if not text:
            break
        stripped_lines.append(text)
    return {'name': 'Root', 'children': parse_debug_string_lines(stripped_lines[1:])}

In [None]:
dict_tree_json = debug_str_to_json(tree)

# Attribute metadata attached to the assembled "features" column, grouped by
# attribute type; each entry carries the vector index and the column name.
f_type_to_flist_dict = df_dm_model.schema['features'].metadata["ml_attr"]["attrs"]

# Flatten every group into a single {vector index: column name} lookup.
f_index_to_name_dict = {
    attr['idx']: attr['name']
    for attr_group in f_type_to_flist_dict.values()
    for attr in attr_group
}

In [None]:
# Nested-dict representation of the decision tree to be rendered.
json_tree = dict_tree_json


def json_to_tree(json_data, parent=None):
    """Recursively convert a nested node dict into anytree ``Node`` objects.

    A name that starts with "feature <index> " has that prefix replaced by the
    column name from ``f_index_to_name_dict`` when the index is known. Children
    listed under the "children" key are attached beneath the created node.

    Returns the ``Node`` created for ``json_data``.
    """
    node_name = json_data["name"]

    matched = re.match(r'^feature (\d+) ', node_name)
    if matched:
        feature_index = int(matched.group(1))
        if feature_index in f_index_to_name_dict:
            node_name = node_name.replace(
                f"feature {feature_index}",
                f_index_to_name_dict[feature_index],
            )

    node = Node(node_name, parent=parent)
    for child in json_data.get("children", ()):
        json_to_tree(child, parent=node)
    return node

root = json_to_tree(json_tree)

# Print the tree as indented text.
for pre, fill, node in RenderTree(root):
    print(f"{pre}{node.name}")

from anytree.exporter import UniqueDotExporter

# Create the output folder first so the export does not depend on a directory
# that was made by hand outside the notebook.
decision_tree_image_dir = os.getcwd() + "/decision_tree_image"
os.makedirs(decision_tree_image_dir, exist_ok=True)
UniqueDotExporter(root).to_picture(decision_tree_image_dir + "/decision_tree_1.png")

In [None]:
# Monthly income distribution, split by attrition outcome.

# Rows with Attrition == 1 / == 0; these two frames are reused by later cells.
attrition_yes = df_dm.filter(df_dm['Attrition'] == 1)
attrition_no = df_dm.filter(df_dm['Attrition'] == 0)

attrition_yes_pd = attrition_yes.select("MonthlyIncome").toPandas()
attrition_no_pd = attrition_no.select("MonthlyIncome").toPandas()

# Bin edges every 500 from 0 up to 19500.
# NOTE(review): values above the last edge fall outside every bin -- confirm
# the income range of the data.
bins = list(range(0, 20000, 500))

fig, axes = plt.subplots(1, 2, figsize=(12, 5))

income_panels = (
    (axes[0], attrition_yes_pd, 'red', 'Monthly Income - Attrition: Yes'),
    (axes[1], attrition_no_pd, 'blue', 'Monthly Income - Attrition: No'),
)
for panel_ax, panel_frame, panel_color, panel_title in income_panels:
    sns.histplot(panel_frame, x="MonthlyIncome", bins=bins, kde=True, ax=panel_ax, color=panel_color)
    panel_ax.set_title(panel_title)
    # Same y-range on both panels so the two groups are directly comparable.
    panel_ax.set_ylim([0, 300])

plt.tight_layout()
plt.show()

In [None]:
# Years-at-company distribution, split by attrition outcome.

attrition_yes_pandas = attrition_yes.select("YearsAtCompany").toPandas()
attrition_no_pandas = attrition_no.select("YearsAtCompany").toPandas()

fig, axes = plt.subplots(1, 2, figsize=(12, 5))

tenure_panels = (
    (axes[0], attrition_yes_pandas, 'red', 'Years At Company - Attrition: Yes'),
    (axes[1], attrition_no_pandas, 'blue', 'Years At Company - Attrition: No'),
)
for panel_ax, panel_frame, panel_color, panel_title in tenure_panels:
    sns.histplot(panel_frame, x="YearsAtCompany", kde=True, ax=panel_ax, color=panel_color)
    panel_ax.set_title(panel_title)

plt.tight_layout()
plt.show()

In [None]:
# Stock-option-level distribution, split by attrition outcome.
attrition_yes_pandas = attrition_yes.select("StockOptionLevel").toPandas()
attrition_no_pandas = attrition_no.select("StockOptionLevel").toPandas()

# One panel per attrition group, side by side.
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

stock_panels = (
    (axes[0], attrition_yes_pandas, 'red', 'Stock Option Level - Attrition: Yes'),
    (axes[1], attrition_no_pandas, 'blue', 'Stock Option Level - Attrition: No'),
)
for panel_ax, panel_frame, panel_color, panel_title in stock_panels:
    sns.histplot(panel_frame, x="StockOptionLevel", ax=panel_ax, color=panel_color)
    panel_ax.set_title(panel_title)

plt.tight_layout()
plt.show()

In [None]:
# ROC curves of the current `model` on the training and test splits.


def _score_and_label(row):
    """Return (probability at index 1, label) as floats for one prediction row."""
    return (float(row["probability"][1]), float(row["Attrition"]))


train_preds = model.transform(train_data).select("Attrition", "probability")
test_preds = model.transform(test_data).select("Attrition", "probability")

train_probs = train_preds.rdd.map(_score_and_label)
test_probs = test_preds.rdd.map(_score_and_label)

# roc_curve takes the true labels first, then the scores.
train_labels = train_probs.map(lambda pair: pair[1]).collect()
train_scores = train_probs.map(lambda pair: pair[0]).collect()
fpr_train, tpr_train, _ = roc_curve(train_labels, train_scores)
roc_auc_train = auc(fpr_train, tpr_train)

test_labels = test_probs.map(lambda pair: pair[1]).collect()
test_scores = test_probs.map(lambda pair: pair[0]).collect()
fpr_test, tpr_test, _ = roc_curve(test_labels, test_scores)
roc_auc_test = auc(fpr_test, tpr_test)

# Draw both curves side by side, each with the chance diagonal for reference.
plt.figure(figsize=(12, 6))

roc_panels = (
    (1, fpr_train, tpr_train,
     'Train ROC curve (area = %0.2f)' % roc_auc_train,
     'ROC Curve for CART Model (Training Data)'),
    (2, fpr_test, tpr_test,
     'Test ROC curve (area = %0.2f)' % roc_auc_test,
     'ROC Curve for CART Model (Testing Data)'),
)
for panel_position, panel_fpr, panel_tpr, panel_label, panel_title in roc_panels:
    plt.subplot(1, 2, panel_position)
    plt.plot(panel_fpr, panel_tpr, color='darkorange', lw=2, label=panel_label)
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(panel_title)
    plt.legend(loc='lower right')

plt.tight_layout()
plt.show()

In [None]:
from sklearn.metrics import accuracy_score

In [None]:
# Bagging: train several decision trees, each on its own bootstrap resample of
# the training split, and combine their test predictions by majority vote.
feature_columns = [col_name for col_name in df_dm.columns if col_name != "Attrition"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
df_dm_model = assembler.transform(df_dm)

(train_data, test_data) = df_dm_model.randomSplit([0.8, 0.2], seed=42)

num_trees = 10

tree_models = []

for i in range(num_trees):
    # Draw a bootstrap sample (with replacement, same expected size) per tree.
    # Fitting every tree on the identical train_data, as before, involved no
    # resampling at all and therefore was not bagging.
    bootstrap_sample = train_data.sample(withReplacement=True, fraction=1.0, seed=42 + i)
    dt_classifier = DecisionTreeClassifier(labelCol="Attrition", featuresCol="features", seed=42 + i)
    tree_model = dt_classifier.fit(bootstrap_sample)
    tree_models.append(tree_model)

# One prediction DataFrame per tree, all over the same test split.
predictions = [fitted_tree.transform(test_data) for fitted_tree in tree_models]

def majority_vote(row):
    """Return 1 when at least half of the trees' predictions in ``row`` are 1, else 0."""
    ones = sum(row)
    return 1 if ones >= num_trees / 2 else 0

# Collect each tree's predictions to the driver and vote position by position.
# This relies on every collect returning the test rows in the same order.
ensemble_predictions = [prediction.select("prediction").rdd.map(lambda row: row[0]).collect() for prediction in predictions]
final_predictions = [majority_vote(row) for row in zip(*ensemble_predictions)]

true_labels = test_data.select("Attrition").rdd.map(lambda row: row[0]).collect()
accuracy = accuracy_score(true_labels, final_predictions)
print("Accuracy of the Bagging Ensemble:", accuracy)