## 1. Project Overview
##### Dataset Description

This dataset contains employee records including employee ID, department, salary, joining date, and other HR-related attributes.

The data simulates a real-world HR system where employee records may contain inconsistencies, missing values, and formatting issues.

##### Data Problems Identified

During initial exploration, the following issues were observed:

- Missing values in key columns

- Inconsistent formatting (e.g., department name values)

- Duplicate employee records

- Incorrect or unrealistic salary values

- Mixed date formats

These issues require cleaning before the data can be reliably used for reporting or downstream systems.

##### Tools & Technologies Used

- PySpark – Distributed data processing

- Spark SQL – Querying structured data

- DataFrame API – Data cleaning and transformation

- Jupyter Notebook – Development environment

##### Goal of the project
The objective of this project is to:

- Clean and standardize employee data

- Handle missing and duplicate records

- Validate and transform columns

- Prepare production-ready structured data

- Simulate a real-world data engineering cleaning pipeline

In [36]:
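# Setting up the PySpark session
from pyspark.sql import SparkSession
# Note: this wildcard import shadows Python built-ins such as sum, min, max,
# round and abs with their Spark column-function equivalents.
from pyspark.sql.functions import *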

spark=SparkSession.builder\
.appName("HrDataCleaning")\
.getOrCreate()

## 2. Load and inspect data

In [37]:
#load raw data
df_raw=spark.read.format("csv")\
.option("header","true")\
.load("../raw_data/raw_hr_data.csv")

#printing raw data
print("Raw HR Data")
df_raw.show(10)

#schema of raw_hr_data
df_raw.printSchema()

Raw HR Data
+-----------+-----------+---------------+------+------------+--------------------+
|employee_id|       name|     department|salary|joining_date|               email|
+-----------+-----------+---------------+------+------------+--------------------+
|        E82| Employee_1|             IT|-31639|    2/4/2020| Employee1@Compan...|
|        E87| Employee_2|        finance|78,540|    7/3/2019| Employee2@Compan...|
|        E28| Employee_3|          sales|45,247|   2/12/2020| Employee3@Compan...|
|        E29| Employee_4|             hr|59,439|  11/30/2022| Employee4@Compan...|
|        E36| Employee_5|Human Resources|-44110|    7/9/2019| Employee5@Compan...|
|        E46| Employee_6|             HR|-52541|    2/3/2023| Employee6@Compan...|
|        E49| Employee_7|             IT|-66178|  13-12-2022| Employee7@Compan...|
|        E85| Employee_8|             IT|44,935|   7/26/2019| Employee8@Compan...|
|        E59| Employee_9|        Finance|  NULL|  28-12-2020| Employee9@Com

## 3. Initial data assessment

In [38]:
# 1. Total rows
total_rows=df_raw.count()

# 2. Duplicate count
# subtract the count of unique employee_id rows from the total row count
df_unique=df_raw.dropDuplicates(["employee_id"])
unique_rows=df_unique.count()
duplicate_row=total_rows-unique_rows

# 3. Missing salary count (computed on the raw rows, before de-duplication)
# only actual NULL values are counted here; empty strings and negatives are handled in 4.1
missing_salary=df_raw.filter(col("salary").isNull()).count()

#Displaying Results
print(f"Total Rows:{total_rows}")
print(f"Duplicate Rows:{duplicate_row}")
print(f"Missing Salary Count:{missing_salary}")
print(f"Unique Rows:{unique_rows}")

Total Rows:100
Duplicate Rows:38
Missing Salary Count:19
Unique Rows:62


## 4. Cleaning

### 4.1 Salary Cleaning and Validation
#### Data Quality Issues Identified

During initial validation of the salary column, the following issues were observed:

- 19 records contained NULL values

- 11 records contained negative salary values

- Several records were formatted as strings with commas (e.g., "45,000")

- Empty string values were present

Total affected salary records: 30

#### Data Cleaning Actions Performed

The following transformations were applied:

- Removed comma formatting from salary values

- Converted salary column to numeric type

- Standardized empty strings to NULL

- Treated negative salary values as invalid

- Imputed missing and invalid salaries using department-level median

A salary_flag column was created to preserve audit traceability and distinguish between original and imputed values.

#### Business Rationale

Negative salary values are invalid in an HR payroll context.
To prevent incorrect financial reporting and maintain payroll consistency, these records were treated as missing and imputed using department-level median values.

This approach ensures data integrity while preserving the original business meaning.

In [39]:
# 1. replace empty string with NULL
df_salary=df_unique.withColumn("salary",
                            when(col("salary")=="", None).otherwise(col("salary"))
                            )

#2. remove commas
df_salary=df_salary.withColumn("salary",
                               regexp_replace(col("salary"),",","")
                               )

#3. converting to numeric- casting
df_salary=df_salary.withColumn("salary",
                               col("salary").cast("double")
                               )

#4. flagging invalid negative and missing salaries - instead of deleting these records, a flag column records why the salary will be imputed
df_salary=df_salary.withColumn("salary_flag",
                               when(col("salary") < 0, "Invalid_Negative")
                               .when(col("salary").isNull(),"Imputed_Department_Median")
                               .otherwise("Original_Valid")
                               )
#5. set negative salaries to NULL so they are imputed later
df_salary=df_salary.withColumn("salary",
                               when(col("salary") < 0,None).otherwise(col("salary"))
                               )

# checking the real missing salary count (NULLs, empty strings, and negatives)
missing_salary=df_salary.filter(col("salary").isNull())

print("Total Unique Rows:",df_salary.count())
print("Actual Missing Salary after cleaning:", missing_salary.count())

Total Unique Rows: 62
Actual Missing Salary after cleaning: 30


A salary_flag column was retained to maintain audit traceability and distinguish between original and imputed values.
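The per-value rule applied by the salary cell above can also be written as a small plain-Python function, which is easier to unit-test than the Spark expression. This is a minimal sketch, not the notebook's implementation: the function name `clean_salary` is hypothetical, and the flag strings are copied from the `salary_flag` logic above.

```python
from typing import Optional, Tuple


def clean_salary(raw: Optional[str]) -> Tuple[Optional[float], str]:
    """Return (salary, flag) for one raw salary string (hypothetical helper)."""
    # Steps 1-2: treat None/empty as missing and strip thousands separators.
    text = (raw or "").replace(",", "").strip()
    if text == "":
        return None, "Imputed_Department_Median"
    # Step 3: convert to a number; unparseable text is treated as missing.
    try:
        value = float(text)
    except ValueError:
        return None, "Imputed_Department_Median"
    # Steps 4-5: flag negatives as invalid and null them out for later imputation.
    if value < 0:
        return None, "Invalid_Negative"
    return value, "Original_Valid"
```

For example, `clean_salary("78,540")` yields a valid numeric salary, while `clean_salary("-31639")` yields a missing salary with the `Invalid_Negative` flag.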

### 4.2 Standardize Department

In [69]:
df_dep=df_salary.withColumn("department",
                              trim(lower(col("department")))
                              )\
                              .withColumn("department",
                                          when(col("department").isin("hr","human resources"),"Human Resources")
                                          .when(col("department").isin("it","information technology"),"Information Technology")
                                          .when(col("department").isin("finance"),"Finance")
                                          .when(col("department").isin("sales"),"Sales")
                                          .otherwise(col("department"))
                                          )
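
The same mapping can be expressed as a lookup table, which may be easier to extend when new department spellings appear. A minimal plain-Python sketch, assuming the alias list from the cell above; `DEPARTMENT_ALIASES` and `standardize_department` are hypothetical names.

```python
from typing import Optional

# Aliases copied from the isin(...) lists in the Spark cell above.
DEPARTMENT_ALIASES = {
    "hr": "Human Resources",
    "human resources": "Human Resources",
    "it": "Information Technology",
    "information technology": "Information Technology",
    "finance": "Finance",
    "sales": "Sales",
}


def standardize_department(raw: Optional[str]) -> Optional[str]:
    """Trim and lowercase the value, then map known aliases to canonical names."""
    if raw is None:
        return None
    key = raw.strip().lower()
    # Unknown values fall through in their trimmed, lowercased form,
    # matching the .otherwise(col("department")) branch.
    return DEPARTMENT_ALIASES.get(key, key)
```

Unmapped values stay visible in lowercase rather than being dropped, so they can be spotted in a later `groupBy("department")` check.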

### 4.3 Clean Date

In [71]:
# joining date standardization - try each known format in order and keep the first that parses (result is a DateType column, displayed as yyyy-MM-dd)
df_date=df_dep.withColumn("joining_date",
                             coalesce(
                                to_date(col("joining_date"),"yyyy-MM-dd"),
                                to_date(col("joining_date"),"dd-MM-yyyy"),
                                to_date(col("joining_date"),"yyyy/MM/dd"),
                                to_date(col("joining_date"), "M/d/yyyy"),
                                to_date(col("joining_date"), "MM/dd/yyyy"),
                             ))
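
The "first format that parses wins" idea behind the `coalesce` of `to_date` calls can be illustrated without Spark. This is a sketch using Python's `datetime.strptime`, whose format codes differ from Spark's patterns; `DATE_FORMATS` and `parse_joining_date` are hypothetical names, and `%m/%d/%Y` stands in for both `M/d/yyyy` and `MM/dd/yyyy` because `strptime` accepts one- or two-digit months and days.

```python
from datetime import date, datetime
from typing import Optional

# strptime counterparts of the Spark patterns, tried in the same order.
DATE_FORMATS = ["%Y-%m-%d", "%d-%m-%Y", "%Y/%m/%d", "%m/%d/%Y"]


def parse_joining_date(raw: Optional[str]) -> Optional[date]:
    """Return the first successful parse, or None if no format matches."""
    if raw is None:
        return None
    text = raw.strip()
    for fmt in DATE_FORMATS:
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue  # try the next candidate format
    return None
```

The order matters: slash-separated values such as `2/4/2020` are read as month/day/year, so a day-first slash format would need its own rule if it appeared in the source data.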


### 4.4 Clean Email

In [72]:
#cleaning email
#leading space and uppercase letter
df_email=df_date.withColumn("email",
                            lower(trim(col("email")))
                            )

### 4.5 Impute salary (dept median)

Salary imputation was performed using department-level median values to preserve distribution characteristics and avoid skew from extreme values.

In [73]:
#1. calculate department median
dept_median=df_email.groupBy("department").agg(
    percentile_approx("salary",0.5).alias("median_salary")
)

#2. join back to main data (new name so the earlier df_dep is not overwritten)
df_joined=df_email.join(dept_median, on="department",how="left")

#3. fill missing salary
df_dep1=df_joined.withColumn("salary",
                               coalesce(col("salary"), col("median_salary"))
                               )
#drop helper column
df_dep1=df_dep1.drop("median_salary")

#printing clean df
df_dep1.show(100)

+--------------------+-----------+-----------+-------+------------+--------------------+--------------------+
|          department|employee_id|       name| salary|joining_date|               email|         salary_flag|
+--------------------+-----------+-----------+-------+------------+--------------------+--------------------+
|Information Techn...|         E1|Employee_26|34652.0|  2019-05-19|employee26@compan...|      Original_Valid|
|Information Techn...|        E10|Employee_27|48250.0|  2022-09-01|employee27@compan...|      Original_Valid|
|               Sales|        E11|Employee_22|77966.0|  2019-09-15|employee22@compan...|      Original_Valid|
|             Finance|        E12|Employee_32|67957.0|  2021-09-21|employee32@compan...|Imputed_Departmen...|
|     Human Resources|        E14|Employee_30|42465.0|  2021-05-14|employee30@compan...|      Original_Valid|
|     Human Resources|        E15|Employee_18|40365.0|  2023-01-18|employee18@compan...|      Original_Valid|
|         
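
The imputation logic (compute a median per department from the non-missing salaries, then fill the gaps) can be sketched in plain Python. This is an illustration under assumptions, not the notebook's code: `impute_by_department` is a hypothetical helper, and `statistics.median_low` is used as a stand-in that always returns an actual data value; `percentile_approx` may choose a different value in edge cases.

```python
from collections import defaultdict
from statistics import median_low
from typing import Dict, List, Optional


def impute_by_department(rows: List[Dict]) -> List[Dict]:
    """Fill missing 'salary' values with the median of the row's department."""
    # 1. Collect the non-missing salaries per department.
    by_dept: Dict[str, List[float]] = defaultdict(list)
    for row in rows:
        if row["salary"] is not None:
            by_dept[row["department"]].append(row["salary"])
    # 2. One median per department (departments with no valid salary get none).
    medians: Dict[str, Optional[float]] = {
        dept: median_low(values) for dept, values in by_dept.items()
    }
    # 3. Keep existing salaries; fill missing ones from the department median.
    return [
        {**row, "salary": row["salary"] if row["salary"] is not None
                          else medians.get(row["department"])}
        for row in rows
    ]
```

A department with no valid salaries at all keeps its missing values, which mirrors the left join on an empty median: such rows would still need a fallback rule.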

## 5. Final schema enforcement

The final column order was fixed with an explicit select, and the resulting schema printed, to ensure downstream compatibility and data contract stability.

In [92]:
# Fixing column order
df_clean=df_dep1.select("employee_id","name","department","salary",
                         "joining_date","email","salary_flag")
df_clean.printSchema()
df_clean.show(5)

df_clean.coalesce(1)\
.write.mode("overwrite")\
.option("header","true")\
.csv("../output/cleaned_data")

root
 |-- employee_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- joining_date: date (nullable = true)
 |-- email: string (nullable = true)
 |-- salary_flag: string (nullable = false)

+-----------+-----------+--------------------+-------+------------+--------------------+--------------------+
|employee_id|       name|          department| salary|joining_date|               email|         salary_flag|
+-----------+-----------+--------------------+-------+------------+--------------------+--------------------+
|         E1|Employee_26|Information Techn...|34652.0|  2019-05-19|employee26@compan...|      Original_Valid|
|        E10|Employee_27|Information Techn...|48250.0|  2022-09-01|employee27@compan...|      Original_Valid|
|        E11|Employee_22|               Sales|77966.0|  2019-09-15|employee22@compan...|      Original_Valid|
|        E12|Employee_32|             Finance|67957.

## 6. Exporting invalid records to separate file (for client transparency)

In [84]:
df_invalid_salary=df_raw.filter(
    (col("salary").isNull()) |
    (col("salary")=="") |
    (regexp_replace(col("salary"), ",", "").cast("double").isNull()) |
    (regexp_replace(col("salary"),",","").cast("double") < 0)
)

df_invalid_salary.show(100)

df_invalid_salary.coalesce(1)\
.write.mode("overwrite")\
.option("header","true")\
.csv("../output/invalid_data")

+-----------+-----------+--------------------+------+------------+--------------------+
|employee_id|       name|          department|salary|joining_date|               email|
+-----------+-----------+--------------------+------+------------+--------------------+
|        E82| Employee_1|                  IT|-31639|    2/4/2020| Employee1@Compan...|
|        E36| Employee_5|     Human Resources|-44110|    7/9/2019| Employee5@Compan...|
|        E46| Employee_6|                  HR|-52541|    2/3/2023| Employee6@Compan...|
|        E49| Employee_7|                  IT|-66178|  13-12-2022| Employee7@Compan...|
|        E59| Employee_9|             Finance|  NULL|  28-12-2020| Employee9@Compan...|
|        E90|Employee_10|               sales|  NULL|    2/2/2023| Employee10@Compa...|
|        E60|Employee_11|             Finance|  NULL|  25-04-2019| Employee11@Compa...|
|        E41|Employee_12|             Finance|  NULL|   3/11/2020| Employee12@Compa...|
|        E47|Employee_21|       

## 7. Report creation

### 7.1 Total salary expenses

In [86]:
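#total salary expense
df_total_salary=df_clean.select(sum("salary"))
df_total_salary.show()

# write the aggregate (not the full cleaned dataset) to the report path
df_total_salary.write.mode("overwrite")\
.option("header","true")\
.csv("../output/reports/total_salary_expenses")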

+-----------+
|sum(salary)|
+-----------+
|  3771350.0|
+-----------+



### 7.2 Employee count per department

In [87]:
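df_emp_per_dept=df_clean.groupBy("department").count()
df_emp_per_dept.show()

# write the per-department counts rather than the full cleaned dataset
df_emp_per_dept.coalesce(1).write.mode("overwrite")\
.option("header","true")\
.csv("../output/reports/employee_per_department")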

+--------------------+-----+
|          department|count|
+--------------------+-----+
|               Sales|   13|
|Information Techn...|   16|
|             Finance|   19|
|     Human Resources|   14|
+--------------------+-----+



### 7.3 Average salary per department

In [88]:
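df_avg_salary=df_clean.groupBy("department").agg(
    avg("salary").alias("avg_salary")
)
df_avg_salary.show()

# write the per-department averages rather than the full cleaned dataset
df_avg_salary.coalesce(1).write.mode("overwrite")\
.option("header","true")\
.csv("../output/reports/average_salary_per_department")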

+--------------------+------------------+
|          department|        avg_salary|
+--------------------+------------------+
|               Sales|           72166.0|
|Information Techn...|         46622.875|
|             Finance| 65903.84210526316|
|     Human Resources|59646.642857142855|
+--------------------+------------------+



### 7.4 Hiring trend by month

In [89]:
df_hiring_trend=df_clean.groupBy(
    date_format("joining_date","yyyy-MM").alias("joining_month")
    ).count().orderBy("joining_month")
df_hiring_trend.show(100)

# write the monthly aggregate rather than the full cleaned dataset
df_hiring_trend.coalesce(1).write.mode("overwrite")\
.option("header","true")\
.csv("../output/reports/hiring_trend_by_month")

+-------------+-----+
|joining_month|count|
+-------------+-----+
|      2019-01|    2|
|      2019-02|    1|
|      2019-04|    3|
|      2019-05|    2|
|      2019-07|    3|
|      2019-08|    1|
|      2019-09|    3|
|      2019-11|    2|
|      2020-01|    2|
|      2020-02|    2|
|      2020-03|    1|
|      2020-04|    1|
|      2020-06|    3|
|      2020-07|    1|
|      2020-11|    1|
|      2020-12|    1|
|      2021-02|    2|
|      2021-03|    2|
|      2021-05|    2|
|      2021-06|    2|
|      2021-08|    1|
|      2021-09|    3|
|      2021-11|    3|
|      2022-01|    1|
|      2022-02|    1|
|      2022-04|    1|
|      2022-05|    2|
|      2022-08|    1|
|      2022-09|    3|
|      2022-10|    1|
|      2022-11|    3|
|      2022-12|    2|
|      2023-01|    1|
|      2023-02|    2|
+-------------+-----+



### Data Analysis

After completing data cleaning and validation, exploratory analysis was performed to generate key HR insights.

1. Total Salary Expense

Total Payroll Cost: 3,771,350

This represents the organization’s total salary expenditure after data correction and imputation.


2. Employee Distribution by Department

|Department|Employee_Count|
|--|--|
|Finance|19|
|IT|16|
|HR|14|
|Sales|13|

Insight:
Finance has the highest employee count, suggesting greater operational staffing requirements compared to other departments.

3. Average Salary by Department

|Department|Average_Salary (rounded)|
|--|--|
|Sales|72,166|
|Finance|65,904|
|HR|59,647|
|IT|46,623|

Insight:
Sales has the highest average salary, while Information Technology has the lowest. This may reflect commission structures, role hierarchy, or compensation strategy differences across departments.
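
As a cross-check on the figures above, the per-department averages weighted by headcount should reproduce the total payroll. The sketch below uses the counts and averages printed in sections 7.2 and 7.3; the variable names are illustrative. `math.fsum` is used instead of the built-in `sum` because the notebook's wildcard import from `pyspark.sql.functions` shadows `sum`.

```python
from math import fsum

# Values copied from the printed outputs of sections 7.2 and 7.3.
counts = {
    "Sales": 13,
    "Information Technology": 16,
    "Finance": 19,
    "Human Resources": 14,
}
averages = {
    "Sales": 72166.0,
    "Information Technology": 46622.875,
    "Finance": 65903.84210526316,
    "Human Resources": 59646.642857142855,
}

# Headcount should equal the number of unique employees (62).
total_employees = int(fsum(counts.values()))

# Count-weighted averages should match the total payroll of 3,771,350
# up to floating-point rounding.
reconstructed_total = fsum(counts[d] * averages[d] for d in counts)
```

If either figure disagreed with the earlier outputs, that would point to rows being lost or duplicated between the aggregation steps.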

4. Hiring Trend Analysis (2019-2023)

Joining dates were aggregated by month (YYYY-MM) to analyze recruitment patterns.

Months with the highest hiring count (3 hires each):
- 2019-04
- 2019-07
- 2019-09
- 2020-06
- 2021-09
- 2021-11
- 2022-09
- 2022-11

The peaks are spread across 2019 to 2022 rather than concentrated in a few isolated months.

Conclusion:
The organization maintained steady recruitment activity without significant expansion or contraction periods.

---
## Final Project Summary

#### Final Project Summary – HR Data Cleaning & Analysis

##### Project Overview
This project focused on building a structured data cleaning and validation workflow for an HR employee dataset using PySpark.
The raw dataset contained:

- Duplicate records

- Missing salary values

- Negative salary entries

- Inconsistent department naming

- Mixed date formats

- Unstructured salary formatting

The objective was to design a reproducible transformation process to produce a standardized, analytics-ready dataset.

##### Data Engineering Tasks Performed

The following transformation steps were implemented:

- Removed duplicate records using employee_id as primary key

- Standardized salary format (removed commas, cast to numeric)

- Identified and handled negative salary values

- Treated invalid salary entries as missing

- Imputed missing salaries using department-level median

- Created salary_flag column to maintain audit traceability

- Standardized department values

- Normalized date format to YYYY-MM-DD

- Cleaned and normalized email formatting

- Enforced final schema for downstream compatibility

##### Data Quality Impact

- Raw records read: 100; records retained after de-duplication on employee_id: 62

- Salary records corrected or imputed: 30

- Final dataset contains no null salary values

- Schema standardized for reliable downstream consumption

##### Validation & Aggregation Checks

Post-transformation validation queries were executed to verify:

- Total payroll consistency

- Department-level employee distribution

- Average salary by department

- Hiring distribution across months

These checks confirm transformation accuracy and data integrity.

##### Conclusion

This project demonstrates practical Data Engineering skills including:

- Data quality validation

- Schema standardization

- Controlled imputation strategy

- Audit flag implementation

- Aggregation-based validation

- Building transformation logic using PySpark

The final dataset is structured, validated, and ready for reporting, dashboarding, or pipeline integration.

---

### End of Project