In [1]:
def sparse_to_dense(x):
    return x.toarray()
import matplotlib.pyplot as plt
import pandas as pd
import re
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import FunctionTransformer,StandardScaler,LabelEncoder
from sklearn.model_selection import train_test_split,cross_val_predict
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier,GradientBoostingClassifier, VotingClassifier,RandomForestRegressor,HistGradientBoostingRegressor
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score,f1_score,precision_recall_curve,mean_absolute_error, mean_squared_error
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import TruncatedSVD
from sklearn.base import clone
from sklearn.svm import SVC, LinearSVC
from scipy.sparse import hstack
import numpy as np
import warnings
warnings.filterwarnings('ignore')
#LOAD
url = "https://raw.githubusercontent.com/AREEG94FAHAD/TaskComplexityEval-24/refs/heads/main/problems_data.jsonl"
df = pd.read_json(url, lines=True)
df = df.drop(columns=['sample_io', 'url', 'title'], errors='ignore')
#df.sample(5)
df.isnull().sum()
#plt.bar(df['problem_class'].value_counts().index, df['problem_class'].value_counts())

description           0
input_description     0
output_description    0
problem_class         0
problem_score         0
dtype: int64

In [2]:
targets = ["problem_score", "problem_class"]
text_cols = [c for c in df.columns if c not in targets]
df["full_text"] = df[text_cols].fillna("").astype(str).agg(" ".join, axis=1)

import unicodedata
import re

def clean_text(s: str) -> str:
    if not isinstance(s, str):
        return ""
    s = unicodedata.normalize("NFKD", s)
    s = s.lower()
    s = s.replace("≤", "<=").replace("≥", ">=").replace("≠", "!=")
    s = re.sub(r"[→⇒]", "->", s)
    s = re.sub(r"(\d+)\s*\^\s*(\d+)", r"\1^\2", s)
    s = re.sub(r"\s+", " ", s).strip()
    sentences = re.split(r"[.!?]", s)
    seen = set()
    deduped = []
    for sent in sentences:
        sent = sent.strip()
        if sent and sent not in seen:
            seen.add(sent)
            deduped.append(sent)
    s = ". ".join(deduped)
    return s

df["full_text"] = df["full_text"].apply(clean_text)
df = df.drop_duplicates(subset=["full_text"]).reset_index(drop=True)

# FEATURE ENGINEERING
KEYWORDS = [ "dp","greedy","binary search","two pointers","sliding window", "recursion","backtracking","divide and conquer","bitmask", "array","string","stack","queue","heap","priority queue", "hashmap","set","tree","binary tree","bst","segment tree", "fenwick","trie","graph","dag","linked list","disjoint set","union find", "bfs","dfs","shortest path","dijkstra","bellman ford","floyd", "mst","kruskal","prim","topological","cycle","bipartite", "modulo","gcd","lcm","prime","sieve","combinatorics","probability", "matrix","prefix sum","xor","bitwise", "substring","subsequence","palindrome","z algorithm","kmp","hashing", "simulation","implementation","geometry","game theory"]

def numeric_features(X):
    if isinstance(X, pd.DataFrame):
        text = X.iloc[:, 0]
    elif isinstance(X, pd.Series):
        text = X
    else:
        text = pd.Series(X)
    text = text.fillna("").astype(str)
    length = text.str.len().values.reshape(-1, 1)
    math_symbols = text.str.count(r"[=<>+\-*/%^]").values.reshape(-1, 1)
    keyword_counts = np.column_stack([
        text.str.contains(rf"\b{k}\b", case=False, regex=True).astype(int)
        for k in KEYWORDS
    ])
    return np.hstack([length, math_symbols, keyword_counts])

num_feat = FunctionTransformer(numeric_features, validate=False)

features = ColumnTransformer([
    ("tfidf", TfidfVectorizer(
        ngram_range=(1,2),
        min_df=3,
        max_df=0.9,
        sublinear_tf=True
    ), "full_text"),
    ("numeric", Pipeline([
        ("extract", num_feat),
        ("scale", StandardScaler())
    ]), "full_text")
])

# SINGLE CONSISTENT SPLIT FOR EVERYTHING
X_all = df[["full_text"]]
y_class = df["problem_class"]
y_score = df["problem_score"]

X_train, X_test, y_train_class, y_test_class, y_train_score, y_test_score = train_test_split(
    X_all, y_class, y_score,
    test_size=0.2,
    stratify=y_class,
    random_state=42
)

# CLASSIFICATION (2-STAGE)
y_train_s1 = (y_train_class == "hard").astype(int)

stage1 = Pipeline([
    ("features", clone(features)),
    ("clf", RandomForestClassifier(
        n_estimators=500,
        max_depth=18,
        min_samples_leaf=4,
        min_samples_split=10,
        class_weight={0: 1.0, 1: 1.32},
        n_jobs=-1,
        random_state=42
    ))
])

cv_probs = cross_val_predict(
    stage1,
    X_train,
    y_train_s1,
    cv=5,
    method="predict_proba",
    n_jobs=-1
)[:, 1]

best_acc = -1
best_threshold = 0.5
for t in np.linspace(0.3, 0.7, 100):
    preds = (cv_probs >= t).astype(int)
    acc = accuracy_score(y_train_s1, preds)
    if acc > best_acc:
        best_acc = acc
        best_threshold = t

HARD_T = best_threshold
print(f"Frozen HARD threshold (CV): {HARD_T:.4f}")

stage1.fit(X_train, y_train_s1)
hard_proba_test = stage1.predict_proba(X_test)[:, 1]
hard_pred_test = hard_proba_test >= HARD_T

mask_train_s2 = y_train_class != "hard"
mask_test_s2  = ~hard_pred_test

X_train_s2 = X_train.loc[mask_train_s2]
y_train_s2 = y_train_class.loc[mask_train_s2]

X_test_s2 = X_test.loc[mask_test_s2]

stage2 = Pipeline([
    ("features", clone(features)),
    ("clf", LinearSVC(
        C=0.732,
        class_weight={
            "easy": 1.3,
            "medium": 1.0
        },
        max_iter=10000
    ))
])

stage2.fit(X_train_s2, y_train_s2)
stage2_pred_test = stage2.predict(X_test_s2)

final_pred_class = np.array(["hard"] * len(X_test), dtype=object)
final_pred_class[mask_test_s2] = stage2_pred_test

print("Accuracy:", accuracy_score(y_test_class, final_pred_class))
print(confusion_matrix(y_test_class, final_pred_class))
print(classification_report(y_test_class, final_pred_class))
from sklearn.linear_model import Ridge
regressors = {}
for cls in ["easy", "medium", "hard"]:
    mask_cls_train = (y_train_class == cls)
    X_cls_train = X_train.loc[mask_cls_train, ["full_text"]]
    y_cls_train = y_train_score.loc[mask_cls_train]

    reg_pipe = Pipeline([
        ("features", clone(features)),
        ("to_dense", FunctionTransformer(sparse_to_dense, accept_sparse=True)),
        ("reg", Ridge(alpha=1.0))
    ])
    
    reg_pipe.fit(X_cls_train, y_cls_train)
    regressors[cls] = reg_pipe

def predict_class_and_score_batch(X_text, classifier_stage1, classifier_stage2, hard_threshold, regressors):
    # Stage1 hard vs not-hard
    hard_proba = classifier_stage1.predict_proba(X_text)[:, 1]
    hard_mask = hard_proba >= hard_threshold
    pred_not_hard = classifier_stage2.predict(X_text[~hard_mask])
    final_classes = np.array(["hard"] * len(X_text), dtype=object)
    final_classes[~hard_mask] = pred_not_hard

   
    scores = np.zeros(len(X_text), dtype=float)
    for cls in ["easy", "medium", "hard"]:
        idx = np.where(final_classes == cls)[0]
        if len(idx) > 0:
            raw_scores = regressors[cls].predict(X_text.iloc[idx])
            if cls == "easy":
                scores[idx] = np.minimum(raw_scores, 2.8)
            elif cls == "medium":
                scores[idx] = np.clip(raw_scores, 2.9, 5.6)
            elif cls == "hard":
                scores[idx] = np.clip(raw_scores, 5.6, 10.0)

    return final_classes, scores

final_classes_test, final_scores_test = predict_class_and_score_batch(
    X_test, stage1, stage2, HARD_T, regressors
)

mae = mean_absolute_error(y_test_score, final_scores_test)
rmse = np.sqrt(mean_squared_error(y_test_score, final_scores_test))
print(f"Score MAE (test): {mae:.4f}")
print(f"Score RMSE (test): {rmse:.4f}")


Frozen HARD threshold (CV): 0.5384
Accuracy: 0.5176184690157959
[[ 80  20  53]
 [ 39 210 140]
 [ 41 104 136]]
              precision    recall  f1-score   support

        easy       0.50      0.52      0.51       153
        hard       0.63      0.54      0.58       389
      medium       0.41      0.48      0.45       281

    accuracy                           0.52       823
   macro avg       0.51      0.52      0.51       823
weighted avg       0.53      0.52      0.52       823

Score MAE (test): 1.8084
Score RMSE (test): 2.3504


In [None]:
import joblib

joblib.dump(stage1, "stage1_hard_classifier.pkl")
joblib.dump(HARD_T, "hard_threshold.pkl")
joblib.dump(stage2, "stage2_easy_medium.pkl")
joblib.dump(regressors, "score_regressors.pkl")

In [None]:
df.groupby("problem_class").agg(
    count=("problem_score", "count"),
    mn=("problem_score", "min"),
    mx=("problem_score", "max"),
    mean=("problem_score", "mean"),
    std=("problem_score", "std"),
    length_mean=("full_text", lambda x: x.str.len().mean())
)