Skip to content

Commit

Permalink
[Core] Fix race condition in multi-threaded actor creation (#44232)
Browse files Browse the repository at this point in the history
Signed-off-by: Rui Qiao <ruisearch42@gmail.com>
  • Loading branch information
ruisearch42 committed Mar 25, 2024
1 parent c6f3735 commit e75b97b
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
3 changes: 2 additions & 1 deletion python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
# If this actor class was not exported in this session and job,
# we need to export this function again, because current GCS
# doesn't have it.
meta.last_export_session_and_job = worker.current_session_and_job

# After serialize / deserialize modified class, the __module__
# of modified class will be ray.cloudpickle.cloudpickle.
# So, here pass actor_creation_function_descriptor to make
Expand All @@ -1040,6 +1040,7 @@ def _remote(self, args=None, kwargs=None, **actor_options):
meta.actor_creation_function_descriptor,
meta.method_meta.methods.keys(),
)
meta.last_export_session_and_job = worker.current_session_and_job

resources = ray._private.utils.resources_from_ray_options(actor_options)
# Set the actor's default resources if not already set. First three
Expand Down
59 changes: 59 additions & 0 deletions python/ray/tests/test_actor_advanced.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os
import random
import sys
import threading
import time
import traceback

import numpy as np
import pytest
Expand Down Expand Up @@ -1170,6 +1173,62 @@ def do_run(name, concurrency=4):
assert ["ok"] * CONCURRENCY == results


def test_create_actor_race_condition(shutdown_only):
"""Make sure we can create actors in multiple threads without
race conditions.
Check https://github.com/ray-project/ray/issues/41324
"""

@ray.remote
class Actor:
pass

def create(name, namespace, results, i):
time.sleep(random.random())
try:
Actor.options(
name=name,
namespace=namespace,
get_if_exists=True,
lifetime="detached",
).remote()
results[i] = "ok"
except Exception:
e = traceback.format_exc()
results[i] = e

CONCURRENCY = 1000
ACTOR_NAME = "TestActor"
ACTOR_NAMESPACE = "TestNamespace"

def run_and_check():
results = [None] * CONCURRENCY
threads = [None] * CONCURRENCY
for i in range(CONCURRENCY):
threads[i] = threading.Thread(
target=create, args=(ACTOR_NAME, ACTOR_NAMESPACE, results, i)
)

for thread in threads:
thread.start()

for thread in threads:
thread.join()

for result in results:
assert result == "ok"

actor = ray.get_actor(
ACTOR_NAME, namespace=ACTOR_NAMESPACE
) # Creation and get should be successful
ray.kill(actor) # Cleanup

ray.init()
for _ in range(50):
run_and_check()


def test_get_actor_in_remote_workers(ray_start_cluster):
"""Make sure we can get and create actors without
race condition in a remote worker.
Expand Down

0 comments on commit e75b97b

Please sign in to comment.