q.1


In [48]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [49]:
spark=SparkSession.builder.appName("Question1").getOrCreate()

In [50]:
df = spark.read.csv("emp_data.csv", header=True)
df.show()

+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName| LastName|StartDate| ExitDate|             Title|      Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+--------

In [51]:
print("Columns with missing values:")
# Print column names with missing values
for column in df.columns:
  if df.filter(col(column).isNull()).count() > 0:
    print(f"{column} has missing values")

Columns with missing values:
ExitDate has missing values


In [52]:
# Replace missing values in the "LastName" column with "Unknown"
df_replaced = df.withColumn("LastName", when(col("LastName").isNull(), "Unknown").otherwise(col("LastName")))
print("DataFrame after replacing missing values in LastName:")
df_replaced.show()

DataFrame after replacing missing values in LastName:
+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName| LastName|StartDate| ExitDate|             Title|      Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------

In [53]:
# Drop rows with missing values in essential columns like EmpID or StartDate
df = df_replaced.dropna(subset=["EmpID", "StartDate"])
print("DataFrame after dropping rows with missing values in EmpID or StartDate:")
df.show()

DataFrame after dropping rows with missing values in EmpID or StartDate:
+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+-----------------------+
|EmpID|FirstName| LastName|StartDate| ExitDate|             Title|      Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|Current Employee Rating|
+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+---------------------

In [55]:
from pyspark.sql.functions import col, lit, greatest, least

# Cap values between 1 and 5 for the 'CurrentEmployeeRating' column
df_outliers_handled = df.withColumn(
    "CurrentEmployeeRating",
    least(greatest(col("Current Employee Rating").cast("float"), lit(1)), lit(5))
)

# Drop the original 'Current Employee Rating' column and show the result
df = df_outliers_handled.drop("Current Employee Rating")
df.show()


+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+---------------------+
|EmpID|FirstName| LastName|StartDate| ExitDate|             Title|      Supervisor|BusinessUnit|EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|CurrentEmployeeRating|
+-----+---------+---------+---------+---------+------------------+----------------+------------+--------------+------------+-------+--------------------------+---------------+--------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+------------

In [56]:
from pyspark.sql.functions import count

# Remove duplicate records
df = df.dropDuplicates()

# Group by 'DepartmentType' and 'JobFunctionDescription' and count the number of employees
df_employee_count = df.groupBy("DepartmentType", "JobFunctionDescription").agg(count("EmpID"))

# Show the result
df_employee_count.show()


+-----------------+----------------------+------------+
|   DepartmentType|JobFunctionDescription|count(EmpID)|
+-----------------+----------------------+------------+
|            Sales|                 Clerk|           2|
|            IT/IS|               Laborer|          25|
|            Sales|              Director|           4|
|            Sales|               Flagger|           1|
|Production       |               Planner|           2|
|Production       |                Welder|           1|
|            IT/IS|        Administration|           3|
|Production       |               Laborer|          42|
|            IT/IS|             Assistant|           2|
|Production       |                 Clerk|           2|
|            Sales|            Tower Hand|           2|
| Executive Office|       Project Manager|           2|
|Production       |              Director|           4|
|            IT/IS|              Engineer|          20|
|            Sales|           Coordinator|      

In [57]:
df.show()

+-----+---------+---------+---------+---------+--------------------+--------------------+------------+--------------------+------------+-------+--------------------------+---------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+--------+-----------+-----------------+---------------------+
|EmpID|FirstName| LastName|StartDate| ExitDate|               Title|          Supervisor|BusinessUnit|      EmployeeStatus|EmployeeType|PayZone|EmployeeClassificationType|TerminationType|   DepartmentType|            Division|       DOB|State|JobFunctionDescription|GenderCode|LocationCode|RaceDesc|MaritalDesc|Performance Score|CurrentEmployeeRating|
+-----+---------+---------+---------+---------+--------------------+--------------------+------------+--------------------+------------+-------+--------------------------+---------------+-----------------+--------------------+----------+-----+----------------------+----------+------------+------

In [58]:
from pyspark.sql.functions import max

# Group by DepartmentType and find the maximum PerformanceScore
result = df.groupBy("DepartmentType").agg(
    max("Performance Score").alias("MaxPerformanceScore")
)

# Show the result
result.show()

+--------------------+-------------------+
|      DepartmentType|MaxPerformanceScore|
+--------------------+-------------------+
|    Executive Office|                PIP|
|               IT/IS|                PIP|
|   Production       |                PIP|
|               Sales|  Needs Improvement|
|Software Engineering|        Fully Meets|
+--------------------+-------------------+

