In [1]:
pip install distutils

Note: you may need to restart the kernel to use updated packages.


ERROR: Could not find a version that satisfies the requirement distutils (from versions: none)
ERROR: No matching distribution found for distutils


In [1]:
from pyspark.sql import SparkSession

In [2]:
# Initialize Spark session
spark=SparkSession.builder.appName('Employee_Analysis').getOrCreate()

In [3]:
# Read the Employee, department tables into a DataFrame
employee_df=spark.read.csv('Datasets/Employee.csv', header=True, inferSchema=True)
department_df=spark.read.csv('Datasets/Department.csv', header=True, inferSchema=True)

In [4]:
# View top rows from tables
employee_df.show(5)
department_df.show(5)

+----------+---------+--------+------------+------+-----------+
|EmployeeID|FirstName|LastName|DepartmentID|Salary|JoiningDate|
+----------+---------+--------+------------+------+-----------+
|         1|     John|     Doe|         101| 60000|  1/15/2022|
|         2|    Alice|   Smith|         102| 65000|  2/20/2022|
|         3|  Michael| Johnson|         101| 58000|  3/10/2022|
|         4|    Emily|   Brown|         103| 70000|   4/5/2022|
|         5|    David|     Lee|         102| 62000|  5/12/2022|
+----------+---------+--------+------------+------+-----------+
only showing top 5 rows

+------------+--------------+---------+
|DepartmentID|DepartmentName|ManagerID|
+------------+--------------+---------+
|         101|         Sales|        1|
|         102|     Marketing|        2|
|         103|       Finance|        4|
|         104|            HR|     NULL|
|         105|    Operations|        7|
+------------+--------------+---------+
only showing top 5 rows



In [5]:
# Register the DataFrame as a temporary view
employee_df.createOrReplaceTempView("employee_table")
department_df.createOrReplaceTempView("department_table")

In [6]:
# Check for duplicate records
duplicate_records_df = spark.sql("""
    SELECT EmployeeID, FirstName, LastName, DepartmentID,Salary, JoiningDate, COUNT(*) AS duplicate_count
    FROM employee_table
    GROUP BY EmployeeID, FirstName, LastName, DepartmentID,Salary, JoiningDate
    HAVING COUNT(*) > 1
""")

# Show the duplicate records
duplicate_records_df.show()


+----------+---------+----------+------------+------+-----------+---------------+
|EmployeeID|FirstName|  LastName|DepartmentID|Salary|JoiningDate|duplicate_count|
+----------+---------+----------+------------+------+-----------+---------------+
|         3|  Michael|   Johnson|         101| 58000|       NULL|              2|
|        22|    Megan|     Scott|         101| 63000| 10/10/2023|              3|
|        19|    Kevin|     White|         102| 68000|  7/25/2023|              3|
|         9|   Robert|    Taylor|         104| 68000|   9/5/2022|              3|
|        34|   Hannah|   Collins|         101| 66000| 10/10/2024|              3|
|        28|   Alexis|    Turner|         103| 76000|  4/10/2024|              3|
|        23|  Brandon|     Green|         102| 69000| 11/15/2023|              3|
|        38| Danielle|    Howard|         101| 67000|       NULL|              2|
|        44|    Sarah|    Barnes|         103| 80000|  8/30/2025|              3|
|        30|   O

In [7]:
# Remove duplicates 
cleaned_employee_df = employee_df.dropDuplicates(['EmployeeID','FirstName', 'LastName', 'DepartmentID', 'JoiningDate'])

In [8]:
# Register the cleaned DataFrame as a temporary view
cleaned_employee_df.createOrReplaceTempView("cleaned_employee_view")

# Check for duplicates in the cleaned DataFrame
duplicates_still_exist_df = spark.sql("""
    SELECT FirstName, LastName, DepartmentID, JoiningDate, COUNT(*) AS duplicate_count
    FROM cleaned_employee_view
    GROUP BY FirstName, LastName, DepartmentID, JoiningDate
    HAVING COUNT(*) > 1
""")

# Show any remaining duplicates
duplicates_still_exist_df.show()

+---------+--------+------------+-----------+---------------+
|FirstName|LastName|DepartmentID|JoiningDate|duplicate_count|
+---------+--------+------------+-----------+---------------+
|  William|   Kelly|         102|   3/5/2025|              3|
|  Zachary|  Parker|         102|  7/25/2024|              3|
+---------+--------+------------+-----------+---------------+



We still have duplicates with the firstNames William and Zachary, so check which columns have duplicate values for these two employees

In [9]:
# Check for duplicates in the cleaned DataFrame
duplicates_still_exist_df = spark.sql("""
    SELECT *
    FROM cleaned_employee_view
    Where FirstName LIKE '%William%' OR FirstName LIKE '%Zachary%'
""")

# Show any remaining duplicates
duplicates_still_exist_df.show()

+----------+---------+--------+------------+------+-----------+
|EmployeeID|FirstName|LastName|DepartmentID|Salary|JoiningDate|
+----------+---------+--------+------------+------+-----------+
|        39|  William|   Kelly|         102| 73000|   3/5/2025|
|        89|  William|   Kelly|         102| 73000|   3/5/2025|
|       139|  William|   Kelly|         102| 73000|   3/5/2025|
|        31|  Zachary|  Parker|         102| 71000|  7/25/2024|
|        81|  Zachary|  Parker|         102| 71000|  7/25/2024|
|       131|  Zachary|  Parker|         102| 71000|  7/25/2024|
+----------+---------+--------+------------+------+-----------+



As we can see these employees have different employee ids, we will keep the row with lowest employeeID and remove the remaining rows.

In [10]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

In [11]:
# Define a window specification partitioned by FirstName and LastName and ordered by EmployeeID
window_spec = Window.partitionBy("FirstName", "LastName").orderBy("EmployeeID")

# Add a row number column to each row based on the window specification
cleaned_employee_df = cleaned_employee_df.withColumn("row_number", row_number().over(window_spec))

# Keep only the rows with row_number = 1, which corresponds to the lowest EmployeeID for each unique FirstName and LastName combination
deduplicated_employee_df = cleaned_employee_df.filter("row_number = 1").drop("row_number")

# Show the deduplicated DataFrame
deduplicated_employee_df.show(5)

+----------+---------+--------+------------+------+-----------+
|EmployeeID|FirstName|LastName|DepartmentID|Salary|JoiningDate|
+----------+---------+--------+------------+------+-----------+
|        49|     Adam|  Cooper|         104| 78000|  1/25/2026|
|        28|   Alexis|  Turner|         103| 76000|  4/10/2024|
|         2|    Alice|   Smith|         102| 65000|  2/20/2022|
|        16|   Amanda|Anderson|         103| 73000|  4/10/2023|
|        33|   Andrew| Edwards|         104| 74000|   9/5/2024|
+----------+---------+--------+------------+------+-----------+
only showing top 5 rows



In [12]:
# Register the cleaned DataFrame as a temporary view
deduplicated_employee_df.createOrReplaceTempView("deduplicated_employee_view")

In [13]:
# Check for duplicates in the cleaned DataFrame where FirstName contains 'William' or 'Zachary'
duplicates_check_df = spark.sql("""
    SELECT FirstName, LastName, DepartmentID, Salary, JoiningDate, COUNT(*) AS duplicate_count
    FROM deduplicated_employee_view
    GROUP BY FirstName, LastName, DepartmentID, Salary, JoiningDate
    HAVING COUNT(*) > 1
""")

# Show any remaining duplicates
duplicates_check_df.show()

+---------+--------+------------+------+-----------+---------------+
|FirstName|LastName|DepartmentID|Salary|JoiningDate|duplicate_count|
+---------+--------+------------+------+-----------+---------------+
+---------+--------+------------+------+-----------+---------------+



In [14]:
# Check for duplicates in the cleaned DataFrame where FirstName contains 'William' or 'Zachary'
deduplicated_employee_df = spark.sql("""
    SELECT *
    FROM deduplicated_employee_view
""")

# Show any remaining duplicates
deduplicated_employee_df.show()

+----------+-----------+---------+------------+------+-----------+
|EmployeeID|  FirstName| LastName|DepartmentID|Salary|JoiningDate|
+----------+-----------+---------+------------+------+-----------+
|        49|       Adam|   Cooper|         104| 78000|  1/25/2026|
|        28|     Alexis|   Turner|         103| 76000|  4/10/2024|
|         2|      Alice|    Smith|         102| 65000|  2/20/2022|
|        16|     Amanda| Anderson|         103| 73000|  4/10/2023|
|        33|     Andrew|  Edwards|         104| 74000|   9/5/2024|
|        14|     Ashley|    Lopez|         101| 61000|  2/28/2023|
|        29|   Benjamin| Phillips|         104| 73000|  5/15/2024|
|        23|    Brandon|    Green|         102| 69000| 11/15/2023|
|        17|      Brian|    Moore|         104| 70000|  5/15/2023|
|        13|Christopher|Hernandez|         104|  NULL|  1/25/2023|
|        11|     Daniel| Anderson|         102| 66000| 11/15/2022|
|        38|   Danielle|   Howard|         101| 67000|  2/28/2

In [15]:
# Check for duplicates in the cleaned DataFrame where FirstName contains 'William' or 'Zachary'
null_employee_df = spark.sql("""
    SELECT *
    FROM deduplicated_employee_view
    Where EmployeeID='NULL' or FirstName ='NULL' or LastName ='NULL' or DepartmentID='NULL' or Salary='NULL' or JoiningDate ='NULL'
""")

# Show any remaining duplicates
null_employee_df.show()


+----------+-----------+----------+------------+------+-----------+
|EmployeeID|  FirstName|  LastName|DepartmentID|Salary|JoiningDate|
+----------+-----------+----------+------------+------+-----------+
|        13|Christopher| Hernandez|         104|  NULL|  1/25/2023|
|        42|      Kayla|    Murphy|         101|  NULL|  6/20/2025|
|        48|      Laura|    Brooks|         103| 81000|       NULL|
|        52|     Lauren|      NULL|         101| 65000|  2/20/2022|
|        15|    Matthew|    Wilson|         102| 67000|       NULL|
|        51|       NULL|    Parker|         105| 60000|  1/15/2022|
|        43|    Timothy|Richardson|         102| 74000|       NULL|
+----------+-----------+----------+------------+------+-----------+



In [18]:
from pyspark.sql.functions import avg,when

# Replace "NULL" with the average salary
clean_employee_df = deduplicated_employee_df.withColumn("Salary", 
                    when(col("Salary") == "NULL", average_salary).otherwise(col("Salary")))

# Show the DataFrame with "NULL" values replaced by the average salary
clean_employee_df.show()


NameError: name 'col' is not defined

In [None]:
from pyspark.sql.functions import avg

# Compute the average salary
average_salary = deduplicated_employee_df.select(avg(col("Salary"))).collect()[0][0]

# Fill NULL values in the DataFrame with the average salary
deduplicated_employee_df = deduplicated_employee_df.fillna(average_salary, subset=["Salary"])

# Show the DataFrame with NULL values replaced by the average salary
deduplicated_employee_df.show()

+----------+-----------+----------+------------+------+-----------+
|EmployeeID|  FirstName|  LastName|DepartmentID|Salary|JoiningDate|
+----------+-----------+----------+------------+------+-----------+
|        13|Christopher| Hernandez|         104|  NULL|  1/25/2023|
|        42|      Kayla|    Murphy|         101|  NULL|  6/20/2025|
|        48|      Laura|    Brooks|         103| 81000|       NULL|
|        15|    Matthew|    Wilson|         102| 67000|       NULL|
|        51|       NULL|    Parker|         105| 60000|  1/15/2022|
|        43|    Timothy|Richardson|         102| 74000|       NULL|
+----------+-----------+----------+------------+------+-----------+

