In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/map-charting-student-math-misunderstandings/sample_submission.csv
/kaggle/input/map-charting-student-math-misunderstandings/train.csv
/kaggle/input/map-charting-student-math-misunderstandings/test.csv


In [2]:
# train = pd.read_csv('/kaggle/input/map-charting-student-math-misunderstandings/train.csv')
# test = pd.read_csv('/kaggle/input/map-charting-student-math-misunderstandings/test.csv')

In [3]:
import os, re, random, unicodedata
from pathlib import Path

import numpy as np
import pandas as pd
from scipy.sparse import hstack

from sklearn.preprocessing import LabelEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import train_test_split

# Reproducibility
def seed_all(seed=42):
    random.seed(seed); np.random.seed(seed)
seed_all(42)

# Quiet pandas
pd.set_option("display.max_colwidth", 200)

# Helper to locate CSVs locally or under /kaggle/input
def find_csv(name: str) -> Path:
    p = Path(name)
    if p.exists(): return p
    for root, _, files in os.walk("/kaggle/input"):
        if name in files:
            return Path(root)/name
    raise FileNotFoundError(f"Could not find {name} in working dir or /kaggle/input")

# Light text normalization
def normalize(s: str) -> str:
    s = unicodedata.normalize("NFKC", str(s))
    s = s.strip().lower()
    s = re.sub(r"\s+", " ", s)
    return s


In [4]:
# Load train/test

df_train = pd.read_csv("/kaggle/input/map-charting-student-math-misunderstandings/train.csv")
df_test  = pd.read_csv("/kaggle/input/map-charting-student-math-misunderstandings/test.csv")

# Detect ID & label columns
def detect_columns(df_train: pd.DataFrame, df_test: pd.DataFrame):
    id_cands = [c for c in df_test.columns if any(k in c.lower() for k in ["row_id","id","responseid","sample_id"])]
    ID_COL = id_cands[0] if id_cands else df_test.columns[0]
    CATEGORY_COL = next((c for c in df_train.columns if "category" in c.lower()), None)
    MISCON_COL   = next((c for c in df_train.columns if "miscon" in c.lower()), None)
    return ID_COL, CATEGORY_COL, MISCON_COL

ID_COL, CATEGORY_COL, MISCON_COL = detect_columns(df_train, df_test)
assert CATEGORY_COL is not None, "Category column not found in train."

# Ensure necessary text cols exist
for c in ["QuestionText","MC_Answer","StudentExplanation"]:
    if c in df_train.columns: df_train[c] = df_train[c].fillna("")
    if c in df_test.columns:  df_test[c]  = df_test[c].fillna("")

# Fallback text col if schema differs
RAW_TEXT_COL = "StudentExplanation" if "StudentExplanation" in df_train.columns else df_train.select_dtypes("object").columns[0]


In [5]:
# Build rich text: Q + MC + EXP (works even if some columns are missing)
def rich_text(df: pd.DataFrame) -> pd.Series:
    q  = df["QuestionText"]       if "QuestionText" in df.columns else df[RAW_TEXT_COL]
    mc = df["MC_Answer"]          if "MC_Answer" in df.columns else ""
    ex = df["StudentExplanation"] if "StudentExplanation" in df.columns else df[RAW_TEXT_COL]
    return ("q: " + q.astype(str) + " [mc] " + mc.astype(str) + " [exp] " + ex.astype(str)).map(normalize)

train_text = rich_text(df_train)
test_text  = rich_text(df_test)

# Ensure misconception column
if MISCON_COL is None:
    MISCON_COL = "__Misconception"
    df_train[MISCON_COL] = "None"
df_train[MISCON_COL]   = df_train[MISCON_COL].fillna("None").astype(str)
df_train[CATEGORY_COL] = df_train[CATEGORY_COL].fillna("True_Correct").astype(str)

# Canonicalize categories to the 3 allowed ones
ALLOWED_CATS = {"True_Correct","False_Neither","False_Misconception"}
def canon_cat(cat: str) -> str:
    s = str(cat)
    if "misconception" in s: return "False_Misconception"
    if "neither" in s:       return "False_Neither"
    if "correct" in s:       return "True_Correct"
    return "False_Neither"   # conservative fallback

df_train["__cat_canon"] = df_train[CATEGORY_COL].map(canon_cat)

# Mis label encoder (ensure 'None' exists)
mis_labels = sorted(set(df_train[MISCON_COL].tolist()) | {"None"})
le_mis = LabelEncoder().fit(mis_labels)

# Category encoder (on canonical labels)
le_cat = LabelEncoder().fit(sorted(ALLOWED_CATS))

# Encoded targets
y_cat = le_cat.transform(df_train["__cat_canon"].values)
y_mis = le_mis.transform(df_train[MISCON_COL].values)

# Maps
id2cat = {i:c for i,c in enumerate(le_cat.classes_)}
id2mis = {i:m for i,m in enumerate(le_mis.classes_)}

# Convenience
NONE_MIS = "None"
NONE_MIS_ID = le_mis.transform([NONE_MIS])[0] if NONE_MIS in le_mis.classes_ else None

# SAFE_MIS = most frequent non-None in train (for padding/fallbacks)
mis_counts = df_train[MISCON_COL].value_counts()
SAFE_MIS = next((m for m,_ in mis_counts.items() if m != "None"), "None")

In [6]:
# Fit TF-IDF on train + test text (unsupervised feature extraction is fine)
all_text = pd.concat([train_text, test_text], axis=0)

tfidf_word = TfidfVectorizer(
    analyzer="word", ngram_range=(1,2), min_df=2, max_features=200_000, sublinear_tf=True
)
Xw = tfidf_word.fit_transform(all_text)

tfidf_char = TfidfVectorizer(
    analyzer="char_wb", ngram_range=(3,5), min_df=2, max_features=200_000, sublinear_tf=True
)
Xc = tfidf_char.fit_transform(all_text)

# Combine
from scipy.sparse import csr_matrix
X = hstack([Xw, Xc]).tocsr()
X_train = X[:len(df_train)]
X_test  = X[len(df_train):]

In [7]:
# Cell 5 — Train two linear models (with single-class safeguards)
import numpy as np
from sklearn.linear_model import SGDClassifier

def _constant_proba_fn(n_samples: int, n_classes: int, hot_idx: int):
    """Return an (n_samples x n_classes) array with prob=1 at hot_idx, else 0."""
    vec = np.zeros((n_classes,), dtype=np.float32)
    vec[hot_idx] = 1.0
    return np.tile(vec, (n_samples, 1))

def train_or_constant_proba(X_tr, y_tr, n_classes: int, prefer_idx: int | None = None):
    """
    If y_tr has >=2 classes: fit SGDClassifier with log_loss and return predict_proba fn.
    If y_tr has 1 class: return a constant-probability function (all test rows predict that class).
    prefer_idx is used when y_tr is empty (shouldn't happen) or when you want to force a specific class index.
    """
    uniq = np.unique(y_tr)
    if len(uniq) < 2:
        # Pick the only class (or the preferred one if provided)
        hot = int(uniq[0]) if len(uniq) == 1 else (0 if prefer_idx is None else int(prefer_idx))
        def proba_fn(X):
            return _constant_proba_fn(X.shape[0], n_classes, hot)
        return None, proba_fn

    clf = SGDClassifier(
        loss="log_loss",
        class_weight="balanced",
        max_iter=30,
        tol=1e-3,
        random_state=42
    )
    clf.fit(X_tr, y_tr)
    return clf, clf.predict_proba

# --- Train category head (label space == le_cat.classes_) ---
n_cat = len(le_cat.classes_)
cat_clf, cat_proba_fn = train_or_constant_proba(X_train, y_cat, n_cat)

# --- Train misconception head (label space == le_mis.classes_) ---
# If single-class, prefer mapping to 'None' if available
n_mis = len(le_mis.classes_)
prefer_none_idx = int(NONE_MIS_ID) if (NONE_MIS_ID is not None) else None
mis_clf, mis_proba_fn = train_or_constant_proba(X_train, y_mis, n_mis, prefer_idx=prefer_none_idx)

# --- Get probabilities for the test split ---
cat_proba_test = cat_proba_fn(X_test)  # shape [N_test, n_cat]
mis_proba_test = mis_proba_fn(X_test)  # shape [N_test, n_mis]

# (optional) quick shape sanity
# print(cat_proba_test.shape, mis_proba_test.shape)

In [8]:
# Decode with gating and canonical categories; produce up to 3 unique tokens
def top3_combined_rows(cat_probs: np.ndarray, mis_probs: np.ndarray, topk: int = 3,
                       mis_max_per_row: int = 1, mis_min_conf: float = 0.08):
    N, Cc = cat_probs.shape
    Cm = mis_probs.shape[1]
    preds = []

    cat_names = [id2cat[i] for i in range(Cc)]
    mis_names = [id2mis[i] for i in range(Cm)]

    for i in range(N):
        c_row = cat_probs[i]
        m_row = mis_probs[i].copy()

        # Exclude 'None' when pairing with mis category
        if NONE_MIS_ID is not None:
            m_wo_none = m_row.copy(); m_wo_none[NONE_MIS_ID] = 0.0
        else:
            m_wo_none = m_row

        # top cats (already canonical)
        top_c_idx = np.argsort(-c_row)[:5]
        top_c_p   = c_row[top_c_idx]

        # top misconceptions (excl None)
        top_m_idx = np.argsort(-m_wo_none)[:10]
        top_m_p   = m_wo_none[top_m_idx]

        pool, used_c = [], set()
        for ci, cp in zip(top_c_idx, top_c_p):
            c_name = cat_names[ci]  # canonical by training
            if c_name in used_c:    # avoid duplicate category rows
                continue
            used_c.add(c_name)

            if c_name == "False_Misconception" and len(top_m_idx) > 0:
                used_here = 0
                for mi, mp in zip(top_m_idx, top_m_p):
                    if mp < mis_min_conf: continue
                    pool.append((cp * mp, f"{c_name}:{mis_names[mi]}"))
                    used_here += 1
                    if used_here >= mis_max_per_row: break
            else:
                # Non-mis categories are paired with None
                if NONE_MIS_ID is not None:
                    pool.append((cp, f"{c_name}:{mis_names[NONE_MIS_ID]}"))

        # Rank and pick unique top-k
        pool.sort(key=lambda x: x[0], reverse=True)
        uniq, seen = [], set()
        for _, lab in pool:
            if lab not in seen:
                seen.add(lab); uniq.append(lab)
            if len(uniq) == topk: break

        # Fallback
        if not uniq:
            best_c = cat_names[int(np.argmax(c_row))]
            none_m = mis_names[NONE_MIS_ID] if NONE_MIS_ID is not None else "None"
            uniq = [f"{best_c}:{none_m}"]

        preds.append(" ".join(uniq))
    return preds

pred_strings_raw = top3_combined_rows(cat_proba_test, mis_proba_test, topk=3)

In [9]:
# Final pass: map to allowed categories, force exactly 3 tokens per row
VALID_CATS = {"True_Correct","False_Neither","False_Misconception"}
VALID_MIS  = set(id2mis.values())
NONE_MIS   = "None"
SAFE_MIS   = SAFE_MIS  # from earlier (most common non-None)

def canon_cat(cat: str) -> str:
    # Should already be canonical, but keep this as a guard
    if cat in VALID_CATS: return cat
    s = str(cat)
    if "misconception" in s: return "False_Misconception"
    if "neither" in s:       return "False_Neither"
    if "correct" in s:       return "True_Correct"
    return "False_Neither"

def fix_prediction(pred: str) -> str:
    toks = str(pred).strip().split()
    cleaned, seen = [], set()
    for t in toks:
        if ":" not in t: continue
        cat, mis = t.split(":", 1)
        cat = canon_cat(cat)
        if cat not in VALID_CATS: continue
        if cat != "False_Misconception":
            mis = NONE_MIS
        else:
            if mis not in VALID_MIS:
                mis = SAFE_MIS
        lab = f"{cat}:{mis}"
        if lab in seen: continue
        cleaned.append(lab); seen.add(lab)
        if len(cleaned) == 3: break

    # Pad to exactly 3
    pad_order = [
        f"True_Correct:{NONE_MIS}",
        f"False_Neither:{NONE_MIS}",
        f"False_Misconception:{SAFE_MIS}"
    ]
    for lab in pad_order:
        if len(cleaned) == 3: break
        if lab not in seen:
            cleaned.append(lab); seen.add(lab)
    return " ".join(cleaned[:3])

pred_strings = [fix_prediction(s) for s in pred_strings_raw]

In [10]:
# ===== Optional holdout sanity check (safe with single-class heads) =====
from sklearn.model_selection import train_test_split
from scipy.sparse import hstack
import numpy as np

def map_at_3(truths, preds_lists):
    s = 0.0
    for t, ps in zip(truths, preds_lists):
        gain = 0.0
        for i, p in enumerate(ps[:3]):
            if p == t:
                gain = 1.0 / (i + 1)
                break
        s += gain
    return s / max(1, len(truths))

# If you defined canon_cat earlier in the notebook, reuse it; else a safe guard:
try:
    canon_cat
except NameError:
    VALID_CATS = {"True_Correct","False_Neither","False_Misconception"}
    def canon_cat(cat: str) -> str:
        if cat in VALID_CATS: return cat
        s = str(cat)
        if "misconception" in s: return "False_Misconception"
        if "neither" in s:       return "False_Neither"
        if "correct" in s:       return "True_Correct"
        return "False_Neither"

# Robust stratification (fallback to category when combo is singleton)
df_train["__combo"] = df_train[CATEGORY_COL].astype(str) + ":" + df_train[MISCON_COL].astype(str)
vc = df_train["__combo"].value_counts()
strat = df_train["__combo"].where(df_train["__combo"].map(vc) >= 2, df_train[CATEGORY_COL])

idx = np.arange(len(df_train))
tr_idx, va_idx = train_test_split(idx, test_size=0.15, random_state=42, stratify=strat)

# Build val features using the SAME fitted vectorizers
va_text = rich_text(df_train.iloc[va_idx])
Xw_va = tfidf_word.transform(va_text)
Xc_va = tfidf_char.transform(va_text)
X_va  = hstack([Xw_va, Xc_va])

# Wrap predict_proba safely (supports None -> constant proba)
def _wrap_proba(clf, n_classes, hot_idx=0):
    if clf is None:
        def fn(X):
            out = np.zeros((X.shape[0], n_classes), dtype=np.float32)
            out[:, int(hot_idx)] = 1.0
            return out
        return fn
    return clf.predict_proba

# Prefer 'None' for misconception head if single-class
n_cat = len(le_cat.classes_)
n_mis = len(le_mis.classes_)
_hot_none = int(NONE_MIS_ID) if ('NONE_MIS_ID' in globals() and NONE_MIS_ID is not None) else 0

# Use the proba functions if they exist; otherwise wrap the classifiers
cat_pf = cat_proba_fn if 'cat_proba_fn' in globals() else _wrap_proba(cat_clf, n_cat, 0)
mis_pf = mis_proba_fn if 'mis_proba_fn' in globals() else _wrap_proba(mis_clf, n_mis, _hot_none)

cat_p_va = cat_pf(X_va)
mis_p_va = mis_pf(X_va)

# Decode and evaluate
pred_va_strings = top3_combined_rows(cat_p_va, mis_p_va, topk=3)
pred_va_strings = [fix_prediction(s) for s in pred_va_strings]
truth_va = (df_train.iloc[va_idx][CATEGORY_COL].map(canon_cat) + ":" + df_train.iloc[va_idx][MISCON_COL]).tolist()

print("Holdout MAP@3 (sanity):", f"{map_at_3(truth_va, [s.split() for s in pred_va_strings]):.3f}")


Holdout MAP@3 (sanity): 0.615


In [11]:
# Build final submission with exact schema
sub = pd.DataFrame({
    "row_id": df_test[ID_COL].astype(np.int64).values,
    "Category:Misconception": pred_strings
})
sub.to_csv("submission.csv", index=False)

# Save where Kaggle expects it (+ a copy)
sub.to_csv("submission.csv", index=False)
Path("outputs").mkdir(parents=True, exist_ok=True)
sub.to_csv("outputs/submission.csv", index=False)

sub.head(5)


Unnamed: 0,row_id,Category:Misconception
0,36696,False_Neither:None True_Correct:None False_Misconception:Incomplete
1,36697,False_Neither:None False_Misconception:WNB True_Correct:None
2,36698,False_Neither:None False_Misconception:Shorter_is_bigger True_Correct:None


In [12]:
# # Minimal "download my submission" cell

# import os, numpy as np, pandas as pd, zipfile
# from pathlib import Path
# from IPython.display import display, FileLink

# # --- build or reuse `sub` ---
# if "sub" not in globals():
#     # Fallback: build from test + pred_strings (already computed earlier)
#     assert "test" in globals() and isinstance(test, pd.DataFrame), "`test` DataFrame not found"
#     assert "ID_COL" in globals() and isinstance(ID_COL, str) and ID_COL in test.columns, "Bad ID_COL"
#     assert "pred_strings" in globals() and len(pred_strings) == len(test), "Run prediction first"

#     sub = pd.DataFrame({
#         "row_id": test[ID_COL].astype(np.int64).values,
#         "prediction": pred_strings
#     })
# else:
#     # Ensure proper schema if `sub` already exists
#     if "row_id" not in sub.columns or "prediction" not in sub.columns:
#         assert "test" in globals() and "ID_COL" in globals() and "pred_strings" in globals(), \
#             "Missing pieces to rebuild submission"
#         sub = pd.DataFrame({
#             "row_id": test[ID_COL].astype(np.int64).values,
#             "prediction": pred_strings
#         })

# # --- light validation (won't raise if okay) ---
# assert list(sub.columns) == ["row_id","prediction"], "Submission must have columns ['row_id','prediction']"
# assert len(sub) > 0 and sub["row_id"].is_unique, "row_id must be unique and non-empty"
# assert sub["prediction"].notna().all(), "prediction contains NaNs"

# # --- save files where Kaggle expects them ---
# sub.to_csv("submission.csv", index=False)
# Path("outputs").mkdir(parents=True, exist_ok=True)
# sub.to_csv("outputs/submission.csv", index=False)

# # --- make a zip if you prefer downloading a single file ---
# with zipfile.ZipFile("submission.zip", "w", compression=zipfile.ZIP_DEFLATED) as zf:
#     zf.write("submission.csv")

# # --- quick preview + download links ---
# print("Saved files:")
# for p in ["submission.csv", "submission.zip", "outputs/submission.csv"]:
#     if Path(p).exists():
#         print(" -", p, f"({Path(p).stat().st_size} bytes)")

# display(sub.head(5))

# print("\nClick to download:")
# display(FileLink("submission.csv"))
# display(FileLink("submission.zip"))
