Skip to content

Commit

Permalink
feat: 馃幐 feat add initial EVAL support in RedisCluster
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Jan 15, 2024
1 parent cd773d5 commit 52ccd19
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 5 deletions.
60 changes: 56 additions & 4 deletions src/cluster/RedisCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,15 @@ import {withTimeout} from '../util/timeout';
import {printTree} from 'json-joy/es2020/util/print/printTree';
import {getSlotAny} from '../util/slots';
import {isMultiCmd} from '../util/commands';
import {ScriptRegistry} from '../ScriptRegistry';
import {ascii} from '../util/buf';
import type {Printable} from 'json-joy/es2020/util/print/types';
import type {CmdOpts} from '../standalone';
import {isNoscriptError} from '../standalone/errors';

const EVALSHA = ascii`EVALSHA`;
const SCRIPT = ascii`SCRIPT`;
const LOAD = ascii`LOAD`;

export interface RedisClusterOpts extends RedisClientCodecOpts {
/**
Expand All @@ -41,6 +48,11 @@ export interface RedisClusterOpts extends RedisClientCodecOpts {
*/
// This probably is not something that user should control?
// maxRedirects?: number;

/**
* Scripts registry, used for EVALSHA command execution.
*/
scripts?: ScriptRegistry;
}

export class RedisCluster implements Printable {
Expand All @@ -49,9 +61,12 @@ export class RedisCluster implements Printable {
protected readonly router = new RedisClusterRouter(this);
protected readonly clients = new Map<string, RedisClusterNodeClient>();

public readonly scripts: ScriptRegistry;

constructor(protected readonly opts: PartialExcept<RedisClusterOpts, 'seeds'>) {
this.encoder = opts.encoder ?? new RespEncoder();
this.decoder = opts.decoder ?? new RespStreamingDecoder();
this.scripts = opts.scripts ?? new ScriptRegistry();
}

// ------------------------------------------------------------------- Events
Expand Down Expand Up @@ -176,7 +191,7 @@ export class RedisCluster implements Printable {
private async startClientFromNode(node: RedisClusterNode, hostIndex: number = 0): Promise<RedisClusterNodeClient> {
const host = node.hosts[hostIndex];
if (!host) throw new Error('NO_HOST');
const config: RedisClusterNodeClientOpts = {host, port: node.port};
const config: RedisClusterNodeClientOpts = {host, port: node.port, scripts: this.scripts};
if (node.tls) config.tls = true;
try {
const client = await withTimeout(5000, this.startClientFromConfig(config));
Expand Down Expand Up @@ -321,12 +336,19 @@ export class RedisCluster implements Printable {
isWrite = commands.write.has(cmd);
if (!key) key = (args.length > 1 ? args[1] + '' : '') || '';
}
const client = await this.getClientForKey(key, isWrite);
call.client = client;
if (!call.client) {
const client = await this.getClientForKey(key, isWrite);
call.client = client;
}
return await this.__call(call);
}

public async cmd(args: Cmd | MultiCmd, opts?: ClusterCmdOpts): Promise<unknown> {
const call = this.createCall(args, opts);
return await this.call(call);
}

protected createCall(args: Cmd | MultiCmd, opts?: ClusterCmdOpts): RedisClusterCall {
const call = new RedisClusterCall(args);
if (opts) {
if (opts.utf8) call.utf8 = true;
Expand All @@ -335,7 +357,37 @@ export class RedisCluster implements Printable {
if (opts.key) call.key = opts.key;
if (opts.maxRedirects) call.maxRedirects = opts.maxRedirects;
}
return await this.call(call);
return call;
}

// ------------------------------------------------------------------ Scripts

public async eval(
id: string,
numkeys: number,
keys: (string | Uint8Array)[],
args: (string | Uint8Array)[],
opts?: CmdOpts,
): Promise<unknown> {
const script = this.scripts.get(id);
if (!script) throw new Error('SCRIPT_NOT_REGISTERED');
const isWrite = true;
const sampleKey: string | undefined = keys.length ? (typeof keys[0] === 'string' ? keys[0] : Buffer.from(keys[0]).toString()) + '' : undefined;
const client = sampleKey !== undefined ? await this.getClientForKey(sampleKey, isWrite) : await this.getAnyClientOrSeedClient();
const cmd = [EVALSHA, script.sha1, numkeys, ...keys, ...args];
const call = this.createCall(cmd, opts);
call.client = client;
try {
return await this.__call(call);
} catch (error) {
if (!isNoscriptError(error)) throw error;
// const call2 = RedisClusterCall.retry(call, client);
const [, result] = await Promise.all([
client.cmd([SCRIPT, LOAD, script.script], {noRes: true}),
this.__call(call),
]);
return result;
}
}

// ---------------------------------------------------------------- Printable
Expand Down
7 changes: 6 additions & 1 deletion src/cluster/RedisClusterNodeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {ReconnectingSocket} from '../util/ReconnectingSocket';
import type {Printable} from 'json-joy/es2020/util/print/types';
import type {RedisClientCodecOpts} from '../types';
import type {RedisClusterShardsResponse} from './types';
import type {ScriptRegistry} from '../ScriptRegistry';

export interface RedisClusterNodeClientOpts {
/** Hostname or IP address of the Redis node. Defaults to 'localhost'. */
Expand All @@ -26,6 +27,9 @@ export interface RedisClusterNodeClientOpts {
* Defaults to 1MB.
*/
maxBufferSize?: number;

/** Script registry. */
scripts?: ScriptRegistry;
}

export class RedisClusterNodeClient extends StandaloneClient implements Printable {
Expand All @@ -34,7 +38,7 @@ export class RedisClusterNodeClient extends StandaloneClient implements Printabl
/** Port of the Redis node. */
public readonly port: number;

constructor({host = 'localhost', port = 6379, ...opts}: RedisClusterNodeClientOpts, codec: RedisClientCodecOpts) {
constructor({host = 'localhost', port = 6379, scripts, ...opts}: RedisClusterNodeClientOpts, codec: RedisClientCodecOpts) {
super({
socket: new ReconnectingSocket({
createSocket: opts.tls
Expand All @@ -51,6 +55,7 @@ export class RedisClusterNodeClient extends StandaloneClient implements Printabl
pwd: opts.pwd,
encoder: codec.encoder,
decoder: codec.decoder,
scripts,
});
this.host = host;
this.port = port;
Expand Down

0 comments on commit 52ccd19

Please sign in to comment.