Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix socket error handlers #2092

Merged
merged 4 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 38 additions & 50 deletions packages/client/lib/client/commands-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,6 @@ interface PubSubListeners {

type PubSubListenersMap = Map<string, PubSubListeners>;

interface PubSubState {
subscribing: number;
subscribed: number;
unsubscribing: number;
listeners: {
channels: PubSubListenersMap;
patterns: PubSubListenersMap;
};
}

export default class RedisCommandsQueue {
static #flushQueue<T extends CommandWaitingForReply>(queue: LinkedList<T>, err: Error): void {
while (queue.length) {
Expand Down Expand Up @@ -98,7 +88,16 @@ export default class RedisCommandsQueue {

readonly #waitingForReply = new LinkedList<CommandWaitingForReply>();

#pubSubState: PubSubState | undefined;
readonly #pubSubState = {
isActive: false,
subscribing: 0,
subscribed: 0,
unsubscribing: 0,
listeners: {
channels: new Map(),
patterns: new Map()
}
};

static readonly #PUB_SUB_MESSAGES = {
message: Buffer.from('message'),
Expand All @@ -111,7 +110,7 @@ export default class RedisCommandsQueue {

readonly #parser = new RedisParser({
returnReply: (reply: unknown) => {
if (this.#pubSubState && Array.isArray(reply)) {
if (this.#pubSubState.isActive && Array.isArray(reply)) {
if (RedisCommandsQueue.#PUB_SUB_MESSAGES.message.equals(reply[0])) {
return RedisCommandsQueue.#emitPubSubMessage(
this.#pubSubState.listeners.channels,
Expand Down Expand Up @@ -150,7 +149,7 @@ export default class RedisCommandsQueue {
}

addCommand<T = RedisCommandRawReply>(args: RedisCommandArguments, options?: QueueCommandOptions): Promise<T> {
if (this.#pubSubState && !options?.ignorePubSubMode) {
if (this.#pubSubState.isActive && !options?.ignorePubSubMode) {
return Promise.reject(new Error('Cannot send commands in PubSub mode'));
} else if (this.#maxLength && this.#waitingToBeSent.length + this.#waitingForReply.length >= this.#maxLength) {
return Promise.reject(new Error('The queue is full'));
Expand Down Expand Up @@ -190,27 +189,16 @@ export default class RedisCommandsQueue {
});
}

#initiatePubSubState(): PubSubState {
return this.#pubSubState ??= {
subscribed: 0,
subscribing: 0,
unsubscribing: 0,
listeners: {
channels: new Map(),
patterns: new Map()
}
};
}

subscribe<T extends boolean>(
command: PubSubSubscribeCommands,
channels: RedisCommandArgument | Array<RedisCommandArgument>,
listener: PubSubListener<T>,
returnBuffers?: T
): Promise<void> {
const pubSubState = this.#initiatePubSubState(),
channelsToSubscribe: Array<RedisCommandArgument> = [],
listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ? pubSubState.listeners.channels : pubSubState.listeners.patterns;
const channelsToSubscribe: Array<RedisCommandArgument> = [],
listenersMap = command === PubSubSubscribeCommands.SUBSCRIBE ?
this.#pubSubState.listeners.channels :
this.#pubSubState.listeners.patterns;
for (const channel of (Array.isArray(channels) ? channels : [channels])) {
const channelString = typeof channel === 'string' ? channel : channel.toString();
let listeners = listenersMap.get(channelString);
Expand All @@ -230,6 +218,7 @@ export default class RedisCommandsQueue {
if (!channelsToSubscribe.length) {
return Promise.resolve();
}

return this.#pushPubSubCommand(command, channelsToSubscribe);
}

Expand All @@ -239,10 +228,6 @@ export default class RedisCommandsQueue {
listener?: PubSubListener<T>,
returnBuffers?: T
): Promise<void> {
if (!this.#pubSubState) {
return Promise.resolve();
}

const listeners = command === PubSubUnsubscribeCommands.UNSUBSCRIBE ?
this.#pubSubState.listeners.channels :
this.#pubSubState.listeners.patterns;
Expand Down Expand Up @@ -280,8 +265,7 @@ export default class RedisCommandsQueue {

#pushPubSubCommand(command: PubSubSubscribeCommands | PubSubUnsubscribeCommands, channels: number | Array<RedisCommandArgument>): Promise<void> {
return new Promise((resolve, reject) => {
const pubSubState = this.#initiatePubSubState(),
isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE,
const isSubscribe = command === PubSubSubscribeCommands.SUBSCRIBE || command === PubSubSubscribeCommands.PSUBSCRIBE,
inProgressKey = isSubscribe ? 'subscribing' : 'unsubscribing',
commandArgs: Array<RedisCommandArgument> = [command];

Expand All @@ -293,38 +277,42 @@ export default class RedisCommandsQueue {
channelsCounter = channels.length;
}

pubSubState[inProgressKey] += channelsCounter;
this.#pubSubState.isActive = true;
this.#pubSubState[inProgressKey] += channelsCounter;

this.#waitingToBeSent.push({
args: commandArgs,
channelsCounter,
returnBuffers: true,
resolve: () => {
pubSubState[inProgressKey] -= channelsCounter;
if (isSubscribe) {
pubSubState.subscribed += channelsCounter;
} else {
pubSubState.subscribed -= channelsCounter;
if (!pubSubState.subscribed && !pubSubState.subscribing && !pubSubState.subscribed) {
this.#pubSubState = undefined;
}
}
this.#pubSubState[inProgressKey] -= channelsCounter;
this.#pubSubState.subscribed += channelsCounter * (isSubscribe ? 1 : -1);
this.#updatePubSubActiveState();
resolve();
},
reject: err => {
pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1);
this.#pubSubState[inProgressKey] -= channelsCounter * (isSubscribe ? 1 : -1);
this.#updatePubSubActiveState();
reject(err);
}
});
});
}

resubscribe(): Promise<any> | undefined {
if (!this.#pubSubState) {
return;
#updatePubSubActiveState(): void {
if (
!this.#pubSubState.subscribed &&
!this.#pubSubState.subscribing &&
!this.#pubSubState.subscribed
) {
this.#pubSubState.isActive = false;
}
}

resubscribe(): Promise<any> | undefined {
this.#pubSubState.subscribed = 0;
this.#pubSubState.subscribing = 0;
this.#pubSubState.unsubscribing = 0;

const promises = [],
{ channels, patterns } = this.#pubSubState.listeners;
Expand Down Expand Up @@ -369,8 +357,7 @@ export default class RedisCommandsQueue {
#setReturnBuffers() {
this.#parser.setReturnBuffers(
!!this.#waitingForReply.head?.value.returnBuffers ||
!!this.#pubSubState?.subscribed ||
!!this.#pubSubState?.subscribing
!!this.#pubSubState.isActive
);
}

Expand All @@ -390,6 +377,7 @@ export default class RedisCommandsQueue {
}

flushWaitingForReply(err: Error): void {
this.#parser.reset();
RedisCommandsQueue.#flushQueue(this.#waitingForReply, err);

if (!this.#chainInExecution) return;
Expand Down
33 changes: 3 additions & 30 deletions packages/client/lib/client/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import testUtils, { GLOBAL, waitTillBeenCalled } from '../test-utils';
import RedisClient, { RedisClientType } from '.';
import { RedisClientMultiCommandType } from './multi-command';
import { RedisCommandArguments, RedisCommandRawReply, RedisModules, RedisScripts } from '../commands';
import { AbortError, AuthError, ClientClosedError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors';
import { AbortError, ClientClosedError, ConnectionTimeoutError, DisconnectsClientError, SocketClosedUnexpectedlyError, WatchError } from '../errors';
import { defineScript } from '../lua-script';
import { spy } from 'sinon';
import { once } from 'events';
Expand Down Expand Up @@ -87,30 +87,6 @@ describe('Client', () => {
);
}, GLOBAL.SERVERS.PASSWORD);

testUtils.testWithClient('should not retry connecting if failed due to wrong auth', async client => {
let message;
if (testUtils.isVersionGreaterThan([6, 2])) {
message = 'WRONGPASS invalid username-password pair or user is disabled.';
} else if (testUtils.isVersionGreaterThan([6])) {
message = 'WRONGPASS invalid username-password pair';
} else {
message = 'ERR invalid password';
}

await assert.rejects(
client.connect(),
new AuthError(message)
);

assert.equal(client.isOpen, false);
}, {
...GLOBAL.SERVERS.PASSWORD,
clientOptions: {
password: 'wrongpassword'
},
disableClientSetup: true
});

testUtils.testWithClient('should execute AUTH before SELECT', async client => {
assert.equal(
(await client.clientInfo()).db,
Expand Down Expand Up @@ -300,7 +276,8 @@ describe('Client', () => {
await client.multi()
.sAdd('a', ['b', 'c'])
.v4.exec(),
[2])
[2]
);
}, {
...GLOBAL.SERVERS.OPEN,
clientOptions: {
Expand Down Expand Up @@ -681,10 +658,6 @@ describe('Client', () => {
const listener = spy();
await subscriber.subscribe('channel', listener);

subscriber.on('error', err => {
console.error('subscriber err', err.message);
});

await Promise.all([
once(subscriber, 'error'),
publisher.clientKill({
Expand Down
6 changes: 2 additions & 4 deletions packages/client/lib/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { ScanCommandOptions } from '../commands/SCAN';
import { HScanTuple } from '../commands/HSCAN';
import { extendWithCommands, extendWithModulesAndScripts, transformCommandArguments, transformCommandReply } from '../commander';
import { Pool, Options as PoolOptions, createPool } from 'generic-pool';
import { ClientClosedError, DisconnectsClientError, AuthError } from '../errors';
import { ClientClosedError, DisconnectsClientError } from '../errors';
import { URL } from 'url';
import { TcpSocketConnectOpts } from 'net';

Expand Down Expand Up @@ -254,9 +254,7 @@ export default class RedisClient<M extends RedisModules, S extends RedisScripts>
password: this.#options.password ?? ''
}),
{ asap: true }
).catch(err => {
throw new AuthError(err.message);
})
)
);
}

Expand Down
11 changes: 7 additions & 4 deletions packages/client/lib/client/socket.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ describe('Socket', () => {
return time;
});

const socket = new RedisSocket(undefined, {
host: 'error',
reconnectStrategy
});
const socket = new RedisSocket(
() => Promise.resolve(),
{
host: 'error',
reconnectStrategy
}
);

socket.on('error', () => {
// ignore errors
Expand Down
Loading