In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, countDistinct, avg, max

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
10,application_1716494797375_0012,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Create or retrieve a Spark session
spark = SparkSession.builder \
    .appName("Online Education Analysis") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.pyspark.python", "python3") \
    .config("spark.pyspark.virtualenv.enabled", "true") \
    .config("spark.pyspark.virtualenv.type", "native") \
    .config("spark.pyspark.virtualenv.bin.path", "/usr/bin/virtualenv") \
    .getOrCreate()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
def load_data(file_name):
    """ Load data from CSV file on S3 using Spark """
    s3_path = f"s3a://kz-final-project/{file_name}"
    return spark.read.csv(s3_path, header=True, inferSchema=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
def calculate_completion_rate(df):
    """ Calculate the course completion rate for each student """
    return df.groupBy('uuid', 'ucid').agg(
        (countDistinct(col('is_correct')) / countDistinct(col('upid'))).alias('completion_rate')
    )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
def add_score_column(df):
    """ Add a 'score' column to DataFrame, where True is 1, False is 0 """
    return df.withColumn('score', col('is_correct').cast('int'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
def calculate_average_scores(df):
    """ Calculate the average scores for each student in each course """
    return df.groupBy('uuid', 'ucid').agg(
        avg('score').alias('average_score')
    )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
def analyze_first_login(df):
    """ Analyze the first login date for each student """
    return df.withColumn('first_login_date_TW', to_timestamp(col('first_login_date_TW'), 'yyyy-MM-dd')) \
             .groupBy('user_city').agg(
                 max('first_login_date_TW').alias('latest_first_login')
             )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
def count_interactions(df):
    """ Count distinct hints used by each student """
    return df.groupBy('uuid').agg(
        countDistinct('used_hint_cnt').alias('hints_used')
    )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Load data
info_content_df = load_data('Info_Content.csv')
info_user_data_df = load_data('Info_UserData.csv')
log_problem_df = load_data('Log_Problem.csv')

# Load processed data from S3
processed_df = spark.read.parquet("s3://kz-final-project/processed_data/")
processed_df = add_score_column(processed_df)  # Prepare the data frame with scores

# Calculate metrics
course_completion = calculate_completion_rate(processed_df)
average_scores = calculate_average_scores(processed_df)
first_login_analysis = analyze_first_login(info_user_data_df)
interactions = count_interactions(processed_df)

# Display the results
course_completion.show(10)
average_scores.show(10)
first_login_analysis.show(10)
interactions.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+-------------------+
|                uuid|                ucid|    completion_rate|
+--------------------+--------------------+-------------------+
|vWgIeDzvzqwgaktmo...|ad/P0gKMpXTOaZ/Sh...| 0.3333333333333333|
|8pdEjpATA3PsMhjIW...|Fj4qR65Oj+FrvKsCq...|0.09090909090909091|
|BNTYlBA3WKuWYunH8...|0jLx3N/h3zT921WmS...|0.09090909090909091|
|l+ygbw9Dm9Ytfp61B...|Iq4bn747ApUwO7SvD...| 0.1111111111111111|
|ZOx27o/qLMk2l4H+0...|cvnGl+iUiKphfjbPc...|0.14285714285714285|
|5exF/tOQXBjuOKvzh...|KzqwnxUK+iRBndR+l...|               0.25|
|gb9z96Tip7MciEXAw...|w74XdHKzME2SVRy/4...|               0.25|
|REoRF8OmojzA+MwEZ...|BMWFDTPQo2WDehX7l...|0.14285714285714285|
|qE8Hag7q5i9BYE7n8...|hVo+IVNVaOniJLhdg...|                0.2|
|ZosxtIsY4WlWBjUeU...|Iq4bn747ApUwO7SvD...|                0.2|
+--------------------+--------------------+-------------------+
only showing top 10 rows

+--------------------+--------------------+-------------+
|                uui

In [11]:
# Save processed data to S3
processed_df.write.mode('overwrite').parquet("s3://kz-final-project/processed_data2/")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…