Skip to content

Commit b80afc6

Browse files
committed
fix client.quit, add error events on cluster, fix some "deepsource.io" warnings
1 parent e421dc4 commit b80afc6

File tree

9 files changed

+86
-85
lines changed

9 files changed

+86
-85
lines changed

lib/client.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -298,9 +298,10 @@ export default class RedisClient<M extends RedisModules = RedisModules, S extend
298298
}
299299

300300
QUIT(): Promise<void> {
301-
return this.#socket.quit(async () => {
302-
this.#queue.addEncodedCommand(encodeCommand(['QUIT']));
301+
return this.#socket.quit(() => {
302+
const promise = this.#queue.addEncodedCommand(encodeCommand(['QUIT']));
303303
this.#tick();
304+
return promise;
304305
});
305306
}
306307

lib/cluster-slots.ts

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,54 +17,40 @@ interface SlotNodes<M extends RedisModules, S extends RedisLuaScripts> {
1717
clientIterator: IterableIterator<RedisClientType<M, S>> | undefined;
1818
}
1919

20+
type OnError = (err: unknown) => void;
21+
2022
export default class RedisClusterSlots<M extends RedisModules, S extends RedisLuaScripts> {
2123
readonly #options: RedisClusterOptions;
24+
readonly #onError: OnError;
2225
readonly #nodeByUrl = new Map<string, ClusterNode<M, S>>();
2326
readonly #slots: Array<SlotNodes<M, S>> = [];
2427

25-
constructor(options: RedisClusterOptions) {
28+
constructor(options: RedisClusterOptions, onError: OnError) {
2629
this.#options = options;
30+
this.#onError = onError;
2731
}
2832

2933
async connect(): Promise<void> {
3034
for (const rootNode of this.#options.rootNodes) {
31-
try {
32-
await this.#discoverNodes(rootNode);
33-
return;
34-
} catch (err) {
35-
console.error(err);
36-
// this.emit('error', err);
37-
}
35+
if (await this.#discoverNodes(rootNode)) return;
3836
}
3937

4038
throw new Error('None of the root nodes is available');
4139
}
4240

4341
async discover(startWith: RedisClientType<M, S>): Promise<void> {
44-
try {
45-
await this.#discoverNodes(startWith.options?.socket);
46-
return;
47-
} catch (err) {
48-
console.error(err);
49-
// this.emit('error', err);
50-
}
42+
if (await this.#discoverNodes(startWith.options?.socket)) return;
5143

5244
for (const { client } of this.#nodeByUrl.values()) {
5345
if (client === startWith) continue;
54-
55-
try {
56-
await this.#discoverNodes(client.options?.socket);
57-
return;
58-
} catch (err) {
59-
console.error(err);
60-
// this.emit('error', err);
61-
}
46+
47+
if (await this.#discoverNodes(client.options?.socket)) return;
6248
}
6349

6450
throw new Error('None of the cluster nodes is available');
6551
}
6652

67-
async #discoverNodes(socketOptions?: RedisSocketOptions): Promise<void> {
53+
async #discoverNodes(socketOptions?: RedisSocketOptions): Promise<boolean> {
6854
const client = RedisClient.create({
6955
socket: socketOptions
7056
});
@@ -73,8 +59,14 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisLu
7359

7460
try {
7561
await this.#reset(await client.clusterNodes());
62+
return true;
63+
} catch (err) {
64+
this.#onError(err);
65+
return false;
7666
} finally {
77-
await client.disconnect(); // TODO: catch error from disconnect?
67+
if (client.isOpen) {
68+
await client.disconnect();
69+
}
7870
}
7971
}
8072

@@ -102,7 +94,6 @@ export default class RedisClusterSlots<M extends RedisModules, S extends RedisLu
10294
for (const [url, { client }] of this.#nodeByUrl.entries()) {
10395
if (clientsInUse.has(url)) continue;
10496

105-
// TODO: ignore error from `.disconnect`?
10697
promises.push(client.disconnect());
10798
this.#nodeByUrl.delete(url);
10899
}

lib/cluster.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import RedisClusterSlots, { ClusterNode } from './cluster-slots';
55
import { RedisLuaScript, RedisLuaScripts } from './lua-script';
66
import { extendWithModulesAndScripts, extendWithDefaultCommands, transformCommandArguments } from './commander';
77
import RedisMultiCommand, { MultiQueuedCommand, RedisMultiCommandType } from './multi-command';
8+
import { EventEmitter } from 'events';
89

910
export interface RedisClusterOptions<M = RedisModules, S = RedisLuaScripts> {
1011
rootNodes: Array<RedisSocketOptions>;
@@ -17,7 +18,7 @@ export interface RedisClusterOptions<M = RedisModules, S = RedisLuaScripts> {
1718
export type RedisClusterType<M extends RedisModules, S extends RedisLuaScripts> =
1819
WithPlugins<M, S> & RedisCluster;
1920

20-
export default class RedisCluster<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> {
21+
export default class RedisCluster<M extends RedisModules = RedisModules, S extends RedisLuaScripts = RedisLuaScripts> extends EventEmitter {
2122
static #extractFirstKey(command: RedisCommand, originalArgs: Array<unknown>, redisArgs: Array<string>): string | undefined {
2223
if (command.FIRST_KEY_INDEX === undefined) {
2324
return undefined;
@@ -83,8 +84,10 @@ export default class RedisCluster<M extends RedisModules = RedisModules, S exten
8384
readonly #Multi: new (...args: ConstructorParameters<typeof RedisMultiCommand>) => RedisMultiCommandType<M, S>;
8485

8586
constructor(options: RedisClusterOptions<M, S>) {
87+
super();
88+
8689
this.#options = options;
87-
this.#slots = new RedisClusterSlots(options);
90+
this.#slots = new RedisClusterSlots(options, err => this.emit('error', err));
8891
this.#Multi = RedisMultiCommand.extend(options);
8992
}
9093

lib/commander.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,12 @@ export function transformCommandArguments<T = unknown>(
9797
export function encodeCommand(args: Array<string>): string {
9898
const encoded = [
9999
`*${args.length}`,
100-
`$${Buffer.byteLength(args[0])}`,
100+
`$${Buffer.byteLength(args[0]).toString()}`,
101101
args[0]
102102
];
103103

104104
for (let i = 1; i < args.length; i++) {
105-
encoded.push(`$${Buffer.byteLength(args[i])}`, args[i]);
105+
encoded.push(`$${Buffer.byteLength(args[i]).toString()}`, args[i]);
106106
}
107107

108108
return encoded.join('\r\n') + '\r\n';

lib/commands-queue.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export interface QueueCommandOptions {
1212

1313
interface CommandWaitingToBeSent extends CommandWaitingForReply {
1414
encodedCommand: string;
15+
byteLength: number;
1516
chainId?: symbol;
1617
abort?: {
1718
signal: any; // TODO: `AbortSignal` type is incorrect
@@ -130,6 +131,7 @@ export default class RedisCommandsQueue {
130131
return new Promise((resolve, reject) => {
131132
const node = new LinkedList.Node<CommandWaitingToBeSent>({
132133
encodedCommand,
134+
byteLength: Buffer.byteLength(encodedCommand),
133135
chainId: options?.chainId,
134136
resolve,
135137
reject
@@ -156,7 +158,7 @@ export default class RedisCommandsQueue {
156158
this.#waitingToBeSent.pushNode(node);
157159
}
158160

159-
this.#waitingToBeSentCommandsLength += encodedCommand.length;
161+
this.#waitingToBeSentCommandsLength += node.value.byteLength;
160162
});
161163
}
162164

@@ -230,8 +232,12 @@ export default class RedisCommandsQueue {
230232
}
231233

232234
this.#pubSubState[inProgressKey] += channelsCounter;
235+
236+
const encodedCommand = encodeCommand(commandArgs),
237+
byteLength = Buffer.byteLength(encodedCommand);
233238
this.#waitingToBeSent.push({
234-
encodedCommand: encodeCommand(commandArgs),
239+
encodedCommand,
240+
byteLength,
235241
channelsCounter,
236242
resolve: () => {
237243
this.#pubSubState[inProgressKey] -= channelsCounter;
@@ -243,6 +249,7 @@ export default class RedisCommandsQueue {
243249
reject();
244250
}
245251
});
252+
this.#waitingToBeSentCommandsLength += byteLength;
246253
});
247254
}
248255

@@ -268,7 +275,7 @@ export default class RedisCommandsQueue {
268275
lastCommandChainId: symbol | undefined;
269276
for (const command of this.#waitingToBeSent) {
270277
encoded.push(command.encodedCommand);
271-
size += command.encodedCommand.length;
278+
size += command.byteLength;
272279
if (size > recommendedSize) {
273280
lastCommandChainId = command.chainId;
274281
break;

lib/commands/MIGRATE.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ export function transformArguments(
1919
isKeyString = typeof key === 'string';
2020

2121
if (isKeyString) {
22-
args.push(key as string);
22+
args.push(key);
2323
} else {
2424
args.push('""');
2525
}

lib/commands/generic-transformers.spec.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { strict as assert } from 'assert';
2-
import { isObject } from 'util';
32
import {
43
transformReplyBoolean,
54
transformReplyBooleanArray,

0 commit comments

Comments
 (0)