
Fixing segfaults in Async Streaming Generator #43775

Merged
merged 18 commits into ray-project:master on Mar 9, 2024

Conversation

@alexeykudinkin (Contributor) commented Mar 7, 2024

Related issue number

Addresses #43771

This change makes sure that no side effects outlive the invocation of the execute_task method:

Previously, tasks scheduled onto the Core Worker's ThreadPoolExecutor could keep executing even after the request had been cancelled (cancelling the future does not cancel an already-running task). A task still running in the TPE could then try to access data structures that had already been cleaned up after this method returned, causing a SIGSEGV.

With this change:

  • Upon encountering any failure, we set an interrupt_signal_event that interrupts already scheduled but not yet executed tasks, preventing them from modifying externally passed-in data structures (see the sketch below).
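A minimal, self-contained Python sketch of this pattern; the names report_output, output_buffer, and execute_task are illustrative stand-ins, not the actual Cython core-worker code:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def report_output(interrupt_event, output_buffer, item):
    # Cooperative check: once the caller has set the event, tasks that were
    # scheduled but have not run yet skip their work instead of touching data
    # structures the caller may already have torn down.
    if interrupt_event.is_set():
        return
    output_buffer.append(item)


def execute_task(executor: ThreadPoolExecutor):
    interrupt_event = threading.Event()
    output_buffer = []
    futures = [
        executor.submit(report_output, interrupt_event, output_buffer, i)
        for i in range(10)
    ]
    try:
        for f in futures:
            f.result()
    except BaseException:
        # On any failure (including cancellation), interrupt everything that
        # has not started yet, before this method returns and output_buffer
        # goes out of scope.
        interrupt_event.set()
        raise
    return output_buffer
```

The key property is that the event is set before execute_task returns, so any task that has not started yet observes it and never touches the caller's data structures.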

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@alexeykudinkin alexeykudinkin changed the title Fixing Streaming Generator segfaults [WIP] Fixing Streaming Generator segfaults Mar 7, 2024
@alexeykudinkin alexeykudinkin changed the title [WIP] Fixing Streaming Generator segfaults Fixing segfaults in Async Streaming Generator Mar 7, 2024
@jjyao (Contributor) left a comment

LG. Thanks for fixing it.

@@ -126,7 +126,7 @@ cdef class CoreWorker:
     object fd_to_cgname_dict
     object _task_id_to_future_lock
     dict _task_id_to_future
-    object thread_pool_for_async_event_loop
+    object event_loop_executor
Contributor

should we call it thread_pool_executor_for_async_event_loop?

Contributor Author

Not a big fan of adding sentences as names (event_loop_executor captures the intent perfectly)

@@ -0,0 +1,127 @@
import time
Contributor

You need to add it to the Bazel BUILD file, otherwise it won't be run by CI.

Do you want to add it to test_streaming_generator_4.py or create a _5.py?

Contributor Author

Good call!

@alexeykudinkin (Contributor Author) commented:

@jjyao, I had to revisit the approach:

  • The TPE is used cooperatively by multiple requests (hence we can't shut it down).
  • Running multiple TPEs proved costly in the benchmarks (streaming_core_throughput.py).
  • Instead, the approach was revised to use external interruption (via threading.Event), which has only a very minor effect on performance (see below).
# Before (master)
Core Actors streaming throughput (ASYNC) (num_replicas=1, tokens_per_request=1000, batch_size=10): 13052.62 +- 372.43 tokens/s
(CallerActor pid=56100) Individual request quantiles:
(CallerActor pid=56100) 	P50=497.5112084999988
(CallerActor pid=56100) 	P75=648.4424480000001
(CallerActor pid=56100) 	P99=807.8154376599998

# After 
Core Actors streaming throughput (ASYNC) (num_replicas=1, tokens_per_request=1000, batch_size=10): 12782.83 +- 216.97 tokens/s
(CallerActor pid=51634) Individual request quantiles:
(CallerActor pid=51634) 	P50=504.2683960000014
(CallerActor pid=51634) 	P75=667.4028960000006
(CallerActor pid=51634) 	P99=811.2643177199991

@rkooo567 (Contributor) left a comment

IIUC, the issue is that if any coroutine fails (is cancelled), it raises an exception before all tasks in the thread pool have finished -> ExecuteTask finishes before the tasks inside the thread pool finish, right?

Hmm, actually I'm curious: why don't we just use gather(return_exceptions=True)?

In that case, gather would not return until everything in the coroutines has finished, so the race condition wouldn't happen?

@@ -1295,6 +1296,11 @@ cdef report_streaming_generator_output(
# usage asap.
del output_or_exception

# NOTE: Once interrupting event is set by the caller, we can NOT access
Contributor

Hmm, theoretically, isn't it possible that the code below is already executing?

Is it okay because none of the other functions access data structures that could be deleted when a task finishes?

Contributor

Yeah, I have the same question: isn't there still a race condition where the signal event is set between lines 1301 and 1304?

Contributor Author

Theoretically -- yes, the RC is still possible, but it is now much, much less likely: the task would have to be paused by the interpreter right at the spot where we're pushing into the vector, then execution would have to switch over to the event-loop thread and let the caller finish freeing the vector.

This interrupting signal event, though, serves as a barrier guaranteeing that any task that still reaches this point won't proceed once the caller has completed (see the sketch below).
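To make the remaining window concrete, here is a purely illustrative sketch (hypothetical names, not the actual Cython code) of the check-then-act sequence under discussion:

```python
def report_streaming_generator_output(interrupt_event, output_buffer, item):
    # Barrier: tasks reaching this point after the caller has set the event
    # stop here and never touch output_buffer.
    if interrupt_event.is_set():
        return
    # Theoretical remaining window: the interpreter could switch threads right
    # here, the caller could set the event and free output_buffer, and the
    # append below would still run. The event makes this very unlikely;
    # removing the shared buffer altogether (as noted below) eliminates it.
    output_buffer.append(item)
```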

Contributor Author

We'll fully eliminate the RC only by removing the pushing into the vector completely (which I'm doing in #42443).

break

# Make sure all RPC I/O completes before returning
await asyncio.gather(*futures)
Contributor

Q: wouldn't using await asyncio.gather(*futures, return_exceptions=True) solve the issue?

Contributor Author

Answered here: #43775 (comment)

@alexeykudinkin (Contributor Author) commented:

> IIUC, the issue is that if any coroutine fails (is cancelled), it raises an exception before all tasks in the thread pool have finished -> ExecuteTask finishes before the tasks inside the thread pool finish, right?
>
> Hmm, actually I'm curious: why don't we just use gather(return_exceptions=True)?
>
> In that case, gather would not return until everything in the coroutines has finished, so the race condition wouldn't happen?

That's unfortunately not sufficient because of cancellation -- in that case a CancelledError will be thrown at the await asyncio.gather(...) call, which we have to handle.

On top of that, cancelling the future of a task that's already running in the executor won't interrupt that task, hence we have to "enforce" the interruption via a signal to guarantee that the vector isn't being pushed into once we return from this method (see the sketch below).
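For illustration, a small runnable sketch (a simplified, assumed setup; not Ray's code) of the two behaviors referenced above: return_exceptions=True does not shield the awaiting coroutine itself from cancellation, and work already running on an executor thread keeps running regardless of cancellation:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


async def caller(executor):
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, time.sleep, 0.5) for _ in range(3)]
    try:
        # return_exceptions=True only turns exceptions raised *by the awaited
        # futures* into results; if the coroutine awaiting gather() is itself
        # cancelled, CancelledError is still raised at this await and has to
        # be handled explicitly.
        await asyncio.gather(*futures, return_exceptions=True)
    except asyncio.CancelledError:
        # Cancellation does not stop work that is already running on an
        # executor thread -- hence the explicit interruption signal in this
        # PR rather than relying on future cancellation alone.
        raise


async def main(executor):
    task = asyncio.create_task(caller(executor))
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("CancelledError surfaced despite return_exceptions=True")


executor = ThreadPoolExecutor(max_workers=1)
asyncio.run(main(executor))
executor.shutdown(wait=True)
```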

Tidying up
…ks after this method returns
Replaced it with cooperative cancellation (using threading.Event)
This reverts commit 3d8b154.
@akshay-anyscale added the release-blocker P0 (Issue that blocks the release) label on Mar 8, 2024
@jjyao merged commit cfebe14 into ray-project:master on Mar 9, 2024
9 checks passed
Labels: ray 2.10, release-blocker P0 (Issue that blocks the release)

4 participants