Skip to content

Commit

Permalink
improve encode performance
Browse files Browse the repository at this point in the history
  • Loading branch information
leibale committed Feb 10, 2022
1 parent f8dacb7 commit 1331698
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 42 deletions.
6 changes: 3 additions & 3 deletions packages/client/lib/client/RESP2/encoder.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import encodeCommand from './encoder';
describe('RESP2 Encoder', () => {
it('1 byte', () => {
assert.deepEqual(
[...encodeCommand(['a', 'z'])],
encodeCommand(['a', 'z']),
['*2\r\n$1\r\na\r\n$1\r\nz\r\n']
);
});

it('2 bytes', () => {
assert.deepEqual(
[...encodeCommand(['א', 'ת'])],
encodeCommand(['א', 'ת']),
['*2\r\n$2\r\nא\r\n$2\r\nת\r\n']
);
});
Expand All @@ -26,7 +26,7 @@ describe('RESP2 Encoder', () => {

it('buffer', () => {
assert.deepEqual(
[...encodeCommand([Buffer.from('string')])],
encodeCommand([Buffer.from('string')]),
['*1\r\n$6\r\n', Buffer.from('string'), '\r\n']
);
});
Expand Down
37 changes: 16 additions & 21 deletions packages/client/lib/client/RESP2/encoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,29 @@ import { RedisCommandArgument, RedisCommandArguments } from '../../commands';

const CRLF = '\r\n';

export default function* encodeCommand(args: RedisCommandArguments): IterableIterator<RedisCommandArgument> {
let strings = `*${args.length}${CRLF}`,
stringsLength = 0;
export default function encodeCommand(args: RedisCommandArguments): Array<RedisCommandArgument> {
const toWrite: Array<RedisCommandArgument> = [];

let strings = `*${args.length}${CRLF}`;

for (let i = 0; i < args.length; i++) {
const arg = args[i];
if (Buffer.isBuffer(arg)) {
yield `${strings}$${arg.length}${CRLF}`;
if (typeof arg === 'string') {
const byteLength = Buffer.byteLength(arg);
strings += `$${byteLength}${CRLF}`;
strings += arg;
} else if (arg instanceof Buffer) {
toWrite.push(`${strings}$${arg.length}${CRLF}`);
strings = '';
stringsLength = 0;
yield arg;
toWrite.push(arg);
} else {
const string = arg?.toString?.() ?? '',
byteLength = Buffer.byteLength(string);
strings += `$${byteLength}${CRLF}`;

const totalLength = stringsLength + byteLength;
if (totalLength > 1024) {
yield strings;
strings = string;
stringsLength = byteLength;
} else {
strings += string;
stringsLength = totalLength;
}
throw new TypeError('Invalid argument type');
}

strings += CRLF;
}

yield strings;
toWrite.push(strings);

return toWrite;
}
34 changes: 23 additions & 11 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import * as LinkedList from 'yallist';
import { AbortError, ErrorReply } from '../errors';
import { RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply } from '../commands';
import { RedisCommand, RedisCommandArgument, RedisCommandArguments, RedisCommandRawReply } from '../commands';
import RESP2Decoder from './RESP2/decoder';
import encodeCommand from './RESP2/encoder';

export interface QueueCommandOptions {
asap?: boolean;
Expand Down Expand Up @@ -337,17 +338,28 @@ export default class RedisCommandsQueue {

getCommandToSend(): RedisCommandArguments | undefined {
const toSend = this.#waitingToBeSent.shift();
if (toSend) {
this.#waitingForReply.push({
args: toSend.args,
resolve: toSend.resolve,
reject: toSend.reject,
channelsCounter: toSend.channelsCounter,
returnBuffers: toSend.returnBuffers
} as any);
if (!toSend) return;

let encoded: RedisCommandArguments;
try {
encoded = encodeCommand(toSend.args);
} catch (err) {
toSend.reject(err);
return;
}
this.#chainInExecution = toSend?.chainId;
return toSend?.args;

this.#waitingForReply.push({
resolve: toSend.resolve,
reject: toSend.reject,
channelsCounter: toSend.channelsCounter,
returnBuffers: toSend.returnBuffers
});
this.#chainInExecution = toSend.chainId;
return encoded;
}

rejectLastCommand(err: unknown): void {
this.#waitingForReply.pop()!.reject(err);
}

onReplyChunk(chunk: Buffer): void {
Expand Down
2 changes: 1 addition & 1 deletion packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ describe('Client', () => {
testUtils.testWithClient('undefined and null should not break the client', async client => {
await assert.rejects(
client.sendCommand([null as any, undefined as any]),
'ERR unknown command ``, with args beginning with: ``'
TypeError
);

assert.equal(
Expand Down
4 changes: 2 additions & 2 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { CommandOptions, commandOptions, isCommandOptions } from '../command-opt
import { ScanOptions, ZMember } from '../commands/generic-transformers';
import { ScanCommandOptions } from '../commands/SCAN';
import { HScanTuple } from '../commands/HSCAN';
import { extendWithCommands, extendWithModulesAndScripts, transformCommandArguments, transformCommandReply } from '../commander';
import { extendWithCommands, extendWithModulesAndScripts, transformCommandArguments, transformCommandReply, transformLegacyCommandArguments } from '../commander';
import { Pool, Options as PoolOptions, createPool } from 'generic-pool';
import { ClientClosedError, DisconnectsClientError, AuthError } from '../errors';
import { URL } from 'url';
Expand Down Expand Up @@ -304,7 +304,7 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
callback = args.pop() as ClientLegacyCallback;
}

this.#sendCommand(args.flat())
this.#sendCommand(transformLegacyCommandArguments(args))
.then((reply: RedisCommandRawReply) => {
if (!callback) return;

Expand Down
4 changes: 2 additions & 2 deletions packages/client/lib/client/multi-command.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import COMMANDS from './commands';
import { RedisCommand, RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisPlugins, RedisScript, RedisScripts } from '../commands';
import RedisMultiCommand, { RedisMultiQueuedCommand } from '../multi-command';
import { extendWithCommands, extendWithModulesAndScripts } from '../commander';
import { extendWithCommands, extendWithModulesAndScripts, transformLegacyCommandArguments } from '../commander';
import { ExcludeMappedString } from '.';

type RedisClientMultiCommandSignature<C extends RedisCommand, M extends RedisModules, S extends RedisScripts> =
Expand Down Expand Up @@ -54,7 +54,7 @@ export default class RedisClientMultiCommand {
#legacyMode(): void {
this.v4.addCommand = this.addCommand.bind(this);
(this as any).addCommand = (...args: Array<any>): this => {
this.#multi.addCommand(args.flat());
this.#multi.addCommand(transformLegacyCommandArguments(args));
return this;
};
this.v4.exec = this.exec.bind(this);
Expand Down
4 changes: 2 additions & 2 deletions packages/client/lib/client/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as tls from 'tls';
import { RedisCommandArguments } from '../commands';
import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyError, AuthError, ReconnectStrategyError } from '../errors';
import { promiseTimeout } from '../utils';
import encodeCommand from './RESP2/encoder';

export interface RedisSocketCommonOptions {
connectTimeout?: number;
noDelay?: boolean;
Expand Down Expand Up @@ -222,7 +222,7 @@ export default class RedisSocket extends EventEmitter {
throw new ClientClosedError();
}

for (const toWrite of encodeCommand(args)) {
for (const toWrite of args) {
this.#writableNeedDrain = !this.#socket.write(toWrite);
}
}
Expand Down
4 changes: 4 additions & 0 deletions packages/client/lib/commander.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ export function transformCommandArguments<T>(
};
}

export function transformLegacyCommandArguments(args: Array<any>): Array<any> {
return args.flat().map(x => x?.toString?.());
}

export function transformCommandReply(
command: RedisCommand,
rawReply: RedisCommandRawReply,
Expand Down

0 comments on commit 1331698

Please sign in to comment.