jobs lost on hard shutdown #152
Comments
Yeah, I think introducing a `WorkingQueue` is a good idea to make RQ more reliable. Perhaps we can introduce a `WorkingQueue`. What do you think, @nvie?
My workaround for this is to audit the jobs after a worker fails: basically, iterate over all jobs, check whether each one `is_queued`, compare that against the list of actually queued jobs, and finally do a quick check that it hasn't just been processed.
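A rough sketch of what such an audit pass could look like, assuming RQ's `Queue.job_ids`, `Job.fetch`, and `job.is_queued` APIs, with the actual handling of orphans left open:

```python
from redis import Redis
from rq import Queue
from rq.job import Job

conn = Redis()
queues = [Queue(name, connection=conn) for name in ('high', 'default', 'low')]
queued_ids = {job_id for q in queues for job_id in q.job_ids}

for key in conn.keys('rq:job:*'):  # every persisted job hash
    job_id = key.decode().split(':')[-1]
    job = Job.fetch(job_id, connection=conn)
    # Orphan: claims to be queued, but sits in no queue and has no result yet.
    if job.is_queued and job_id not in queued_ids and job.result is None:
        pass  # re-enqueue, delete, or move to the failed queue here
```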
My thoughts on this subject:
For reasons the newsletter example above demonstrates, RQ never automatically retries executing jobs. But I agree that for some types of jobs (like in your case) that behaviour would be useful. It depends, basically. Your job is a so-called idempotent operation in CS speak. Constructing jobs such that they are idempotent operations is a great thing. More on that in a minute. Let's reflect a bit first. Right now, RQ promises the following behaviour:

1. jobs are executed at most once;
2. jobs are never automatically re-executed, whatever the cause of failure.
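To make "idempotent" concrete, here is a hypothetical contrast in the spirit of the newsletter example (all helper names are invented for illustration):

```python
# Not idempotent: a retry re-sends mail to everyone already handled.
def send_newsletter(issue_id):
    for user in get_subscribers():
        send_mail(user, issue_id)

# Idempotent: a retry skips work that already happened, so re-executing
# after a cold shutdown is harmless.
def send_newsletter_idempotent(issue_id):
    for user in get_subscribers():
        if not already_sent(user, issue_id):
            send_mail(user, issue_id)
            mark_sent(user, issue_id)
```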
In your case, Heroku is terminating your jobs with a cold shutdown—you don't have much control over that, and due to (2), this is a problem in your case, hence this GitHub issue was submitted.

**Possible path forward**

One possible future could be that we require jobs to be idempotent operations. This way, RQ may assume that jobs are safe to re-execute, so it may do that automatically in case of a cold shutdown, or in case of an uncaught exception. (By the way: I have not yet decided on whether a WorkingQueue is a good idea or not. Let's save that discussion for an implementation thread on the specific feature.) Of course, there are jobs in the field currently that aren't idempotent, and changing RQ's semantics should be done carefully and purposefully. To provide a few migration paths for all current jobs out there that aren't compatible with this, we could add a few helper objects.
**Roadmap**

I think this is something that we can plan for a 0.4 release, together with concurrency support. Or maybe a 0.4.1 release, if we like to get 0.4.0 out without waiting for this. As for the implementation timeline, I'd like to finish the concurrency support first, as this will be the biggest overhaul, code-wise. After the dust has settled on that, it's a good idea to add retry semantics. Thoughts?
Sounds reasonable. I went with RQ because of "stupid simple", so I'm inclined to be cautious about requiring jobs to be idempotent. Having written a basic management command for my Heroku Django project, I think an rqaudit script that deals with invalid job_ids, orphaned jobs, and other RQ data in a user-defined snapshot fashion (deleting, re-queuing, or moving to failed) is actually a reasonable solution to the basic problem. It then becomes a monitoring issue rather than a queue-behaviour issue. In a non-Heroku environment, if you were using some kind of process manager like Circus layered on top, which manages both Redis and workers in tandem, you might have one configuration hook that audits Redis with user arguments to requeue or delete, and another that doesn't. http://circus.readthedocs.org/en/0.6/hooks/
I think we all love RQ's simplicity, and after thinking about it more, having a `WorkingQueue` makes sense to me.

My thinking behind adding a `WorkingQueue`: at the moment, jobs that are being executed when a hard shutdown happens are "lost" by the queueing mechanism (not in any to-be-executed `Queue` or `FailedQueue`). @bretth's workaround is to have an audit script that iterates through all jobs and checks whether they never finished executing, but if job results are persisted, the number of jobs in Redis could potentially be huge, so this may not be feasible in certain use cases. In my opinion, the introduction of a `WorkingQueue` would address this.
I'm with @bretth in that I think we shouldn't require jobs to be idempotent. Right now, any function can be turned into an RQ job - they don't have to be aware of RQ, and I think we should keep it that way. I like @nvie's idea of having helper objects for this.
How a job gets aborted isn't really relevant, I think. It could be a case of Heroku killing the process, or a job timeout, or an unforeseen divide-by-zero (i.e. an unhandled exception). I don't think distinguishing on the exact reason is useful here. What's relevant is that part of the job might have been executed, and the important thing is whether or not re-executing that part is harmful or helpful. It depends. In other words, the question is: will we re-execute a job, ever, in case of a failure? We should make a statement about that. Either:

1. we never re-execute jobs after a failure (the current behaviour), or
2. we always re-execute jobs after an abnormal termination.

We can pick either one, and support the other case via a decorator. If we pick (1), then:

```python
@max_retries(3)
def my_job():
    ...
    1/0  # fake abnormal termination, no matter which
    ...
```

If we pick (2), then:

```python
@no_retry
def my_job():
    ...
    1/0  # fake abnormal termination, no matter which
    ...
```

Thoughts?
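For what it's worth, a minimal sketch of how option (1)'s opt-in decorator could be implemented: it just attaches a retry budget to the function for the worker to consult after an abnormal termination (all names here are hypothetical, not RQ API):

```python
import functools

def max_retries(n):
    """Mark a job as safe to re-execute up to n times (sketch only)."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        # A worker could read this attribute when deciding whether to
        # re-enqueue the job instead of moving it to the failed queue.
        wrapper.max_retries = n
        return wrapper
    return decorator

@max_retries(3)
def my_job():
    ...
```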
In the general case (and my particular one), a job may perform a number of time-consuming and/or CPU (or IO) consuming operations before its failure. Given that, I would rather vote for statement 1, because it would not affect current installations of RQ and would not introduce any hidden cost for retrying a failed job.
I also vote for number 1 :)
1 also. A retry decorator is definitely the way to go. So yes, that narrows this issue down to the question "under what circumstances should a job be moved to the failed queue?", and in my opinion any system failure is no different from a Python exception, so RQ should try to handle them as well as it can. As a general principle, though, Redis is not PostgreSQL, so I think making 'some' tradeoffs for convenience and speed over integrity is reasonable. For that reason I personally wouldn't trade off the convenience of listening on multiple lists/queues for the ability to more reliably push onto another list.
Yeah, I also like (1) the best, so that's a unanimous decision—great. Say we have this decorator. Now, the next problem is the actual implementation. We could either:

1. re-enqueue a failed job immediately, at the back of its original queue; or
2. re-schedule the job to be executed at some point in the future, which requires scheduling support.
Clearly, the second implementation is the hardest, but I think it's also the most useful one. Let's gather a few ideas around this implementation first. Thoughts, guys?
Well, I think RQ needs scheduling anyway, so it makes sense to go with the second option. rq-scheduler, for example, has a separate ordered queue for scheduled or repeating jobs.
I have a working implementation of an RQ-based scheduling mechanism: https://github.com/ui/rq-scheduler . Feel free to steal/borrow some code from there.
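For readers following along, basic rq-scheduler usage looks roughly like this (a sketch based on its documented API; exact signatures may have changed since):

```python
from datetime import datetime, timedelta

from redis import Redis
from rq_scheduler import Scheduler

def send_report():
    print('report sent')

scheduler = Scheduler(connection=Redis())

# Run a job at a specific time...
scheduler.enqueue_at(datetime(2013, 1, 1, 0, 0), send_report)

# ...or after a delay.
scheduler.enqueue_in(timedelta(minutes=5), send_report)
```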
OMG, @selwin, you're the man—I didn't know of this project's existence! I want this in RQ and I'd be very happy to integrate both projects!
Would you mind adding the scheduler stuff to RQ, under its own namespace?

One question: is there some form of protection against scheduling the same job twice? For example, if I have a function invocation that I want to invoke repeatedly and I'm unsure whether it is already scheduled—could I check that, or just schedule it, and would that "duplication" be detected somehow?
(By the way, @selwin, I have no other means of contacting you—there's no public email address on your GitHub profile. If you want to discuss some details around RQ scheduler integration, please send me a personal mail.)
Oh shit, I was a stupid prick 6 months ago. So sorry for dismissing this awesome project!
@nvie haha, no worries, we didn't need it back then. My email is selwin.ong@gmail.com and I'd be more than happy to help merge the scheduler into RQ proper. There are also some minor things that I'd like to change (like renaming a few things).

Maybe this can go in as part of 0.5, after we get the concurrency issue sorted out? Concurrent workers is something that I've been wanting for quite a while too.

And to answer your question about scheduling the same job twice: the scheduler queue is implemented using Redis' sorted sets, so jobs in that queue are guaranteed to be unique. You can also check whether a job is already scheduled this way:

```python
if job_instance in scheduler:
    # Do something

# or

if job_id in scheduler:
    # Do something
```
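That containment check can be a single `ZSCORE` lookup on the scheduler's sorted set; a sketch of the idea (the key name here is an assumption, not necessarily rq-scheduler's actual key):

```python
from rq.job import Job

class Scheduler:
    scheduled_jobs_key = 'rq:scheduler:scheduled_jobs'  # assumed key name

    def __init__(self, connection):
        self.connection = connection

    def __contains__(self, item):
        # Accept either a Job instance or a plain job id string.
        job_id = item.id if isinstance(item, Job) else item
        return self.connection.zscore(self.scheduled_jobs_key, job_id) is not None
```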
@nvie I've started the initial merge in this branch https://github.com/selwin/rq/tree/scheduler-integration and all tests pass (both RQ's and the scheduler's). My next step is to move the scheduler's code into RQ proper.
Thanks, @selwin! Once you're done, I'll have a few minor requests for it.
No, worker changes shouldn't impact the scheduler. All it does is move jobs to the correct queues. What do you think about introducing new scheduling methods on `Queue`?

```python
# Currently, this is how we use the scheduler to schedule a job on the "default" queue
scheduler = Scheduler('default', connection=redis)
scheduler.enqueue_at(datetime.now(), foo)

# With the new API, alternative 1
queue = Queue('default', connection=redis)
queue.enqueue_at(datetime.now(), bar)

# Alternative 2, if we want to make it clear that we're using the scheduler
queue.schedule_at(datetime.now(), bar)
```

If we do indeed add these, I think I have a slight preference for alternative 2.
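If alternative 2 wins, `schedule_at` could be a thin delegation to the scheduler; a hypothetical sketch:

```python
from rq import Queue as BaseQueue
from rq_scheduler import Scheduler

class Queue(BaseQueue):
    def schedule_at(self, when, func, *args, **kwargs):
        # Reuse this queue's name and connection so the job lands back here.
        scheduler = Scheduler(self.name, connection=self.connection)
        return scheduler.enqueue_at(when, func, *args, **kwargs)
```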
Hey Selwin, thanks. That's exactly what I wanted to change :) Alternative 2 works for me.
**work_in_progress_queue proposal**

**Summary**

Right now, a job being processed by a worker leaves no trace in any queue, and thus, if the worker is terminated, all jobs actually being processed (currently only one, but it could be more in the future) are lost. And that is very bad (in most use cases). Issue #152 raises that very issue, and some work has been suggested toward solving it. I'll try to highlight the importance of the problem, sum up the discussion in this issue, and expose my view and the implementation I am thinking of.

The issue is important because it puts a big responsibility on the workers: not failing, ever. A big part of the 'complexity' in the worker comes from handling interrupts (first signal: take notice and wait for the 'horse' to finish; second signal received: force the kill and die), and this becomes even more of a problem when a worker can spawn multiple horses (cf. issue #152 and https://github.com/nvie/new_workers). And last, even with all the safeties around its design, a worker/horse could fail or be terminated abruptly, and a job would be lost "in the aether", something I think is really wrong.

**Problem analysis**

I think a job should always be in one and only one of 3 states, each represented by a Redis structure:

1. waiting to be executed (sitting in a queue);
2. being processed (work in progress);
3. finished (its result recorded, or the job moved to the failed queue).
RQ manages Redis structures for states 1 and 3, but state 2 is "implicit" and left under the worker's responsibility.

**Proposal**

We intend to implement state 2 in the form of a sorted set, one for each queue, with the score being a timestamp of (now + timeout). The only problem is the job-retrieval strategy, as explained by this issue's original poster. I am not sure a "Redis transaction" can work around a blocking BRPOP ;) so we might have to accept that, after blocking for a job, the worker/horse is, for a short time, the sole holder of the job, right before registering it in the `queue_name_wip` (work in progress) ZSet.

The only other change I see to a worker would be to add a timeout to the BRPOP so that we can periodically check the WIP sorted set for expired jobs.

A job that is retried would be put again in the `queue_name_wip` sorted set with a new, updated score. A job that has ended normally would be removed from the `queue_name_wip` sorted set at the same time its result is recorded.

Would this implementation be useful and acceptable to you?

**Notes**

All the "retry_count" stuff can be implemented on top of that (unpickling the job, checking the retry_count, decrementing it, and saving the job).

**Mock-up algorithm**

Protocol for fetching a job, from a worker or horse:

```python
while True:
    (queue_key, job_id) = BRPOP(Q1, Q2, Q3, timeout=30)
    if queue_key is None:  # no job in the queues for the timeout duration
        (queue_key, job_id) = fetch_from_wip_queue(Q1, Q2, Q3)  # zrange() + pop
        if queue_key is None:
            continue
    # Here we have a job to do.
    # Quickly register it with a temporary score in the corresponding
    # wip_queue so that we do not lose it even in case of an unpickling error.
    put_in_progress(queue_key + '_wip', job_id, score=-1)  # ZADD
    # unpickling
    job = Job.create(job_id)
    # retry_count check
    if job.retry_count == -1:
        discard_job(queue_key + '_wip', job_id)  # remove the job from the WIP queue
    else:
        timestamp_out = now + job.timeout
        job.retry_count -= 1
        with pipeline:
            put_in_progress(queue_key + '_wip', job_id, score=timestamp_out)  # ZADD
            job.save(pipeline=pipeline)
            # EXEC happens when the pipeline block closes
        # now we can truly process the job
        do_work(job)
```
Work on supporting Work In Progress (WIP) queues as described in the above comment has started in https://github.com/glenfant/rq/tree/wip-queue
I like @Annakan's proposal, but I have a few suggestions.
Any comments, @nvie?
Thanks, Selwin, for your interest.
Which queue a job belongs to is stored in `job.origin`. Unfortunately, RPOPLPUSH has the limitation of being able to listen on only one list at any one time. RQ's workers are currently designed to listen on multiple queues simultaneously, so to use BRPOPLPUSH, we'd need to limit workers to only one queue at any given time.
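For a single queue, the reliable-queue pattern from the Redis docs would look something like this with redis-py (the `:wip` backup-list name is made up here, and `process` stands in for real job execution):

```python
import redis

r = redis.Redis()

def process(job_id):
    print('working on', job_id)  # stand-in for actually running the job

# Atomically pop a job id and park it on a backup list in one step.
job_id = r.brpoplpush('rq:queue:default', 'rq:queue:default:wip', timeout=30)
if job_id:
    process(job_id)
    # Acknowledge: only now does the job leave the backup list.
    r.lrem('rq:queue:default:wip', 1, job_id)
```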
OK for `job.origin`.

As for the multi-queue limitation, for the curious, and clearly diverting from this topic: the idea is to use Redis server-side scripting to pop from several queues and register the job in the WIP set in one atomic step. But for this to work there are a few preconditions to sort out.

Most of this is figured out, but I need to find some time to implement a proof of concept before submitting it, since it departs a bit from the "simplicity" mantra; on the other hand, we would have 100% safe job picking.
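A sketch of that idea with redis-py's `register_script`: pop from the first non-empty queue and register the job in that queue's WIP sorted set in one atomic server-side step (key conventions follow the proposal above; note that scripts cannot block, so some polling or BLPOP wake-up strategy is still needed):

```python
import time

import redis

r = redis.Redis()

MULTIPOP = r.register_script("""
for i, key in ipairs(KEYS) do
    local job_id = redis.call('RPOP', key)
    if job_id then
        -- Register the job as work-in-progress before anything can lose it.
        redis.call('ZADD', key .. '_wip', ARGV[1], job_id)
        return {key, job_id}
    end
end
return nil
""")

result = MULTIPOP(keys=['high', 'default', 'low'], args=[time.time() + 180])
if result:
    queue_key, job_id = result
```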
Hem ;) That means that, potentially, a worker must open (unpickle) every job in the shared WIP queue to find the ones belonging to its own queues. First, that augments the access concurrency on that single structure. Second, in case of a class of jobs failing with a retry count set above 0, those jobs would keep coming back for every worker to inspect. Basically, we introduce a design where queues indirectly impact each other, where previously that was never the case. What do you think, am I off the rails or missing something?
I don't think we can rely on workers cleaning up the WIP queue. The scheduler process is perfect for this. So in addition to checking for jobs to be scheduled every 60 seconds, it will also go through the WIP queue and re-queue jobs whose time has expired.
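That periodic pass could look roughly like this (a sketch following the `<queue>_wip` naming from the proposal; making the check-and-move atomic against concurrent workers is left out for brevity):

```python
import time

import redis

r = redis.Redis()

def requeue_expired(queue_names):
    """Push WIP entries whose due time has passed back onto their queue."""
    now = time.time()
    for name in queue_names:
        wip_key = name + '_wip'
        for job_id in r.zrangebyscore(wip_key, 0, now):
            pipe = r.pipeline()
            pipe.zrem(wip_key, job_id)  # drop the expired WIP entry...
            pipe.lpush(name, job_id)    # ...and put the job back in line
            pipe.execute()
```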
I now understand what you are thinking, but we do have a disagreement ;)
And none of those considerations would exist if we adopt an "as many WorkInProgress queues as queues" design, where workers can safely manage their queues and their WorkInProgress queues. And they don't even have to unpickle the tasks to retrieve them from the WIP queue (since it is a sorted set scored on due time) when their time has come. I thus and hereby ask for the judgement of God: @nvie ???? ;) .X.
Well, looks like a lot of discussion. This was my first test when evaluating this fantastic tool. Depending on the problem, it sends the message to the failed queue, but if it's a hard shutdown, it gets lost. I am used to the ACK/NACK mechanism. Do you think this would disrupt the simplicity of RQ? It does not rely on schedulers at all to solve this, but yes, it changes the process: you have to come back and say "yes, it worked". Does it make sense? Best
I got some ideas from https://github.com/richardhenry/hotqueue and its forks, and built a Flask RESTful API over it, addressing the lost-job problem. Each GET has a reservation_id, and the consumer must give an ACK or NACK within a timeframe. Another process is responsible for checking reservations, and if it finds old ones, it auto-NACKs the messages. Best
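A sketch of that reservation scheme on plain Redis (all key names and the TTL policy are invented for illustration):

```python
import time
import uuid

import redis

r = redis.Redis()

def get_with_reservation(queue, ttl=60):
    """Pop a message and hold it under a reservation until ACKed or expired."""
    msg = r.rpop(queue)
    if msg is None:
        return None, None
    reservation_id = str(uuid.uuid4())
    r.zadd(queue + ':reservations', {reservation_id: time.time() + ttl})
    r.set(queue + ':reservation:' + reservation_id, msg)
    return reservation_id, msg

def ack(queue, reservation_id):
    # Consumer confirms success: forget the reservation entirely.
    r.zrem(queue + ':reservations', reservation_id)
    r.delete(queue + ':reservation:' + reservation_id)

def auto_nack_expired(queue):
    # Watchdog process: requeue messages whose reservation has lapsed.
    for rid in r.zrangebyscore(queue + ':reservations', 0, time.time()):
        rid = rid.decode()
        msg = r.get(queue + ':reservation:' + rid)
        if msg is not None:
            r.lpush(queue, msg)
        ack(queue, rid)
```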
What is the status of this issue? While it depends on the situation, at-least-once delivery combined with idempotent operations is a fairly standard approach to job execution, and something I would love RQ to be able to offer.
This is solved in
Using Heroku, I discovered jobs are getting lost if the worker is terminated through scaling or through other auto-management, such as using too much memory.
An example from the Heroku logs:

```
2012-11-10T06:25:12+00:00 app[worker.1]: [2012-11-10 06:25] DEBUG: worker: Registering birth of worker ec196e3c-15cc-4056-a54c-22793c11402f.2
2012-11-10T06:25:12+00:00 app[worker.1]: [2012-11-10 06:25] INFO: worker: RQ worker started, version 0.3.2
2012-11-10T06:25:12+00:00 app[worker.1]: [2012-11-10 06:25] INFO: worker:
2012-11-10T06:25:12+00:00 app[worker.1]: [2012-11-10 06:25] INFO: worker: *** Listening on high, default, low...
2012-11-10T06:25:12+00:00 app[worker.1]: [2012-11-10 06:25] INFO: worker: default: project.management.commands.test_rq.func() (a1eb90ea-1c59-49a8-ae11-a089df09c096)
2012-11-10T06:25:25+00:00 app[worker.1]: [2012-11-10 06:25] DEBUG: worker: Got signal SIGTERM.
2012-11-10T06:25:25+00:00 app[worker.1]: [2012-11-10 06:25] WARNING: worker: Warm shut down requested.
2012-11-10T06:25:25+00:00 app[worker.1]: [2012-11-10 06:25] DEBUG: worker: Stopping after current horse is finished. Press Ctrl+C again for a cold shutdown.
2012-11-10T06:25:25+00:00 app[worker.1]: [2012-11-10 06:25] INFO: worker: Stopping on request.
2012-11-10T06:25:25+00:00 app[worker.1]: [2012-11-10 06:25] DEBUG: worker: Registering death
```
Viewing the FailedQueue shows no jobs.
Ideally, the solution to this problem would be to move the job to a 'WorkingQueue' instance for that queue (e.g. 'default' -> 'working:default') using BRPOPLPUSH before returning the job from Redis, and then remove it from the WorkingQueue instance once the job completes or is moved to FailedQueue. Sadly, BRPOPLPUSH doesn't help with the current BLPOP behaviour of listening on multiple queues.
You could reduce the risk, however, by pushing the job onto the WorkingQueue instance immediately after BLPOP. In the event a worker experiences a hard shutdown mid-job, when the next worker is fired up it would check the WorkingQueue for its queues' jobs, check whether any have expired, and move them to the failed queue if they have.