
[Flow] - a way to wait until the flow is finished #1004

Open
felixmosh opened this issue Jan 19, 2022 · 8 comments

felixmosh commented Jan 19, 2022

Is there any way for a job flow to know when the whole flow is finished?

Assume the following code:

import { FlowProducer, QueueEvents } from 'bullmq';
import ms from 'ms'; // assuming the `ms` package for the `ms('1min')` default below

const queueEvents = new QueueEvents('renovate');
const flowProducer = new FlowProducer();

async function addNewRenovateFlow() {
  const flow = await flowProducer.add({
    name: 'renovate-interior',
    queueName: 'renovate',
    children: [
      { name: 'paint', data: { place: 'ceiling' }, queueName: 'steps' },
      { name: 'paint', data: { place: 'walls' }, queueName: 'steps' },
      { name: 'fix', data: { place: 'floor' }, queueName: 'steps' },
    ],
  });

  return {
    waitUntilFinished(ttl = ms('1min')) {
      return Promise.all(
        [flow.job]
          .concat(flow.children.map((node) => node.job))
          .map((job) => job.waitUntilFinished(queueEvents, ttl))
      );
    },
  };
}

The implementation of my waitUntilFinished triggers the following warning (in my case the flow has many tasks):

(node:54403) MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 closing listeners added to [FlowProducer]. Use emitter.setMaxListeners() to increase limit

This makes sense, since each job's waitUntilFinished registers a new listener.
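
For reference, the warning itself can be silenced by raising the listener limit on the FlowProducer instance (a sketch, assuming FlowProducer exposes Node's standard EventEmitter API, which the warning text suggests), though that only hides the fan-out rather than removing it:

// Raise the limit to cover however many jobs are awaited at once (plus some slack);
// this silences the warning but every waitUntilFinished call still adds listeners.
flowProducer.setMaxListeners(50);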

Any suggestions?

ccollie commented Jan 19, 2022

Though I've not tried it, the most efficient way to do this is to get the parent job's id and listen for the completed or failed event using a QueueEvents listener. Modifying code from the docs:

import { QueueEvents } from 'bullmq';

const queueEvents = new QueueEvents('renovate');

// parentId is the id of the flow's parent job (flow.job.id)
queueEvents.on('completed', ({ jobId }) => {
  if (jobId === parentId) {
    console.log('Bingo!');
  }
});

queueEvents.on('failed', ({ jobId, failedReason }) => {
  console.error('error painting', failedReason);
});

I imagine this is not a one-off thing, so I'd create a global QueueEvents instance per queue.
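
Something along these lines could serve as that global instance (a minimal sketch; getQueueEvents is just an illustrative name, not a BullMQ API):

import { QueueEvents } from 'bullmq';

// One shared QueueEvents instance per queue name, created lazily
const queueEventsByQueue = new Map<string, QueueEvents>();

export function getQueueEvents(queueName: string): QueueEvents {
  let events = queueEventsByQueue.get(queueName);
  if (!events) {
    events = new QueueEvents(queueName);
    queueEventsByQueue.set(queueName, events);
  }
  return events;
}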

ccollie commented Jan 19, 2022

BTW, the event payload has a timestamp, so you can implement timeouts per id as well.
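
One way to get per-id timeouts while still registering only two listeners is a timer per job id (a sketch using plain setTimeout rather than the event timestamp; waitForJobs is an illustrative helper, not a BullMQ API):

import { QueueEvents } from 'bullmq';

// Wait for a set of job ids on one shared QueueEvents instance,
// with an individual deadline per job id but only two listeners in total.
function waitForJobs(queueEvents: QueueEvents, jobIds: string[], ttl = 60_000): Promise<void> {
  return new Promise((resolve, reject) => {
    const timers = new Map<string, NodeJS.Timeout>();
    for (const id of jobIds) {
      timers.set(
        id,
        setTimeout(() => fail(new Error(`job ${id} timed out after ${ttl}ms`)), ttl)
      );
    }

    const onCompleted = ({ jobId }: { jobId: string }) => {
      const timer = timers.get(jobId);
      if (!timer) return; // not one of the jobs we are waiting for
      clearTimeout(timer);
      timers.delete(jobId);
      if (timers.size === 0) {
        cleanup();
        resolve();
      }
    };

    const onFailed = ({ jobId, failedReason }: { jobId: string; failedReason: string }) => {
      if (timers.has(jobId)) {
        fail(new Error(`job ${jobId} failed: ${failedReason}`));
      }
    };

    function fail(err: Error) {
      cleanup();
      reject(err);
    }

    function cleanup() {
      for (const timer of timers.values()) clearTimeout(timer);
      queueEvents.off('completed', onCompleted);
      queueEvents.off('failed', onFailed);
    }

    queueEvents.on('completed', onCompleted);
    queueEvents.on('failed', onFailed);
  });
}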

felixmosh (Author) commented

@ccollie, thank you for the answer.

But basically you are telling me to reimplement waitUntilFinished, which handles many edge cases that I don't want to reimplement every time I use the flow pattern.

WDYT?

manast commented Jan 22, 2022

@felixmosh waiting for a flow to finish is not a good pattern in a distributed, fault-tolerant system. Instead, it is best to do whatever needs to happen after the flow completes inside the parent job itself, or to add a new parent job on top of the flow that does that work after the current parent finishes its processing.
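
A sketch of that idea (the worker and queue wiring here are illustrative): make whatever should happen "after the flow" the processor of the flow's parent job, since the parent is only processed once every child has completed.

import { FlowProducer, Worker } from 'bullmq';

// The parent job's processor runs only after all child jobs have completed,
// so it is the natural place for the "flow is finished" work.
const renovateWorker = new Worker('renovate', async (job) => {
  const childResults = await job.getChildrenValues();
  // ...notify the caller, write a summary record, etc.
  return { done: true, childResults };
});

const flowProducer = new FlowProducer();

async function addRenovateFlow() {
  return flowProducer.add({
    name: 'renovate-interior',
    queueName: 'renovate',
    children: [
      { name: 'paint', data: { place: 'ceiling' }, queueName: 'steps' },
      { name: 'paint', data: { place: 'walls' }, queueName: 'steps' },
    ],
  });
}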

felixmosh (Author) commented

@manast, thank you for the explanation.

In my case there is no distribution, and all I need is to report back to the caller that the flow is done, which doesn't really make sense as a new task inside the flow.

iam4x commented Jan 27, 2022

I've been using this pattern in production for more than a year, with a high concurrency of jobs and no issues:

const executeFlow = pTimeout(async (wallets: string[], queues: Queues[]) => {
  const flowProducer = new FlowProducer({ connection: redisConnection });
  const flow = await flowProducer.add({
    name: Queues.SAVE_HOLDINGS,
    queueName: Queues.SAVE_HOLDINGS,
    data: { wallets },
    opts: {
      attempts: 5,
      backoff: { type: 'fixed', delay: 2000 },
      removeOnComplete: 1,
    },
    children: queues.map((queue) => ({
      name: queue,
      queueName: queue,
      data: { wallets },
      opts: {
        attempts: 100,
        backoff: { type: 'fixed', delay: 2000 },
        removeOnComplete: true,
      },
    })),
  });
  await waitUntilCompleted(flow.job);
}, PROCESSOR_TIMEOUT);

function waitUntilCompleted(job: Job) {
  return new Promise<void>((resolve) => {
    const check = async () => {
      if (await job.isCompleted()) {
        if (job.id) {
          // FIX: delete `processed` and `events` data from redis to avoid memory usage grow
          // dunno if needed anymore with newer versions
          redisConnection.del(`bull:SAVE_HOLDINGS:${job.id}:events`);
          redisConnection.del(`bull:SAVE_HOLDINGS:${job.id}:processed`);
        }
        return resolve();
      }
      return setTimeout(() => check(), 300);
    };
    check();
  });
}

felixmosh (Author) commented

@iam4x thank you for sharing, but it looks like you have implemented a "busy wait" on the flow job.

I've actually managed to implement what I need using listeners, as recommended above.

felixmosh (Author) commented

This is my version:

import { JobNode, QueueEvents } from 'bullmq';

export async function flowWaitUntilFinished(
  jobNode: JobNode,
  ttl: number,
  queueEvents: QueueEvents
) {
  const jobs = getFlowJobs(jobNode);
  const topJob = jobs[0];
  const completedStatus = await Promise.all(jobs.map((job) => job.isCompleted()));

  const unFinishedJobIds = jobs.filter((_job, idx) => !completedStatus[idx]).map((job) => job.id);

  let pointer = unFinishedJobIds.length;

  return new Promise<any>((resolve, reject) => {
    // eslint-disable-next-line no-var
    var clearCurrentTimeout = startTimeout(onFailure);

    function startTimeout(callback: (args: { jobId: string; failedReason: string }) => void) {
      if (typeof clearCurrentTimeout === 'function') {
        clearCurrentTimeout();
      }
      pointer--;

      const timeoutId = setTimeout(() => {
        callback({
          jobId: unFinishedJobIds[pointer],
          failedReason: `Flow Job wait timed out before finishing, no finish notification arrived after ${ttl}ms `,
        });
      }, ttl);

      return () => clearTimeout(timeoutId);
    }

    function onComplete({ jobId, returnvalue }) {
      if (unFinishedJobIds.includes(jobId) && jobId !== topJob.id) {
        clearCurrentTimeout = startTimeout(onFailure);
      } else if (jobId === topJob.id) {
        removeListeners();
        resolve(returnvalue);
      }
    }

    function onFailure({ jobId, failedReason }) {
      if (unFinishedJobIds.includes(jobId)) {
        const error: any = new Error(failedReason);
        error.failedJobId = `${topJob.id}:${jobId}`;

        removeListeners();
        reject(error);
      }
    }

    function removeListeners() {
      queueEvents.off('completed', onComplete);
      queueEvents.off('failed', onFailure);

      clearCurrentTimeout();
    }

    queueEvents.on('completed', onComplete);
    queueEvents.on('failed', onFailure);
  });
}

getFlowJobs returns a flat array of all jobs in the flow.
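
A minimal sketch of that helper, assuming the parent job has to come first (the code above uses jobs[0] as the top job):

import { Job, JobNode } from 'bullmq';

// Flatten a JobNode tree into [parent, ...all descendants], depth-first
function getFlowJobs(node: JobNode): Job[] {
  return [node.job, ...(node.children ?? []).flatMap((child) => getFlowJobs(child))];
}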
