diff --git a/docs/PROJECT-MEMORY.md b/docs/PROJECT-MEMORY.md index 2285f0a..846477d 100644 --- a/docs/PROJECT-MEMORY.md +++ b/docs/PROJECT-MEMORY.md @@ -15,9 +15,11 @@ Stateless WebSocket router between mobile clients and pyry binaries. Internet-ex | `WSConn` adapter (`nhooyr.io/websocket.Conn` → registry `Conn`; per-conn write mutex; `Close`-cancelled context; 10s `Send` deadline) | Done (#15) | `internal/relay/ws_conn.go` | | `/healthz` JSON endpoint (`status`, `version`, `connected_binaries`, `connected_phones`, `uptime_seconds`; `Cache-Control: no-store`; unauthenticated) | Done (#10) | `internal/relay/healthz.go`, `cmd/pyrycode-relay/main.go` | | WS upgrade on `/v1/server` (header gate pre-upgrade; `ClaimServer`; `4409` on conflict; `CloseRead`-held until #6; disconnect defers a 30s grace release via `ScheduleReleaseServer`) | Done (#16, #21) | `internal/relay/server_endpoint.go`, `cmd/pyrycode-relay/main.go` | -| WS upgrade on `/v1/client` (header gate pre-upgrade; `RegisterPhone`; `4404` if no binary; `CloseRead`-held until #6; disconnect calls `UnregisterPhone`; token never parsed or logged) | Done (#5) | `internal/relay/client_endpoint.go`, `cmd/pyrycode-relay/main.go` | +| WS upgrade on `/v1/client` (header gate pre-upgrade; `RegisterPhone`; `4404` if no binary; data path via `StartPhoneForwarder`; disconnect calls `UnregisterPhone`; token never parsed or logged) | Done (#5, #25) | `internal/relay/client_endpoint.go`, `cmd/pyrycode-relay/main.go` | | Header validation (`x-pyrycode-server`, `x-pyrycode-version`, `user-agent` on `/v1/server`; `x-pyrycode-server`, `x-pyrycode-token`, `user-agent` on `/v1/client`; optional `x-pyrycode-device-name` on `/v1/client`) | Done (#16, #5) | `internal/relay/server_endpoint.go`, `internal/relay/client_endpoint.go` | -| Frame forwarding using the routing envelope | Not started | — | +| Phone-side frame forwarder (`StartPhoneForwarder`: read frame → `Marshal(connID, frame)` → `BinaryFor(serverID).Send(wrapped)`; 
synchronous; opaque inner bytes; per-frame `BinaryFor` picks up grace-window state) | Done (#25) | `internal/relay/forward.go`, `internal/relay/client_endpoint.go` | +| `WSConn.Read` (single-caller; no `closeCtx` join — underlying `Close` aborts in-flight reads) | Done (#25) | `internal/relay/ws_conn.go` | +| Binary-side frame forwarder | Not started | — | | `conn_id` generation scheme | Not started | — | | Threat model doc — operational surface (deploy, supply chain, DoS, log hygiene, cert handling, TLS, error leakage) | Done (#11) | `docs/threat-model.md` | @@ -39,7 +41,9 @@ Stateless WebSocket router between mobile clients and pyry binaries. Internet-ex - **Policy values live at the wiring site, not as package-level constants.** `30*time.Second` for the grace window is a literal in `cmd/pyrycode-relay/main.go`, threaded into `ServerHandler` as a constructor parameter — it appears exactly once, the value is policy (matches protocol spec), and inlining keeps the protocol-spec linkage visible where the relay is composed. Tests pass ms-scale durations through the same parameter. Promote to a package-level constant only when a second wiring entry point needs the same value. Adopted in `/v1/server` grace duration (#21). - **Pointer-identity for stale `time.AfterFunc` fires.** `time.Timer.Stop()` returns false if the timer's func has already started executing. Wrap each pending timer in a small struct and store the wrapper pointer in a map; the `AfterFunc` closure captures the wrapper pointer and asserts `map[key] == self` under the lock before acting. If a faster goroutine replaced the entry, the pointer no longer matches and the closure no-ops. Capturing the `*time.Timer` directly forces a self-referential local var (assigned after `AfterFunc` returns) which trips the race detector under stress; the wrapper avoids that. Adopted in `Registry.ScheduleReleaseServer` (#20). Same shape applies to any "one cancellable timer per key" pattern. 
- **Credentials the relay does not validate are presence-checked then discarded — never logged, never put in error strings.** `/v1/client`'s `X-Pyrycode-Token` is opaque to the relay (the binary owns verification). The handler reads the token into a local string, branches on `!= ""`, and lets the local go out of scope unread. The log-event field set is enumerated explicitly in the doc; the token name does not appear in any `slog` call, `fmt.Errorf`, or response body. Defence is layered: spec, code review, and the structural absence of any code path that uses the value after the gate. Same posture extends to any future "courier credential" the relay carries but does not own (e.g. session resume tokens). Adopted in `/v1/client` (#5). -- **Hold long-lived WS handlers open with `c.CloseRead(r.Context())` plus `<-readCtx.Done()` until the real read loop lands.** `CloseRead` drains-and-discards frames (including control frames — pings must be processed for the connection to observe a peer-side close). The frame loop ticket replaces `<-readCtx.Done()` with the actual read body in the same call site. Adopted in `/v1/server` (#16) and `/v1/client` (#5) pending #6. +- **Hold long-lived WS handlers open with `c.CloseRead(r.Context())` plus `<-readCtx.Done()` until the real read loop lands.** `CloseRead` drains-and-discards frames (including control frames — pings must be processed for the connection to observe a peer-side close). The frame-loop ticket replaces both the `CloseRead` call and the `<-readCtx.Done()` block with the actual read pump in the same call site — keeping `CloseRead` alongside a real reader would race the sole-reader contract. Adopted in `/v1/server` (#16) and `/v1/client` (#5); `/v1/client` swapped to `StartPhoneForwarder` in #25 (`/v1/server`'s placeholder remains until the binary-side forwarder lands). 
+- **Forwarder owns reading; handler owns cleanup.** The phone-side frame forwarder (#25) is a pure read pump — no `UnregisterPhone`, no `wsconn.Close()`, no extra goroutine. It runs synchronously on the HTTP handler goroutine; on return, the handler's `defer { UnregisterPhone; Close; log }` runs in the right order. Adding cleanup inside the forwarder would either double-close (idempotent, but muddies the lifecycle) or unregister twice. Pattern extends to the symmetric binary-side forwarder. +- **Define narrow read-side interfaces at the consumer, not on the adapter.** `phoneSource` (`ConnID() + Read(ctx)`) lives in `forward.go`, not on `WSConn`. Production passes `*WSConn`; tests substitute a fake without touching `WSConn`. If a future caller needs the same shape, promote the interface then — don't anticipate. Adopted in #25. - **Capture process-state timestamps in `main` after `flag.Parse()`, not as package-level vars.** `startedAt := time.Now()` lives inside `main` and is passed into the handler factory. A package-level `var startedAt = time.Now()` would fire at import time — before flag parsing, before `--version` early-returns — and be wrong for short-lived test binaries and any future deferred-serve setup. Adopted in #10. ## Conventions diff --git a/docs/knowledge/INDEX.md b/docs/knowledge/INDEX.md index c563512..3104fa3 100644 --- a/docs/knowledge/INDEX.md +++ b/docs/knowledge/INDEX.md @@ -4,7 +4,8 @@ One-line pointers into the evergreen knowledge base. Newest entries at the top o ## Features -- [`/v1/client` WS upgrade](features/client-endpoint.md) — phone-side ingress: validates `X-Pyrycode-Server` / `X-Pyrycode-Token` / `User-Agent` pre-upgrade (token presence-only, never parsed/logged); registers phone on the binary's slot, emits `4404` if no binary holds the id, holds the conn via `CloseRead` until #6's frame loop replaces it (#5). 
+- [Phone-side frame forwarder](features/phone-forwarder.md) — per-phone read pump: wraps each inbound phone frame in the routing envelope keyed by the phone's `conn_id` and `Send`s it to the binary holding `serverID`; opaque inner bytes; synchronous (handler discards the return); replaced `/v1/client`'s `CloseRead` placeholder; added `WSConn.Read` (single-caller) (#25). +- [`/v1/client` WS upgrade](features/client-endpoint.md) — phone-side ingress: validates `X-Pyrycode-Server` / `X-Pyrycode-Token` / `User-Agent` pre-upgrade (token presence-only, never parsed/logged); registers phone on the binary's slot, emits `4404` if no binary holds the id, hands the conn to `StartPhoneForwarder` for the data path (#5, #25). - [`/v1/server` WS upgrade](features/server-endpoint.md) — binary-side ingress: validates `X-Pyrycode-Server` / `X-Pyrycode-Version` / `User-Agent` pre-upgrade, claims the slot, emits `4409` on conflict, holds the conn via `CloseRead` until #6's frame loop replaces it; on disconnect schedules a 30s grace release so a quick reconnect inherits the slot (#21). - [`/healthz` JSON endpoint](features/healthz.md) — unauthenticated `GET /healthz` returning `{status, version, connected_binaries, connected_phones, uptime_seconds}`; `Cache-Control: no-store`, body bounded ≈135 bytes. - [WSConn adapter](features/ws-conn-adapter.md) — wraps `nhooyr.io/websocket.Conn` to satisfy the registry's `Conn`; owns the per-conn write mutex and a `Close`-cancelled context with a 10s per-`Send` deadline. diff --git a/docs/knowledge/features/client-endpoint.md b/docs/knowledge/features/client-endpoint.md index a0666f1..d02ce01 100644 --- a/docs/knowledge/features/client-endpoint.md +++ b/docs/knowledge/features/client-endpoint.md @@ -4,7 +4,7 @@ This is the public, internet-exposed endpoint. The peer is *less* trusted than `/v1/server`'s peer (which at least runs operator-issued software); anyone on the internet who learns the relay hostname can connect. 
Header validation runs **before** `websocket.Accept`; the token is presence-checked only and never logged. -This is the phone side only. Frame forwarding is #6; heartbeat is #7. The handler currently holds the connection open by draining-and-discarding frames — #6 swaps in the real read loop in the same call site. +This is the phone side only. Heartbeat is a future ticket. After header validation and `RegisterPhone`, the handler hands the connection to `StartPhoneForwarder` ([phone-forwarder.md](phone-forwarder.md), #25), which is the read pump for the data path. ## Wire shape @@ -69,7 +69,7 @@ mux.Handle("/v1/client", relay.ClientHandler(reg, logger)) - On any other error → `wsconn.Close()`, return (defensive; not currently reachable). 7. Log `phone_registered`. 8. `defer { reg.UnregisterPhone(serverID, connID); wsconn.Close(); log phone_unregistered }`. Registered **after** the successful `RegisterPhone` so a no-server path never tries to unregister a slot we never owned. -9. `readCtx := c.CloseRead(r.Context()); <-readCtx.Done()` — block until the peer closes (or until the registry tears the conn down on binary-grace expiry). +9. `_ = StartPhoneForwarder(r.Context(), reg, serverID, wsconn, logger)` — synchronous read pump that wraps inbound frames and `Send`s them to the binary holding `serverID`. Returns on phone close, ctx cancel, missing binary, or `Send` failure. The handler discards the return; the forwarder logs the cause. See [phone-forwarder.md](phone-forwarder.md). `randHex8` and `remoteHost` are reused verbatim from `server_endpoint.go` — same package, no duplication. 
@@ -102,8 +102,7 @@ Header gate failures (`400`) are not logged — same hygiene rationale as `/v1/s | Header validation | request goroutine | none | pre-upgrade; no resources held | | `websocket.Accept` | request goroutine | none | conn allocated on success | | `RegisterPhone` | request goroutine | registry write lock (held internally) | one-shot | -| `CloseRead` | spawns one read-discard goroutine | none | terminates on conn close / `r.Context()` cancel | -| `<-readCtx.Done()` | request goroutine, blocking | none | unblocks on (a) peer close, (b) registry-driven `Close` from binary-grace expiry, (c) server shutdown | +| `StartPhoneForwarder` | request goroutine, blocking; sole reader of the WS | per-frame `BinaryFor` RLock + `WSConn.writeMu` on the binary | returns on (a) phone-side close, (b) `r.Context()` cancel, (c) `Send` failure to the binary, (d) registry-driven `Close` from binary-grace expiry | | `defer` (unregister/close/log) | request goroutine | `wsconn.closeOnce` | runs only on the success path; idempotent | The handler takes no lock of its own. `WSConn.Close`'s `closeOnce` is a `sync.Once`, not a held lock. `UnregisterPhone` is a no-op on unknown `(serverID, connID)`. **No lock-order risk; no goroutine-leak path.** @@ -117,7 +116,7 @@ When a binary disconnects, `/v1/server`'s defer arms `ScheduleReleaseServer(serv 3. deletes the phones entry from the map, 4. calls `Close()` on every snapshotted phone. -The phone handler's `<-readCtx.Done()` then unblocks (the underlying conn's context was cancelled by `WSConn.Close`), the defer runs, and: +The phone handler's in-flight `WSConn.Read` then aborts with the library's close error (the underlying `*websocket.Conn.Close` was invoked by `WSConn.Close`), `StartPhoneForwarder` returns, the defer runs, and: - `reg.UnregisterPhone(serverID, connID)` no-ops (entry already deleted in step 3). - `wsconn.Close()` no-ops (`sync.Once` already fired in step 4). 
@@ -131,7 +130,7 @@ The phone observes the close on its socket as `StatusNormalClosure` — by delib - **Direct `c.Close` on the no-server path, not `wsconn.Close()`.** `WSConn.Close` always emits `StatusNormalClosure`; `4404` requires the underlying `*websocket.Conn`. The no-server case is a stillborn WSConn — no `Send` was attempted, no goroutine holds `writeMu` — so the WSConn invariant is preserved in spirit. See [ADR-0005](../decisions/0005-application-close-codes-via-underlying-conn.md). - **`defer` after `RegisterPhone` succeeds.** If `RegisterPhone` returns `ErrNoServer`, the handler must NOT call `UnregisterPhone` on a slot it never claimed. The registry tolerates it as a no-op today, but the structural rule sharpens once any cleanup grows side effects beyond a no-op (e.g. metrics increment in a future ticket). - **No `RegisterPhone` retry on `ErrNoServer` even during a binary's grace window.** During grace, `BinaryFor` still returns the (closed) binary, so `RegisterPhone` succeeds and `handleGraceExpiry` later closes the phone cleanly on expiry. If the slot is empty when `RegisterPhone` runs, that's a true 4404 — no other binary will pick up this phone's claim before the phone re-dials. The handler does not retry, does not poll, does not double-check. -- **`CloseRead` instead of `for { c.Read() }`.** Same as `/v1/server`: keep the goroutine alive, drain control frames so the conn observes peer-close, defer to #6 for the real read loop. Swap site is `<-readCtx.Done()` → real loop body in the same goroutine. +- **`StartPhoneForwarder` is the sole reader; `CloseRead` is gone.** Pre-#25 the handler used `c.CloseRead(r.Context())` to drain control frames pending the real read loop. With the forwarder in place, the read loop processes control frames inline with data; retaining `CloseRead` would race the sole-reader contract. See [phone-forwarder.md](phone-forwarder.md). 
- **`crypto/rand`-backed `connID` suffix.** 32 bits is sufficient — scoped per server-id, used only as an opaque map key in `UnregisterPhone`. `RegisterPhone` does not dedupe by `ConnID` (registry contract); collision odds at v1 scale are negligible. Using `crypto/rand` over `math/rand` is forward-compat hardening: if a future ticket exposes the conn-id, unguessable bytes avoid creating an oracle. - **No length cap or charset check on `device_name`.** Informational only; slog escapes control characters; no observed failure motivates a check. If oversized headers ever become an issue, mitigation lives upstream (per-IP rate limit, per-header byte cap) — both deferred to the DoS ticket. @@ -140,8 +139,8 @@ The phone observes the close on its socket as `StatusNormalClosure` — by delib - **No token validation.** The relay is not the trust boundary for the phone token; the binary owns it. The relay's authorization model on `/v1/client` is purely "is there a binary holding this server-id?" - **No connection caps (per-IP or global).** Documented residual in `docs/threat-model.md` § DoS resistance. Same gap as `/v1/server`, named there, not widened. - **No phone-count-per-server-id cap.** `phones[serverID]` grows under attack; the broadcast cost (when #6 lands) is the DoS shape that owns it. -- **No frame loop.** `CloseRead` discards frames until #6 replaces it. -- **No heartbeat / ping-pong.** #7. +- **No inner-frame parsing.** `StartPhoneForwarder` wraps each frame in the routing envelope and forwards opaque bytes; the binary owns inner-frame validation. +- **No heartbeat / ping-pong.** Future ticket. - **No phone-side reconnect grace.** The binary-side grace from #20 already closes orphan phones cleanly on expiry; phone-side grace is not in the protocol spec and is out of scope. - **No `400` log line.** Avoids amplifying header-floods into log volume. - **No log on `websocket.Accept` errors.** Library writes a 4xx; the failure is visible in the http access log. 
@@ -151,7 +150,7 @@ The phone observes the close on its socket as `StatusNormalClosure` — by delib - **Server-id enumeration via 4404 vs success.** `4404` confirms "no binary holds that id"; success confirms "a binary holds that id, and the relay accepted my token's *presence*." Both are protocol-spec-defined responses; the relay cannot withhold the signal without breaking phones. The token's *value* is validated by the binary on the first frame post-upgrade — an invalid token surfaces as `4401` from the binary forwarded by the relay (#6's territory). At the relay layer, server-id existence is observable by design. - **Token leak via crafted log line.** The token is never logged, never put into an error string, never compared with `fmt.Errorf`. Log-field set is enumerated; structurally the handler does not reach for the token after the presence-check. The defence is layered: spec, code review, and the absence of any code path that uses the token after the gate. - **Crafted `X-Pyrycode-Device-Name` to forge log lines.** slog's text and JSON handlers quote string values and escape control characters. Log-line forging is structurally blocked. -- **Slow-loris on upgrade.** Same as `/v1/server`: peer that completes handshake but never sends frames consumes one goroutine; `CloseRead` waits. Connection caps deferred. +- **Slow-loris on upgrade.** Peer that completes handshake but never sends frames parks one goroutine in `WSConn.Read` indefinitely. Connection caps deferred. - **Race: binary disconnects between phone's `RegisterPhone` and phone observing its conn live.** `RegisterPhone` succeeds; the binary's grace timer arms in parallel; if grace expires, `handleGraceExpiry` closes this phone; the handler's defer runs cleanly. Phone observes `StatusNormalClosure` and reconnects. - **Phone re-dials thousands of times to grow the phones slice forever.** Each successful register appends; each disconnect removes via `UnregisterPhone`. 
The slice does not leak across connections. Steady state is "concurrent live phones" — bounded by file descriptors / connection cap (deferred). - **Token-presence oracle via 4404.** Token-absent → 400; token-present + no binary → 4404; token-present + binary → success. The 400 vs 4404 difference is gated on token presence, but the *validity* of the token is not testable through the relay (the binary owns that). 4404 is not a presence oracle — it confirms the gate passed, which an attacker already knew because they sent a non-empty value. @@ -179,6 +178,7 @@ Not tested: registry mocks (use the real one — race-tested in #3); library moc ## Related - [`/v1/server`](server-endpoint.md) — sibling ingress; this handler reuses its `randHex8`, `remoteHost`, validate-pre-upgrade shape, and stillborn-WSConn close-code pattern. +- [Phone-side frame forwarder](phone-forwarder.md) — the data-path read pump this handler hands the conn to after registration. - [Connection registry](connection-registry.md) — `RegisterPhone` / `UnregisterPhone` / `ErrNoServer` are the primitives this handler routes through. Its `handleGraceExpiry` closes phones registered here when a binary's grace window expires. - [WSConn adapter](ws-conn-adapter.md) — what the handler hands to `RegisterPhone`. The direct-`c.Close` on no-server is the documented exception to the WSConn-only invariant. - [ADR-0005](../decisions/0005-application-close-codes-via-underlying-conn.md) — close-code emission on the underlying `*websocket.Conn`. diff --git a/docs/knowledge/features/phone-forwarder.md b/docs/knowledge/features/phone-forwarder.md new file mode 100644 index 0000000..f198ebb --- /dev/null +++ b/docs/knowledge/features/phone-forwarder.md @@ -0,0 +1,124 @@ +# Phone-side frame forwarder + +The phone-side data path. 
After `/v1/client` registers a phone in the registry, the handler hands the connection to `StartPhoneForwarder`, which reads frames from the phone, wraps each in the routing envelope keyed by the phone's relay-assigned `conn_id`, and writes the wrapped envelope to the binary holding the requested `serverID`. Inner frames are opaque bytes — the relay never parses the inner protocol. + +Mirror image of the (separate) binary-side forwarder. Replaces the placeholder `<-readCtx.Done()` block that #5 left in place pending the read loop. + +Authoritative wire spec: [`pyrycode/pyrycode/docs/protocol-mobile.md` § Routing envelope](https://github.com/pyrycode/pyrycode/blob/main/docs/protocol-mobile.md#routing-envelope). + +## API + +Package `internal/relay` (`forward.go`): + +```go +type phoneSource interface { + ConnID() string + Read(ctx context.Context) ([]byte, error) +} + +func StartPhoneForwarder( + ctx context.Context, + reg *Registry, + serverID string, + phone phoneSource, + logger *slog.Logger, +) error +``` + +Despite the `Start` verb (carried from the AC), the call is **synchronous**: it blocks until a terminating condition is hit. The returned error is for observability only — the `/v1/client` handler discards it (`_ = StartPhoneForwarder(...)`); the lifecycle is closed out by the handler's existing `defer { UnregisterPhone; Close; log phone_unregistered }`. + +`phoneSource` is package-private and defined at the consumer (this file), not on `WSConn`. Production passes `*WSConn`, which satisfies it via `Read` ([ws-conn-adapter.md](ws-conn-adapter.md)). Tests substitute a fake without touching `WSConn`'s shape. + +## Loop body + +Per iteration: + +1. `phone.Read(ctx)` for the next frame. Any error → log `phone_forwarder_read_end` (info) and return. +2. `Marshal(phone.ConnID(), frame)` to wrap. 
Marshal failure (only `ErrInvalidFrameJSON` is reachable in practice — the `WSConn` constructor guarantees a non-empty `ConnID`) → log `phone_forwarder_marshal_err` (warn) and return; the handler's `defer` closes the conn. +3. `reg.BinaryFor(serverID)`. Missing → log `phone_forwarder_no_binary` (info) and return `nil` (the only non-error return path). +4. `binary.Send(wrapped)`. Error → log `phone_forwarder_send_failed` (info) and return. + +No retries, no buffering, no backpressure handling beyond `WSConn.Send`'s own 10s write deadline. Backpressure-as-blocking is accepted for v1. + +## Termination paths + +| Cause | Mechanism | Cleanup owner | +|---|---|---| +| Phone closes WS | `c.Read` returns close error → loop returns | handler's `defer` | +| Server shutdown / request cancel | `ctx` cancel → `c.Read` returns `ctx.Err()` → loop returns | handler's `defer` | +| Binary disconnect + grace expiry | registry's `handleGraceExpiry` calls `wsconn.Close()` → underlying `*websocket.Conn.Close` aborts in-flight `Read` → loop returns | registry tore down state; handler's `defer` is idempotent | +| Binary disconnect during grace | next-frame `BinaryFor` still returns the dead binary → `Send` fails → loop returns | handler's `defer`; phone unregisters; on grace expiry the registry's fan-out-Close is a no-op for this phone | +| Adversarial non-JSON frame from phone | `Marshal` returns `ErrInvalidFrameJSON` → loop returns | handler's `defer` (warn-logged) | + +All five paths in the table terminate the single goroutine; the handler's defer cleans up registry state. **No goroutine leaks.** + +## Concurrency model + +- One forwarder per phone — runs on the HTTP handler goroutine itself; no extra goroutine spawned. +- `phone.Read` (and `WSConn.Read`) is **single-caller**: the forwarder is the sole reader by contract. Concurrent `Read` is not supported by the underlying library. 
+- `binary.Send` is multi-caller; `WSConn.writeMu` (#15) serialises across all phone forwarders writing to the same binary. Frames go on the wire whole and non-interleaved. +- `reg.BinaryFor` takes an RLock; cheap, contention-free under typical load. + +## Edits to neighbouring code + +- `WSConn.Read(ctx) ([]byte, error)` was added in this ticket (`ws_conn.go`). It does **not** join `closeCtx`: when `Close` cancels `closeCtx` *and* closes the underlying `*websocket.Conn`, the in-flight `Read` returns immediately with the library's close error. No need to plumb `closeCtx` through the read path. The type-level doc lists `Read` as the single exception to "concurrent-safe with every other method." See [ws-conn-adapter.md](ws-conn-adapter.md). +- `/v1/client` handler now ends with `_ = StartPhoneForwarder(r.Context(), reg, serverID, wsconn, logger)` instead of `c.CloseRead(...) + <-readCtx.Done()`. The drain-and-discard goroutine `CloseRead` would have spawned is gone — the new read loop processes control frames inline with data reads, and retaining `CloseRead` would race the forwarder for the sole-reader role (see `lessons.md` § "A long-lived WS handler that does not read frames will never observe peer close" for why `CloseRead` was needed before this ticket; it is no longer needed once a real reader exists). + +## Logging + +Field set is fixed; nothing else (frame bytes, headers, tokens) appears. + +| event | level | fields | +|---|---|---| +| `phone_forwarder_read_end` | info | `server_id`, `conn_id`, `err` | +| `phone_forwarder_marshal_err` | warn | `server_id`, `conn_id`, `err` | +| `phone_forwarder_no_binary` | info | `server_id`, `conn_id` | +| `phone_forwarder_send_failed` | info | `server_id`, `conn_id`, `err` | + +`err` carries library errors (close codes, ctx cancellation, write deadlines), never user payloads. The handler's `phone_unregistered` log line bookends the lifecycle. 
+ +## What this forwarder deliberately does NOT do + +- **No `UnregisterPhone` and no `wsconn.Close()`.** The handler's `defer` owns both. Doing them here would either double-close (idempotent, but muddies the lifecycle) or unregister twice (no-op the second time but signals a confused contract). +- **No inner-frame parsing.** The relay's role per `protocol-mobile.md` § Routing envelope is "wrap, address, forward; never inspect." `Marshal`'s `json.Valid` check is structural, not semantic. +- **No buffering, no bounded channels.** A slow downstream `Send` blocks this goroutine; `WSConn.Send`'s 10s deadline bounds the worst case to ~10s before the loop returns via `Send` error. +- **No per-frame size cap.** Inherited from `WSConn`'s underlying `*websocket.Conn` (nhooyr's default 32 KiB, i.e. 32768-byte, read limit). A deliberate per-message cap is a follow-up against `WSConn` so it covers both forwarders. Not a regression introduced by this ticket; named explicitly in the architect's security review. +- **No retries on `Send` failure.** Returns and lets the handler's defer run; the phone reconnects. +- **No heartbeat / ping-pong.** Separate ticket. + +## Adversarial framing + +- **Non-JSON frame from phone.** `Marshal`'s `json.Valid` returns false → forwarder returns with a warn log → handler closes the conn. Loud but not panicky. +- **Frame larger than the binary's tolerance.** Forwarder doesn't know or care; the binary owns inner-frame validation. Per-message size on the inbound side is the inherited residual. +- **Phone races a binary disconnect.** The `BinaryFor`-then-`Send` pattern is best-effort; during grace `BinaryFor` returns the dead binary. The forwarder logs and exits cleanly when `Send` fails. No double-free, no double-unregister — the handler's defer is idempotent against the registry's grace-expiry fan-out (the `WSConn.Close` `closeOnce` swallows the second close; `UnregisterPhone` no-ops on an already-removed entry). 
+- **Slow binary.** Bounded by `WSConn.Send`'s 10s deadline (#15). A wedged binary causes the forwarder to return via `Send` error within ~10s rather than hanging indefinitely. +- **Crafted log content.** `err` strings come from the WS library (close codes, ctx errors, write deadline errors); no user-supplied bytes enter logs. + +Verdict from the architect's security review: **PASS**. No new credentials handled, no new logging surface, no new locks. The single residual (per-message size cap) is inherited from #15 and tracked separately. + +## Testing + +`internal/relay/forward_test.go`, `package relay`. All tests wire mocks to a real `Registry`; no httptest server (the handler-level integration is covered by `/v1/client`'s own tests). + +Two package-private fakes: + +- `fakePhone` — implements `phoneSource`. Holds a `frames chan []byte`. `Read` selects on `ctx.Done()` and on `frames`; closed channel → `io.EOF`. Tests close the chan to signal "phone disconnected". +- `fakeBinary` — implements registry `Conn`. Holds a mutex-protected `sent [][]byte` (the forwarder writes from a goroutine while the test reads, so the existing `registry_test.go` `fakeConn` shape needed mu-protection). `snapshot()` copies under the lock; `waitForSent(t, want, timeout)` polls. + +Tests (1:1 with AC): + +- **Forwards 3 frames bytewise.** Push 3 frames containing nested JSON; assert each wrapped envelope at the binary side carries the registered phone's `conn_id` and an inner frame byte-equal **modulo whitespace** via `json.Compact` per the lesson on `json.RawMessage` round-trips. +- **Phone disconnects mid-stream.** Push 1 frame, close the frames chan → forwarder returns; mimic the handler-level `UnregisterPhone`; assert `Registry.PhonesFor(serverID)` is nil. +- **No binary registered.** Don't claim a binary; push 1 frame; forwarder logs and returns `nil`. No panic. +- **Context cancellation.** Cancel parent ctx; forwarder returns within 100ms (generous bound under `-race`). 
+ +`make test` clean with `-race`. The package-level doc comment in `forward_test.go` documents the manual stress invocation `go test -race -count=20 ./internal/relay/` per the [race-count lesson](../../lessons.md). + +## Related + +- [`/v1/client`](client-endpoint.md) — sole production caller; owns the cleanup defer the forwarder must not touch. +- [Routing envelope](routing-envelope.md) — `Marshal` is the per-frame wrap. +- [WSConn adapter](ws-conn-adapter.md) — `Read` was added here to satisfy `phoneSource`; sole-reader contract documented on the type. +- [Connection registry](connection-registry.md) — `BinaryFor` is the per-frame lookup. +- [ADR-0006](../decisions/0006-grace-period-as-reclaim-path.md) — explains why `BinaryFor` may return a dead binary during grace, and why a `Send` failure is the forwarder's signal to exit. +- [Protocol spec § Routing envelope](https://github.com/pyrycode/pyrycode/blob/main/docs/protocol-mobile.md#routing-envelope) — authoritative wire shape. diff --git a/docs/knowledge/features/ws-conn-adapter.md b/docs/knowledge/features/ws-conn-adapter.md index 1ed7ecb..b5fb55b 100644 --- a/docs/knowledge/features/ws-conn-adapter.md +++ b/docs/knowledge/features/ws-conn-adapter.md @@ -15,6 +15,7 @@ func NewWSConn(c *websocket.Conn, connID string) *WSConn func (w *WSConn) ConnID() string func (w *WSConn) Send(msg []byte) error +func (w *WSConn) Read(ctx context.Context) ([]byte, error) func (w *WSConn) Close() ``` @@ -31,13 +32,16 @@ const writeTimeout = 10 * time.Second | Method | Lock | Context | Concurrent-safe with | |---|---|---|---| | `ConnID` | none | none | every other method | -| `Send` | `writeMu` (held across one frame write) | `WithTimeout(closeCtx, writeTimeout)` | other `Send` (serialised), `Close` (cancellation breaks it), `ConnID` | -| `Close` | `closeOnce` (one shot) | cancels `closeCtx` | `Send` (in-flight write is cancelled), other `Close` (no-op), `ConnID` | +| `Send` | `writeMu` (held across one frame write) | 
`WithTimeout(closeCtx, writeTimeout)` | other `Send` (serialised), `Close` (cancellation breaks it), `ConnID`, `Read` | +| `Read` | none | caller-supplied `ctx` (NOT joined with `closeCtx`) | `Send`, `Close` (underlying `Close` aborts in-flight `Read`), `ConnID`. **NOT** safe with another `Read` — single-caller only. | +| `Close` | `closeOnce` (one shot) | cancels `closeCtx` | `Send` (in-flight write is cancelled), `Read` (underlying `*websocket.Conn.Close` aborts the read), other `Close` (no-op), `ConnID` | One mutex, one one-shot. The lock graph is a single node. There are no callbacks, no channels, no goroutines spawned by the adapter. `Close` deliberately does **not** take `writeMu`. Acquiring it would deadlock against a slow peer holding the mutex inside `Send`: the whole point of cancelling `closeCtx` is to abort the in-flight `Write` so it releases the mutex on its own. `nhooyr.io/websocket.Conn.Close` is documented to be safe with an in-flight `Write` — that property is why this library was chosen. +`Read` takes only the caller-supplied `ctx`; it does **not** join `closeCtx`. Rationale (added in #25): when `Close` cancels `closeCtx` *and* closes the underlying `*websocket.Conn`, the in-flight library `Read` returns immediately with the close error. Plumbing `closeCtx` through the read path would be redundant. The single-caller contract makes the lock-free shape safe — only the per-WSConn forwarder goroutine (`internal/relay/forward.go`) calls `Read`. + ## Context strategy The non-trivial design problem is the `Send([]byte) error` ↔ `Conn.Write(ctx, ...)` API mismatch: the registry's contract has no context, but the library requires one for every write. The adapter owns its own context: @@ -57,7 +61,8 @@ These are documented so the next contributor doesn't add defensive code that doe - **No `connID` validation.** Length, charset, uniqueness are owned by the conn-id-scheme ticket. The adapter treats the id as opaque. 
- **No handshake / header validation / subprotocol selection.** Lives at the upgrade boundary (#4/#16, #5). - **No ping/pong.** Heartbeat is #7. -- **No read-side frame loop or envelope wrap/unwrap.** That is #6. +- **No read-side frame loop or envelope wrap/unwrap.** `Read` is a single-frame primitive; the loop and envelope wrapping live in `internal/relay/forward.go` ([phone-forwarder.md](phone-forwarder.md)). +- **No per-message size cap on `Read`.** Inherited from `*websocket.Conn`'s default (nhooyr's 32 KiB, i.e. 32768-byte, read limit). A deliberate `SetReadLimit` policy is a follow-up so it covers both forwarders. - **No per-conn send queue / backpressure / rate limit.** `Send` writes synchronously and returns. None of those are in the registry's `Conn` contract. - **No close-on-`Send`-error.** Caller observes the error and chooses to call `Close`. - **No close-code semantics beyond `StatusNormalClosure`.** The adapter doesn't know why the registry asked it to close. Close-code mapping (`4401`/`4404`/`4409`) is the upgrade handler's job. @@ -101,4 +106,5 @@ What we deliberately do not test: the library's behaviour itself (we trust `Writ - [ADR-0004: WS library choice and adapter context strategy](../decisions/0004-ws-library-and-adapter-context-strategy.md) - [Connection registry](connection-registry.md) — the `Conn` interface this implements. +- [Phone-side frame forwarder](phone-forwarder.md) — sole caller of `Read`; satisfies the local `phoneSource` interface. - [Threat model](../../threat-model.md) — slow-loris and supply-chain framings. diff --git a/docs/lessons.md b/docs/lessons.md index f5b05cb..04951c3 100644 --- a/docs/lessons.md +++ b/docs/lessons.md @@ -12,7 +12,7 @@ The natural shape — `var t *time.Timer; t = time.AfterFunc(d, func() { ... use
use ## A long-lived WS handler that does not read frames will never observe peer close -A WebSocket connection only sees a peer-side close when *something* on this side reads from the conn — control frames (ping/pong/close) are processed inline with reads. A handler that just blocks forever (e.g. `select {}` or `<-r.Context().Done()`) keeps the goroutine alive but does not move the close machinery: the conn stays "open" until the kernel TCP timeout, and `r.Context()` does not cancel on peer close (it cancels on *server* shutdown / client TCP RST seen by the http server). Use `c.CloseRead(r.Context())` to spawn a drain-and-discard goroutine and block on the returned context — that goroutine processes control frames and cancels the context on close. When the real frame loop lands later, swap `<-readCtx.Done()` for the loop body. Source: `/v1/server` (#16). +A WebSocket connection only sees a peer-side close when *something* on this side reads from the conn — control frames (ping/pong/close) are processed inline with reads. A handler that just blocks forever (e.g. `select {}` or `<-r.Context().Done()`) keeps the goroutine alive but does not move the close machinery: the conn stays "open" until the kernel TCP timeout, and `r.Context()` does not cancel on peer close (it cancels on *server* shutdown / client TCP RST seen by the http server). Use `c.CloseRead(r.Context())` to spawn a drain-and-discard goroutine and block on the returned context — that goroutine processes control frames and cancels the context on close. When the real frame loop lands, **delete the `CloseRead` call entirely** along with `<-readCtx.Done()`; the loop is now the sole reader and a parallel `CloseRead` goroutine would race the sole-reader contract on the underlying `*websocket.Conn`. Source: `/v1/server` (#16); confirmed by the #25 swap in `/v1/client`. 
## `*websocket.Conn.Close(code, reason)` emits an application close code; the `WSConn` adapter does not diff --git a/docs/specs/architecture/25-phone-forwarder.md b/docs/specs/architecture/25-phone-forwarder.md new file mode 100644 index 0000000..4343265 --- /dev/null +++ b/docs/specs/architecture/25-phone-forwarder.md @@ -0,0 +1,405 @@ +# Phone-side frame forwarder (#25) + +Per-phone read pump that wraps each inbound phone frame in the routing +envelope and writes it to the binary registered for the phone's +server-id. Replaces the placeholder `<-readCtx.Done()` block in +`internal/relay/client_endpoint.go`. Mirror-image of the binary-side +forwarder (separate ticket). + +## Files to read first + +- `internal/relay/client_endpoint.go:26-88` — current `/v1/client` + handler. Lines 81-86 hold the `CloseRead` + `<-readCtx.Done()` block + this spec replaces. Note the existing + `defer { UnregisterPhone; Close; log }` at lines 73-79 — the + forwarder must NOT touch either. +- `internal/relay/ws_conn.go` (whole file, 83 lines) — adapter shape, + `closeCtx` cancellation discipline, the "reach the conn only through + WSConn methods" contract (lines 39-42), and the existing `Send` + method as the template for the new `Read`. +- `internal/relay/envelope.go:40-55` — `Marshal(connID, frame)` + signature, the JSON-validity precondition, and `ErrInvalidFrameJSON` + / `ErrEmptyConnID` sentinels. Forwarder is the sole caller in this + ticket. +- `internal/relay/registry.go:32-47` — `Conn` interface (`ConnID`, + `Send`, `Close`); `BinaryFor` at lines 230-236 — the lookup the + forwarder calls per frame. +- `internal/relay/registry.go:125-184` — grace-window semantics + (`ScheduleReleaseServer` doc + `handleGraceExpiry`). Read these + before deciding what the forwarder does when a `Send` fails or + `BinaryFor` returns false. Critical: during grace, `BinaryFor` + returns the dead binary; on expiry, registry calls `phone.Close()` + on every registered phone. 
+- `internal/relay/registry_test.go:11-45` — `fakeConn` shape (id, + `sent` slice, `closed` flag, optional `closeCh`). `forward_test.go` + needs a similar binary-side fake but with mu-protected `sent` (the + forwarder writes from a goroutine while the test reads). +- `internal/relay/envelope_test.go:10-38` — the canonical pattern for + asserting bytewise opacity via `json.Compact`. `forward_test.go` + reuses this exactly. +- `internal/relay/client_endpoint_test.go:21-65` — the + `startClient` / `seedBinary` / `waitForPhones` helpers. The + forwarder tests don't need an httptest server (they wire fakes to + a real `Registry` directly), but the polling shape of + `waitForPhones` is the model for `waitForSent`. +- `docs/lessons.md` § "json.RawMessage round-trips are byte-stable + modulo whitespace" (line 37-39); § "Race-test count is a CI-runner + knob" (line 45-47); § "A long-lived WS handler that does not read + frames will never observe peer close" (lines 13-15) — explains why + the existing `CloseRead` call must be deleted, not retained + alongside the new reader. +- `pyrycode/pyrycode/docs/protocol-mobile.md` § Routing envelope + (lines 100-120 in that file) — the wire shape `{conn_id, frame}`, + and the rule that `conn_id` is relay↔binary only (the phone never + sees it). + +## Context + +After `/v1/client` (#5) registers a phone, the handler currently parks +on `c.CloseRead(r.Context())` + `<-readCtx.Done()` so the WS +processes peer-side control frames but discards data frames. This +ticket lands the data path: read each inbound frame, wrap it in the +routing envelope keyed by the phone's relay-assigned `conn_id`, and +write the wrapped envelope to the binary's `Conn`. + +Stateless relay; opaque inner frames; per-frame `BinaryFor` lookup +picks up grace-window reclaim transparently. + +## Design + +### New file: `internal/relay/forward.go` + +Two pieces: + +1. A small package-private interface `phoneSource` for the read side. 
+ Defined at the consumer (this file), not exported, not on + `WSConn`. Tests pass a fake; production passes a `*WSConn` (which + gains a `Read` method — see § "Edit: `internal/relay/ws_conn.go`" + below). + + ```go + type phoneSource interface { + ConnID() string + Read(ctx context.Context) ([]byte, error) + } + ``` + +2. The exported `StartPhoneForwarder` function — synchronous (despite + the `Start` verb; the AC fixes the name). It blocks until a + terminating condition is hit, returning the underlying cause for + observability. The caller (the `/v1/client` handler) discards the + return; its `defer` runs regardless. + + ```go + func StartPhoneForwarder( + ctx context.Context, + reg *Registry, + serverID string, + phone phoneSource, + logger *slog.Logger, + ) error + ``` + +### Loop body + +``` +for { + frame, err := phone.Read(ctx) + if err != nil { + // ctx cancellation, peer close, library error — all funnel + // here. Log at info; the caller's phone_unregistered log + // closes out the lifecycle. + logger.Info("phone_forwarder_read_end", + "server_id", serverID, + "conn_id", phone.ConnID(), + "err", err) + return err + } + + wrapped, err := Marshal(phone.ConnID(), frame) + if err != nil { + // Adversarial input: phone sent non-JSON. Loud failure — + // close the conn (handler's defer does this on return). + logger.Warn("phone_forwarder_marshal_err", + "server_id", serverID, + "conn_id", phone.ConnID(), + "err", err) + return err + } + + binary, ok := reg.BinaryFor(serverID) + if !ok { + // No binary: either grace already expired (registry will + // have called phone.Close so we'd normally exit via Read + // err), or the test seam where the handler skipped + // RegisterPhone. Either way, return. + logger.Info("phone_forwarder_no_binary", + "server_id", serverID, + "conn_id", phone.ConnID()) + return nil + } + + if err := binary.Send(wrapped); err != nil { + // During grace, BinaryFor returns the dead binary; its + // Send fails. 
Log + return, handler's defer cleans up. + logger.Info("phone_forwarder_send_failed", + "server_id", serverID, + "conn_id", phone.ConnID(), + "err", err) + return err + } +} +``` + +### Edit: `internal/relay/ws_conn.go` + +Add a `Read` method that delegates to the wrapped `*websocket.Conn`. +Mirror of `Send`'s shape, but no read mutex — the forwarder is the +sole reader by contract; concurrent `Read` is not supported (matches +nhooyr's library posture). + +```go +// Read returns the next inbound message as opaque bytes. ctx bounds +// the wait; cancellation aborts the read with the library's wrapped +// error. The message type (binary vs text) is discarded — the relay +// treats inner frames as opaque bytes. Concurrent Read callers are +// NOT supported; the per-WSConn forwarder goroutine is the sole +// reader. After Close, in-flight Reads return with a close error +// from the underlying *websocket.Conn. +func (w *WSConn) Read(ctx context.Context) ([]byte, error) { + _, data, err := w.conn.Read(ctx) + return data, err +} +``` + +Update the type-level doc comment (lines 16-23 of `ws_conn.go`) to +note that `Read` is single-caller while `Send`, `ConnID`, `Close` +remain concurrency-safe. + +`Read` does NOT take the `closeCtx` and does NOT join it with `ctx`. +Rationale: when `Close` cancels `closeCtx` AND closes the underlying +`*websocket.Conn`, the in-flight `c.Read(ctx)` returns immediately +with the library's close error. No need to plumb `closeCtx` through +the read path — the underlying close already aborts the read. + +### Edit: `internal/relay/client_endpoint.go` + +Replace the placeholder block (lines 81-86) with a +`StartPhoneForwarder` call: + +```go +// Before: +readCtx := c.CloseRead(r.Context()) +<-readCtx.Done() + +// After: +_ = StartPhoneForwarder(r.Context(), reg, serverID, wsconn, logger) +``` + +Drop the `CloseRead` call entirely. 
The new read loop processes +control frames inline with data reads, so the drain-and-discard +goroutine is no longer needed (and would race the forwarder if +retained — see `lessons.md` line 13-15 and the `WSConn` "sole reader" +contract). + +The existing `defer { UnregisterPhone; Close; log }` (lines 73-79) +runs unchanged when the forwarder returns. The forwarder MUST NOT +call `UnregisterPhone` or `wsconn.Close()` — that's the handler's +job, and doing it here would either double-close (idempotent, but +muddies the lifecycle) or unregister twice (no-op the second time +but signals a confused contract). + +The return value is intentionally discarded (`_ =`). The handler's +`phone_unregistered` log already terminates the lifecycle from the +HTTP side; the forwarder's own logs cover the data path. + +## Concurrency model + +- One forwarder goroutine per phone (the HTTP handler goroutine + itself; no extra goroutine spawned). +- `phone.Read` is single-caller (forwarder). +- `binary.Send` is multi-caller — `WSConn.writeMu` serialises across + all phone forwarders writing to the same binary. Existing + guarantee from #15. +- `reg.BinaryFor` takes RLock; cheap, contention-free under typical + load. +- Shutdown paths: + 1. **Phone closes WS:** `c.Read` returns close error → loop + returns → handler defer runs. + 2. **Server shutdown / request cancel:** `ctx` cancels → `c.Read` + returns ctx error → loop returns. + 3. **Binary disconnect + grace expiry:** registry's + `handleGraceExpiry` calls `wsconn.Close()` → + `*websocket.Conn.Close` aborts in-flight `Read` → loop returns. + 4. **Binary disconnect during grace, before expiry:** `Send` to + the (closed) binary fails → loop logs + returns. Phone's + handler defer fires; phone gets unregistered; on grace expiry + the phone is no longer in the snapshot so registry's + fan-out-Close is a no-op for it. +- No goroutine leaks: all four paths terminate the single + goroutine, and the handler's defer cleans up registry state. 
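The multi-caller `binary.Send` guarantee in the list above can be modelled with the standard library alone. This is a sketch under stated assumptions, not the relay's `WSConn`: `serialSink` and `fanIn` are hypothetical names, the mutex stands in for `writeMu`, and the goroutines stand in for per-phone forwarders sharing one binary.

```go
package main

import (
	"fmt"
	"sync"
)

// serialSink is a hypothetical stand-in for the binary-side conn: one
// mutex is held across each whole frame write, as writeMu is in WSConn.
type serialSink struct {
	mu     sync.Mutex
	frames [][]byte
}

// Send records one frame atomically with respect to other Send callers.
func (s *serialSink) Send(msg []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.frames = append(s.frames, append([]byte(nil), msg...))
	return nil
}

// fanIn starts one goroutine per phone, each sending perPhone frames to
// the shared sink, waits for all of them, and returns the frame count.
func fanIn(sink *serialSink, phones, perPhone int) int {
	var wg sync.WaitGroup
	for p := 0; p < phones; p++ {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			for i := 0; i < perPhone; i++ {
				frame := fmt.Sprintf(`{"conn_id":"phone-%d","frame":{"seq":%d}}`, p, i)
				_ = sink.Send([]byte(frame))
			}
		}(p)
	}
	wg.Wait()
	sink.mu.Lock()
	defer sink.mu.Unlock()
	return len(sink.frames)
}

func main() {
	sink := &serialSink{}
	fmt.Println(fanIn(sink, 8, 100)) // prints: 800
}
```

Running the sketch under `go run -race` is a quick way to see that the mutex alone is enough: frames from different senders interleave in the slice, but no write is lost or torn.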
+ +## Error handling + +| Cause | Action | Log level | +|--------------------------------------|-----------------------------------------|-----------| +| `phone.Read` error (any) | log, return err | info | +| `Marshal` error (`ErrInvalidFrameJSON`) | log, return err — closes the phone | warn | +| `BinaryFor` returns false | log, return nil | info | +| `binary.Send` error | log, return err | info | + +`Marshal` returning `ErrEmptyConnID` is unreachable: the WSConn +guarantees a non-empty `ConnID` (set in `NewWSConn`, the handler +constructs it as `"client-" + serverID + "-" + randHex8()`). The +spec does not add a defensive check; if the invariant is ever +broken upstream, the loop returns the marshal error like any other +malformed-input case. + +## Testing strategy + +`internal/relay/forward_test.go`. All tests use mocks against a real +`Registry`; no httptest server. + +Two test fakes (defined in this file, package-private): + +- `fakePhone`: implements `phoneSource`. Holds an `id` and a + `frames chan []byte`. `Read` selects on `ctx.Done()` and on + `frames`; on closed frames-chan, returns `io.EOF`. (Tests close + the chan to signal "phone disconnected".) +- `fakeBinary`: implements `Conn`. Holds an `id`, `mu sync.Mutex`, + and `sent [][]byte`. `Send` appends under the mutex (the existing + registry-test `fakeConn` does not, and the forwarder writes from + a goroutine while the test reads — so we need the mutex). Add a + `snapshot()` helper that copies under the lock for assertion. + Provide a `waitForSent(t, binary, want, timeout)` helper that + polls `len(snapshot()) == want`. + +Test cases: + +1. **Forwards 3 frames bytewise.** Claim a `fakeBinary` for + server-id `"s1"` via `reg.ClaimServer`. Construct a `fakePhone` + with id `"client-s1-aa00bb11"`; register it via + `reg.RegisterPhone`. Run `StartPhoneForwarder` in a goroutine. + Push 3 frames containing nested JSON (e.g. + `{"type":"hello","nested":{"a":[1,2,3],"b":null}}`). 
Wait for + `fakeBinary.sent` to have 3 entries. For each: `Unmarshal` the + wrapped bytes, assert `ConnID == phone.ConnID()`, and assert the + inner frame is byte-equal modulo whitespace via the `json.Compact` + shape from `envelope_test.go`. Close the frames chan to terminate + the forwarder; wait on its done chan. + +2. **Phone disconnects mid-stream.** Same setup. Push 1 frame + (assert it arrives). Close the frames chan → forwarder's `Read` + returns `io.EOF` → forwarder returns. From the test goroutine, + call `reg.UnregisterPhone(serverID, phone.ConnID())` (mimicking + the handler defer) and assert `reg.PhonesFor(serverID)` is nil. + +3. **No binary registered.** Construct a `fakePhone`. Do NOT call + `ClaimServer`; do NOT call `RegisterPhone` (we're testing the + forwarder's own resilience to a missing binary, not the + handler's). Run forwarder, push 1 frame. Forwarder's `BinaryFor` + lookup returns false → logs + returns nil. Assert via the + forwarder's done-chan that it returned, and that no panic + occurred. (Use `slog.New(slog.NewTextHandler(io.Discard, nil))` + so the warn/info logs don't pollute test output.) + +4. **Context cancellation.** Same setup as test 1 but DON'T push + frames. Cancel the parent ctx. Forwarder's `Read` returns + ctx.Err → loop returns. Assert return within 100 ms (generous + bound under `-race`). + +`make test` clean with `-race`. The package-level doc comment for +`forward_test.go` documents the manual stress invocation per the +race-count lesson: + +```go +// Manual stress: go test -race -count=20 ./internal/relay/ +// (count belongs on the command line, not as a loop in the test; +// see docs/lessons.md "Race-test count is a CI-runner knob".) +``` + +A `WSConn.Read` test isn't required by the AC, but a single +round-trip test in `ws_conn_test.go` (write a frame from the test +side, `Read` it through the adapter, assert byte-equal) prevents the +new method from regressing silently. 
~15 LOC; counts toward the +production-side line budget but keeps `WSConn`'s contract test +parity with `Send`. Optional — the developer can defer if the budget +is tight. + +## Open questions + +None blocking. Two judgement calls the developer will hit: + +1. **Return value plumbing.** `error` return is for observability + only; the handler discards it. If the developer prefers a + `void` return for clarity, that's acceptable — the AC says + "exact signature shape is the architect's call". I've specified + `error` because it preserves diagnostic signal at zero cost. + +2. **`phoneSource` interface placement.** Defined in `forward.go` + per the "interface at the consumer" pattern. If the binary-side + forwarder ticket lands and wants the same shape mirrored for its + read side, we can promote it later. No need to anticipate now. + +## Security review + +**Verdict:** PASS + +**Findings:** + +- **[Trust boundaries]** No findings — phone WS bytes are untrusted; + the forwarder treats them as opaque (`json.RawMessage` via + `Marshal`) and the binary owns inner-frame validation. The single + trust transition is `Marshal`'s `json.Valid` check at + `envelope.go:44`; on failure the forwarder closes the phone (warn + log + return → handler defer). Downstream binary code already + knows it receives untrusted-via-relay bytes. +- **[Tokens, secrets, credentials]** No findings — the forwarder + never reads or constructs `X-Pyrycode-Token`. The token already + went out of scope unread in the `/v1/client` handler (#5). No + log call site in this spec includes any header value; the + enumerated log fields are `server_id`, `conn_id`, and `err` (a + library error, not request-derived). +- **[File operations]** N/A — no filesystem access. +- **[Subprocess execution]** N/A — no subprocess. +- **[Cryptographic primitives]** N/A — no cryptographic operations + in this ticket; the existing `randHex8`-derived `conn_id` is + passed through unchanged. 
+- **[Network & I/O]** SHOULD FIX (deferred) — `c.Read(ctx)` has no + per-message size cap. A malicious phone can send arbitrarily + large frames to exhaust memory in `Marshal` and on the binary's + buffers. **Out of scope for #25** — the per-frame size cap belongs + on the WS upgrade configuration (`websocket.AcceptOptions` does + not expose a max-message-size knob in nhooyr v1.8.x; the + `*websocket.Conn` has `SetReadLimit` which is the right hook). + Should be a follow-up ticket against `WSConn` so it covers the + binary-side forwarder too. Tracking note: this is the same + unbounded-read posture inherited from #15; not a regression. + Default nhooyr read limit is 32 KiB (32768 bytes), which caps a + single message but isn't the deliberate policy choice the threat model warrants. +- **[Network & I/O]** No finding on backpressure — a slow binary + blocks the phone's forwarder goroutine. AC explicitly accepts + this for v1; the `WSConn.Send` 10 s deadline (#15) bounds any + single write, so a wedged binary causes the forwarder to return + via `Send` error within 10 s rather than hanging indefinitely. +- **[Error messages, logs, telemetry]** No findings — the four log + call sites enumerate fields explicitly. No frame bytes, no + headers, no tokens enter logs. `err` carries library errors + (close codes, ctx cancellation, write deadlines), not user + payloads. No telemetry / metrics added. +- **[Concurrency]** No findings — single goroutine per phone (the + HTTP handler's own); no new locks; per-frame `BinaryFor` lookup + is RLock-only and does not nest with any caller-held lock. + Goroutine lifecycle: the four termination paths in § + "Concurrency model" cover ctx cancel, phone close, binary close, + grace expiry. No leaks. +- **[Threat model alignment]** No findings — the relay's role per + `protocol-mobile.md` § Routing envelope is "wrap, address, + forward; never inspect". 
This spec preserves that: `json.Valid` + is structural, not semantic; the binary owns token verification + and message-kind dispatch. Out-of-scope items (heartbeat, binary + disconnect grace, binary-side forwarder) are named in the + ticket's Technical Notes section. + +**Reviewer:** architect (self-review per `architect/security-review.md`) +**Date:** 2026-05-09 diff --git a/internal/relay/client_endpoint.go b/internal/relay/client_endpoint.go index 4793e22..ce80104 100644 --- a/internal/relay/client_endpoint.go +++ b/internal/relay/client_endpoint.go @@ -78,11 +78,12 @@ func ClientHandler(reg *Registry, logger *slog.Logger) http.Handler { "conn_id", connID) }() - // Hold the connection open until the peer closes it (or the - // registry tears it down on binary-grace expiry). CloseRead drains - // control frames so the conn observes peer-close. The frame loop - // (#6) replaces this block with a real read loop later. - readCtx := c.CloseRead(r.Context()) - <-readCtx.Done() + // Phone-side read pump: wraps each inbound frame in the routing + // envelope and writes it to the binary holding serverID. Blocks + // until the phone closes the WS, ctx cancels, the binary + // disappears, or a Send to the binary fails. The handler's defer + // above runs when this returns. Return value is observability- + // only; the forwarder logs the cause. + _ = StartPhoneForwarder(r.Context(), reg, serverID, wsconn, logger) }) } diff --git a/internal/relay/forward.go b/internal/relay/forward.go new file mode 100644 index 0000000..12ab86d --- /dev/null +++ b/internal/relay/forward.go @@ -0,0 +1,74 @@ +package relay + +import ( + "context" + "log/slog" +) + +// phoneSource is the read-side contract the forwarder needs from a phone +// connection. Defined at the consumer (this file), not on WSConn, so tests +// can substitute a fake. Production passes *WSConn, which satisfies it via +// its Read method. 
+// +// Concurrent callers are NOT supported; the forwarder goroutine is the sole +// reader (matches WSConn.Read's contract). +type phoneSource interface { + ConnID() string + Read(ctx context.Context) ([]byte, error) +} + +// StartPhoneForwarder runs the per-phone read pump synchronously: it reads +// frames from phone, wraps each in the routing envelope keyed by +// phone.ConnID(), looks up the binary holding serverID, and writes the +// wrapped envelope to that binary. Returns when phone.Read errors, ctx is +// cancelled, the registered binary disappears, or a Send to the binary +// fails — the underlying error is returned for observability. +// +// The caller's defer (in /v1/client) handles UnregisterPhone and Close; +// the forwarder must NOT touch either. The relay treats inner frames as +// opaque bytes: only Marshal's structural json.Valid check inspects them. +// +// Despite the Start verb, the call is synchronous; the verb matches the AC. +func StartPhoneForwarder( + ctx context.Context, + reg *Registry, + serverID string, + phone phoneSource, + logger *slog.Logger, +) error { + for { + frame, err := phone.Read(ctx) + if err != nil { + logger.Info("phone_forwarder_read_end", + "server_id", serverID, + "conn_id", phone.ConnID(), + "err", err) + return err + } + + wrapped, err := Marshal(phone.ConnID(), frame) + if err != nil { + logger.Warn("phone_forwarder_marshal_err", + "server_id", serverID, + "conn_id", phone.ConnID(), + "err", err) + return err + } + + binary, ok := reg.BinaryFor(serverID) + if !ok { + logger.Info("phone_forwarder_no_binary", + "server_id", serverID, + "conn_id", phone.ConnID()) + return nil + } + + if err := binary.Send(wrapped); err != nil { + logger.Info("phone_forwarder_send_failed", + "server_id", serverID, + "conn_id", phone.ConnID(), + "err", err) + return err + } + } +} diff --git a/internal/relay/forward_test.go b/internal/relay/forward_test.go new file mode 100644 index 0000000..0a862b7 --- /dev/null +++ 
b/internal/relay/forward_test.go @@ -0,0 +1,269 @@ +// Manual stress: go test -race -count=20 ./internal/relay/ +// (count belongs on the command line, not as a loop in the test; +// see docs/lessons.md "Race-test count is a CI-runner knob".) +package relay + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io" + "log/slog" + "sync" + "testing" + "time" +) + +// fakePhone implements phoneSource. Tests push frames onto frames; closing +// the channel signals "phone disconnected" — Read returns io.EOF. +type fakePhone struct { + id string + frames chan []byte +} + +func newFakePhone(id string) *fakePhone { + return &fakePhone{id: id, frames: make(chan []byte, 16)} +} + +func (p *fakePhone) ConnID() string { return p.id } + +func (p *fakePhone) Read(ctx context.Context) ([]byte, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case frame, ok := <-p.frames: + if !ok { + return nil, io.EOF + } + return frame, nil + } +} + +// fakeBinary implements Conn for use as the registry-side binary. Send is +// called from the forwarder goroutine while the test reads sent from the +// test goroutine, so writes are mu-protected. 
+type fakeBinary struct { + id string + mu sync.Mutex + sent [][]byte + sendErr error +} + +func (b *fakeBinary) ConnID() string { return b.id } + +func (b *fakeBinary) Send(msg []byte) error { + b.mu.Lock() + defer b.mu.Unlock() + if b.sendErr != nil { + return b.sendErr + } + cp := make([]byte, len(msg)) + copy(cp, msg) + b.sent = append(b.sent, cp) + return nil +} + +func (b *fakeBinary) Close() {} + +func (b *fakeBinary) snapshot() [][]byte { + b.mu.Lock() + defer b.mu.Unlock() + out := make([][]byte, len(b.sent)) + for i, m := range b.sent { + cp := make([]byte, len(m)) + copy(cp, m) + out[i] = cp + } + return out +} + +func waitForSent(t *testing.T, b *fakeBinary, want int, timeout time.Duration) [][]byte { + t.Helper() + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + got := b.snapshot() + if len(got) >= want { + return got + } + time.Sleep(5 * time.Millisecond) + } + got := b.snapshot() + t.Fatalf("waitForSent: got %d, want %d after %v", len(got), want, timeout) + return nil +} + +func discardLogger() *slog.Logger { + return slog.New(slog.NewTextHandler(io.Discard, nil)) +} + +// runForwarder spawns the forwarder on a goroutine and returns a done chan +// that closes once it returns, plus the cancel for the parent context. 
+func runForwarder(reg *Registry, serverID string, phone phoneSource) (done chan error, cancel context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + done = make(chan error, 1) + go func() { + done <- StartPhoneForwarder(ctx, reg, serverID, phone, discardLogger()) + }() + return done, cancel +} + +func compactJSON(t *testing.T, b []byte) []byte { + t.Helper() + var buf bytes.Buffer + if err := json.Compact(&buf, b); err != nil { + t.Fatalf("compact: %v (input: %s)", err, b) + } + return buf.Bytes() +} + +func TestStartPhoneForwarder_ForwardsFramesBytewise(t *testing.T) { + t.Parallel() + + reg := NewRegistry() + bin := &fakeBinary{id: "bin-s1"} + if err := reg.ClaimServer("s1", bin); err != nil { + t.Fatalf("ClaimServer: %v", err) + } + phone := newFakePhone("client-s1-aa00bb11") + if err := reg.RegisterPhone("s1", &registryConn{phone}); err != nil { + t.Fatalf("RegisterPhone: %v", err) + } + + done, cancel := runForwarder(reg, "s1", phone) + defer cancel() + + innerFrames := [][]byte{ + []byte(`{"type":"hello","nested":{"a":[1,2,3],"b":null}}`), + []byte(`{"type":"keystroke","data":"abc","x":42}`), + []byte(`{"type":"resize","cols":80,"rows":24,"meta":{"k":"v"}}`), + } + for _, f := range innerFrames { + phone.frames <- f + } + + got := waitForSent(t, bin, len(innerFrames), 2*time.Second) + if len(got) != len(innerFrames) { + t.Fatalf("sent count = %d, want %d", len(got), len(innerFrames)) + } + + for i, wrapped := range got { + env, err := Unmarshal(wrapped) + if err != nil { + t.Fatalf("Unmarshal frame %d: %v", i, err) + } + if env.ConnID != phone.ConnID() { + t.Errorf("frame %d ConnID = %q, want %q", i, env.ConnID, phone.ConnID()) + } + want := compactJSON(t, innerFrames[i]) + gotInner := compactJSON(t, env.Frame) + if !bytes.Equal(want, gotInner) { + t.Errorf("frame %d inner bytes diverged\nwant: %s\n got: %s", i, want, gotInner) + } + } + + close(phone.frames) + select { + case err := <-done: + if !errors.Is(err, io.EOF) { + 
t.Fatalf("forwarder return = %v, want io.EOF", err) + } + case <-time.After(time.Second): + t.Fatal("forwarder did not return after frames chan closed") + } +} + +func TestStartPhoneForwarder_PhoneDisconnect_LeavesRegistryCleanable(t *testing.T) { + t.Parallel() + + reg := NewRegistry() + bin := &fakeBinary{id: "bin-s1"} + if err := reg.ClaimServer("s1", bin); err != nil { + t.Fatalf("ClaimServer: %v", err) + } + phone := newFakePhone("client-s1-cc11dd22") + if err := reg.RegisterPhone("s1", &registryConn{phone}); err != nil { + t.Fatalf("RegisterPhone: %v", err) + } + + done, cancel := runForwarder(reg, "s1", phone) + defer cancel() + + phone.frames <- []byte(`{"type":"k","v":1}`) + waitForSent(t, bin, 1, time.Second) + + // Mid-stream phone disconnect: closing the chan makes Read return EOF. + close(phone.frames) + select { + case err := <-done: + if !errors.Is(err, io.EOF) { + t.Fatalf("forwarder return = %v, want io.EOF", err) + } + case <-time.After(time.Second): + t.Fatal("forwarder did not return after phone disconnect") + } + + // Mimic the handler-level defer that owns cleanup. + reg.UnregisterPhone("s1", phone.ConnID()) + if got := reg.PhonesFor("s1"); got != nil { + t.Fatalf("PhonesFor(s1) = %v, want nil after UnregisterPhone", got) + } +} + +func TestStartPhoneForwarder_NoBinary_ReturnsNil(t *testing.T) { + t.Parallel() + + reg := NewRegistry() + // No ClaimServer, no RegisterPhone — testing the forwarder's own + // resilience to a missing binary, not the handler's. 
+ phone := newFakePhone("client-s1-ee22ff33") + + done, cancel := runForwarder(reg, "s1", phone) + defer cancel() + + phone.frames <- []byte(`{"x":1}`) + + select { + case err := <-done: + if err != nil { + t.Fatalf("forwarder return = %v, want nil", err) + } + case <-time.After(time.Second): + t.Fatal("forwarder did not return on missing binary") + } +} + +func TestStartPhoneForwarder_ContextCancellation_Returns(t *testing.T) { + t.Parallel() + + reg := NewRegistry() + bin := &fakeBinary{id: "bin-s1"} + if err := reg.ClaimServer("s1", bin); err != nil { + t.Fatalf("ClaimServer: %v", err) + } + phone := newFakePhone("client-s1-44aa55bb") + if err := reg.RegisterPhone("s1", &registryConn{phone}); err != nil { + t.Fatalf("RegisterPhone: %v", err) + } + + done, cancel := runForwarder(reg, "s1", phone) + + cancel() + select { + case err := <-done: + if !errors.Is(err, context.Canceled) { + t.Fatalf("forwarder return = %v, want context.Canceled", err) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("forwarder did not return promptly on ctx cancel") + } +} + +// registryConn adapts a fakePhone to the registry's Conn interface so it can +// be RegisterPhone'd. The forwarder reads through the fakePhone directly via +// phoneSource; the registry needs a Conn (ConnID, Send, Close). +type registryConn struct{ p *fakePhone } + +func (c *registryConn) ConnID() string { return c.p.ConnID() } +func (c *registryConn) Send(msg []byte) error { return nil } +func (c *registryConn) Close() {} diff --git a/internal/relay/ws_conn.go b/internal/relay/ws_conn.go index 29d179e..f538560 100644 --- a/internal/relay/ws_conn.go +++ b/internal/relay/ws_conn.go @@ -18,9 +18,10 @@ const writeTimeout = 10 * time.Second // (the underlying library forbids concurrent Write) and a per-connection // cancellation context that Close trips to abort in-flight writes. // -// All exported methods are safe for concurrent use.
Send serialises +// Send, ConnID, and Close are safe for concurrent use: Send serialises // concurrent callers; ConnID is a pure getter; Close is idempotent and -// may run concurrently with Send. +// may run concurrently with Send. Read is single-caller — the per-WSConn +// forwarder goroutine is the sole reader (see internal/relay/forward.go). type WSConn struct { conn *websocket.Conn connID string @@ -70,6 +71,18 @@ func (w *WSConn) Send(msg []byte) error { return w.conn.Write(ctx, websocket.MessageBinary, msg) } +// Read returns the next inbound message as opaque bytes. ctx bounds the +// wait; cancellation aborts the read with the library's wrapped error. +// The message type (binary vs text) is discarded — the relay treats +// inner frames as opaque bytes. Concurrent Read callers are NOT +// supported; the per-WSConn forwarder goroutine is the sole reader. +// After Close, an in-flight Read returns with the library's close +// error from the underlying *websocket.Conn. +func (w *WSConn) Read(ctx context.Context) ([]byte, error) { + _, data, err := w.conn.Read(ctx) + return data, err +} + // Close cancels in-flight writes and closes the WebSocket with // StatusNormalClosure. Idempotent: only the first call reaches the // underlying *websocket.Conn; subsequent calls are no-ops. Safe to call