[Metrics] Implement basic metrics changes (#11769)
* Implement basic metrics changes

* Addressed code review.

* Fix build issue.

* Fix build issue.
rkooo567 committed Nov 5, 2020
1 parent 049df70 commit 3cd1d7f
Showing 19 changed files with 80 additions and 125 deletions.
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
@@ -17,6 +17,7 @@
#include <utility>

#include "ray/common/ray_config.h"
#include "ray/stats/stats.h"

namespace ray {
namespace gcs {
@@ -689,6 +690,10 @@ absl::flat_hash_set<ActorID> GcsActorManager::GetUnresolvedActorsByOwnerWorker(
return actor_ids;
}

void GcsActorManager::CollectStats() const {
stats::PendingActors.Record(pending_actors_.size());
}

void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id,
const ray::WorkerID &worker_id,
bool intentional_exit) {
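The CollectStats() hook added here is a plain gauge sample of an in-memory container: the pending-actor count is read and recorded as the latest value. A minimal self-contained sketch of that pattern, with a hypothetical Gauge class standing in for the OpenCensus-backed ray::stats metrics (names and signatures below are illustrative, not Ray's actual API):

#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for a ray::stats gauge; only the Record() shape matters here.
class Gauge {
 public:
  Gauge(std::string name, std::string description)
      : name_(std::move(name)), description_(std::move(description)) {}
  // A gauge keeps only the most recent sample.
  void Record(double value) { last_value_ = value; }
  double Value() const { return last_value_; }

 private:
  std::string name_;
  std::string description_;
  double last_value_ = 0.0;
};

int main() {
  Gauge pending_actors("gcs_pending_actors", "Actors waiting to be scheduled.");
  std::unordered_map<int, std::string> pending = {{1, "actor-a"}, {2, "actor-b"}};
  // CollectStats() boils down to sampling a container size into the gauge.
  pending_actors.Record(static_cast<double>(pending.size()));
  std::cout << pending_actors.Value() << std::endl;  // prints 2
}

The same sampling shape is reused below for pending placement groups.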
3 changes: 3 additions & 0 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
@@ -299,6 +299,9 @@ class GcsActorManager : public rpc::ActorInfoHandler {
const absl::flat_hash_map<ActorID, std::vector<RegisterActorCallback>>
&GetActorRegisterCallbacks() const;

/// Collect stats from gcs actor manager in-memory data structures.
void CollectStats() const;

private:
/// A data structure representing an actor's owner.
struct Owner {
4 changes: 4 additions & 0 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
@@ -16,6 +16,7 @@

#include "ray/common/ray_config.h"
#include "ray/gcs/pb_util.h"
#include "ray/stats/stats.h"
#include "src/ray/protobuf/gcs.pb.h"

namespace ray {
@@ -446,6 +447,8 @@ std::shared_ptr<rpc::GcsNodeInfo> GcsNodeManager::RemoveNode(
auto iter = alive_nodes_.find(node_id);
if (iter != alive_nodes_.end()) {
removed_node = std::move(iter->second);
// Record stats that there's a new removed node.
stats::NodeFailureTotal.Record(1);
// Remove from alive nodes.
alive_nodes_.erase(iter);
// Remove from cluster resources.
@@ -558,6 +561,7 @@ void GcsNodeManager::SendBatchedHeartbeat() {
for (auto &heartbeat : heartbeat_buffer_) {
batch->add_batch()->Swap(&heartbeat.second);
}
stats::OutboundHeartbeatSizeKB.Record((double)(batch->ByteSizeLong() / 1024.0));

RAY_CHECK_OK(gcs_pub_sub_->Publish(HEARTBEAT_BATCH_CHANNEL, "",
batch->SerializeAsString(), nullptr));
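The two metrics added to GcsNodeManager have different shapes: NodeFailureTotal is bumped by one each time a node is removed, while OutboundHeartbeatSizeKB samples the serialized batch size in kilobytes. The only subtle part is the kilobyte conversion, which divides by the floating-point literal 1024.0 so fractional kilobytes survive; a tiny sketch with made-up numbers:

#include <cstdint>
#include <iostream>

int main() {
  uint64_t batch_bytes = 3500;  // illustrative; the real value is batch->ByteSizeLong()
  // Dividing by 1024.0 keeps the fraction; dividing by the integer 1024
  // would truncate 3500 bytes down to 3 KB.
  double kb = batch_bytes / 1024.0;
  std::cout << kb << " KB" << std::endl;  // ~3.418 KB
}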
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
@@ -16,6 +16,7 @@

#include "ray/common/ray_config.h"
#include "ray/gcs/pb_util.h"
#include "ray/stats/stats.h"
#include "ray/util/asio_util.h"
#include "src/ray/protobuf/gcs.pb.h"

@@ -422,6 +423,10 @@ void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenActorDead(
}
}

void GcsPlacementGroupManager::CollectStats() const {
stats::PendingPlacementGroups.Record(pending_placement_groups_.size());
}

void GcsPlacementGroupManager::Tick() {
UpdatePlacementGroupLoad();
execute_after(io_context_, [this] { Tick(); }, 1000 /* milliseconds */);
3 changes: 3 additions & 0 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.h
@@ -228,6 +228,9 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler {
/// to.
void CleanPlacementGroupIfNeededWhenActorDead(const ActorID &actor_id);

/// Collect stats from gcs placement group manager in-memory data structures.
void CollectStats() const;

private:
/// Try to create placement group after a short time.
void RetryCreatingPlacementGroup();
9 changes: 9 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.cc
@@ -24,6 +24,7 @@
#include "ray/gcs/gcs_server/gcs_worker_manager.h"
#include "ray/gcs/gcs_server/stats_handler_impl.h"
#include "ray/gcs/gcs_server/task_info_handler_impl.h"
#include "ray/util/asio_util.h"

namespace ray {
namespace gcs {
@@ -287,5 +288,13 @@ std::unique_ptr<GcsWorkerManager> GcsServer::InitGcsWorkerManager() {
new GcsWorkerManager(gcs_table_storage_, gcs_pub_sub_));
}

void GcsServer::CollectStats() {
gcs_actor_manager_->CollectStats();
gcs_placement_group_manager_->CollectStats();
execute_after(
main_service_, [this] { CollectStats(); },
(RayConfig::instance().metrics_report_interval_ms() / 2) /* milliseconds */);
}

} // namespace gcs
} // namespace ray
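GcsServer::CollectStats() fans out to the actor and placement group managers, then re-posts itself on main_service_ via execute_after, so sampling repeats every metrics_report_interval_ms / 2 milliseconds. A self-contained sketch of that self-rescheduling timer pattern, written against Boost.Asio directly (execute_after is assumed to wrap a similar timer; the period below is arbitrary):

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
#include <memory>

// Run a sampling callback, then re-arm a timer so it fires again after `period`.
void CollectStatsEvery(boost::asio::io_context &io, std::chrono::milliseconds period) {
  std::cout << "collecting stats" << std::endl;  // stands in for the real sampling
  auto timer = std::make_shared<boost::asio::steady_timer>(io, period);
  timer->async_wait([&io, period, timer](const boost::system::error_code &ec) {
    if (!ec) {
      CollectStatsEvery(io, period);  // reschedule after each run
    }
  });
}

int main() {
  boost::asio::io_context io;
  CollectStatsEvery(io, std::chrono::milliseconds(100));
  io.run_for(std::chrono::milliseconds(350));  // a few iterations, then exit
}

Capturing the shared_ptr to the timer in the completion handler keeps it alive until the wait fires.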
3 changes: 3 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.h
@@ -110,6 +110,9 @@ class GcsServer {
/// server address directly to raylets and get rid of this lookup.
void StoreGcsServerAddressInRedis();

/// Collect stats from each module for every (metrics_report_interval_ms / 2) ms.
void CollectStats();

/// Gcs server configuration
GcsServerConfig config_;
/// The main io service to drive event posted from grpc threads.
5 changes: 4 additions & 1 deletion src/ray/gcs/gcs_server/gcs_worker_manager.cc
@@ -13,6 +13,7 @@
// limitations under the License.

#include "ray/gcs/gcs_server/gcs_worker_manager.h"
#include "ray/stats/stats.h"

namespace ray {
namespace gcs {
@@ -31,7 +32,8 @@ void GcsWorkerManager::HandleReportWorkerFailure(
RAY_LOG(INFO) << log_stream.str();
} else {
RAY_LOG(WARNING) << log_stream.str()
<< ". If there are lots of this logs, that might indicate there are "
<< ". Unintentional worker failures have been reported. If there "
"are lots of this logs, that might indicate there are "
"unexpected failures in the cluster.";
}
auto worker_failure_data = std::make_shared<WorkerTableData>();
@@ -51,6 +53,7 @@ void GcsWorkerManager::HandleReportWorkerFailure(
<< ", node id = " << node_id
<< ", address = " << worker_address.ip_address();
} else {
stats::UnintentionalWorkerFailures.Record(1);
RAY_CHECK_OK(gcs_pub_sub_->Publish(WORKER_CHANNEL, worker_id.Binary(),
worker_failure_data->SerializeAsString(),
nullptr));
2 changes: 1 addition & 1 deletion src/ray/gcs/redis_context.cc
@@ -40,7 +40,7 @@ void ProcessCallback(int64_t callback_index,
if (!callback_item->is_subscription_) {
// Record the redis latency for non-subscription redis operations.
auto end_time = absl::GetCurrentTimeNanos() / 1000;
ray::stats::RedisLatency().Record(end_time - callback_item->start_time_);
ray::stats::GcsLatency().Record(end_time - callback_item->start_time_);
}

// Dispatch the callback.
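The rename from RedisLatency to GcsLatency keeps the measurement itself unchanged: a microsecond timestamp is taken when the request is issued, and the delta is recorded when the non-subscription callback fires (the surrounding code derives microseconds from absl::GetCurrentTimeNanos() / 1000). A self-contained sketch of that start/callback delta, using std::chrono instead of Abseil:

#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

// Microseconds since an arbitrary steady-clock epoch.
int64_t NowMicros() {
  return std::chrono::duration_cast<std::chrono::microseconds>(
             std::chrono::steady_clock::now().time_since_epoch())
      .count();
}

int main() {
  int64_t start_time = NowMicros();  // captured when the command is sent
  std::this_thread::sleep_for(std::chrono::milliseconds(5));  // pretend round trip
  int64_t end_time = NowMicros();    // captured inside the reply callback
  std::cout << "latency_us=" << (end_time - start_time) << std::endl;
}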
3 changes: 0 additions & 3 deletions src/ray/object_manager/object_manager.cc
@@ -930,10 +930,7 @@ void ObjectManager::RecordMetrics() const {
stats::ObjectStoreAvailableMemory().Record(config_.object_store_memory - used_memory_);
stats::ObjectStoreUsedMemory().Record(used_memory_);
stats::ObjectStoreLocalObjects().Record(local_objects_.size());
stats::ObjectManagerWaitRequests().Record(active_wait_requests_.size());
stats::ObjectManagerPullRequests().Record(pull_requests_.size());
stats::ObjectManagerUnfulfilledPushRequests().Record(unfulfilled_push_requests_.size());
stats::ObjectManagerProfileEvents().Record(profile_events_.size());
}

} // namespace ray
5 changes: 1 addition & 4 deletions src/ray/raylet/node_manager.cc
@@ -410,6 +410,7 @@ void NodeManager::Heartbeat() {
"lagging, this node can be marked as dead mistakenly.";
}
last_heartbeat_at_ms_ = now_ms;
stats::HeartbeatReportMs.Record(interval);

auto heartbeat_data = std::make_shared<HeartbeatTableData>();
SchedulingResources &local_resources = cluster_resource_map_[self_node_id_];
@@ -2181,7 +2182,6 @@ void NodeManager::MarkObjectsAsFailed(
}

void NodeManager::SubmitTask(const Task &task) {
stats::TaskCountReceived().Record(1);
const TaskSpecification &spec = task.GetTaskSpecification();
// Actor tasks should be no longer submitted to raylet.
RAY_CHECK(!spec.IsActorTask());
@@ -3313,14 +3313,11 @@ void NodeManager::RecordMetrics() {
}

object_manager_.RecordMetrics();
worker_pool_.RecordMetrics();
local_queues_.RecordMetrics();
task_dependency_manager_.RecordMetrics();

auto statistical_data = GetActorStatisticalData(actor_registry_);
stats::LiveActors().Record(statistical_data.live_actors);
stats::RestartingActors().Record(statistical_data.restarting_actors);
stats::DeadActors().Record(statistical_data.dead_actors);
}

bool NodeManager::ReturnBundleResources(const BundleSpecification &bundle_spec) {
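The HeartbeatReportMs sample added to NodeManager::Heartbeat() is simply the elapsed time since the previous heartbeat, the same interval the surrounding warning uses to flag a lagging heartbeat loop (which can get a node marked dead mistakenly). A sketch of that bookkeeping; the threshold below is illustrative, not Ray's actual configuration value:

#include <chrono>
#include <iostream>

int main() {
  using clock = std::chrono::steady_clock;
  auto last_heartbeat_at = clock::now();
  // ... the event loop does other work here, possibly falling behind ...
  auto now = clock::now();
  double interval_ms =
      std::chrono::duration<double, std::milli>(now - last_heartbeat_at).count();
  const double warn_threshold_ms = 1000.0;  // illustrative threshold only
  if (interval_ms > warn_threshold_ms) {
    std::cout << "heartbeat lagging: " << interval_ms << " ms" << std::endl;
  }
  last_heartbeat_at = now;  // interval_ms is what HeartbeatReportMs would record
}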
7 changes: 1 addition & 6 deletions src/ray/raylet/scheduling_queue.cc
@@ -537,12 +537,7 @@ std::string SchedulingQueue::DebugString() const {
}

void SchedulingQueue::RecordMetrics() const {
stats::NumPlaceableTasks().Record(
GetTaskQueue(TaskState::PLACEABLE)->GetTasks().size());
stats::NumPlaceableTasks().Record(GetTaskQueue(TaskState::WAITING)->GetTasks().size());
stats::NumPlaceableTasks().Record(GetTaskQueue(TaskState::READY)->GetTasks().size());
stats::NumPlaceableTasks().Record(GetTaskQueue(TaskState::RUNNING)->GetTasks().size());
stats::NumPlaceableTasks().Record(
stats::NumInfeasibleTasks().Record(
GetTaskQueue(TaskState::INFEASIBLE)->GetTasks().size());
}

7 changes: 0 additions & 7 deletions src/ray/raylet/task_dependency_manager.cc
@@ -447,13 +447,6 @@ std::string TaskDependencyManager::DebugString() const {
return result.str();
}

void TaskDependencyManager::RecordMetrics() const {
stats::NumSubscribedTasks().Record(task_dependencies_.size());
stats::NumRequiredTasks().Record(required_tasks_.size());
stats::NumRequiredObjects().Record(required_objects_.size());
stats::NumPendingTasks().Record(pending_tasks_.size());
}

bool TaskDependencyManager::GetOwnerAddress(const ObjectID &object_id,
rpc::Address *owner_address) const {
const auto creating_task_entry = required_tasks_.find(object_id.TaskId());
3 changes: 0 additions & 3 deletions src/ray/raylet/task_dependency_manager.h
@@ -152,9 +152,6 @@ class TaskDependencyManager {
/// \return string.
std::string DebugString() const;

/// Record metrics.
void RecordMetrics() const;

/// Get the address of the owner of this object. An address will only be
/// returned if the caller previously specified that this object is required
/// on this node, through a call to SubscribeGetDependencies or
27 changes: 0 additions & 27 deletions src/ray/raylet/worker_pool.cc
@@ -893,20 +893,13 @@ bool WorkerPool::DisconnectWorker(const std::shared_ptr<WorkerInterface> &worker
}
}

stats::CurrentWorker().Record(
0, {{stats::LanguageKey, Language_Name(worker->GetLanguage())},
{stats::WorkerPidKey, std::to_string(worker->GetProcess().GetId())}});

MarkPortAsFree(worker->AssignedPort());
return RemoveWorker(state.idle, worker);
}

void WorkerPool::DisconnectDriver(const std::shared_ptr<WorkerInterface> &driver) {
auto &state = GetStateForLanguage(driver->GetLanguage());
RAY_CHECK(RemoveWorker(state.registered_drivers, driver));
stats::CurrentDriver().Record(
0, {{stats::LanguageKey, Language_Name(driver->GetLanguage())},
{stats::WorkerPidKey, std::to_string(driver->GetProcess().GetId())}});
MarkPortAsFree(driver->AssignedPort());
}

@@ -1060,26 +1053,6 @@ std::string WorkerPool::DebugString() const {
return result.str();
}

void WorkerPool::RecordMetrics() const {
for (const auto &entry : states_by_lang_) {
// Record worker.
for (auto worker : entry.second.registered_workers) {
stats::CurrentWorker().Record(
worker->GetProcess().GetId(),
{{stats::LanguageKey, Language_Name(worker->GetLanguage())},
{stats::WorkerPidKey, std::to_string(worker->GetProcess().GetId())}});
}

// Record driver.
for (auto driver : entry.second.registered_drivers) {
stats::CurrentDriver().Record(
driver->GetProcess().GetId(),
{{stats::LanguageKey, Language_Name(driver->GetLanguage())},
{stats::WorkerPidKey, std::to_string(driver->GetProcess().GetId())}});
}
}
}

} // namespace raylet

} // namespace ray
3 changes: 0 additions & 3 deletions src/ray/raylet/worker_pool.h
@@ -266,9 +266,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
/// \return string.
std::string DebugString() const;

/// Record metrics.
void RecordMetrics() const;

protected:
/// Asynchronously start a new worker process. Once the worker process has
/// registered with an external server, the process should create and