In [2]:
# ============================================================
# SkillTracer — Next-Item Recommender + API + HTML (f-string safe)
#   - Builds reco_catalog.pkl
#   - Writes recommender.py (NO f-strings), app2.py, index.html
# ============================================================
import os, csv, json, pickle, textwrap
import numpy as np
import pandas as pd

# Project folder: every input is read from here and every generated artifact is written here.
BASE = r"C:\Users\sagni\Downloads\SkillTracer Knowledge Tracing"
os.makedirs(BASE, exist_ok=True)

# Inputs located relative to the project folder.
PP_PATH   = os.path.join(BASE, "preprocessor.pkl")
DATA_PATH = os.path.join(BASE, r"archive (1)\2012-2013-data-with-predictions-4-final.csv")

# ---------- helpers ----------
def is_zip_or_xlsx(path):
    """Return True when the file at *path* starts with the ZIP magic bytes ``PK``.

    .xlsx workbooks are ZIP containers, so this is used to tell them apart
    from delimited text files. Any failure to open or read the file (missing
    path, directory, bad argument, ...) is reported as False.
    """
    zip_magic = b"PK"
    try:
        with open(path, "rb") as stream:
            leading = stream.read(len(zip_magic))
    except Exception:
        return False
    return leading == zip_magic

def robust_read_any(path, usecols=None):
    """Load a tabular file whose exact format is not known in advance.

    Files starting with the ZIP magic bytes are read as Excel workbooks.
    Everything else is read as delimited text, trying each combination of a
    few common encodings and delimiters (the sniffed delimiter first) and
    returning the first parse that yields more than one column.

    If no combination yields several columns but at least one produced a
    single-column frame (a genuinely one-column file, or ``usecols`` selecting
    one column), that first single-column combination is used instead of
    failing.

    Parameters
    ----------
    path : str
        Location of the file to read.
    usecols : optional
        Forwarded unchanged to ``pd.read_excel`` / ``pd.read_csv``.

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    FileNotFoundError
        If *path* does not exist.
    RuntimeError
        If no encoding/delimiter combination produced a usable frame; the
        last parser error is attached as the cause.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(path)

    if is_zip_or_xlsx(path):
        # Imported only so a missing Excel backend fails before the read starts.
        import openpyxl  # noqa: F401
        df = pd.read_excel(path, engine="openpyxl", usecols=usecols)
        print(f"[INFO] Loaded as Excel: {os.path.basename(path)} shape={df.shape}")
        return df

    encodings = ["utf-8", "utf-8-sig", "cp1252", "latin1"]
    delimiters = [";", ",", "\t", "|"]

    # Sniffing is advisory only: on any failure the default delimiter order is kept.
    try:
        with open(path, "rb") as f:
            head = f.read(8192).decode("latin1", errors="ignore")
        sniffed = csv.Sniffer().sniff(head)
    except Exception:
        pass
    else:
        if sniffed.delimiter in delimiters:
            delimiters = [sniffed.delimiter] + [d for d in delimiters if d != sniffed.delimiter]

    last_err = None
    # First (encoding, delimiter) pair that parsed into exactly one column.
    # Only the parameters are remembered so the rejected frame is not kept alive
    # while the remaining combinations are tried.
    single_col_params = None
    for enc in encodings:
        for sep in delimiters:
            try:
                df = pd.read_csv(path, encoding=enc, sep=sep, engine="python", usecols=usecols)
            except Exception as e:
                last_err = e
                continue
            if df.shape[1] > 1:
                print(f"[INFO] Loaded as CSV enc='{enc}', sep='{sep}', shape={df.shape}")
                return df
            if df.shape[1] == 1 and single_col_params is None:
                single_col_params = (enc, sep)
            del df

    if single_col_params is not None:
        enc, sep = single_col_params
        df = pd.read_csv(path, encoding=enc, sep=sep, engine="python", usecols=usecols)
        print(f"[INFO] Loaded as CSV enc='{enc}', sep='{sep}', shape={df.shape}")
        return df

    raise RuntimeError(f"Could not parse {path}. Last error: {last_err}") from last_err

def pick_col(candidates, cols):
    """Return the first name from *candidates* that is present in *cols*.

    Exact matches are tried for every candidate first. Only if none matches is
    a case-insensitive comparison attempted, in which case the column's actual
    spelling from *cols* is returned. Returns None when nothing matches.
    """
    # Pass 1: exact spelling.
    for wanted in candidates:
        if wanted in cols:
            return wanted

    # Pass 2: case-insensitive. When two columns differ only by case, the
    # later one in *cols* wins because it overwrites the earlier entry.
    folded = {}
    for actual in cols:
        folded[actual.lower()] = actual
    for wanted in candidates:
        key = wanted.lower()
        if key in folded:
            return folded[key]

    return None

# ---------- load preprocessor (skill mapping) ----------
# The catalog must use the same skill vocabulary as the trained model, so the
# script stops here when the preprocessor file is absent.
if not os.path.exists(PP_PATH):
    raise FileNotFoundError("preprocessor.pkl not found. Train first to create it.")

# NOTE: pickle executes arbitrary code on load; only point PP_PATH at a file
# this project produced itself.
with open(PP_PATH, "rb") as f:
    preproc = pickle.load(f)

skill2idx = preproc["skill2idx"]
# Use the stored reverse mapping when present, otherwise invert skill2idx.
idx2skill = preproc.get("idx2skill", dict(zip(skill2idx.values(), skill2idx.keys())))
n_skills  = preproc["n_skills"]

# ---------- load dataset and pick needed columns ----------
df_all = robust_read_any(DATA_PATH)
cols = list(df_all.columns)

# Resolve each logical column to whichever known alias the file actually uses.
student_col = pick_col(["student_id","user_id","Anon Student Id","Anon StudentID","student","sid"], cols)
skill_col   = pick_col(["skill_id","skill","tag","KC(SubSkills)","KC","skill_name","concept_id"], cols)
problem_col = pick_col(["problem_id","item_id","problem","question_id","Step ID","Problem Name"], cols)
correct_col = pick_col(["correct","is_correct","Correct First Attempt","answered_correctly","label"], cols)

# Validate before copying anything so a wrong file fails immediately.
# problem_col is optional: without it only skill-level stats are produced.
if student_col is None or skill_col is None or correct_col is None:
    raise ValueError(f"Missing essential columns in dataset. Found: {cols}")

need = [c for c in [student_col, skill_col, problem_col, correct_col] if c is not None]
df = df_all[need].copy()
del df_all  # release the full-width frame; only the selected columns are needed

# Normalize
df[correct_col] = pd.to_numeric(df[correct_col], errors="coerce")
# Rows whose label is missing or unparseable must be dropped BEFORE thresholding:
# NaN > 0 is False, so they would otherwise be counted as incorrect answers and
# bias every p_correct estimate downwards.
df = df.dropna(subset=[correct_col]).copy()
df[correct_col] = (df[correct_col] > 0).astype(int)

def take_first_skill(v):
    """Reduce a possibly multi-valued skill cell to its first skill.

    Missing values come back as NaN. Otherwise the value is converted to a
    string and split on the first separator, in priority order
    ``~~``, ``; ``, ``;``, ``,``, ``|``, that occurs anywhere in it; the text
    before that separator is returned. Values without any separator are
    returned unchanged (as a string). No whitespace is stripped.
    """
    if pd.isna(v):
        return np.nan
    text = str(v)
    # Separator priority matters: "a,b~~c" is split on "~~", giving "a,b".
    hit = next((sep for sep in ("~~", "; ", ";", ",", "|") if sep in text), None)
    if hit is None:
        return text
    return text.partition(hit)[0]

# Collapse multi-skill cells to their first skill, then drop incomplete rows.
df[skill_col] = df[skill_col].apply(take_first_skill)
df = df.dropna(subset=[skill_col, correct_col])

# Filter to known skills (from training mapping)
known_skill_mask = df[skill_col].astype(str).isin(skill2idx.keys())
df = df[known_skill_mask].copy()

# ---------- compute per-skill stats ----------
k = 1.0  # Laplace smoothing
skill_grp = (
    df.groupby(skill_col)[correct_col]
      .agg(['sum', 'count'])
      .rename(columns={'sum': 'pos', 'count': 'n'})
      .reset_index()
)
# Smoothed success rate: (correct + k) / (attempts + 2k).
skill_grp['p_correct'] = (skill_grp['pos'] + k) / (skill_grp['n'] + 2*k)

# ---------- per-problem stats ----------
# Both dicts stay empty when the dataset has no problem column.
item_stats = {}
skill_to_items = {}
if problem_col is not None:
    need2 = df[[skill_col, problem_col, correct_col]].dropna()
    item_grp = (
        need2.groupby([skill_col, problem_col])[correct_col]
             .agg(['sum', 'count'])
             .rename(columns={'sum': 'pos', 'count': 'n'})
             .reset_index()
    )
    # Same Laplace smoothing as the per-skill rate.
    item_grp['p_correct'] = (item_grp['pos'] + k) / (item_grp['n'] + 2*k)
    eps = 1e-6
    # Clip away from 0 and 1 so the logit below stays finite.
    item_grp['p_clip'] = item_grp['p_correct'].clip(eps, 1-eps)
    item_grp['b'] = -np.log(item_grp['p_clip']/(1-item_grp['p_clip']))  # 1PL difficulty

    # NOTE(review): item_stats is keyed by problem id alone while the grouping is
    # per (skill, problem). A problem logged under several skills keeps only the
    # last group's stats here, yet is listed under every one of those skills in
    # skill_to_items - confirm problems map to a single skill in this dataset.
    for _, row in item_grp.iterrows():
        skill_key = str(row[skill_col])
        item_key = str(row[problem_col])
        item_stats[item_key] = dict(
            skill=skill_key,
            n=int(row['n']),
            p_correct=float(row['p_correct']),
            b=float(row['b']),
        )
        if skill_key not in skill_to_items:
            skill_to_items[skill_key] = []
        skill_to_items[skill_key].append(item_key)

# ---------- skill stats dict ----------
# Plain-Python view of skill_grp: skill name -> attempt count and smoothed success rate.
skill_stats = {
    str(row[skill_col]): {"n": int(row['n']), "p_correct": float(row['p_correct'])}
    for _, row in skill_grp.iterrows()
}

# ---------- save reco_catalog ----------
# Provenance stored next to the statistics so consumers know how they were built.
catalog_meta = {
    "problem_col_found": problem_col is not None,
    "problem_col_name": problem_col,
    "smoothing_k": k,
    "note": "1PL-style: P(correct) ~ sigmoid(theta_student - b_item); b estimated from global p_item."
}
reco_catalog = {
    "skill_stats": skill_stats,
    "item_stats": item_stats,
    "skill_to_items": skill_to_items,
    "meta": catalog_meta,
}

# Full catalog (pickle) for the recommender.
catalog_path = os.path.join(BASE, "reco_catalog.pkl")
with open(catalog_path, "wb") as f:
    pickle.dump(reco_catalog, f)

# Small human-readable summary (JSON): metadata and sizes only.
preview_path = os.path.join(BASE, "reco_catalog_preview.json")
catalog_preview = {
    "meta": reco_catalog["meta"],
    "num_skills": len(skill_stats),
    "num_items": len(item_stats),
}
with open(preview_path, "w", encoding="utf-8") as f:
    json.dump(catalog_preview, f, indent=2)

print("[OK] Built recommendation catalog: skills =", len(skill_stats), " items =", len(item_stats), " problem_col:", problem_col)

# ---------- recommender.py (NO f-strings; use placeholder replacement) ----------
# Source of the generated recommender module. The literal contains no f-strings;
# the project folder is injected by replacing the <<<BASE>>> placeholder below.
#
# Fix relative to the earlier version: the "widen if not enough" pass used to
# re-add items that the primary band had already selected, so the returned
# top_k list could contain the same problem twice. The widened pass now skips
# (problem, skill) pairs that were already chosen.
#
# NOTE(review): the final sort uses key (score, -seen) with reverse=True, which
# puts the LESS frequently seen item first when scores tie - confirm intended.
recommender_py = textwrap.dedent('''
import os, json, pickle, math, numpy as np

BASE = r"<<<BASE>>>"
PP_PATH   = os.path.join(BASE, "preprocessor.pkl")
CATALOG   = os.path.join(BASE, "reco_catalog.pkl")
THRESHOLD = os.path.join(BASE, "threshold.json")

_pre = None
_cat = None
_thr = None

def _sigmoid(x):
    return 1.0/(1.0+math.exp(-x))

def _logit(p, eps=1e-6):
    p = min(max(p, eps), 1-eps)
    return math.log(p/(1-p))

def _load_pre():
    global _pre
    if _pre is None:
        with open(PP_PATH, "rb") as f:
            _pre = pickle.load(f)
    return _pre

def _load_cat():
    global _cat
    if _cat is None:
        with open(CATALOG, "rb") as f:
            _cat = pickle.load(f)
    return _cat

def _load_thr(default=0.5):
    global _thr
    if _thr is None:
        try:
            with open(THRESHOLD, "r", encoding="utf-8") as f:
                _thr = float(json.load(f).get("best_threshold", default))
        except Exception:
            _thr = default
    return _thr

def mastery_from_history(history, decay=0.3):
    """
    Exponential moving average per-skill:
    m_new = (1-decay)*m_prev + decay*correct
    Returns dict: skill -> mastery in [0,1], plus global p_student.
    """
    skill_m = {}
    skill_w = {}
    total_c = 0
    for e in history:
        if isinstance(e, dict):
            sk, cr = str(e.get("skill")), int(e.get("correct", 0))
        else:
            sk, cr = str(e[0]), int(e[1])
        if not sk:
            continue
        prev = skill_m.get(sk, 0.5)  # neutral start
        skill_m[sk] = (1.0 - decay)*prev + decay*cr
        skill_w[sk] = skill_w.get(sk, 0) + 1
        total_c += cr
    n = sum(skill_w.values())
    p_student = (total_c + 1) / (n + 2) if n > 0 else 0.5  # Laplace
    return skill_m, p_student

def recommend(history, top_k=5, target_low=0.60, target_high=0.75, min_item_count=30):
    """
    Returns top_k recommended problems (if available) or skills, aiming for predicted success in [target_low, target_high].
    Uses a simple 1PL-IRT estimate: for each skill, theta_skill=logit(p_student_skill) via EMA; for each item, P=σ(theta - b_item).
    Falls back to skill-level if no problem column in catalog.
    """
    pre = _load_pre()
    cat = _load_cat()
    skill_stats = cat["skill_stats"]
    item_stats  = cat["item_stats"]
    skill_to_items = cat["skill_to_items"]
    has_items = cat["meta"].get("problem_col_found", False)

    # per-skill mastery from history (EMA)
    skill_m, _ = mastery_from_history(history, decay=0.3)

    recs = []
    if has_items and len(item_stats) > 0:
        # item-level
        for sk, mastery in skill_m.items():
            base_p = skill_stats.get(sk, {}).get("p_correct", 0.5)
            theta = _logit( (0.9*mastery + 0.1*base_p) )
            for it in skill_to_items.get(sk, []):
                st = item_stats[it]
                if st["n"] < min_item_count:
                    continue
                p_hat = _sigmoid(theta - st["b"])
                if target_low <= p_hat <= target_high:
                    score = -abs((target_low+target_high)/2.0 - p_hat)
                    recs.append({"problem_id": it, "skill": sk, "pred_success": float(p_hat),
                                 "seen": int(st["n"]), "p_item": float(st["p_correct"]), "difficulty_b": float(st["b"]),
                                 "score": float(score)})
        # widen if not enough
        if len(recs) < top_k:
            extra = []
            # items already picked in the primary band must not be added a second time
            chosen = set((r["problem_id"], r["skill"]) for r in recs)
            band_low  = max(0.50, target_low - 0.10)
            band_high = min(0.85, target_high + 0.10)
            for sk, mastery in skill_m.items():
                base_p = skill_stats.get(sk, {}).get("p_correct", 0.5)
                theta = _logit( (0.9*mastery + 0.1*base_p) )
                for it in skill_to_items.get(sk, []):
                    if (it, sk) in chosen:
                        continue
                    st = item_stats[it]
                    if st["n"] < min_item_count:
                        continue
                    p_hat = _sigmoid(theta - st["b"])
                    if band_low <= p_hat <= band_high:
                        score = -abs((target_low+target_high)/2.0 - p_hat)
                        extra.append({"problem_id": it, "skill": sk, "pred_success": float(p_hat),
                                      "seen": int(st["n"]), "p_item": float(st["p_correct"]), "difficulty_b": float(st["b"]),
                                      "score": float(score)})
            recs = (recs + extra)
        recs.sort(key=lambda x: (x["score"], -x["seen"]), reverse=True)
        return recs[:top_k]
    else:
        # skill-level only
        for sk, mastery in skill_m.items():
            gap = 0.65 - mastery
            score = -abs(gap)
            recs.append({"skill": sk, "mastery": float(mastery), "score": float(score)})
        recs.sort(key=lambda x: x["score"], reverse=True)
        return recs[:top_k]
''').replace("<<<BASE>>>", BASE)

with open(os.path.join(BASE, "recommender.py"), "w", encoding="utf-8") as f:
    f.write(recommender_py)

# ---------- app2.py (no f-strings) ----------
# Source of the generated FastAPI service exposing /health, /predict and
# /recommend. It imports `predictor` and `recommender` as sibling modules, so
# it must be started from the project folder.
#
# Fix relative to the earlier version: CORS was configured with wildcard
# origins AND allow_credentials=True. The FastAPI CORS documentation states the
# wildcard may not be combined with credentials, and this API uses neither
# cookies nor authentication, so credentials are now disabled.
#
# NOTE(review): conlist(..., min_items=, max_items=) looks like the pydantic v1
# spelling (v2 presumably expects min_length/max_length) - confirm against the
# installed pydantic version.
app2_py = textwrap.dedent('''
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, conlist
from typing import List, Optional, Union
from predictor import predict, model_info
from recommender import recommend

app = FastAPI(title="SkillTracer API (with Recommender)", version="1.1.0",
              description="Predict next correctness AND recommend next items.")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], allow_credentials=False, allow_methods=["*"], allow_headers=["*"],
)

class HistoryEvent(BaseModel):
    skill: str = Field(..., description="Skill/concept")
    correct: int = Field(..., ge=0, le=1)

class PredictRequest(BaseModel):
    history: List[Union[HistoryEvent, conlist(Union[str, int], min_items=2, max_items=2)]]
    threshold: Optional[float] = Field(None, ge=0.0, le=1.0)

class PredictResponse(BaseModel):
    probability: Optional[float]
    threshold: float
    predicted_class: int
    note: Optional[str] = None

class RecommendRequest(BaseModel):
    history: List[Union[HistoryEvent, conlist(Union[str, int], min_items=2, max_items=2)]]
    top_k: int = Field(5, ge=1, le=20)
    target_low: float = Field(0.60, ge=0.0, le=1.0)
    target_high: float = Field(0.75, ge=0.0, le=1.0)
    min_item_count: int = Field(30, ge=1, description="Only consider items seen at least this many times in training")

@app.get("/health")
def health():
    return {"status": "ok", "model": model_info()}

@app.post("/predict", response_model=PredictResponse)
def predict_route(body: PredictRequest):
    try:
        normalized = []
        for e in body.history:
            if isinstance(e, list) or isinstance(e, tuple):
                normalized.append({"skill": str(e[0]), "correct": int(e[1])})
            else:
                normalized.append({"skill": e.skill, "correct": e.correct})
        return predict(normalized, threshold=body.threshold)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.post("/recommend")
def recommend_route(body: RecommendRequest):
    try:
        normalized = []
        for e in body.history:
            if isinstance(e, list) or isinstance(e, tuple):
                normalized.append({"skill": str(e[0]), "correct": int(e[1])})
            else:
                normalized.append({"skill": e.skill, "correct": e.correct})
        recs = recommend(normalized, top_k=body.top_k,
                         target_low=body.target_low, target_high=body.target_high,
                         min_item_count=body.min_item_count)
        return {"recommendations": recs}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

@app.get("/")
def root():
    return {
        "hello": "SkillTracer API (with Recommender)",
        "docs": "/docs",
        "try_predict": {
            "history": [["Algebra",1], ["Algebra",0], ["Fractions",1]],
            "threshold": 0.5
        },
        "try_recommend": {
            "history": [["Algebra",1], ["Algebra",0], ["Fractions",1]],
            "top_k": 5, "target_low": 0.60, "target_high": 0.75
        }
    }

# Run: uvicorn app2:app --host 0.0.0.0 --port 8000
''')
with open(os.path.join(BASE, "app2.py"), "w", encoding="utf-8") as f:
    f.write(app2_py)

# ---------- index.html ----------
# Source of the generated single-page demo that posts the history JSON to
# /predict and /recommend and shows the raw JSON response.
#
# Fixes relative to the earlier version:
#   * app2.py does not serve this page, so it is normally opened straight from
#     disk. Relative URLs such as '/predict' cannot reach the API from a
#     file:// page, so the script now falls back to the local uvicorn address
#     (port 8000, matching the start command printed by this script).
#   * Invalid JSON in the textarea or a failed request used to fail silently;
#     the error is now shown in the response pane.
index_html = textwrap.dedent(r'''
<!doctype html>
<html>
<head>
  <meta charset="utf-8"/>
  <title>SkillTracer Demo</title>
  <style>
    body { font-family: system-ui, -apple-system, Segoe UI, Roboto, sans-serif; margin: 30px; max-width: 900px; }
    textarea { width: 100%; height: 120px; }
    .row { display: flex; gap: 12px; align-items: center; }
    .row > * { flex: 1; }
    pre { background: #111; color: #0f0; padding: 12px; border-radius: 8px; overflow:auto; }
    button { padding: 10px 16px; border-radius: 8px; border: 1px solid #ddd; cursor: pointer; }
    button:hover { background: #f4f4f4; }
  </style>
</head>
<body>
  <h1>SkillTracer — Predict & Recommend</h1>
  <p><b>History JSON</b> (list of {"skill": "...", "correct": 0/1})</p>
  <textarea id="hist">[
  {"skill":"Algebra","correct":1},
  {"skill":"Algebra","correct":0},
  {"skill":"Fractions","correct":1}
]</textarea>
  <div class="row">
    <div>
      <label>Threshold</label>
      <input id="thr" type="number" step="0.01" min="0" max="1" value="0.5" />
    </div>
    <div>
      <label>Top K</label>
      <input id="topk" type="number" min="1" max="20" value="5" />
    </div>
    <div>
      <label>Target Band</label>
      <input id="low" type="number" step="0.01" min="0" max="1" value="0.60" />
      <input id="high" type="number" step="0.01" min="0" max="1" value="0.75" />
    </div>
  </div>
  <p>
    <button onclick="doPredict()">Predict</button>
    <button onclick="doRecommend()">Recommend</button>
  </p>
  <h3>Response</h3>
  <pre id="out"></pre>

<script>
// Opened straight from disk (file://), a relative URL such as '/predict' cannot
// reach the API, so fall back to the default local uvicorn address in that case.
const API_BASE = window.location.protocol === 'file:' ? 'http://127.0.0.1:8000' : '';

function show(text) {
  document.getElementById('out').textContent = text;
}
async function postJson(path, body) {
  const res = await fetch(API_BASE + path, {
    method: 'POST', headers: {'Content-Type':'application/json'}, body: JSON.stringify(body)
  });
  show(JSON.stringify(await res.json(), null, 2));
}
async function doPredict() {
  try {
    await postJson('/predict', {
      history: JSON.parse(document.getElementById('hist').value),
      threshold: parseFloat(document.getElementById('thr').value)
    });
  } catch (err) {
    show('Error: ' + err);
  }
}
async function doRecommend() {
  try {
    await postJson('/recommend', {
      history: JSON.parse(document.getElementById('hist').value),
      top_k: parseInt(document.getElementById('topk').value),
      target_low: parseFloat(document.getElementById('low').value),
      target_high: parseFloat(document.getElementById('high').value)
    });
  } catch (err) {
    show('Error: ' + err);
  }
}
</script>
</body>
</html>
''')
with open(os.path.join(BASE, "index.html"), "w", encoding="utf-8") as f:
    f.write(index_html)

# Final summary: what was written and how to start the service.
print("\n[DONE] Recommender pack written to:", BASE)
for artifact_line in (
    "  - reco_catalog.pkl, reco_catalog_preview.json",
    "  - recommender.py (no f-strings)",
    "  - app2.py",
    "  - index.html",
):
    print(artifact_line)
print("\nStart the API:")
# Derive the folder from BASE instead of repeating the literal path, so the hint
# can never disagree with where the files were actually written.
print('  cd "' + BASE + '"')
print(r'  uvicorn app2:app --host 0.0.0.0 --port 8000')


[INFO] Loaded as CSV enc='utf-8', sep=',', shape=(6123270, 35)
[OK] Built recommendation catalog: skills = 265  items = 53091  problem_col: problem_id

[DONE] Recommender pack written to: C:\Users\sagni\Downloads\SkillTracer Knowledge Tracing
  - reco_catalog.pkl, reco_catalog_preview.json
  - recommender.py (no f-strings)
  - app2.py
  - index.html

Start the API:
  cd "C:\Users\sagni\Downloads\SkillTracer Knowledge Tracing"
  uvicorn app2:app --host 0.0.0.0 --port 8000
