# Employee ETL Pipeline - Silver Layer

Architecture Overview:  
![silver.jpg](./silver.jpg "silver.jpg")

This layer focuses on transforming the data: cleaning, grouping, joins, and enrichment.  
See more on _Medallion Architecture_: https://www.databricks.com/glossary/medallion-architecture

## Import Libraries

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import count, col, sum as spark_sum
from pyspark.sql import Window

## Get Data from Bronze Layer

In [0]:
employees = spark.read.table("hrdata.bronze.employees")

In [0]:
display(employees.limit(5))

## Data Cleaning and Data Quality Checks
Making sure the data does not include unrealistic values or duplicates.

### Duplicates

In [0]:
DUPLICATE_KEY_COLUMNS = [
  "first_name",
  "last_name",
  "email",
  "date_of_birth",
  "phone_number"
]

In [0]:
employees_counts = employees.groupBy(DUPLICATE_KEY_COLUMNS).agg(count("*").alias("count"))
employees_duplicate_keys = employees_counts.filter(col("count") > 1).drop("count")
display(employees_duplicate_keys)

In [0]:
# Join back to the original DataFrame to get ALL duplicate rows
employees_duplicates = employees.join(employees_duplicate_keys, on=DUPLICATE_KEY_COLUMNS, how="inner")

print("--- Found Duplicate Records (All Occurrences) ---")
display(employees_duplicates)
print(f"Total duplicate records found: {employees_duplicates.count()}")
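
The group, count, and join-back steps above can be sketched on plain Python values, which may help when checking the logic without a Spark session. This is only an illustration: the sample rows are made up, and `key_columns` uses three of the notebook's key columns for brevity.

```python
from collections import Counter

# Hypothetical sample rows; the real data lives in hrdata.bronze.employees.
rows = [
    {"first_name": "Ann", "last_name": "Lee", "email": "ann@example.com"},
    {"first_name": "Ann", "last_name": "Lee", "email": "ann@example.com"},
    {"first_name": "Bob", "last_name": "Ray", "email": "bob@example.com"},
]
# Shortened stand-in for DUPLICATE_KEY_COLUMNS (assumption for this sketch).
key_columns = ["first_name", "last_name", "email"]


def key_of(row):
    """Build the composite key used to decide whether two rows are duplicates."""
    return tuple(row[c] for c in key_columns)


# Step 1: count rows per key (the groupBy + count step).
counts = Counter(key_of(r) for r in rows)

# Step 2: keep only keys that occur more than once (the filter step).
duplicate_keys = {k for k, n in counts.items() if n > 1}

# Step 3: collect every row whose key is duplicated (the inner join step).
duplicates = [r for r in rows if key_of(r) in duplicate_keys]

print(f"Total duplicate records found: {len(duplicates)}")
```

As in the notebook, the total counts every occurrence of a duplicated key, not just the surplus copies.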

#### Remove duplicates

In [0]:
employees_deduplicated = employees.dropDuplicates(subset=DUPLICATE_KEY_COLUMNS)
print(f"Remaining records: {employees_deduplicated.count()}")

### Data Validation

#### Salary

In [0]:
SALARY_MIN = 30000
SALARY_MAX = 300000

# filtering salaries lower than SALARY_MIN or higher than SALARY_MAX
unrealistic_salaries = employees_deduplicated.filter((col("salary") > SALARY_MAX) | (col("salary") < SALARY_MIN))
display(unrealistic_salaries[
    ["first_name", "last_name", "email", "department", "salary"]
])

In [0]:
filtered_employees = employees_deduplicated.filter((col("salary") <= SALARY_MAX) & (col("salary") >= SALARY_MIN))
print(f"Remaining records: {filtered_employees.count()}")
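
One detail worth noting about the range check: in Spark, a comparison against a null salary evaluates to null, so such rows are neither shown as unrealistic nor kept by the filter. The plain-Python sketch below mirrors the rule with the same inclusive bounds and makes the missing case explicit. The helper `classify_salary` and the sample values are hypothetical and not part of the notebook.

```python
SALARY_MIN = 30000
SALARY_MAX = 300000


def classify_salary(salary):
    """Label one salary value as 'missing', 'out_of_range', or 'valid'.

    Bounds are inclusive, matching the >= SALARY_MIN and <= SALARY_MAX filter.
    """
    if salary is None:
        return "missing"
    if salary < SALARY_MIN or salary > SALARY_MAX:
        return "out_of_range"
    return "valid"


# Hypothetical sample values around the boundaries.
samples = [None, 29999, 30000, 300000, 300001]
labels = [classify_salary(s) for s in samples]
print(labels)
```

If missing salaries should be reviewed rather than silently dropped, a separate null check on the `salary` column before filtering would surface them.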

#### Email

In [0]:
# Flag implausible emails: addresses missing an "@" or a "."
# A single OR filter avoids listing a row twice when both characters are missing.
unrealistic_emails = filtered_employees.filter(
    ~col("email").contains("@") | ~col("email").contains(".")
)

display(unrealistic_emails.select("first_name", "last_name", "email", "department"))

In [0]:
# Remove records with an invalid email address by anti-joining on unrealistic_emails
filtered_employees = filtered_employees.join(unrealistic_emails, on="email", how="left_anti")
print(f"Remaining records: {filtered_employees.count()}")
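
Checking only for the presence of "@" and "." still accepts values such as `ann.lee@example`, where the dot sits before the "@". A slightly stricter rule can be sketched with a regular expression, shown here in plain Python. The pattern and the helper `is_plausible_email` are assumptions for illustration, not the notebook's method, and the pattern is deliberately loose rather than a full address validator.

```python
import re

# Assumed pattern: non-empty local part, one "@", and a domain containing a dot.
# Whitespace and additional "@" characters are rejected.
EMAIL_PATTERN = re.compile(r"[^@\s]+@[^@\s]+\.[^@\s]+")


def is_plausible_email(value):
    """Return True if value looks like local@domain.tld, False otherwise."""
    return value is not None and EMAIL_PATTERN.fullmatch(value) is not None


# Hypothetical sample values.
samples = ["ann@example.com", "ann.example.com", "ann.lee@example", "@.", None]
results = [is_plausible_email(s) for s in samples]
print(results)
```

In PySpark, a similar pattern could presumably be applied with `Column.rlike`, with explicit `^` and `$` anchors added because `rlike` matches anywhere in the string.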

#### Hire Date

#### Birth Date