In [1]:
# @title Install dependencies (pinned for Colab stability; GPU recommended for SD)
!pip -q install \
  praw==7.7.1 requests==2.32.3 beautifulsoup4==4.12.3 lxml==5.3.0 \
  python-dotenv==1.0.1 pydantic==2.9.2 tqdm==4.66.5 loguru==0.7.2 \
  numpy==1.26.4 scikit-learn==1.5.2 streamlit==1.38.0 pyngrok==7.2.3 \
  nest-asyncio==1.6.0 matplotlib==3.9.2 pdfminer.six==20240706 faiss-cpu==1.8.0.post1 \
  openai==1.51.2 networkx==3.3 fastapi==0.115.0 uvicorn==0.30.6 \
  langgraph==0.1.5

# Optional: Stable Diffusion stack (GPU highly recommended)
!pip -q install torch --extra-index-url https://download.pytorch.org/whl/cu121
!pip -q install diffusers==0.29.0 transformers==4.41.0 accelerate==0.30.0 safetensors==0.4.2 peft==0.10.0


[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/149.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m9.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.6/57.6 kB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.0/61.0 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m191.0/191.0 kB[0m [31m14.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m64.9/64.9 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m147.9/147.9 kB[0m [31m11.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.9/4.9 MB[0m [31m115.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
# @title Load .env (if present) and set optional fallbacks
import os
from pathlib import Path

# Load .env if available
try:
    from dotenv import load_dotenv
    _ = load_dotenv()
except Exception:
    pass

# Optional fallback secrets for quick demo (leave blank to rely on .env)
OPTIONAL_SECRETS = {
    "OPENAI_API_KEY": "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",
    "UNPAYWALL_EMAIL": "jackryan76388@gmail.com",
    "REDDIT_CLIENT_ID": "GVVuq2kgFJHHq3bFU_aC6A",
    "REDDIT_CLIENT_SECRET": "48KIcfWC9wkRaBwYOJwnIC-mBz3X9w",
    "REDDIT_USER_AGENT": "ReliScoreCancerBot/0.1 by u/Normal-Platform9498",
    "REDDIT_USERNAME": "Normal-Platform9498",
    "REDDIT_PASSWORD": "Jackryan763",
    "HTTP_TIMEOUT": "15",
    "NGROK_AUTHTOKEN": "342JRxQluyUnlpNgbRWrTndqGXp_4pn7QWU8YyUsrpkGT6LV3",
}
for k, v in OPTIONAL_SECRETS.items():
    if v and not os.environ.get(k):
        os.environ[k] = v


In [3]:
# @title Write ALL core ReliScore files (original + safe extensions)
import textwrap
from pathlib import Path

PROJECT_ROOT = Path.cwd()
SRC = PROJECT_ROOT / "src" / "reli_core"
PKG_DIRS = [
    SRC, SRC/"nlp", SRC/"extract", SRC/"aggregate", SRC/"reasoner",
    SRC/"writer", SRC/"sources", SRC/"assets", SRC/"nlg",
    SRC/"graph", SRC/"multihop", SRC/"diffusion", SRC/"agent", SRC/"api"
]
for d in PKG_DIRS:
    d.mkdir(parents=True, exist_ok=True)

def w(rel_path: str, content: str):
    p = PROJECT_ROOT / rel_path
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(textwrap.dedent(content).strip() + "\n", encoding="utf-8")
    return p

# ---------- __init__ ----------
w("src/reli_core/__init__.py", """
from pathlib import Path
__all__ = ["version", "PACKAGE_ROOT"]

def version() -> str:
    return "ReliScore v0.1.0"

try:
    PACKAGE_ROOT = Path(__file__).resolve().parent
except NameError:
    PACKAGE_ROOT = Path.cwd() / "src" / "reli_core"
""")

# ---------- utils / cache ----------
w("src/reli_core/utils.py", """
import os, time, json, hashlib, re
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
import requests
from loguru import logger

HTTP_TIMEOUT = int(os.environ.get("HTTP_TIMEOUT", "15"))
DATA_DIR = Path.cwd() / "data_cache"
DATA_DIR.mkdir(parents=True, exist_ok=True)

def sha1(s: str) -> str:
    return hashlib.sha1(s.encode("utf-8")).hexdigest()

def cache_path(name: str) -> Path:
    return DATA_DIR / name

def save_json(path: Path, obj: Any):
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(obj, indent=2, ensure_ascii=False), encoding="utf-8")

def load_json(path: Path) -> Optional[Any]:
    if path.exists():
        return json.loads(path.read_text(encoding="utf-8"))
    return None

def get(url: str, params: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None) -> Tuple[Optional[requests.Response], Optional[str]]:
    try:
        res = requests.get(url, params=params, headers=headers, timeout=HTTP_TIMEOUT)
        if res.status_code == 200:
            return res, None
        return None, f"HTTP {res.status_code} for {url}"
    except Exception as e:
        return None, str(e)

def redact_email(text: str) -> str:
    return re.sub(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}", "[redacted-email]", text or "")

def dedupe_list(items):
    seen = set()
    out = []
    for x in items:
        try:
            k = json.dumps(x, sort_keys=True, default=str)
        except Exception:
            k = str(x)
        if k not in seen:
            seen.add(k)
            out.append(x)
    return out

def safe_int(x, default=None):
    try:
        return int(x)
    except:
        return default

def now_ts() -> int:
    return int(time.time())

def log_idempotent(path: Path, key: str) -> bool:
    path.parent.mkdir(parents=True, exist_ok=True)
    if not path.exists():
        path.write_text("", encoding="utf-8")
    existing = set(line.strip() for line in path.read_text(encoding="utf-8").splitlines() if line.strip())
    if key in existing:
        return False
    with path.open("a", encoding="utf-8") as f:
        f.write(key + "\\n")
    return True
""")

w("src/reli_core/cache.py", """
from typing import Any
from .utils import save_json, load_json, sha1, cache_path

def get_or_set(name: str, builder) -> Any:
    p = cache_path(name)
    obj = load_json(p)
    if obj is not None:
        return obj
    obj = builder()
    save_json(p, obj)
    return obj

def memoize_json(key: str, fn, *args, **kwargs) -> Any:
    h = sha1(key)
    p = cache_path(f"memo/{h}.json")
    obj = load_json(p)
    if obj is not None:
        return obj
    obj = fn(*args, **kwargs)
    save_json(p, obj)
    return obj
""")

# ---------- safety / templates / ranking ----------
w("src/reli_core/safety.py", """
import re
from typing import Dict
ADVICE_DISCLAIMER = ("This is not medical advice. Consult a qualified clinician for decisions about "
                     "diagnosis, treatment, or prevention.")

def sanitize_claim(text: str) -> str:
    text = (text or "").strip()
    text = re.sub(r"\\s+", " ", text)
    return text[:1000]

def safety_checks(claim: str) -> Dict[str, bool]:
    lower = (claim or "").lower()
    emerg = any(x in lower for x in ["suicid", "emergency", "chest pain", "unconscious"])
    pregnancy = "pregnan" in lower
    pedi = any(x in lower for x in ["child", "infant", "toddler", "pediatric"])
    return {"emergency_flag": emerg, "pregnancy_flag": pregnancy, "pediatric_flag": pedi}
""")

w("src/reli_core/templates.py", """
from typing import List, Dict

def format_citation(item: Dict) -> str:
    year = item.get("year") or "n.d."
    src = item.get("source", "Unknown")
    title = (item.get("title", "Untitled") or "").strip()
    ident = item.get("id", "")
    return f"[{src} {year}] {title} ({ident})"

def build_citations_markdown(study_points: List[Dict], limit: int = None) -> str:
    items = study_points if limit is None else study_points[:max(0, limit)]
    lines = [f"- {format_citation(c)}" for c in items]
    return "\\n".join(lines) if lines else "_No citations available._"

def build_reasons_markdown(verdict: Dict) -> str:
    rs = (verdict or {}).get("reasons") or []
    if not rs:
        return "_No key reasons extracted._"
    return "\\n".join(f"- {r}" for r in rs)
""")

w("src/reli_core/ranking.py", """
from typing import List, Dict
from datetime import datetime

TIER_WEIGHT = {
    "guideline": 4.0, "systematic_review": 3.5, "randomized_trial": 3.0,
    "cohort": 2.0, "case_control": 1.5, "case_series": 1.0,
    "in_vitro": 0.5, "animal": 0.5,
}

def score_item(x: Dict) -> float:
    year = x.get("year") or 0
    try:
        recency = max(0, datetime.now().year - int(year))
    except:
        recency = 10
    tier = x.get("tier", "cohort")
    weight = TIER_WEIGHT.get(tier, 1.0)
    recency_factor = max(0.2, 1.0 - recency / 20.0)
    applicability = x.get("applicability", 1.0)
    oa_boost = 1.1 if (x.get("oa_url") or "").strip() else 1.0
    return weight * recency_factor * (0.5 + 0.5 * applicability) * oa_boost

def rank_items(items: List[Dict]) -> List[Dict]:
    for it in items:
        it["_score"] = score_item(it)
    return sorted(items, key=lambda z: z.get("_score", 0), reverse=True)
""")

# ---------- NLP ----------
w("src/reli_core/nlp/claim_detect.py", """
from typing import Dict
INTENTS = ["prevention","treatment","screening","supportive_care","genomic","anecdote","meta_news"]

def classify_intent(text: str) -> str:
    t = (text or "").lower()
    if any(k in t for k in ["breakthrough","cure","new treatment","news","headline","approval","fda","ema"]):
        return "meta_news"
    if any(k in t for k in ["prevent","risk","lower risk","prophylaxis"]):
        return "prevention"
    if any(k in t for k in ["treat","therapy","drug","dose","improve","chemotherapy"]):
        return "treatment"
    if any(k in t for k in ["screen","detect early","psa","mammogram"]):
        return "screening"
    if any(k in t for k in ["nausea","fatigue","support"]):
        return "supportive_care"
    if any(k in t for k in ["mutation","genomic","biomarker"]):
        return "genomic"
    if any(k in t for k in ["i feel","my uncle","i tried"]):
        return "anecdote"
    return "treatment"

def query_terms(text: str, intent: str) -> Dict[str, str]:
    base = text.strip().rstrip(".")
    if intent == "meta_news":
        return {
            "pubmed": "immunotherapy checkpoint inhibitor tumor-agnostic oncology review",
            "eupmc": "CAR-T radioligand ADC tumor-agnostic approval review",
            "crossref": "oncology breakthroughs FDA approval checkpoint inhibitor CAR-T ADC radiopharmaceutical review",
            "preprint": "cancer breakthrough phase 3 site:medrxiv.org OR site:biorxiv.org",
            "ctgov": "cancer breakthrough phase 3",
            "fda": "oncology approval breakthrough PD-1 CAR-T ADC radioligand KRAS NTRK tumor-agnostic",
        }
    return {
        "pubmed": f"{base} cancer {intent}",
        "eupmc": f"{base} cancer {intent}",
        "crossref": f"{base} cancer {intent}",
        "preprint": f"{base} cancer {intent} site:biorxiv.org OR site:medrxiv.org",
        "ctgov": f"{base} cancer {intent}",
    }
""")

w("src/reli_core/nlp/normalize.py", """
import re
def normalize_title(t: str) -> str:
    t = re.sub(r"\\s+", " ", (t or "").strip())
    return t[:500]
""")

# ---------- extract / aggregate / reasoner ----------
w("src/reli_core/extract/effects.py", """
import re
from typing import Dict, Optional

EFFECT_PAT = re.compile(r"(RR|OR|HR)\\s*[:=]?\\s*([0-9]*\\.?[0-9]+)\\s*(?:\\(\\s*95%\\s*CI\\s*[:=]?\\s*([0-9]*\\.?[0-9]+)\\s*[-–]\\s*([0-9]*\\.?[0-9]+)\\s*\\))?", re.I)
SUPPORT_PAT = re.compile(r"\\b(reduced risk|lower risk|protective|inverse association)\\b", re.I)
REFUTE_PAT = re.compile(r"\\b(increased risk|higher risk|positive association|no reduction|no association)\\b", re.I)

def extract_effects(text: str) -> Optional[Dict]:
    if not text: return None
    m = EFFECT_PAT.search(text)
    if not m: return None
    kind, val, lo, hi = m.groups()
    return {"metric": kind.upper(), "value": float(val), "ci_low": float(lo) if lo else None, "ci_high": float(hi) if hi else None}

def infer_direction(title: str, abstract: str, effect: Optional[Dict]) -> str:
    if effect and effect.get("value"):
        if effect["value"] < 1.0: return "supports"
        if effect["value"] > 1.0: return "refutes"
    blob = " ".join([title or "", abstract or ""])
    if SUPPORT_PAT.search(blob) and not REFUTE_PAT.search(blob): return "supports"
    if REFUTE_PAT.search(blob) and not SUPPORT_PAT.search(blob): return "refutes"
    return "unclear"
""")

w("src/reli_core/aggregate/prevention.py", """
from typing import List, Dict
import math
def pooled_effect(studies: List[Dict]) -> Dict:
    vals = [s.get("effect", {}).get("value") for s in studies if s.get("effect")]
    vals = [v for v in vals if isinstance(v, (int, float))]
    if not vals: return {"pooled": None, "n": 0}
    log_vals = [math.log(v) for v in vals if v > 0]
    if not log_vals: return {"pooled": None, "n": 0}
    gmean = math.exp(sum(log_vals) / len(log_vals))
    return {"pooled": gmean, "n": len(vals)}
""")

w("src/reli_core/reasoner/verdict.py", """
from typing import Dict, List
from ..ranking import rank_items

def decide_verdict(items: List[Dict], intent: str) -> Dict:
    ranked = rank_items(items)
    topk = ranked[:10]
    positives = sum(1 for x in topk if (x.get("direction") or "").lower() == "supports")
    negatives = sum(1 for x in topk if (x.get("direction") or "").lower() == "refutes")
    label = "Unclear"
    if positives >= max(2, negatives + 1): label = "Supported"
    elif negatives >= max(2, positives + 1): label = "Contradicted"
    elif positives > 0 and negatives > 0: label = "Mixed"
    conf = min(0.95, 0.4 + 0.05 * len(topk) + 0.1 * abs(positives - negatives))

    reasons = []
    for x in topk[:8]:
        eff = x.get("effect")
        eff_str = ""
        if isinstance(eff, dict) and eff.get("value"):
            val = eff["value"]
            ci = f" (95% CI {eff['ci_low']}-{eff['ci_high']})" if (eff.get("ci_low") and eff.get("ci_high")) else ""
            eff_str = f" — {eff['metric']} {val}{ci}"
        reasons.append(f"{x.get('tier','study').title()} {x.get('year','')}: {x.get('title','')} {eff_str}".strip())
    return {"label": label, "confidence": round(conf, 2), "reasons": reasons}
""")

# ---------- nlg (LLM helper) ----------
w("src/reli_core/nlg/llm_helper.py", """
import os, json
from typing import Optional, Dict

def _can_use_llm() -> bool:
    return bool(os.environ.get("OPENAI_API_KEY"))

def explain_with_llm(facts: Dict, model: str = "gpt-4o-mini") -> Optional[str]:
    if not _can_use_llm():
        return None
    try:
        from openai import OpenAI
        client = OpenAI(api_key=os.environ["OPENAI_API_KEY"])
        system = ("You are a cautious medical explainer. Write a short, human explanation. "
                  "STRICT RULES: Only use the structured 'facts' JSON provided. Do not invent new studies or numbers. "
                  "No citations inline; the app will show references separately. Be kind and clear. "
                  "Order your response as four paragraphs with headings: "
                  "'What this actually means', 'Where the claim goes wrong or right', 'How to think about it', 'Bottom line'.")
        user = ("Claim:\\n" + (facts.get("claim","") or "") + "\\n\\n"
                "Verdict:\\n" + json.dumps(facts.get("verdict",{}), ensure_ascii=False) + "\\n\\n"
                "Evidence facts (JSON):\\n" + json.dumps(facts, ensure_ascii=False))
        resp = client.chat.completions.create(
            model=model, temperature=0.2,
            messages=[{"role":"system","content":system},{"role":"user","content":user}],
        )
        return resp.choices[0].message.content.strip()
    except Exception:
        return None
""")

# ---------- writer (LLM-first, meta-news aware fallback) ----------
w("src/reli_core/writer/lay.py", """
from typing import Dict, List
try:
    from src.reli_core.nlg.llm_helper import explain_with_llm
except Exception:
    explain_with_llm = None

def _fmt_effect(e):
    if not isinstance(e, dict) or not e.get("value"): return None
    val = e["value"]; ci = ""
    if e.get("ci_low") and e.get("ci_high"):
        ci = f" (95% CI {e['ci_low']}-{e['ci_high']})"
    return f"{e.get('metric','').upper()} {val}{ci}"

def _short_support_line(te):
    yr = te.get("year") or ""; ttl = (te.get("title") or "").rstrip(".")
    fx = {"metric":te.get("metric"),"value":te.get("value"),"ci_low":te.get("ci_low"),"ci_high":te.get("ci_high")}
    eff = _fmt_effect(fx); part = f"{yr}: {ttl}"
    if eff: part += f" — {eff}"
    return f"{part}."

def _human_meta_news(facts: Dict) -> str:
    v = facts.get("verdict", {}); label = v.get("label","Unclear"); conf = v.get("confidence",0.0)
    cts = facts.get("counts", {}); years = facts.get("years", {})
    span = f" spanning {years['earliest']}–{years['latest']}" if years.get("earliest") and years.get("latest") else ""
    tops = facts.get("top_titles") or []
    bullets = [f"- {yr}: {ttl}" for yr, ttl in tops[:6] if ttl]
    bullet_block = "\\n".join(bullets) if bullets else "- We retrieved high-level items (approvals/reviews), but titles were sparse."
    hop_trace = facts.get("hop_trace") or []
    hop_lines = []
    for i, hop in enumerate(hop_trace[:3], 1):
        ex = hop.get("subq","").strip()
        ans = hop.get("answer","").strip()
        hop_lines.append(f"{i}. **{ex}** → {ans}")
    hop_block = "\\n".join(hop_lines)

    p1 = (f"**What this actually means**  \\n"
          f"We looked for real, patient-impactful advances (phase 3 results, approvals, guideline shifts). "
          f"We found {cts.get('total',0)} items{span}. Examples from titles:\\n{bullet_block}")
    p2 = ("**Where the claim goes wrong or right**  \\n"
          "Headlines often hype early lab or phase-1/2 signals that never survive phase-3 trials. "
          "Real breakthroughs have regulatory approval and guideline adoption.")
    p3 = ("**How to think about it**  \\n"
          "Focus on results that reached phase 3 and approval (checkpoint inhibitors, CAR-T, radioligand therapy, ADCs, KRAS/NTRK-targeted drugs). "
          "They don't cure all cancers, but for specific groups they extend survival or yield durable remissions.")
    p4 = f"**Bottom line**  \\nVerdict: **{label}** (confidence {conf:.2f})."
    if hop_block:
        p4 += f"\\n\\n**Reasoning hops (condensed):**\\n{hop_block}"
    return "\\n\\n".join([p1,p2,p3,p4])

def _human_general(facts: Dict) -> str:
    v = facts.get("verdict", {}); label = v.get("label","Unclear"); conf = v.get("confidence",0.0)
    cts = facts.get("counts", {}); years = facts.get("years", {}); span = f" spanning {years['earliest']}–{years['latest']}" if years.get("earliest") and years.get("latest") else ""
    avg = facts.get("avg_score"); avg_txt = f" with average quality score ≈ {avg:.2f}" if isinstance(avg,(int,float)) else ""
    pooled = facts.get("pooled_effect"); pooled_txt = f" Pooled effect ≈ {pooled:.2f} (ratio < 1 suggests lower risk)." if isinstance(pooled,(int,float)) and pooled>0 else ""
    tops = facts.get("top_effects") or []
    study_lines = []
    for te in tops[:3]: study_lines.append(_short_support_line(te))
    studies_txt = " ".join(study_lines) if study_lines else ""
    if not studies_txt:
        tt = facts.get("top_titles") or []
        named = "; ".join([f"{y}: {t}" for y,t in tt[:3] if t])
        if named: studies_txt = f"Examples: {named}."
    hop_trace = facts.get("hop_trace") or []
    hop_lines = []
    for i, hop in enumerate(hop_trace[:2], 1):
        ex = hop.get("subq","").strip()
        ans = hop.get("answer","").strip()
        hop_lines.append(f"{i}. **{ex}** → {ans}")
    hop_block = "\\n".join(hop_lines)

    p1 = (f"**What this actually means**  \\n"
          f"We analyzed open-access sources and identified {cts.get('total',0)} relevant items{span}. "
          f"Of these, {cts.get('supports',0)} support, {cts.get('refutes',0)} refute, and {cts.get('unclear',0)} are uncertain{avg_txt}.{pooled_txt} "
          f"{(' Key examples: ' + studies_txt) if studies_txt else ''}")
    middle = {
        "Supported": "Higher-ranked evidence supports the claim, but magnitudes vary and do not imply certainty.",
        "Contradicted": "Higher-ranked evidence tends to contradict the claim; reported effects point away from benefit.",
        "Mixed": "Evidence points in both directions; design and population differences likely explain the split.",
        "Unclear": "Current evidence is limited or inconsistent; stronger, targeted studies are needed."
    }.get(label, "Interpret cautiously.")
    p2 = f"**Where the claim goes wrong or right**  \\n{middle}"
    p3 = ("**How to think about it**  \\n"
          "Interpret results in context: study quality, recency, consistency, and relevance to your population. "
          "Be cautious with absolute language like “always” or “never.”")
    p4 = f"**Bottom line**  \\nVerdict: **{label}** (confidence {conf:.2f})."
    if hop_block: p4 += f"\\n\\n**Reasoning hops (condensed):**\\n{hop_block}"
    return "\\n\\n".join([p1,p2,p3,p4])

def write_explanation(claim: str, verdict: Dict, aggregates: Dict, study_points: List[Dict], intent: str, facts: Dict = None) -> str:
    facts = facts or {}
    if explain_with_llm is not None:
        try:
            llm_text = explain_with_llm(facts)
            if llm_text: return llm_text
        except Exception:
            pass
    if facts.get("intent") == "meta_news":
        return _human_meta_news(facts)
    return _human_general(facts)

def graph_explanations(study_points: List[Dict], verdict: Dict) -> Dict[str, Dict[str, str]]:
    return {}
""")

# ---------- sources ----------
w("src/reli_core/sources/__init__.py", """
from typing import Dict
def base_item(**kw) -> Dict:
    d = dict(
        id=kw.get("id",""), source=kw.get("source",""), title=kw.get("title","Untitled"),
        year=kw.get("year", None), tier=kw.get("tier","cohort"), applicability=kw.get("applicability",1.0),
        oa_url=kw.get("oa_url",""), pmid=kw.get("pmid",""), pmcid=kw.get("pmcid",""),
        abstract=kw.get("abstract",""), direction=kw.get("direction","unclear"),
        summary=kw.get("summary",""), effect=kw.get("effect"),
    )
    return d
""")

w("src/reli_core/sources/pubmed.py", """
from typing import List, Dict
from ..utils import get
from . import base_item
EUTILS = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
ESUMMARY = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"

def search_pubmed(query: str, n: int = 20) -> List[Dict]:
    items: List[Dict] = []
    if not query: return items
    res, err = get(EUTILS, params={"db":"pubmed","retmode":"json","retmax":str(n),"term":query})
    if err or not res: return items
    data = res.json()
    ids = data.get("esearchresult",{}).get("idlist",[])[:n]
    if not ids: return items
    res2, err2 = get(ESUMMARY, params={"db":"pubmed","retmode":"json","id":",".join(ids)})
    if err2 or not res2: return items
    summ = res2.json().get("result",{})
    for pid in ids:
        r = summ.get(pid)
        if not r: continue
        title = (r.get("title") or "").strip().rstrip(".")
        try: year = int((r.get("pubdate","") or "")[:4])
        except: year = None
        items.append(base_item(
            id=f"PMID:{pid}", source="PubMed", title=title, year=year, tier="cohort",
            applicability=1.0, pmid=pid, summary=r.get("sortfirstauthor") or "", direction="unclear"
        ))
    return items
""")

w("src/reli_core/sources/europe_pmc.py", """
from typing import List, Dict
from ..utils import get
from . import base_item
EPMC = "https://www.ebi.ac.uk/europepmc/webservices/rest/search"

def search_eupmc(query: str, n: int = 20) -> List[Dict]:
    items: List[Dict] = []
    if not query: return items
    res, err = get(EPMC, params={"query":query, "resultType":"core", "format":"json", "pageSize":str(n)})
    if err or not res: return items
    docs = res.json().get("resultList",{}).get("result",[])
    for d in docs[:n]:
        pmcid = d.get("pmcid",""); pmid = d.get("pmid",""); title = (d.get("title") or "").strip()
        try: year = int(d.get("pubYear")) if (d.get("pubYear") or "").isdigit() else None
        except: year = None
        abstract = d.get("abstractText","") or ""
        items.append(base_item(
            id=f"PMCID:{pmcid}" if pmcid else (f"PMID:{pmid}" if pmid else d.get("id","EPMC")),
            source="EuropePMC", title=title, year=year, tier="cohort", applicability=1.0,
            pmid=pmid, pmcid=pmcid, oa_url=(f"https://europepmc.org/article/pmcid/{pmcid}" if pmcid else ""),
            abstract=abstract, summary=d.get("journalTitle",""), direction="unclear"
        ))
    return items
""")

w("src/reli_core/sources/crossref.py", """
from typing import List, Dict
from ..utils import get
from . import base_item
API = "https://api.crossref.org/works"

def search_crossref(query: str, n: int = 15) -> List[Dict]:
    items: List[Dict] = []
    if not query: return items
    res, err = get(API, params={"query":query, "rows":str(n)})
    if err or not res: return items
    for it in res.json().get("message",{}).get("items",[])[:n]:
        title = " ".join(it.get("title") or [])[:300]
        try: year = it.get("issued",{}).get("date-parts",[[None]])[0][0]
        except: year = None
        doi = it.get("DOI","")
        items.append(base_item(
            id=f"DOI:{doi}" if doi else title[:24], source="Crossref", title=title, year=year,
            tier="cohort", applicability=0.8, oa_url="", summary=(it.get("container-title") or [""])[0],
            direction="unclear"
        ))
    return items
""")

w("src/reli_core/sources/preprints.py", """
from typing import List, Dict
from ..utils import get
from . import base_item
CR = "https://api.crossref.org/works"

def search_preprints(query: str, n: int = 10) -> List[Dict]:
    items: List[Dict] = []
    if not query: return items
    q = f"{query} (biorxiv OR medrxiv)"
    res, err = get(CR, params={"query":q, "rows":str(n)})
    if err or not res: return items
    for it in res.json().get("message",{}).get("items",[])[:n]:
        title = " ".join(it.get("title") or [])[:300]
        try: year = it.get("issued",{}).get("date-parts",[[None]])[0][0]
        except: year = None
        doi = it.get("DOI","")
        items.append(base_item(
            id=f"DOI:{doi}" if doi else title[:24], source="Preprint", title=title, year=year,
            tier="case_series", applicability=0.6, oa_url="", summary="preprint", direction="unclear"
        ))
    return items
""")

w("src/reli_core/sources/ctgov.py", """
from typing import List, Dict
from ..utils import get
from . import base_item
API = "https://clinicaltrials.gov/api/v2/studies"

def search_ctgov(query: str, n: int = 10) -> List[Dict]:
    items: List[Dict] = []
    if not query: return items
    res, err = get(API, params={"query.term":query, "pageSize":str(n)})
    if err or not res: return items
    for st in res.json().get("studies",[])[:n]:
        ident = st.get("protocolSection",{}).get("identificationModule",{}).get("nctId","")
        title = st.get("protocolSection",{}).get("identificationModule",{}).get("officialTitle","") or \
                st.get("protocolSection",{}).get("identificationModule",{}).get("briefTitle","")
        items.append(base_item(
            id=ident or title[:24], source="ClinicalTrials.gov", title=title, year=None, tier="randomized_trial",
            applicability=0.7, oa_url=f"https://clinicaltrials.gov/study/{ident}" if ident else "",
            summary="trial", direction="unclear"
        ))
    return items
""")

w("src/reli_core/sources/fulltext.py", """
from typing import Optional, Dict
from ..utils import get
import os
UNPAYWALL = "https://api.unpaywall.org/v2/"

def _pick_best_oa(data: dict) -> Optional[str]:
    if not isinstance(data, dict): return None
    best = data.get("best_oa_location")
    if isinstance(best, dict):
        return best.get("url_for_pdf") or best.get("url")
    locs = data.get("oa_locations") or []
    for loc in locs:
        if not isinstance(loc, dict): continue
        url = loc.get("url_for_pdf") or loc.get("url")
        if url: return url
    return None

def best_oa_url(doi: str) -> Optional[str]:
    email = os.environ.get("UNPAYWALL_EMAIL","").strip()
    if not doi or not email: return None
    res, err = get(f"{UNPAYWALL}{doi}", params={"email": email})
    if err or not res: return None
    try: data = res.json() or {}
    except Exception: return None
    return _pick_best_oa(data)

def enrich_oa(item: Dict) -> Dict:
    try:
        doi = (item.get("id","").split("DOI:",1)[-1]) if "DOI:" in item.get("id","") else ""
        if not item.get("oa_url") and doi:
            url = best_oa_url(doi)
            if url: item["oa_url"] = url
        return item
    except Exception:
        return item
""")

w("src/reli_core/sources/agency.py", """
from typing import List, Dict
from . import base_item
SITEMAPS = {"WHO": "https://www.who.int/sitemap.xml","CDC": "https://www.cdc.gov/sitemap.xml","NICE": "https://www.nice.org.uk/sitemap.xml"}

def harvest_agencies(keyword: str, n: int = 10) -> List[Dict]:
    items: List[Dict] = []
    for k in ["WHO","CDC","NICE"]:
        items.append(base_item(
            id=f"{k}:{keyword}", source=k, title=f"{k} guidance related to {keyword}", year=None, tier="guideline",
            applicability=1.0, oa_url=SITEMAPS[k], summary="agency guidance", direction="unclear"
        ))
    return items[:n]
""")

w("src/reli_core/sources/repos.py", """
from typing import List, Dict
from . import base_item

def find_datasets(keyword: str, n: int = 5) -> List[Dict]:
    repos = ["Zenodo", "Dryad", "Figshare", "OSF"]
    out: List[Dict] = []
    for r in repos[:n]:
        out.append(base_item(
            id=f"{r}:{keyword}", source=r, title=f"{r} dataset for {keyword}", year=None, tier="case_series",
            applicability=0.5, oa_url=f"https://{r.lower()}.org", summary="dataset", direction="unclear"
        ))
    return out
""")

w("src/reli_core/sources/fda.py", """
from typing import List, Dict
from ..utils import get
from . import base_item
API = "https://api.fda.gov/drug/drugsfda.json"
KEY_TERMS = ["pembrolizumab","nivolumab","atezolizumab","durvalumab","ipilimumab",
             "tisagenlecleucel","axicabtagene","idecabtagene","ciltacabtagene",
             "sotorasib","adagrasib","larotrectinib","entrectinib",
             "trastuzumab deruxtecan","enhertu","pluvicto","lutetium","psma"]

def search_fda_oncology(query: str, n: int = 25) -> List[Dict]:
    items: List[Dict] = []
    for term in KEY_TERMS:
        res, err = get(API, params={"search": f'products.brand_name:"{term}"', "limit": str(n)})
        if err or not res: continue
        for r in res.json().get("results", []):
            app = r.get("applications", [{}])[0]
            prod = (r.get("products") or [{}])[0]
            title = f"FDA approval: {prod.get('brand_name','')} ({prod.get('active_ingredients','')})"
            try:
                y = (app.get("action_date","") or "")[:4]; year = int(y) if y.isdigit() else None
            except:
                year = None
            items.append(base_item(
                id=f"FDA:{prod.get('brand_name','')}", source="FDA", title=title, year=year, tier="guideline",
                applicability=1.0, oa_url="https://www.fda.gov/drugs/drug-approvals-and-databases/drugsfda-data-files",
                summary="FDA approval", direction="unclear"
            ))
    return items
""")

# ---------- Topic harvest ----------
w("src/reli_core/topic_harvest.py", """
from typing import Dict, List
from .sources.pubmed import search_pubmed
from .sources.europe_pmc import search_eupmc
from .sources.crossref import search_crossref
from .sources.preprints import search_preprints
from .sources.ctgov import search_ctgov
from .sources.agency import harvest_agencies
from .sources.repos import find_datasets
from .ranking import rank_items
from .utils import dedupe_list

def harvest_topic(keyword: str, limit: int = 50) -> Dict[str, List[Dict]]:
    pools = []
    pools += search_pubmed(keyword, n=20)
    pools += search_eupmc(keyword, n=20)
    pools += search_crossref(keyword, n=15)
    pools += search_preprints(keyword, n=10)
    pools += search_ctgov(keyword, n=10)
    pools += harvest_agencies(keyword, n=5)
    pools += find_datasets(keyword, n=5)
    items = dedupe_list(pools)
    ranked = rank_items(items)[:limit]
    return {"items": ranked}
""")
print("✅ Core files written.")


✅ Core files written.


In [4]:
# @title Write Graph-RAG + Multi-Hop modules (Week-6)
from pathlib import Path
import textwrap

def w(rel_path: str, content: str):
    p = Path(rel_path)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(textwrap.dedent(content).strip() + "\n", encoding="utf-8")
    return p

# ---------- Graph build ----------
w("src/reli_core/graph/build_graph.py", """
from typing import List, Dict
from pathlib import Path
import re, pickle
import networkx as nx
from ..utils import cache_path

ENTITY_PAT = re.compile(r"\\b([A-Z][a-z0-9]+(?:-[A-Z0-9]+)?)\\b")

def extract_entities(title: str) -> Dict[str, str]:
    # very light heuristics: treat capitalized tokens as candidate entities
    ents = set(ENTITY_PAT.findall(title or ""))
    return {e:e for e in list(ents)[:12]}

def build_entity_graph(items: List[Dict]) -> nx.Graph:
    g = nx.Graph()
    for it in items:
        title = it.get("title","") or ""
        ents = extract_entities(title)
        paper_id = it.get("id","paper")
        g.add_node(paper_id, kind="paper", title=title, year=it.get("year"), source=it.get("source"))
        for e in ents:
            g.add_node(e, kind="entity")
            g.add_edge(paper_id, e, rel="mentions")
    return g

def persist_graph(g, path: str = None) -> str:
    path = path or str(cache_path("graphrag_graph.pkl"))
    with open(path, "wb") as f:
        pickle.dump(g, f)
    return path

def load_graph(path: str = None):
    path = path or str(cache_path("graphrag_graph.pkl"))
    if not Path(path).exists(): return None
    with open(path, "rb") as f:
        return pickle.load(f)
""")

# ---------- Graph-RAG retriever ----------
w("src/reli_core/graph/graphrag.py", """
from typing import Dict, List
import networkx as nx
from .build_graph import extract_entities
from ..ranking import rank_items

def extract_seed_entities(text: str) -> Dict[str,str]:
    return extract_entities(text)

def _neighbors_k_hops(g: nx.Graph, seeds: List[str], hops: int = 2) -> Dict:
    seen = set(seeds)
    frontier = set(seeds)
    for _ in range(hops):
        next_frontier = set()
        for node in frontier:
            for n in g.neighbors(node):
                if n not in seen:
                    seen.add(n); next_frontier.add(n)
        frontier = next_frontier
    nodes = list(seen)
    edges = [(u,v,g.edges[u,v].get("rel","")) for u in nodes for v in g.neighbors(u) if v in nodes and u < v]
    return {"nodes": nodes, "edges": edges}

def graphrag_retrieve(g: nx.Graph, query: str, corpus_items: List[Dict], k: int = 8, hops: int = 2) -> Dict:
    if g is None or g.number_of_nodes() == 0:
        return {"spans": [], "neighborhood": {"nodes":[],"edges":[]}}
    seeds = list(extract_seed_entities(query).keys()) or []
    # rank corpus by overlap with seeds (very light)
    scored = []
    for it in corpus_items:
        title = (it.get("title") or "").lower()
        score = sum(1 for s in seeds if s.lower() in title)
        scored.append((score, it))
    scored.sort(key=lambda z: (z[0], z[1].get("_score",0)), reverse=True)
    top = [it for sc, it in scored[:k] if sc > 0] or [it for sc, it in scored[:k]]
    spans = []
    for it in top:
        spans.append({
            "text": (it.get("abstract") or it.get("summary") or it.get("title") or "")[:400],
            "source_id": it.get("id"),
            "title": it.get("title"),
            "year": it.get("year"),
            "oa_url": it.get("oa_url")
        })
    # assemble neighborhood on seeds + top paper IDs
    seeds2 = seeds + [t.get("id") for t in top if t.get("id")]
    neighborhood = _neighbors_k_hops(g, seeds2, hops=hops)
    return {"spans": spans, "neighborhood": neighborhood}
""")

# ---------- Multi-Hop router + QA ----------
w("src/reli_core/multihop/router.py", """
def route(query: str) -> str:
    q = (query or "").lower()
    # crude conditions for multi-hop: conjunctions, sequences, explicit 'which ... and which ...'
    if any(x in q for x in [" which ", " and which ", " then ", " first ", " second ", " compare ", " vs "]):
        return "multi_hop"
    if any(x in q for x in ["breakthrough","approval","phase 3","meta analysis"]):
        return "multi_hop"
    return "single_hop"
""")

w("src/reli_core/multihop/qa.py", """
from typing import Dict, List
from ..graph.graphrag import graphrag_retrieve
from ..graph.build_graph import build_entity_graph

def _propose_subqs(query: str) -> List[str]:
    q = (query or "").strip().rstrip("?")
    # ultra-light planner: split by 'and', 'then'
    parts = []
    for cut in [" then ", " and "]:
        if cut in q.lower():
            parts = [p.strip() for p in q.lower().split(cut) if p.strip()]
            break
    if not parts: parts = [q]
    return parts[:3]

def run_multihop(g, query: str, corpus_items: List[Dict], hop_limit: int = 3, k: int = 6) -> Dict:
    subqs = _propose_subqs(query)
    trace = []
    acc_entities = set()
    all_spans = []
    for i, subq in enumerate(subqs[:hop_limit]):
        r = graphrag_retrieve(g, subq, corpus_items, k=k, hops=2)
        spans = r.get("spans",[])
        all_spans.extend(spans)
        # 'answer' here is just the best span's title/year
        if spans:
            best = spans[0]
            ans = f"{best.get('title','')[:60]} ({best.get('year','')})"
            cits = [f"{s.get('title','')[:60]} ({s.get('year','')})" for s in spans[:3]]
        else:
            ans, cits = "No strong match found", []
        trace.append({"subq": subq, "answer": ans, "citations": cits, "entities": list(acc_entities)})
    return {"final_answer": trace[-1]["answer"] if trace else "", "hop_trace": trace, "graph_snippets": all_spans, "citations": all_spans}
""")

# ---------- Graph visualization (mini) ----------
w("src/reli_core/viz_graph.py", """
from typing import Dict
import matplotlib.pyplot as plt
import networkx as nx
import io, base64

def draw_neighborhood(neighborhood: Dict, out_path: str) -> str:
    nodes = neighborhood.get("nodes") or []
    edges = neighborhood.get("edges") or []
    if not nodes:
        fig, ax = plt.subplots(figsize=(4,2))
        ax.text(0.5,0.5,"No neighborhood",ha="center",va="center"); ax.axis("off")
        fig.savefig(out_path, bbox_inches="tight"); plt.close(fig)
        return out_path
    g = nx.Graph()
    g.add_nodes_from(nodes)
    for u,v,rel in edges:
        g.add_edge(u,v,rel=rel)
    pos = nx.spring_layout(g, seed=42, k=0.6)
    fig, ax = plt.subplots(figsize=(5,4))
    nx.draw_networkx_nodes(g, pos, node_size=300, ax=ax)
    nx.draw_networkx_labels(g, pos, font_size=8, ax=ax)
    nx.draw_networkx_edges(g, pos, alpha=0.6, ax=ax)
    ax.axis("off")
    fig.savefig(out_path, bbox_inches="tight")
    plt.close(fig)
    return out_path
""")
print("✅ Week-6 Graph-RAG + Multi-Hop modules written.")


✅ Week-6 Graph-RAG + Multi-Hop modules written.


In [5]:
# @title Week-7 additions: config, diffusion (SD), agent, FastAPI
from pathlib import Path
import textwrap

def w(rel_path: str, content: str):
    p = Path(rel_path)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(textwrap.dedent(content).strip() + "\n", encoding="utf-8")
    return p

# ---------- Config loader ----------
w("src/reli_core/config.py", """
import json, os
from pathlib import Path

DEFAULTS = {
    "graphrag": {"retriever_k": 6, "hops": 2},
    "multihop": {"hop_limit": 3},
    "sd": {"model_id": "runwayml/stable-diffusion-v1-5", "height": 512, "width": 512, "steps": 25, "guidance": 7.5},
    "agent": {"max_hops": 3, "enable_calculator": True, "allow_images": True},
}

def load_json_if_exists(p: str):
    try:
        fp = Path(p)
        if fp.exists():
            return json.loads(fp.read_text(encoding="utf-8"))
    except Exception:
        pass
    return {}

def load_run_config():
    cfg = DEFAULTS.copy()
    cfg.update(load_json_if_exists("/mnt/data/Week7_run_config.json"))
    env_cfg = load_json_if_exists("/mnt/data/Week7-env_week7.json")
    if env_cfg:
        cfg["env"] = env_cfg
    return cfg
""")

# ---------- Diffusion (Stable Diffusion wrapper) ----------
w("src/reli_core/diffusion/lora_loader.py", """
from typing import Optional
def maybe_apply_lora(pipe, lora_path: Optional[str] = None):
    # Placeholder for LoRA application; keep no-op if none provided.
    return pipe
""")

w("src/reli_core/diffusion/sd_service.py", """
from typing import Optional, Dict
import torch
from diffusers import StableDiffusionPipeline, StableDiffusionImg2ImgPipeline, StableDiffusionInpaintPipeline
from .lora_loader import maybe_apply_lora

class SDService:
    def __init__(self, model_id: str = "runwayml/stable-diffusion-v1-5", device: Optional[str] = None, lora_path: Optional[str] = None):
        device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.txt2img = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.float16 if device=="cuda" else torch.float32)
        self.txt2img = self.txt2img.to(device)
        maybe_apply_lora(self.txt2img, lora_path)
        self.device = device

    @torch.inference_mode()
    def generate(self, prompt: str, negative_prompt: Optional[str] = None, height: int = 512, width: int = 512, steps: int = 25, guidance: float = 7.5, seed: Optional[int] = None) -> Dict:
        if seed is not None:
            generator = torch.Generator(device=self.device).manual_seed(int(seed))
        else:
            generator = None
        out = self.txt2img(prompt=prompt, negative_prompt=negative_prompt, height=height, width=width, num_inference_steps=steps, guidance_scale=guidance, generator=generator)
        img = out.images[0]
        return {"image": img, "metadata": {"prompt": prompt, "negative": negative_prompt, "height": height, "width": width, "steps": steps, "guidance": guidance, "seed": seed}}
""")

# ---------- Agent (planner + tools + guardrails) ----------
w("src/reli_core/agent/guardrails.py", """
from typing import Tuple

BLOCKED = ["suicide", "violence", "illegal", "self-harm", "bioweapon"]
def check_safe(prompt: str) -> Tuple[bool, str]:
    p = (prompt or "").lower()
    if any(b in p for b in BLOCKED):
        return False, "Prompt rejected by safety policy."
    return True, ""
""")

w("src/reli_core/agent/tools.py", """
from typing import Dict
from ..pipeline import process_claim
from ..diffusion.sd_service import SDService

_sd_singleton = None

def projectQA(query: str) -> Dict:
    return process_claim(query)

def stableDiffusion(args: Dict) -> Dict:
    global _sd_singleton
    if _sd_singleton is None:
        _sd_singleton = SDService()
    prompt = args.get("prompt","")
    neg = args.get("negative_prompt")
    h = int(args.get("height",512)); w = int(args.get("width",512))
    steps = int(args.get("steps",25)); guidance = float(args.get("guidance",7.5))
    seed = args.get("seed")
    out = _sd_singleton.generate(prompt=prompt, negative_prompt=neg, height=h, width=w, steps=steps, guidance=guidance, seed=seed)
    return out
""")

w("src/reli_core/agent/agent.py", """
from typing import Dict, Any
from .guardrails import check_safe
from .tools import projectQA, stableDiffusion

def route_agent(prompt: str, cfg: Dict) -> Dict[str, Any]:
    ok, msg = check_safe(prompt)
    if not ok:
        return {"error": msg, "trace": []}
    allow_images = cfg.get("agent",{}).get("allow_images", True)
    lower = (prompt or "").lower()
    trace = []
    if allow_images and any(k in lower for k in ["generate an image","make an image","draw","illustration","poster","infographic","visualize"]):
        trace.append({"tool":"stableDiffusion", "args":{"prompt": prompt}})
        img = stableDiffusion({"prompt": prompt})
        return {"type":"image","result": img, "trace": trace}
    # default to projectQA
    trace.append({"tool":"projectQA", "args":{"query": prompt}})
    qa = projectQA(prompt)
    return {"type":"qa","result": qa, "trace": trace}
""")

# ---------- FastAPI ----------
w("src/reli_core/api/main.py", """
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional, Dict, Any
from ..pipeline import process_claim
from ..config import load_run_config
from ..agent.agent import route_agent
from ..graph.build_graph import load_graph
from ..graph.graphrag import graphrag_retrieve

app = FastAPI(title="ReliScore API")
CFG = load_run_config()

class ClaimIn(BaseModel):
    claim: str

class AgentIn(BaseModel):
    prompt: str

class GraphIn(BaseModel):
    query: str

@app.get("/healthz")
def healthz():
    return {"ok": True}

@app.post("/qa/factcheck")
def factcheck(inp: ClaimIn):
    return process_claim(inp.claim)

@app.post("/agent/route")
def agent_route(inp: AgentIn):
    return route_agent(inp.prompt, CFG)

@app.post("/qa/graphrag")
def graphrag_endpoint(inp: GraphIn):
    g = load_graph()  # may be None
    # This requires you to pass corpus items client-side; here we return only that no graph is present or not.
    return {"graph_loaded": bool(g)}
""")
print("✅ Week-7 modules written.")


✅ Week-7 modules written.


In [6]:
# @title Update pipeline to include Graph-RAG + Multi-Hop hooks
from pathlib import Path, PurePosixPath
import textwrap

PIPELINE_PATH = Path("src/reli_core/pipeline.py")
PIPELINE_PATH.write_text(textwrap.dedent("""
from typing import Dict, List, Tuple
from statistics import mean
from .safety import sanitize_claim, safety_checks
from .nlp.claim_detect import classify_intent, query_terms
from .nlp.normalize import normalize_title
from .sources.pubmed import search_pubmed
from .sources.europe_pmc import search_eupmc
from .sources.crossref import search_crossref
from .sources.preprints import search_preprints
from .sources.ctgov import search_ctgov
from .sources.fda import search_fda_oncology
from .sources.fulltext import enrich_oa
from .extract.effects import extract_effects, infer_direction
from .aggregate.prevention import pooled_effect
from .reasoner.verdict import decide_verdict
from .ranking import rank_items
from .graph.build_graph import build_entity_graph, persist_graph, load_graph
from .graph.graphrag import graphrag_retrieve
from .multihop.router import route as route_hops
from .multihop.qa import run_multihop
from .config import load_run_config

CFG = load_run_config()

def _counts(items: List[Dict]) -> Tuple[int,int,int]:
    s = sum(1 for x in items if (x.get("direction") or "").lower()=="supports")
    r = sum(1 for x in items if (x.get("direction") or "").lower()=="refutes")
    u = len(items) - s - r
    return s, r, max(0,u)

def _year_span(items: List[Dict]):
    years = [y for y in (x.get("year") for x in items) if isinstance(y,int)]
    return (min(years), max(years)) if years else (None, None)

def _avg_score(items: List[Dict]) -> float:
    return round(mean([x.get("_score",0) for x in items]) if items else 0.0, 2)

def emit_study_points(items: List[Dict]) -> List[Dict]:
    out = []
    for it in items:
        it["title"] = normalize_title(it.get("title"))
        eff = extract_effects(it.get("abstract","")) or extract_effects(it.get("summary","")) or extract_effects(it.get("title",""))
        if eff: it["effect"] = eff
        it["direction"] = infer_direction(it.get("title",""), it.get("abstract",""), it.get("effect"))
        out.append(it)
    return out

def aggregate(items: List[Dict], intent: str) -> Dict:
    agg = {}
    if intent == "prevention":
        agg["prevention"] = pooled_effect(items)
    return agg

def _top_effect_snippets(items: List[Dict], k: int = 5):
    picks = []
    for it in items:
        e = it.get("effect")
        if isinstance(e, dict) and e.get("value"):
            s = {
                "title": (it.get("title") or "")[:180],
                "year": it.get("year"),
                "metric": e.get("metric"),
                "value": e.get("value"),
                "ci_low": e.get("ci_low"),
                "ci_high": e.get("ci_high"),
                "direction": it.get("direction"),
                "source": it.get("source"),
                "id": it.get("id"),
                "oa_url": it.get("oa_url"),
                "_score": it.get("_score",0),
            }
            picks.append(s)
    picks.sort(key=lambda z: z.get("_score",0), reverse=True)
    return picks[:k]

def build_narrative_facts(claim: str, intent: str, ranked_items: List[Dict], verdict: Dict, aggregates: Dict, hop_trace=None) -> Dict:
    s, r, u = _counts(ranked_items)
    y0, y1 = _year_span(ranked_items)
    avg = _avg_score(ranked_items)
    pooled = (aggregates or {}).get("prevention", {}).get("pooled")
    facts = {
        "claim": claim,
        "intent": intent,
        "verdict": verdict,
        "counts": {"supports": s, "refutes": r, "unclear": u, "total": len(ranked_items)},
        "years": {"earliest": y0, "latest": y1},
        "avg_score": avg,
        "pooled_effect": pooled,
        "top_effects": _top_effect_snippets(ranked_items, k=6),
        "top_titles": [(it.get("year"), (it.get("title") or "")[:180]) for it in ranked_items[:8]],
        "hop_trace": hop_trace or [],
    }
    return facts

def _retrieve_corpus(intent: str, q: dict) -> List[Dict]:
    pools: List[Dict] = []
    if intent == "meta_news":
        pools += search_fda_oncology(q.get("fda",""), n=25)
    pools += search_pubmed(q.get("pubmed",""), n=20)
    pools += search_eupmc(q.get("eupmc",""), n=20)
    pools += search_crossref(q.get("crossref",""), n=10)
    pools += search_preprints(q.get("preprint",""), n=8)
    pools += search_ctgov(q.get("ctgov",""), n=8)
    items = [enrich_oa(x) for x in pools]
    return items

def process_claim(claim_raw: str) -> Dict:
    claim = sanitize_claim(claim_raw)
    sflags = safety_checks(claim)
    intent = classify_intent(claim)
    q = query_terms(claim, intent)
    # Step 1: retrieve
    corpus = _retrieve_corpus(intent, q)
    # Step 2: effects + directions
    study_points = emit_study_points(corpus)
    ranked = rank_items(study_points)
    # Build/Load Graph for Graph-RAG
    g = load_graph()
    if g is None:
        try:
            g = build_entity_graph(ranked)
            persist_graph(g)
        except Exception:
            g = None
    # Router for multi-hop
    hop_mode = route_hops(claim)
    hop_trace = []
    if hop_mode == "multi_hop" and g is not None:
        try:
            mh = run_multihop(g, claim, ranked, hop_limit=CFG.get("multihop",{}).get("hop_limit",3), k=CFG.get("graphrag",{}).get("retriever_k",6))
            hop_trace = mh.get("hop_trace",[])
            # augment ranked with any spans as pseudo-items (for visibility)
            for sp in (mh.get("graph_snippets") or [])[:5]:
                ranked.append({
                    "id": sp.get("source_id","graph"),
                    "source": "GraphRAG",
                    "title": sp.get("title",""),
                    "year": sp.get("year"),
                    "tier": "case_series",
                    "applicability": 0.7,
                    "oa_url": sp.get("oa_url",""),
                    "abstract": sp.get("text",""),
                    "direction": "unclear",
                    "_score": 1.0
                })
            ranked = rank_items(ranked)
        except Exception:
            pass
    # Reason
    verdict = decide_verdict(ranked, intent=intent)
    aggregates = {"prevention": pooled_effect(ranked)} if intent == "prevention" else {}
    facts = build_narrative_facts(claim, intent, ranked, verdict, aggregates, hop_trace=hop_trace)
    return {
        "claim": claim,
        "intent": intent,
        "safety": sflags,
        "study_points": ranked,
        "aggregates": aggregates,
        "verdict": verdict,
        "facts": facts,
        "router_debug": {"mode": hop_mode}
    }
""").strip())
print("✅ Pipeline extended with Graph-RAG + Multi-Hop hooks.")


✅ Pipeline extended with Graph-RAG + Multi-Hop hooks.


In [7]:
# @title Visual modules (unchanged) + insights
from pathlib import Path, PurePosixPath
import textwrap

w = lambda p,c: Path(p).write_text(textwrap.dedent(c).strip()+"\n", encoding="utf-8")

w("src/reli_core/viz.py", """
from typing import List, Dict
import matplotlib.pyplot as plt

def beeswarm_like(study_points: List[Dict]):
    xs = [i for i,_ in enumerate(study_points)]
    ys = [s.get("_score",0) for s in study_points]
    fig, ax = plt.subplots(figsize=(6,3))
    ax.scatter(xs, ys, alpha=0.8)
    ax.set_xlabel("Study rank")
    ax.set_ylabel("Score")
    ax.set_title("Evidence Beeswarm (score vs rank)")
    return fig

def forest_plot(study_points: List[Dict]):
    effs, labels = [], []
    for s in study_points[:15]:
        e = s.get("effect")
        if e and e.get("value"):
            effs.append((e["value"], e.get("ci_low"), e.get("ci_high")))
            labels.append((s.get("title") or s.get("id",""))[:40])
    if not effs:
        fig, ax = plt.subplots(figsize=(6,2))
        ax.text(0.5,0.5,"No extractable effects",ha="center",va="center"); ax.axis("off")
        return fig
    fig, ax = plt.subplots(figsize=(7,0.4*len(effs)+1))
    y = list(range(len(effs)))
    vals = [v for v, lo, hi in effs]
    los = [lo if lo else v for v, lo, hi in effs]
    his = [hi if hi else v for v, lo, hi in effs]
    ax.errorbar(vals, y, xerr=[[v-l for v,l in zip(vals,los)], [h-v for v,h in zip(vals,his)]], fmt='o', capsize=3)
    ax.axvline(1.0, linestyle="--")
    ax.set_yticks(y); ax.set_yticklabels(labels); ax.set_xlabel("Effect ratio (RR/OR/HR)")
    ax.invert_yaxis(); ax.set_title("Forest Plot")
    return fig

def timeline(study_points: List[Dict]):
    pts = [(s.get("year"), s.get("_score",0)) for s in study_points if s.get("year")]
    pts = sorted(pts)
    if not pts:
        fig, ax = plt.subplots(figsize=(6,2))
        ax.text(0.5,0.5,"No dates available",ha="center",va="center"); ax.axis("off")
        return fig
    xs = [x for x,_ in pts]; ys = [y for _,y in pts]
    fig, ax = plt.subplots(figsize=(6,3))
    ax.plot(xs, ys, marker="o")
    ax.fill_between(xs, [y*0.9 for y in ys], [y*1.1 for y in ys], alpha=0.2)
    ax.set_xlabel("Year"); ax.set_ylabel("Score"); ax.set_title("Evidence Timeline")
    return fig
""")

w("src/reli_core/viz_answer.py", """
from typing import List, Dict
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # noqa: F401
import math

def _split_counts(study_points: List[Dict]):
    by_src = {}
    dir_map = {"supports":0, "refutes":0, "unclear":0}
    for s in study_points:
        src = s.get("source","Other") or "Other"
        by_src[src] = by_src.get(src,0) + 1
        d = (s.get("direction") or "unclear").lower()
        if d not in dir_map: d = "unclear"
        dir_map[d] += 1
    return by_src, dir_map

def reli_graph(study_points: List[Dict]):
    _, dir_map = _split_counts(study_points)
    total = sum(dir_map.values()) or 1
    parts = [dir_map.get("supports",0)/total, dir_map.get("refutes",0)/total, dir_map.get("unclear",0)/total]
    labels = ["Supports","Refutes","Unclear"]
    fig, ax = plt.subplots(figsize=(7,1.8))
    left = 0.0
    for frac, lab in zip(parts, labels):
        ax.barh([0], [frac], left=left)
        if frac > 0: ax.text(left + frac/2, 0, f"{lab} {int(frac*100)}%", va="center", ha="center")
        left += frac
    ax.set_xlim(0,1); ax.set_yticks([])
    ax.set_title("ReliGraph — Evidence Split (Stacked)")
    ax.set_xlabel("Proportion of study points")
    return fig

def trust_compass(study_points: List[Dict]):
    n = max(1, len(study_points))
    supp = sum(1 for s in study_points if (s.get("direction") or "").lower()=="supports")
    consensus = supp / n
    quality = sum(s.get("_score",0) for s in study_points)/n
    fig, ax = plt.subplots(figsize=(5,4))
    ax.scatter([consensus],[quality], s=200)
    ax.set_xlim(0,1); ax.set_ylim(0, max(1, quality*1.5))
    ax.set_xlabel("Consensus (0..1)"); ax.set_ylabel("Quality (score)")
    ax.set_title("Trust Compass")
    return fig

def timeline_ribbon(study_points: List[Dict]):
    pts = [(s.get("year"), s.get("_score",0)) for s in study_points if s.get("year")]
    pts = sorted(pts)
    if not pts:
        fig, ax = plt.subplots(figsize=(6,2))
        ax.text(0.5,0.5,"No dates available",ha="center",va="center"); ax.axis("off")
        return fig
    xs = [x for x,_ in pts]; ys = [y for _,y in pts]
    fig, ax = plt.subplots(figsize=(6,3))
    ax.plot(xs, ys, marker="o")
    ax.fill_between(xs, [y*0.85 for y in ys], [y*1.15 for y in ys], alpha=0.2)
    ax.set_xlabel("Year"); ax.set_ylabel("Score"); ax.set_title("Evidence Timeline Ribbon")
    return fig

def _safe_effect_value(s: Dict):
    e = s.get("effect")
    if isinstance(e, dict): return e.get("value")
    return None

def evidence_galaxy_3d(study_points: List[Dict]):
    xs, ys, zs, cs = [], [], [], []
    for s in study_points:
        y = s.get("year"); ifnot = y is None
        if not y: continue
        xs.append(y); ys.append(s.get("_score",0))
        eff = _safe_effect_value(s)
        try: z = math.log(eff) if isinstance(eff,(int,float)) and eff and eff > 0 else 0.0
        except Exception: z = 0.0
        zs.append(z); cs.append(s.get("applicability",1.0))
    fig = plt.figure(figsize=(7,4)); ax = fig.add_subplot(111, projection='3d')
    if not xs:
        ax.text2D(0.5,0.5,"No 3D points", transform=ax.transAxes); return fig
    ax.scatter(xs, ys, zs, s=40, alpha=0.9, c=cs)
    ax.set_xlabel("Year"); ax.set_ylabel("Quality (score)"); ax.set_zlabel("log(effect)")
    ax.set_title("Evidence Galaxy (3D)")
    return fig
""")

w("src/reli_core/viz_insights.py", """
from typing import List, Dict, Tuple, Optional
import math
from statistics import mean

def _counts(study_points: List[Dict]) -> Dict[str,int]:
    supp = sum(1 for s in study_points if (s.get("direction") or "").lower()=="supports")
    refu = sum(1 for s in study_points if (s.get("direction") or "").lower()=="refutes")
    uncl = len(study_points) - supp - refu
    return {"supports": supp, "refutes": refu, "unclear": max(0, uncl)}

def _years_and_scores(study_points: List[Dict]) -> Tuple[list, list]:
    pts = [(s.get("year"), s.get("_score",0)) for s in study_points if s.get("year")]
    pts = sorted(pts)
    if not pts: return [], []
    xs = [x for x,_ in pts]; ys = [y for _,y in pts]
    return xs, ys

def _slope(xs: list, ys: list) -> Optional[float]:
    n = len(xs)
    if n < 2: return None
    xbar = sum(xs)/n; ybar = sum(ys)/n
    num = sum((x-xbar)*(y-ybar) for x,y in zip(xs,ys))
    den = sum((x-xbar)**2 for x in xs) or 1e-9
    return num/den

def _effects(study_points: List[Dict]):
    vals = []
    for s in study_points:
        e = s.get("effect")
        if isinstance(e, dict) and isinstance(e.get("value"), (int,float)) and e["value"]>0:
            vals.append(e["value"])
    return vals

def insights(study_points: List[Dict], verdict: Dict) -> Dict:
    n = len(study_points); counts = _counts(study_points)
    consensus = (counts["supports"] / n) if n>0 else 0.0
    xs, ys = _years_and_scores(study_points)
    trend = _slope(xs, ys)
    effs = _effects(study_points); has_fx = len(effs) > 0
    if has_fx:
        log_vals = [math.log(v) for v in effs if v>0]
        med_log = sorted(log_vals)[len(log_vals)//2] if log_vals else 0.0
    else:
        med_log = 0.0
    avg_score = mean([s.get("_score",0) for s in study_points]) if n else 0.0
    label = (verdict or {}).get("label","Unclear")
    return {"n": n, "supports": counts["supports"], "refutes": counts["refutes"], "unclear": counts["unclear"],
            "consensus": consensus, "avg_score": avg_score, "trend_slope": trend, "has_effects": has_fx,
            "median_log_effect": med_log, "label": label}

def captions_from_insights(ins: Dict) -> Dict[str, str]:
    if ins["n"] == 0: reli_take = "No evidence found to split."
    else:
        if ins["supports"] > ins["refutes"]:
            reli_take = f"Evidence leans supportive ({ins['supports']} vs {ins['refutes']})."
        elif ins["refutes"] > ins["supports"]:
            reli_take = f"Evidence leans against the claim ({ins['refutes']} vs {ins['supports']})."
        else:
            reli_take = "Support and refutation are balanced."
    reli_why = "A larger supportive portion raises confidence; a larger refuting portion lowers it."
    reli_how = "Segments show proportions that support, refute, or are unclear."

    tc_take = f"Consensus ≈ {ins['consensus']:.2f} with average quality ≈ {ins['avg_score']:.2f}."
    if ins["label"] == "Supported": tc_take += " Studies cluster toward agreement."
    elif ins["label"] == "Contradicted": tc_take += " Studies cluster toward disagreement."
    elif ins["label"] == "Mixed": tc_take += " Agreement is split."

    if ins["trend_slope"] is None: tl_take = "Not enough dated studies to assess trends."
    elif ins["trend_slope"] > 0.01: tl_take = "Evidence quality trends upward over time."
    elif ins["trend_slope"] < -0.01: tl_take = "Evidence quality trends downward over time."
    else: tl_take = "Evidence quality is relatively stable over time."
    tl_how = "Line shows average study score by year; band shows a simple range."
    tl_why = "Upward trends suggest growing confidence; downward trends suggest weakening support."

    if not ins["has_effects"]: eg_take = "Few studies reported effect sizes; points may cluster near zero."
    else:
        if ins["median_log_effect"] < 0: eg_take = "Reported effects tilt toward risk reductions (ratios < 1)."
        elif ins["median_log_effect"] > 0: eg_take = "Reported effects tilt toward risk increases (ratios > 1)."
        else: eg_take = "Reported effects cluster near no change."
    eg_how = "x=year, y=quality, z≈log(effect)."
    eg_why = "Stronger departures from zero hint at stronger effects."

    return {
        "reli_graph": f"**Takeaway:** {reli_take}\\n*How to read:* {reli_how}\\n*Why it matters:* {reli_why}",
        "trust_compass": f"**Takeaway:** {tc_take}\\n*How to read:* Right = agreement, Up = quality.\\n*Why it matters:* Position reflects how aligned and strong the evidence is.",
        "timeline_ribbon": f"**Takeaway:** {tl_take}\\n*How to read:* {tl_how}\\n*Why it matters:* {tl_why}",
        "evidence_galaxy": f"**Takeaway:** {eg_take}\\n*How to read:* {eg_how}\\n*Why it matters:* {eg_why}",
    }
""")
print("✅ Visual modules ready.")


✅ Visual modules ready.


In [8]:
# @title Streamlit app (Fact-Check + Harvest + Agent & SD Lab)
from pathlib import Path, PurePosixPath
import textwrap

Path("streamlit_app.py").write_text(textwrap.dedent("""
import streamlit as st
from src.reli_core.pipeline import process_claim
from src.reli_core.topic_harvest import harvest_topic
from src.reli_core.viz_answer import reli_graph, trust_compass, timeline_ribbon, evidence_galaxy_3d
from src.reli_core.templates import build_citations_markdown, build_reasons_markdown
from src.reli_core.safety import ADVICE_DISCLAIMER
from src.reli_core.writer.lay import write_explanation
from src.reli_core.viz_insights import insights, captions_from_insights
from src.reli_core.graph.build_graph import load_graph
from src.reli_core.graph.graphrag import graphrag_retrieve
from src.reli_core.viz_graph import draw_neighborhood
from src.reli_core.config import load_run_config
from src.reli_core.agent.agent import route_agent
from src.reli_core.sources.pubmed import search_pubmed

import os
st.set_page_config(page_title="ReliScore — Medical Fact-Check & Evidence Explorer", layout="wide")
st.title("🧭 ReliScore — Fact-Check & Harvest (+ Agent & SD)")

CFG = load_run_config()
with st.sidebar:
    st.header("Settings")
    use_graphrag = st.checkbox("Use Graph-RAG enrichment", value=True)
    enable_agent = st.checkbox("Enable Agent (Week-7)", value=False)
    top_k = st.slider("Graph-RAG top-k", 3, 12, CFG.get("graphrag",{}).get("retriever_k",6))
    hop_limit = st.slider("Multi-Hop limit", 1, 5, CFG.get("multihop",{}).get("hop_limit",3))
    st.caption("Tip: Enable Agent to route between QA and Stable Diffusion (image) tools.")

tabs = st.tabs(["✅ Fact-Check", "🌐 Harvest", "🧪 Agent & SD Lab"])

with tabs[0]:
    st.subheader("Fact-Check a Claim")
    claim = st.text_area("Enter a medical claim (cancer-focused is best):", height=120, placeholder="e.g., Are there real breakthroughs in cancer treatment?")
    audit = st.checkbox("Evidence Audit Mode (show retrieval signals)", value=False)
    if st.button("Check"):
        if enable_agent:
            with st.spinner("Agent routing..."):
                ag = route_agent(claim, CFG)
            if ag.get("type") == "qa":
                out = ag["result"]
            else:
                st.warning("Agent chose image generation; switch to the Agent & SD Lab tab to see images.")
                out = process_claim(claim)
        else:
            with st.spinner("Searching open-access evidence..."):
                out = process_claim(claim)

        verdict = out["verdict"]; label = verdict.get("label","Unclear"); conf = verdict.get("confidence",0.0)
        st.markdown(f"## Verdict: **{label}**")
        st.markdown(f"**Score:** {conf:.2f}")

        st.markdown(write_explanation(claim, out["verdict"], out["aggregates"], out["study_points"], out["intent"], facts=out.get("facts")))
        st.markdown(f"> {ADVICE_DISCLAIMER}")
        st.divider()

        citations_md_all = build_citations_markdown(out["study_points"])
        citations_count = len(out["study_points"])
        reasons_md = build_reasons_markdown(out["verdict"])
        reasons_count = len(out["verdict"].get("reasons", [])) if out.get("verdict") else 0

        colA, colB = st.columns(2)
        with colA:
            with st.expander(f"📚 References ({citations_count}) — open-access links that informed this result", expanded=False):
                show_all = st.checkbox("Show full list (otherwise first 20)", value=False, key="refs_show_all")
                md = build_citations_markdown(out["study_points"], limit=None if show_all else 20)
                st.markdown(md)
                st.download_button("Download references (.txt)", data=citations_md_all, file_name="references.txt")
        with colB:
            if reasons_count > 0:
                with st.expander(f"🧠 Why we think so ({reasons_count}) — pivotal study signals", expanded=False):
                    st.markdown(reasons_md)
                    st.download_button("Download reasons (.txt)", data=reasons_md, file_name="reasons.txt")

        if audit:
            st.divider()
            st.markdown("### Evidence Audit")
            st.json({
                "intent": out["intent"],
                "aggregates": out["aggregates"],
                "verdict": out["verdict"],
                "facts": out.get("facts"),
                "router_debug": out.get("router_debug"),
                "top_studies_preview": [
                    {k: v for k, v in s.items() if k in ("source","year","tier","title","oa_url","_score","direction","effect")}
                    for s in out["study_points"][:12]
                ]
            })

        st.divider()
        caps = captions_from_insights(insights(out["study_points"], out["verdict"]))
        c1, c2 = st.columns(2)
        with c1:
            st.pyplot(reli_graph(out["study_points"])); st.caption(caps["reli_graph"])
            st.pyplot(timeline_ribbon(out["study_points"])); st.caption(caps["timeline_ribbon"])
        with c2:
            st.pyplot(trust_compass(out["study_points"])); st.caption(caps["trust_compass"])
            st.pyplot(evidence_galaxy_3d(out["study_points"])); st.caption(caps["evidence_galaxy"])

        if use_graphrag:
            st.divider()
            st.markdown("### Graph Neighborhood (Week-6)")
            g = load_graph()
            if g is None:
                st.info("Graph not built yet — it will auto-build on first run.")
            try:
                spans = graphrag_retrieve(g, claim, out["study_points"], k=top_k, hops=2)
                img_path = "data_cache/neighborhood.png"
                draw_neighborhood(spans.get("neighborhood",{}), img_path)
                with st.expander("Show Graph Neighborhood", expanded=False):
                    st.image(img_path)
            except Exception as e:
                st.warning(f"Graph-RAG view not available: {e}")

with tabs[1]:
    st.subheader("Harvest a Topic")
    kw = st.text_input("Topic keyword (e.g., 'mammography screening breast cancer')", "")
    if st.button("Harvest"):
        with st.spinner("Gathering sources..."):
            h = harvest_topic(kw)
        st.success(f"Found {len(h['items'])} items")
        for it in h["items"]:
            with st.expander(f"{it.get('title','Untitled')}"):
                st.write({
                    "source": it.get("source"),
                    "year": it.get("year"),
                    "tier": it.get("tier"),
                    "oa_url": it.get("oa_url"),
                    "id": it.get("id"),
                    "summary": it.get("summary"),
                })

with tabs[2]:
    st.subheader("Agent & Stable Diffusion Lab (Week-7)")
    prompt = st.text_area("Enter a prompt. The agent will route to QA (ReliScore) or to Stable Diffusion.", height=120)
    if st.button("Run Agent"):
        with st.spinner("Agent thinking..."):
            res = route_agent(prompt, CFG)
        if res.get("error"):
            st.error(res["error"])
        elif res.get("type") == "qa":
            out = res["result"]
            st.markdown(f"### Agent chose: QA")
            st.markdown(f"**Verdict:** {out['verdict'].get('label')}  |  **Score:** {out['verdict'].get('confidence')}")
            st.markdown(write_explanation(prompt, out["verdict"], out["aggregates"], out["study_points"], out["intent"], facts=out.get("facts")))
            with st.expander("Agent Trace", expanded=False):
                st.json(res.get("trace"))
        elif res.get("type") == "image":
            st.markdown("### Agent chose: Stable Diffusion")
            img = res["result"]["image"]
            meta = res["result"]["metadata"]
            st.image(img, caption=f"SD output — {meta}")
            with st.expander("Agent Trace", expanded=False):
                st.json(res.get("trace"))
        else:
            st.info("No action taken.")
"""))
print("✅ Streamlit app updated.")


✅ Streamlit app updated.


In [11]:
# @title Launch FastAPI (Uvicorn) + ngrok (optional)
import subprocess, sys, threading, time, os
from pyngrok import ngrok
import nest_asyncio
nest_asyncio.apply()

API_PORT = 8000
if os.environ.get("NGROK_AUTHTOKEN"):
    ngrok.set_auth_token(os.environ["NGROK_AUTHTOKEN"])
api_url = ngrok.connect(API_PORT, "http").public_url
print("FastAPI Public URL:", api_url)

def run_api():
    cmd = [sys.executable, "-m", "uvicorn", "src.reli_core.api.main:app", "--host", "0.0.0.0", "--port", str(API_PORT)]
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    for line in proc.stdout:
        print("[API] ", line, end="")

threading.Thread(target=run_api, daemon=True).start()
time.sleep(3)
print("FastAPI starting on", f"http://localhost:{API_PORT}", "→", api_url)


FastAPI Public URL: https://rosaura-expenseless-averagely.ngrok-free.dev
FastAPI starting on http://localhost:8000 → https://rosaura-expenseless-averagely.ngrok-free.dev


In [12]:
# @title Launch Streamlit with ngrok (headless)
import subprocess, sys, threading, time, os
from pyngrok import ngrok
import nest_asyncio
nest_asyncio.apply()

PORT = 8501
if os.environ.get("NGROK_AUTHTOKEN"):
    ngrok.set_auth_token(os.environ["NGROK_AUTHTOKEN"])
public_url = ngrok.connect(PORT, "http").public_url
print("Streamlit Public URL:", public_url)

def run_streamlit():
    cmd = [sys.executable, "-m", "streamlit", "run", "streamlit_app.py",
           "--server.port", str(PORT), "--server.address", "0.0.0.0"]
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    for line in proc.stdout:
        print("[ST] ", line, end="")

threading.Thread(target=run_streamlit, daemon=True).start()
time.sleep(3)
print("Streamlit starting on", f"http://localhost:{PORT}", "→", public_url)


Streamlit Public URL: https://rosaura-expenseless-averagely.ngrok-free.dev
[ST]  
[ST]  Collecting usage statistics. To deactivate, set browser.gatherUsageStats to false.
[ST]  
[ST]  2025-10-23 23:23:06.004 Port 8501 is already in use
[API]    deprecate("Transformer2DModelOutput", "1.0.0", deprecation_message)
[API]  INFO:     Started server process [13585]
[API]  INFO:     Waiting for application startup.
[API]  INFO:     Application startup complete.
[API]  ERROR:    [Errno 98] error while attempting to bind on address ('0.0.0.0', 8000): [errno 98] address already in use
[API]  INFO:     Waiting for application shutdown.
[API]  INFO:     Application shutdown complete.
Streamlit starting on http://localhost:8501 → https://rosaura-expenseless-averagely.ngrok-free.dev
