Skip to content

Commit

Permalink
feat(sharded): add an option for dynamic private channels (#526)
Browse files Browse the repository at this point in the history
Related: #524
  • Loading branch information
MartinKolarik committed Mar 13, 2024
1 parent dc1407f commit 50220f4
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 12 deletions.
42 changes: 30 additions & 12 deletions lib/sharded-adapter.ts
Expand Up @@ -34,13 +34,17 @@ export interface ShardedRedisAdapterOptions {
* The default value, useful when some rooms have a low number of clients (so only a few Socket.IO servers are notified).
*
* Only public rooms (i.e. not related to a particular Socket ID) are taken in account, because:
*
* - a lot of connected clients would mean a lot of subscription/unsubscription
* - the Socket ID attribute is ephemeral
*
* - "dynamic-private"
*
* Like "dynamic" but creates separate channels for private rooms as well. Useful when there is lots of 1:1 communication
* via socket.emit() calls.
*
* @default "dynamic"
*/
subscriptionMode?: "static" | "dynamic";
subscriptionMode?: "static" | "dynamic" | "dynamic-private";
}

/**
Expand Down Expand Up @@ -89,17 +93,18 @@ class ShardedRedisAdapter extends ClusterAdapter {
SSUBSCRIBE(this.subClient, this.channel, handler);
SSUBSCRIBE(this.subClient, this.responseChannel, handler);

if (this.opts.subscriptionMode === "dynamic") {
if (
this.opts.subscriptionMode === "dynamic" ||
this.opts.subscriptionMode === "dynamic-private"
) {
this.on("create-room", (room) => {
const isPublicRoom = !this.sids.has(room);
if (isPublicRoom) {
if (this.shouldUseASeparateNamespace(room)) {
SSUBSCRIBE(this.subClient, this.dynamicChannel(room), handler);
}
});

this.on("delete-room", (room) => {
const isPublicRoom = !this.sids.has(room);
if (isPublicRoom) {
if (this.shouldUseASeparateNamespace(room)) {
SUNSUBSCRIBE(this.subClient, this.dynamicChannel(room));
}
});
Expand All @@ -109,10 +114,12 @@ class ShardedRedisAdapter extends ClusterAdapter {
override close(): Promise<void> | void {
const channels = [this.channel, this.responseChannel];

if (this.opts.subscriptionMode === "dynamic") {
if (
this.opts.subscriptionMode === "dynamic" ||
this.opts.subscriptionMode === "dynamic-private"
) {
this.rooms.forEach((_sids, room) => {
const isPublicRoom = !this.sids.has(room);
if (isPublicRoom) {
if (this.shouldUseASeparateNamespace(room)) {
channels.push(this.dynamicChannel(room));
}
});
Expand All @@ -136,11 +143,13 @@ class ShardedRedisAdapter extends ClusterAdapter {
// broadcast with ack can not use a dynamic channel, because the serverCount() method return the number of all
// servers, not only the ones where the given room exists
const useDynamicChannel =
this.opts.subscriptionMode === "dynamic" &&
message.type === MessageType.BROADCAST &&
message.data.requestId === undefined &&
message.data.opts.rooms.length === 1 &&
!looksLikeASocketId(message.data.opts.rooms[0]);
((this.opts.subscriptionMode === "dynamic" &&
!looksLikeASocketId(message.data.opts.rooms[0])) ||
this.opts.subscriptionMode === "dynamic-private");

if (useDynamicChannel) {
return this.dynamicChannel(message.data.opts.rooms[0]);
} else {
Expand Down Expand Up @@ -204,4 +213,13 @@ class ShardedRedisAdapter extends ClusterAdapter {
override serverCount(): Promise<number> {
return PUBSUB(this.pubClient, "SHARDNUMSUB", this.channel);
}

private shouldUseASeparateNamespace(room: string): boolean {
const isPublicRoom = !this.sids.has(room);

return (
(this.opts.subscriptionMode === "dynamic" && isPublicRoom) ||
this.opts.subscriptionMode === "dynamic-private"
);
}
}
22 changes: 22 additions & 0 deletions test/test-runner.ts
Expand Up @@ -175,6 +175,28 @@ describe("@socket.io/redis-adapter", () => {
true
));

describe("[sharded] redis@4 standalone (dynamic subscription mode & dynamic private channels)", () =>
testSuite(
async () => {
const pubClient = createClient();
const subClient = pubClient.duplicate();

await Promise.all([pubClient.connect(), subClient.connect()]);

return [
createShardedAdapter(pubClient, subClient, {
subscriptionMode: "dynamic-private",
}),
() => {
pubClient.disconnect();
subClient.disconnect();
},
];
},
"redis@4",
true
));

describe("[sharded] redis@4 standalone (static subscription mode)", () =>
testSuite(
async () => {
Expand Down

0 comments on commit 50220f4

Please sign in to comment.