diff --git a/benchmark/package-lock.json b/benchmark/package-lock.json index 62a9608062e..857b3c423ac 100644 --- a/benchmark/package-lock.json +++ b/benchmark/package-lock.json @@ -20,7 +20,6 @@ "dependencies": { "cluster-key-slot": "1.1.0", "generic-pool": "3.8.2", - "redis-parser": "3.0.0", "yallist": "4.0.0" }, "devDependencies": { @@ -316,6 +315,14 @@ "@node-redis/time-series": "1.0.2" } }, + "node_modules/redis-v3/node_modules/denque": { + "version": "1.5.1", + "resolved": "https://registry.npmjs.org/denque/-/denque-1.5.1.tgz", + "integrity": "sha512-XwE+iZ4D6ZUB7mfYRMb5wByE8L74HCn30FBN7sWnXksWc1LO1bPDl67pBR9o/kC4z/xSNAwkMYcGgqDV3BE3Hw==", + "engines": { + "node": ">=0.10" + } + }, "node_modules/require-directory": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/require-directory/-/require-directory-2.1.1.tgz", @@ -395,9 +402,9 @@ } }, "node_modules/yargs-parser": { - "version": "21.0.0", - "resolved": "https://registry.npmjs.org/yargs-parser/-/yargs-parser-21.0.0.tgz", - "integrity": "sha512-z9kApYUOCwoeZ78rfRYYWdiU/iNL6mwwYlkkZfJoyMR1xps+NEBX5X7XmRpxkZHhXJ6+Ey00IwKxBBSW9FIjyA==", + "version": "21.0.1", + "resolved": "https://registry.npmjs.org/yargs-parser/-/yargs-parser-21.0.1.tgz", + "integrity": "sha512-9BK1jFpLzJROCI5TzwZL/TU4gqjK5xiHV/RfWLOahrjAko/e4DJkRDZQXfvqAsiZzzYhgAzbgz6lg48jcm4GLg==", "engines": { "node": ">=12" } @@ -678,9 +685,9 @@ } }, "yargs-parser": { - "version": "21.0.0", - "resolved": "https://registry.npmjs.org/yargs-parser/-/yargs-parser-21.0.0.tgz", - "integrity": "sha512-z9kApYUOCwoeZ78rfRYYWdiU/iNL6mwwYlkkZfJoyMR1xps+NEBX5X7XmRpxkZHhXJ6+Ey00IwKxBBSW9FIjyA==" + "version": "21.0.1", + "resolved": "https://registry.npmjs.org/yargs-parser/-/yargs-parser-21.0.1.tgz", + "integrity": "sha512-9BK1jFpLzJROCI5TzwZL/TU4gqjK5xiHV/RfWLOahrjAko/e4DJkRDZQXfvqAsiZzzYhgAzbgz6lg48jcm4GLg==" } } } diff --git a/packages/client/lib/client/RESP2/composers/buffer.spec.ts b/packages/client/lib/client/RESP2/composers/buffer.spec.ts new file mode 100644 index 00000000000..f57c369fecb --- /dev/null +++ b/packages/client/lib/client/RESP2/composers/buffer.spec.ts @@ -0,0 +1,14 @@ +import { strict as assert } from 'assert'; +import BufferComposer from './buffer'; + +describe('Buffer Composer', () => { + const composer = new BufferComposer(); + + it('should compose two buffers', () => { + composer.write(Buffer.from([0])); + assert.deepEqual( + composer.end(Buffer.from([1])), + Buffer.from([0, 1]) + ); + }); +}); diff --git a/packages/client/lib/client/RESP2/composers/buffer.ts b/packages/client/lib/client/RESP2/composers/buffer.ts new file mode 100644 index 00000000000..4affb4283e0 --- /dev/null +++ b/packages/client/lib/client/RESP2/composers/buffer.ts @@ -0,0 +1,18 @@ +import { Composer } from './interface'; + +export default class BufferComposer implements Composer { + private chunks: Array = []; + + write(buffer: Buffer): void { + this.chunks.push(buffer); + } + + end(buffer: Buffer): Buffer { + this.write(buffer); + return Buffer.concat(this.chunks.splice(0)); + } + + reset() { + this.chunks = []; + } +} diff --git a/packages/client/lib/client/RESP2/composers/interface.ts b/packages/client/lib/client/RESP2/composers/interface.ts new file mode 100644 index 00000000000..0fc8f031414 --- /dev/null +++ b/packages/client/lib/client/RESP2/composers/interface.ts @@ -0,0 +1,7 @@ +export interface Composer { + write(buffer: Buffer): void; + + end(buffer: Buffer): T; + + reset(): void; +} diff --git a/packages/client/lib/client/RESP2/composers/string.spec.ts b/packages/client/lib/client/RESP2/composers/string.spec.ts new file mode 100644 index 00000000000..9dd26aae021 --- /dev/null +++ b/packages/client/lib/client/RESP2/composers/string.spec.ts @@ -0,0 +1,14 @@ +import { strict as assert } from 'assert'; +import StringComposer from './string'; + +describe('String Composer', () => { + const composer = new StringComposer(); + + it('should compose two strings', () => { + composer.write(Buffer.from([0])); + assert.deepEqual( + composer.end(Buffer.from([1])), + Buffer.from([0, 1]).toString() + ); + }); +}); diff --git a/packages/client/lib/client/RESP2/composers/string.ts b/packages/client/lib/client/RESP2/composers/string.ts new file mode 100644 index 00000000000..0cd8f00e95c --- /dev/null +++ b/packages/client/lib/client/RESP2/composers/string.ts @@ -0,0 +1,22 @@ +import { StringDecoder } from 'string_decoder'; +import { Composer } from './interface'; + +export default class StringComposer implements Composer { + private decoder = new StringDecoder(); + + private string = ''; + + write(buffer: Buffer): void { + this.string += this.decoder.write(buffer); + } + + end(buffer: Buffer): string { + const string = this.string + this.decoder.end(buffer); + this.string = ''; + return string; + } + + reset() { + this.string = ''; + } +} diff --git a/packages/client/lib/client/RESP2/decoder.spec.ts b/packages/client/lib/client/RESP2/decoder.spec.ts new file mode 100644 index 00000000000..dcce9f60115 --- /dev/null +++ b/packages/client/lib/client/RESP2/decoder.spec.ts @@ -0,0 +1,195 @@ +import { strict as assert } from 'assert'; +import { SinonSpy, spy } from 'sinon'; +import RESP2Decoder from './decoder'; +import { ErrorReply } from '../../errors'; + +interface DecoderAndSpies { + decoder: RESP2Decoder; + returnStringsAsBuffersSpy: SinonSpy; + onReplySpy: SinonSpy; +} + +function createDecoderAndSpies(returnStringsAsBuffers: boolean): DecoderAndSpies { + const returnStringsAsBuffersSpy = spy(() => returnStringsAsBuffers), + onReplySpy = spy(); + + return { + decoder: new RESP2Decoder({ + returnStringsAsBuffers: returnStringsAsBuffersSpy, + onReply: onReplySpy + }), + returnStringsAsBuffersSpy, + onReplySpy + }; +} + +function writeChunks(stream: RESP2Decoder, buffer: Buffer) { + let i = 0; + while (i < buffer.length) { + stream.write(buffer.slice(i, ++i)); + } +} + +type Replies = Array>; + +interface TestsOptions { + toWrite: Buffer; + returnStringsAsBuffers: boolean; + replies: Replies; +} + +function generateTests({ + toWrite, + returnStringsAsBuffers, + replies +}: TestsOptions): void { + it('single chunk', () => { + const { decoder, returnStringsAsBuffersSpy, onReplySpy } = + createDecoderAndSpies(returnStringsAsBuffers); + decoder.write(toWrite); + assert.equal(returnStringsAsBuffersSpy.callCount, replies.length); + testReplies(onReplySpy, replies); + }); + + it('multiple chunks', () => { + const { decoder, returnStringsAsBuffersSpy, onReplySpy } = + createDecoderAndSpies(returnStringsAsBuffers); + writeChunks(decoder, toWrite); + assert.equal(returnStringsAsBuffersSpy.callCount, replies.length); + testReplies(onReplySpy, replies); + }); +} + +function testReplies(spy: SinonSpy, replies: Replies): void { + if (!replies) { + assert.equal(spy.callCount, 0); + return; + } + + assert.equal(spy.callCount, replies.length); + for (const [i, reply] of replies.entries()) { + assert.deepEqual( + spy.getCall(i).args, + reply + ); + } +} + +describe('RESP2Parser', () => { + describe('Simple String', () => { + describe('as strings', () => { + generateTests({ + toWrite: Buffer.from('+OK\r\n'), + returnStringsAsBuffers: false, + replies: [['OK']] + }); + }); + + describe('as buffers', () => { + generateTests({ + toWrite: Buffer.from('+OK\r\n'), + returnStringsAsBuffers: true, + replies: [[Buffer.from('OK')]] + }); + }); + }); + + describe('Error', () => { + generateTests({ + toWrite: Buffer.from('-ERR\r\n'), + returnStringsAsBuffers: false, + replies: [[new ErrorReply('ERR')]] + }); + }); + + describe('Integer', () => { + describe('-1', () => { + generateTests({ + toWrite: Buffer.from(':-1\r\n'), + returnStringsAsBuffers: false, + replies: [[-1]] + }); + }); + + describe('0', () => { + generateTests({ + toWrite: Buffer.from(':0\r\n'), + returnStringsAsBuffers: false, + replies: [[0]] + }); + }); + }); + + describe('Bulk String', () => { + describe('null', () => { + generateTests({ + toWrite: Buffer.from('$-1\r\n'), + returnStringsAsBuffers: false, + replies: [[null]] + }); + }); + + describe('as strings', () => { + generateTests({ + toWrite: Buffer.from('$2\r\naa\r\n'), + returnStringsAsBuffers: false, + replies: [['aa']] + }); + }); + + describe('as buffers', () => { + generateTests({ + toWrite: Buffer.from('$2\r\naa\r\n'), + returnStringsAsBuffers: true, + replies: [[Buffer.from('aa')]] + }); + }); + }); + + describe('Array', () => { + describe('null', () => { + generateTests({ + toWrite: Buffer.from('*-1\r\n'), + returnStringsAsBuffers: false, + replies: [[null]] + }); + }); + + const arrayBuffer = Buffer.from( + '*5\r\n' + + '+OK\r\n' + + '-ERR\r\n' + + ':0\r\n' + + '$1\r\na\r\n' + + '*0\r\n' + ); + + describe('as strings', () => { + generateTests({ + toWrite: arrayBuffer, + returnStringsAsBuffers: false, + replies: [[[ + 'OK', + new ErrorReply('ERR'), + 0, + 'a', + [] + ]]] + }); + }); + + describe('as buffers', () => { + generateTests({ + toWrite: arrayBuffer, + returnStringsAsBuffers: true, + replies: [[[ + Buffer.from('OK'), + new ErrorReply('ERR'), + 0, + Buffer.from('a'), + [] + ]]] + }); + }); + }); +}); diff --git a/packages/client/lib/client/RESP2/decoder.ts b/packages/client/lib/client/RESP2/decoder.ts new file mode 100644 index 00000000000..e9d2317deab --- /dev/null +++ b/packages/client/lib/client/RESP2/decoder.ts @@ -0,0 +1,254 @@ +import { ErrorReply } from '../../errors'; +import { Composer } from './composers/interface'; +import BufferComposer from './composers/buffer'; +import StringComposer from './composers/string'; + +// RESP2 specification +// https://redis.io/topics/protocol + +enum Types { + SIMPLE_STRING = 43, // + + ERROR = 45, // - + INTEGER = 58, // : + BULK_STRING = 36, // $ + ARRAY = 42 // * +} + +enum ASCII { + CR = 13, // \r + ZERO = 48, + MINUS = 45 +} + +export type Reply = string | Buffer | ErrorReply | number | null | Array; + +type ArrayReply = Array | null; + +export type ReturnStringsAsBuffers = () => boolean; + +interface RESP2Options { + returnStringsAsBuffers: ReturnStringsAsBuffers; + onReply(reply: Reply): unknown; +} + +interface ArrayInProcess { + array: Array; + pushCounter: number; +} + +// Using TypeScript `private` and not the build-in `#` to avoid __classPrivateFieldGet and __classPrivateFieldSet + +export default class RESP2Decoder { + constructor(private options: RESP2Options) {} + + private cursor = 0; + + private type?: Types; + + private bufferComposer = new BufferComposer(); + + private stringComposer = new StringComposer(); + + private currentStringComposer: BufferComposer | StringComposer = this.stringComposer; + + reset() { + this.cursor = 0; + this.type = undefined; + this.bufferComposer.reset(); + this.stringComposer.reset(); + this.currentStringComposer = this.stringComposer; + } + + write(chunk: Buffer): void { + while (this.cursor < chunk.length) { + if (!this.type) { + this.currentStringComposer = this.options.returnStringsAsBuffers() ? + this.bufferComposer : + this.stringComposer; + + this.type = chunk[this.cursor]; + if (++this.cursor >= chunk.length) break; + } + + const reply = this.parseType(chunk, this.type); + if (reply === undefined) break; + + this.type = undefined; + this.options.onReply(reply); + } + + this.cursor -= chunk.length; + } + + private parseType(chunk: Buffer, type: Types, arraysToKeep?: number): Reply | undefined { + switch (type) { + case Types.SIMPLE_STRING: + return this.parseSimpleString(chunk); + + case Types.ERROR: + return this.parseError(chunk); + + case Types.INTEGER: + return this.parseInteger(chunk); + + case Types.BULK_STRING: + return this.parseBulkString(chunk); + + case Types.ARRAY: + return this.parseArray(chunk, arraysToKeep); + } + } + + private compose< + C extends Composer, + T = C extends Composer ? TT : never + >( + chunk: Buffer, + composer: C + ): T | undefined { + for (let i = this.cursor; i < chunk.length; i++) { + if (chunk[i] === ASCII.CR) { + const reply = composer.end( + chunk.subarray(this.cursor, i) + ); + this.cursor = i + 2; + return reply; + } + } + + const toWrite = chunk.subarray(this.cursor); + composer.write(toWrite); + this.cursor = chunk.length; + } + + private parseSimpleString(chunk: Buffer): string | Buffer | undefined { + return this.compose(chunk, this.currentStringComposer); + } + + private parseError(chunk: Buffer): ErrorReply | undefined { + const message = this.compose(chunk, this.stringComposer); + if (message !== undefined) { + return new ErrorReply(message); + } + } + + private integer = 0; + + private isNegativeInteger?: boolean; + + private parseInteger(chunk: Buffer): number | undefined { + if (this.isNegativeInteger === undefined) { + this.isNegativeInteger = chunk[this.cursor] === ASCII.MINUS; + if (this.isNegativeInteger && ++this.cursor === chunk.length) return; + } + + do { + const byte = chunk[this.cursor]; + if (byte === ASCII.CR) { + const integer = this.isNegativeInteger ? -this.integer : this.integer; + this.integer = 0; + this.isNegativeInteger = undefined; + this.cursor += 2; + return integer; + } + + this.integer = this.integer * 10 + byte - ASCII.ZERO; + } while (++this.cursor < chunk.length); + } + + private bulkStringRemainingLength?: number; + + private parseBulkString(chunk: Buffer): string | Buffer | null | undefined { + if (this.bulkStringRemainingLength === undefined) { + const length = this.parseInteger(chunk); + if (length === undefined) return; + if (length === -1) return null; + + this.bulkStringRemainingLength = length; + + if (this.cursor >= chunk.length) return; + } + + const end = this.cursor + this.bulkStringRemainingLength; + if (chunk.length >= end) { + const reply = this.currentStringComposer.end( + chunk.subarray(this.cursor, end) + ); + this.bulkStringRemainingLength = undefined; + this.cursor = end + 2; + return reply; + } + + const toWrite = chunk.subarray(this.cursor); + this.currentStringComposer.write(toWrite); + this.bulkStringRemainingLength -= toWrite.length; + this.cursor = chunk.length; + } + + private arraysInProcess: Array = []; + + private initializeArray = false; + + private arrayItemType?: Types; + + private parseArray(chunk: Buffer, arraysToKeep = 0): ArrayReply | undefined { + if (this.initializeArray || this.arraysInProcess.length === arraysToKeep) { + const length = this.parseInteger(chunk); + if (length === undefined) { + this.initializeArray = true; + return undefined; + } + + this.initializeArray = false; + this.arrayItemType = undefined; + + if (length === -1) { + return this.returnArrayReply(null, arraysToKeep); + } else if (length === 0) { + return this.returnArrayReply([], arraysToKeep); + } + + this.arraysInProcess.push({ + array: new Array(length), + pushCounter: 0 + }); + } + + while (this.cursor < chunk.length) { + if (!this.arrayItemType) { + this.arrayItemType = chunk[this.cursor]; + + if (++this.cursor >= chunk.length) break; + } + + const item = this.parseType( + chunk, + this.arrayItemType, + arraysToKeep + 1 + ); + if (item === undefined) break; + + this.arrayItemType = undefined; + + const reply = this.pushArrayItem(item, arraysToKeep); + if (reply !== undefined) return reply; + } + } + + private returnArrayReply(reply: ArrayReply, arraysToKeep: number): ArrayReply | undefined { + if (this.arraysInProcess.length <= arraysToKeep) return reply; + + return this.pushArrayItem(reply, arraysToKeep); + } + + private pushArrayItem(item: Reply, arraysToKeep: number): ArrayReply | undefined { + const to = this.arraysInProcess[this.arraysInProcess.length - 1]!; + to.array[to.pushCounter] = item; + if (++to.pushCounter === to.array.length) { + return this.returnArrayReply( + this.arraysInProcess.pop()!.array, + arraysToKeep + ); + } + } +} diff --git a/packages/client/lib/client/RESP2/encoder.spec.ts b/packages/client/lib/client/RESP2/encoder.spec.ts new file mode 100644 index 00000000000..486259472a4 --- /dev/null +++ b/packages/client/lib/client/RESP2/encoder.spec.ts @@ -0,0 +1,33 @@ +import { strict as assert } from 'assert'; +import { describe } from 'mocha'; +import encodeCommand from './encoder'; + +describe('RESP2 Encoder', () => { + it('1 byte', () => { + assert.deepEqual( + encodeCommand(['a', 'z']), + ['*2\r\n$1\r\na\r\n$1\r\nz\r\n'] + ); + }); + + it('2 bytes', () => { + assert.deepEqual( + encodeCommand(['א', 'ת']), + ['*2\r\n$2\r\nא\r\n$2\r\nת\r\n'] + ); + }); + + it('4 bytes', () => { + assert.deepEqual( + [...encodeCommand(['🐣', '🐤'])], + ['*2\r\n$4\r\n🐣\r\n$4\r\n🐤\r\n'] + ); + }); + + it('buffer', () => { + assert.deepEqual( + encodeCommand([Buffer.from('string')]), + ['*1\r\n$6\r\n', Buffer.from('string'), '\r\n'] + ); + }); +}); diff --git a/packages/client/lib/client/RESP2/encoder.ts b/packages/client/lib/client/RESP2/encoder.ts new file mode 100644 index 00000000000..be48348a356 --- /dev/null +++ b/packages/client/lib/client/RESP2/encoder.ts @@ -0,0 +1,30 @@ +import { RedisCommandArgument, RedisCommandArguments } from '../../commands'; + +const CRLF = '\r\n'; + +export default function encodeCommand(args: RedisCommandArguments): Array { + const toWrite: Array = []; + + let strings = `*${args.length}${CRLF}`; + + for (let i = 0; i < args.length; i++) { + const arg = args[i]; + if (typeof arg === 'string') { + const byteLength = Buffer.byteLength(arg); + strings += `$${byteLength}${CRLF}`; + strings += arg; + } else if (arg instanceof Buffer) { + toWrite.push(`${strings}$${arg.length}${CRLF}`); + strings = ''; + toWrite.push(arg); + } else { + throw new TypeError('Invalid argument type'); + } + + strings += CRLF; + } + + toWrite.push(strings); + + return toWrite; +} diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index addc29e5afe..8cae914963e 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -1,11 +1,8 @@ import * as LinkedList from 'yallist'; -import { AbortError } from '../errors'; +import { AbortError, ErrorReply } from '../errors'; import { RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply } from '../commands'; - -// We need to use 'require', because it's not possible with Typescript to import -// classes that are exported as 'module.exports = class`, without esModuleInterop -// set to true. -const RedisParser = require('redis-parser'); +import RESP2Decoder from './RESP2/decoder'; +import encodeCommand from './RESP2/encoder'; export interface QueueCommandOptions { asap?: boolean; @@ -85,7 +82,6 @@ export default class RedisCommandsQueue { readonly #maxLength: number | null | undefined; readonly #waitingToBeSent = new LinkedList(); - readonly #waitingForReply = new LinkedList(); readonly #pubSubState = { @@ -104,45 +100,32 @@ export default class RedisCommandsQueue { pMessage: Buffer.from('pmessage'), subscribe: Buffer.from('subscribe'), pSubscribe: Buffer.from('psubscribe'), - unsubscribe: Buffer.from('unsunscribe'), + unsubscribe: Buffer.from('unsubscribe'), pUnsubscribe: Buffer.from('punsubscribe') }; - readonly #parser = new RedisParser({ - returnReply: (reply: unknown) => { - if (this.#pubSubState.isActive && Array.isArray(reply)) { - if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) { - return RedisCommandsQueue.#emitPubSubMessage( - this.#pubSubState.listeners.channels, - reply[2], - reply[1] - ); - } else if (RedisCommandsQueue.#PUB_SUB_MESSAGES.pMessage.equals(reply[0])) { - return RedisCommandsQueue.#emitPubSubMessage( - this.#pubSubState.listeners.patterns, - reply[3], - reply[2], - reply[1] - ); - } else if ( - RedisCommandsQueue.#PUB_SUB_MESSAGES.subscribe.equals(reply[0]) || - RedisCommandsQueue.#PUB_SUB_MESSAGES.pSubscribe.equals(reply[0]) || - RedisCommandsQueue.#PUB_SUB_MESSAGES.unsubscribe.equals(reply[0]) || - RedisCommandsQueue.#PUB_SUB_MESSAGES.pUnsubscribe.equals(reply[0]) - ) { - if (--this.#waitingForReply.head!.value.channelsCounter! === 0) { - this.#shiftWaitingForReply().resolve(); - } - return; - } - } + #chainInExecution: symbol | undefined; - this.#shiftWaitingForReply().resolve(reply); + #decoder = new RESP2Decoder({ + returnStringsAsBuffers: () => { + return !!this.#waitingForReply.head?.value.returnBuffers || + this.#pubSubState.isActive; }, - returnError: (err: Error) => this.#shiftWaitingForReply().reject(err) - }); + onReply: reply => { + if (this.#handlePubSubReply(reply)) { + return; + } else if (!this.#waitingForReply.length) { + throw new Error('Got an unexpected reply from Redis'); + } - #chainInExecution: symbol | undefined; + const { resolve, reject } = this.#waitingForReply.shift()!; + if (reply instanceof ErrorReply) { + reject(reply); + } else { + resolve(reply); + } + } + }); constructor(maxLength: number | null | undefined) { this.#maxLength = maxLength; @@ -257,9 +240,11 @@ export default class RedisCommandsQueue { listeners.delete(channel); } } + if (!channelsToUnsubscribe.length) { return Promise.resolve(); } + return this.#pushPubSubCommand(command, channelsToUnsubscribe); } @@ -342,42 +327,67 @@ export default class RedisCommandsQueue { getCommandToSend(): RedisCommandArguments | undefined { const toSend = this.#waitingToBeSent.shift(); - if (toSend) { - this.#waitingForReply.push({ - resolve: toSend.resolve, - reject: toSend.reject, - channelsCounter: toSend.channelsCounter, - returnBuffers: toSend.returnBuffers - }); + if (!toSend) return; + + let encoded: RedisCommandArguments; + try { + encoded = encodeCommand(toSend.args); + } catch (err) { + toSend.reject(err); + return; } - this.#chainInExecution = toSend?.chainId; - return toSend?.args; + + this.#waitingForReply.push({ + resolve: toSend.resolve, + reject: toSend.reject, + channelsCounter: toSend.channelsCounter, + returnBuffers: toSend.returnBuffers + }); + this.#chainInExecution = toSend.chainId; + return encoded; } - #setReturnBuffers() { - this.#parser.setReturnBuffers( - !!this.#waitingForReply.head?.value.returnBuffers || - !!this.#pubSubState.isActive - ); + rejectLastCommand(err: unknown): void { + this.#waitingForReply.pop()!.reject(err); } - parseResponse(data: Buffer): void { - this.#setReturnBuffers(); - this.#parser.execute(data); + onReplyChunk(chunk: Buffer): void { + this.#decoder.write(chunk); } - #shiftWaitingForReply(): CommandWaitingForReply { - if (!this.#waitingForReply.length) { - throw new Error('Got an unexpected reply from Redis'); + #handlePubSubReply(reply: any): boolean { + if (!this.#pubSubState.isActive || !Array.isArray(reply)) return false; + + if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) { + RedisCommandsQueue.#emitPubSubMessage( + this.#pubSubState.listeners.channels, + reply[2], + reply[1] + ); + } else if (RedisCommandsQueue.#PUB_SUB_MESSAGES.pMessage.equals(reply[0])) { + RedisCommandsQueue.#emitPubSubMessage( + this.#pubSubState.listeners.patterns, + reply[3], + reply[2], + reply[1] + ); + } else if ( + RedisCommandsQueue.#PUB_SUB_MESSAGES.subscribe.equals(reply[0]) || + RedisCommandsQueue.#PUB_SUB_MESSAGES.pSubscribe.equals(reply[0]) || + RedisCommandsQueue.#PUB_SUB_MESSAGES.unsubscribe.equals(reply[0]) || + RedisCommandsQueue.#PUB_SUB_MESSAGES.pUnsubscribe.equals(reply[0]) + ) { + if (--this.#waitingForReply.head!.value.channelsCounter! === 0) { + this.#waitingForReply.shift()!.resolve(); + } } - const waitingForReply = this.#waitingForReply.shift()!; - this.#setReturnBuffers(); - return waitingForReply; + return true; } flushWaitingForReply(err: Error): void { - this.#parser.reset(); + this.#decoder.reset(); + this.#pubSubState.isActive = false; RedisCommandsQueue.#flushQueue(this.#waitingForReply, err); if (!this.#chainInExecution) return; diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 09b974c910b..a6b924d42ac 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -348,7 +348,7 @@ describe('Client', () => { testUtils.testWithClient('undefined and null should not break the client', async client => { await assert.rejects( client.sendCommand([null as any, undefined as any]), - 'ERR unknown command ``, with args beginning with: ``' + TypeError ); assert.equal( diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index 25535e0728e..242c590cc80 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -9,7 +9,7 @@ import { CommandOptions, commandOptions, isCommandOptions } from '../command-opt import { ScanOptions, ZMember } from '../commands/generic-transformers'; import { ScanCommandOptions } from '../commands/SCAN'; import { HScanTuple } from '../commands/HSCAN'; -import { extendWithCommands, extendWithModulesAndScripts, transformCommandArguments, transformCommandReply } from '../commander'; +import { extendWithCommands, extendWithModulesAndScripts, transformCommandArguments, transformCommandReply, transformLegacyCommandArguments } from '../commander'; import { Pool, Options as PoolOptions, createPool } from 'generic-pool'; import { ClientClosedError, DisconnectsClientError } from '../errors'; import { URL } from 'url'; @@ -158,8 +158,8 @@ export default class RedisClient } readonly #options?: RedisClientOptions; - readonly #socket: RedisSocket; readonly #queue: RedisCommandsQueue; + readonly #socket: RedisSocket; readonly #isolationPool: Pool>; readonly #v4: Record = {}; #selectedDB = 0; @@ -183,8 +183,8 @@ export default class RedisClient constructor(options?: RedisClientOptions) { super(); this.#options = this.#initiateOptions(options); - this.#socket = this.#initiateSocket(); this.#queue = this.#initiateQueue(); + this.#socket = this.#initiateSocket(); this.#isolationPool = createPool({ create: async () => { const duplicate = this.duplicate({ @@ -215,6 +215,10 @@ export default class RedisClient return options; } + #initiateQueue(): RedisCommandsQueue { + return new RedisCommandsQueue(this.#options?.commandsQueueMaxLength); + } + #initiateSocket(): RedisSocket { const socketInitiator = async (): Promise => { const promises = []; @@ -270,7 +274,7 @@ export default class RedisClient }; return new RedisSocket(socketInitiator, this.#options?.socket) - .on('data', data => this.#queue.parseResponse(data)) + .on('data', chunk => this.#queue.onReplyChunk(chunk)) .on('error', err => { this.emit('error', err); if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { @@ -289,10 +293,6 @@ export default class RedisClient .on('end', () => this.emit('end')); } - #initiateQueue(): RedisCommandsQueue { - return new RedisCommandsQueue(this.#options?.commandsQueueMaxLength); - } - #legacyMode(): void { if (!this.#options?.legacyMode) return; @@ -303,7 +303,7 @@ export default class RedisClient callback = args.pop() as ClientLegacyCallback; } - this.#sendCommand(args.flat()) + this.#sendCommand(transformLegacyCommandArguments(args)) .then((reply: RedisCommandRawReply) => { if (!callback) return; diff --git a/packages/client/lib/client/multi-command.ts b/packages/client/lib/client/multi-command.ts index 7b28637d676..a9409075caa 100644 --- a/packages/client/lib/client/multi-command.ts +++ b/packages/client/lib/client/multi-command.ts @@ -1,7 +1,7 @@ import COMMANDS from './commands'; import { RedisCommand, RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands'; import RedisMultiCommand, { RedisMultiQueuedCommand } from '../multi-command'; -import { extendWithCommands, extendWithModulesAndScripts } from '../commander'; +import { extendWithCommands, extendWithModulesAndScripts, transformLegacyCommandArguments } from '../commander'; import { ExcludeMappedString } from '.'; type RedisClientMultiCommandSignature = @@ -54,7 +54,7 @@ export default class RedisClientMultiCommand { #legacyMode(): void { this.v4.addCommand = this.addCommand.bind(this); (this as any).addCommand = (...args: Array): this => { - this.#multi.addCommand(args.flat()); + this.#multi.addCommand(transformLegacyCommandArguments(args)); return this; }; this.v4.exec = this.exec.bind(this); diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index b04950a0724..224eb3fa886 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -1,7 +1,6 @@ import { EventEmitter } from 'events'; import * as net from 'net'; import * as tls from 'tls'; -import { encodeCommand } from '../commander'; import { RedisCommandArguments } from '../commands'; import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, ReconnectStrategyError } from '../errors'; import { promiseTimeout } from '../utils'; @@ -157,7 +156,7 @@ export default class RedisSocket extends EventEmitter { this.#writableNeedDrain = false; this.emit('drain'); }) - .on('data', (data: Buffer) => this.emit('data', data)); + .on('data', data => this.emit('data', data)); resolve(socket); }); @@ -192,7 +191,7 @@ export default class RedisSocket extends EventEmitter { throw new ClientClosedError(); } - for (const toWrite of encodeCommand(args)) { + for (const toWrite of args) { this.#writableNeedDrain = !this.#socket.write(toWrite); } } diff --git a/packages/client/lib/commander.spec.ts b/packages/client/lib/commander.spec.ts deleted file mode 100644 index f0690f37369..00000000000 --- a/packages/client/lib/commander.spec.ts +++ /dev/null @@ -1,36 +0,0 @@ -import { strict as assert } from 'assert'; -import { describe } from 'mocha'; -import { encodeCommand } from './commander'; - - -describe('Commander', () => { - describe('encodeCommand (see #1628)', () => { - it('1 byte', () => { - assert.deepEqual( - [...encodeCommand(['a', 'z'])], - ['*2\r\n$1\r\na\r\n$1\r\nz\r\n'] - ); - }); - - it('2 bytes', () => { - assert.deepEqual( - [...encodeCommand(['א', 'ת'])], - ['*2\r\n$2\r\nא\r\n$2\r\nת\r\n'] - ); - }); - - it('4 bytes', () => { - assert.deepEqual( - [...encodeCommand(['🐣', '🐤'])], - ['*2\r\n$4\r\n🐣\r\n$4\r\n🐤\r\n'] - ); - }); - - it('with a buffer', () => { - assert.deepEqual( - [...encodeCommand([Buffer.from('string')])], - ['*1\r\n$6\r\n', Buffer.from('string'), '\r\n'] - ); - }); - }); -}); diff --git a/packages/client/lib/commander.ts b/packages/client/lib/commander.ts index b3dfff0a99f..d70435e14ad 100644 --- a/packages/client/lib/commander.ts +++ b/packages/client/lib/commander.ts @@ -1,6 +1,6 @@ import { CommandOptions, isCommandOptions } from './command-options'; -import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisCommands, RedisModules, RedisScript, RedisScripts } from './commands'; +import { RedisCommand, RedisCommandArguments, RedisCommandRawReply, RedisCommandReply, RedisCommands, RedisModules, RedisScript, RedisScripts } from './commands'; type Instantiable = new(...args: Array) => T; @@ -89,37 +89,8 @@ export function transformCommandArguments( }; } -const DELIMITER = '\r\n'; - -export function* encodeCommand(args: RedisCommandArguments): IterableIterator { - let strings = `*${args.length}${DELIMITER}`, - stringsLength = 0; - for (const arg of args) { - if (Buffer.isBuffer(arg)) { - yield `${strings}$${arg.length}${DELIMITER}`; - strings = ''; - stringsLength = 0; - yield arg; - } else { - const string = arg?.toString?.() ?? '', - byteLength = Buffer.byteLength(string); - strings += `$${byteLength}${DELIMITER}`; - - const totalLength = stringsLength + byteLength; - if (totalLength > 1024) { - yield strings; - strings = string; - stringsLength = byteLength; - } else { - strings += string; - stringsLength = totalLength; - } - } - - strings += DELIMITER; - } - - yield strings; +export function transformLegacyCommandArguments(args: Array): Array { + return args.flat().map(x => x?.toString?.()); } export function transformCommandReply( diff --git a/packages/client/lib/errors.ts b/packages/client/lib/errors.ts index 01dff992290..3f3b9624987 100644 --- a/packages/client/lib/errors.ts +++ b/packages/client/lib/errors.ts @@ -50,3 +50,10 @@ export class ReconnectStrategyError extends Error { this.socketError = socketError; } } + +export class ErrorReply extends Error { + constructor(message: string) { + super(message); + this.stack = undefined; + } +} diff --git a/packages/client/lib/test-utils.ts b/packages/client/lib/test-utils.ts index 321a9da63d5..fbed7698896 100644 --- a/packages/client/lib/test-utils.ts +++ b/packages/client/lib/test-utils.ts @@ -44,6 +44,6 @@ export async function waitTillBeenCalled(spy: SinonSpy): Promise { throw new Error('Waiting for more than 1 second'); } - await promiseTimeout(1); + await promiseTimeout(50); } while (spy.callCount === calls); } diff --git a/packages/client/package.json b/packages/client/package.json index 7ce11bb6a6a..c56d8d91fd8 100644 --- a/packages/client/package.json +++ b/packages/client/package.json @@ -16,14 +16,12 @@ "dependencies": { "cluster-key-slot": "1.1.0", "generic-pool": "3.8.2", - "redis-parser": "3.0.0", "yallist": "4.0.0" }, "devDependencies": { "@istanbuljs/nyc-config-typescript": "^1.0.2", "@node-redis/test-utils": "*", "@types/node": "^17.0.23", - "@types/redis-parser": "^3.0.0", "@types/sinon": "^10.0.11", "@types/yallist": "^4.0.1", "@typescript-eslint/eslint-plugin": "^5.19.0", diff --git a/packages/client/tsconfig.json b/packages/client/tsconfig.json index 5e044cbaa1e..3271cf400a2 100644 --- a/packages/client/tsconfig.json +++ b/packages/client/tsconfig.json @@ -11,6 +11,9 @@ "./lib/test-utils.ts", "./lib/**/*.spec.ts" ], + "ts-node": { + "transpileOnly": true + }, "typedocOptions": { "entryPoints": [ "./index.ts",