diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b5508a..2be40ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,14 +4,19 @@ ### Added +- `MetricsCollector` interface — typed instrumentation hooks for connection lifecycle, room state, throughput, backpressure, and heartbeat health +- `NoopCollector` — default minimal-overhead implementation that discards all events; embed it in custom implementations for forward-compatible additions +- `WithMetrics(mc MetricsCollector)` server option — plug in any metrics backend (Prometheus, OTel, or custom) - `PanicError` exported type — wraps panics recovered from `OnMessage` handlers with the panic value and goroutine stack trace; delivered to `OnDisconnect` when resumption is disabled, or to `OnTransportDrop` when resumption is enabled (in which case `OnDisconnect` may later fire with a nil error); use `errors.As` on whichever callback error is non-nil to distinguish handler panics from transport failures - `WithUpgraderBufferSize(readSize, writeSize int)` option — configures the WebSocket upgrader I/O buffer sizes (default 1024 bytes each) +- `DisconnectReason` type and constants (`DisconnectNormal`, `DisconnectKick`, `DisconnectGraceExpired`, `DisconnectServerClose`, `DisconnectDuplicate`) — `ConnectionClosed` now includes a reason parameter so metrics backends can distinguish disconnect causes - Benchmarks for ring buffer, broadcast fan-out, direct Send throughput, and drop-oldest backpressure ### Changed - Broadcast fan-out reuses a scratch slice on the hub instead of allocating a new snapshot per invocation — zero-alloc steady state since the hub event loop is single-threaded - `ConnectFunc` GoDoc now documents that `roomID` is ignored on session resumption +- `Server.Close()` now emits `ConnectionClosed` (with `DisconnectServerClose` reason) and `RoomDestroyed` metrics for all sessions during shutdown ### Fixed diff --git a/README.md b/README.md index 9087fbe..c2a173e 100644 --- a/README.md +++ b/README.md @@ -143,6 +143,7 @@ See [wspulse/core](https://github.com/wspulse/core) for the full `router` API. | `WithCodec(c)` | JSONCodec | | `WithCheckOrigin(fn)` | allow all | | `WithLogger(l)` | zap.NewNop() — accepts `*zap.Logger` | +| `WithMetrics(mc)` | NoopCollector — accepts `MetricsCollector` | --- @@ -156,6 +157,36 @@ See [wspulse/core](https://github.com/wspulse/core) for the full `router` API. - **Swappable codec** — JSON by default; implement the `Codec` interface to plug in any encoding (binary, Protobuf, MessagePack, etc.). - **Kick** — `Server.Kick(connectionID)` always destroys the session immediately, bypassing the resume window. - **Graceful shutdown** — `Server.Close()` sends close frames to all connected clients, drains in-flight registrations, and fires `OnDisconnect` for every session. +- **Metrics** — optional `MetricsCollector` interface for observability; default is `NoopCollector` (minimal overhead). See [Metrics](#metrics) below. + +--- + +## Metrics + +wspulse/server exposes a `MetricsCollector` interface with typed hooks for connection lifecycle, room state, throughput, backpressure, and heartbeat health. The default is `NoopCollector{}` (minimal overhead). Embed `NoopCollector` in custom implementations for forward-compatible additions. + +```go +// Use a contrib adapter (e.g. wspulse/metrics-prometheus or wspulse/metrics-otel), +// or implement MetricsCollector yourself. +var collector wspulse.MetricsCollector = myCollector + +srv := wspulse.NewServer(connect, + wspulse.WithMetrics(collector), +) +``` + +The interface covers: + +| Hook | Description | +| --- | --- | +| `ConnectionOpened` / `ConnectionClosed` | Session lifecycle with duration and disconnect reason | +| `RoomCreated` / `RoomDestroyed` | Room allocation and deallocation | +| `MessageReceived` / `MessageSent` / `MessageBroadcast` | Throughput with byte sizes and fan-out | +| `FrameDropped` / `SendBufferUtilization` | Backpressure visibility | +| `ResumeAttempt` | Session resumption tracking | +| `PongTimeout` | Heartbeat health | + +Implement `MetricsCollector` to plug in any backend (Prometheus, OTel, or custom). See [doc/internals.md](doc/internals.md) for goroutine call sites and thread safety requirements. --- diff --git a/TODOS.md b/TODOS.md new file mode 100644 index 0000000..f40c9b7 --- /dev/null +++ b/TODOS.md @@ -0,0 +1,16 @@ +# TODOS + +## Failed resume detection: ResumeAttempt(..., false) + +When a client reconnects with a connectionID that has already expired (grace +window elapsed), identify this as a "failed resume" and emit +`ResumeAttempt(roomID, connectionID, false)` before creating the new session. + +The `success bool` parameter exists but is currently always `true`. Detecting +failed resumes is non-trivial: after grace expiry the session is destroyed and +the connectionID is freed. Distinguishing "new client reusing an ID" from +"client that missed the resume window" may require protocol-level signaling +(e.g. a client-sent `resume` intent header on the upgrade request). + +**Depends on:** MetricsCollector interface (this branch), possibly protocol +changes in the `.github` repo. diff --git a/doc/internals.md b/doc/internals.md index 8253307..6189ef0 100644 --- a/doc/internals.md +++ b/doc/internals.md @@ -14,6 +14,7 @@ belong to the consumers of wspulse/server. 3. [Backpressure and Send Buffer](#3-backpressure-and-send-buffer) 4. [Connection Teardown](#4-connection-teardown) 5. [Session Resumption](#5-session-resumption) +6. [Metrics](#6-metrics) --- @@ -263,3 +264,55 @@ Kick() returns nil If the hub has already shut down (`<-hub.done`), `Kick` returns `ErrServerClosed` without blocking. + +--- + +## 6. Metrics + +wspulse/server exposes an optional `MetricsCollector` interface for +instrumentation. The default is `NoopCollector{}`, a no-op implementation +that discards all events with minimal overhead. + +### Configuration + +```go +wspulse.NewServer(connect, + wspulse.WithMetrics(myCollector), // custom implementation +) +``` + +If `WithMetrics` is not called, the server uses `NoopCollector`. + +### Interface + +`MetricsCollector` defines typed methods for each lifecycle event. +All methods are fire-and-forget (no return value). Implementations +must be safe for concurrent use. + +### Goroutine call sites + +| Method | Called from | +| ----------------------- | ------------------- | +| `RoomCreated` | hub goroutine | +| `RoomDestroyed` | hub goroutine | +| `ConnectionOpened` | hub goroutine | +| `ConnectionClosed` | hub goroutine | +| `ResumeAttempt` | hub goroutine | +| `MessageBroadcast` | hub goroutine | +| `MessageReceived` | readPump goroutine | +| `PongTimeout` | readPump goroutine | +| `MessageSent` | writePump goroutine | +| `SendBufferUtilization` | writePump goroutine | +| `FrameDropped` | hub goroutine (broadcast), caller goroutine (Send), or transition goroutine (resume drain) | + +### Connection duration + +`ConnectionClosed` receives a `duration` parameter computed as +`time.Since(session.connectedAt)`. The `connectedAt` timestamp is set +once when the session is created in `handleRegister`. This means the +duration reflects the **logical session lifetime**, including any time +spent in the suspended state during session resumption. + +`ConnectionClosed` also receives a `reason` parameter of type +`DisconnectReason` that indicates why the session was terminated. +See the `DisconnectReason` constants for possible values. diff --git a/hub.go b/hub.go index dfbc9d6..2ff6c73 100644 --- a/hub.go +++ b/hub.go @@ -3,6 +3,7 @@ package wspulse import ( "sync" "sync/atomic" + "time" "github.com/gorilla/websocket" "go.uber.org/zap" @@ -141,6 +142,7 @@ func (h *hub) handleRegister(message registerMessage) { onResume = func() { fn(existing) } } existing.attachWS(message.transport, h, onResume) + h.config.metrics.ResumeAttempt(existing.roomID, existing.id, true) h.config.logger.Info("wspulse: session resumed", zap.String("conn_id", message.connectionID), ) @@ -151,7 +153,7 @@ func (h *hub) handleRegister(message registerMessage) { h.config.logger.Warn("wspulse: duplicate conn_id, kicking existing session", zap.String("conn_id", message.connectionID), ) - h.disconnectSession(existing, ErrDuplicateConnectionID) + h.disconnectSession(existing, ErrDuplicateConnectionID, DisconnectDuplicate) case stateClosed: // Close() was called externally before the hub processed the @@ -161,32 +163,39 @@ func (h *hub) handleRegister(message registerMessage) { h.config.logger.Debug("wspulse: stale closed session removed", zap.String("conn_id", message.connectionID), ) - h.disconnectSession(existing, nil) + h.disconnectSession(existing, nil, DisconnectNormal) } } // Create a new session. newSession := &session{ - id: message.connectionID, - roomID: message.roomID, - send: make(chan []byte, h.config.sendBufferSize), - done: make(chan struct{}), - state: stateConnected, - config: h.config, + id: message.connectionID, + roomID: message.roomID, + send: make(chan []byte, h.config.sendBufferSize), + done: make(chan struct{}), + state: stateConnected, + connectedAt: time.Now(), + config: h.config, } if h.config.resumeWindow > 0 { newSession.resumeBuffer = newRingBuffer(h.config.sendBufferSize) } + var roomCreated bool h.mu.Lock() if h.rooms[message.roomID] == nil { h.rooms[message.roomID] = make(map[string]*session) + roomCreated = true } h.rooms[message.roomID][message.connectionID] = newSession h.connectionsByID[message.connectionID] = newSession h.mu.Unlock() + if roomCreated { + h.config.metrics.RoomCreated(message.roomID) + } newSession.attachWS(message.transport, h, nil) + h.config.metrics.ConnectionOpened(message.roomID, message.connectionID) h.config.logger.Debug("wspulse: session connected", zap.String("conn_id", message.connectionID), @@ -246,7 +255,7 @@ func (h *hub) handleTransportDied(message transportDiedMessage) { h.config.logger.Debug("wspulse: transport-died for closed session, cleaning up", zap.String("conn_id", target.id), ) - h.disconnectSession(target, message.err) + h.disconnectSession(target, message.err, DisconnectNormal) } else { h.config.logger.Debug("wspulse: transport-died for unregistered closed session, skipping", zap.String("conn_id", target.id), @@ -282,7 +291,7 @@ func (h *hub) handleTransportDied(message transportDiedMessage) { h.config.logger.Info("wspulse: suspended session closed by application (race path)", zap.String("conn_id", target.id), ) - h.disconnectSession(target, nil) + h.disconnectSession(target, nil, DisconnectNormal) return } target.graceTimer = timer @@ -303,7 +312,7 @@ func (h *hub) handleTransportDied(message transportDiedMessage) { zap.String("conn_id", target.id), zap.Error(message.err), ) - h.disconnectSession(target, message.err) + h.disconnectSession(target, message.err, DisconnectNormal) } // handleGraceExpired destroys a session whose resume window has elapsed @@ -345,12 +354,13 @@ func (h *hub) handleGraceExpired(message graceExpiredMessage) { h.config.logger.Info("wspulse: session expired", zap.String("conn_id", target.id), ) + h.disconnectSession(target, nil, DisconnectGraceExpired) } else { h.config.logger.Info("wspulse: suspended session closed by application", zap.String("conn_id", target.id), ) + h.disconnectSession(target, nil, DisconnectNormal) } - h.disconnectSession(target, nil) } // handleKick removes a session, closes it, and fires onDisconnect. @@ -372,7 +382,7 @@ func (h *hub) handleKick(request kickRequest) { zap.String("conn_id", request.connectionID), ) - h.disconnectSession(target, nil) + h.disconnectSession(target, nil, DisconnectKick) request.result <- nil } @@ -395,6 +405,7 @@ func (h *hub) handleBroadcast(message broadcastMessage) { return } + enqueued := 0 for _, target := range h.scratch { select { case <-target.done: @@ -405,10 +416,12 @@ func (h *hub) handleBroadcast(message broadcastMessage) { // Use enqueue with drop-oldest so suspended sessions buffer to // resumeBuffer and connected sessions apply backpressure uniformly. _ = target.enqueue(message.data, true) + enqueued++ } + h.config.metrics.MessageBroadcast(message.roomID, len(message.data), enqueued) h.config.logger.Debug("wspulse: broadcast dispatched", zap.String("room_id", message.roomID), - zap.Int("recipients", len(h.scratch)), + zap.Int("recipients", enqueued), ) // Clear pointers so disconnected sessions can be GC'd. @@ -422,8 +435,9 @@ func (h *hub) handleBroadcast(message broadcastMessage) { // disconnectSession removes the session from hub maps, closes it, and fires // onDisconnect. Safe to call even if Close() was already called externally // (closeOnce makes it idempotent). -func (h *hub) disconnectSession(target *session, err error) { +func (h *hub) disconnectSession(target *session, err error, reason DisconnectReason) { h.removeSession(target) + h.config.metrics.ConnectionClosed(target.roomID, target.id, time.Since(target.connectedAt), reason) _ = target.Close() if fn := h.config.onDisconnect; fn != nil { go fn(target, err) @@ -434,17 +448,22 @@ func (h *hub) disconnectSession(target *session, err error) { func (h *hub) removeSession(target *session) { target.cancelGraceTimer() + var roomDestroyed bool h.mu.Lock() if room := h.rooms[target.roomID]; room != nil && room[target.id] == target { delete(room, target.id) if len(room) == 0 { delete(h.rooms, target.roomID) + roomDestroyed = true } } if h.connectionsByID[target.id] == target { delete(h.connectionsByID, target.id) } h.mu.Unlock() + if roomDestroyed { + h.config.metrics.RoomDestroyed(target.roomID) + } } // shutdown closes every active session. Called once by run() when done fires. @@ -463,17 +482,35 @@ func (h *hub) shutdown() { h.config.logger.Info("wspulse: hub shutting down", zap.Int("active_sessions", sessionCount), ) - for _, room := range h.rooms { + type closedInfo struct { + roomID string + connectionID string + duration time.Duration + } + var closedInfos []closedInfo + var destroyedRooms []string + for roomID, room := range h.rooms { for _, target := range room { target.cancelGraceTimer() + closedInfos = append(closedInfos, closedInfo{target.roomID, target.id, time.Since(target.connectedAt)}) _ = target.Close() disconnected = append(disconnected, target) } + destroyedRooms = append(destroyedRooms, roomID) } h.rooms = make(map[string]map[string]*session) h.connectionsByID = make(map[string]*session) h.mu.Unlock() + // Emit metrics outside the lock to avoid deadlocks if the + // MetricsCollector calls back into server APIs. + for _, info := range closedInfos { + h.config.metrics.ConnectionClosed(info.roomID, info.connectionID, info.duration, DisconnectServerClose) + } + for _, roomID := range destroyedRooms { + h.config.metrics.RoomDestroyed(roomID) + } + // Drain in-flight register messages to prevent goroutine leaks. for { select { diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..98cb7dc --- /dev/null +++ b/metrics.go @@ -0,0 +1,164 @@ +package wspulse + +import "time" + +// DisconnectReason describes why a connection was closed. +// Used as a label-friendly parameter in ConnectionClosed to let metrics +// backends (Prometheus, OTel, etc.) distinguish disconnect causes. +type DisconnectReason string + +const ( + // DisconnectNormal indicates the connection was closed normally: + // the transport died (no resume configured), or the application + // called Connection.Close() on a suspended session. + DisconnectNormal DisconnectReason = "normal" + + // DisconnectKick indicates the connection was terminated by + // an explicit Server.Kick() call. + DisconnectKick DisconnectReason = "kick" + + // DisconnectGraceExpired indicates the resume window elapsed + // without the client reconnecting. + DisconnectGraceExpired DisconnectReason = "grace_expired" + + // DisconnectServerClose indicates the connection was terminated + // because Server.Close() shut down the server. + DisconnectServerClose DisconnectReason = "server_close" + + // DisconnectDuplicate indicates the connection was replaced by + // a new connection with the same connectionID. + DisconnectDuplicate DisconnectReason = "duplicate" +) + +// MetricsCollector defines instrumentation hooks for wspulse server. +// Each method corresponds to a single lifecycle or throughput event. +// +// Implementations must be safe for concurrent use. Methods are called from +// the hub goroutine, readPump goroutines, and writePump goroutines. +// +// All methods are fire-and-forget: they do not return values. If the +// underlying metrics backend encounters an error, the implementation +// should handle it internally (e.g. log and skip). +// +// Hooks are invoked synchronously on hot paths; implementations must +// return quickly and must not panic. Implementations must not call back +// into the same Server synchronously (e.g. Kick, Send, Broadcast) as +// this can deadlock the hub event loop. +// +// For forward-compatible custom implementations, embed NoopCollector: +// +// type MyCollector struct { +// wspulse.NoopCollector // provides no-op defaults for future methods +// } +// func (c *MyCollector) ConnectionOpened(roomID, connectionID string) { +// // custom implementation +// } +// +// This ensures new methods added to MetricsCollector in future +// versions are automatically satisfied by the embedded no-op defaults. +type MetricsCollector interface { + // ConnectionOpened is called when a new session is created and registered. + ConnectionOpened(roomID, connectionID string) + + // ConnectionClosed is called when a session is terminated. duration is the + // total logical session lifetime from creation to destruction, including + // any time spent in the suspended state. reason indicates why the session + // was closed (see DisconnectReason constants). + ConnectionClosed(roomID, connectionID string, duration time.Duration, reason DisconnectReason) + + // ResumeAttempt is called when a suspended session is successfully resumed + // by a reconnecting client. In the current implementation, success is + // always true: a failed resume (reconnect after grace expiry) results in + // a new session via ConnectionOpened rather than an identifiable + // failed-resume event. The success parameter is reserved for future use + // when failed resume detection is implemented. + ResumeAttempt(roomID, connectionID string, success bool) + + // RoomCreated is called when the first connection joins a room, + // causing the room to be allocated. + RoomCreated(roomID string) + + // RoomDestroyed is called when the last connection leaves a room, + // causing the room to be deallocated. + RoomDestroyed(roomID string) + + // MessageReceived is called in the readPump when an inbound WebSocket + // message is read, before decoding. sizeBytes is the raw wire size. + MessageReceived(roomID string, sizeBytes int) + + // MessageBroadcast is called after a broadcast frame is fanned out to + // all sessions in a room. sizeBytes is the pre-encoded frame size; + // fanOut is the number of recipient sessions (including suspended ones). + MessageBroadcast(roomID string, sizeBytes int, fanOut int) + + // MessageSent is called in the writePump after a frame is successfully + // written to the WebSocket transport. sizeBytes is the encoded frame size. + MessageSent(roomID, connectionID string, sizeBytes int) + + // FrameDropped is called whenever a frame is discarded due to send buffer + // backpressure. This covers drops from Send (buffer full), Broadcast + // (drop-oldest), and resume drain (buffer full during replay). + // In the drop-oldest path, two FrameDropped events may fire: one for the + // oldest frame evicted and one for the new frame if it still cannot be + // enqueued — both represent real frame loss. + FrameDropped(roomID, connectionID string) + + // SendBufferUtilization is called in the writePump after every successful + // write. used and capacity report the current send channel occupancy. + // + // This method is called once per message write. For high-throughput + // connections (e.g. 10k msg/s), expect the same call rate per connection. + // Implementations should apply sampling, batching, or throttling as needed. + SendBufferUtilization(roomID, connectionID string, used, capacity int) + + // PongTimeout is called in the readPump when a read deadline expires, + // indicating the remote peer failed to respond to a Ping in time. + PongTimeout(roomID, connectionID string) +} + +// NoopCollector is the default MetricsCollector that discards all events. +// All methods are value-receiver no-ops with minimal overhead. +// +// Embed NoopCollector in custom implementations for forward-compatible +// additions to the MetricsCollector interface: +// +// type MyCollector struct { +// wspulse.NoopCollector +// } +type NoopCollector struct{} + +// compile-time check: NoopCollector must satisfy MetricsCollector. +var _ MetricsCollector = NoopCollector{} + +// ConnectionOpened is a no-op. See MetricsCollector.ConnectionOpened. +func (NoopCollector) ConnectionOpened(_, _ string) {} + +// ConnectionClosed is a no-op. See MetricsCollector.ConnectionClosed. +func (NoopCollector) ConnectionClosed(_, _ string, _ time.Duration, _ DisconnectReason) {} + +// ResumeAttempt is a no-op. See MetricsCollector.ResumeAttempt. +func (NoopCollector) ResumeAttempt(_, _ string, _ bool) {} + +// RoomCreated is a no-op. See MetricsCollector.RoomCreated. +func (NoopCollector) RoomCreated(_ string) {} + +// RoomDestroyed is a no-op. See MetricsCollector.RoomDestroyed. +func (NoopCollector) RoomDestroyed(_ string) {} + +// MessageReceived is a no-op. See MetricsCollector.MessageReceived. +func (NoopCollector) MessageReceived(_ string, _ int) {} + +// MessageBroadcast is a no-op. See MetricsCollector.MessageBroadcast. +func (NoopCollector) MessageBroadcast(_ string, _ int, _ int) {} + +// MessageSent is a no-op. See MetricsCollector.MessageSent. +func (NoopCollector) MessageSent(_, _ string, _ int) {} + +// FrameDropped is a no-op. See MetricsCollector.FrameDropped. +func (NoopCollector) FrameDropped(_, _ string) {} + +// SendBufferUtilization is a no-op. See MetricsCollector.SendBufferUtilization. +func (NoopCollector) SendBufferUtilization(_, _ string, _, _ int) {} + +// PongTimeout is a no-op. See MetricsCollector.PongTimeout. +func (NoopCollector) PongTimeout(_, _ string) {} diff --git a/metrics_integration_test.go b/metrics_integration_test.go new file mode 100644 index 0000000..3e4a67c --- /dev/null +++ b/metrics_integration_test.go @@ -0,0 +1,537 @@ +//go:build integration + +package wspulse_test + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/gorilla/websocket" + + wspulse "github.com/wspulse/server" +) + +func (r *recordingCollector) snapshot() []metricsEvent { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]metricsEvent, len(r.events)) + copy(out, r.events) + return out +} + +func (r *recordingCollector) countByName(name string) int { + events := r.snapshot() + n := 0 + for _, e := range events { + if e.name == name { + n++ + } + } + return n +} + +func (r *recordingCollector) eventsByName(name string) []metricsEvent { + events := r.snapshot() + var out []metricsEvent + for _, e := range events { + if e.name == name { + out = append(out, e) + } + } + return out +} + +func TestIntegration_MetricsCollector_ConnectionLifecycle(t *testing.T) { + rec := &recordingCollector{} + connected := make(chan struct{}, 4) + disconnected := make(chan struct{}, 4) + + srv := wspulse.NewServer( + func(r *http.Request) (string, string, error) { + return "metrics-room", "", nil + }, + wspulse.WithMetrics(rec), + wspulse.WithOnConnect(func(_ wspulse.Connection) { + connected <- struct{}{} + }), + wspulse.WithOnDisconnect(func(_ wspulse.Connection, _ error) { + disconnected <- struct{}{} + }), + ) + ts := httptest.NewServer(srv) + defer func() { + srv.Close() + ts.Close() + }() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + dialer := websocket.Dialer{HandshakeTimeout: 3 * time.Second} + + // Open connection. + c, resp, err := dialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Dial: %v", err) + } + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + select { + case <-connected: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for connect") + } + + if n := rec.countByName("RoomCreated"); n != 1 { + t.Errorf("RoomCreated: want 1, got %d", n) + } + if n := rec.countByName("ConnectionOpened"); n != 1 { + t.Errorf("ConnectionOpened: want 1, got %d", n) + } + + // Close connection. + _ = c.Close() + select { + case <-disconnected: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for disconnect") + } + + if n := rec.countByName("ConnectionClosed"); n != 1 { + t.Errorf("ConnectionClosed: want 1, got %d", n) + } + if n := rec.countByName("RoomDestroyed"); n != 1 { + t.Errorf("RoomDestroyed: want 1, got %d", n) + } + + // Verify ConnectionClosed has non-negative duration and normal reason. + for _, e := range rec.eventsByName("ConnectionClosed") { + if e.duration < 0 { + t.Errorf("ConnectionClosed duration should be >= 0, got %v", e.duration) + } + if e.reason != wspulse.DisconnectNormal { + t.Errorf("ConnectionClosed reason = %q, want %q", e.reason, wspulse.DisconnectNormal) + } + } +} + +func TestIntegration_MetricsCollector_MessageFlow(t *testing.T) { + rec := &recordingCollector{} + connected := make(chan struct{}, 4) + broadcastDone := make(chan struct{}, 1) + + var srv wspulse.Server + srv = wspulse.NewServer( + func(r *http.Request) (string, string, error) { + return "metrics-room", "", nil + }, + wspulse.WithMetrics(rec), + wspulse.WithOnConnect(func(_ wspulse.Connection) { + connected <- struct{}{} + }), + wspulse.WithOnMessage(func(conn wspulse.Connection, f wspulse.Frame) { + _ = srv.Broadcast(conn.RoomID(), f) + select { + case broadcastDone <- struct{}{}: + default: + } + }), + ) + ts := httptest.NewServer(srv) + defer func() { + srv.Close() + ts.Close() + }() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + dialer := websocket.Dialer{HandshakeTimeout: 3 * time.Second} + + // Open 2 connections. + c1, resp1, err := dialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Dial c1: %v", err) + } + if resp1 != nil && resp1.Body != nil { + resp1.Body.Close() + } + c2, resp2, err := dialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Dial c2: %v", err) + } + if resp2 != nil && resp2.Body != nil { + resp2.Body.Close() + } + defer c1.Close() + defer c2.Close() + for i := 0; i < 2; i++ { + select { + case <-connected: + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for connection %d", i+1) + } + } + + // Send a message from c1. + if err := c1.WriteMessage(websocket.TextMessage, []byte(`{"event":"test"}`)); err != nil { + t.Fatalf("write: %v", err) + } + + // Wait for broadcast to complete. + select { + case <-broadcastDone: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for broadcast") + } + + // Read broadcast on both clients to ensure writePump sent them (MessageSent fires). + c1.SetReadDeadline(time.Now().Add(3 * time.Second)) + c2.SetReadDeadline(time.Now().Add(3 * time.Second)) + _, _, _ = c1.ReadMessage() + _, _, _ = c2.ReadMessage() + + if n := rec.countByName("MessageReceived"); n != 1 { + t.Errorf("MessageReceived: want 1, got %d", n) + } + if n := rec.countByName("MessageBroadcast"); n != 1 { + t.Errorf("MessageBroadcast: want 1, got %d", n) + } + // 2 connections in room → 2 MessageSent. + if n := rec.countByName("MessageSent"); n != 2 { + t.Errorf("MessageSent: want 2, got %d", n) + } + // SendBufferUtilization fires after each MessageSent. + if n := rec.countByName("SendBufferUtilization"); n != 2 { + t.Errorf("SendBufferUtilization: want 2, got %d", n) + } +} + +func TestIntegration_MetricsCollector_ResumeAttempt(t *testing.T) { + rec := &recordingCollector{} + connected := make(chan struct{}, 4) + transportDrop := make(chan struct{}, 4) + transportRestore := make(chan struct{}, 4) + + srv := wspulse.NewServer( + func(r *http.Request) (string, string, error) { + return "resume-room", "resume-conn", nil + }, + wspulse.WithMetrics(rec), + wspulse.WithResumeWindow(5*time.Second), + wspulse.WithOnConnect(func(_ wspulse.Connection) { + connected <- struct{}{} + }), + wspulse.WithOnTransportDrop(func(_ wspulse.Connection, _ error) { + transportDrop <- struct{}{} + }), + wspulse.WithOnTransportRestore(func(_ wspulse.Connection) { + transportRestore <- struct{}{} + }), + ) + ts := httptest.NewServer(srv) + defer func() { + srv.Close() + ts.Close() + }() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + dialer := websocket.Dialer{HandshakeTimeout: 3 * time.Second} + + // Initial connection. + c1, resp1, err := dialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Dial: %v", err) + } + if resp1 != nil && resp1.Body != nil { + resp1.Body.Close() + } + select { + case <-connected: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for connect") + } + + if n := rec.countByName("ConnectionOpened"); n != 1 { + t.Errorf("ConnectionOpened: want 1, got %d", n) + } + + // Drop transport. + _ = c1.Close() + select { + case <-transportDrop: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for transport drop") + } + + // Reconnect with same ID → resume. + c2, resp2, err := dialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Reconnect Dial: %v", err) + } + if resp2 != nil && resp2.Body != nil { + resp2.Body.Close() + } + defer c2.Close() + + select { + case <-transportRestore: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for transport restore") + } + + if n := rec.countByName("ResumeAttempt"); n != 1 { + t.Errorf("ResumeAttempt: want 1, got %d", n) + } + // Should NOT fire a second ConnectionOpened (session was resumed, not recreated). + if n := rec.countByName("ConnectionOpened"); n != 1 { + t.Errorf("ConnectionOpened after resume: want 1, got %d", n) + } + + events := rec.eventsByName("ResumeAttempt") + if len(events) > 0 && !events[0].success { + t.Errorf("ResumeAttempt success = false, want true") + } +} + +func TestIntegration_MetricsCollector_FrameDropped_SendFull(t *testing.T) { + rec := &recordingCollector{} + connected := make(chan wspulse.Connection, 1) + + srv := wspulse.NewServer( + func(r *http.Request) (string, string, error) { + return "drop-room", "drop-conn", nil + }, + wspulse.WithMetrics(rec), + wspulse.WithSendBufferSize(1), + wspulse.WithOnConnect(func(c wspulse.Connection) { + connected <- c + }), + ) + ts := httptest.NewServer(srv) + defer func() { + srv.Close() + ts.Close() + }() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + dialer := websocket.Dialer{HandshakeTimeout: 3 * time.Second} + c, resp, err := dialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Dial: %v", err) + } + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + defer c.Close() + + var conn wspulse.Connection + select { + case conn = <-connected: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for connect") + } + + // Rapid-fire send to trigger at least one buffer-full drop. + // With send buffer size 1, the writePump may drain between individual + // sends, so we loop until at least one ErrSendBufferFull is observed. + frame := wspulse.Frame{Event: "fill", Payload: []byte(`{}`)} + var gotBufferFull bool + for i := 0; i < 200; i++ { + if sendErr := conn.Send(frame); sendErr == wspulse.ErrSendBufferFull { + gotBufferFull = true + break + } + } + if !gotBufferFull { + t.Fatal("expected at least one ErrSendBufferFull in 200 rapid sends") + } + + if n := rec.countByName("FrameDropped"); n < 1 { + t.Errorf("FrameDropped: want >= 1, got %d", n) + } +} + +func TestIntegration_MetricsCollector_FrameDropped_BroadcastDropOldest(t *testing.T) { + rec := &recordingCollector{} + connected := make(chan struct{}, 1) + + srv := wspulse.NewServer( + func(r *http.Request) (string, string, error) { + return "drop-room", "drop-conn", nil + }, + wspulse.WithMetrics(rec), + wspulse.WithSendBufferSize(1), + wspulse.WithOnConnect(func(_ wspulse.Connection) { + select { + case connected <- struct{}{}: + default: + } + }), + ) + ts := httptest.NewServer(srv) + defer func() { + srv.Close() + ts.Close() + }() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + dialer := websocket.Dialer{HandshakeTimeout: 3 * time.Second} + c, resp, err := dialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Dial: %v", err) + } + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + defer c.Close() + + select { + case <-connected: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for connect") + } + + frame := wspulse.Frame{Event: "fill", Payload: []byte(`{}`)} + // Fill buffer via broadcast (drop-oldest path). + _ = srv.Broadcast("drop-room", frame) + // Second broadcast triggers drop-oldest on the full buffer. + _ = srv.Broadcast("drop-room", frame) + // Third broadcast to ensure at least one drop-oldest fires. + _ = srv.Broadcast("drop-room", frame) + + // Poll until FrameDropped is observed instead of sleeping. + deadline := time.Now().Add(3 * time.Second) + for { + if n := rec.countByName("FrameDropped"); n >= 1 { + break + } + if time.Now().After(deadline) { + t.Fatalf("timed out waiting for FrameDropped, got %d", rec.countByName("FrameDropped")) + } + time.Sleep(5 * time.Millisecond) + } +} + +func TestIntegration_MetricsCollector_PongTimeout(t *testing.T) { + rec := &recordingCollector{} + disconnected := make(chan struct{}, 1) + + srv := wspulse.NewServer( + func(r *http.Request) (string, string, error) { + return "timeout-room", "timeout-conn", nil + }, + wspulse.WithMetrics(rec), + // Very short pongWait so the test doesn't take long. + wspulse.WithHeartbeat(50*time.Millisecond, 100*time.Millisecond), + wspulse.WithOnDisconnect(func(_ wspulse.Connection, _ error) { + select { + case disconnected <- struct{}{}: + default: + } + }), + ) + ts := httptest.NewServer(srv) + defer func() { + srv.Close() + ts.Close() + }() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + // Use a raw dialer that does NOT respond to pings. + dialer := websocket.Dialer{HandshakeTimeout: 3 * time.Second} + c, resp, err := dialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Dial: %v", err) + } + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + + // Disable the default pong handler so the server's pongWait expires. + c.SetPingHandler(func(string) error { return nil }) + // Must read to process control frames; the read will eventually fail. + go func() { + for { + _, _, err := c.ReadMessage() + if err != nil { + return + } + } + }() + + select { + case <-disconnected: + case <-time.After(3 * time.Second): + t.Fatal("timed out waiting for pong timeout disconnect") + } + _ = c.Close() + + if n := rec.countByName("PongTimeout"); n != 1 { + t.Errorf("PongTimeout: want 1, got %d", n) + } +} + +func TestIntegration_MetricsCollector_Shutdown(t *testing.T) { + rec := &recordingCollector{} + connected := make(chan struct{}, 4) + + srv := wspulse.NewServer( + func(r *http.Request) (string, string, error) { + return "shutdown-room", "", nil + }, + wspulse.WithMetrics(rec), + wspulse.WithOnConnect(func(_ wspulse.Connection) { + connected <- struct{}{} + }), + ) + ts := httptest.NewServer(srv) + defer ts.Close() + + wsURL := "ws" + strings.TrimPrefix(ts.URL, "http") + dialer := websocket.Dialer{HandshakeTimeout: 3 * time.Second} + + // Open 2 connections. + c1, resp1, err := dialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Dial c1: %v", err) + } + if resp1 != nil && resp1.Body != nil { + resp1.Body.Close() + } + c2, resp2, err := dialer.Dial(wsURL, nil) + if err != nil { + t.Fatalf("Dial c2: %v", err) + } + if resp2 != nil && resp2.Body != nil { + resp2.Body.Close() + } + defer c1.Close() + defer c2.Close() + for i := 0; i < 2; i++ { + select { + case <-connected: + case <-time.After(3 * time.Second): + t.Fatalf("timed out waiting for connection %d", i+1) + } + } + + // Shutdown the server. + srv.Close() + + // Verify ConnectionClosed fired for both connections with server_close reason. + closedEvents := rec.eventsByName("ConnectionClosed") + if len(closedEvents) != 2 { + t.Fatalf("ConnectionClosed: want 2, got %d", len(closedEvents)) + } + for _, e := range closedEvents { + if e.reason != wspulse.DisconnectServerClose { + t.Errorf("ConnectionClosed reason = %q, want %q", e.reason, wspulse.DisconnectServerClose) + } + } + + // Verify RoomDestroyed fired. + if n := rec.countByName("RoomDestroyed"); n != 1 { + t.Errorf("RoomDestroyed: want 1, got %d", n) + } +} diff --git a/metrics_test.go b/metrics_test.go new file mode 100644 index 0000000..78796e5 --- /dev/null +++ b/metrics_test.go @@ -0,0 +1,130 @@ +package wspulse_test + +import ( + "net/http" + "sync" + "testing" + "time" + + wspulse "github.com/wspulse/server" +) + +// ── NoopCollector ───────────────────────────────────────────────────────────── + +func TestNoopCollector_ImplementsInterface(t *testing.T) { + t.Parallel() + // Compile-time check already exists in metrics.go; this test documents intent. + var _ wspulse.MetricsCollector = wspulse.NoopCollector{} +} + +func TestNoopCollector_AllMethodsCallable(t *testing.T) { + t.Parallel() + var c wspulse.NoopCollector + c.ConnectionOpened("room", "conn") + c.ConnectionClosed("room", "conn", time.Second, wspulse.DisconnectNormal) + c.ResumeAttempt("room", "conn", true) + c.RoomCreated("room") + c.RoomDestroyed("room") + c.MessageReceived("room", 100) + c.MessageBroadcast("room", 100, 5) + c.MessageSent("room", "conn", 100) + c.FrameDropped("room", "conn") + c.SendBufferUtilization("room", "conn", 10, 256) + c.PongTimeout("room", "conn") +} + +// ── WithMetrics option ──────────────────────────────────────────────────────── + +func TestWithMetrics_NilPanics(t *testing.T) { + t.Parallel() + defer func() { + if r := recover(); r == nil { + t.Error("expected panic for nil collector") + } + }() + _ = wspulse.WithMetrics(nil) +} + +func TestWithMetrics_DefaultIsNoop(t *testing.T) { + t.Parallel() + // Server should start and close cleanly without WithMetrics (uses NoopCollector). + srv := wspulse.NewServer(func(_ *http.Request) (string, string, error) { + return "room", "conn", nil + }) + srv.Close() +} + +func TestWithMetrics_CustomCollector_Accepted(t *testing.T) { + t.Parallel() + srv := wspulse.NewServer( + func(_ *http.Request) (string, string, error) { + return "room", "", nil + }, + wspulse.WithMetrics(&recordingCollector{}), + ) + t.Cleanup(srv.Close) +} + +// ── recordingCollector ──────────────────────────────────────────────────────── + +// recordingCollector is a test helper that records all metrics calls. +// Safe for concurrent use. +type recordingCollector struct { + mu sync.Mutex + events []metricsEvent +} + +type metricsEvent struct { + name string + roomID string + connectionID string + sizeBytes int + fanOut int + duration time.Duration + success bool + used int + capacity int + reason wspulse.DisconnectReason +} + +func (r *recordingCollector) record(e metricsEvent) { + r.mu.Lock() + r.events = append(r.events, e) + r.mu.Unlock() +} + +func (r *recordingCollector) ConnectionOpened(roomID, connectionID string) { + r.record(metricsEvent{name: "ConnectionOpened", roomID: roomID, connectionID: connectionID}) +} +func (r *recordingCollector) ConnectionClosed(roomID, connectionID string, duration time.Duration, reason wspulse.DisconnectReason) { + r.record(metricsEvent{name: "ConnectionClosed", roomID: roomID, connectionID: connectionID, duration: duration, reason: reason}) +} +func (r *recordingCollector) ResumeAttempt(roomID, connectionID string, success bool) { + r.record(metricsEvent{name: "ResumeAttempt", roomID: roomID, connectionID: connectionID, success: success}) +} +func (r *recordingCollector) RoomCreated(roomID string) { + r.record(metricsEvent{name: "RoomCreated", roomID: roomID}) +} +func (r *recordingCollector) RoomDestroyed(roomID string) { + r.record(metricsEvent{name: "RoomDestroyed", roomID: roomID}) +} +func (r *recordingCollector) MessageReceived(roomID string, sizeBytes int) { + r.record(metricsEvent{name: "MessageReceived", roomID: roomID, sizeBytes: sizeBytes}) +} +func (r *recordingCollector) MessageBroadcast(roomID string, sizeBytes int, fanOut int) { + r.record(metricsEvent{name: "MessageBroadcast", roomID: roomID, sizeBytes: sizeBytes, fanOut: fanOut}) +} +func (r *recordingCollector) MessageSent(roomID, connectionID string, sizeBytes int) { + r.record(metricsEvent{name: "MessageSent", roomID: roomID, connectionID: connectionID, sizeBytes: sizeBytes}) +} +func (r *recordingCollector) FrameDropped(roomID, connectionID string) { + r.record(metricsEvent{name: "FrameDropped", roomID: roomID, connectionID: connectionID}) +} +func (r *recordingCollector) SendBufferUtilization(roomID, connectionID string, used, capacity int) { + r.record(metricsEvent{name: "SendBufferUtilization", roomID: roomID, connectionID: connectionID, used: used, capacity: capacity}) +} +func (r *recordingCollector) PongTimeout(roomID, connectionID string) { + r.record(metricsEvent{name: "PongTimeout", roomID: roomID, connectionID: connectionID}) +} + +var _ wspulse.MetricsCollector = (*recordingCollector)(nil) diff --git a/options.go b/options.go index 54ce530..0ea734b 100644 --- a/options.go +++ b/options.go @@ -52,6 +52,7 @@ type serverConfig struct { clock clock upgraderReadBufferSize int upgraderWriteBufferSize int + metrics MetricsCollector } func defaultConfig(connect ConnectFunc) *serverConfig { @@ -69,6 +70,7 @@ func defaultConfig(connect ConnectFunc) *serverConfig { clock: realClock{}, upgraderReadBufferSize: 1024, upgraderWriteBufferSize: 1024, + metrics: NoopCollector{}, } } @@ -250,3 +252,13 @@ func WithUpgraderBufferSize(readSize, writeSize int) ServerOption { c.upgraderWriteBufferSize = writeSize } } + +// WithMetrics configures the MetricsCollector used by the Server. +// Defaults to NoopCollector{} if not set. +// Panics if collector is nil. +func WithMetrics(collector MetricsCollector) ServerOption { + if collector == nil { + panic("wspulse: WithMetrics: collector must not be nil") + } + return func(c *serverConfig) { c.metrics = collector } +} diff --git a/resume.go b/resume.go index cb3fc14..cb37e5f 100644 --- a/resume.go +++ b/resume.go @@ -23,16 +23,18 @@ func newRingBuffer(capacity int) *ringBuffer { // Push appends data to the buffer. If the buffer is full, the oldest // element is dropped (drop-oldest backpressure, matching the broadcast -// strategy used for the send channel). -func (rb *ringBuffer) Push(data []byte) { +// strategy used for the send channel). Returns true if an element was +// dropped to make room. +func (rb *ringBuffer) Push(data []byte) (dropped bool) { if rb.size < rb.cap { index := (rb.head + rb.size) % rb.cap rb.data[index] = data rb.size++ - } else { - rb.data[rb.head] = data - rb.head = (rb.head + 1) % rb.cap + return false } + rb.data[rb.head] = data + rb.head = (rb.head + 1) % rb.cap + return true } // Drain returns all buffered frames in FIFO order and resets the buffer. diff --git a/session.go b/session.go index f5ef9ea..32194c6 100644 --- a/session.go +++ b/session.go @@ -1,6 +1,8 @@ package wspulse import ( + "errors" + "net" "runtime/debug" "sync" "time" @@ -73,6 +75,8 @@ type session struct { resumeBuffer *ringBuffer // nil when resume is disabled suspendEpoch uint64 // monotonically increases on each detachWS; stale grace timers compare this + connectedAt time.Time // session creation time; written once, read-only thereafter + closeOnce sync.Once config *serverConfig } @@ -115,11 +119,18 @@ func (s *session) enqueue(data []byte, dropOldest bool) error { // Check if we need to buffer (suspended state). s.mu.Lock() if s.state == stateSuspended && s.resumeBuffer != nil { - s.resumeBuffer.Push(data) + dropped := s.resumeBuffer.Push(data) s.mu.Unlock() - s.config.logger.Debug("wspulse: frame buffered to resumeBuffer", - zap.String("conn_id", s.id), - ) + if dropped { + s.config.metrics.FrameDropped(s.roomID, s.id) + s.config.logger.Debug("wspulse: oldest frame dropped from resumeBuffer (backpressure)", + zap.String("conn_id", s.id), + ) + } else { + s.config.logger.Debug("wspulse: frame buffered to resumeBuffer", + zap.String("conn_id", s.id), + ) + } return nil } s.mu.Unlock() @@ -135,6 +146,7 @@ func (s *session) enqueue(data []byte, dropOldest bool) error { s.config.logger.Debug("wspulse: send buffer full, dropping frame", zap.String("conn_id", s.id), ) + s.config.metrics.FrameDropped(s.roomID, s.id) return ErrSendBufferFull } } @@ -145,6 +157,7 @@ func (s *session) enqueue(data []byte, dropOldest bool) error { s.config.logger.Debug("wspulse: oldest frame dropped from send buffer (backpressure)", zap.String("conn_id", s.id), ) + s.config.metrics.FrameDropped(s.roomID, s.id) default: } select { @@ -154,6 +167,7 @@ func (s *session) enqueue(data []byte, dropOldest bool) error { s.config.logger.Debug("wspulse: frame irrecoverably dropped: send buffer still full after drop-oldest", zap.String("conn_id", s.id), ) + s.config.metrics.FrameDropped(s.roomID, s.id) return ErrSendBufferFull } } @@ -315,6 +329,7 @@ func (s *session) attachWS(transport *websocket.Conn, h *hub, onResumeComplete f s.config.logger.Debug("wspulse: oldest frame dropped to make room for resume frame", zap.String("conn_id", s.id), ) + s.config.metrics.FrameDropped(s.roomID, s.id) default: } select { @@ -326,6 +341,7 @@ func (s *session) attachWS(transport *websocket.Conn, h *hub, onResumeComplete f s.config.logger.Warn("wspulse: resume frame dropped: send buffer still full after drop-oldest", zap.String("conn_id", s.id), ) + s.config.metrics.FrameDropped(s.roomID, s.id) } } } @@ -438,6 +454,10 @@ func (s *session) readPump(transport *websocket.Conn, h *hub) { for { _, data, err := transport.ReadMessage() if err != nil { + var ne net.Error + if errors.As(err, &ne) && ne.Timeout() { + s.config.metrics.PongTimeout(s.roomID, s.id) + } if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, @@ -452,6 +472,7 @@ func (s *session) readPump(transport *websocket.Conn, h *hub) { } return } + s.config.metrics.MessageReceived(s.roomID, len(data)) if fn := s.config.onMessage; fn != nil { frame, decodeErr := s.config.codec.Decode(data) if decodeErr != nil { @@ -512,6 +533,8 @@ func (s *session) writePump(transport *websocket.Conn, pumpQuit, pumpDone chan s s.config.logger.Warn("wspulse: write failed", zap.String("conn_id", s.id), zap.Error(err)) return } + s.config.metrics.MessageSent(s.roomID, s.id, len(data)) + s.config.metrics.SendBufferUtilization(s.roomID, s.id, len(s.send), cap(s.send)) case <-ticker.C: _ = transport.SetWriteDeadline(time.Now().Add(s.config.writeWait))