Skip to content

Commit

Permalink
Merge pull request #14 from streamich/eval
Browse files Browse the repository at this point in the history
Adds EVAL commands support
  • Loading branch information
streamich committed Jan 16, 2024
2 parents abdbaae + f47e757 commit fa5662c
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 16 deletions.
23 changes: 23 additions & 0 deletions src/ScriptRegistry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import {createHash} from 'crypto';

export class Script {
public readonly sha1: Uint8Array;

constructor(public readonly script: string) {
const hash = createHash('sha1').update(script).digest('hex');
this.sha1 = Buffer.from(hash);
}
}

export class ScriptRegistry {
protected map: Map<string, Script> = new Map();

public set(id: string, script: string) {
const s = new Script(script);
this.map.set(id, s);
}

public get(id: string): Script | undefined {
return this.map.get(id);
}
}
64 changes: 60 additions & 4 deletions src/cluster/RedisCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@ import {withTimeout} from '../util/timeout';
import {printTree} from 'json-joy/es2020/util/print/printTree';
import {getSlotAny} from '../util/slots';
import {isMultiCmd} from '../util/commands';
import {ScriptRegistry} from '../ScriptRegistry';
import {ascii} from '../util/buf';
import type {Printable} from 'json-joy/es2020/util/print/types';
import type {CmdOpts} from '../standalone';
import {isNoscriptError} from '../standalone/errors';

const EVALSHA = ascii`EVALSHA`;
const SCRIPT = ascii`SCRIPT`;
const LOAD = ascii`LOAD`;

export interface RedisClusterOpts extends RedisClientCodecOpts {
/**
Expand All @@ -41,6 +48,11 @@ export interface RedisClusterOpts extends RedisClientCodecOpts {
*/
// This probably is not something that user should control?
// maxRedirects?: number;

/**
* Scripts registry, used for EVALSHA command execution.
*/
scripts?: ScriptRegistry;
}

export class RedisCluster implements Printable {
Expand All @@ -49,9 +61,12 @@ export class RedisCluster implements Printable {
protected readonly router = new RedisClusterRouter(this);
protected readonly clients = new Map<string, RedisClusterNodeClient>();

public readonly scripts: ScriptRegistry;

constructor(protected readonly opts: PartialExcept<RedisClusterOpts, 'seeds'>) {
this.encoder = opts.encoder ?? new RespEncoder();
this.decoder = opts.decoder ?? new RespStreamingDecoder();
this.scripts = opts.scripts ?? new ScriptRegistry();
}

// ------------------------------------------------------------------- Events
Expand Down Expand Up @@ -176,7 +191,7 @@ export class RedisCluster implements Printable {
private async startClientFromNode(node: RedisClusterNode, hostIndex: number = 0): Promise<RedisClusterNodeClient> {
const host = node.hosts[hostIndex];
if (!host) throw new Error('NO_HOST');
const config: RedisClusterNodeClientOpts = {host, port: node.port};
const config: RedisClusterNodeClientOpts = {host, port: node.port, scripts: this.scripts};
if (node.tls) config.tls = true;
try {
const client = await withTimeout(5000, this.startClientFromConfig(config));
Expand Down Expand Up @@ -321,12 +336,19 @@ export class RedisCluster implements Printable {
isWrite = commands.write.has(cmd);
if (!key) key = (args.length > 1 ? args[1] + '' : '') || '';
}
const client = await this.getClientForKey(key, isWrite);
call.client = client;
if (!call.client) {
const client = await this.getClientForKey(key, isWrite);
call.client = client;
}
return await this.__call(call);
}

public async cmd(args: Cmd | MultiCmd, opts?: ClusterCmdOpts): Promise<unknown> {
const call = this.createCall(args, opts);
return await this.call(call);
}

protected createCall(args: Cmd | MultiCmd, opts?: ClusterCmdOpts): RedisClusterCall {
const call = new RedisClusterCall(args);
if (opts) {
if (opts.utf8) call.utf8 = true;
Expand All @@ -335,7 +357,41 @@ export class RedisCluster implements Printable {
if (opts.key) call.key = opts.key;
if (opts.maxRedirects) call.maxRedirects = opts.maxRedirects;
}
return await this.call(call);
return call;
}

// ------------------------------------------------------------------ Scripts

public async eval(
id: string,
numkeys: number,
keys: (string | Uint8Array)[],
args: (string | Uint8Array)[],
opts?: CmdOpts,
): Promise<unknown> {
if (!this._routerReady) await this.whenRouterReady();
const script = this.scripts.get(id);
if (!script) throw new Error('SCRIPT_NOT_REGISTERED');
const isWrite = true;
const sampleKey: string | undefined = keys.length
? (typeof keys[0] === 'string' ? keys[0] : Buffer.from(keys[0]).toString()) + ''
: undefined;
const client =
sampleKey !== undefined ? await this.getClientForKey(sampleKey, isWrite) : await this.getAnyClientOrSeedClient();
const cmd = [EVALSHA, script.sha1, numkeys, ...keys, ...args];
const call = this.createCall(cmd, opts);
call.client = client;
try {
return await this.__call(call);
} catch (error) {
if (!isNoscriptError(error)) throw error;
// const call2 = RedisClusterCall.retry(call, client);
const [, result] = await Promise.all([
client.cmd([SCRIPT, LOAD, script.script], {noRes: true}),
this.__call(call),
]);
return result;
}
}

// ---------------------------------------------------------------- Printable
Expand Down
10 changes: 9 additions & 1 deletion src/cluster/RedisClusterNodeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {ReconnectingSocket} from '../util/ReconnectingSocket';
import type {Printable} from 'json-joy/es2020/util/print/types';
import type {RedisClientCodecOpts} from '../types';
import type {RedisClusterShardsResponse} from './types';
import type {ScriptRegistry} from '../ScriptRegistry';

export interface RedisClusterNodeClientOpts {
/** Hostname or IP address of the Redis node. Defaults to 'localhost'. */
Expand All @@ -26,6 +27,9 @@ export interface RedisClusterNodeClientOpts {
* Defaults to 1MB.
*/
maxBufferSize?: number;

/** Script registry. */
scripts?: ScriptRegistry;
}

export class RedisClusterNodeClient extends StandaloneClient implements Printable {
Expand All @@ -34,7 +38,10 @@ export class RedisClusterNodeClient extends StandaloneClient implements Printabl
/** Port of the Redis node. */
public readonly port: number;

constructor({host = 'localhost', port = 6379, ...opts}: RedisClusterNodeClientOpts, codec: RedisClientCodecOpts) {
constructor(
{host = 'localhost', port = 6379, scripts, ...opts}: RedisClusterNodeClientOpts,
codec: RedisClientCodecOpts,
) {
super({
socket: new ReconnectingSocket({
createSocket: opts.tls
Expand All @@ -51,6 +58,7 @@ export class RedisClusterNodeClient extends StandaloneClient implements Printabl
pwd: opts.pwd,
encoder: codec.encoder,
decoder: codec.decoder,
scripts,
});
this.host = host;
this.port = port;
Expand Down
37 changes: 37 additions & 0 deletions src/cluster/__tests__/scripts.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import {RedisCluster} from '../RedisCluster';

const setup = () => {
const cluster = new RedisCluster({
seeds: [{host: '127.0.0.1', port: 7000}],
});
return {cluster};
};

let keyCnt = 1;
const getKey = () => '{redis-cluster-script}-test-' + keyCnt++;

if (process.env.TEST_LOCAL_CLUSTER) {
test('can execute a script', async () => {
const {cluster} = setup();
cluster.start();
const scriptId = 'hello-scripting-cluster-test-' + Date.now();
cluster.scripts.set(scriptId, "return 'Hello, scripting!'");
const res = await cluster.eval(scriptId, 0, [], [], {utf8Res: true});
expect(res).toBe('Hello, scripting!');
cluster.stop();
});

test('can run a with keys and arguments', async () => {
const {cluster} = setup();
cluster.start();
const scriptName = '{add-script-cluster}-' + Date.now();
cluster.scripts.set(scriptName, `return tonumber(redis.call("get",KEYS[1])) + tonumber(ARGV[1])`);
const key = getKey();
await cluster.cmd(['SET', key, '1']);
const res = await cluster.eval(scriptName, 1, [key], ['2'], {utf8Res: true});
expect(res).toBe(3);
cluster.stop();
});
} else {
test.todo('To enable cluster tests, set TEST_LOCAL_CLUSTER=1 in your environment variables.');
}
6 changes: 6 additions & 0 deletions src/standalone/StandaloneCall.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ export class StandaloneCall {
*/
public noRes: boolean = false;

/**
* Whether to execute the command as soon as possible. This will place
* the command at the front of the write queue.
*/
public asap: boolean = false;

public readonly response = new Defer<unknown>();

constructor(public args: Cmd | MultiCmd) {}
Expand Down
68 changes: 58 additions & 10 deletions src/standalone/StandaloneClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import {AvlMap} from 'json-joy/es2020/util/trees/avl/AvlMap';
import {bufferToUint8Array} from 'json-joy/es2020/util/buffers/bufferToUint8Array';
import {cmpUint8Array, ascii} from '../util/buf';
import {ReconnectingSocket} from '../util/ReconnectingSocket';
import {ScriptRegistry} from '../ScriptRegistry';
import {isNoscriptError} from './errors';
import type {Cmd, MultiCmd, PublicKeys, RedisClientCodecOpts} from '../types';
import type {RedisHelloResponse} from './types';

Expand All @@ -22,36 +24,45 @@ const PUNSUBSCRIBE = ascii`PUNSUBSCRIBE`;
const SSUBSCRIBE = ascii`SSUBSCRIBE`;
const SPUBLISH = ascii`SPUBLISH`;
const SUNSUBSCRIBE = ascii`SUNSUBSCRIBE`;
const EVALSHA = ascii`EVALSHA`;
const SCRIPT = ascii`SCRIPT`;
const LOAD = ascii`LOAD`;

export interface RedisClientOpts extends RedisClientCodecOpts {
export interface RedisClientOpts extends Partial<RedisClientCodecOpts> {
socket: PublicKeys<ReconnectingSocket>;
user?: string;
pwd?: string;
scripts?: ScriptRegistry;
}

export class StandaloneClient {
protected readonly socket: PublicKeys<ReconnectingSocket>;
public readonly scripts: ScriptRegistry;
public readonly subs = new AvlMap<Uint8Array, FanOut<Uint8Array>>(cmpUint8Array);
public readonly psubs = new AvlMap<Uint8Array, FanOut<[channel: Uint8Array, message: Uint8Array]>>(cmpUint8Array);
public readonly ssubs = new AvlMap<Uint8Array, FanOut<Uint8Array>>(cmpUint8Array);

constructor(opts: RedisClientOpts) {
this.scripts = opts.scripts ?? new ScriptRegistry();
const socket = (this.socket = opts.socket);
this.encoder = opts.encoder;
const decoder = (this.decoder = opts.decoder);
this.encoder = opts.encoder ?? new RespEncoder();
const decoder = (this.decoder = opts.decoder ?? new RespStreamingDecoder());
socket.onData.listen((data) => {
decoder.push(data);
this.scheduleRead();
});
socket.onReady.listen(() => {
this.hello(3, opts.pwd, opts.user)
this.hello(3, opts.pwd, opts.user, true)
.then(() => {
this.__whenReady.resolve();
this.onReady.emit();
})
.catch((error) => {
this.__whenReady.reject(error);
this.onError.emit(error);
})
.finally(() => {
this._isReady = true;
});
const {subs, psubs, ssubs} = this;
if (!subs.isEmpty()) {
Expand Down Expand Up @@ -79,6 +90,7 @@ export class StandaloneClient {
// ------------------------------------------------------------------- Events

private readonly __whenReady = new Defer<void>();
private _isReady = false;
public readonly whenReady = this.__whenReady.promise;
public readonly onReady = new FanOut<void>();
public readonly onError = new FanOut<Error | unknown>();
Expand Down Expand Up @@ -145,12 +157,10 @@ export class StandaloneClient {
const call = responses[i];
if (call) decoder.tryUtf8 = !!call.utf8Res;
const msg = decoder.read();
// console.log(msg);
if (msg === undefined) break;
if (msg instanceof RespPush) {
this.onPush.emit(msg);
const val = msg.val;
// console.log('push', Buffer.from(val[0] as any).toString());
if (isPushMessage(val)) {
const fanout = this.subs.get(val[1] as Uint8Array);
if (fanout) fanout.emit(val[2] as Uint8Array);
Expand Down Expand Up @@ -198,8 +208,15 @@ export class StandaloneClient {

public async call(call: StandaloneCall): Promise<unknown> {
const noResponse = call.noRes;
this.requests.push(call);
this.responses.push(noResponse ? null : call);
if (call.asap) {
const responseIndex = this.responses.length - this.requests.length;
this.requests.unshift(call);
this.responses.splice(responseIndex, 0, noResponse ? null : call);
} else {
if (!this._isReady) await this.whenReady;
this.requests.push(call);
this.responses.push(noResponse ? null : call);
}
this.scheduleWrite();
return noResponse ? void 0 : call.response.promise;
}
Expand Down Expand Up @@ -227,9 +244,40 @@ export class StandaloneClient {
// -------------------------------------------------------- Built-in commands

/** Authenticate and negotiate protocol version. */
public async hello(protocol: 2 | 3, pwd?: string, usr: string = ''): Promise<RedisHelloResponse> {
public async hello(
protocol: 2 | 3,
pwd?: string,
usr: string = '',
asap: boolean = false,
): Promise<RedisHelloResponse> {
const args: Cmd = pwd ? [HELLO, protocol, AUTH, usr, pwd] : [HELLO, protocol];
return (await this.call(new StandaloneCall(args))) as RedisHelloResponse;
const call = new StandaloneCall(args);
if (asap) call.asap = true;
return (await this.call(call)) as RedisHelloResponse;
}

// ------------------------------------------------------------------ Scripts

public async eval(
id: string,
numkeys: number,
keys: (string | Uint8Array)[],
args: (string | Uint8Array)[],
opts?: CmdOpts,
): Promise<unknown> {
const script = this.scripts.get(id);
if (!script) throw new Error('SCRIPT_NOT_REGISTERED');
const cmd = [EVALSHA, script.sha1, numkeys, ...keys, ...args];
try {
return await this.cmd(cmd, opts);
} catch (error) {
if (!isNoscriptError(error)) throw error;
const [, result] = await Promise.all([
this.cmd([SCRIPT, LOAD, script.script], {noRes: true}),
this.cmd(cmd, opts),
]);
return result;
}
}

// --------------------------------------------------------- Pub/sub commands
Expand Down
Loading

0 comments on commit fa5662c

Please sign in to comment.