From 4eaa4b3e5e21967b7e57b9d871e6eef41f9abe2a Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Thu, 4 Dec 2025 22:57:18 +0800 Subject: [PATCH 1/9] docs: websocket reconnect redesign --- .../prd/000-websocket-client-reconnect.md | 75 +++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 packages/loro-websocket/prd/000-websocket-client-reconnect.md diff --git a/packages/loro-websocket/prd/000-websocket-client-reconnect.md b/packages/loro-websocket/prd/000-websocket-client-reconnect.md new file mode 100644 index 0000000..2aafc6d --- /dev/null +++ b/packages/loro-websocket/prd/000-websocket-client-reconnect.md @@ -0,0 +1,75 @@ +# PRD 000 — Stabilize WebSocket Client Reconnect + +## Problem +- Auto‑reconnect works in simple cases but is brittle in degraded networks and offers little visibility. Developers see flapping status events, stuck “connected” sockets that no longer deliver data, and endless retry loops after auth kicks or offline transitions. +- The current API exposes only coarse `onStatusChange` and a low‑level `onWsClose()` without context, making it hard to debug, alert, or build UX around reconnect. + +## Current Behavior (as implemented) +- Initial connect fires in the constructor; subsequent reconnects are scheduled with exponential backoff: 0.5s, 1s, 2s … capped at 15s (`src/client/index.ts:430-437`). +- Reconnect is controlled by a boolean `shouldReconnect`; it is only flipped to `false` for close codes 4400‑4499 or reasons `permission_changed` / `room_closed` (`src/client/index.ts:365-377`). +- Offline handling clears the timer and closes the socket, but does **not** block the next reconnect timer that will be scheduled by the ensuing `close` event (`src/client/index.ts:451-459`). +- Pings measure latency but never trigger a reconnect on missing pongs; half‑open connections stay in `Connected` forever (`src/client/index.ts:1095-1141`). 
- Joins issued while the socket is still CONNECTING are sent immediately; in Node’s `ws` this throws, rejecting the join but leaving client state inconsistent (`src/client/index.ts:871-880`).
- No jitter in backoff → concurrent clients herd after an outage.
- No visibility hooks for reconnect attempts, reasons, backoff delay, or per‑room rejoin results; `onWsClose` lacks the `CloseEvent`.

## Observed / Potential Failure Modes
1) **Offline loop**: `handleOffline()` closes the socket, then `onSocketClose()` immediately schedules a retry even though the device is offline, causing tight retry/error loops and battery/CPU churn.
2) **Fatal kicks retry forever**: Server close codes like 1008/1011 or application errors keep auto‑retrying because only 440x and two reason strings are treated as fatal. This spams auth, logs, and the server.
3) **Half‑open stall**: If the TCP path drops without a FIN (common on captive portals / mobile), the socket stays “open”; no pongs arrive, but the client never transitions to Disconnected or triggers backoff.
4) **Join during CONNECTING**: Calling `join` before `waitConnected()` can throw in Node, leaving `pendingRooms` populated and adaptors without context.
5) **Thundering herd**: Identical backoff without jitter means many clients reconnect in lockstep after outages.
6) **Weak observability**: Callers can’t answer “why did we reconnect?”, “what attempt/delay are we on?”, “was rejoin successful?”, or “what close code did we get?”. Debugging relies on console noise.

## Goals
- Make reconnect predictable across offline/online transitions, server kicks, and half‑open links.
- Provide API hooks and telemetry that let applications surface health, analytics, and UX cues.
- Preserve back‑compat by keeping defaults close to today’s behavior, but safer.

## Non‑Goals
- Rewriting the server or changing the wire protocol.
- Adding persistence or storage for reconnect state. 
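The jitter-free backoff called out in failure mode 5 is the kind of thing the design below addresses. As a reference point, a minimal backoff-with-jitter helper can be sketched as follows; the names (`BackoffPolicy`, `computeBackoffDelay`) are illustrative, not the package's actual internals:

```typescript
interface BackoffPolicy {
  initialDelayMs: number; // delay before the first retry
  maxDelayMs: number;     // cap applied after jitter
  jitter: number;         // 0..1, random spread around the nominal delay
}

// Exponential backoff with symmetric jitter: the nominal delay doubles per
// attempt, a random factor in [1 - jitter, 1 + jitter) de-synchronizes
// clients, and the result is clamped to maxDelayMs.
function computeBackoffDelay(attempt: number, policy: BackoffPolicy): number {
  const raw = policy.initialDelayMs * 2 ** Math.max(0, attempt - 1);
  const jitterFactor = 1 + (Math.random() * 2 - 1) * policy.jitter;
  return Math.min(policy.maxDelayMs, Math.max(0, Math.floor(raw * jitterFactor)));
}

// Proposed defaults: 500ms start, 15s cap, 25% jitter.
const defaults: BackoffPolicy = { initialDelayMs: 500, maxDelayMs: 15_000, jitter: 0.25 };
```

With `jitter: 0` this reproduces today's deterministic 0.5s, 1s, 2s … 15s schedule; any nonzero jitter spreads simultaneous reconnects across a window proportional to the delay.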
+ +## Proposed Design (kept minimal) + +1) **Small reconnect policy** + - `reconnect?: { enabled?: boolean; initialDelayMs?: number; maxDelayMs?: number; jitter?: number; maxAttempts?: number | "infinite"; fatalCloseCodes?: number[]; fatalCloseReasons?: string[] }` + - Defaults keep today’s timing (500ms start, cap 15s, jitter 0.25) and infinite retries. + - `maxAttempts` stays supported; callers can set a finite ceiling, otherwise `"infinite"`. + - Fatal closes (4400‑4499, 1008, 1011, `permission_changed`, `room_closed`, `auth_failed`) stop auto‑retry; caller can still call `retryNow()`. + +2) **Ping-based liveness** + - If two consecutive pings miss (`pingTimeoutMs` default 10s), forcibly close with reason `ping_timeout` and enter backoff. Prevents half‑open stalls. + +3) **Offline pause** + - While `navigator.onLine === false`, do not schedule retries. When `online` fires, schedule immediately. No extra backstop timer to keep behavior predictable. + +4) **Safer join during connect** + - Queue join requests while the socket is CONNECTING; flush on OPEN. (“Pending joins” = `join()` calls made before the socket is ready.) Default queue limit: 50 outstanding joins; beyond that, reject with a clear error so apps can back off. + +5) **Manual reconnect** + - Add `retryNow()` (alias `connect({ resetBackoff: true })`) to attempt immediately and reset backoff. Also re-enables auto-retry if it had stopped. + +## UX / API Back‑Compat +- Defaults mimic today’s timing and infinite retries; jitter and fatal code handling are additive improvements. +- Existing `onStatusChange` continues to work; new states (`reconnecting`) will be documented but won’t break existing string comparisons (add to `ClientStatus` map). +- `onWsClose` kept but now receives the `CloseEvent` for parity; deprecated in docs in favor of `onClose`. + +## Acceptance Criteria +- Reconnect pauses while offline; resumes immediately on `online`. 
- Fatal kicks (440x, 1008/1011, auth strings) do **not** auto‑retry unless the caller calls `retryNow()`.
- Half‑open sockets are detected by ping timeout and recover via reconnect.
- Joins issued during connect do not throw; they complete once connected or fail with a clear error.
- Minimal API surface: no new event fan‑out beyond `onStatusChange` and `retryNow()`.

## Testing Plan
- Unit: backoff jitter math, classifyClose matrix, join queuing during CONNECTING, half‑open ping timeout transition.
- E2E:
  - server stop/start with offline events → reconnect after online.
  - server closes with 1008 → no retry by default.
  - missing pongs → forced reconnect.
  - many clients (N≥20) reconnect → ensure jitter prevents synchronized attempts.
- Regression: existing e2e reconnection and offline tests must still pass with defaults.

## Open Questions
- Is the queue limit of 50 sufficient, or should it scale with `pendingRooms` size or be user-configurable?

From 01d1ddd64791991094a0e8fc6bf47a1ebad313b2 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Thu, 4 Dec 2025 23:14:57 +0800 Subject: [PATCH 2/9] fix: refine LoroWebsocketClient reconnect --- packages/loro-websocket/src/client/index.ts | 170 +++++++++++++++++--- packages/loro-websocket/tests/e2e.test.ts | 164 ++++++++++++++++--- 2 files changed, 298 insertions(+), 36 deletions(-)

diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index 55e2fa1..45ff053 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -86,10 +86,31 @@ export interface LoroWebsocketClientOptions { url: string; /** Optional custom ping interval. Defaults to 30s. Set with `disablePing` to stop timers. */ pingIntervalMs?: number; + /** Ping timeout; after two consecutive misses the client will force-close and reconnect. Defaults to 10s. */ + pingTimeoutMs?: number; /** Disable periodic ping/pong entirely. 
*/ disablePing?: boolean; /** Optional callback for low-level ws close (before status transitions). */ onWsClose?: () => void; + /** + * Reconnect policy (kept minimal). + * - enabled: toggle auto-retry (default true) + * - initialDelayMs: starting backoff delay (default 500) + * - maxDelayMs: max backoff delay (default 15000) + * - jitter: 0-1 multiplier applied randomly around the delay (default 0.25) + * - maxAttempts: number | "infinite" (default "infinite") + * - fatalCloseCodes: close codes that should not retry (default 4400-4499, 1008, 1011) + * - fatalCloseReasons: close reasons that should not retry (default permission_changed, room_closed, auth_failed) + */ + reconnect?: { + enabled?: boolean; + initialDelayMs?: number; + maxDelayMs?: number; + jitter?: number; + maxAttempts?: number | "infinite"; + fatalCloseCodes?: number[]; + fatalCloseReasons?: string[]; + }; } /** @@ -132,12 +153,18 @@ export class LoroWebsocketClient { reject: (err: Error) => void; timeoutId: ReturnType; }> = []; + private missedPongs = 0; // Reconnect controls private shouldReconnect = true; private reconnectAttempts = 0; private reconnectTimer?: ReturnType; private removeNetworkListeners?: () => void; + private offline = false; + + // Join requests issued while socket is still connecting + private queuedJoins: Uint8Array[] = []; + private readonly maxQueuedJoins = 50; constructor(private ops: LoroWebsocketClientOptions) { this.attachNetworkListeners(); @@ -274,7 +301,10 @@ export class LoroWebsocketClient { } /** Initiate or resume connection. Resolves when `Connected`. 
*/ - async connect(): Promise { + async connect(opts?: { resetBackoff?: boolean }): Promise { + if (opts?.resetBackoff) { + this.reconnectAttempts = 0; + } // Ensure future unexpected closes will auto-reconnect again this.shouldReconnect = true; const current = this.ws; @@ -349,6 +379,8 @@ export class LoroWebsocketClient { this.resolveConnected?.(); // Rejoin rooms after reconnect this.rejoinActiveRooms(); + // Flush any queued joins that were requested while connecting + this.flushQueuedJoins(); } private onSocketError(ws: WebSocket, _event: Event): void { @@ -366,16 +398,14 @@ export class LoroWebsocketClient { } const closeCode = event?.code; - if (closeCode != null && closeCode >= 4400 && closeCode < 4500) { - this.shouldReconnect = false; - } - const closeReason = event?.reason; - if (closeReason === "permission_changed" || closeReason === "room_closed") { + + if (this.isFatalClose(closeCode, closeReason)) { this.shouldReconnect = false; } this.clearPingTimer(); + this.missedPongs = 0; // Clear any pending fragment reassembly timers to avoid late callbacks if (this.fragmentBatches.size) { for (const [, batch] of this.fragmentBatches) { @@ -389,6 +419,15 @@ export class LoroWebsocketClient { this.awaitingPongSince = undefined; this.ops.onWsClose?.(); this.rejectAllPingWaiters(new Error("WebSocket closed")); + const maxAttempts = this.getReconnectPolicy().maxAttempts; + if ( + typeof maxAttempts === "number" && + maxAttempts > 0 && + this.reconnectAttempts >= maxAttempts + ) { + this.shouldReconnect = false; + } + if (!this.shouldReconnect) { this.setStatus(ClientStatus.Disconnected); this.rejectConnected?.(new Error("Disconnected")); @@ -426,10 +465,11 @@ export class LoroWebsocketClient { private scheduleReconnect(immediate = false) { if (this.reconnectTimer) return; + if (this.offline) return; + const policy = this.getReconnectPolicy(); + if (policy.enabled === false) return; const attempt = ++this.reconnectAttempts; - const base = 500; // ms - const max = 
15_000; // ms - const delay = immediate ? 0 : Math.min(max, base * 2 ** (attempt - 1)); + const delay = immediate ? 0 : this.computeBackoffDelay(attempt); this.reconnectTimer = setTimeout(() => { this.reconnectTimer = undefined; void this.connect(); @@ -442,6 +482,7 @@ export class LoroWebsocketClient { } private handleOnline = () => { + this.offline = false; if (!this.shouldReconnect) return; if (this.status === ClientStatus.Connected) return; this.clearReconnectTimer(); @@ -449,6 +490,7 @@ export class LoroWebsocketClient { }; private handleOffline = () => { + this.offline = true; // Pause scheduled retries until online this.clearReconnectTimer(); if (this.shouldReconnect) { @@ -868,15 +910,21 @@ export class LoroWebsocketClient { }); this.roomAuth.set(id, auth); - this.ws.send( - encode({ - type: MessageType.JoinRequest, - crdt: crdtAdaptor.crdtType, - roomId, - auth: auth ?? new Uint8Array(), - version: crdtAdaptor.getVersion(), - } as JoinRequest) - ); + const joinPayload = encode({ + type: MessageType.JoinRequest, + crdt: crdtAdaptor.crdtType, + roomId, + auth: auth ?? 
new Uint8Array(), + version: crdtAdaptor.getVersion(), + } as JoinRequest); + + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(joinPayload); + } else { + this.enqueueJoin(joinPayload); + // ensure a connection attempt is running + void this.connect(); + } return room; } @@ -907,6 +955,7 @@ export class LoroWebsocketClient { if (ws && this.socketListeners.has(ws)) { this.ops.onWsClose?.(); } + this.queuedJoins = []; this.detachSocketListeners(ws); this.flushAndCloseWebSocket(ws, { code: 1000, @@ -1014,6 +1063,7 @@ export class LoroWebsocketClient { if (ws && this.socketListeners.has(ws)) { this.ops.onWsClose?.(); } + this.queuedJoins = []; this.detachSocketListeners(ws); try { this.removeNetworkListeners?.(); @@ -1098,6 +1148,21 @@ export class LoroWebsocketClient { if (!interval) return; this.clearPingTimer(); this.pingTimer = setInterval(() => { + const now = Date.now(); + const timeoutMs = Math.max(1, this.ops.pingTimeoutMs ?? 10_000); + if ( + this.awaitingPongSince != null && + now - this.awaitingPongSince > timeoutMs + ) { + this.missedPongs += 1; + this.awaitingPongSince = now; + if (this.missedPongs >= 2) { + try { + this.ws?.close(1001, "ping_timeout"); + } catch {} + return; + } + } try { if (this.ws && this.ws.readyState === WebSocket.OPEN) { // Avoid overlapping RTT probes @@ -1132,6 +1197,7 @@ export class LoroWebsocketClient { } this.awaitingPongSince = undefined; } + this.missedPongs = 0; // Resolve all waiters on any pong if (this.pingWaiters.length > 0) { const waiters = this.pingWaiters.splice(0, this.pingWaiters.length); @@ -1148,6 +1214,74 @@ export class LoroWebsocketClient { } catch {} } } + + /** Manual reconnect helper that resets backoff and attempts immediately. */ + retryNow(): Promise { + return this.connect({ resetBackoff: true }); + } + + private getReconnectPolicy() { + const p = this.ops.reconnect ?? {}; + return { + enabled: p.enabled ?? true, + initialDelayMs: Math.max(1, p.initialDelayMs ?? 
500), + maxDelayMs: Math.max(1, p.maxDelayMs ?? 15_000), + jitter: Math.max(0, Math.min(1, p.jitter ?? 0.25)), + maxAttempts: p.maxAttempts ?? "infinite", + fatalCloseCodes: p.fatalCloseCodes ?? [ + 1008, + 1011, + // 4400-4499 + ...Array.from({ length: 100 }, (_, i) => 4400 + i), + ], + fatalCloseReasons: p.fatalCloseReasons ?? [ + "permission_changed", + "room_closed", + "auth_failed", + ], + }; + } + + private computeBackoffDelay(attempt: number): number { + const policy = this.getReconnectPolicy(); + const base = policy.initialDelayMs; + const max = policy.maxDelayMs; + const raw = base * 2 ** Math.max(0, attempt - 1); + const jitterFactor = + 1 + + (policy.jitter === 0 + ? 0 + : (Math.random() * 2 - 1) * policy.jitter); + const withJitter = raw * jitterFactor; + return Math.min(max, Math.max(0, Math.floor(withJitter))); + } + + private isFatalClose(code?: number, reason?: string): boolean { + const policy = this.getReconnectPolicy(); + if (code != null && policy.fatalCloseCodes.includes(code)) return true; + if (reason && policy.fatalCloseReasons.includes(reason)) return true; + return false; + } + + private enqueueJoin(payload: Uint8Array) { + if (this.queuedJoins.length >= this.maxQueuedJoins) { + throw new Error("Too many pending joins; connection not ready"); + } + this.queuedJoins.push(payload); + } + + private flushQueuedJoins() { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + if (!this.queuedJoins.length) return; + const items = this.queuedJoins.splice(0, this.queuedJoins.length); + for (const payload of items) { + try { + this.ws.send(payload); + } catch (e) { + console.error("Failed to flush queued join:", e); + } + } + } } export interface LoroWebsocketClientRoom { diff --git a/packages/loro-websocket/tests/e2e.test.ts b/packages/loro-websocket/tests/e2e.test.ts index 9e88b1d..7af4d24 100644 --- a/packages/loro-websocket/tests/e2e.test.ts +++ b/packages/loro-websocket/tests/e2e.test.ts @@ -1,5 +1,5 @@ import { describe, it, 
expect, beforeAll, afterAll } from "vitest"; -import { WebSocket } from "ws"; +import { WebSocket, WebSocketServer } from "ws"; import getPort from "get-port"; import { SimpleServer } from "../src/server/simple-server"; import { LoroWebsocketClient, ClientStatus } from "../src/client"; @@ -357,13 +357,12 @@ describe("E2E: Client-Server Sync", () => { unsubscribe2 = client2.onStatusChange(s => statuses2.push(s)); await Promise.all([client1.waitConnected(), client2.waitConnected()]); - const initialConnectedCount1 = statuses1.filter( + const initialConnected1 = statuses1.filter( s => s === ClientStatus.Connected ).length; - const initialConnectedCount2 = statuses2.filter( + const initialConnected2 = statuses2.filter( s => s === ClientStatus.Connected ).length; - const adaptor1 = new LoroAdaptor(); const adaptor2 = new LoroAdaptor(); @@ -408,9 +407,9 @@ describe("E2E: Client-Server Sync", () => { await waitUntil( () => statuses1.filter(s => s === ClientStatus.Connected).length > - initialConnectedCount1 && + initialConnected1 && statuses2.filter(s => s === ClientStatus.Connected).length > - initialConnectedCount2, + initialConnected2, 5000, 25 ); @@ -484,6 +483,13 @@ describe("E2E: Client-Server Sync", () => { await new Promise(resolve => setTimeout(resolve, 200)); await server.start(); + // Without an online event, auto-retry should stay paused + await new Promise(resolve => setTimeout(resolve, 400)); + expect(statuses1[statuses1.length - 1]).toBe(ClientStatus.Disconnected); + expect(statuses2[statuses2.length - 1]).toBe(ClientStatus.Disconnected); + + // Manual retry should override the pause + await Promise.all([client1.retryNow(), client2.retryNow()]); await waitUntil( () => client1!.getStatus() === ClientStatus.Connected && @@ -492,18 +498,6 @@ describe("E2E: Client-Server Sync", () => { 50 ); - expect((navigator as { onLine?: boolean }).onLine).toBe(false); - - await waitUntil( - () => - statuses1.filter(s => s === ClientStatus.Connected).length > - 
initialConnectedCount1 && - statuses2.filter(s => s === ClientStatus.Connected).length > - initialConnectedCount2, - 5000, - 25 - ); - text1.insert(text1.length, " rebound"); adaptor1.getDoc().commit(); await waitUntil(() => text2.toString() === "seed rebound", 5000, 50); @@ -565,6 +559,140 @@ describe("E2E: Client-Server Sync", () => { client.destroy(); }, 15000); + + it("stops auto-retry after fatal close code and retryNow reconnects", async () => { + const fatalPort = await getPort(); + const wss = new WebSocketServer({ port: fatalPort }); + let connections = 0; + wss.on("connection", ws => { + connections++; + const connId = connections; + ws.on("message", data => { + const text = data.toString(); + if (text === "ping") { + // Only respond for non-fatal connections + if (connId >= 2) { + ws.send("pong"); + } + } + }); + if (connId === 1) { + setTimeout(() => { + try { + ws.close(1008, "policy"); + } catch {} + }, 30); + } + }); + + const client = new LoroWebsocketClient({ + url: `ws://localhost:${fatalPort}`, + pingIntervalMs: 50, + pingTimeoutMs: 200, + }); + const statuses: string[] = []; + const off = client.onStatusChange(s => statuses.push(s)); + + await client.waitConnected(); + await waitUntil( + () => statuses.includes(ClientStatus.Disconnected), + 4000, + 25 + ); + + // Ensure it does not auto-retry after fatal close + const lenAfterFatal = statuses.length; + await new Promise(resolve => setTimeout(resolve, 400)); + expect(statuses.length).toBe(lenAfterFatal); + expect(statuses[statuses.length - 1]).toBe(ClientStatus.Disconnected); + + // Manual retry should reconnect + await client.retryNow(); + await waitUntil( + () => client.getStatus() === ClientStatus.Connected, + 4000, + 25 + ); + + off(); + client.destroy(); + wss.close(); + }, 10000); + + it("queues joins issued while connecting and flushes once connected", async () => { + const queuedPort = await getPort(); + const client = new LoroWebsocketClient({ + url: `ws://localhost:${queuedPort}`, + 
pingIntervalMs: 200, // slow ping to avoid noise before server up + }); + const adaptor = new LoroAdaptor(); + + const joinPromise = client.join({ + roomId: "queued-join", + crdtAdaptor: adaptor, + }); + + // Start server after join was requested + await new Promise(resolve => setTimeout(resolve, 200)); + const server = new SimpleServer({ port: queuedPort }); + await server.start(); + + const room = await joinPromise; + const text = adaptor.getDoc().getText("q"); + text.insert(0, "hello"); + adaptor.getDoc().commit(); + await room.waitForReachingServerVersion(); + + await room.destroy(); + client.destroy(); + await server.stop(); + }, 12000); + + it("forces reconnect after ping timeout and recovers when pongs return", async () => { + const pongPort = await getPort(); + const wss = new WebSocketServer({ port: pongPort }); + let connId = 0; + wss.on("connection", ws => { + const id = ++connId; + ws.on("message", data => { + const text = data.toString(); + if (text === "ping") { + if (id >= 2) ws.send("pong"); + // first connection intentionally ignores to trigger timeout + } + }); + }); + + const statuses: string[] = []; + const client = new LoroWebsocketClient({ + url: `ws://localhost:${pongPort}`, + pingIntervalMs: 80, + pingTimeoutMs: 100, + reconnect: { initialDelayMs: 50, maxDelayMs: 200, jitter: 0 }, + }); + client.onStatusChange(s => statuses.push(s)); + + await client.waitConnected(); + await waitUntil( + () => statuses.includes(ClientStatus.Disconnected), + 5000, + 25 + ); + await waitUntil( + () => + statuses.filter(s => s === ClientStatus.Connected).length >= 2 && + connId >= 2, + 6000, + 25 + ); + + // Stay connected for a short window to ensure stability + await new Promise(resolve => setTimeout(resolve, 200)); + expect(client.getStatus()).toBe(ClientStatus.Connected); + + client.destroy(); + wss.close(); + }, 12000); }); function installMockWindow(initialOnline = true) { From 48606ccd72c3e9515b6e6aaf858bcffc81294cac Mon Sep 17 00:00:00 2001 From: Zixuan 
Chen Date: Thu, 4 Dec 2025 23:37:51 +0800 Subject: [PATCH 3/9] fix: room auto rejoin and status listener --- .../prd/000-websocket-client-reconnect.md | 15 +++ packages/loro-websocket/src/client/index.ts | 100 +++++++++++++++++- packages/loro-websocket/tests/e2e.test.ts | 94 +++++++++++++++- 3 files changed, 202 insertions(+), 7 deletions(-) diff --git a/packages/loro-websocket/prd/000-websocket-client-reconnect.md b/packages/loro-websocket/prd/000-websocket-client-reconnect.md index 2aafc6d..bf43958 100644 --- a/packages/loro-websocket/prd/000-websocket-client-reconnect.md +++ b/packages/loro-websocket/prd/000-websocket-client-reconnect.md @@ -50,6 +50,21 @@ 5) **Manual reconnect** - Add `retryNow()` (alias `connect({ resetBackoff: true })`) to attempt immediately and reset backoff. Also re-enables auto-retry if it had stopped. +6) **Room rejoin resilience** + - Keep an `activeRooms` registry (roomId + crdt) for rooms that were successfully joined. + - On reconnect, automatically resend JoinRequest for every active room; include original auth bytes. + - If rejoin fails (fatal join error), emit a rejoin failure event and leave the room unjoined; caller can decide to retry or surface UI. + - Pending joins (issued while CONNECTING) remain queued and flush on OPEN; successful joins add to `activeRooms`. + +7) **Per-room status callbacks** + - `join` accepts `onStatusChange?: (status) => void` where status ∈ `connecting | joined | reconnecting | disconnected | error`. + - Transitions: + - `connecting`: initial join or queued join flushing. + - `joined`: join/rejoin success. + - `reconnecting`: socket closed unexpectedly and room queued for auto-rejoin. + - `disconnected`: reconnects disabled (fatal close/maxAttempts/close()). + - `error`: join/rejoin failure (e.g., auth denied); room is removed from active set so callers can rejoin manually. 
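The rejoin resilience and per-room status behavior in items 6 and 7 amount to a small registry of joined rooms that is replayed on reconnect and pruned on fatal rejoin errors. A minimal sketch under those assumptions (the names `RejoinRegistry`, `markJoined`, `markRejoinFailed` are illustrative, not the patch's actual internals):

```typescript
// Key is crdtType + roomId, matching how the client indexes active rooms.
type RoomKey = string;

interface ActiveRoom {
  roomId: string;
  auth?: Uint8Array; // original auth bytes, resent on rejoin
}

class RejoinRegistry {
  private rooms = new Map<RoomKey, ActiveRoom>();

  // Called after a successful join or rejoin.
  markJoined(key: RoomKey, room: ActiveRoom): void {
    this.rooms.set(key, room);
  }

  // Called on a fatal rejoin error (e.g. auth revoked): the room leaves the
  // auto-rejoin set so the caller can decide to rejoin manually.
  markRejoinFailed(key: RoomKey): void {
    this.rooms.delete(key);
  }

  // On socket OPEN, every remaining active room gets a fresh JoinRequest.
  roomsToRejoin(): ActiveRoom[] {
    return [...this.rooms.values()];
  }
}
```

In this model, the per-room `onStatusChange` callback fires `reconnecting` when the registry queues a room for replay, `joined` when the rejoin succeeds, and `error` at the moment `markRejoinFailed` removes it.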
+ ## UX / API Back‑Compat - Defaults mimic today’s timing and infinite retries; jitter and fatal code handling are additive improvements. - Existing `onStatusChange` continues to work; new states (`reconnecting`) will be documented but won’t break existing string comparisons (add to `ClientStatus` map). diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index 45ff053..aa805cf 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -34,6 +34,7 @@ interface PendingRoom { adaptor: CrdtDocAdaptor; roomId: string; auth?: Uint8Array; + isRejoin?: boolean; } interface InternalRoomHandler { @@ -113,6 +114,16 @@ export interface LoroWebsocketClientOptions { }; } +export const RoomJoinStatus = { + Connecting: "connecting", + Joined: "joined", + Reconnecting: "reconnecting", + Disconnected: "disconnected", + Error: "error", +} as const; +export type RoomJoinStatusValue = + (typeof RoomJoinStatus)[keyof typeof RoomJoinStatus]; + /** * Loro websocket client with auto-reconnect, connection status events, and latency tracking. 
* @@ -145,6 +156,10 @@ export class LoroWebsocketClient { // Track roomId for each active id so we can rejoin on reconnect private roomIds: Map = new Map(); private roomAuth: Map = new Map(); + private roomStatusListeners: Map< + string, + Set<(s: RoomJoinStatusValue) => void> + > = new Map(); private socketListeners = new WeakMap(); private pingTimer?: ReturnType; @@ -428,9 +443,23 @@ export class LoroWebsocketClient { this.shouldReconnect = false; } + // Update room-level status based on whether we will retry + for (const [id] of this.activeRooms) { + if (this.shouldReconnect) { + this.emitRoomStatus(id, RoomJoinStatus.Reconnecting); + } else { + this.emitRoomStatus(id, RoomJoinStatus.Disconnected); + } + } + if (!this.shouldReconnect) { this.setStatus(ClientStatus.Disconnected); this.rejectConnected?.(new Error("Disconnected")); + // Fail all pending joins and mark rooms disconnected/error + const err = new Error( + closeReason ? `Disconnected: ${closeReason}` : "Disconnected" + ); + this.failAllPendingRooms(err, this.shouldReconnect ? 
RoomJoinStatus.Reconnecting : RoomJoinStatus.Disconnected); return; } // Start (or continue) exponential backoff retries @@ -514,18 +543,27 @@ export class LoroWebsocketClient { room: roomPromise, resolve: (res: JoinResponseOk) => { // On successful rejoin, let adaptor reconcile to server - adaptor.handleJoinOk(res).catch(e => { - console.error(e); - }); - // Clean up pending entry for this id + adaptor + .handleJoinOk(res) + .catch(e => { + console.error(e); + }) + .finally(() => { + this.emitRoomStatus(id, RoomJoinStatus.Joined); + }); this.pendingRooms.delete(id); + this.emitRoomStatus(id, RoomJoinStatus.Joined); }, reject: (error: Error) => { console.error("Rejoin failed:", error); + // Remove stale room entry so callers can decide to rejoin manually + this.cleanupRoom(roomId, adaptor.crdtType); + this.emitRoomStatus(id, RoomJoinStatus.Error); }, adaptor, roomId, auth: this.roomAuth.get(id), + isRejoin: true, }; this.pendingRooms.set(id, pending); @@ -539,6 +577,7 @@ export class LoroWebsocketClient { version: adaptor.getVersion(), } as JoinRequest) ); + this.emitRoomStatus(id, RoomJoinStatus.Reconnecting); } catch (e) { console.error("Failed to send rejoin request:", e); } @@ -713,6 +752,7 @@ export class LoroWebsocketClient { } this.pendingRooms.delete(id); + this.emitRoomStatus(id, RoomJoinStatus.Joined); } private async handleJoinError( @@ -756,7 +796,16 @@ export class LoroWebsocketClient { } // No retry possible, reject the promise - pending.reject(new Error(`Join failed: ${msg.code} - ${msg.message}`)); + const err = new Error(`Join failed: ${msg.code} - ${msg.message}`); + this.emitRoomStatus( + pending.adaptor.crdtType + pending.roomId, + RoomJoinStatus.Error + ); + // Remove active room references so caller can rejoin manually if this was a rejoin + if (pending.isRejoin) { + this.cleanupRoom(pending.roomId, pending.adaptor.crdtType); + } + pending.reject(err); this.pendingRooms.delete(roomId); } @@ -767,6 +816,7 @@ export class LoroWebsocketClient { 
this.roomAdaptors.delete(id); this.roomIds.delete(id); this.roomAuth.delete(id); + this.roomStatusListeners.delete(id); } waitConnected() { @@ -819,10 +869,12 @@ export class LoroWebsocketClient { roomId, crdtAdaptor, auth, + onStatusChange, }: { roomId: string; crdtAdaptor: CrdtDocAdaptor; auth?: Uint8Array; + onStatusChange?: (s: RoomJoinStatusValue) => void; }): Promise { const id = crdtAdaptor.crdtType + roomId; // Check if already joining or joined @@ -844,6 +896,16 @@ export class LoroWebsocketClient { reject = reject_; }); + if (onStatusChange) { + let set = this.roomStatusListeners.get(id); + if (!set) { + set = new Set(); + this.roomStatusListeners.set(id, set); + } + set.add(onStatusChange); + } + this.emitRoomStatus(id, RoomJoinStatus.Connecting); + const room = response.then(res => { // Set adaptor ctx first so it's ready to send updates crdtAdaptor.setCtx({ @@ -894,6 +956,7 @@ export class LoroWebsocketClient { handler, crdtAdaptor ); + this.emitRoomStatus(id, RoomJoinStatus.Joined); crdtAdaptor.handleJoinOk(res).catch(e => { console.error(e); }); @@ -1069,6 +1132,7 @@ export class LoroWebsocketClient { this.removeNetworkListeners?.(); } catch {} this.removeNetworkListeners = undefined; + this.roomStatusListeners.clear(); // Close websocket after flushing pending frames try { this.flushAndCloseWebSocket(ws, { @@ -1282,6 +1346,32 @@ export class LoroWebsocketClient { } } } + + private emitRoomStatus(roomKey: string, status: RoomJoinStatusValue) { + const set = this.roomStatusListeners.get(roomKey); + if (!set || set.size === 0) return; + for (const cb of Array.from(set)) { + try { + cb(status); + } catch {} + } + } + + private failAllPendingRooms(err: Error, status: RoomJoinStatusValue) { + const entries = Array.from(this.pendingRooms.entries()); + for (const [id, pending] of entries) { + try { + this.emitRoomStatus(id, status); + } catch {} + try { + pending.reject(err); + } catch {} + try { + this.cleanupRoom(pending.roomId, pending.adaptor.crdtType); 
+ } catch {} + this.pendingRooms.delete(id); + } + } } export interface LoroWebsocketClientRoom { diff --git a/packages/loro-websocket/tests/e2e.test.ts b/packages/loro-websocket/tests/e2e.test.ts index 7af4d24..3618a0e 100644 --- a/packages/loro-websocket/tests/e2e.test.ts +++ b/packages/loro-websocket/tests/e2e.test.ts @@ -308,8 +308,18 @@ describe("E2E: Client-Server Sync", () => { const adaptor1 = new LoroAdaptor(); const adaptor2 = new LoroAdaptor(); - await client1.join({ roomId: "rejoin-room", crdtAdaptor: adaptor1 }); - await client2.join({ roomId: "rejoin-room", crdtAdaptor: adaptor2 }); + const statusLog: Array<{ client: number; status: string }> = []; + + await client1.join({ + roomId: "rejoin-room", + crdtAdaptor: adaptor1, + onStatusChange: s => statusLog.push({ client: 1, status: s }), + }); + await client2.join({ + roomId: "rejoin-room", + crdtAdaptor: adaptor2, + onStatusChange: s => statusLog.push({ client: 2, status: s }), + }); // Stop server so both clients disconnect and attempt to reconnect await server.stop(); @@ -337,6 +347,9 @@ describe("E2E: Client-Server Sync", () => { 50 ); + expect(statusLog.some(e => e.status === "reconnecting")).toBe(true); + expect(statusLog.some(e => e.status === "joined")).toBe(true); + client1.destroy(); client2.destroy(); }, 20000); @@ -511,6 +524,42 @@ describe("E2E: Client-Server Sync", () => { } }, 20000); + it("emits room status error when auth changes and stops auto rejoin", async () => { + let allowAuth = true; + const authPort = await getPort(); + const authServer = new SimpleServer({ + port: authPort, + authenticate: async (_roomId, _crdt, _auth) => (allowAuth ? 
"write" : null), + }); + await authServer.start(); + + const client = new LoroWebsocketClient({ url: `ws://localhost:${authPort}` }); + await client.waitConnected(); + const adaptor = new LoroAdaptor(); + const statuses: string[] = []; + await client.join({ + roomId: "auth-room", + crdtAdaptor: adaptor, + auth: new TextEncoder().encode("token"), + onStatusChange: s => statuses.push(s), + }); + + // Take server offline, then disallow auth and bring it back + await authServer.stop(); + allowAuth = false; + await authServer.start(); + + await waitUntil( + () => + statuses.includes("error"), + 8000, + 50 + ); + + client.destroy(); + await authServer.stop(); + }, 15000); + it("destroy rejects pending ping waiters", async () => { const client = new LoroWebsocketClient({ url: `ws://localhost:${port}` }); await client.waitConnected(); @@ -619,6 +668,42 @@ describe("E2E: Client-Server Sync", () => { wss.close(); }, 10000); + it("marks room disconnected when maxAttempts reached", async () => { + const maxPort = await getPort(); + const wss = new WebSocketServer({ port: maxPort }); + wss.on("connection", ws => { + // Immediately close to force retries + ws.close(1011, "boom"); + }); + + const statuses: string[] = []; + const roomStatuses: string[] = []; + const client = new LoroWebsocketClient({ + url: `ws://localhost:${maxPort}`, + reconnect: { maxAttempts: 1, initialDelayMs: 50, maxDelayMs: 50 }, + }); + client.onStatusChange(s => statuses.push(s)); + + const adaptor = new LoroAdaptor(); + const join = client.join({ + roomId: "limited-room", + crdtAdaptor: adaptor, + onStatusChange: s => roomStatuses.push(s), + }); + + await expect(join).rejects.toBeTruthy(); + + await waitUntil( + () => statuses.at(-1) === ClientStatus.Disconnected, + 4000, + 25 + ); + expect(roomStatuses.at(-1)).toBe("disconnected"); + + client.destroy(); + wss.close(); + }, 8000); + it("queues joins issued while connecting and flushes once connected", async () => { const queuedPort = await getPort(); 
const client = new LoroWebsocketClient({ @@ -626,10 +711,12 @@ describe("E2E: Client-Server Sync", () => { pingIntervalMs: 200, // slow ping to avoid noise before server up }); const adaptor = new LoroAdaptor(); + const statuses: string[] = []; const joinPromise = client.join({ roomId: "queued-join", crdtAdaptor: adaptor, + onStatusChange: s => statuses.push(s), }); // Start server after join was requested @@ -643,6 +730,9 @@ describe("E2E: Client-Server Sync", () => { adaptor.getDoc().commit(); await room.waitForReachingServerVersion(); + expect(statuses).toContain("connecting"); + expect(statuses).toContain("joined"); + await room.destroy(); client.destroy(); await server.stop(); From 6fc9ef956f21c7173ace806532fa1c1a8241d30a Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Fri, 5 Dec 2025 09:43:47 +0800 Subject: [PATCH 4/9] chore: fix warnings --- packages/loro-websocket/src/client/index.ts | 2 +- packages/loro-websocket/tests/e2e.test.ts | 23 +++++++++++++++------ 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index aa805cf..6de99bc 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -496,7 +496,7 @@ export class LoroWebsocketClient { if (this.reconnectTimer) return; if (this.offline) return; const policy = this.getReconnectPolicy(); - if (policy.enabled === false) return; + if (!policy.enabled) return; const attempt = ++this.reconnectAttempts; const delay = immediate ? 
0 : this.computeBackoffDelay(attempt); this.reconnectTimer = setTimeout(() => { diff --git a/packages/loro-websocket/tests/e2e.test.ts b/packages/loro-websocket/tests/e2e.test.ts index 3618a0e..6b20983 100644 --- a/packages/loro-websocket/tests/e2e.test.ts +++ b/packages/loro-websocket/tests/e2e.test.ts @@ -458,10 +458,11 @@ describe("E2E: Client-Server Sync", () => { await Promise.all([client1.waitConnected(), client2.waitConnected()]); - const initialConnectedCount1 = statuses1.filter( + // Keep placeholders to mirror earlier assertions; unused now + const _initialConnectedCount1 = statuses1.filter( s => s === ClientStatus.Connected ).length; - const initialConnectedCount2 = statuses2.filter( + const _initialConnectedCount2 = statuses2.filter( s => s === ClientStatus.Connected ).length; @@ -616,8 +617,13 @@ describe("E2E: Client-Server Sync", () => { wss.on("connection", ws => { connections++; const connId = connections; - ws.on("message", data => { - const text = data.toString(); + ws.on("message", (data: Buffer | ArrayBuffer | string) => { + const text = + typeof data === "string" + ? data + : Buffer.isBuffer(data) + ? data.toString() + : new TextDecoder().decode(new Uint8Array(data as ArrayBuffer)); if (text === "ping") { // Only respond for non-fatal connections if (connId >= 2) { @@ -744,8 +750,13 @@ describe("E2E: Client-Server Sync", () => { let connId = 0; wss.on("connection", ws => { const id = ++connId; - ws.on("message", data => { - const text = data.toString(); + ws.on("message", (data: Buffer | ArrayBuffer | string) => { + const text = + typeof data === "string" + ? data + : Buffer.isBuffer(data) + ? 
data.toString() + : new TextDecoder().decode(new Uint8Array(data as ArrayBuffer)); if (text === "ping") { if (id >= 2) ws.send("pong"); // first connection intentionally ignores to trigger timeout From 8173190193db48998dabefb9ad1877bd47c173d9 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Fri, 5 Dec 2025 10:33:46 +0800 Subject: [PATCH 5/9] fix: enhance error handling in LoroWebsocketClient callbacks and ensure proper logging --- packages/loro-websocket/src/client/index.ts | 90 +++++++++++---------- packages/loro-websocket/tests/e2e.test.ts | 74 +++++++++++++++++ 2 files changed, 121 insertions(+), 43 deletions(-) diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index 6de99bc..8d76710 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -289,7 +289,9 @@ export class LoroWebsocketClient { // Emit current immediately to inform subscribers try { cb(this.status); - } catch {} + } catch (err) { + this.logCbError("onStatusChange", err); + } return () => this.statusListeners.delete(cb); } @@ -299,7 +301,9 @@ export class LoroWebsocketClient { if (this.lastLatencyMs != null) { try { cb(this.lastLatencyMs); - } catch {} + } catch (err) { + this.logCbError("onLatency", err); + } } return () => this.latencyListeners.delete(cb); } @@ -311,7 +315,9 @@ export class LoroWebsocketClient { for (const cb of listeners) { try { cb(s); - } catch {} + } catch (err) { + this.logCbError("onStatusChange", err); + } } } @@ -344,9 +350,7 @@ export class LoroWebsocketClient { this.attachSocketListeners(ws); - try { - ws.binaryType = "arraybuffer"; - } catch {} + ws.binaryType = "arraybuffer"; return this.connectedPromise; } @@ -384,7 +388,7 @@ export class LoroWebsocketClient { this.detachSocketListeners(ws); try { ws.close(1000, "Superseded"); - } catch {} + } catch { } return; } this.clearReconnectTimer(); @@ -424,9 +428,7 @@ export class LoroWebsocketClient { // Clear any pending 
fragment reassembly timers to avoid late callbacks if (this.fragmentBatches.size) { for (const [, batch] of this.fragmentBatches) { - try { - clearTimeout(batch.timeoutId); - } catch {} + clearTimeout(batch.timeoutId); } this.fragmentBatches.clear(); } @@ -476,9 +478,7 @@ export class LoroWebsocketClient { } if (typeof event.data === "string") { if (event.data === "ping") { - try { - ws.send("pong"); - } catch {} + ws.send("pong"); return; } if (event.data === "pong") { @@ -526,7 +526,7 @@ export class LoroWebsocketClient { this.setStatus(ClientStatus.Disconnected); try { this.ws?.close(1001, "Offline"); - } catch {} + } catch { } } }; @@ -549,10 +549,9 @@ export class LoroWebsocketClient { console.error(e); }) .finally(() => { + this.pendingRooms.delete(id); this.emitRoomStatus(id, RoomJoinStatus.Joined); }); - this.pendingRooms.delete(id); - this.emitRoomStatus(id, RoomJoinStatus.Joined); }, reject: (error: Error) => { console.error("Rejoin failed:", error); @@ -669,7 +668,7 @@ export class LoroWebsocketClient { } as UpdateError) ); } - } catch {} + } catch { } }, 10000); this.fragmentBatches.set(batchKey, { @@ -956,7 +955,6 @@ export class LoroWebsocketClient { handler, crdtAdaptor ); - this.emitRoomStatus(id, RoomJoinStatus.Joined); crdtAdaptor.handleJoinOk(res).catch(e => { console.error(e); }); @@ -1009,7 +1007,7 @@ export class LoroWebsocketClient { for (const [, batch] of this.fragmentBatches) { try { clearTimeout(batch.timeoutId); - } catch {} + } catch { } } this.fragmentBatches.clear(); } @@ -1117,7 +1115,7 @@ export class LoroWebsocketClient { for (const [, batch] of this.fragmentBatches) { try { clearTimeout(batch.timeoutId); - } catch {} + } catch { } } this.fragmentBatches.clear(); } @@ -1130,7 +1128,7 @@ export class LoroWebsocketClient { this.detachSocketListeners(ws); try { this.removeNetworkListeners?.(); - } catch {} + } catch { } this.removeNetworkListeners = undefined; this.roomStatusListeners.clear(); // Close websocket after flushing pending 
frames @@ -1139,7 +1137,7 @@ export class LoroWebsocketClient { code: 1000, reason: "Client destroyed", }); - } catch {} + } catch { } this.setStatus(ClientStatus.Disconnected); } @@ -1156,9 +1154,7 @@ export class LoroWebsocketClient { }; if (readBufferedAmount() == null) { - try { - ws.close(code, reason); - } catch {} + ws.close(code, reason); return; } @@ -1169,9 +1165,7 @@ export class LoroWebsocketClient { const state = ws.readyState; if (state === WebSocket.CLOSED || state === WebSocket.CLOSING) { requested = true; - try { - ws.close(code, reason); - } catch {} + ws.close(code, reason); return; } @@ -1182,9 +1176,7 @@ export class LoroWebsocketClient { Date.now() - start >= timeoutMs ) { requested = true; - try { - ws.close(code, reason); - } catch {} + ws.close(code, reason); return; } @@ -1203,7 +1195,7 @@ export class LoroWebsocketClient { ws.removeEventListener?.("error", handlers.error); ws.removeEventListener?.("close", handlers.close); ws.removeEventListener?.("message", handlers.message); - } catch {} + } catch { } this.socketListeners.delete(ws); } @@ -1223,10 +1215,12 @@ export class LoroWebsocketClient { if (this.missedPongs >= 2) { try { this.ws?.close(1001, "ping_timeout"); - } catch {} - return; + } catch (err) { + this.logCbError("pingTimer close", err); } + return; } + } try { if (this.ws && this.ws.readyState === WebSocket.OPEN) { // Avoid overlapping RTT probes @@ -1237,7 +1231,9 @@ export class LoroWebsocketClient { // Still awaiting a pong; skip sending another ping } } - } catch {} + } catch (err) { + this.logCbError("pingTimer send", err); + } }, interval); } @@ -1256,7 +1252,9 @@ export class LoroWebsocketClient { for (const cb of listeners) { try { cb(rtt); - } catch {} + } catch (err) { + this.logCbError("onLatency", err); + } } } this.awaitingPongSince = undefined; @@ -1275,7 +1273,7 @@ export class LoroWebsocketClient { try { clearTimeout(w.timeoutId); w.reject(err); - } catch {} + } catch { } } } @@ -1320,6 +1318,11 @@ export 
class LoroWebsocketClient { return Math.min(max, Math.max(0, Math.floor(withJitter))); } + private logCbError(context: string, err: unknown) { + // eslint-disable-next-line no-console + console.error(`[loro-websocket] ${context} callback threw`, err); + } + private isFatalClose(code?: number, reason?: string): boolean { const policy = this.getReconnectPolicy(); if (code != null && policy.fatalCloseCodes.includes(code)) return true; @@ -1353,7 +1356,9 @@ export class LoroWebsocketClient { for (const cb of Array.from(set)) { try { cb(status); - } catch {} + } catch (err) { + this.logCbError("onRoomStatusChange", err); + } } } @@ -1362,13 +1367,13 @@ export class LoroWebsocketClient { for (const [id, pending] of entries) { try { this.emitRoomStatus(id, status); - } catch {} + } catch { } try { pending.reject(err); - } catch {} + } catch { } try { this.cleanupRoom(pending.roomId, pending.adaptor.crdtType); - } catch {} + } catch { } this.pendingRooms.delete(id); } } @@ -1387,8 +1392,7 @@ export interface LoroWebsocketClientRoom { } class LoroWebsocketClientRoomImpl - implements LoroWebsocketClientRoom, InternalRoomHandler -{ + implements LoroWebsocketClientRoom, InternalRoomHandler { private client: LoroWebsocketClient; private roomId: string; private crdtType: CrdtType; diff --git a/packages/loro-websocket/tests/e2e.test.ts b/packages/loro-websocket/tests/e2e.test.ts index 6b20983..ba9c931 100644 --- a/packages/loro-websocket/tests/e2e.test.ts +++ b/packages/loro-websocket/tests/e2e.test.ts @@ -744,6 +744,80 @@ describe("E2E: Client-Server Sync", () => { await server.stop(); }, 12000); + it("emits room joined status exactly once for an initial join", async () => { + const localPort = await getPort(); + const localServer = new SimpleServer({ port: localPort }); + await localServer.start(); + const client = new LoroWebsocketClient({ url: `ws://localhost:${localPort}` }); + try { + await client.waitConnected(); + const adaptor = new LoroAdaptor(); + const statuses: 
string[] = []; + await client.join({ + roomId: "single-join", + crdtAdaptor: adaptor, + onStatusChange: s => statuses.push(s), + }); + + await waitUntil( + () => statuses.filter(s => s === "joined").length === 1, + 5000, + 25 + ); + await new Promise(resolve => setTimeout(resolve, 50)); + expect(statuses.filter(s => s === "joined").length).toBe(1); + expect(statuses[0]).toBe("connecting"); + } finally { + client.destroy(); + await localServer.stop(); + } + }, 10000); + + it("emits room joined status once per reconnect cycle", async () => { + const localPort = await getPort(); + const localServer = new SimpleServer({ port: localPort }); + await localServer.start(); + const client = new LoroWebsocketClient({ url: `ws://localhost:${localPort}` }); + try { + await client.waitConnected(); + const adaptor = new LoroAdaptor(); + const statuses: string[] = []; + await client.join({ + roomId: "rejoin-once", + crdtAdaptor: adaptor, + onStatusChange: s => statuses.push(s), + }); + + await waitUntil( + () => statuses.filter(s => s === "joined").length === 1, + 5000, + 25 + ); + + await localServer.stop(); + await waitUntil(() => statuses.includes("reconnecting"), 5000, 25); + await new Promise(resolve => setTimeout(resolve, 200)); + await localServer.start(); + + await waitUntil( + () => client.getStatus() === ClientStatus.Connected, + 8000, + 50 + ); + await waitUntil( + () => statuses.filter(s => s === "joined").length === 2, + 8000, + 50 + ); + + expect(statuses.filter(s => s === "joined").length).toBe(2); + expect(statuses.includes("reconnecting")).toBe(true); + } finally { + client.destroy(); + await localServer.stop(); + } + }, 20000); + it("forces reconnect after ping timeout and recovers when pongs return", async () => { const pongPort = await getPort(); const wss = new WebSocketServer({ port: pongPort }); From df36bb38f0317e5cd778a1eed57c5cde2e50795d Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Fri, 5 Dec 2025 10:52:29 +0800 Subject: [PATCH 6/9] fix: rm maxQueue 
limit to improve DX --- packages/loro-websocket/prd/000-websocket-client-reconnect.md | 4 ++-- packages/loro-websocket/src/client/index.ts | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/packages/loro-websocket/prd/000-websocket-client-reconnect.md b/packages/loro-websocket/prd/000-websocket-client-reconnect.md index bf43958..8340d9b 100644 --- a/packages/loro-websocket/prd/000-websocket-client-reconnect.md +++ b/packages/loro-websocket/prd/000-websocket-client-reconnect.md @@ -45,7 +45,7 @@ - While `navigator.onLine === false`, do not schedule retries. When `online` fires, schedule immediately. No extra backstop timer to keep behavior predictable. 4) **Safer join during connect** - - Queue join requests while the socket is CONNECTING; flush on OPEN. (“Pending joins” = `join()` calls made before the socket is ready.) Default queue limit: 50 outstanding joins; beyond that, reject with a clear error so apps can back off. + - Queue join requests while the socket is CONNECTING; flush on OPEN. (“Pending joins” = `join()` calls made before the socket is ready.) 5) **Manual reconnect** - Add `retryNow()` (alias `connect({ resetBackoff: true })`) to attempt immediately and reset backoff. Also re-enables auto-retry if it had stopped. @@ -87,4 +87,4 @@ - Regression: existing e2e reconnection and offline tests must still pass with defaults. ## Open Questions -- Is queue limit 50 sufficient, or should it scale with `pendingRooms` size or be user-configurable? +- Do we need a queue limit or backpressure strategy for joins during CONNECTING, or is unbounded queuing acceptable given typical usage? 
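The unbounded queue-and-flush behavior this patch settles on can be sketched in isolation. `JoinQueue` and its method names below are illustrative only, not the actual client API; they mirror the decision above to drop the `maxQueuedJoins` limit.

```typescript
// Illustrative sketch: buffer join payloads issued while the socket is still
// CONNECTING, then flush them in FIFO order once it reports OPEN.
class JoinQueue {
  private queued: Uint8Array[] = [];

  // Unbounded by design (see the open question above); callers gate volume.
  enqueue(payload: Uint8Array): void {
    this.queued.push(payload);
  }

  // Returns how many payloads were flushed so callers can log or assert.
  flush(send: (payload: Uint8Array) => void): number {
    const pending = this.queued;
    this.queued = []; // reset first so re-entrant enqueues survive this flush
    for (const p of pending) send(p);
    return pending.length;
  }
}
```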
diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index 8d76710..dbc59e9 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -179,7 +179,6 @@ export class LoroWebsocketClient { // Join requests issued while socket is still connecting private queuedJoins: Uint8Array[] = []; - private readonly maxQueuedJoins = 50; constructor(private ops: LoroWebsocketClientOptions) { this.attachNetworkListeners(); @@ -1331,9 +1330,6 @@ export class LoroWebsocketClient { } private enqueueJoin(payload: Uint8Array) { - if (this.queuedJoins.length >= this.maxQueuedJoins) { - throw new Error("Too many pending joins; connection not ready"); - } this.queuedJoins.push(payload); } From d1f1105cfd1c2bcbea993f5534ca612c4e35ac6e Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Fri, 5 Dec 2025 11:03:17 +0800 Subject: [PATCH 7/9] fix: connected promise maintain --- packages/loro-websocket/src/client/index.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index dbc59e9..d7cb06a 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -138,6 +138,12 @@ export type RoomJoinStatusValue = */ export class LoroWebsocketClient { private ws!: WebSocket; + // Invariant: `connectedPromise` always represents the next transition to `Connected`. + // - It resolves exactly once, when the currently active socket fires `open`. + // - It is replaced (via `ensureConnectedPromise`) whenever we start a new connect + // attempt or a reconnect is scheduled, so callers blocking on `waitConnected()` + // will wait for the next successful connection. + // - It rejects only when we deliberately stop reconnecting (`close()` or fatal close). 
private connectedPromise!: Promise<void>; private resolveConnected?: () => void; private rejectConnected?: (e: Error) => void; @@ -463,6 +469,8 @@ export class LoroWebsocketClient { this.failAllPendingRooms(err, this.shouldReconnect ? RoomJoinStatus.Reconnecting : RoomJoinStatus.Disconnected); return; } + // Renew the promise so callers waiting on waitConnected() block until the next successful reconnect. + this.ensureConnectedPromise(); // Start (or continue) exponential backoff retries this.setStatus(ClientStatus.Disconnected); this.scheduleReconnect(); From d22bdf75121658e9ed013567759ca166109ceb28 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Fri, 5 Dec 2025 11:06:29 +0800 Subject: [PATCH 8/9] docs: simplify reconnect prd --- .../prd/000-websocket-client-reconnect.md | 135 +++++++----------- 1 file changed, 48 insertions(+), 87 deletions(-) diff --git a/packages/loro-websocket/prd/000-websocket-client-reconnect.md b/packages/loro-websocket/prd/000-websocket-client-reconnect.md index 8340d9b..adf8eed 100644 --- a/packages/loro-websocket/prd/000-websocket-client-reconnect.md +++ b/packages/loro-websocket/prd/000-websocket-client-reconnect.md @@ -1,90 +1,51 @@ -# PRD 000 — Stabilize WebSocket Client Reconnect +# WebSocket Client Reconnect -## Problem -- Auto‑reconnect works in simple cases but is brittle in degraded networks and offers little visibility. Developers see flapping status events, stuck “connected” sockets that no longer deliver data, and endless retry loops after auth kicks or offline transitions. -- The current API exposes only coarse `onStatusChange` and a low‑level `onWsClose()` without context, making it hard to debug, alert, or build UX around reconnect. - -## Current Behavior (as implemented) -- Initial connect fires in the constructor; subsequent reconnects are scheduled with exponential backoff: 0.5s, 1s, 2s … capped at 15s (`src/client/index.ts:430-437`).
-- Reconnect is controlled by a boolean `shouldReconnect`; it is only flipped to `false` for close codes 4400‑4499 or reasons `permission_changed` / `room_closed` (`src/client/index.ts:365-377`). -- Offline handling clears the timer and closes the socket, but does **not** block the next reconnect timer that will be scheduled by the ensuing `close` event (`src/client/index.ts:451-459`). -- Pings measure latency but never trigger a reconnect on missing pongs; half‑open connections stay in `Connected` forever (`src/client/index.ts:1095-1141`). -- Joins issued while the socket is still CONNECTING send immediately; in Node’s `ws` this throws, rejecting the join but leaving client state inconsistent (`src/client/index.ts:871-880`). -- No jitter in backoff → concurrent clients herd after an outage. -- No visibility hooks for reconnect attempts, reasons, backoff delay, or per‑room rejoin results; `onWsClose` lacks the `CloseEvent`. - -## Observed / Potential Failure Modes -1) **Offline loop**: `handleOffline()` closes the socket, `onSocketClose()` immediately schedules a retry even though the device is offline, causing tight retry/error loops and battery/CPU churn. -2) **Fatal kicks retry forever**: Server close codes like 1008/1011 or application errors keep auto‑retrying because only 440x and two strings are treated as fatal. This spams auth, logs, and server. -3) **Half‑open stall**: If the TCP path drops without a FIN (common on captive portals / mobile), the socket stays “open”; no pongs arrive, but the client never transitions to Disconnected or triggers backoff. -4) **Join during CONNECTING**: Calling `join` before `waitConnected()` can throw in Node, leaving `pendingRooms` populated and adapters without context. -5) **Thundering herd**: Identical backoff without jitter means many clients reconnect in lockstep after outages. 
-6) **Weak observability**: Callers can’t answer “why did we reconnect?”, “what attempt/delay are we on?”, “was rejoin successful?”, or “what close code did we get?”. Debugging relies on console noise. +Purpose: describe the desired reconnect behavior for the browser/node client in a concise, implementation‑agnostic way. ## Goals -- Make reconnect predictable across offline/online transitions, server kicks, and half‑open links. -- Provide API hooks and telemetry to let applications surface health, analytics, and UX cues. -- Preserve back‑compat by keeping defaults close to today’s behavior, but safer. - -## Non‑Goals -- Rewriting the server or changing wire protocol. -- Adding persistence or storage for reconnect state. - -## Proposed Design (kept minimal) - -1) **Small reconnect policy** - - `reconnect?: { enabled?: boolean; initialDelayMs?: number; maxDelayMs?: number; jitter?: number; maxAttempts?: number | "infinite"; fatalCloseCodes?: number[]; fatalCloseReasons?: string[] }` - - Defaults keep today’s timing (500ms start, cap 15s, jitter 0.25) and infinite retries. - - `maxAttempts` stays supported; callers can set a finite ceiling, otherwise `"infinite"`. - - Fatal closes (4400‑4499, 1008, 1011, `permission_changed`, `room_closed`, `auth_failed`) stop auto‑retry; caller can still call `retryNow()`. - -2) **Ping-based liveness** - - If two consecutive pings miss (`pingTimeoutMs` default 10s), forcibly close with reason `ping_timeout` and enter backoff. Prevents half‑open stalls. - -3) **Offline pause** - - While `navigator.onLine === false`, do not schedule retries. When `online` fires, schedule immediately. No extra backstop timer to keep behavior predictable. - -4) **Safer join during connect** - - Queue join requests while the socket is CONNECTING; flush on OPEN. (“Pending joins” = `join()` calls made before the socket is ready.) 
- -5) **Manual reconnect** - - Add `retryNow()` (alias `connect({ resetBackoff: true })`) to attempt immediately and reset backoff. Also re-enables auto-retry if it had stopped. - -6) **Room rejoin resilience** - - Keep an `activeRooms` registry (roomId + crdt) for rooms that were successfully joined. - - On reconnect, automatically resend JoinRequest for every active room; include original auth bytes. - - If rejoin fails (fatal join error), emit a rejoin failure event and leave the room unjoined; caller can decide to retry or surface UI. - - Pending joins (issued while CONNECTING) remain queued and flush on OPEN; successful joins add to `activeRooms`. - -7) **Per-room status callbacks** - - `join` accepts `onStatusChange?: (status) => void` where status ∈ `connecting | joined | reconnecting | disconnected | error`. - - Transitions: - - `connecting`: initial join or queued join flushing. - - `joined`: join/rejoin success. - - `reconnecting`: socket closed unexpectedly and room queued for auto-rejoin. - - `disconnected`: reconnects disabled (fatal close/maxAttempts/close()). - - `error`: join/rejoin failure (e.g., auth denied); room is removed from active set so callers can rejoin manually. - -## UX / API Back‑Compat -- Defaults mimic today’s timing and infinite retries; jitter and fatal code handling are additive improvements. -- Existing `onStatusChange` continues to work; new states (`reconnecting`) will be documented but won’t break existing string comparisons (add to `ClientStatus` map). -- `onWsClose` kept but now receives the `CloseEvent` for parity; deprecated in docs in favor of `onClose`. - -## Acceptance Criteria -- Reconnect pauses while offline; resumes immediately on `online`. -- Fatal kicks (440x, 1008/1011, auth strings) do **not** auto‑retry unless caller calls `retryNow()`. -- Half‑open sockets are detected by ping timeout and recover via reconnect. -- Joins issued during connect do not throw and complete once connected or fail with a clear error. 
- Minimal API surface: no new event fan‑out beyond `onStatusChange` and `retryNow()`. - -## Testing Plan -- Unit: backoff jitter math, classifyClose matrix, join queuing during CONNECTING, half‑open ping timeout transition. -- E2E: - - server stop/start with offline events → reconnect after online. - - server closes with 1008 → no retry by default. - - missing pongs → forced reconnect. - - many clients (N≥20) reconnect → ensure jitter prevents synchronized attempts. -- Regression: existing e2e reconnection and offline tests must still pass with defaults. - -## Open Questions -- Do we need a queue limit or backpressure strategy for joins during CONNECTING, or is unbounded queuing acceptable given typical usage? +- Stay connected across transient network issues without user code handling retries. +- Avoid tight retry loops when offline or after fatal server closes. +- Provide predictable hooks so apps can show status and react to failures. + +## Connection Model +- States: `connecting`, `connected`, `reconnecting`, `disconnected`, `error`. +- The client starts connecting immediately. Any disconnection while retries are allowed moves to `reconnecting`; fatal conditions move to `disconnected`. +- A single promise (`waitConnected`) always resolves on the next successful transition to `connected`; it is renewed on each reconnect attempt. + +## Retry Policy +- Enabled by default; exponential backoff starting at ~0.5s, capped around 15s, with jitter (~25%) to prevent herding. +- Retries continue indefinitely unless a maximum attempt count is configured. +- Fatal stop conditions halt retries (e.g., permission/auth failures, explicit fatal close codes or reasons). After a fatal stop, the client remains `disconnected` until manually retried. + +## Liveness & Half‑Open Detection +- Periodic application‑level pings are sent while connected. +- Missing pongs trigger a controlled close with a liveness reason, which then enters the normal backoff flow.
This prevents silent half‑open sockets. + +## Offline Behavior +- When the environment reports offline, active retries are paused and the socket is closed cleanly. +- When coming back online, a reconnect is scheduled immediately (backoff resets unless disabled). + +## Join Handling +- `join` calls issued while the socket is not yet open are enqueued and flushed after connect. +- The queue is unbounded by design; applications concerned about backpressure should gate their own join volume. +- Each join exposes optional per‑room status callbacks: `connecting`, `joined`, `reconnecting`, `disconnected`, `error`. + +## Room Rejoin +- Successfully joined rooms are tracked (room id + CRDT type + auth bytes). +- After reconnect, the client automatically resends JoinRequest for each tracked room. +- If a rejoin fails fatally, the room moves to `error` and is removed from the tracked set so callers can decide next steps. + +## Manual Controls +- `connect({ resetBackoff?: boolean })` or `retryNow()` starts/forces a reconnect and optionally resets backoff. +- `close()` stops auto‑reconnect and transitions to `disconnected`; callers must explicitly reconnect afterwards. + +## Observability Hooks +- Client status listener: notifies transitions among the top‑level states. +- Per‑room status listener: notifies the per‑room states listed above. +- Optional latency callback fed by ping RTT measurements. + +## Success Criteria +- Retries pause while offline and resume promptly when online. +- Missing pongs or half‑open links recover via reconnect. +- Fatal closes stop retries; manual retry is still possible. +- Queued joins do not throw and complete once connected; failed rejoins surface as `error` so apps can respond. 
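As a rough sketch of the retry policy described above (exponential growth from ~0.5s, ~15s cap, ±25% jitter), the delay math might look like the following. The function and field names here are assumptions for illustration, not the client's actual internals.

```typescript
// Sketch of the documented backoff: exponential from initialDelayMs, capped at
// maxDelayMs, randomized by up to ±jitter to avoid thundering-herd reconnects.
interface ReconnectPolicy {
  initialDelayMs: number; // e.g. 500
  maxDelayMs: number;     // e.g. 15000
  jitter: number;         // e.g. 0.25 → spread each delay by up to ±25%
}

function computeBackoffDelay(
  attempt: number, // 1-based attempt counter
  policy: ReconnectPolicy,
  rand: () => number = Math.random // injectable for deterministic tests
): number {
  // Exponential growth: 500ms, 1s, 2s, ... clamped to the cap.
  const base = Math.min(
    policy.maxDelayMs,
    policy.initialDelayMs * 2 ** (attempt - 1)
  );
  // rand() in [0,1) maps to a jitter factor in [1 - jitter, 1 + jitter).
  const factor = 1 + policy.jitter * (rand() * 2 - 1);
  return Math.min(policy.maxDelayMs, Math.max(0, Math.floor(base * factor)));
}
```

Injecting `rand` keeps the jitter testable: with `() => 0.5` the factor is exactly 1, so the unit tests in the plan above can assert the undithered schedule.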
From f20e287d934164ee183b97daf9bd0621184a8896 Mon Sep 17 00:00:00 2001 From: Zixuan Chen Date: Fri, 5 Dec 2025 11:14:16 +0800 Subject: [PATCH 9/9] fix: prevent unhandled rejection if nobody awaits the connected promise --- packages/loro-websocket/src/client/index.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index d7cb06a..9de839f 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -212,6 +212,8 @@ export class LoroWebsocketClient { reject(err); }; }); + // prevent unhandled rejection if nobody awaits + void this.connectedPromise.catch(() => { }); } private attachNetworkListeners(): void { @@ -1222,12 +1224,12 @@ export class LoroWebsocketClient { if (this.missedPongs >= 2) { try { this.ws?.close(1001, "ping_timeout"); - } catch (err) { - this.logCbError("pingTimer close", err); + } catch (err) { + this.logCbError("pingTimer close", err); + } + return; } - return; } - } try { if (this.ws && this.ws.readyState === WebSocket.OPEN) { // Avoid overlapping RTT probes
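The pattern in this last patch — pre-attaching a no-op `catch` so a long-lived promise can be rejected before anyone awaits it — can be demonstrated in isolation. `ConnectionGate` below is a hypothetical reduction of the idea, not the client's real class.

```typescript
// Minimal sketch: a renewable "connected" promise that may be rejected while
// nobody is awaiting it, without triggering an unhandled-rejection report.
class ConnectionGate {
  private promise!: Promise<void>;
  private resolve!: () => void;
  private reject!: (e: Error) => void;

  constructor() {
    this.renew();
  }

  // Called on construction and whenever a new connect attempt begins.
  renew(): void {
    this.promise = new Promise<void>((res, rej) => {
      this.resolve = res;
      this.reject = rej;
    });
    // Prevent an unhandled rejection if the promise is rejected before any
    // caller has awaited it; waitConnected() still observes the rejection.
    void this.promise.catch(() => {});
  }

  waitConnected(): Promise<void> {
    return this.promise;
  }

  onOpen(): void { this.resolve(); }
  onFatalClose(e: Error): void { this.reject(e); }
}
```

The pre-attached `catch` creates a separate derived promise, so the stored promise counts as handled while `waitConnected()` callers still receive the original rejection.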