Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,55 @@
## 🚀 Experimental Extension: Multi-Step Query Orchestration

This fork extends the original Reactome MCP server by introducing a lightweight orchestration layer for multi-step biological query execution.

> **Note:** The core MCP implementation remains unchanged. All extensions are implemented as an external orchestration layer within the `orchestrator/` directory of this repository.

### What the Orchestration Layer Adds

| Module | Role |
|---|---|
| `orchestrator/planner.py` | Converts a natural-language query → structured JSON execution plan |
| `orchestrator/executor.py` | Executes plan steps sequentially or in parallel; resolves `$step.field` references between steps |
| `orchestrator/mock_adapter.py` | Simulates Reactome MCP tools by name (swap in the real client with zero executor changes) |
| `orchestrator/demo.py` | Runs a full end-to-end demonstration — no API key or running server required |

### Supported Query Patterns

```
Compare TP53 and BRCA1 → parallel enrichment analysis of both genes
Find apoptosis pathways for BCL2 → 3-step sequential chain (search → analyse → pathway detail)
Analyse EGFR → single-step pathway enrichment
Search PTEN signaling → single-step full-text search
```

### Quick Start

```bash
# No extra dependencies — uses Python standard library only
cd orchestrator
python demo.py
```

### Architecture

```
User Query
┌─────────────┐ structured plan (JSON) ┌──────────────┐
│ planner.py │ ──────────────────────────────► │ executor.py │
└─────────────┘ └──────┬───────┘
│ tool call (name + input)
┌────────────────────┐
│ mock_adapter.py │
│ (→ real MCP client│
│ in production) │
└────────────────────┘
```

---

# reactome-mcp

An [MCP (Model Context Protocol)](https://modelcontextprotocol.io/) server that exposes the [Reactome](https://reactome.org/) pathway knowledgebase to AI assistants. It wraps Reactome's Content Service and Analysis Service REST APIs, giving LLMs the ability to search, browse, analyse, and export biological pathway data through natural language.
Expand Down
Binary file added orchestrator/__pycache__/executor.cpython-313.pyc
Binary file not shown.
Binary file not shown.
Binary file added orchestrator/__pycache__/planner.cpython-313.pyc
Binary file not shown.
139 changes: 139 additions & 0 deletions orchestrator/demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""
Live demonstration of the Reactome MCP Orchestration Layer using real API calls.

Run from inside the orchestrator/ directory:

python demo.py

Or from the repo root:

python orchestrator/demo.py

What it does
------------
1. Creates a Planner and an Executor.
2. Runs five representative queries that cover every execution mode and the planner's error path.
3. Prints a formatted summary of plans and results to stdout.

Network access is required – the real_adapter.py module calls the Reactome
REST APIs (Content and Analysis services) directly.
"""

from __future__ import annotations
import json
import sys
import os
import time

# ---------------------------------------------------------------------------
# Allow running from repo root without modifying PYTHONPATH
# ---------------------------------------------------------------------------
sys.path.insert(0, os.path.dirname(__file__))

from planner import Planner
from executor import Executor


# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------

# Visual separators used between sections of the demo output.
DIVIDER = "=" * 70
THIN_DIVIDER = "-" * 70

# ANSI terminal escape codes used by c() to colourise output;
# "reset" restores the default style after a coloured span.
ANSI = {
    "reset": "\033[0m",
    "bold": "\033[1m",
    "cyan": "\033[96m",
    "green": "\033[92m",
    "yellow": "\033[93m",
    "red": "\033[91m",
    "grey": "\033[90m",
}

def c(color: str, text: str) -> str:
    """Return *text* wrapped in the ANSI escape for *color*.

    Colour is applied only when stdout is an interactive terminal;
    on non-TTY outputs (pipes, files) the text is returned unmodified.
    Unknown colour names produce no opening code but still reset.
    """
    if sys.stdout.isatty():
        prefix = ANSI.get(color, "")
        return f"{prefix}{text}{ANSI['reset']}"
    return text


def print_plan(plan: dict) -> None:
    """Pretty-print a planner plan: execution mode, error (if any), steps."""
    print(c("cyan", f" Execution mode : {plan['execution']}"))
    err = plan.get("error")
    if err:
        # Planner failed — show the error and skip the (empty) step list.
        print(c("red", f" Planner error : {err}"))
        return
    steps = plan.get("steps", [])
    for step in steps:
        line = f" [{step['id']}] {step['tool']}({step['input']!r})"
        print(c("grey", line))


def print_result(result: dict) -> None:
    """Print each executed step (status, timing, raw result JSON) and the total."""
    steps = result.get("steps")
    if not steps:
        print(c("red", " No steps executed."))
        return
    for step in steps:
        # A step is OK unless its result dict carries an "error" key.
        ok = "error" not in step["result"]
        status = c("green", "OK") if ok else c("red", "ERR")
        timing = c("grey", str(step["duration_ms"]) + " ms")
        print(f" [{status}] {step['id']} – {step['tool']}({step['input']!r}) {timing}")
        print(f" {json.dumps(step['result'])}")
    print(c("yellow", f" Total : {result['total_ms']} ms"))


# ---------------------------------------------------------------------------
# Demo queries
# ---------------------------------------------------------------------------

# Demonstration queries as (label, query) pairs. Together they exercise
# every execution mode handled by the pipeline — parallel, sequential,
# single-step — plus the planner's error path for unrecognised input.
DEMO_QUERIES = [
    # (label, query)
    ("Parallel — compare two genes", "Compare TP53 and BRCA1"),
    ("Sequential — 3-step pathway chain", "Find apoptosis pathways for BCL2"),
    ("Single step — enrichment analysis", "Analyse EGFR"),
    ("Single step — free-text search", "Search PTEN signaling"),
    ("Error handling — unrecognised query", "Do something completely unknown"),
]


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def _run_demo_query(planner, executor, label: str, query: str) -> None:
    """Plan and execute one demo query, printing both stages to stdout."""
    print(f"\n{THIN_DIVIDER}")
    print(c("bold", f" {label}"))
    print(f" Query : {c('cyan', query)}")
    print()

    print(c("bold", " [PLAN]"))
    plan = planner.generate_plan(query)
    print_plan(plan)
    print()

    print(c("bold", " [RESULT]"))
    result = executor.run(plan)
    print_result(result)


def main() -> None:
    """Run every DEMO_QUERIES entry through the Planner -> Executor pipeline."""
    planner = Planner()
    executor = Executor()

    print(f"\n{c('bold', DIVIDER)}")
    print(c("bold", " Reactome MCP Orchestration Layer -- Demo"))
    print(c("bold", DIVIDER))
    print(
        " This demo runs the Planner -> Executor pipeline with REAL API calls.\n"
        " Tool calls are handled by real_adapter.py which calls the Reactome\n"
        " REST APIs directly, mirroring the production MCP server behavior.\n"
    )

    t0 = time.perf_counter()
    for label, query in DEMO_QUERIES:
        _run_demo_query(planner, executor, label, query)
    elapsed_ms = round((time.perf_counter() - t0) * 1000, 2)

    print(f"\n{DIVIDER}")
    print(c("green", f" [OK] All demo queries completed in {elapsed_ms} ms"))
    print(f"{DIVIDER}\n")


if __name__ == "__main__":
    main()
180 changes: 180 additions & 0 deletions orchestrator/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""
executor.py
-----------
Executes the structured plan produced by the Planner.

Execution modes
---------------
sequential
Steps run one after another. A step's input may reference the output of
a previous step using the notation "$<step_id>.<field>".
Example: "$step1.stId" resolves to the "stId" key of step1's result.

parallel
All steps are dispatched concurrently using a thread pool, then results
are collected in order.

single
Convenience alias for a plan with exactly one step (runs sequentially).

none
The plan contains no steps (usually an error from the planner).
"""

from __future__ import annotations
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any

from real_adapter import call_tool


class Executor:
    """
    Runs a plan dict produced by Planner.generate_plan().

    Usage
    -----
    >>> executor = Executor()
    >>> result = executor.run(plan)
    """

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def run(self, plan: dict) -> dict:
        """
        Execute *plan* and return a result dict.

        Parameters
        ----------
        plan : dict
            Output of Planner.generate_plan().

        Returns
        -------
        dict with keys:
            query      – original query string
            execution  – execution mode used
            steps      – list of {id, tool, input, result, duration_ms}
            total_ms   – total wall-clock time in milliseconds
            error      – planner error message if any, else None
        """
        mode = plan.get("execution", "none")
        steps = plan.get("steps", [])
        query = plan.get("query", "")

        start = time.perf_counter()
        if mode in ("sequential", "single"):
            # "single" is just a one-step sequential plan.
            executed = self._run_sequential(steps)
        elif mode == "parallel":
            executed = self._run_parallel(steps)
        else:
            # Unknown or "none" mode — nothing to execute (planner error case).
            executed = []

        total_ms = round((time.perf_counter() - start) * 1000, 2)

        return {
            "query": query,
            "execution": mode,
            "steps": executed,
            "total_ms": total_ms,
            "error": plan.get("error"),
        }

    # ------------------------------------------------------------------
    # Execution strategies
    # ------------------------------------------------------------------

    def _run_sequential(self, steps: list[dict]) -> list[dict]:
        """Run steps one at a time; later steps may reference earlier results."""
        results: dict[str, dict] = {}  # step_id → result dict
        executed: list[dict] = []

        for step in steps:
            # Substitute "$<step_id>.<field>" tokens with outputs of earlier steps.
            resolved_input = self._resolve_input(step["input"], results)
            step_result = self._execute_step(step, resolved_input)
            results[step["id"]] = step_result["result"]
            executed.append(step_result)

        return executed

    def _run_parallel(self, steps: list[dict]) -> list[dict]:
        """Dispatch all steps concurrently; collect results in original order."""
        # BUGFIX: ThreadPoolExecutor raises ValueError for max_workers=0,
        # so an empty parallel plan must short-circuit before the pool is built.
        if not steps:
            return []

        futures = {}
        executed_map: dict[str, dict] = {}

        with ThreadPoolExecutor(max_workers=min(len(steps), 8)) as pool:
            for step in steps:
                future = pool.submit(self._execute_step, step, step["input"])
                futures[future] = step["id"]

            for future in as_completed(futures):
                step_id = futures[future]
                executed_map[step_id] = future.result()

        # Return in original plan order
        return [executed_map[s["id"]] for s in steps]

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _execute_step(self, step: dict, resolved_input: str) -> dict:
        """Call the tool and capture timing."""
        t0 = time.perf_counter()
        result = call_tool(step["tool"], resolved_input)
        ms = round((time.perf_counter() - t0) * 1000, 2)
        return {
            "id": step["id"],
            "tool": step["tool"],
            "input": resolved_input,
            "result": result,
            "duration_ms": ms,
        }

    def _resolve_input(self, input_value: str, results: dict[str, dict]) -> str:
        """
        Resolve step-reference tokens of the form $<step_id>.<field>.

        Example
        -------
        input_value = "$step1.stId"
        results     = {"step1": {"stId": "R-HSA-199420", ...}}
        returns "R-HSA-199420"

        If the reference cannot be resolved, the original token is returned
        unchanged so the error is visible in the output.
        """
        m = re.match(r"^\$(\w+)\.(\w+)$", str(input_value))
        if not m:
            return input_value  # plain string, no substitution needed

        step_id, field = m.groups()
        step_result = results.get(step_id, {})
        return str(step_result.get(field, input_value))  # fallback to token


# ---------------------------------------------------------------------------
# Quick smoke-test
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    import json
    from planner import Planner

    sample_queries = [
        "Compare TP53 and BRCA1",
        "Find apoptosis pathways for BCL2",
        "Analyse EGFR",
        "Search PTEN",
    ]

    planner = Planner()
    executor = Executor()

    # Plan and execute each query, dumping the full result as JSON.
    for query in sample_queries:
        outcome = executor.run(planner.generate_plan(query))
        print(f"\n{'='*60}")
        print(f"Query : {query}")
        print(json.dumps(outcome, indent=2))
Loading