[core] Eagerly evict objects that are no longer in scope #7220

Merged
merged 4 commits on Feb 20, 2020
Changes from 2 commits
19 changes: 19 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -44,6 +44,25 @@ RAY_CONFIG(bool, fair_queueing_enabled, true)
/// enabled, objects in scope in the cluster will not be LRU evicted.
RAY_CONFIG(bool, object_pinning_enabled, true)

/// If object_pinning_enabled is on, then objects that have been unpinned are
/// added to a local cache. When the cache is flushed, all objects in the cache
/// will be eagerly evicted in a batch by freeing all plasma copies in the
/// cluster. If positive, then this is the duration in milliseconds between
/// attempts to flush the local cache. If set to 0, then objects will be freed
/// as soon as they enter the cache. To disable eager eviction, set this to -1.
/// NOTE(swang): If distributed_ref_counting_enabled is off, then this will
/// likely cause spurious object lost errors for Object IDs that were
/// serialized, then either passed as an argument or returned from a task.
RAY_CONFIG(int64_t, free_objects_period_milliseconds, -1)

/// If object_pinning_enabled is on, then objects that have been unpinned are
/// added to a local cache. When the cache is flushed, all objects in the cache
/// will be eagerly evicted in a batch by freeing all plasma copies in the
/// cluster. This is the maximum number of objects in the local cache before it
/// is flushed. To disable eager eviction, set free_objects_period_milliseconds
/// to -1.
RAY_CONFIG(size_t, free_objects_batch_size, 100)
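
Taken together, the two settings above define the flush policy: -1 disables eager eviction, 0 frees objects as soon as they are unpinned, and a positive period flushes on a timer, with free_objects_batch_size forcing an earlier flush when the cache fills. A minimal standalone sketch of that decision logic (illustrative only, not the raylet code; ShouldFlushFreedObjects and its parameters are made-up names):

#include <cstddef>
#include <cstdint>

// Illustrative sketch of how the two config values interact. `cache_size` is
// the current size of the local free-objects cache; `ms_since_last_flush` is
// the time elapsed since the last flush attempt.
bool ShouldFlushFreedObjects(int64_t free_objects_period_ms,
                             size_t free_objects_batch_size,
                             size_t cache_size,
                             int64_t ms_since_last_flush) {
  if (free_objects_period_ms < 0) {
    return false;  // Eager eviction disabled; fall back to LRU eviction.
  }
  if (free_objects_period_ms == 0) {
    return cache_size > 0;  // Free objects as soon as they enter the cache.
  }
  // Otherwise flush on whichever comes first: a full batch or an expired period.
  return cache_size >= free_objects_batch_size ||
         ms_since_last_flush > free_objects_period_ms;
}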

/// Whether to enable the new scheduler. The new scheduler is designed
/// only to work with direct calls. Once direct calls become
/// the default, this scheduler will also become the default.
2 changes: 2 additions & 0 deletions src/ray/raylet/main.cc
@@ -124,6 +124,8 @@ int main(int argc, char *argv[]) {
RayConfig::instance().raylet_heartbeat_timeout_milliseconds();
node_manager_config.debug_dump_period_ms =
RayConfig::instance().debug_dump_period_milliseconds();
node_manager_config.free_objects_period_ms =
RayConfig::instance().free_objects_period_milliseconds();
node_manager_config.fair_queueing_enabled =
RayConfig::instance().fair_queueing_enabled();
node_manager_config.object_pinning_enabled =
29 changes: 29 additions & 0 deletions src/ray/raylet/node_manager.cc
@@ -83,6 +83,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
heartbeat_timer_(io_service),
heartbeat_period_(std::chrono::milliseconds(config.heartbeat_period_ms)),
debug_dump_period_(config.debug_dump_period_ms),
free_objects_period_(config.free_objects_period_ms),
fair_queueing_enabled_(config.fair_queueing_enabled),
object_pinning_enabled_(config.object_pinning_enabled),
temp_dir_(config.temp_dir),
@@ -213,6 +214,7 @@ ray::Status NodeManager::RegisterGcs() {
// Start sending heartbeats to the GCS.
last_heartbeat_at_ms_ = current_time_ms();
last_debug_dump_at_ms_ = current_time_ms();
last_free_objects_at_ms_ = current_time_ms();
Heartbeat();
// Start the timer that gets object manager profiling information and sends it
// to the GCS.
@@ -312,6 +314,12 @@ void NodeManager::Heartbeat() {
last_debug_dump_at_ms_ = now_ms;
}

// Evict all copies of freed objects from the cluster.
if (free_objects_period_ > 0 &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing this in the heartbeat means the flush duration needs to be much larger than or a multiple of the heartbeat duration - might be better to just have a separate timer on the event loop. Otherwise should indicate this in the comments in the config.

static_cast<int64_t>(now_ms - last_free_objects_at_ms_) > free_objects_period_) {
FlushObjectsToFree();
}
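
For context on the comment above, a separate timer on the event loop might look roughly like the standalone sketch below. This is a hypothetical alternative, not part of the PR: ScheduleFlush is a made-up helper, and the real change would presumably hang a steady_timer member off NodeManager next to heartbeat_timer_ and call FlushObjectsToFree() from the handler.

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>

// Hypothetical sketch: re-arm a dedicated asio timer every period_ms instead
// of piggybacking the flush on the heartbeat.
void ScheduleFlush(boost::asio::steady_timer &timer, int64_t period_ms) {
  timer.expires_from_now(std::chrono::milliseconds(period_ms));
  timer.async_wait([&timer, period_ms](const boost::system::error_code &error) {
    if (!error) {
      std::cout << "FlushObjectsToFree() would run here\n";
      ScheduleFlush(timer, period_ms);  // Re-arm for the next period.
    }
  });
}

int main() {
  boost::asio::io_service io_service;
  boost::asio::steady_timer flush_timer(io_service);
  ScheduleFlush(flush_timer, /*period_ms=*/1000);
  io_service.run();  // Flushes once per second until the process exits.
}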

// Reset the timer.
heartbeat_timer_.expires_from_now(heartbeat_period_);
heartbeat_timer_.async_wait([this](const boost::system::error_code &error) {
@@ -3123,6 +3131,14 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
RAY_LOG(DEBUG) << "Unpinning object " << object_id;
pinned_objects_.erase(object_id);

// Try to evict all copies of the object from the cluster.
objects_to_free_.push_back(object_id);
if (objects_to_free_.size() ==
RayConfig::instance().free_objects_batch_size() ||
free_objects_period_ == 0) {
FlushObjectsToFree();
}

// Remove the cached worker client if there are no more pending requests.
if (--worker_rpc_clients_[worker_id].second == 0) {
worker_rpc_clients_.erase(worker_id);
@@ -3132,6 +3148,19 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
send_reply_callback(Status::OK(), nullptr, nullptr);
}
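
As a worked illustration of the batch trigger in HandlePinObjectIDs above (the numbers are hypothetical, not from the PR): with free_objects_batch_size = 100, every 100th unpinned object forces an immediate flush, and whatever is left over waits for the periodic flush driven by Heartbeat().

#include <cstddef>
#include <iostream>
#include <vector>

// Toy standalone simulation of the batch-size trigger. `cache` stands in for
// objects_to_free_; each loop iteration represents one unpinned object.
int main() {
  const size_t batch_size = 100;  // free_objects_batch_size
  std::vector<int> cache;
  size_t batch_flushes = 0;

  for (int unpinned = 1; unpinned <= 250; ++unpinned) {
    cache.push_back(unpinned);
    if (cache.size() == batch_size) {  // same trigger as in HandlePinObjectIDs
      cache.clear();  // FlushObjectsToFree() would free these cluster-wide.
      ++batch_flushes;
    }
  }
  // Prints: "2 batch flushes, 50 objects left for the periodic flush"
  std::cout << batch_flushes << " batch flushes, " << cache.size()
            << " objects left for the periodic flush\n";
}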

void NodeManager::FlushObjectsToFree() {
if (free_objects_period_ < 0) {
return;
}

if (!objects_to_free_.empty()) {
RAY_LOG(DEBUG) << "Freeing " << objects_to_free_.size() << " out-of-scope objects";
object_manager_.FreeObjects(objects_to_free_, /*local_only=*/false);
objects_to_free_.clear();
}
last_free_objects_at_ms_ = current_time_ms();
}

void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &request,
rpc::GetNodeStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
18 changes: 18 additions & 0 deletions src/ray/raylet/node_manager.h
@@ -55,6 +55,9 @@ struct NodeManagerConfig {
uint64_t heartbeat_period_ms;
/// The time between debug dumps in milliseconds, or -1 to disable.
uint64_t debug_dump_period_ms;
/// The time between attempts to eagerly evict objects from plasma in
/// milliseconds, or -1 to disable.
int64_t free_objects_period_ms;
/// Whether to enable fair queueing between task classes in raylet.
bool fair_queueing_enabled;
/// Whether to enable pinning for plasma objects.
@@ -160,6 +163,10 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// Write out debug state to a file.
void DumpDebugState() const;

/// Flush objects that are out of scope in the application. This will attempt
/// to eagerly evict all plasma copies of these objects from the cluster.
void FlushObjectsToFree();

/// Get profiling information from the object manager and push it to the GCS.
///
/// \return Void.
@@ -590,6 +597,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
std::chrono::milliseconds heartbeat_period_;
/// The period between debug state dumps.
int64_t debug_dump_period_;
/// The period between attempts to eagerly evict objects from plasma.
int64_t free_objects_period_;
/// Whether to enable fair queueing between task classes in raylet.
bool fair_queueing_enabled_;
/// Whether to enable pinning for plasma objects.
@@ -608,6 +617,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
uint64_t last_heartbeat_at_ms_;
/// The time that the last debug string was logged to the console.
uint64_t last_debug_dump_at_ms_;
/// The time that we last sent a FreeObjects request to other nodes for
/// objects that have gone out of scope in the application.
uint64_t last_free_objects_at_ms_;
/// Initial node manager configuration.
const NodeManagerConfig initial_config_;
/// The resources (and specific resource IDs) that are currently available.
@@ -690,6 +702,12 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
absl::flat_hash_set<WorkerID> failed_workers_cache_;
/// Cache for the ClientTable in the GCS.
absl::flat_hash_set<ClientID> failed_nodes_cache_;

/// Objects that are out of scope in the application and that should be freed
/// from plasma. The cache is flushed when it reaches the config's
/// free_objects_batch_size, or if objects have been in the cache for longer
/// than the config's free_objects_period_milliseconds, whichever occurs first.
std::vector<ObjectID> objects_to_free_;
};

} // namespace raylet