Manual processing of jobs, grabs 2 jobs from waiting state #1297

Closed
presidenten opened this issue Jun 30, 2022 · 5 comments

@presidenten

Describe the bug
Fetching jobs manually to process them, following this documentation:
https://docs.bullmq.io/patterns/manually-fetching-jobs
takes two jobs out of the waiting state, and one of them gets stuck in active.

Expected behavior
Only one job gets fetched from the queue.

Environment (please complete the following information):
Using bullmq v1.86.2

To Reproduce

const bull = require('bullmq');
const crypto = require('crypto');

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

const queue = new bull.Queue('bug-example', {
  connection: { host: 'localhost', port: 6379 },
  settings: {
    maxStalledCount: 0,
  },
});

const uuid = crypto.randomUUID();
const worker = new bull.Worker('bug-example', null, {
  lockDuration: 1000,
  connection: { host: 'localhost', port: 6379 },
});

(async () => {
  // Prepare jobs
  await queue.add('task1');
  await queue.add('task2');
  console.log('initial waiting jobs', await queue.getWaitingCount());

  // Get a job
  let job;
  while(!job) {
    await sleep(10);
    job = (await worker.getNextJob(uuid));
  }

  await job.moveToCompleted(42, uuid);

  console.log('waiting jobs (should be 1)', await queue.getWaitingCount());
  console.log('active jobs (should be 0)', await queue.getActiveCount());
  await sleep(5000);
  console.log('and its stuck in active for longer than lockDuration above', await queue.getActiveCount());
})();

output

❯ node bug.js
initial waiting jobs 2
waiting jobs (should be 1) 0
active jobs (should be 0) 1
@manast
Contributor

manast commented Jul 1, 2022

Thanks for reporting this issue. It is not really a bug, but I see that the first example in the documentation is not correct. If you use the second one, it will work as you expect: https://docs.bullmq.io/patterns/manually-fetching-jobs#looping-through-jobs

@manast
Contributor

manast commented Jul 1, 2022

Ok, the documentation has been corrected now.

@manast manast closed this as completed Jul 1, 2022
@presidenten
Author

@manast I still don't understand.

I removed the loop, and it still doesn't work properly.

Can you update my code example (it only needs bullmq and a locally running Redis) so that it takes only one job from the waiting state instead of two, given that I only get a reference to one of them through worker.getNextJob?

Is getNextJob really supposed to take two jobs from the waiting state?

const bull = require('bullmq');
const crypto = require('crypto');

const queue = new bull.Queue('bug-example', {
  connection: { host: 'localhost', port: 6379 },
  settings: {
    maxStalledCount: 0,
  },
});

const uuid = crypto.randomUUID();
const worker = new bull.Worker('bug-example', null, {
  lockDuration: 1000,
  connection: { host: 'localhost', port: 6379 },
});

(async () => {
  // Prepare jobs
  await queue.add('task1');
  await queue.add('task2');
  console.log('\ninitial waiting jobs', await queue.getWaitingCount());

  // Get a job
  const job = (await worker.getNextJob(uuid));

  await job.moveToCompleted(42, uuid);

  console.log('\nHere we see that one job got processed as intended');
  console.log('- completed jobs (should be 1)', await queue.getCompletedCount());

  console.log('\nBut the waiting jobs are now clean, even though we had 2 jobs waiting');
  console.log('- waiting jobs (should be 1)', await queue.getWaitingCount());

  console.log('\nAnd here we see second job is stuck in active, even though we only called getNextJob once');
  console.log('- active jobs (should be 0)', await queue.getActiveCount());

  await worker.close();
  await queue.close();
})();

output:

❯ node bug.js

initial waiting jobs 2

Here we see that one job got processed as intended
- completed jobs (should be 1) 1

But the waiting jobs are now clean, even though we had 2 jobs waiting
- waiting jobs (should be 1) 0

And here we see second job is stuck in active, even though we only called getNextJob once
- active jobs (should be 0) 1

@manast
Contributor

manast commented Jul 1, 2022

When you call await job.moveToCompleted(42, uuid); you are fetching the next job but ignoring it, so nobody processes it. You should pass false as the third argument: await job.moveToCompleted(42, uuid, false); https://api.docs.bullmq.io/classes/Job.html#moveToCompleted
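
To make the advice above concrete, here is a minimal sketch of handling a single job by hand without pulling a second one out of waiting. It assumes `worker` is a bullmq Worker created with a null processor, as in the reproduction script; `processOneJob` and `handle` are hypothetical names introduced only for this illustration and are not part of bullmq.

```javascript
// Sketch: process exactly one job by hand without fetching a follow-up job.
// `worker` is assumed to behave like a bullmq Worker created with a null processor.
// `processOneJob` and `handle` are hypothetical helpers, not bullmq APIs.
async function processOneJob(worker, token, handle) {
  const job = await worker.getNextJob(token);
  if (!job) {
    return null; // nothing was waiting
  }

  let result;
  try {
    // Compute the job's return value.
    result = await handle(job);
  } catch (err) {
    // The handler threw, so mark the job as failed instead.
    await job.moveToFailed(err, token);
    return job;
  }

  // Third argument `false`: complete this job but do not fetch the next one,
  // so no other job is moved to active behind our back.
  await job.moveToCompleted(result, token, false);
  return job;
}
```

With the reproduction script, this could be called as `await processOneJob(worker, uuid, async () => 42)`. Per the explanation above, the second job should then remain in waiting rather than ending up in active.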

@LBC000

LBC000 commented Mar 2, 2024

// bullMQUtils.ts
import { delay, Job, Queue, Worker } from "bullmq";

/**
    // Call the manual queue processing function
    const token = nanoid();
    const { queue: manuallyQueue, worker } = processManualQueue(
      "manuallyQueue",
      {
        connection: connection,
        concurrency: 5,
        delayTime: 3000,
        successCallback: async (data) => {
          if (data.video == 3) return false;
          return true;
        },
      },
      token
    ); 
 */
export function processManualQueue(queueName, options, token) {
  const { connection, concurrency, delayTime, successCallback } = options;
  const queue = new Queue(queueName, { connection });
  const worker = new Worker(queueName, null, {
    connection,
    concurrency: concurrency || 5,
  });

  workerRun();

  async function workerRun() {
    let job;

    while (true) {
      await delay(delayTime || 3000);

      let jobData = null;
      let jobId;
      let success;

      if (job) {
        success = await successCallback(job.data);
        // console.log("processing manual job", job.data, "status", success);
        if (success) {
          [jobData, jobId] = await job.moveToCompleted(
            "some return value",
            token
          );
        } else {
          await job.moveToFailed(new Error("some error message"), token);
        }

        if (jobData) {
          job = Job.fromJSON(worker, jobData, jobId);
        } else {
          job = null;
        }
      } else {
        if (!job) {
          job = await worker.getNextJob(token);
        }
      }
    }
  }

  return { queue, worker };
}
// Call the manual queue processing function
const token = nanoid();
const { queue: manuallyQueue, worker } = processManualQueue(
  "manuallyQueue",
  {
    connection: connection,
    concurrency: 5,
    delayTime: 3000,
    successCallback: async (data) => {
      if (data.video == 3) return false;
      return true;
    },
  },
  token
);
// Manual jobs, retried on failure
  async manuallyQueue() {
    // console.log('triggered test_queue_1');
    // manuallyQueue.add("jobName", { video: 1 });
    // manuallyQueue.add("jobName", { video: 2 });

    for (let i = 1; i < 6; i++) {
      manuallyQueue.add(
        "jobName",
        { video: i },
        {
          attempts: 3, // 3 attempts in total
          backoff: {
            type: "exponential",
            delay: 3000, // base delay of 3 seconds before retrying
          },
        }
      );
    }
  },
