From 75c59278da3b1da81baf25c34f735fadde9e69fb Mon Sep 17 00:00:00 2001 From: iamsurajbobade Date: Fri, 19 Sep 2025 15:12:14 +0530 Subject: [PATCH 1/7] mcp/server: enable server logging --- mcp/logging.go | 22 ++++++++++++++++++++++ mcp/server.go | 36 +++++++++++++++++++++++++++++++++--- 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/mcp/logging.go b/mcp/logging.go index 4d33097a..4c4a3bbc 100644 --- a/mcp/logging.go +++ b/mcp/logging.go @@ -87,6 +87,28 @@ type LoggingHandler struct { handler slog.Handler } +// discardHandler is a slog.Handler that drops all logs. +// TODO: use slog.NewNopHandler when we require Go 1.24+. +type discardHandler struct{} + +func (discardHandler) Enabled(context.Context, slog.Level) bool { return false } +func (discardHandler) Handle(context.Context, slog.Record) error { return nil } +func (discardHandler) WithAttrs([]slog.Attr) slog.Handler { return discardHandler{} } +func (discardHandler) WithGroup(string) slog.Handler { return discardHandler{} } + +// ensureLogger returns l if non-nil, otherwise a discard logger. +func ensureLogger(l *slog.Logger) *slog.Logger { + if l != nil { + return l + } + return slog.New(discardHandler{}) +} + +// internalLogger is used for package-internal logging where we don't have a +// specific server/handler context. It defaults to a discard logger to avoid +// unsolicited output from library code. +var internalLogger = slog.New(discardHandler{}) + // NewLoggingHandler creates a [LoggingHandler] that logs to the given [ServerSession] using a // [slog.JSONHandler]. func NewLoggingHandler(ss *ServerSession, opts *LoggingHandlerOptions) *LoggingHandler { diff --git a/mcp/server.go b/mcp/server.go index 27de09a3..275cc7ab 100644 --- a/mcp/server.go +++ b/mcp/server.go @@ -12,6 +12,7 @@ import ( "encoding/json" "fmt" "iter" + "log/slog" "maps" "net/url" "path/filepath" @@ -35,8 +36,9 @@ const DefaultPageSize = 1000 // sessions by using [Server.Run]. type Server struct { // fixed at creation - impl *Implementation - opts ServerOptions + impl *Implementation + opts ServerOptions + logger *slog.Logger mu sync.Mutex prompts *featureSet[*serverPrompt] @@ -53,6 +55,8 @@ type Server struct { type ServerOptions struct { // Optional instructions for connected clients. Instructions string + // Logger is used for server-side logging. If nil, disable logging. + Logger *slog.Logger // If non-nil, called when "notifications/initialized" is received. InitializedHandler func(context.Context, *InitializedRequest) // PageSize is the maximum number of items to return in a single page for @@ -132,6 +136,7 @@ func NewServer(impl *Implementation, options *ServerOptions) *Server { return &Server{ impl: impl, opts: opts, + logger: ensureLogger(opts.Logger), prompts: newFeatureSet(func(p *serverPrompt) string { return p.prompt.Name }), tools: newFeatureSet(func(t *serverTool) string { return t.tool.Name }), resources: newFeatureSet(func(r *serverResource) string { return r.resource.URI }), @@ -659,6 +664,7 @@ func (s *Server) ResourceUpdated(ctx context.Context, params *ResourceUpdatedNot sessions := slices.Collect(maps.Keys(subscribedSessions)) s.mu.Unlock() notifySessions(sessions, notificationResourceUpdated, params) + s.logger.Info("resource updated notification sent", "uri", params.URI, "subscriber_count", len(sessions)) return nil } @@ -676,6 +682,7 @@ func (s *Server) subscribe(ctx context.Context, req *SubscribeRequest) (*emptyRe s.resourceSubscriptions[req.Params.URI] = make(map[*ServerSession]bool) } s.resourceSubscriptions[req.Params.URI][req.Session] = true + s.logger.Info("resource subscribed", "uri", req.Params.URI, "session_id", req.Session.ID()) return &emptyResult{}, nil } @@ -697,6 +704,7 @@ func (s *Server) unsubscribe(ctx context.Context, req *UnsubscribeRequest) (*emp delete(s.resourceSubscriptions, req.Params.URI) } } + s.logger.Info("resource unsubscribed", "uri", req.Params.URI, "session_id", req.Session.ID()) return &emptyResult{}, nil } @@ -715,8 +723,10 @@ func (s *Server) unsubscribe(ctx context.Context, req *UnsubscribeRequest) (*emp // It need not be called on servers that are used for multiple concurrent connections, // as with [StreamableHTTPHandler]. func (s *Server) Run(ctx context.Context, t Transport) error { + s.logger.Info("server run start") ss, err := s.Connect(ctx, t, nil) if err != nil { + s.logger.Error("server connect failed", "error", err) return err } @@ -728,8 +738,14 @@ func (s *Server) Run(ctx context.Context, t Transport) error { select { case <-ctx.Done(): ss.Close() + s.logger.Info("server run cancelled", "error", ctx.Err()) return ctx.Err() case err := <-ssClosed: + if err != nil { + s.logger.Error("server session ended with error", "error", err) + } else { + s.logger.Info("server session ended") + } return err } } @@ -745,6 +761,7 @@ func (s *Server) bind(mcpConn Connection, conn *jsonrpc2.Connection, state *Serv s.mu.Lock() s.sessions = append(s.sessions, ss) s.mu.Unlock() + s.logger.Info("server session connected", "session_id", ss.ID()) return ss } @@ -760,6 +777,7 @@ func (s *Server) disconnect(cc *ServerSession) { for _, subscribedSessions := range s.resourceSubscriptions { delete(subscribedSessions, cc) } + s.logger.Info("server session disconnected", "session_id", cc.ID()) } // ServerSessionOptions configures the server session. @@ -784,7 +802,14 @@ func (s *Server) Connect(ctx context.Context, t Transport, opts *ServerSessionOp state = opts.State onClose = opts.onClose } - return connect(ctx, t, s, state, onClose) + + s.logger.Info("server connecting") + ss, err := connect(ctx, t, s, state, onClose) + if err != nil { + s.logger.Error("server connect error", "error", err) + return nil, err + } + return ss, nil } // TODO: (nit) move all ServerSession methods below the ServerSession declaration. @@ -804,9 +829,11 @@ func (ss *ServerSession) initialized(ctx context.Context, params *InitializedPar }) if !wasInit { + ss.server.logger.Warn("initialized before initialize") return nil, fmt.Errorf("%q before %q", notificationInitialized, methodInitialize) } if wasInitd { + ss.server.logger.Warn("duplicate initialized notification") return nil, fmt.Errorf("duplicate %q received", notificationInitialized) } if ss.server.opts.KeepAlive > 0 { @@ -815,6 +842,7 @@ func (ss *ServerSession) initialized(ctx context.Context, params *InitializedPar if h := ss.server.opts.InitializedHandler; h != nil { h(ctx, serverRequestFor(ss, params)) } + ss.server.logger.Info("session initialized") return nil, nil } @@ -1052,6 +1080,7 @@ func (ss *ServerSession) handle(ctx context.Context, req *jsonrpc.Request) (any, case methodInitialize, methodPing, notificationInitialized: default: if !initialized { + ss.server.logger.Warn("method invalid during initialization", "method", req.Method) return nil, fmt.Errorf("method %q is invalid during session initialization", req.Method) } } @@ -1108,6 +1137,7 @@ func (ss *ServerSession) setLevel(_ context.Context, params *SetLoggingLevelPara ss.updateState(func(state *ServerSessionState) { state.LogLevel = params.Level }) + ss.server.logger.Info("client log level set", "level", params.Level) return &emptyResult{}, nil } From cf6bc8d27d3b95d814dc87bace7a2b24968869a2 Mon Sep 17 00:00:00 2001 From: iamsurajbobade Date: Fri, 19 Sep 2025 15:33:04 +0530 Subject: [PATCH 2/7] remove unused internalLogger --- mcp/logging.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/mcp/logging.go b/mcp/logging.go index 4c4a3bbc..01578f6d 100644 --- a/mcp/logging.go +++ b/mcp/logging.go @@ -104,11 +104,6 @@ func ensureLogger(l *slog.Logger) *slog.Logger { return slog.New(discardHandler{}) } -// internalLogger is used for package-internal logging where we don't have a -// specific server/handler context. It defaults to a discard logger to avoid -// unsolicited output from library code. -var internalLogger = slog.New(discardHandler{}) - // NewLoggingHandler creates a [LoggingHandler] that logs to the given [ServerSession] using a // [slog.JSONHandler]. func NewLoggingHandler(ss *ServerSession, opts *LoggingHandlerOptions) *LoggingHandler { From 1228679d3da2a7d8e74a4dc67a2c993ea5629cff Mon Sep 17 00:00:00 2001 From: iamsurajbobade Date: Fri, 19 Sep 2025 19:28:03 +0530 Subject: [PATCH 3/7] address todo comments --- mcp/streamable.go | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/mcp/streamable.go b/mcp/streamable.go index 4ab343b2..56d29c8d 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -12,6 +12,7 @@ import ( "fmt" "io" "iter" + "log/slog" "math" "math/rand/v2" "net/http" @@ -39,6 +40,7 @@ const ( type StreamableHTTPHandler struct { getServer func(*http.Request) *Server opts StreamableHTTPOptions + logger *slog.Logger onTransportDeletion func(sessionID string) // for testing only @@ -67,6 +69,10 @@ type StreamableHTTPOptions struct { // // [§2.1.5]: https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server JSONResponse bool + + // Logger specifies the logger to use. + // If nil, do not log. + Logger *slog.Logger } // NewStreamableHTTPHandler returns a new [StreamableHTTPHandler]. @@ -82,6 +88,12 @@ func NewStreamableHTTPHandler(getServer func(*http.Request) *Server, opts *Strea if opts != nil { h.opts = *opts } + + if h.opts.Logger == nil { // ensure we have a logger + h.opts.Logger = ensureLogger(nil) + } + h.logger = h.opts.Logger + return h } @@ -367,6 +379,8 @@ type StreamableServerTransport struct { // StreamableHTTPOptions.JSONResponse is exported. jsonResponse bool + logger *slog.Logger + // connection is non-nil if and only if the transport has been connected. connection *streamableServerConn } @@ -381,6 +395,7 @@ func (t *StreamableServerTransport) Connect(ctx context.Context) (Connection, er stateless: t.Stateless, eventStore: t.EventStore, jsonResponse: t.jsonResponse, + logger: t.logger, incoming: make(chan jsonrpc.Message, 10), done: make(chan struct{}), streams: make(map[string]*stream), @@ -407,6 +422,8 @@ type streamableServerConn struct { jsonResponse bool eventStore EventStore + logger *slog.Logger + incoming chan jsonrpc.Message // messages from the client to the server mu sync.Mutex // guards all fields below @@ -754,7 +771,7 @@ func (c *streamableServerConn) respondSSE(stream *stream, w http.ResponseWriter, } if _, err := writeEvent(w, e); err != nil { // Connection closed or broken. - // TODO(#170): log when we add server-side logging. + c.logger.Warn("error writing event", "error", err) return false } writes++ @@ -773,7 +790,13 @@ func (c *streamableServerConn) respondSSE(stream *stream, w http.ResponseWriter, // simplify. http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) } else { - // TODO(#170): log when we add server-side logging + if ctx.Err() != nil { + // Client disconnected or cancelled the request. + c.logger.Info("stream context done", "error", ctx.Err()) + } else { + // Some other error. + c.logger.Warn("error receiving message", "error", err) + } } return } From 66156e69cb5c4399e50dbf4ae34a6865ede7af62 Mon Sep 17 00:00:00 2001 From: iamsurajbobade Date: Fri, 19 Sep 2025 19:32:24 +0530 Subject: [PATCH 4/7] feat: support for options in SSE Handler --- examples/server/sse/main.go | 2 +- mcp/sse.go | 21 +++++++++++++++++---- mcp/sse_example_test.go | 2 +- mcp/sse_test.go | 6 +++++- 4 files changed, 24 insertions(+), 7 deletions(-) diff --git a/examples/server/sse/main.go b/examples/server/sse/main.go index 27f9caed..0507dd60 100644 --- a/examples/server/sse/main.go +++ b/examples/server/sse/main.go @@ -65,6 +65,6 @@ func main() { default: return nil } - }) + }, nil) log.Fatal(http.ListenAndServe(addr, handler)) } diff --git a/mcp/sse.go b/mcp/sse.go index f39a0397..334f80b0 100644 --- a/mcp/sse.go +++ b/mcp/sse.go @@ -9,6 +9,7 @@ import ( "context" "fmt" "io" + "log/slog" "net/http" "net/url" "sync" @@ -43,12 +44,21 @@ import ( // [2024-11-05 version]: https://modelcontextprotocol.io/specification/2024-11-05/basic/transports type SSEHandler struct { getServer func(request *http.Request) *Server + opts SSEOptions onConnection func(*ServerSession) // for testing; must not block + logger *slog.Logger mu sync.Mutex sessions map[string]*SSEServerTransport } +// SSEOptions specifies options for an [SSEHandler]. +type SSEOptions struct { + // Logger specifies the logger to use. + // If nil, do not log. + Logger *slog.Logger +} + // NewSSEHandler returns a new [SSEHandler] that creates and manages MCP // sessions created via incoming HTTP requests. // @@ -62,13 +72,16 @@ type SSEHandler struct { // The getServer function may return a distinct [Server] for each new // request, or reuse an existing server. If it returns nil, the handler // will return a 400 Bad Request. -// -// TODO(rfindley): add options. -func NewSSEHandler(getServer func(request *http.Request) *Server) *SSEHandler { - return &SSEHandler{ +func NewSSEHandler(getServer func(request *http.Request) *Server, opts *SSEOptions) *SSEHandler { + s := &SSEHandler{ getServer: getServer, sessions: make(map[string]*SSEServerTransport), } + if s.opts.Logger == nil { // ensure we have a logger + s.opts.Logger = ensureLogger(nil) + } + s.logger = s.opts.Logger + return s } // A SSEServerTransport is a logical SSE session created through a hanging GET diff --git a/mcp/sse_example_test.go b/mcp/sse_example_test.go index d06ea62b..6132d31e 100644 --- a/mcp/sse_example_test.go +++ b/mcp/sse_example_test.go @@ -31,7 +31,7 @@ func ExampleSSEHandler() { server := mcp.NewServer(&mcp.Implementation{Name: "adder", Version: "v0.0.1"}, nil) mcp.AddTool(server, &mcp.Tool{Name: "add", Description: "add two numbers"}, Add) - handler := mcp.NewSSEHandler(func(*http.Request) *mcp.Server { return server }) + handler := mcp.NewSSEHandler(func(*http.Request) *mcp.Server { return server }, nil) httpServer := httptest.NewServer(handler) defer httpServer.Close() diff --git a/mcp/sse_test.go b/mcp/sse_test.go index 32a20bf3..b8662f71 100644 --- a/mcp/sse_test.go +++ b/mcp/sse_test.go @@ -24,7 +24,11 @@ func TestSSEServer(t *testing.T) { server := NewServer(testImpl, nil) AddTool(server, &Tool{Name: "greet"}, sayHi) - sseHandler := NewSSEHandler(func(*http.Request) *Server { return server }) + sseOptions := &SSEOptions{ + Logger: ensureLogger(nil), + } + + sseHandler := NewSSEHandler(func(*http.Request) *Server { return server }, sseOptions) serverSessions := make(chan *ServerSession, 1) sseHandler.onConnection = func(ss *ServerSession) { From f59b717265de8419176a9091624deef345e13240 Mon Sep 17 00:00:00 2001 From: iamsurajbobade Date: Fri, 19 Sep 2025 19:42:43 +0530 Subject: [PATCH 5/7] set options --- mcp/sse.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/mcp/sse.go b/mcp/sse.go index 334f80b0..e83dd49e 100644 --- a/mcp/sse.go +++ b/mcp/sse.go @@ -77,10 +77,16 @@ func NewSSEHandler(getServer func(request *http.Request) *Server, opts *SSEOptio getServer: getServer, sessions: make(map[string]*SSEServerTransport), } + + if opts != nil { + s.opts = *opts + } + if s.opts.Logger == nil { // ensure we have a logger s.opts.Logger = ensureLogger(nil) } s.logger = s.opts.Logger + return s } From d59d951e5f29ab3feef49d4f7586eb82baf6fa09 Mon Sep 17 00:00:00 2001 From: iamsurajbobade Date: Fri, 19 Sep 2025 19:51:47 +0530 Subject: [PATCH 6/7] revert server and streamable changees --- mcp/server.go | 36 +++--------------------------------- mcp/streamable.go | 27 ++------------------------- 2 files changed, 5 insertions(+), 58 deletions(-) diff --git a/mcp/server.go b/mcp/server.go index 275cc7ab..27de09a3 100644 --- a/mcp/server.go +++ b/mcp/server.go @@ -12,7 +12,6 @@ import ( "encoding/json" "fmt" "iter" - "log/slog" "maps" "net/url" "path/filepath" @@ -36,9 +35,8 @@ const DefaultPageSize = 1000 // sessions by using [Server.Run]. type Server struct { // fixed at creation - impl *Implementation - opts ServerOptions - logger *slog.Logger + impl *Implementation + opts ServerOptions mu sync.Mutex prompts *featureSet[*serverPrompt] @@ -55,8 +53,6 @@ type Server struct { type ServerOptions struct { // Optional instructions for connected clients. Instructions string - // Logger is used for server-side logging. If nil, disable logging. - Logger *slog.Logger // If non-nil, called when "notifications/initialized" is received. InitializedHandler func(context.Context, *InitializedRequest) // PageSize is the maximum number of items to return in a single page for @@ -136,7 +132,6 @@ func NewServer(impl *Implementation, options *ServerOptions) *Server { return &Server{ impl: impl, opts: opts, - logger: ensureLogger(opts.Logger), prompts: newFeatureSet(func(p *serverPrompt) string { return p.prompt.Name }), tools: newFeatureSet(func(t *serverTool) string { return t.tool.Name }), resources: newFeatureSet(func(r *serverResource) string { return r.resource.URI }), @@ -664,7 +659,6 @@ func (s *Server) ResourceUpdated(ctx context.Context, params *ResourceUpdatedNot sessions := slices.Collect(maps.Keys(subscribedSessions)) s.mu.Unlock() notifySessions(sessions, notificationResourceUpdated, params) - s.logger.Info("resource updated notification sent", "uri", params.URI, "subscriber_count", len(sessions)) return nil } @@ -682,7 +676,6 @@ func (s *Server) subscribe(ctx context.Context, req *SubscribeRequest) (*emptyRe s.resourceSubscriptions[req.Params.URI] = make(map[*ServerSession]bool) } s.resourceSubscriptions[req.Params.URI][req.Session] = true - s.logger.Info("resource subscribed", "uri", req.Params.URI, "session_id", req.Session.ID()) return &emptyResult{}, nil } @@ -704,7 +697,6 @@ func (s *Server) unsubscribe(ctx context.Context, req *UnsubscribeRequest) (*emp delete(s.resourceSubscriptions, req.Params.URI) } } - s.logger.Info("resource unsubscribed", "uri", req.Params.URI, "session_id", req.Session.ID()) return &emptyResult{}, nil } @@ -723,10 +715,8 @@ func (s *Server) unsubscribe(ctx context.Context, req *UnsubscribeRequest) (*emp // It need not be called on servers that are used for multiple concurrent connections, // as with [StreamableHTTPHandler]. func (s *Server) Run(ctx context.Context, t Transport) error { - s.logger.Info("server run start") ss, err := s.Connect(ctx, t, nil) if err != nil { - s.logger.Error("server connect failed", "error", err) return err } @@ -738,14 +728,8 @@ func (s *Server) Run(ctx context.Context, t Transport) error { select { case <-ctx.Done(): ss.Close() - s.logger.Info("server run cancelled", "error", ctx.Err()) return ctx.Err() case err := <-ssClosed: - if err != nil { - s.logger.Error("server session ended with error", "error", err) - } else { - s.logger.Info("server session ended") - } return err } } @@ -761,7 +745,6 @@ func (s *Server) bind(mcpConn Connection, conn *jsonrpc2.Connection, state *Serv s.mu.Lock() s.sessions = append(s.sessions, ss) s.mu.Unlock() - s.logger.Info("server session connected", "session_id", ss.ID()) return ss } @@ -777,7 +760,6 @@ func (s *Server) disconnect(cc *ServerSession) { for _, subscribedSessions := range s.resourceSubscriptions { delete(subscribedSessions, cc) } - s.logger.Info("server session disconnected", "session_id", cc.ID()) } // ServerSessionOptions configures the server session. @@ -802,14 +784,7 @@ func (s *Server) Connect(ctx context.Context, t Transport, opts *ServerSessionOp state = opts.State onClose = opts.onClose } - - s.logger.Info("server connecting") - ss, err := connect(ctx, t, s, state, onClose) - if err != nil { - s.logger.Error("server connect error", "error", err) - return nil, err - } - return ss, nil + return connect(ctx, t, s, state, onClose) } // TODO: (nit) move all ServerSession methods below the ServerSession declaration. @@ -829,11 +804,9 @@ func (ss *ServerSession) initialized(ctx context.Context, params *InitializedPar }) if !wasInit { - ss.server.logger.Warn("initialized before initialize") return nil, fmt.Errorf("%q before %q", notificationInitialized, methodInitialize) } if wasInitd { - ss.server.logger.Warn("duplicate initialized notification") return nil, fmt.Errorf("duplicate %q received", notificationInitialized) } if ss.server.opts.KeepAlive > 0 { @@ -842,7 +815,6 @@ func (ss *ServerSession) initialized(ctx context.Context, params *InitializedPar if h := ss.server.opts.InitializedHandler; h != nil { h(ctx, serverRequestFor(ss, params)) } - ss.server.logger.Info("session initialized") return nil, nil } @@ -1080,7 +1052,6 @@ func (ss *ServerSession) handle(ctx context.Context, req *jsonrpc.Request) (any, case methodInitialize, methodPing, notificationInitialized: default: if !initialized { - ss.server.logger.Warn("method invalid during initialization", "method", req.Method) return nil, fmt.Errorf("method %q is invalid during session initialization", req.Method) } } @@ -1137,7 +1108,6 @@ func (ss *ServerSession) setLevel(_ context.Context, params *SetLoggingLevelPara ss.updateState(func(state *ServerSessionState) { state.LogLevel = params.Level }) - ss.server.logger.Info("client log level set", "level", params.Level) return &emptyResult{}, nil } diff --git a/mcp/streamable.go b/mcp/streamable.go index 56d29c8d..4ab343b2 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -12,7 +12,6 @@ import ( "fmt" "io" "iter" - "log/slog" "math" "math/rand/v2" "net/http" @@ -40,7 +39,6 @@ const ( type StreamableHTTPHandler struct { getServer func(*http.Request) *Server opts StreamableHTTPOptions - logger *slog.Logger onTransportDeletion func(sessionID string) // for testing only @@ -69,10 +67,6 @@ type StreamableHTTPOptions struct { // // [§2.1.5]: https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server JSONResponse bool - - // Logger specifies the logger to use. - // If nil, do not log. - Logger *slog.Logger } // NewStreamableHTTPHandler returns a new [StreamableHTTPHandler]. @@ -88,12 +82,6 @@ func NewStreamableHTTPHandler(getServer func(*http.Request) *Server, opts *Strea if opts != nil { h.opts = *opts } - - if h.opts.Logger == nil { // ensure we have a logger - h.opts.Logger = ensureLogger(nil) - } - h.logger = h.opts.Logger - return h } @@ -379,8 +367,6 @@ type StreamableServerTransport struct { // StreamableHTTPOptions.JSONResponse is exported. jsonResponse bool - logger *slog.Logger - // connection is non-nil if and only if the transport has been connected. connection *streamableServerConn } @@ -395,7 +381,6 @@ func (t *StreamableServerTransport) Connect(ctx context.Context) (Connection, er stateless: t.Stateless, eventStore: t.EventStore, jsonResponse: t.jsonResponse, - logger: t.logger, incoming: make(chan jsonrpc.Message, 10), done: make(chan struct{}), streams: make(map[string]*stream), @@ -422,8 +407,6 @@ type streamableServerConn struct { jsonResponse bool eventStore EventStore - logger *slog.Logger - incoming chan jsonrpc.Message // messages from the client to the server mu sync.Mutex // guards all fields below @@ -771,7 +754,7 @@ func (c *streamableServerConn) respondSSE(stream *stream, w http.ResponseWriter, } if _, err := writeEvent(w, e); err != nil { // Connection closed or broken. - c.logger.Warn("error writing event", "error", err) + // TODO(#170): log when we add server-side logging. return false } writes++ @@ -790,13 +773,7 @@ func (c *streamableServerConn) respondSSE(stream *stream, w http.ResponseWriter, // simplify. http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) } else { - if ctx.Err() != nil { - // Client disconnected or cancelled the request. - c.logger.Info("stream context done", "error", ctx.Err()) - } else { - // Some other error. - c.logger.Warn("error receiving message", "error", err) - } + // TODO(#170): log when we add server-side logging } return } From 44548ac192ec4578fdd478087f2bf37764a36a79 Mon Sep 17 00:00:00 2001 From: iamsurajbobade Date: Fri, 19 Sep 2025 20:06:09 +0530 Subject: [PATCH 7/7] use logger in SSEServerTransport --- mcp/sse.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/mcp/sse.go b/mcp/sse.go index e83dd49e..f3b8cf34 100644 --- a/mcp/sse.go +++ b/mcp/sse.go @@ -119,6 +119,10 @@ type SSEServerTransport struct { // Response is the hanging response body to the incoming GET request. Response http.ResponseWriter + // logger is used for per-POST diagnostics and transport-level logs. + // If nil, logging is disabled. + logger *slog.Logger + // incoming is the queue of incoming messages. // It is never closed, and by convention, incoming is non-nil if and only if // the transport is connected. @@ -143,6 +147,7 @@ func (t *SSEServerTransport) ServeHTTP(w http.ResponseWriter, req *http.Request) // Read and parse the message. data, err := io.ReadAll(req.Body) if err != nil { + t.logger.Error("sse: failed to read body", "error", err) http.Error(w, "failed to read body", http.StatusBadRequest) return } @@ -151,11 +156,13 @@ func (t *SSEServerTransport) ServeHTTP(w http.ResponseWriter, req *http.Request) // useful msg, err := jsonrpc2.DecodeMessage(data) if err != nil { + t.logger.Error("sse: failed to parse body", "error", err) http.Error(w, "failed to parse body", http.StatusBadRequest) return } if req, ok := msg.(*jsonrpc.Request); ok { if _, err := checkRequest(req, serverMethodInfos); err != nil { + t.logger.Warn("sse: request validation failed", "error", err) http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -164,6 +171,7 @@ func (t *SSEServerTransport) ServeHTTP(w http.ResponseWriter, req *http.Request) case t.incoming <- msg: w.WriteHeader(http.StatusAccepted) case <-t.done: + t.logger.Warn("sse: session closed while posting message") http.Error(w, "session closed", http.StatusBadRequest) } } @@ -227,11 +235,12 @@ func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { sessionID = randText() endpoint, err := req.URL.Parse("?sessionid=" + sessionID) if err != nil { + h.logger.Error("sse: failed to create endpoint", "error", err) http.Error(w, "internal error: failed to create endpoint", http.StatusInternalServerError) return } - transport := &SSEServerTransport{Endpoint: endpoint.RequestURI(), Response: w} + transport := &SSEServerTransport{Endpoint: endpoint.RequestURI(), Response: w, logger: h.logger} // The session is terminated when the request exits. h.mu.Lock() @@ -251,6 +260,7 @@ func (h *SSEHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { } ss, err := server.Connect(req.Context(), transport, nil) if err != nil { + h.logger.Error("sse: server connect failed", "error", err) http.Error(w, "connection failed", http.StatusInternalServerError) return }