#### Problem Statement: Employee Data Analysis
    You are tasked with analyzing employee data for a company using PySpark. The dataset contains employee information such as Employee ID, Name, Age, Department, and Salary. Some of the fields have missing values (nulls). Your job is to perform the following tasks to help the company gain insights from the employee data.
    Tasks:
        1. Data Cleaning:
            Load the employee dataset into a PySpark DataFrame.
            Handle missing values (null):
                Age: Replace missing age values with the average age of all employees.
                Department: Replace missing department values with the most frequent department.
                Salary: Replace missing salary values with the median salary.

        2. Basic Aggregations:
            Calculate the average salary and average age for each department.
            Find the total salary and total number of employees for each department.
            Identify the department with the highest average salary.
        
        3. Salary Analysis:
            Create a new column Salary Range:
                If salary < 50,000: "Low"
                If salary >= 50,000 and < 100,000: "Medium"
                If salary >= 100,000: "High"
            Count the number of employees in each salary range.
        
        4. Age Distribution:
            Calculate the average age of employees within each salary range ("Low", "Medium", "High").
            Find the youngest and oldest employee in each department.
        
        5. Employee Classification:
            Find out how many employees are older than 50 years and have a salary greater than 80,000. These employees can be considered a special group of senior, high-salary employees.
            
        6. Missing Value Handling (Advanced):
            For departments where the salary column has more than 20% missing values, drop those departments from the dataset.
        
        7. Save the Results:
            Save the cleaned dataset as a new CSV file.
            Save the aggregated data (average salary, total salary, etc.) for each department into a new CSV file.

In [1]:
!pip install pyspark



In [584]:
import pyspark

In [585]:
from pyspark.sql import SparkSession

In [586]:
spark=SparkSession.builder.appName('EmployeeData').getOrCreate()

In [587]:
df_pyspark=spark.read.option('header','true').csv('dataset.csv',inferSchema=True)

In [588]:
df_pyspark.show()

+-----------+---------------+----+----------+--------+
|Employee ID|           Name| Age|Department|  Salary|
+-----------+---------------+----+----------+--------+
|      84288|  Molly Johnson|46.0|        IT|118377.0|
|      81651|    Brandi Hill|58.0|        HR| 56542.0|
|      55037|  Joshua Thomas|40.0|   Finance| 76093.0|
|      13699|   George Terry|53.0|     Sales| 42896.0|
|      68662| Anna Wilkinson|45.0|     Sales| 66665.0|
|      22920| Stephen Carter|43.0|   Finance| 57427.0|
|      21352|  Tonya Douglas|38.0|        IT| 79765.0|
|      53478|   Diane Nelson|53.0|   Finance| 50736.0|
|      55742|     Teresa Cox|36.0|     Sales|116550.0|
|      38877|  Kendra Cannon|38.0|      NULL|115821.0|
|      57660|   Scott Holmes|54.0|Operations| 72615.0|
|      79344| Melanie Guzman|44.0|     Sales| 62587.0|
|      61879|  James Lindsey|37.0|Operations| 66655.0|
|       1841|William Aguirre|NULL|        IT| 64257.0|
|      56629|     Ryan Price|42.0|   Finance| 50434.0|
|      916

1. #### Data Cleaning:
    -    Load the employee dataset into a PySpark DataFrame.
    -    Handle missing values (null):
        -    Age: Replace missing age values with the average age of all employees.
        -    Department: Replace missing department values with the most frequent department.
        -    Salary: Replace missing salary values with the median salary.
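The three imputation rules can be checked on a small sample before running them on Spark. The block below is a plain-Python sketch, not the notebook's PySpark code. The sample rows and the helper `impute` are hypothetical, and unlike the notebook it does not round the mean age. Missing departments are excluded before the mode is taken, so a NULL group can never be chosen as the most frequent department. The median is computed over the non-missing salaries only.

```python
from collections import Counter
from statistics import mean, median

# Hypothetical sample rows; None marks a missing value
rows = [
    {"Age": 46.0, "Department": "IT", "Salary": 118377.0},
    {"Age": None, "Department": "HR", "Salary": 56542.0},
    {"Age": 40.0, "Department": "IT", "Salary": None},
    {"Age": 53.0, "Department": None, "Salary": 42896.0},
]


def impute(rows):
    """Fill Age with the mean, Department with the mode and Salary with the median."""
    ages = [r["Age"] for r in rows if r["Age"] is not None]
    departments = [r["Department"] for r in rows if r["Department"] is not None]
    salaries = [r["Salary"] for r in rows if r["Salary"] is not None]

    fill = {
        "Age": mean(ages),
        # Missing departments were filtered out above, so the mode is a real department name
        "Department": Counter(departments).most_common(1)[0][0],
        "Salary": median(salaries),
    }
    # Replace only the missing values, keep everything else unchanged
    return [{k: (fill[k] if v is None else v) for k, v in r.items()} for r in rows]


cleaned = impute(rows)
for row in cleaned:
    print(row)
```

With these rows, the missing age becomes the mean of 46, 40 and 53, the missing department becomes `IT` (two occurrences), and the missing salary becomes 56542.0, the middle value of the three known salaries.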

In [589]:
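# median() needs Spark 3.4+; round, sum, min and max shadow Python's built-ins from here on
from pyspark.sql.functions import avg, col, round, count, median, sum, when, min, max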

In [590]:
avg_age = df_pyspark.select(round(avg("Age"))).collect()[0][0]

df_filled = df_pyspark.fillna({'Age': avg_age})

In [591]:
df_filled.show()

+-----------+---------------+----+----------+--------+
|Employee ID|           Name| Age|Department|  Salary|
+-----------+---------------+----+----------+--------+
|      84288|  Molly Johnson|46.0|        IT|118377.0|
|      81651|    Brandi Hill|58.0|        HR| 56542.0|
|      55037|  Joshua Thomas|40.0|   Finance| 76093.0|
|      13699|   George Terry|53.0|     Sales| 42896.0|
|      68662| Anna Wilkinson|45.0|     Sales| 66665.0|
|      22920| Stephen Carter|43.0|   Finance| 57427.0|
|      21352|  Tonya Douglas|38.0|        IT| 79765.0|
|      53478|   Diane Nelson|53.0|   Finance| 50736.0|
|      55742|     Teresa Cox|36.0|     Sales|116550.0|
|      38877|  Kendra Cannon|38.0|      NULL|115821.0|
|      57660|   Scott Holmes|54.0|Operations| 72615.0|
|      79344| Melanie Guzman|44.0|     Sales| 62587.0|
|      61879|  James Lindsey|37.0|Operations| 66655.0|
|       1841|William Aguirre|40.0|        IT| 64257.0|
|      56629|     Ryan Price|42.0|   Finance| 50434.0|
|      916

In [592]:
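# Exclude the NULL group so that the mode is an actual department name
most_frequent_department = df_pyspark.where(col("Department").isNotNull()).groupBy("Department").agg(
    count("*").alias("count")
).orderBy(col("count").desc()).first()[0]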

df_filled = df_filled.fillna({'Department' : most_frequent_department})

In [593]:
df_filled.show()

+-----------+---------------+----+----------+--------+
|Employee ID|           Name| Age|Department|  Salary|
+-----------+---------------+----+----------+--------+
|      84288|  Molly Johnson|46.0|        IT|118377.0|
|      81651|    Brandi Hill|58.0|        HR| 56542.0|
|      55037|  Joshua Thomas|40.0|   Finance| 76093.0|
|      13699|   George Terry|53.0|     Sales| 42896.0|
|      68662| Anna Wilkinson|45.0|     Sales| 66665.0|
|      22920| Stephen Carter|43.0|   Finance| 57427.0|
|      21352|  Tonya Douglas|38.0|        IT| 79765.0|
|      53478|   Diane Nelson|53.0|   Finance| 50736.0|
|      55742|     Teresa Cox|36.0|     Sales|116550.0|
|      38877|  Kendra Cannon|38.0|   Finance|115821.0|
|      57660|   Scott Holmes|54.0|Operations| 72615.0|
|      79344| Melanie Guzman|44.0|     Sales| 62587.0|
|      61879|  James Lindsey|37.0|Operations| 66655.0|
|       1841|William Aguirre|40.0|        IT| 64257.0|
|      56629|     Ryan Price|42.0|   Finance| 50434.0|
|      916

In [594]:
df_filled.orderBy("Salary","Employee ID").show()

+-----------+-------------------+----+----------+------+
|Employee ID|               Name| Age|Department|Salary|
+-----------+-------------------+----+----------+------+
|       1092|        Debra Jones|59.0|Operations|  NULL|
|       1108|     Tiffany Murphy|21.0|     Sales|  NULL|
|       1137|     William Morgan|44.0|        IT|  NULL|
|       1168|    James Baker PhD|41.0|     Admin|  NULL|
|       1190|      Leslie Tucker|26.0|   Finance|  NULL|
|       1370|Christine Patterson|29.0|     Sales|  NULL|
|       1571|     Sarah Stephens|40.0|Operations|  NULL|
|       1620|   Christine Oliver|43.0|     Admin|  NULL|
|       1716|     Kristen Barron|51.0|   Finance|  NULL|
|       1894|     Anthony Bishop|35.0|        HR|  NULL|
|       1930|       Jasmine Dunn|42.0| Marketing|  NULL|
|       1936|       Amanda Clark|27.0|        IT|  NULL|
|       1997|           Ann Wolf|36.0|     Sales|  NULL|
|       2250|       Susan Parker|50.0|   Finance|  NULL|
|       2318|    Anthony Muelle

In [595]:
median_salary = df_pyspark.select(
    median(col("Salary"))
).collect()[0][0]

df_filled = df_filled.fillna({'Salary' : median_salary}).orderBy("Employee ID")

In [596]:
df_filled.show()

+-----------+-----------------+----+----------+--------+
|Employee ID|             Name| Age|Department|  Salary|
+-----------+-----------------+----+----------+--------+
|       1008|  Mr. Ruben Evans|49.0|        HR| 93548.0|
|       1020|     Ernest Gates|26.0|   Finance| 83361.0|
|       1022|   Michael Hudson|40.0| Marketing| 91849.0|
|       1023|       Kevin Ryan|35.0|Operations| 99448.0|
|       1023|  Austin Crawford|41.0|        IT| 55821.0|
|       1024|    Sandy Aguilar|26.0|   Finance| 99213.0|
|       1036|        Isaac Day|42.0| Marketing| 52637.0|
|       1036|Andrew Richardson|24.0|   Finance| 79727.0|
|       1039| Shannon Saunders|27.0|   Finance|101054.0|
|       1045|   William Tucker|53.0|     Sales| 65766.0|
|       1064|   Jeffery Nguyen|34.0|   Finance| 44352.0|
|       1070| Shannon Davidson|42.0|        HR| 40568.0|
|       1072|    Beth Johnston|34.0|     Admin| 68985.0|
|       1081|   Derrick Jacobs|40.0|     Admin| 58839.0|
|       1082|      Angela Gray|

2. #### Basic Aggregations:
-        Calculate the average salary and average age for each department.
-        Find the total salary and total number of employees for each department.
-        Identify the department with the highest average salary.
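The three aggregations reduce to one pass over the rows grouped by department. This plain-Python sketch uses hypothetical sample rows to show what the Spark `groupBy(...).agg(...)` calls compute. Run it in a fresh Python session, because the notebook's `from pyspark.sql.functions import ...` replaces the built-in `sum` and `max` that the sketch relies on.

```python
from collections import defaultdict

# Hypothetical already-cleaned rows (no missing values)
rows = [
    {"Department": "IT", "Age": 46.0, "Salary": 118377.0},
    {"Department": "IT", "Age": 38.0, "Salary": 79765.0},
    {"Department": "HR", "Age": 58.0, "Salary": 56542.0},
    {"Department": "Sales", "Age": 53.0, "Salary": 42896.0},
    {"Department": "Sales", "Age": 36.0, "Salary": 116550.0},
]

# Group rows by department
groups = defaultdict(list)
for r in rows:
    groups[r["Department"]].append(r)

# Per-department averages, totals and head counts
summary = {}
for dept, members in groups.items():
    total_salary = sum(m["Salary"] for m in members)
    summary[dept] = {
        "Average Age": sum(m["Age"] for m in members) / len(members),
        "Average Salary": total_salary / len(members),
        "Total Salary": total_salary,
        "Total Employees": len(members),
    }

# Department with the highest average salary (max returns the first one on ties)
top_department = max(summary, key=lambda d: summary[d]["Average Salary"])
print(top_department, summary[top_department])
```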

In [597]:
department_avg_data = df_filled.groupBy("Department").agg(
    avg("Age").alias("Average Age"),
    avg("Salary").alias("Average Salary")
)
department_avg_data.show()

+----------+------------------+-----------------+
|Department|       Average Age|   Average Salary|
+----------+------------------+-----------------+
|     Sales| 40.02879177377892|80341.38920308484|
|        HR|40.236609464378574| 80241.6219448778|
|   Finance| 40.05416078984485|79343.85049365304|
|     Admin|39.787862513426425|79349.33780880773|
| Marketing| 39.74038461538461|80107.61805555556|
|        IT| 39.57010785824345|80660.37339496662|
|Operations| 40.22193074501574|79993.11909758658|
+----------+------------------+-----------------+



In [598]:
department_total_data = df_filled.groupBy("Department").agg(
    sum("Salary").alias("Total Salary"),
    count("Employee ID").alias("Total Employees")
)
department_total_data.show()

+----------+------------+---------------+
|Department|Total Salary|Total Employees|
+----------+------------+---------------+
|     Sales|1.56264002E8|           1945|
|        HR|1.54304639E8|           1923|
|   Finance| 2.8127395E8|           3545|
|     Admin|1.47748467E8|           1862|
| Marketing|1.49961461E8|           1872|
|        IT|1.57045747E8|           1947|
|Operations|1.52466885E8|           1906|
+----------+------------+---------------+



In [625]:
department_highest_avg_salary = df_filled.groupBy("Department").agg(
    avg("Salary").alias("Average Salary")
).orderBy(col("Average Salary").desc()).first()

print(department_highest_avg_salary)

Row(Department='IT', Average Salary=80660.37339496662)


3. #### Salary Analysis:
    -    Create a new column Salary Range:
        -    If salary < 50,000: "Low"
        -    If salary >= 50,000 and < 100,000: "Medium"
        -    If salary >= 100,000: "High"
    -    Count the number of employees in each salary range.
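The thresholds leave no gaps: 50,000 falls in "Medium" and 100,000 in "High". A minimal plain-Python sketch of the same rule, with a hypothetical helper `salary_range` and sample salaries chosen to sit on the boundaries, makes that explicit along with the per-range count:

```python
from collections import Counter


def salary_range(salary):
    """Bucket a salary: < 50,000 is Low, < 100,000 is Medium, anything else is High."""
    if salary < 50_000:
        return "Low"
    if salary < 100_000:
        return "Medium"
    return "High"


# Hypothetical salaries, including values exactly on the boundaries
salaries = [42896.0, 50000.0, 76093.0, 99999.99, 100000.0, 118377.0]
counts = Counter(salary_range(s) for s in salaries)
print(counts)
```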

In [599]:
# Salary < 50,000 -> Low; Salary >= 100,000 -> High; everything in between -> Medium
df_filled = df_filled.withColumn("Salary Range",
                                 when(col("Salary") < 50000, "Low")
                                 .when(col("Salary") >= 100000, "High")
                                 .otherwise("Medium"))


In [600]:
df_filled.show()

+-----------+-----------------+----+----------+--------+------------+
|Employee ID|             Name| Age|Department|  Salary|Salary Range|
+-----------+-----------------+----+----------+--------+------------+
|       1008|  Mr. Ruben Evans|49.0|        HR| 93548.0|      Medium|
|       1020|     Ernest Gates|26.0|   Finance| 83361.0|      Medium|
|       1022|   Michael Hudson|40.0| Marketing| 91849.0|      Medium|
|       1023|       Kevin Ryan|35.0|Operations| 99448.0|      Medium|
|       1023|  Austin Crawford|41.0|        IT| 55821.0|      Medium|
|       1024|    Sandy Aguilar|26.0|   Finance| 99213.0|      Medium|
|       1036|        Isaac Day|42.0| Marketing| 52637.0|      Medium|
|       1036|Andrew Richardson|24.0|   Finance| 79727.0|      Medium|
|       1039| Shannon Saunders|27.0|   Finance|101054.0|        High|
|       1045|   William Tucker|53.0|     Sales| 65766.0|      Medium|
|       1064|   Jeffery Nguyen|34.0|   Finance| 44352.0|         Low|
|       1070| Shanno

In [601]:
salary_range_total_employee = df_filled.groupBy("Salary Range").agg(
    count("Employee ID").alias("Total Employee in Salary Range")
)
salary_range_total_employee.show()

+------------+------------------------------+
|Salary Range|Total Employee in Salary Range|
+------------+------------------------------+
|        High|                          3370|
|         Low|                          1670|
|      Medium|                          9960|
+------------+------------------------------+



4. #### Age Distribution:
    -    Calculate the average age of employees within each salary range ("Low", "Medium", "High").
    -    Find the youngest and oldest employee in each department.
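Averaging age per salary range is a group-by over the range label. A minimal plain-Python sketch on hypothetical (range, age) pairs; run it outside the notebook session, since the notebook imports `sum` from `pyspark.sql.functions`:

```python
from collections import defaultdict

# Hypothetical (salary range, age) pairs, e.g. taken from the cleaned data
records = [("Low", 30.0), ("Low", 50.0), ("Medium", 40.0),
           ("High", 25.0), ("High", 35.0), ("High", 45.0)]

# Collect the ages that fall into each salary range
ages_by_range = defaultdict(list)
for rng, age in records:
    ages_by_range[rng].append(age)

average_age = {rng: sum(ages) / len(ages) for rng, ages in ages_by_range.items()}
print(average_age)
```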

In [602]:
salary_range_average_age = df_filled.groupBy("Salary Range").agg(
    avg("Age").alias("Average Age in Salary Range")
)
salary_range_average_age.show()

+------------+---------------------------+
|Salary Range|Average Age in Salary Range|
+------------+---------------------------+
|        High|          39.90652818991098|
|         Low|          39.80419161676647|
|      Medium|          40.00502008032129|
+------------+---------------------------+



In [603]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number,asc,desc

youngest = Window.partitionBy("Department").orderBy(asc("Age"))
oldest = Window.partitionBy("Department").orderBy(desc("Age"))

# row_number() keeps one row per department; employees tied on Age are picked arbitrarily
df_youngest = (df_filled.withColumn("Rank1", row_number().over(youngest))
               .filter(col("Rank1") == 1).drop("Rank1")
               .withColumn("Youngest of Department", col("Department")))

In [604]:
df_youngest.show()

+-----------+------------------+----+----------+--------+------------+----------------------+
|Employee ID|              Name| Age|Department|  Salary|Salary Range|Youngest of Department|
+-----------+------------------+----+----------+--------+------------+----------------------+
|       1346|   Shannon Sampson|20.0|     Admin| 88243.0|      Medium|                 Admin|
|       2397|       Laura Henry|20.0|   Finance|109179.0|        High|               Finance|
|       2041|      Robert Smith|20.0|        HR| 86699.0|      Medium|                    HR|
|      10431|    Matthew Wright|20.0|        IT| 61410.0|      Medium|                    IT|
|       1156|Kimberly Hernandez|20.0| Marketing|100166.0|        High|             Marketing|
|       2285|    Matthew Steele|20.0|Operations| 44055.0|         Low|            Operations|
|       7245|     Antonio Baker|20.0|     Sales| 72175.0|      Medium|                 Sales|
+-----------+------------------+----+----------+--------+------------+----------------------+

In [605]:
# row_number() keeps one row per department; employees tied on Age are picked arbitrarily
df_oldest = (df_filled.withColumn("Rank2", row_number().over(oldest))
             .filter(col("Rank2") == 1).drop("Rank2")
             .withColumn("Oldest of Department", col("Department")))

In [606]:
df_oldest.show()

+-----------+----------------+----+----------+--------+------------+--------------------+
|Employee ID|            Name| Age|Department|  Salary|Salary Range|Oldest of Department|
+-----------+----------------+----+----------+--------+------------+--------------------+
|       5040|   Carrie Reeves|60.0|     Admin| 51103.0|      Medium|               Admin|
|       1387|   Jeffrey Kelly|60.0|   Finance| 57401.0|      Medium|             Finance|
|       1788|     Cory Torres|60.0|        HR| 98977.0|      Medium|                  HR|
|      11906|   Donna Salazar|60.0|        IT|116123.0|        High|                  IT|
|       2063|   Amber Rosario|60.0| Marketing|113749.0|        High|           Marketing|
|       4066|  Samuel Hopkins|60.0|Operations| 83436.0|      Medium|          Operations|
|       7438|Selena Valentine|60.0|     Sales|114452.0|        High|               Sales|
+-----------+----------------+----+----------+--------+------------+--------------------+
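`row_number()` returns exactly one employee per department, even when several employees share the minimum or maximum age, and which one it returns depends on how the rows happen to be ordered within the partition. Every department above has a youngest age of 20 and an oldest of 60, so ties are likely. If all tied employees should be listed (what filtering on `rank() == 1` would give in Spark), the selection looks like this plain-Python sketch on hypothetical rows. Run it outside the notebook session, where `min` and `max` are still Python's built-ins.

```python
from collections import defaultdict

# Hypothetical rows: (employee_id, department, age)
rows = [
    (1, "Admin", 20.0),
    (2, "Admin", 20.0),
    (3, "Admin", 60.0),
    (4, "IT", 35.0),
    (5, "IT", 59.0),
]

by_department = defaultdict(list)
for emp_id, dept, age in rows:
    by_department[dept].append((emp_id, age))

youngest, oldest = {}, {}
for dept, members in by_department.items():
    ages = [age for _, age in members]
    lo, hi = min(ages), max(ages)
    # Keep every employee tied at the extreme age, as rank() == 1 would
    youngest[dept] = [emp_id for emp_id, age in members if age == lo]
    oldest[dept] = [emp_id for emp_id, age in members if age == hi]

print(youngest)
print(oldest)
```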



5. #### Employee Classification:
    -    Find out how many employees are older than 50 years and have a salary greater than 80,000. These employees can be considered a special group of senior, high-salary employees.
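Both conditions are strict, so an employee aged exactly 50, or earning exactly 80,000, is not counted. A minimal plain-Python sketch on hypothetical (age, salary) pairs:

```python
# Hypothetical (age, salary) pairs, including values on both boundaries
employees = [(51.0, 80001.0), (50.0, 95000.0), (60.0, 80000.0), (58.0, 120000.0), (45.0, 99000.0)]

# Strict inequalities on both conditions, as in the task statement
senior_high_earners = [(age, salary) for age, salary in employees if age > 50 and salary > 80_000]
print(len(senior_high_earners))
```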

In [607]:
# Flag employees older than 50 with a salary above 80,000 (stored as the strings "True"/"False")
df_cleaned = df_filled.withColumn(
    "Senior Employee with High Salaries",
    when((col("Age") > 50) & (col("Salary") > 80000), "True").otherwise("False")
)

In [611]:
df_cleaned.show()

+-----------+-----------------+----+----------+--------+------------+----------------------------------+
|Employee ID|             Name| Age|Department|  Salary|Salary Range|Senior Employee with High Salaries|
+-----------+-----------------+----+----------+--------+------------+----------------------------------+
|       1008|  Mr. Ruben Evans|49.0|        HR| 93548.0|      Medium|                             False|
|       1020|     Ernest Gates|26.0|   Finance| 83361.0|      Medium|                             False|
|       1022|   Michael Hudson|40.0| Marketing| 91849.0|      Medium|                             False|
|       1023|       Kevin Ryan|35.0|Operations| 99448.0|      Medium|                             False|
|       1023|  Austin Crawford|41.0|        IT| 55821.0|      Medium|                             False|
|       1024|    Sandy Aguilar|26.0|   Finance| 99213.0|      Medium|                             False|
|       1036|        Isaac Day|42.0| Marketing| 52637.0

In [618]:
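# The flag column holds strings, so compare against "True" rather than the boolean True
total_senior_emp_with_high_salaries = df_cleaned.groupBy("Senior Employee with High Salaries").agg(
    count("Employee ID").alias("Total Senior Employee with High Salaries")
).filter(col("Senior Employee with High Salaries") == "True")
total_senior_emp_with_high_salaries.show()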

+----------------------------------+----------------------------------------+
|Senior Employee with High Salaries|Total Senior Employee with High Salaries|
+----------------------------------+----------------------------------------+
|                              True|                                    1490|
+----------------------------------+----------------------------------------+



6. #### Missing Value Handling (Advanced):
    -    For departments where the salary column has more than 20% missing values, drop those departments from the dataset.
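The missing-salary ratio has to be measured on the salaries as loaded, before the median fill; once Salary has been imputed, every department reports 0% missing and nothing is dropped. A department with exactly 20% missing is kept, since only "more than 20%" is dropped. A plain-Python sketch on hypothetical (department, salary) rows:

```python
from collections import defaultdict

# Hypothetical raw rows (department, salary) BEFORE salary imputation; None = missing
raw = [
    ("Admin", 62671.0), ("Admin", None), ("Admin", 99175.0), ("Admin", 61424.0), ("Admin", 50923.0),
    ("HR", None), ("HR", None), ("HR", 56542.0), ("HR", 40568.0),
]

# Count all rows and missing salaries per department
totals = defaultdict(int)
missing = defaultdict(int)
for dept, salary in raw:
    totals[dept] += 1
    if salary is None:
        missing[dept] += 1

THRESHOLD = 0.20  # drop departments with MORE than 20% missing salaries
kept = {dept for dept in totals if missing[dept] / totals[dept] <= THRESHOLD}
filtered = [(dept, salary) for dept, salary in raw if dept in kept]
print(sorted(kept))
```

Here Admin has 1 of 5 salaries missing (exactly 20%) and is kept, while HR has 2 of 4 missing (50%) and is dropped.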

In [537]:
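# Use the data before the median fill (df_filled already has Salary imputed, so every department would show 0% missing)
df_dummy = df_pyspark.fillna({'Age': avg_age, 'Department': most_frequent_department})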

In [538]:
total_department_employee = Window.partitionBy("Department")
df_with_count = df_dummy.withColumn("Total Employee in Department", count("*").over(total_department_employee))

In [539]:
df_with_count.show()

+-----------+----------------+----+----------+--------+----------------------------+
|Employee ID|            Name| Age|Department|  Salary|Total Employee in Department|
+-----------+----------------+----+----------+--------+----------------------------+
|      72076|   Yvonne Oliver|48.0|     Admin| 62671.0|                        1862|
|      44431| Kenneth Francis|47.0|     Admin| 99175.0|                        1862|
|      10402|   Gregory Cowan|44.0|     Admin|    NULL|                        1862|
|       9997|   Randy Richard|55.0|     Admin|    NULL|                        1862|
|      56698|     April Lopez|35.0|     Admin| 61424.0|                        1862|
|       6150|    Angela Munoz|40.0|     Admin|103548.0|                        1862|
|      97874|  James Stephens|34.0|     Admin|108703.0|                        1862|
|       4399|   Linda Oconnor|35.0|     Admin| 50923.0|                        1862|
|      77529|  Jeffery Dalton|54.0|     Admin| 69349.0|          

In [540]:
df_with_count = df_with_count.withColumn("Total Null Salary Count",
                                   count(when(col("Salary").isNull(),True)).over(total_department_employee)
                                   )

In [541]:
df_with_count.show()

+-----------+----------------+----+----------+--------+----------------------------+-----------------------+
|Employee ID|            Name| Age|Department|  Salary|Total Employee in Department|Total Null Salary Count|
+-----------+----------------+----+----------+--------+----------------------------+-----------------------+
|      72076|   Yvonne Oliver|48.0|     Admin| 62671.0|                        1862|                    197|
|      44431| Kenneth Francis|47.0|     Admin| 99175.0|                        1862|                    197|
|      10402|   Gregory Cowan|44.0|     Admin|    NULL|                        1862|                    197|
|       9997|   Randy Richard|55.0|     Admin|    NULL|                        1862|                    197|
|      56698|     April Lopez|35.0|     Admin| 61424.0|                        1862|                    197|
|       6150|    Angela Munoz|40.0|     Admin|103548.0|                        1862|                    197|
|      97874|  Jame

In [542]:
df_without_null_salary = df_with_count.where(
    col("Total Null Salary Count")/col("Total Employee in Department") <= 0.2
)

In [543]:
df_without_null_salary.show()

+-----------+----------------+----+----------+--------+----------------------------+-----------------------+
|Employee ID|            Name| Age|Department|  Salary|Total Employee in Department|Total Null Salary Count|
+-----------+----------------+----+----------+--------+----------------------------+-----------------------+
|      72076|   Yvonne Oliver|48.0|     Admin| 62671.0|                        1862|                    197|
|      44431| Kenneth Francis|47.0|     Admin| 99175.0|                        1862|                    197|
|      10402|   Gregory Cowan|44.0|     Admin|    NULL|                        1862|                    197|
|       9997|   Randy Richard|55.0|     Admin|    NULL|                        1862|                    197|
|      56698|     April Lopez|35.0|     Admin| 61424.0|                        1862|                    197|
|       6150|    Angela Munoz|40.0|     Admin|103548.0|                        1862|                    197|
|      97874|  Jame

In [544]:
df_without_null_salary.groupBy("Department").agg(
    count("Employee ID").alias("Total Count")
).show()

+----------+-----------+
|Department|Total Count|
+----------+-----------+
|     Admin|       1862|
|   Finance|       3545|
|        HR|       1923|
|        IT|       1947|
| Marketing|       1872|
|Operations|       1906|
|     Sales|       1945|
+----------+-----------+



7. #### Save the Results:
    -    Save the cleaned dataset as a new CSV file.
    -    Save the aggregated data (average salary, total salary, etc.) for each department into a new CSV file.
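Spark's `DataFrameWriter.csv` writes a directory of `part-*.csv` files (typically one per partition) rather than a single file; calling `coalesce(1)` before `write` leaves a single part file inside that directory. For a small table such as the per-department aggregates, another option is to collect the rows and write them with Python's `csv` module. A minimal sketch with hypothetical aggregate values, writing to an in-memory buffer instead of a real file:

```python
import csv
import io

# Hypothetical per-department aggregates (in the notebook these would come from the aggregated DataFrame)
aggregates = [
    {"Department": "IT", "Average Salary": 99071.0, "Total Salary": 198142.0, "Total Employees": 2},
    {"Department": "HR", "Average Salary": 56542.0, "Total Salary": 56542.0, "Total Employees": 1},
]

buffer = io.StringIO()  # stand-in for a real file opened with open(path, "w", newline="")
writer = csv.DictWriter(buffer, fieldnames=list(aggregates[0]))
writer.writeheader()
writer.writerows(aggregates)
print(buffer.getvalue())
```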

In [620]:
# Spark writes a directory of part-*.csv files at this path, not a single file
df_cleaned.write.option("header", True).mode("overwrite").csv("output/cleaned_employee_data")

In [622]:
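# Aggregate the cleaned data (df_pyspark still contains nulls and a NULL department group)
df_aggregated = df_cleaned.groupBy("Department").agg(
    avg("Salary").alias("Average Salary"),
    sum("Salary").alias("Total Salary"),
    avg("Age").alias("Average Age"),
    count("Employee ID").alias("Total Employees")
)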

In [623]:
df_aggregated.write.option("header", True).mode("overwrite").csv("output/aggregated_employee_data")