From 9b1ac4c40df65bcecc5ca6ba7fcdbb497f1e6207 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Thu, 4 Apr 2024 22:14:20 +0300 Subject: [PATCH 1/4] add push message handler registration and make all pubsub use it. --- packages/client/lib/client/commands-queue.ts | 67 +++++++++++++++----- packages/client/lib/client/pub-sub.ts | 49 +++++++------- 2 files changed, 73 insertions(+), 43 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index a4029779fc..0f58ddab52 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,8 +1,8 @@ import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; -import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP/types'; -import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; +import { TypeMapping, ReplyUnion, RespVersions, CommandArguments } from '../RESP/types'; +import { COMMANDS, ChannelListeners, PUBSUB_TYPE, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; import { AbortError, ErrorReply } from '../errors'; import { MonitorCallback } from '.'; @@ -51,6 +51,8 @@ export default class RedisCommandsQueue { #chainInExecution: symbol | undefined; readonly decoder; readonly #pubSub = new PubSub(); + readonly #pushHandlers: Map) => unknown> = new Map(); + readonly #builtInSet = new Set; get isPubSubActive() { return this.#pubSub.isActive; @@ -64,6 +66,21 @@ export default class RedisCommandsQueue { this.#respVersion = respVersion; this.#maxLength = maxLength; this.#onShardedChannelMoved = onShardedChannelMoved; + + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString(), this.#pubSub.handleMessageReplyChannel.bind(this.#pubSub)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.toString(), this.#handleStatusReply.bind(this)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.toString(), this.#handleStatusReply.bind(this)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].message.toString(), this.#pubSub.handleMessageReplyPattern.bind(this.#pubSub)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.toString(), this.#handleStatusReply.bind(this)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.toString(), this.#handleStatusReply.bind(this)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].message.toString(), this.#pubSub.handleMessageReplySharded.bind(this.#pubSub)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.toString(), this.#handleStatusReply.bind(this)); + this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.toString(), this.#handleShardedUnsubscribe.bind(this)); + + for (const str in this.#pushHandlers.keys) { + this.#builtInSet.add(str); + } + this.decoder = this.#initiateDecoder(); } @@ -75,28 +92,44 @@ export default class RedisCommandsQueue { this.#waitingForReply.shift()!.reject(err); } - #onPush(push: Array) { - // TODO: type - if (this.#pubSub.handleMessageReply(push)) return true; - - const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push); - if (isShardedUnsubscribe && !this.#waitingForReply.length) { + #handleStatusReply(push: Array) { + const head = this.#waitingForReply.head!.value; + if ( + (Number.isNaN(head.channelsCounter!) && push[2] === 0) || + --head.channelsCounter! === 0 + ) { + this.#waitingForReply.shift()!.resolve(); + } + } + + #handleShardedUnsubscribe(push: Array) { + if (!this.#waitingForReply.length) { const channel = push[1].toString(); this.#onShardedChannelMoved( channel, this.#pubSub.removeShardedListeners(channel) ); - return true; - } else if (isShardedUnsubscribe || PubSub.isStatusReply(push)) { - const head = this.#waitingForReply.head!.value; - if ( - (Number.isNaN(head.channelsCounter!) && push[2] === 0) || - --head.channelsCounter! === 0 - ) { - this.#waitingForReply.shift()!.resolve(); - } + } else { + this.#handleStatusReply(push); + } + } + + addPushHandler(messageType: string, handler: (pushMsg: Array) => unknown) { + if (this.#builtInSet.has(messageType)) { + throw new Error("Cannot override built in push message handler"); + } + + this.#pushHandlers.set(messageType, handler); + } + + #onPush(push: Array) { + const handler = this.#pushHandlers.get(push[0].toString()); + if (handler) { + handler(push); return true; } + + return false; } #getTypeMapping() { diff --git a/packages/client/lib/client/pub-sub.ts b/packages/client/lib/client/pub-sub.ts index 1387aea841..246707953e 100644 --- a/packages/client/lib/client/pub-sub.ts +++ b/packages/client/lib/client/pub-sub.ts @@ -11,7 +11,7 @@ export type PUBSUB_TYPE = typeof PUBSUB_TYPE; export type PubSubType = PUBSUB_TYPE[keyof PUBSUB_TYPE]; -const COMMANDS = { +export const COMMANDS = { [PUBSUB_TYPE.CHANNELS]: { subscribe: Buffer.from('subscribe'), unsubscribe: Buffer.from('unsubscribe'), @@ -344,32 +344,29 @@ export class PubSub { return commands; } - handleMessageReply(reply: Array): boolean { - if (COMMANDS[PUBSUB_TYPE.CHANNELS].message.equals(reply[0])) { - this.#emitPubSubMessage( - PUBSUB_TYPE.CHANNELS, - reply[2], - reply[1] - ); - return true; - } else if (COMMANDS[PUBSUB_TYPE.PATTERNS].message.equals(reply[0])) { - this.#emitPubSubMessage( - PUBSUB_TYPE.PATTERNS, - reply[3], - reply[2], - reply[1] - ); - return true; - } else if (COMMANDS[PUBSUB_TYPE.SHARDED].message.equals(reply[0])) { - this.#emitPubSubMessage( - PUBSUB_TYPE.SHARDED, - reply[2], - reply[1] - ); - return true; - } + handleMessageReplyChannel(push: Array) { + this.#emitPubSubMessage( + PUBSUB_TYPE.CHANNELS, + push[2], + push[1] + ); + } - return false; + handleMessageReplyPattern(push: Array) { + this.#emitPubSubMessage( + PUBSUB_TYPE.PATTERNS, + push[3], + push[2], + push[1] + ); + } + + handleMessageReplySharded(push: Array) { + this.#emitPubSubMessage( + PUBSUB_TYPE.SHARDED, + push[2], + push[1] + ); } removeShardedListeners(channel: string): ChannelListeners { From 3340d49d48ceead47d08ba10eb20bf7243d9ddc0 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Sun, 7 Apr 2024 00:12:32 +0300 Subject: [PATCH 2/4] enable push handler support to be tested / test it --- packages/client/lib/client/commands-queue.ts | 20 ++++-- packages/client/lib/client/index.spec.ts | 76 ++++++++++++++++++++ packages/client/lib/client/index.ts | 8 +++ 3 files changed, 98 insertions(+), 6 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 0f58ddab52..91b7154d0a 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -52,7 +52,7 @@ export default class RedisCommandsQueue { readonly decoder; readonly #pubSub = new PubSub(); readonly #pushHandlers: Map) => unknown> = new Map(); - readonly #builtInSet = new Set; + readonly #builtInSet: ReadonlySet; get isPubSubActive() { return this.#pubSub.isActive; @@ -76,9 +76,11 @@ export default class RedisCommandsQueue { this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].message.toString(), this.#pubSub.handleMessageReplySharded.bind(this.#pubSub)); this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.toString(), this.#handleStatusReply.bind(this)); this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.toString(), this.#handleShardedUnsubscribe.bind(this)); - + + const s = new Set(); + this.#builtInSet = s; for (const str in this.#pushHandlers.keys) { - this.#builtInSet.add(str); + s.add(str); } this.decoder = this.#initiateDecoder(); @@ -122,6 +124,14 @@ export default class RedisCommandsQueue { this.#pushHandlers.set(messageType, handler); } + removePushHandler(messageType: string) { + if (this.#builtInSet.has(messageType)) { + throw new Error("Cannot override built in push message handler"); + } + + this.#pushHandlers.delete(messageType); + } + #onPush(push: Array) { const handler = this.#pushHandlers.get(push[0].toString()); if (handler) { @@ -141,9 +151,7 @@ export default class RedisCommandsQueue { onReply: reply => this.#onReply(reply), onErrorReply: err => this.#onErrorReply(err), onPush: push => { - if (!this.#onPush(push)) { - - } + return this.#onPush(push); }, getTypeMapping: () => this.#getTypeMapping() }); diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 2fd689b9d7..47cb0d62d0 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -9,6 +9,7 @@ import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec' import { RESP_TYPES } from '../RESP/decoder'; import { BlobStringReply, NumberReply } from '../RESP/types'; import { SortedSetMember } from '../commands/generic-transformers'; +import { createClient } from '../..'; export const SQUARE_SCRIPT = defineScript({ SCRIPT: @@ -769,4 +770,79 @@ describe('Client', () => { } }, GLOBAL.SERVERS.OPEN); }); + + describe('Push Handlers', () => { + testUtils.testWithClient('RESP2: add/remove invalidate handler, and validate its called', async client => { + const key = 'x' + + const duplicate = await client.duplicate().connect(); + try { + const id = await duplicate.clientId(); + + let nodeResolve; + + const promise = new Promise((res) => { + nodeResolve = res; + }); + + duplicate.addPushHandler("invalidate", (push: Array) => { + assert.equal(push[0].toString(), "invalidate"); + assert.notEqual(push[1], null); + assert.equal(push[1].length, 1); + assert.equal(push[1][0].toString(), key); + // this test removing the handler, + // as flushAll in cleanup of test will issue a full invalidate, + // which would fail if this handler is called on it + duplicate.removePushHandler("invalidate"); + nodeResolve(); + }) + + await client.sendCommand(['CLIENT', 'TRACKING', 'ON', 'REDIRECT', id.toString()]); + await client.get(key); + await client.set(key, '1'); + + // force an invalidate all + await client.flushAll(); + + await nodeResolve; + } finally { + duplicate.destroy(); + } + }, { + ...GLOBAL.SERVERS.OPEN + }); + + testUtils.testWithClient('RESP3: add/remove invalidate handler, and validate its called', async client => { + const key = 'x' + + let nodeResolve; + + const promise = new Promise((res) => { + nodeResolve = res; + }); + + client.addPushHandler("invalidate", (push: Array) => { + assert.equal(push[0].toString(), "invalidate"); + assert.equal(push[1].length, 1); + assert.equal(push[1].length, 1); + assert.equal(push[1][0].toString(), key); + // this test removing the handler, + // as flushAll in cleanup of test will issue a full invalidate, + // which would fail if this handler is called on it + client.removePushHandler("invalidate"); + nodeResolve(); + }) + + await client.sendCommand(['CLIENT', 'TRACKING', 'ON']); + await client.get(key); + await client.set(key, '1'); + + await nodeResolve; + }, { + ...GLOBAL.SERVERS.OPEN, + clientOptions: { + RESP: 3 + } + }); + }); }); diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 3efa793eeb..7ffacc4f3f 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -573,6 +573,14 @@ export default class RedisClient< return this as unknown as RedisClientType; } + addPushHandler(messageType: string, handler: (pushMsg: Array) => unknown) { + this._self.#queue.addPushHandler(messageType, handler); + } + + removePushHandler(messageType: string) { + this._self.#queue.removePushHandler(messageType); + } + sendCommand( args: Array, options?: CommandOptions From 775e3a9af9aa9ac66bad023c256f9f25a6a69d5b Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Sun, 7 Apr 2024 01:08:42 +0300 Subject: [PATCH 3/4] add test (/ fix code) to prevent overriding built in push handlers --- packages/client/lib/client/commands-queue.ts | 8 +++++--- packages/client/lib/client/index.spec.ts | 12 ++++++++++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 91b7154d0a..f5c6fc0f4d 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -42,6 +42,8 @@ const RESP2_PUSH_TYPE_MAPPING = { [RESP_TYPES.SIMPLE_STRING]: Buffer }; +export const pushHandlerError = 'Cannot override built in push message handler'; + export default class RedisCommandsQueue { readonly #respVersion; readonly #maxLength; @@ -79,7 +81,7 @@ export default class RedisCommandsQueue { const s = new Set(); this.#builtInSet = s; - for (const str in this.#pushHandlers.keys) { + for (const str of this.#pushHandlers.keys()) { s.add(str); } @@ -118,7 +120,7 @@ export default class RedisCommandsQueue { addPushHandler(messageType: string, handler: (pushMsg: Array) => unknown) { if (this.#builtInSet.has(messageType)) { - throw new Error("Cannot override built in push message handler"); + throw new Error(pushHandlerError); } this.#pushHandlers.set(messageType, handler); @@ -126,7 +128,7 @@ export default class RedisCommandsQueue { removePushHandler(messageType: string) { if (this.#builtInSet.has(messageType)) { - throw new Error("Cannot override built in push message handler"); + throw new Error(pushHandlerError); } this.#pushHandlers.delete(messageType); diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 47cb0d62d0..e7afb796ba 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -9,7 +9,8 @@ import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec' import { RESP_TYPES } from '../RESP/decoder'; import { BlobStringReply, NumberReply } from '../RESP/types'; import { SortedSetMember } from '../commands/generic-transformers'; -import { createClient } from '../..'; +import { COMMANDS, PUBSUB_TYPE } from './pub-sub'; +import { pushHandlerError } from './commands-queue'; export const SQUARE_SCRIPT = defineScript({ SCRIPT: @@ -771,7 +772,14 @@ describe('Client', () => { }, GLOBAL.SERVERS.OPEN); }); - describe('Push Handlers', () => { + describe.only('Push Handlers', () => { + testUtils.testWithClient('prevent overriding a built in handler', async client => { + assert.throws(() => {client.addPushHandler(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString(), (push: Array) => {})}, new Error(pushHandlerError)); + assert.throws(() => {client.removePushHandler(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString())}, new Error(pushHandlerError)); + }, { + ...GLOBAL.SERVERS.OPEN + }); + testUtils.testWithClient('RESP2: add/remove invalidate handler, and validate its called', async client => { const key = 'x' From 330e266c2d4f4da129b25ebb3a2fde12697da15b Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 9 Apr 2024 13:38:01 +0300 Subject: [PATCH 4/4] per github discussion, enable multiple handlers per push message type. discovered that these changes don't make sense for resp2 (no extensible push messages to implement, resp2 implements invaldiate as a pubsub channel) --- packages/client/lib/RESP/encoder.ts | 2 +- packages/client/lib/client/commands-queue.ts | 81 +++++++++++--------- packages/client/lib/client/index.spec.ts | 56 +------------- packages/client/lib/client/index.ts | 8 +- packages/client/lib/client/pub-sub.ts | 18 ++++- packages/client/lib/client/socket.ts | 2 +- 6 files changed, 69 insertions(+), 98 deletions(-) diff --git a/packages/client/lib/RESP/encoder.ts b/packages/client/lib/RESP/encoder.ts index af857711dc..854bedb60a 100644 --- a/packages/client/lib/RESP/encoder.ts +++ b/packages/client/lib/RESP/encoder.ts @@ -2,7 +2,7 @@ import { RedisArgument } from './types'; const CRLF = '\r\n'; -export default function encodeCommand(args: Array): Array { +export default function encodeCommand(args: ReadonlyArray): Array { const toWrite: Array = []; let strings = '*' + args.length + CRLF; diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index f5c6fc0f4d..43faae8330 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,7 +1,7 @@ import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list'; import encodeCommand from '../RESP/encoder'; import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder'; -import { TypeMapping, ReplyUnion, RespVersions, CommandArguments } from '../RESP/types'; +import { TypeMapping, ReplyUnion, RespVersions, RedisArgument } from '../RESP/types'; import { COMMANDS, ChannelListeners, PUBSUB_TYPE, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub'; import { AbortError, ErrorReply } from '../errors'; import { MonitorCallback } from '.'; @@ -17,7 +17,7 @@ export interface CommandOptions { } export interface CommandToWrite extends CommandWaitingForReply { - args: CommandArguments; + args: ReadonlyArray; chainId: symbol | undefined; abort: { signal: AbortSignal; @@ -42,8 +42,6 @@ const RESP2_PUSH_TYPE_MAPPING = { [RESP_TYPES.SIMPLE_STRING]: Buffer }; -export const pushHandlerError = 'Cannot override built in push message handler'; - export default class RedisCommandsQueue { readonly #respVersion; readonly #maxLength; @@ -53,8 +51,7 @@ export default class RedisCommandsQueue { #chainInExecution: symbol | undefined; readonly decoder; readonly #pubSub = new PubSub(); - readonly #pushHandlers: Map) => unknown> = new Map(); - readonly #builtInSet: ReadonlySet; + readonly #pushHandlers: Map) => unknown>> = new Map(); get isPubSubActive() { return this.#pubSub.isActive; @@ -69,22 +66,16 @@ export default class RedisCommandsQueue { this.#maxLength = maxLength; this.#onShardedChannelMoved = onShardedChannelMoved; - this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString(), this.#pubSub.handleMessageReplyChannel.bind(this.#pubSub)); - this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.toString(), this.#handleStatusReply.bind(this)); - this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.toString(), this.#handleStatusReply.bind(this)); - this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].message.toString(), this.#pubSub.handleMessageReplyPattern.bind(this.#pubSub)); - this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.toString(), this.#handleStatusReply.bind(this)); - this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.toString(), this.#handleStatusReply.bind(this)); - this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].message.toString(), this.#pubSub.handleMessageReplySharded.bind(this.#pubSub)); - this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.toString(), this.#handleStatusReply.bind(this)); - this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.toString(), this.#handleShardedUnsubscribe.bind(this)); + this.#addPushHandler(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString(), this.#pubSub.handleMessageReplyChannel.bind(this.#pubSub)); + this.#addPushHandler(COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.toString(), this.#handleStatusReply.bind(this)); + this.#addPushHandler(COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.toString(), this.#handleStatusReply.bind(this)); + this.#addPushHandler(COMMANDS[PUBSUB_TYPE.PATTERNS].message.toString(), this.#pubSub.handleMessageReplyPattern.bind(this.#pubSub)); + this.#addPushHandler(COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.toString(), this.#handleStatusReply.bind(this)); + this.#addPushHandler(COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.toString(), this.#handleStatusReply.bind(this)); + this.#addPushHandler(COMMANDS[PUBSUB_TYPE.SHARDED].message.toString(), this.#pubSub.handleMessageReplySharded.bind(this.#pubSub)); + this.#addPushHandler(COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.toString(), this.#handleStatusReply.bind(this)); + this.#addPushHandler(COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.toString(), this.#handleShardedUnsubscribe.bind(this)); - const s = new Set(); - this.#builtInSet = s; - for (const str of this.#pushHandlers.keys()) { - s.add(str); - } - this.decoder = this.#initiateDecoder(); } @@ -96,7 +87,7 @@ export default class RedisCommandsQueue { this.#waitingForReply.shift()!.reject(err); } - #handleStatusReply(push: Array) { + #handleStatusReply(push: ReadonlyArray) { const head = this.#waitingForReply.head!.value; if ( (Number.isNaN(head.channelsCounter!) && push[2] === 0) || @@ -106,7 +97,7 @@ export default class RedisCommandsQueue { } } - #handleShardedUnsubscribe(push: Array) { + #handleShardedUnsubscribe(push: ReadonlyArray) { if (!this.#waitingForReply.length) { const channel = push[1].toString(); this.#onShardedChannelMoved( @@ -118,26 +109,42 @@ export default class RedisCommandsQueue { } } - addPushHandler(messageType: string, handler: (pushMsg: Array) => unknown) { - if (this.#builtInSet.has(messageType)) { - throw new Error(pushHandlerError); + #addPushHandler(messageType: string, handler: (pushMsg: ReadonlyArray) => unknown) { + let handlerMap = this.#pushHandlers.get(messageType); + if (handlerMap === undefined) { + handlerMap = new Map(); + this.#pushHandlers.set(messageType, handlerMap); } - this.#pushHandlers.set(messageType, handler); + const symbol = Symbol(messageType); + handlerMap.set(symbol, handler); + + return symbol; } - removePushHandler(messageType: string) { - if (this.#builtInSet.has(messageType)) { - throw new Error(pushHandlerError); - } + addPushHandler(messageType: string, handler: (pushMsg: ReadonlyArray) => unknown) { + if (this.#respVersion !== 3) throw new Error("cannot add push handlers to resp2 clients") - this.#pushHandlers.delete(messageType); + return this.#addPushHandler(messageType, handler); + } + + removePushHandler(symbol: Symbol) { + const handlers = this.#pushHandlers.get(symbol.description!); + if (handlers) { + handlers.delete(symbol); + if (handlers.size === 0) { + this.#pushHandlers.delete(symbol.description!); + } + } } #onPush(push: Array) { - const handler = this.#pushHandlers.get(push[0].toString()); - if (handler) { - handler(push); + const handlers = this.#pushHandlers.get(push[0].toString()); + if (handlers) { + for (const handler of handlers.values()) { + handler(push); + } + return true; } @@ -160,7 +167,7 @@ export default class RedisCommandsQueue { } addCommand( - args: CommandArguments, + args: ReadonlyArray, options?: CommandOptions ): Promise { if (this.#maxLength && this.#toWrite.length + this.#waitingForReply.length >= this.#maxLength) { @@ -389,7 +396,7 @@ export default class RedisCommandsQueue { *commandsToWrite() { let toSend = this.#toWrite.shift(); while (toSend) { - let encoded: CommandArguments; + let encoded: ReadonlyArray try { encoded = encodeCommand(toSend.args); } catch (err) { diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index e7afb796ba..8f5852af98 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -10,7 +10,6 @@ import { RESP_TYPES } from '../RESP/decoder'; import { BlobStringReply, NumberReply } from '../RESP/types'; import { SortedSetMember } from '../commands/generic-transformers'; import { COMMANDS, PUBSUB_TYPE } from './pub-sub'; -import { pushHandlerError } from './commands-queue'; export const SQUARE_SCRIPT = defineScript({ SCRIPT: @@ -772,54 +771,7 @@ describe('Client', () => { }, GLOBAL.SERVERS.OPEN); }); - describe.only('Push Handlers', () => { - testUtils.testWithClient('prevent overriding a built in handler', async client => { - assert.throws(() => {client.addPushHandler(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString(), (push: Array) => {})}, new Error(pushHandlerError)); - assert.throws(() => {client.removePushHandler(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString())}, new Error(pushHandlerError)); - }, { - ...GLOBAL.SERVERS.OPEN - }); - - testUtils.testWithClient('RESP2: add/remove invalidate handler, and validate its called', async client => { - const key = 'x' - - const duplicate = await client.duplicate().connect(); - try { - const id = await duplicate.clientId(); - - let nodeResolve; - - const promise = new Promise((res) => { - nodeResolve = res; - }); - - duplicate.addPushHandler("invalidate", (push: Array) => { - assert.equal(push[0].toString(), "invalidate"); - assert.notEqual(push[1], null); - assert.equal(push[1].length, 1); - assert.equal(push[1][0].toString(), key); - // this test removing the handler, - // as flushAll in cleanup of test will issue a full invalidate, - // which would fail if this handler is called on it - duplicate.removePushHandler("invalidate"); - nodeResolve(); - }) - - await client.sendCommand(['CLIENT', 'TRACKING', 'ON', 'REDIRECT', id.toString()]); - await client.get(key); - await client.set(key, '1'); - - // force an invalidate all - await client.flushAll(); - - await nodeResolve; - } finally { - duplicate.destroy(); - } - }, { - ...GLOBAL.SERVERS.OPEN - }); - + describe('Push Handlers', () => { testUtils.testWithClient('RESP3: add/remove invalidate handler, and validate its called', async client => { const key = 'x' @@ -829,7 +781,7 @@ describe('Client', () => { nodeResolve = res; }); - client.addPushHandler("invalidate", (push: Array) => { + const symbol = client.addPushHandler("invalidate", (push: ReadonlyArray) => { assert.equal(push[0].toString(), "invalidate"); assert.equal(push[1].length, 1); assert.equal(push[1].length, 1); @@ -837,7 +789,7 @@ describe('Client', () => { // this test removing the handler, // as flushAll in cleanup of test will issue a full invalidate, // which would fail if this handler is called on it - client.removePushHandler("invalidate"); + client.removePushHandler(symbol); nodeResolve(); }) @@ -845,7 +797,7 @@ describe('Client', () => { await client.get(key); await client.set(key, '1'); - await nodeResolve; + await promise; }, { ...GLOBAL.SERVERS.OPEN, clientOptions: { diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 7ffacc4f3f..43811fde9b 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -573,12 +573,12 @@ export default class RedisClient< return this as unknown as RedisClientType; } - addPushHandler(messageType: string, handler: (pushMsg: Array) => unknown) { - this._self.#queue.addPushHandler(messageType, handler); + addPushHandler(messageType: string, handler: (pushMsg: ReadonlyArray) => unknown) { + return this._self.#queue.addPushHandler(messageType, handler); } - removePushHandler(messageType: string) { - this._self.#queue.removePushHandler(messageType); + removePushHandler(symbol: Symbol) { + this._self.#queue.removePushHandler(symbol); } sendCommand( diff --git a/packages/client/lib/client/pub-sub.ts b/packages/client/lib/client/pub-sub.ts index 246707953e..c5b0ba2409 100644 --- a/packages/client/lib/client/pub-sub.ts +++ b/packages/client/lib/client/pub-sub.ts @@ -344,7 +344,7 @@ export class PubSub { return commands; } - handleMessageReplyChannel(push: Array) { + handleMessageReplyChannel(push: ReadonlyArray) { this.#emitPubSubMessage( PUBSUB_TYPE.CHANNELS, push[2], @@ -352,7 +352,7 @@ export class PubSub { ); } - handleMessageReplyPattern(push: Array) { + handleMessageReplyPattern(push: ReadonlyArray) { this.#emitPubSubMessage( PUBSUB_TYPE.PATTERNS, push[3], @@ -361,7 +361,7 @@ export class PubSub { ); } - handleMessageReplySharded(push: Array) { + handleMessageReplySharded(push: ReadonlyArray) { this.#emitPubSubMessage( PUBSUB_TYPE.SHARDED, push[2], @@ -369,6 +369,18 @@ export class PubSub { ); } + handleMessageReply(reply: Array): boolean { + if (COMMANDS[PUBSUB_TYPE.CHANNELS].message.equals(reply[0])) { + return true; + } else if (COMMANDS[PUBSUB_TYPE.PATTERNS].message.equals(reply[0])) { + return true; + } else if (COMMANDS[PUBSUB_TYPE.SHARDED].message.equals(reply[0])) { + return true; + } + + return false; + } + removeShardedListeners(channel: string): ChannelListeners { const listeners = this.listeners[PUBSUB_TYPE.SHARDED].get(channel)!; this.listeners[PUBSUB_TYPE.SHARDED].delete(channel); diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index dcadad4c3d..384dd7364e 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -271,7 +271,7 @@ export default class RedisSocket extends EventEmitter { }); } - write(iterable: Iterable>) { + write(iterable: Iterable>) { if (!this.#socket) return; this.#socket.cork();