19 changes: 0 additions & 19 deletions BUILD.bazel
@@ -866,25 +866,6 @@ cc_test(
     ],
 )
 
-cc_test(
-    name = "reconstruction_policy_test",
-    srcs = ["src/ray/raylet/reconstruction_policy_test.cc"],
-    copts = COPTS + select({
-        "//:msvc-cl": [
-        ],
-        "//conditions:default": [
-            # Ignore this warning since it's impractical to fix in the relevant headers
-            "-Wno-inconsistent-missing-override",
-        ],
-    }),
-    deps = [
-        ":node_manager_fbs",
-        ":object_manager",
-        ":raylet_lib",
-        "@com_google_googletest//:gtest_main",
-    ],
-)
-
 cc_test(
     name = "worker_pool_test",
     srcs = ["src/ray/raylet/worker_pool_test.cc"],

1 change: 0 additions & 1 deletion python/ray/tests/BUILD
@@ -66,7 +66,6 @@ py_test_module_list(
         "test_stress.py",
         "test_stress_sharded.py",
         "test_tensorflow.py",
-        "test_unreconstructable_errors.py",
     ],
     size = "medium",
     extra_srcs = SRCS,

30 changes: 0 additions & 30 deletions python/ray/tests/test_unreconstructable_errors.py
This file was deleted.

7 changes: 0 additions & 7 deletions src/ray/common/ray_config_def.h
@@ -100,13 +100,6 @@ RAY_CONFIG(size_t, free_objects_batch_size, 100)
 
 RAY_CONFIG(bool, lineage_pinning_enabled, false)
 
-/// Whether to enable the new scheduler. The new scheduler is designed
-/// only to work with direct calls. Once direct calls are becoming
-/// the default, this scheduler will also become the default.
-RAY_CONFIG(bool, new_scheduler_enabled,
-           getenv("RAY_ENABLE_NEW_SCHEDULER") == nullptr ||
-               getenv("RAY_ENABLE_NEW_SCHEDULER") == std::string("1"))
-
 // The max allowed size in bytes of a return object from direct actor calls.
 // Objects larger than this size will be spilled/promoted to plasma.
 RAY_CONFIG(int64_t, max_direct_call_object_size, 100 * 1024)

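For reference, the deleted default enabled the new scheduler whenever RAY_ENABLE_NEW_SCHEDULER was unset or set to "1". A standalone sketch of that check (illustrative only, not code from this PR):

    #include <cstdlib>
    #include <string>

    // Mirrors the deleted default value: true when the variable is unset,
    // or when it is explicitly set to "1".
    bool NewSchedulerEnabledByDefault() {
      const char *env = std::getenv("RAY_ENABLE_NEW_SCHEDULER");
      return env == nullptr || env == std::string("1");
    }
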
2 changes: 0 additions & 2 deletions src/ray/common/task/task_spec.h
@@ -175,8 +175,6 @@ class TaskSpecification : public MessageWrapper<rpc::TaskSpec> {
 
   ObjectID PreviousActorTaskDummyObjectId() const;
 
-  bool IsDirectCall() const;
-
   int MaxActorConcurrency() const;
 
   bool IsAsyncioActor() const;

40 changes: 23 additions & 17 deletions src/ray/object_manager/ownership_based_object_directory.cc
@@ -17,8 +17,11 @@
 namespace ray {
 
 OwnershipBasedObjectDirectory::OwnershipBasedObjectDirectory(
-    boost::asio::io_service &io_service, std::shared_ptr<gcs::GcsClient> &gcs_client)
-    : ObjectDirectory(io_service, gcs_client), client_call_manager_(io_service) {}
+    boost::asio::io_service &io_service, std::shared_ptr<gcs::GcsClient> &gcs_client,
+    std::function<void(const ObjectID &)> mark_as_failed)
+    : ObjectDirectory(io_service, gcs_client),
+      client_call_manager_(io_service),
+      mark_as_failed_(mark_as_failed) {}
 
 namespace {
 
@@ -38,19 +41,22 @@ void FilterRemovedNodes(std::shared_ptr<gcs::GcsClient> gcs_client,
 bool UpdateObjectLocations(const rpc::GetObjectLocationsOwnerReply &location_reply,
                            const Status &status, const ObjectID &object_id,
                            std::shared_ptr<gcs::GcsClient> gcs_client,
+                           std::function<void(const ObjectID &)> mark_as_failed,
                            std::unordered_set<NodeID> *node_ids, std::string *spilled_url,
                            NodeID *spilled_node_id, size_t *object_size) {
   bool is_updated = false;
 
   std::unordered_set<NodeID> new_node_ids;
 
   if (!status.ok()) {
-    RAY_LOG(INFO) << "Failed to return location updates to subscribers for " << object_id
+    RAY_LOG(INFO) << "Failed to return location updates to subscribers for " << object_id
                   << ": " << status.ToString()
                   << ", assuming that the object was freed or evicted.";
-    // When we can't get location updates from the owner, we assume that the object was
-    // freed or evicted, so we send an empty location update to all subscribers.
+    // When we can't get location updates from the owner, we assume that the object
+    // was freed or evicted, so we send an empty location update to all subscribers.
     *node_ids = new_node_ids;
+    // Mark the object as failed immediately here since we know it can never appear.
+    mark_as_failed(object_id);
     is_updated = true;
   } else {
     // The size can be 0 if the update was a deletion. This assumes that an
@@ -193,7 +199,7 @@ void OwnershipBasedObjectDirectory::SubscriptionCallback(
   it->second.subscribed = true;
 
   // Update entries for this object.
-  if (UpdateObjectLocations(reply, status, object_id, gcs_client_,
+  if (UpdateObjectLocations(reply, status, object_id, gcs_client_, mark_as_failed_,
                             &it->second.current_object_locations, &it->second.spilled_url,
                             &it->second.spilled_node_id, &it->second.object_size)) {
     RAY_LOG(DEBUG) << "Pushing location updates to subscribers for object " << object_id
@@ -275,9 +281,9 @@ ray::Status OwnershipBasedObjectDirectory::SubscribeObjectLocations(
                  << " locations, spilled_url: " << spilled_url
                  << ", spilled node ID: " << spilled_node_id
                  << ", object size: " << object_size;
-  // We post the callback to the event loop in order to avoid mutating data structures
-  // shared with the caller and potentially invalidating caller iterators.
-  // See https://github.com/ray-project/ray/issues/2959.
+  // We post the callback to the event loop in order to avoid mutating data
+  // structures shared with the caller and potentially invalidating caller
+  // iterators. See https://github.com/ray-project/ray/issues/2959.
   io_service_.post(
       [callback, locations, spilled_url, spilled_node_id, object_size, object_id]() {
         callback(object_id, locations, spilled_url, spilled_node_id, object_size);
@@ -312,9 +318,9 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations(
   auto &spilled_url = it->second.spilled_url;
   auto &spilled_node_id = it->second.spilled_node_id;
   auto object_size = it->second.object_size;
-  // We post the callback to the event loop in order to avoid mutating data structures
-  // shared with the caller and potentially invalidating caller iterators.
-  // See https://github.com/ray-project/ray/issues/2959.
+  // We post the callback to the event loop in order to avoid mutating data
+  // structures shared with the caller and potentially invalidating caller
+  // iterators. See https://github.com/ray-project/ray/issues/2959.
   io_service_.post(
       [callback, object_id, locations, spilled_url, spilled_node_id, object_size]() {
         callback(object_id, locations, spilled_url, spilled_node_id, object_size);
@@ -325,9 +331,9 @@
   if (rpc_client == nullptr) {
     RAY_LOG(WARNING) << "Object " << object_id << " does not have owner. "
                      << "LookupLocations returns an empty list of locations.";
-    // We post the callback to the event loop in order to avoid mutating data structures
-    // shared with the caller and potentially invalidating caller iterators.
-    // See https://github.com/ray-project/ray/issues/2959.
+    // We post the callback to the event loop in order to avoid mutating data
+    // structures shared with the caller and potentially invalidating caller
+    // iterators. See https://github.com/ray-project/ray/issues/2959.
     io_service_.post([callback, object_id]() {
       callback(object_id, std::unordered_set<NodeID>(), "", NodeID::Nil(), 0);
     });
@@ -350,8 +356,8 @@ ray::Status OwnershipBasedObjectDirectory::LookupLocations(
   std::string spilled_url;
   NodeID spilled_node_id;
   size_t object_size = 0;
-  UpdateObjectLocations(reply, status, object_id, gcs_client_, &node_ids,
-                        &spilled_url, &spilled_node_id, &object_size);
+  UpdateObjectLocations(reply, status, object_id, gcs_client_, mark_as_failed_,
+                        &node_ids, &spilled_url, &spilled_node_id, &object_size);
   RAY_LOG(DEBUG) << "Looked up locations for " << object_id
                  << ", returning: " << node_ids.size()
                  << " locations, spilled_url: " << spilled_url

5 changes: 4 additions & 1 deletion src/ray/object_manager/ownership_based_object_directory.h
@@ -40,7 +40,8 @@ class OwnershipBasedObjectDirectory : public ObjectDirectory {
   /// \param gcs_client A Ray GCS client to request object and node
   /// information from.
   OwnershipBasedObjectDirectory(boost::asio::io_service &io_service,
-                                std::shared_ptr<gcs::GcsClient> &gcs_client);
+                                std::shared_ptr<gcs::GcsClient> &gcs_client,
+                                std::function<void(const ObjectID &)> mark_as_failed);
 
   virtual ~OwnershipBasedObjectDirectory() {}
 
@@ -70,6 +71,8 @@ class OwnershipBasedObjectDirectory : public ObjectDirectory {
  private:
   /// The client call manager used to create the RPC clients.
   rpc::ClientCallManager client_call_manager_;
+  /// The callback used to mark an object as failed.
+  std::function<void(const ObjectID &)> mark_as_failed_;
   /// Cache of gRPC clients to workers (not necessarily running on this node).
   /// Also includes the number of inflight requests to each worker - when this
   /// reaches zero, the client will be deleted and a new one will need to be created

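A minimal sketch of how a caller could wire up the new constructor parameter; the MakeObjectDirectory helper and the MarkObjectAsFailed body are illustrative assumptions, not code from this PR:

    // The callback gives the directory a way to fail an object for local
    // waiters once its owner is known to be unreachable.
    void MarkObjectAsFailed(const ObjectID &object_id) {
      // Illustrative: e.g. store an error marker so dependent tasks see an
      // unreconstructable-object error instead of hanging on the object.
    }

    std::shared_ptr<OwnershipBasedObjectDirectory> MakeObjectDirectory(
        boost::asio::io_service &io_service,
        std::shared_ptr<gcs::GcsClient> &gcs_client) {
      return std::make_shared<OwnershipBasedObjectDirectory>(
          io_service, gcs_client,
          [](const ObjectID &object_id) { MarkObjectAsFailed(object_id); });
    }
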
11 changes: 0 additions & 11 deletions src/ray/raylet/dependency_manager.cc
@@ -30,9 +30,6 @@ void DependencyManager::RemoveObjectIfNotNeeded(
                    << " request: " << required_object_it->second.wait_request_id;
     object_manager_.CancelPull(required_object_it->second.wait_request_id);
   }
-  if (!local_objects_.count(object_id)) {
-    reconstruction_policy_.Cancel(object_id);
-  }
   required_objects_.erase(required_object_it);
 }
 
@@ -43,9 +40,6 @@ DependencyManager::GetOrInsertRequiredObject(const ObjectID &object_id,
   auto it = required_objects_.find(object_id);
   if (it == required_objects_.end()) {
     it = required_objects_.emplace(object_id, ref).first;
-    if (local_objects_.count(object_id) == 0) {
-      reconstruction_policy_.ListenAndMaybeReconstruct(object_id, ref.owner_address());
-    }
   }
   return it;
 }
@@ -233,10 +227,6 @@ std::vector<TaskID> DependencyManager::HandleObjectMissing(
     }
     task_entry.num_missing_dependencies++;
   }
-
-  // The object is missing and needed so wait for a possible failure again.
-  reconstruction_policy_.ListenAndMaybeReconstruct(object_entry->first,
-                                                   object_entry->second.owner_address);
 }
 
 // Process callbacks for all of the tasks dependent on the object that are
@@ -285,7 +275,6 @@ std::vector<TaskID> DependencyManager::HandleObjectLocal(const ray::ObjectID &ob
     object_manager_.CancelPull(object_entry->second.wait_request_id);
     object_entry->second.wait_request_id = 0;
   }
-  reconstruction_policy_.Cancel(object_entry->first);
   RemoveObjectIfNotNeeded(object_entry);
 }

15 changes: 2 additions & 13 deletions src/ray/raylet/dependency_manager.h
@@ -19,7 +19,6 @@
 #include "ray/common/id.h"
 #include "ray/common/task/task.h"
 #include "ray/object_manager/object_manager.h"
-#include "ray/raylet/reconstruction_policy.h"
 // clang-format on
 
 namespace ray {
@@ -28,8 +27,6 @@ namespace raylet {
 
 using rpc::TaskLeaseData;
 
-class ReconstructionPolicy;
-
 /// Used for unit-testing the ClusterTaskManager, which requests dependencies
 /// for queued tasks.
 class TaskDependencyManagerInterface {
@@ -53,9 +50,8 @@ class TaskDependencyManagerInterface {
 class DependencyManager : public TaskDependencyManagerInterface {
  public:
   /// Create a task dependency manager.
-  DependencyManager(ObjectManagerInterface &object_manager,
-                    ReconstructionPolicyInterface &reconstruction_policy)
-      : object_manager_(object_manager), reconstruction_policy_(reconstruction_policy) {}
+  DependencyManager(ObjectManagerInterface &object_manager)
+      : object_manager_(object_manager) {}
 
   /// Check whether an object is locally available.
   ///
@@ -225,13 +221,6 @@ class DependencyManager : public TaskDependencyManagerInterface {
 
   /// The object manager, used to fetch required objects from remote nodes.
   ObjectManagerInterface &object_manager_;
-  /// The reconstruction policy, used to reconstruct required objects that no
-  /// longer exist on any live nodes.
-  /// TODO(swang): This class is no longer needed for reconstruction, since the
-  /// object's owner handles reconstruction. We use this class as a timer to
-  /// detect the owner's failure. Remove this class and move the timer logic
-  /// into this class.
-  ReconstructionPolicyInterface &reconstruction_policy_;
 
   /// A map from the ID of a queued task to metadata about whether the task's
   /// dependencies are all local or not.

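Under the simplified constructor, creating the manager needs only an object manager; a sketch (object_manager stands in for any ObjectManagerInterface implementation):

    // The manager now takes only the object manager; waiting out a possibly
    // failed owner is the owner's job, so no reconstruction policy is passed.
    DependencyManager dependency_manager(object_manager);
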