# Data Analytics with PySpark & Flask

## Load Data

Load the data from the PostgreSQL database into pandas DataFrames.

In [1]:
import os

import pandas as pd
import psycopg2

# Connect to the PostgreSQL db. Every setting can be overridden through the
# standard libpq environment variables so real credentials never have to be
# committed with the notebook; the fallbacks are the original local values.
conn = psycopg2.connect(
    dbname=os.environ.get("PGDATABASE", "54_proj"),
    user=os.environ.get("PGUSER", "postgres"),
    password=os.environ.get("PGPASSWORD", "123"),
    host=os.environ.get("PGHOST", "localhost"),
    port=os.environ.get("PGPORT", "5432"),
)

q_int = "SELECT * FROM interactions"
q_stu = "SELECT * FROM students"
q_lec = "SELECT * FROM lectures"
q_q = "SELECT * FROM questions"

# Load each table into a pandas DataFrame. The connection is only needed for
# these four reads, so release it afterwards even if one of the queries fails.
try:
    df_int = pd.read_sql(q_int, conn)
    df_stu = pd.read_sql(q_stu, conn)
    df_lec = pd.read_sql(q_lec, conn)
    df_q = pd.read_sql(q_q, conn)
finally:
    conn.close()

dfs = [df_int, df_stu, df_lec, df_q]

  df_int = pd.read_sql(q_int, conn)
  df_stu = pd.read_sql(q_stu, conn)
  df_lec = pd.read_sql(q_lec, conn)
  df_q = pd.read_sql(q_q, conn)


In [2]:
# (rows, columns) of the interactions frame loaded from the database
df_int.shape

(23000713, 7)

Create PySpark DataFrames from the pandas DataFrames

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions
from pyspark import SparkContext, SparkConf

# Build (or reuse) the Spark session with explicit driver/executor memory.
spark = (
    SparkSession.builder
    .appName("Pandas to Spark")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)
sc = spark.sparkContext

# Echo the driver memory recorded in the context's configuration.
driver_memory = sc._conf.get('spark.driver.memory')
print(driver_memory)

# Hand each pandas frame over to Spark.
questions = spark.createDataFrame(df_q)
students = spark.createDataFrame(df_stu)
lectures = spark.createDataFrame(df_lec)
interactions = spark.createDataFrame(df_int)

24/04/20 12:27:09 WARN Utils: Your hostname, MacBook-Pro-295.local resolves to a loopback address: 127.0.0.1; using 10.206.88.24 instead (on interface en0)
24/04/20 12:27:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/20 12:27:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/04/20 12:27:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


8g


Select the columns that are relevant to our analysis

In [3]:
# Keep only the interaction columns used by the analysis below.
interactions = interactions.select(
    ["user_id", "question_id", "user_answer", "elapsed_time"]
)

In [4]:
# Keep only the question columns used by the analysis below.
questions = questions.select(
    ["question_id", "correct_answer", "tags"]
)

In [5]:
# Keep only the lecture columns used by the analysis below.
lectures = lectures.select(
    ["lecture_id", "tags", "video_length"]
)

In [6]:
from pyspark.sql.functions import col, split, explode, array_contains

# Turn each question's ';'-separated tag string into one row per tag.
exploded_questions = questions.withColumn(
    "tags_array",
    explode(split(col("tags"), ";")),
)

# Left-join lectures to those rows on the tag, so a lecture whose tag matches
# no question ends up with a NULL question_id.
tag_matches = lectures["tags"] == exploded_questions["tags_array"]
joined_df = lectures.join(exploded_questions, tag_matches, "left")

# Lectures that do not have any associated question.
lectures_without_questions = joined_df.where(col("question_id").isNull())

# One row per such lecture.
distinct_lectures_without_questions = (
    lectures_without_questions
    .select("lecture_id", "video_length")
    .distinct()
)

# Drop those lectures: the anti-join keeps only lecture_ids that are absent
# from the "no questions" set.
lectures = lectures.join(distinct_lectures_without_questions, "lecture_id", "left_anti")

In [9]:
# free up memory after loading into spark
import gc

# `del df` inside `for df in dfs` only unbinds the loop variable; the frames
# would stay reachable through their own module-level names. Remove those names
# (and the list holding them) so the collector can reclaim the frames once
# nothing else references them. pop() with a default keeps the cell re-runnable.
for _name in ("df_int", "df_stu", "df_lec", "df_q", "dfs"):
    globals().pop(_name, None)
del _name
gc.collect()

279

## Analyze Student Performance

In [10]:
from pyspark.sql import Window
from pyspark.sql.functions import avg, count, broadcast, when

# 'questions' is much smaller than 'interactions', so ship it to every
# executor as a broadcast join side.
questions_df = broadcast(questions)

# All window aggregates below are computed per question.
window_spec = Window.partitionBy("question_id")

# 1 when the submitted answer matches the key, 0 otherwise.
answered_correctly = when(col("user_answer") == col("correct_answer"), 1).otherwise(0)

correctness_df = (
    interactions.join(questions_df, "question_id")
    # mean elapsed time per question, milliseconds -> seconds
    .withColumn(
        "average_time_taken_on_this_q",
        avg("elapsed_time").over(window_spec) / 1000,
    )
    # share of correct answers per question, as a percentage
    .withColumn(
        "average_correctness_of_this_q",
        avg(answered_correctly).over(window_spec) * 100,
    )
)

# Cached because it is reused by the analyses that follow.
correctness_df.cache()

24/04/20 12:43:51 WARN CacheManager: Asked to cache already cached data.


DataFrame[question_id: string, user_id: bigint, user_answer: string, elapsed_time: bigint, correct_answer: string, tags: string, average_time_taken_on_this_q: double, average_correctness_of_this_q: double]

In [8]:
# Generate summary statistics for student: student id
def analyze_student_performance(student_id):
    """Summarise one student's answers against the whole population.

    Reads the module-level ``correctness_df``.

    Args:
        student_id: value compared against the ``user_id`` column.

    Returns:
        dict with the keys
            ``total_questions_solved``: number of rows recorded for the student;
            ``student_correctness``: percentage of those rows where
                ``user_answer`` equals ``correct_answer`` (``0.0`` when the
                student has no rows);
            ``correctness_comparison``: ``student_correctness`` minus the
                percentage computed over every row of ``correctness_df``;
            ``wrong_answers_df``: Spark DataFrame of the student's rows whose
                answer differs from the correct one, with ``elapsed_time``
                divided by 1000.
    """
    # 1 for a correct answer, 0 otherwise (a NULL comparison also counts as 0).
    is_correct = when(col("user_answer") == col("correct_answer"), 1).otherwise(0)

    # Filter for specific student interactions
    student_specific = correctness_df.filter(col("user_id") == student_id)

    # calculate student's correctness and total questions completed
    correctness_metrics = student_specific.agg(
        avg(is_correct).alias("student_correctness"),
        count("*").alias("total_count")
    ).collect()[0]

    total_questions_solved = correctness_metrics['total_count']
    # avg() over zero rows comes back as None; multiplying that by 100 used to
    # raise TypeError for a student without any recorded interaction.
    student_fraction = correctness_metrics['student_correctness']
    student_correctness = student_fraction * 100 if student_fraction is not None else 0.0

    # Overall correctness in a single aggregation instead of two separate
    # full count() passes over correctness_df.
    overall_fraction = correctness_df.agg(
        avg(is_correct).alias("overall_correctness")
    ).collect()[0]["overall_correctness"]
    overall_correctness = overall_fraction if overall_fraction is not None else 0

    # calculate how this student's performance compares to the average
    correctness_comparison = (student_correctness - overall_correctness * 100)

    # extract wrong answers with additional details & rename columns
    wrong_answers_df = student_specific.filter(col("user_answer") != col("correct_answer")).select(
        col("question_id"),
        col("elapsed_time"),
        col("average_time_taken_on_this_q"),
        col("average_correctness_of_this_q"),
        col("user_answer").alias("student_answer"),
        col("correct_answer").alias("correct_answer")
    ).withColumn(
        # same /1000 scaling as the per-question average time column
        "elapsed_time", col('elapsed_time')/1000
    )

    return {
        "total_questions_solved": total_questions_solved,
        "student_correctness": student_correctness,
        "correctness_comparison": correctness_comparison,
        "wrong_answers_df": wrong_answers_df
    }


In [12]:
# Smoke-test the function on a single student and print each metric.
student_id = 440454  # Replace this with an actual student ID
results = analyze_student_performance(student_id)

for label, key in (
    ("Total Questions Solved:", "total_questions_solved"),
    ("Student Correctness:", "student_correctness"),
    ("Correctness Comparison:", "correctness_comparison"),
):
    print(label, results[key])

results["wrong_answers_df"].show()

Exception in thread "serve-DataFrame" java.net.SocketTimeoutException: Accept timed out
	at java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:65)
                                                                                

Total Questions Solved: 7
Student Correctness: 28.57142857142857
Correctness Comparison: -36.74975081983639


127.0.0.1 - - [20/Apr/2024 12:47:58] "GET /student/97657 HTTP/1.1" 200 -        
INFO:werkzeug:127.0.0.1 - - [20/Apr/2024 12:47:58] "GET /student/97657 HTTP/1.1" 200 -
                                                                                

+-----------+------------+----------------------------+-----------------------------+--------------+--------------+
|question_id|elapsed_time|average_time_taken_on_this_q|average_correctness_of_this_q|student_answer|correct_answer|
+-----------+------------+----------------------------+-----------------------------+--------------+--------------+
|      q4775|        20.0|           27.98703384968445|             73.1612162937464|             a|             b|
|      q6731|        24.0|          30.901054030956224|           25.705061247744755|             a|             c|
|       q297|        23.0|          19.052934407364788|           59.196394322976595|             a|             b|
|        q68|        25.0|          21.563899868247695|            87.43961352657004|             c|             b|
|      q5807|        12.0|                       32.78|           45.857142857142854|             c|             d|
+-----------+------------+----------------------------+-----------------

### Generate Flask UI for the student summary dashboard

In [13]:
from flask import Flask, render_template, request, redirect, url_for

app = Flask(__name__)

@app.route('/', methods=['GET', 'POST'])
def index():
    """Show the student-id form; on a valid POST redirect to the dashboard."""
    if request.method == 'POST':
        # .get() avoids an automatic 400 when the field is missing, and the
        # isdecimal() check matters because the dashboard URL is declared as
        # <int:student_id>: a non-numeric value cannot be used to build it.
        # Invalid input falls through and simply re-shows the form.
        student_id = request.form.get('student_id', '').strip()
        if student_id.isdecimal():
            target = url_for('dashboard', student_id=student_id)
            print("Redirecting to dashboard with student ID:", student_id)
            print(target)
            return redirect(target)
    return render_template('index_student.html')

@app.route('/student/<int:student_id>')
def dashboard(student_id):
    """Render the summary page for one student, or a 500 with the error text."""
    try:
        results = analyze_student_performance(student_id)
        # the template receives the id together with the metrics
        results['student_id'] = student_id
    except Exception as e:
        print('function error!')
        return f"An error occurred: {e}", 500
    return render_template('student.html',  results = results)

if __name__ == '__main__':
    app.run(host='127.0.0.1',port=3000)

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:3000
 * Running on http://127.0.0.1:3000
Press CTRL+C to quit
INFO:werkzeug:[33mPress CTRL+C to quit[0m
127.0.0.1 - - [20/Apr/2024 12:49:32] "GET / HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [20/Apr/2024 12:49:32] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [20/Apr/2024 12:49:34] "POST / HTTP/1.1" 302 -
INFO:werkzeug:127.0.0.1 - - [20/Apr/2024 12:49:34] "[32mPOST / HTTP/1.1[0m" 302 -


Redirecting to dashboard with student ID: 97657
/student/97657


127.0.0.1 - - [20/Apr/2024 12:49:49] "GET /student/97657 HTTP/1.1" 200 -        
INFO:werkzeug:127.0.0.1 - - [20/Apr/2024 12:49:49] "GET /student/97657 HTTP/1.1" 200 -
127.0.0.1 - - [20/Apr/2024 12:51:23] "GET / HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [20/Apr/2024 12:51:23] "GET / HTTP/1.1" 200 -


## Analyze Lecture Quality

In [14]:
#correctness df to each question 
from pyspark.sql.functions import *
# One row per question: its tags plus the per-question averages. The two
# averaged columns were computed over a question_id window, so they are already
# constant within each group.
per_question_columns = ['question_id', 'average_time_taken_on_this_q', 'average_correctness_of_this_q', 'tags']
question_correctness = (
    correctness_df.select(*per_question_columns)
    .groupBy('question_id')
    .agg(
        first("tags").alias("tags"),
        avg("average_time_taken_on_this_q").alias("average_time_taken"),
        avg("average_correctness_of_this_q").alias("average_correctness"),
    )
    # lowest correctness first; ties broken by the longest average time
    .sort(['average_correctness', 'average_time_taken'], ascending=[True, False])
)
question_correctness.show()

                                                                                

+-----------+--------------------+------------------+-------------------+
|question_id|                tags|average_time_taken|average_correctness|
+-----------+--------------------+------------------+-------------------+
|      q1489|          52;181;184|               1.0|                0.0|
|      q3613|                  74|               1.0|                0.0|
|      q1488|          53;181;184|               1.0|                0.0|
|      q1490|          54;181;184|               1.0|                0.0|
|     q10626|                  74|29.372523117569873|  9.841479524438808|
|      q7771| 155;179;177;153;177| 58.84649719276178|  9.918902058640017|
|      q9428|                  85| 36.04166666666666|               12.5|
|      q9539|                  94| 28.41720105620453| 13.240286684270348|
|     q16961|                  -1|103.96135398230084| 13.274336283185844|
|      q3140|           64;52;184|23.996076086956492| 14.130434782608722|
|      q3170|           60;55;183|23.7

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, explode, broadcast

def analyze_lecture(lecture_id):
    """Summarise the questions that share a tag with one lecture.

    Reads the module-level ``lectures`` and ``question_correctness`` frames.

    Args:
        lecture_id: value compared against the ``lecture_id`` column.

    Returns:
        ``(summary, related_questions)``, both Spark DataFrames.
        ``related_questions`` has one row per matching question with the
        columns "Question ID", "Average Time Taken (s)" and
        "Average Correctness (%)" (both rounded to 2 decimals); ``summary`` is
        a single row with the question count and the rounded means of those
        two columns.
    """
    # Namespaced local import: the Spark `round`/`count`/`avg` are used
    # explicitly instead of relying on a star import that shadows the
    # Python builtin `round`.
    from pyspark.sql import functions as F

    target_lecture = lectures.filter(F.col("lecture_id") == lecture_id)

    # A tag string can repeat a tag (e.g. "155;179;177;153;177"); de-duplicate
    # before exploding so such a question is matched, and counted, only once.
    exploded_correctness = question_correctness.withColumn(
        "tag", F.explode(F.array_distinct(F.split(F.col("tags"), ";")))
    )

    # lecture is much smaller than questions so use broadcast to optimize
    broadcasted_lecture = F.broadcast(target_lecture)

    # join correctness df with the lecture df on tags
    related_questions = exploded_correctness.join(
        broadcasted_lecture,
        exploded_correctness.tag == broadcasted_lecture.tags,  # single lecture tag vs. exploded question tag
        "inner"
    ).select(
        F.col("question_id").alias("Question ID"),
        F.round(F.col("average_time_taken"), 2).alias("Average Time Taken (s)"),
        F.round(F.col("average_correctness"), 2).alias("Average Correctness (%)")
    )

    summary = related_questions.agg(
        F.count("Question ID").alias("Total Associated Questions"),
        F.round(F.avg("Average Time Taken (s)"), 2).alias("Overall Average Time (s)"),
        F.round(F.avg("Average Correctness (%)"), 2).alias("Overall Average Correctness (%)")
    )

    return summary, related_questions

In [28]:
# Example run for one lecture; only the one-row summary is displayed.
summary, results = analyze_lecture(lecture_id='l427')
summary.show()
#results.show()

                                                                                

+---------------+------------------------+-------------------------------+
|total_questions|Overall Average Time (s)|Overall Average Correctness (%)|
+---------------+------------------------+-------------------------------+
|             36|                   21.38|                           82.7|
+---------------+------------------------+-------------------------------+



### Generate Flask UI for the lecture summary

In [18]:
from flask import Flask, render_template, request, redirect, url_for
import pandas as pd

app = Flask(__name__)


def _as_table(spark_df):
    """Render a Spark DataFrame as an HTML table via pandas."""
    return spark_df.toPandas().to_html(classes="table table-striped", index=False)


@app.route("/", methods=['GET', 'POST'])
def index():
    """Show the lecture picker; a POST carrying a lecture id redirects to its summary."""
    chosen = request.form.get('lecture_id') if request.method == 'POST' else None
    if chosen:
        return redirect(url_for('display_summary', lecture_id=chosen))

    # Collected rows of the distinct lecture ids, handed to the template as-is.
    available = lectures.select("lecture_id").distinct().collect()
    return render_template('index_lecture.html', lecture_ids=available)


@app.route("/summary/<lecture_id>")
def display_summary(lecture_id):
    """Render the summary and per-question tables for one lecture."""
    summary_df, detailed_df = analyze_lecture(lecture_id)
    # Spark -> pandas -> HTML so the template only has to embed markup.
    detailed_html = _as_table(detailed_df)
    summary_html = _as_table(summary_df)
    return render_template(
        'lecture.html',
        lecture_id=lecture_id,
        detailed_html=detailed_html,
        summary_html=summary_html,
    )


if __name__ == '__main__':
    app.run(host='127.0.0.1', port=5000)


 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
 * Running on http://127.0.0.1:5000
Press CTRL+C to quit
INFO:werkzeug:[33mPress CTRL+C to quit[0m
127.0.0.1 - - [20/Apr/2024 12:52:34] "GET / HTTP/1.1" 200 -                     
INFO:werkzeug:127.0.0.1 - - [20/Apr/2024 12:52:34] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [20/Apr/2024 12:52:35] "GET /favicon.ico HTTP/1.1" 404 -
INFO:werkzeug:127.0.0.1 - - [20/Apr/2024 12:52:35] "[33mGET /favicon.ico HTTP/1.1[0m" 404 -
127.0.0.1 - - [20/Apr/2024 12:52:46] "POST / HTTP/1.1" 302 -
INFO:werkzeug:127.0.0.1 - - [20/Apr/2024 12:52:46] "[32mPOST / HTTP/1.1[0m" 302 -
127.0.0.1 - - [20/Apr/2024 12:53:56] "GET /summary/l695 HTTP/1.1" 200 -         
INFO:werkzeug:127.0.0.1 - - [20/Apr/2024 12:53:56] "GET /summary/l695 HTTP/1.1" 200 -
127.0.0.1 - - [20/Apr/2024 13:03:05] "GET / HTTP/1.1" 200 -                     
INFO:werkzeug:127.0.0.1 - - [20/Apr/2024 13:03:05] "GET / HTTP/1.1" 200 -


Thank you :)