Skip to content

Commit

Permalink
feat: implement connection state recovery
Browse files Browse the repository at this point in the history
More information about how this feature is supposed to work will be
provided in the main repository.
  • Loading branch information
darrachequesne committed Jan 5, 2023
1 parent d5c56d4 commit f529412
Show file tree
Hide file tree
Showing 5 changed files with 458 additions and 10 deletions.
65 changes: 65 additions & 0 deletions lib/contrib/yeast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// imported from https://github.com/unshiftio/yeast
"use strict";

const alphabet =
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_".split(
""
),
length = 64,
map = {};
let seed = 0,
i = 0,
prev;

/**
* Return a string representing the specified number.
*
* @param {Number} num The number to convert.
* @returns {String} The string representation of the number.
* @api public
*/
export function encode(num) {
let encoded = "";

do {
encoded = alphabet[num % length] + encoded;
num = Math.floor(num / length);
} while (num > 0);

return encoded;
}

/**
* Return the integer value specified by the given string.
*
* @param {String} str The string to convert.
* @returns {Number} The integer value represented by the string.
* @api public
*/
export function decode(str) {
let decoded = 0;

for (i = 0; i < str.length; i++) {
decoded = decoded * length + map[str.charAt(i)];
}

return decoded;
}

/**
* Yeast: A tiny growing id generator.
*
* @returns {String} A unique id.
* @api public
*/
export function yeast() {
const now = encode(+new Date());

if (now !== prev) return (seed = 0), (prev = now);
return now + "." + encode(seed++);
}

//
// Map each character to its index.
//
for (; i < length; i++) map[alphabet[i]] = i;
147 changes: 147 additions & 0 deletions lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import { EventEmitter } from "events";
import { yeast } from "./contrib/yeast";

/**
* A public ID, sent by the server at the beginning of the Socket.IO session and which can be used for private messaging
*/
export type SocketId = string;
/**
* A private ID, sent by the server at the beginning of the Socket.IO session and used for connection state recovery
* upon reconnection
*/
export type PrivateSessionId = string;

// we could extend the Room type to "string | number", but that would be a breaking change
// related: https://github.com/socketio/socket.io-redis-adapter/issues/418
export type Room = string;
Expand All @@ -20,6 +30,15 @@ export interface BroadcastOptions {
flags?: BroadcastFlags;
}

interface SessionToPersist {
sid: SocketId;
pid: PrivateSessionId;
rooms: Room[];
data: unknown;
}

export type Session = SessionToPersist & { missedPackets: unknown[][] };

export class Adapter extends EventEmitter {
public rooms: Map<Room, Set<SocketId>> = new Map();
public sids: Map<SocketId, Set<Room>> = new Map();
Expand Down Expand Up @@ -331,4 +350,132 @@ export class Adapter extends EventEmitter {
"this adapter does not support the serverSideEmit() functionality"
);
}

/**
* Save the client session in order to restore it upon reconnection.
*/
public persistSession(session: SessionToPersist) {}

/**
* Restore the session and find the packets that were missed by the client.
* @param pid
* @param offset
*/
public restoreSession(
pid: PrivateSessionId,
offset: string
): Promise<Session> {
return null;
}
}

interface PersistedPacket {
id: string;
emittedAt: number;
data: unknown[];
opts: BroadcastOptions;
}

type SessionWithTimestamp = SessionToPersist & { disconnectedAt: number };

export class SessionAwareAdapter extends Adapter {
private readonly maxDisconnectionDuration: number;

private sessions: Map<PrivateSessionId, SessionWithTimestamp> = new Map();
private packets: PersistedPacket[] = [];

constructor(readonly nsp: any) {
super(nsp);
this.maxDisconnectionDuration =
nsp.server.opts.connectionStateRecovery.maxDisconnectionDuration;

const timer = setInterval(() => {
const threshold = Date.now() - this.maxDisconnectionDuration;
this.sessions.forEach((session, sessionId) => {
const hasExpired = session.disconnectedAt < threshold;
if (hasExpired) {
this.sessions.delete(sessionId);
}
});
for (let i = this.packets.length - 1; i >= 0; i--) {
const hasExpired = this.packets[i].emittedAt < threshold;
if (hasExpired) {
this.packets.splice(0, i + 1);
break;
}
}
}, 60 * 1000);
// prevents the timer from keeping the process alive
timer.unref();
}

override persistSession(session: SessionToPersist) {
(session as SessionWithTimestamp).disconnectedAt = Date.now();
this.sessions.set(session.pid, session as SessionWithTimestamp);
}

override restoreSession(
pid: PrivateSessionId,
offset: string
): Promise<Session> {
const session = this.sessions.get(pid);
if (!session) {
// the session may have expired
return null;
}
const hasExpired =
session.disconnectedAt + this.maxDisconnectionDuration < Date.now();
if (hasExpired) {
// the session has expired
this.sessions.delete(pid);
return null;
}
const index = this.packets.findIndex((packet) => packet.id === offset);
if (index === -1) {
// the offset may be too old
return null;
}
const missedPackets = [];
for (let i = index + 1; i < this.packets.length; i++) {
const packet = this.packets[i];
if (shouldIncludePacket(session.rooms, packet.opts)) {
missedPackets.push(packet.data);
}
}
return Promise.resolve({
...session,
missedPackets,
});
}

override broadcast(packet: any, opts: BroadcastOptions) {
const isEventPacket = packet.type === 2;
// packets with acknowledgement are not stored because the acknowledgement function cannot be serialized and
// restored on another server upon reconnection
const withoutAcknowledgement = packet.id === undefined;
const notVolatile = opts.flags?.volatile === undefined;
if (isEventPacket && withoutAcknowledgement && notVolatile) {
const id = yeast();
// the offset is stored at the end of the data array, so the client knows the ID of the last packet it has
// processed (and the format is backward-compatible)
packet.data.push(id);
this.packets.push({
id,
opts,
data: packet.data,
emittedAt: Date.now(),
});
}
super.broadcast(packet, opts);
}
}

function shouldIncludePacket(
sessionRooms: Room[],
opts: BroadcastOptions
): boolean {
const included =
opts.rooms.size === 0 || sessionRooms.some((room) => opts.rooms.has(room));
const notExcluded = sessionRooms.every((room) => !opts.except.has(room));
return included && notExcluded;
}
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"nyc": "^15.1.0",
"prettier": "^2.8.1",
"ts-node": "^10.9.1",
"typescript": "^4.0.3"
"typescript": "^4.9.4"
},
"scripts": {
"test": "npm run format:check && tsc && nyc mocha --require ts-node/register test/index.ts",
Expand Down
Loading

0 comments on commit f529412

Please sign in to comment.