Feature request: non-blocking version of getNextJob #516

Closed
Janpot opened this issue Apr 30, 2021 · 2 comments · Fixed by #517

Comments

Contributor

Janpot commented Apr 30, 2021

I'm looking for ways to compose workers in more elaborate ways, such as adding priority, throttling, or round-robin balancing between workers. For example:

const prio1 = new Worker('my-queue-1')
const prio2 = new Worker('my-queue-2')
const prio3a = new Worker('my-queue-3a')
const prio3b = new Worker('my-queue-3b')

async function getNextJob(token) {
  // highest priority first
  let job = await prio1.getNextJob(token)
  if (job) return job
  
  // second highest priority, but maximum 1 per second
  const lockAcquired = await redis.set('prio2-throttle', 'lock', 'NX', 'PX', 1000);
  if (lockAcquired) {
    job = await prio2.getNextJob(token)
    if (job) return job
  }
  
  // toggle between two queues
  const index = await redis.incr('prio3-index');
  const queue = [prio3a, prio3b][index % 2]
  job = await queue.getNextJob(token)
  if (job) return job
  
  // we're empty, just wait for the next poll
  await sleep(1000)
}

async function run (token) {
  while (true) {
    const job = await getNextJob(token)
    // getNextJob resolves to undefined after an empty poll, so poll again
    if (!job) continue
    try {
      const result = await process(job)
      await job.moveToCompleted(result, token);
    } catch (err) {
      await job.moveToFailed(err, token);
    }
  }
}

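When I tried this out, the blocking behavior of worker.getNextJob made this use case impossible: a call on an empty queue waits for a job instead of returning right away, so the lower-priority queues are not checked in the meantime. It would be helpful to have a non-blocking version of getNextJob that returns immediately. Perhaps when setting drainDelay to 0?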
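
The `redis.set('prio2-throttle', 'lock', 'NX', 'PX', 1000)` call in the snippet above works as a throttle gate: the key can only be created when it does not already exist, and it expires after one second. A minimal single-process sketch of the same idea follows, for illustration only. `makeThrottle` and its injectable clock are hypothetical names, not part of BullMQ or Redis, and unlike the Redis key this version does not coordinate across processes.

```javascript
// Hypothetical in-memory stand-in for the SET NX PX throttle key.
// Returns a function that succeeds at most once per intervalMs.
function makeThrottle(intervalMs, now = Date.now) {
  let lockedUntil = -Infinity; // no lock held initially

  return function tryAcquire() {
    const t = now();
    if (t < lockedUntil) return false; // "key" still exists: NX would fail
    lockedUntil = t + intervalMs;      // "key" created with a PX-style expiry
    return true;
  };
}

// Usage: allow at most one prio2 job per second.
const prio2Throttle = makeThrottle(1000);
if (prio2Throttle()) {
  // safe to poll the throttled queue here
}
```

Passing the clock as a parameter keeps the gate easy to test without waiting for real time to pass.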

Contributor

manast commented Apr 30, 2021

A drainDelay of 0 means block forever, as per Redis semantics. It would be quite easy, though, to add an opts parameter with "block: boolean".
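
With such an option, the priority composition from the original request could look roughly like the sketch below. It assumes that `getNextJob(token, { block: false })` resolves to `undefined` immediately when a queue is empty; the option name follows the suggestion above and should be checked against the BullMQ version in use. `pollInPriorityOrder` and `makeStubSource` are hypothetical helpers, and the stubs stand in for real workers so the sketch runs without Redis.

```javascript
// Hypothetical helper: ask each source in priority order, never blocking.
// ASSUMPTION: sources expose getNextJob(token, { block: false }) and resolve
// to undefined when their queue is empty.
async function pollInPriorityOrder(sources, token) {
  for (const source of sources) {
    const job = await source.getNextJob(token, { block: false });
    if (job) return job;
  }
  return undefined; // every queue was empty on this pass
}

// Stand-in for a worker; records the options it was called with.
function makeStubSource(jobs) {
  const pending = [...jobs];
  const seenOptions = [];
  return {
    seenOptions,
    async getNextJob(_token, opts) {
      seenOptions.push(opts);
      return pending.shift(); // undefined once the stub queue is drained
    },
  };
}

// Drains three stub queues and returns the ids in the order they were picked.
async function demo() {
  const prio1 = makeStubSource([]);
  const prio2 = makeStubSource([{ id: 'b1' }]);
  const prio3 = makeStubSource([{ id: 'c1' }]);
  const sources = [prio1, prio2, prio3];

  const picked = [];
  let job;
  while ((job = await pollInPriorityOrder(sources, 'token'))) {
    picked.push(job.id);
  }
  return picked;
}
```

Because an empty high-priority queue returns right away, the lower-priority queues are reached in the same pass instead of waiting on a blocked call.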


LBC000 commented Mar 2, 2024

// bullMQUtils.ts
import { delay, Job, Queue, Worker } from "bullmq";

/**
    // Call the manual queue processing function
    const token = nanoid();
    const { queue: manuallyQueue, worker } = processManualQueue(
      "manuallyQueue",
      {
        connection: connection,
        concurrency: 5,
        delayTime: 3000,
        successCallback: async (data) => {
          if (data.video == 3) return false;
          return true;
        },
      },
      token
    ); 
 */
export function processManualQueue(queueName, options, token) {
  const { connection, concurrency, delayTime, successCallback } = options;
  const queue = new Queue(queueName, { connection });
  const worker = new Worker(queueName, null, {
    connection,
    concurrency: concurrency || 5,
  });

  workerRun();

  async function workerRun() {
    let job;

    while (true) {
      await delay(delayTime || 3000);

      let jobData = null;
      let jobId;
      let success;

      if (job) {
        success = await successCallback(job.data);
        // console.log("Processing manual job", job.data, "status", success);
        if (success) {
          [jobData, jobId] = await job.moveToCompleted(
            "some return value",
            token
          );
        } else {
          await job.moveToFailed(new Error("some error message"), token);
        }

        if (jobData) {
          job = Job.fromJSON(worker, jobData, jobId);
        } else {
          job = null;
        }
      } else {
        // No job in hand: ask the worker for the next one
        job = await worker.getNextJob(token);
      }
    }
  }

  return { queue, worker };
}
// Call the manual queue processing function
const token = nanoid();
const { queue: manuallyQueue, worker } = processManualQueue(
  "manuallyQueue",
  {
    connection: connection,
    concurrency: 5,
    delayTime: 3000,
    successCallback: async (data) => {
      if (data.video == 3) return false;
      return true;
    },
  },
  token
);
// Manual job, retried on failure
  async manuallyQueue() {
    // console.log('triggered test_queue_1');
    // manuallyQueue.add("jobName", { video: 1 });
    // manuallyQueue.add("jobName", { video: 2 });

    for (let i = 1; i < 6; i++) {
      manuallyQueue.add(
        "jobName",
        { video: i },
        {
          attempts: 3, // up to 3 attempts in total
          backoff: {
            type: "exponential",
            delay: 3000, // base delay of 3 seconds before retrying
          },
        }
      );
    }
  },
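
To make the retry settings above concrete, here is a small sketch of the waits they would produce. It assumes that the built-in exponential backoff waits `2^(attemptsMade - 1) * delay` milliseconds before each retry, which is how I recall the BullMQ documentation describing it, and that `attempts` counts the first try. `exponentialBackoffDelays` is a hypothetical helper, not a BullMQ function.

```javascript
// Hypothetical helper: lists the wait before each retry.
// ASSUMPTION: wait = 2^(attemptsMade - 1) * delay, and `attempts` includes
// the first try, so there are at most attempts - 1 retries.
function exponentialBackoffDelays(attempts, delay) {
  const delays = [];
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(Math.pow(2, attemptsMade - 1) * delay);
  }
  return delays;
}

console.log(exponentialBackoffDelays(3, 3000)); // [ 3000, 6000 ]
```

Under those assumptions, a job that keeps failing is retried after 3 seconds and again after 6 seconds before it is marked as failed.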

thewilkybarkid added a commit to PREreview/coar-notify that referenced this issue Apr 8, 2024