In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

import os
import sys

from pyspark.sql.types import StringType

# Point both the Spark driver and the workers at the interpreter running this
# kernel, so they use the same Python installation.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Initialize Spark session
spark = (SparkSession.builder.appName("DataProcessingApp")
         .config("spark.executor.memory", "12g")
         .config("spark.driver.memory", "12g")
         .getOrCreate())
# WARN still surfaces real problems; DEBUG buries the notebook output under
# Spark's internal logging.
spark.sparkContext.setLogLevel("WARN")

# Single place to edit when the data set moves.
DATA_DIR = 'data/2019-2020_school_year'

# Read the problem-details CSV and drop every row that contains a NULL
problem_df = (
    spark.read.csv(f'{DATA_DIR}/pdets_no_null.csv', header=True, inferSchema=True)
    .dropna()
)

# Read the problem-logs CSV
plogs = spark.read.csv(f'{DATA_DIR}/plogs.csv', header=True, inferSchema=True)

# Preview the first rows of the cleaned problem details
problem_df.show()

+----------+--------------------+--------------------+------------------+-----------------+-----+------+---------+
|problem_id|        problem_type|student_answer_count|      mean_correct|mean_time_on_task|grade|domain|subdomain|
+----------+--------------------+--------------------+------------------+-----------------+-----+------+---------+
|     47084|Algebraic Expression|                   5|               0.4|         353.5066|    6|    SP|        B|
|     54190|Algebraic Expression|                  55|0.9444444444444444| 22.6090925925926|    7|    NS|        A|
|     65251|     Multiple Choice|                  43| 0.627906976744186| 73.4248837209302|    4|    NF|        A|
|     71510|     Multiple Choice|                  12|0.8333333333333334| 167.459083333333|    8|     G|        A|
|     71527|     Multiple Choice|                  26|0.7692307692307693| 33.5521923076923|    8|     G|        A|
|     75122|     Multiple Choice|                   8|               1.0| 546.34

In [2]:
from pyspark.sql.functions import isnan, when, count
# One aggregate per column: count() only counts non-NULL inputs, and when()
# yields a non-NULL value only for rows where the column itself is NULL.
null_count_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in problem_df.columns
]
problem_df.select(null_count_exprs).show()

+----------+------------+--------------------+------------+-----------------+-----+------+---------+
|problem_id|problem_type|student_answer_count|mean_correct|mean_time_on_task|grade|domain|subdomain|
+----------+------------+--------------------+------------+-----------------+-----+------+---------+
|         0|           0|                   0|           0|                0|    0|     0|        0|
+----------+------------+--------------------+------------+-----------------+-----+------+---------+



In [3]:
# Summary statistics for every column of the problem details
summary_stats = problem_df.describe()
summary_stats.show()

+-------+-----------------+--------------------+--------------------+------------------+------------------+-----------------+------+---------+
|summary|       problem_id|        problem_type|student_answer_count|      mean_correct| mean_time_on_task|            grade|domain|subdomain|
+-------+-----------------+--------------------+--------------------+------------------+------------------+-----------------+------+---------+
|  count|            47709|               47709|               47709|             47709|             47709|            47709| 47709|    47709|
|   mean|862295.5115387033|                NULL|  155.63377978997673| 0.651990152511452|123.77270059773669|5.808191299923683|  NULL|     NULL|
| stddev|564394.0460591534|                NULL|   284.7886485738366|0.2363646223092889|127.52519217089224|1.910221914335783|  NULL|     NULL|
|    min|                5|Algebraic Expression|                   1|               0.0|             0.003|                1|   APR|        A|

In [4]:
# Tally the rows of problem_df under each (grade, domain, subdomain) triple
unique_combination = problem_df.groupBy('grade', 'domain', 'subdomain').count()
unique_combination.show()

+-----+------+---------+-----+
|grade|domain|subdomain|count|
+-----+------+---------+-----+
|    8|     F|        A|  652|
|    7|    SP|        A|  103|
|    5|     G|        B|  387|
|    6|    NS|        C|  642|
|    3|    NF|        A|  799|
|    6|    SP|        A|  195|
|    6|    EE|        A| 1008|
|    4|    NF|        A|  522|
|  HSA|   APR|        A|    7|
|    5|    OA|        A|   89|
|  HSA|   REI|        D|   55|
|  HSN|    RN|        A|   25|
|    4|    NF|        C|  242|
|    1|   NBT|        B|   21|
|    4|     G|        A|  376|
|    6|    SP|        B| 1229|
|    7|    RP|        A| 2856|
|    6|    EE|        B|  233|
|    6|    NS|        A|  323|
|    3|    MD|        C|  521|
+-----+------+---------+-----+
only showing top 20 rows



In [5]:
# Count the number of unique combinations of grade, domain, and subdomain.
# `unique_combination` comes from a groupBy(...).count(), so it holds one row
# per distinct combination and its row count is the number of combinations.
unique_combination.count()

98

In [6]:
# Function to rename columns
def rename_columns(input_df):
    """Return ``input_df`` with the leading token of every ``HS*`` column set to ``9``.

    For each column whose name starts with ``"HS"``, the first three characters
    (e.g. ``"HSA"``, ``"HSN"``) are replaced by ``"9"``; all other columns keep
    their names. Only the leading token is rewritten, so a later occurrence of
    the same three characters inside the name is left alone
    (``"HSA_HSA_B"`` -> ``"9_HSA_B"``).

    Args:
        input_df: Object exposing ``columns`` and ``toDF(*names)``
            (a Spark DataFrame in this notebook).

    Returns:
        Whatever ``input_df.toDF`` returns for the new column names.
    """
    def _renamed(column):
        if not column.startswith("HS"):
            return column
        # Slice instead of str.replace: replace() would also rewrite any
        # repeat of the prefix further along the name.
        return "9" + column[3:]

    return input_df.toDF(*[_renamed(column) for column in input_df.columns])

In [7]:
from pyspark.sql.functions import (mean, concat_ws, first)

# Attach the problem details to every log row. Joining on the column *name*
# keeps a single `problem_id` column in the result; joining on the expression
# plogs['problem_id'] == problem_df['problem_id'] would keep both copies and
# make any later reference to `problem_id` ambiguous.
df = plogs.join(problem_df, on='problem_id', how='inner')

# Average `mean_correct` per student within each (grade, domain, subdomain),
# then build the "<grade>_<domain>_<subdomain>" key used as the pivot column.
mean_correct_df = (
    df.groupBy("student_id", "grade", "domain", "subdomain")
    .agg(mean("mean_correct").alias("mean_correct"))
    .withColumn("grade_domain_subdomain",
                concat_ws("_", "grade", "domain", "subdomain"))
)

# One row per student, one column per grade_domain_subdomain key.
# The groupBy above leaves exactly one value per (student, key), so first()
# simply picks that value; combinations a student never saw become NULL.
feedback_matrix = (
    mean_correct_df
    .groupBy("student_id")
    .pivot("grade_domain_subdomain")
    .agg(first("mean_correct"))
)

feedback_matrix.sort('student_id').show(10)

+----------+-----+------+-------+-------+-------+------+------+------+------+-----+------+------+------+------+------------------+-------+------+------+------+-----+------+------+------+------+-------+------+------+------+------+------+-----+------+------+------+-------+-------+------+------+------+------+------+------------------+-----+-----+------+------+------+-------+-------+------+------+------+------+------+------+------+-----+------------------+------+------+------+------+------+------+------+-----+-----+------------------+------+------+------+------------------+------+------+------+-----+-----+-----+-----+-----+------+------+---------+---------+---------+---------+---------+---------+---------+---------+--------+--------+--------+---------+---------+---------+--------+--------+
|student_id|1_G_A|1_MD_A|1_NBT_A|1_NBT_B|1_NBT_C|1_OA_A|1_OA_B|1_OA_C|1_OA_D|2_G_A|2_MD_A|2_MD_B|2_MD_C|2_MD_D|           2_NBT_A|2_NBT_B|2_OA_A|2_OA_B|2_OA_C|3_G_A|3_MD_A|3_MD_B|3_MD_C|3_MD_D|3_NBT_A|3_

In [8]:
from pyspark.sql.functions import col, lit

# Columns whose name starts with "HS".
# The loop variables are deliberately NOT called `col`: a module-level
# `for col in ...` rebinds the imported pyspark `col` function to a string and
# breaks every cell that calls `col(...)` when it is (re-)run afterwards.
hs_columns = [name for name in feedback_matrix.columns if name.startswith('HS')]

# OR together "is not NULL" over all HS columns; starting from lit(False)
# means an empty hs_columns list filters out every row.
condition = lit(False)
for hs_column in hs_columns:
    condition = condition | feedback_matrix[hs_column].isNotNull()

# Apply filter: keep students with at least one non-NULL HS column
filtered_df = feedback_matrix.filter(condition)
filtered_df.show()

+----------+-----+------+-------+-------+-------+------+------+------+------+-----+------+------+------+------+-------+-------+------+------+------+-----+------+------+------------------+------+-------+------+------+------+------+------+-----+------+------+------+-------+-------+------+------+------+------+------+------+-----+------------------+------------------+------+------------------+-------+------------------+------+------+------+------+------------------+------------------+------------------+------------------+------+------------------+-------------------+-------------------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-------------------+------------------+------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------+------------------+---------------

In [9]:
# Helper functions to extract grade and domain from grade_domain_subdomain
def extract_components(column_name):
    """Split a ``<grade>_<domain>[_<subdomain>]`` column name into its parts.

    Args:
        column_name: Name such as ``"6_EE_A"``, ``"9_RN_A"`` or ``"HSA_APR_A"``.

    Returns:
        Dict with keys ``'grade'`` (int), ``'domain'`` (str) and
        ``'subdomain'`` (str, or ``None`` when the name has only two parts).

    Raises:
        ValueError: If the grade token is neither an integer nor ``HS``-prefixed.
        IndexError: If the name contains no ``"_"`` separator.
    """
    parts = column_name.split("_")
    grade_token = parts[0]
    # High school grade -> 9
    # Cause in HS_ grade -> core standards only categorize them by domain (Algebra, Geometry, etc.), not by grade anymore
    # Handled here as well, so names that have not been through
    # rename_columns() do not crash on int("HSA").
    grade = 9 if grade_token.startswith("HS") else int(grade_token)
    return {
        'grade': grade,
        'domain': parts[1],
        'subdomain': parts[2] if len(parts) > 2 else None
    }

In [10]:
# Rewrite the HS* column prefixes to "9" so every pivot column starts with a numeric grade
feedback_matrix = rename_columns(input_df=feedback_matrix)
feedback_matrix.show(n=5)

+----------+-----+------+-------+-------+-------+------+------+------+------+-----+------+------+------+------+-------+-------+------+------+------+-----+------+------+------+------+-------+------+-----------------+------+------+------+-----+------+------+------------------+-------+-------+------+------+------+------+------+------+-----+-----+------+------+------+------------------+------------------+-------------------+------------------+------------------+------+------------------+------------------+------+-----+------------------+-------------------+------+------------------+------+------+------------------+-------------------+------------------+------------------+------------------+-------------------+------+-----------------+------------------+------------------+------+------------------+-----+------------------+-----+------------------+------------------+-----------------+------+-------+-------+-------+-------+-------+-------+-------+-------+------+------+------+-------+-------+-

In [11]:
# 2. Pair every pivot column (everything except student_id) with its parsed
#    grade / domain / subdomain
pivot_columns = list(filter(lambda name: name != "student_id", feedback_matrix.columns))
column_info = list(zip(pivot_columns, map(extract_components, pivot_columns)))
column_info

[('1_G_A', {'grade': 1, 'domain': 'G', 'subdomain': 'A'}),
 ('1_MD_A', {'grade': 1, 'domain': 'MD', 'subdomain': 'A'}),
 ('1_NBT_A', {'grade': 1, 'domain': 'NBT', 'subdomain': 'A'}),
 ('1_NBT_B', {'grade': 1, 'domain': 'NBT', 'subdomain': 'B'}),
 ('1_NBT_C', {'grade': 1, 'domain': 'NBT', 'subdomain': 'C'}),
 ('1_OA_A', {'grade': 1, 'domain': 'OA', 'subdomain': 'A'}),
 ('1_OA_B', {'grade': 1, 'domain': 'OA', 'subdomain': 'B'}),
 ('1_OA_C', {'grade': 1, 'domain': 'OA', 'subdomain': 'C'}),
 ('1_OA_D', {'grade': 1, 'domain': 'OA', 'subdomain': 'D'}),
 ('2_G_A', {'grade': 2, 'domain': 'G', 'subdomain': 'A'}),
 ('2_MD_A', {'grade': 2, 'domain': 'MD', 'subdomain': 'A'}),
 ('2_MD_B', {'grade': 2, 'domain': 'MD', 'subdomain': 'B'}),
 ('2_MD_C', {'grade': 2, 'domain': 'MD', 'subdomain': 'C'}),
 ('2_MD_D', {'grade': 2, 'domain': 'MD', 'subdomain': 'D'}),
 ('2_NBT_A', {'grade': 2, 'domain': 'NBT', 'subdomain': 'A'}),
 ('2_NBT_B', {'grade': 2, 'domain': 'NBT', 'subdomain': 'B'}),
 ('2_OA_A', {'grad

In [12]:
# 3. Create imputation rules with domain awareness
def create_imputation_expressions(column_info_):
    """Build one Spark SQL ``CASE ... END as <col>`` string per column.

    ``column_info_`` is a list of ``(column_name, info)`` pairs in which
    ``info`` provides ``'grade'`` and ``'domain'``. In the generated SQL a
    column keeps its own value when it is not NULL; otherwise it is filled
    with a weighted average over the non-NULL columns of strictly higher
    grade ("donors"):

    * a donor's weight is ``1 / grade_gap``, multiplied by 4 when the donor
      shares the target's domain;
    * a donor's value is raised by ``0.05 * grade_gap`` and capped at 1.0
      before it is averaged;
    * the result is NULL when every donor is NULL.

    Columns without any higher-grade donor are passed through unchanged.

    Returns:
        List of SQL expression strings, in the order of ``column_info_``.
    """
    expressions = []
    for col_name, target_info in column_info_:
        target_grade = target_info['grade']

        # Donors: every column that belongs to a strictly higher grade.
        donors = [
            (name, meta) for name, meta in column_info_
            if meta['grade'] > target_grade
        ]

        if not donors:
            # Nothing to impute from; the CASE just yields the column itself.
            expressions.append(f"""
                CASE 
                    WHEN {col_name} IS NOT NULL THEN {col_name}
                END as {col_name}
            """)
        else:
            numerator_terms = []    # weight * boosted donor value
            denominator_terms = []  # weight, counted only when the donor is present
            for name, meta in donors:
                gap = meta['grade'] - target_grade
                # Grade proximity (1 / gap) times domain similarity (4x when equal).
                weight = 1.0 / gap * (4.0 if meta['domain'] == target_info['domain'] else 1.0)
                # Performance adjustment factor (better performance in lower grades)
                boost = 0.05 * gap
                numerator_terms.append(
                    f"(CASE WHEN {name} IS NOT NULL THEN {weight} * (LEAST(1.0, {name} + {boost})) ELSE 0 END)"
                )
                denominator_terms.append(
                    f"(CASE WHEN {name} IS NOT NULL THEN {weight} ELSE 0 END)"
                )

            weighted_sum = " + ".join(numerator_terms)
            weight_sum = " + ".join(denominator_terms)

            expressions.append(f"""
                CASE 
                    WHEN {col_name} IS NOT NULL THEN {col_name}
                    WHEN ({weight_sum}) > 0 THEN 
                        LEAST(1.0, ({weighted_sum}) / ({weight_sum}))
                    ELSE NULL 
                END as {col_name}
            """)

    return expressions

In [13]:
# 4. Apply imputation: keep student_id and replace every pivot column by its
#    generated CASE expression
imputation_expressions = create_imputation_expressions(column_info)
imputed_select = ["student_id"] + imputation_expressions
imputed_matrix = feedback_matrix.selectExpr(*imputed_select)

In [14]:
# Preview the first 10 rows after higher-grade imputation
imputed_matrix.show(n=10)

+----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+------------------+------------

In [15]:
# 5. Fill the remaining nulls with grade-based defaults
BASE_PERFORMANCE = 0.35    # default used for the highest grade present
GRADE_BOOST_STEP = 0.05    # added for each grade below the highest grade
MAX_DEFAULT = 0.95         # cap on any default value

# Loop-invariant, so computed once instead of once per column.
# default=0 keeps an empty column_info from raising.
max_grade = max((g['grade'] for _, g in column_info), default=0)

default_expressions = []
for col_name, info in column_info:
    # Higher default performance for lower grades
    grade_boost = GRADE_BOOST_STEP * (max_grade - info['grade'])
    default_value = min(MAX_DEFAULT, BASE_PERFORMANCE + grade_boost)  # Cap at 0.95

    expr_str = f"""
        CASE 
            WHEN {col_name} IS NOT NULL THEN {col_name}
            ELSE {default_value}  -- Grade-adjusted default performance
        END as {col_name}
    """
    default_expressions.append(expr_str)

final_matrix = imputed_matrix.selectExpr(
    "student_id",
    *default_expressions
)

In [16]:
# Preview the first 10 rows of the fully filled matrix
final_matrix.show(n=10)

+----------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+------------------+------------

In [17]:
# toPandas() collects the whole matrix onto the driver. Arrow-based transfer
# moves the data in columnar batches instead of row by row; with Spark's
# default fallback setting it reverts to the regular path when Arrow cannot
# be used.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

final_pdf = final_matrix.toPandas()
final_pdf.to_csv("final_matrix.csv", index=False)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\Users\hp\miniconda3\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\hp\miniconda3\Lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\hp\miniconda3\Lib\socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 