Skip to content

Commit

Permalink
fix(connection): better handling of attached listeners
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Nov 8, 2023
1 parent 4828b7e commit 02474ad
Showing 1 changed file with 41 additions and 19 deletions.
60 changes: 41 additions & 19 deletions src/classes/redis-connection.ts
Expand Up @@ -5,6 +5,8 @@ import { default as IORedis } from 'ioredis';
import { CONNECTION_CLOSED_ERROR_MSG } from 'ioredis/built/utils';
import { ConnectionOptions, RedisOptions, RedisClient } from '../interfaces';
import {
decreaseMaxListeners,
increaseMaxListeners,
isNotConnectionError,
isRedisCluster,
isRedisInstance,
Expand Down Expand Up @@ -136,28 +138,38 @@ export class RedisConnection extends EventEmitter {
throw new Error(CONNECTION_CLOSED_ERROR_MSG);
}

return new Promise<void>((resolve, reject) => {
let lastError: Error;
const errorHandler = (err: Error) => {
lastError = err;
};
let handleReady: () => void;
let handleEnd: () => void;
let handleError: (e: Error) => void;
try {
await new Promise<void>((resolve, reject) => {
let lastError: Error;

const handleReady = () => {
client.removeListener('end', endHandler);
client.removeListener('error', errorHandler);
resolve();
};
handleError = (err: Error) => {
lastError = err;
};

const endHandler = () => {
client.removeListener('ready', handleReady);
client.removeListener('error', errorHandler);
reject(lastError || new Error(CONNECTION_CLOSED_ERROR_MSG));
};
handleReady = () => {
resolve();
};

handleEnd = () => {
reject(lastError || new Error(CONNECTION_CLOSED_ERROR_MSG));
};

increaseMaxListeners(client, 3);

client.once('ready', handleReady);
client.on('end', handleEnd);
client.once('error', handleError);
});
} finally {
client.removeListener('end', handleEnd);
client.removeListener('error', handleError);
client.removeListener('ready', handleReady);

client.once('ready', handleReady);
client.on('end', endHandler);
client.once('error', errorHandler);
});
decreaseMaxListeners(client, 3);
}
}

get client(): Promise<RedisClient> {
Expand All @@ -183,6 +195,8 @@ export class RedisConnection extends EventEmitter {
this._client = new IORedis(this.opts);
}

increaseMaxListeners(this._client, 3);

this._client.on('error', this.handleClientError);
// ioredis treats connection errors as a different event ('close')
this._client.on('close', this.handleClientClose);
Expand Down Expand Up @@ -232,6 +246,8 @@ export class RedisConnection extends EventEmitter {
}

const disconnecting = new Promise<void>((resolve, reject) => {
increaseMaxListeners(client, 2);

client.once('end', resolve);
client.once('error', reject);
_resolve = resolve;
Expand All @@ -243,6 +259,8 @@ export class RedisConnection extends EventEmitter {
try {
await disconnecting;
} finally {
decreaseMaxListeners(client, 2);

client.removeListener('end', _resolve);
client.removeListener('error', _reject);
}
Expand Down Expand Up @@ -270,6 +288,10 @@ export class RedisConnection extends EventEmitter {
this._client.off('error', this.handleClientError);
this._client.off('close', this.handleClientClose);
this._client.off('ready', this.handleClientReady);

decreaseMaxListeners(this._client, 3);

this.removeAllListeners();
}
}
}
Expand Down

0 comments on commit 02474ad

Please sign in to comment.