Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 119 additions & 3 deletions src/mcpm/router/app.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be noted that currently app.py has been updated to start a streamable http app, the sse app has been deprecated and moved to sse_app.py

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JoJoJoJoJoJoJo NoOpsResponse was required to fix crashes in OpenHands. Please let me know where (what classes, sse_app.py?) should I look to integrate this part.

Copy link
Contributor

@JoJoJoJoJoJoJo JoJoJoJoJoJoJo Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've checked the usage of mcpm in Openhands code base.

Which means only works on src/mcpm/router/router.py is enough as far as I understand. And after your pr merged in mcpm.sh repo, you may also need to bump the package dependency version in OpenHands

Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
import logging
import os
import asyncio
import re
from contextlib import asynccontextmanager

from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import Mount, Route

from mcpm.monitor.event import monitor
from mcpm.monitor.base import AccessEventType
from mcpm.router.router import MCPRouter
from mcpm.router.router_config import RouterConfig
from mcpm.router.transport import RouterSseTransport
from mcpm.utils.config import ConfigManager
from mcpm.utils.platform import get_log_directory

Expand All @@ -23,10 +35,114 @@
api_key = config.get("api_key")
auth_enabled = config.get("auth_enabled", False)

router_instance = MCPRouter(reload_server=True, router_config=RouterConfig(api_key=api_key, auth_enabled=auth_enabled))
sse_transport = RouterSseTransport("/messages/", api_key=api_key if auth_enabled else None)

class NoOpsResponse(Response):
def __init__(self):
super().__init__(content=b"", status_code=204)

async def __call__(self, scope, receive, send):
await send(
{
"type": "http.response.start",
"status": self.status_code,
"headers": self.render_headers(),
}
)
await send({"type": "http.response.body", "body": b"", "more_body": False})

async def handle_sse(request: Request):
try:
async with sse_transport.connect_sse(
request.scope,
request.receive,
request._send,
) as (read_stream, write_stream):
await router_instance.aggregated_server.run(
read_stream,
write_stream,
router_instance.aggregated_server.initialization_options,
)
while not await request.is_disconnected():
await asyncio.sleep(0.1)
except asyncio.CancelledError:
raise
except Exception as e:
logger.error(f"Unexpected error in app.py handle_sse: {e}", exc_info=True)
finally:
return NoOpsResponse()

async def handle_query_events(request: Request) -> Response:
try:
offset = request.query_params.get("offset")
page = int(request.query_params.get("page", 1))
limit = int(request.query_params.get("limit", 10))
event_type_str = request.query_params.get("event_type", None)

allow_origins = None
if offset is None:
return JSONResponse(
{"error": "Missing required parameter", "detail": "The 'offset' parameter is required."},
status_code=400,
)

offset_pattern = r"^(\d+)([hdwm])$"
match = re.match(offset_pattern, offset.lower())
if not match:
return JSONResponse(
{"error": "Invalid offset format", "detail": "Offset must be e.g., '24h', '7d', '2w', '1m'."},
status_code=400,
)

if page < 1:
page = 1
event_type = None
if event_type_str:
try:
event_type = AccessEventType[event_type_str.upper()].name
except (KeyError, ValueError):
return JSONResponse(
{"error": "Invalid event type", "detail": f"Valid types: {', '.join([e.name for e in AccessEventType])}"},
status_code=400,
)

if monitor:
response_data = await monitor.query_events(offset, page, limit, event_type)
return JSONResponse(response_data.model_dump(), status_code=200)
else:
logger.warning("monitor object not available for /events route")
return JSONResponse({"error": "Monitoring not available"}, status_code=503)

except Exception as e:
logger.error(f"Error handling query events request: {e}", exc_info=True)
return JSONResponse({"error": str(e)}, status_code=500)

@asynccontextmanager
async def lifespan(app_starlette: Starlette):
logger.info("Starting MCPRouter (via app.py)...")
await router_instance.initialize_router()
if monitor:
await monitor.initialize_storage()
yield
logger.info("Shutting down MCPRouter (via app.py)...")
await router_instance.shutdown()
if monitor:
await monitor.close()

middlewares = []
if CORS_ENABLED:
allow_origins = os.environ.get("MCPM_ROUTER_CORS", "").split(",")
middlewares.append(
Middleware(CORSMiddleware, allow_origins=allow_origins, allow_methods=["*"], allow_headers=["*"])
)

router = MCPRouter(reload_server=True, router_config=RouterConfig(api_key=api_key, auth_enabled=auth_enabled))
app = router.get_remote_server_app(allow_origins=allow_origins, include_lifespan=True, monitor=monitor)
app = Starlette(
debug=os.environ.get("MCPM_DEBUG") == "true",
middleware=middlewares,
routes=[
Route("/sse", endpoint=handle_sse),
Route("/events", endpoint=handle_query_events, methods=["GET"]),
Mount("/messages/", app=sse_transport.handle_post_message),
],
lifespan=lifespan,
)
14 changes: 8 additions & 6 deletions src/mcpm/router/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,16 @@ async def handle_post_message(self, scope: Scope, receive: Receive, send: Send):
response = Response("Could not parse message", status_code=400)
await response(scope, receive, send)
try:
await writer.send(err)
await writer.send(SessionMessage(message=err))
except (BrokenPipeError, ConnectionError, OSError) as pipe_err:
logger.warning(f"Failed to send error due to pipe issue: {pipe_err}")
return

logger.debug(f"Sending message to writer: {message}")
response = Response("Accepted", status_code=202)
await response(scope, receive, send)
# Send the 202 Accepted response
accepted_response = Response("Accepted", status_code=202)
await accepted_response(scope, receive, send)

# add error handling, catch possible pipe errors
# Attempt to send the message to the writer
try:
await writer.send(SessionMessage(message=message))
except (BrokenPipeError, ConnectionError, OSError) as e:
Expand All @@ -240,7 +240,9 @@ async def handle_post_message(self, scope: Scope, receive: Receive, send: Send):
logger.warning(f"Connection error when sending message to session {session_id}: {e}")
self._read_stream_writers.pop(session_id, None)
self._session_id_to_identifier.pop(session_id, None)
return response

# Implicitly return None. The original 'return response' is removed.
return

def _validate_api_key(self, scope: Scope, api_key: str | None) -> bool:
# If api_key is explicitly set to None, disable API key validation
Expand Down