Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,20 +170,20 @@ async def _handle_sse_event(

# If this is a response and we have original_request_id, replace it
if original_request_id is not None and isinstance(message.root, JSONRPCResponse | JSONRPCError):
message.root.id = original_request_id
message.root.id = original_request_id # pragma: no cover

session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
await read_stream_writer.send(session_message) # pragma: no cover

# Call resumption token callback if we have an ID. Only update
# the resumption token on notifications to avoid overwriting it
# with the token from the final response.
if sse.id and resumption_callback and not isinstance(message.root, JSONRPCResponse | JSONRPCError):
await resumption_callback(sse.id.strip())
await resumption_callback(sse.id.strip()) # pragma: no cover

# If this is a response or error return True indicating completion
# Otherwise, return False to continue listening
return isinstance(message.root, JSONRPCResponse | JSONRPCError)
return isinstance(message.root, JSONRPCResponse | JSONRPCError) # pragma: no cover

except Exception as exc: # pragma: no cover
logger.exception("Error parsing SSE message")
Expand Down Expand Up @@ -221,7 +221,7 @@ async def handle_get_stream(
except Exception as exc:
logger.debug(f"GET stream error (non-fatal): {exc}") # pragma: no cover

async def _handle_resumption_request(self, ctx: RequestContext) -> None:
async def _handle_resumption_request(self, ctx: RequestContext) -> None: # pragma: no cover
"""Handle a resumption request using GET with SSE."""
headers = self._prepare_request_headers(ctx.headers)
if ctx.metadata and ctx.metadata.resumption_token:
Expand Down Expand Up @@ -339,7 +339,7 @@ async def _handle_sse_response(
if is_complete:
await response.aclose()
break
except Exception as e:
except Exception as e: # pragma: no cover
logger.exception("Error reading SSE stream:") # pragma: no cover
await ctx.read_stream_writer.send(e) # pragma: no cover

Expand Down Expand Up @@ -408,7 +408,7 @@ async def post_writer(

async def handle_request_async():
if is_resumption:
await self._handle_resumption_request(ctx)
await self._handle_resumption_request(ctx) # pragma: no cover
else:
await self._handle_post_request(ctx)

Expand Down
4 changes: 2 additions & 2 deletions tests/shared/test_streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ class SimpleEventStore(EventStore):
"""Simple in-memory event store for testing."""

def __init__(self):
self._events: list[tuple[StreamId, EventId, types.JSONRPCMessage]] = []
self._event_id_counter = 0
self._events: list[tuple[StreamId, EventId, types.JSONRPCMessage]] = [] # pragma: no cover
self._event_id_counter = 0 # pragma: no cover

async def store_event(self, stream_id: StreamId, message: types.JSONRPCMessage) -> EventId: # pragma: no cover
"""Store an event and return its ID."""
Expand Down