Fix actor creation hang due to race in SWAP queue (#6280)
ericl committed Nov 26, 2019
1 parent cafdaa3 commit 30b2fc1
Showing 5 changed files with 6 additions and 18 deletions.
2 changes: 2 additions & 0 deletions python/ray/cluster_utils.py
@@ -92,6 +92,8 @@ def add_node(self, **node_args):
self.webui_url = self.head_node.webui_url
else:
ray_params.update_if_absent(redis_address=self.redis_address)
+ # We only need one log monitor per physical node.
+ ray_params.update_if_absent(include_log_monitor=False)
# Let grpc pick a port.
ray_params.update(node_manager_port=0)
node = ray.node.Node(
2 changes: 1 addition & 1 deletion python/ray/tests/BUILD
@@ -10,7 +10,7 @@ py_test(
name = "test_actor_direct",
size = "medium",
srcs = ["test_actor_direct.py", "test_actor.py"],
- tags = ["exclusive", "manual"],
+ tags = ["exclusive"],
deps = ["//:ray_lib"],
)

2 changes: 0 additions & 2 deletions python/ray/tests/test_multi_node_2.py
@@ -215,8 +215,6 @@ def test_worker_plasma_store_failure(ray_start_cluster_head):
cluster = ray_start_cluster_head
worker = cluster.add_node()
cluster.wait_for_nodes()
- # Log monitor doesn't die for some reason
- worker.kill_log_monitor()
worker.kill_reporter()
worker.kill_plasma_store()
worker.all_processes[ray_constants.PROCESS_TYPE_RAYLET][0].process.wait()
4 changes: 2 additions & 2 deletions src/ray/core_worker/core_worker.cc
@@ -709,9 +709,9 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle) {
it->second->Reset();
}
} else if (actor_data.state() == gcs::ActorTableData::DEAD) {
- RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(actor_id, nullptr));
// We cannot erase the actor handle here because clients can still
- // submit tasks to dead actors.
+ // submit tasks to dead actors. This also means we defer unsubscription,
+ // otherwise we crash when bulk unsubscribing all actor handles.
}

direct_actor_submitter_->HandleActorUpdate(actor_id, actor_data);
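
The new comment states an ordering invariant: handles for DEAD actors stay in the table so later task submissions can still resolve them, and unsubscription is deferred, presumably because an eager unsubscribe here would make the later bulk pass over all actor handles fail. A minimal standalone sketch of that invariant, using hypothetical names rather than Ray's actual classes:

// Hypothetical, simplified sketch -- not Ray's real classes -- of the invariant
// described above: handles for DEAD actors stay in the table so clients can
// still submit tasks to them, and each handle is unsubscribed exactly once,
// during a single bulk pass rather than eagerly in the DEAD branch.
#include <cassert>
#include <iostream>
#include <string>
#include <unordered_map>

struct ActorHandleEntry {
  bool dead = false;       // set when the GCS reports the actor as DEAD
  bool subscribed = true;  // cleared only by the bulk unsubscribe pass
};

class ActorHandleTable {
 public:
  void Add(const std::string &actor_id) { handles_[actor_id] = ActorHandleEntry(); }

  // DEAD notification: mark the handle but do not erase it and do not
  // unsubscribe yet; later task submissions must still find the handle.
  void MarkDead(const std::string &actor_id) { handles_.at(actor_id).dead = true; }

  bool HasHandle(const std::string &actor_id) const {
    return handles_.count(actor_id) > 0;  // dead handles are still present
  }

  // Bulk pass (e.g. at disconnect): unsubscribes every handle exactly once.
  // An eager unsubscribe in MarkDead() would trip this check.
  void UnsubscribeAll() {
    for (auto &entry : handles_) {
      assert(entry.second.subscribed && "handle unsubscribed twice");
      entry.second.subscribed = false;
      std::cout << "unsubscribed " << entry.first << "\n";
    }
  }

 private:
  std::unordered_map<std::string, ActorHandleEntry> handles_;
};

int main() {
  ActorHandleTable table;
  table.Add("actor_1");
  table.MarkDead("actor_1");
  assert(table.HasHandle("actor_1"));  // still resolvable after DEAD
  table.UnsubscribeAll();
  return 0;
}
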
14 changes: 1 addition & 13 deletions src/ray/raylet/node_manager.cc
@@ -2632,21 +2632,9 @@ void NodeManager::ForwardTask(
task_entry.second.TaskData().GetTaskExecutionSpec().GetMessage());
}

- // Move the FORWARDING task to the SWAP queue so that we remember that we
- // have it queued locally. Once the ForwardTaskRequest has been sent, the
- // task will get re-queued, depending on whether the message succeeded or
- // not.
- local_queues_.QueueTasks({task}, TaskState::SWAP);
- client->ForwardTask(request, [this, on_error, task_id, node_id](
+ client->ForwardTask(request, [this, on_error, task, task_id, node_id](
Status status, const rpc::ForwardTaskReply &reply) {
- // Remove the FORWARDING task from the SWAP queue.
- Task task;
- TaskState state;
- if (!local_queues_.RemoveTask(task_id, &task, &state)) {
- return;
- }
- RAY_CHECK(state == TaskState::SWAP);

if (status.ok()) {
const auto &spec = task.GetTaskSpecification();
// Mark as forwarded so that the task and its lineage are not
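
The removed block is where the race lived: the FORWARDING task was parked in the shared SWAP queue and looked up again inside the asynchronous ForwardTask reply callback, and if the task was no longer in the queue by then, the callback returned early and the task was never re-queued, which is how an actor creation task could hang. Capturing task by value in the lambda removes the shared-queue round-trip. A self-contained sketch contrasting the two patterns, with hypothetical names rather than Ray's actual node manager types:

// Hypothetical sketch -- not Ray's actual node manager -- contrasting the racy
// SWAP-queue round-trip that was removed with the capture-by-value callback
// that replaced it.
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>

struct Task {
  std::string id;
};

std::unordered_map<std::string, Task> swap_queue;  // shared, mutable state
std::function<void(bool)> pending_reply;           // stands in for the async RPC reply

// Old pattern: park the task in the shared queue, then look it up again when
// the reply arrives. If anything else drained the queue in between, the
// callback returns early and the task is never re-queued.
void ForwardWithSwapQueue(const Task &task) {
  swap_queue[task.id] = task;
  pending_reply = [id = task.id](bool ok) {
    auto it = swap_queue.find(id);
    if (it == swap_queue.end()) {
      std::cout << id << ": lost the race, task dropped\n";
      return;
    }
    Task recovered = it->second;
    swap_queue.erase(it);
    std::cout << recovered.id << (ok ? ": forwarded\n" : ": re-queued locally\n");
  };
}

// New pattern: the callback owns its own copy of the task, so there is no
// shared state to race on between send and reply.
void ForwardWithCapture(const Task &task) {
  pending_reply = [task](bool ok) {
    std::cout << task.id << (ok ? ": forwarded\n" : ": re-queued locally\n");
  };
}

int main() {
  ForwardWithSwapQueue({"actor_creation_task"});
  swap_queue.clear();    // simulate an unrelated code path draining the queue
  pending_reply(false);  // reply arrives: task silently dropped -> hang

  ForwardWithCapture({"actor_creation_task"});
  pending_reply(false);  // reply arrives: task handled as intended
  return 0;
}

The trade-off is one extra copy of the task held by the callback, in exchange for eliminating the window in which unrelated code can mutate the shared queue between the send and the reply.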
