Misc direct call fixes from unit tests (#6394)
ericl committed Dec 9, 2019
1 parent 1a9948e commit a6bc2b1
Showing 13 changed files with 99 additions and 70 deletions.
22 changes: 12 additions & 10 deletions python/ray/serialization.py
@@ -177,18 +177,19 @@ def object_id_serializer(obj):

def object_id_deserializer(serialized_obj):
obj_id, owner_id, owner_address = pickle.loads(serialized_obj)
# Must deserialize the object in the core worker before we
# create the ObjectID to ensure that the reference is added
# before we increment its count to 1.
# NOTE(swang): Must deserialize the object first before asking
# the core worker to resolve the value. This is to make sure
# that the ref count for the ObjectID is greater than 0 by the
# time the core worker resolves the value of the object.
deserialized_object_id = obj_id[0](obj_id[1][0])
if owner_id:
worker = ray.worker.get_global_worker()
worker.check_connected()
# UniqueIDs are serialized as
# (class name, (unique bytes,)).
worker.core_worker.deserialize_and_register_object_id(
obj_id[1][0], owner_id[1][0], owner_address)
obj_id = obj_id[0](obj_id[1][0])
return obj_id
return deserialized_object_id

for id_type in ray._raylet._ID_TYPES:
if id_type == ray._raylet.ObjectID:
@@ -241,18 +242,19 @@ def object_id_serializer(obj):

def object_id_deserializer(serialized_obj):
obj_id, owner_id, owner_address = serialized_obj
# Must deserialize the object in the core worker before we
# create the ObjectID to ensure that the reference is added
# before we increment its count to 1.
# NOTE(swang): Must deserialize the object first before asking
# the core worker to resolve the value. This is to make sure
# that the ref count for the ObjectID is greater than 0 by the
# time the core worker resolves the value of the object.
deserialized_object_id = id_deserializer(obj_id)
if owner_id:
worker = ray.worker.get_global_worker()
worker.check_connected()
# UniqueIDs are serialized as
# (class name, (unique bytes,)).
worker.core_worker.deserialize_and_register_object_id(
obj_id[1][0], owner_id[1][0], owner_address)
obj_id = id_deserializer(obj_id)
return obj_id
return deserialized_object_id

for id_type in ray._raylet._ID_TYPES:
if id_type == ray._raylet.ObjectID:
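
The NOTE(swang) comment above captures the ordering invariant behind this change: construct the local ObjectID first, so its reference count is already above zero by the time the core worker is asked to resolve the value. A minimal C++ sketch of that invariant — toy names only, not Ray's classes — where resolution is only legal while a reference is held:

// Toy illustration only: values are evicted when their count drops to zero,
// so the caller must take a reference *before* asking for resolution.
#include <cassert>
#include <functional>
#include <string>
#include <unordered_map>

class ToyStore {
 public:
  void Put(const std::string &id, int value) { values_[id] = value; }

  void AddReference(const std::string &id) { ++ref_counts_[id]; }

  void RemoveReference(const std::string &id) {
    if (--ref_counts_[id] == 0) values_.erase(id);  // evict on zero
  }

  // Resolution assumes the caller already holds a reference; otherwise the
  // value could have been evicted between deserialization and this call.
  void ResolveAsync(const std::string &id, const std::function<void(int)> &cb) {
    assert(ref_counts_[id] > 0 && "resolve requested with no live reference");
    cb(values_.at(id));
  }

 private:
  std::unordered_map<std::string, int> ref_counts_;
  std::unordered_map<std::string, int> values_;
};

int main() {
  ToyStore store;
  store.Put("obj", 42);
  store.AddReference("obj");                                  // construct the ID first
  store.ResolveAsync("obj", [](int v) { assert(v == 42); });  // then resolve the value
  store.RemoveReference("obj");
  return 0;
}
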
3 changes: 1 addition & 2 deletions src/ray/common/task/task_spec.cc
@@ -224,8 +224,7 @@ bool TaskSpecification::IsAsyncioActor() const {
}

bool TaskSpecification::IsDetachedActor() const {
RAY_CHECK(IsActorCreationTask());
return message_->actor_creation_task_spec().is_detached();
return IsActorCreationTask() && message_->actor_creation_task_spec().is_detached();
}

std::string TaskSpecification::DebugString() const {
29 changes: 18 additions & 11 deletions src/ray/core_worker/core_worker.cc
@@ -972,26 +972,33 @@ void CoreWorker::HandleGetObjectStatus(const rpc::GetObjectStatusRequest &request
ObjectID object_id = ObjectID::FromBinary(request.object_id());
TaskID owner_id = TaskID::FromBinary(request.owner_id());
if (owner_id != GetCallerId()) {
// We may have owned this object in the past, but we are now executing some
// other task or actor.
reply->set_status(rpc::GetObjectStatusReply::WRONG_OWNER);
send_reply_callback(Status::OK(), nullptr, nullptr);
} else {
// We own the task. Reply back to the borrower once the object has been
// created.
// TODO: We could probably just send the object value if it is small
// enough and we have it local.
reply->set_status(rpc::GetObjectStatusReply::CREATED);
RAY_LOG(INFO) << "Handling GetObjectStatus for object produced by previous task "
<< owner_id.Hex();
}
// We own the task. Reply back to the borrower once the object has been
// created.
// TODO(swang): We could probably just send the object value if it is small
// enough and we have it local.
reply->set_status(rpc::GetObjectStatusReply::CREATED);
if (task_manager_->IsTaskPending(object_id.TaskId())) {
// Acquire a reference and retry. This prevents the object from being
// evicted out from under us before we can start the get.
AddObjectIDReference(object_id);
if (task_manager_->IsTaskPending(object_id.TaskId())) {
// The task is pending. Send the reply once the task finishes.
memory_store_->GetAsync(object_id,
[send_reply_callback](std::shared_ptr<RayObject> obj) {
send_reply_callback(Status::OK(), nullptr, nullptr);
});
RemoveObjectIDReference(object_id);
} else {
// The task is done. Send the reply immediately.
// We lost the race, the task is done.
RemoveObjectIDReference(object_id);
send_reply_callback(Status::OK(), nullptr, nullptr);
}
} else {
// The task is done. Send the reply immediately.
send_reply_callback(Status::OK(), nullptr, nullptr);
}
}

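
The rewritten handler above is a pin-check-callback pattern: take a temporary reference before checking whether the producing task is still pending, so the object cannot be released between that check and the asynchronous get. A rough sketch of the same control flow with invented types (not Ray's API):

#include <functional>
#include <iostream>
#include <string>

// Toy stand-in for the core worker; only the control flow matters here.
struct ToyWorker {
  bool task_pending = true;
  std::function<void()> on_task_done;  // callback registered by GetAsync

  void AddReference(const std::string &id) { /* pin the object */ }
  void RemoveReference(const std::string &id) { /* unpin the object */ }

  void GetAsync(const std::string &id, std::function<void()> cb) {
    on_task_done = std::move(cb);  // fired when the producing task finishes
  }

  void HandleGetObjectStatus(const std::string &id, std::function<void()> reply) {
    AddReference(id);  // pin first, so the pending-check below cannot race with eviction
    if (task_pending) {
      GetAsync(id, reply);   // deliver the reply when the task finishes
      RemoveReference(id);   // drop the temporary pin; the stored callback carries the reply
    } else {
      RemoveReference(id);   // lost the race: the task already finished
      reply();               // reply immediately
    }
  }
};

int main() {
  ToyWorker worker;
  worker.HandleGetObjectStatus("obj", [] { std::cout << "status: CREATED\n"; });
  worker.task_pending = false;
  if (worker.on_task_done) worker.on_task_done();  // simulate task completion
  return 0;
}
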
8 changes: 3 additions & 5 deletions src/ray/core_worker/future_resolver.cc
@@ -19,11 +19,9 @@ void FutureResolver::ResolveFutureAsync(const ObjectID &object_id, const TaskID
RAY_CHECK_OK(it->second->GetObjectStatus(
request,
[this, object_id](const Status &status, const rpc::GetObjectStatusReply &reply) {
if (!status.ok() || reply.status() == rpc::GetObjectStatusReply::WRONG_OWNER) {
RAY_LOG(ERROR)
<< "Error retrieving the value of object ID " << object_id
<< " that was deserialized. Probably, the task or actor that created the "
"object ID initially (via ray.put or task submission) has exited.";
if (!status.ok()) {
RAY_LOG(ERROR) << "Error retrieving the value of object ID " << object_id
<< " that was deserialized: " << status.ToString();
}
// Either the owner is gone or the owner replied that the object has
// been created. In both cases, we can now try to fetch the object via
8 changes: 6 additions & 2 deletions src/ray/core_worker/reference_count.cc
@@ -6,8 +6,12 @@ void ReferenceCounter::AddBorrowedObject(const ObjectID &object_id,
const TaskID &owner_id,
const rpc::Address &owner_address) {
absl::MutexLock lock(&mutex_);
RAY_CHECK(
object_id_refs_.emplace(object_id, Reference(owner_id, owner_address)).second);
auto it = object_id_refs_.find(object_id);
RAY_CHECK(it != object_id_refs_.end());

if (!it->second.owner.has_value()) {
it->second.owner = {owner_id, owner_address};
}
}

void ReferenceCounter::AddOwnedObject(
2 changes: 1 addition & 1 deletion src/ray/core_worker/reference_count.h
@@ -107,7 +107,7 @@ class ReferenceCounter {
/// The object's owner, if we know it. This has no value if we do not know
/// the object's owner (because distributed ref counting is not yet
/// implemented).
const absl::optional<std::pair<TaskID, rpc::Address>> owner;
absl::optional<std::pair<TaskID, rpc::Address>> owner;
};

/// Helper function with the same semantics as AddReference to allow adding a reference
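
The two reference-counting hunks above make the owner of a Reference fillable after creation: the entry is created when the local reference is added, and AddBorrowedObject now only records the owner if it is not yet known, rather than inserting a fresh entry. A small sketch of that fill-if-absent update, using std::optional in place of absl::optional and toy string IDs:

#include <cassert>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

struct Reference {
  // Owner is unknown until a borrowed object reports it; hence not const.
  std::optional<std::pair<std::string, std::string>> owner;  // {owner id, owner address}
};

class ToyReferenceCounter {
 public:
  void AddLocalReference(const std::string &object_id) {
    refs_[object_id];  // create the entry; owner still unknown
  }

  void AddBorrowedObject(const std::string &object_id, const std::string &owner_id,
                         const std::string &owner_address) {
    auto it = refs_.find(object_id);
    assert(it != refs_.end());            // the reference entry must already exist
    if (!it->second.owner.has_value()) {  // only fill in the owner once
      it->second.owner.emplace(owner_id, owner_address);
    }
  }

 private:
  std::unordered_map<std::string, Reference> refs_;
};
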
20 changes: 9 additions & 11 deletions src/ray/core_worker/task_manager.cc
@@ -9,6 +9,11 @@ void TaskManager::AddPendingTask(const TaskSpecification &spec, int max_retries)
RAY_CHECK(pending_tasks_.emplace(spec.TaskId(), std::move(entry)).second);
}

bool TaskManager::IsTaskPending(const TaskID &task_id) const {
absl::MutexLock lock(&mu_);
return pending_tasks_.count(task_id) > 0;
}

void TaskManager::CompletePendingTask(const TaskID &task_id,
const rpc::PushTaskReply &reply) {
RAY_LOG(DEBUG) << "Completing task " << task_id;
@@ -50,17 +55,10 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
}

void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type) {
if (error_type == rpc::ErrorType::ACTOR_DIED) {
// Note that this might be the __ray_terminate__ task, so we don't log
// loudly with ERROR here.
RAY_LOG(INFO) << "Task " << task_id << " failed with error "
<< rpc::ErrorType_Name(error_type);
} else {
RAY_LOG(ERROR) << "Task " << task_id << " failed with error "
<< rpc::ErrorType_Name(error_type);
}

RAY_LOG(DEBUG) << "Failing task " << task_id;
// Note that this might be the __ray_terminate__ task, so we don't log
// loudly with ERROR here.
RAY_LOG(DEBUG) << "Task " << task_id << " failed with error "
<< rpc::ErrorType_Name(error_type);
int num_retries_left = 0;
TaskSpecification spec;
{
6 changes: 2 additions & 4 deletions src/ray/core_worker/task_manager.h
@@ -43,9 +43,7 @@ class TaskManager : public TaskFinisherInterface {
///
/// \param[in] task_id ID of the task to query.
/// \return Whether the task is pending.
bool IsTaskPending(const TaskID &task_id) const {
return pending_tasks_.count(task_id) > 0;
}
bool IsTaskPending(const TaskID &task_id) const;

/// Write return objects for a pending task to the memory store.
///
@@ -75,7 +73,7 @@ class TaskManager : public TaskFinisherInterface {
const RetryTaskCallback retry_task_callback_;

/// Protects below fields.
absl::Mutex mu_;
mutable absl::Mutex mu_;

/// Map from task ID to a pair of:
/// {task spec, number of allowed retries left}
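
Moving IsTaskPending() into the .cc file goes together with marking mu_ mutable: a const accessor still needs to take the lock before reading pending_tasks_, and a mutable mutex is the standard way to allow that, since locking is not a logical modification of the object. A minimal sketch with the standard library (the real code uses absl::Mutex):

#include <mutex>
#include <string>
#include <unordered_map>

class ToyTaskManager {
 public:
  // Const accessor: reading shared state still requires the lock.
  bool IsTaskPending(const std::string &task_id) const {
    std::lock_guard<std::mutex> lock(mu_);  // only compiles because mu_ is mutable
    return pending_tasks_.count(task_id) > 0;
  }

  void AddPendingTask(const std::string &task_id) {
    std::lock_guard<std::mutex> lock(mu_);
    pending_tasks_.emplace(task_id, 0);
  }

 private:
  mutable std::mutex mu_;  // mutable: locking does not count as mutating the object
  std::unordered_map<std::string, int> pending_tasks_;  // task id -> retries left
};
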
51 changes: 31 additions & 20 deletions src/ray/core_worker/transport/direct_task_transport.cc
@@ -99,7 +99,7 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
auto lease_client = GetOrConnectLeaseClient(raylet_address);
TaskSpecification &resource_spec = it->second.front();
TaskID task_id = resource_spec.TaskId();
RAY_CHECK_OK(lease_client->RequestWorkerLease(
auto status = lease_client->RequestWorkerLease(
resource_spec,
[this, lease_client, task_id, scheduling_key](
const Status &status, const rpc::WorkerLeaseReply &reply) mutable {
@@ -120,41 +120,50 @@
RequestNewWorkerIfNeeded(scheduling_key, &reply.retry_at_raylet_address());
}
} else {
RAY_LOG(DEBUG) << "Retrying lease request " << task_id;
if (lease_client != local_lease_client_) {
// A lease request to a remote raylet failed. Retry locally if the lease is
// still needed.
// TODO(swang): Fail after some number of retries?
RAY_LOG(ERROR) << "Retrying attempt to schedule task at remote node. Error: "
<< status.ToString();
RequestNewWorkerIfNeeded(scheduling_key);
} else {
// A local request failed. This shouldn't happen if the raylet is still alive
// and we don't currently handle raylet failures, so treat it as a fatal
// error.
RAY_LOG(FATAL) << "Lost connection with local raylet. Error: "
<< status.ToString();
}
RetryLeaseRequest(status, lease_client, scheduling_key);
}
}));
});
if (!status.ok()) {
RetryLeaseRequest(status, lease_client, scheduling_key);
}
pending_lease_requests_.insert(scheduling_key);
}

void CoreWorkerDirectTaskSubmitter::RetryLeaseRequest(
Status status, std::shared_ptr<WorkerLeaseInterface> lease_client,
const SchedulingKey &scheduling_key) {
if (lease_client != local_lease_client_) {
// A lease request to a remote raylet failed. Retry locally if the lease is
// still needed.
// TODO(swang): Fail after some number of retries?
RAY_LOG(ERROR) << "Retrying attempt to schedule task at remote node. Error: "
<< status.ToString();
RequestNewWorkerIfNeeded(scheduling_key);
} else {
// A local request failed. This shouldn't happen if the raylet is still alive
// and we don't currently handle raylet failures, so treat it as a fatal
// error.
RAY_LOG(FATAL) << "Lost connection with local raylet. Error: " << status.ToString();
}
}

void CoreWorkerDirectTaskSubmitter::PushNormalTask(
const rpc::WorkerAddress &addr, rpc::CoreWorkerClientInterface &client,
const SchedulingKey &scheduling_key, const TaskSpecification &task_spec,
const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources) {
auto task_id = task_spec.TaskId();
auto request = std::unique_ptr<rpc::PushTaskRequest>(new rpc::PushTaskRequest);
bool is_actor = task_spec.IsActorTask();
RAY_LOG(DEBUG) << "Pushing normal task " << task_spec.TaskId();
// NOTE(swang): CopyFrom is needed because if we use Swap here and the task
// fails, then the task data will be gone when the TaskManager attempts to
// access the task.
request->mutable_task_spec()->CopyFrom(task_spec.GetMessage());
request->mutable_resource_mapping()->CopyFrom(assigned_resources);
RAY_CHECK_OK(client.PushNormalTask(
std::move(request), [this, task_id, scheduling_key, addr, assigned_resources](
Status status, const rpc::PushTaskReply &reply) {
std::move(request),
[this, task_id, is_actor, scheduling_key, addr, assigned_resources](
Status status, const rpc::PushTaskReply &reply) {
{
absl::MutexLock lock(&mu_);
OnWorkerIdle(addr, scheduling_key, /*error=*/!status.ok(), assigned_resources);
@@ -164,7 +173,9 @@
// failure (e.g., by contacting the raylet). If it was a process
// failure, it may have been an application-level error and it may
// not make sense to retry the task.
task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::WORKER_DIED);
task_finisher_->PendingTaskFailed(task_id, is_actor
? rpc::ErrorType::ACTOR_DIED
: rpc::ErrorType::WORKER_DIED);
} else {
task_finisher_->CompletePendingTask(task_id, reply);
}
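
The refactor above funnels both failure paths of a lease request — the status returned synchronously by RequestWorkerLease and the status delivered later to its callback — into one RetryLeaseRequest helper instead of RAY_CHECK_OK, so a failed remote lease falls back to the local raylet while a local failure remains fatal. A condensed sketch of that shape with toy types (not Ray's interfaces):

#include <cstdlib>
#include <functional>
#include <iostream>
#include <string>

struct Status {
  bool ok = true;
  std::string msg;
};

class ToySubmitter {
 public:
  void RequestNewWorkerIfNeeded(bool remote_raylet) {
    // The RPC can fail immediately (returned status) or later (callback status);
    // both outcomes funnel into the same retry helper.
    Status sent = SendLeaseRequest(remote_raylet, [this, remote_raylet](const Status &s) {
      if (!s.ok) RetryLeaseRequest(s, remote_raylet);
    });
    if (!sent.ok) RetryLeaseRequest(sent, remote_raylet);
  }

 private:
  Status SendLeaseRequest(bool remote, std::function<void(const Status &)> cb) {
    (void)remote;
    (void)cb;         // a real client would invoke cb when the reply arrives
    return Status{};  // pretend the request was written to the socket
  }

  void RetryLeaseRequest(const Status &status, bool was_remote) {
    if (was_remote) {
      // Remote raylet unreachable: retry the lease against the local raylet.
      std::cerr << "remote lease failed, retrying locally: " << status.msg << "\n";
      RequestNewWorkerIfNeeded(/*remote_raylet=*/false);
    } else {
      // Local raylet failures are not survivable here (RAY_LOG(FATAL) in the real code).
      std::cerr << "lost connection to local raylet: " << status.msg << "\n";
      std::abort();
    }
  }
};

int main() {
  ToySubmitter submitter;
  submitter.RequestNewWorkerIfNeeded(/*remote_raylet=*/true);
  return 0;
}
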
6 changes: 6 additions & 0 deletions src/ray/core_worker/transport/direct_task_transport.h
@@ -64,6 +64,12 @@ class CoreWorkerDirectTaskSubmitter {
const google::protobuf::RepeatedPtrField<rpc::ResourceMapEntry> &assigned_resources)
EXCLUSIVE_LOCKS_REQUIRED(mu_);

/// Retry a failed lease request.
void RetryLeaseRequest(Status status,
std::shared_ptr<WorkerLeaseInterface> lease_client,
const SchedulingKey &scheduling_key)
EXCLUSIVE_LOCKS_REQUIRED(mu_);

/// Get an existing lease client or connect a new one. If a raylet_address is
/// provided, this connects to a remote raylet. Else, this connects to the
/// local raylet.
1 change: 0 additions & 1 deletion src/ray/protobuf/core_worker.proto
@@ -103,7 +103,6 @@ message GetObjectStatusRequest {
message GetObjectStatusReply {
enum ObjectStatus {
CREATED = 0;
WRONG_OWNER = 1;
}
ObjectStatus status = 1;
}
11 changes: 9 additions & 2 deletions src/ray/raylet/node_manager.cc
@@ -1472,7 +1472,10 @@ void NodeManager::ProcessReportActiveObjectIDs(
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
if (!worker) {
worker = worker_pool_.GetRegisteredDriver(client);
RAY_CHECK(worker);
if (!worker) {
RAY_LOG(ERROR) << "Ignoring object ids report from failed / unknown worker.";
return;
}
}

auto message = flatbuffers::GetRoot<protocol::ReportActiveObjectIDs>(message_data);
@@ -2273,8 +2276,12 @@ void NodeManager::AssignTask(const std::shared_ptr<Worker> &worker, const Task &

auto task_id = spec.TaskId();
if (task.OnDispatch() != nullptr) {
if (task.GetTaskSpecification().IsDetachedActor()) {
worker->MarkDetachedActor();
}
task.OnDispatch()(worker, initial_config_.node_manager_address, worker->Port(),
worker->GetTaskResourceIds());
spec.IsActorCreationTask() ? worker->GetLifetimeResourceIds()
: worker->GetTaskResourceIds());
post_assign_callbacks->push_back([this, worker, task_id]() {
FinishAssignTask(worker, task_id, /*success=*/true);
});
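
This hunk carries two dispatch fixes: a detached actor is flagged on the worker before the dispatch callback runs, and an actor-creation task is granted the worker's lifetime resources instead of its per-task resources. A toy sketch of that selection logic (invented stand-in types, not the raylet's real classes):

#include <cassert>
#include <string>

struct ToyWorker {
  std::string lifetime_resources = "cpu:1 (held for the actor's lifetime)";
  std::string task_resources = "cpu:1 (released when the task finishes)";
  bool detached = false;
  void MarkDetachedActor() { detached = true; }
};

struct ToyTaskSpec {
  bool is_actor_creation = false;
  bool is_detached_actor = false;
};

std::string ResourcesForDispatch(const ToyTaskSpec &spec, ToyWorker &worker) {
  if (spec.is_detached_actor) {
    worker.MarkDetachedActor();  // detached actors outlive their driver
  }
  // Actor creation pins resources for the actor's lifetime; plain tasks only
  // hold them for the duration of the task.
  return spec.is_actor_creation ? worker.lifetime_resources : worker.task_resources;
}

int main() {
  ToyWorker worker;
  ToyTaskSpec actor_creation{/*is_actor_creation=*/true, /*is_detached_actor=*/true};
  assert(ResourcesForDispatch(actor_creation, worker) == worker.lifetime_resources);
  assert(worker.detached);
  return 0;
}
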
2 changes: 1 addition & 1 deletion src/ray/raylet/raylet_client.cc
@@ -419,7 +419,7 @@ ray::Status RayletClient::ReturnWorker(int worker_port, bool disconnect_worker)
return grpc_client_->ReturnWorker(
request, [](const ray::Status &status, const ray::rpc::ReturnWorkerReply &reply) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Error returning worker: " << status;
RAY_LOG(INFO) << "Error returning worker: " << status;
}
});
}
