In [8]:
spark.stop()

In [9]:
from pyspark.sql import SparkSession
import os

# Initialize Spark session
spark = SparkSession.builder.appName("OULADAnalysis").getOrCreate()

# Paths to the OULAD data CSV files
student_assessment_path = "studentAssessment.csv"
student_registration_path = "studentRegistration.csv"
student_info_path = "studentInfo.csv"
vle_path = "vle.csv"
courses_path = "courses.csv"
assessments_path = "assessments.csv"

# Function to check file existence and get absolute path
def load_csv(file_path):
    # abs_path = os.path.abspath(file_path)
    abs_path = f"OULAD dataset/{file_path}"
    # print the current working directory
    print("Current working directory:", os.getcwd(), " abs_path:", abs_path)

    if os.path.exists(abs_path):
        return spark.read.csv(f"{abs_path}", header=True, inferSchema=True)
    else:
        print(f"File {file_path} not found.")
        return None

# Load the datasets
student_assessment_df = load_csv(student_assessment_path)
student_registration_df = load_csv(student_registration_path)
student_info_df = load_csv(student_info_path)
vle_df = load_csv(vle_path)
courses_df = load_csv(courses_path)
# assessments_df = load_csv(assessments_path)

# Show schema of one of the dataframes to inspect structure
student_assessment_df.printSchema()
student_registration_df.printSchema()
student_info_df.printSchema()
vle_df.printSchema()
courses_df.printSchema()
# assessments_df.printSchema()


Current working directory: /home/studen/mickael/python_data_engineer/hadoop/spark  abs_path: OULAD dataset/studentAssessment.csv


AnalysisException: [PATH_NOT_FOUND] Path does not exist: hdfs://localhost:9000/user/studen/OULAD dataset/studentAssessment.csv.

Part 2: Translating SQL Queries to PySpark

Assume the following SQL queries are from your "SQL - Analytics" worksheet, and we'll translate each one into PySpark:

### **1. Query: Calculate the Average Score for Each Student**

Explanation: We want to calculate the average score for each student in the student_assessment table.

`SELECT id_student, AVG(score) AS average_score`

`FROM studentAssessment`

`GROUP BY id_student;`


In [8]:
student_assessment_df.createOrReplaceTempView("student_assessment")
print('5 first lines of student_assessment_df:')
student_assessment_df.show(5)

print('\n 5 lines of student 11391:')
student_assessment_df.filter(student_assessment_df["id_student"] == 11391).show(5)

5 first lines of student_assessment_df:
+-------------+----------+--------------+---------+-----+
|id_assessment|id_student|date_submitted|is_banked|score|
+-------------+----------+--------------+---------+-----+
|         1752|     11391|            18|        0|   78|
|         1752|     28400|            22|        0|   70|
|         1752|     31604|            17|        0|   72|
|         1752|     32885|            26|        0|   69|
|         1752|     38053|            19|        0|   79|
+-------------+----------+--------------+---------+-----+
only showing top 5 rows


 5 lines of student 11391:
+-------------+----------+--------------+---------+-----+
|id_assessment|id_student|date_submitted|is_banked|score|
+-------------+----------+--------------+---------+-----+
|         1752|     11391|            18|        0|   78|
|         1753|     11391|            53|        0|   85|
|         1754|     11391|           115|        0|   80|
|         1755|     11391|           

In [10]:
# Register the DataFrame as a temporary SQL view

# Write the SQL query
sql_query = """
    SELECT id_student, AVG(score) AS average_score
    FROM student_assessment
    GROUP BY id_student
"""

# Execute the SQL query
avg_score_sql_df = spark.sql(sql_query)

# Show the result
avg_score_sql_df.show(5)



+----------+-----------------+
|id_student|    average_score|
+----------+-----------------+
|    180753|             53.4|
|    324084|             57.6|
|   2057803|             66.0|
|    486656|87.72727272727273|
|    502604|72.36363636363636|
+----------+-----------------+
only showing top 5 rows



                                                                                

In [4]:
from pyspark.sql import functions as F

# Group by id_student and calculate the average score
avg_score_df = student_assessment_df.groupBy("id_student").agg(F.avg("score").alias("average_score"))
avg_score_df.show(5)




+----------+-----------------+
|id_student|    average_score|
+----------+-----------------+
|    180753|             53.4|
|    324084|             57.6|
|   2057803|             66.0|
|    486656|87.72727272727273|
|    502604|72.36363636363636|
+----------+-----------------+
only showing top 5 rows



                                                                                

In [None]:
avg_score_df = student_assessment_df.groupBy("id_student").agg(F.avg("score").alias("average_score"))
avg_score_df.show(5)

### **2. Query: Find the Number of Students for Each Course**

Explanation: This query will return the number of students enrolled in each course. 

We’ll need to join the student_registration table with the courses table to get the course information.

**`SELECT code_module, code_presentation, COUNT(id_student) AS num_students`**

**`FROM studentRegistration`**

**`GROUP BY code_module, code_presentation;`**

In [27]:
student_registration_df.createOrReplaceTempView("student_registration")
student_registration_df.show(5)

+-----------+-----------------+----------+-----------------+-------------------+
|code_module|code_presentation|id_student|date_registration|date_unregistration|
+-----------+-----------------+----------+-----------------+-------------------+
|        AAA|            2013J|     11391|             -159|               NULL|
|        AAA|            2013J|     28400|              -53|               NULL|
|        AAA|            2013J|     30268|              -92|                 12|
|        AAA|            2013J|     31604|              -52|               NULL|
|        AAA|            2013J|     32885|             -176|               NULL|
+-----------+-----------------+----------+-----------------+-------------------+
only showing top 5 rows



In [28]:
sql_query = """
select code_module, code_presentation, count(id_student)
from student_registration
group by code_module, code_presentation"""

num_students_sql_df = spark.sql(sql_query)
num_students_sql_df.show(5)

+-----------+-----------------+-----------------+
|code_module|code_presentation|count(id_student)|
+-----------+-----------------+-----------------+
|        FFF|            2013J|             2283|
|        BBB|            2013J|             2237|
|        BBB|            2014J|             2292|
|        EEE|            2014J|             1188|
|        BBB|            2014B|             1613|
+-----------+-----------------+-----------------+
only showing top 5 rows



In [29]:
student_registration_df.groupBy("code_module", "code_presentation") \
    .agg(F.count("id_student").alias("num_students")) \
    .show(5)


+-----------+-----------------+------------+
|code_module|code_presentation|num_students|
+-----------+-----------------+------------+
|        FFF|            2013J|        2283|
|        BBB|            2013J|        2237|
|        BBB|            2014J|        2292|
|        EEE|            2014J|        1188|
|        BBB|            2014B|        1613|
+-----------+-----------------+------------+
only showing top 5 rows



### **3. Query: Get Students with Scores Above Average**

**Explanation:** We want to find students whose score is above the average score for their respective courses. 

This will require calculating the average score for each course and then filtering the students accordingly.

In [16]:
student_assessment_df.createOrReplaceTempView("student_assessment")
student_registration_df.createOrReplaceTempView("student_registration")
student_assessment_df.show(5)
student_registration_df.show(5)

+-------------+----------+--------------+---------+-----+
|id_assessment|id_student|date_submitted|is_banked|score|
+-------------+----------+--------------+---------+-----+
|         1752|     11391|            18|        0|   78|
|         1752|     28400|            22|        0|   70|
|         1752|     31604|            17|        0|   72|
|         1752|     32885|            26|        0|   69|
|         1752|     38053|            19|        0|   79|
+-------------+----------+--------------+---------+-----+
only showing top 5 rows

+-----------+-----------------+----------+-----------------+-------------------+
|code_module|code_presentation|id_student|date_registration|date_unregistration|
+-----------+-----------------+----------+-----------------+-------------------+
|        AAA|            2013J|     11391|             -159|               NULL|
|        AAA|            2013J|     28400|              -53|               NULL|
|        AAA|            2013J|     30268|      

In [19]:
student_assessment_df.createOrReplaceTempView("student_assessment")
student_registration_df.createOrReplaceTempView("student_registration")

sql_query = """
    with avg_scores as (
        select id_assessment, avg(score) as avg_score
        from student_assessment
        group by id_assessment
    ) 

    select sa.id_student, sa.id_assessment, sa.score, as.avg_score
    from student_assessment sa
    join avg_scores as
    on sa.id_assessment = as.id_assessment 
    where sa.score > as.avg_score;
"""

above_avg_sql_df = spark.sql(sql_query)
above_avg_sql_df.show(5)


+----------+-------------+-----+----------------+
|id_student|id_assessment|score|       avg_score|
+----------+-------------+-----+----------------+
|     11391|         1752|   78|70.3072625698324|
|     31604|         1752|   72|70.3072625698324|
|     38053|         1752|   79|70.3072625698324|
|     45642|         1752|   72|70.3072625698324|
|     52130|         1752|   72|70.3072625698324|
+----------+-------------+-----+----------------+
only showing top 5 rows



#### Part 1: Student average by Course

In [31]:
student_assessment_df.createOrReplaceTempView("student_assessment")
student_registration_df.createOrReplaceTempView("student_registration")

sqlq = '''
        SELECT s.code_module, s.code_presentation, s.id_student,  AVG(a.score) AS stud_avg_score
        FROM student_assessment a
        JOIN student_registration s ON a.id_student = s.id_student
        GROUP BY s.code_module, s.code_presentation, s.id_student
        order by s.code_module, s.code_presentation;
        '''

avg_scores_df = spark.sql(sqlq)
avg_scores_df.show(10)



+-----------+-----------------+----------+--------------+
|code_module|code_presentation|id_student|stud_avg_score|
+-----------+-----------------+----------+--------------+
|        AAA|            2013J|   1758449|          61.0|
|        AAA|            2013J|   2645733|          82.6|
|        AAA|            2013J|    383254|          75.6|
|        AAA|            2013J|    319047|          78.4|
|        AAA|            2013J|    442442|          40.5|
|        AAA|            2013J|   2574583|          70.8|
|        AAA|            2013J|     32885|          54.4|
|        AAA|            2013J|   1976139|         67.25|
|        AAA|            2013J|   2678643|          71.4|
|        AAA|            2013J|   2641155|          79.2|
+-----------+-----------------+----------+--------------+
only showing top 10 rows



                                                                                

#### Part 2: Average Course

In [36]:
student_assessment_df.createOrReplaceTempView("student_assessment")
student_registration_df.createOrReplaceTempView("student_registration")

sqlq = '''
with stud_avg_scores_by_courses as (
    SELECT s.code_module, s.code_presentation, s.id_student,  AVG(a.score) AS stud_avg_score
        FROM student_assessment a
        JOIN student_registration s ON a.id_student = s.id_student
        GROUP BY s.code_module, s.code_presentation, s.id_student
        order by s.code_module, s.code_presentation
)
select sabc.code_module, sabc.code_presentation, avg(sabc.stud_avg_score) as course_avg_score
from stud_avg_scores_by_courses sabc
group by sabc.code_module, sabc.code_presentation;
'''

avg_scores_df = spark.sql(sqlq)
avg_scores_df.show(20)

+-----------+-----------------+-----------------+
|code_module|code_presentation| course_avg_score|
+-----------+-----------------+-----------------+
|        AAA|            2013J|68.70252976190477|
|        AAA|            2014J|67.75304570807913|
|        BBB|            2013B|77.01329487886625|
|        BBB|            2013J|76.85449436782734|
|        BBB|            2014B| 76.9498548327345|
|        BBB|            2014J|64.85704275743726|
|        CCC|            2014B|68.43857689587726|
|        CCC|            2014J|73.47009178050807|
|        DDD|            2013B|65.79623979608903|
|        DDD|            2013J|66.98399242550796|
|        DDD|            2014B|66.15579808673954|
|        DDD|            2014J|69.74450526171822|
|        EEE|            2013J|75.64172984208012|
|        EEE|            2014B|72.36605001479646|
|        EEE|            2014J|79.53356797860346|
|        FFF|            2013B|75.82871827428937|
|        FFF|            2013J|74.55298800862386|


#### Part 3: Average student > Average course

In [41]:
student_assessment_df.createOrReplaceTempView("student_assessment")
student_registration_df.createOrReplaceTempView("student_registration")

sqlq = ''' 
with stud_avg_scores_by_courses as (
    SELECT s.code_module, s.code_presentation, s.id_student,  AVG(a.score) AS stud_avg_score
        FROM student_assessment a
        JOIN student_registration s ON a.id_student = s.id_student
        GROUP BY s.code_module, s.code_presentation, s.id_student
        order by s.code_module, s.code_presentation
), course_avg_scores as (
    select sabc.code_module, sabc.code_presentation, avg(sabc.stud_avg_score) as course_avg_score
    from stud_avg_scores_by_courses sabc
    group by sabc.code_module, sabc.code_presentation
)
select cas.code_module, cas.code_presentation, sabc.id_student, sabc.stud_avg_score,  cas.course_avg_score
from stud_avg_scores_by_courses sabc
join course_avg_scores cas
on sabc.code_module = cas.code_module and sabc.code_presentation = cas.code_presentation
where sabc.stud_avg_score > cas.course_avg_score;
'''

avg_scores_df = spark.sql(sqlq)
avg_scores_df.show(20)

+-----------+-----------------+----------+-----------------+-----------------+
|code_module|code_presentation|id_student|   stud_avg_score| course_avg_score|
+-----------+-----------------+----------+-----------------+-----------------+
|        AAA|            2013J|     62155|             76.0|68.70252976190477|
|        AAA|            2013J|    319047|             78.4|68.70252976190477|
|        AAA|            2014J|    375260|             71.4|67.75304570807913|
|        BBB|            2013B|   2380539|             82.1|77.01329487886625|
|        BBB|            2013B|    298476|81.63636363636364|77.01329487886625|
|        BBB|            2013B|    340831|85.83333333333333|77.01329487886625|
|        BBB|            2013B|    558689|             80.6|77.01329487886625|
|        BBB|            2013J|    546941|81.63636363636364|76.85449436782734|
|        BBB|            2013J|    568609|             84.6|76.85449436782734|
|        BBB|            2013J|   2115711|          

In [43]:
from pyspark.sql import functions as F

stud_avg_scores_by_courses = student_assessment_df\
    .join(student_registration_df,"id_student")\
    .groupBy("code_module", "code_presentation", "id_student")\
    .agg(F.avg("score").alias("stud_avg_score"))

course_avg_scores = stud_avg_scores_by_courses.groupBy("code_module", "code_presentation")\
    .agg(F.avg("stud_avg_score").alias("course_avg_score"))

higher_student = stud_avg_scores_by_courses.join(course_avg_scores, ["code_module", "code_presentation"])\
    .filter(F.col("stud_avg_score") > F.col("course_avg_score"))

higher_student.show()
higher_student.explain(True)

                                                                                

+-----------+-----------------+----------+-----------------+-----------------+
|code_module|code_presentation|id_student|   stud_avg_score| course_avg_score|
+-----------+-----------------+----------+-----------------+-----------------+
|        AAA|            2013J|     62155|             76.0|68.70252976190477|
|        AAA|            2013J|    319047|             78.4|68.70252976190477|
|        AAA|            2014J|    375260|             71.4|67.75304570807913|
|        BBB|            2013B|   2380539|             82.1|77.01329487886625|
|        BBB|            2013B|    298476|81.63636363636364|77.01329487886625|
|        BBB|            2013B|    340831|85.83333333333333|77.01329487886625|
|        BBB|            2013B|    558689|             80.6|77.01329487886625|
|        BBB|            2013J|    546941|81.63636363636364|76.85449436782734|
|        BBB|            2013J|    568609|             84.6|76.85449436782734|
|        BBB|            2013J|   2115711|          

### **4. Query: Find the Number of Attempts for Each Student**

Explanation: We want to find how many attempts each student has made. This is stored in the num_of_prev_attempts column in the student_info table.

In [44]:
# Register the DataFrames as temporary SQL views
student_info_df.createOrReplaceTempView("student_info")

# SQL query to calculate average score per code_module and filter students with scores above the average
query = """
    SELECT id_student, num_of_prev_attempts
    FROM student_info
    WHERE num_of_prev_attempts > 0;
"""

# Execute the SQL query
above_avg_df = spark.sql(query)

# Show the results
above_avg_df.show()


+----------+--------------------+
|id_student|num_of_prev_attempts|
+----------+--------------------+
|     65002|                   1|
|     94961|                   1|
|    121349|                   1|
|    129955|                   1|
|    135335|                   1|
|    135400|                   1|
|    141377|                   1|
|    147756|                   1|
|    148993|                   1|
|    155984|                   1|
|    159954|                   1|
|    188278|                   1|
|    235507|                   1|
|    260355|                   1|
|    268733|                   1|
|    277880|                   1|
|    303985|                   1|
|    331358|                   1|
|    335764|                   1|
|    342007|                   1|
+----------+--------------------+
only showing top 20 rows



In [37]:
# Filter students who have made previous attempts
attempts_df = student_info_df.filter(student_info_df.num_of_prev_attempts > 0)
attempts_df.show()


+-----------+-----------------+----------+------+--------------------+--------------------+--------+--------+--------------------+---------------+----------+------------+
|code_module|code_presentation|id_student|gender|              region|   highest_education|imd_band|age_band|num_of_prev_attempts|studied_credits|disability|final_result|
+-----------+-----------------+----------+------+--------------------+--------------------+--------+--------+--------------------+---------------+----------+------------+
|        AAA|            2014J|     65002|     F| East Anglian Region|A Level or Equiva...|  70-80%|    0-35|                   1|             60|         N|        Fail|
|        AAA|            2014J|     94961|     M|        South Region|  Lower Than A Level|  70-80%|   35-55|                   1|             60|         N|        Pass|
|        AAA|            2014J|    121349|     F|   South West Region|A Level or Equiva...|  40-50%|   35-55|                   1|             60

5. Query: Calculate the Average Score for Students by Course and Presentation

Explanation: We want to calculate the average score for students in each course and presentation.

SQL Query:

SELECT code_module, code_presentation, AVG(score) AS avg_score
FROM studentAssessment sa
JOIN studentRegistration sr ON sa.id_student = sr.id_student
GROUP BY code_module, code_presentation;


In [53]:
# SQL query to calculate average score per course and presentation
query = """
    SELECT sr.code_module, sr.code_presentation, AVG(sa.score) AS avg_score
    FROM studentAssessment sa
    JOIN studentRegistration sr ON sa.id_student = sr.id_student
    GROUP BY sr.code_module, sr.code_presentation
"""

# Execute the SQL query
avg_score_by_course_presentation_df = spark.sql(query)

# Show the results
avg_score_by_course_presentation_df.show()


+-----------+-----------------+-----------------+
|code_module|code_presentation|        avg_score|
+-----------+-----------------+-----------------+
|        FFF|            2013J|76.69445366987712|
|        BBB|            2013J|78.68632312537693|
|        BBB|            2014J|66.94286420974524|
|        EEE|            2014J|79.57708659655404|
|        BBB|            2014B|78.60993044822257|
|        GGG|            2014B|79.49848392965434|
|        DDD|            2013J|69.90850417615793|
|        AAA|            2014J|68.75673981191223|
|        DDD|            2014J| 71.4885775862069|
|        GGG|            2013J|79.93120233652442|
|        FFF|            2014J|78.19184161119645|
|        AAA|            2013J|69.50537634408602|
|        DDD|            2013B|  69.502701760502|
|        CCC|            2014J|75.96365316494686|
|        BBB|            2013B|78.84378990380523|
|        EEE|            2013J|75.08482728465238|
|        FFF|            2014B|76.15621524510223|


In [45]:
# Join the assessment data with registration to get course and presentation details
avg_score_by_course_presentation_df = student_assessment_df.join(student_registration_df, "id_student") \
                                                           .groupBy("code_module", "code_presentation") \
                                                           .agg(F.avg("score").alias("avg_score"))
avg_score_by_course_presentation_df.show()


+-----------+-----------------+-----------------+
|code_module|code_presentation|        avg_score|
+-----------+-----------------+-----------------+
|        FFF|            2013J|76.69445366987712|
|        BBB|            2013J|78.68632312537693|
|        BBB|            2014J|66.94286420974524|
|        EEE|            2014J|79.57708659655404|
|        BBB|            2014B|78.60993044822257|
|        GGG|            2014B|79.49848392965434|
|        DDD|            2013J|69.90850417615793|
|        AAA|            2014J|68.75673981191223|
|        DDD|            2014J| 71.4885775862069|
|        GGG|            2013J|79.93120233652442|
|        FFF|            2014J|78.19184161119645|
|        AAA|            2013J|69.50537634408602|
|        DDD|            2013B|  69.502701760502|
|        CCC|            2014J|75.96365316494686|
|        BBB|            2013B|78.84378990380523|
|        EEE|            2013J|75.08482728465238|
|        FFF|            2014B|76.15621524510223|


6. Query: Identify the Most Popular VLE Activity by Course

Explanation: We want to determine the most popular VLE activity by course based on the number of views or activities recorded in the vle table.

SQL Query:

SELECT code_module, activity_type, COUNT(*) AS activity_count
FROM vle
GROUP BY code_module, activity_type
ORDER BY activity_count DESC
LIMIT 1;


In [56]:
# Register the DataFrame as a temporary SQL view
vle_df.createOrReplaceTempView("vle")

# SQL query to group by course and activity type, count activities, and order by activity count
query = """
    SELECT code_module, activity_type, COUNT(*) AS activity_count
    FROM vle
    GROUP BY code_module, activity_type
    ORDER BY activity_count DESC
    LIMIT 1
"""

# Execute the SQL query
most_popular_vle_df = spark.sql(query)

# Show the results
most_popular_vle_df.show()


+-----------+-------------+--------------+
|code_module|activity_type|activity_count|
+-----------+-------------+--------------+
|        BBB|     resource|           807|
+-----------+-------------+--------------+



In [54]:
# Group by course and activity type, and count the activities
most_popular_vle_df = vle_df.groupBy("code_module", "activity_type") \
                            .agg(F.count("*").alias("activity_count")) \
                            .orderBy(F.col("activity_count").desc()) \
                            .limit(1)
most_popular_vle_df.show()


+-----------+-------------+--------------+
|code_module|activity_type|activity_count|
+-----------+-------------+--------------+
|        BBB|     resource|           807|
+-----------+-------------+--------------+



7. Query: Students Who Have Failed All Previous Attempts

Explanation: We want to find students who have failed every attempt based on the final_result column in the student_info table, where "Fail" is the final result.

SQL Query:

SELECT id_student
FROM studentInfo
WHERE final_result = 'Fail';


In [57]:
# Register the DataFrame as a temporary SQL view
student_info_df.createOrReplaceTempView("student_info")

# SQL query to filter students who failed
query = """
    SELECT *
    FROM student_info
    WHERE final_result = 'Fail'
"""

# Execute the SQL query
failed_students_df = spark.sql(query)

# Show the results
failed_students_df.show()


+-----------+-----------------+----------+------+--------------------+--------------------+--------+--------+--------------------+---------------+----------+------------+
|code_module|code_presentation|id_student|gender|              region|   highest_education|imd_band|age_band|num_of_prev_attempts|studied_credits|disability|final_result|
+-----------+-----------------+----------+------+--------------------+--------------------+--------+--------+--------------------+---------------+----------+------------+
|        AAA|            2013J|     74372|     M| East Anglian Region|A Level or Equiva...|   10-20|   35-55|                   0|            150|         N|        Fail|
|        AAA|            2013J|    146188|     F|West Midlands Region|A Level or Equiva...|  20-30%|    0-35|                   0|             60|         Y|        Fail|
|        AAA|            2013J|    147756|     M|        North Region|  Lower Than A Level|  60-70%|    0-35|                   0|            120

In [55]:
# Filter the students who failed
failed_students_df = student_info_df.filter(student_info_df.final_result == "Fail")
failed_students_df.show()


+-----------+-----------------+----------+------+--------------------+--------------------+--------+--------+--------------------+---------------+----------+------------+
|code_module|code_presentation|id_student|gender|              region|   highest_education|imd_band|age_band|num_of_prev_attempts|studied_credits|disability|final_result|
+-----------+-----------------+----------+------+--------------------+--------------------+--------+--------+--------------------+---------------+----------+------------+
|        AAA|            2013J|     74372|     M| East Anglian Region|A Level or Equiva...|   10-20|   35-55|                   0|            150|         N|        Fail|
|        AAA|            2013J|    146188|     F|West Midlands Region|A Level or Equiva...|  20-30%|    0-35|                   0|             60|         Y|        Fail|
|        AAA|            2013J|    147756|     M|        North Region|  Lower Than A Level|  60-70%|    0-35|                   0|            120