## Ingesting data from Bronze Layer

In [14]:
from pyspark.sql.functions import *

StatementMeta(, 137a2076-729e-4421-bcf2-7b9ce9c6164f, 16, Finished, Available, Finished)

In [2]:
# Load each bronze-layer source table into its own DataFrame.
# One reader configured for the Delta format is shared by all five reads.
bronze_reader = spark.read.format("delta")

assignments = bronze_reader.table("BronzeLakehouse.bronze_assignments")
courses = bronze_reader.table("BronzeLakehouse.bronze_courses")
enrollments = bronze_reader.table("BronzeLakehouse.bronze_enrollments")
students = bronze_reader.table("BronzeLakehouse.bronze_students")
submissions = bronze_reader.table("BronzeLakehouse.bronze_submissions")

StatementMeta(, 137a2076-729e-4421-bcf2-7b9ce9c6164f, 4, Finished, Available, Finished)

## Data Quality Checks and Data Cleaning

In [4]:
def data_quality_report(df, df_name, id_column=None):
    """Print a small data-quality summary for a DataFrame.

    The report contains, in order: the schema, the total row count, the
    number of nulls in every column and, when ``id_column`` is given, how many
    rows carry an ID that already appears on another row.

    Args:
        df: DataFrame to inspect.
        df_name: Label printed in the report header.
        id_column: Optional name of the column expected to be unique. When
            omitted (or falsy) the duplicate check is skipped.

    Returns:
        None. Everything is written to standard output.
    """
    print(f"\n📊 Data Quality Report: {df_name}")
    print("-" * 50)  # separator made of 50 dashes
    print("- Schema:")
    df.printSchema()

    # Count once and reuse: every count() is a separate Spark job over the
    # whole frame, and the duplicate check below needs the same number.
    total = df.count()
    print("- Record Count:", total)

    print("- Nulls per Column:")
    # Each cell becomes 1 when the original value is null and 0 otherwise, so
    # the global sum of a column is its null count.
    df.select([col(c).isNull().cast("int").alias(c) for c in df.columns]) \
      .groupBy().sum().show()

    if id_column:  # duplicate check only when an ID column was provided
        unique = df.select(id_column).distinct().count()  # number of distinct IDs
        print(f"- Duplicates in '{id_column}': {total - unique}")  # surplus rows sharing an ID


StatementMeta(, 137a2076-729e-4421-bcf2-7b9ce9c6164f, 6, Finished, Available, Finished)

In [5]:
#Cleaning students data
data_quality_report(students, "Students (Uncleaned)", "StudentID")  #report on the raw bronze data

students_clean = (
    students
    # Trim first: the de-duplication key must be normalised before rows are
    # compared, otherwise IDs that differ only by padding survive as duplicates.
    .withColumn("StudentID", trim(col("StudentID")))
    .withColumn("Name", trim(col("Name")))
    .withColumn("Email", trim(col("Email")))
    .withColumn("EnrollmentDate", trim(col("EnrollmentDate")))
    .withColumn("EnrollmentDate", when(col("EnrollmentDate") == "", None).otherwise(col("EnrollmentDate")))  #empty string -> null
    .withColumn("EnrollmentDate", to_date("EnrollmentDate"))  #convert to a proper date type
    # Drop incomplete rows BEFORE de-duplicating: dropDuplicates gives no
    # guarantee about which of the duplicate rows is kept, so removing
    # duplicates first could keep the incomplete copy and then lose the
    # student entirely.
    .dropna(subset=["Email", "EnrollmentDate"])  #drop rows with missing email or enrollment date
    .dropDuplicates(["StudentID"])
)

data_quality_report(students_clean, "Students (Cleaned)", "StudentID")  #report on the cleaned data

StatementMeta(, 137a2076-729e-4421-bcf2-7b9ce9c6164f, 7, Finished, Available, Finished)


📊 Data Quality Report: Students (Uncleaned)
--------------------------------------------------
- Schema:
root
 |-- StudentID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- EnrollmentDate: string (nullable = true)

- Record Count: 105
- Nulls per Column:
+--------------+---------+----------+-------------------+
|sum(StudentID)|sum(Name)|sum(Email)|sum(EnrollmentDate)|
+--------------+---------+----------+-------------------+
|             0|        0|         2|                  2|
+--------------+---------+----------+-------------------+

- Duplicates in 'StudentID': 5

📊 Data Quality Report: Students (Cleaned)
--------------------------------------------------
- Schema:
root
 |-- StudentID: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- EnrollmentDate: date (nullable = true)

- Record Count: 96
- Nulls per Column:
+--------------+---------+----------+-------------------+
|

In [6]:
#Cleaning courses data
data_quality_report(courses, "Courses (Uncleaned)", "CourseID")

courses_clean = (
    courses
    # Normalise the key before de-duplicating so padded IDs compare equal.
    .withColumn("CourseID", trim(col("CourseID")))
    # Trim before testing the first character; with leading whitespace the
    # "^[a-z]" check would never match and the name would stay lowercase.
    .withColumn("Instructor", trim(col("Instructor")))
    .withColumn("Instructor", when(col("Instructor").rlike("^[a-z]"), initcap(col("Instructor"))).otherwise(col("Instructor")))
    #capitalise instructor name if it starts with a lowercase letter, otherwise keep the trimmed value
    # Trim before comparing so padded variants of the known bad values are fixed too.
    .withColumn("Semester", trim(col("Semester")))
    .withColumn("Semester", when(col("Semester") == "Fall2024", "Fall 2024")  #add the missing space
                            .when(col("Semester") == "SUMMER 2025", "Summer 2025")  #normalise the casing
                            .otherwise(col("Semester")))
    # Remove incomplete rows first: dropDuplicates does not guarantee which
    # duplicate is kept, so it must only choose among complete rows.
    .dropna(subset=["CourseID", "CourseName", "Instructor"])  #drop rows where any of these are missing
    .dropDuplicates(["CourseID"])
)

data_quality_report(courses_clean, "Courses (Cleaned)", "CourseID")


StatementMeta(, 137a2076-729e-4421-bcf2-7b9ce9c6164f, 8, Finished, Available, Finished)


📊 Data Quality Report: Courses (Uncleaned)
--------------------------------------------------
- Schema:
root
 |-- CourseID: string (nullable = true)
 |-- CourseName: string (nullable = true)
 |-- Instructor: string (nullable = true)
 |-- Semester: string (nullable = true)

- Record Count: 10
- Nulls per Column:
+-------------+---------------+---------------+-------------+
|sum(CourseID)|sum(CourseName)|sum(Instructor)|sum(Semester)|
+-------------+---------------+---------------+-------------+
|            0|              0|              2|            0|
+-------------+---------------+---------------+-------------+

- Duplicates in 'CourseID': 0

📊 Data Quality Report: Courses (Cleaned)
--------------------------------------------------
- Schema:
root
 |-- CourseID: string (nullable = true)
 |-- CourseName: string (nullable = true)
 |-- Instructor: string (nullable = true)
 |-- Semester: string (nullable = true)

- Record Count: 8
- Nulls per Column:
+-------------+---------------+---

In [7]:
#Cleaning assignments data
data_quality_report(assignments, "Assignments", "AssignmentID")

assignments_clean = (
    assignments
    # Normalise the key before de-duplicating so padded IDs compare equal.
    .withColumn("AssignmentID", trim(col("AssignmentID")))
    .withColumn("CourseID", trim(col("CourseID")))
    .withColumn("Title", trim(col("Title")))
    .withColumn("DueDate", to_date(trim(col("DueDate"))))  #convert to a proper date type (trimmed, as for submissions)
    # Remove incomplete rows first: dropDuplicates does not guarantee which
    # duplicate is kept, so it must only choose among complete rows.
    .dropna(subset=["AssignmentID", "CourseID"])
    .dropDuplicates(["AssignmentID"])
)

data_quality_report(assignments_clean, "Assignments (Cleaned)", "AssignmentID")


StatementMeta(, 137a2076-729e-4421-bcf2-7b9ce9c6164f, 9, Finished, Available, Finished)


📊 Data Quality Report: Assignments
--------------------------------------------------
- Schema:
root
 |-- AssignmentID: string (nullable = true)
 |-- CourseID: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- DueDate: string (nullable = true)

- Record Count: 52
- Nulls per Column:
+-----------------+-------------+----------+------------+
|sum(AssignmentID)|sum(CourseID)|sum(Title)|sum(DueDate)|
+-----------------+-------------+----------+------------+
|                0|            0|         0|           0|
+-----------------+-------------+----------+------------+

- Duplicates in 'AssignmentID': 2

📊 Data Quality Report: Assignments (Cleaned)
--------------------------------------------------
- Schema:
root
 |-- AssignmentID: string (nullable = true)
 |-- CourseID: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- DueDate: date (nullable = true)

- Record Count: 50
- Nulls per Column:
+-----------------+-------------+----------+------------+
|sum(

In [8]:
#Cleaning Enrollments data
data_quality_report(enrollments, "Enrollments", "EnrollmentID")

enrollments_clean = (
    enrollments
    # The de-duplication key was never trimmed; normalise it so padded IDs
    # are recognised as the same enrollment.
    .withColumn("EnrollmentID", trim(col("EnrollmentID")))
    .withColumn("StudentID", trim(col("StudentID")))
    # NOTE(review): CourseID is upper-cased here only; the courses and
    # assignments cleaning steps just trim it. Confirm those tables already
    # hold upper-case IDs, otherwise the later joins on CourseID miss rows.
    .withColumn("CourseID", upper(trim(col("CourseID"))))
    # Remove incomplete rows first: dropDuplicates does not guarantee which
    # duplicate is kept, so it must only choose among complete rows.
    .dropna(subset=["StudentID", "CourseID"])
    .dropDuplicates(["EnrollmentID"])
)

data_quality_report(enrollments_clean, "Enrollments (Cleaned)", "EnrollmentID")


StatementMeta(, 137a2076-729e-4421-bcf2-7b9ce9c6164f, 10, Finished, Available, Finished)


📊 Data Quality Report: Enrollments
--------------------------------------------------
- Schema:
root
 |-- EnrollmentID: string (nullable = true)
 |-- StudentID: string (nullable = true)
 |-- CourseID: string (nullable = true)

- Record Count: 315
- Nulls per Column:
+-----------------+--------------+-------------+
|sum(EnrollmentID)|sum(StudentID)|sum(CourseID)|
+-----------------+--------------+-------------+
|                0|             0|            0|
+-----------------+--------------+-------------+

- Duplicates in 'EnrollmentID': 15

📊 Data Quality Report: Enrollments (Cleaned)
--------------------------------------------------
- Schema:
root
 |-- EnrollmentID: string (nullable = true)
 |-- StudentID: string (nullable = true)
 |-- CourseID: string (nullable = true)

- Record Count: 300
- Nulls per Column:
+-----------------+--------------+-------------+
|sum(EnrollmentID)|sum(StudentID)|sum(CourseID)|
+-----------------+--------------+-------------+
|                0|       

In [9]:
#Cleaning submissions data
data_quality_report(submissions, "Submissions", "SubmissionID")

submissions_clean = (
    submissions
    # The de-duplication key was never trimmed; normalise it so padded IDs
    # are recognised as the same submission.
    .withColumn("SubmissionID", trim(col("SubmissionID")))
    .withColumn("AssignmentID", trim(col("AssignmentID")))
    .withColumn("StudentID", trim(col("StudentID")))
    .withColumn("SubmissionDate", to_date(trim(col("SubmissionDate"))))  #convert to a proper date type
    .withColumn("Grade", col("Grade").cast("double"))  #convert data type of grade to double
    # Remove incomplete rows first: dropDuplicates does not guarantee which
    # duplicate is kept, so it must only choose among complete rows.
    .dropna(subset=["AssignmentID", "StudentID"])
    .dropDuplicates(["SubmissionID"])
)

data_quality_report(submissions_clean, "Submissions (Cleaned)", "SubmissionID")


StatementMeta(, 137a2076-729e-4421-bcf2-7b9ce9c6164f, 11, Finished, Available, Finished)


📊 Data Quality Report: Submissions
--------------------------------------------------
- Schema:
root
 |-- SubmissionID: string (nullable = true)
 |-- AssignmentID: string (nullable = true)
 |-- StudentID: string (nullable = true)
 |-- SubmissionDate: string (nullable = true)
 |-- Grade: string (nullable = true)

- Record Count: 525
- Nulls per Column:
+-----------------+-----------------+--------------+-------------------+----------+
|sum(SubmissionID)|sum(AssignmentID)|sum(StudentID)|sum(SubmissionDate)|sum(Grade)|
+-----------------+-----------------+--------------+-------------------+----------+
|                0|                0|             0|                  0|       115|
+-----------------+-----------------+--------------+-------------------+----------+

- Duplicates in 'SubmissionID': 25

📊 Data Quality Report: Submissions (Cleaned)
--------------------------------------------------
- Schema:
root
 |-- SubmissionID: string (nullable = true)
 |-- AssignmentID: string (nullab

## Joining Tables for further analysis

In [12]:
# Step 1 - who is enrolled in what.
# Enrollments link students to courses (one student can take many courses).
# Inner joins keep only enrollments whose student and course both exist in
# the cleaned tables.
student_course_df = (
    enrollments_clean
    .join(students_clean, on="StudentID", how="inner")
    .join(
        courses_clean,
        on=enrollments_clean.CourseID == courses_clean.CourseID,
        how="inner",
    )
    .select(
        enrollments_clean.StudentID,
        students_clean.Name,
        students_clean.Email,
        enrollments_clean.CourseID,
        courses_clean.CourseName,
    )
)

# Step 2 - which assignments belong to each course.
# The join against courses only filters: every selected column comes from
# the assignments table.
course_assignment_df = (
    assignments_clean
    .join(courses_clean, on="CourseID", how="inner")
    .select(
        assignments_clean.CourseID,
        assignments_clean.AssignmentID,
        assignments_clean.Title,
        assignments_clean.DueDate,
    )
)

# Step 3 - alias both sides, since each carries a CourseID column and the
# next join must reference them unambiguously.
sc = student_course_df.alias("sc")
ca = course_assignment_df.alias("ca")

# Step 4 - expected submissions: pair every enrolled student with every
# assignment of the course they are enrolled in.
same_course = sc["CourseID"] == ca["CourseID"]
student_assignment_df = (
    sc
    .join(ca, on=same_course, how="inner")
    .select(
        sc["StudentID"],
        sc["Name"],
        sc["Email"],
        sc["CourseID"],
        sc["CourseName"],
        ca["AssignmentID"],
        ca["Title"],
        ca["DueDate"],
    )
)

# Step 5 - attach actual submissions.
# Left join: every expected (student, assignment) pair is kept; when nothing
# was submitted, SubmissionID, SubmissionDate and Grade are null.
# This is the final silver frame used to find missing submissions, track
# submission dates and work out late submissions.
submission_columns = ["AssignmentID", "StudentID", "SubmissionID", "SubmissionDate", "Grade"]
silver_df = student_assignment_df.join(
    submissions_clean.select(*submission_columns),
    on=["StudentID", "AssignmentID"],
    how="left",
)


StatementMeta(, 137a2076-729e-4421-bcf2-7b9ce9c6164f, 14, Finished, Available, Finished)

## Writing dataframe into Silver Table

In [16]:
# Save the joined frame as a Delta table, replacing any existing contents.
(
    silver_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("SilverLakehouse.silver_table")
)

StatementMeta(, 137a2076-729e-4421-bcf2-7b9ce9c6164f, 18, Finished, Available, Finished)