**SILVER LAYER**

Identifier validation

In [0]:
# Load the Bronze layer Delta tables.
# These raw tables are the source for the Silver layer cleaning and validation steps.
df_school_master, df_enrollment, df_performance = (
    spark.table("bronze_school_master"),
    spark.table("bronze_student_enrollment"),
    spark.table("bronze_student_performance"),
)

In [0]:
from pyspark.sql.functions import col

In [0]:
# Guard: every school_master row must carry a usable school_id.
# A null or empty-string identifier is treated as invalid and aborts the run.
school_id_is_null = col("school_id").isNull()
school_id_is_empty = col("school_id") == ""

invalid_school_ids = df_school_master.filter(school_id_is_null | school_id_is_empty).count()

if invalid_school_ids > 0:
    raise ValueError("school_master contains invalid or missing school_id values")

In [0]:
# Referential check: each enrollment row must reference a school_id that exists
# in school_master. The left-anti join keeps only enrollment rows with no match.
unmatched_enrollment_rows = df_enrollment.join(
    df_school_master, on="school_id", how="left_anti"
)
orphan_enrollment = unmatched_enrollment_rows.count()

if orphan_enrollment > 0:
    raise ValueError("Enrollment data contains unknown school_id values")


# Referential check: each performance row must reference a school_id that exists
# in school_master. The left-anti join keeps only performance rows with no match.
unmatched_performance_rows = df_performance.join(
    df_school_master, on="school_id", how="left_anti"
)
orphan_performance = unmatched_performance_rows.count()

if orphan_performance > 0:
    raise ValueError("Performance data contains unknown school_id values")

In [0]:
# Sanity-check academic_year in the enrollment data: a value before 2010 or
# after 2030 is treated as bad data and aborts the run.
year_too_early = col("academic_year") < 2010
year_too_late = col("academic_year") > 2030

invalid_years = df_enrollment.filter(year_too_early | year_too_late).count()

if invalid_years > 0:
    raise ValueError("Invalid academic_year values detected in enrollment data")


DATA CLEANING & TRANSFORMATION

In [0]:
from pyspark.sql.functions import trim, upper, when

In [0]:
# Standardize the region column:
# - upper-case and trim the raw value so matching ignores case and surrounding spaces
# - map NORTH / SOUTH / EAST / WEST to their title-case spelling
# - every other value (a missing region included) falls through to "Central"
region_normalized = trim(upper(col("region")))

df_school_master_clean = df_school_master.withColumn(
    "region",
    when(region_normalized == "NORTH", "North")
    .when(region_normalized == "SOUTH", "South")
    .when(region_normalized == "EAST", "East")
    .when(region_normalized == "WEST", "West")
    .otherwise("Central"),
)

In [0]:
# Collapse the abbreviated spellings "Govt" and "GOVT" into "Government";
# any other school_type value passes through untouched.
is_govt_abbreviation = col("school_type").isin("Govt", "GOVT")

df_school_master_clean = df_school_master_clean.withColumn(
    "school_type",
    when(is_govt_abbreviation, "Government").otherwise(col("school_type")),
)

Handle missing & inconsistent enrollment records

In [0]:
from pyspark.sql.functions import lit,when,col

In [0]:

# school_id and academic_year are mandatory identifiers that cannot be inferred,
# so an enrollment row missing either one is discarded.
mandatory_enrollment_keys = ["school_id", "academic_year"]
df_enrollment_clean = df_enrollment.na.drop(subset=mandatory_enrollment_keys)

In [0]:
# Clean grade_level and student_count on the enrollment data.
# Starts from df_enrollment_clean (not the raw Bronze frame) so that rows already
# dropped for a missing school_id / academic_year stay dropped.
# - grade_level: null or negative values become 0; everything else is cast to int
# - student_count: cast to int, then only rows with a positive count are kept
#   (a count that is null after the cast fails the > 0 test and is dropped too)
df_enrollment_clean = (
    df_enrollment_clean
    .withColumn(
        "grade_level",
        when(col("grade_level").isNull() | (col("grade_level") < 0), 0)
        .otherwise(col("grade_level").cast("int")),
    )
    .withColumn("student_count", col("student_count").cast("int"))
    .filter(col("student_count") > 0)
)


In [0]:
# Expand the single-letter gender codes: "M" -> "Male", "F" -> "Female".
# Any other value is kept exactly as it arrived.
gender_code = col("gender")

df_enrollment_clean = df_enrollment_clean.withColumn(
    "gender",
    when(gender_code == "M", "Male")
    .when(gender_code == "F", "Female")
    .otherwise(gender_code),
)


In [0]:
# Deduplicate enrollment records: exactly one row survives per
# (school_id, academic_year, grade_level, gender) combination.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# NOTE(review): the window is ordered by school_id, which is also a partition
# key, so the ordering does not distinguish rows inside a partition. Which
# duplicate receives row number 1 looks arbitrary; confirm whether an explicit
# tie-breaker column is wanted.
window_spec = (
    Window
    .partitionBy("school_id", "academic_year", "grade_level", "gender")
    .orderBy("school_id")
)

# Number the rows inside each partition, keep the first, then drop the helper column.
ranked_enrollment = df_enrollment_clean.withColumn(
    "row_num", row_number().over(window_spec)
)
df_enrollment_dedup = ranked_enrollment.filter(col("row_num") == 1).drop("row_num")


Clean student performance data

In [0]:
# A performance row with neither an average_score nor an attendance_percentage
# has no analytical value, so it is removed. Rows missing only one of the two
# are kept (how="all").
df_performance_clean = df_performance.na.drop(
    how="all",
    subset=["average_score", "attendance_percentage"],
)

In [0]:
# A missing grade_level in the performance data is replaced with the sentinel -1;
# values that are present are left as they are.
grade_level_missing = col("grade_level").isNull()

df_performance_clean = df_performance_clean.withColumn(
    "grade_level",
    when(grade_level_missing, -1).otherwise(col("grade_level")),
)

In [0]:
# Fill in dropout_risk_flag only where it is missing:
# - 1 when attendance_percentage is below 72 or average_score is below 52
# - 0 otherwise
# Rows that already carry a flag keep their existing value.
flag_missing = col("dropout_risk_flag").isNull()
low_attendance = col("attendance_percentage") < 72
low_score = col("average_score") < 52

df_performance_clean = df_performance_clean.withColumn(
    "dropout_risk_flag",
    when(flag_missing & (low_attendance | low_score), 1)
    .when(flag_missing, 0)
    .otherwise(col("dropout_risk_flag")),
)

Handling Outliers

In [0]:
# Remove out-of-range values so only plausible records are stored.
# average_score and attendance_percentage must lie in [0, 100] when present.
# A null in either column is explicitly let through: a bare range comparison
# evaluates to null for a null value and filter() would drop the row, which
# would undo the dropna(how="all") step that deliberately keeps rows missing
# only one of the two metrics.
score_in_range = (col("average_score") >= 0) & (col("average_score") <= 100)
attendance_in_range = (
    (col("attendance_percentage") >= 0) & (col("attendance_percentage") <= 100)
)
# NOTE(review): the 0-100 bound on dropout_risk_flag is kept from the original;
# the backfill step only ever writes 0 or 1, so a tighter bound may be intended.
flag_in_range = (col("dropout_risk_flag") >= 0) & (col("dropout_risk_flag") <= 100)

df_performance_clean = (
    df_performance_clean
    .filter(col("average_score").isNull() | score_in_range)
    .filter(col("attendance_percentage").isNull() | attendance_in_range)
    .filter(flag_in_range)
)


In [0]:
# Save the Silver tables as Delta, overwriting any previous contents.
(
    df_school_master_clean.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_school_master")
)

# Persist the deduplicated enrollment frame. df_enrollment_clean still contains
# the duplicate rows that the dedup step removes, so writing it would make that
# step a no-op.
(
    df_enrollment_dedup.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_enrollment")
)

(
    df_performance_clean.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_performance")
)