In [0]:

AWS_ACCESS_KEY_ID = dbutils.secrets.get(scope = "aws", key = "AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = dbutils.secrets.get(scope = "aws", key = "AWS_SECRET_ACCESS_KEY")
print(AWS_ACCESS_KEY_ID)
print(AWS_SECRET_ACCESS_KEY)

[REDACTED]
[REDACTED]


In [0]:
s3aurl = "s3a://{}:{}@{}".format(AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,"vamsee-spark-io/raw/ibm_hr_attrition.csv")
#s3url = "s3://vamsee-spark-io/source_data/ibm_hr_attrition.csv"

In [0]:
df_preview = spark.read.option("header", "true").csv(s3aurl)

# Show the inferred schema
df_preview.printSchema()

# Display a few rows
df_preview.show(5)

root
 |-- Age: string (nullable = true)
 |-- Attrition: string (nullable = true)
 |-- BusinessTravel: string (nullable = true)
 |-- DailyRate: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- DistanceFromHome: string (nullable = true)
 |-- Education: string (nullable = true)
 |-- EducationField: string (nullable = true)
 |-- EmployeeCount: string (nullable = true)
 |-- EmployeeNumber: string (nullable = true)
 |-- EnvironmentSatisfaction: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- HourlyRate: string (nullable = true)
 |-- JobInvolvement: string (nullable = true)
 |-- JobLevel: string (nullable = true)
 |-- JobRole: string (nullable = true)
 |-- JobSatisfaction: string (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- MonthlyIncome: string (nullable = true)
 |-- MonthlyRate: string (nullable = true)
 |-- NumCompaniesWorked: string (nullable = true)
 |-- Over18: string (nullable = true)
 |-- OverTime: string (nullable = tr

# Bronze Layer

In [0]:
s3outurl = "s3a://{}:{}@{}".format(AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,"vamsee-spark-io/bronze/")
df_preview.coalesce(1).write.mode("append").format("delta").option("header", "true").save(s3outurl)

# Silver Layer

In [0]:
from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window


# 1) Read Bronze Delta
bronze_path = "s3a://{}:{}@{}".format(AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,"vamsee-spark-io/bronze/")
bronze = spark.read.format("delta").load(bronze_path)

# 2) Standardize schema & strings
# (Adjust casts to match your CSV; these are common in the IBM HR dataset)
silver = (
    bronze
    # trim string columns
    .select(*[
        F.trim(c).alias(c) if t == "string" else F.col(c)
        for c, t in bronze.dtypes
    ])
    # cast numerics that sometimes come as strings
    .withColumn("Age", F.col("Age").cast("int"))
    .withColumn("MonthlyIncome", F.col("MonthlyIncome").cast("int"))
    .withColumn("DistanceFromHome", F.col("DistanceFromHome").cast("int"))
    .withColumn("YearsAtCompany", F.col("YearsAtCompany").cast("int"))
    .withColumn("JobSatisfaction", F.col("JobSatisfaction").cast("int"))
    .withColumn("EnvironmentSatisfaction", F.col("EnvironmentSatisfaction").cast("int"))
    .withColumn("WorkLifeBalance", F.col("WorkLifeBalance").cast("int"))
)

# 3) Normalize categorical values (consistent case)
to_title = ["Department","JobRole","BusinessTravel","EducationField","MaritalStatus"]
for c in to_title:
    if c in silver.columns:
        silver = silver.withColumn(c, F.initcap(F.col(c)))

# 4) Deduplicate on EmployeeNumber (keep latest by ingest_ts if present)
if "ingest_ts" in silver.columns:
    windowed = (silver
        .withColumn("_rn", F.row_number().over(
            Window.partitionBy("EmployeeNumber").orderBy(F.col("ingest_ts").desc_nulls_last())
        ))
        .filter(F.col("_rn")==1)
        .drop("_rn")
    )
    silver = windowed
else:
    silver = silver.dropDuplicates(["EmployeeNumber"])

# 5) Derived columns
silver = (
    silver
    .withColumn("AttritionFlag", F.when(F.col("Attrition")=="Yes", F.lit(1)).otherwise(F.lit(0)))
    .withColumn("OverTimeFlag", F.when(F.col("OverTime")=="Yes", F.lit(1)).otherwise(F.lit(0)))
    .withColumn("TenureYears", F.col("YearsAtCompany").cast("double"))
    .withColumn(
        "AgeBand",
        F.when(F.col("Age") < 26, "18-25")
         .when(F.col("Age") < 36, "26-35")
         .when(F.col("Age") < 46, "36-45")
         .otherwise("46+")
    )
)

# 6) Simple Data Quality checks (fail fast if something’s off)
assert silver.count() > 0, "DQ FAIL: no rows after cleaning"
required = ["EmployeeNumber","Department","JobRole","Attrition"]
for c in required:
    assert silver.filter(F.col(c).isNull() | (F.col(c)=="")).count() == 0, f"DQ FAIL: null/blank in {c}"

# Optional: validate categories you care about
allowed_overtime = {"Yes","No"}
if "OverTime" in silver.columns:
    bad = silver.filter(~F.col("OverTime").isin(list(allowed_overtime))).count()
    assert bad == 0, f"DQ FAIL: unexpected OverTime values"

# 7) Write Silver as Delta back to S3
# (Use coalesce/repartition to control small files if you want)
silver_path = "s3a://{}:{}@{}".format(AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,"vamsee-spark-io/silver/")
(silver
 .repartition(1)            # small demo; for real data pick a sensible number
 .write
 .format("delta")
 .mode("overwrite")
 .option("overwriteSchema","true")
 .save(silver_path)
)



# Gold Layer

In [0]:
# 03_aggregate_gold.py

from pyspark.sql import functions as F

# ---------- Params ----------
dbutils.widgets.text("bucket", "vamsee-spark-io")
bucket = dbutils.widgets.get("bucket")

silver_path = "s3a://{}:{}@{}".format(AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,"vamsee-spark-io/silver/")
gold_base   = "s3a://{}:{}@{}".format(AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,"vamsee-spark-io/gold/")
reports     = "s3a://{}:{}@{}".format(AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,"vamsee-spark-io/reports/")

silver = spark.read.format("delta").load(silver_path)

# ---------- Core KPI summary (single row) ----------
kpi = (
    silver.agg(
        F.count("*").alias("employees"),
        F.mean("AttritionFlag").alias("attrition_rate"),
        F.mean("OverTimeFlag").alias("overtime_share"),
        F.avg("MonthlyIncome").alias("avg_monthly_income"),
        F.avg("JobSatisfaction").alias("avg_job_satisfaction")
    )
    .withColumn("attrition_rate", F.round("attrition_rate", 4))
    .withColumn("overtime_share", F.round("overtime_share", 4))
    .withColumn("avg_monthly_income", F.round("avg_monthly_income", 2))
    .withColumn("avg_job_satisfaction", F.round("avg_job_satisfaction", 2))
)
kpi_path = f"{gold_base}/kpi_summary"
kpi.write.format("delta").mode("overwrite").save(kpi_path)

# ---------- Helper to compute attrition rate rollups ----------
def rate(df, group_cols):
    return (
        df.groupBy(*group_cols)
          .agg(F.count("*").alias("employees"),
               F.mean("AttritionFlag").alias("attrition_rate"))
          .withColumn("attrition_rate", F.round("attrition_rate", 4))
          .orderBy(*group_cols)
    )

# ---------- Rollups ----------
by_dept   = rate(silver, ["Department"])
by_role   = rate(silver, ["Department", "JobRole"])
by_age    = rate(silver, ["AgeBand"]) if "AgeBand" in silver.columns else rate(silver, ["Age"])
by_ot     = rate(silver, ["OverTimeFlag"])

# If you created TenureBand in Silver, prefer that. Otherwise YearsAtCompany.
group_tenure = ["TenureBand"] if "TenureBand" in silver.columns else ["YearsAtCompany"]
by_tenure = rate(silver, group_tenure)

by_dept.write .format("delta").mode("overwrite").save(f"{gold_base}/attrition_by_department")
by_role.write .format("delta").mode("overwrite").save(f"{gold_base}/attrition_by_dept_role")
by_age.write  .format("delta").mode("overwrite").save(f"{gold_base}/attrition_by_age")
by_ot.write   .format("delta").mode("overwrite").save(f"{gold_base}/attrition_by_overtime")
by_tenure.write.format("delta").mode("overwrite").save(f"{gold_base}/attrition_by_tenure")

# ---------- Income & satisfaction insights ----------
income_by_role = (
    silver.groupBy("JobRole")
          .agg(
              F.count("*").alias("employees"),
              F.avg("MonthlyIncome").alias("avg_income"),
              F.expr("percentile_approx(MonthlyIncome, 0.5)").alias("median_income")
          )
          .withColumn("avg_income", F.round("avg_income", 2))
          .orderBy(F.desc("avg_income"))
)
income_by_role.write.format("delta").mode("overwrite").save(f"{gold_base}/income_by_role")

sat_vs_attr = (
    silver.groupBy("JobSatisfaction")
          .agg(
              F.count("*").alias("employees"),
              F.mean("AttritionFlag").alias("attrition_rate")
          )
          .withColumn("attrition_rate", F.round("attrition_rate", 4))
          .orderBy("JobSatisfaction")
)
sat_vs_attr.write.format("delta").mode("overwrite").save(f"{gold_base}/attrition_by_jobsatisfaction")

# ---------- BI-friendly CSV exports (optional) ----------
(by_dept.coalesce(1)
    .write.mode("overwrite").option("header", True)
    .csv(f"{reports}/attrition_by_department_csv"))

(income_by_role.coalesce(1)
    .write.mode("overwrite").option("header", True)
    .csv(f"{reports}/income_by_role_csv"))

(kpi.coalesce(1)
    .write.mode("overwrite").option("header", True)
    .csv(f"{reports}/kpi_summary_csv"))


