[Bug]: Worker reconnects its Redis connection after an explicit .close() call #2552

Closed
ScreaminSauce opened this issue May 1, 2024 · 3 comments
Labels
bug Something isn't working

Comments

ScreaminSauce commented May 1, 2024

Version

v5.7.7

Platform

NodeJS

What happened?

With the current release (v5.7.7), the Worker's reconnection logic appears to keep its ioredis client open, and even reconnect it, after .close() has been called on both the Queue and the Worker. As a result, the test process hangs instead of exiting.

v5.7.6 does not hang after the tests complete, which leads me to believe the cause may be the ioredis client changes included in PR #2543.
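To see what is still keeping a test process alive after cleanup in a case like this, Node.js (v17.3.0 and later) provides `process.getActiveResourcesInfo()`, which returns the type names of the active resources holding the event loop open; a pending timer is reported as `"Timeout"`. The helper `summarizeActiveResources` below is a hypothetical convenience wrapper, not part of BullMQ or ioredis; calling it from a mocha `after()` hook once `cleanUpTest()` has run is one place it could be used.

```javascript
// Minimal diagnostic sketch (hypothetical helper, not part of BullMQ or
// ioredis): count the resource types that are still keeping the Node.js
// event loop alive. Requires Node.js v17.3.0 or later.
function summarizeActiveResources() {
  const counts = {};
  for (const type of process.getActiveResourcesInfo()) {
    counts[type] = (counts[type] ?? 0) + 1;
  }
  return counts;
}

// Example: a pending timer is counted as "Timeout" until it is cleared.
const timer = setTimeout(() => {}, 60_000);
console.log("with timer:", summarizeActiveResources());
clearTimeout(timer);
console.log("after clearTimeout:", summarizeActiveResources());
```

A leaked timer or an open socket left behind after `.close()` would show up in this summary even though every test has already passed.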

How to reproduce.

import { Redis } from 'ioredis';
import { strict as assert } from "node:assert";
import { Queue, QueueEvents, Worker } from 'bullmq';
describe.only("BullMq Test", function () { // mocha suite callbacks should be synchronous
    it.only("should fire a completed event with a job in the completed state!", function (done) {
        (async function () {
            const eventsInFiredOrder = [];
            const redisClient = new Redis({ disconnectTimeout: 0, maxRetriesPerRequest: null });
            const jobName = String(Date.now()); // primitive string, not a String wrapper object
            const myQueue = new Queue(jobName, { connection: redisClient });

            const worker = new Worker(jobName, async job => {
                return job.data;
            }, { connection: redisClient });

            async function cleanUpTest() {
                myQueue.removeAllListeners();
                await myQueue.obliterate({ force: true });
                await myQueue.close();
                worker.removeAllListeners();
                await worker.close();
                redisClient.disconnect();
            }

            try {
                worker.on('completed', async (job) => {
                    eventsInFiredOrder.push('completed');
                    try {
                        const isCompleted = await job.isCompleted();
                        assert.ok(isCompleted);
                    }
                    catch (e) {
                        await cleanUpTest();
                        done(e);
                        return;//no need to process any further if we encountered an issue here 
                    }
                    try {
                        await cleanUpTest()
                        assert.deepEqual(eventsInFiredOrder, [
                            'completed'
                        ]);
                        done();
                    }
                    catch (e) {
                        done(e);
                    }
                });

                //Start Queue/Worker
                await myQueue.waitUntilReady();
                await worker.waitUntilReady();

                //Add job
                const jobData = { foo: 'bar' };
                await myQueue.add('myJobName', jobData);
            }
            catch (e) {
                await cleanUpTest();
                done(e);
            }
        })();
    });
});

Relevant log output

ioredis:redis status[localhost:6379]: wait -> connecting +0ms
  ioredis:redis status[localhost:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: wait -> connecting +1ms
  ioredis:redis status[127.0.0.1:6379]: connecting -> connect +6ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> info([]) +0ms
  ioredis:redis status[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: connecting -> connect +1ms
  ioredis:redis write command[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: 0 -> info([]) +0ms
  ioredis:redis status[127.0.0.1:6379]: connect -> ready +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> info([]) +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> info([]) +0ms
  ioredis:redis status[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: connect -> ready +0ms
  ioredis:connection set the connection name [bull:MTcxNDU4ODkyNjQ5Mg==] +0ms
  ioredis:redis write command[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: 0 -> client([ 'setname', 'bull:MTcxNDU4ODkyNjQ5Mg==' ]) +0ms
  ioredis:redis write command[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: 0 -> info([]) +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> hmset([ 'bull:1714588926492:meta', 'opts.maxLenEvents', '10000' ]) +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('92bfa7afe986c75e3f2c8dfb976776bb13b0c2c3,9,bull:1714588926492:stalled,bull:1714588926492:wait,bull:1714588926492:active,bull:1714588926492:failed,bull:1714588926492:stalled-check,bull:1714588926492:me ... <REDACTED full-length="322">') +0ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('9b67b263800b6f824ebff3728c865ac78083a12b,7,bull:1714588926492:wait,bull:1714588926492:paused,bull:1714588926492:meta,bull:1714588926492:id,bull:1714588926492:completed,bull:1714588926492:events,bull:1 ... <REDACTED full-length="317">') +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('135a97538ee393c145e6d418b575b3aa1799c691,11,bull:1714588926492:wait,bull:1714588926492:active,bull:1714588926492:prioritized,bull:1714588926492:events,bull:1714588926492:stalled,bull:1714588926492:lim ... <REDACTED full-length="444">') +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('450228499c97be9dd7d9c5a49346e0f96b9fa97f,14,bull:1714588926492:wait,bull:1714588926492:active,bull:1714588926492:prioritized,bull:1714588926492:events,bull:1714588926492:stalled,bull:1714588926492:lim ... <REDACTED full-length="626">') +2ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> zscore([ 'bull:1714588926492:completed', '1' ]) +2ms
  ioredis:redis write command[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: 0 -> bzpopmin([ 'bull:1714588926492:marker', '5' ]) +0ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('222d7dd2655288f01e5aa18389e92b86c157d7b3,7,bull:1714588926492:wait,bull:1714588926492:paused,bull:1714588926492:meta,bull:1714588926492:prioritized,bull:1714588926492:events,bull:1714588926492:delayed ... <REDACTED full-length="233">') +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('135a97538ee393c145e6d418b575b3aa1799c691,11,bull:1714588926492:wait,bull:1714588926492:active,bull:1714588926492:prioritized,bull:1714588926492:events,bull:1714588926492:stalled,bull:1714588926492:lim ... <REDACTED full-length="444">') +0ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha([ 'c04bb977e4c2b19abcb92e17e4af1a46f0215d34', '2', 'bull:1714588926492:meta', 'bull:1714588926492:', '1000', 'force' ]) +0ms
  ioredis:redis write command[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: 0 -> bzpopmin([ 'bull:1714588926492:marker', '5' ]) +0ms
  ioredis:redis status[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: ready -> close +2ms
  ioredis:connection skip reconnecting since the connection is manually closed. +11ms
  ioredis:redis status[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: close -> end +0ms
    ✔ should fire a completed event with a job in the completed state!
  ioredis:AbstractConnector stream 127.0.0.1:6379 still open, destroying it +0ms
  ioredis:AbstractConnector stream 127.0.0.1:6379 still open, destroying it +0ms
  ioredis:redis status[127.0.0.1:6379]: ready -> close +1ms
  ioredis:connection skip reconnecting since the connection is manually closed. +1ms
  ioredis:redis status[127.0.0.1:6379]: close -> end +0ms


  1 passing (24ms)

  ioredis:redis status[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: end -> connecting +6s
  ioredis:redis status[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: connecting -> connect +3ms
  ioredis:redis write command[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: 0 -> info([]) +1ms
  ioredis:redis status[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: connect -> ready +1ms
  ioredis:connection set the connection name [bull:MTcxNDU4ODkyNjQ5Mg==] +6s
  ioredis:redis write command[127.0.0.1:6379 (bull:MTcxNDU4ODkyNjQ5Mg==)]: 0 -> client([ 'setname', 'bull:MTcxNDU4ODkyNjQ5Mg==' ]) +0ms
  ioredis:connection resend 0 unfulfilled commands +1ms

Code of Conduct

  • I agree to follow this project's Code of Conduct
ScreaminSauce added the bug label May 1, 2024
manast (Contributor) commented May 1, 2024

I am actually seeing a potential issue: clearTimeout is not called if the call to bzpopmin throws an exception. Fix incoming...
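A minimal sketch of the pattern behind this kind of fix, written as plain JavaScript rather than BullMQ's actual worker code. It assumes, as the `end -> connecting +6s` line in the log above suggests (a 5-second `bzpopmin` plus roughly one second), that the timer is a watchdog armed before the blocking `bzpopmin` call which forces a reconnect when it fires. The names `buggyWait`, `guardedWait`, `closedConnection` and `demo` are hypothetical, and the blocking call is simulated with a rejecting promise so no Redis server is needed.

```javascript
// Hypothetical sketch, not BullMQ's real code: a blocking call guarded by a
// watchdog timer. The "blocking call" is any function returning a promise.

// Buggy shape: if blockingCall() rejects (e.g. because the connection was
// closed), clearTimeout() is never reached and the watchdog fires later.
async function buggyWait(blockingCall, onWatchdog, watchdogMs) {
  const timer = setTimeout(onWatchdog, watchdogMs);
  const result = await blockingCall();
  clearTimeout(timer); // skipped when the await above throws
  return result;
}

// Fixed shape: clearTimeout() runs in `finally`, on success and on error.
async function guardedWait(blockingCall, onWatchdog, watchdogMs) {
  const timer = setTimeout(onWatchdog, watchdogMs);
  try {
    return await blockingCall();
  } finally {
    clearTimeout(timer);
  }
}

// Simulates a blocking command failing because the client was closed.
const closedConnection = () =>
  Promise.reject(new Error("simulated: connection closed"));

// Returns true if the watchdog fired after the blocking call had failed.
async function demo(waitFn) {
  let watchdogFired = false;
  try {
    await waitFn(closedConnection, () => { watchdogFired = true; }, 50);
  } catch {
    // Expected: the simulated blocking call always rejects.
  }
  // Wait long enough for a leaked 50 ms watchdog to fire.
  await new Promise((resolve) => setTimeout(resolve, 100));
  return watchdogFired;
}

(async () => {
  console.log("buggy watchdog fired:", await demo(buggyWait)); // true
  console.log("guarded watchdog fired:", await demo(guardedWait)); // false
})();
```

Putting `clearTimeout` in `finally` ties the timer's lifetime to the blocking call itself, so a call aborted by `.close()` cannot leave a timer behind that later reconnects an already closed client.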

manast (Contributor) commented May 1, 2024

@ScreaminSauce do you mind testing with version 5.7.8 to see if the issue is resolved?

ScreaminSauce (Author) commented:

The new version ran successfully. Test output with ioredis logs below:

BullMq Test
  ioredis:redis status[localhost:6379]: wait -> connecting +0ms
  ioredis:redis status[localhost:6379 (bull:MTcxNDU5ODMxMTk4NQ==)]: wait -> connecting +2ms
  ioredis:redis status[127.0.0.1:6379]: connecting -> connect +7ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> info([]) +0ms
  ioredis:redis status[127.0.0.1:6379 (bull:MTcxNDU5ODMxMTk4NQ==)]: connecting -> connect +2ms
  ioredis:redis write command[127.0.0.1:6379 (bull:MTcxNDU5ODMxMTk4NQ==)]: 0 -> info([]) +0ms
  ioredis:redis status[127.0.0.1:6379]: connect -> ready +2ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> info([]) +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> info([]) +0ms
  ioredis:redis status[127.0.0.1:6379 (bull:MTcxNDU5ODMxMTk4NQ==)]: connect -> ready +1ms
  ioredis:connection set the connection name [bull:MTcxNDU5ODMxMTk4NQ==] +0ms
  ioredis:redis write command[127.0.0.1:6379 (bull:MTcxNDU5ODMxMTk4NQ==)]: 0 -> client([ 'setname', 'bull:MTcxNDU5ODMxMTk4NQ==' ]) +0ms
  ioredis:redis write command[127.0.0.1:6379 (bull:MTcxNDU5ODMxMTk4NQ==)]: 0 -> info([]) +1ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> hmset([ 'bull:1714598311985:meta', 'opts.maxLenEvents', '10000' ]) +8ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('92bfa7afe986c75e3f2c8dfb976776bb13b0c2c3,9,bull:1714598311985:stalled,bull:1714598311985:wait,bull:1714598311985:active,bull:1714598311985:failed,bull:1714598311985:stalled-check,bull:1714598311985:me ... <REDACTED full-length="322">') +0ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('9b67b263800b6f824ebff3728c865ac78083a12b,7,bull:1714598311985:wait,bull:1714598311985:paused,bull:1714598311985:meta,bull:1714598311985:id,bull:1714598311985:completed,bull:1714598311985:events,bull:1 ... <REDACTED full-length="317">') +2ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('135a97538ee393c145e6d418b575b3aa1799c691,11,bull:1714598311985:wait,bull:1714598311985:active,bull:1714598311985:prioritized,bull:1714598311985:events,bull:1714598311985:stalled,bull:1714598311985:lim ... <REDACTED full-length="444">') +3ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('450228499c97be9dd7d9c5a49346e0f96b9fa97f,14,bull:1714598311985:wait,bull:1714598311985:active,bull:1714598311985:prioritized,bull:1714598311985:events,bull:1714598311985:stalled,bull:1714598311985:lim ... <REDACTED full-length="626">') +3ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> zscore([ 'bull:1714598311985:completed', '1' ]) +3ms
  ioredis:redis write command[127.0.0.1:6379 (bull:MTcxNDU5ODMxMTk4NQ==)]: 0 -> bzpopmin([ 'bull:1714598311985:marker', '5' ]) +0ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('222d7dd2655288f01e5aa18389e92b86c157d7b3,7,bull:1714598311985:wait,bull:1714598311985:paused,bull:1714598311985:meta,bull:1714598311985:prioritized,bull:1714598311985:events,bull:1714598311985:delayed ... <REDACTED full-length="233">') +2ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha('135a97538ee393c145e6d418b575b3aa1799c691,11,bull:1714598311985:wait,bull:1714598311985:active,bull:1714598311985:prioritized,bull:1714598311985:events,bull:1714598311985:stalled,bull:1714598311985:lim ... <REDACTED full-length="444">') +0ms
  ioredis:redis write command[127.0.0.1:6379]: 0 -> evalsha([ 'c04bb977e4c2b19abcb92e17e4af1a46f0215d34', '2', 'bull:1714598311985:meta', 'bull:1714598311985:', '1000', 'force' ]) +1ms
  ioredis:redis write command[127.0.0.1:6379 (bull:MTcxNDU5ODMxMTk4NQ==)]: 0 -> bzpopmin([ 'bull:1714598311985:marker', '5' ]) +2ms
  ioredis:redis status[127.0.0.1:6379 (bull:MTcxNDU5ODMxMTk4NQ==)]: ready -> close +2ms
  ioredis:connection skip reconnecting since the connection is manually closed. +27ms
  ioredis:redis status[127.0.0.1:6379 (bull:MTcxNDU5ODMxMTk4NQ==)]: close -> end +0ms
    ✔ should fire a completed event with a job in the completed state! (44ms)
  ioredis:AbstractConnector stream 127.0.0.1:6379 still open, destroying it +0ms
  ioredis:AbstractConnector stream 127.0.0.1:6379 still open, destroying it +0ms
  ioredis:redis status[127.0.0.1:6379]: ready -> close +1ms
  ioredis:connection skip reconnecting since the connection is manually closed. +1ms
  ioredis:redis status[127.0.0.1:6379]: close -> end +0ms


  1 passing (46ms)

Cheers for the quick turnaround on this!
