Gcs pull resource reports #14336

Merged on Mar 29, 2021
Changes shown are from 21 of the 57 commits.

Commits (57):
dc9336a  .  (Feb 3, 2021)
6432664  Merge branch 'master' of github.com:ray-project/ray into master  (Feb 3, 2021)
ca2afd1  done?  (Feb 3, 2021)
04b12a6  Merge branch 'master' of github.com:ray-project/ray into master  (Feb 9, 2021)
d69ca53  Merge branch 'master' of github.com:ray-project/ray into master  (Feb 12, 2021)
4b51207  Merge branch 'master' of github.com:ray-project/ray  (Feb 23, 2021)
652acb2  Merge branch 'master' of github.com:ray-project/ray into master  (Feb 23, 2021)
098e8c2  Merge branch 'master' of github.com:ray-project/ray  (Feb 23, 2021)
d92867c  Merge branch 'master' of github.com:wuisawesome/ray  (Feb 23, 2021)
58e2e78  raylet side done?  (Feb 23, 2021)
21cc430  .  (Feb 23, 2021)
fe94f3b  .  (Feb 24, 2021)
35a32ed  Update src/ray/raylet/node_manager.cc  (Feb 24, 2021)
3a084bc  .  (Feb 25, 2021)
7084689  .  (Feb 25, 2021)
b70ebe7  lint  (Feb 25, 2021)
bc6b3bd  added feature flag  (Feb 25, 2021)
4d061d9  .  (Feb 26, 2021)
5c915e7  .  (Feb 26, 2021)
0938b04  Without max inflight  (Feb 27, 2021)
8fb050f  Without max inflight  (Feb 27, 2021)
dc6405e  Looks like it might just work?  (Feb 27, 2021)
c59e5d7  .  (Mar 1, 2021)
fa4b71c  .  (Mar 3, 2021)
98deada  client  (Mar 3, 2021)
0304fbf  Merge branch 'raylet_request_resource_report' of github.com:wuisaweso…  (Mar 3, 2021)
c5ed4a2  Merge branch 'master' of github.com:ray-project/ray into raylet_reque…  (Mar 3, 2021)
6aba81a  .  (Mar 3, 2021)
64bacc3  Merge branch 'raylet_request_resource_report' of github.com:wuisaweso…  (Mar 3, 2021)
cb8591d  Merge branch 'master' of github.com:ray-project/ray into gcs_pull_res…  (Mar 9, 2021)
cf97a73  fix test  (Mar 10, 2021)
532a8d6  .  (Mar 10, 2021)
2239d0e  annotations  (Mar 10, 2021)
bc941bc  lint  (Mar 10, 2021)
088f36d  .  (Mar 10, 2021)
90ed7ec  .  (Mar 16, 2021)
d96c899  .  (Mar 17, 2021)
8750c09  .  (Mar 17, 2021)
61579e8  basic test  (Mar 18, 2021)
0b986c3  done?  (Mar 18, 2021)
b66640a  PullState constructor  (Mar 19, 2021)
ecebaef  PullState constructor  (Mar 19, 2021)
2800139  annotations  (Mar 19, 2021)
a1e52b0  comments  (Mar 19, 2021)
ff1f123  Merge branch 'master' of github.com:ray-project/ray into gcs_pull_res…  (Mar 19, 2021)
b3a717a  Done with testing?  (Mar 19, 2021)
31eab0d  logs  (Mar 22, 2021)
11d9677  Merge branch 'master' of github.com:ray-project/ray into gcs_pull_res…  (Mar 22, 2021)
d227826  lint  (Mar 22, 2021)
68bd287  Merge branch 'master' of github.com:ray-project/ray into gcs_pull_res…  (Mar 22, 2021)
a8f9ce0  works? need to fix logging now  (Mar 22, 2021)
67d574d  .  (Mar 22, 2021)
4bb447d  .  (Mar 22, 2021)
5a34b41  fix test  (Mar 23, 2021)
b66b5c3  appease clang  (Mar 23, 2021)
131d836  Merge branch 'master' of github.com:ray-project/ray into gcs_pull_res…  (Mar 25, 2021)
4646239  .  (Mar 25, 2021)
2 changes: 1 addition & 1 deletion python/ray/tests/test_multi_node_2.py
@@ -101,7 +101,7 @@ def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
pg_demands = [{"GPU": 2}, {"extra_resource": 2}]
strategy = "STRICT_PACK"
pg = placement_group(pg_demands, strategy=strategy)
pg.ready()
# ray.get(pg.ready())
time.sleep(2)  # wait for placement groups to propagate.

# Disable event clearing for test.
5 changes: 5 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -228,6 +228,11 @@ RAY_CONFIG(uint32_t, maximum_gcs_destroyed_actor_cached_count, 100000)
RAY_CONFIG(uint32_t, maximum_gcs_dead_node_cached_count, 1000)
/// The interval at which the gcs server will print debug info.
RAY_CONFIG(int64_t, gcs_dump_debug_log_interval_minutes, 1)
// The interval at which the gcs server will pull a new resource report from each node.
RAY_CONFIG(int, gcs_resource_report_poll_period_ms, 100)
// Feature flag to turn on resource report polling. Polling and raylet pushing are
// mutually exclusive.
RAY_CONFIG(bool, pull_based_resource_reporting, true)

/// Duration to sleep after failing to put an object in plasma because it is full.
RAY_CONFIG(uint32_t, object_store_full_delay_ms, 10)
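For context on how these two options interact, here is a minimal, hypothetical sketch of a component reading them, assuming the accessors that RAY_CONFIG generates on RayConfig::instance(); the function name ConfigureResourcePolling and the log messages are illustrative, not part of this PR.

// Hypothetical sketch, not part of this diff.
#include "ray/common/ray_config.h"
#include "ray/util/logging.h"

void ConfigureResourcePolling() {
  if (RayConfig::instance().pull_based_resource_reporting()) {
    // The GCS polls raylets; periodic pushes from raylets should be disabled.
    const int period_ms = RayConfig::instance().gcs_resource_report_poll_period_ms();
    RAY_LOG(INFO) << "Pull-based resource reporting enabled; polling every "
                  << period_ms << " ms.";
  } else {
    // The existing push-based reporting path stays in effect.
    RAY_LOG(INFO) << "Push-based resource reporting enabled.";
  }
}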
38 changes: 20 additions & 18 deletions src/ray/gcs/gcs_server/gcs_resource_manager.cc
@@ -159,12 +159,10 @@ void GcsResourceManager::HandleGetAllAvailableResources(
++counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST];
}

void GcsResourceManager::HandleReportResourceUsage(
const rpc::ReportResourceUsageRequest &request, rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
NodeID node_id = NodeID::FromBinary(request.resources().node_id());
void GcsResourceManager::UpdateFromResourceReport(const rpc::ResourcesData &data) {
NodeID node_id = NodeID::FromBinary(data.node_id());
auto resources_data = std::make_shared<rpc::ResourcesData>();
resources_data->CopyFrom(request.resources());
resources_data->CopyFrom(data);

// We use `node_resource_usages_` to filter out the nodes that report resource
// information for the first time. `UpdateNodeResourceUsage` will modify
@@ -175,13 +173,19 @@ void GcsResourceManager::HandleReportResourceUsage(
SetAvailableResources(node_id, ResourceSet(resource_changed));
}

UpdateNodeResourceUsage(node_id, request);
UpdateNodeResourceUsage(node_id, data);

if (resources_data->should_global_gc() || resources_data->resources_total_size() > 0 ||
resources_data->resources_available_changed() ||
resources_data->resource_load_changed()) {
resources_buffer_[node_id] = *resources_data;
}
}

void GcsResourceManager::HandleReportResourceUsage(
const rpc::ReportResourceUsageRequest &request, rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
UpdateFromResourceReport(request.resources());

GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST];
@@ -236,26 +240,24 @@ void GcsResourceManager::HandleGetAllResourceUsage(
++counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST];
}

void GcsResourceManager::UpdateNodeResourceUsage(
const NodeID node_id, const rpc::ReportResourceUsageRequest &request) {
void GcsResourceManager::UpdateNodeResourceUsage(const NodeID node_id,
const rpc::ResourcesData &resources) {
auto iter = node_resource_usages_.find(node_id);
if (iter == node_resource_usages_.end()) {
auto resources_data = std::make_shared<rpc::ResourcesData>();
resources_data->CopyFrom(request.resources());
resources_data->CopyFrom(resources);
node_resource_usages_[node_id] = *resources_data;
} else {
if (request.resources().resources_total_size() > 0) {
(*iter->second.mutable_resources_total()) = request.resources().resources_total();
if (resources.resources_total_size() > 0) {
(*iter->second.mutable_resources_total()) = resources.resources_total();
}
if (request.resources().resources_available_changed()) {
(*iter->second.mutable_resources_available()) =
request.resources().resources_available();
if (resources.resources_available_changed()) {
(*iter->second.mutable_resources_available()) = resources.resources_available();
}
if (request.resources().resource_load_changed()) {
(*iter->second.mutable_resource_load()) = request.resources().resource_load();
if (resources.resource_load_changed()) {
(*iter->second.mutable_resource_load()) = resources.resource_load();
}
(*iter->second.mutable_resource_load_by_shape()) =
request.resources().resource_load_by_shape();
(*iter->second.mutable_resource_load_by_shape()) = resources.resource_load_by_shape();
}
}

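The point of extracting UpdateFromResourceReport is that the push path (HandleReportResourceUsage) and the new pull path funnel into one update routine. Below is a hypothetical, hand-built report exercising the fields that routine inspects; the field setters are assumed from the ResourcesData proto and the values are made up.

// Hypothetical example, not from this diff: feeding a report to the manager
// directly, the same way the poller does with reply.resources().
void ReportExample(GcsResourceManager &manager, const NodeID &node_id) {
  rpc::ResourcesData data;
  data.set_node_id(node_id.Binary());
  (*data.mutable_resources_total())["CPU"] = 8.0;      // total capacity
  data.set_resources_available_changed(true);
  (*data.mutable_resources_available())["CPU"] = 4.0;  // currently free
  data.set_resource_load_changed(true);
  (*data.mutable_resource_load())["CPU"] = 2.0;        // queued demand
  // Because resources_available_changed() is true, the report is also buffered
  // in resources_buffer_ for the next broadcast.
  manager.UpdateFromResourceReport(data);
}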
7 changes: 5 additions & 2 deletions src/ray/gcs/gcs_server/gcs_resource_manager.h
@@ -134,8 +134,11 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler {
///
/// \param node_id Node id.
/// \param request Request containing resource usage.
void UpdateNodeResourceUsage(const NodeID node_id,
const rpc::ReportResourceUsageRequest &request);
void UpdateNodeResourceUsage(const NodeID node_id, const rpc::ResourcesData &resources);

/// Process a new resource report from a node, independent of the rpc handler it came
/// from.
void UpdateFromResourceReport(const rpc::ResourcesData &data);

/// Update the placement group load information so that it will be reported through
/// heartbeat.
130 changes: 130 additions & 0 deletions src/ray/gcs/gcs_server/gcs_resource_report_poller.cc
@@ -0,0 +1,130 @@
#include "ray/gcs/gcs_server/gcs_resource_report_poller.h"

namespace ray {
namespace gcs {

GcsResourceReportPoller::GcsResourceReportPoller(
uint64_t max_concurrent_pulls,
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool)
: max_concurrent_pulls_(max_concurrent_pulls),
inflight_pulls_(0),
gcs_resource_manager_(gcs_resource_manager),
raylet_client_pool_(raylet_client_pool),
poll_period_ms_(boost::posix_time::milliseconds(
RayConfig::instance().gcs_resource_report_poll_period_ms())) {}

GcsResourceReportPoller::~GcsResourceReportPoller() {
Stop();
}

void GcsResourceReportPoller::Start() {
polling_thread_ = std::unique_ptr<std::thread>(new std::thread{[&]() {
boost::asio::io_service::work work(polling_service_);

polling_service_.run();
RAY_LOG(DEBUG) << "GCSResourceReportPoller has stopped. This should only happen if "
"the cluster has stopped";
}});
}

void GcsResourceReportPoller::Stop() {
polling_service_.stop();
if (polling_thread_->joinable()) {
polling_thread_->join();
}
}

void GcsResourceReportPoller::HandleNodeAdded(
std::shared_ptr<rpc::GcsNodeInfo> node_info) {
absl::MutexLock guard(&mutex_);
const auto node_id = NodeID::FromBinary(node_info->node_id());

RAY_CHECK(!nodes_.count(node_id)) << "Node with id: " << node_id << " was added twice!";

auto &state = nodes_[node_id];
state.node_id = node_id;

state.address.set_raylet_id(node_info->node_id());
state.address.set_ip_address(node_info->node_manager_address());
state.address.set_port(node_info->node_manager_port());

state.last_pull_time = absl::GetCurrentTimeNanos();

state.next_pull_timer = std::unique_ptr<boost::asio::deadline_timer>(
new boost::asio::deadline_timer(polling_service_));

polling_service_.post([&, node_id]() { TryPullResourceReport(node_id); });
}

void GcsResourceReportPoller::HandleNodeRemoved(
std::shared_ptr<rpc::GcsNodeInfo> node_info) {
NodeID node_id = NodeID::FromBinary(node_info->node_id());

{
absl::MutexLock guard(&mutex_);
nodes_.erase(node_id);
}
}

void GcsResourceReportPoller::TryPullResourceReport(const NodeID &node_id) {
absl::MutexLock guard(&mutex_);

to_pull_queue_.push_back(node_id);

while (inflight_pulls_ < max_concurrent_pulls_ && !to_pull_queue_.empty()) {
const NodeID &to_pull = to_pull_queue_.front();
to_pull_queue_.pop_front();

auto it = nodes_.find(to_pull);
if (it == nodes_.end()) {
RAY_LOG(DEBUG) << "Node was removed from the cluster before it could be pulled. Ignoring.";
continue;
}
auto &state = it->second;
PullResourceReport(state);
}
}

void GcsResourceReportPoller::PullResourceReport(PullState &state) {
inflight_pulls_++;
const auto &node_id = state.node_id;
auto raylet_client = raylet_client_pool_->GetOrConnectByAddress(state.address);
raylet_client->RequestResourceReport(
[&, node_id](const Status &status, const rpc::RequestResourceReportReply &reply) {
if (status.ok()) {
// TODO (Alex): This callback is always posted onto the main thread. Since most
// of the work is in the callback we should move this callback's execution to
// the polling thread. We will need to implement locking once we switch threads.
Contributor: Isn't this already posting it to the polling service? Remove TODO?

Contributor Author: It's hard to track, but this is being posted on the main thread, because the client pool posts all callbacks on the main thread: https://github.com/ray-project/ray/blob/master/src/ray/gcs/gcs_server/gcs_server.cc#L39
We should make it possible to specify the io service that each rpc is posted to, but I think that's out of scope for this PR (and will require additional synchronization work).

Contributor: Could we just post the entire callback to the polling_service_ and remove the TODO? I think that would also make the code simpler.

Contributor Author: We could, but I'm not sure it would make things simpler. This way, we only update the resource manager from the main thread, so we don't have to worry about locking it. Btw, won't posting the entire callback require deep copying the entire resource report?

Contributor: I see, is the resource manager not thread-safe?

Contributor Author: Yeah, unfortunately not (in fact, my understanding is that most of the GCS isn't actually thread safe).
gcs_resource_manager_->UpdateFromResourceReport(reply.resources());
polling_service_.post([this, node_id] { NodeResourceReportReceived(node_id); });
} else {
RAY_LOG(INFO) << "Couldn't get resource request from raylet: " << node_id << ". " << status.ToString();
wuisawesome marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

void GcsResourceReportPoller::NodeResourceReportReceived(const NodeID &node_id) {
absl::MutexLock guard(&mutex_);
inflight_pulls_--;
auto it = nodes_.find(node_id);
if (it == nodes_.end()) {
RAY_LOG(DEBUG)
<< "Update finished, but node was already removed from the cluster. Ignoring.";
return;
}

auto &state = it->second;
state.next_pull_timer->expires_from_now(poll_period_ms_);
state.next_pull_timer->async_wait([&, node_id](const boost::system::error_code &error) {
if (!error) {
TryPullResourceReport(node_id);
} else {
RAY_LOG(INFO) << "GcsResourceReportPoller timer failed: " << error.message() << ".";
}
});
}

} // namespace gcs
} // namespace ray
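To make the threading discussed in the review thread above concrete, here is a standalone sketch, not Ray code, in which two io_contexts stand in for the GCS main service and polling_service_: the simulated RPC continuation runs on the main context, where the non-thread-safe resource manager would be updated, and only the poller bookkeeping is posted back to the polling thread.

// Standalone illustrative sketch (not Ray code).
#include <boost/asio.hpp>
#include <iostream>
#include <thread>

int main() {
  boost::asio::io_context main_service;     // where the client pool posts replies
  boost::asio::io_context polling_service;  // owned by the poller
  auto poll_work = boost::asio::make_work_guard(polling_service);
  std::thread polling_thread([&] { polling_service.run(); });

  // Simulated RequestResourceReport continuation, posted to the main service.
  boost::asio::post(main_service, [&] {
    std::cout << "main thread: UpdateFromResourceReport(reply.resources())\n";
    // Hand only the bookkeeping back to the polling thread.
    boost::asio::post(polling_service, [&] {
      std::cout << "polling thread: --inflight_pulls_, reschedule next_pull_timer\n";
      poll_work.reset();  // lets polling_service.run() return
    });
  });

  main_service.run();  // runs the continuation, then returns
  polling_thread.join();
  return 0;
}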
97 changes: 97 additions & 0 deletions src/ray/gcs/gcs_server/gcs_resource_report_poller.h
@@ -0,0 +1,97 @@
#include "ray/gcs/gcs_server/gcs_resource_manager.h"
#include "ray/rpc/node_manager/node_manager_client_pool.h"

namespace ray {
namespace gcs {

/// Polls raylets on a separate thread to update GCS's view of the cluster's resource
/// utilization. This class creates and manages a polling thread. All public methods are
/// thread safe.
class GcsResourceReportPoller {
/*
This class polls each node roughly independently (subject to the maximum
concurrency limit). The process for polling a single node is as follows:

A new node joins the cluster.
1. (Main thread) Begin tracking the node, and begin the polling process.

Main polling procedure.
2. (Polling thread) Enqueue the node to be pulled.
3. (Polling thread) The node is popped off the front of the queue and RequestResourceReport
is sent to the raylet.
4. (Main thread) The raylet responds and the resource manager is updated. This section
is _not_ thread safe (i.e. should not modify the resource report poller state).
5. (Polling thread) The RequestResourceReport continuation runs, scheduling the next
pull time.
6. (Polling thread) The next pull time occurs, and step 2 is repeated.

The node leaves the cluster.
7. Untrack the node. The next time the main polling procedure comes across the node, it
should be dropped from the system.
*/

public:
GcsResourceReportPoller(uint64_t max_concurrent_pulls,
std::shared_ptr<GcsResourceManager> gcs_resource_manager,
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool);

~GcsResourceReportPoller();

/// Start a thread to poll for resource updates.
void Start();

/// Stop polling for resource updates.
void Stop();

/// Event handler when a new node joins the cluster.
void HandleNodeAdded(std::shared_ptr<rpc::GcsNodeInfo> node_info);

/// Event handler when a node leaves the cluster.
void HandleNodeRemoved(std::shared_ptr<rpc::GcsNodeInfo> node_info);

private:
// An asio service which does the polling work.
boost::asio::io_context polling_service_;
// The associated thread it runs on.
std::unique_ptr<std::thread> polling_thread_;

// The maximum number of pulls that can occur at once.
const uint64_t max_concurrent_pulls_;
// The number of ongoing pulls.
uint64_t inflight_pulls_;
// The resource manager which maintains GCS's view of the cluster's resource
// utilization.
std::shared_ptr<GcsResourceManager> gcs_resource_manager_;
// The shared, thread safe pool of raylet clients, which we use to minimize connections.
std::shared_ptr<rpc::NodeManagerClientPool> raylet_client_pool_;
// The minimum delay between two pull requests to the same node.
const boost::posix_time::milliseconds poll_period_ms_;

struct PullState {
NodeID node_id;
rpc::Address address;
int64_t last_pull_time;
std::unique_ptr<boost::asio::deadline_timer> next_pull_timer;
};

// A global lock for internal operations. This lock is shared between the main thread
// and polling thread, so we should be mindful about how long we hold it.
absl::Mutex mutex_;
// All the state regarding how to and when to send a new pull request to a raylet.
std::unordered_map<NodeID, PullState> nodes_;
// The queue of nodes that are due to be pulled. We can't necessarily pull from
// this queue immediately because we limit the number of concurrent pulls.
std::deque<NodeID> to_pull_queue_;

/// Try to pull from the node. The pull may be deferred if it would exceed the
/// maximum number of concurrent pulls. This method is thread safe.
void TryPullResourceReport(const NodeID &node_id);
/// Pull resource report without validation. This method is NOT thread safe.
void PullResourceReport(PullState &state);
/// A resource report was successfully pulled (and the resource manager was already
/// updated). This method is thread safe.
void NodeResourceReportReceived(const NodeID &node_id);
};

} // namespace gcs
} // namespace ray
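The queueing described in the class comment (steps 2 and 3 plus the concurrency limit) reduces to the pattern below. This is a simplified standalone sketch with stand-in types, not the real implementation.

// Simplified standalone sketch of the bounded-concurrency queue behind
// TryPullResourceReport (stand-in types, not the real class).
#include <cstdint>
#include <deque>
#include <functional>

struct PollerSketch {
  uint64_t max_concurrent_pulls;
  uint64_t inflight_pulls = 0;
  std::deque<int> to_pull_queue;        // stand-in for the NodeID queue
  std::function<void(int)> start_pull;  // stand-in for PullResourceReport

  // Enqueue a node and start as many pulls as the concurrency limit allows.
  void TryPull(int node_id) {
    to_pull_queue.push_back(node_id);
    while (inflight_pulls < max_concurrent_pulls && !to_pull_queue.empty()) {
      const int next = to_pull_queue.front();
      to_pull_queue.pop_front();
      ++inflight_pulls;
      start_pull(next);  // its completion later decrements inflight_pulls and
                         // re-arms the per-node timer, which calls TryPull again
    }
  }
};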
15 changes: 15 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
@@ -99,6 +99,9 @@ void GcsServer::DoStart(const GcsInitData &gcs_init_data) {
// Init stats handler.
InitStatsHandler();

// Init resource report polling.
InitResourceReportPolling();

// Install event listeners.
InstallEventListeners();

@@ -130,6 +133,8 @@ void GcsServer::Stop() {

gcs_heartbeat_manager_->Stop();

gcs_resource_report_poller_->Stop();

is_stopped_ = true;
RAY_LOG(INFO) << "GCS server stopped.";
}
@@ -281,6 +286,14 @@ void GcsServer::InitTaskInfoHandler() {
rpc_server_.RegisterService(*task_info_service_);
}

void GcsServer::InitResourceReportPolling() {
gcs_resource_report_poller_ = std::unique_ptr<GcsResourceReportPoller>(
new GcsResourceReportPoller(100, gcs_resource_manager_, raylet_client_pool_));
if (config_.pull_based_resource_reporting) {
gcs_resource_report_poller_->Start();
}
}

void GcsServer::InitStatsHandler() {
RAY_CHECK(gcs_table_storage_);
stats_handler_.reset(new rpc::DefaultStatsHandler(gcs_table_storage_));
@@ -307,6 +320,7 @@ void GcsServer::InstallEventListeners() {
gcs_placement_group_manager_->SchedulePendingPlacementGroups();
gcs_actor_manager_->SchedulePendingActors();
gcs_heartbeat_manager_->AddNode(NodeID::FromBinary(node->node_id()));
gcs_resource_report_poller_->HandleNodeAdded(node);
});
gcs_node_manager_->AddNodeRemovedListener(
[this](std::shared_ptr<rpc::GcsNodeInfo> node) {
@@ -317,6 +331,7 @@
gcs_placement_group_manager_->OnNodeDead(node_id);
gcs_actor_manager_->OnNodeDead(node_id);
raylet_client_pool_->Disconnect(NodeID::FromBinary(node->node_id()));
gcs_resource_report_poller_->HandleNodeRemoved(node);
});

// Install worker event listener.
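Putting the pieces together, the poller's lifecycle as wired up in this file looks roughly as follows. This is a hypothetical recap; AddNodeAddedListener and the member names are assumed from the existing GcsServer and GcsNodeManager code rather than spelled out in this diff.

// Hypothetical standalone recap, mirroring the GcsServer wiring above.
auto poller = std::make_unique<gcs::GcsResourceReportPoller>(
    /*max_concurrent_pulls=*/100, gcs_resource_manager_, raylet_client_pool_);
if (config_.pull_based_resource_reporting) {
  poller->Start();  // spawns the polling thread
}
// Node lifecycle events keep the poller's node table in sync.
gcs_node_manager_->AddNodeAddedListener(
    [&](std::shared_ptr<rpc::GcsNodeInfo> node) { poller->HandleNodeAdded(node); });
gcs_node_manager_->AddNodeRemovedListener(
    [&](std::shared_ptr<rpc::GcsNodeInfo> node) { poller->HandleNodeRemoved(node); });
// On shutdown, Stop() joins the polling thread; the destructor also calls it.
poller->Stop();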