## Gold Layer - Risk Analytics

In [0]:
# Load the four Silver-layer inputs that the Gold aggregates below are built from.
SILVER_NAMESPACE = "student_risk_data.default"

spark_academics = spark.table(f"{SILVER_NAMESPACE}.silver_academics")
spark_attendance = spark.table(f"{SILVER_NAMESPACE}.silver_attendance")
spark_demographics = spark.table(f"{SILVER_NAMESPACE}.silver_demographics")
spark_retention = spark.table(f"{SILVER_NAMESPACE}.silver_retention")



In [0]:
# Row count of each Silver input, printed as "<Label> rows: <n>".
for label, frame in (
    ("Academics", spark_academics),
    ("Attendance", spark_attendance),
    ("Demographics", spark_demographics),
    ("Retention", spark_retention),
):
    print(f"{label} rows:", frame.count())


Academics rows: 4510
Attendance rows: 4529
Demographics rows: 1000
Retention rows: 2000


In [0]:
# Print every Silver schema under an upper-case banner. Each banner after the
# first starts with a newline so the schemas are separated by a blank line.
silver_frames = (
    ("ACADEMICS", spark_academics),
    ("ATTENDANCE", spark_attendance),
    ("DEMOGRAPHICS", spark_demographics),
    ("RETENTION", spark_retention),
)
for position, (banner, frame) in enumerate(silver_frames):
    print(f"{banner} SCHEMA" if position == 0 else f"\n{banner} SCHEMA")
    frame.printSchema()


ACADEMICS SCHEMA
root
 |-- student_id: string (nullable = true)
 |-- subject_code: string (nullable = true)
 |-- subject_name: string (nullable = true)
 |-- internal_marks: integer (nullable = true)
 |-- external_mark: integer (nullable = true)
 |-- academic_year: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nullable = true)
 |-- source_table: string (nullable = true)
 |-- valid_subject: boolean (nullable = true)
 |-- valid_score: boolean (nullable = true)
 |-- valid_student: boolean (nullable = true)
 |-- is_valid_record: boolean (nullable = true)
 |-- total_marks: integer (nullable = true)
 |-- grade: string (nullable = true)
 |-- pass_status: string (nullable = true)


ATTENDANCE SCHEMA
root
 |-- student_id: string (nullable = true)
 |-- subject_code: string (nullable = true)
 |-- attendance_percentage: integer (nullable = true)
 |-- participation_score: integer (nullable = true)
 |-- academic_year: string (nullable = true)
 |-- ingestion_timestamp: timestamp (nulla

## Analysing Past Retention & Dropout Records 
- Understand dropout patterns from past data

- Provide benchmarks to validate current risk students

- Feed trend and distribution dashboards

In [0]:
from pyspark.sql import functions as F

# Retention records flagged as dropouts (dropout_flag == 1).
dropouts = spark_retention.filter(F.col("dropout_flag") == 1)

print("Count of past dropouts:", dropouts.count())

# One-row profile of the dropout cohort: size plus the mean of three attributes.
mean_columns = {
    "overall_attendance_percentage": "avg_attendance",
    "academic_score": "avg_academic_score",
    "age_at_enrollment": "avg_age",
}
summary = dropouts.agg(
    F.count("*").alias("dropout_count"),
    *[F.avg(source).alias(label) for source, label in mean_columns.items()],
)

summary.show(truncate=False)

# Most frequent gender, then most frequent income band, among dropouts.
# Only the top row is kept and no tie-breaker is specified.
for attribute in ("gender", "family_income_band"):
    (
        dropouts.groupBy(attribute)
        .count()
        .orderBy(F.desc("count"))
        .limit(1)
        .show()
    )


Count of past dropouts: 1005
+-------------+-----------------+------------------+-----------------+
|dropout_count|avg_attendance   |avg_academic_score|avg_age          |
+-------------+-----------------+------------------+-----------------+
|1005         |40.33532338308458|42.6179104477612  |20.95820895522388|
+-------------+-----------------+------------------+-----------------+

+------+-----+
|gender|count|
+------+-----+
| OTHER|  594|
+------+-----+

+------------------+-----+
|family_income_band|count|
+------------------+-----+
|            MEDIUM|  328|
+------------------+-----+



In [0]:
# Retention and dropout trend by academic year.
# NOTE(review): the rate divides a row-level sum of dropout_flag by a
# distinct-student count, so it presumably relies on each student having at
# most one retention row per year — confirm against the Silver table.
distinct_students = F.countDistinct("student_id")
dropout_total = F.sum("dropout_flag")

gold_retention_yearly_summary = (
    spark_retention
    .groupBy("academic_year")
    .agg(
        distinct_students.alias("total_students"),
        dropout_total.alias("total_dropouts"),
        F.round(dropout_total / distinct_students * 100, 2).alias("dropout_rate_pct"),
    )
    .orderBy("academic_year")
)
gold_retention_yearly_summary.show()

+-------------+--------------+--------------+----------------+
|academic_year|total_students|total_dropouts|dropout_rate_pct|
+-------------+--------------+--------------+----------------+
|    2020-2021|           626|           304|           48.56|
|    2021-2022|           658|           333|           50.61|
|    2022-2023|           716|           368|            51.4|
+-------------+--------------+--------------+----------------+



In [0]:
# Dropout rate per academic score band (validates the academic risk thresholds).
# Bands: score < 40 -> "Low", score < 60 -> "Medium", anything else -> "High".
# A null score matches neither comparison and therefore also lands in "High".
retention_score = F.col("academic_score")
academic_band_expr = (
    F.when(retention_score < 40, "Low")
    .when(retention_score < 60, "Medium")
    .otherwise("High")
)

record_total = F.count("*")
dropout_total = F.sum("dropout_flag")

gold_retention_academic_band = (
    spark_retention
    .withColumn("academic_band", academic_band_expr)
    .groupBy("academic_band")
    .agg(
        record_total.alias("students"),
        dropout_total.alias("dropouts"),
        F.round(dropout_total / record_total * 100, 2).alias("dropout_rate_pct"),
    )
)

gold_retention_academic_band.show()

+-------------+--------+--------+----------------+
|academic_band|students|dropouts|dropout_rate_pct|
+-------------+--------+--------+----------------+
|       Medium|     996|     607|           60.94|
|         High|     515|       6|            1.17|
|          Low|     489|     392|           80.16|
+-------------+--------+--------+----------------+



In [0]:
# Dropout rate per attendance band (validates the attendance thresholds).
# Bands: attendance < 50 -> "Low", attendance < 75 -> "Medium", anything else -> "High".
# A null attendance matches neither comparison and therefore also lands in "High".
retention_attendance = F.col("overall_attendance_percentage")
attendance_band_expr = (
    F.when(retention_attendance < 50, "Low")
    .when(retention_attendance < 75, "Medium")
    .otherwise("High")
)

record_total = F.count("*")
dropout_total = F.sum("dropout_flag")

gold_retention_attendance_band = (
    spark_retention
    .withColumn("attendance_band", attendance_band_expr)
    .groupBy("attendance_band")
    .agg(
        record_total.alias("students"),
        dropout_total.alias("dropouts"),
        F.round(dropout_total / record_total * 100, 2).alias("dropout_rate_pct"),
    )
)

gold_retention_attendance_band.show()

+---------------+--------+--------+----------------+
|attendance_band|students|dropouts|dropout_rate_pct|
+---------------+--------+--------+----------------+
|         Medium|     524|     190|           36.26|
|           High|     354|       8|            2.26|
|            Low|    1122|     807|           71.93|
+---------------+--------+--------+----------------+



In [0]:
# Dropout totals and dropout rate per institution.
# NOTE(review): dropouts are summed over rows while students are counted
# distinctly; the two agree only if a student has a single retention row per
# institution — confirm against the Silver table.
distinct_students = F.countDistinct("student_id")
dropout_total = F.sum("dropout_flag")

gold_retention_institution_summary = (
    spark_retention
    .groupBy("institution_code")
    .agg(
        distinct_students.alias("total_students"),
        dropout_total.alias("total_dropouts"),
        F.round(dropout_total / distinct_students * 100, 2).alias("dropout_rate_pct"),
    )
)
gold_retention_institution_summary.show()

+----------------+--------------+--------------+----------------+
|institution_code|total_students|total_dropouts|dropout_rate_pct|
+----------------+--------------+--------------+----------------+
|          INST02|          1001|           507|           50.65|
|          INST01|           999|           498|           49.85|
+----------------+--------------+--------------+----------------+



In [0]:
# Dropout reason distribution (explains dropout causes), largest dropout count first.
# All retention rows are grouped, not only dropouts, so a reason can show
# students with zero dropouts.
records_by_reason = spark_retention.groupBy("dropout_reason")

gold_retention_reason_summary = (
    records_by_reason
    .agg(
        F.count("*").alias("students"),
        F.sum("dropout_flag").alias("dropouts"),
    )
    .orderBy(F.col("dropouts").desc())
)
gold_retention_reason_summary.show()

+--------------+--------+--------+
|dropout_reason|students|dropouts|
+--------------+--------+--------+
|     Financial|     272|     272|
|      Academic|     255|     255|
|        Health|     240|     240|
|      Personal|     238|     238|
|       Unknown|     995|       0|
+--------------+--------+--------+



In [0]:
# Persist the Gold retention tables.
# Each frame is written as a Delta table named after its variable, replacing
# any existing table contents. Write order follows the mapping order below.
retention_outputs = {
    "gold_retention_yearly_summary": gold_retention_yearly_summary,
    "gold_retention_institution_summary": gold_retention_institution_summary,
    "gold_retention_academic_band": gold_retention_academic_band,
    "gold_retention_attendance_band": gold_retention_attendance_band,
    "gold_retention_reason_summary": gold_retention_reason_summary,
}

for table_name, frame in retention_outputs.items():
    (
        frame.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(f"student_risk_data.default.{table_name}")
    )

## Analysing Current Student Data 
- To identify high-risk students
- To support early intervention strategies

In [0]:

# Roll subject-level academics up to one row per (student_id, academic_year).
is_failed_subject = F.when(F.col("grade") == "F", 1).otherwise(0)

academics_by_student_year = (
    spark_academics
    .groupBy("student_id", "academic_year")
    .agg(
        F.avg("total_marks").alias("avg_academic_score"),
        F.count("*").alias("subjects_taken"),
        F.sum(is_failed_subject).alias("failed_subject_count"),
    )
)

# Academic risk: mean score below 40, or at least three failed subjects.
meets_academic_risk = (
    (F.col("avg_academic_score") < 40)
    | (F.col("failed_subject_count") >= 3)
)

acad_student = academics_by_student_year.withColumn(
    "academic_risk_flag",
    F.when(meets_academic_risk, 1).otherwise(0),
)


In [0]:
# Roll subject-level attendance up to one row per (student_id, academic_year).
# NOTE(review): low_attendance / low_participation are summed as counts, so they
# are presumably 0/1 per-subject indicators — confirm against the Silver schema.
attendance_by_student_year = (
    spark_attendance
    .groupBy("student_id", "academic_year")
    .agg(
        F.avg("attendance_percentage").alias("avg_attendance_percentage"),
        F.avg("participation_score").alias("avg_participation_score"),
        F.sum("low_attendance").alias("low_attendance_subjects"),
        F.sum("low_participation").alias("low_participation_subjects"),
    )
)

# Attendance risk: mean attendance below 40, or at least three low-attendance subjects.
meets_attendance_risk = (
    (F.col("avg_attendance_percentage") < 40)
    | (F.col("low_attendance_subjects") >= 3)
)

attendance_student = attendance_by_student_year.withColumn(
    "attendance_risk_flag",
    F.when(meets_attendance_risk, 1).otherwise(0),
)


In [0]:

# Primary Gold analytics table: academics + attendance + demographics.
# Academics and attendance are inner-joined, so a student-year needs both to
# appear. Demographics are left-joined, so a student-year without a
# demographic row is kept with null demographic columns.
student_year_keys = ["student_id", "academic_year"]

demographic_columns = [
    "student_id",
    "academic_year",
    "gender",
    "institution_code",
    "income_band",
    "urban_rural",
    "age",
    "age_risk_flag",
    "socio_economic_risk_flag",
]
student_demographics = spark_demographics.select(*demographic_columns)

gold_student_master = (
    acad_student
    .join(attendance_student, on=student_year_keys, how="inner")
    .join(student_demographics, on=student_year_keys, how="left")
)


In [0]:
# Overall risk is 1 when any of the academic, attendance or socio-economic
# flags equals 1, and 0 otherwise. No other flag is consulted here.
has_any_risk = (
    (F.col("academic_risk_flag") == 1)
    | (F.col("attendance_risk_flag") == 1)
    | (F.col("socio_economic_risk_flag") == 1)
)

gold_student_master = gold_student_master.withColumn(
    "overall_risk_flag",
    F.when(has_any_risk, 1).otherwise(0),
)


In [0]:
# Row count of the master table, which is joined on (student_id, academic_year).
# NOTE(review): this equals the number of students only if every student
# appears in a single academic year — confirm.
gold_student_master.count()

1000

In [0]:
# Number of master-table rows identified as high risk (overall_risk_flag == 1).
gold_student_master.where(F.col("overall_risk_flag") == 1).count()

285

In [0]:
# Institution-wise risk distribution.
#
# gold_student_master is built at (student_id, academic_year) grain, so one
# student can contribute several rows to the same institution. Summing the
# row-level flag while dividing by a distinct-student count would mix grains
# and could push the rate above 100%. Both sides are therefore counted as
# distinct students: a student is "at risk" if any of their rows is flagged.
student_total = F.countDistinct("student_id")
at_risk_total = F.countDistinct(
    # when() without otherwise() yields null for unflagged rows, and
    # countDistinct ignores nulls, so only flagged student ids are counted.
    F.when(F.col("overall_risk_flag") == 1, F.col("student_id"))
)

gold_institution_risk_summary = (
    gold_student_master
    .groupBy("institution_code")
    .agg(
        student_total.alias("students"),
        at_risk_total.alias("at_risk_students"),
        F.round(at_risk_total / student_total * 100, 2).alias("risk_percentage"),
    )
)
gold_institution_risk_summary.show()

+----------------+--------+----------------+---------------+
|institution_code|students|at_risk_students|risk_percentage|
+----------------+--------+----------------+---------------+
|          INST02|     497|             152|          30.58|
|          INST01|     503|             133|          26.44|
+----------------+--------+----------------+---------------+



In [0]:
# Overall Student Summary
#
# total_students is a distinct-student count, so every "students at risk"
# figure is also counted as distinct students rather than as a sum over
# student-year rows. Otherwise a student flagged in more than one row would be
# counted more than once against a denominator that counts them once.
def _flagged_students(flag_column):
    """Return an aggregate counting distinct student_ids whose `flag_column` equals 1.

    Rows where the flag is 0 or null produce null inside the when() and are
    ignored by countDistinct.
    """
    return F.countDistinct(
        F.when(F.col(flag_column) == 1, F.col("student_id"))
    )


gold_student_master.agg(
    F.countDistinct("student_id").alias("total_students"),
    _flagged_students("overall_risk_flag").alias("students_at_risk"),
    _flagged_students("academic_risk_flag").alias("academic_risk_students"),
    _flagged_students("attendance_risk_flag").alias("attendance_risk_students"),
    _flagged_students("socio_economic_risk_flag").alias("socio_economic_risk_students"),
).show()


+--------------+----------------+----------------------+------------------------+----------------------------+
|total_students|students_at_risk|academic_risk_students|attendance_risk_students|socio_economic_risk_students|
+--------------+----------------+----------------------+------------------------+----------------------------+
|          1000|             285|                   112|                     210|                          22|
+--------------+----------------+----------------------+------------------------+----------------------------+



In [0]:
# Income band vs risk.
#
# gold_student_master is built at (student_id, academic_year) grain, so one
# student can contribute several rows to the same income band. Summing the
# row-level flag while dividing by a distinct-student count would mix grains
# and could push the rate above 100%. Both sides are therefore counted as
# distinct students: a student is "at risk" if any of their rows is flagged.
student_total = F.countDistinct("student_id")
at_risk_total = F.countDistinct(
    # when() without otherwise() yields null for unflagged rows, and
    # countDistinct ignores nulls, so only flagged student ids are counted.
    F.when(F.col("overall_risk_flag") == 1, F.col("student_id"))
)

gold_income_risk_summary = (
    gold_student_master
    .groupBy("income_band")
    .agg(
        student_total.alias("students"),
        at_risk_total.alias("at_risk_students"),
        F.round(at_risk_total / student_total * 100, 2).alias("risk_percentage"),
    )
)
gold_income_risk_summary.show()

+-----------+--------+----------------+---------------+
|income_band|students|at_risk_students|risk_percentage|
+-----------+--------+----------------+---------------+
|          3|     836|             220|          26.32|
|          2|     129|              31|          24.03|
|          1|      35|              34|          97.14|
+-----------+--------+----------------+---------------+



In [0]:
# Attendance vs academic performance quadrant.
# Cut-offs: attendance >= 70 counts as high attendance, academic score >= 60
# counts as high performance. Rows matching none of the first three branches
# (including rows where either average is null) get the "Low / Low" label.
attendance_pct = F.col("avg_attendance_percentage")
academic_avg = F.col("avg_academic_score")

performance_bucket_expr = (
    F.when(
        (attendance_pct >= 70) & (academic_avg >= 60),
        "High Attendance / High Performance",
    )
    .when(
        (attendance_pct < 70) & (academic_avg >= 60),
        "Low Attendance / High Performance",
    )
    .when(
        (attendance_pct >= 70) & (academic_avg < 60),
        "High Attendance / Low Performance",
    )
    .otherwise("Low Attendance / Low Performance")
)

gold_student_master = gold_student_master.withColumn(
    "performance_bucket", performance_bucket_expr
)

In [0]:
# Subject-wise aggregation.
# Pair each academic record with the attendance record for the same student,
# year and subject. Records without a counterpart are dropped by the inner join.
subject_base = spark_academics.join(
    spark_attendance,
    on=["student_id", "academic_year", "subject_code"],
    how="inner",
)


def _rounded_mean(source_column, output_name):
    """Return the group mean of `source_column`, rounded to 2 decimals, as `output_name`."""
    return F.round(F.avg(source_column), 2).alias(output_name)


def _tally(condition, output_name):
    """Return the number of rows where `condition` is true, as `output_name`.

    Rows where the condition is false or null contribute 0.
    """
    return F.sum(F.when(condition, 1).otherwise(0)).alias(output_name)


subject_metrics = subject_base.groupBy("subject_code").agg(
    # Academic performance
    _rounded_mean("total_marks", "avg_total_marks"),
    _rounded_mean("external_mark", "avg_external_mark"),
    _rounded_mean("internal_marks", "avg_internal_mark"),
    # Attendance & participation
    _rounded_mean("attendance_percentage", "avg_attendance_percentage"),
    _rounded_mean("participation_score", "avg_participation_score"),
    # Fail / pass counts (a null grade counts toward neither)
    _tally(F.col("grade") == "F", "fail_count"),
    _tally(F.col("grade") != "F", "pass_count"),
    # Risk indicators
    F.sum("low_attendance").alias("low_attendance_cases"),
    F.sum("low_participation").alias("low_participation_cases"),
    # Volume
    F.countDistinct("student_id").alias("student_count"),
)

# Hard-coded subject names for readability; a code missing from this list
# gets a null subject_name from the left join.
subject_names = [
    ("SUB03", "English"),
    ("SUB05", "Computer Science"),
    ("SUB04", "Social Studies"),
    ("SUB01", "Mathematics"),
    ("SUB02", "Science"),
]
subject_dim = spark.createDataFrame(subject_names, ["subject_code", "subject_name"])

gold_subject_summary = subject_metrics.join(subject_dim, on="subject_code", how="left")


In [0]:
# Persist the Gold tables for current students.
# Each frame is written as a Delta table, replacing any existing contents.
# Note that gold_subject_summary is stored under the table name
# gold_subject_performance_summary. Write order follows the mapping order below.
gold_outputs = {
    "gold_student_master": gold_student_master,
    "gold_institution_risk_summary": gold_institution_risk_summary,
    "gold_income_risk_summary": gold_income_risk_summary,
    "gold_subject_performance_summary": gold_subject_summary,
}

for table_name, frame in gold_outputs.items():
    (
        frame.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(f"student_risk_data.default.{table_name}")
    )



In [0]:
%sql
-- Inspect the persisted student master table (all columns, all rows).
SELECT * FROM student_risk_data.default.gold_student_master;


student_id,academic_year,avg_academic_score,subjects_taken,failed_subject_count,academic_risk_flag,avg_attendance_percentage,avg_participation_score,low_attendance_subjects,low_participation_subjects,attendance_risk_flag,gender,institution_code,income_band,urban_rural,age,age_risk_flag,socio_economic_risk_flag,overall_risk_flag,performance_bucket
S241-25,2024-2025,50.2,5,1,0,64.4,6.0,2,2,0,OTHER,INST02,3,URBAN,21,0,0,0,Low Attendance / Low Performance
S385-25,2024-2025,55.2,5,0,0,64.6,5.6,2,2,0,OTHER,INST02,3,URBAN,21,0,0,0,Low Attendance / Low Performance
S371-24,2023-2024,39.0,5,2,1,57.4,4.0,4,4,1,MALE,INST02,3,RURAL,20,0,0,1,Low Attendance / Low Performance
S251-25,2024-2025,54.6,5,1,0,58.75,3.75,2,3,0,MALE,INST01,3,RURAL,21,0,0,0,Low Attendance / Low Performance
S253-24,2023-2024,58.4,5,1,0,61.333333333333336,7.0,2,0,0,FEMALE,INST01,3,RURAL,22,0,0,0,Low Attendance / Low Performance
S201-24,2023-2024,48.2,5,0,0,62.0,6.0,2,2,0,OTHER,INST02,3,URBAN,20,0,0,0,Low Attendance / Low Performance
S373-24,2023-2024,62.25,4,0,0,72.25,3.5,0,4,0,FEMALE,INST02,3,RURAL,22,0,0,0,High Attendance / High Performance
S288-24,2023-2024,55.2,5,0,0,62.25,4.75,2,2,0,MALE,INST02,3,URBAN,19,0,0,0,Low Attendance / Low Performance
S256-25,2024-2025,40.0,4,3,1,69.6,6.2,1,1,0,FEMALE,INST01,3,RURAL,23,1,0,1,Low Attendance / Low Performance
S393-25,2024-2025,52.25,4,1,0,77.2,4.8,0,4,0,MALE,INST01,3,RURAL,21,0,0,0,High Attendance / Low Performance


In [0]:
%sql
-- Inspect the persisted per-subject performance summary (all columns, all rows).
SELECT * FROM student_risk_data.default.gold_subject_performance_summary;

subject_code,avg_total_marks,avg_external_mark,avg_internal_mark,avg_attendance_percentage,avg_participation_score,fail_count,pass_count,low_attendance_cases,low_participation_cases,student_count,subject_name
SUB04,51.31,31.2,20.11,66.24,5.19,203,601,312,398,804,Social Studies
SUB01,52.42,32.36,20.06,67.54,5.22,186,633,295,405,819,Mathematics
SUB05,49.34,30.45,18.88,67.46,5.2,217,591,285,408,808,Computer Science
SUB02,51.95,32.27,19.69,68.27,5.07,196,628,272,428,824,Science
SUB03,51.68,32.26,19.41,61.94,5.19,193,623,293,407,816,English
