Skip to content

Commit

Permalink
fix: ensure the order of the commands
Browse files Browse the repository at this point in the history
Before this change, the broadcast() method would send the BROADCAST
command and then apply it locally (which is required to retrieve the
offset of the message, when connection state recovery is enabled),
while the other commands like disconnectSockets() would first apply it
locally and then send the command to the other nodes.

So, for example:

```js
io.emit("bye");
io.disconnectSockets();
```

In that case, the clients connected to the io instance would not receive
the "bye" event, while the clients connected to the other server
instances would receive it before being disconnected.

Related:

- socketio/socket.io-redis-streams-adapter#13
- socketio/socket.io-postgres-adapter#12
  • Loading branch information
darrachequesne committed Feb 22, 2024
1 parent 207c0db commit a13f35f
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 37 deletions.
81 changes: 45 additions & 36 deletions lib/cluster-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -503,55 +503,64 @@ export abstract class ClusterAdapter extends Adapter {
super.broadcastWithAck(packet, opts, clientCountCallback, ack);
}

override addSockets(opts: BroadcastOptions, rooms: Room[]) {
super.addSockets(opts, rooms);

override async addSockets(opts: BroadcastOptions, rooms: Room[]) {
const onlyLocal = opts.flags?.local;
if (onlyLocal) {
return;

if (!onlyLocal) {
try {
await this.publishAndReturnOffset({
type: MessageType.SOCKETS_JOIN,
data: {
opts: encodeOptions(opts),
rooms,
},
});
} catch (e) {
debug("[%s] error while publishing message: %s", this.uid, e.message);
}
}

this.publish({
type: MessageType.SOCKETS_JOIN,
data: {
opts: encodeOptions(opts),
rooms,
},
});
super.addSockets(opts, rooms);
}

override delSockets(opts: BroadcastOptions, rooms: Room[]) {
super.delSockets(opts, rooms);

override async delSockets(opts: BroadcastOptions, rooms: Room[]) {
const onlyLocal = opts.flags?.local;
if (onlyLocal) {
return;

if (!onlyLocal) {
try {
await this.publishAndReturnOffset({
type: MessageType.SOCKETS_LEAVE,
data: {
opts: encodeOptions(opts),
rooms,
},
});
} catch (e) {
debug("[%s] error while publishing message: %s", this.uid, e.message);
}
}

this.publish({
type: MessageType.SOCKETS_LEAVE,
data: {
opts: encodeOptions(opts),
rooms,
},
});
super.delSockets(opts, rooms);
}

override disconnectSockets(opts: BroadcastOptions, close: boolean) {
super.disconnectSockets(opts, close);

override async disconnectSockets(opts: BroadcastOptions, close: boolean) {
const onlyLocal = opts.flags?.local;
if (onlyLocal) {
return;

if (!onlyLocal) {
try {
await this.publishAndReturnOffset({
type: MessageType.DISCONNECT_SOCKETS,
data: {
opts: encodeOptions(opts),
close,
},
});
} catch (e) {
debug("[%s] error while publishing message: %s", this.uid, e.message);
}
}

this.publish({
type: MessageType.DISCONNECT_SOCKETS,
data: {
opts: encodeOptions(opts),
close,
},
});
super.disconnectSockets(opts, close);
}

async fetchSockets(opts: BroadcastOptions): Promise<any[]> {
Expand Down
26 changes: 25 additions & 1 deletion test/cluster-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Server, Socket as ServerSocket } from "socket.io";
import { io as ioc, Socket as ClientSocket } from "socket.io-client";
import expect = require("expect.js");
import type { AddressInfo } from "net";
import { times, shouldNotHappen } from "./util";
import { times, shouldNotHappen, sleep } from "./util";
import {
ClusterAdapterWithHeartbeat,
type ClusterMessage,
Expand Down Expand Up @@ -243,6 +243,8 @@ describe("cluster adapter", () => {
it("makes all socket instances join the specified room", async () => {
servers[0].socketsJoin("room1");

await sleep();

expect(serverSockets[0].rooms.has("room1")).to.be(true);
expect(serverSockets[1].rooms.has("room1")).to.be(true);
expect(serverSockets[2].rooms.has("room1")).to.be(true);
Expand All @@ -254,6 +256,8 @@ describe("cluster adapter", () => {

servers[0].in("room1").socketsJoin("room2");

await sleep();

expect(serverSockets[0].rooms.has("room2")).to.be(true);
expect(serverSockets[1].rooms.has("room2")).to.be(false);
expect(serverSockets[2].rooms.has("room2")).to.be(true);
Expand All @@ -275,6 +279,8 @@ describe("cluster adapter", () => {

servers[0].socketsLeave("room1");

await sleep();

expect(serverSockets[0].rooms.has("room1")).to.be(false);
expect(serverSockets[1].rooms.has("room1")).to.be(false);
expect(serverSockets[2].rooms.has("room1")).to.be(false);
Expand All @@ -287,6 +293,8 @@ describe("cluster adapter", () => {

servers[0].in("room1").socketsLeave("room2");

await sleep();

expect(serverSockets[0].rooms.has("room2")).to.be(false);
expect(serverSockets[1].rooms.has("room2")).to.be(false);
expect(serverSockets[2].rooms.has("room2")).to.be(true);
Expand Down Expand Up @@ -318,6 +326,22 @@ describe("cluster adapter", () => {

servers[0].disconnectSockets(true);
});

it("sends a packet before all socket instances disconnect", (done) => {
const partialDone = times(3, done);

clientSockets.forEach((clientSocket) => {
clientSocket.on("disconnect", shouldNotHappen(done));

clientSocket.on("bye", () => {
clientSocket.off("disconnect");
clientSocket.on("disconnect", partialDone);
});
});

servers[0].emit("bye");
servers[0].disconnectSockets(true);
});
});

describe("fetchSockets", () => {
Expand Down
4 changes: 4 additions & 0 deletions test/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ export function times(count: number, fn: () => void) {
export function shouldNotHappen(done) {
return () => done(new Error("should not happen"));
}

export function sleep() {
return new Promise<void>((resolve) => process.nextTick(resolve));
}

0 comments on commit a13f35f

Please sign in to comment.