In [0]:
# Validate critical fields.
# Detect bad / incomplete records.
# Create DQ (data quality) flags -- very important.
# Keep the data usable for Gold.

In [0]:
# Load the Silver student dimension table as a DataFrame; every check below reads from it.
silver_df = spark.read.table("student_project.silver_student_dimension")

In [0]:
# List the tables registered in the student_project schema and print them
# (sanity check that the expected tables exist).
spark.sql("SHOW TABLES IN student_project").show()

In [0]:
from pyspark.sql.functions import col

In [0]:
# A record counts as invalid when ANY of the critical fields is NULL.
missing_critical_field = (
    col("student_id").isNull()
    | col("academic_yr").isNull()
    | col("class_id").isNull()
    | col("class_name").isNull()
    | col("student_name").isNull()
)

# Rows of silver_df that fail the critical-field check.
invalid_students_df = silver_df.where(missing_critical_field)

In [0]:
# count() is a Spark action: it runs the filter and returns the number of invalid rows.
invalid_student_count = invalid_students_df.count()
print("Invalid student records:", invalid_student_count)

In [0]:
# Parent contact is complete only when BOTH f_mobile and f_email are present.
parent_contact_complete = col("f_mobile").isNotNull() & col("f_email").isNotNull()

# Rows where at least one of the two contact fields is NULL.
missing_parent_contact_df = silver_df.where(~parent_contact_complete)

In [0]:
# Run the filter and report how many rows lack f_mobile and/or f_email.
missing_parent_contact_count = missing_parent_contact_df.count()
print("Students with missing parent contact:", missing_parent_contact_count)

In [0]:
from pyspark.sql.functions import count
# Columns whose combination is checked for repeats.
student_key_cols = ["academic_yr", "class_id", "section_id", "student_id"]

# One output row per key combination that occurs more than once in silver_df,
# with record_count holding how many rows share that key.
duplicate_students_df = (
    silver_df
    .groupBy(*student_key_cols)
    .agg(count("*").alias("record_count"))
    .where(col("record_count") > 1)
)

In [0]:
# duplicate_students_df holds one row per duplicated key combination, so this
# number is the count of duplicated keys, not the total number of duplicated rows.
duplicate_key_count = duplicate_students_df.count()
print("Duplicate student rows:", duplicate_key_count)

In [0]:
from pyspark.sql.functions import when

In [0]:
# Add two boolean data-quality flags to every row of silver_df (no rows are dropped):
#   is_valid_student   -> True only when all critical fields are non-NULL
#   has_parent_contact -> True only when both f_mobile and f_email are non-NULL
#
# isNotNull() always yields True/False (never NULL), so the combined expressions
# are already proper boolean columns and need no when(...).otherwise(...) wrapper.
silver_dq_df = (
    silver_df
    .withColumn(
        "is_valid_student",
        col("student_id").isNotNull()
        & col("academic_yr").isNotNull()
        & col("class_id").isNotNull()
        & col("class_name").isNotNull()
        & col("student_name").isNotNull(),
    )
    .withColumn(
        "has_parent_contact",
        col("f_mobile").isNotNull() & col("f_email").isNotNull(),
    )
)

In [0]:
# Persist the flagged DataFrame back to the Silver table as Delta.
# NOTE: this overwrites the same table that silver_df was read from.
# mergeSchema is enabled so the write may carry columns (the DQ flags) that the
# existing table schema does not have yet.
(
    silver_dq_df.write
    .mode("overwrite")
    .format("delta")
    .option("mergeSchema", "true")
    .saveAsTable("student_project.silver_student_dimension")
)

In [0]:
# Re-read the table and print its schema to inspect the columns after the write.
spark.read.table("student_project.silver_student_dimension").printSchema()