Skip to content

Commit

Permalink
feat: broadcast and expect multiple acks
Browse files Browse the repository at this point in the history
This feature was added in `socket.io@4.5.0`:

```js
io.timeout(1000).emit("some-event", (err, responses) => {
  // ...
});
```

Thanks to this change, it will now work with multiple Socket.IO
servers.

Related: socketio/socket.io@8b20457
  • Loading branch information
darrachequesne committed Apr 28, 2022
1 parent e546e4d commit 829a1f5
Show file tree
Hide file tree
Showing 4 changed files with 369 additions and 95 deletions.
103 changes: 99 additions & 4 deletions lib/index.ts
Expand Up @@ -21,6 +21,8 @@ enum EventType {
FETCH_SOCKETS_RESPONSE,
SERVER_SIDE_EMIT,
SERVER_SIDE_EMIT_RESPONSE,
BROADCAST_CLIENT_COUNT,
BROADCAST_ACK,
}

interface Request {
Expand All @@ -32,6 +34,12 @@ interface Request {
responses: any[];
}

interface AckRequest {
type: EventType.BROADCAST;
clientCountCallback: (clientCount: number) => void;
ack: (...args: any[]) => void;
}

/**
* UID of an emitter using the `@socket.io/postgres-emitter` package
*/
Expand Down Expand Up @@ -151,6 +159,7 @@ export class PostgresAdapter extends Adapter {
private heartbeatTimer: NodeJS.Timeout | undefined;
private cleanupTimer: NodeJS.Timeout | undefined;
private requests: Map<string, Request> = new Map();
private ackRequests: Map<string, AckRequest> = new Map();

/**
* Adapter constructor.
Expand Down Expand Up @@ -271,12 +280,54 @@ export class PostgresAdapter extends Adapter {
}
case EventType.BROADCAST: {
debug("broadcast with opts %j", document.data.opts);
super.broadcast(
document.data.packet,
PostgresAdapter.deserializeOptions(document.data.opts)
);

const withAck = document.data.requestId !== undefined;
if (withAck) {
super.broadcastWithAck(
document.data.packet,
PostgresAdapter.deserializeOptions(document.data.opts),
(clientCount) => {
debug("waiting for %d client acknowledgements", clientCount);
this.publish({
type: EventType.BROADCAST_CLIENT_COUNT,
data: {
requestId: document.data.requestId,
clientCount,
},
});
},
(arg) => {
debug("received acknowledgement with value %j", arg);
this.publish({
type: EventType.BROADCAST_ACK,
data: {
requestId: document.data.requestId,
packet: arg,
},
});
}
);
} else {
super.broadcast(
document.data.packet,
PostgresAdapter.deserializeOptions(document.data.opts)
);
}
break;
}

case EventType.BROADCAST_CLIENT_COUNT: {
const request = this.ackRequests.get(document.data.requestId);
request?.clientCountCallback(document.data.clientCount);
break;
}

case EventType.BROADCAST_ACK: {
const request = this.ackRequests.get(document.data.requestId);
request?.ack(document.data.packet);
break;
}

case EventType.SOCKETS_JOIN: {
debug("calling addSockets with opts %j", document.data.opts);
super.addSockets(
Expand All @@ -285,6 +336,7 @@ export class PostgresAdapter extends Adapter {
);
break;
}

case EventType.SOCKETS_LEAVE: {
debug("calling delSockets with opts %j", document.data.opts);
super.delSockets(
Expand Down Expand Up @@ -419,6 +471,7 @@ export class PostgresAdapter extends Adapter {
if (
[
EventType.BROADCAST,
EventType.BROADCAST_ACK,
EventType.SERVER_SIDE_EMIT,
EventType.SERVER_SIDE_EMIT_RESPONSE,
].includes(document.type) &&
Expand Down Expand Up @@ -506,6 +559,48 @@ export class PostgresAdapter extends Adapter {
});
}

public broadcastWithAck(
packet: any,
opts: BroadcastOptions,
clientCountCallback: (clientCount: number) => void,
ack: (...args: any[]) => void
) {
const onlyLocal = opts?.flags?.local;
if (!onlyLocal) {
const requestId = randomId();

this.publish({
type: EventType.BROADCAST,
data: {
packet,
requestId,
opts: PostgresAdapter.serializeOptions(opts),
},
});

this.ackRequests.set(requestId, {
type: EventType.BROADCAST,
clientCountCallback,
ack,
});

// we have no way to know at this level whether the server has received an acknowledgement from each client, so we
// will simply clean up the ackRequests map after the given delay
setTimeout(() => {
this.ackRequests.delete(requestId);
}, opts.flags!.timeout);
}

// packets with binary contents are modified by the broadcast method, hence the nextTick()
process.nextTick(() => {
super.broadcastWithAck(packet, opts, clientCountCallback, ack);
});
}

public serverCount(): Promise<number> {
return Promise.resolve(1 + this.nodesMap.size);
}

addSockets(opts: BroadcastOptions, rooms: Room[]) {
super.addSockets(opts, rooms);

Expand Down

0 comments on commit 829a1f5

Please sign in to comment.