Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@

### Added

- `MetricsCollector` interface — typed instrumentation hooks for connection lifecycle, room state, throughput, backpressure, and heartbeat health
- `NoopCollector` — default minimal-overhead implementation that discards all events; embed it in custom implementations for forward-compatible additions
- `WithMetrics(mc MetricsCollector)` server option — plug in any metrics backend (Prometheus, OTel, or custom)
- `PanicError` exported type — wraps panics recovered from `OnMessage` handlers with the panic value and goroutine stack trace; delivered to `OnDisconnect` when resumption is disabled, or to `OnTransportDrop` when resumption is enabled (in which case `OnDisconnect` may later fire with a nil error); use `errors.As` on whichever callback error is non-nil to distinguish handler panics from transport failures
- `WithUpgraderBufferSize(readSize, writeSize int)` option — configures the WebSocket upgrader I/O buffer sizes (default 1024 bytes each)
- `DisconnectReason` type and constants (`DisconnectNormal`, `DisconnectKick`, `DisconnectGraceExpired`, `DisconnectServerClose`, `DisconnectDuplicate`) — `ConnectionClosed` now includes a reason parameter so metrics backends can distinguish disconnect causes
- Benchmarks for ring buffer, broadcast fan-out, direct Send throughput, and drop-oldest backpressure

### Changed

- Broadcast fan-out reuses a scratch slice on the hub instead of allocating a new snapshot per invocation — zero-alloc steady state since the hub event loop is single-threaded
- `ConnectFunc` GoDoc now documents that `roomID` is ignored on session resumption
- `Server.Close()` now emits `ConnectionClosed` (with `DisconnectServerClose` reason) and `RoomDestroyed` metrics for all sessions during shutdown

### Fixed

Expand Down
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ See [wspulse/core](https://github.com/wspulse/core) for the full `router` API.
| `WithCodec(c)` | JSONCodec |
| `WithCheckOrigin(fn)` | allow all |
| `WithLogger(l)` | zap.NewNop() — accepts `*zap.Logger` |
| `WithMetrics(mc)` | NoopCollector — accepts `MetricsCollector` |

---

Expand All @@ -156,6 +157,36 @@ See [wspulse/core](https://github.com/wspulse/core) for the full `router` API.
- **Swappable codec** — JSON by default; implement the `Codec` interface to plug in any encoding (binary, Protobuf, MessagePack, etc.).
- **Kick** — `Server.Kick(connectionID)` always destroys the session immediately, bypassing the resume window.
- **Graceful shutdown** — `Server.Close()` sends close frames to all connected clients, drains in-flight registrations, and fires `OnDisconnect` for every session.
- **Metrics** — optional `MetricsCollector` interface for observability; default is `NoopCollector` (minimal overhead). See [Metrics](#metrics) below.

---

## Metrics

wspulse/server exposes a `MetricsCollector` interface with typed hooks for connection lifecycle, room state, throughput, backpressure, and heartbeat health. The default is `NoopCollector{}` (minimal overhead). Embed `NoopCollector` in custom implementations for forward-compatible additions.

```go
// Use a contrib adapter (e.g. wspulse/metrics-prometheus or wspulse/metrics-otel),
// or implement MetricsCollector yourself.
var collector wspulse.MetricsCollector = myCollector

srv := wspulse.NewServer(connect,
wspulse.WithMetrics(collector),
)
```

The interface covers:

| Hook | Description |
| --- | --- |
| `ConnectionOpened` / `ConnectionClosed` | Session lifecycle with duration and disconnect reason |
| `RoomCreated` / `RoomDestroyed` | Room allocation and deallocation |
| `MessageReceived` / `MessageSent` / `MessageBroadcast` | Throughput with byte sizes and fan-out |
| `FrameDropped` / `SendBufferUtilization` | Backpressure visibility |
| `ResumeAttempt` | Session resumption tracking |
| `PongTimeout` | Heartbeat health |

Implement `MetricsCollector` to plug in any backend (Prometheus, OTel, or custom). See [doc/internals.md](doc/internals.md) for goroutine call sites and thread safety requirements.

---

Expand Down
16 changes: 16 additions & 0 deletions TODOS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# TODOS

## Failed resume detection: ResumeAttempt(..., false)

When a client reconnects with a connectionID that has already expired (grace
window elapsed), identify this as a "failed resume" and emit
`ResumeAttempt(roomID, connectionID, false)` before creating the new session.

The `success bool` parameter exists but is currently always `true`. Detecting
failed resumes is non-trivial: after grace expiry the session is destroyed and
the connectionID is freed. Distinguishing "new client reusing an ID" from
"client that missed the resume window" may require protocol-level signaling
(e.g. a client-sent `resume` intent header on the upgrade request).

**Depends on:** MetricsCollector interface (this branch), possibly protocol
changes in the `.github` repo.
53 changes: 53 additions & 0 deletions doc/internals.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ belong to the consumers of wspulse/server.
3. [Backpressure and Send Buffer](#3-backpressure-and-send-buffer)
4. [Connection Teardown](#4-connection-teardown)
5. [Session Resumption](#5-session-resumption)
6. [Metrics](#6-metrics)

---

Expand Down Expand Up @@ -263,3 +264,55 @@ Kick() returns nil

If the hub has already shut down (`<-hub.done`), `Kick` returns
`ErrServerClosed` without blocking.

---

## 6. Metrics

wspulse/server exposes an optional `MetricsCollector` interface for
instrumentation. The default is `NoopCollector{}`, a no-op implementation
that discards all events with minimal overhead.

### Configuration

```go
wspulse.NewServer(connect,
wspulse.WithMetrics(myCollector), // custom implementation
)
```

If `WithMetrics` is not called, the server uses `NoopCollector`.

### Interface

`MetricsCollector` defines typed methods for each lifecycle event.
All methods are fire-and-forget (no return value). Implementations
must be safe for concurrent use.

### Goroutine call sites

| Method | Called from |
| ----------------------- | ------------------- |
| `RoomCreated` | hub goroutine |
| `RoomDestroyed` | hub goroutine |
| `ConnectionOpened` | hub goroutine |
| `ConnectionClosed` | hub goroutine |
| `ResumeAttempt` | hub goroutine |
| `MessageBroadcast` | hub goroutine |
| `MessageReceived` | readPump goroutine |
| `PongTimeout` | readPump goroutine |
| `MessageSent` | writePump goroutine |
| `SendBufferUtilization` | writePump goroutine |
| `FrameDropped` | hub goroutine (broadcast), caller goroutine (Send), or transition goroutine (resume drain) |

### Connection duration

`ConnectionClosed` receives a `duration` parameter computed as
`time.Since(session.connectedAt)`. The `connectedAt` timestamp is set
once when the session is created in `handleRegister`. This means the
duration reflects the **logical session lifetime**, including any time
spent in the suspended state during session resumption.

`ConnectionClosed` also receives a `reason` parameter of type
`DisconnectReason` that indicates why the session was terminated.
See the `DisconnectReason` constants for possible values.
69 changes: 53 additions & 16 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package wspulse
import (
"sync"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
"go.uber.org/zap"
Expand Down Expand Up @@ -141,6 +142,7 @@ func (h *hub) handleRegister(message registerMessage) {
onResume = func() { fn(existing) }
}
existing.attachWS(message.transport, h, onResume)
h.config.metrics.ResumeAttempt(existing.roomID, existing.id, true)
h.config.logger.Info("wspulse: session resumed",
zap.String("conn_id", message.connectionID),
)
Expand All @@ -151,7 +153,7 @@ func (h *hub) handleRegister(message registerMessage) {
h.config.logger.Warn("wspulse: duplicate conn_id, kicking existing session",
zap.String("conn_id", message.connectionID),
)
h.disconnectSession(existing, ErrDuplicateConnectionID)
h.disconnectSession(existing, ErrDuplicateConnectionID, DisconnectDuplicate)

case stateClosed:
// Close() was called externally before the hub processed the
Expand All @@ -161,32 +163,39 @@ func (h *hub) handleRegister(message registerMessage) {
h.config.logger.Debug("wspulse: stale closed session removed",
zap.String("conn_id", message.connectionID),
)
h.disconnectSession(existing, nil)
h.disconnectSession(existing, nil, DisconnectNormal)
}
}

// Create a new session.
newSession := &session{
id: message.connectionID,
roomID: message.roomID,
send: make(chan []byte, h.config.sendBufferSize),
done: make(chan struct{}),
state: stateConnected,
config: h.config,
id: message.connectionID,
roomID: message.roomID,
send: make(chan []byte, h.config.sendBufferSize),
done: make(chan struct{}),
state: stateConnected,
connectedAt: time.Now(),
config: h.config,
}
if h.config.resumeWindow > 0 {
newSession.resumeBuffer = newRingBuffer(h.config.sendBufferSize)
}

var roomCreated bool
h.mu.Lock()
if h.rooms[message.roomID] == nil {
h.rooms[message.roomID] = make(map[string]*session)
roomCreated = true
}
h.rooms[message.roomID][message.connectionID] = newSession
h.connectionsByID[message.connectionID] = newSession
h.mu.Unlock()
if roomCreated {
h.config.metrics.RoomCreated(message.roomID)
}

newSession.attachWS(message.transport, h, nil)
h.config.metrics.ConnectionOpened(message.roomID, message.connectionID)

h.config.logger.Debug("wspulse: session connected",
zap.String("conn_id", message.connectionID),
Expand Down Expand Up @@ -246,7 +255,7 @@ func (h *hub) handleTransportDied(message transportDiedMessage) {
h.config.logger.Debug("wspulse: transport-died for closed session, cleaning up",
zap.String("conn_id", target.id),
)
h.disconnectSession(target, message.err)
h.disconnectSession(target, message.err, DisconnectNormal)
} else {
h.config.logger.Debug("wspulse: transport-died for unregistered closed session, skipping",
zap.String("conn_id", target.id),
Expand Down Expand Up @@ -282,7 +291,7 @@ func (h *hub) handleTransportDied(message transportDiedMessage) {
h.config.logger.Info("wspulse: suspended session closed by application (race path)",
zap.String("conn_id", target.id),
)
h.disconnectSession(target, nil)
h.disconnectSession(target, nil, DisconnectNormal)
return
}
target.graceTimer = timer
Expand All @@ -303,7 +312,7 @@ func (h *hub) handleTransportDied(message transportDiedMessage) {
zap.String("conn_id", target.id),
zap.Error(message.err),
)
h.disconnectSession(target, message.err)
h.disconnectSession(target, message.err, DisconnectNormal)
}

// handleGraceExpired destroys a session whose resume window has elapsed
Expand Down Expand Up @@ -345,12 +354,13 @@ func (h *hub) handleGraceExpired(message graceExpiredMessage) {
h.config.logger.Info("wspulse: session expired",
zap.String("conn_id", target.id),
)
h.disconnectSession(target, nil, DisconnectGraceExpired)
} else {
h.config.logger.Info("wspulse: suspended session closed by application",
zap.String("conn_id", target.id),
)
h.disconnectSession(target, nil, DisconnectNormal)
}
h.disconnectSession(target, nil)
}

// handleKick removes a session, closes it, and fires onDisconnect.
Expand All @@ -372,7 +382,7 @@ func (h *hub) handleKick(request kickRequest) {
zap.String("conn_id", request.connectionID),
)

h.disconnectSession(target, nil)
h.disconnectSession(target, nil, DisconnectKick)
request.result <- nil
}

Expand All @@ -395,6 +405,7 @@ func (h *hub) handleBroadcast(message broadcastMessage) {
return
}

enqueued := 0
for _, target := range h.scratch {
select {
case <-target.done:
Expand All @@ -405,10 +416,12 @@ func (h *hub) handleBroadcast(message broadcastMessage) {
// Use enqueue with drop-oldest so suspended sessions buffer to
// resumeBuffer and connected sessions apply backpressure uniformly.
_ = target.enqueue(message.data, true)
enqueued++
}
h.config.metrics.MessageBroadcast(message.roomID, len(message.data), enqueued)
h.config.logger.Debug("wspulse: broadcast dispatched",
zap.String("room_id", message.roomID),
zap.Int("recipients", len(h.scratch)),
zap.Int("recipients", enqueued),
)

// Clear pointers so disconnected sessions can be GC'd.
Expand All @@ -422,8 +435,9 @@ func (h *hub) handleBroadcast(message broadcastMessage) {
// disconnectSession removes the session from hub maps, closes it, and fires
// onDisconnect. Safe to call even if Close() was already called externally
// (closeOnce makes it idempotent).
func (h *hub) disconnectSession(target *session, err error) {
func (h *hub) disconnectSession(target *session, err error, reason DisconnectReason) {
h.removeSession(target)
h.config.metrics.ConnectionClosed(target.roomID, target.id, time.Since(target.connectedAt), reason)
_ = target.Close()
if fn := h.config.onDisconnect; fn != nil {
go fn(target, err)
Expand All @@ -434,17 +448,22 @@ func (h *hub) disconnectSession(target *session, err error) {
func (h *hub) removeSession(target *session) {
target.cancelGraceTimer()

var roomDestroyed bool
h.mu.Lock()
if room := h.rooms[target.roomID]; room != nil && room[target.id] == target {
delete(room, target.id)
if len(room) == 0 {
delete(h.rooms, target.roomID)
roomDestroyed = true
}
}
if h.connectionsByID[target.id] == target {
delete(h.connectionsByID, target.id)
}
h.mu.Unlock()
if roomDestroyed {
h.config.metrics.RoomDestroyed(target.roomID)
}
}

// shutdown closes every active session. Called once by run() when done fires.
Expand All @@ -463,17 +482,35 @@ func (h *hub) shutdown() {
h.config.logger.Info("wspulse: hub shutting down",
zap.Int("active_sessions", sessionCount),
)
for _, room := range h.rooms {
type closedInfo struct {
roomID string
connectionID string
duration time.Duration
}
var closedInfos []closedInfo
var destroyedRooms []string
for roomID, room := range h.rooms {
for _, target := range room {
target.cancelGraceTimer()
closedInfos = append(closedInfos, closedInfo{target.roomID, target.id, time.Since(target.connectedAt)})
_ = target.Close()
disconnected = append(disconnected, target)
}
destroyedRooms = append(destroyedRooms, roomID)
}
h.rooms = make(map[string]map[string]*session)
h.connectionsByID = make(map[string]*session)
h.mu.Unlock()

// Emit metrics outside the lock to avoid deadlocks if the
// MetricsCollector calls back into server APIs.
for _, info := range closedInfos {
h.config.metrics.ConnectionClosed(info.roomID, info.connectionID, info.duration, DisconnectServerClose)
}
for _, roomID := range destroyedRooms {
h.config.metrics.RoomDestroyed(roomID)
}

// Drain in-flight register messages to prevent goroutine leaks.
for {
select {
Expand Down
Loading