Skip to content

Commit

Permalink
feat: 🎸 add pub/sub support
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Dec 18, 2023
1 parent fa39312 commit d19cd0e
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 30 deletions.
9 changes: 7 additions & 2 deletions src/__tests__/commands/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import {TestSetup} from '../types';
import {StandaloneTestSetup, TestSetup} from '../types';
import * as string from './string';
import * as pubsub from './pubsub';

export const run = (setup: TestSetup) => {
describe('commands', () => {
string.run(setup);
pubsub.run(setup);
});
};

export const standalone = (setup: StandaloneTestSetup) => {
describe('commands', () => {
pubsub.standalone(setup);
});
};
130 changes: 124 additions & 6 deletions src/__tests__/commands/pubsub/SUBSCRIBE.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,130 @@
import {TestSetup} from '../../types';
import {tick, until} from 'thingies';
import {StandaloneTestSetup} from '../../types';

export const run = (setup: TestSetup) => {
export const standalone = (setup: StandaloneTestSetup) => {
describe('SUBSCRIBE', () => {
test('can subscribe to a key', async () => {
test('can subscribe to a channel', async () => {
const {client} = await setup();
const key = 'subscribe_key_' + Date.now();
const res = await client.cmd(['SUBSCRIBE', key]);
expect(res).toBe(undefined);
const channel = 'subscribe_channel_just_sub_' + Date.now();
client.sub(channel, (recv) => {});
await new Promise(resolve => setTimeout(resolve, 5));
});

test('can un-subscribe from a channel', async () => {
const {client} = await setup();
const channel = 'subscribe_channel_sub_unsub_' + Date.now();
const unsubscribe = client.sub(channel, (recv) => {});
await new Promise(resolve => setTimeout(resolve, 5));
unsubscribe();
await new Promise(resolve => setTimeout(resolve, 5));
});

test('can receive a message on a channel', async () => {
const {client} = await setup();
const channel = 'subscribe_channel_' + Date.now();
const msgs: unknown[] = [];
client.sub(channel, (recv) => {
msgs.push(recv);
});
client.publish(channel, new Uint8Array([1, 2, 3]));
await until(() => msgs.length === 1);
expect(msgs[0]).toEqual(new Uint8Array([1, 2, 3]));
});

test('can receive multiple messages on a channel', async () => {
const {client} = await setup();
const channel = 'subscribe_channel_multiple_' + Date.now();
const msgs: unknown[] = [];
client.sub(channel, (recv) => {
msgs.push(recv);
});
client.pub(channel, new Uint8Array([1]));
client.pub(channel, new Uint8Array([2]));
client.pub(channel, new Uint8Array([3]));
await until(() => msgs.length === 3);
expect(msgs).toEqual([
new Uint8Array([1]),
new Uint8Array([2]),
new Uint8Array([3]),
]);
});

test('does not receive more messages after un-subscription', async () => {
const {client} = await setup();
const channel = 'channel_unsubscribe_' + Date.now();
const msgs: unknown[] = [];
const unsubscribe = client.sub(channel, (recv) => {
msgs.push(recv);
});
await client.publish(channel, new Uint8Array([1]));
await client.publish(channel, new Uint8Array([2]));
unsubscribe();
await client.publish(channel, new Uint8Array([3]));
await until(() => msgs.length === 2);
expect(msgs).toEqual([
new Uint8Array([1]),
new Uint8Array([2]),
]);
});

test('can subscribe twice from the same client', async () => {
const {client} = await setup();
const channel = 'subscribe_twice_' + Date.now();
const msgs: unknown[] = [];
client.sub(channel, (recv) => {
msgs.push(recv);
});
client.sub(channel, (recv) => {
msgs.push(recv);
});
await client.publish(channel, new Uint8Array([1]));
await until(() => msgs.length === 2);
expect(msgs).toEqual([
new Uint8Array([1]),
new Uint8Array([1]),
]);
});

test('can subscribe twice from the same client and unsubscribe on subscription', async () => {
const {client} = await setup();
const channel = 'subscribe_twice_and_unsub_' + Date.now();
const msgs: unknown[] = [];
const unsubscribe1 = client.sub(channel, (recv) => {
msgs.push([1, recv]);
});
client.sub(channel, (recv) => {
msgs.push([2, recv]);
});
await client.publish(channel, new Uint8Array([1]));
unsubscribe1();
await client.publish(channel, new Uint8Array([2]));
await until(() => msgs.length === 3);
expect(msgs).toEqual([
[1, new Uint8Array([1])],
[2, new Uint8Array([1])],
[2, new Uint8Array([2])],
]);
});

test('can subscribe twice from different clients', async () => {
const {client} = await setup();
const {client: client2} = await setup();
const channel = 'subscribe_twice_diff_clients_' + Date.now();
const msgs: unknown[] = [];
const [, subscribed1] = client.subscribe(channel, (recv) => {
msgs.push(recv);
});
const [, subscribed2] = client2.subscribe(channel, (recv) => {
msgs.push(recv);
});
await subscribed1;
await subscribed2;
await client.publish(channel, new Uint8Array([1]));
await until(() => msgs.length === 2);
expect(msgs).toEqual([
new Uint8Array([1]),
new Uint8Array([1]),
]);
});
});
};
6 changes: 3 additions & 3 deletions src/__tests__/commands/pubsub/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {TestSetup} from '../../types';
import {StandaloneTestSetup} from '../../types';
import * as SUBSCRIBE from './SUBSCRIBE';

export const run = (setup: TestSetup) => {
export const standalone = (setup: StandaloneTestSetup) => {
describe('pubsub commands', () => {
SUBSCRIBE.run(setup);
SUBSCRIBE.standalone(setup);
});
};
12 changes: 6 additions & 6 deletions src/__tests__/standalone.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as net from 'net';
import {RedisClient} from '../node/RedisClient';
import {ClusterTestSetup} from './types';
import {StandaloneTestSetup} from './types';
import * as commands from './commands';
import {ReconnectingSocket} from '../node';
import {RespEncoder} from 'json-joy/es2020/json-pack/resp/RespEncoder';
Expand All @@ -10,7 +10,7 @@ const host = '127.0.0.1';
const port = 6379;
const clients: RedisClient[] = [];

const setupCluster: ClusterTestSetup = async () => {
const setupCluster: StandaloneTestSetup = async () => {
const client = new RedisClient({
socket: new ReconnectingSocket({
createSocket: () => net.connect({host, port}),
Expand All @@ -21,17 +21,17 @@ const setupCluster: ClusterTestSetup = async () => {
client.onError.listen((err) => {
console.error('onError', err);
});
client.onPush.listen((push) => {
console.log(push);
});
// client.onPush.listen((push) => {
// console.log(push);
// });
client.start();
clients.push(client);
await client.whenReady;
return {client};
};

describe('standalone (client per test)', () => {
commands.run(setupCluster);
commands.standalone(setupCluster);
});

afterAll(() => {
Expand Down
6 changes: 6 additions & 0 deletions src/__tests__/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {RedisCluster} from '../cluster/RedisCluster';
import {RedisClient} from '../node';

export type TestSetup = ClusterTestSetup;
export type ClusterTestSetup = () => Promise<{
Expand All @@ -7,3 +8,8 @@ export type ClusterTestSetup = () => Promise<{

export type TextClient = ClusterTestClient;
export type ClusterTestClient = Pick<RedisCluster, 'cmd'>;

export type StandaloneTestSetup = () => Promise<{
client: StandaloneTestClient;
}>;
export type StandaloneTestClient = Pick<RedisClient, 'cmd' | 'subscribe' | 'sub' | 'publish' | 'pub'>;
93 changes: 82 additions & 11 deletions src/node/RedisClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,23 @@ import {FanOut} from 'thingies/es2020/fanout';
import {Defer} from 'thingies/es2020/Defer';
import {ReconnectingSocket} from './ReconnectingSocket';
import {RedisCall, callNoRes} from './RedisCall';
import {isMultiCmd, isSubscribeAckResponse} from '../util/commands';
import {isPushMessage, isMultiCmd} from '../util/commands';
import {AvlMap} from 'json-joy/es2020/util/trees/avl/AvlMap';
import {bufferToUint8Array} from 'json-joy/es2020/util/buffers/bufferToUint8Array';
import {cmpUint8Array, ascii} from '../util/buf';
import type {Cmd, MultiCmd, RedisClientCodecOpts} from '../types';
import type {RedisHelloResponse} from './types';

const HELLO = ascii `HELLO`;
const AUTH = ascii `AUTH`;
const SUBSCRIBE = ascii `SUBSCRIBE`;
const PUBLISH = ascii `PUBLISH`;
const UNSUBSCRIBE = ascii `UNSUBSCRIBE`;
const PSUBSCRIBE = ascii `PSUBSCRIBE`;
const PUNSUBSCRIBE = ascii `PUNSUBSCRIBE`;
const SSUBSCRIBE = ascii `SSUBSCRIBE`;
const SUNSUBSCRIBE = ascii `SUNSUBSCRIBE`;

export interface RedisClientOpts extends RedisClientCodecOpts {
socket: ReconnectingSocket;
user?: string;
Expand All @@ -17,6 +30,9 @@ export interface RedisClientOpts extends RedisClientCodecOpts {

export class RedisClient {
protected readonly socket: ReconnectingSocket;
public readonly subs = new AvlMap<Uint8Array, FanOut<Uint8Array>>(cmpUint8Array);
public readonly psubs = new AvlMap<Uint8Array, FanOut<[channel: Uint8Array, message: Uint8Array]>>(cmpUint8Array);
public readonly ssubs = new AvlMap<Uint8Array, FanOut<Uint8Array>>(cmpUint8Array);

constructor(opts: RedisClientOpts) {
const socket = (this.socket = opts.socket);
Expand Down Expand Up @@ -46,7 +62,7 @@ export class RedisClient {
public readonly whenReady = this.__whenReady.promise;
public readonly onReady = new FanOut<void>();
public readonly onError = new FanOut<Error | unknown>();
public readonly onPush = new FanOut<unknown>();
public readonly onPush = new FanOut<RespPush>();


// ------------------------------------------------------------ Socket writes
Expand Down Expand Up @@ -114,19 +130,22 @@ export class RedisClient {
// console.log(msg);
if (msg === undefined) break;
if (msg instanceof RespPush) {
this.onPush.emit(msg);
const val = msg.val;
if (isSubscribeAckResponse(val)) {
if (!call) throw new Error('UNEXPECTED_RESPONSE');
call.response.resolve(undefined);
i++;
// console.log('push', Buffer.from(val[0] as any).toString());
if (isPushMessage(val)) {
const fanout = this.subs.get(val[1] as Uint8Array);
if (fanout) fanout.emit(val[2] as Uint8Array);
continue;
}
this.onPush.emit(msg);
if (call) call.response.resolve(undefined);
i++;
continue;
}
if (!call) throw new Error('UNEXPECTED_RESPONSE');
const res = call.response;
if (msg instanceof Error) res.reject(msg); else res.resolve(msg);
if (call instanceof RedisCall) {
const res = call.response;
if (msg instanceof Error) res.reject(msg); else res.resolve(msg);
} else if (call !== null) throw new Error('UNEXPECTED_RESPONSE');
i++;
}
if (i > 0) responses.splice(0, i);
Expand Down Expand Up @@ -183,9 +202,61 @@ export class RedisClient {

/** Authenticate and negotiate protocol version. */
public async hello(protocol: 2 | 3, pwd?: string, usr: string = ''): Promise<RedisHelloResponse> {
const args: Cmd = pwd ? ['HELLO', protocol, 'AUTH', usr, pwd] : ['HELLO', protocol];
const args: Cmd = pwd ? [HELLO, protocol, AUTH, usr, pwd] : [HELLO, protocol];
return await this.call(new RedisCall(args)) as RedisHelloResponse;
}

public subscribe(channel: Uint8Array | string, listener: (message: Uint8Array) => void): [unsubscribe: () => void, subscribed: Promise<void>] {
const channelBuf = typeof channel === 'string' ? bufferToUint8Array(Buffer.from(channel)) : channel;
let fanout = this.subs.get(channelBuf);
let subscribed: Promise<void>;
if (!fanout) {
fanout = new FanOut<Uint8Array>();
this.subs.set(channelBuf, fanout);
const call = new RedisCall([SUBSCRIBE, channelBuf]);
this.call(call);
subscribed = call.response.promise as Promise<void>;
} else {
subscribed = Promise.resolve();
}
const unsubscribe = fanout.listen(listener);
return [
() => {
unsubscribe();
if (fanout!.listeners.size === 0) {
this.subs.del(channelBuf);
this.cmdFnF([UNSUBSCRIBE, channelBuf]);
}
},
subscribed,
];
}

public sub(channel: Uint8Array | string, listener: (message: Uint8Array) => void): (() => void) {
const channelBuf = typeof channel === 'string' ? bufferToUint8Array(Buffer.from(channel)) : channel;
let fanout = this.subs.get(channelBuf);
if (!fanout) {
fanout = new FanOut<Uint8Array>();
this.subs.set(channelBuf, fanout);
this.cmdFnF([SUBSCRIBE, channelBuf]);
}
const unsubscribe = fanout.listen(listener);
return () => {
unsubscribe();
if (fanout!.listeners.size === 0) {
this.subs.del(channelBuf);
this.cmdFnF([UNSUBSCRIBE, channelBuf]);
}
};
}

public async publish(channel: Uint8Array | string, message: Uint8Array | string): Promise<number> {
return await this.cmd([PUBLISH, channel, message]) as number;
}

public pub(channel: Uint8Array | string, message: Uint8Array | string): void {
return this.cmdFnF([PUBLISH, channel, message]);
}
}

export type CmdOpts = Partial<Pick<RedisCall, 'utf8' | 'utf8Res' | 'noRes'>>;
2 changes: 1 addition & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ export interface RedisClientCodecOpts {

export type MultiCmd = Cmd[];
export type Cmd = Arg[];
export type Arg = string | number | Buffer;
export type Arg = string | number | Uint8Array;
24 changes: 24 additions & 0 deletions src/util/buf.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import {bufferToUint8Array} from 'json-joy/es2020/util/buffers/bufferToUint8Array';

export const cmpUint8Array = (a: Uint8Array, b: Uint8Array): 1 | 0 | -1 => {
const len1 = a.length;
const len2 = b.length;
if (len1 > len2) return 1;
if (len1 < len2) return -1;
for (let i = 0; i < len1; i++) {
const o1 = a[i];
const o2 = b[i];
if (o1 > o2) return 1;
if (o1 < o2) return -1;
}
return 0;
};

export const ascii = ([txt]: TemplateStringsArray) => {
const len = txt.length;
const res = new Uint8Array(len);
for (let i = 0; i < len; i++) res[i] = txt.charCodeAt(i);
return res;
};

export const utf8 = ([txt]: TemplateStringsArray) => bufferToUint8Array(Buffer.from(txt, 'utf8'));
Loading

0 comments on commit d19cd0e

Please sign in to comment.