# MRI Tumor Classifier (Multi-Slice MVP) — Build Notebook

**Purpose:** Reproducible Kaggle notebook that writes a small Python package (`mri-mvp`), installs it, runs smoke tests, and launches a **public Gradio demo**.

**Run policy:** Run cells top-to-bottom. Only re-run the launch after running the reset cell.


## 1. Project scaffold (filesystem + package folders)


In [4]:
# CELL: 00_MODEL_BUNDLE_PATHS - Locate model/policy artifacts in Kaggle inputs

from pathlib import Path

INPUT_ROOT = Path("/kaggle/input")
print("Mounted inputs:")
for p in sorted(INPUT_ROOT.iterdir()):
    if p.is_dir():
        print(" -", p.name)

NEEDED = [
    "best_model.pth",
    "temperature_scaling.json",
    "final_policy_config.json",
    "domain_guard_lr.npz",
]

def find(name: str) -> Path | None:
    hits = list(INPUT_ROOT.rglob(name))
    if not hits:
        return None
    hits = sorted(hits, key=lambda p: (len(p.parts), str(p)))
    return hits[0]

paths = {n: find(n) for n in NEEDED}
print("\nResolved paths:")
for k,v in paths.items():
    print(f"{k}: {v}")

missing = [k for k,v in paths.items() if v is None]
assert not missing, f"Missing artifacts: {missing}. Attach/upload the model bundle."

MODEL_BUNDLE_DIR = paths["best_model.pth"].parent
print("\nMODEL_BUNDLE_DIR:", MODEL_BUNDLE_DIR)


Mounted inputs:
 - datasets

Resolved paths:
best_model.pth: /kaggle/input/datasets/kabomolefe/mri-model-bundle-v1/best_model.pth
temperature_scaling.json: /kaggle/input/datasets/kabomolefe/mri-model-bundle-v1/temperature_scaling.json
final_policy_config.json: /kaggle/input/datasets/kabomolefe/mri-model-bundle-v1/final_policy_config.json
domain_guard_lr.npz: /kaggle/input/datasets/kabomolefe/mri-model-bundle-v1/domain_guard_lr.npz

MODEL_BUNDLE_DIR: /kaggle/input/datasets/kabomolefe/mri-model-bundle-v1


In [5]:
# CELL: 01_SCAFFOLD
!pwd
!mkdir -p /kaggle/working/mri-mvp/src/mri/{io,preprocess,qc,infer,serving,application} /kaggle/working/mri-mvp/tests /kaggle/working/mri-mvp/scripts
!touch /kaggle/working/mri-mvp/src/mri/__init__.py
!touch /kaggle/working/mri-mvp/src/mri/io/__init__.py
!touch /kaggle/working/mri-mvp/src/mri/preprocess/__init__.py
!touch /kaggle/working/mri-mvp/src/mri/qc/__init__.py
!touch /kaggle/working/mri-mvp/src/mri/infer/__init__.py
!touch /kaggle/working/mri-mvp/src/mri/serving/__init__.py
!touch /kaggle/working/mri-mvp/src/mri/application/__init__.py


/kaggle/working


## 2. Dependencies (pinned for reproducibility)

> After this cell, Kaggle may prompt you to restart the kernel. Do it once, then continue.


In [6]:
# CELL: 02_INSTALL_DEPS (PINNED)
%pip install -q \
  "gradio==5.49.1" \
  "httpx==0.28.1" \
  "httpcore==1.0.9" \
  "fastapi" \
  "uvicorn" \
  "pydicom" \
  "pandas"


[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m68.6/68.6 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m444.8/444.8 kB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m:00:01[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m34.9 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
google-adk 1.22.1 requires google-cloud-bigquery-storage>=2.0.0, which is not installed.
langchain-core 0.3.79 requires packaging<26.0.0,>=23.2.0, but you have packaging 26.0rc2 which is incompatible.
fastai 2.8.4 requires fastcore<1.9,>=1.8.0, but you have fastcore 1.11.3 which is incompatible.[0m[31m
[0mNote: you may need to restart the kernel to use updated packages.


## 3. Source modules (written into `/kaggle/working/mri-mvp/src/mri/...`)


### 3.1 IO — upload ingestion (images + DICOM ZIP + nested ZIP)


In [7]:
%%writefile /kaggle/working/mri-mvp/src/mri/io/load_case.py
# CELL: 03_LOAD_CASE (IO loader)
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Dict, Any, Tuple
import os
import zipfile
import tempfile

import numpy as np
from PIL import Image
import pydicom


@dataclass
class SliceRecord:
    image: Image.Image
    meta: Dict[str, Any]


def _looks_like_dicom(path: str) -> bool:
    try:
        ds = pydicom.dcmread(path, stop_before_pixels=True, force=True)
        return hasattr(ds, "SeriesInstanceUID") or hasattr(ds, "StudyInstanceUID") or hasattr(ds, "SOPClassUID")
    except Exception:
        return False


def _to_uint8(arr: np.ndarray) -> np.ndarray:
    a = arr.astype(np.float32)
    lo, hi = np.percentile(a, (1, 99))
    if not np.isfinite(lo) or not np.isfinite(hi) or hi <= lo:
        lo = float(a.min())
        hi = float(a.max() if a.max() > a.min() else a.min() + 1.0)
    a = np.clip((a - lo) / (hi - lo), 0.0, 1.0)
    return (a * 255.0).astype(np.uint8)


def _load_dicom_series(paths: List[str]) -> Tuple[List[SliceRecord], List[str]]:
    warnings: List[str] = []
    items: List[SliceRecord] = []

    def _inst(p: str):
        try:
            ds = pydicom.dcmread(p, stop_before_pixels=True, force=True)
            v = getattr(ds, "InstanceNumber", None)
            return int(v) if v is not None else 10**9
        except Exception:
            return 10**9

    paths = sorted(paths, key=_inst)

    for p in paths:
        try:
            ds = pydicom.dcmread(p, force=True)
            if not hasattr(ds, "pixel_array"):
                continue
            arr = ds.pixel_array

            photo = str(getattr(ds, "PhotometricInterpretation", "")).upper()
            if photo == "MONOCHROME1":
                arr = arr.max() - arr

            arr8 = _to_uint8(arr)
            img = Image.fromarray(arr8).convert("RGB")
            items.append(SliceRecord(image=img, meta={"path": p}))
        except Exception as e:
            warnings.append(f"Skipped slice (decode failed): {os.path.basename(p)} | {type(e).__name__}")

    if not items:
        warnings.append("No decodable DICOM pixel data found (might be compressed).")

    return items, warnings


def _load_images(paths: List[str]) -> Tuple[List[SliceRecord], List[str]]:
    warnings: List[str] = []
    out: List[SliceRecord] = []
    for p in paths:
        try:
            img = Image.open(p).convert("RGB")
            out.append(SliceRecord(image=img, meta={"path": p}))
        except Exception:
            warnings.append(f"Could not open as image: {os.path.basename(p)}")
    return out, warnings


def load_case(filepaths: List[str]) -> Tuple[List[SliceRecord], Dict[str, Any]]:
    warnings: List[str] = []
    filepaths = [str(p) for p in (filepaths or [])]

    if not filepaths:
        return [], {"warnings": ["No files provided."]}

    # ZIP case (supports nested zips)
    if len(filepaths) == 1 and filepaths[0].lower().endswith(".zip"):
        zip_path = filepaths[0]

        with tempfile.TemporaryDirectory() as tmp:
            # extract outer zip
            with zipfile.ZipFile(zip_path, "r") as z:
                z.extractall(tmp)

            # find nested zips (depth=1) and extract them too
            nested = []
            for root, _, files in os.walk(tmp):
                for fn in files:
                    if fn.lower().endswith(".zip"):
                        nested.append(os.path.join(root, fn))

            if nested:
                warnings.append(f"Found nested zip(s): {[os.path.basename(n) for n in nested[:5]]}")
                for nz in nested[:5]:
                    sub = os.path.join(tmp, "_nested_" + os.path.basename(nz))
                    os.makedirs(sub, exist_ok=True)
                    try:
                        with zipfile.ZipFile(nz, "r") as z2:
                            z2.extractall(sub)
                    except Exception as e:
                        warnings.append(f"Could not extract nested zip {os.path.basename(nz)}: {type(e).__name__}")

            # collect all non-zip files (including extracted nested contents)
            all_files = []
            for root, _, files in os.walk(tmp):
                for fn in files:
                    if fn.lower().endswith(".zip"):
                        continue
                    p = os.path.join(root, fn)
                    try:
                        if os.path.getsize(p) < 128:
                            continue
                    except Exception:
                        continue
                    all_files.append(p)

            dicom_paths = [p for p in all_files if _looks_like_dicom(p)]

            if not dicom_paths:
                warnings.append("ZIP contained no DICOM files (detected by header).")
                warnings.append(f"Sample files: {[os.path.relpath(p, tmp) for p in all_files[:25]]}")
                return [], {"warnings": warnings}

            # IMPORTANT: load happens INSIDE tempdir context (prevents FileNotFoundError)
            slices, w2 = _load_dicom_series(dicom_paths)
            warnings.extend(w2)
            return slices, {"warnings": warnings}

    # Otherwise treat as regular images
    slices, w = _load_images(filepaths)
    warnings.extend(w)
    return slices, {"warnings": warnings}


Writing /kaggle/working/mri-mvp/src/mri/io/load_case.py


### 3.2 Preprocess — crop/pad + normalization helpers


In [8]:

%%writefile /kaggle/working/mri-mvp/src/mri/preprocess/core.py
# CELL: 04_PREPROCESS
from __future__ import annotations
from typing import Dict, Any, Tuple
import numpy as np
from PIL import Image


def crop_pad_square(img: Image.Image, margin: int = 8) -> Tuple[Image.Image, Dict[str, Any]]:
    """
    Robust ROI crop (foreground bbox) + square pad.
    Returns processed image + metadata.

    Why: Prevents "tiny brain" (too much black border) and prevents distortion.
    """
    meta: Dict[str, Any] = {"ok": True, "reason": None, "bbox": None}

    try:
        rgb = img.convert("RGB")
        gray = rgb.convert("L")
        arr = np.array(gray)

        # Dynamic threshold: more robust across varying contrasts than a hard-coded number
        thr = np.percentile(arr, 20)  # background-ish percentile
        thr = max(10, thr)            # avoid thresholds that are too low
        mask = arr > thr

        coords = np.argwhere(mask)
        if coords.size == 0:
            meta["ok"] = False
            meta["reason"] = "no_foreground_detected"
            return rgb, meta

        y0, x0 = coords.min(axis=0)
        y1, x1 = coords.max(axis=0) + 1

        # Add margin (clamped)
        y0 = max(0, y0 - margin)
        x0 = max(0, x0 - margin)
        y1 = min(arr.shape[0], y1 + margin)
        x1 = min(arr.shape[1], x1 + margin)

        meta["bbox"] = (int(x0), int(y0), int(x1), int(y1))

        cropped = rgb.crop((x0, y0, x1, y1))

        # Pad to square
        w, h = cropped.size
        s = max(w, h)
        canvas = Image.new("RGB", (s, s), (0, 0, 0))
        canvas.paste(cropped, ((s - w) // 2, (s - h) // 2))

        return canvas, meta

    except Exception:
        meta["ok"] = False
        meta["reason"] = "preprocess_exception"
        return img.convert("RGB"), meta


Writing /kaggle/working/mri-mvp/src/mri/preprocess/core.py


### 3.3 QC — basic slice validity checks (incl. grayscale gate)


In [9]:
%%writefile /kaggle/working/mri-mvp/src/mri/qc/basic.py
# CELL: 05_QC
from __future__ import annotations
from typing import Dict, Any, Tuple
import numpy as np
from PIL import Image

def qc_slice(img: Image.Image, min_foreground_ratio: float = 0.05, min_std: float = 5.0) -> Tuple[bool, Dict[str, Any]]:
    """
    Basic QC to reject garbage slices before the model sees them.

    Added:
    - grayscale_score gate: rejects colorful screenshots/photos
    """
    rgb = img.convert("RGB")
    gray = rgb.convert("L")
    arr = np.array(gray).astype(np.float32)

    # Existing QC
    fg_ratio = float((arr > 10).mean())
    std = float(arr.std())
    mean = float(arr.mean())

    # NEW: "Is this grayscale-like?"
    rgb_arr = np.array(rgb).astype(np.float32)  # HxWx3
    # per-pixel channel spread, then average
    grayscale_score = float(np.mean(np.std(rgb_arr, axis=2)))  # 0 for perfect grayscale

    ok = True
    reasons = []

    if fg_ratio < min_foreground_ratio:
        ok = False
        reasons.append("too_much_background")

    if std < min_std:
        ok = False
        reasons.append("low_contrast")

    # Tune threshold: start strict to block screenshots
    # Typical MRI grayscale_score is near ~0–2; screenshots much higher.
    if grayscale_score > 3.0:
        ok = False
        reasons.append("not_grayscale_like")

    return ok, {
        "foreground_ratio": fg_ratio,
        "mean": mean,
        "std": std,
        "grayscale_score": grayscale_score,
        "reasons": reasons,
    }


Writing /kaggle/working/mri-mvp/src/mri/qc/basic.py


### 3.4 Inference core — model load + per-slice + case aggregation


In [10]:
%%writefile /kaggle/working/mri-mvp/src/mri/infer/predictor.py
# CELL: 06_PREDICTOR
"""
Predictor + policy engine

Goals:
- Accept *either* a raw state_dict checkpoint (legacy MVP) OR the training notebook's dict checkpoint.
- Drive label order, temperature scaling, and abstain/OOD policy from training artifacts when available.
- Always return a structured result: ACCEPT / ABSTAIN_* with reason codes (never "no output").
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Tuple
import os
import json

import torch
import torch.nn as nn
from torchvision import models, transforms
from PIL import Image

from mri.preprocess.core import crop_pad_square
from mri.qc.basic import qc_slice


IMG_SIZE = 224

# --- safe legacy fallback ONLY (used when no label_map is present) ---
LEGACY_LABELS = ["Glioma", "Meningioma", "No Tumor", "Pituitary"]


@dataclass(frozen=True)
class DomainGuard:
    """Binary logistic regression p(in_domain) = sigmoid(w·z + b)."""
    w: torch.Tensor  # (D,) on CPU
    b: float         # scalar

    def score(self, z: torch.Tensor) -> float:
        # z: (D,) CPU tensor
        x = float(torch.dot(self.w, z).item() + self.b)
        p = 1.0 / (1.0 + float(torch.exp(torch.tensor(-x)).item()))
        return float(p)


@dataclass
class ModelBundle:
    model: nn.Module
    feat_extractor: nn.Module
    labels: List[str]                  # index -> label string
    label_map_raw: Optional[Dict[str, str]]
    num_classes: int
    temperature: float                 # T (>= 0.05)
    tau_conf: Optional[float]
    tau_domain: Optional[float]
    domain_guard: Optional[DomainGuard]
    source_dir: str


def _read_json(path: str) -> Optional[dict]:
    try:
        with open(path, "r") as f:
            return json.load(f)
    except Exception:
        return None


def _clean_state_dict(sd: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
    # Support DataParallel checkpoints
    return {k.replace("module.", ""): v for k, v in sd.items()}


def _resolve_bundle_dir(model_ref: str) -> Tuple[str, str]:
    """
    Returns (bundle_dir, ckpt_path).
    - If model_ref is a directory, look for best_model.pth inside.
    - If model_ref is a file, treat its parent as bundle_dir.
    """
    if os.path.isdir(model_ref):
        bundle_dir = model_ref
        ckpt_path = os.path.join(bundle_dir, "best_model.pth")
        if not os.path.exists(ckpt_path):
            # allow alternate naming if user copied it
            alt = os.path.join(bundle_dir, "mri_resnet18_baseline_best.pth")
            if os.path.exists(alt):
                ckpt_path = alt
        return bundle_dir, ckpt_path

    bundle_dir = os.path.dirname(model_ref) or "."
    return bundle_dir, model_ref


def load_bundle(model_ref: str, device: torch.device) -> ModelBundle:
    """
    Loads a model bundle.
    model_ref can be:
      - path to .pth (legacy MVP state_dict OR training dict checkpoint)
      - directory containing best_model.pth + policy artifacts

    Policy artifacts (optional but preferred):
      - temperature_scaling.json   {"temperature": T, ...}
      - final_policy_config.json   {"tau_conf": ..., "tau_domain": ..., "temperature_T": ...}
      - domain_guard_lr.npz        {"coef": (1,D), "intercept": (1,)}  OR {"w": (D,), "b": ()}
    """
    if not os.path.exists(model_ref):
        raise RuntimeError(f"Model reference not found: {model_ref}")

    bundle_dir, ckpt_path = _resolve_bundle_dir(model_ref)

    if not os.path.exists(ckpt_path):
        raise RuntimeError(f"Checkpoint not found: {ckpt_path}")

    # Sanity: avoid tiny invalid files (e.g., pointer)
    size_mb = os.path.getsize(ckpt_path) / (1024 * 1024)
    if size_mb < 10:
        raise RuntimeError(
            f"Checkpoint looks invalid (too small: {size_mb:.2f} MB): {ckpt_path}"
        )

    raw = torch.load(ckpt_path, map_location="cpu")

    # Determine checkpoint shape
    if isinstance(raw, dict) and "model_state_dict" in raw:
        sd = raw["model_state_dict"]
        label_map = raw.get("label_map")  # often {0:"glioma",...} but JSON may coerce keys to str elsewhere
        num_classes = int(raw.get("num_classes", 4))
    elif isinstance(raw, dict):
        # Legacy: assume it's a raw state_dict
        sd = raw
        label_map = None
        num_classes = 4
    else:
        raise RuntimeError(f"Unsupported checkpoint format: type={type(raw)}")

    sd = _clean_state_dict(sd)

    # Build model
    model = models.resnet18(weights=None)
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    model.load_state_dict(sd, strict=True)
    model.to(device).eval()

    # Feature extractor = penultimate pooled features
    feat_extractor = nn.Sequential(*list(model.children())[:-1]).to(device).eval()

    # Labels from training artifact if possible
    labels: List[str]
    label_map_raw: Optional[Dict[str, str]] = None

    if isinstance(label_map, dict) and len(label_map) == num_classes:
        # normalize keys to int order
        label_map_raw = {str(k): str(v) for k, v in label_map.items()}
        labels = [label_map_raw[str(i)] for i in range(num_classes)]
    else:
        labels = LEGACY_LABELS[:num_classes]

    # Load temperature
    T = 1.0
    ts = _read_json(os.path.join(bundle_dir, "temperature_scaling.json"))
    if isinstance(ts, dict) and "temperature" in ts:
        try:
            T = float(ts["temperature"])
        except Exception:
            T = 1.0
    T = float(max(0.05, min(T, 100.0)))

    # Load policy thresholds
    tau_conf = None
    tau_domain = None
    pol = _read_json(os.path.join(bundle_dir, "final_policy_config.json"))
    if isinstance(pol, dict):
        if "tau_conf" in pol:
            try:
                tau_conf = float(pol["tau_conf"])
            except Exception:
                tau_conf = None
        if "tau_domain" in pol:
            try:
                tau_domain = float(pol["tau_domain"])
            except Exception:
                tau_domain = None
        # If policy config includes temperature_T, prefer it over temperature_scaling.json
        if "temperature_T" in pol:
            try:
                T = float(pol["temperature_T"])
                T = float(max(0.05, min(T, 100.0)))
            except Exception:
                pass

    # Load domain guard weights (no sklearn dependency)
    domain_guard = None
    npz_path = os.path.join(bundle_dir, "domain_guard_lr.npz")
    if os.path.exists(npz_path):
        try:
            import numpy as np
            d = np.load(npz_path)
            if "coef" in d and "intercept" in d:
                coef = d["coef"].astype("float32")
                intercept = d["intercept"].astype("float32")
                w = torch.tensor(coef.reshape(-1), dtype=torch.float32)  # (D,)
                b = float(intercept.reshape(-1)[0])
                domain_guard = DomainGuard(w=w, b=b)
            elif "w" in d and "b" in d:
                w = torch.tensor(d["w"].astype("float32").reshape(-1), dtype=torch.float32)
                b = float(d["b"].reshape(-1)[0])
                domain_guard = DomainGuard(w=w, b=b)
        except Exception:
            domain_guard = None

    return ModelBundle(
        model=model,
        feat_extractor=feat_extractor,
        labels=labels,
        label_map_raw=label_map_raw,
        num_classes=num_classes,
        temperature=T,
        tau_conf=tau_conf,
        tau_domain=tau_domain,
        domain_guard=domain_guard,
        source_dir=bundle_dir,
    )


_val_tf = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])


def _probs_from_logits(logits_1d: torch.Tensor) -> List[float]:
    p = torch.softmax(logits_1d, dim=-1).detach().cpu().tolist()
    return [float(x) for x in p]


def _embed_1d(feat_extractor: nn.Module, x: torch.Tensor) -> torch.Tensor:
    # x: (1,3,H,W) on device
    with torch.no_grad():
        z = feat_extractor(x)  # (1,512,1,1)
    z = z.view(z.shape[0], -1)[0].detach().cpu()  # (512,) CPU
    return z


def predict_case(
    bundle: ModelBundle,
    slices: List[Image.Image],
    device: torch.device,
    min_valid_slices: int = 3,
    abstain_agree_threshold: float = 0.50,
) -> Dict[str, Any]:
    """
    Multi-slice inference with policy:
      - QC-gate slices
      - aggregate logits + embeddings across valid slices
      - apply temperature scaling (if available)
      - apply policy: ABSTAIN if (p_in_domain < tau_domain) OR (max_prob_cal < tau_conf)
    """
    labels = bundle.labels
    k = bundle.num_classes

    processed_images: List[Image.Image] = []
    per_slice: List[Dict[str, Any]] = []
    logits_list: List[torch.Tensor] = []
    emb_list: List[torch.Tensor] = []

    for idx, img in enumerate(slices):
        proc, pmeta = crop_pad_square(img)
        processed_images.append(proc)

        ok, qcmeta = qc_slice(proc)

        record: Dict[str, Any] = {
            "slice_index": idx,
            "preprocess_ok": bool(pmeta.get("ok", True)),
            "preprocess_reason": pmeta.get("reason"),
            "bbox": pmeta.get("bbox"),
            "qc_ok": bool(ok),
            "qc": qcmeta,
            "top_label": None,
            "top_conf": None,
        }

        if not ok:
            per_slice.append(record)
            continue

        x = _val_tf(proc).unsqueeze(0).to(device)

        with torch.no_grad():
            logits = bundle.model(x)[0]  # (K,)

        probs = _probs_from_logits(logits)
        top_i = int(torch.argmax(torch.tensor(probs)).item())

        record["probs"] = {labels[i]: float(probs[i]) for i in range(k)}
        record["top_label"] = labels[top_i]
        record["top_conf"] = float(probs[top_i])

        per_slice.append(record)
        logits_list.append(logits)

        # embedding (CPU)
        try:
            emb = _embed_1d(bundle.feat_extractor, x)
            emb_list.append(emb)
        except Exception:
            # Do not fail inference if embedding path breaks
            pass

    result: Dict[str, Any] = {
        "status": "ABSTAIN",
        "abstain_type": "QC",
        "abstain_reason": None,
        "case_prediction": None,
        "case_probs": None,
        "valid_slices": len(logits_list),
        "agree_rate": 0.0,
        "top_conf": None,
        "p_in_domain": None,
        "thresholds": {
            "tau_conf": bundle.tau_conf,
            "tau_domain": bundle.tau_domain,
            "temperature": bundle.temperature,
        },
        "per_slice": per_slice,
        "processed_images": processed_images,
    }

    if len(logits_list) < min_valid_slices:
        result["abstain_reason"] = f"too_few_valid_slices<{min_valid_slices}"
        return result

    # Aggregate logits (stable) and embed
    case_logits = torch.stack(logits_list, dim=0).mean(dim=0)  # (K,)
    T = float(bundle.temperature) if bundle.temperature else 1.0
    case_probs_cal = _probs_from_logits(case_logits / T)

    top_i = int(torch.argmax(torch.tensor(case_probs_cal)).item())
    top_label = labels[top_i]
    top_conf = float(case_probs_cal[top_i])

    result["case_prediction"] = top_label
    result["case_probs"] = {labels[i]: float(case_probs_cal[i]) for i in range(k)}
    result["top_conf"] = top_conf

    # Disagreement: how many valid slices vote for the same top label?
    votes = [r.get("top_label") for r in per_slice if r.get("qc_ok") and r.get("top_label") is not None]
    agree = sum(1 for v in votes if v == top_label) / max(1, len(votes))
    result["agree_rate"] = float(agree)

    # Domain score (case-level) if we have a guard + embeddings
    if bundle.domain_guard is not None and len(emb_list) > 0:
        z_case = torch.stack(emb_list, dim=0).mean(dim=0)  # (D,) CPU
        p_in = bundle.domain_guard.score(z_case)
        result["p_in_domain"] = float(p_in)

    # Policy evaluation
    abstain_domain = False
    abstain_lowconf = False
    abstain_disagree = False

    if bundle.tau_domain is not None and result["p_in_domain"] is not None:
        abstain_domain = result["p_in_domain"] < float(bundle.tau_domain)

    if bundle.tau_conf is not None:
        abstain_lowconf = top_conf < float(bundle.tau_conf)

    # Optional extra guard (kept from legacy MVP): disagreement can abstain
    abstain_disagree = (agree < abstain_agree_threshold)

    if abstain_domain:
        result["status"] = "ABSTAIN"
        result["abstain_type"] = "OOD"
        result["abstain_reason"] = f"p_in_domain<{bundle.tau_domain:.3f}"
        result["case_prediction"] = None
        result["case_probs"] = {"ABSTAIN_OOD": 1.0}
        return result

    if abstain_lowconf or abstain_disagree:
        result["status"] = "ABSTAIN"
        result["abstain_type"] = "UNCERTAIN"
        reason_bits = []
        if abstain_lowconf:
            reason_bits.append(f"max_prob_cal<{bundle.tau_conf:.3f}")
        if abstain_disagree:
            reason_bits.append(f"agree<{abstain_agree_threshold:.2f}")
        result["abstain_reason"] = ",".join(reason_bits) if reason_bits else "uncertain"
        result["case_prediction"] = None
        result["case_probs"] = {"ABSTAIN_UNCERTAIN": 1.0}
        return result

    # Accept
    result["status"] = "ACCEPT"
    result["abstain_type"] = None
    result["abstain_reason"] = None
    return result


Writing /kaggle/working/mri-mvp/src/mri/infer/predictor.py


### 3.5 Application layer — `InferenceService` (clean architecture seam)


In [11]:
%%writefile /kaggle/working/mri-mvp/src/mri/application/__init__.py
# Application layer package


Overwriting /kaggle/working/mri-mvp/src/mri/application/__init__.py


In [12]:
%%writefile /kaggle/working/mri-mvp/src/mri/application/inference_service.py
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Dict

import pandas as pd
import torch
from PIL import Image

from mri.io.load_case import load_case
from mri.infer.predictor import load_bundle, predict_case, ModelBundle


class ModelNotLoadedError(RuntimeError):
    pass


@dataclass(frozen=True)
class InferenceOutputs:
    header: str
    gallery: List[Image.Image]
    case_label: Dict[str, float]
    table: pd.DataFrame
    warnings: List[str]


class InferenceService:
    """
    Application-layer orchestration:
    - loads/holds model bundle (weights + label map + policy artifacts)
    - loads case from paths
    - runs multi-slice inference + abstain/OOD policy
    - formats outputs for UI adapters
    """

    def __init__(self, model_ref: str, device: torch.device):
        self.model_ref = model_ref
        self.device = device
        self._bundle: ModelBundle | None = None
        self._model_error: str | None = None

    def load(self) -> None:
        if self._bundle is not None or self._model_error is not None:
            return
        try:
            self._bundle = load_bundle(self.model_ref, self.device)
        except Exception as e:
            self._model_error = str(e)

    def _require_bundle(self) -> ModelBundle:
        self.load()
        if self._bundle is None:
            raise ModelNotLoadedError(
                "Model bundle not loaded.\n"
                f"Expected model_ref at: {self.model_ref}\n"
                f"Error: {self._model_error}"
            )
        return self._bundle

    def predict_from_paths(self, filepaths: List[str]) -> InferenceOutputs:
        bundle = self._require_bundle()

        slices, meta = load_case(filepaths)
        if not slices:
            raise ValueError(f"No usable slices. Loader warnings: {meta.get('warnings', [])}")

        pil_slices = [s.image for s in slices]
        out = predict_case(bundle, pil_slices, device=self.device)

        # Per-slice table
        rows = []
        for r in out["per_slice"]:
            rows.append({
                "slice_index": r["slice_index"],
                "qc_ok": r["qc_ok"],
                "top_label": r.get("top_label"),
                "top_conf": r.get("top_conf"),
                "fg_ratio": r["qc"]["foreground_ratio"],
                "std": r["qc"]["std"],
                "qc_reasons": ",".join(r["qc"]["reasons"]),
            })
        df = pd.DataFrame(rows)

        # UI label block
        case_label = out["case_probs"] if out["case_probs"] else {"ABSTAIN": 1.0}

        # Header
        status = out.get("status", "ABSTAIN")
        abstain_type = out.get("abstain_type")
        abstain_reason = out.get("abstain_reason") or ""
        p_in = out.get("p_in_domain")
        top_conf = out.get("top_conf")

        header = (
            f"{status}"
            f"{'/' + str(abstain_type) if abstain_type else ''}"
            f" | valid_slices={out['valid_slices']}"
            f" | agree={out.get('agree_rate', 0):.2f}"
            f" | top_conf_cal={float(top_conf or 0):.3f}"
            f" | p_in_domain={float(p_in) if p_in is not None else 'na'}"
            f" | {abstain_reason}"
        )

        gallery = out["processed_images"][:32]
        warnings = meta.get("warnings", [])

        return InferenceOutputs(
            header=header,
            gallery=gallery,
            case_label=case_label,
            table=df,
            warnings=warnings,
        )


Writing /kaggle/working/mri-mvp/src/mri/application/inference_service.py


### 3.6 Serving — Gradio UI entrypoint (`mri.serving.app:demo`)


In [13]:
%%writefile /kaggle/working/mri-mvp/src/mri/serving/app.py
# CELL: 07_SERVING_APP
from __future__ import annotations

import os
import gradio as gr
from fastapi import FastAPI
import torch
import pandas as pd

from mri.io.load_case import load_case
from mri.infer.predictor import load_bundle, predict_case, ModelBundle

# Feature flag (safe rollout)
USE_SERVICE = os.getenv("USE_SERVICE", "1") == "1"
_service = None

# MODEL_REF can be:
#   - directory with best_model.pth + policy artifacts (preferred)
#   - path to .pth checkpoint (legacy MVP or training best_model.pth)
DEFAULT_MODEL_REF = "/kaggle/working/mri-mvp/best_metric_model.pth"
MODEL_REF = os.getenv("MODEL_REF", os.getenv("MODEL_PATH", DEFAULT_MODEL_REF))

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Eager load (gives clearer error message early)
_bundle: ModelBundle | None = None
_bundle_error: str | None = None
try:
    _bundle = load_bundle(MODEL_REF, device)
except Exception as e:
    _bundle_error = str(e)


def run_inference(files):
    # Normalize uploaded file objects to filepaths
    files = files or []
    filepaths = []
    for f in files:
        if hasattr(f, "name"):
            filepaths.append(f.name)
        elif isinstance(f, dict) and "name" in f:
            filepaths.append(f["name"])
        else:
            filepaths.append(str(f))

    if _bundle is None:
        raise gr.Error(
            "Model bundle not loaded.\n"
            f"MODEL_REF: {MODEL_REF}\n"
            f"Error: {_bundle_error}"
        )

    slices, meta = load_case(filepaths)
    if not slices:
        raise gr.Error(f"No usable slices. Loader warnings: {meta.get('warnings', [])}")

    pil_slices = [s.image for s in slices]
    out = predict_case(_bundle, pil_slices, device=device)

    rows = []
    for r in out["per_slice"]:
        rows.append({
            "slice_index": r["slice_index"],
            "qc_ok": r["qc_ok"],
            "top_label": r.get("top_label"),
            "top_conf": r.get("top_conf"),
            "fg_ratio": r["qc"]["foreground_ratio"],
            "std": r["qc"]["std"],
            "qc_reasons": ",".join(r["qc"]["reasons"]),
        })
    df = pd.DataFrame(rows)

    case_label = out["case_probs"] if out["case_probs"] else {"ABSTAIN": 1.0}

    status = out.get("status", "ABSTAIN")
    abstain_type = out.get("abstain_type")
    abstain_reason = out.get("abstain_reason") or ""
    top_conf = out.get("top_conf") or 0.0
    p_in = out.get("p_in_domain")

    header = (
        f"{status}"
        f"{'/' + str(abstain_type) if abstain_type else ''}"
        f" | valid_slices={out['valid_slices']}"
        f" | agree={out.get('agree_rate', 0):.2f}"
        f" | top_conf_cal={float(top_conf):.3f}"
        f" | p_in_domain={float(p_in) if p_in is not None else 'na'}"
        f" | {abstain_reason}"
    )

    gallery = out["processed_images"][:32]
    return header, gallery, case_label, df, meta.get("warnings", [])


demo = gr.Interface(
    fn=run_inference,
    inputs=gr.Files(label="Upload multiple images OR a ZIP containing a DICOM series"),
    outputs=[
        gr.Textbox(label="Case Status"),
        gr.Gallery(label="What the model saw (processed slices)", columns=4, rows=2),
        gr.Label(num_top_classes=4, label="Case Prediction / Abstain"),
        gr.Dataframe(label="Per-slice QC + Predictions"),
        gr.JSON(label="Loader Warnings"),
    ],
    title="MRI Tumor Classifier (Multi-Slice MVP)",
    description=(
        "Upload multiple image slices or a ZIP containing a DICOM series. "
        "Returns case-level prediction OR ABSTAIN (OOD/uncertain) with debug signals."
    )
)

app = FastAPI()
app = gr.mount_gradio_app(app, demo, path="/")


Writing /kaggle/working/mri-mvp/src/mri/serving/app.py


In [14]:
%%writefile /kaggle/working/mri-mvp/tests/test_io_nested_zip.py
from __future__ import annotations

import os
import zipfile
from pathlib import Path

import numpy as np
import pytest

# If pydicom isn't installed, these tests cannot run (and your loader won't work either).
pydicom = pytest.importorskip("pydicom")
from pydicom.dataset import FileDataset, FileMetaDataset
from pydicom.uid import ExplicitVRLittleEndian, generate_uid

from mri.io.load_case import load_case


def _write_synthetic_dicom(
    path: Path,
    *,
    instance_number: int,
    series_uid: str,
    study_uid: str,
    arr: np.ndarray,
) -> None:
    """
    Writes a minimal uncompressed MR-like DICOM slice with valid pixel data.
    File may be extensionless; loader relies on header detection.
    """
    path.parent.mkdir(parents=True, exist_ok=True)

    file_meta = FileMetaDataset()
    file_meta.TransferSyntaxUID = ExplicitVRLittleEndian
    file_meta.MediaStorageSOPClassUID = "1.2.840.10008.5.1.4.1.1.7"  # Secondary Capture
    file_meta.MediaStorageSOPInstanceUID = generate_uid()
    file_meta.ImplementationClassUID = generate_uid()

    ds = FileDataset(
        filename_or_obj=str(path),
        dataset={},
        file_meta=file_meta,
        preamble=b"\0" * 128,
    )

    # Minimal identity / modality fields (not strictly required by _looks_like_dicom, but good hygiene)
    ds.SOPClassUID = file_meta.MediaStorageSOPClassUID
    ds.SOPInstanceUID = file_meta.MediaStorageSOPInstanceUID
    ds.StudyInstanceUID = study_uid
    ds.SeriesInstanceUID = series_uid
    ds.InstanceNumber = int(instance_number)
    ds.Modality = "MR"
    ds.PatientID = "TEST_PATIENT"
    ds.PatientName = "TEST^PATIENT"

    # Pixel data fields for a single-channel image
    if arr.dtype != np.uint16:
        arr = arr.astype(np.uint16)

    rows, cols = arr.shape
    ds.Rows = int(rows)
    ds.Columns = int(cols)
    ds.SamplesPerPixel = 1
    ds.PhotometricInterpretation = "MONOCHROME2"
    ds.BitsAllocated = 16
    ds.BitsStored = 16
    ds.HighBit = 15
    ds.PixelRepresentation = 0  # unsigned
    ds.PixelData = arr.tobytes()

    # pydicom v4+ prefers explicit args at write time
    ds.save_as(
        str(path),
        enforce_file_format=True,
        little_endian=True,
        implicit_vr=False,
    )



def _zip_dir(zip_path: Path, root_dir: Path) -> None:
    """
    Zips the entire directory tree under root_dir into zip_path.
    """
    zip_path.parent.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
        for p in root_dir.rglob("*"):
            if p.is_file():
                z.write(p, arcname=p.relative_to(root_dir))


def _make_dicom_series(dir_path: Path, n: int = 5) -> list[Path]:
    """
    Creates a small synthetic DICOM series in a deep folder tree with extensionless files.
    """
    series_uid = generate_uid()
    study_uid = generate_uid()

    out_paths: list[Path] = []
    for i in range(1, n + 1):
        # simple gradient-ish pattern per slice
        arr = np.full((64, 64), i * 500, dtype=np.uint16)
        # extensionless on purpose
        p = dir_path / f"IM_{i:04d}"
        _write_synthetic_dicom(
            p,
            instance_number=i,
            series_uid=series_uid,
            study_uid=study_uid,
            arr=arr,
        )
        out_paths.append(p)
    return out_paths


def test_load_case_zip_extensionless_dicom_deep_tree(tmp_path: Path):
    """
    ZIP ingestion should:
    - discover DICOMs by header (not extension)
    - handle deep folder structures
    - return decoded PIL images as SliceRecords
    """
    deep_root = tmp_path / "outer_payload" / "a" / "b" / "c" / "series"
    _make_dicom_series(deep_root, n=6)

    zip_path = tmp_path / "case.zip"
    _zip_dir(zip_path, tmp_path / "outer_payload")

    slices, meta = load_case([str(zip_path)])

    assert isinstance(meta, dict)
    assert "warnings" in meta
    assert isinstance(meta["warnings"], list)

    assert len(slices) == 6, f"Expected 6 slices, got {len(slices)}. Warnings: {meta['warnings']}"
    # basic sanity on returned objects
    for s in slices:
        assert hasattr(s, "image")
        assert s.image.size == (64, 64)


def test_load_case_nested_zip_extensionless_dicom(tmp_path: Path):
    """
    Nested ZIP ingestion should:
    - extract outer zip
    - find inner zip(s)
    - extract inner zip(s)
    - discover DICOMs by header
    """
    inner_root = tmp_path / "inner_payload" / "deep" / "series"
    _make_dicom_series(inner_root, n=4)

    inner_zip = tmp_path / "inner.zip"
    _zip_dir(inner_zip, tmp_path / "inner_payload")

    # Outer zip contains the inner zip in a nested folder
    outer_payload = tmp_path / "outer_payload"
    (outer_payload / "x" / "y").mkdir(parents=True, exist_ok=True)
    (outer_payload / "x" / "y" / "inner.zip").write_bytes(inner_zip.read_bytes())

    outer_zip = tmp_path / "outer.zip"
    _zip_dir(outer_zip, outer_payload)

    slices, meta = load_case([str(outer_zip)])

    warnings = meta.get("warnings", [])
    assert any("Found nested zip" in w for w in warnings), f"Expected nested-zip warning. Got: {warnings}"

    assert len(slices) == 4, f"Expected 4 slices, got {len(slices)}. Warnings: {warnings}"
    for s in slices:
        assert s.image.size == (64, 64)


def test_load_case_zip_with_no_dicoms_returns_clean_warnings(tmp_path: Path):
    """
    If a ZIP contains no DICOMs (by header), loader should return ([], warnings)
    and not crash.
    """
    payload = tmp_path / "payload"
    payload.mkdir(parents=True, exist_ok=True)
    # put a random small file
    (payload / "readme.txt").write_text("hello", encoding="utf-8")

    zip_path = tmp_path / "no_dicoms.zip"
    _zip_dir(zip_path, payload)

    slices, meta = load_case([str(zip_path)])
    warnings = meta.get("warnings", [])

    assert slices == []
    assert any("ZIP contained no DICOM files" in w for w in warnings), f"Expected no-dicom warning. Got: {warnings}"


Writing /kaggle/working/mri-mvp/tests/test_io_nested_zip.py


In [15]:
!pytest -q /kaggle/working/mri-mvp/tests/test_io_nested_zip.py



[31m[1m_____________ ERROR collecting mri-mvp/tests/test_io_nested_zip.py _____________[0m
[31mImportError while importing test module '/kaggle/working/mri-mvp/tests/test_io_nested_zip.py'.
Hint: make sure your test modules/packages have valid Python names.
Traceback:
/usr/lib/python3.12/importlib/__init__.py:90: in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
mri-mvp/tests/test_io_nested_zip.py:15: in <module>
    from mri.io.load_case import load_case
E   ModuleNotFoundError: No module named 'mri'[0m
[31mERROR[0m mri-mvp/tests/test_io_nested_zip.py
!!!!!!!!!!!!!!!!!!!! Interrupted: 1 error during collection !!!!!!!!!!!!!!!!!!!!
[31m[31m[1m1 error[0m[31m in 1.00s[0m[0m


### 3.7 Tests — minimal smoke imports


In [16]:
%%writefile /kaggle/working/mri-mvp/tests/test_smoke_inference_service.py
def test_import_service():
    from mri.application.inference_service import InferenceService
    assert InferenceService is not None


Writing /kaggle/working/mri-mvp/tests/test_smoke_inference_service.py


In [17]:
%%writefile /kaggle/working/mri-mvp/tests/test_smoke_serving.py
def test_import_demo():
    from mri.serving.app import demo
    assert demo is not None


Writing /kaggle/working/mri-mvp/tests/test_smoke_serving.py


## 4. Model checkpoint (copy into working dir)

Update the `/kaggle/input/...` path to match your checkpoint dataset.


In [18]:
# CELL: 08_CHECKPOINT
# Point the MVP to the policy-aligned model bundle dataset (preferred)

import os
from pathlib import Path

MODEL_BUNDLE_DIR = "/kaggle/input/datasets/kabomolefe/mri-model-bundle-v1"
assert Path(MODEL_BUNDLE_DIR).exists(), f"Missing bundle dir: {MODEL_BUNDLE_DIR}. Check Inputs."

os.environ["MODEL_REF"] = MODEL_BUNDLE_DIR
print("✅ MODEL_REF set to:", os.environ["MODEL_REF"])

# prove the expected artifacts exist
need = ["best_model.pth", "temperature_scaling.json", "final_policy_config.json", "domain_guard_lr.npz"]
print({n: (Path(MODEL_BUNDLE_DIR)/n).exists() for n in need})


✅ MODEL_REF set to: /kaggle/input/datasets/kabomolefe/mri-model-bundle-v1
{'best_model.pth': True, 'temperature_scaling.json': True, 'final_policy_config.json': True, 'domain_guard_lr.npz': True}


## 5. Install package (editable)

We install from `/kaggle/working/mri-mvp` so `import mri` resolves cleanly.


In [19]:
%%writefile /kaggle/working/mri-mvp/setup.py
# CELL: 09_SETUP_PY
from setuptools import setup, find_packages

setup(
    name="mri-mvp",
    version="0.0.0",
    package_dir={"": "src"},
    packages=find_packages("src"),
)


Writing /kaggle/working/mri-mvp/setup.py


In [20]:
# CELL: 10_PIP_INSTALL_EDITABLE
%pip install -e /kaggle/working/mri-mvp -q


  Preparing metadata (setup.py) ... [?25l[?25hdone
Note: you may need to restart the kernel to use updated packages.


### 5.1 Kaggle path bootstrap (belt-and-suspenders)

This helps if Kaggle import paths get weird after restarts.


In [21]:
# CELL: 11_PATH_BOOTSTRAP (required for Kaggle)
import site

site.addsitedir("/kaggle/working/mri-mvp/src")


## 6. Smoke tests (fast)

These catch syntax/import issues before launch.


In [22]:
# CELL: 12_PY_COMPILE
!python -m py_compile /kaggle/working/mri-mvp/src/mri/application/inference_service.py
!python -m py_compile /kaggle/working/mri-mvp/src/mri/serving/app.py


In [23]:
# CELL: 12_SMOKE_TEST (fast)
import os
from pathlib import Path

# Confirm MODEL_REF is set BEFORE importing serving.app
print("MODEL_REF:", os.getenv("MODEL_REF"))
assert os.getenv("MODEL_REF"), "MODEL_REF not set. Run CELL 08_CHECKPOINT first."

# Import package + app
import mri
print("OK: imported mri package")

from mri.serving.app import demo
print("OK: imported demo")

# Confirm bundle files exist
bundle = Path(os.environ["MODEL_REF"])
print("Bundle dir:", bundle)
print("best_model.pth size MB:", (bundle/"best_model.pth").stat().st_size/1e6)


MODEL_REF: /kaggle/input/datasets/kabomolefe/mri-model-bundle-v1
OK: imported mri package
new /
OK: imported demo
Bundle dir: /kaggle/input/datasets/kabomolefe/mri-model-bundle-v1
best_model.pth size MB: 44.794315


In [24]:
# CELL: 13_PYTEST_SMOKE
!pytest -q /kaggle/working/mri-mvp/tests


[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m.[0m[32m                                                                    [100%][0m
[32m[32m[1m5 passed[0m[32m in 12.73s[0m[0m


## 7. Launch public demo (last section)

**Rule:** Run `Reset` then `Launch`. If you need a new link, reset first.


**# Run Order**
Setup/install

(Optional) tests

Reset

Launch (keep running)* 

In [25]:
# CELL: 99_RESET_GRADIO (CANONICAL)
# CANONICAL RESET CELL
# Run this ONCE after kernel restart and BEFORE launch.
# Do NOT import demo here.

import gradio as gr

# Safe global reset (do NOT import demo here)
gr.close_all()
print("✅ reset done (gr.close_all)")


✅ reset done (gr.close_all)


In [26]:
# CELL: 100_LAUNCH_PUBLIC_DEMO (CANONICAL)
# CANONICAL LAUNCH CELL
# Run this AFTER reset. Keep it running while testing the public link.
# If you restart kernel, you must rerun setup → reset → launch.

import time, requests, importlib, subprocess
import gradio as gr
import gradio.networking as net
import mri.serving.app as serving_app

# Clean start BEFORE launch
gr.close_all()

# Fresh demo object each run (prevents relaunching a previously-closed instance)
importlib.reload(serving_app)
demo = serving_app.demo

# Kaggle workaround: bound Gradio share readiness checks
_orig_url_ok = net.url_ok
def _url_ok_with_timeout(url: str) -> bool:
    t0 = time.time()
    while time.time() - t0 < 8:
        try:
            r = requests.get(url, timeout=2)
            if r.status_code in (200, 302, 401, 404):
                return True
        except Exception:
            pass
        time.sleep(0.5)
    return True

net.url_ok = _url_ok_with_timeout

app, local_url, share_url = demo.launch(
    debug=True,
    share=True,
    inline=False,
    prevent_thread_lock=True,
    show_error=True,
)

print("✅ LOCAL:", local_url)
print("✅ PUBLIC:", share_url)

# Prove the server is actually listening (truth signal)
print(subprocess.run(["bash","-lc","ss -ltnp | grep 7860 || true"], capture_output=True, text=True).stdout)

# Local health check
try:
    r = requests.get(local_url, timeout=2)
    print("✅ local health:", r.status_code)
except Exception as e:
    print("❌ local health failed:", repr(e))

print("\nKeep this cell running while testing the PUBLIC link. Interrupt to stop the server/tunnel.")
while True:
    time.sleep(60)


new /
* Running on local URL:  http://127.0.0.1:7860
* Running on public URL: https://6db1504fbd45bf8eac.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Traceback (most recent call last):
  File "/usr/local/lib/python3.12/dist-packages/gradio/queueing.py", line 759, in process_events
    response = await route_utils.call_process_api(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/gradio/route_utils.py", line 354, in call_process_api
    output = await app.get_blocks().process_api(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/gradio/blocks.py", line 2116, in process_api
    result = await self.call_function(
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/gradio/blocks.py", line 1623, in call_function
    prediction = await anyio.to_thread.run_sync(  # type: ignore
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/anyio/to_thread.py", line 63, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^

Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://6db1504fbd45bf8eac.gradio.live
✅ LOCAL: http://127.0.0.1:7860/
✅ PUBLIC: https://6db1504fbd45bf8eac.gradio.live

❌ local health failed: ConnectionError(MaxRetryError('HTTPConnectionPool(host=\'127.0.0.1\', port=7860): Max retries exceeded with url: / (Caused by NewConnectionError("HTTPConnection(host=\'127.0.0.1\', port=7860): Failed to establish a new connection: [Errno 111] Connection refused"))'))

Keep this cell running while testing the PUBLIC link. Interrupt to stop the server/tunnel.


KeyboardInterrupt: 

## 8. Verification checklist

✅ Multi-image JPEG upload (6+ slices)

✅ DICOM.zip upload (works)

✅ Nested/no-extension zip (works)

⚠️ Non-MRI screenshots → should ideally be rejected/uncertain (known limitation until we add stronger input validity gating)


****Run cells top → bottom once

After install cell, restart kernel if prompted

Always run RESET before LAUNCH

Public demo uses share=True, inline=False

Stopping launch cell kills the tunnel (404 is expected)

In [None]:
!python - << 'PY'
import json
nb = json.load(open("/kaggle/working/mvp-notebook.ipynb","r",encoding="utf-8"))
code_cells = [c for c in nb["cells"] if c.get("cell_type")=="code"]
print("code_cells:", len(code_cells))
PY
