# 17) Offline Testing — Best Model inference
Maqsad:
- `Models/best_model/optuna_logreg_best/optuna_logreg_best.joblib`
- `Models/best_model/optuna_logreg_best/optuna_logreg_best_thresholds.json`
- `Data/Feature_Selected/<VERSION>/X_test.npz` (yoki Engineered_data)
lar bilan **offline** predict + (agar bo‘lsa) testda tezkor baholash.

**Eslatma:** Bu notebook `pandas .style` ishlatmaydi (jinja2 kerak emas). Hammasi `tabulate`.


In [None]:
from __future__ import annotations

from pathlib import Path
import json
import numpy as np
import pandas as pd
from scipy import sparse
import joblib
from tabulate import tabulate
import zipfile

# =========================
# CONFIG (edit as needed)
# =========================
VERSION = "fe_v1_fs_chi2_v1"          # version folder under Data/Feature_Selected or Data/Engineered_data
PREFER_FEATURE_SELECTED = True        # True -> try Feature_Selected first, otherwise Engineered_data

MODEL_NAME = "optuna_logreg_best"     # Models/best_model/<MODEL_NAME> folder
TEXT_COL_FALLBACK = "REAC_pt_symptom" # text column to use when showing text from the raw csv

N_SHOW = 10                          # random offline sample count
RANDOM_STATE = 42                    # seed for the random preview sample

def find_project_root(start: Path | None = None) -> Path:
    start = start or Path.cwd()
    # Data/Raw_data yoki Data/Processed bo‘lsa project root deb olamiz
    for p in [start] + list(start.parents):
        data = p / "Data"
        raw = data / "Raw_data"
        processed = data / "Processed"
        models = p / "Models"
        if (raw.exists() and any(raw.iterdir())) or (processed.exists() and any(processed.iterdir())) or models.exists():
            return p
    return start

def find_data_dir(project_root: Path, version: str, prefer_fs: bool = True) -> tuple[Path, str]:
    """Pick the data folder for *version* that contains ``X_test.npz``.

    Args:
        project_root: Directory that holds the ``Data`` folder.
        version: Sub-folder name under ``Feature_Selected`` / ``Engineered_data``.
        prefer_fs: When true, ``Feature_Selected`` is checked before
            ``Engineered_data``; otherwise the order is reversed.

    Returns:
        ``(folder, source_name)`` for the first candidate holding ``X_test.npz``.

    Raises:
        FileNotFoundError: If neither folder contains ``X_test.npz``.
    """
    data_root = project_root / "Data"
    fs = data_root / "Feature_Selected" / version
    eng = data_root / "Engineered_data" / version

    fs_choice = (fs, "Feature_Selected")
    eng_choice = (eng, "Engineered_data")
    ordered = [fs_choice, eng_choice] if prefer_fs else [eng_choice, fs_choice]

    for folder, source in ordered:
        if (folder / "X_test.npz").exists():
            return folder, source
    raise FileNotFoundError(f"X_test.npz topilmadi: {fs} yoki {eng}")

# Resolve the project root and the data folder once; later cells read these globals.
PROJECT_ROOT = find_project_root()
DATA_DIR, DATA_SOURCE = find_data_dir(PROJECT_ROOT, VERSION, PREFER_FEATURE_SELECTED)

# Model artifacts are looked up under Models/best_model/<MODEL_NAME>.
MODEL_DIR = PROJECT_ROOT / "Models" / "best_model" / MODEL_NAME

# Print the resolved absolute paths so a wrong root is easy to spot.
print("PROJECT_ROOT:", PROJECT_ROOT.resolve())
print("DATA_SOURCE :", DATA_SOURCE)
print("DATA_DIR    :", DATA_DIR.resolve())
print("MODEL_DIR   :", MODEL_DIR.resolve())


In [None]:
# =========================
# 1) Load model + thresholds
# =========================
# Both artifacts live in MODEL_DIR and share the MODEL_NAME stem.
MODEL_PATH = MODEL_DIR / f"{MODEL_NAME}.joblib"
THR_PATH   = MODEL_DIR / f"{MODEL_NAME}_thresholds.json"

# Fail early with an explicit path when either artifact is missing.
if not MODEL_PATH.exists():
    raise FileNotFoundError(f"Model topilmadi: {MODEL_PATH}")
if not THR_PATH.exists():
    raise FileNotFoundError(f"Thresholds topilmadi: {THR_PATH}")

# NOTE: joblib.load unpickles the file, which can execute arbitrary code;
# only load model files that come from a trusted source.
model = joblib.load(MODEL_PATH)

with open(THR_PATH, "r", encoding="utf-8") as f:
    thr_dict = json.load(f)

print("Loaded model:", type(model))
print("Threshold keys:", len(thr_dict))


In [None]:
# =========================
# 2) Load X/Y + ids + meta
# =========================
META_PATH = DATA_DIR / "engineered_meta.json"
if not META_PATH.exists():
    raise FileNotFoundError(f"engineered_meta.json topilmadi: {META_PATH}")

with open(META_PATH, "r", encoding="utf-8") as f:
    meta = json.load(f)

y_cols = meta["y_cols"]  # e.g. ["y_gastrointestinal", ...]
label_names = [c.replace("y_", "", 1) for c in y_cols]

# Threshold vector, in y_cols order.
# A label is looked up first by its short name and then by its original
# column name, so a thresholds file keyed either way is honoured. Labels
# found under neither key fall back to 0.5 and are reported, instead of
# silently degrading the evaluation.
thr_values = []
missing_thr = []
for y_col, label in zip(y_cols, label_names):
    if label in thr_dict:
        thr_values.append(float(thr_dict[label]))
    elif y_col in thr_dict:
        thr_values.append(float(thr_dict[y_col]))
    else:
        thr_values.append(0.5)
        missing_thr.append(label)
thr = np.array(thr_values, dtype=np.float32)
if missing_thr:
    print(
        f"OGOHLANTIRISH: {len(missing_thr)} ta label uchun threshold topilmadi -> 0.5 ishlatiladi:",
        missing_thr[:10],
    )

# X_test is required; Y_test and ids_test are optional and stay None when absent.
X_test = sparse.load_npz(DATA_DIR / "X_test.npz").tocsr()
Y_test_path = DATA_DIR / "Y_test.npy"
Y_test = np.load(Y_test_path) if Y_test_path.exists() else None

ids_test_path = DATA_DIR / "ids_test.csv"
ids_test = pd.read_csv(ids_test_path) if ids_test_path.exists() else None

print("X_test:", X_test.shape)
print("Y_test:", None if Y_test is None else Y_test.shape)
print("ids_test:", None if ids_test is None else ids_test.shape)
print("labels:", len(label_names))


In [None]:
# =========================
# 3) Helpers: scoring, thresholds, metrics
# =========================
def prf_from_counts(tp: int, fp: int, fn: int):
    """Return ``(precision, recall, f1)`` computed from confusion counts.

    Args:
        tp: Number of true positives.
        fp: Number of false positives.
        fn: Number of false negatives.

    Returns:
        A 3-tuple of floats. Any ratio whose denominator is not positive is
        reported as 0.0 rather than raising a division error.
    """
    predicted_pos = tp + fp
    actual_pos = tp + fn

    prec = 0.0
    if predicted_pos > 0:
        prec = tp / predicted_pos

    rec = 0.0
    if actual_pos > 0:
        rec = tp / actual_pos

    denom = prec + rec
    if denom > 0:
        f1 = 2 * prec * rec / denom
    else:
        f1 = 0.0
    return prec, rec, f1

def multilabel_micro_macro(Y_true: np.ndarray, Y_pred: np.ndarray) -> dict:
    """Compute micro- and macro-averaged F1 for 0/1 multilabel matrices.

    Args:
        Y_true: Ground-truth indicator matrix, indexed as ``[row, label]``.
        Y_pred: Predicted indicator matrix, indexed the same way.

    Returns:
        ``{"micro_f1": ..., "macro_f1": ...}``. Micro F1 pools the counts of
        all cells; macro F1 is the plain mean of the per-label F1 values.
    """
    true_pos_mask = (Y_true == 1) & (Y_pred == 1)
    false_pos_mask = (Y_true == 0) & (Y_pred == 1)
    false_neg_mask = (Y_true == 1) & (Y_pred == 0)

    # Micro: one set of counts over the whole matrix.
    micro_f1 = prf_from_counts(
        int(true_pos_mask.sum()),
        int(false_pos_mask.sum()),
        int(false_neg_mask.sum()),
    )[2]

    # Macro: one F1 per label column, averaged with equal weight.
    per_label_f1 = [
        prf_from_counts(
            int(true_pos_mask[:, col].sum()),
            int(false_pos_mask[:, col].sum()),
            int(false_neg_mask[:, col].sum()),
        )[2]
        for col in range(Y_true.shape[1])
    ]

    return {"micro_f1": float(micro_f1), "macro_f1": float(np.mean(per_label_f1))}

def _positive_class_column(proba) -> np.ndarray:
    """Return the positive-class probabilities from one per-label ``predict_proba`` output."""
    arr = np.asarray(proba)
    if arr.ndim == 1:
        return arr
    if arr.ndim == 2 and arr.shape[1] == 2:
        # Column 1 holds the probability of the second (positive) class.
        return arr[:, 1]
    raise ValueError(
        f"Unexpected per-label predict_proba shape {arr.shape}; expected (n_samples, 2)"
    )


def score_matrix(model, X):
    """Score *X* with *model* and return ``(scores, kind)``.

    The most informative output the model offers is used, in this order:

    * ``predict_proba``      -> kind ``"proba"``
    * ``decision_function``  -> kind ``"score"``
    * ``predict``            -> kind ``"binary"``

    Some multilabel wrappers return ``predict_proba`` as a list with one
    ``(n_samples, 2)`` array per label. Passing that list to ``np.asarray``
    gives a 3-D ``(n_labels, n_samples, 2)`` tensor that cannot be compared
    with a per-label threshold vector, so the positive-class column of each
    entry is stacked into a 2-D ``(n_samples, n_labels)`` matrix instead.

    Args:
        model: Fitted estimator exposing at least one of the methods above.
        X: Feature matrix accepted by the estimator.

    Returns:
        Tuple of a NumPy array of scores and the kind string.

    Raises:
        ValueError: If a per-label probability array is neither 1-D nor
            of shape ``(n_samples, 2)``.
    """
    if hasattr(model, "predict_proba"):
        raw = model.predict_proba(X)
        if isinstance(raw, (list, tuple)):
            scores = np.column_stack([_positive_class_column(p) for p in raw])
        else:
            scores = np.asarray(raw)
        return scores, "proba"
    if hasattr(model, "decision_function"):
        return np.asarray(model.decision_function(X)), "score"
    return np.asarray(model.predict(X)), "binary"

def apply_thresholds(scores: np.ndarray, thr: np.ndarray) -> np.ndarray:
    """Binarize *scores* with one threshold per column.

    Args:
        scores: Score matrix whose columns line up with the entries of *thr*.
        thr: Threshold values; reshaped to a single row so it broadcasts
            across all rows of *scores*.

    Returns:
        An ``int8`` array that is 1 where ``score >= threshold`` and 0 elsewhere.
    """
    thr_row = thr.reshape(1, -1)
    at_or_above = scores >= thr_row
    return at_or_above.astype(np.int8)

def per_label_table(Y_true: np.ndarray, Y_pred: np.ndarray, label_names: list[str]) -> pd.DataFrame:
    """Build a per-label metrics table sorted by F1, best first.

    Args:
        Y_true: Ground-truth 0/1 matrix, indexed as ``[row, label]``.
        Y_pred: Predicted 0/1 matrix, indexed the same way.
        label_names: Label names; position ``j`` names column ``j``.

    Returns:
        A DataFrame with one row per label and the columns ``label``,
        ``support``, ``precision``, ``recall``, ``f1``, ``tp``, ``fp``,
        ``fn``, ``tn``, with a fresh 0..n-1 index after sorting.
    """
    def _metrics_for(col: int, label: str) -> dict:
        # Confusion counts for a single label column.
        truth = Y_true[:, col]
        guess = Y_pred[:, col]
        is_pos, is_neg = (truth == 1), (truth == 0)
        said_pos, said_neg = (guess == 1), (guess == 0)

        tp = int((is_pos & said_pos).sum())
        fp = int((is_neg & said_pos).sum())
        fn = int((is_pos & said_neg).sum())
        tn = int((is_neg & said_neg).sum())
        prec, rec, f1 = prf_from_counts(tp, fp, fn)
        return {
            "label": label,
            "support": int(truth.sum()),
            "precision": prec,
            "recall": rec,
            "f1": f1,
            "tp": tp, "fp": fp, "fn": fn, "tn": tn,
        }

    records = [_metrics_for(col, label) for col, label in enumerate(label_names)]
    table = pd.DataFrame(records)
    table = table.sort_values("f1", ascending=False)
    return table.reset_index(drop=True)


In [None]:
# =========================
# 4) Offline TEST evaluation (when Y_test is available)
# =========================
scores_test, score_type = score_matrix(model, X_test)

# Thresholds are applied to "proba" and "score" outputs; "binary" predictions are used as-is.
# NOTE(review): the same thr values are also applied to decision_function scores —
# confirm the thresholds were tuned on that scale before trusting the "score" path.
Y_pred_test = apply_thresholds(scores_test, thr) if score_type in ("proba","score") else scores_test.astype(np.int8)

print("score_type:", score_type)
print("scores_test:", scores_test.shape)

if Y_test is not None:
    # Overall micro/macro F1 first, then the 15 best labels by F1.
    summary = multilabel_micro_macro(Y_test, Y_pred_test)
    print("TEST summary:", summary)

    per_label = per_label_table(Y_test, Y_pred_test, label_names)
    print("\nTop 15 labels by F1 (TEST):")
    print(tabulate(per_label.head(15), headers="keys", tablefmt="github", showindex=False, floatfmt=".6f"))
else:
    # Without ground truth only the predictions can be shown.
    print("Y_test topilmadi -> faqat prediction ko‘rsatamiz.")


In [None]:
# =========================
# 5) Offline preview: random sample (with primaryid when ids_test is available)
# =========================
# Seeded generator, so the same rows are picked on every run.
rng = np.random.default_rng(RANDOM_STATE)
n = X_test.shape[0]
# Sample without replacement, capped at the number of available rows.
idx = rng.choice(n, size=min(N_SHOW, n), replace=False)

def labels_from_row(scores_row: np.ndarray, thr: np.ndarray, names: list[str]) -> list[str]:
    """Return the names whose score reaches its threshold, in *names* order.

    Args:
        scores_row: Scores for one sample, indexed by label position.
        thr: Thresholds, indexed by label position.
        names: Label names, indexed by label position.
    """
    selected = []
    for pos, label in enumerate(names):
        if scores_row[pos] >= thr[pos]:
            selected.append(label)
    return selected

def topk(scores_row: np.ndarray, names: list[str], k: int = 8):
    """Return the *k* highest-scoring labels as ``(name, score)`` pairs, best first.

    Args:
        scores_row: Scores for one sample, indexed by label position.
        names: Label names, indexed by label position.
        k: Maximum number of pairs to return.
    """
    ascending = np.argsort(scores_row)
    descending = ascending[::-1]
    best = descending[:k]
    pairs = []
    for pos in best:
        where = int(pos)
        pairs.append((names[where], float(scores_row[where])))
    return pairs

# Build one preview record per sampled test row.
rows = []
for i in idx:
    s = scores_test[i]
    pred_labels = labels_from_row(s, thr, label_names)
    top = topk(s, label_names, k=6)

    row = {
        "row_idx": int(i),
        "pred_n": len(pred_labels),
        # Show at most 10 predicted labels; " ..." marks a truncated list.
        "pred_labels": "; ".join(pred_labels[:10]) + (" ..." if len(pred_labels) > 10 else ""),
        "top6": "; ".join([f"{a}:{b:.3f}" for a,b in top]),
    }
    if ids_test is not None:
        # primaryid is usually present; copy whichever id columns exist.
        for key in ["primaryid","caseid","caseversion"]:
            if key in ids_test.columns:
                row[key] = ids_test.loc[i, key]
    rows.append(row)

df_preview = pd.DataFrame(rows)
print(tabulate(df_preview, headers="keys", tablefmt="github", showindex=False))


In [None]:
# =========================
# 6) Show text/metadata from the raw CSV (zip) (optional)
#   - Uploaded file: faers_25Q4_targets_multilabel_v2.zip
#   - Also works when the file sits in the project's Data/Raw_data
# =========================
RAW_ZIP_CANDIDATES = [
    PROJECT_ROOT / "Data" / "Raw_data" / "faers_25Q4_targets_multilabel_v2.zip",
    PROJECT_ROOT / "Data" / "Raw_data" / "faers_25Q4_targets_multilabel_v2.csv",
    Path("faers_25Q4_targets_multilabel_v2.zip"),
    Path("faers_25Q4_targets_multilabel_v2.csv"),
]

# The first candidate that exists wins.
raw_path = None
for p in RAW_ZIP_CANDIDATES:
    if p.exists():
        raw_path = p
        break

raw_df = None
if raw_path is None:
    print("Raw data topilmadi (ok). Agar kerak bo‘lsa zip/csv ni Data/Raw_data ga qo‘ying.")
else:
    print("Raw path:", raw_path.resolve())
    if raw_path.suffix.lower() == ".csv":
        raw_df = pd.read_csv(raw_path)
    else:
        with zipfile.ZipFile(raw_path) as zf:
            # Use the first CSV inside the zip. This step is optional, so an
            # archive without any CSV is reported instead of raising IndexError.
            csv_names = [n for n in zf.namelist() if n.lower().endswith(".csv")]
            if csv_names:
                name = csv_names[0]
                with zf.open(name) as f:
                    raw_df = pd.read_csv(f)
            else:
                print("Zip ichida CSV topilmadi:", raw_path)

    # raw_df stays None when the zip held no CSV.
    if raw_df is not None:
        print("raw_df:", raw_df.shape)

# When raw_df is available: show the raw record for a primaryid from the preview table.
if raw_df is not None and ids_test is not None and "primaryid" in ids_test.columns and "primaryid" in raw_df.columns:
    # 1) take the primaryid of the first preview row
    pid = df_preview.iloc[0].get("primaryid", None)
    if pid is not None:
        one = raw_df[raw_df["primaryid"] == pid]
        if len(one) > 0:
            # Only the first matching raw row is shown.
            one = one.iloc[0]
            # Pick the text column: first candidate present in raw_df, else the configured fallback.
            # NOTE(review): "REAC_pt_symptom_v2" is listed twice; the duplicate has no effect.
            candidates = ["REAC_pt_symptom_v2","REAC_pt_symptom","REAC_pt","REAC_pt_symptom_v2"]
            text_col = next((c for c in candidates if c in raw_df.columns), TEXT_COL_FALLBACK)
            print("\n--- RAW VIEW (1 sample) ---")
            # Columns missing from raw_df are skipped.
            cols_show = [c for c in ["primaryid","caseid","caseversion","DRUG_drugname","DRUG_prod_ai",text_col,"y_labels"] if c in raw_df.columns]
            print(one[cols_show])
        else:
            print("raw_df ichida bu primaryid topilmadi:", pid)


In [None]:
# =========================
# 7) Single lookup (by primaryid or by row index)
# =========================
# 1) look up by primaryid:
QUERY_PRIMARYID = None  # e.g. 1012809821

# 2) or directly by row_idx:
QUERY_ROW_IDX = None    # e.g. 123

def show_one_by_row(i: int):
    """Print the offline prediction for test row *i*.

    Reads the notebook globals ``scores_test``, ``thr``, ``label_names``,
    ``ids_test`` and ``Y_test``. Prints the available id columns, the labels
    that pass their threshold, the 12 highest-scoring labels, and the true
    labels when ``Y_test`` is loaded.

    Args:
        i: Row position in the test set.
    """
    row_scores = scores_test[i]
    predicted = labels_from_row(row_scores, thr, label_names)
    ranked = topk(row_scores, label_names, k=12)
    banner = "=" * 90

    print("\n" + banner)
    print("OFFLINE PREDICTION")
    print(banner)

    if ids_test is not None:
        id_cols = [c for c in ("primaryid", "caseid", "caseversion") if c in ids_test.columns]
        if id_cols:
            print("IDs:", ids_test.loc[i, id_cols].to_dict())

    print("pred_n:", len(predicted))
    print("pred_labels:", "; ".join(predicted))
    print("top12:", "; ".join(f"{label}:{score:.4f}" for label, score in ranked))

    # Ground truth is optional; stop here when it was not loaded.
    if Y_test is None:
        return
    truth = [label for col, label in enumerate(label_names) if Y_test[i, col] == 1]
    print("true_n:", len(truth))
    print("true_labels:", "; ".join(truth))

# primaryid -> row; QUERY_PRIMARYID takes precedence over QUERY_ROW_IDX.
# NOTE(review): when QUERY_PRIMARYID is set but ids_test is missing or has no
# "primaryid" column, control falls through to the branches below — confirm
# that this is the intended behaviour.
if QUERY_PRIMARYID is not None and ids_test is not None and "primaryid" in ids_test.columns:
    hits = ids_test.index[ids_test["primaryid"] == QUERY_PRIMARYID].tolist()
    if not hits:
        print("primaryid topilmadi:", QUERY_PRIMARYID)
    else:
        # Only the first matching row is shown.
        show_one_by_row(int(hits[0]))
elif QUERY_ROW_IDX is not None:
    show_one_by_row(int(QUERY_ROW_IDX))
else:
    print("QUERY_PRIMARYID yoki QUERY_ROW_IDX bering (ixtiyoriy).")


## Done
Agar keyingi bosqichda siz **yangi (unseen) raw record** dan to‘g‘ridan-to‘g‘ri predict qilishni xohlasangiz (X_test.npz bo‘lmagan holat),
unda preprocessing artefaktlari (TFIDF vectorizer + feature selector) ham saqlangan bo‘lishi kerak.
Aytasiz — men shu notebookga o‘sha qismini ham aniq qo‘shib beraman (faqat sizda artefaktlar qayerda ekanini bilishimiz kerak).