Graceful shutdown handler #425

Closed
francescov1 opened this issue Mar 10, 2021 · 14 comments
@francescov1 (Contributor)

Hi there! I've been looking for docs related to graceful shutdowns with BullMQ but haven't been able to find much.

I'd like to verify that this is the correct approach for graceful shutdowns, ensuring all jobs are either completed or can be picked up again once restarted (a rough sketch follows the list):

  • The Express server in Node.js is closed, which ensures no more jobs are added to the queues.
  • The BullMQ Workers are paused. Awaiting this call should ensure that active jobs finish processing, correct? Also, what happens here if we have a rate limiter and a QueueScheduler? How do we ensure that throttled jobs finish processing as well?
  • Close all queues, workers, queue schedulers, etc.
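
A rough sketch of the sequence I have in mind (server, worker, scheduler and queue are placeholders for our Express server and the BullMQ instances of a single queue):

async function gracefulShutdown() {
    // 1. Stop accepting HTTP requests so no new jobs are enqueued.
    await new Promise(resolve => server.close(resolve));

    // 2. Pause the worker, waiting for its active jobs to finish.
    await worker.pause();

    // 3. Release all resources and connections.
    await worker.close();
    await scheduler.close();
    await queue.close();
}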

I'd be happy to add some docs for this somewhere if you'd like.

@manast (Contributor) commented Mar 12, 2021

Hi. I updated the docs with some info that I think covers most of what you are wondering; let me know if something is missing:
https://docs.bullmq.io/guide/workers/graceful-shutdown
https://docs.bullmq.io/guide/workers/pausing-queues

@francescov1 (Contributor, Author)

Perfect, that's exactly what I was looking for. Thanks so much 😊

@francescov1 (Contributor, Author)

Hey @manast, I think there may be an issue in the pausing-queues docs you added, and I want to double-check.

The call above will be executed almost immediately, but note it is an async method, this is because it can also be called with an extra parameter so that the call waits for all the jobs in that given worker to complete:

await myWorker.pause(true);

It is my understanding that calling pause with false or undefined should not wait for the worker to finish its jobs, and that if true is passed, it should. I checked the code; here is the worker's pause function:

async pause(doNotWaitActive) {
    if (!this.paused) {
        this.paused = new Promise(resolve => {
            this.resumeWorker = function () {
                resolve();
                this.paused = null; // Allow pause to be checked externally for paused state.
                this.resumeWorker = null;
            };
        });
        await (!doNotWaitActive && this.whenCurrentJobsFinished());
        this.emit('paused');
    }
}

It seems the opposite is actually happening: if doNotWaitActive is undefined or false, then await (!doNotWaitActive && this.whenCurrentJobsFinished()); resolves to await (true && this.whenCurrentJobsFinished());, i.e. the call does wait.
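
So, going by the implementation rather than the docs, the behavior today appears to be:

await worker.pause();     // doNotWaitActive is undefined: waits for active jobs
await worker.pause(true); // doNotWaitActive is true: returns without waiting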

@francescov1 (Contributor, Author)

I would also like to clarify something: is there a way to gracefully shut down workers that use rate limiting? Essentially, I'm looking for a way to handle graceful shutdowns so that our system can wait for all jobs to finish (whether they are currently being processed or delayed due to the rate limiter); then we know for certain that all jobs are done and we can close everything else.
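
For context, the naive drain loop I have in mind looks something like this (assuming Queue#getJobCounts, and that producers have already been stopped; a sketch, not tested):

async function waitForDrain(queue) {
    // Keep the workers and the QueueScheduler running; poll until every
    // job (active, waiting or delayed) has been processed.
    for (;;) {
        const counts = await queue.getJobCounts('active', 'waiting', 'delayed');
        if (counts.active + counts.waiting + counts.delayed === 0) {
            return;
        }
        await new Promise(resolve => setTimeout(resolve, 1000));
    }
}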

@manast (Contributor) commented Mar 23, 2021

It seems the opposite is actually happening: if doNotWaitActive is undefined or false, then await (!doNotWaitActive && this.whenCurrentJobsFinished()); resolves to await (true && this.whenCurrentJobsFinished());, i.e. the call does wait.

Yes, you are right. We should never use boolean parameters, for exactly this reason :). I will fix it ASAP.

@manast (Contributor) commented Mar 23, 2021

I would also like to clarify something: is there a way to gracefully shut down workers that use rate limiting? Essentially, I'm looking for a way to handle graceful shutdowns so that our system can wait for all jobs to finish (whether they are currently being processed or delayed due to the rate limiter); then we know for certain that all jobs are done and we can close everything else.

That is not currently possible, unfortunately. Since delayed jobs are always moved back to the wait status, and pausing pauses the wait list, there is no proper way to wait only for the delayed jobs to complete...

@manast (Contributor) commented Mar 23, 2021

@francescov1 interestingly, this will be possible with the new Redis module (https://github.com/taskforcesh/bullmq-redis); however, it is months away from release...

@francescov1 (Contributor, Author)

Booleans are confusing, totally agree ;)

Too bad about the delayed jobs, but understandable. I'm curious whether it would be possible to achieve this by pausing the queue (while leaving the workers and the QueueScheduler running) and then repeatedly checking whether all delayed/waiting jobs have been picked up before pausing the worker. Or does pausing a queue also pause the jobs in the wait status?

I'm excited to try out the new redis module when it's ready! Will be following its development 🚀

@manast (Contributor) commented Mar 23, 2021

Pausing the queue is implemented by renaming the "wait" list to "paused"; this is quite powerful despite its simplicity. But since delayed jobs by design must be moved to the wait list before they can be picked up by workers, they cannot be processed while the queue is paused. There may be a possibility, though, that I haven't thought through yet: move the delayed jobs to a "wait" list even when paused. The problem to solve is what to do when the queue is unpaused and you still have jobs left in the "wait" list that came from the delayed set; maybe moving those jobs to the tip of the paused list and then renaming it back to wait, or something like that.
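
In Redis terms, pausing boils down to something like this (key names follow Bull's usual convention and are only illustrative):

RENAME bull:myqueue:wait bull:myqueue:paused

and resuming renames the list back.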

@francescov1 (Contributor, Author)

Ahh, I get it now, interesting. Well, the main reason I was curious about queue pausing was to find the best way to ensure a graceful shutdown. So the issue of dealing with jobs still left in "wait" after the queue is unpaused isn't super important; the idea is that we'd ensure all the "wait" jobs get cleared and then shut everything down.

I wonder if it would be possible to instead define a new type of "close" function that does the logic you are describing (pause the queue but move delayed jobs to a "wait" list), waits until that list is cleared, and then performs the standard "close" logic?

@manast (Contributor) commented Mar 25, 2021

Hmm, not sure. The thing is that you call close on one worker, but you may have hundreds of them, so normally you only want to wait for the jobs that are currently being processed by that particular worker.

@sspread commented Jul 7, 2021

@manast Hello, I'm trying to shut down workers gracefully in the event of SIGINT/SIGTERM. In my case, I don't know how long the active jobs will take to complete or fail, so rather than waiting for them to finish I'd prefer to force them out of the 'active' state so that they are placed back into the queue for later processing. I'm fine with losing any progress. Using the pause and close methods (with the 'true' force boolean) effectively orphans any active jobs: they never get picked up again and never complete; they just remain in 'active'. I'm not using a scheduler, and the workers use 'sandbox' mode. Is my case supported? Is the behavior I'm describing expected?

EDIT: Basically, I want the behavior (and for the same practical reason) described in the Sidekiq wiki under the 'TERM' section (https://github.com/mperham/sidekiq/wiki/Signals), though I'd be OK with implementing it without the 25s grace period to finish.

@sspread commented Jul 7, 2021

I think I found how to do it with QueueScheduler.

async function shutdown(signal) {
    console.log(`got signal: ${signal}`)

    // Force-pause every worker without waiting for its active jobs,
    // then close the schedulers. When the app comes back up, a
    // QueueScheduler will detect the stalled jobs (their locks have
    // expired) and move them back to 'wait' for reprocessing.
    await Promise.all(workers.map(w => w.pause(true)))
    await Promise.all(queueSchedulers.map(s => s.close()))

    console.log('exiting...')
    // Re-raise the signal so the process exits with the expected status.
    process.kill(process.pid, signal)
}

process.once('SIGINT', shutdown)
process.once('SIGTERM', shutdown)

Where workers and queueSchedulers are each created 1-to-1 with my queues.
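
For reference, they are created roughly like this (queue names, connection details and the processor path are placeholders; I use sandboxed processors):

const { Worker, QueueScheduler } = require('bullmq');

const connection = { host: 'localhost', port: 6379 };
const queueNames = ['emails', 'reports'];

// One Worker and one QueueScheduler per queue.
const workers = queueNames.map(
    name => new Worker(name, './processor.js', { connection })
);
const queueSchedulers = queueNames.map(
    name => new QueueScheduler(name, { connection })
);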

@manast (Contributor) commented Jul 12, 2021

@sspread yes, that will work as long as you only have one node running the workers. The problem with a distributed system is that it is hard to know when all the workers are done with their current jobs, and we do not have such a feature available in BullMQ yet.
