
Add result blocking #1939

Merged: 9 commits merged into rq:master on Jun 21, 2023

Conversation

@ajnisbet (Contributor) commented Jun 8, 2023:

Add blocking while waiting for results.

Fixes #151 and somewhat addresses #717

Design motivation copied from #717 (comment)


huey has result.get(blocking=True), though this is implemented with polling on a 100ms interval, which isn't a good experience.

dramatiq has message.get_result(block=True) and group.get_results(block=True), which use blocking brpoplpush. (However they use polling for the worker to grab jobs, unlike rq).

celery's AsyncResult blocks by default when using result.get(). Celery's source code is too confusing to find where this is implemented, but the docs suggest no polling is done for redis: "Note that this does not have any effect when using the RPC/redis result store backends, as they don't use polling."

Right now, the easiest thing to do is handle this outside rq:

  • As a callback, RPUSH the result to a key like rq:custom-result-list:{job_id}
  • In the main (non-worker) process, BLPOP the result (see the sketch below).
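
A minimal sketch of that workaround, using the list key named in the bullets above; `my_task`, `push_result`, and the connection/queue names are illustrative, not part of rq:

```python
from redis import Redis
from rq import Queue

redis_conn = Redis()
q = Queue(connection=redis_conn)

def my_task():
    # Placeholder job; in practice it must live in a module the worker can import.
    return 42

def push_result(job, connection, result, *args, **kwargs):
    # rq success callback: RPUSH the (stringified) result onto a per-job list.
    connection.rpush(f"rq:custom-result-list:{job.id}", str(result))

job = q.enqueue(my_task, on_success=push_result)

# In the main (non-worker) process, BLPOP blocks until the callback pushes the
# result, or returns None after `timeout` seconds.
item = redis_conn.blpop(f"rq:custom-result-list:{job.id}", timeout=10)
```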

@codecov bot commented Jun 8, 2023:

Codecov Report

Patch coverage: 100.00% and project coverage change: +0.04% 🎉

Comparison is base (a26f624) 93.59% compared to head (102eb2b) 93.63%.

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #1939      +/-   ##
==========================================
+ Coverage   93.59%   93.63%   +0.04%     
==========================================
  Files          28       28              
  Lines        3729     3755      +26     
==========================================
+ Hits         3490     3516      +26     
  Misses        239      239              
| Impacted Files | Coverage | Δ |
| --- | --- | --- |
| rq/job.py | 97.41% <100.00%> | (+<0.01%) ⬆️ |
| rq/results.py | 96.49% <100.00%> | (+0.30%) ⬆️ |

... and 3 files with indirect coverage changes



# Should block if there's no result.
timeout = 1
result_sync = Result.fetch_latest(job)
Collaborator commented:

This just crossed my thought: should we check whether the job has started or finished before waiting for result? If job has not been started, should it raise an exception?

@ajnisbet (Contributor, Author) commented Jun 12, 2023:

> If job has not been started, should it raise an exception?

My instinct is no.

Typically I just want to "fire and wait", without really caring if I'm waiting because my workers are busy with other jobs vs waiting for execution to finish. Do you have an example in mind where that difference would be meaningful?

Also I'll admit my motivation for adding this PR was to reduce the latency and performance hit of polling for results. Adding an exception for waiting for an un-started job would mean either going back to polling for a started job before blocking for results, or adding a new Job.block_until_started() method.

> should we check whether the job has started or finished before waiting for result?

I think regardless of the job status we'd need to query the result:

  • If the job is FINISHED/FAILED we need to get the result.
  • If the job is QUEUED or otherwise pending, the job could have finished with a Result by the time Job.get_status() returns, so we still need to check for a result.
  • If the job is CANCELLED/STOPPED I think there can still be a result from a prior retry attempt?

So yes, if the job is FINISHED we could use a non-blocking xrevrange rather than xread to get the result, which is a cheaper redis call (xread always returns all results, not just the latest one). But getting that status requires an hget call, which outweighs any performance advantage.
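
Purely for illustration (redis-py calls, not code from this PR; the connection, job id, and stream key names are assumptions), the status-check-first approach described above would look roughly like this:

```python
from redis import Redis

conn = Redis()
job_id = "my-job-id"                        # illustrative
result_stream_key = f"rq:results:{job_id}"  # illustrative key name
timeout_ms = 1000

# Extra round trip to read the job's status from its hash.
status = conn.hget(f"rq:job:{job_id}", "status")

if status in (b"finished", b"failed"):
    # Cheap, non-blocking read of only the newest stream entry.
    latest = conn.xrevrange(result_stream_key, count=1)
else:
    # Blocking read: waits up to timeout_ms, but returns every entry after the given ID.
    entries = conn.xread({result_stream_key: 0}, block=timeout_ms)
```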


One status-related issue I can see is that when retries are used, people may want to block until a successful result: again not caring about the difference between a failed attempt and a pending task. Either way, I don't have a useful result!

We could add this to rq with a new Job.latest_successful_result() method, or an only_successful=False argument to latest_result().

Though this can be handled by the user without adding extra complexity to the rq API:

import time

from rq.results import Result

block_until = time.time() + timeout
timeout_remaining = timeout
result = None
while timeout_remaining > 0 and (result is None or result.type != Result.Type.SUCCESSFUL):
    result = job.latest_result(timeout=timeout_remaining)
    timeout_remaining = block_until - time.time()

Collaborator commented:

I agree with your reasoning. I'll take another look soon.

@@ -156,3 +156,10 @@ job = Job.fetch(id='my_id', connection=redis)
for result in job.results():
    print(result.created_at, result.type)
```

To block until a result arrives, you can pass a timeout in seconds to `job.latest_result()`. If a result already exists, it is returned immediately. If the timeout is reached without a result arriving, a `None` object is returned.
Collaborator commented:

Perhaps we should clarify that the latest result will be returned.

@ajnisbet (Contributor, Author) commented Jun 13, 2023:

Now:

To block until a result arrives, you can pass a timeout in seconds to `job.latest_result()`. If any results already exist, the latest result is returned immediately. If the timeout is reached without a result arriving, a `None` object is returned.
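
A short usage sketch of that documented behaviour (the 60-second timeout and the handling of `None` are arbitrary choices, not from the docs):

```python
# Blocks for up to 60 seconds; if any result already exists, the latest one
# is returned immediately.
result = job.latest_result(timeout=60)
if result is None:
    print("timed out without a result")
```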

Comment on lines +158 to +160
response = response[0] # Querying single stream only.
response = response[1] # Xread also returns Result.id, which we don't need.
result_id, payload = response[-1] # Take most recent result.
Collaborator commented:

Can we use the count argument so that xread only returns 1 result? https://redis.io/commands/xread/

Contributor (Author) commented:

Unfortunately not, as COUNT N returns the first N results, not the latest.

Redis doesn't have an XREVREAD, nor an XREVRANGE with BLOCK.
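
To illustrate that point with redis-py (the connection and stream key are assumed names, not code from this PR):

```python
from redis import Redis

conn = Redis()
stream_key = "rq:results:some-job-id"  # illustrative

# COUNT limits how many entries XREAD returns reading *forward* from the
# given ID, so count=1 yields the oldest entry after ID 0, not the newest.
first = conn.xread({stream_key: 0}, count=1, block=1000)

# XREVRANGE returns entries newest-first (count=1 gives the latest entry),
# but it has no BLOCK option, so it cannot wait for a new result to arrive.
latest = conn.xrevrange(stream_key, count=1)
```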

w = Worker([q])
started_at = time.time()
w.work(burst=True)
result = job.latest_result(timeout=block_seconds)
Collaborator commented:

This doesn't test the blocking feature because job.latest_result is called after the job has finished executing (line 1232). To test that this truly blocks, the work has to be done in the background.

It should look like this:

w.work(burst=True)  # In the background
started_at = time.time()
result = job.latest_result(timeout=block_seconds)
blocked_for = time.time() - started_at
self.assertGreaterEqual(blocked_for, job_sleep_seconds)

@ajnisbet (Contributor, Author) commented Jun 13, 2023:

Thank you, good catch!

I replaced w.work with fixtures.start_worker_process which runs in the background.

(I tried moving started_at =... after the worker start, but that creates a flaky race condition if the job starts before time.time() is called.)

q = Queue(queue_name)
job = q.enqueue(fixtures.long_running_job, job_sleep_seconds)
started_at = time.time()
fixtures.start_worker_process(queue_name, burst=True)
result = job.latest_result(timeout=block_seconds)
blocked_for = time.time() - started_at

@ajnisbet (Contributor, Author) commented:
I think the failing tests are unrelated?

@selwin selwin merged commit 2d705f5 into rq:master Jun 21, 2023
36 checks passed
@selwin (Collaborator) commented Jun 21, 2023:

Thanks!

selwin pushed a commit that referenced this pull request Feb 24, 2024
* Add result blocking

* Dev tidyup

* Skip XREAD test on old redis versions

* Lint

* Clarify that the latest result is returned.

* Fix job test ordering

* Remove test latency hack

* Readability improvements