diff --git a/lib/adapter.ts b/lib/adapter.ts index 6b32476..70a2bb7 100644 --- a/lib/adapter.ts +++ b/lib/adapter.ts @@ -1,6 +1,5 @@ import type { PrivateSessionId, Session } from "socket.io-adapter"; import { decode, encode } from "@msgpack/msgpack"; -import { commandOptions } from "redis"; import { ClusterAdapter, ClusterAdapterOptions, @@ -8,7 +7,7 @@ import { MessageType, } from "./cluster-adapter"; import debugModule from "debug"; -import { hasBinary } from "./util"; +import { hasBinary, XADD, XREAD } from "./util"; const debug = debugModule("socket.io-redis-streams-adapter"); @@ -61,23 +60,15 @@ export function createAdapter( ); let offset = "$"; let polling = false; + let shouldClose = false; async function poll() { try { - let response = await redisClient.xRead( - commandOptions({ - isolated: true, - }), - [ - { - key: options.streamName, - id: offset, - }, - ], - { - COUNT: options.readCount, - BLOCK: 5000, - } + let response = await XREAD( + redisClient, + options.streamName, + offset, + options.readCount ); if (response) { @@ -98,7 +89,7 @@ export function createAdapter( debug("something went wrong while consuming the stream: %s", e.message); } - if (namespaceToAdapters.size > 0 && redisClient.isOpen) { + if (namespaceToAdapters.size > 0 && !shouldClose) { poll(); } else { polling = false; @@ -111,6 +102,7 @@ export function createAdapter( if (!polling) { polling = true; + shouldClose = false; poll(); } @@ -119,6 +111,10 @@ export function createAdapter( adapter.close = () => { namespaceToAdapters.delete(nsp.name); + if (namespaceToAdapters.size === 0) { + shouldClose = true; + } + defaultClose.call(adapter); }; @@ -141,17 +137,11 @@ class RedisStreamsAdapter extends ClusterAdapter { override doPublish(message: ClusterMessage) { debug("publishing %o", message); - return this.#redisClient.xAdd( + return XADD( + this.#redisClient, this.#opts.streamName, - "*", RedisStreamsAdapter.encode(message), - { - TRIM: { - strategy: "MAXLEN", - strategyModifier: "~", - threshold: this.#opts.maxLen, - }, - } + this.#opts.maxLen ); } diff --git a/lib/util.ts b/lib/util.ts index 03c97e0..a86fcbb 100644 --- a/lib/util.ts +++ b/lib/util.ts @@ -1,4 +1,5 @@ import { randomBytes } from "crypto"; +import { commandOptions } from "redis"; export function hasBinary(obj: any, toJSON?: boolean): boolean { if (!obj || typeof obj !== "object") { @@ -34,3 +35,93 @@ export function hasBinary(obj: any, toJSON?: boolean): boolean { export function randomId() { return randomBytes(8).toString("hex"); } + +/** + * Whether the client comes from the `redis` package + * + * @param redisClient + * + * @see https://github.com/redis/node-redis + */ +function isRedisV4Client(redisClient: any) { + return typeof redisClient.sSubscribe === "function"; +} + +/** + * @see https://redis.io/commands/xread/ + */ +export function XREAD( + redisClient: any, + streamName: string, + offset: string, + readCount: number +) { + if (isRedisV4Client(redisClient)) { + return redisClient.xRead( + commandOptions({ + isolated: true, + }), + [ + { + key: streamName, + id: offset, + }, + ], + { + COUNT: readCount, + BLOCK: 5000, + } + ); + } else { + return redisClient + .xread("BLOCK", 100, "COUNT", readCount, "STREAMS", streamName, offset) + .then((results) => { + if (results === null) { + return null; + } + return [ + { + messages: results[0][1].map((result) => { + const id = result[0]; + const inlineValues = result[1]; + const message = {}; + for (let i = 0; i < inlineValues.length; i += 2) { + message[inlineValues[i]] = inlineValues[i + 1]; + } + return { + id, + message, + }; + }), + }, + ]; + }); + } +} + +/** + * @see https://redis.io/commands/xadd/ + */ +export function XADD( + redisClient: any, + streamName: string, + payload: any, + maxLenThreshold: number +) { + if (isRedisV4Client(redisClient)) { + return redisClient.xAdd(streamName, "*", payload, { + TRIM: { + strategy: "MAXLEN", + strategyModifier: "~", + threshold: maxLenThreshold, + }, + }); + } else { + const args = [streamName, "MAXLEN", "~", maxLenThreshold, "*"]; + Object.keys(payload).forEach((k) => { + args.push(k, payload[k]); + }); + + return redisClient.xadd.call(redisClient, args); + } +} diff --git a/package-lock.json b/package-lock.json index d32e21c..3bbe63a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,6 +17,7 @@ "@types/mocha": "^8.2.1", "@types/node": "^18.15.11", "expect.js": "0.3.1", + "ioredis": "^5.3.2", "mocha": "^10.1.0", "nyc": "^15.1.0", "prettier": "^2.8.7", @@ -278,6 +279,12 @@ "node": ">=12" } }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==", + "dev": true + }, "node_modules/@istanbuljs/load-nyc-config": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@istanbuljs/load-nyc-config/-/load-nyc-config-1.1.0.tgz", @@ -863,6 +870,15 @@ "node": ">=8" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "dev": true, + "engines": { + "node": ">=0.10" + } + }, "node_modules/diff": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/diff/-/diff-5.0.0.tgz", @@ -1243,6 +1259,30 @@ "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", "dev": true }, + "node_modules/ioredis": { + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", + "integrity": "sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==", + "dev": true, + "dependencies": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/is-binary-path": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/is-binary-path/-/is-binary-path-2.1.0.tgz", @@ -1534,12 +1574,24 @@ "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==", "dev": true }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "dev": true + }, "node_modules/lodash.flattendeep": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/lodash.flattendeep/-/lodash.flattendeep-4.4.0.tgz", "integrity": "sha1-+wMJF/hqMTTlvJvsDWngAT3f7bI=", "dev": true }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "dev": true + }, "node_modules/log-symbols": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/log-symbols/-/log-symbols-4.1.0.tgz", @@ -2214,6 +2266,27 @@ "@redis/time-series": "1.0.5" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "dev": true, + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "dev": true, + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/release-zalgo": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/release-zalgo/-/release-zalgo-1.0.0.tgz", @@ -2420,6 +2493,12 @@ "integrity": "sha1-BOaSb2YolTVPPdAVIDYzuFcpfiw=", "dev": true }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "dev": true + }, "node_modules/string-width": { "version": "4.2.0", "resolved": "https://registry.npmjs.org/string-width/-/string-width-4.2.0.tgz", @@ -3083,6 +3162,12 @@ "@jridgewell/trace-mapping": "0.3.9" } }, + "@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==", + "dev": true + }, "@istanbuljs/load-nyc-config": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@istanbuljs/load-nyc-config/-/load-nyc-config-1.1.0.tgz", @@ -3551,6 +3636,12 @@ "strip-bom": "^4.0.0" } }, + "denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "dev": true + }, "diff": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/diff/-/diff-5.0.0.tgz", @@ -3825,6 +3916,23 @@ "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", "dev": true }, + "ioredis": { + "version": "5.3.2", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.3.2.tgz", + "integrity": "sha512-1DKMMzlIHM02eBBVOFQ1+AolGjs6+xEcM4PDL7NqOS6szq7H9jSaEkIUH6/a5Hl241LzW6JLSiAbNvTQjUupUA==", + "dev": true, + "requires": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + } + }, "is-binary-path": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/is-binary-path/-/is-binary-path-2.1.0.tgz", @@ -4042,12 +4150,24 @@ "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==", "dev": true }, + "lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "dev": true + }, "lodash.flattendeep": { "version": "4.4.0", "resolved": "https://registry.npmjs.org/lodash.flattendeep/-/lodash.flattendeep-4.4.0.tgz", "integrity": "sha1-+wMJF/hqMTTlvJvsDWngAT3f7bI=", "dev": true }, + "lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "dev": true + }, "log-symbols": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/log-symbols/-/log-symbols-4.1.0.tgz", @@ -4541,6 +4661,21 @@ "@redis/time-series": "1.0.5" } }, + "redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "dev": true + }, + "redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "dev": true, + "requires": { + "redis-errors": "^1.0.0" + } + }, "release-zalgo": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/release-zalgo/-/release-zalgo-1.0.0.tgz", @@ -4705,6 +4840,12 @@ "integrity": "sha1-BOaSb2YolTVPPdAVIDYzuFcpfiw=", "dev": true }, + "standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "dev": true + }, "string-width": { "version": "4.2.0", "resolved": "https://registry.npmjs.org/string-width/-/string-width-4.2.0.tgz", diff --git a/package.json b/package.json index dbca8b4..6e98066 100644 --- a/package.json +++ b/package.json @@ -17,9 +17,11 @@ "format:check": "prettier --parser typescript --check lib/**/*.ts test/**/*.ts", "format:fix": "prettier --parser typescript --write lib/**/*.ts test/**/*.ts", "prepack": "npm run compile", - "test": "npm run format:check && npm run compile && npm run test:redis-standalone && npm run test:redis-cluster", + "test": "npm run format:check && npm run compile && npm run test:redis-standalone && npm run test:redis-cluster && npm run test:ioredis-standalone && npm run test:ioredis-cluster", "test:redis-standalone": "nyc mocha --require ts-node/register test/**/*.ts", - "test:redis-cluster": "REDIS_CLUSTER=1 mocha --require ts-node/register test/**/*.ts" + "test:redis-cluster": "REDIS_CLUSTER=1 mocha --require ts-node/register test/**/*.ts", + "test:ioredis-standalone": "REDIS_LIB=ioredis mocha --require ts-node/register test/**/*.ts", + "test:ioredis-cluster": "REDIS_LIB=ioredis REDIS_CLUSTER=1 mocha --require ts-node/register test/**/*.ts" }, "dependencies": { "@msgpack/msgpack": "~2.8.0", @@ -33,6 +35,7 @@ "@types/mocha": "^8.2.1", "@types/node": "^18.15.11", "expect.js": "0.3.1", + "ioredis": "^5.3.2", "mocha": "^10.1.0", "nyc": "^15.1.0", "prettier": "^2.8.7", diff --git a/test/util.ts b/test/util.ts index 82f018a..7b4dd73 100644 --- a/test/util.ts +++ b/test/util.ts @@ -2,6 +2,7 @@ import { Server } from "socket.io"; import { Socket as ServerSocket } from "socket.io/dist/socket"; import { io as ioc, Socket as ClientSocket } from "socket.io-client"; import { createClient, createCluster } from "redis"; +import { Redis, Cluster } from "ioredis"; import { createServer } from "http"; import { createAdapter } from "../lib"; import { AddressInfo } from "net"; @@ -31,46 +32,78 @@ interface TestContext { cleanup: () => void; } -if (process.env.REDIS_CLUSTER === "1") { - console.log("[INFO] testing in cluster mode"); -} else { - console.log("[INFO] testing in standalone mode"); -} +const mode = process.env.REDIS_CLUSTER === "1" ? "cluster" : "standalone"; +const lib = process.env.REDIS_LIB || "redis"; + +console.log(`[INFO] testing in ${mode} mode with ${lib}`); async function initRedisClient() { if (process.env.REDIS_CLUSTER === "1") { - const redisClient = createCluster({ - rootNodes: [ + if (process.env.REDIS_LIB === "ioredis") { + return new Cluster([ { - url: "redis://localhost:7000", + host: "localhost", + port: 7000, }, { - url: "redis://localhost:7001", + host: "localhost", + port: 7001, }, { - url: "redis://localhost:7002", + host: "localhost", + port: 7002, }, { - url: "redis://localhost:7003", + host: "localhost", + port: 7003, }, { - url: "redis://localhost:7004", + host: "localhost", + port: 7004, }, { - url: "redis://localhost:7005", + host: "localhost", + port: 7005, }, - ], - }); + ]); + } else { + const redisClient = createCluster({ + rootNodes: [ + { + url: "redis://localhost:7000", + }, + { + url: "redis://localhost:7001", + }, + { + url: "redis://localhost:7002", + }, + { + url: "redis://localhost:7003", + }, + { + url: "redis://localhost:7004", + }, + { + url: "redis://localhost:7005", + }, + ], + }); - await redisClient.connect(); + await redisClient.connect(); - return redisClient; + return redisClient; + } } else { - const redisClient = createClient(); + if (process.env.REDIS_LIB === "ioredis") { + return new Redis(); + } else { + const redisClient = createClient(); - await redisClient.connect(); + await redisClient.connect(); - return redisClient; + return redisClient; + } } }