diff --git a/packages/loro-websocket/prd/000-websocket-client-reconnect.md b/packages/loro-websocket/prd/000-websocket-client-reconnect.md new file mode 100644 index 0000000..adf8eed --- /dev/null +++ b/packages/loro-websocket/prd/000-websocket-client-reconnect.md @@ -0,0 +1,51 @@ +# WebSocket Client Reconnect + +Purpose: describe the desired reconnect behavior for the browser/node client in a concise, implementation‑agnostic way. + +## Goals +- Stay connected across transient network issues without user code handling retries. +- Avoid tight retry loops when offline or after fatal server closes. +- Provide predictable hooks so apps can show status and react to failures. + +## Connection Model +- States: `connecting`, `connected`, `reconnecting`, `disconnected`, `error`. +- The client starts connecting immediately. Any disconnection that occurs while retries are still allowed moves the client to `reconnecting`; fatal conditions move it to `disconnected`. +- A single promise (`waitConnected`) always resolves on the next successful transition to `connected`; it is renewed on each reconnect attempt. + +## Retry Policy +- Enabled by default; exponential backoff starting at ~0.5s, capped around 15s, with jitter (~25%) to prevent herding. +- Retries continue indefinitely unless a maximum attempt count is configured. +- Fatal stop conditions halt retries (e.g., permission/auth failures, explicit fatal close codes or reasons). After a fatal stop, the client remains `disconnected` until manually retried. + +## Liveness & Half‑Open Detection +- Periodic application‑level pings are sent while connected. +- Missing pongs trigger a controlled close with a liveness reason, which then enters the normal backoff flow. This prevents silent half‑open sockets. + +## Offline Behavior +- When the environment reports offline, active retries are paused and the socket is closed cleanly. +- When the environment comes back online, a reconnect is scheduled immediately (backoff resets unless disabled). 
+ +## Join Handling +- `join` calls issued while the socket is not yet open are enqueued and flushed after connect. +- The queue is unbounded by design; applications concerned about backpressure should gate their own join volume. +- Each join accepts an optional per‑room status callback that reports one of: `connecting`, `joined`, `reconnecting`, `disconnected`, `error`. + +## Room Rejoin +- Successfully joined rooms are tracked (room id + CRDT type + auth bytes). +- After reconnect, the client automatically resends JoinRequest for each tracked room. +- If a rejoin fails fatally, the room moves to `error` and is removed from the tracked set so callers can decide next steps. + +## Manual Controls +- `connect({ resetBackoff?: boolean })` starts/forces a reconnect and optionally resets backoff; `retryNow()` does the same and always resets backoff. +- `close()` stops auto‑reconnect and transitions to `disconnected`; callers must explicitly reconnect afterwards. + +## Observability Hooks +- Client status listener: notifies transitions among the top‑level states. +- Per‑room status listener: notifies the per‑room states listed above. +- Optional latency callback fed by ping RTT measurements. + +## Success Criteria +- Retries pause while offline and resume promptly when online. +- Missing pongs or half‑open links recover via reconnect. +- Fatal closes stop retries; manual retry is still possible. +- Queued joins do not throw and complete once connected; failed rejoins surface as `error` so apps can respond. diff --git a/packages/loro-websocket/src/client/index.ts b/packages/loro-websocket/src/client/index.ts index 55e2fa1..9de839f 100644 --- a/packages/loro-websocket/src/client/index.ts +++ b/packages/loro-websocket/src/client/index.ts @@ -34,6 +34,7 @@ interface PendingRoom { adaptor: CrdtDocAdaptor; roomId: string; auth?: Uint8Array; + isRejoin?: boolean; } interface InternalRoomHandler { @@ -86,12 +87,43 @@ export interface LoroWebsocketClientOptions { url: string; /** Optional custom ping interval. Defaults to 30s. 
Set with `disablePing` to stop timers. */ pingIntervalMs?: number; + /** Ping timeout; after two consecutive misses the client will force-close and reconnect. Defaults to 10s. */ + pingTimeoutMs?: number; /** Disable periodic ping/pong entirely. */ disablePing?: boolean; /** Optional callback for low-level ws close (before status transitions). */ onWsClose?: () => void; + /** + * Reconnect policy (kept minimal). + * - enabled: toggle auto-retry (default true) + * - initialDelayMs: starting backoff delay (default 500) + * - maxDelayMs: max backoff delay (default 15000) + * - jitter: 0-1 multiplier applied randomly around the delay (default 0.25) + * - maxAttempts: number | "infinite" (default "infinite") + * - fatalCloseCodes: close codes that should not retry (default 4400-4499, 1008, 1011) + * - fatalCloseReasons: close reasons that should not retry (default permission_changed, room_closed, auth_failed) + */ + reconnect?: { + enabled?: boolean; + initialDelayMs?: number; + maxDelayMs?: number; + jitter?: number; + maxAttempts?: number | "infinite"; + fatalCloseCodes?: number[]; + fatalCloseReasons?: string[]; + }; } +export const RoomJoinStatus = { + Connecting: "connecting", + Joined: "joined", + Reconnecting: "reconnecting", + Disconnected: "disconnected", + Error: "error", +} as const; +export type RoomJoinStatusValue = + (typeof RoomJoinStatus)[keyof typeof RoomJoinStatus]; + /** * Loro websocket client with auto-reconnect, connection status events, and latency tracking. * @@ -106,6 +138,12 @@ export interface LoroWebsocketClientOptions { */ export class LoroWebsocketClient { private ws!: WebSocket; + // Invariant: `connectedPromise` always represents the next transition to `Connected`. + // - It resolves exactly once, when the currently active socket fires `open`. 
+ // - It is replaced (via `ensureConnectedPromise`) whenever we start a new connect + // attempt or a reconnect is scheduled, so callers blocking on `waitConnected()` + // will wait for the next successful connection. + // - It rejects only when we deliberately stop reconnecting (`close()` or fatal close). private connectedPromise!: Promise; private resolveConnected?: () => void; private rejectConnected?: (e: Error) => void; @@ -124,6 +162,10 @@ export class LoroWebsocketClient { // Track roomId for each active id so we can rejoin on reconnect private roomIds: Map = new Map(); private roomAuth: Map = new Map(); + private roomStatusListeners: Map< + string, + Set<(s: RoomJoinStatusValue) => void> + > = new Map(); private socketListeners = new WeakMap(); private pingTimer?: ReturnType; @@ -132,12 +174,17 @@ export class LoroWebsocketClient { reject: (err: Error) => void; timeoutId: ReturnType; }> = []; + private missedPongs = 0; // Reconnect controls private shouldReconnect = true; private reconnectAttempts = 0; private reconnectTimer?: ReturnType; private removeNetworkListeners?: () => void; + private offline = false; + + // Join requests issued while socket is still connecting + private queuedJoins: Uint8Array[] = []; constructor(private ops: LoroWebsocketClientOptions) { this.attachNetworkListeners(); @@ -165,6 +212,8 @@ export class LoroWebsocketClient { reject(err); }; }); + // prevent unhandled rejection if nobody awaits + void this.connectedPromise.catch(() => { }); } private attachNetworkListeners(): void { @@ -247,7 +296,9 @@ export class LoroWebsocketClient { // Emit current immediately to inform subscribers try { cb(this.status); - } catch {} + } catch (err) { + this.logCbError("onStatusChange", err); + } return () => this.statusListeners.delete(cb); } @@ -257,7 +308,9 @@ export class LoroWebsocketClient { if (this.lastLatencyMs != null) { try { cb(this.lastLatencyMs); - } catch {} + } catch (err) { + this.logCbError("onLatency", err); + } } return () => 
this.latencyListeners.delete(cb); } @@ -269,12 +322,17 @@ export class LoroWebsocketClient { for (const cb of listeners) { try { cb(s); - } catch {} + } catch (err) { + this.logCbError("onStatusChange", err); + } } } /** Initiate or resume connection. Resolves when `Connected`. */ - async connect(): Promise { + async connect(opts?: { resetBackoff?: boolean }): Promise { + if (opts?.resetBackoff) { + this.reconnectAttempts = 0; + } // Ensure future unexpected closes will auto-reconnect again this.shouldReconnect = true; const current = this.ws; @@ -299,9 +357,7 @@ export class LoroWebsocketClient { this.attachSocketListeners(ws); - try { - ws.binaryType = "arraybuffer"; - } catch {} + ws.binaryType = "arraybuffer"; return this.connectedPromise; } @@ -339,7 +395,7 @@ export class LoroWebsocketClient { this.detachSocketListeners(ws); try { ws.close(1000, "Superseded"); - } catch {} + } catch { } return; } this.clearReconnectTimer(); @@ -349,6 +405,8 @@ export class LoroWebsocketClient { this.resolveConnected?.(); // Rejoin rooms after reconnect this.rejoinActiveRooms(); + // Flush any queued joins that were requested while connecting + this.flushQueuedJoins(); } private onSocketError(ws: WebSocket, _event: Event): void { @@ -366,22 +424,18 @@ export class LoroWebsocketClient { } const closeCode = event?.code; - if (closeCode != null && closeCode >= 4400 && closeCode < 4500) { - this.shouldReconnect = false; - } - const closeReason = event?.reason; - if (closeReason === "permission_changed" || closeReason === "room_closed") { + + if (this.isFatalClose(closeCode, closeReason)) { this.shouldReconnect = false; } this.clearPingTimer(); + this.missedPongs = 0; // Clear any pending fragment reassembly timers to avoid late callbacks if (this.fragmentBatches.size) { for (const [, batch] of this.fragmentBatches) { - try { - clearTimeout(batch.timeoutId); - } catch {} + clearTimeout(batch.timeoutId); } this.fragmentBatches.clear(); } @@ -389,11 +443,36 @@ export class 
LoroWebsocketClient { this.awaitingPongSince = undefined; this.ops.onWsClose?.(); this.rejectAllPingWaiters(new Error("WebSocket closed")); + const maxAttempts = this.getReconnectPolicy().maxAttempts; + if ( + typeof maxAttempts === "number" && + maxAttempts > 0 && + this.reconnectAttempts >= maxAttempts + ) { + this.shouldReconnect = false; + } + + // Update room-level status based on whether we will retry + for (const [id] of this.activeRooms) { + if (this.shouldReconnect) { + this.emitRoomStatus(id, RoomJoinStatus.Reconnecting); + } else { + this.emitRoomStatus(id, RoomJoinStatus.Disconnected); + } + } + if (!this.shouldReconnect) { this.setStatus(ClientStatus.Disconnected); this.rejectConnected?.(new Error("Disconnected")); + // Fail all pending joins and mark rooms disconnected/error + const err = new Error( + closeReason ? `Disconnected: ${closeReason}` : "Disconnected" + ); + this.failAllPendingRooms(err, this.shouldReconnect ? RoomJoinStatus.Reconnecting : RoomJoinStatus.Disconnected); return; } + // Renew the promise so callers waiting on waitConnected() block until the next successful reconnect. + this.ensureConnectedPromise(); // Start (or continue) exponential backoff retries this.setStatus(ClientStatus.Disconnected); this.scheduleReconnect(); @@ -408,9 +487,7 @@ export class LoroWebsocketClient { } if (typeof event.data === "string") { if (event.data === "ping") { - try { - ws.send("pong"); - } catch {} + ws.send("pong"); return; } if (event.data === "pong") { @@ -426,10 +503,11 @@ export class LoroWebsocketClient { private scheduleReconnect(immediate = false) { if (this.reconnectTimer) return; + if (this.offline) return; + const policy = this.getReconnectPolicy(); + if (!policy.enabled) return; const attempt = ++this.reconnectAttempts; - const base = 500; // ms - const max = 15_000; // ms - const delay = immediate ? 0 : Math.min(max, base * 2 ** (attempt - 1)); + const delay = immediate ? 
0 : this.computeBackoffDelay(attempt); this.reconnectTimer = setTimeout(() => { this.reconnectTimer = undefined; void this.connect(); @@ -442,6 +520,7 @@ export class LoroWebsocketClient { } private handleOnline = () => { + this.offline = false; if (!this.shouldReconnect) return; if (this.status === ClientStatus.Connected) return; this.clearReconnectTimer(); @@ -449,13 +528,14 @@ export class LoroWebsocketClient { }; private handleOffline = () => { + this.offline = true; // Pause scheduled retries until online this.clearReconnectTimer(); if (this.shouldReconnect) { this.setStatus(ClientStatus.Disconnected); try { this.ws?.close(1001, "Offline"); - } catch {} + } catch { } } }; @@ -472,18 +552,26 @@ export class LoroWebsocketClient { room: roomPromise, resolve: (res: JoinResponseOk) => { // On successful rejoin, let adaptor reconcile to server - adaptor.handleJoinOk(res).catch(e => { - console.error(e); - }); - // Clean up pending entry for this id - this.pendingRooms.delete(id); + adaptor + .handleJoinOk(res) + .catch(e => { + console.error(e); + }) + .finally(() => { + this.pendingRooms.delete(id); + this.emitRoomStatus(id, RoomJoinStatus.Joined); + }); }, reject: (error: Error) => { console.error("Rejoin failed:", error); + // Remove stale room entry so callers can decide to rejoin manually + this.cleanupRoom(roomId, adaptor.crdtType); + this.emitRoomStatus(id, RoomJoinStatus.Error); }, adaptor, roomId, auth: this.roomAuth.get(id), + isRejoin: true, }; this.pendingRooms.set(id, pending); @@ -497,6 +585,7 @@ export class LoroWebsocketClient { version: adaptor.getVersion(), } as JoinRequest) ); + this.emitRoomStatus(id, RoomJoinStatus.Reconnecting); } catch (e) { console.error("Failed to send rejoin request:", e); } @@ -588,7 +677,7 @@ export class LoroWebsocketClient { } as UpdateError) ); } - } catch {} + } catch { } }, 10000); this.fragmentBatches.set(batchKey, { @@ -671,6 +760,7 @@ export class LoroWebsocketClient { } this.pendingRooms.delete(id); + 
this.emitRoomStatus(id, RoomJoinStatus.Joined); } private async handleJoinError( @@ -714,7 +804,16 @@ export class LoroWebsocketClient { } // No retry possible, reject the promise - pending.reject(new Error(`Join failed: ${msg.code} - ${msg.message}`)); + const err = new Error(`Join failed: ${msg.code} - ${msg.message}`); + this.emitRoomStatus( + pending.adaptor.crdtType + pending.roomId, + RoomJoinStatus.Error + ); + // Remove active room references so caller can rejoin manually if this was a rejoin + if (pending.isRejoin) { + this.cleanupRoom(pending.roomId, pending.adaptor.crdtType); + } + pending.reject(err); this.pendingRooms.delete(roomId); } @@ -725,6 +824,7 @@ export class LoroWebsocketClient { this.roomAdaptors.delete(id); this.roomIds.delete(id); this.roomAuth.delete(id); + this.roomStatusListeners.delete(id); } waitConnected() { @@ -777,10 +877,12 @@ export class LoroWebsocketClient { roomId, crdtAdaptor, auth, + onStatusChange, }: { roomId: string; crdtAdaptor: CrdtDocAdaptor; auth?: Uint8Array; + onStatusChange?: (s: RoomJoinStatusValue) => void; }): Promise { const id = crdtAdaptor.crdtType + roomId; // Check if already joining or joined @@ -802,6 +904,16 @@ export class LoroWebsocketClient { reject = reject_; }); + if (onStatusChange) { + let set = this.roomStatusListeners.get(id); + if (!set) { + set = new Set(); + this.roomStatusListeners.set(id, set); + } + set.add(onStatusChange); + } + this.emitRoomStatus(id, RoomJoinStatus.Connecting); + const room = response.then(res => { // Set adaptor ctx first so it's ready to send updates crdtAdaptor.setCtx({ @@ -868,15 +980,21 @@ export class LoroWebsocketClient { }); this.roomAuth.set(id, auth); - this.ws.send( - encode({ - type: MessageType.JoinRequest, - crdt: crdtAdaptor.crdtType, - roomId, - auth: auth ?? 
new Uint8Array(), - version: crdtAdaptor.getVersion(), - } as JoinRequest) - ); + const joinPayload = encode({ + type: MessageType.JoinRequest, + crdt: crdtAdaptor.crdtType, + roomId, + auth: auth ?? new Uint8Array(), + version: crdtAdaptor.getVersion(), + } as JoinRequest); + + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.send(joinPayload); + } else { + this.enqueueJoin(joinPayload); + // ensure a connection attempt is running + void this.connect(); + } return room; } @@ -898,7 +1016,7 @@ export class LoroWebsocketClient { for (const [, batch] of this.fragmentBatches) { try { clearTimeout(batch.timeoutId); - } catch {} + } catch { } } this.fragmentBatches.clear(); } @@ -907,6 +1025,7 @@ export class LoroWebsocketClient { if (ws && this.socketListeners.has(ws)) { this.ops.onWsClose?.(); } + this.queuedJoins = []; this.detachSocketListeners(ws); this.flushAndCloseWebSocket(ws, { code: 1000, @@ -1005,7 +1124,7 @@ export class LoroWebsocketClient { for (const [, batch] of this.fragmentBatches) { try { clearTimeout(batch.timeoutId); - } catch {} + } catch { } } this.fragmentBatches.clear(); } @@ -1014,18 +1133,20 @@ export class LoroWebsocketClient { if (ws && this.socketListeners.has(ws)) { this.ops.onWsClose?.(); } + this.queuedJoins = []; this.detachSocketListeners(ws); try { this.removeNetworkListeners?.(); - } catch {} + } catch { } this.removeNetworkListeners = undefined; + this.roomStatusListeners.clear(); // Close websocket after flushing pending frames try { this.flushAndCloseWebSocket(ws, { code: 1000, reason: "Client destroyed", }); - } catch {} + } catch { } this.setStatus(ClientStatus.Disconnected); } @@ -1042,9 +1163,7 @@ export class LoroWebsocketClient { }; if (readBufferedAmount() == null) { - try { - ws.close(code, reason); - } catch {} + ws.close(code, reason); return; } @@ -1055,9 +1174,7 @@ export class LoroWebsocketClient { const state = ws.readyState; if (state === WebSocket.CLOSED || state === WebSocket.CLOSING) { requested 
= true; - try { - ws.close(code, reason); - } catch {} + ws.close(code, reason); return; } @@ -1068,9 +1185,7 @@ export class LoroWebsocketClient { Date.now() - start >= timeoutMs ) { requested = true; - try { - ws.close(code, reason); - } catch {} + ws.close(code, reason); return; } @@ -1089,7 +1204,7 @@ export class LoroWebsocketClient { ws.removeEventListener?.("error", handlers.error); ws.removeEventListener?.("close", handlers.close); ws.removeEventListener?.("message", handlers.message); - } catch {} + } catch { } this.socketListeners.delete(ws); } @@ -1098,6 +1213,23 @@ export class LoroWebsocketClient { if (!interval) return; this.clearPingTimer(); this.pingTimer = setInterval(() => { + const now = Date.now(); + const timeoutMs = Math.max(1, this.ops.pingTimeoutMs ?? 10_000); + if ( + this.awaitingPongSince != null && + now - this.awaitingPongSince > timeoutMs + ) { + this.missedPongs += 1; + this.awaitingPongSince = now; + if (this.missedPongs >= 2) { + try { + this.ws?.close(1001, "ping_timeout"); + } catch (err) { + this.logCbError("pingTimer close", err); + } + return; + } + } try { if (this.ws && this.ws.readyState === WebSocket.OPEN) { // Avoid overlapping RTT probes @@ -1108,7 +1240,9 @@ export class LoroWebsocketClient { // Still awaiting a pong; skip sending another ping } } - } catch {} + } catch (err) { + this.logCbError("pingTimer send", err); + } }, interval); } @@ -1127,11 +1261,14 @@ export class LoroWebsocketClient { for (const cb of listeners) { try { cb(rtt); - } catch {} + } catch (err) { + this.logCbError("onLatency", err); + } } } this.awaitingPongSince = undefined; } + this.missedPongs = 0; // Resolve all waiters on any pong if (this.pingWaiters.length > 0) { const waiters = this.pingWaiters.splice(0, this.pingWaiters.length); @@ -1145,7 +1282,105 @@ export class LoroWebsocketClient { try { clearTimeout(w.timeoutId); w.reject(err); - } catch {} + } catch { } + } + } + + /** Manual reconnect helper that resets backoff and attempts 
immediately. */ + retryNow(): Promise { + return this.connect({ resetBackoff: true }); + } + + private getReconnectPolicy() { + const p = this.ops.reconnect ?? {}; + return { + enabled: p.enabled ?? true, + initialDelayMs: Math.max(1, p.initialDelayMs ?? 500), + maxDelayMs: Math.max(1, p.maxDelayMs ?? 15_000), + jitter: Math.max(0, Math.min(1, p.jitter ?? 0.25)), + maxAttempts: p.maxAttempts ?? "infinite", + fatalCloseCodes: p.fatalCloseCodes ?? [ + 1008, + 1011, + // 4400-4499 + ...Array.from({ length: 100 }, (_, i) => 4400 + i), + ], + fatalCloseReasons: p.fatalCloseReasons ?? [ + "permission_changed", + "room_closed", + "auth_failed", + ], + }; + } + + private computeBackoffDelay(attempt: number): number { + const policy = this.getReconnectPolicy(); + const base = policy.initialDelayMs; + const max = policy.maxDelayMs; + const raw = base * 2 ** Math.max(0, attempt - 1); + const jitterFactor = + 1 + + (policy.jitter === 0 + ? 0 + : (Math.random() * 2 - 1) * policy.jitter); + const withJitter = raw * jitterFactor; + return Math.min(max, Math.max(0, Math.floor(withJitter))); + } + + private logCbError(context: string, err: unknown) { + // eslint-disable-next-line no-console + console.error(`[loro-websocket] ${context} callback threw`, err); + } + + private isFatalClose(code?: number, reason?: string): boolean { + const policy = this.getReconnectPolicy(); + if (code != null && policy.fatalCloseCodes.includes(code)) return true; + if (reason && policy.fatalCloseReasons.includes(reason)) return true; + return false; + } + + private enqueueJoin(payload: Uint8Array) { + this.queuedJoins.push(payload); + } + + private flushQueuedJoins() { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + if (!this.queuedJoins.length) return; + const items = this.queuedJoins.splice(0, this.queuedJoins.length); + for (const payload of items) { + try { + this.ws.send(payload); + } catch (e) { + console.error("Failed to flush queued join:", e); + } + } + } + + private 
emitRoomStatus(roomKey: string, status: RoomJoinStatusValue) { + const set = this.roomStatusListeners.get(roomKey); + if (!set || set.size === 0) return; + for (const cb of Array.from(set)) { + try { + cb(status); + } catch (err) { + this.logCbError("onRoomStatusChange", err); + } + } + } + + private failAllPendingRooms(err: Error, status: RoomJoinStatusValue) { + const entries = Array.from(this.pendingRooms.entries()); + for (const [id, pending] of entries) { + try { + this.emitRoomStatus(id, status); + } catch { } + try { + pending.reject(err); + } catch { } + try { + this.cleanupRoom(pending.roomId, pending.adaptor.crdtType); + } catch { } + this.pendingRooms.delete(id); } } } @@ -1163,8 +1398,7 @@ export interface LoroWebsocketClientRoom { } class LoroWebsocketClientRoomImpl - implements LoroWebsocketClientRoom, InternalRoomHandler -{ + implements LoroWebsocketClientRoom, InternalRoomHandler { private client: LoroWebsocketClient; private roomId: string; private crdtType: CrdtType; diff --git a/packages/loro-websocket/tests/e2e.test.ts b/packages/loro-websocket/tests/e2e.test.ts index 9e88b1d..ba9c931 100644 --- a/packages/loro-websocket/tests/e2e.test.ts +++ b/packages/loro-websocket/tests/e2e.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect, beforeAll, afterAll } from "vitest"; -import { WebSocket } from "ws"; +import { WebSocket, WebSocketServer } from "ws"; import getPort from "get-port"; import { SimpleServer } from "../src/server/simple-server"; import { LoroWebsocketClient, ClientStatus } from "../src/client"; @@ -308,8 +308,18 @@ describe("E2E: Client-Server Sync", () => { const adaptor1 = new LoroAdaptor(); const adaptor2 = new LoroAdaptor(); - await client1.join({ roomId: "rejoin-room", crdtAdaptor: adaptor1 }); - await client2.join({ roomId: "rejoin-room", crdtAdaptor: adaptor2 }); + const statusLog: Array<{ client: number; status: string }> = []; + + await client1.join({ + roomId: "rejoin-room", + crdtAdaptor: adaptor1, + onStatusChange: s => 
statusLog.push({ client: 1, status: s }), + }); + await client2.join({ + roomId: "rejoin-room", + crdtAdaptor: adaptor2, + onStatusChange: s => statusLog.push({ client: 2, status: s }), + }); // Stop server so both clients disconnect and attempt to reconnect await server.stop(); @@ -337,6 +347,9 @@ describe("E2E: Client-Server Sync", () => { 50 ); + expect(statusLog.some(e => e.status === "reconnecting")).toBe(true); + expect(statusLog.some(e => e.status === "joined")).toBe(true); + client1.destroy(); client2.destroy(); }, 20000); @@ -357,13 +370,12 @@ describe("E2E: Client-Server Sync", () => { unsubscribe2 = client2.onStatusChange(s => statuses2.push(s)); await Promise.all([client1.waitConnected(), client2.waitConnected()]); - const initialConnectedCount1 = statuses1.filter( + const initialConnected1 = statuses1.filter( s => s === ClientStatus.Connected ).length; - const initialConnectedCount2 = statuses2.filter( + const initialConnected2 = statuses2.filter( s => s === ClientStatus.Connected ).length; - const adaptor1 = new LoroAdaptor(); const adaptor2 = new LoroAdaptor(); @@ -408,9 +420,9 @@ describe("E2E: Client-Server Sync", () => { await waitUntil( () => statuses1.filter(s => s === ClientStatus.Connected).length > - initialConnectedCount1 && + initialConnected1 && statuses2.filter(s => s === ClientStatus.Connected).length > - initialConnectedCount2, + initialConnected2, 5000, 25 ); @@ -446,10 +458,11 @@ describe("E2E: Client-Server Sync", () => { await Promise.all([client1.waitConnected(), client2.waitConnected()]); - const initialConnectedCount1 = statuses1.filter( + // Keep placeholders to mirror earlier assertions; unused now + const _initialConnectedCount1 = statuses1.filter( s => s === ClientStatus.Connected ).length; - const initialConnectedCount2 = statuses2.filter( + const _initialConnectedCount2 = statuses2.filter( s => s === ClientStatus.Connected ).length; @@ -484,6 +497,13 @@ describe("E2E: Client-Server Sync", () => { await new Promise(resolve 
=> setTimeout(resolve, 200)); await server.start(); + // Without an online event, auto-retry should stay paused + await new Promise(resolve => setTimeout(resolve, 400)); + expect(statuses1[statuses1.length - 1]).toBe(ClientStatus.Disconnected); + expect(statuses2[statuses2.length - 1]).toBe(ClientStatus.Disconnected); + + // Manual retry should override the pause + await Promise.all([client1.retryNow(), client2.retryNow()]); await waitUntil( () => client1!.getStatus() === ClientStatus.Connected && @@ -492,18 +512,6 @@ describe("E2E: Client-Server Sync", () => { 50 ); - expect((navigator as { onLine?: boolean }).onLine).toBe(false); - - await waitUntil( - () => - statuses1.filter(s => s === ClientStatus.Connected).length > - initialConnectedCount1 && - statuses2.filter(s => s === ClientStatus.Connected).length > - initialConnectedCount2, - 5000, - 25 - ); - text1.insert(text1.length, " rebound"); adaptor1.getDoc().commit(); await waitUntil(() => text2.toString() === "seed rebound", 5000, 50); @@ -517,6 +525,42 @@ describe("E2E: Client-Server Sync", () => { } }, 20000); + it("emits room status error when auth changes and stops auto rejoin", async () => { + let allowAuth = true; + const authPort = await getPort(); + const authServer = new SimpleServer({ + port: authPort, + authenticate: async (_roomId, _crdt, _auth) => (allowAuth ? 
"write" : null), + }); + await authServer.start(); + + const client = new LoroWebsocketClient({ url: `ws://localhost:${authPort}` }); + await client.waitConnected(); + const adaptor = new LoroAdaptor(); + const statuses: string[] = []; + await client.join({ + roomId: "auth-room", + crdtAdaptor: adaptor, + auth: new TextEncoder().encode("token"), + onStatusChange: s => statuses.push(s), + }); + + // Take server offline, then disallow auth and bring it back + await authServer.stop(); + allowAuth = false; + await authServer.start(); + + await waitUntil( + () => + statuses.includes("error"), + 8000, + 50 + ); + + client.destroy(); + await authServer.stop(); + }, 15000); + it("destroy rejects pending ping waiters", async () => { const client = new LoroWebsocketClient({ url: `ws://localhost:${port}` }); await client.waitConnected(); @@ -565,6 +609,265 @@ describe("E2E: Client-Server Sync", () => { client.destroy(); }, 15000); + + it("stops auto-retry after fatal close code and retryNow reconnects", async () => { + const fatalPort = await getPort(); + const wss = new WebSocketServer({ port: fatalPort }); + let connections = 0; + wss.on("connection", ws => { + connections++; + const connId = connections; + ws.on("message", (data: Buffer | ArrayBuffer | string) => { + const text = + typeof data === "string" + ? data + : Buffer.isBuffer(data) + ? 
data.toString() + : new TextDecoder().decode(new Uint8Array(data as ArrayBuffer)); + if (text === "ping") { + // Only respond for non-fatal connections + if (connId >= 2) { + ws.send("pong"); + } + } + }); + if (connId === 1) { + setTimeout(() => { + try { + ws.close(1008, "policy"); + } catch {} + }, 30); + } + }); + + const client = new LoroWebsocketClient({ + url: `ws://localhost:${fatalPort}`, + pingIntervalMs: 50, + pingTimeoutMs: 200, + }); + const statuses: string[] = []; + const off = client.onStatusChange(s => statuses.push(s)); + + await client.waitConnected(); + await waitUntil( + () => statuses.includes(ClientStatus.Disconnected), + 4000, + 25 + ); + + // Ensure it does not auto-retry after fatal close + const lenAfterFatal = statuses.length; + await new Promise(resolve => setTimeout(resolve, 400)); + expect(statuses.length).toBe(lenAfterFatal); + expect(statuses[statuses.length - 1]).toBe(ClientStatus.Disconnected); + + // Manual retry should reconnect + await client.retryNow(); + await waitUntil( + () => client.getStatus() === ClientStatus.Connected, + 4000, + 25 + ); + + off(); + client.destroy(); + wss.close(); + }, 10000); + + it("marks room disconnected when maxAttempts reached", async () => { + const maxPort = await getPort(); + const wss = new WebSocketServer({ port: maxPort }); + wss.on("connection", ws => { + // Immediately close to force retries + ws.close(1011, "boom"); + }); + + const statuses: string[] = []; + const roomStatuses: string[] = []; + const client = new LoroWebsocketClient({ + url: `ws://localhost:${maxPort}`, + reconnect: { maxAttempts: 1, initialDelayMs: 50, maxDelayMs: 50 }, + }); + client.onStatusChange(s => statuses.push(s)); + + const adaptor = new LoroAdaptor(); + const join = client.join({ + roomId: "limited-room", + crdtAdaptor: adaptor, + onStatusChange: s => roomStatuses.push(s), + }); + + await expect(join).rejects.toBeTruthy(); + + await waitUntil( + () => statuses.at(-1) === ClientStatus.Disconnected, + 4000, + 25 
+ ); + expect(roomStatuses.at(-1)).toBe("disconnected"); + + client.destroy(); + wss.close(); + }, 8000); + + it("queues joins issued while connecting and flushes once connected", async () => { + const queuedPort = await getPort(); + const client = new LoroWebsocketClient({ + url: `ws://localhost:${queuedPort}`, + pingIntervalMs: 200, // slow ping to avoid noise before server up + }); + const adaptor = new LoroAdaptor(); + const statuses: string[] = []; + + const joinPromise = client.join({ + roomId: "queued-join", + crdtAdaptor: adaptor, + onStatusChange: s => statuses.push(s), + }); + + // Start server after join was requested + await new Promise(resolve => setTimeout(resolve, 200)); + const server = new SimpleServer({ port: queuedPort }); + await server.start(); + + const room = await joinPromise; + const text = adaptor.getDoc().getText("q"); + text.insert(0, "hello"); + adaptor.getDoc().commit(); + await room.waitForReachingServerVersion(); + + expect(statuses).toContain("connecting"); + expect(statuses).toContain("joined"); + + await room.destroy(); + client.destroy(); + await server.stop(); + }, 12000); + + it("emits room joined status exactly once for an initial join", async () => { + const localPort = await getPort(); + const localServer = new SimpleServer({ port: localPort }); + await localServer.start(); + const client = new LoroWebsocketClient({ url: `ws://localhost:${localPort}` }); + try { + await client.waitConnected(); + const adaptor = new LoroAdaptor(); + const statuses: string[] = []; + await client.join({ + roomId: "single-join", + crdtAdaptor: adaptor, + onStatusChange: s => statuses.push(s), + }); + + await waitUntil( + () => statuses.filter(s => s === "joined").length === 1, + 5000, + 25 + ); + await new Promise(resolve => setTimeout(resolve, 50)); + expect(statuses.filter(s => s === "joined").length).toBe(1); + expect(statuses[0]).toBe("connecting"); + } finally { + client.destroy(); + await localServer.stop(); + } + }, 10000); + + it("emits 
room joined status once per reconnect cycle", async () => { + const localPort = await getPort(); + const localServer = new SimpleServer({ port: localPort }); + await localServer.start(); + const client = new LoroWebsocketClient({ url: `ws://localhost:${localPort}` }); + try { + await client.waitConnected(); + const adaptor = new LoroAdaptor(); + const statuses: string[] = []; + await client.join({ + roomId: "rejoin-once", + crdtAdaptor: adaptor, + onStatusChange: s => statuses.push(s), + }); + + await waitUntil( + () => statuses.filter(s => s === "joined").length === 1, + 5000, + 25 + ); + + await localServer.stop(); + await waitUntil(() => statuses.includes("reconnecting"), 5000, 25); + await new Promise(resolve => setTimeout(resolve, 200)); + await localServer.start(); + + await waitUntil( + () => client.getStatus() === ClientStatus.Connected, + 8000, + 50 + ); + await waitUntil( + () => statuses.filter(s => s === "joined").length === 2, + 8000, + 50 + ); + + expect(statuses.filter(s => s === "joined").length).toBe(2); + expect(statuses.includes("reconnecting")).toBe(true); + } finally { + client.destroy(); + await localServer.stop(); + } + }, 20000); + + it("forces reconnect after ping timeout and recovers when pongs return", async () => { + const pongPort = await getPort(); + const wss = new WebSocketServer({ port: pongPort }); + let connId = 0; + wss.on("connection", ws => { + const id = ++connId; + ws.on("message", (data: Buffer | ArrayBuffer | string) => { + const text = + typeof data === "string" + ? data + : Buffer.isBuffer(data) + ? 
data.toString() + : new TextDecoder().decode(new Uint8Array(data as ArrayBuffer)); + if (text === "ping") { + if (id >= 2) ws.send("pong"); + // first connection intentionally ignores to trigger timeout + } + }); + }); + + const statuses: string[] = []; + const client = new LoroWebsocketClient({ + url: `ws://localhost:${pongPort}`, + pingIntervalMs: 80, + pingTimeoutMs: 100, + reconnect: { initialDelayMs: 50, maxDelayMs: 200, jitter: 0 }, + }); + client.onStatusChange(s => statuses.push(s)); + + await client.waitConnected(); + await waitUntil( + () => statuses.includes(ClientStatus.Disconnected), + 5000, + 25 + ); + await waitUntil( + () => + statuses.filter(s => s === ClientStatus.Connected).length >= 2 && + connId >= 2, + 6000, + 25 + ); + + // Stay connected for a short window to ensure stability + await new Promise(resolve => setTimeout(resolve, 200)); + expect(client.getStatus()).toBe(ClientStatus.Connected); + + client.destroy(); + wss.close(); + }, 12000); }); function installMockWindow(initialOnline = true) {