In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [50]:
spark = SparkSession.builder.appName("Employee Data Cleaning").getOrCreate()
file_path = "emp_data.csv" 
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.printSchema()
df.show()



root
 |-- EmpID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- ExitDate: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Supervisor: string (nullable = true)
 |-- BusinessUnit: string (nullable = true)
 |-- EmployeeStatus: string (nullable = true)
 |-- EmployeeType: string (nullable = true)
 |-- PayZone: string (nullable = true)
 |-- EmployeeClassificationType: string (nullable = true)
 |-- TerminationType: string (nullable = true)
 |-- DepartmentType: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- State: string (nullable = true)
 |-- JobFunctionDescription: string (nullable = true)
 |-- GenderCode: string (nullable = true)
 |-- LocationCode: integer (nullable = true)
 |-- RaceDesc: string (nullable = true)
 |-- MaritalDesc: string (nullable = true)
 |-- Performance Score: string (nullable = true)
 |-- Cu

In [51]:
# 1. Handle Missing Values
df.select([count(when(isnull(c) | isnan(c), c)).alias(c) for c in df.columns]).show()



# Drop rows with missing values in essential columns
df = df.dropna(subset=["EmpID", "StartDate"])


+-----+---------+--------+---------+--------+-----+----------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------+---+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName|LastName|StartDate|ExitDate|Title|Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|DepartmentType|Division|DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+--------+---------+--------+-----+----------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------+---+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|    0|        0|       0|        0|     263|    0|         0|           0|             0|           0|

In [52]:
# 2. Remove Outliers
# Cap Current Employee Rating between 1 and 5
df = df.withColumn("Current Employee Rating", 
                   when(col("Current Employee Rating") < 1, 1)
                   .when(col("Current Employee Rating") > 5, 5)
                   .otherwise(col("Current Employee Rating")))

# Investigate LocationCode for unrealistic values
df.select("LocationCode").distinct().show()

+------------+
|LocationCode|
+------------+
|        2122|
|       97413|
|       80424|
|        1460|
|       75321|
|       49449|
|       56687|
|       78046|
|       44553|
|        9454|
|       57754|
|        8779|
|       29811|
|       65321|
|       39859|
|       49914|
|       74388|
|       34011|
|       60107|
|       31207|
+------------+
only showing top 20 rows



In [53]:
df.show()

+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName| LastName|StartDate| ExitDate|             Title|      Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+--------

In [54]:
# 3. Remove Duplicates
df = df.dropDuplicates()

In [55]:
df.count()

552

In [56]:
df.printSchema()

root
 |-- EmpID: integer (nullable = true)
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- StartDate: string (nullable = true)
 |-- ExitDate: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Supervisor: string (nullable = true)
 |-- BusinessUnit: string (nullable = true)
 |-- EmployeeStatus: string (nullable = true)
 |-- EmployeeType: string (nullable = true)
 |-- PayZone: string (nullable = true)
 |-- EmployeeClassificationType: string (nullable = true)
 |-- TerminationType: string (nullable = true)
 |-- DepartmentType: string (nullable = true)
 |-- Division: string (nullable = true)
 |-- DOB: string (nullable = true)
 |-- State: string (nullable = true)
 |-- JobFunctionDescription: string (nullable = true)
 |-- GenderCode: string (nullable = true)
 |-- LocationCode: integer (nullable = true)
 |-- RaceDesc: string (nullable = true)
 |-- MaritalDesc: string (nullable = true)
 |-- Performance Score: string (nullable = true)
 |-- Cu

In [57]:
# 4. Count the number of employees in each department for every designation
department_designation_count = df.groupBy(trim("DepartmentType"),trim( "Title")).count()
department_designation_count.show()

+--------------------+--------------------+-----+
|trim(DepartmentType)|         trim(Title)|count|
+--------------------+--------------------+-----+
|               IT/IS|Principal Data Ar...|    8|
|               IT/IS|        Data Analyst|   11|
|          Production|Production Techni...|   61|
|               IT/IS|Sr. Network Engineer|   30|
|          Production|    Network Engineer|    1|
|    Executive Office|    Network Engineer|   18|
|               IT/IS|    Network Engineer|   22|
|          Production|Production Techni...|    1|
|          Production|Production Techni...|  190|
|               IT/IS|      Data Architect|    2|
|               IT/IS|  Area Sales Manager|    7|
|               Sales|  Area Sales Manager|  119|
|               IT/IS|Enterprise Architect|    5|
|               IT/IS|          IT Support|   44|
|               Sales| Area Sales Manager?|    1|
|               Sales| Area Sales Manager.|    1|
|               IT/IS|Software Engineer...|    1|


In [58]:
# 5. For each department, find the employee with the highest performance score
highest_performance = df.groupBy(trim("DepartmentType")).agg(max("Current Employee Rating").alias("MaxPerformanceScore"))
highest_performance.show()

+--------------------+-------------------+
|trim(DepartmentType)|MaxPerformanceScore|
+--------------------+-------------------+
|               Sales|                  5|
|          Production|                  5|
|    Executive Office|                  3|
|Software Engineering|                  3|
|               IT/IS|                  4|
+--------------------+-------------------+



In [59]:
from pyspark.sql.functions import to_date, date_format
df = df.withColumn("FormattedDate", date_format(to_date("StartDate", "dd-MMM-yy"), "dd/MM/yyyy"))
df.show()

+-----+---------+--------------+---------+---------+--------------------+--------------------+------------+----------------+------------+-------+--------------------------+---------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+-------------+
|EmpID|FirstName|      LastName|StartDate| ExitDate|               Title|          Supervisor|BusinessUnit|  EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|FormattedDate|
+-----+---------+--------------+---------+---------+--------------------+--------------------+------------+----------------+------------+-------+--------------------------+---------------+-----------------+--------------------+----------+-----+------------------

In [60]:
pandas_df = df.toPandas()
pandas_df["FormattedDate"] = pandas_df["FormattedDate"].astype(str)
pandas_df.to_csv(output_path, index=False)

In [61]:
df.show()

+-----+---------+--------------+---------+---------+--------------------+--------------------+------------+----------------+------------+-------+--------------------------+---------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+-------------+
|EmpID|FirstName|      LastName|StartDate| ExitDate|               Title|          Supervisor|BusinessUnit|  EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|FormattedDate|
+-----+---------+--------------+---------+---------+--------------------+--------------------+------------+----------------+------------+-------+--------------------------+---------------+-----------------+--------------------+----------+-----+------------------