From f5294126a8feec1906bca439443c3864415415fb Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Thu, 5 Jan 2023 08:30:12 +0100 Subject: [PATCH] feat: implement connection state recovery More information about how this feature is supposed to work will be provided in the main repository. --- lib/contrib/yeast.ts | 65 ++++++++++++ lib/index.ts | 147 ++++++++++++++++++++++++++ package-lock.json | 14 +-- package.json | 2 +- test/index.ts | 240 ++++++++++++++++++++++++++++++++++++++++++- 5 files changed, 458 insertions(+), 10 deletions(-) create mode 100644 lib/contrib/yeast.ts diff --git a/lib/contrib/yeast.ts b/lib/contrib/yeast.ts new file mode 100644 index 0000000..3fc0f90 --- /dev/null +++ b/lib/contrib/yeast.ts @@ -0,0 +1,65 @@ +// imported from https://github.com/unshiftio/yeast +"use strict"; + +const alphabet = + "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_".split( + "" + ), + length = 64, + map = {}; +let seed = 0, + i = 0, + prev; + +/** + * Return a string representing the specified number. + * + * @param {Number} num The number to convert. + * @returns {String} The string representation of the number. + * @api public + */ +export function encode(num) { + let encoded = ""; + + do { + encoded = alphabet[num % length] + encoded; + num = Math.floor(num / length); + } while (num > 0); + + return encoded; +} + +/** + * Return the integer value specified by the given string. + * + * @param {String} str The string to convert. + * @returns {Number} The integer value represented by the string. + * @api public + */ +export function decode(str) { + let decoded = 0; + + for (i = 0; i < str.length; i++) { + decoded = decoded * length + map[str.charAt(i)]; + } + + return decoded; +} + +/** + * Yeast: A tiny growing id generator. + * + * @returns {String} A unique id. + * @api public + */ +export function yeast() { + const now = encode(+new Date()); + + if (now !== prev) return (seed = 0), (prev = now); + return now + "." + encode(seed++); +} + +// +// Map each character to its index. +// +for (; i < length; i++) map[alphabet[i]] = i; diff --git a/lib/index.ts b/lib/index.ts index 87cd92a..33c84b2 100644 --- a/lib/index.ts +++ b/lib/index.ts @@ -1,6 +1,16 @@ import { EventEmitter } from "events"; +import { yeast } from "./contrib/yeast"; +/** + * A public ID, sent by the server at the beginning of the Socket.IO session and which can be used for private messaging + */ export type SocketId = string; +/** + * A private ID, sent by the server at the beginning of the Socket.IO session and used for connection state recovery + * upon reconnection + */ +export type PrivateSessionId = string; + // we could extend the Room type to "string | number", but that would be a breaking change // related: https://github.com/socketio/socket.io-redis-adapter/issues/418 export type Room = string; @@ -20,6 +30,15 @@ export interface BroadcastOptions { flags?: BroadcastFlags; } +interface SessionToPersist { + sid: SocketId; + pid: PrivateSessionId; + rooms: Room[]; + data: unknown; +} + +export type Session = SessionToPersist & { missedPackets: unknown[][] }; + export class Adapter extends EventEmitter { public rooms: Map> = new Map(); public sids: Map> = new Map(); @@ -331,4 +350,132 @@ export class Adapter extends EventEmitter { "this adapter does not support the serverSideEmit() functionality" ); } + + /** + * Save the client session in order to restore it upon reconnection. + */ + public persistSession(session: SessionToPersist) {} + + /** + * Restore the session and find the packets that were missed by the client. + * @param pid + * @param offset + */ + public restoreSession( + pid: PrivateSessionId, + offset: string + ): Promise { + return null; + } +} + +interface PersistedPacket { + id: string; + emittedAt: number; + data: unknown[]; + opts: BroadcastOptions; +} + +type SessionWithTimestamp = SessionToPersist & { disconnectedAt: number }; + +export class SessionAwareAdapter extends Adapter { + private readonly maxDisconnectionDuration: number; + + private sessions: Map = new Map(); + private packets: PersistedPacket[] = []; + + constructor(readonly nsp: any) { + super(nsp); + this.maxDisconnectionDuration = + nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration; + + const timer = setInterval(() => { + const threshold = Date.now() - this.maxDisconnectionDuration; + this.sessions.forEach((session, sessionId) => { + const hasExpired = session.disconnectedAt < threshold; + if (hasExpired) { + this.sessions.delete(sessionId); + } + }); + for (let i = this.packets.length - 1; i >= 0; i--) { + const hasExpired = this.packets[i].emittedAt < threshold; + if (hasExpired) { + this.packets.splice(0, i + 1); + break; + } + } + }, 60 * 1000); + // prevents the timer from keeping the process alive + timer.unref(); + } + + override persistSession(session: SessionToPersist) { + (session as SessionWithTimestamp).disconnectedAt = Date.now(); + this.sessions.set(session.pid, session as SessionWithTimestamp); + } + + override restoreSession( + pid: PrivateSessionId, + offset: string + ): Promise { + const session = this.sessions.get(pid); + if (!session) { + // the session may have expired + return null; + } + const hasExpired = + session.disconnectedAt + this.maxDisconnectionDuration < Date.now(); + if (hasExpired) { + // the session has expired + this.sessions.delete(pid); + return null; + } + const index = this.packets.findIndex((packet) => packet.id === offset); + if (index === -1) { + // the offset may be too old + return null; + } + const missedPackets = []; + for (let i = index + 1; i < this.packets.length; i++) { + const packet = this.packets[i]; + if (shouldIncludePacket(session.rooms, packet.opts)) { + missedPackets.push(packet.data); + } + } + return Promise.resolve({ + ...session, + missedPackets, + }); + } + + override broadcast(packet: any, opts: BroadcastOptions) { + const isEventPacket = packet.type === 2; + // packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and + // restored on another server upon reconnection + const withoutAcknowledgement = packet.id === undefined; + const notVolatile = opts.flags?.volatile === undefined; + if (isEventPacket && withoutAcknowledgement && notVolatile) { + const id = yeast(); + // the offset is stored at the end of the data array, so the client knows the ID of the last packet it has + // processed (and the format is backward-compatible) + packet.data.push(id); + this.packets.push({ + id, + opts, + data: packet.data, + emittedAt: Date.now(), + }); + } + super.broadcast(packet, opts); + } +} + +function shouldIncludePacket( + sessionRooms: Room[], + opts: BroadcastOptions +): boolean { + const included = + opts.rooms.size === 0 || sessionRooms.some((room) => opts.rooms.has(room)); + const notExcluded = sessionRooms.every((room) => !opts.except.has(room)); + return included && notExcluded; } diff --git a/package-lock.json b/package-lock.json index c91b920..bcb9c09 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,7 +16,7 @@ "nyc": "^15.1.0", "prettier": "^2.8.1", "ts-node": "^10.9.1", - "typescript": "^4.0.3" + "typescript": "^4.9.4" } }, "node_modules/@babel/code-frame": { @@ -2390,9 +2390,9 @@ } }, "node_modules/typescript": { - "version": "4.0.3", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.0.3.tgz", - "integrity": "sha512-tEu6DGxGgRJPb/mVPIZ48e69xCn2yRmCgYmDugAVwmJ6o+0u1RI18eO7E7WBTLYLaEVVOhwQmcdhQHweux/WPg==", + "version": "4.9.4", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.4.tgz", + "integrity": "sha512-Uz+dTXYzxXXbsFpM86Wh3dKCxrQqUcVMxwU54orwlJjOpO3ao8L7j5lH+dWfTwgCwIuM9GQ2kvVotzYJMXTBZg==", "dev": true, "bin": { "tsc": "bin/tsc", @@ -4416,9 +4416,9 @@ } }, "typescript": { - "version": "4.0.3", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.0.3.tgz", - "integrity": "sha512-tEu6DGxGgRJPb/mVPIZ48e69xCn2yRmCgYmDugAVwmJ6o+0u1RI18eO7E7WBTLYLaEVVOhwQmcdhQHweux/WPg==", + "version": "4.9.4", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.9.4.tgz", + "integrity": "sha512-Uz+dTXYzxXXbsFpM86Wh3dKCxrQqUcVMxwU54orwlJjOpO3ao8L7j5lH+dWfTwgCwIuM9GQ2kvVotzYJMXTBZg==", "dev": true }, "uuid": { diff --git a/package.json b/package.json index 087c11c..1f67604 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,7 @@ "nyc": "^15.1.0", "prettier": "^2.8.1", "ts-node": "^10.9.1", - "typescript": "^4.0.3" + "typescript": "^4.9.4" }, "scripts": { "test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/index.ts", diff --git a/test/index.ts b/test/index.ts index abd3b58..c13a0db 100644 --- a/test/index.ts +++ b/test/index.ts @@ -1,5 +1,5 @@ -const { Adapter } = require(".."); -const expect = require("expect.js"); +import { Adapter, SessionAwareAdapter } from "../lib"; +import expect = require("expect.js"); describe("socket.io-adapter", () => { it("should add/remove sockets", () => { @@ -274,4 +274,240 @@ describe("socket.io-adapter", () => { done(); }); }); + + describe("connection state recovery", () => { + it("should persist and restore session", async () => { + const adapter = new SessionAwareAdapter({ + server: { + encoder: { + encode(packet) { + return packet; + }, + }, + opts: { + connectionStateRecovery: { + maxDisconnectionDuration: 5000, + }, + }, + }, + }); + + adapter.persistSession({ + sid: "abc", + pid: "def", + data: "ghi", + rooms: ["r1", "r2"], + }); + + const packetData = ["hello"]; + + adapter.broadcast( + { + nsp: "/", + type: 2, + data: packetData, + }, + { + rooms: new Set(), + except: new Set(), + } + ); + + const offset = packetData[1]; + const session = await adapter.restoreSession("def", offset); + + expect(session).to.not.be(null); + expect(session.sid).to.eql("abc"); + expect(session.pid).to.eql("def"); + expect(session.missedPackets).to.eql([]); + }); + + it("should restore missed packets", async () => { + const adapter = new SessionAwareAdapter({ + server: { + encoder: { + encode(packet) { + return packet; + }, + }, + opts: { + connectionStateRecovery: { + maxDisconnectionDuration: 5000, + }, + }, + }, + }); + + adapter.persistSession({ + sid: "abc", + pid: "def", + data: "ghi", + rooms: ["r1", "r2"], + }); + + const packetData = ["hello"]; + + adapter.broadcast( + { + nsp: "/", + type: 2, + data: packetData, + }, + { + rooms: new Set(), + except: new Set(), + } + ); + + adapter.broadcast( + { + nsp: "/", + type: 2, + data: ["all"], + }, + { + rooms: new Set(), + except: new Set(), + } + ); + + adapter.broadcast( + { + nsp: "/", + type: 2, + data: ["room"], + }, + { + rooms: new Set(["r1"]), + except: new Set(), + } + ); + + adapter.broadcast( + { + nsp: "/", + type: 2, + data: ["except"], + }, + { + rooms: new Set(), + except: new Set(["r2"]), + } + ); + + adapter.broadcast( + { + nsp: "/", + type: 2, + data: ["no except"], + }, + { + rooms: new Set(), + except: new Set(["r3"]), + } + ); + + adapter.broadcast( + { + nsp: "/", + type: 2, + data: ["with ack"], + id: 0, + }, + { + rooms: new Set(), + except: new Set(), + } + ); + + adapter.broadcast( + { + nsp: "/", + type: 3, + data: ["ack type"], + }, + { + rooms: new Set(), + except: new Set(), + } + ); + + adapter.broadcast( + { + nsp: "/", + type: 2, + data: ["volatile"], + }, + { + rooms: new Set(), + except: new Set(), + flags: { + volatile: true, + }, + } + ); + + const offset = packetData[1]; + const session = await adapter.restoreSession("def", offset); + + expect(session).to.not.be(null); + expect(session.sid).to.eql("abc"); + expect(session.pid).to.eql("def"); + + expect(session.missedPackets.length).to.eql(3); + expect(session.missedPackets[0].length).to.eql(2); + expect(session.missedPackets[0][0]).to.eql("all"); + expect(session.missedPackets[1][0]).to.eql("room"); + expect(session.missedPackets[2][0]).to.eql("no except"); + }); + + it("should fail to restore an unknown session", async () => { + const adapter = new SessionAwareAdapter({ + server: { + encoder: { + encode(packet) { + return packet; + }, + }, + opts: { + connectionStateRecovery: { + maxDisconnectionDuration: 5000, + }, + }, + }, + }); + + const session = await adapter.restoreSession("abc", "def"); + + expect(session).to.be(null); + }); + + it("should fail to restore a known session with an unknown offset", async () => { + const adapter = new SessionAwareAdapter({ + server: { + encoder: { + encode(packet) { + return packet; + }, + }, + opts: { + connectionStateRecovery: { + maxDisconnectionDuration: 5000, + }, + }, + }, + }); + + adapter.persistSession({ + sid: "abc", + pid: "def", + data: "ghi", + rooms: ["r1", "r2"], + }); + + const session = await adapter.restoreSession("abc", "def"); + + expect(session).to.be(null); + }); + }); });