Skip to content

Commit

Permalink
enable push handler support to be tested / test it
Browse files Browse the repository at this point in the history
  • Loading branch information
sjpotter committed Apr 6, 2024
1 parent 9b1ac4c commit 3340d49
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 6 deletions.
20 changes: 14 additions & 6 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export default class RedisCommandsQueue {
readonly decoder;
readonly #pubSub = new PubSub();
readonly #pushHandlers: Map<string, (pushMsg: Array<any>) => unknown> = new Map();
readonly #builtInSet = new Set<string>;
readonly #builtInSet: ReadonlySet<string>;

get isPubSubActive() {
return this.#pubSub.isActive;
Expand All @@ -76,9 +76,11 @@ export default class RedisCommandsQueue {
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].message.toString(), this.#pubSub.handleMessageReplySharded.bind(this.#pubSub));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].subscribe.toString(), this.#handleStatusReply.bind(this));
this.#pushHandlers.set(COMMANDS[PUBSUB_TYPE.SHARDED].unsubscribe.toString(), this.#handleShardedUnsubscribe.bind(this));


const s = new Set<string>();
this.#builtInSet = s;
for (const str in this.#pushHandlers.keys) {
this.#builtInSet.add(str);
s.add(str);
}

this.decoder = this.#initiateDecoder();
Expand Down Expand Up @@ -122,6 +124,14 @@ export default class RedisCommandsQueue {
this.#pushHandlers.set(messageType, handler);
}

removePushHandler(messageType: string) {
if (this.#builtInSet.has(messageType)) {
throw new Error("Cannot override built in push message handler");
}

this.#pushHandlers.delete(messageType);
}

#onPush(push: Array<any>) {
const handler = this.#pushHandlers.get(push[0].toString());
if (handler) {
Expand All @@ -141,9 +151,7 @@ export default class RedisCommandsQueue {
onReply: reply => this.#onReply(reply),
onErrorReply: err => this.#onErrorReply(err),
onPush: push => {
if (!this.#onPush(push)) {

}
return this.#onPush(push);
},
getTypeMapping: () => this.#getTypeMapping()
});
Expand Down
76 changes: 76 additions & 0 deletions packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { MATH_FUNCTION, loadMathFunction } from '../commands/FUNCTION_LOAD.spec'
import { RESP_TYPES } from '../RESP/decoder';
import { BlobStringReply, NumberReply } from '../RESP/types';
import { SortedSetMember } from '../commands/generic-transformers';
import { createClient } from '../..';

export const SQUARE_SCRIPT = defineScript({
SCRIPT:
Expand Down Expand Up @@ -769,4 +770,79 @@ describe('Client', () => {
}
}, GLOBAL.SERVERS.OPEN);
});

describe('Push Handlers', () => {
testUtils.testWithClient('RESP2: add/remove invalidate handler, and validate its called', async client => {
const key = 'x'

const duplicate = await client.duplicate().connect();
try {
const id = await duplicate.clientId();

let nodeResolve;

const promise = new Promise((res) => {
nodeResolve = res;
});

duplicate.addPushHandler("invalidate", (push: Array<any>) => {
assert.equal(push[0].toString(), "invalidate");
assert.notEqual(push[1], null);
assert.equal(push[1].length, 1);
assert.equal(push[1][0].toString(), key);
// this test removing the handler,
// as flushAll in cleanup of test will issue a full invalidate,
// which would fail if this handler is called on it
duplicate.removePushHandler("invalidate");
nodeResolve();
})

await client.sendCommand(['CLIENT', 'TRACKING', 'ON', 'REDIRECT', id.toString()]);
await client.get(key);
await client.set(key, '1');

// force an invalidate all
await client.flushAll();

await nodeResolve;
} finally {
duplicate.destroy();
}
}, {
...GLOBAL.SERVERS.OPEN
});

testUtils.testWithClient('RESP3: add/remove invalidate handler, and validate its called', async client => {
const key = 'x'

let nodeResolve;

const promise = new Promise((res) => {
nodeResolve = res;
});

client.addPushHandler("invalidate", (push: Array<any>) => {
assert.equal(push[0].toString(), "invalidate");
assert.equal(push[1].length, 1);
assert.equal(push[1].length, 1);
assert.equal(push[1][0].toString(), key);
// this test removing the handler,
// as flushAll in cleanup of test will issue a full invalidate,
// which would fail if this handler is called on it
client.removePushHandler("invalidate");
nodeResolve();
})

await client.sendCommand(['CLIENT', 'TRACKING', 'ON']);
await client.get(key);
await client.set(key, '1');

await nodeResolve;
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
RESP: 3
}
});
});
});
8 changes: 8 additions & 0 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,14 @@ export default class RedisClient<
return this as unknown as RedisClientType<M, F, S, RESP, TYPE_MAPPING>;
}

addPushHandler(messageType: string, handler: (pushMsg: Array<any>) => unknown) {
this._self.#queue.addPushHandler(messageType, handler);
}

removePushHandler(messageType: string) {
this._self.#queue.removePushHandler(messageType);
}

sendCommand<T = ReplyUnion>(
args: Array<RedisArgument>,
options?: CommandOptions
Expand Down

0 comments on commit 3340d49

Please sign in to comment.