Task 1 - Employee_data

In [None]:
import boto3
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Daily employee ingestion: copy new or modified CSV files from the bronze prefix
# into the gold Parquet dataset, remembering what was ingested in a small log object.

# Define S3 paths
bucket_name = "poc-bootcamp-capstone-group3"
bronze_prefix = "poc-bootcamp-bronze/employee_data/"
gold_path = f"s3://{bucket_name}/poc-bootcamp-gold/employee_data_output/"
# Log of ingested files, one "<object key>,<LastModified as epoch seconds>" per line.
processed_file_key = f"{bronze_prefix}processed_files.txt"

# Columns every employee CSV must contain
expected_columns = {"emp_id", "age", "name"}

# Initialize Spark and boto3
spark = (
    SparkSession.builder
    .appName("DailyEmployeeProcessing")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrationRequired", "true")
    .getOrCreate()
)
s3 = boto3.client('s3')

try:
    # Load the processed-files log; a missing log just means nothing was ingested yet.
    try:
        obj = s3.get_object(Bucket=bucket_name, Key=processed_file_key)
        log_lines = obj['Body'].read().decode('utf-8').splitlines()
    except s3.exceptions.NoSuchKey:
        log_lines = []

    processed_files = {}
    for line in log_lines:
        if not line.strip():
            continue
        # Split on the LAST comma so object keys that contain commas still parse.
        logged_key, logged_ts = line.rsplit(',', 1)
        processed_files[logged_key] = float(logged_ts)

    # List all CSV files in bronze. A single list_objects_v2 call returns at most
    # 1,000 keys, so walk every page.
    csv_files = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=bronze_prefix):
        csv_files.extend(f for f in page.get('Contents', []) if f['Key'].endswith('.csv'))

    # Identify new/updated files: anything whose LastModified differs from the log.
    to_process = []
    for file in csv_files:
        last_modified = file['LastModified'].timestamp()
        if processed_files.get(file['Key']) != last_modified:
            to_process.append((file['Key'], last_modified))

    if not to_process:
        print("INFO: No new or updated files to process. Exiting gracefully.")
    else:
        log_changed = False
        try:
            for key, last_modified in to_process:
                print(f"Processing file: {key}")
                df = spark.read.option("header", True).option("inferSchema", True).csv(f"s3://{bucket_name}/{key}")

                # Check if all expected columns are present
                missing = expected_columns - set(df.columns)
                if missing:
                    raise ValueError(f"ERROR: Missing expected columns in {key}: {missing}")

                # Clean and validate: fixed types, no nulls, no exact duplicates
                cleaned_df = df.select(
                    col("emp_id").cast("string"),
                    col("age").cast("int"),
                    col("name").cast("string")
                ).dropna().dropDuplicates()

                # Write cleaned data to gold path
                cleaned_df.write.mode("append").parquet(gold_path)

                # Record the file only after its data is safely in gold
                processed_files[key] = last_modified
                log_changed = True
        finally:
            # Persist the log even when a later file fails; otherwise the files that
            # were already appended to gold would be appended again on the next run.
            if log_changed:
                log_content = "\n".join(f"{k},{v}" for k, v in processed_files.items())
                s3.put_object(Bucket=bucket_name, Key=processed_file_key, Body=log_content)

except Exception as e:
    print(f"ERROR: Job failed with exception: {e}")
    # Re-raise so the run is reported as failed instead of finishing "successfully".
    raise



Task 2 - leave_quota

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType
import boto3

# Yearly leave-quota ingestion: load bronze CSV files that have not been ingested yet,
# clean them and append them to the gold Parquet dataset partitioned by year.

# Define S3 paths
bucket_name = "poc-bootcamp-capstone-group3"
bronze_prefix = "poc-bootcamp-bronze/employee_leave_quota/"
gold_path = f"s3://{bucket_name}/poc-bootcamp-gold/employee_leave_quota_output/"
# Log of ingested files: one S3 object key per line.
processed_file_key = f"{bronze_prefix}processed_files.txt"

# Initialize Spark with Kryo serialization enabled
spark = (
    SparkSession.builder
      .appName("YearlyLeaveQuotaTableJob")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrationRequired", "true")
      .getOrCreate()
)
# Initialize Boto3
s3_client = boto3.client('s3')

try:
    # Step 1: Load processed files.
    # Only a missing log means "nothing ingested yet". Any other S3 error must abort
    # the run: treating it as an empty log would re-append every file to gold.
    try:
        processed_obj = s3_client.get_object(Bucket=bucket_name, Key=processed_file_key)
        processed_files = processed_obj['Body'].read().decode('utf-8').splitlines()
    except s3_client.exceptions.NoSuchKey:
        processed_files = []

    processed_files_set = {line for line in processed_files if line}  # Faster lookup

    # Step 2: List all files under bronze_prefix. A single list_objects_v2 call
    # returns at most 1,000 keys, so walk every page.
    all_files = []
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=bronze_prefix):
        all_files.extend(page.get('Contents', []))

    # Keep only CSV files (this also excludes processed_files.txt) not ingested before
    new_files = [obj['Key'] for obj in all_files if obj['Key'].endswith('.csv') and obj['Key'] not in processed_files_set]

    if not new_files:
        print("No new files to process.")
    else:
        log_changed = False
        try:
            for file_key in new_files:
                file_path = f"s3://{bucket_name}/{file_key}"
                file_name = file_key.split("/")[-1]  # Get only filename

                print(f"Processing file: {file_name}")

                # Step 3: Read the file
                df = spark.read.option("header", True).option("inferSchema", True).csv(file_path)

                # Validate required columns; a bad file is skipped and NOT logged,
                # so it is looked at again on the next run.
                required_cols = ["emp_id", "leave_quota", "year"]
                if not all(c in df.columns for c in required_cols):
                    print(f"Skipping {file_name}: Missing required columns.")
                    continue

                # Step 4: Clean and transform
                cleaned_df = (
                    df.select(
                        col("emp_id").cast(StringType()),
                        col("leave_quota").cast(IntegerType()),
                        col("year").cast(IntegerType())
                    )
                    .filter(
                        col("emp_id").isNotNull() &
                        col("leave_quota").isNotNull() &
                        col("year").isNotNull()
                    )
                    .dropDuplicates(["emp_id", "year"])
                )

                # Step 5: Write to Gold
                print(f"Writing {file_name} data to {gold_path}")
                cleaned_df.write.mode("append").partitionBy("year").parquet(gold_path)

                # Step 6: After successful write, mark as processed
                processed_files_set.add(file_key)
                log_changed = True
        finally:
            # Step 7: Update processed_files.txt, even if a later file failed, so the
            # files already appended to gold are not appended again on the next run.
            if log_changed:
                updated_content = "\n".join(sorted(processed_files_set))
                s3_client.put_object(Bucket=bucket_name, Key=processed_file_key, Body=updated_content)
                print("Processed files list updated.")

except Exception as e:
    print(f"Job failed: {e}")
    # Re-raise so the run is reported as failed instead of finishing "successfully".
    raise

Task 3 - leave_calendar

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year
from pyspark.sql.types import DateType, StringType
import boto3

# Yearly leave-calendar ingestion: load bronze CSV files that have not been ingested
# yet, clean them and append them to the gold Parquet dataset partitioned by year.

# S3 paths
bucket_name = "poc-bootcamp-capstone-group3"
bronze_prefix = "poc-bootcamp-bronze/employee_leave_calendar_data/"
gold_path = f"s3://{bucket_name}/poc-bootcamp-gold/employee_leave_calendar_output/"
# Log of ingested files: one S3 object key per line.
processed_file_key = f"{bronze_prefix}processed_files.txt"

# Initialize Spark session
spark = (
    SparkSession.builder
        .appName("YearlyLeaveCalendarTableJob")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.kryo.registrationRequired", "true")
        .getOrCreate()
)

# Initialize boto3 client
s3_client = boto3.client('s3')

try:
    # Step 1: Load processed files.
    # Only a missing log means "nothing ingested yet". Any other S3 error must abort
    # the run: treating it as an empty log would re-append every file to gold.
    try:
        processed_obj = s3_client.get_object(Bucket=bucket_name, Key=processed_file_key)
        processed_files = processed_obj['Body'].read().decode('utf-8').splitlines()
    except s3_client.exceptions.NoSuchKey:
        processed_files = []

    processed_files_set = {line for line in processed_files if line}

    # Step 2: List all files under bronze_prefix. A single list_objects_v2 call
    # returns at most 1,000 keys, so walk every page.
    all_files = []
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name, Prefix=bronze_prefix):
        all_files.extend(page.get('Contents', []))

    # Keep only CSV files (this also excludes processed_files.txt) not ingested before
    new_files = [obj['Key'] for obj in all_files if obj['Key'].endswith('.csv') and obj['Key'] not in processed_files_set]

    if not new_files:
        print("No new files to process.")
    else:
        log_changed = False
        try:
            for file_key in new_files:
                file_path = f"s3://{bucket_name}/{file_key}"
                file_name = file_key.split("/")[-1]  # Get only filename

                print(f"Processing file: {file_name}")

                # Step 3: Read the file
                df = spark.read.option("header", True).option("inferSchema", True).csv(file_path)

                # Validate required columns; a bad file is skipped and NOT logged,
                # so it is looked at again on the next run.
                required_cols = ["date", "reason"]
                if not all(c in df.columns for c in required_cols):
                    print(f"Skipping {file_name}: Missing required columns.")
                    continue

                # Step 4: Clean and transform
                cleaned_df = (
                    df.select(
                        col("date").cast(DateType()),
                        col("reason").cast(StringType())
                    )
                    .filter(
                        col("date").isNotNull() &
                        col("reason").isNotNull()
                    )
                    .dropDuplicates(["date", "reason"])
                )

                # Add year column (used as the partition key below)
                cleaned_df = cleaned_df.withColumn("year", year(col("date")))

                # Step 5: Write to Gold
                print(f"Writing {file_name} data to {gold_path}")
                cleaned_df.write.mode("append").partitionBy("year").parquet(gold_path)

                # Step 6: After successful write, mark as processed
                processed_files_set.add(file_key)
                log_changed = True
        finally:
            # Step 7: Update processed_files.txt, even if a later file failed, so the
            # files already appended to gold are not appended again on the next run.
            if log_changed:
                updated_content = "\n".join(sorted(processed_files_set))
                s3_client.put_object(Bucket=bucket_name, Key=processed_file_key, Body=updated_content)
                print("Processed files list updated.")

except Exception as e:
    print(f"Job failed: {e}")
    # Re-raise so the run is reported as failed instead of finishing "successfully".
    raise

Task 4 - leave_data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, current_date, to_date,
    sum as sum_, when, lit, year, month
)
from pyspark.sql.types import StringType, DateType

# Daily leave snapshot: fold the raw leave events into one status per (emp_id, date),
# merge that with the previous snapshot and rewrite the consolidated gold dataset.

# Names this cell needs in addition to the imports listed above it.
from pyspark.sql.types import StructField, StructType
from pyspark.sql.utils import AnalysisException

# Constants
BRONZE_PATH = "s3://poc-bootcamp-capstone-group3/poc-bootcamp-bronze/employee_leave_data/"
CONSOLIDATED = "s3://poc-bootcamp-capstone-group3/poc-bootcamp-gold/employee_leave_data_output/"
# Scratch copy of the new snapshot. The job reads CONSOLIDATED lazily and then replaces
# it, so the result is materialized here first instead of overwriting its own input.
STAGING = "s3://poc-bootcamp-capstone-group3/poc-bootcamp-gold/employee_leave_data_output_staging/"


# Initialize Spark with Kryo serialization
spark = (
    SparkSession.builder
      .appName("DailyLeaveSnapshot")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.kryo.registrationRequired", "true")
      .getOrCreate()
)

# 1) LOAD raw events and cast date
raw = (
    spark.read
    .option("header", True)
    .csv(BRONZE_PATH)
    .withColumn("date", to_date(col("date"), "yyyy-M-d"))
)

# 2) AGGREGATE to find dominant status per emp/date
raw_agg = (
    raw.groupBy("emp_id", "date")
    .agg(
        sum_(when(col("status") == "ACTIVE", 1).otherwise(0)).alias("cnt_active"),
        sum_(when(col("status") == "CANCELLED", 1).otherwise(0)).alias("cnt_cancelled")
    )
    .withColumn(
        "new_status",
        when(col("cnt_active") > col("cnt_cancelled"), lit("ACTIVE"))
        .otherwise(lit("CANCELLED"))  # Includes tie and CANCELLED majority cases
    )
    .select("emp_id", "date", "new_status")
)

# 3) READ previous snapshot (if any)
try:
    hist = (
        spark.read
        .parquet(CONSOLIDATED)
        .select("emp_id", "date", "status", "ingestion_date")
    )
except AnalysisException:
    # No readable snapshot yet (first run): start from an empty history.
    # The schema is built from scratch rather than with raw_agg.schema.add(...),
    # which would modify raw_agg's schema object and drag `new_status` along.
    hist_schema = StructType([
        raw_agg.schema["emp_id"],
        raw_agg.schema["date"],
        StructField("status", StringType(), True),
        StructField("ingestion_date", DateType(), True),
    ])
    hist = spark.createDataFrame([], schema=hist_schema)

# 4) MERGE logic: outer-join agg & history
merged = hist.alias("h").join(
    raw_agg.alias("r"),
    on=["emp_id", "date"],
    how="outer"
)

# 5) Decide final status and ingestion_date:
#    a key present in today's aggregate takes the new status and today's date,
#    a key only present in history keeps what it had.
today = current_date()
result = (
    merged
    .withColumn(
        "final_status",
        when(col("r.new_status").isNotNull(), col("r.new_status"))
        .otherwise(col("h.status"))
    )
    .withColumn(
        "ingestion_date",
        when(col("r.new_status").isNotNull(), today)
        .otherwise(col("h.ingestion_date"))
    )
    .filter(col("final_status").isNotNull())
    .select(
        col("emp_id"),
        col("date"),
        col("final_status").alias("status"),
        col("ingestion_date")
    )
)

# 6) Add partitions and write to Gold
result_with_partition = (
    result
    .withColumn("year", year(col("date")))
    .withColumn("month", month(col("date")))
)

# 6a) Materialize the complete snapshot in the staging location first.
(
    result_with_partition
    .write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(STAGING)
)

# 6b) Replace the consolidated dataset from the staged copy. The explicit schema keeps
#     the read working when the snapshot is empty and pins the partition column types.
(
    spark.read
    .schema(result_with_partition.schema)
    .parquet(STAGING)
    .write
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet(CONSOLIDATED)
)


Task 5 - timeframe_data

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    to_date, col, year, month, countDistinct, concat_ws, lit, date_format, from_unixtime, expr, row_number, lead
)
from pyspark.sql.window import Window
from datetime import datetime
import boto3

# Timeframe job: copy new or modified bronze CSV files to a dated silver folder, merge
# them into the gold timeframe table and mark each record ACTIVE or INACTIVE.

# Names this cell needs in addition to the imports listed above it
# (`when` is used below but was missing from the functions import).
from pyspark.conf import SparkConf
from pyspark.sql.functions import when
from pyspark.sql.utils import AnalysisException

# ── 1) Glue setup
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# ── Enable Kryo serialization
# The serializer is chosen when the SparkContext starts, so it has to be passed in
# through SparkConf; calling spark.conf.set() on it afterwards is rejected by Spark 3.
# NOTE(review): with registrationRequired=true Kryo raises on any class that is not
# registered - confirm this job runs cleanly with it or drop that setting.
spark_conf = (
    SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "true")
)
sc = SparkContext(conf=spark_conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ── 2) S3 Config
bucket_name      = "poc-bootcamp-capstone-group3"
bronze_folder    = "poc-bootcamp-bronze/employee_timeframe_data/"
silver_base      = f"s3://{bucket_name}/poc-bootcamp-silver/employee_timeframe_data/"
gold_path        = f"s3://{bucket_name}/poc-bootcamp-gold/employee_timeframe_data_output/"
# Scratch copy of the new gold table: gold is read lazily and then replaced by this
# job, so the result is materialized here first instead of overwriting its own input.
gold_staging     = f"s3://{bucket_name}/poc-bootcamp-gold/employee_timeframe_data_output_staging/"
log_file_key     = f"{bronze_folder}processed_files.txt"

# ── 3) Today’s date for Silver path
today_date = datetime.today().strftime('%Y-%m-%d')

# ── 4) Ensure processed-files log exists
s3 = boto3.client('s3')
try:
    s3.head_object(Bucket=bucket_name, Key=log_file_key)
except s3.exceptions.ClientError as err:
    # Only a missing object may be replaced by an empty log; on any other error
    # (permissions, throttling, ...) writing an empty body would wipe the real log.
    if err.response['Error']['Code'] not in ('404', 'NoSuchKey'):
        raise
    s3.put_object(Bucket=bucket_name, Key=log_file_key, Body="")

# ── 5) Load processed-files log ("<key>,<LastModified epoch seconds>" per line)
processed_obj       = s3.get_object(Bucket=bucket_name, Key=log_file_key)
processed_lines     = processed_obj['Body'].read().decode().splitlines()
processed_files     = {}
for line in processed_lines:
    if not line.strip():
        continue
    # Split on the LAST comma so object keys that contain commas still parse.
    logged_key, logged_ts = line.rsplit(',', 1)
    processed_files[logged_key] = float(logged_ts)

# ── 6) List new/modified CSVs in Bronze
# A single list_objects_v2 call returns at most 1,000 keys, so walk every page.
all_keys = []
for page in s3.get_paginator('list_objects_v2').paginate(Bucket=bucket_name, Prefix=bronze_folder):
    all_keys.extend(c['Key'] for c in page.get('Contents', []) if c['Key'].endswith('.csv'))

new_keys = []
for key in all_keys:
    # LastModified is taken from head_object, as before, so the values stay comparable
    # with the timestamps already stored in the log.
    lm = s3.head_object(Bucket=bucket_name, Key=key)['LastModified'].timestamp()
    if processed_files.get(key) != lm:
        new_keys.append((key, lm))

if not new_keys:
    print("🚫 No new files to process.")
else:
    print(f"✅ Found {len(new_keys)} new files.")

    # ── 7) Read new files into Silver DataFrame (epoch seconds -> dates)
    paths     = [f"s3://{bucket_name}/{key}" for key, _ in new_keys]
    silver_df = spark.read.csv(paths, header=True, inferSchema=True) \
        .withColumn("start_date", to_date(from_unixtime(col("start_date")))) \
        .withColumn("end_date",   to_date(from_unixtime(col("end_date"))))

    # ── 8) Write Silver for today
    silver_path = silver_base + today_date + "/"
    silver_df.write.mode("append").parquet(silver_path)
    print(f"✅ Silver written to {silver_path}")

    # ── 9) Silver ➔ Gold merge & dedupe
    incoming = silver_df.withColumn("status", lit(None).cast("string"))
    try:
        existing_gold = spark.read.parquet(gold_path)
    except AnalysisException:
        # No readable gold table yet (first run)
        existing_gold = None

    if existing_gold is None:
        combined = incoming
    else:
        combined = existing_gold.unionByName(incoming, allowMissingColumns=True)

    # Deduplicate by highest salary per emp_id + timeframe
    w1 = Window.partitionBy("emp_id", "start_date", "end_date").orderBy(col("salary").desc())
    dedup = combined.withColumn("rn", row_number().over(w1)).filter(col("rn") == 1).drop("rn")

    # Fill missing end_date with the day before the employee's next start_date
    w2 = Window.partitionBy("emp_id").orderBy("start_date")
    dedup = dedup.withColumn("next_start", lead("start_date").over(w2)) \
        .withColumn("end_date", when(col("end_date").isNull() & col("next_start").isNotNull(),
                                     expr("date_sub(next_start,1)")).otherwise(col("end_date")))

    # Assign ACTIVE/INACTIVE: a record that is still open-ended is the active one
    final_df = dedup.withColumn("status",
        when(col("end_date").isNull(), "ACTIVE").otherwise("INACTIVE")
    ).select("emp_id","start_date","end_date","designation","salary","status")

    # ── 10) Write Gold partitioned by status
    # Stage the complete result first, then replace gold from the staged copy.
    final_df.write.mode("overwrite").partitionBy("status").parquet(gold_staging)
    spark.read.schema(final_df.schema).parquet(gold_staging) \
        .write.mode("overwrite").partitionBy("status").parquet(gold_path)
    print("🏆 Gold layer updated successfully.")

    # ── 11) Update processed-files log
    # Done last: if the merge above fails, the files stay unlogged and are picked up
    # again on the next run instead of silently never reaching gold.
    for key, lm in new_keys:
        processed_files[key] = lm
    log_body = "\n".join(f"{k},{v}" for k, v in processed_files.items())
    s3.put_object(Bucket=bucket_name, Key=log_file_key, Body=log_body)
    print("📝 Updated processed-files log.")

# ── 12) Commit Glue job
job.commit()


Task 6 - Count by designation

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext
from pyspark.sql.functions import col, current_date
from datetime import datetime
import boto3

# Daily snapshot: count the currently active employees per designation from the gold
# timeframe table and store the counts under a snapshot_date partition.

# Name this cell needs in addition to the imports listed above it.
from pyspark.conf import SparkConf

# Initialize Spark and Glue contexts
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# ✅ Enable Kryo Serialization
# The serializer is chosen when the SparkContext starts, so it has to be passed in
# through SparkConf; calling spark.conf.set() on it afterwards is rejected by Spark 3.
spark_conf = SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sc = SparkContext(conf=spark_conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# ✅ Snappy compression: a session-level SQL setting, so it is set on the session
# (changing the SparkConf after start-up does not reach the running session).
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

# Initialize Glue Job
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Define S3 paths
gold_path = "s3://poc-bootcamp-capstone-group3/poc-bootcamp-gold/employee_timeframe_data_output/"
output_path = "s3://poc-bootcamp-capstone-group3/poc-bootcamp-gold/daily_active_employees_by_designation_output/"

# Read Gold Layer timeframe data
df = spark.read.parquet(gold_path)

# Keep records that have started and are still open-ended (end_date is null).
# The same Spark-side date drives both the filter and the partition value, so the two
# can never disagree the way a driver-side UTC date and current_date() could.
today = current_date()
active_employees_df = df.filter(
    (col("start_date") <= today) &
    (col("end_date").isNull())
)

# Group by designation and count
summary_df = active_employees_df.groupBy("designation").count().withColumnRenamed("count", "active_count")

# Add snapshot date column for partitioning
summary_df = summary_df.withColumn("snapshot_date", today)

# Write to S3 partitioned by snapshot_date with Snappy compression.
# Dynamic partition overwrite replaces only the partitions present in summary_df
# (today's), so a rerun or retry does not duplicate the day's counts and earlier
# snapshots stay untouched.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
summary_df.write.mode("overwrite").partitionBy("snapshot_date").parquet(output_path)

print("✅ Daily active employee snapshot generated and saved to S3.")

# Commit job
job.commit()


Task 7 - 8% Threshold

In [None]:
import datetime
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, dayofweek, lit
from pyspark.sql.types import DateType
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions

# 8% threshold report: list employees whose ACTIVE leave days on the remaining working
# days of the year exceed 8% of those working days.

# Name this cell needs in addition to the imports listed above it.
from pyspark.conf import SparkConf

# Initialize Spark & Glue Context
# Enable Kryo serialization for optimized performance. The serializer is chosen when
# the SparkContext starts, so it has to be passed in through SparkConf; calling
# spark.conf.set() on it afterwards is rejected by Spark 3.
# NOTE(review): with registrationRequired=true Kryo raises on any class that is not
# registered - confirm this job runs cleanly with it or drop that setting.
spark_conf = (
    SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "true")
)
sc = SparkContext(conf=spark_conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Get job parameters
args = getResolvedOptions(sys.argv, ['YEAR', 'TODAY_DATE'])

# PARAMETERS
YEAR = int(args['YEAR'])  # Convert YEAR to integer
THRESHOLD_PCT = 0.08
today = datetime.datetime.strptime(args['TODAY_DATE'], '%Y-%m-%d').date()  # Convert string to date

# FILE PATHS
leaves_path = "s3://poc-bootcamp-capstone-group3/poc-bootcamp-gold/employee_leave_data_output/"
holidays_path = "s3://poc-bootcamp-capstone-group3/poc-bootcamp-bronze/employee_leave_calendar_data/"

# LOAD LEAVE DATA (Parquet format)
leaves_df = spark.read.parquet(leaves_path).withColumn("date", col("date").cast(DateType()))

# LOAD HOLIDAY DATA (CSV format)
holidays_df = spark.read.option("header", True).csv(holidays_path).withColumn("date", col("date").cast(DateType()))

# BUILD CALENDAR (Excluding weekends and holidays)
start = today + datetime.timedelta(days=1)  # Start from tomorrow
end = datetime.date(YEAR, 12, 31)  # End on December 31

# Generate one row per day from tomorrow to Dec 31 (no rows when that range is empty)
days_df = spark.range(0, (end - start).days + 1).select((lit(start) + col("id").cast("int")).alias("date"))

# Filter out holidays (anti-join) and weekends (dayofweek: 1 = Sunday ... 7 = Saturday)
working_days_df = days_df.join(holidays_df, on="date", how="left_anti").filter(dayofweek(col("date")).between(2, 6))

total_working_days = working_days_df.count()
print("Total upcoming working days:", total_working_days)

# FILTER ACTIVE LEAVES ON FUTURE WORKING DAYS (Excluding cancelled)
active_leaves_df = (
    leaves_df.filter((col("status") == "ACTIVE") & (col("date") > lit(today)))
    .join(working_days_df, on="date", how="inner")
    .dropDuplicates(["emp_id", "date"])
)

# COUNT LEAVE APPLICATIONS PER EMPLOYEE
emp_leave_counts_df = active_leaves_df.groupBy("emp_id").agg(countDistinct("date").alias("upcoming_leaves"))

# APPLY 8% THRESHOLD
result_df = emp_leave_counts_df.filter(col("upcoming_leaves") > THRESHOLD_PCT * lit(total_working_days))

# Write output to S3 in Parquet format
output_path = "s3://poc-bootcamp-capstone-group3/poc-bootcamp-gold/8%-Threshold_output/"
result_df.write.parquet(output_path, mode="overwrite")


Task 8 - 80% Threshold

In [None]:
import sys
import datetime
import time
import os
import boto3
from botocore.exceptions import ClientError
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    to_date, col, year, month, countDistinct, concat_ws, lit, date_format
)
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

# 80% threshold alerts: find employees who used more than 80% of their leave quota up
# to the reporting month and write one alert line per employee not alerted before.

# Name this cell needs in addition to the imports listed above it.
from pyspark.conf import SparkConf

# ── 1) Glue setup
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Enable Kryo serialization for optimized performance. The serializer is chosen when
# the SparkContext starts, so it has to be passed in through SparkConf; calling
# spark.conf.set() on it afterwards is rejected by Spark 3.
# NOTE(review): with registrationRequired=true Kryo raises on any class that is not
# registered - confirm this job runs cleanly with it or drop that setting.
spark_conf = (
    SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrationRequired", "true")
)
sc = SparkContext(conf=spark_conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ── 2) S3 Config
LEAVE_DATA_PATH     = "s3://poc-bootcamp-capstone-group3/poc-bootcamp-gold/employee_leave_data_output/"
LEAVE_CALENDAR_PATH = "s3://poc-bootcamp-capstone-group3/poc-bootcamp-bronze/employee_leave_calendar_data/"
LEAVE_QUOTA_PATH    = "s3://poc-bootcamp-capstone-group3/poc-bootcamp-bronze/employee_leave_quota/"
OUTPUT_PATH         = "s3://poc-bootcamp-capstone-group3/poc-bootcamp-gold/employee_80%_output/"
METADATA_KEY        = "poc-bootcamp-bronze/80%Threshold/metadata.txt"
ALERTS_BUCKET       = "poc-bootcamp-capstone-group3"

# ── 3) Reference date and reporting period
ref_date = datetime.date(2024,11, 1)  # <-- Replace with `datetime.date.today()` for real-time jobs
# ref_date = datetime.date.today()   # <-- Uncomment to use today's date

# The report covers the month before ref_date (January rolls back to December of the
# previous year).
report_month = ref_date.month - 1 or 12
report_year = ref_date.year if ref_date.month > 1 else ref_date.year - 1
period = f"{report_year}-{report_month:02d}"

# ── 4) Load previously processed metadata ("<period>,<emp_id>" per line)
s3 = boto3.client("s3")
processed = set()
tmp_meta = "/tmp/metadata.txt"
try:
    s3.download_file(ALERTS_BUCKET, METADATA_KEY, tmp_meta)
    with open(tmp_meta, "r") as f:
        processed = {line.strip() for line in f if line.strip()}
except ClientError as e:
    # download_file starts with a HEAD request, which reports a missing object as
    # "404" rather than "NoSuchKey"; both mean that no alert has been recorded yet.
    if e.response['Error']['Code'] not in ('404', 'NoSuchKey'):
        raise
    # Do not let a stale local copy leak into the metadata uploaded in step 11.
    if os.path.exists(tmp_meta):
        os.remove(tmp_meta)

# ── 5) Load and clean data
leave_df = (
    spark.read.parquet(LEAVE_DATA_PATH)
         .withColumn("date", to_date("date", "yyyy-M-d"))
         .filter(col("status") == "ACTIVE")
         .dropDuplicates(["emp_id", "date"])
)

holidays_df = (
    spark.read.option("header", True).csv(LEAVE_CALENDAR_PATH)
         .withColumn("date", to_date("date", "yyyy-M-d"))
         .select("date").distinct()
)

# Leaves that fall on a weekend or a holiday do not consume quota
clean_leaves = (
    leave_df.withColumn("dow", date_format("date", "E"))
            .filter(~col("dow").isin("Sat", "Sun"))
            .drop("dow")
            .join(holidays_df, on="date", how="left_anti")
)

# Quota rows carry a `year` column; keep only the reporting year and one row per
# employee, otherwise usage is compared against other years' quotas and an employee
# can show up several times.
quota_df = (
    spark.read.option("header", True).csv(LEAVE_QUOTA_PATH)
         .filter(col("year") == report_year)
         .select("emp_id", "leave_quota")
         .dropDuplicates(["emp_id"])
)

# ── 6) Filter only leaves in the current reporting year up to the reporting month
reporting_year = report_year

up_to = clean_leaves.filter(
    (year("date") == reporting_year) &
    (month("date") <= report_month)
)

# ── 7) Count leave days per employee
counts_df = up_to.groupBy("emp_id").agg(countDistinct("date").alias("used"))

# ── 8) Compare with quota and find violators
breachers_df = (
    counts_df.join(quota_df, on="emp_id", how="inner")
             .filter((col("used") / col("leave_quota")) > 0.8)
             .select("emp_id")
             .distinct()
)

# ── 9) Avoid duplicates: skip employees already alerted for this period
to_alert = [
    emp for emp in breachers_df.collect()
    if f"{period},{emp.emp_id}" not in processed
]

if not to_alert:
    # Nothing to write. createDataFrame cannot infer a schema from an empty list,
    # so the alert file and the metadata update are skipped altogether.
    print(f"No new alerts for {period}")
else:
    # ── 10) Write alerts to S3 text file
    ts = time.strftime("%Y%m%d-%H%M%S")
    out_path = f"{OUTPUT_PATH}{period}/run-{ts}/"

    lines_df = spark.createDataFrame(
        [(e.emp_id, period) for e in to_alert],
        ["emp_id", "month"]
    ).select(concat_ws(",", "emp_id", "month").alias("line"))

    lines_df.coalesce(1).write.mode("overwrite").text(out_path)
    print(f"✅ Alerted {len(to_alert)} employees → {out_path}")

    # ── 11) Update metadata: append the new alerts to the downloaded copy and upload it
    with open(tmp_meta, "a") as f:
        for emp in to_alert:
            f.write(f"{period},{emp.emp_id}\n")
    s3.upload_file(tmp_meta, ALERTS_BUCKET, METADATA_KEY)

# Remove the local scratch copy whether or not anything was alerted
if os.path.exists(tmp_meta):
    os.remove(tmp_meta)

# ── 12) Finish
job.commit()
