Gcs pull resource reports #14336

Merged on Mar 29, 2021 (57 commits)

Commits (57)

dc9336a - . (Feb 3, 2021)
6432664 - Merge branch 'master' of github.com:ray-project/ray into master (Feb 3, 2021)
ca2afd1 - done? (Feb 3, 2021)
04b12a6 - Merge branch 'master' of github.com:ray-project/ray into master (Feb 9, 2021)
d69ca53 - Merge branch 'master' of github.com:ray-project/ray into master (Feb 12, 2021)
4b51207 - Merge branch 'master' of github.com:ray-project/ray (Feb 23, 2021)
652acb2 - Merge branch 'master' of github.com:ray-project/ray into master (Feb 23, 2021)
098e8c2 - Merge branch 'master' of github.com:ray-project/ray (Feb 23, 2021)
d92867c - Merge branch 'master' of github.com:wuisawesome/ray (Feb 23, 2021)
58e2e78 - raylet side done? (Feb 23, 2021)
21cc430 - . (Feb 23, 2021)
fe94f3b - . (Feb 24, 2021)
35a32ed - Update src/ray/raylet/node_manager.cc (Feb 24, 2021)
3a084bc - . (Feb 25, 2021)
7084689 - . (Feb 25, 2021)
b70ebe7 - lint (Feb 25, 2021)
bc6b3bd - added feature flag (Feb 25, 2021)
4d061d9 - . (Feb 26, 2021)
5c915e7 - . (Feb 26, 2021)
0938b04 - Without max inflight (Feb 27, 2021)
8fb050f - Without max inflight (Feb 27, 2021)
dc6405e - Looks like it might just work? (Feb 27, 2021)
c59e5d7 - . (Mar 1, 2021)
fa4b71c - . (Mar 3, 2021)
98deada - client (Mar 3, 2021)
0304fbf - Merge branch 'raylet_request_resource_report' of github.com:wuisaweso… (Mar 3, 2021)
c5ed4a2 - Merge branch 'master' of github.com:ray-project/ray into raylet_reque… (Mar 3, 2021)
6aba81a - . (Mar 3, 2021)
64bacc3 - Merge branch 'raylet_request_resource_report' of github.com:wuisaweso… (Mar 3, 2021)
cb8591d - Merge branch 'master' of github.com:ray-project/ray into gcs_pull_res… (Mar 9, 2021)
cf97a73 - fix test (Mar 10, 2021)
532a8d6 - . (Mar 10, 2021)
2239d0e - annotations (Mar 10, 2021)
bc941bc - lint (Mar 10, 2021)
088f36d - . (Mar 10, 2021)
90ed7ec - . (Mar 16, 2021)
d96c899 - . (Mar 17, 2021)
8750c09 - . (Mar 17, 2021)
61579e8 - basic test (Mar 18, 2021)
0b986c3 - done? (Mar 18, 2021)
b66640a - PullState constructor (Mar 19, 2021)
ecebaef - PullState constructor (Mar 19, 2021)
2800139 - annotations (Mar 19, 2021)
a1e52b0 - comments (Mar 19, 2021)
ff1f123 - Merge branch 'master' of github.com:ray-project/ray into gcs_pull_res… (Mar 19, 2021)
b3a717a - Done with testing? (Mar 19, 2021)
31eab0d - logs (Mar 22, 2021)
11d9677 - Merge branch 'master' of github.com:ray-project/ray into gcs_pull_res… (Mar 22, 2021)
d227826 - lint (Mar 22, 2021)
68bd287 - Merge branch 'master' of github.com:ray-project/ray into gcs_pull_res… (Mar 22, 2021)
a8f9ce0 - works? need to fix logging now (Mar 22, 2021)
67d574d - . (Mar 22, 2021)
4bb447d - . (Mar 22, 2021)
5a34b41 - fix test (Mar 23, 2021)
b66b5c3 - appease clang (Mar 23, 2021)
131d836 - Merge branch 'master' of github.com:ray-project/ray into gcs_pull_res… (Mar 25, 2021)
4646239 - . (Mar 25, 2021)
13 changes: 13 additions & 0 deletions BUILD.bazel
@@ -1232,6 +1232,19 @@ cc_test(
],
)

cc_test(
name = "gcs_resource_report_poller_test",
srcs = [
"src/ray/gcs/gcs_server/test/gcs_resource_report_poller_test.cc",
],
copts = COPTS,
deps = [
":gcs_server_lib",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_library(
name = "service_based_gcs_client_lib",
srcs = glob(
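The new cc_test target above covers the GcsResourceReportPoller introduced later in this diff. Because the poller's constructor accepts injectable hooks for the current time, the report handler, and the RPC that requests a report, a unit test can drive it without a live cluster. The following is only a rough sketch of that idea under those assumptions; the stub names are illustrative and the real gcs_resource_report_poller_test.cc may be organized differently.

#include "gtest/gtest.h"
#include "ray/gcs/gcs_server/gcs_resource_report_poller.h"

TEST(GcsResourceReportPollerSketch, InvokesHandlerOnReply) {
  int64_t fake_time_ms = 0;  // advanced by hand instead of reading a real clock
  int reports_handled = 0;
  ray::gcs::GcsResourceReportPoller poller(
      /*gcs_resource_manager=*/nullptr,
      /*raylet_client_pool=*/nullptr,
      /*handle_resource_report=*/
      [&](const ray::rpc::ResourcesData & /*data*/) { reports_handled++; },
      /*get_current_time_milli=*/[&]() { return fake_time_ms; },
      /*request_report=*/
      [](const ray::rpc::Address & /*address*/,
         std::shared_ptr<ray::rpc::NodeManagerClientPool> & /*pool*/,
         std::function<void(const ray::Status &,
                            const ray::rpc::RequestResourceReportReply &)>
             callback) {
        // Reply immediately with an empty report instead of issuing a real RPC.
        callback(ray::Status::OK(), ray::rpc::RequestResourceReportReply());
      });
  // A real test would register nodes and advance fake_time_ms to trigger pulls.
}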
7 changes: 7 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -229,6 +229,13 @@ RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 100000)
RAY_CONFIG(uint32_t, maximum_gcs_dead_node_cached_count, 1000)
/// The interval at which the gcs server will print debug info.
RAY_CONFIG(int64_t, gcs_dump_debug_log_interval_minutes, 1)
// The interval at which the gcs server will pull a new resource report from each raylet.
RAY_CONFIG(int, gcs_resource_report_poll_period_ms, 100)
// The maximum number of concurrent resource report polls from the GCS to raylets.
RAY_CONFIG(uint64_t, gcs_max_concurrent_resource_pulls, 100)
// Feature flag to turn on resource report polling. Polling and raylet pushing are
// mutually exclusive.
RAY_CONFIG(bool, pull_based_resource_reporting, true)

/// Duration to sleep after failing to put an object in plasma because it is full.
RAY_CONFIG(uint32_t, object_store_full_delay_ms, 10)
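For context, each RAY_CONFIG entry is read through the generated accessor on RayConfig::instance(); the poller later in this diff reads the poll period and the concurrency cap that way. A minimal sketch of gating the new pull path on the feature flag, assuming the usual accessor naming (the surrounding branch is illustrative, not part of this PR):

// Illustrative only; accessor names follow the RAY_CONFIG convention.
if (RayConfig::instance().pull_based_resource_reporting()) {
  const int64_t poll_period_ms =
      RayConfig::instance().gcs_resource_report_poll_period_ms();
  const uint64_t max_pulls =
      RayConfig::instance().gcs_max_concurrent_resource_pulls();
  RAY_LOG(INFO) << "Pulling resource reports every " << poll_period_ms
                << " ms, with at most " << max_pulls << " concurrent pulls.";
}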
38 changes: 20 additions & 18 deletions src/ray/gcs/gcs_server/gcs_resource_manager.cc
@@ -161,12 +161,10 @@ void GcsResourceManager::HandleGetAllAvailableResources(
++counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST];
}

void GcsResourceManager::HandleReportResourceUsage(
const rpc::ReportResourceUsageRequest &request, rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
NodeID node_id = NodeID::FromBinary(request.resources().node_id());
void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data) {
NodeID node_id = NodeID::FromBinary(data.node_id());
auto resources_data = std::make_shared<rpc::ResourcesData>();
resources_data->CopyFrom(request.resources());
resources_data->CopyFrom(data);

// We use `node_resource_usages_` to filter out the nodes that report resource
// information for the first time. `UpdateNodeResourceUsage` will modify
@@ -177,13 +175,19 @@ void GcsResourceManager::HandleReportResourceUsage(
SetAvailableResources(node_id, ResourceSet(resource_changed));
}

UpdateNodeResourceUsage(node_id, request);
UpdateNodeResourceUsage(node_id, data);

if (resources_data->should_global_gc() || resources_data->resources_total_size() > 0 ||
resources_data->resources_available_changed() ||
resources_data->resource_load_changed()) {
resources_buffer_[node_id] = *resources_data;
}
}

void GcsResourceManager::HandleReportResourceUsage(
const rpc::ReportResourceUsageRequest &request, rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
UpdateFromResourceReport(request.resources());

GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST];
@@ -238,26 +242,24 @@ void GcsResourceManager::HandleGetAllResourceUsage(
++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST];
}

void GcsResourceManager::UpdateNodeResourceUsage(
const NodeID node_id, const rpc::ReportResourceUsageRequest &request) {
void GcsResourceManager::UpdateNodeResourceUsage(const NodeID &node_id,
const rpc::ResourcesData &resources) {
auto iter = node_resource_usages_.find(node_id);
if (iter == node_resource_usages_.end()) {
auto resources_data = std::make_shared<rpc::ResourcesData>();
resources_data->CopyFrom(request.resources());
resources_data->CopyFrom(resources);
node_resource_usages_[node_id] = *resources_data;
} else {
if (request.resources().resources_total_size() > 0) {
(*iter->second.mutable_resources_total()) = request.resources().resources_total();
if (resources.resources_total_size() > 0) {
(*iter->second.mutable_resources_total()) = resources.resources_total();
}
if (request.resources().resources_available_changed()) {
(*iter->second.mutable_resources_available()) =
request.resources().resources_available();
if (resources.resources_available_changed()) {
(*iter->second.mutable_resources_available()) = resources.resources_available();
}
if (request.resources().resource_load_changed()) {
(*iter->second.mutable_resource_load()) = request.resources().resource_load();
if (resources.resource_load_changed()) {
(*iter->second.mutable_resource_load()) = resources.resource_load();
}
(*iter->second.mutable_resource_load_by_shape()) =
request.resources().resource_load_by_shape();
(*iter->second.mutable_resource_load_by_shape()) = resources.resource_load_by_shape();
}
}

8 changes: 6 additions & 2 deletions src/ray/gcs/gcs_server/gcs_resource_manager.h
@@ -136,8 +136,12 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
///
/// \param node_id Node id.
/// \param resources Resource usage data reported by the node.
void UpdateNodeResourceUsage(const NodeID node_id,
const rpc::ReportResourceUsageRequest &request);
void UpdateNodeResourceUsage(const NodeID &node_id,
const rpc::ResourcesData &resources);

/// Process a new resource report from a node, independent of the rpc handler it came
/// from.
void UpdateFromResourceReport(const rpc::ResourcesData &data);

/// Update the placement group load information so that it will be reported through
/// heartbeat.
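With this entry point, the push-based HandleReportResourceUsage handler and the new pull-based poller can share one code path. A minimal sketch of calling it directly with a hand-built report, assuming gcs_resource_manager is a std::shared_ptr<GcsResourceManager> and node_id is a NodeID (both placeholders, not taken from this diff):

// Sketch: feed a single report into the manager outside of any RPC handler.
rpc::ResourcesData data;
data.set_node_id(node_id.Binary());
data.set_resources_available_changed(true);
(*data.mutable_resources_available())["CPU"] = 4.0;
gcs_resource_manager->UpdateFromResourceReport(data);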
147 changes: 147 additions & 0 deletions src/ray/gcs/gcs_server/gcs_resource_report_poller.cc
@@ -0,0 +1,147 @@
#include "ray/gcs/gcs_server/gcs_resource_report_poller.h"

namespace ray {
namespace gcs {

GcsResourceReportPoller::GcsResourceReportPoller(
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool,
std::function<void(const rpc::ResourcesData &)> handle_resource_report,
std::function<int64_t(void)> get_current_time_milli,
std::function<void(
const rpc::Address &, std::shared_ptr<rpc::NodeManagerClientPool> &,
std::function<void(const Status &, const rpc::RequestResourceReportReply &)>)>
request_report)
: ticker_(polling_service_),
max_concurrent_pulls_(RayConfig::instance().gcs_max_concurrent_resource_pulls()),
inflight_pulls_(0),
gcs_resource_manager_(gcs_resource_manager),
raylet_client_pool_(raylet_client_pool),
handle_resource_report_(handle_resource_report),
get_current_time_milli_(get_current_time_milli),
request_report_(request_report),
poll_period_ms_(RayConfig::instance().gcs_resource_report_poll_period_ms()) {}

GcsResourceReportPoller::~GcsResourceReportPoller() { Stop(); }

void GcsResourceReportPoller::Initialize(const GcsInitData &gcs_init_data) {
for (const auto &pair : gcs_init_data.Nodes()) {
HandleNodeAdded(pair.second);
}
}

void GcsResourceReportPoller::Start() {
polling_thread_.reset(new std::thread{[this]() {
SetThreadName("resource_report_poller");
boost::asio::io_service::work work(polling_service_);

polling_service_.run();
RAY_LOG(DEBUG) << "GCSResourceReportPoller has stopped. This should only happen if "
"the cluster has stopped";
}});
ticker_.RunFnPeriodically([this] { TryPullResourceReport(); }, 10);
}

void GcsResourceReportPoller::Stop() {
if (polling_thread_ != nullptr) {
// TODO (Alex): There's technically a race condition here if we start and stop the
// thread in rapid succession.
polling_service_.stop();
if (polling_thread_->joinable()) {
polling_thread_->join();
}
}
}

void GcsResourceReportPoller::HandleNodeAdded(const rpc::GcsNodeInfo &node_info) {
absl::MutexLock guard(&mutex_);

rpc::Address address;
address.set_raylet_id(node_info.node_id());
address.set_ip_address(node_info.node_manager_address());
address.set_port(node_info.node_manager_port());

auto state =
std::make_shared<PullState>(NodeID::FromBinary(node_info.node_id()),
std::move(address), -1, get_current_time_milli_());

const auto &node_id = state->node_id;

RAY_CHECK(!nodes_.count(node_id)) << "Node with id: " << node_id << " was added twice!";

nodes_[node_id] = state;
to_pull_queue_.push_front(state);
RAY_LOG(DEBUG) << "Node was added with id: " << node_id;

polling_service_.post([this]() { TryPullResourceReport(); });
}

void GcsResourceReportPoller::HandleNodeRemoved(const rpc::GcsNodeInfo &node_info) {
NodeID node_id = NodeID::FromBinary(node_info.node_id());
{
absl::MutexLock guard(&mutex_);
nodes_.erase(node_id);
RAY_CHECK(!nodes_.count(node_id));
RAY_LOG(DEBUG) << "Node removed (node_id: " << node_id
<< ")# of remaining nodes: " << nodes_.size();
}
}

void GcsResourceReportPoller::TryPullResourceReport() {
absl::MutexLock guard(&mutex_);
int64_t cur_time = get_current_time_milli_();

RAY_LOG(DEBUG) << "Trying to pull inflight_pulls " << inflight_pulls_ << "/"
<< max_concurrent_pulls_ << ", queue size: " << to_pull_queue_.size();
while (inflight_pulls_ < max_concurrent_pulls_ && !to_pull_queue_.empty()) {
auto to_pull = to_pull_queue_.front();
if (cur_time < to_pull->next_pull_time) {
break;
}

to_pull_queue_.pop_front();

if (!nodes_.count(to_pull->node_id)) {
RAY_LOG(DEBUG)
<< "Node was already removed from the cluster. Skipping its resource report pull.";
continue;
}

PullResourceReport(to_pull);
}
}

void GcsResourceReportPoller::PullResourceReport(const std::shared_ptr<PullState> state) {
inflight_pulls_++;

request_report_(
state->address, raylet_client_pool_,
[this, state](const Status &status, const rpc::RequestResourceReportReply &reply) {
if (status.ok()) {
// TODO (Alex): This callback is always posted onto the main thread. Since most
// of the work is in the callback we should move this callback's execution to
// the polling thread. We will need to implement locking once we switch threads.
Contributor: Isn't this already posting it to the polling service? Remove TODO?

Contributor Author: It's hard to track, but this is being posted on the main thread, because the client pool posts all callbacks on the main thread: https://github.com/ray-project/ray/blob/master/src/ray/gcs/gcs_server/gcs_server.cc#L39
We should make it possible to specify the io service that each rpc is posted to, but I think that's out of scope for this PR (and will require additional synchronization work).

Contributor: Could we just post the entire callback to the polling_service_ and remove the TODO? I think that would also make the code simpler.

Contributor Author: We could, but I'm not sure it would make things simpler. This way, we only update the resource manager from the main thread, so we don't have to worry about locking it.
Btw won't posting the entire callback require deep copying the entire resource report?

Contributor: I see, is the resource manager not thread-safe?

Contributor Author: Yeah unfortunately not (in fact my understanding is that most of gcs isn't actually thread safe)

handle_resource_report_(reply.resources());
} else {
RAY_LOG(INFO) << "Couldn't get resource request from raylet " << state->node_id
<< ": " << status.ToString();
}
polling_service_.post([this, state]() { NodeResourceReportReceived(state); });
});
}

void GcsResourceReportPoller::NodeResourceReportReceived(
const std::shared_ptr<PullState> state) {
absl::MutexLock guard(&mutex_);
inflight_pulls_--;

// Schedule the next pull. The scheduling `TryPullResourceReport` loop will handle
// validating that this node is still in the cluster.
state->next_pull_time = get_current_time_milli_() + poll_period_ms_;
to_pull_queue_.push_back(state);

polling_service_.post([this] { TryPullResourceReport(); });
Contributor: Why do we need this post, isn't there a Tick() scheduled already?

Contributor Author: I think we explicitly want to pull more things without waiting for a tick. cc @stephanie-wang

Contributor: Why not just wait for the tick, especially if it's firing at high frequency (<10 ms)? We should either have a ticker or call it periodically; having both is unnecessary.

Contributor Author: I don't think it's going to make a big difference either way, but why not do it this way since it's more responsive?
The only scenario I can see this mattering in is if rpc's fail very quickly for some reason.

}

} // namespace gcs
} // namespace ray
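Regarding the inline discussion above about which thread runs the reply callback: as written, the report is handled on the main thread (where the client pool posts its callbacks) and only the bookkeeping in NodeResourceReportReceived is posted back to the polling thread. The alternative the reviewers floated, moving the whole handling onto polling_service_, would look roughly like the sketch below; it would require copying the report across threads and adding synchronization around GcsResourceManager, which is not thread-safe per the discussion. This is not what the PR does.

// Sketch of the alternative discussed in review; the body would replace the
// reply callback inside PullResourceReport.
if (status.ok()) {
  rpc::ResourcesData copied = reply.resources();
  polling_service_.post([this, state, copied = std::move(copied)]() {
    // Now runs on the polling thread; handle_resource_report_ would need its
    // own locking before touching GcsResourceManager.
    handle_resource_report_(copied);
    NodeResourceReportReceived(state);
  });
} else {
  polling_service_.post([this, state]() { NodeResourceReportReceived(state); });
}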