Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add push message handler registration and make all pubsub use it. #2735

Draft
wants to merge 4 commits into
base: v5
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 63 additions & 20 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { SinglyLinkedList, DoublyLinkedNode, DoublyLinkedList } from './linked-list';
import encodeCommand from '../RESP/encoder';
import { Decoder, PUSH_TYPE_MAPPING, RESP_TYPES } from '../RESP/decoder';
import { CommandArguments, TypeMapping, ReplyUnion, RespVersions } from '../RESP/types';
import { ChannelListeners, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { TypeMapping, ReplyUnion, RespVersions, CommandArguments } from '../RESP/types';
import { COMMANDS, ChannelListeners, PUBSUB_TYPE, PubSub, PubSubCommand, PubSubListener, PubSubType, PubSubTypeListeners } from './pub-sub';
import { AbortError, ErrorReply } from '../errors';
import { MonitorCallback } from '.';

Expand Down Expand Up @@ -42,6 +42,8 @@ const RESP2_PUSH_TYPE_MAPPING = {
[RESP_TYPES.SIMPLE_STRING]: Buffer
};

export const pushHandlerError = 'Cannot override built in push message handler';

export default class RedisCommandsQueue {
readonly #respVersion;
readonly #maxLength;
Expand All @@ -51,6 +53,8 @@ export default class RedisCommandsQueue {
#chainInExecution: symbol | undefined;
readonly decoder;
readonly #pubSub = new PubSub();
readonly #pushHandlers: Map<string, (pushMsg: Array<any>) => unknown> = new Map();
readonly #builtInSet: ReadonlySet<string>;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we want to have builtInSet or just "hard-code" it instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think the set object would remain, much quicker to lookup in a set than multiple if conditions. not that this is very commonly used code path (i.e. just at registration/unregistration time)


get isPubSubActive() {
return this.#pubSub.isActive;
Expand All @@ -64,6 +68,23 @@ export default class RedisCommandsQueue {
this.#respVersion = respVersion;
this.#maxLength = maxLength;
this.#onShardedChannelMoved = onShardedChannelMoved;

this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString(), this.#pubSub.handleMessageReplyChannel.bind(this.#pubSub));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].subscribe.toString(), this.#handleStatusReply.bind(this));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.CHANNELS].unsubscribe.toString(), this.#handleStatusReply.bind(this));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].message.toString(), this.#pubSub.handleMessageReplyPattern.bind(this.#pubSub));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].subscribe.toString(), this.#handleStatusReply.bind(this));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.PATTERNS].unsubscribe.toString(), this.#handleStatusReply.bind(this));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].message.toString(), this.#pubSub.handleMessageReplySharded.bind(this.#pubSub));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.toString(), this.#handleStatusReply.bind(this));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.toString(), this.#handleShardedUnsubscribe.bind(this));

const s = new Set<string>();
this.#builtInSet = s;
for (const str of this.#pushHandlers.keys()) {
s.add(str);
}

this.decoder = this.#initiateDecoder();
}

Expand All @@ -75,28 +96,52 @@ export default class RedisCommandsQueue {
this.#waitingForReply.shift()!.reject(err);
}

#onPush(push: Array<any>) {
// TODO: type
if (this.#pubSub.handleMessageReply(push)) return true;

const isShardedUnsubscribe = PubSub.isShardedUnsubscribe(push);
if (isShardedUnsubscribe && !this.#waitingForReply.length) {
#handleStatusReply(push: Array<any>) {
const head = this.#waitingForReply.head!.value;
if (
(Number.isNaN(head.channelsCounter!) && push[2] === 0) ||
--head.channelsCounter! === 0
) {
this.#waitingForReply.shift()!.resolve();
}
}

#handleShardedUnsubscribe(push: Array<any>) {
if (!this.#waitingForReply.length) {
const channel = push[1].toString();
this.#onShardedChannelMoved(
channel,
this.#pubSub.removeShardedListeners(channel)
);
return true;
} else if (isShardedUnsubscribe || PubSub.isStatusReply(push)) {
const head = this.#waitingForReply.head!.value;
if (
(Number.isNaN(head.channelsCounter!) && push[2] === 0) ||
--head.channelsCounter! === 0
) {
this.#waitingForReply.shift()!.resolve();
}
} else {
this.#handleStatusReply(push);
}
}

addPushHandler(messageType: string, handler: (pushMsg: Array<any>) => unknown) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either rename to setPushHandler or have support for multiple listeners, I'm not soo sure which one we wanna go for

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm. It's an interesting Q (and then how would remove work?) (and would fit with allowing it to exist with current handlers). Let me think.

if (this.#builtInSet.has(messageType)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to block it? if the user wants a "global" listener - why not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is javascript, dont want to let people hang themselves.

throw new Error(pushHandlerError);
}

this.#pushHandlers.set(messageType, handler);
}

removePushHandler(messageType: string) {
if (this.#builtInSet.has(messageType)) {
throw new Error(pushHandlerError);
}

this.#pushHandlers.delete(messageType);
}

#onPush(push: Array<any>) {
const handler = this.#pushHandlers.get(push[0].toString());
if (handler) {
handler(push);
return true;
}

return false;
}

#getTypeMapping() {
Expand All @@ -108,9 +153,7 @@ export default class RedisCommandsQueue {
onReply: reply => this.#onReply(reply),
onErrorReply: err => this.#onErrorReply(err),
onPush: push => {
if (!this.#onPush(push)) {

}
return this.#onPush(push);
},
getTypeMapping: () => this.#getTypeMapping()
});
Expand Down
84 changes: 84 additions & 0 deletions packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec'
import { RESP_TYPES } from '../RESP/decoder';
import { BlobStringReply, NumberReply } from '../RESP/types';
import { SortedSetMember } from '../commands/generic-transformers';
import { COMMANDS, PUBSUB_TYPE } from './pub-sub';
import { pushHandlerError } from './commands-queue';

export const SQUARE_SCRIPT = defineScript({
SCRIPT:
Expand Down Expand Up @@ -769,4 +771,86 @@ describe('Client', () => {
}
}, GLOBAL.SERVERS.OPEN);
});

describe.only('Push Handlers', () => {
testUtils.testWithClient('prevent overriding a built in handler', async client => {
assert.throws(() => {client.addPushHandler(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString(), (push: Array<any>) => {})}, new Error(pushHandlerError));
assert.throws(() => {client.removePushHandler(COMMANDS[PUBSUB_TYPE.CHANNELS].message.toString())}, new Error(pushHandlerError));
}, {
...GLOBAL.SERVERS.OPEN
});

testUtils.testWithClient('RESP2: add/remove invalidate handler, and validate its called', async client => {
const key = 'x'

const duplicate = await client.duplicate().connect();
try {
const id = await duplicate.clientId();

let nodeResolve;

const promise = new Promise((res) => {
nodeResolve = res;
});

duplicate.addPushHandler("invalidate", (push: Array<any>) => {
assert.equal(push[0].toString(), "invalidate");
assert.notEqual(push[1], null);
assert.equal(push[1].length, 1);
assert.equal(push[1][0].toString(), key);
// this test removing the handler,
// as flushAll in cleanup of test will issue a full invalidate,
// which would fail if this handler is called on it
duplicate.removePushHandler("invalidate");
nodeResolve();
})

await client.sendCommand(['CLIENT', 'TRACKING', 'ON', 'REDIRECT', id.toString()]);
await client.get(key);
await client.set(key, '1');

// force an invalidate all
await client.flushAll();

await nodeResolve;
} finally {
duplicate.destroy();
}
}, {
...GLOBAL.SERVERS.OPEN
});

testUtils.testWithClient('RESP3: add/remove invalidate handler, and validate its called', async client => {
const key = 'x'

let nodeResolve;

const promise = new Promise((res) => {
nodeResolve = res;
});

client.addPushHandler("invalidate", (push: Array<any>) => {
assert.equal(push[0].toString(), "invalidate");
assert.equal(push[1].length, 1);
assert.equal(push[1].length, 1);
assert.equal(push[1][0].toString(), key);
// this test removing the handler,
// as flushAll in cleanup of test will issue a full invalidate,
// which would fail if this handler is called on it
client.removePushHandler("invalidate");
nodeResolve();
})

await client.sendCommand(['CLIENT', 'TRACKING', 'ON']);
await client.get(key);
await client.set(key, '1');

await nodeResolve;
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3
}
});
});
});
8 changes: 8 additions & 0 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,14 @@ export default class RedisClient<
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
}

addPushHandler(messageType: string, handler: (pushMsg: Array<any>) => unknown) {
this._self.#queue.addPushHandler(messageType, handler);
}

removePushHandler(messageType: string) {
this._self.#queue.removePushHandler(messageType);
}

sendCommand<T = ReplyUnion>(
args: Array<RedisArgument>,
options?: CommandOptions
Expand Down
49 changes: 23 additions & 26 deletions packages/client/lib/client/pub-sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export type PUBSUB_TYPE = typeof PUBSUB_TYPE;

export type PubSubType = PUBSUB_TYPE[keyof PUBSUB_TYPE];

const COMMANDS = {
export const COMMANDS = {
[PUBSUB_TYPE.CHANNELS]: {
subscribe: Buffer.from('subscribe'),
unsubscribe: Buffer.from('unsubscribe'),
Expand Down Expand Up @@ -344,32 +344,29 @@ export class PubSub {
return commands;
}

handleMessageReply(reply: Array<Buffer>): boolean {
if (COMMANDS[PUBSUB_TYPE.CHANNELS].message.equals(reply[0])) {
this.#emitPubSubMessage(
PUBSUB_TYPE.CHANNELS,
reply[2],
reply[1]
);
return true;
} else if (COMMANDS[PUBSUB_TYPE.PATTERNS].message.equals(reply[0])) {
this.#emitPubSubMessage(
PUBSUB_TYPE.PATTERNS,
reply[3],
reply[2],
reply[1]
);
return true;
} else if (COMMANDS[PUBSUB_TYPE.SHARDED].message.equals(reply[0])) {
this.#emitPubSubMessage(
PUBSUB_TYPE.SHARDED,
reply[2],
reply[1]
);
return true;
}
handleMessageReplyChannel(push: Array<Buffer>) {
this.#emitPubSubMessage(
PUBSUB_TYPE.CHANNELS,
push[2],
push[1]
);
}

return false;
handleMessageReplyPattern(push: Array<Buffer>) {
this.#emitPubSubMessage(
PUBSUB_TYPE.PATTERNS,
push[3],
push[2],
push[1]
);
}

handleMessageReplySharded(push: Array<Buffer>) {
this.#emitPubSubMessage(
PUBSUB_TYPE.SHARDED,
push[2],
push[1]
);
}

removeShardedListeners(channel: string): ChannelListeners {
Expand Down