# Genex Meta-Study Researcher (Fixed)

This notebook screens PDFs for relevance, extracts structured findings, and enables grounded Q&A with citations.


In [15]:

# 0) Imports
import os, re, json, math
from typing import Any, Dict, List, Optional
import uuid

import fitz  # PyMuPDF
from pydantic import BaseModel, Field

from google.adk.agents import LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.models.lite_llm import LiteLlm

try:
    from google.genai.types import Content, Part
    _HAS_GENAI_TYPES = True
except Exception:
    Content, Part = None, None
    _HAS_GENAI_TYPES = False


In [16]:

# 1) Settings
PAPERS_DIR = r"C:/Users/T490/Downloads/Genetics-Dashboard/docs/papers/serine_deficiency_papers/"
MAX_PAGES_PER_PDF = 25
MODEL = "openai/gpt-4o-mini"

APP_NAME = "genex_meta_study"
USER_ID = "genex_user"

llm = LiteLlm(model=MODEL)
session_service = InMemorySessionService()


In [17]:

# 2) Schemas
class PaperMetadata(BaseModel):
    title: str
    authors: List[str] = Field(default_factory=list)
    year: Optional[int] = None
    journal: Optional[str] = None
    doi: Optional[str] = None

class RelevanceDecision(BaseModel):
    paper_id: str
    title: str
    is_relevant: bool = False
    relevance_score: float = 0.0
    reason: str = ""
    matched_terms: List[str] = Field(default_factory=list)

class ExtractedFinding(BaseModel):
    name: str
    category: str
    polarity: str
    snippet: str
    section: str

class PaperExtraction(BaseModel):
    paper_id: str
    title: str
    authors: List[str] = Field(default_factory=list)
    year: Optional[int] = None
    journal: Optional[str] = None
    doi: Optional[str] = None
    condition: str = ""
    relevance_score: float = 0.0
    summary: str = ""
    key_takeaways: List[str] = Field(default_factory=list)
    findings: List[ExtractedFinding] = Field(default_factory=list)


In [18]:

# 3) Helpers (robust across ADK versions)
def list_pdfs(folder: str) -> List[str]:
    return sorted(
        os.path.join(folder, f)
        for f in os.listdir(folder)
        if f.lower().endswith(".pdf")
    )

def pdf_to_text(path: str, max_pages: int = 20) -> str:
    doc = fitz.open(path)
    out = []
    for i in range(min(len(doc), max_pages)):
        out.append(doc.load_page(i).get_text("text"))
    doc.close()
    return "\n".join(out)

def safe_json_extract(text: str) -> Optional[Dict[str, Any]]:
    if not text or not isinstance(text, str):
        return None
    text = text.strip()
    try:
        obj = json.loads(text)
        return obj if isinstance(obj, dict) else None
    except Exception:
        pass

    fence = re.search(r"```json\s*(\{[\s\S]*?\})\s*```", text, flags=re.IGNORECASE)
    if fence:
        try:
            obj = json.loads(fence.group(1))
            return obj if isinstance(obj, dict) else None
        except Exception:
            pass

    m = re.search(r"(\{[\s\S]*\})", text)
    if m:
        try:
            obj = json.loads(m.group(1))
            return obj if isinstance(obj, dict) else None
        except Exception:
            pass
    return None

def _make_message(text: str):
    if _HAS_GENAI_TYPES and Content is not None and Part is not None:
        return Content(parts=[Part(text=text)])
    return text

async def collect_events(async_gen) -> List[Any]:
    events = []
    async for e in async_gen:
        events.append(e)
    return events

async def run_runner(runner: Runner, user_id: str, session_id: str, text: str) -> List[Any]:
    res = runner.run_async(user_id=user_id, session_id=session_id, new_message=_make_message(text))
    if hasattr(res, "__aiter__"):
        return await collect_events(res)
    out = await res
    if isinstance(out, list):
        return out
    return [out]

def _event_to_text(e: Any) -> str:
    if e is None:
        return ""
    if isinstance(e, str):
        return e
    if isinstance(e, dict):
        for k in ("text", "output_text"):
            if isinstance(e.get(k), str):
                return e[k]
        content = e.get("content")
        if isinstance(content, dict):
            parts = content.get("parts") or []
            texts = []
            for p in parts:
                if isinstance(p, dict) and isinstance(p.get("text"), str):
                    texts.append(p["text"])
            return "\n".join(texts)
    for attr in ("text", "output_text"):
        if hasattr(e, attr):
            v = getattr(e, attr)
            if isinstance(v, str):
                return v
    if hasattr(e, "content"):
        c = getattr(e, "content")
        if hasattr(c, "parts"):
            texts = []
            for p in getattr(c, "parts") or []:
                if hasattr(p, "text") and isinstance(getattr(p, "text"), str):
                    texts.append(getattr(p, "text"))
                elif isinstance(p, dict) and isinstance(p.get("text"), str):
                    texts.append(p["text"])
            if texts:
                return "\n".join(texts)
        if isinstance(c, str):
            return c
    return ""

def extract_last_text(events: List[Any]) -> str:
    for e in reversed(events or []):
        t = _event_to_text(e).strip()
        if t:
            return t
    return ""


In [19]:

# 4) Agents

METADATA_SYSTEM = '''
You extract bibliographic metadata from the first pages of a biomedical paper.

Return ONE valid JSON object ONLY with EXACTLY these keys:
{
  "title": "...",
  "authors": ["Last, First", "..."],
  "year": null,
  "journal": null,
  "doi": null
}

Rules:
- If unsure, use null (or [] for authors).
- Do not hallucinate.
'''

RELEVANCE_SYSTEM = '''
You decide whether a paper is relevant to the CONDITION.

Return ONE valid JSON object ONLY with EXACTLY these keys:
{
  "paper_id": "...",
  "title": "...",
  "is_relevant": false,
  "relevance_score": 0.0,
  "reason": "...",
  "matched_terms": ["...", "..."]
}

Rules:
- Use ONLY the provided text.
- relevance_score is 0..1.
- If uncertain, be conservative.
'''

EXTRACTION_SYSTEM = '''
You extract structured findings from a paper for the CONDITION.

Return ONE valid JSON object ONLY with EXACTLY these keys:
{
  "paper_id": "...",
  "title": "...",
  "year": null,
  "journal": null,
  "condition": "...",
  "relevance_score": 0.0,
  "summary": "...",
  "key_takeaways": ["...", "..."],
  "findings": [
    {"name":"...", "category":"definition|gene|symptom|treatment|outcome|population|other",
     "polarity":"supports|refutes|mixed|unclear",
     "snippet":"<=2 sentences", "section":"methods|results|discussion|abstract|unknown"}
  ]
}

Rules:
- Use ONLY the provided text.
- Do NOT invent details.
'''

QA_SYSTEM = '''
You are a biomedical literature Q&A agent.

You will be given:
- a user question
- evidence items (snippets) extracted from papers, each with citation fields

Your job:
- Answer ONLY using the evidence items.
- If the evidence does not contain an answer, say: "Not found in the provided papers."
- Use inline citations like [1], [2] corresponding to evidence item IDs.

Output Markdown:
1) Answer
2) Evidence (bullets with snippet + citation number)
3) References (numbered: title, authors, journal, year; include DOI if present)
Do not invent papers or details.
'''

metadata_agent = LlmAgent(name="PaperMetadataExtractor", model=llm, instruction=METADATA_SYSTEM)
relevance_agent = LlmAgent(name="PaperRelevanceScreener", model=llm, instruction=RELEVANCE_SYSTEM)
extraction_agent = LlmAgent(name="PaperExtractor", model=llm, instruction=EXTRACTION_SYSTEM)
qa_agent = LlmAgent(name="PaperQAAgent", model=llm, instruction=QA_SYSTEM)


In [20]:

# 5) Pipeline

def infer_meta_from_text(paper_id: str, path: str, raw_text: str) -> Dict[str, Any]:
    title = os.path.splitext(os.path.basename(path))[0]
    year = None
    journal = None
    m = re.search(r"(19\d{2}|20\d{2})", raw_text[:4000])
    if m:
        try:
            year = int(m.group(1))
        except Exception:
            year = None
    return {"paper_id": paper_id, "path": path, "title": title, "year": year, "journal": journal, "authors": [], "doi": None}

async def extract_metadata(paper_id: str, path: str) -> Dict[str, Any]:
    meta_text = pdf_to_text(path, max_pages=2)[:60000]
    meta = infer_meta_from_text(paper_id, path, meta_text)

    session_id = f"meta-{paper_id}"
    await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=session_id)
    runner = Runner(app_name=APP_NAME, agent=metadata_agent, session_service=session_service)

    msg = f"PAPER_ID: {paper_id}\nFILENAME: {os.path.basename(path)}\n\nTEXT (first pages):\n{meta_text}"
    events = await run_runner(runner, user_id=USER_ID, session_id=session_id, text=msg)
    meta_json = safe_json_extract(extract_last_text(events))

    if isinstance(meta_json, dict):
        meta["title"] = meta_json.get("title") or meta["title"]
        meta["journal"] = meta_json.get("journal") or meta["journal"]
        meta["year"] = meta_json.get("year") or meta["year"]
        meta["doi"] = meta_json.get("doi") or meta.get("doi")
        meta["authors"] = meta_json.get("authors") or meta.get("authors", [])
    return meta

async def screen_relevance(condition: str, paper_id: str, title: str, text: str) -> RelevanceDecision:
    session_id = f"rel-{paper_id}"
    await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=session_id)
    runner = Runner(app_name=APP_NAME, agent=relevance_agent, session_service=session_service)

    prompt = f"CONDITION: {condition}\nPAPER_ID: {paper_id}\nTITLE: {title}\n\nTEXT:\n{text[:120000]}"
    events = await run_runner(runner, user_id=USER_ID, session_id=session_id, text=prompt)
    obj = safe_json_extract(extract_last_text(events)) or {}
    obj.setdefault("paper_id", paper_id)
    obj.setdefault("title", title)
    return RelevanceDecision(**obj)

async def extract_paper(condition: str, paper_id: str, meta: Dict[str, Any], text: str, relevance_score: float) -> PaperExtraction:
    session_id = f"ext-{paper_id}"
    await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=session_id)
    runner = Runner(app_name=APP_NAME, agent=extraction_agent, session_service=session_service)

    prompt = (
        f"CONDITION: {condition}\nPAPER_ID: {paper_id}\n"
        f"TITLE: {meta.get('title')}\nYEAR: {meta.get('year')}\nJOURNAL: {meta.get('journal')}\n"
        f"AUTHORS: {', '.join(meta.get('authors') or [])}\nDOI: {meta.get('doi')}\n\n"
        f"TEXT:\n{text[:140000]}"
    )
    events = await run_runner(runner, user_id=USER_ID, session_id=session_id, text=prompt)
    obj = safe_json_extract(extract_last_text(events)) or {}

    obj["paper_id"] = paper_id
    obj["title"] = meta.get("title") or obj.get("title") or ""
    obj["authors"] = meta.get("authors") or obj.get("authors") or []
    obj["year"] = meta.get("year") or obj.get("year")
    obj["journal"] = meta.get("journal") or obj.get("journal")
    obj["doi"] = meta.get("doi") or obj.get("doi")
    obj["condition"] = condition
    obj["relevance_score"] = float(relevance_score or obj.get("relevance_score") or 0.0)

    if not isinstance(obj.get("findings"), list):
        obj["findings"] = []
    return PaperExtraction(**obj)

async def run_condition_folder(condition: str, folder: str) -> Dict[str, Any]:
    pdfs = list_pdfs(folder)
    papers = []
    papers_screened = 0
    papers_relevant = 0

    for idx, path in enumerate(pdfs, start=1):
        paper_id = f"paper_{idx:03d}"
        raw_text = pdf_to_text(path, max_pages=MAX_PAGES_PER_PDF)
        papers_screened += 1

        meta = await extract_metadata(paper_id, path)
        rel = await screen_relevance(condition, paper_id, meta.get("title") or paper_id, raw_text)

        if (not rel.is_relevant) or (rel.relevance_score < 0.20):
            continue

        papers_relevant += 1
        extraction = await extract_paper(condition, paper_id, meta, raw_text, rel.relevance_score)
        papers.append(extraction.model_dump())

    md = [f"# Meta-study report: {condition}", "", f"- Papers screened: **{papers_screened}**", f"- Papers relevant: **{papers_relevant}**", ""]
    for p in papers:
        authors = ", ".join(p.get("authors") or []) or "UNKNOWN"
        j = p.get("journal") or "UNKNOWN JOURNAL"
        y = p.get("year") or "n.d."
        md += [f"## {p.get('title','(untitled)')}", f"**Authors:** {authors}  ", f"**Journal/Year:** {j} ({y})  "]
        if p.get("doi"):
            md.append(f"**DOI:** {p.get('doi')}  ")
        md.append("")
        if p.get("summary"):
            md += [p["summary"], ""]
        if p.get("findings"):
            md.append("**Findings (top 10):**")
            for f in p["findings"][:10]:
                md.append(f"- **{f.get('category')}**: {f.get('name')} ({f.get('polarity')}) — {f.get('snippet')}")
            md.append("")

    return {"condition": condition, "papers_screened": papers_screened, "papers_relevant": papers_relevant, "papers": papers, "report_markdown": "\n".join(md)}


In [21]:

# 6) Q&A

def build_evidence_index(papers: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    rows = []
    for p in papers:
        for f in (p.get("findings") or []):
            rows.append({
                "paper_id": p.get("paper_id"),
                "title": p.get("title"),
                "authors": p.get("authors") or [],
                "year": p.get("year"),
                "journal": p.get("journal"),
                "doi": p.get("doi"),
                "condition": p.get("condition"),
                "relevance_score": p.get("relevance_score", 0.0),
                "name": f.get("name"),
                "category": f.get("category"),
                "polarity": f.get("polarity"),
                "section": f.get("section"),
                "snippet": f.get("snippet"),
            })
    return rows

def _tokenize(s: str) -> List[str]:
    return re.findall(r"[a-z0-9]+", (s or "").lower())

def retrieve_evidence(question: str, evidence_rows: List[Dict[str, Any]], top_k: int = 14) -> List[Dict[str, Any]]:
    qtok = set(_tokenize(question))
    if not qtok:
        return evidence_rows[:top_k]
    scored = []
    for r in evidence_rows:
        blob = " ".join([str(r.get("name","")), str(r.get("category","")), str(r.get("snippet","")), str(r.get("title","")), str(r.get("journal",""))]).lower()
        btok = set(_tokenize(blob))
        overlap = len(qtok & btok)
        boost = 0.0
        if r.get("polarity") == "supports":
            boost += 0.25
        boost += 0.15 * float(r.get("relevance_score") or 0.0)
        score = overlap + boost
        if overlap > 0:
            scored.append((score, r))
    scored.sort(key=lambda x: x[0], reverse=True)
    return [r for _, r in scored[:top_k]]

async def ask_papers(question: str, result: Dict[str, Any], top_k: int = 14) -> str:
    papers = result.get("papers") or []
    evidence_rows = build_evidence_index(papers)
    top = retrieve_evidence(question, evidence_rows, top_k=top_k)

    lines = []
    for i, r in enumerate(top, start=1):
        authors = ", ".join(r.get("authors") or [])
        lines.append(
            f"EVIDENCE_ITEM [{i}]\n"
            f"title: {r.get('title')}\n"
            f"authors: {authors if authors else 'UNKNOWN'}\n"
            f"journal: {r.get('journal')}\n"
            f"year: {r.get('year')}\n"
            f"doi: {r.get('doi')}\n"
            f"snippet: {r.get('snippet')}\n"
        )
    qa_context = "\n".join(lines) if lines else "NO EVIDENCE ITEMS RETRIEVED."

    session_id = f"qa-{uuid.uuid4().hex[:8]}"
    await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=session_id)
    runner = Runner(app_name=APP_NAME, agent=qa_agent, session_service=session_service)

    prompt = f"USER QUESTION:\n{question}\n\n{qa_context}"
    events = await run_runner(runner, user_id=USER_ID, session_id=session_id, text=prompt)
    return extract_last_text(events)


In [22]:

# 7) Example run (uncomment)

# If your notebook doesn't support top-level `await`, run:
# import nest_asyncio; nest_asyncio.apply()

# result = await run_condition_folder("L-serine deficiency disorder", PAPERS_DIR)
# print(result["papers_screened"], result["papers_relevant"])
# print(result["report_markdown"][:1200])
# print(await ask_papers("Are there different types of serine deficiency?", result))


In [23]:
import nest_asyncio; nest_asyncio.apply()

In [24]:
result = await run_condition_folder("L-serine deficiency disorder", PAPERS_DIR)

In [25]:
print(result["papers_screened"], result["papers_relevant"])

11 10


In [26]:
print(result["report_markdown"][:1200])

# Meta-study report: L-serine deficiency disorder

- Papers screened: **11**
- Papers relevant: **10**

## L-Serine in disease and development
**Authors:** De Koning, Tom J., Snell, Keith, Duran, Marinus, Berger, Ruud, Poll-The, Bwee-Tien, Surtees, Robert  
**Journal/Year:** Biochemical Journal (2003)  

This review discusses the role of L-serine in metabolism and its importance in the development and function of the central nervous system, emphasizing its conditional essentiality during specific developmental stages and in various diseases, including L-serine biosynthesis disorders.

**Findings (top 10):**
- **definition**: L-serine's role in the synthesis of neuromodulators (supports) — The formation of glycine from L-serine is an important reaction; not only does it result in the transfer of a one-carbon group to folates, but also the glycine itself has important functions, particularly in the central nervous system.
- **outcome**: Impact of L-serine deficiency on the central nervou

In [13]:
print(await ask_papers("Are there different types of serine deficiency?", result))

1) Yes, there are different types of serine deficiency, classified under serine deficiency disorders which include a spectrum of disease from lethal prenatal-onset conditions to those with infantile, juvenile, or adult onset phenotypes.

2) Evidence:
   - "Serine deficiency disorders include a spectrum of disease ranging from lethal prenatal-onset Neu-Laxova syndrome to serine deficiency with infantile, juvenile, or adult onset." [4]
   - "The patients presented with ichthyosis and juvenile-onset neuropathy, demonstrating a mild phenotype of serine deficiency disorder." [11]

3) References:
   1. "Serine Deficiency Disorders," van der Crabben, Saskia N, de Koning, Tom J, GeneReviews, 2023. DOI: None.
   2. "Juvenile-onset PSAT1-related neuropathy: A milder phenotype of serine deﬁciency disorder," Shen, Yu, et al., Frontiers in Genetics, 2022. DOI: 10.3389/fgene.2022.949038.


In [27]:
print(await ask_papers("what are the genes linked to serine deficiency disorder?", result))


1) The genes linked to serine deficiency disorder include PSAT1 and PSPH. Mutations in these genes have been associated with various phenotypes of serine deficiency disorders, such as juvenile-onset neuropathy and Neu-Laxova syndrome.

2) Evidence:
   - "Both patients presented with ichthyosis and juvenile-onset neuropathy attributed to the homozygous mutation c.43G > C (p.A15P) in the PSAT1 gene" [1].
   - "We identified six families with PSAT1 mutations fully segregating with the disease, linking this gene to NLS" [6].
   - "A homozygous frameshift mutation in PSPH was discovered in another family, contributing to the pathology of NLS" [7].
   - "The study provides evidence that NLS is genetically heterogeneous, involving multiple mutations in genes encoding L-serine biosynthesis enzymes" [9].

3) References:
   1. Juvenile-onset PSAT1-related neuropathy: A milder phenotype of serine deficiency disorder. Shen, Yu et al. Frontiers in Genetics, 2022. DOI: 10.3389/fgene.2022.949038.
   