Skip to content

Commit

Permalink
feat: 馃幐 add support for SSUBSCRIBE commands
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Dec 18, 2023
1 parent b3774c3 commit ec5a276
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 2 deletions.
17 changes: 17 additions & 0 deletions src/__tests__/commands/pubsub/PSUBSCRIBE.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,22 @@ export const standalone = (setup: StandaloneTestSetup) => {
[ascii(pattern), new Uint8Array([1])],
]);
});

test('can SUBSCRIBE and PSUBSCRIBE to the same message', async () => {
const {client} = await setup();
const {client: client2} = await setup();
const channel = 'the_message_' + Date.now();
const msgs: unknown[] = [];
const [, subscribed1] = client.subscribe(channel, (recv) => {
msgs.push(recv);
});
const [, subscribed2] = client2.psubscribe(channel, (recv) => {
msgs.push(recv);
});
await subscribed1;
await subscribed2;
await client.publish(channel, new Uint8Array([1]));
await until(() => msgs.length === 2);
});
});
};
117 changes: 117 additions & 0 deletions src/__tests__/commands/pubsub/SSUBSCRIBE.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import {until} from 'thingies';
import {StandaloneTestSetup} from '../../types';

export const standalone = (setup: StandaloneTestSetup) => {
describe('SSUBSCRIBE', () => {
test('can subscribe to a channel', async () => {
const {client} = await setup();
const channel = 'shard_channel_' + Date.now();
client.ssub(channel, () => {});
await new Promise((resolve) => setTimeout(resolve, 5));
});

test('can un-subscribe from a channel', async () => {
const {client} = await setup();
const channel = 'shard_channel_sub_unsub_' + Date.now();
const unsubscribe = client.ssub(channel, (recv) => {});
await new Promise((resolve) => setTimeout(resolve, 5));
unsubscribe();
await new Promise((resolve) => setTimeout(resolve, 5));
});

test('can receive a message on a channel', async () => {
const {client} = await setup();
const channel = 'shard_channel_' + Date.now();
const msgs: unknown[] = [];
client.ssub(channel, (recv) => {
msgs.push(recv);
});
client.spub(channel, new Uint8Array([1, 2, 3]));
await until(() => msgs.length === 1);
expect(msgs[0]).toEqual(new Uint8Array([1, 2, 3]));
});

test('can receive multiple messages on a channel', async () => {
const {client} = await setup();
const channel = 'shard_channel_multiple_' + Date.now();
const msgs: unknown[] = [];
client.ssub(channel, (recv) => {
msgs.push(recv);
});
client.spub(channel, new Uint8Array([1]));
client.spub(channel, new Uint8Array([2]));
client.spub(channel, new Uint8Array([3]));
await until(() => msgs.length === 3);
expect(msgs).toEqual([new Uint8Array([1]), new Uint8Array([2]), new Uint8Array([3])]);
});

test('does not receive more messages after un-subscription', async () => {
const {client} = await setup();
const channel = 'shard_unsubscribe_' + Date.now();
const msgs: unknown[] = [];
const unsubscribe = client.ssub(channel, (recv) => {
msgs.push(recv);
});
await client.spublish(channel, new Uint8Array([1]));
await client.spublish(channel, new Uint8Array([2]));
unsubscribe();
await client.spublish(channel, new Uint8Array([3]));
await until(() => msgs.length === 2);
expect(msgs).toEqual([new Uint8Array([1]), new Uint8Array([2])]);
});

test('can subscribe twice from the same client', async () => {
const {client} = await setup();
const channel = 'shard_twice_' + Date.now();
const msgs: unknown[] = [];
client.ssub(channel, (recv) => {
msgs.push(recv);
});
client.ssub(channel, (recv) => {
msgs.push(recv);
});
await client.spublish(channel, new Uint8Array([1]));
await until(() => msgs.length === 2);
expect(msgs).toEqual([new Uint8Array([1]), new Uint8Array([1])]);
});

test('can subscribe twice from the same client and unsubscribe on subscription', async () => {
const {client} = await setup();
const channel = 'shard_twice_and_unsub_' + Date.now();
const msgs: unknown[] = [];
const unsubscribe1 = client.ssub(channel, (recv) => {
msgs.push([1, recv]);
});
client.ssub(channel, (recv) => {
msgs.push([2, recv]);
});
await client.spublish(channel, new Uint8Array([1]));
unsubscribe1();
await client.spublish(channel, new Uint8Array([2]));
await until(() => msgs.length === 3);
expect(msgs).toEqual([
[1, new Uint8Array([1])],
[2, new Uint8Array([1])],
[2, new Uint8Array([2])],
]);
});

test('can subscribe twice from different clients', async () => {
const {client} = await setup();
const {client: client2} = await setup();
const channel = 'subscribe_twice_diff_clients_' + Date.now();
const msgs: unknown[] = [];
const [, subscribed1] = client.ssubscribe(channel, (recv) => {
msgs.push(recv);
});
const [, subscribed2] = client2.ssubscribe(channel, (recv) => {
msgs.push(recv);
});
await subscribed1;
await subscribed2;
await client.spublish(channel, new Uint8Array([1]));
await until(() => msgs.length === 2);
expect(msgs).toEqual([new Uint8Array([1]), new Uint8Array([1])]);
});
});
};
2 changes: 2 additions & 0 deletions src/__tests__/commands/pubsub/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import {StandaloneTestSetup} from '../../types';
import * as SUBSCRIBE from './SUBSCRIBE';
import * as PSUBSCRIBE from './PSUBSCRIBE';
import * as SSUBSCRIBE from './SSUBSCRIBE';

export const standalone = (setup: StandaloneTestSetup) => {
describe('pubsub commands', () => {
SUBSCRIBE.standalone(setup);
PSUBSCRIBE.standalone(setup);
SSUBSCRIBE.standalone(setup);
});
};
2 changes: 1 addition & 1 deletion src/__tests__/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ export type StandaloneTestSetup = () => Promise<{
}>;
export type StandaloneTestClient = Pick<
RedisClient,
'cmd' | 'subscribe' | 'sub' | 'publish' | 'pub' | 'psubscribe' | 'psub'
'cmd' | 'subscribe' | 'sub' | 'publish' | 'pub' | 'psubscribe' | 'psub' | 'ssubscribe' | 'ssub' | 'spub' | 'spublish'
>;
62 changes: 61 additions & 1 deletion src/node/RedisClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {FanOut} from 'thingies/es2020/fanout';
import {Defer} from 'thingies/es2020/Defer';
import {ReconnectingSocket} from './ReconnectingSocket';
import {RedisCall, callNoRes} from './RedisCall';
import {isPushMessage, isMultiCmd, isPushPmessage} from '../util/commands';
import {isPushMessage, isMultiCmd, isPushPmessage, isPushSmessage} from '../util/commands';
import {AvlMap} from 'json-joy/es2020/util/trees/avl/AvlMap';
import {bufferToUint8Array} from 'json-joy/es2020/util/buffers/bufferToUint8Array';
import {cmpUint8Array, ascii} from '../util/buf';
Expand All @@ -20,6 +20,7 @@ const UNSUBSCRIBE = ascii`UNSUBSCRIBE`;
const PSUBSCRIBE = ascii`PSUBSCRIBE`;
const PUNSUBSCRIBE = ascii`PUNSUBSCRIBE`;
const SSUBSCRIBE = ascii`SSUBSCRIBE`;
const SPUBLISH = ascii`SPUBLISH`;
const SUNSUBSCRIBE = ascii`SUNSUBSCRIBE`;

export interface RedisClientOpts extends RedisClientCodecOpts {
Expand Down Expand Up @@ -140,6 +141,11 @@ export class RedisClient {
if (fanout) fanout.emit([val[2] as Uint8Array, val[3] as Uint8Array]);
continue;
}
if (isPushSmessage(val)) {
const fanout = this.ssubs.get(val[1] as Uint8Array);
if (fanout) fanout.emit(val[2] as Uint8Array);
continue;
}
if (call) call.response.resolve(undefined);
i++;
continue;
Expand Down Expand Up @@ -206,6 +212,8 @@ export class RedisClient {
return (await this.call(new RedisCall(args))) as RedisHelloResponse;
}

// --------------------------------------------------------- Pub/sub commands

public subscribe(
channel: Uint8Array | string,
listener: (message: Uint8Array) => void,
Expand Down Expand Up @@ -310,6 +318,58 @@ export class RedisClient {
}
};
}

public ssubscribe(channel: Uint8Array | string, listener: (message: Uint8Array) => void): [unsubscribe: () => void, subscribed: Promise<void>] {
const channelBuf = typeof channel === 'string' ? bufferToUint8Array(Buffer.from(channel)) : channel;
let fanout = this.ssubs.get(channelBuf);
let subscribed: Promise<void>;
if (!fanout) {
fanout = new FanOut<Uint8Array>();
this.ssubs.set(channelBuf, fanout);
const call = new RedisCall([SSUBSCRIBE, channelBuf]);
this.call(call);
subscribed = call.response.promise as Promise<void>;
} else {
subscribed = Promise.resolve();
}
const unsubscribe = fanout.listen(listener);
return [
() => {
unsubscribe();
if (fanout!.listeners.size === 0) {
this.ssubs.del(channelBuf);
this.cmdFnF([SUNSUBSCRIBE, channelBuf]);
}
},
subscribed,
];
}

public ssub(channel: Uint8Array | string, listener: (message: Uint8Array) => void): () => void {
const channelBuf = typeof channel === 'string' ? bufferToUint8Array(Buffer.from(channel)) : channel;
let fanout = this.ssubs.get(channelBuf);
if (!fanout) {
fanout = new FanOut<Uint8Array>();
this.ssubs.set(channelBuf, fanout);
this.cmdFnF([SSUBSCRIBE, channelBuf]);
}
const unsubscribe = fanout.listen(listener);
return () => {
unsubscribe();
if (fanout!.listeners.size === 0) {
this.ssubs.del(channelBuf);
this.cmdFnF([SUNSUBSCRIBE, channelBuf]);
}
};
}

public async spublish(channel: Uint8Array | string, message: Uint8Array | string): Promise<number> {
return (await this.cmd([SPUBLISH, channel, message])) as number;
}

public spub(channel: Uint8Array | string, message: Uint8Array | string): void {
return this.cmdFnF([SPUBLISH, channel, message]);
}
}

export type CmdOpts = Partial<Pick<RedisCall, 'utf8' | 'utf8Res' | 'noRes'>>;
21 changes: 21 additions & 0 deletions src/util/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,24 @@ export const isPushPmessage = (val: unknown): boolean => {
if (typeof type === 'string') return type === 'pmessage';
return false;
};

export const isPushSmessage = (val: unknown): boolean => {
if (!(val instanceof Array)) return false;
const type = val[0];
if (type instanceof Uint8Array) {
if (type.length !== 8) return false;
return (
type[0] === 115 && // s
type[1] === 109 && // m
type[2] === 101 && // e
type[3] === 115 && // s
type[4] === 115 && // s
type[5] === 97 && // a
type[6] === 103 && // g
type[7] === 101
); // e
}
if (typeof type === 'string') return type === 'smessage';
return false;
};

0 comments on commit ec5a276

Please sign in to comment.