Question: Repeatable jobs are not executed at all #1259

Closed
ff6347 opened this issue May 25, 2022 · 8 comments
Labels
question Further information is requested

Comments

@ff6347

ff6347 commented May 25, 2022

Hi, I'm a first-time BullMQ user and I'm a bit puzzled about why I'm not seeing any output from my queue's repeatable jobs. This only happens for repeatable jobs; if I remove the repeat section from the options, the job runs.

The setup is pretty simple (mostly taken from the docs). I don't get any logging from the handler function in the worker, nor from the emitted events of the worker, nor from the queueEvents.

Can anyone point me to my misconceptions?

This is queue/index.ts

import Redis from "ioredis";
import { Queue as BMQueue, QueueEvents, Worker, QueueScheduler } from "bullmq";
import { redisURL } from "../env.js";


export async function runBullMQueue() {
  try {
    const scheduleQueue = new BMQueue("schedule", {
      prefix: "{schedule}",
      connection: new Redis(redisURL, { maxRetriesPerRequest: null }),
    });
    const scheduler = new QueueScheduler("schedule", {
      connection: new Redis(redisURL, { maxRetriesPerRequest: null }),
      prefix: "{schedule}",
    });

    const worker = new Worker(
      "schedule",
      async (job) => console.log("worker handler", job.data),
      {
        autorun: true,
        connection: new Redis(redisURL, { maxRetriesPerRequest: null }),
        prefix: "{schedule}",
      },
    );
/*
When do I need this?
    scheduleQueue.waitUntilReady();
    worker.waitUntilReady();
    scheduler.waitUntilReady();
*/
    worker.on("completed", () => console.log("worker job is completed"));
    worker.on("failed", () => console.log("worker job is failed"));
    worker.on("error", () => console.log("worker job is error"));
    worker.on("active", () => console.log("worker job is active"));

    const queueEvents = new QueueEvents("schedule", {
      connection: new Redis(redisURL, { maxRetriesPerRequest: null }),
      prefix: "{schedule}",
    });

    queueEvents.on("completed", ({ jobId, returnvalue }) => {
      // Called every time a job is completed in any worker.
      console.log("queueEvents completed", jobId, returnvalue);
    });

    queueEvents.on("failed", ({ jobId, failedReason }) => {
      // Called every time a job fails in any worker.
      console.log("queueEvents failed", jobId, failedReason);
    });

    queueEvents.on("progress", ({ jobId, data }) => {
      // jobId received a progress event
      console.log("queueEvents progress", jobId, data);
    });

    await scheduleQueue.add(
      `schedule-${Math.random()}`,
      { type: "schedule", val: Math.random() },
      {
        repeat: {
          // cron: "* * * * *",
          every: 10000,
          limit: 100,
          immediately: true,
        },
      },
    );
  } catch (error) {
    console.error(error);

    throw error;
  }
}

this is index.ts

import { runBullMQueue, runBullQueues } from "./queues";

// runBullQueues();

async function main() {
  await runBullMQueue();
}

main().catch((err) => {
  console.error(err);
});

  • edit 1: added and tested QueueScheduler. Still no execution from repeatables
  • edit 2: fix example
@roggervalf
Collaborator

Hi @ff6347, you may need a queueScheduler instance https://docs.bullmq.io/guide/queuescheduler

@ff6347
Author

ff6347 commented May 25, 2022

Hi @roggervalf, thanks for your reply. I saw that as well and added it to my code, but the queue is still not executing. Something is happening in Redis, but I can't say what (since it is my first day with BullMQ).

I updated my code above to reflect what I did.

@roggervalf
Collaborator

Hey @ff6347, in your example you should import QueueScheduler as well. The prefix should be passed in the QueueScheduler options, as you did for the worker and scheduleQueue, and the prefix is also missing from the QueueEvents options.
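
To illustrate why a missing prefix matters, here is a minimal sketch of how a BullMQ-style Redis key name is composed. It is a simplified model, not the library's code: `redisKey` and `SharedOptions` are hypothetical names, and the sketch assumes keys follow the `prefix:queueName:type` pattern with `bull` as the default prefix.

```typescript
// Simplified model of key naming (an assumption for illustration,
// not BullMQ's actual implementation).
const DEFAULT_PREFIX = "bull";

interface SharedOptions {
  prefix: string;
}

// Hypothetical helper: builds the Redis key a component would read or write.
function redisKey(
  queueName: string,
  type: string,
  opts: Partial<SharedOptions> = {},
): string {
  const prefix = opts.prefix ?? DEFAULT_PREFIX;
  return `${prefix}:${queueName}:${type}`;
}

// One options object reused for the queue, scheduler, worker and events.
const shared: SharedOptions = { prefix: "{schedule}" };

// Where the queue stores delayed (including repeatable) jobs.
const queueKey = redisKey("schedule", "delayed", shared);
// A scheduler created with the same prefix looks at the same key.
const matchingSchedulerKey = redisKey("schedule", "delayed", shared);
// A scheduler created without a prefix falls back to the default.
const mismatchedSchedulerKey = redisKey("schedule", "delayed");

console.log(queueKey === matchingSchedulerKey); // true
console.log(queueKey === mismatchedSchedulerKey); // false
```

Under this model, a component created without the custom prefix watches a different key than the one the queue writes to, so it never sees the repeatable jobs. Defining the options once and passing them to every constructor avoids that drift.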

@roggervalf roggervalf added the question Further information is requested label May 26, 2022
@ff6347
Author

ff6347 commented May 26, 2022

Hi @roggervalf, thank you for your support. The imports were there; I just missed them when copying the code.
I added the prefixes to the QueueScheduler and the QueueEvents, and now it is working. Thanks a lot.

Can I ask one last design related question?

I'm working on a tool where I need to add jobs to a queue dynamically. The scheduled worker collects data from a database and creates n new jobs based on what it finds. It will also have a return value. Where would be the best place to add the new jobs to a second queue?

  1. In the handler of my schedule worker?
  2. In my worker's "completed" callback?
  3. In my queueEvents "completed" callback?

My choice would be the worker's "completed" callback. But maybe I'm wrong?

@manast
Contributor

manast commented May 26, 2022

The best place is option 1, in the handler of your schedule worker.

@manast
Contributor

manast commented May 26, 2022

Because if that handler completes you get the guarantee that the next job is in the queue, and if it fails you can fix it, retry it, etc.
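
A minimal sketch of option 1, assuming a handler that reads rows and adds one follow-up job per row. The names `JobSink`, `Item`, `makeScheduleHandler` and `InMemoryQueue` are hypothetical and exist only for this illustration; `JobSink` models just the `add(name, data)` call that a BullMQ `Queue` also offers, and the in-memory class stands in for a real queue so the sketch runs without Redis.

```typescript
// Minimal shape of a queue for this sketch (hypothetical interface).
interface JobSink<T> {
  add(name: string, data: T): Promise<unknown>;
}

// Hypothetical row returned by the database lookup.
interface Item {
  id: number;
}

// Builds a handler that fetches items and enqueues one follow-up job each.
function makeScheduleHandler(
  fetchItems: () => Promise<Item[]>,
  followUpQueue: JobSink<Item>,
) {
  return async (): Promise<number> => {
    const items = await fetchItems();
    for (const item of items) {
      // If add() rejects, the handler rejects too, so the scheduled job
      // is reported as failed instead of silently losing follow-up work.
      await followUpQueue.add(`process-${item.id}`, item);
    }
    return items.length; // return value of the scheduled job
  };
}

// In-memory stand-in for a real queue, used only for the demonstration.
class InMemoryQueue<T> implements JobSink<T> {
  readonly jobs: { name: string; data: T }[] = [];

  async add(name: string, data: T): Promise<void> {
    this.jobs.push({ name, data });
  }
}

const followUp = new InMemoryQueue<Item>();
const handler = makeScheduleHandler(
  async () => [{ id: 1 }, { id: 2 }],
  followUp,
);

handler().then((count) => {
  console.log(count, followUp.jobs.map((job) => job.name));
});
```

With this shape, the handler only resolves after every follow-up job has been added, which is the guarantee described above.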

@ff6347
Author

ff6347 commented May 26, 2022

@manast thank you. I thought that it would be good to only add the jobs if the worker completes without an error. If the last thing the worker does is make the DB request and return the result, what could go wrong after that?

That's why my choice was 2 (the worker's "completed" callback).

Anyway, thanks a lot for your support and your work on this (and, I'm sure, other) OSS projects.

@ff6347 ff6347 closed this as completed May 26, 2022
@manast
Contributor

manast commented May 26, 2022

> @manast thank you. I thought that it would be good to only add the jobs if the worker completes without an error. If the last thing the worker does is doing the DB request and returning the result. What could go wrong after that?

Well, Redis could be down and then the job will not be marked as completed. Choice 2 will work as long as all services are online at all times, but eventually, it will fail.
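
The failure case can be sketched with a toy model. It assumes that the "completed" callback only runs after the job has been recorded as completed, and that the store becomes unavailable right after the handler has finished its work. `runJob`, `storeDown` and `demo` are hypothetical names for this illustration and are not BullMQ APIs.

```typescript
// Toy model of one job run: handler first, then the completion write,
// then the "completed" callback (assumed ordering, for illustration only).
async function runJob(
  handler: () => Promise<void>,
  markCompleted: () => Promise<void>, // stands in for the write to Redis
  onCompleted: () => void,
): Promise<boolean> {
  await handler();
  try {
    await markCompleted();
  } catch {
    // Completion was never recorded, so the callback never fires.
    return false;
  }
  onCompleted();
  return true;
}

// Simulates the store being unavailable when completion is recorded.
const storeDown = async (): Promise<void> => {
  throw new Error("store unavailable");
};

async function demo(): Promise<{ inHandler: string[]; inCallback: string[] }> {
  const inHandler: string[] = [];
  const inCallback: string[] = [];

  // Option 1: follow-up work is added inside the handler.
  await runJob(
    async () => {
      inHandler.push("follow-up");
    },
    storeDown,
    () => {},
  );

  // Option 2: follow-up work is added in the "completed" callback.
  await runJob(
    async () => {},
    storeDown,
    () => {
      inCallback.push("follow-up");
    },
  );

  return { inHandler, inCallback };
}

demo().then((result) => console.log(result));
```

In this model the follow-up job exists under option 1 but is never created under option 2. In a real setup, adding the follow-up job inside the handler also needs the store to be reachable; if that write fails, the handler fails visibly and the job can be retried.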


3 participants