Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Missing 'added' Events for Multiple Queues #1281

Closed
Trystan-C opened this issue Jun 10, 2022 · 7 comments
Closed

Missing 'added' Events for Multiple Queues #1281

Trystan-C opened this issue Jun 10, 2022 · 7 comments

Comments

@Trystan-C
Copy link

Trystan-C commented Jun 10, 2022

When unit testing against BullMQ, I'm creating new Queue, Worker, and QueueEvents objects per test case. Under these circumstances, my tests which depend on added event hooks on QueueEvents often fail for having not received an added event despite waiting what seems to be a significant amount of time.

The following demo code reproduces the issue:

Demo Code:

import { ConnectionOptions, Queue, QueueEvents, Worker } from "bullmq";
import { v4 as uuid } from "uuid";
import { expect } from "chai";

interface TestConfig {
    queue: Queue;
    worker: Worker;
    events: QueueEvents;
}

describe.only("'Added' Event Emission Inconsistency Demo", function() {
    this.timeout(120_000);

    describe("adding a bunch of jobs to the queue", () => {
        let jobsAddedToTheQueue: number = 0;
        let addedEventsReceived: number = 0;
        let completedEventsReceived: number = 0;
        before(async () => {
            console.log("Adding one job to a bunch of queues...");
            const configs: TestConfig[] = [];
            for (let i = 0; i < 100; i++) {
                const name = uuid();
                const config = setup(name);
                config.events.on("added", () => addedEventsReceived++);
                config.events.on("completed", () => completedEventsReceived++);
                configs.push(config);

                await Promise.all([
                    config.queue.waitUntilReady(),
                    config.worker.waitUntilReady(),
                    config.events.waitUntilReady(),
                ]);

                await config.queue.add(name, {});
                jobsAddedToTheQueue++;
            }

            console.log("Giving the jobs some time to finish...");
            await new Promise<void>((resolve) => setTimeout(resolve, 90_000));

            console.log("Cleaning up...");
            for (const config of configs) {
                await Promise.all([
                    config.queue.close(),
                    config.worker.close(),
                    config.events.close(),
                ]);
            }
        });
        it("emits an 'added' event for all new jobs added to the queue", () => {
            expect(addedEventsReceived).to.equal(jobsAddedToTheQueue);
        });
        it("emits a 'completed' event for all jobs added to the queue", () => {
            expect(completedEventsReceived).to.equal(jobsAddedToTheQueue);
        });
    });

    function setup(name: string): TestConfig {
        return {
            queue: new Queue(name, {
                defaultJobOptions: {
                    removeOnComplete: true,
                    removeOnFail: true,
                },
                connection: redisConnectionOptions(),
            }),
            worker: new Worker(
                name,
                async () => {},
                {
                    concurrency: 1,
                    lockDuration: 30_000,
                    connection: redisConnectionOptions(),
                },
            ),
            events: new QueueEvents(
                name,
                { connection: redisConnectionOptions() },
            ),
        };
    }

    function redisConnectionOptions(): ConnectionOptions {
        return {
            host: "localhost",
            port: 6379,
            maxRetriesPerRequest: null,
            enableReadyCheck: false,
        };
    }
});

Demo Code Output:

  'Added' Event Emission Inconsistency Demo
    adding a bunch of jobs to the queue
Adding one job to a bunch of queues...
Giving the jobs some time to finish...
Cleaning up...
      1) emits an 'added' event for all new jobs added to the queue
      2) emits a 'completed' event for all jobs added to the queue


  0 passing (2m)
  2 failing

  1) 'Added' Event Emission Inconsistency Demo
       adding a bunch of jobs to the queue
         emits an 'added' event for all new jobs added to the queue:

      AssertionError: expected 3 to equal 100
      + expected - actual

      -3
      +100

      at Context.<anonymous> (src/demo.spec.ts:51:44)
      at processImmediate (node:internal/timers:466:21)

  2) 'Added' Event Emission Inconsistency Demo
       adding a bunch of jobs to the queue
         emits a 'completed' event for all jobs added to the queue:

      AssertionError: expected 96 to equal 100
      + expected - actual

      -96
      +100

      at Context.<anonymous> (src/demo.spec.ts:54:48)
      at processImmediate (node:internal/timers:466:21)

Dependencies Used:

  • node: 16.14.2
  • bullmq: ^1.86.0
  • uuid: ^8.3.2
  • chai: ^4.3.4
  • mocha: ^9.0.3
  • redis: latest (through docker)

Notes:

  • Oddly enough, I don't observe this issue for other event types, including completed and failed.
  • It seems like completed events take some sort of precedence as compared to the added events.
  • This seems like it may be related to the following issue, open since 2020: Events from QueueEvents are late #242

Is this behavior expected? Is creating and closing many queues a testing anti-pattern that I should just avoid as much as possible? Is there an SLA for added event emission?

@roggervalf
Copy link
Collaborator

roggervalf commented Jun 11, 2022

hi @Trystan-C, I'm not really sure why you need to create a new worker, queue and queueEvent each time you add a new job in your test case, commonly you may need only one queueEvent, I changed your test case a little bit and the test case passed:

describe.only("'Added' Event Emission Inconsistency Demo", function() {
    this.timeout(120_000);

    describe("adding a bunch of jobs to the queue", () => {
        let jobsAddedToTheQueue = 0;
        let addedEventsReceived = 0;
        let completedEventsReceived = 0;
        before(async () => {
            console.log("Adding one job to a bunch of queues...");
            const configs: TestConfig[] = [];
            const name = uuid();
            const config = setup(name);
            config.events.on("added", () => addedEventsReceived++);
            config.events.on("completed", () => completedEventsReceived++);
            configs.push(config);
            await Promise.all([
              config.queue.waitUntilReady(),
              config.worker.waitUntilReady(),
              config.events.waitUntilReady(),
          ]);

        for (let i = 0; i < 100; i++) {

                
                await config.queue.add(name, {});
                jobsAddedToTheQueue++;
            }

            console.log("Giving the jobs some time to finish...");
            await new Promise<void>(resolve => setTimeout(resolve, 15_000));

            console.log("Cleaning up...");
                await Promise.all([
                    config.queue.close(),
                    config.worker.close(),
                    config.events.close(),
                ]);
            
        });
        it("emits an 'added' event for all new jobs added to the queue", () => {
            expect(addedEventsReceived).to.equal(jobsAddedToTheQueue);
        });
        it("emits a 'completed' event for all jobs added to the queue", () => {
            expect(completedEventsReceived).to.equal(jobsAddedToTheQueue);
        });
    });

    function setup(name: string): TestConfig {
        return {
            queue: new Queue(name, {
                defaultJobOptions: {
                    removeOnComplete: true,
                    removeOnFail: true,
                },
                connection: redisConnectionOptions(),
            }),
            worker: new Worker(
                name,
                async () => {},
                {
                    concurrency: 1,
                    lockDuration: 30_000,
                    connection: redisConnectionOptions(),
                },
            ),
            events: new QueueEvents(
                name,
                { connection: redisConnectionOptions() },
            ),
        };
    }

    function redisConnectionOptions(): ConnectionOptions {
        return {
            host: "localhost",
            port: 6379,
            maxRetriesPerRequest: null,
            enableReadyCheck: false,
        };
    }
});

(the configs array I think that is not necessary). Also you can use event tests as reference https://github.com/taskforcesh/bullmq/blob/master/tests/test_events.ts

@Trystan-C
Copy link
Author

Trystan-C commented Jun 13, 2022

@roggervalf The example I provided is a diluted version of a much larger test suite which takes advantaged of new queues per test case. Some of the tests cover behavior when closing BullMQ resources. For example, testing stalled job recovery when closing a queue / worker prematurely, cleanup on close, draining, etc.

You're correct that adjusting the example to only use one queue / worker / events resolves the issue, but my question is why does creating many queues cause such a large delay in event emission? Also, if this is a symptom of the implementation of BullMQ, is there anything that can be done about it?

If this is entirely expected due to trade-offs in the BullMQ design, then this issue can be closed as such, and I can rewrite my test suite to use as few queues as possible.

@manast
Copy link
Contributor

manast commented Jun 14, 2022

If this is entirely expected due to trade-offs in the BullMQ design, then this issue can be closed as such, and I can rewrite my test suite to use as few queues as possible.

@Trystan-C there is not a known trade-off about the speed events are delivered to any event observer vs the amount of queues or anything like that. However, we will need a clear and simple test case that demonstrates the issue you are experiencing so that we can investigate it ourselves.

@Trystan-C
Copy link
Author

Trystan-C commented Jun 14, 2022

@manast For sure. Is the example provided in the OP insufficiently clear / simple for reproducing the issue? I know that @roggervalf provided a workaround, which is great, but I believe the example code provided consistently reproduces the issue.

@manast
Copy link
Contributor

manast commented Jun 14, 2022

@Trystan-C the issue here is that the added events are arriving before the QueueEvents has started consuming events, so you only get most of the completed events. The solution for this particular test case is to use the "lastEventId" option in QueueEvents so that it starts consuming events from the start of the event stream, then you will not lose any events, but this may not be what you want to do in production code as it will "replay" old events that have already been consumed. So change your code to this:

  events: new QueueEvents(name, {
    lastEventId: '0-0',
    connection: redisConnectionOptions(),
  }),

@Trystan-C
Copy link
Author

Thanks, @manast! For the time being, I've added an option used during local testing for applying the lastEventId: '0-0' bit to new queues, but this option will be disabled for production usage. This workaround and the explanation provided are sufficient for me, so I'm going to close this out.

@dvins
Copy link

dvins commented Mar 14, 2023

@manast Wish I found your 0-0 solution hours earlier, but relieved I finally discovered it! Love BullMQ, but sure wish there was better guidance around unit and integration testing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants