In [1]:
# findspark.init() must run BEFORE the first `import pyspark`: it locates the
# local Spark installation and adds its Python libraries to sys.path. Calling
# it after the pyspark imports (as before) cannot help those imports resolve.
import findspark
findspark.init()

import warnings
# NOTE(review): this silences every Python warning for the whole kernel,
# including deprecation warnings from pyspark; consider narrowing the filter.
warnings.filterwarnings("ignore")

from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import Row
from pyspark.sql.functions import col, isnan, when, count
import pyspark.sql.functions as F
# NOTE(review): wildcard import; none of the visible cells use names from
# pyspark.ml.feature. Prefer importing the specific transformers needed.
from pyspark.ml.feature import *

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext

In [3]:
# Machine Learning Data: MongoDB URI of the collection that receives the
# one-hot-encoded feature table (`mlnew_df`).
# Kept under its own name rather than assigning `inp`/`outp`: the next cell
# sets `inp`/`outp` unconditionally, so assignments made here never reached
# the SparkSession configuration.
ML_DATA_URI = "mongodb://127.0.0.1/Project.mldata"

In [4]:
# Visualization Data: the same MongoDB URI is used as the SparkSession's
# default input and output location (see the spark.mongodb.*.uri settings).
inp = outp = "mongodb://127.0.0.1/Project.OriData"

In [5]:
# Create (or reuse) the Spark session. `inp`/`outp` are passed as the MongoDB
# connector's input/output URIs, and the connector jar is resolved through
# spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("EmployeeAttrition")
    .config("spark.mongodb.input.uri", inp)
    .config("spark.mongodb.output.uri", outp)
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:2.4.2")
    .getOrCreate()
)

In [6]:
# Register the raw HR attrition CSV with Spark; SparkFiles.get() later
# resolves the local path of the downloaded copy by file name.
url = (
    "https://raw.githubusercontent.com/some1damaged/employee_attrition/main/"
    "HR_Employee_Attrition_Dataset.csv"
)
spark.sparkContext.addFile(url)

In [7]:
# Load the CSV with a header row and let Spark infer column types.
rawdf = (
    spark.read
    .option("header", "true")
    .option("inferSchema", True)
    .csv(SparkFiles.get("HR_Employee_Attrition_Dataset.csv"))
)
rawdf.show(n=5)

+--------------+---------+---+-----------------+---------+--------------------+----------------+---------+--------------+-----------------------+------+----------+--------------+--------+--------------------+---------------+-------------+-------------+-----------+------------------+------+--------+-----------------+-----------------+------------------------+-------------+----------------+-----------------+---------------------+---------------+--------------+------------------+-----------------------+--------------------+
|EmployeeNumber|Attrition|Age|   BusinessTravel|DailyRate|          Department|DistanceFromHome|Education|EducationField|EnvironmentSatisfaction|Gender|HourlyRate|JobInvolvement|JobLevel|             JobRole|JobSatisfaction|MaritalStatus|MonthlyIncome|MonthlyRate|NumCompaniesWorked|Over18|OverTime|PercentSalaryHike|PerformanceRating|RelationshipSatisfaction|StandardHours|StockOptionLevel|TotalWorkingYears|TrainingTimesLastYear|WorkLifeBalance|YearsAtCompany|YearsInCurr

In [8]:
# Inspect the column types Spark inferred from the CSV (inferSchema=True).
rawdf.printSchema()

root
 |-- EmployeeNumber: integer (nullable = true)
 |-- Attrition: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- BusinessTravel: string (nullable = true)
 |-- DailyRate: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- DistanceFromHome: integer (nullable = true)
 |-- Education: integer (nullable = true)
 |-- EducationField: string (nullable = true)
 |-- EnvironmentSatisfaction: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- HourlyRate: integer (nullable = true)
 |-- JobInvolvement: integer (nullable = true)
 |-- JobLevel: integer (nullable = true)
 |-- JobRole: string (nullable = true)
 |-- JobSatisfaction: integer (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- MonthlyIncome: integer (nullable = true)
 |-- MonthlyRate: integer (nullable = true)
 |-- NumCompaniesWorked: integer (nullable = true)
 |-- Over18: string (nullable = true)
 |-- OverTime: string (nullable = true)
 |-- PercentSalaryHike: inte

In [9]:
# Count missing values per column: nulls in every column, plus NaN in
# floating-point columns. `isnan` is applied only to float/double columns:
# on string columns it relies on an implicit string->double cast, which
# raises on values such as "Yes" when ANSI mode (spark.sql.ansi.enabled) is on.
float_cols = {
    field.name
    for field in rawdf.schema.fields
    if isinstance(field.dataType, (T.FloatType, T.DoubleType))
}


def missing_count(column):
    """Return an aggregate counting rows where `column` is null (or NaN for float columns)."""
    is_missing = col(column).isNull()
    if column in float_cols:
        is_missing = is_missing | isnan(column)
    # when() yields null for non-missing rows, so count() tallies only missing ones.
    return count(when(is_missing, 1)).alias(column)


rawdf.select([missing_count(c) for c in rawdf.columns]).show()

+--------------+---------+---+--------------+---------+----------+----------------+---------+--------------+-----------------------+------+----------+--------------+--------+-------+---------------+-------------+-------------+-----------+------------------+------+--------+-----------------+-----------------+------------------------+-------------+----------------+-----------------+---------------------+---------------+--------------+------------------+-----------------------+--------------------+
|EmployeeNumber|Attrition|Age|BusinessTravel|DailyRate|Department|DistanceFromHome|Education|EducationField|EnvironmentSatisfaction|Gender|HourlyRate|JobInvolvement|JobLevel|JobRole|JobSatisfaction|MaritalStatus|MonthlyIncome|MonthlyRate|NumCompaniesWorked|Over18|OverTime|PercentSalaryHike|PerformanceRating|RelationshipSatisfaction|StandardHours|StockOptionLevel|TotalWorkingYears|TrainingTimesLastYear|WorkLifeBalance|YearsAtCompany|YearsInCurrentRole|YearsSinceLastPromotion|YearsWithCurrManager

In [10]:
# Remove columns not used downstream. EmployeeNumber looks like a row
# identifier; Over18 and StandardHours are presumably non-informative
# (NOTE(review): verify they are constant in this dataset).
rawdropdf = rawdf.drop(*("EmployeeNumber", "Over18", "StandardHours"))

In [11]:
# Spot-check one row after dropping EmployeeNumber, Over18 and StandardHours.
rawdropdf.show(n=1)

+---------+---+--------------+---------+----------+----------------+---------+--------------+-----------------------+------+----------+--------------+--------+---------------+---------------+-------------+-------------+-----------+------------------+--------+-----------------+-----------------+------------------------+----------------+-----------------+---------------------+---------------+--------------+------------------+-----------------------+--------------------+
|Attrition|Age|BusinessTravel|DailyRate|Department|DistanceFromHome|Education|EducationField|EnvironmentSatisfaction|Gender|HourlyRate|JobInvolvement|JobLevel|        JobRole|JobSatisfaction|MaritalStatus|MonthlyIncome|MonthlyRate|NumCompaniesWorked|OverTime|PercentSalaryHike|PerformanceRating|RelationshipSatisfaction|StockOptionLevel|TotalWorkingYears|TrainingTimesLastYear|WorkLifeBalance|YearsAtCompany|YearsInCurrentRole|YearsSinceLastPromotion|YearsWithCurrManager|
+---------+---+--------------+---------+----------+---

## Encoding categorical columns (one-hot)

In [14]:
# Categorical columns to one-hot encode, in the order their indicator
# columns are appended to the frame.
CATEGORICAL_COLS = [
    "Attrition", "BusinessTravel", "Department", "EducationField",
    "Gender", "JobRole", "MaritalStatus", "OverTime",
]


def one_hot_exprs(df, column):
    """Return one 0/1 indicator expression per distinct value of `df[column]`.

    Each indicator is named "<column>_<value>". Values are sorted because
    distinct() guarantees no order; without sorting, the generated column
    order (and the layout of the table written to MongoDB) could change
    between runs. Null values are skipped since they cannot form a column
    name; rows holding a null get 0 in every indicator of that column.
    """
    values = sorted(
        row[0]
        for row in df.select(column).distinct().collect()
        if row[0] is not None
    )
    return [
        F.when(F.col(column) == value, 1).otherwise(0).alias(f"{column}_{value}")
        for value in values
    ]


# Keep every existing column and append the indicators for each category.
ml_df = rawdropdf.select(
    "*",
    *[expr for c in CATEGORICAL_COLS for expr in one_hot_exprs(rawdropdf, c)],
)

In [15]:
# Drop one indicator from each binary category (the complement column is
# redundant) and the original string columns that were one-hot encoded.
# NOTE(review): multi-level categories keep every indicator; confirm that is
# intended if a linear model will consume these features.
mlnew_df = ml_df.drop(
    # redundant complement of a binary indicator
    'OverTime_No', 'Attrition_No', 'Gender_Male',
    # original categorical columns, now represented by their indicators
    'Attrition', 'BusinessTravel', 'Department', 'EducationField',
    'Gender', 'JobRole', 'MaritalStatus', 'OverTime',
)

In [16]:
# Spot-check one row of the encoded table (original categorical columns removed).
mlnew_df.show(n=1)

+---+---------+----------------+---------+-----------------------+----------+--------------+--------+---------------+-------------+-----------+------------------+-----------------+-----------------+------------------------+----------------+-----------------+---------------------+---------------+--------------+------------------+-----------------------+--------------------+-------------+--------------------------------+-------------------------+----------------------------+----------------+---------------------------------+--------------------------+-------------------------------+--------------------+------------------------+----------------------+----------------------------+------------------------------+-------------+-----------------------+------------------------------+-----------------------------+----------------------------+---------------------------------+--------------------------+---------------+-------------------------+-----------------------+---------------------+-------

In [17]:
# Check the column types of the encoded ML table; the indicators built from
# literal 1/0 values appear alongside the original integer columns.
mlnew_df.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- DailyRate: integer (nullable = true)
 |-- DistanceFromHome: integer (nullable = true)
 |-- Education: integer (nullable = true)
 |-- EnvironmentSatisfaction: integer (nullable = true)
 |-- HourlyRate: integer (nullable = true)
 |-- JobInvolvement: integer (nullable = true)
 |-- JobLevel: integer (nullable = true)
 |-- JobSatisfaction: integer (nullable = true)
 |-- MonthlyIncome: integer (nullable = true)
 |-- MonthlyRate: integer (nullable = true)
 |-- NumCompaniesWorked: integer (nullable = true)
 |-- PercentSalaryHike: integer (nullable = true)
 |-- PerformanceRating: integer (nullable = true)
 |-- RelationshipSatisfaction: integer (nullable = true)
 |-- StockOptionLevel: integer (nullable = true)
 |-- TotalWorkingYears: integer (nullable = true)
 |-- TrainingTimesLastYear: integer (nullable = true)
 |-- WorkLifeBalance: integer (nullable = true)
 |-- YearsAtCompany: integer (nullable = true)
 |-- YearsInCurrentRole: integer (nullable = 

## Loading data into MongoDB

In [18]:
# Persist the cleaned (not encoded) records to Project.OriData.
# NOTE(review): no save mode is set, so Spark's default (error if exists)
# applies; a re-run may fail once the collection exists. Confirm whether
# .mode("overwrite") or .mode("append") is intended.
(
    rawdropdf.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("database", "Project")
    .option("collection", "OriData")
    .save()
)

In [19]:
# Persist the one-hot-encoded feature table to Project.mldata.
# NOTE(review): no save mode is set, so Spark's default (error if exists)
# applies; a re-run may fail once the collection exists. Confirm whether
# .mode("overwrite") or .mode("append") is intended.
(
    mlnew_df.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("database", "Project")
    .option("collection", "mldata")
    .save()
)