From 20daef47d1c7731d78c87a4be95933c4374e80fc Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 13 Feb 2024 23:44:57 +0200 Subject: [PATCH] POC of restructuring commands --- packages/client/lib/RESP/types.ts | 2 + packages/client/lib/client/index.ts | 106 +++++++++++++++++++++------- packages/client/lib/client/pool.ts | 24 +++++-- packages/client/lib/commander.ts | 82 ++++++++++++++++++++- packages/client/lib/commands/GET.ts | 9 ++- packages/test-utils/lib/index.ts | 2 +- 6 files changed, 188 insertions(+), 37 deletions(-) diff --git a/packages/client/lib/RESP/types.ts b/packages/client/lib/RESP/types.ts index 9f0e9217345..2ebe3a01b8d 100644 --- a/packages/client/lib/RESP/types.ts +++ b/packages/client/lib/RESP/types.ts @@ -1,3 +1,4 @@ +import { CommandParser } from '../commander'; import { BlobError, SimpleError } from '../errors'; import { RedisScriptConfig, SHA1 } from '../lua-script'; import { RESP_TYPES } from './decoder'; @@ -272,6 +273,7 @@ export type Command = { */ IS_FORWARD_COMMAND?: boolean; // POLICIES?: CommandPolicies; + parseCommand?(this: void, parser: CommandParser, ...args: Array): void; transformArguments(this: void, ...args: Array): CommandArguments; TRANSFORM_LEGACY_REPLY?: boolean; transformReply: TransformReply | Record; diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 3a14dc40e1a..de7ce53a7fd 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -2,7 +2,7 @@ import COMMANDS from '../commands'; import RedisSocket, { RedisSocketOptions } from './socket'; import RedisCommandsQueue, { CommandOptions } from './commands-queue'; import { EventEmitter } from 'node:events'; -import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; +import { BasicCommandParser, CachedCommandParser, CommandParser, attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { ClientClosedError, ClientOfflineError, DisconnectsClientError, WatchError } from '../errors'; import { URL } from 'node:url'; import { TcpSocketConnectOpts } from 'node:net'; @@ -150,23 +150,39 @@ export default class RedisClient< > extends EventEmitter { static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); + return async function (this: ProxyClient, ...args: Array) { - const redisArgs = command.transformArguments(...args), - reply = await this.sendCommand(redisArgs, this._commandOptions); - return transformReply ? - transformReply(reply, redisArgs.preserve) : - reply; - }; + if (command.parseCommand) { + const parser = this._self.#newCommandParser(); + command.parseCommand(parser, ...args); + + return this.executeCommand(undefined, parser, this._self._commandOptions); + } else { + const redisArgs = command.transformArguments(...args), + reply = await this.sendCommand(redisArgs, this._commandOptions); + return transformReply ? + transformReply(reply, redisArgs.preserve) : + reply; + }; + } } static #createModuleCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); + return async function (this: NamespaceProxyClient, ...args: Array) { - const redisArgs = command.transformArguments(...args), - reply = await this._self.sendCommand(redisArgs, this._self._commandOptions); - return transformReply ? - transformReply(reply, redisArgs.preserve) : - reply; + if (command.parseCommand) { + const parser = this._self.#newCommandParser(); + command.parseCommand(parser, ...args); + + return this._self.executeCommand(undefined, parser, this._self._commandOptions); + } else { + const redisArgs = command.transformArguments(...args), + reply = await this._self.sendCommand(redisArgs, this._self._commandOptions); + return transformReply ? + transformReply(reply, redisArgs.preserve) : + reply; + } }; } @@ -174,14 +190,21 @@ export default class RedisClient< const prefix = functionArgumentsPrefix(name, fn), transformReply = getTransformReply(fn, resp); return async function (this: NamespaceProxyClient, ...args: Array) { - const fnArgs = fn.transformArguments(...args), - reply = await this._self.sendCommand( - prefix.concat(fnArgs), - this._self._commandOptions - ); - return transformReply ? - transformReply(reply, fnArgs.preserve) : - reply; + if (fn.parseCommand) { + const parser = this._self.#newCommandParser(); + fn.parseCommand(parser, ...args); + + return this._self.executeCommand(prefix, parser, this._self._commandOptions); + } else { + const fnArgs = fn.transformArguments(...args), + reply = await this._self.sendCommand( + prefix.concat(fnArgs), + this._self._commandOptions + ); + return transformReply ? + transformReply(reply, fnArgs.preserve) : + reply; + } }; } @@ -189,13 +212,20 @@ export default class RedisClient< const prefix = scriptArgumentsPrefix(script), transformReply = getTransformReply(script, resp); return async function (this: ProxyClient, ...args: Array) { - const scriptArgs = script.transformArguments(...args), - redisArgs = prefix.concat(scriptArgs), - reply = await this.executeScript(script, redisArgs, this._commandOptions); - return transformReply ? - transformReply(reply, scriptArgs.preserve) : - reply; - }; + if (script.parseCommand) { + const parser = this._self.#newCommandParser(); + script.parseCommand(parser, ...args); + + return this.executeCommand(prefix, parser, this._self._commandOptions); + } else { + const scriptArgs = script.transformArguments(...args), + redisArgs = prefix.concat(scriptArgs), + reply = await this.executeScript(script, redisArgs, this._commandOptions); + return transformReply ? + transformReply(reply, scriptArgs.preserve) : + reply; + }; + } } static factory< @@ -308,6 +338,11 @@ export default class RedisClient< this._self.#dirtyWatch = msg; } + // FIXME: would choose parser here (i.e. if caching or not) + #newCommandParser(): CommandParser { + return new BasicCommandParser(); + } + constructor(options?: RedisClientOptions) { super(); this.#options = this.#initiateOptions(options); @@ -572,6 +607,23 @@ export default class RedisClient< return this as unknown as RedisClientType; } + async executeCommand( + prefix: Array | undefined, + parser: CommandParser, + commandOptions: CommandOptions | undefined, + ) { + const redisArgs = prefix ? prefix.concat(parser.redisArgs) : parser.redisArgs; + const fn = () => { return this.sendCommand(redisArgs, commandOptions) }; + + const defaultTypeMapping = this._self.#options?.commandOptions === commandOptions; + if (parser instanceof CachedCommandParser && parser.cachable && defaultTypeMapping) { + // TODO: caching goes here. + } else { + const reply = await fn(); + return parser.transformReply(reply); + } + } + sendCommand( args: Array, options?: CommandOptions diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index fc996e07625..94ac04c7f1c 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -4,7 +4,7 @@ import RedisClient, { RedisClientType, RedisClientOptions, RedisClientExtensions import { EventEmitter } from 'node:events'; import { DoublyLinkedNode, DoublyLinkedList, SinglyLinkedList } from './linked-list'; import { TimeoutError } from '../errors'; -import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; +import { BasicCommandParser, CommandParser, attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { CommandOptions } from './commands-queue'; import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; @@ -61,11 +61,18 @@ export class RedisClientPool< static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return async function (this: ProxyPool, ...args: Array) { - const redisArgs = command.transformArguments(...args), - reply = await this.sendCommand(redisArgs, this._commandOptions); - return transformReply ? - transformReply(reply, redisArgs.preserve) : - reply; + if (command.parseCommand) { + const parser = this._self.#newCommandParser(); + command.parseCommand(parser, ...args); + + return this._self.execute(client => client.executeCommand(undefined, parser, this._commandOptions)) + } else { + const redisArgs = command.transformArguments(...args), + reply = await this.sendCommand(redisArgs, this._commandOptions); + return transformReply ? + transformReply(reply, redisArgs.preserve) : + reply; + } }; } @@ -207,6 +214,11 @@ export class RedisClientPool< return this._self.#isClosing; } + // FIXME: would choose parser here (i.e. if caching or not) + #newCommandParser(): CommandParser { + return new BasicCommandParser(); + } + /** * You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`}, * {@link RedisClientPool.fromClient `RedisClientPool.fromClient`}, diff --git a/packages/client/lib/commander.ts b/packages/client/lib/commander.ts index d96aaa7128e..b911b77e891 100644 --- a/packages/client/lib/commander.ts +++ b/packages/client/lib/commander.ts @@ -1,4 +1,84 @@ -import { Command, CommanderConfig, RedisCommands, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, RespVersions } from './RESP/types'; +import { RedisArgument } from '..'; +import { Command, CommanderConfig, RedisCommands, RedisFunction, RedisFunctions, RedisModules, RedisScript, RedisScripts, RespVersions, TransformReply } from './RESP/types'; + +export interface CommandParser { + redisArgs: Array; + transformReply: TransformReply + + push: (arg: RedisArgument) => unknown; + pushKey: (key: RedisArgument) => unknown; + setTransformReply: (transformReply: TransformReply) => unknown; + setCachable: () => unknown; +} + +abstract class AbstractCommandParser implements CommandParser { + #redisArgs: Array = []; + #transformReply: TransformReply = (reply) => { return reply } + + get redisArgs() { + return this.#redisArgs; + } + + get transformReply() { + return this.#transformReply; + } + + push(arg: RedisArgument) { + this.#redisArgs.push(arg); + + }; + + pushKey(key: RedisArgument) { + this.#redisArgs.push(key); + }; + + setTransformReply(transformReply: TransformReply) { + this.#transformReply = transformReply; + } + + setCachable() {}; +} + +export class BasicCommandParser extends AbstractCommandParser {}; + +export class CachedCommandParser extends AbstractCommandParser { + keys: Array = []; + #cachable = false; + get cachable() { + return this.#cachable; + } + + override pushKey(key: RedisArgument) { + this.keys.push(key) + super.pushKey(key); + } + + override setCachable() { + this.#cachable = true; + } +} + +export class ClusterCommandParser extends BasicCommandParser { + firstKey?: RedisArgument; + + override pushKey(key: RedisArgument): void { + if (!this.firstKey) { + this.firstKey = key; + } + super.pushKey(key); + } +} + +export class ClusterCachedCommandParser extends CachedCommandParser { + firstKey?: RedisArgument; + + override pushKey(key: RedisArgument): void { + if (!this.firstKey) { + this.firstKey = key; + } + super.pushKey(key); + } +} interface AttachConfigOptions< M extends RedisModules, diff --git a/packages/client/lib/commands/GET.ts b/packages/client/lib/commands/GET.ts index bb3db4f76d9..5c7702b5818 100644 --- a/packages/client/lib/commands/GET.ts +++ b/packages/client/lib/commands/GET.ts @@ -1,10 +1,15 @@ import { RedisArgument, BlobStringReply, NullReply, Command } from '../RESP/types'; +import { CommandParser } from '../commander'; export default { FIRST_KEY_INDEX: 1, IS_READ_ONLY: true, - transformArguments(key: RedisArgument) { - return ['GET', key]; + parseCommand(parser: CommandParser, key: RedisArgument) { + parser.setCachable(); + parser.setTransformReply(undefined as unknown as () => BlobStringReply | NullReply) + parser.push('GET'); + parser.pushKey(key); }, + transformArguments: () => { return [] }, transformReply: undefined as unknown as () => BlobStringReply | NullReply } as const satisfies Command; diff --git a/packages/test-utils/lib/index.ts b/packages/test-utils/lib/index.ts index 87ba34db7ef..021d09457b7 100644 --- a/packages/test-utils/lib/index.ts +++ b/packages/test-utils/lib/index.ts @@ -229,7 +229,7 @@ export default class TestUtils { it(title, async function () { if (!dockerPromise) return this.skip(); - const pool = createClientPool({ + const pool = u({ ...options.clientOptions, socket: { ...options.clientOptions?.socket,