From cc0566e14d3e38c42c514e464a46761abb1bf2ec Mon Sep 17 00:00:00 2001 From: Sam Thanawalla Date: Wed, 20 Aug 2025 15:31:52 +0000 Subject: [PATCH 1/5] mcp/streamable: use event store to fix unbounded memory issues This CL utilizes the event store to write outgoing messages and removes the unbounded outgoing data structure. For #190 --- mcp/event.go | 10 ++++-- mcp/streamable.go | 91 ++++++++++++++++++++--------------------------- 2 files changed, 46 insertions(+), 55 deletions(-) diff --git a/mcp/event.go b/mcp/event.go index f4f4eeea..fcc35ba1 100644 --- a/mcp/event.go +++ b/mcp/event.go @@ -283,6 +283,12 @@ func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID // index is no longer available. var ErrEventsPurged = errors.New("data purged") +// ErrUnknownSession is the error that [EventStore.After] should return if the session ID is unknown. +var ErrUnknownSession = errors.New("unknown session ID") + +// ErrUnknownSession is the error that [EventStore.After] should return if the stream ID is unknown. +var ErrUnknownStream = errors.New("unknown stream ID") + // After implements [EventStore.After]. func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID StreamID, index int) iter.Seq2[[]byte, error] { // Return the data items to yield. @@ -292,11 +298,11 @@ func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID S defer s.mu.Unlock() streamMap, ok := s.store[sessionID] if !ok { - return nil, fmt.Errorf("MemoryEventStore.After: unknown session ID %q", sessionID) + return nil, fmt.Errorf("MemoryEventStore.After: session ID %v: %w", sessionID, ErrUnknownSession) } dl, ok := streamMap[streamID] if !ok { - return nil, fmt.Errorf("MemoryEventStore.After: unknown stream ID %v in session %q", streamID, sessionID) + return nil, fmt.Errorf("MemoryEventStore.After: stream ID %v in session %q: %w", streamID, sessionID, ErrUnknownStream) } start := index + 1 if dl.first > start { diff --git a/mcp/streamable.go b/mcp/streamable.go index f56b7084..3d229022 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -490,7 +490,7 @@ type stream struct { // that there are messages available to write into the HTTP response. // In addition, the presence of a channel guarantees that at most one HTTP response // can receive messages for a logical stream. After claiming the stream, incoming - // requests should read from outgoing, to ensure that no new messages are missed. + // requests should read from the event store, to ensure that no new messages are missed. // // To simplify locking, signal is an atomic. We need an atomic.Pointer, because // you can't set an atomic.Value to nil. @@ -502,22 +502,23 @@ type stream struct { // The following mutable fields are protected by the mutex of the containing // StreamableServerTransport. - // outgoing is the list of outgoing messages, enqueued by server methods that - // write notifications and responses, and dequeued by streamResponse. - outgoing [][]byte - // streamRequests is the set of unanswered incoming RPCs for the stream. // - // Requests persist until their response data has been added to outgoing. + // Requests persist until their response data has been added to the event store. requests map[jsonrpc.ID]struct{} + + // lastWriteIndex tracks the index of the last message written to the event store for this stream. + lastWriteIndex atomic.Int64 } func newStream(id StreamID, jsonResponse bool) *stream { - return &stream{ + s := &stream{ id: id, jsonResponse: jsonResponse, requests: make(map[jsonrpc.ID]struct{}), } + s.lastWriteIndex.Store(-1) + return s } func signalChanPtr() *chan struct{} { @@ -706,8 +707,8 @@ func (c *streamableServerConn) respondJSON(stream *stream, w http.ResponseWriter var msgs []json.RawMessage ctx := req.Context() - for msg, ok := range c.messages(ctx, stream, false) { - if !ok { + for msg, err := range c.messages(ctx, stream, false, -1) { + if err != nil { if ctx.Err() != nil { w.WriteHeader(http.StatusNoContent) return @@ -770,44 +771,20 @@ func (c *streamableServerConn) respondSSE(stream *stream, w http.ResponseWriter, } } - if lastIndex >= 0 { - // Resume. - for data, err := range c.eventStore.After(req.Context(), c.SessionID(), stream.id, lastIndex) { - if err != nil { - // TODO: reevaluate these status codes. - // Maybe distinguish between storage errors, which are 500s, and missing - // session or stream ID--can these arise from bad input? - status := http.StatusInternalServerError - if errors.Is(err, ErrEventsPurged) { - status = http.StatusInsufficientStorage - } - errorf(status, "failed to read events: %v", err) - return - } - // The iterator yields events beginning just after lastIndex, or it would have - // yielded an error. - if !write(data) { - return - } - } - } - // Repeatedly collect pending outgoing events and send them. ctx := req.Context() - for msg, ok := range c.messages(ctx, stream, persistent) { - if !ok { + for msg, err := range c.messages(ctx, stream, persistent, lastIndex) { + if err != nil { if ctx.Err() != nil && writes == 0 { // This probably doesn't matter, but respond with NoContent if the client disconnected. w.WriteHeader(http.StatusNoContent) + } else if errors.Is(err, ErrEventsPurged) { + errorf(http.StatusInsufficientStorage, "failed to read events: %v", err) } else { errorf(http.StatusGone, "stream terminated") } return } - if err := c.eventStore.Append(req.Context(), c.SessionID(), stream.id, msg); err != nil { - errorf(http.StatusInternalServerError, "storing event: %v", err.Error()) - return - } if !write(msg) { return } @@ -822,27 +799,33 @@ func (c *streamableServerConn) respondSSE(stream *stream, w http.ResponseWriter, // If the stream did not terminate normally, it is either because ctx was // cancelled, or the connection is closed: check the ctx.Err() to differentiate // these cases. -func (c *streamableServerConn) messages(ctx context.Context, stream *stream, persistent bool) iter.Seq2[json.RawMessage, bool] { - return func(yield func(json.RawMessage, bool) bool) { +func (c *streamableServerConn) messages(ctx context.Context, stream *stream, persistent bool, lastIndex int) iter.Seq2[json.RawMessage, error] { + return func(yield func(json.RawMessage, error) bool) { for { - c.mu.Lock() - outgoing := stream.outgoing - stream.outgoing = nil - nOutstanding := len(stream.requests) - c.mu.Unlock() - - for _, data := range outgoing { - if !yield(data, true) { + for data, err := range c.eventStore.After(ctx, c.SessionID(), stream.id, lastIndex) { + if err != nil { + // Wait for session initialization before yielding. + if errors.Is(err, ErrUnknownSession) || errors.Is(err, ErrUnknownStream) { + break + } + yield(nil, err) return } + if !yield(data, nil) { + return + } + lastIndex++ } + c.mu.Lock() + nOutstanding := len(stream.requests) + c.mu.Unlock() // If all requests have been handled and replied to, we should terminate this connection. // "After the JSON-RPC response has been sent, the server SHOULD close the SSE stream." // §6.4, https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server // We only want to terminate POSTs, and GETs that are replaying. The general-purpose GET // (stream ID 0) will never have requests, and should remain open indefinitely. - if nOutstanding == 0 && !persistent { + if nOutstanding == 0 && !persistent && lastIndex >= int(stream.lastWriteIndex.Load()) { return } @@ -850,13 +833,14 @@ func (c *streamableServerConn) messages(ctx context.Context, stream *stream, per case <-*stream.signal.Load(): // there are new outgoing messages // return to top of loop case <-c.done: // session is closed - yield(nil, false) + yield(nil, errors.New("session is closed")) return case <-ctx.Done(): - yield(nil, false) + yield(nil, ctx.Err()) return } } + } } @@ -963,9 +947,10 @@ func (c *streamableServerConn) Write(ctx context.Context, msg jsonrpc.Message) e stream = c.streams[""] } - // TODO: if there is nothing to send these messages to (as would happen, for example, if forConn == "" - // and the client never did a GET), then memory will grow without bound. Consider a mitigation. - stream.outgoing = append(stream.outgoing, data) + if err := c.eventStore.Append(ctx, c.SessionID(), stream.id, data); err != nil { + return fmt.Errorf("error storing event: %w", err) + } + stream.lastWriteIndex.Add(1) if isResponse { // Once we've put the reply on the queue, it's no longer outstanding. delete(stream.requests, forRequest) From bf2306e93d62225024045e279436c9bd263135fd Mon Sep 17 00:00:00 2001 From: Sam Thanawalla Date: Mon, 25 Aug 2025 18:59:00 +0000 Subject: [PATCH 2/5] remove lastWriteIndex and register stream with event store when created --- mcp/event.go | 16 ++++++++-------- mcp/streamable.go | 21 +++++++++++---------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/mcp/event.go b/mcp/event.go index fcc35ba1..a3c0f970 100644 --- a/mcp/event.go +++ b/mcp/event.go @@ -274,6 +274,12 @@ func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID // Purge before adding, so at least the current data item will be present. // (That could result in nBytes > maxBytes, but we'll live with that.) s.purge() + + // An empty data slice signals that a stream has been registered. + // We ignore it since it contains no content and shouldn't affect the total size. + if data == nil { + return nil + } dl.appendData(data) s.nBytes += len(data) return nil @@ -283,12 +289,6 @@ func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID // index is no longer available. var ErrEventsPurged = errors.New("data purged") -// ErrUnknownSession is the error that [EventStore.After] should return if the session ID is unknown. -var ErrUnknownSession = errors.New("unknown session ID") - -// ErrUnknownSession is the error that [EventStore.After] should return if the stream ID is unknown. -var ErrUnknownStream = errors.New("unknown stream ID") - // After implements [EventStore.After]. func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID StreamID, index int) iter.Seq2[[]byte, error] { // Return the data items to yield. @@ -298,11 +298,11 @@ func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID S defer s.mu.Unlock() streamMap, ok := s.store[sessionID] if !ok { - return nil, fmt.Errorf("MemoryEventStore.After: session ID %v: %w", sessionID, ErrUnknownSession) + return nil, fmt.Errorf("MemoryEventStore.After: unknown session ID %q", sessionID) } dl, ok := streamMap[streamID] if !ok { - return nil, fmt.Errorf("MemoryEventStore.After: stream ID %v in session %q: %w", streamID, sessionID, ErrUnknownStream) + return nil, fmt.Errorf("MemoryEventStore.After: unknown stream ID %v in session %q", streamID, sessionID) } start := index + 1 if dl.first > start { diff --git a/mcp/streamable.go b/mcp/streamable.go index 3d229022..735cebf5 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -506,9 +506,6 @@ type stream struct { // // Requests persist until their response data has been added to the event store. requests map[jsonrpc.ID]struct{} - - // lastWriteIndex tracks the index of the last message written to the event store for this stream. - lastWriteIndex atomic.Int64 } func newStream(id StreamID, jsonResponse bool) *stream { @@ -517,7 +514,6 @@ func newStream(id StreamID, jsonResponse bool) *stream { jsonResponse: jsonResponse, requests: make(map[jsonrpc.ID]struct{}), } - s.lastWriteIndex.Store(-1) return s } @@ -679,6 +675,11 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques c.mu.Unlock() stream.signal.Store(signalChanPtr()) defer stream.signal.Store(nil) + // Register this stream with the event store. + if err := c.eventStore.Append(req.Context(), c.SessionID(), stream.id, nil); err != nil { + http.Error(w, fmt.Sprintf("error storing event: %v", err), http.StatusInternalServerError) + return + } } // Publish incoming messages. @@ -804,13 +805,14 @@ func (c *streamableServerConn) messages(ctx context.Context, stream *stream, per for { for data, err := range c.eventStore.After(ctx, c.SessionID(), stream.id, lastIndex) { if err != nil { - // Wait for session initialization before yielding. - if errors.Is(err, ErrUnknownSession) || errors.Is(err, ErrUnknownStream) { - break - } yield(nil, err) return } + // The stream exists, but does not contain any messages on the stream. + // Do not yield this data. + if data == nil { + break + } if !yield(data, nil) { return } @@ -825,7 +827,7 @@ func (c *streamableServerConn) messages(ctx context.Context, stream *stream, per // §6.4, https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server // We only want to terminate POSTs, and GETs that are replaying. The general-purpose GET // (stream ID 0) will never have requests, and should remain open indefinitely. - if nOutstanding == 0 && !persistent && lastIndex >= int(stream.lastWriteIndex.Load()) { + if nOutstanding == 0 && !persistent { return } @@ -950,7 +952,6 @@ func (c *streamableServerConn) Write(ctx context.Context, msg jsonrpc.Message) e if err := c.eventStore.Append(ctx, c.SessionID(), stream.id, data); err != nil { return fmt.Errorf("error storing event: %w", err) } - stream.lastWriteIndex.Add(1) if isResponse { // Once we've put the reply on the queue, it's no longer outstanding. delete(stream.requests, forRequest) From 06a0ab0df8598242ba5a15bb56719af0d1592c3b Mon Sep 17 00:00:00 2001 From: Sam Thanawalla Date: Wed, 27 Aug 2025 13:32:34 +0000 Subject: [PATCH 3/5] remove extra eventstore append and add comments --- mcp/event.go | 6 ------ mcp/streamable.go | 20 +++++--------------- 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/mcp/event.go b/mcp/event.go index a3c0f970..f4f4eeea 100644 --- a/mcp/event.go +++ b/mcp/event.go @@ -274,12 +274,6 @@ func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID // Purge before adding, so at least the current data item will be present. // (That could result in nBytes > maxBytes, but we'll live with that.) s.purge() - - // An empty data slice signals that a stream has been registered. - // We ignore it since it contains no content and shouldn't affect the total size. - if data == nil { - return nil - } dl.appendData(data) s.nBytes += len(data) return nil diff --git a/mcp/streamable.go b/mcp/streamable.go index 735cebf5..1ba9e03a 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -509,12 +509,11 @@ type stream struct { } func newStream(id StreamID, jsonResponse bool) *stream { - s := &stream{ + return &stream{ id: id, jsonResponse: jsonResponse, requests: make(map[jsonrpc.ID]struct{}), } - return s } func signalChanPtr() *chan struct{} { @@ -675,11 +674,6 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques c.mu.Unlock() stream.signal.Store(signalChanPtr()) defer stream.signal.Store(nil) - // Register this stream with the event store. - if err := c.eventStore.Append(req.Context(), c.SessionID(), stream.id, nil); err != nil { - http.Error(w, fmt.Sprintf("error storing event: %v", err), http.StatusInternalServerError) - return - } } // Publish incoming messages. @@ -794,8 +788,11 @@ func (c *streamableServerConn) respondSSE(stream *stream, w http.ResponseWriter, // messages iterates over messages sent to the current stream. // +// persistent indicates if it is the main GET listener. +// lastIndex is the index of the last seen event. +// // The first iterated value is the received JSON message. The second iterated -// value is an OK value indicating whether the stream terminated normally. +// value is an error value indicating whether the stream terminated normally. // // If the stream did not terminate normally, it is either because ctx was // cancelled, or the connection is closed: check the ctx.Err() to differentiate @@ -805,12 +802,6 @@ func (c *streamableServerConn) messages(ctx context.Context, stream *stream, per for { for data, err := range c.eventStore.After(ctx, c.SessionID(), stream.id, lastIndex) { if err != nil { - yield(nil, err) - return - } - // The stream exists, but does not contain any messages on the stream. - // Do not yield this data. - if data == nil { break } if !yield(data, nil) { @@ -821,7 +812,6 @@ func (c *streamableServerConn) messages(ctx context.Context, stream *stream, per c.mu.Lock() nOutstanding := len(stream.requests) c.mu.Unlock() - // If all requests have been handled and replied to, we should terminate this connection. // "After the JSON-RPC response has been sent, the server SHOULD close the SSE stream." // §6.4, https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server From 4e858272a2a517a5c70e8d902e2cbf450e0f2a41 Mon Sep 17 00:00:00 2001 From: Sam Thanawalla Date: Wed, 27 Aug 2025 20:57:43 +0000 Subject: [PATCH 4/5] add EventStore.Open --- mcp/event.go | 27 +++++++++++++++++++++++++-- mcp/streamable.go | 34 +++++++++++++++++++++++----------- 2 files changed, 48 insertions(+), 13 deletions(-) diff --git a/mcp/event.go b/mcp/event.go index f4f4eeea..a7e89f08 100644 --- a/mcp/event.go +++ b/mcp/event.go @@ -153,6 +153,11 @@ func scanEvents(r io.Reader) iter.Seq2[Event, error] { // // All of an EventStore's methods must be safe for use by multiple goroutines. type EventStore interface { + // Open prepares the event store for a given session. It ensures that the + // underlying data structure for the sessionID is initialized, making it + // ready to store event streams. + Open(_ context.Context, sessionID string, streamID StreamID) error + // Append appends data for an outgoing event to given stream, which is part of the // given session. Append(_ context.Context, sessionID string, _ StreamID, data []byte) error @@ -256,11 +261,21 @@ func NewMemoryEventStore(opts *MemoryEventStoreOptions) *MemoryEventStore { } } -// Append implements [EventStore.Append] by recording data in memory. -func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID StreamID, data []byte) error { +// Open implements [EventStore.Open]. It ensures that the underlying data +// structures for the given session and stream IDs are initialized and ready +// for use. +func (s *MemoryEventStore) Open(_ context.Context, sessionID string, streamID StreamID) error { s.mu.Lock() defer s.mu.Unlock() + s.init(sessionID, streamID) + return nil +} +// init is an internal helper function that ensures the nested map structure for a +// given sessionID and streamID exists, creating it if necessary. It returns the +// dataList associated with the specified IDs. +// This function must be called within a locked context. +func (s *MemoryEventStore) init(sessionID string, streamID StreamID) *dataList { streamMap, ok := s.store[sessionID] if !ok { streamMap = make(map[StreamID]*dataList) @@ -271,6 +286,14 @@ func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID dl = &dataList{} streamMap[streamID] = dl } + return dl +} + +// Append implements [EventStore.Append] by recording data in memory. +func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID StreamID, data []byte) error { + s.mu.Lock() + defer s.mu.Unlock() + dl := s.init(sessionID, streamID) // Purge before adding, so at least the current data item will be present. // (That could result in nBytes > maxBytes, but we'll live with that.) s.purge() diff --git a/mcp/streamable.go b/mcp/streamable.go index 1ba9e03a..ec2a4b87 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -401,7 +401,7 @@ func NewStreamableServerTransport(sessionID string, opts *StreamableServerTransp } // Connect implements the [Transport] interface. -func (t *StreamableServerTransport) Connect(context.Context) (Connection, error) { +func (t *StreamableServerTransport) Connect(ctx context.Context) (Connection, error) { if t.connection != nil { return nil, fmt.Errorf("transport already connected") } @@ -415,13 +415,17 @@ func (t *StreamableServerTransport) Connect(context.Context) (Connection, error) streams: make(map[StreamID]*stream), requestStreams: make(map[jsonrpc.ID]StreamID), } + if t.connection.eventStore == nil { + t.connection.eventStore = NewMemoryEventStore(nil) + } // Stream 0 corresponds to the hanging 'GET'. // // It is always text/event-stream, since it must carry arbitrarily many // messages. - t.connection.streams[""] = newStream("", false) - if t.connection.eventStore == nil { - t.connection.eventStore = NewMemoryEventStore(nil) + var err error + t.connection.streams[""], err = t.connection.newStream(ctx, "", false) + if err != nil { + return nil, err } return t.connection, nil } @@ -508,12 +512,15 @@ type stream struct { requests map[jsonrpc.ID]struct{} } -func newStream(id StreamID, jsonResponse bool) *stream { +func (c *streamableServerConn) newStream(ctx context.Context, id StreamID, jsonResponse bool) (*stream, error) { + if err := c.eventStore.Open(ctx, c.sessionID, id); err != nil { + return nil, err + } return &stream{ id: id, jsonResponse: jsonResponse, requests: make(map[jsonrpc.ID]struct{}), - } + }, nil } func signalChanPtr() *chan struct{} { @@ -664,7 +671,11 @@ func (c *streamableServerConn) servePOST(w http.ResponseWriter, req *http.Reques // notifications or server->client requests made in the course of handling. // Update accounting for this incoming payload. if len(requests) > 0 { - stream = newStream(StreamID(randText()), c.jsonResponse) + stream, err = c.newStream(req.Context(), StreamID(randText()), c.jsonResponse) + if err != nil { + http.Error(w, fmt.Sprintf("storing stream: %v", err), http.StatusInternalServerError) + return + } c.mu.Lock() c.streams[stream.id] = stream stream.requests = requests @@ -800,18 +811,19 @@ func (c *streamableServerConn) respondSSE(stream *stream, w http.ResponseWriter, func (c *streamableServerConn) messages(ctx context.Context, stream *stream, persistent bool, lastIndex int) iter.Seq2[json.RawMessage, error] { return func(yield func(json.RawMessage, error) bool) { for { + c.mu.Lock() + nOutstanding := len(stream.requests) + c.mu.Unlock() for data, err := range c.eventStore.After(ctx, c.SessionID(), stream.id, lastIndex) { if err != nil { - break + yield(nil, err) + return } if !yield(data, nil) { return } lastIndex++ } - c.mu.Lock() - nOutstanding := len(stream.requests) - c.mu.Unlock() // If all requests have been handled and replied to, we should terminate this connection. // "After the JSON-RPC response has been sent, the server SHOULD close the SSE stream." // §6.4, https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server From f2bc8b28ac02a1759b5454bcf72312d64120945b Mon Sep 17 00:00:00 2001 From: Sam Thanawalla Date: Thu, 28 Aug 2025 14:32:47 +0000 Subject: [PATCH 5/5] address comments --- mcp/event.go | 10 +++++----- mcp/streamable.go | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/mcp/event.go b/mcp/event.go index a7e89f08..0dd8734b 100644 --- a/mcp/event.go +++ b/mcp/event.go @@ -153,8 +153,8 @@ func scanEvents(r io.Reader) iter.Seq2[Event, error] { // // All of an EventStore's methods must be safe for use by multiple goroutines. type EventStore interface { - // Open prepares the event store for a given session. It ensures that the - // underlying data structure for the sessionID is initialized, making it + // Open prepares the event store for a given stream. It ensures that the + // underlying data structure for the stream is initialized, making it // ready to store event streams. Open(_ context.Context, sessionID string, streamID StreamID) error @@ -167,6 +167,7 @@ type EventStore interface { // Once the iterator yields a non-nil error, it will stop. // After's iterator must return an error immediately if any data after index was // dropped; it must not return partial results. + // The stream must have been opened previously (see [EventStore.Open]). After(_ context.Context, sessionID string, _ StreamID, index int) iter.Seq2[[]byte, error] // SessionClosed informs the store that the given session is finished, along @@ -262,8 +263,7 @@ func NewMemoryEventStore(opts *MemoryEventStoreOptions) *MemoryEventStore { } // Open implements [EventStore.Open]. It ensures that the underlying data -// structures for the given session and stream IDs are initialized and ready -// for use. +// structures for the given session are initialized and ready for use. func (s *MemoryEventStore) Open(_ context.Context, sessionID string, streamID StreamID) error { s.mu.Lock() defer s.mu.Unlock() @@ -274,7 +274,7 @@ func (s *MemoryEventStore) Open(_ context.Context, sessionID string, streamID St // init is an internal helper function that ensures the nested map structure for a // given sessionID and streamID exists, creating it if necessary. It returns the // dataList associated with the specified IDs. -// This function must be called within a locked context. +// Requires s.mu. func (s *MemoryEventStore) init(sessionID string, streamID StreamID) *dataList { streamMap, ok := s.store[sessionID] if !ok { diff --git a/mcp/streamable.go b/mcp/streamable.go index ec2a4b87..92f20206 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -719,7 +719,7 @@ func (c *streamableServerConn) respondJSON(stream *stream, w http.ResponseWriter w.WriteHeader(http.StatusNoContent) return } else { - http.Error(w, http.StatusText(http.StatusGone), http.StatusGone) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } } @@ -784,10 +784,8 @@ func (c *streamableServerConn) respondSSE(stream *stream, w http.ResponseWriter, if ctx.Err() != nil && writes == 0 { // This probably doesn't matter, but respond with NoContent if the client disconnected. w.WriteHeader(http.StatusNoContent) - } else if errors.Is(err, ErrEventsPurged) { - errorf(http.StatusInsufficientStorage, "failed to read events: %v", err) } else { - errorf(http.StatusGone, "stream terminated") + errorf(http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)) } return } @@ -799,11 +797,13 @@ func (c *streamableServerConn) respondSSE(stream *stream, w http.ResponseWriter, // messages iterates over messages sent to the current stream. // -// persistent indicates if it is the main GET listener. -// lastIndex is the index of the last seen event. +// persistent indicates if it is the main GET listener, which should never be +// terminated. +// lastIndex is the index of the last seen event, iteration begins at lastIndex+1. // // The first iterated value is the received JSON message. The second iterated // value is an error value indicating whether the stream terminated normally. +// Iteration ends at the first non-nil error. // // If the stream did not terminate normally, it is either because ctx was // cancelled, or the connection is closed: check the ctx.Err() to differentiate