Skip to content

Commit

Permalink
fix(cluster): notify the other nodes when closing
Browse files Browse the repository at this point in the history
The clustered adapter will now:

- periodically check if a node has left the cluster
- send an event when it leaves the cluster

This should reduce the number of "timeout reached: only x responses
received out of y" errors.
  • Loading branch information
darrachequesne committed Feb 20, 2024
1 parent 80af4e9 commit 0e23ff0
Showing 1 changed file with 210 additions and 3 deletions.
213 changes: 210 additions & 3 deletions lib/cluster-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,18 @@ export enum MessageType {
SERVER_SIDE_EMIT_RESPONSE,
BROADCAST_CLIENT_COUNT,
BROADCAST_ACK,
ADAPTER_CLOSE,
}

export type ClusterMessage = {
uid: string;
nsp: string;
} & (
| {
type: MessageType.INITIAL_HEARTBEAT | MessageType.HEARTBEAT;
type:
| MessageType.INITIAL_HEARTBEAT
| MessageType.HEARTBEAT
| MessageType.ADAPTER_CLOSE;
}
| {
type: MessageType.BROADCAST;
Expand Down Expand Up @@ -643,11 +647,21 @@ export abstract class ClusterAdapter extends Adapter {
);
}

interface CustomClusterRequest {
type: MessageType;
resolve: Function;
timeout: NodeJS.Timeout;
missingUids: Set<string>;
responses: any[];
}

export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
private readonly _opts: Required<ClusterAdapterOptions>;

private heartbeatTimer: NodeJS.Timeout;
private nodesMap: Map<string, number> = new Map(); // uid => timestamp of last message
private readonly cleanupTimer: NodeJS.Timeout | undefined;
private customRequests: Map<string, CustomClusterRequest> = new Map();

protected constructor(nsp, opts: ClusterAdapterOptions) {
super(nsp);
Expand All @@ -658,9 +672,19 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
},
opts
);
this.cleanupTimer = setInterval(() => {
const now = Date.now();
this.nodesMap.forEach((lastSeen, uid) => {
const nodeSeemsDown = now - lastSeen > this._opts.heartbeatTimeout;
if (nodeSeemsDown) {
debug("node %s seems down", uid);
this.removeNode(uid);
}
});
}, 1_000);
}

override init(): Promise<void> | void {
override init() {
this.publish({
type: MessageType.INITIAL_HEARTBEAT,
});
Expand All @@ -677,8 +701,14 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
}, this._opts.heartbeatInterval);
}

override close(): Promise<void> | void {
override close() {
this.publish({
type: MessageType.ADAPTER_CLOSE,
});
clearTimeout(this.heartbeatTimer);
if (this.cleanupTimer) {
clearInterval(this.cleanupTimer);
}
}

override async onMessage(message: ClusterMessage, offset?: string) {
Expand All @@ -700,6 +730,9 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {
case MessageType.HEARTBEAT:
// nothing to do
break;
case MessageType.ADAPTER_CLOSE:
this.removeNode(message.uid);
break;
default:
super.onMessage(message, offset);
}
Expand All @@ -722,4 +755,178 @@ export abstract class ClusterAdapterWithHeartbeat extends ClusterAdapter {

return super.publish(message);
}

override async serverSideEmit(packet: any[]) {
const withAck = typeof packet[packet.length - 1] === "function";

if (!withAck) {
return this.publish({
type: MessageType.SERVER_SIDE_EMIT,
data: {
packet,
},
});
}

const ack = packet.pop();
const expectedResponseCount = this.nodesMap.size;

debug(
'waiting for %d responses to "serverSideEmit" request',
expectedResponseCount
);

if (expectedResponseCount <= 0) {
return ack(null, []);
}

const requestId = randomId();

const timeout = setTimeout(() => {
const storedRequest = this.customRequests.get(requestId);
if (storedRequest) {
ack(
new Error(
`timeout reached: missing ${storedRequest.missingUids.size} responses`
),
storedRequest.responses
);
this.customRequests.delete(requestId);
}
}, DEFAULT_TIMEOUT);

const storedRequest = {
type: MessageType.SERVER_SIDE_EMIT,
resolve: ack,
timeout,
missingUids: new Set([...this.nodesMap.keys()]),
responses: [],
};
this.customRequests.set(requestId, storedRequest);

this.publish({
type: MessageType.SERVER_SIDE_EMIT,
data: {
requestId, // the presence of this attribute defines whether an acknowledgement is needed
packet,
},
});
}

override async fetchSockets(opts: BroadcastOptions): Promise<any[]> {
const [localSockets, serverCount] = await Promise.all([
super.fetchSockets({
rooms: opts.rooms,
except: opts.except,
flags: {
local: true,
},
}),
this.serverCount(),
]);
const expectedResponseCount = serverCount - 1;

if (opts.flags?.local || expectedResponseCount <= 0) {
return localSockets as any[];
}

const requestId = randomId();

return new Promise<any[]>((resolve, reject) => {
const timeout = setTimeout(() => {
const storedRequest = this.customRequests.get(requestId);
if (storedRequest) {
reject(
new Error(
`timeout reached: missing ${storedRequest.missingUids.size} responses`
)
);
this.customRequests.delete(requestId);
}
}, opts.flags.timeout || DEFAULT_TIMEOUT);

const storedRequest = {
type: MessageType.FETCH_SOCKETS,
resolve,
timeout,
missingUids: new Set([...this.nodesMap.keys()]),
responses: localSockets as any[],
};
this.customRequests.set(requestId, storedRequest);

this.publish({
type: MessageType.FETCH_SOCKETS,
data: {
opts: encodeOptions(opts),
requestId,
},
});
});
}

override onResponse(response: ClusterResponse) {
const requestId = response.data.requestId;

debug("received response %s to request %s", response.type, requestId);

switch (response.type) {
case MessageType.FETCH_SOCKETS_RESPONSE: {
const request = this.customRequests.get(requestId);

if (!request) {
return;
}

(response.data.sockets as any[]).forEach((socket) =>
request.responses.push(socket)
);

request.missingUids.delete(response.uid);
if (request.missingUids.size === 0) {
clearTimeout(request.timeout);
request.resolve(request.responses);
this.customRequests.delete(requestId);
}
break;
}

case MessageType.SERVER_SIDE_EMIT_RESPONSE: {
const request = this.customRequests.get(requestId);

if (!request) {
return;
}

request.responses.push(response.data.packet);

request.missingUids.delete(response.uid);
if (request.missingUids.size === 0) {
clearTimeout(request.timeout);
request.resolve(null, request.responses);
this.customRequests.delete(requestId);
}
break;
}

default:
super.onResponse(response);
}
}

private removeNode(uid: string) {
this.customRequests.forEach((request, requestId) => {
request.missingUids.delete(uid);
if (request.missingUids.size === 0) {
clearTimeout(request.timeout);
if (request.type === MessageType.FETCH_SOCKETS) {
request.resolve(request.responses);
} else if (request.type === MessageType.SERVER_SIDE_EMIT) {
request.resolve(null, request.responses);
}
this.customRequests.delete(requestId);
}
});

this.nodesMap.delete(uid);
}
}

0 comments on commit 0e23ff0

Please sign in to comment.