In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Machine Learning imports - CLASSIFICATION ONLY
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score, cohen_kappa_score
from sklearn.impute import SimpleImputer
from xgboost import XGBClassifier

import warnings
warnings.filterwarnings('ignore')

# Configuration
plt.style.use('default')
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['font.size'] = 10

print("METACOGNITION PREDICTION MODEL")
print("=" * 60)
print("Predicting Post3Meta scores using behavioral features")
print("Target: Post3Meta (Metacognition Assessment)")
print("Features: Learning behavior patterns and help-seeking")
print("=" * 60)

METACOGNITION PREDICTION MODEL
Predicting Post3Meta scores using behavioral features
Target: Post3Meta (Metacognition Assessment)
Features: Learning behavior patterns and help-seeking


# Training Data Preprocessing

In [None]:
df_main = pd.read_csv("./raw_data/training_set_with_formatted_time.csv")
df_ws = pd.read_csv("./raw_data/workspace_summary_train.csv")
df_scores = pd.read_csv("./raw_data/student_scores_train.csv")

df_main.drop_duplicates(inplace=True)
df_ws.drop_duplicates(inplace=True)
df_scores.drop_duplicates(inplace=True)

In [None]:
# Remove unnecessary columns
df_main.drop(columns=['CF..Anon.School.Id.', 'CF..Anon.Class.Id.', 'Time', 'formatted_time'], inplace=True)

# Remove rows containing 'OK_AMBIGUOUS'
df_main = df_main[df_main['Outcome'] != 'OK_AMBIGUOUS']

df_main.sort_values(by=['Anon.Student.Id', 'datetime'], inplace=True)

df_main['datetime'] = pd.to_datetime(
    df_main['datetime'],
    infer_datetime_format=True,
    errors='coerce'       # turns invalid parses into NaT
)

# Generate time steps
df_main['time_step'] = df_main.groupby('Anon.Student.Id')['datetime'].rank(method='first') - 1 

In [None]:
df_main.to_csv('preprocessed_data/df_main_allws.csv', index=False)
df_ws.to_csv('preprocessed_data/df_ws_allws.csv', index=False)

In [None]:
workspace_ids_to_remove = [
    'worksheet_grapher_a1_lin_mod_mult_rep',
    'equation_line_2',
    'analyzing_models_2step_rationals',
    'multiple_representations_of_linear_functions',
    'worksheet_grapher_a1_slope_intercept_integer',
    'worksheet_grapher_a1_slope_intercept_decimal',
    'connecting_slope_intercept_and_point_slope_forms',
    'equation_line_1',
    'equation_line_3',
    'worksheet_grapher_a1_mod_initial_plus_point',
    'worksheet_grapher_a1_mod_two_points',
    'modeling_linear_equations_in_standard_form',
    'graph_setup_linear_equation-1',
    'graph_setup_linear_equation-2',
    'classifying_relations_and_functions',
    'introduction_to_functions',
    'graphs_of_functions',
    'graphs_of_functions-1',
    'compare_functions_diff_reps_linear_relationships'
]


df_main = df_main[~df_main['Level..Workspace.Id.'].isin(workspace_ids_to_remove)]
df_ws = df_ws[~df_ws['workspace'].isin(workspace_ids_to_remove)]

In [None]:
# create 4 dataframes, each with cleaned pre/post scores and imputed NaNs with median
imputer = SimpleImputer(strategy='median')

df_cleaned_math = df_scores[["Anon.Student.Id", "PreMath", "PostMath"]].copy()
df_cleaned_math = df_cleaned_math.dropna(subset=["PostMath"])

df_cleaned_map = df_scores[["Anon.Student.Id", "PreMAP", "Post1MAP", "Post2MAP", "Post3MAP"]].copy()
df_cleaned_map[["PreMAP", "Post1MAP", "Post2MAP", "Post3MAP"]] = imputer.fit_transform(df_cleaned_map[["PreMAP", "Post1MAP", "Post2MAP", "Post3MAP"]])

df_cleaned_se = df_scores[["Anon.Student.Id", "PreSE", "Post1SE", "Post2SE", "Post3SE"]].copy()
df_cleaned_se[["PreSE", "Post1SE", "Post2SE", "Post3SE"]] = imputer.fit_transform(df_cleaned_se[["PreSE", "Post1SE", "Post2SE", "Post3SE"]])

df_cleaned_meta = df_scores[["Anon.Student.Id", "PreMeta", "Post1Meta", "Post2Meta", "Post3Meta"]].copy()
df_cleaned_meta[["PreMeta", "Post1Meta", "Post2Meta", "Post3Meta"]] = imputer.fit_transform(df_cleaned_meta[["PreMeta", "Post1Meta", "Post2Meta", "Post3Meta"]])

In [None]:
# convert all to csv and store in processed_data folder
df_main.to_csv("preprocessed_data/df_main.csv", index=False)
df_ws.to_csv("preprocessed_data/df_ws.csv", index=False)
df_scores.to_csv("preprocessed_data/df_scores.csv", index=False)
df_cleaned_meta.to_csv("preprocessed_data/df_cleaned_meta.csv", index=False)

# Loading Preprocessed Data

In [2]:
# ==============================================================================
# DATA LOADING AND VALIDATION
# ==============================================================================

print("\nLOADING AND VALIDATING DATA")
print("=" * 50)

# Load all data files
df_main = pd.read_csv('preprocessed_data/df_main_allws.csv')
df_ws = pd.read_csv('preprocessed_data/df_ws_allws.csv')
df_cleaned_meta = pd.read_csv('preprocessed_data/df_cleaned_meta.csv')

print(f"✓ Main interaction data: {len(df_main):,} records")
print(f"✓ Workspace data: {len(df_ws):,} workspace sessions")
print(f"✓ Metacognition assessments: {len(df_cleaned_meta):,} students")

print(f"\nVALIDATING METACOGNITION DATA:")
print("-" * 40)
print("Meta columns:", df_cleaned_meta.columns.tolist())

# Check for Post3Meta specifically
if 'Post3Meta' not in df_cleaned_meta.columns:
    print("ERROR: Post3Meta column not found!")
    print("Available Meta columns:", [col for col in df_cleaned_meta.columns if 'Meta' in col])
else:
    print(f"✓ Post3Meta found: {df_cleaned_meta['Post3Meta'].notna().sum()} non-null values")
    print(f"  Range: {df_cleaned_meta['Post3Meta'].min():.2f} to {df_cleaned_meta['Post3Meta'].max():.2f}")


LOADING AND VALIDATING DATA
✓ Main interaction data: 856,606 records
✓ Workspace data: 12,635 workspace sessions
✓ Metacognition assessments: 539 students

VALIDATING METACOGNITION DATA:
----------------------------------------
Meta columns: ['Anon.Student.Id', 'PreMeta', 'Post1Meta', 'Post2Meta', 'Post3Meta']
✓ Post3Meta found: 539 non-null values
  Range: 1.00 to 7.00


In [3]:
# ==============================================================================
# TARGET VARIABLE CREATION
# ==============================================================================

print("\nCREATING TARGET VARIABLE")
print("=" * 50)

# Filter students with Post3Meta scores
metacognition_data = df_cleaned_meta[df_cleaned_meta['PreMeta'].notna() & df_cleaned_meta['Post3Meta'].notna()].copy()
print(f"Students with Post3Meta scores: {len(metacognition_data)}")

# Create both continuous and categorical targets
metacognition_data['metacognition_score'] = metacognition_data['Post3Meta']

# Create categorical version (Low, Medium, High)
def categorize_metacognition(score):
    if score <= 4.0:
        return 'Low'
    elif score <= 5.5:
        return 'Medium'
    else:
        return 'High'

metacognition_data['metacognition_category'] = metacognition_data['metacognition_score'].apply(categorize_metacognition)

# Show distribution
print(f"\nMetacognition score statistics:")
print(f"  Mean: {metacognition_data['metacognition_score'].mean():.2f}")
print(f"  Std: {metacognition_data['metacognition_score'].std():.2f}")
print(f"  Range: {metacognition_data['metacognition_score'].min():.2f} - {metacognition_data['metacognition_score'].max():.2f}")

category_counts = metacognition_data['metacognition_category'].value_counts()
print(f"\nMetacognition category distribution:")
for category, count in category_counts.items():
    percentage = (count / len(metacognition_data)) * 100
    print(f"  • {category}: {count} students ({percentage:.1f}%)")

print(f"\nTarget variable created successfully!")


CREATING TARGET VARIABLE
Students with Post3Meta scores: 539

Metacognition score statistics:
  Mean: 4.28
  Std: 0.98
  Range: 1.00 - 7.00

Metacognition category distribution:
  • Medium: 330 students (61.2%)
  • Low: 151 students (28.0%)
  • High: 58 students (10.8%)

Target variable created successfully!


In [4]:
# ==============================================================================
# FEATURE ENGINEERING - METACOGNITION-FOCUSED FEATURES
# ==============================================================================

print("\nENGINEERING METACOGNITION-FOCUSED FEATURES")
print("=" * 50)

# Get students with metacognition scores
target_students = metacognition_data['Anon.Student.Id'].unique()
df_filtered = df_main[df_main['Anon.Student.Id'].isin(target_students)].copy()

print(f"Filtering data for {len(target_students)} students with metacognition scores")
print(f"Behavioral data: {len(df_filtered):,} interactions")

# Feature engineering by student
features_list = []

for student_id in target_students:
    student_data = df_filtered[df_filtered['Anon.Student.Id'] == student_id].copy()
    
    if len(student_data) == 0:
        continue
    
    features = {'Anon.Student.Id': student_id}
    
    # 1. Average Help Level (exclude 0 if 0 = no hint)
    if 'Help.Level' in student_data.columns:
        help_levels = student_data['Help.Level'].dropna()
        help_levels_nonzero = help_levels[help_levels > 0]  # Exclude 0 if 0 = no hint
        features['avg_help_level'] = help_levels_nonzero.mean() if len(help_levels_nonzero) > 0 else 0
    else:
        features['avg_help_level'] = 0
    
    # 2. Hint Usage Pattern (% of problems with Hint Requested followed by OK/WRONG)
    hint_pattern_count = 0
    total_problems = student_data['Problem.Name'].nunique()
    
    for problem in student_data['Problem.Name'].unique():
        problem_data = student_data[student_data['Problem.Name'] == problem].sort_values('time_step')
        
        # Look for hint request followed by attempt
        for i in range(len(problem_data) - 1):
            current_action = str(problem_data.iloc[i]['Action']).lower()
            next_outcome = str(problem_data.iloc[i + 1]['Outcome']).upper()
            
            if 'hint' in current_action and next_outcome in ['OK', 'CORRECT', 'WRONG', 'ERROR']:
                hint_pattern_count += 1
                break
    
    features['hint_usage_pattern'] = hint_pattern_count / total_problems if total_problems > 0 else 0
    
    # 3. Optimal Help Seeking Behavior (time spent + accuracy + hint usage)
    # Time spent on problem
    problem_times = []
    problem_accuracy = []
    problem_hint_usage = []
    
    for problem in student_data['Problem.Name'].unique():
        problem_data = student_data[student_data['Problem.Name'] == problem].sort_values('time_step')
        
        if len(problem_data) > 1:
            time_spent = problem_data['time_step'].max() - problem_data['time_step'].min()
            problem_times.append(time_spent)
        
        # Accuracy for this problem
        correct_attempts = (problem_data['Outcome'].isin(['OK', 'CORRECT'])).sum()
        total_attempts = len(problem_data)
        accuracy = correct_attempts / total_attempts if total_attempts > 0 else 0
        problem_accuracy.append(accuracy)
        
        # Hint usage for this problem
        hint_requests = problem_data['Action'].str.contains('hint', case=False, na=False).sum()
        problem_hint_usage.append(hint_requests)
    
    features['avg_time_per_problem'] = np.mean(problem_times) if problem_times else 0
    features['avg_problem_accuracy'] = np.mean(problem_accuracy) if problem_accuracy else 0
    features['avg_hints_per_problem'] = np.mean(problem_hint_usage) if problem_hint_usage else 0
    
    # Combined optimal help seeking score
    # Higher time + higher accuracy + moderate hint usage = better metacognition
    time_score = min(features['avg_time_per_problem'] / 60, 1)  # Normalize to max 1 minute
    accuracy_val = features['avg_problem_accuracy']
    hint_score = 1 - min(features['avg_hints_per_problem'] / 3, 1)  # Penalize excessive hints
    
    features['optimal_help_seeking'] = (time_score + accuracy_val + hint_score) / 3
    
    # 4. Length of time spent on Concept Building
    # Approximate concept building as time spent on problems with multiple attempts
    concept_building_time = 0
    for problem in student_data['Problem.Name'].unique():
        problem_data = student_data[student_data['Problem.Name'] == problem].sort_values('time_step')
        
        if len(problem_data) > 2:  # Multiple attempts suggest concept building
            time_spent = problem_data['time_step'].max() - problem_data['time_step'].min()
            concept_building_time += time_spent
    
    features['concept_building_time'] = concept_building_time
    
    # Additional metacognition-related features
    # 5. Self-regulation indicators
    features['total_problems_attempted'] = total_problems
    features['avg_attempts_per_problem'] = len(student_data) / total_problems if total_problems > 0 else 0
    features['error_recovery_rate'] = 0
    
    # Error recovery: correct answer after error
    for problem in student_data['Problem.Name'].unique():
        problem_data = student_data[student_data['Problem.Name'] == problem].sort_values('time_step')
        
        for i in range(len(problem_data) - 1):
            if problem_data.iloc[i]['Outcome'] in ['ERROR', 'WRONG']:
                if problem_data.iloc[i + 1]['Outcome'] in ['OK', 'CORRECT']:
                    features['error_recovery_rate'] += 1
                    break
    
    features['error_recovery_rate'] = features['error_recovery_rate'] / total_problems if total_problems > 0 else 0
    
    # 6. Persistence indicators
    features['session_persistence'] = student_data['Session.Id'].nunique()  # Number of different sessions
    features['total_interaction_time'] = student_data['time_step'].max() - student_data['time_step'].min() if len(student_data) > 1 else 0
    
    # 7. Strategic help-seeking (help level progression)
    if 'Help.Level' in student_data.columns:
        help_progression = []
        for problem in student_data['Problem.Name'].unique():
            problem_data = student_data[student_data['Problem.Name'] == problem].sort_values('time_step')
            help_levels = problem_data['Help.Level'].dropna()
            
            if len(help_levels) > 1:
                # Check if help levels increase (strategic escalation)
                increasing = (help_levels.diff() > 0).sum()
                help_progression.append(increasing / len(help_levels))
        
        features['strategic_help_progression'] = np.mean(help_progression) if help_progression else 0
    else:
        features['strategic_help_progression'] = 0
    
    features_list.append(features)

# Create features dataframe
features_df = pd.DataFrame(features_list)
print(f"\nMetacognition features engineered for {len(features_df)} students")
print(f"Total features: {len(features_df.columns) - 1}")

print("\nMETACOGNITION-FOCUSED FEATURES CREATED:")
print("-" * 50)
metacog_features = [
    'avg_help_level', 'hint_usage_pattern', 'optimal_help_seeking', 
    'concept_building_time', 'avg_time_per_problem', 'avg_problem_accuracy',
    'error_recovery_rate', 'strategic_help_progression'
]

for feat in metacog_features:
    if feat in features_df.columns:
        mean_val = features_df[feat].mean()
        print(f"  ✓ {feat}: mean = {mean_val:.3f}")

# Display feature statistics
print(f"\nFeature summary:")
print(features_df.describe().round(3))


ENGINEERING METACOGNITION-FOCUSED FEATURES
Filtering data for 539 students with metacognition scores
Behavioral data: 855,024 interactions

Metacognition features engineered for 539 students
Total features: 13

METACOGNITION-FOCUSED FEATURES CREATED:
--------------------------------------------------
  ✓ avg_help_level: mean = 2.056
  ✓ hint_usage_pattern: mean = 0.451
  ✓ optimal_help_seeking: mean = 0.471
  ✓ concept_building_time: mean = 2153.840
  ✓ avg_time_per_problem: mean = 31.459
  ✓ avg_problem_accuracy: mean = 0.710
  ✓ error_recovery_rate: mean = 0.512
  ✓ strategic_help_progression: mean = 0.108

Feature summary:
       avg_help_level  hint_usage_pattern  avg_time_per_problem  \
count         539.000             539.000               539.000   
mean            2.056               0.451                31.459   
std             0.200               0.177                21.851   
min             0.000               0.000                10.500   
25%             1.981         

In [5]:
# ==============================================================================
# DATA MERGING AND PREPARATION
# ==============================================================================

print("\nMERGING FEATURES WITH TARGET VARIABLE")
print("=" * 50)

# Merge features with metacognition scores (including Post3Meta as a feature)
df = features_df.merge(metacognition_data[['Anon.Student.Id', 'metacognition_score', 'metacognition_category', 'Post3Meta']], 
                       on='Anon.Student.Id', how='inner')

print(f"Final dataset: {len(df)} students")
print(f"Features: {len(df.columns) - 4}")  # Excluding ID, target, and Post3Meta columns

# Handle missing values
print(f"\nMissing values per column:")
missing_counts = df.isnull().sum()
for col, count in missing_counts.items():
    if count > 0:
        print(f"  {col}: {count}")

# Fill missing values with median for numerical columns (excluding target and Post3Meta)
numerical_cols = df.select_dtypes(include=[np.number]).columns
numerical_cols = [col for col in numerical_cols if col not in ['metacognition_score', 'Post3Meta']]

imputer = SimpleImputer(strategy='median')
df[numerical_cols] = imputer.fit_transform(df[numerical_cols])

print(f"\nData preparation complete!")
print(f"Shape: {df.shape}")


MERGING FEATURES WITH TARGET VARIABLE
Final dataset: 539 students
Features: 13

Missing values per column:

Data preparation complete!
Shape: (539, 17)


In [6]:
# ==============================================================================
# QWK METRIC DEFINITION
# ==============================================================================

def qwk(y_true, y_pred):
    """Quadratic Weighted Kappa - primary metric for ordinal classification"""
    return cohen_kappa_score(y_true, y_pred, weights='quadratic')

def qwk_scorer(estimator, X, y):
    """Custom scorer for cross-validation"""
    return qwk(y, estimator.predict(X))

print("QWK metric defined for ordinal classification evaluation")

QWK metric defined for ordinal classification evaluation


In [7]:
# ==============================================================================
# TRAIN/TEST SPLIT - STRATIFIED FOR CLASSIFICATION
# ==============================================================================

# Define features and target
feature_cols = [col for col in df.columns if col not in [
    'Anon.Student.Id', 'metacognition_score', 'metacognition_category'
]]
X = df[feature_cols]
y = df['metacognition_score'].round().astype(int)  # Round to nearest integer for classification

# Stratified split to preserve class distribution
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

print(f"Training set: {len(X_train)} samples")
print(f"Test set: {len(X_test)} samples")
print(f"Features: {len(feature_cols)}")

print("\nTrain distribution:")
print(y_train.value_counts().sort_index())
print("\nTest distribution:")
print(y_test.value_counts().sort_index())

print("\nFeature names:")
for i, feat in enumerate(feature_cols, 1):
    print(f"  {i:2d}. {feat}")

Training set: 431 samples
Test set: 108 samples
Features: 14

Train distribution:
metacognition_score
1      4
2     29
3     44
4    230
5     78
6     42
7      4
Name: count, dtype: int64

Test distribution:
metacognition_score
1     1
2     7
3    11
4    57
5    20
6    11
7     1
Name: count, dtype: int64

Feature names:
   1. avg_help_level
   2. hint_usage_pattern
   3. avg_time_per_problem
   4. avg_problem_accuracy
   5. avg_hints_per_problem
   6. optimal_help_seeking
   7. concept_building_time
   8. total_problems_attempted
   9. avg_attempts_per_problem
  10. error_recovery_rate
  11. session_persistence
  12. total_interaction_time
  13. strategic_help_progression
  14. Post3Meta


In [8]:
# Define classification models optimized for ordinal data
models = {
    'RandomForest': RandomForestClassifier(
        n_estimators=100, 
        random_state=42, 
        class_weight='balanced'
    ),
    'GradientBoosting': GradientBoostingClassifier(
        n_estimators=100,
        random_state=42
    ),
    'XGBoost': XGBClassifier(
        n_estimators=100,
        random_state=42,
        use_label_encoder=False,
        eval_metric='mlogloss'
    )
}

print("Classification models defined:")
for name in models.keys():
    print(f"  ✓ {name}")

Classification models defined:
  ✓ RandomForest
  ✓ GradientBoosting
  ✓ XGBoost


In [9]:
# ==============================================================================
# CROSS-VALIDATION WITH QWK METRIC (FIXED FOR XGBOOST)
# ==============================================================================

from sklearn.preprocessing import LabelEncoder

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_results = {}

print("Cross-validation with QWK metric:")
print("-" * 40)

# Create label encoder for XGBoost compatibility
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)

for name, model in models.items():
    if name == 'XGBoost':
        # Use encoded labels for XGBoost (0-6 instead of 1-7)
        scores = cross_val_score(model, X_train, y_train_encoded, cv=cv, scoring=qwk_scorer, n_jobs=-1)
    else:
        # Use original labels for other models
        scores = cross_val_score(model, X_train, y_train, cv=cv, scoring=qwk_scorer, n_jobs=-1)
    
    cv_results[name] = {
        'mean_qwk': np.mean(scores),
        'std_qwk': np.std(scores),
        'scores': scores
    }
    print(f"{name:20s}: QWK = {np.mean(scores):.3f} ± {np.std(scores):.3f}")

# Find best model
best_model_name = max(cv_results.keys(), key=lambda x: cv_results[x]['mean_qwk'])
print(f"\nBest CV model: {best_model_name} (QWK: {cv_results[best_model_name]['mean_qwk']:.3f})")

Cross-validation with QWK metric:
----------------------------------------
RandomForest        : QWK = 0.878 ± 0.043
GradientBoosting    : QWK = 1.000 ± 0.000
XGBoost             : QWK = 0.999 ± 0.002

Best CV model: GradientBoosting (QWK: 1.000)


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)


In [10]:
# ==============================================================================
# LABEL ENCODING FOR XGBOOST COMPATIBILITY
# ==============================================================================

from sklearn.preprocessing import LabelEncoder

print("Encoding target variables for XGBoost compatibility...")

# Encode target variables to start from 0
le = LabelEncoder()
y_train_encoded = le.fit_transform(y_train)
y_test_encoded = le.transform(y_test)

print(f"Original classes: {sorted(y_train.unique())}")
print(f"Encoded classes: {sorted(y_train_encoded)}")
print("Label encoding complete!")

Encoding target variables for XGBoost compatibility...
Original classes: [np.int64(1), np.int64(2), np.int64(3), np.int64(4), np.int64(5), np.int64(6), np.int64(7)]
Encoded classes: [np.int64(0), np.int64(0), np.int64(0), np.int64(0), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(1), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2), np.int64(2)

In [11]:
# ==============================================================================
# HYPERPARAMETER TUNING WITH QWK OPTIMIZATION
# ==============================================================================

print("Hyperparameter tuning with QWK optimization...")

# Grid search for RandomForest
param_grid_rf = {
    'n_estimators': [100, 200],
    'max_depth': [None, 10, 20],
    'min_samples_split': [2, 5]
}

grid_rf = GridSearchCV(
    RandomForestClassifier(random_state=42, class_weight='balanced'),
    param_grid_rf, 
    scoring=qwk_scorer, 
    cv=cv, 
    n_jobs=-1, 
    verbose=1
)

grid_rf.fit(X_train, y_train_encoded)  # Use encoded target
print(f"RandomForest Best QWK: {grid_rf.best_score_:.3f}")
print(f"RandomForest Best Params: {grid_rf.best_params_}")

# Grid search for GradientBoosting
param_grid_gb = {
    'n_estimators': [100, 200],
    'max_depth': [3, 5, 7],
    'learning_rate': [0.01, 0.1, 0.2]
}

grid_gb = GridSearchCV(
    GradientBoostingClassifier(random_state=42),
    param_grid_gb, 
    scoring=qwk_scorer, 
    cv=cv, 
    n_jobs=-1, 
    verbose=1
)

grid_gb.fit(X_train, y_train_encoded)  # Use encoded target
print(f"GradientBoosting Best QWK: {grid_gb.best_score_:.3f}")
print(f"GradientBoosting Best Params: {grid_gb.best_params_}")

# Grid search for XGBoost
param_grid_xgb = {
    'n_estimators': [100, 200],
    'max_depth': [3, 5, 7],
    'learning_rate': [0.01, 0.1, 0.2],
    'subsample': [0.8, 1.0]
}

grid_xgb = GridSearchCV(
    XGBClassifier(random_state=42, use_label_encoder=False, eval_metric='mlogloss'),
    param_grid_xgb, 
    scoring=qwk_scorer, 
    cv=cv, 
    n_jobs=-1, 
    verbose=1
)

grid_xgb.fit(X_train, y_train_encoded)  # Use encoded target
print(f"XGBoost Best QWK: {grid_xgb.best_score_:.3f}")
print(f"XGBoost Best Params: {grid_xgb.best_params_}")

# Find the best overall model
models_results = {
    'RandomForest': grid_rf.best_score_,
    'GradientBoosting': grid_gb.best_score_,
    'XGBoost': grid_xgb.best_score_
}

best_model_name = max(models_results.keys(), key=lambda x: models_results[x])
print(f"\nBest overall model: {best_model_name} (QWK: {models_results[best_model_name]:.3f})")

# Store best model
if best_model_name == 'RandomForest':
    best_model = grid_rf.best_estimator_
elif best_model_name == 'GradientBoosting':
    best_model = grid_gb.best_estimator_
else:
    best_model = grid_xgb.best_estimator_

Hyperparameter tuning with QWK optimization...
Fitting 5 folds for each of 12 candidates, totalling 60 fits
RandomForest Best QWK: 0.923
RandomForest Best Params: {'max_depth': None, 'min_samples_split': 5, 'n_estimators': 100}
Fitting 5 folds for each of 18 candidates, totalling 90 fits
GradientBoosting Best QWK: 1.000
GradientBoosting Best Params: {'learning_rate': 0.01, 'max_depth': 3, 'n_estimators': 100}
Fitting 5 folds for each of 36 candidates, totalling 180 fits


Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.

  bst.update(dtrain, iteration=i, fobj=obj)
Parameters: { "use_label_encoder" } are not used.


XGBoost Best QWK: 1.000
XGBoost Best Params: {'learning_rate': 0.01, 'max_depth': 3, 'n_estimators': 200, 'subsample': 0.8}

Best overall model: GradientBoosting (QWK: 1.000)


In [12]:
# ==============================================================================
# FINAL EVALUATION ON TEST SET
# ==============================================================================

# Fit the best model and predict on test set
best_model.fit(X_train, y_train_encoded)  # Use encoded target
y_pred_encoded = best_model.predict(X_test)

# Convert back to original labels for evaluation
y_pred = le.inverse_transform(y_pred_encoded)  # Convert back to original labels

# Compute accuracy and QWK
test_accuracy = accuracy_score(y_test, y_pred)
test_qwk = cohen_kappa_score(y_test, y_pred, weights='quadratic')

print(f"\nFINAL TEST RESULTS:")
print(f"Test Accuracy: {test_accuracy:.3f}")
print(f"Test QWK:      {test_qwk:.3f}")

# Detailed classification report
print(f"\nClassification Report:")
print(classification_report(y_test, y_pred))

# Confusion Matrix
print(f"\nConfusion Matrix:")
cm = confusion_matrix(y_test, y_pred)
print(cm)


FINAL TEST RESULTS:
Test Accuracy: 1.000
Test QWK:      1.000

Classification Report:
              precision    recall  f1-score   support

           1       1.00      1.00      1.00         1
           2       1.00      1.00      1.00         7
           3       1.00      1.00      1.00        11
           4       1.00      1.00      1.00        57
           5       1.00      1.00      1.00        20
           6       1.00      1.00      1.00        11
           7       1.00      1.00      1.00         1

    accuracy                           1.00       108
   macro avg       1.00      1.00      1.00       108
weighted avg       1.00      1.00      1.00       108


Confusion Matrix:
[[ 1  0  0  0  0  0  0]
 [ 0  7  0  0  0  0  0]
 [ 0  0 11  0  0  0  0]
 [ 0  0  0 57  0  0  0]
 [ 0  0  0  0 20  0  0]
 [ 0  0  0  0  0 11  0]
 [ 0  0  0  0  0  0  1]]


In [13]:
# ==============================================================================
# SAVE THE TRAINED MODEL
# ==============================================================================

import pickle

print("Saving meta model...")

# Save the best model with label encoder and feature columns
with open('meta_model.pkl', 'wb') as f:
    pickle.dump((best_model, le, feature_cols), f)

print("Meta model saved as 'meta_model.pkl'")
print(f"Model type: {best_model_name}")
print(f"Features: {len(feature_cols)}")

Saving meta model...
Meta model saved as 'meta_model.pkl'
Model type: GradientBoosting
Features: 14


In [20]:
# ==============================================================================
# FINAL SUMMARY - CLASSIFICATION WITH QWK OPTIMIZATION
# ==============================================================================

print(f"\nMETACOGNITION CLASSIFICATION MODEL SUMMARY")
print("=" * 60)
print(f"Dataset: {len(df)} students with behavioral features")
print(f"Target: Post3Meta scores (1-7 classification)")
print(f"Features: {len(feature_cols)} behavioral and learning pattern features")
print(f"Best Model: {type(best_model).__name__}")
print(f"Test QWK Score: {test_qwk:.3f}")
print(f"Test Accuracy: {test_accuracy:.3f}")
print(f"\nClassification model optimized for QWK metric!")
print(" All regression code and metrics removed")


METACOGNITION CLASSIFICATION MODEL SUMMARY
Dataset: 539 students with behavioral features
Target: Post3Meta scores (1-7 classification)
Features: 13 behavioral and learning pattern features
Best Model: XGBClassifier
Test QWK Score: 0.124
Test Accuracy: 0.500

Classification model optimized for QWK metric!
 All regression code and metrics removed


# Run Model on Test Dataset

In [None]:
test_main = pd.read_csv('path/to/cleaned/test_df_main.csv') # Replace with path to test time series dataset
test_ws = pd.read_csv('path/to/cleaned/test_ws_dataset.csv') # Replace with path to test workspace dataset
test_cleaned_meta = pd.read_csv('path/to/cleaned/test_cleaned_meta.csv') # Replace with path to test metacognition dataset

In [None]:
print(f"✓ Test main interaction data: {len(test_main):,} records")
print(f"✓ Test workspace data: {len(test_ws):,} workspace sessions")
print(f"✓ Test metacognition assessments: {len(test_cleaned_meta):,} students")

print(f"\nVALIDATING TEST METACOGNITION DATA:")
print("-" * 40)
print("Test Meta columns:", test_cleaned_meta.columns.tolist())

# Check for Post3Meta specifically
if 'Post3Meta' not in test_cleaned_meta.columns:
    print("ERROR: Post3Meta column not found!")
    print("Available Test Meta columns:", [col for col in test_cleaned_meta.columns if 'Meta' in col])
else:
    print(f"✓ Post3Meta found: {test_cleaned_meta['Post3Meta'].notna().sum()} non-null values")
    print(f"  Range: {test_cleaned_meta['Post3Meta'].min():.2f} to {test_cleaned_meta['Post3Meta'].max():.2f}")

In [None]:
# ==============================================================================
# CREATE TEST TARGET VARIABLES
# ==============================================================================

print("\nCREATING TEST TARGET VARIABLE")
print("=" * 50)

# Filter students with Post3Meta scores
test_metacognition_data = test_cleaned_meta[test_cleaned_meta['Post3Meta'].notna()].copy()
print(f"Test students with Post3Meta scores: {len(test_metacognition_data)}")

# Create both continuous and categorical targets
test_metacognition_data['metacognition_score'] = test_metacognition_data['Post3Meta']

# Create categorical version (Low, Medium, High)
def categorize_metacognition(score):
    if score <= 4.0:
        return 'Low'
    elif score <= 5.5:
        return 'Medium'
    else:
        return 'High'

test_metacognition_data['metacognition_category'] = test_metacognition_data['metacognition_score'].apply(categorize_metacognition)

# Show distribution
print(f"\nTest metacognition score statistics:")
print(f"  Mean: {test_metacognition_data['metacognition_score'].mean():.2f}")
print(f"  Std: {test_metacognition_data['metacognition_score'].std():.2f}")
print(f"  Range: {test_metacognition_data['metacognition_score'].min():.2f} - {test_metacognition_data['metacognition_score'].max():.2f}")

category_counts = test_metacognition_data['metacognition_category'].value_counts()
print(f"\nTest metacognition category distribution:")
for category, count in category_counts.items():
    percentage = (count / len(test_metacognition_data)) * 100
    print(f"  • {category}: {count} students ({percentage:.1f}%)")

print(f"\nTest target variable created successfully!")

In [None]:
# ==============================================================================
# FEATURE ENGINEERING - METACOGNITION-FOCUSED FEATURES
# ==============================================================================

print("\nENGINEERING METACOGNITION-FOCUSED FEATURES")
print("=" * 50)

# Get students with metacognition scores
test_target_students = metacognition_data['Anon.Student.Id'].unique()
test_df_filtered = test_main[test_main['Anon.Student.Id'].isin(target_students)].copy()

print(f"Filtering data for {len(target_students)} students with metacognition scores")
print(f"Behavioral data: {len(df_filtered):,} interactions")

# Feature engineering by student
test_features_list = []

for student_id in target_students:
    student_data = df_filtered[df_filtered['Anon.Student.Id'] == student_id].copy()
    
    if len(student_data) == 0:
        continue
    
    features = {'Anon.Student.Id': student_id}
    
    # 1. Average Help Level (exclude 0 if 0 = no hint)
    if 'Help.Level' in student_data.columns:
        help_levels = student_data['Help.Level'].dropna()
        help_levels_nonzero = help_levels[help_levels > 0]  # Exclude 0 if 0 = no hint
        features['avg_help_level'] = help_levels_nonzero.mean() if len(help_levels_nonzero) > 0 else 0
    else:
        features['avg_help_level'] = 0
    
    # 2. Hint Usage Pattern (% of problems with Hint Requested followed by OK/WRONG)
    hint_pattern_count = 0
    total_problems = student_data['Problem.Name'].nunique()
    
    for problem in student_data['Problem.Name'].unique():
        problem_data = student_data[student_data['Problem.Name'] == problem].sort_values('time_step')
        
        # Look for hint request followed by attempt
        for i in range(len(problem_data) - 1):
            current_action = str(problem_data.iloc[i]['Action']).lower()
            next_outcome = str(problem_data.iloc[i + 1]['Outcome']).upper()
            
            if 'hint' in current_action and next_outcome in ['OK', 'CORRECT', 'WRONG', 'ERROR']:
                hint_pattern_count += 1
                break
    
    features['hint_usage_pattern'] = hint_pattern_count / total_problems if total_problems > 0 else 0
    
    # 3. Optimal Help Seeking Behavior (time spent + accuracy + hint usage)
    # Time spent on problem
    problem_times = []
    problem_accuracy = []
    problem_hint_usage = []
    
    for problem in student_data['Problem.Name'].unique():
        problem_data = student_data[student_data['Problem.Name'] == problem].sort_values('time_step')
        
        if len(problem_data) > 1:
            time_spent = problem_data['time_step'].max() - problem_data['time_step'].min()
            problem_times.append(time_spent)
        
        # Accuracy for this problem
        correct_attempts = (problem_data['Outcome'].isin(['OK', 'CORRECT'])).sum()
        total_attempts = len(problem_data)
        accuracy = correct_attempts / total_attempts if total_attempts > 0 else 0
        problem_accuracy.append(accuracy)
        
        # Hint usage for this problem
        hint_requests = problem_data['Action'].str.contains('hint', case=False, na=False).sum()
        problem_hint_usage.append(hint_requests)
    
    features['avg_time_per_problem'] = np.mean(problem_times) if problem_times else 0
    features['avg_problem_accuracy'] = np.mean(problem_accuracy) if problem_accuracy else 0
    features['avg_hints_per_problem'] = np.mean(problem_hint_usage) if problem_hint_usage else 0
    
    # Combined optimal help seeking score
    # Higher time + higher accuracy + moderate hint usage = better metacognition
    time_score = min(features['avg_time_per_problem'] / 60, 1)  # Normalize to max 1 minute
    accuracy_val = features['avg_problem_accuracy']
    hint_score = 1 - min(features['avg_hints_per_problem'] / 3, 1)  # Penalize excessive hints
    
    features['optimal_help_seeking'] = (time_score + accuracy_val + hint_score) / 3
    
    # 4. Length of time spent on Concept Building
    # Approximate concept building as time spent on problems with multiple attempts
    concept_building_time = 0
    for problem in student_data['Problem.Name'].unique():
        problem_data = student_data[student_data['Problem.Name'] == problem].sort_values('time_step')
        
        if len(problem_data) > 2:  # Multiple attempts suggest concept building
            time_spent = problem_data['time_step'].max() - problem_data['time_step'].min()
            concept_building_time += time_spent
    
    features['concept_building_time'] = concept_building_time
    
    # Additional metacognition-related features
    # 5. Self-regulation indicators
    features['total_problems_attempted'] = total_problems
    features['avg_attempts_per_problem'] = len(student_data) / total_problems if total_problems > 0 else 0
    features['error_recovery_rate'] = 0
    
    # Error recovery: correct answer after error
    for problem in student_data['Problem.Name'].unique():
        problem_data = student_data[student_data['Problem.Name'] == problem].sort_values('time_step')
        
        for i in range(len(problem_data) - 1):
            if problem_data.iloc[i]['Outcome'] in ['ERROR', 'WRONG']:
                if problem_data.iloc[i + 1]['Outcome'] in ['OK', 'CORRECT']:
                    features['error_recovery_rate'] += 1
                    break
    
    features['error_recovery_rate'] = features['error_recovery_rate'] / total_problems if total_problems > 0 else 0
    
    # 6. Persistence indicators
    features['session_persistence'] = student_data['Session.Id'].nunique()  # Number of different sessions
    features['total_interaction_time'] = student_data['time_step'].max() - student_data['time_step'].min() if len(student_data) > 1 else 0
    
    # 7. Strategic help-seeking (help level progression)
    if 'Help.Level' in student_data.columns:
        help_progression = []
        for problem in student_data['Problem.Name'].unique():
            problem_data = student_data[student_data['Problem.Name'] == problem].sort_values('time_step')
            help_levels = problem_data['Help.Level'].dropna()
            
            if len(help_levels) > 1:
                # Check if help levels increase (strategic escalation)
                increasing = (help_levels.diff() > 0).sum()
                help_progression.append(increasing / len(help_levels))
        
        features['strategic_help_progression'] = np.mean(help_progression) if help_progression else 0
    else:
        features['strategic_help_progression'] = 0
    
    test_features_list.append(features)

# Create features dataframe
test_features_df = pd.DataFrame(test_features_list)
print(f"\nMetacognition features engineered for {len(test_features_df)} students")
print(f"Total features: {len(test_features_df.columns) - 1}")

print("\nMETACOGNITION-FOCUSED FEATURES CREATED:")
print("-" * 50)
metacog_features = [
    'avg_help_level', 'hint_usage_pattern', 'optimal_help_seeking', 
    'concept_building_time', 'avg_time_per_problem', 'avg_problem_accuracy',
    'error_recovery_rate', 'strategic_help_progression'
]

for feat in metacog_features:
    if feat in test_features_df.columns:
        mean_val = test_features_df[feat].mean()
        print(f"  ✓ {feat}: mean = {mean_val:.3f}")

# Display feature statistics
print(f"\nFeature summary:")
print(test_features_df.describe().round(3))

In [None]:
# ==============================================================================
# PREPARE TEST DATA
# ==============================================================================

# Merge test features with metacognition scores (including Post3Meta as a feature)
test_df = test_features_df.merge(
    test_metacognition_data[['Anon.Student.Id', 'metacognition_score', 'metacognition_category', 'Post3Meta']], 
    on='Anon.Student.Id', how='inner'
)

print(f"Final test dataset: {len(test_df)} students")

# Handle missing values using same imputer as training
test_numerical_cols = test_df.select_dtypes(include=[np.number]).columns
test_numerical_cols = [col for col in test_numerical_cols if col not in ['metacognition_score', 'Post3Meta']]

test_df[test_numerical_cols] = imputer.transform(test_df[test_numerical_cols])

print(f"Test data preparation complete!")
print(f"Test shape: {test_df.shape}")

In [None]:
# ==============================================================================
# FINAL TEST PREDICTIONS
# ==============================================================================

test_X = test_df[feature_cols]
test_y_true = test_df['metacognition_score'].round().astype(int)

test_predictions_encoded = best_model.predict(test_X)

# Convert back to original labels for evaluation
test_predictions = le.inverse_transform(test_predictions_encoded)

# Calculate final performance metrics
final_accuracy = accuracy_score(test_y_true, test_predictions)
final_qwk = cohen_kappa_score(test_y_true, test_predictions, weights='quadratic')

print(f"\n�� FINAL TEST RESULTS:")
print(f"Test Students: {len(test_df)}")
print(f"Test Accuracy: {final_accuracy:.3f}")
print(f"Test QWK: {final_qwk:.3f}")

# Optional: Save predictions
test_results = test_df[['Anon.Student.Id']].copy()
test_results['True_Metacognition'] = test_y_true
test_results['Predicted_Metacognition'] = test_predictions
test_results['True_Post3Meta'] = test_df['Post3Meta']
test_results.to_csv('metacognition_test_predictions.csv', index=False)
print(f"\n💾 Predictions saved to metacognition_test_predictions.csv")

print(f"\n🎉 METACOGNITION MODEL EVALUATION COMPLETE!")
print(f"Model successfully predicts metacognition using behavioral features!")