diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.cc b/src/ray/gcs/gcs_server/gcs_actor_manager.cc index 6ed5657b49040..5ec42193ea650 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.cc @@ -17,6 +17,7 @@ #include #include "ray/common/ray_config.h" +#include "ray/stats/stats.h" namespace ray { namespace gcs { @@ -689,6 +690,10 @@ absl::flat_hash_set GcsActorManager::GetUnresolvedActorsByOwnerWorker( return actor_ids; } +void GcsActorManager::CollectStats() const { + stats::PendingActors.Record(pending_actors_.size()); +} + void GcsActorManager::OnWorkerDead(const ray::NodeID &node_id, const ray::WorkerID &worker_id, bool intentional_exit) { diff --git a/src/ray/gcs/gcs_server/gcs_actor_manager.h b/src/ray/gcs/gcs_server/gcs_actor_manager.h index 213f2ed38a519..e64fe431f8f69 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_manager.h +++ b/src/ray/gcs/gcs_server/gcs_actor_manager.h @@ -299,6 +299,9 @@ class GcsActorManager : public rpc::ActorInfoHandler { const absl::flat_hash_map> &GetActorRegisterCallbacks() const; + /// Collect stats from gcs actor manager in-memory data structures. + void CollectStats() const; + private: /// A data structure representing an actor's owner. struct Owner { diff --git a/src/ray/gcs/gcs_server/gcs_node_manager.cc b/src/ray/gcs/gcs_server/gcs_node_manager.cc index 941c93adb5669..d88c0afbe21f3 100644 --- a/src/ray/gcs/gcs_server/gcs_node_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_node_manager.cc @@ -16,6 +16,7 @@ #include "ray/common/ray_config.h" #include "ray/gcs/pb_util.h" +#include "ray/stats/stats.h" #include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -446,6 +447,8 @@ std::shared_ptr GcsNodeManager::RemoveNode( auto iter = alive_nodes_.find(node_id); if (iter != alive_nodes_.end()) { removed_node = std::move(iter->second); + // Record stats that there's a new removed node. + stats::NodeFailureTotal.Record(1); // Remove from alive nodes. 
alive_nodes_.erase(iter); // Remove from cluster resources. @@ -558,6 +561,7 @@ void GcsNodeManager::SendBatchedHeartbeat() { for (auto &heartbeat : heartbeat_buffer_) { batch->add_batch()->Swap(&heartbeat.second); } + stats::OutboundHeartbeatSizeKB.Record((double)(batch->ByteSizeLong() / 1024.0)); RAY_CHECK_OK(gcs_pub_sub_->Publish(HEARTBEAT_BATCH_CHANNEL, "", batch->SerializeAsString(), nullptr)); diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index 92953ec93433f..c48b6adfed60c 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -16,6 +16,7 @@ #include "ray/common/ray_config.h" #include "ray/gcs/pb_util.h" +#include "ray/stats/stats.h" #include "ray/util/asio_util.h" #include "src/ray/protobuf/gcs.pb.h" @@ -422,6 +423,10 @@ void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenActorDead( } } +void GcsPlacementGroupManager::CollectStats() const { + stats::PendingPlacementGroups.Record(pending_placement_groups_.size()); +} + void GcsPlacementGroupManager::Tick() { UpdatePlacementGroupLoad(); execute_after(io_context_, [this] { Tick(); }, 1000 /* milliseconds */); diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index c19ab4689a856..68e1c66709cce 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -228,6 +228,9 @@ class GcsPlacementGroupManager : public rpc::PlacementGroupInfoHandler { /// to. void CleanPlacementGroupIfNeededWhenActorDead(const ActorID &actor_id); + /// Collect stats from gcs placement group manager in-memory data structures. + void CollectStats() const; + private: /// Try to create placement group after a short time. 
void RetryCreatingPlacementGroup(); diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index a53437ef2e79f..ffba3eb0a6d75 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -24,6 +24,7 @@ #include "ray/gcs/gcs_server/gcs_worker_manager.h" #include "ray/gcs/gcs_server/stats_handler_impl.h" #include "ray/gcs/gcs_server/task_info_handler_impl.h" +#include "ray/util/asio_util.h" namespace ray { namespace gcs { @@ -287,5 +288,13 @@ std::unique_ptr GcsServer::InitGcsWorkerManager() { new GcsWorkerManager(gcs_table_storage_, gcs_pub_sub_)); } +void GcsServer::CollectStats() { + gcs_actor_manager_->CollectStats(); + gcs_placement_group_manager_->CollectStats(); + execute_after( + main_service_, [this] { CollectStats(); }, + (RayConfig::instance().metrics_report_interval_ms() / 2) /* milliseconds */); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 062ff7d17c81d..f1e799139043f 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -110,6 +110,9 @@ class GcsServer { /// server address directly to raylets and get rid of this lookup. void StoreGcsServerAddressInRedis(); + /// Collect stats from each module for every (metrics_report_interval_ms / 2) ms. + void CollectStats(); + /// Gcs server configuration GcsServerConfig config_; /// The main io service to drive event posted from grpc threads. diff --git a/src/ray/gcs/gcs_server/gcs_worker_manager.cc b/src/ray/gcs/gcs_server/gcs_worker_manager.cc index 70bb4a8b46e0d..bc380fd900814 100644 --- a/src/ray/gcs/gcs_server/gcs_worker_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_worker_manager.cc @@ -13,6 +13,7 @@ // limitations under the License. 
#include "ray/gcs/gcs_server/gcs_worker_manager.h" +#include "ray/stats/stats.h" namespace ray { namespace gcs { @@ -31,7 +32,8 @@ void GcsWorkerManager::HandleReportWorkerFailure( RAY_LOG(INFO) << log_stream.str(); } else { RAY_LOG(WARNING) << log_stream.str() - << ". If there are lots of this logs, that might indicate there are " + << ". Unintentional worker failures have been reported. If there " + "are lots of these logs, that might indicate there are " "unexpected failures in the cluster."; } auto worker_failure_data = std::make_shared(); @@ -51,6 +53,7 @@ void GcsWorkerManager::HandleReportWorkerFailure( << ", node id = " << node_id << ", address = " << worker_address.ip_address(); } else { + stats::UnintentionalWorkerFailures.Record(1); RAY_CHECK_OK(gcs_pub_sub_->Publish(WORKER_CHANNEL, worker_id.Binary(), worker_failure_data->SerializeAsString(), nullptr)); diff --git a/src/ray/gcs/redis_context.cc b/src/ray/gcs/redis_context.cc index 8407527216102..8f6a157ef73a0 100644 --- a/src/ray/gcs/redis_context.cc +++ b/src/ray/gcs/redis_context.cc @@ -40,7 +40,7 @@ void ProcessCallback(int64_t callback_index, if (!callback_item->is_subscription_) { // Record the redis latency for non-subscription redis operations. auto end_time = absl::GetCurrentTimeNanos() / 1000; - ray::stats::RedisLatency().Record(end_time - callback_item->start_time_); + ray::stats::GcsLatency().Record(end_time - callback_item->start_time_); } // Dispatch the callback. 
diff --git a/src/ray/object_manager/object_manager.cc b/src/ray/object_manager/object_manager.cc index 88a2af01a7ee4..209eb4106d360 100644 --- a/src/ray/object_manager/object_manager.cc +++ b/src/ray/object_manager/object_manager.cc @@ -930,10 +930,7 @@ void ObjectManager::RecordMetrics() const { stats::ObjectStoreAvailableMemory().Record(config_.object_store_memory - used_memory_); stats::ObjectStoreUsedMemory().Record(used_memory_); stats::ObjectStoreLocalObjects().Record(local_objects_.size()); - stats::ObjectManagerWaitRequests().Record(active_wait_requests_.size()); stats::ObjectManagerPullRequests().Record(pull_requests_.size()); - stats::ObjectManagerUnfulfilledPushRequests().Record(unfulfilled_push_requests_.size()); - stats::ObjectManagerProfileEvents().Record(profile_events_.size()); } } // namespace ray diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index f4c91ff6bae38..2bbbe35d2da8a 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -410,6 +410,7 @@ void NodeManager::Heartbeat() { "lagging, this node can be marked as dead mistakenly."; } last_heartbeat_at_ms_ = now_ms; + stats::HeartbeatReportMs.Record(interval); auto heartbeat_data = std::make_shared(); SchedulingResources &local_resources = cluster_resource_map_[self_node_id_]; @@ -2181,7 +2182,6 @@ void NodeManager::MarkObjectsAsFailed( } void NodeManager::SubmitTask(const Task &task) { - stats::TaskCountReceived().Record(1); const TaskSpecification &spec = task.GetTaskSpecification(); // Actor tasks should be no longer submitted to raylet. 
RAY_CHECK(!spec.IsActorTask()); @@ -3313,14 +3313,11 @@ void NodeManager::RecordMetrics() { } object_manager_.RecordMetrics(); - worker_pool_.RecordMetrics(); local_queues_.RecordMetrics(); - task_dependency_manager_.RecordMetrics(); auto statistical_data = GetActorStatisticalData(actor_registry_); stats::LiveActors().Record(statistical_data.live_actors); stats::RestartingActors().Record(statistical_data.restarting_actors); - stats::DeadActors().Record(statistical_data.dead_actors); } bool NodeManager::ReturnBundleResources(const BundleSpecification &bundle_spec) { diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index 25d3ff7071894..5bdd0528d2381 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -537,12 +537,7 @@ std::string SchedulingQueue::DebugString() const { } void SchedulingQueue::RecordMetrics() const { - stats::NumPlaceableTasks().Record( - GetTaskQueue(TaskState::PLACEABLE)->GetTasks().size()); - stats::NumPlaceableTasks().Record(GetTaskQueue(TaskState::WAITING)->GetTasks().size()); - stats::NumPlaceableTasks().Record(GetTaskQueue(TaskState::READY)->GetTasks().size()); - stats::NumPlaceableTasks().Record(GetTaskQueue(TaskState::RUNNING)->GetTasks().size()); - stats::NumPlaceableTasks().Record( + stats::NumInfeasibleTasks().Record( GetTaskQueue(TaskState::INFEASIBLE)->GetTasks().size()); } diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index e0b585f451354..f2b0ab9594af7 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -447,13 +447,6 @@ std::string TaskDependencyManager::DebugString() const { return result.str(); } -void TaskDependencyManager::RecordMetrics() const { - stats::NumSubscribedTasks().Record(task_dependencies_.size()); - stats::NumRequiredTasks().Record(required_tasks_.size()); - stats::NumRequiredObjects().Record(required_objects_.size()); - 
stats::NumPendingTasks().Record(pending_tasks_.size()); -} - bool TaskDependencyManager::GetOwnerAddress(const ObjectID &object_id, rpc::Address *owner_address) const { const auto creating_task_entry = required_tasks_.find(object_id.TaskId()); diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h index 639283c99c147..d35d644e73f3e 100644 --- a/src/ray/raylet/task_dependency_manager.h +++ b/src/ray/raylet/task_dependency_manager.h @@ -152,9 +152,6 @@ class TaskDependencyManager { /// \return string. std::string DebugString() const; - /// Record metrics. - void RecordMetrics() const; - /// Get the address of the owner of this object. An address will only be /// returned if the caller previously specified that this object is required /// on this node, through a call to SubscribeGetDependencies or diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 16e9296cc0128..4778e6b5a11bb 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -893,10 +893,6 @@ bool WorkerPool::DisconnectWorker(const std::shared_ptr &worker } } - stats::CurrentWorker().Record( - 0, {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(worker->GetProcess().GetId())}}); - MarkPortAsFree(worker->AssignedPort()); return RemoveWorker(state.idle, worker); } @@ -904,9 +900,6 @@ bool WorkerPool::DisconnectWorker(const std::shared_ptr &worker void WorkerPool::DisconnectDriver(const std::shared_ptr &driver) { auto &state = GetStateForLanguage(driver->GetLanguage()); RAY_CHECK(RemoveWorker(state.registered_drivers, driver)); - stats::CurrentDriver().Record( - 0, {{stats::LanguageKey, Language_Name(driver->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(driver->GetProcess().GetId())}}); MarkPortAsFree(driver->AssignedPort()); } @@ -1060,26 +1053,6 @@ std::string WorkerPool::DebugString() const { return result.str(); } -void WorkerPool::RecordMetrics() const { 
- for (const auto &entry : states_by_lang_) { - // Record worker. - for (auto worker : entry.second.registered_workers) { - stats::CurrentWorker().Record( - worker->GetProcess().GetId(), - {{stats::LanguageKey, Language_Name(worker->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(worker->GetProcess().GetId())}}); - } - - // Record driver. - for (auto driver : entry.second.registered_drivers) { - stats::CurrentDriver().Record( - driver->GetProcess().GetId(), - {{stats::LanguageKey, Language_Name(driver->GetLanguage())}, - {stats::WorkerPidKey, std::to_string(driver->GetProcess().GetId())}}); - } - } -} - } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 4a876777a0a89..75fd641d1976c 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -266,9 +266,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// \return string. std::string DebugString() const; - /// Record metrics. - void RecordMetrics() const; - protected: /// Asynchronously start a new worker process. Once the worker process has /// registered with an external server, the process should create and diff --git a/src/ray/stats/metric_defs.h b/src/ray/stats/metric_defs.h index 1bfce58b4fd17..84e10d080e3d9 100644 --- a/src/ray/stats/metric_defs.h +++ b/src/ray/stats/metric_defs.h @@ -27,25 +27,14 @@ /// /// Common /// -static Histogram RedisLatency("redis_latency", "The latency of a Redis operation.", "us", - {100, 200, 300, 400, 500, 600, 700, 800, 900, 1000}, - {CustomKey}); +static Histogram GcsLatency("gcs_latency", + "The latency of a GCS (by default Redis) operation.", "us", + {100, 200, 300, 400, 500, 600, 700, 800, 900, 1000}, + {CustomKey}); /// /// Raylet Metrics /// -static Gauge CurrentWorker("current_worker", - "This metric is used for reporting states of workers." 
- "Through this, we can see the worker's state on dashboard.", - "1 pcs", {LanguageKey, WorkerPidKey}); - -static Gauge CurrentDriver("current_driver", - "This metric is used for reporting states of drivers.", - "1 pcs", {LanguageKey, DriverPidKey}); - -static Count TaskCountReceived("task_count_received", - "Number of tasks received by raylet.", "pcs", {}); - static Gauge LocalAvailableResource("local_available_resource", "The available resources on this node.", "pcs", {ResourceNameKey}); @@ -59,8 +48,6 @@ static Gauge LiveActors("live_actors", "Number of live actors.", "actors"); static Gauge RestartingActors("restarting_actors", "Number of restarting actors.", "actors"); -static Gauge DeadActors("dead_actors", "Number of dead actors.", "actors"); - static Gauge ObjectStoreAvailableMemory( "object_store_available_memory", "Amount of memory currently available in the object store.", "bytes"); @@ -73,54 +60,41 @@ static Gauge ObjectStoreLocalObjects("object_store_num_local_objects", "Number of objects currently in the object store.", "objects"); -static Gauge ObjectManagerWaitRequests("object_manager_num_wait_requests", - "Number of pending wait requests for objects.", - "requests"); - static Gauge ObjectManagerPullRequests("object_manager_num_pull_requests", "Number of active pull requests for objects.", "requests"); -static Gauge ObjectManagerUnfulfilledPushRequests( - "object_manager_unfulfilled_push_requests", - "Number of unfulfilled push requests for objects.", "requests"); - -static Gauge ObjectManagerProfileEvents("object_manager_num_buffered_profile_events", - "Number of locally-buffered profile events.", - "events"); - -static Gauge NumSubscribedTasks( - "num_subscribed_tasks", - "The number of tasks that are subscribed to object dependencies.", "tasks"); - -static Gauge NumRequiredTasks("num_required_tasks", - "The number of tasks whose output object(s) are " - "required by another subscribed task.", - "tasks"); - -static Gauge NumRequiredObjects( - 
"num_required_objects", - "The number of objects that are required by a subscribed task.", "objects"); - -static Gauge NumPendingTasks("num_pending_tasks", - "The number of tasks that are pending execution.", "tasks"); - -static Gauge NumPlaceableTasks( - "num_placeable_tasks", - "The number of tasks in the scheduler that are in the 'placeable' state.", "tasks"); - -static Gauge NumWaitingTasks( - "num_waiting_tasks", - "The number of tasks in the scheduler that are in the 'waiting' state.", "tasks"); - -static Gauge NumReadyTasks( - "num_ready_tasks", - "The number of tasks in the scheduler that are in the 'ready' state.", "tasks"); - -static Gauge NumRunningTasks( - "num_running_tasks", - "The number of tasks in the scheduler that are in the 'running' state.", "tasks"); - static Gauge NumInfeasibleTasks( "num_infeasible_tasks", "The number of tasks in the scheduler that are in the 'infeasible' state.", "tasks"); + +static Histogram HeartbeatReportMs( + "heartbeat_report_ms", + "Heartbeat report time in raylet. If this value is high, that means there's a high " + "system load. It is possible that this node will be killed because of missing " + "heartbeats.", + "ms", {100, 200, 400, 800, 1600, 3200, 6400, 15000, 30000}); + +/// +/// GCS Server Metrics +/// +static Count UnintentionalWorkerFailures( + "unintentional_worker_failures_total", + "Number of worker failures that are not intentional. 
For example, worker failures " + "due to system related errors.", + "worker_failures"); + +static Count NodeFailureTotal( + "node_failure_total", "Number of node failures that have happened in the cluster.", + "node_failures"); + +static Gauge PendingActors("pending_actors", "Number of pending actors in GCS server.", + "actors"); + +static Gauge PendingPlacementGroups( + "pending_placement_groups", "Number of pending placement groups in the GCS server.", + "placement_groups"); + +static Histogram OutboundHeartbeatSizeKB("outbound_heartbeat_size_kb", + "Outbound heartbeat payload size", "kb", + {10, 50, 100, 1000, 10000, 100000}); diff --git a/src/ray/stats/metric_exporter_client_test.cc b/src/ray/stats/metric_exporter_client_test.cc index 6ce53ccb94434..d1c9f19f90be4 100644 --- a/src/ray/stats/metric_exporter_client_test.cc +++ b/src/ray/stats/metric_exporter_client_test.cc @@ -144,7 +144,7 @@ bool DoubleEqualTo(double value, double compared_value) { TEST_F(MetricExporterClientTest, decorator_test) { // Export client should emit at least once in report flush interval. 
for (size_t i = 0; i < 100; ++i) { - stats::CurrentWorker().Record(i + 1); + stats::LiveActors().Record(i + 1); } opencensus::stats::DeltaProducer::Get()->Flush(); opencensus::stats::StatsExporterImpl::Get()->Export(); diff --git a/src/ray/stats/stats_test.cc b/src/ray/stats/stats_test.cc index 24013009a4e78..21e1627233a4d 100644 --- a/src/ray/stats/stats_test.cc +++ b/src/ray/stats/stats_test.cc @@ -41,14 +41,12 @@ class MockExporter : public opencensus::stats::StatsExporter::Handler { auto &descriptor = datum.first; auto &view_data = datum.second; - ASSERT_EQ("current_worker", descriptor.name()); + ASSERT_EQ("local_available_resource", descriptor.name()); ASSERT_EQ(opencensus::stats::ViewData::Type::kDouble, view_data.type()); for (const auto &row : view_data.double_data()) { for (size_t i = 0; i < descriptor.columns().size(); ++i) { - if (descriptor.columns()[i].name() == "WorkerPidKey") { - ASSERT_EQ("1000", row.first[i]); - } else if (descriptor.columns()[i].name() == "LanguageKey") { - ASSERT_EQ("CPP", row.first[i]); + if (descriptor.columns()[i].name() == "ResourceName") { + ASSERT_EQ("CPU", row.first[i]); } } // row.second store the data of this metric. 
@@ -69,8 +67,7 @@ class StatsTest : public ::testing::Test { absl::Duration harvest_interval = absl::Milliseconds(kReportFlushInterval / 2); ray::stats::StatsConfig::instance().SetReportInterval(report_interval); ray::stats::StatsConfig::instance().SetHarvestInterval(harvest_interval); - const stats::TagsType global_tags = {{stats::LanguageKey, "CPP"}, - {stats::WorkerPidKey, "1000"}}; + const stats::TagsType global_tags = {{stats::ResourceNameKey, "CPU"}}; std::shared_ptr exporter( new stats::StdoutExporterClient()); ray::stats::Init(global_tags, MetricsAgentPort, exporter); @@ -85,7 +82,7 @@ class StatsTest : public ::testing::Test { TEST_F(StatsTest, F) { for (size_t i = 0; i < 20; ++i) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); - stats::CurrentWorker().Record(2345); + stats::LocalAvailableResource().Record(2345); } }