Skip to content

Commit

Permalink
fix: connection hangs with failed connection fixes #656
Browse files Browse the repository at this point in the history
  • Loading branch information
10xjs authored and manast committed Jul 24, 2021
1 parent a4984ee commit c465611
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 35 deletions.
59 changes: 36 additions & 23 deletions src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ import { load } from '../commands';
import { ConnectionOptions, RedisOptions } from '../interfaces';
import { isRedisInstance } from '../utils';

/**
* See https://github.com/luin/ioredis/blob/d65f8b2/lib/utils/index.ts#L338
*/
const CONNECTION_CLOSED_ERROR_MSG = 'Connection is closed.';

export type RedisClient = Redis | Cluster;

export class RedisConnection extends EventEmitter {
Expand Down Expand Up @@ -48,29 +53,31 @@ export class RedisConnection extends EventEmitter {
* @param {Redis} redis client
*/
static async waitUntilReady(client: RedisClient) {
return new Promise<void>(function(resolve, reject) {
if (client.status === 'ready') {
if (client.status === 'ready') {
return;
}

if (client.status === 'wait') {
return client.connect();
}

if (client.status === 'end') {
throw new Error(CONNECTION_CLOSED_ERROR_MSG);
}

return new Promise<void>((resolve, reject) => {
const handleReady = () => {
client.removeListener('end', endHandler);
resolve();
} else {
const handleError = function(err: NodeJS.ErrnoException) {
if (err['code'] !== 'ECONNREFUSED') {
client.removeListener('ready', handleReady);
reject(err);
}
};

const handleReady = async function() {
client.removeListener('error', handleError);
resolve();
};

client.once('ready', handleReady);
client.once('error', handleError);

if (client.status === 'wait') {
client.connect();
}
}
};

const endHandler = () => {
client.removeListener('ready', handleReady);
reject(new Error(CONNECTION_CLOSED_ERROR_MSG));
};

client.once('ready', handleReady);
client.once('end', endHandler);
});
}

Expand Down Expand Up @@ -130,7 +137,13 @@ export class RedisConnection extends EventEmitter {
if (!this.closing) {
this.closing = true;
if (this.opts != this._client) {
await this._client.quit();
try {
await this._client.quit();
} catch (error) {
if (error.message !== CONNECTION_CLOSED_ERROR_MSG) {
throw error;
}
}
} else {
this._client.off('error', this.handleClientError);
}
Expand Down
49 changes: 37 additions & 12 deletions src/test/test_connection.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import * as IORedis from 'ioredis';
import { Queue, Job, Worker } from '../classes';
import { v4 } from 'uuid';
import { expect } from 'chai';
import * as chai from 'chai';
import * as chaiAsPromised from 'chai-as-promised';
import { removeAllQueueData } from '../utils';

chai.use(chaiAsPromised);
const expect = chai.expect;

describe('connection', () => {
let queue: Queue;
let queueName: string;
Expand Down Expand Up @@ -148,20 +152,41 @@ describe('connection', () => {
});
*/

it.skip('should fail if redis connection fails', async () => {
it('should fail if redis connection fails', async () => {
const queueFail = new Queue('connection fail port', {
connection: { port: 1234, host: '127.0.0.1' },
connection: { port: 1234, host: '127.0.0.1', retryStrategy: () => null },
});

return new Promise(async (resolve, reject) => {
try {
await queueFail.waitUntilReady();
reject(new Error('Did not fail connecting to invalid redis instance'));
} catch (err) {
expect(err.code).to.be.eql('ECONNREFUSED');
await queueFail.close();
resolve();
}
await expect(queueFail.waitUntilReady()).to.be.eventually.rejectedWith(
'Connection is closed.',
);
});

it('should close if connection has failed', async () => {
const queueFail = new Queue('connection fail port', {
connection: { port: 1234, host: '127.0.0.1', retryStrategy: () => null },
});

await expect(queueFail.waitUntilReady()).to.be.eventually.rejectedWith(
'Connection is closed.',
);

await expect(queueFail.close()).to.be.eventually.equal(undefined);
});

it('should close if connection is failing', async () => {
const queueFail = new Queue('connection fail port', {
connection: {
port: 1234,
host: '127.0.0.1',
retryStrategy: times => (times === 0 ? 10 : null),
},
});

await expect(queueFail.close()).to.be.eventually.equal(undefined);

await expect(queueFail.waitUntilReady()).to.be.eventually.rejectedWith(
'Connection is closed.',
);
});
});

0 comments on commit c465611

Please sign in to comment.