From 42c8ab6764a3d4c855b27eea35b4e0cda9c34b37 Mon Sep 17 00:00:00 2001 From: Damien Arrachequesne Date: Wed, 10 May 2023 14:31:30 +0200 Subject: [PATCH] fix(sharded): ensure compatibility with ioredis Related: https://github.com/socketio/socket.io-redis-adapter/issues/499 --- lib/sharded-adapter.ts | 40 ++++----------- lib/util.ts | 86 ++++++++++++++++++++++++++++++++ package-lock.json | 108 ++++++++++++++++++++--------------------- package.json | 2 +- test/test-runner.ts | 65 +++++++++++++++++++++++-- 5 files changed, 213 insertions(+), 88 deletions(-) diff --git a/lib/sharded-adapter.ts b/lib/sharded-adapter.ts index 0f19579..2a3667e 100644 --- a/lib/sharded-adapter.ts +++ b/lib/sharded-adapter.ts @@ -1,12 +1,10 @@ import { ClusterAdapter, ClusterMessage, MessageType } from "./cluster-adapter"; import { decode, encode } from "notepack.io"; -import { hasBinary, parseNumSubResponse, sumValues } from "./util"; +import { hasBinary, PUBSUB, SPUBLISH, SSUBSCRIBE, SUNSUBSCRIBE } from "./util"; import debugModule from "debug"; const debug = debugModule("socket.io-redis"); -const RETURN_BUFFERS = true; - export interface ShardedRedisAdapterOptions { /** * The prefix for the Redis Pub/Sub channels. @@ -78,25 +76,21 @@ class ShardedRedisAdapter extends ClusterAdapter { const handler = (message, channel) => this.onRawMessage(message, channel); - this.subClient.sSubscribe(this.channel, handler, RETURN_BUFFERS); - this.subClient.sSubscribe(this.responseChannel, handler, RETURN_BUFFERS); + SSUBSCRIBE(this.subClient, this.channel, handler); + SSUBSCRIBE(this.subClient, this.responseChannel, handler); if (this.opts.subscriptionMode === "dynamic") { this.on("create-room", (room) => { const isPublicRoom = !this.sids.has(room); if (isPublicRoom) { - this.subClient.sSubscribe( - this.dynamicChannel(room), - handler, - RETURN_BUFFERS - ); + SSUBSCRIBE(this.subClient, this.dynamicChannel(room), handler); } }); this.on("delete-room", (room) => { const isPublicRoom = !this.sids.has(room); if (isPublicRoom) { - this.subClient.sUnsubscribe(this.dynamicChannel(room)); + SUNSUBSCRIBE(this.subClient, this.dynamicChannel(room)); } }); } @@ -114,13 +108,13 @@ class ShardedRedisAdapter extends ClusterAdapter { }); } - return this.subClient.sUnsubscribe(channels); + return SUNSUBSCRIBE(this.subClient, channels); } override publishMessage(message) { const channel = this.computeChannel(message); debug("publishing message of type %s to %s", message.type, channel); - this.pubClient.sPublish(channel, this.encode(message)); + SPUBLISH(this.pubClient, channel, this.encode(message)); return Promise.resolve(""); } @@ -147,7 +141,8 @@ class ShardedRedisAdapter extends ClusterAdapter { override publishResponse(requesterUid, response) { debug("publishing response of type %s to %s", response.type, requesterUid); - this.pubClient.sPublish( + SPUBLISH( + this.pubClient, `${this.channel}${requesterUid}#`, this.encode(response) ); @@ -189,21 +184,6 @@ class ShardedRedisAdapter extends ClusterAdapter { } override serverCount(): Promise { - if ( - this.pubClient.constructor.name === "Cluster" || - this.pubClient.isCluster - ) { - return Promise.all( - this.pubClient.nodes().map((node) => { - return node - .sendCommand(["PUBSUB", "SHARDNUMSUB", this.channel]) - .then(parseNumSubResponse); - }) - ).then(sumValues); - } else { - return this.pubClient - .sendCommand(["PUBSUB", "SHARDNUMSUB", this.channel]) - .then(parseNumSubResponse); - } + return PUBSUB(this.pubClient, "SHARDNUMSUB", this.channel); } } diff --git a/lib/util.ts b/lib/util.ts index ce37e5a..023655b 100644 --- a/lib/util.ts +++ b/lib/util.ts @@ -44,3 +44,89 @@ export function sumValues(values) { return acc + val; }, 0); } + +const RETURN_BUFFERS = true; + +/** + * Whether the client comes from the `redis` package + * + * @param redisClient + * + * @see https://github.com/redis/node-redis + */ +function isRedisV4Client(redisClient: any) { + return typeof redisClient.sSubscribe === "function"; +} + +export function SSUBSCRIBE( + redisClient: any, + channel: string, + handler: (rawMessage: Buffer, channel: Buffer) => void +) { + if (isRedisV4Client(redisClient)) { + redisClient.sSubscribe(channel, handler, RETURN_BUFFERS); + } else { + redisClient.ssubscribe(channel); + + redisClient.on("smessageBuffer", (rawChannel, message) => { + if (rawChannel.toString() === channel) { + handler(message, rawChannel); + } + }); + } +} + +export function SUNSUBSCRIBE(redisClient: any, channel: string | string[]) { + if (isRedisV4Client(redisClient)) { + redisClient.sUnsubscribe(channel); + } else { + redisClient.sunsubscribe(channel); + } +} + +export function SPUBLISH( + redisClient: any, + channel: string, + payload: string | Uint8Array +) { + if (isRedisV4Client(redisClient)) { + redisClient.sPublish(channel, payload); + } else { + redisClient.spublish(channel, payload); + } +} + +export function PUBSUB(redisClient: any, arg: string, channel: string) { + if (redisClient.constructor.name === "Cluster" || redisClient.isCluster) { + return Promise.all( + redisClient.nodes().map((node) => { + return node + .sendCommand(["PUBSUB", arg, channel]) + .then(parseNumSubResponse); + }) + ).then(sumValues); + } else if (isRedisV4Client(redisClient)) { + const isCluster = Array.isArray(redisClient.masters); + if (isCluster) { + const nodes = redisClient.masters; + return Promise.all( + nodes.map((node) => { + return node.client + .sendCommand(["PUBSUB", arg, channel]) + .then(parseNumSubResponse); + }) + ).then(sumValues); + } else { + return redisClient + .sendCommand(["PUBSUB", arg, channel]) + .then(parseNumSubResponse); + } + } else { + return new Promise((resolve, reject) => { + redisClient.send_command("PUBSUB", [arg, channel], (err, numSub) => { + if (err) return reject(err); + resolve(parseNumSubResponse(numSub)); + }); + }); + } +} diff --git a/package-lock.json b/package-lock.json index 1708903..3e1d36d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,7 +18,7 @@ "@types/mocha": "^8.2.1", "@types/node": "^14.14.7", "expect.js": "0.3.1", - "ioredis": "^4.0.0", + "ioredis": "^5.3.2", "mocha": "^10.1.0", "nyc": "^15.1.0", "prettier": "^2.8.7", @@ -281,6 +281,12 @@ "node": ">=12" } }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==", + "dev": true + }, "node_modules/@istanbuljs/load-nyc-config": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@istanbuljs/load-nyc-config/-/load-nyc-config-1.1.0.tgz", @@ -886,9 +892,9 @@ } }, "node_modules/denque": { - "version": "1.4.1", - "resolved": "https://registry.npmjs.org/denque/-/denque-1.4.1.tgz", - "integrity": "sha512-OfzPuSZKGcgr96rf1oODnfjqBFmr1DVoc/TrItj3Ohe0Ah1C5WX5Baquw/9U9KovnQ88EqmJbD66rKYUQYN1tQ==", + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", "dev": true, "engines": { "node": ">=0.10" @@ -1318,23 +1324,23 @@ "dev": true }, "node_modules/ioredis": { - "version": "4.17.3", - "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.17.3.tgz", - "integrity": "sha512-iRvq4BOYzNFkDnSyhx7cmJNOi1x/HWYe+A4VXHBu4qpwJaGT1Mp+D2bVGJntH9K/Z/GeOM/Nprb8gB3bmitz1Q==", + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", + "integrity": "sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==", "dev": true, "dependencies": { + "@ioredis/commands": "^1.1.1", "cluster-key-slot": "^1.1.0", - "debug": "^4.1.1", - "denque": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", "lodash.defaults": "^4.2.0", - "lodash.flatten": "^4.4.0", - "redis-commands": "1.5.0", + "lodash.isarguments": "^3.1.0", "redis-errors": "^1.2.0", "redis-parser": "^3.0.0", - "standard-as-callback": "^2.0.1" + "standard-as-callback": "^2.1.0" }, "engines": { - "node": ">=6" + "node": ">=12.22.0" }, "funding": { "type": "opencollective", @@ -1638,18 +1644,18 @@ "integrity": "sha1-0JF4cW/+pN3p5ft7N/bwgCJ0WAw=", "dev": true }, - "node_modules/lodash.flatten": { - "version": "4.4.0", - "resolved": "https://registry.npmjs.org/lodash.flatten/-/lodash.flatten-4.4.0.tgz", - "integrity": "sha1-8xwiIlqWMtK7+OSt2+8kCqdlph8=", - "dev": true - }, "node_modules/lodash.flattendeep": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/lodash.flattendeep/-/lodash.flattendeep-4.4.0.tgz", "integrity": "sha1-+wMJF/hqMTTlvJvsDWngAT3f7bI=", "dev": true }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "dev": true + }, "node_modules/log-symbols": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/log-symbols/-/log-symbols-4.1.0.tgz", @@ -2341,12 +2347,6 @@ "@redis/time-series": "1.0.4" } }, - "node_modules/redis-commands": { - "version": "1.5.0", - "resolved": "https://registry.npmjs.org/redis-commands/-/redis-commands-1.5.0.tgz", - "integrity": "sha512-6KxamqpZ468MeQC3bkWmCB1fp56XL64D4Kf0zJSwDZbVLLm7KFkoIcHrgRvQ+sk8dnhySs7+yBg94yIkAK7aJg==", - "dev": true - }, "node_modules/redis-errors": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", @@ -2640,9 +2640,9 @@ "dev": true }, "node_modules/standard-as-callback": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.0.1.tgz", - "integrity": "sha512-NQOxSeB8gOI5WjSaxjBgog2QFw55FV8TkS6Y07BiB3VJ8xNTvUYm0wl0s8ObgQ5NhdpnNfigMIKjgPESzgr4tg==", + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", "dev": true }, "node_modules/string-width": { @@ -3314,6 +3314,12 @@ "@jridgewell/trace-mapping": "0.3.9" } }, + "@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==", + "dev": true + }, "@istanbuljs/load-nyc-config": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@istanbuljs/load-nyc-config/-/load-nyc-config-1.1.0.tgz", @@ -3802,9 +3808,9 @@ } }, "denque": { - "version": "1.4.1", - "resolved": "https://registry.npmjs.org/denque/-/denque-1.4.1.tgz", - "integrity": "sha512-OfzPuSZKGcgr96rf1oODnfjqBFmr1DVoc/TrItj3Ohe0Ah1C5WX5Baquw/9U9KovnQ88EqmJbD66rKYUQYN1tQ==", + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", "dev": true }, "diff": { @@ -4110,20 +4116,20 @@ "dev": true }, "ioredis": { - "version": "4.17.3", - "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-4.17.3.tgz", - "integrity": "sha512-iRvq4BOYzNFkDnSyhx7cmJNOi1x/HWYe+A4VXHBu4qpwJaGT1Mp+D2bVGJntH9K/Z/GeOM/Nprb8gB3bmitz1Q==", + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", + "integrity": "sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==", "dev": true, "requires": { + "@ioredis/commands": "^1.1.1", "cluster-key-slot": "^1.1.0", - "debug": "^4.1.1", - "denque": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", "lodash.defaults": "^4.2.0", - "lodash.flatten": "^4.4.0", - "redis-commands": "1.5.0", + "lodash.isarguments": "^3.1.0", "redis-errors": "^1.2.0", "redis-parser": "^3.0.0", - "standard-as-callback": "^2.0.1" + "standard-as-callback": "^2.1.0" } }, "is-binary-path": { @@ -4349,18 +4355,18 @@ "integrity": "sha1-0JF4cW/+pN3p5ft7N/bwgCJ0WAw=", "dev": true }, - "lodash.flatten": { - "version": "4.4.0", - "resolved": "https://registry.npmjs.org/lodash.flatten/-/lodash.flatten-4.4.0.tgz", - "integrity": "sha1-8xwiIlqWMtK7+OSt2+8kCqdlph8=", - "dev": true - }, "lodash.flattendeep": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/lodash.flattendeep/-/lodash.flattendeep-4.4.0.tgz", "integrity": "sha1-+wMJF/hqMTTlvJvsDWngAT3f7bI=", "dev": true }, + "lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "dev": true + }, "log-symbols": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/log-symbols/-/log-symbols-4.1.0.tgz", @@ -4871,12 +4877,6 @@ "@redis/time-series": "1.0.4" } }, - "redis-commands": { - "version": "1.5.0", - "resolved": "https://registry.npmjs.org/redis-commands/-/redis-commands-1.5.0.tgz", - "integrity": "sha512-6KxamqpZ468MeQC3bkWmCB1fp56XL64D4Kf0zJSwDZbVLLm7KFkoIcHrgRvQ+sk8dnhySs7+yBg94yIkAK7aJg==", - "dev": true - }, "redis-errors": { "version": "1.2.0", "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", @@ -5107,9 +5107,9 @@ "dev": true }, "standard-as-callback": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.0.1.tgz", - "integrity": "sha512-NQOxSeB8gOI5WjSaxjBgog2QFw55FV8TkS6Y07BiB3VJ8xNTvUYm0wl0s8ObgQ5NhdpnNfigMIKjgPESzgr4tg==", + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", "dev": true }, "string-width": { diff --git a/package.json b/package.json index e2e3249..73303da 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "@types/mocha": "^8.2.1", "@types/node": "^14.14.7", "expect.js": "0.3.1", - "ioredis": "^4.0.0", + "ioredis": "^5.3.2", "mocha": "^10.1.0", "nyc": "^15.1.0", "prettier": "^2.8.7", diff --git a/test/test-runner.ts b/test/test-runner.ts index a2b5e99..e994c63 100644 --- a/test/test-runner.ts +++ b/test/test-runner.ts @@ -2,7 +2,7 @@ import { testSuite as commonTestSuite } from "./index"; import { testSuite as specificsTestSuite } from "./specifics"; import { createAdapter, createShardedAdapter } from "../lib"; import { createClient, createCluster } from "redis"; -import Redis from "ioredis"; +import { Redis, Cluster } from "ioredis"; import { createClient as createClientV3 } from "redis-v3"; const clusterNodes = [ @@ -123,7 +123,7 @@ describe("@socket.io/redis-adapter", () => { describe("ioredis standalone", () => testSuite(async () => { - const pubClient = Redis.createClient(); + const pubClient = new Redis(); const subClient = pubClient.duplicate(); return [ @@ -139,7 +139,7 @@ describe("@socket.io/redis-adapter", () => { describe("ioredis cluster", () => testSuite(async () => { - const pubClient = new Redis.Cluster(clusterNodes); + const pubClient = new Cluster(clusterNodes); const subClient = pubClient.duplicate(); return [ @@ -197,5 +197,64 @@ describe("@socket.io/redis-adapter", () => { true )); + describe("[sharded] redis@4 cluster", () => + testSuite( + async () => { + const pubClient = createCluster({ + rootNodes: clusterNodes, + }); + const subClient = pubClient.duplicate(); + + await Promise.all([pubClient.connect(), subClient.connect()]); + + return [ + createShardedAdapter(pubClient, subClient), + () => { + pubClient.disconnect(); + subClient.disconnect(); + }, + ]; + }, + "redis@4", + true + )); + + describe("[sharded] ioredis standalone", () => + testSuite( + async () => { + const pubClient = new Redis(); + const subClient = pubClient.duplicate(); + + return [ + createShardedAdapter(pubClient, subClient), + () => { + pubClient.disconnect(); + subClient.disconnect(); + }, + ]; + }, + "ioredis", + true + )); + + // FIXME see https://github.com/luin/ioredis/issues/1759 + describe.skip("[sharded] ioredis cluster", () => + testSuite( + async () => { + const pubClient = new Cluster(clusterNodes); + const subClient = pubClient.duplicate(); + + return [ + createShardedAdapter(pubClient, subClient), + () => { + pubClient.disconnect(); + subClient.disconnect(); + }, + ]; + }, + "ioredis", + true + )); + import("./custom-parser"); });