In [0]:

%run /Workspace/Users/pradeep.ponati@gmail.com/interactive_framework/00_Config

In [0]:

# Load Bronze data: read the raw Brightspace Delta table that every quality
# check below runs against. The count() here is a full Spark job.
bronze_df = spark.read.format("delta").load(BRONZE_BRIGHTSPACE)
print(f"✓ Loaded {bronze_df.count()} records from Bronze")

In [0]:
# ============================================================================
# QUALITY CHECK ENGINE FOR BRIGHTSPACE LMS DATA
# ============================================================================

from pyspark.sql.functions import col, when, lit, concat_ws, row_number, current_timestamp
from pyspark.sql.window import Window

# Run banner. Note that bronze_df.count() triggers a full scan of the Bronze table.
print("\n".join(("=" * 70, "RUNNING QUALITY CHECKS ON BRIGHTSPACE DATA", "=" * 70)))
print(f"Total records to check: {bronze_df.count()}\n")

# Accumulator: each check cell below appends one tagged violation DataFrame here.
all_violations = []

In [0]:

# ============================================================================
# CHECK 1: NULL CHECKS - Critical Fields
# ============================================================================
# Flags every row where a critical field is null. A row with several null
# critical fields yields one violation row per field.
print("▶ Check 1: Null values in critical fields...")

critical_fields = ['student_id', 'email', 'enrollment_date']

for field in critical_fields:
    null_records = bronze_df.filter(col(field).isNull())
    # Count once: every count() is a separate Spark job, and the tagging
    # columns added below do not change the number of rows.
    null_count = null_records.count()

    if null_count > 0:
        violation_df = null_records \
            .withColumn("violation_type", lit("null_value")) \
            .withColumn("violation_severity", lit("critical")) \
            .withColumn("violation_column", lit(field)) \
            .withColumn("violation_description", lit(f"Null value in critical field: {field}"))

        all_violations.append(violation_df)
        print(f"  ✗ Found {null_count} null values in '{field}'")


In [0]:
# ============================================================================
# CHECK 2: INVALID EMAIL FORMAT
# ============================================================================
# Flags non-null emails that do not match a basic local@domain.tld pattern.
# Null emails are skipped here because Check 1 already reports them.
print("\n▶ Check 2: Email format validation...")

invalid_emails = bronze_df.filter(
    col("email").isNotNull() &
    ~col("email").rlike("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$")
)
# Count once; the tagging below does not change the number of rows.
invalid_email_count = invalid_emails.count()

if invalid_email_count > 0:
    violation_df = invalid_emails \
        .withColumn("violation_type", lit("format_mismatch")) \
        .withColumn("violation_severity", lit("high")) \
        .withColumn("violation_column", lit("email")) \
        .withColumn("violation_description", lit("Email does not match expected format"))

    all_violations.append(violation_df)
    print(f"  ✗ Found {invalid_email_count} invalid email formats")

In [0]:

# ============================================================================
# CHECK 3: NEGATIVE GRADES
# ============================================================================
# Flags rows whose grade_percentage is below zero. A null grade compares as
# null and is therefore not flagged. Only the lower bound is checked here.
print("\n▶ Check 3: Grade percentage validation...")

negative_grades = bronze_df.filter(col("grade_percentage") < 0)
# Count once; the tagging below does not change the number of rows.
negative_grade_count = negative_grades.count()

if negative_grade_count > 0:
    violation_df = negative_grades \
        .withColumn("violation_type", lit("out_of_range")) \
        .withColumn("violation_severity", lit("high")) \
        .withColumn("violation_column", lit("grade_percentage")) \
        .withColumn("violation_description", lit("Grade percentage cannot be negative"))

    all_violations.append(violation_df)
    print(f"  ✗ Found {negative_grade_count} negative grades")



In [0]:
# ============================================================================
# CHECK 4: INVALID ATTENDANCE (> 100%)
# ============================================================================
# Flags rows whose attendance_percentage exceeds 100. A null attendance value
# compares as null and is not flagged. Only the upper bound is checked here.
print("\n▶ Check 4: Attendance percentage validation...")

invalid_attendance = bronze_df.filter(col("attendance_percentage") > 100)
# Count once; the tagging below does not change the number of rows.
invalid_attendance_count = invalid_attendance.count()

if invalid_attendance_count > 0:
    violation_df = invalid_attendance \
        .withColumn("violation_type", lit("out_of_range")) \
        .withColumn("violation_severity", lit("medium")) \
        .withColumn("violation_column", lit("attendance_percentage")) \
        .withColumn("violation_description", lit("Attendance percentage cannot exceed 100%"))

    all_violations.append(violation_df)
    print(f"  ✗ Found {invalid_attendance_count} invalid attendance percentages")

In [0]:

# ============================================================================
# CHECK 5: LAST ACCESS BEFORE ENROLLMENT
# ============================================================================
# Flags rows where both dates are present and last_access_date is earlier
# than enrollment_date.
# NOTE(review): if these Bronze columns are strings rather than date/timestamp
# types, `<` compares them lexicographically. That is only correct for
# ISO-8601 formatted values; confirm the Bronze schema.
print("\n▶ Check 5: Date logic validation...")

invalid_dates = bronze_df.filter(
    col("last_access_date").isNotNull() &
    col("enrollment_date").isNotNull() &
    (col("last_access_date") < col("enrollment_date"))
)
# Count once; the tagging below does not change the number of rows.
invalid_date_count = invalid_dates.count()

if invalid_date_count > 0:
    violation_df = invalid_dates \
        .withColumn("violation_type", lit("logic_violation")) \
        .withColumn("violation_severity", lit("high")) \
        .withColumn("violation_column", lit("last_access_date")) \
        .withColumn("violation_description", lit("Last access date cannot be before enrollment date"))

    all_violations.append(violation_df)
    print(f"  ✗ Found {invalid_date_count} invalid date sequences")

In [0]:


# ============================================================================
# CHECK 6: DUPLICATE STUDENT ENROLLMENTS
# ============================================================================
# Within each (student_id, course_code) group, the first enrollment is treated
# as the original and every later row is flagged as a duplicate. Rows with a
# null student_id are excluded because Check 1 already flags them.
# NOTE(review): Spark partitions null course_code values together, so rows for
# the same student that are all missing course_code are flagged as duplicates
# of each other. Confirm that this is intended.
print("\n▶ Check 6: Duplicate detection...")

window_spec = Window.partitionBy("student_id", "course_code")
# Ordering rules:
# - Nulls go last, so a row with a real enrollment date is preferred as the
#   original over one that Check 1 already quarantines.
# - _record_hash breaks ties. Without it, row_number() over equal keys picks
#   an arbitrary row, and the flagged row could change between runs.
duplicates = bronze_df \
    .filter(col("student_id").isNotNull()) \
    .withColumn(
        "row_num",
        row_number().over(window_spec.orderBy(
            col("enrollment_date").asc_nulls_last(), col("_record_hash")
        )),
    ) \
    .filter(col("row_num") > 1) \
    .drop("row_num")  # helper column must not leak into the quarantine schema

# Count once; the tagging below does not change the number of rows.
duplicate_count = duplicates.count()

if duplicate_count > 0:
    violation_df = duplicates \
        .withColumn("violation_type", lit("duplicate")) \
        .withColumn("violation_severity", lit("medium")) \
        .withColumn("violation_column", lit("student_id,course_code")) \
        .withColumn("violation_description", lit("Duplicate enrollment for same student and course"))

    all_violations.append(violation_df)
    print(f"  ✗ Found {duplicate_count} duplicate enrollments")

In [0]:
# ============================================================================
# COMBINE ALL VIOLATIONS & CREATE QUARANTINE
# ============================================================================
# Unions the per-check violation DataFrames into quarantine_df. It holds one
# row per violation, so a record that fails several checks appears several
# times. Each row is tagged with quarantine and resolution metadata.
# clean_df is the set of bronze rows whose _record_hash does not appear in the
# quarantine. When no check found anything, clean_df is bronze_df and
# quarantine_df is None.
print("\n" + "=" * 70)
print("QUALITY CHECK SUMMARY")
print("=" * 70)

if all_violations:
    from functools import reduce
    from pyspark.sql.functions import monotonically_increasing_id, concat

    # Union all violations. allowMissingColumns null-fills any column that
    # exists in only some of the check frames.
    quarantine_df = reduce(
        lambda acc, nxt: acc.unionByName(nxt, allowMissingColumns=True),
        all_violations,
    )

    # Add quarantine metadata with a unique ID for each violation.
    # NOTE(review): monotonically_increasing_id() is unique only within a single
    # evaluation. quarantine_df is not cached, so each action below (and the
    # write in the save step) recomputes it. The IDs shown here are not
    # guaranteed to equal the IDs that get written.
    quarantine_df = quarantine_df \
        .withColumn("_quarantine_id", concat(
            lit("Q_"),
            monotonically_increasing_id().cast("string")
        )) \
        .withColumn("_quarantine_timestamp", current_timestamp()) \
        .withColumn("resolution_status", lit("pending")) \
        .withColumn("resolved_by", lit(None).cast("string")) \
        .withColumn("resolution_timestamp", lit(None).cast("timestamp")) \
        .withColumn("resolution_action", lit(None).cast("string")) \
        .withColumn("resolution_notes", lit(None).cast("string"))

    # Get unique violated record IDs
    violated_records = quarantine_df.select("_record_hash").distinct()

    # Separate clean records.
    # NOTE(review): a null _record_hash never matches in an equality join, so
    # such a row would stay in clean_df even when it has violations.
    clean_df = bronze_df.join(violated_records, "_record_hash", "left_anti")

    total_records = bronze_df.count()
    clean_count = clean_df.count()
    quarantine_count = quarantine_df.count()  # violation rows, not records
    # Records held back from Silver. Reporting violation rows here instead
    # would double-count records that fail several checks, and the two
    # percentages would no longer add up to 100%.
    quarantined_record_count = total_records - clean_count

    print(f"✓ Clean records: {clean_count} ({clean_count/total_records*100:.1f}%)")
    print(f"✗ Quarantined records: {quarantined_record_count} ({quarantined_record_count/total_records*100:.1f}%)")
    print(f"  Total violations: {quarantine_count}")

    # Show violation breakdown, most severe first. Sorting the severity string
    # itself in descending order would be alphabetical (medium, high, critical).
    severity_rank = (
        when(col("violation_severity") == "critical", 0)
        .when(col("violation_severity") == "high", 1)
        .when(col("violation_severity") == "medium", 2)
        .otherwise(3)
    )
    print("\nViolation Breakdown:")
    quarantine_df.groupBy("violation_type", "violation_severity") \
        .count() \
        .orderBy(severity_rank, col("count").desc()) \
        .show(truncate=False)

else:
    print("✓ No quality issues found! All data is clean.")
    clean_df = bronze_df
    quarantine_df = None






In [0]:
# ============================================================================
# SAVE RESULTS TO DELTA LAKE
# ============================================================================
# Overwrites the Silver table with clean_df, and the quarantine table with
# quarantine_df when it exists. When the checks found nothing (quarantine_df
# is None), rows left in the quarantine table by an earlier run are deleted.
# Otherwise they would still show up as pending violations.

print("=" * 70)
print("SAVING RESULTS TO DELTA LAKE")
print("=" * 70)

# Save Clean Data to Silver
print("\n📝 Writing clean data to Silver...")
print(f"   Path: {SILVER_BRIGHTSPACE}")
print(f"   Records: {clean_df.count()}")

# overwriteSchema matches the quarantine write below. Without it, a Delta
# overwrite fails with a schema mismatch once the Bronze columns change.
clean_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .save(SILVER_BRIGHTSPACE)

print("✓ Silver table saved!")

# Save Quarantine Data
if quarantine_df is not None:
    print("\n📝 Writing quarantine data...")
    print(f"   Path: {QUARANTINE_BRIGHTSPACE}")
    print(f"   Records: {quarantine_df.count()}")

    quarantine_df.write \
        .format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(QUARANTINE_BRIGHTSPACE)

    print("✓ Quarantine table saved!")

    # Show sample quarantine records.
    # NOTE(review): this prints raw student_id and email values (PII) into the
    # notebook output; clear outputs before sharing the notebook.
    print("\n📋 Sample Quarantine Records:")
    quarantine_df.select(
        "student_id", "email", "violation_type",
        "violation_severity", "violation_description"
    ).show(5, truncate=False)

else:
    print("\n✓ No quarantine data to save (all records clean)")
    # Empty any quarantine table from a previous run so its rows are not
    # mistaken for current pending violations.
    try:
        from pyspark.errors import AnalysisException
    except ImportError:  # PySpark < 3.4
        from pyspark.sql.utils import AnalysisException
    try:
        spark.sql(f"DELETE FROM delta.`{QUARANTINE_BRIGHTSPACE}`")
        print("✓ Cleared quarantine table left over from a previous run")
    except AnalysisException:
        # Path is not (yet) a Delta table: there is nothing stale to clear.
        pass

print("\n" + "=" * 70)
print("DATA QUALITY PIPELINE COMPLETE")
print("=" * 70)
print("✓ Bronze → Quality Checks → Silver + Quarantine")
print("✓ Ready for interactive resolution!")
print("=" * 70)