In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,avg,round

In [0]:
# Obtain the SparkSession for this notebook (reuses an existing one if present),
# registering it under the application name "HR Emp Attrition".
spark = (
    SparkSession.builder
    .appName("HR Emp Attrition")
    .getOrCreate()
)

In [0]:
# Read the HR CSV data: the first row is treated as the header and column
# types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/hr/default/hr")
)

# Preview the first 20 rows. The DataFrame is passed directly so the notebook
# can render it as a table; calling .collect() first would pull the rows to the
# driver and hand display() a plain Python list of Row objects instead.
display(df.limit(20))

***Attrition Count***

In [0]:
# Number of employees for each value of the Attrition column.
(
    df.groupBy("Attrition")
      .count()
      .show()
)

***Attrition by Overtime***

In [0]:
# Keep only employees who left (Attrition == "Yes") and count them per
# OverTime value.
(
    df.where(col("Attrition") == "Yes")
      .groupBy("OverTime", "Attrition")
      .count()
      .show()
)

***Average Monthly Income by Job Role***

In [0]:
# Mean MonthlyIncome per JobRole, rounded to 2 decimals, highest first.
# `round` and `avg` here are the pyspark.sql.functions versions imported above.
(
    df.groupBy("JobRole")
      .agg(round(avg("MonthlyIncome"), 2).alias("AvgMonthlyIncome"))
      .orderBy(col("AvgMonthlyIncome").desc())
      .show()
)

***Average years employees worked by attrition status***

In [0]:
# Mean YearsAtCompany (rounded to 2 decimals) for each Attrition value.
# Note: the aggregated column is deliberately still named "YearsAtCompany".
(
    df.groupBy("Attrition")
      .agg(
          round(avg("YearsAtCompany"), 2).alias("YearsAtCompany")
      )
      .show()
)

***Dropping the below columns due to irrelevance, redundancy, or high missing values.***

In [0]:
# Columns removed from the working DataFrame before the analyses below.
drop_cols = [
    "EmployeeCount",
    "StandardHours",
    "Over18",
]

# DataFrame.drop takes column names as varargs, hence the unpacking.
df = df.drop(*drop_cols)

In [0]:
from pyspark.sql.functions import col
# Cast numeric columns
numeric_cols = [
    "Age", "DailyRate", "DistanceFromHome", "MonthlyIncome",
    "YearsAtCompany", "YearsInCurrentRole", "YearsSinceLastPromotion", "YearsWithCurrManager",
]

# Apply every cast in a single projection. Calling withColumn() once per column
# in a loop adds one projection per call to the query plan, which the PySpark
# documentation warns against; withColumns() (Spark >= 3.3) replaces all listed
# columns at once while keeping their names and positions.
df = df.withColumns(
    {col_name: col(col_name).cast("integer") for col_name in numeric_cols}
)

### By Age

In [0]:
from pyspark.sql import functions as F

def analyze_attrition_by_age(df):
    """
    Categorizes employees into age groups and calculates attrition counts & rates.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain an ``Age`` column and an ``Attrition`` column. Rows whose
        ``Attrition`` equals ``"Yes"`` are counted as attrition.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per ``AgeCategory`` with ``TotalEmployees``, ``AttritionCount``
        and ``AttritionRate`` (percentage, rounded to 2 decimals), ordered by
        ``AgeCategory``.
    """
    age = F.col("Age")

    # Create Age Categories.
    # Missing and under-18 ages get their own labels so they are not silently
    # folded into the catch-all "55+" bucket. Upper bounds are exclusive so a
    # non-integer age (e.g. 25.5) still lands in the bracket it belongs to;
    # for integer ages the brackets are identical to 18-25 / 26-35 / 36-45 / 46-55.
    df = df.withColumn(
        "AgeCategory",
        F.when(age.isNull(), "Unknown")
         .when(age < 18, "Under 18")
         .when(age < 26, "18-25")
         .when(age < 36, "26-35")
         .when(age < 46, "36-45")
         .when(age < 56, "46-55")
         .otherwise("55+")
    )

    # 1 for every employee who left, 0 otherwise; summing gives the attrition count.
    left_flag = F.when(F.col("Attrition") == "Yes", 1).otherwise(0)

    # Group by Age Category and calculate attrition counts & percentages.
    # F.round is used explicitly so the function does not depend on a
    # notebook-level import that shadows Python's builtin round().
    return (
        df.groupBy("AgeCategory")
          .agg(
              F.count("*").alias("TotalEmployees"),
              F.sum(left_flag).alias("AttritionCount")
          )
          .withColumn(
              "AttritionRate",
              F.round(F.col("AttritionCount") / F.col("TotalEmployees") * 100, 2).cast("double")
          )
          .orderBy("AgeCategory")
    )

## By Department

In [0]:
def analyze_attrition_by_department(df):
    """
    Calculates attrition counts and rates per department.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain a ``Department`` column and an ``Attrition`` column. Rows
        whose ``Attrition`` equals ``"Yes"`` are counted as attrition.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per ``Department`` with ``TotalEmployees``, ``AttritionCount``
        and ``AttritionRate`` (percentage, rounded to 2 decimals), sorted by
        ``AttritionRate`` descending.
    """
    # 1 for every employee who left, 0 otherwise; summing gives the attrition count.
    left_flag = F.when(F.col("Attrition") == "Yes", 1).otherwise(0)

    # F.round is used explicitly so the function does not depend on a
    # notebook-level import that shadows Python's builtin round().
    return (
        df.groupBy("Department")
          .agg(
              F.count("*").alias("TotalEmployees"),
              F.sum(left_flag).alias("AttritionCount")
          )
          .withColumn(
              "AttritionRate",
              F.round(F.col("AttritionCount") / F.col("TotalEmployees") * 100, 2).cast("double")
          )
          .orderBy(F.desc("AttritionRate"))
    )

## By Education

In [0]:
def analyze_attrition_by_education(df):
    """
    Calculates attrition counts and rates per education level.

    The numeric ``Education`` code is translated to a readable label and stored
    in a new ``EducationLevel`` column before grouping.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain an ``Education`` column (codes 1-5) and an ``Attrition``
        column. Rows whose ``Attrition`` equals ``"Yes"`` are counted as attrition.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per ``EducationLevel`` with ``TotalEmployees``,
        ``AttritionCount`` and ``AttritionRate`` (percentage, rounded to
        2 decimals), sorted by ``AttritionRate`` descending.
    """
    # Map education codes to labels
    education_map = {
        1: "Below College",
        2: "College",
        3: "Bachelor",
        4: "Master",
        5: "Doctor"
    }
    # create_map expects a flat key, value, key, value, ... sequence of columns.
    edu_mapping_expr = F.create_map(
        [F.lit(item) for pair in education_map.items() for item in pair]
    )

    # NOTE(review): codes not present in education_map presumably produce a
    # null EducationLevel - confirm against the cluster's map-lookup settings.
    df = df.withColumn("EducationLevel", edu_mapping_expr[F.col("Education")])

    # 1 for every employee who left, 0 otherwise; summing gives the attrition count.
    left_flag = F.when(F.col("Attrition") == "Yes", 1).otherwise(0)

    # F.round is used explicitly so the function does not depend on a
    # notebook-level import that shadows Python's builtin round().
    return (
        df.groupBy("EducationLevel")
          .agg(
              F.count("*").alias("TotalEmployees"),
              F.sum(left_flag).alias("AttritionCount")
          )
          .withColumn(
              "AttritionRate",
              F.round(F.col("AttritionCount") / F.col("TotalEmployees") * 100, 2).cast("double")
          )
          .orderBy(F.desc("AttritionRate"))
    )

## By Environment Satisfaction

In [0]:
def analyze_attrition_by_env_satisfaction(df):
    """
    Calculates attrition counts and rates per environment-satisfaction level.

    The numeric ``EnvironmentSatisfaction`` code is translated to a readable
    label and stored in a new ``EnvironmentSatisfactionLevel`` column before
    grouping.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain an ``EnvironmentSatisfaction`` column (codes 1-4) and an
        ``Attrition`` column. Rows whose ``Attrition`` equals ``"Yes"`` are
        counted as attrition.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per ``EnvironmentSatisfactionLevel`` with ``TotalEmployees``,
        ``AttritionCount`` and ``AttritionRate`` (percentage, rounded to
        2 decimals), sorted by ``AttritionRate`` descending.
    """
    # Map satisfaction codes to labels
    env_map = {
        1: "Low",
        2: "Medium",
        3: "High",
        4: "Very High"
    }
    # create_map expects a flat key, value, key, value, ... sequence of columns.
    env_mapping_expr = F.create_map(
        [F.lit(item) for pair in env_map.items() for item in pair]
    )

    # NOTE(review): codes not present in env_map presumably produce a null
    # label - confirm against the cluster's map-lookup settings.
    df = df.withColumn(
        "EnvironmentSatisfactionLevel",
        env_mapping_expr[F.col("EnvironmentSatisfaction")]
    )

    # 1 for every employee who left, 0 otherwise; summing gives the attrition count.
    left_flag = F.when(F.col("Attrition") == "Yes", 1).otherwise(0)

    # F.round is used explicitly so the function does not depend on a
    # notebook-level import that shadows Python's builtin round().
    return (
        df.groupBy("EnvironmentSatisfactionLevel")
          .agg(
              F.count("*").alias("TotalEmployees"),
              F.sum(left_flag).alias("AttritionCount")
          )
          .withColumn(
              "AttritionRate",
              F.round(F.col("AttritionCount") / F.col("TotalEmployees") * 100, 2).cast("double")
          )
          .orderBy(F.desc("AttritionRate"))
    )

## By Job Satisfaction

In [0]:
def analyze_attrition_by_job_satisfaction(df):
    """
    Calculates attrition counts and rates per job-satisfaction level.

    The numeric ``JobSatisfaction`` code is translated to a readable label and
    stored in a new ``JobSatisfactionLevel`` column before grouping.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain a ``JobSatisfaction`` column (codes 1-4) and an
        ``Attrition`` column. Rows whose ``Attrition`` equals ``"Yes"`` are
        counted as attrition.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per ``JobSatisfactionLevel`` with ``TotalEmployees``,
        ``AttritionCount`` and ``AttritionRate`` (percentage, rounded to
        2 decimals), sorted by ``AttritionRate`` descending.
    """
    # Map satisfaction codes to labels
    job_map = {
        1: "Low",
        2: "Medium",
        3: "High",
        4: "Very High"
    }
    # create_map expects a flat key, value, key, value, ... sequence of columns.
    job_mapping_expr = F.create_map(
        [F.lit(item) for pair in job_map.items() for item in pair]
    )

    # NOTE(review): codes not present in job_map presumably produce a null
    # label - confirm against the cluster's map-lookup settings.
    df = df.withColumn("JobSatisfactionLevel", job_mapping_expr[F.col("JobSatisfaction")])

    # 1 for every employee who left, 0 otherwise; summing gives the attrition count.
    left_flag = F.when(F.col("Attrition") == "Yes", 1).otherwise(0)

    # F.round is used explicitly so the function does not depend on a
    # notebook-level import that shadows Python's builtin round().
    return (
        df.groupBy("JobSatisfactionLevel")
          .agg(
              F.count("*").alias("TotalEmployees"),
              F.sum(left_flag).alias("AttritionCount")
          )
          .withColumn(
              "AttritionRate",
              F.round(F.col("AttritionCount") / F.col("TotalEmployees") * 100, 2).cast("double")
          )
          .orderBy(F.desc("AttritionRate"))
    )

## Analysis Insights

In [0]:
# Driver cell: runs each attrition analysis on the prepared `df` and prints the
# resulting table. Every analysis function returns a new DataFrame; the result
# is kept in its own variable and shown in full (truncate=False disables
# column-value truncation in the printed output).
if __name__ == "__main__":

    # Age Analysis: attrition counts and rates per age bracket
    attrition_by_age = analyze_attrition_by_age(df)
    attrition_by_age.show(truncate=False)

    # Department Analysis: attrition counts and rates per department
    dept_attrition = analyze_attrition_by_department(df)
    dept_attrition.show(truncate=False)

    # Education Analysis: attrition counts and rates per education level
    edu_attrition = analyze_attrition_by_education(df)
    edu_attrition.show(truncate=False)

    # Environment Satisfaction Analysis: attrition counts and rates per
    # environment-satisfaction level
    env_attrition = analyze_attrition_by_env_satisfaction(df)
    env_attrition.show(truncate=False)

    # Job Satisfaction Analysis: attrition counts and rates per
    # job-satisfaction level
    job_attrition = analyze_attrition_by_job_satisfaction(df)
    job_attrition.show(truncate=False)