[core] add grpc opencensus plugin (#39082)
gRPC has a built-in plugin to export metrics via OpenCensus, as detailed in https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md. We already use OpenCensus to collect Ray metrics, and here we add the gRPC-sourced metrics to the list.

In consideration of potential performance impacts, the plugin is disabled by default and can be enabled per component via the config enable_grpc_metrics_collection_for. For now only gcs is supported; in follow-up patches we plan to support raylet and core_worker. The reason we don't support them right away is that the plugin must be configured before any gRPC traffic, but raylet and core_worker initialize their stats only after a gRPC call to the GCS/raylet that fetches the updated configs.
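For orientation, the plugin is enabled through gRPC's public C++ API. The sketch below is not part of this commit; it assumes the standard registration calls from <grpcpp/opencensus.h> and shows why ordering matters: both calls must happen before any gRPC channel or server is created.

#include <grpcpp/opencensus.h>

// Minimal sketch (illustrative only): enabling gRPC's OpenCensus stats.
// Both calls must run before any gRPC channel or server exists, which is why
// this commit only wires the plugin up for the GCS process so far.
void MaybeEnableGrpcOpenCensus(bool enabled) {
  if (!enabled) {
    return;
  }
  // Install the client/server interceptors that record the grpc.io/* stats.
  grpc::RegisterOpenCensusPlugin();
  // Register the canonical gRPC views (latency, bytes per RPC, ...) with the
  // OpenCensus stats exporter so Ray's exporters can pick them up.
  grpc::RegisterOpenCensusViewsForExport();
}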
rynewang committed Sep 13, 2023
1 parent 1225d52 commit 77b4cb9
Showing 9 changed files with 347 additions and 1 deletion.
16 changes: 16 additions & 0 deletions BUILD.bazel
@@ -615,6 +615,7 @@ ray_cc_library(
deps = [
":reporter_rpc",
":stats_metric",
"@com_github_grpc_grpc//:grpc_opencensus_plugin",
],
)

@@ -1492,6 +1493,21 @@ ray_cc_test(
],
)

ray_cc_test(
name = "metric_exporter_grpc_test",
size = "small",
srcs = [
"src/ray/stats/metric_exporter_grpc_test.cc"],
tags = [
"stats",
"team:core",
],
deps = [
":stats_lib",
"@com_google_googletest//:gtest_main",
],
)

ray_cc_library(
name = "gcs_test_util_lib",
hdrs = [
63 changes: 62 additions & 1 deletion dashboard/modules/reporter/tests/test_reporter.py
@@ -10,11 +10,13 @@
from collections import defaultdict
from multiprocessing import Process
from unittest.mock import MagicMock
from google.protobuf import text_format

import psutil
import ray
from mock import patch
from ray._private import ray_constants
from ray._private.metrics_agent import fix_grpc_metric
from ray._private.test_utils import (
fetch_prometheus,
format_web_url,
@@ -24,6 +26,7 @@
from ray.dashboard.modules.reporter.reporter_agent import ReporterAgent
from ray.dashboard.tests.conftest import * # noqa
from ray.dashboard.utils import Bunch
from ray.core.generated.metrics_pb2 import Metric

try:
import prometheus_client
@@ -151,8 +154,65 @@ def _check_workers():
wait_for_condition(_check_workers, timeout=10)


def test_fix_grpc_metrics():
"""
A real metric output from gcs_server, with its name prefixed with "grpc.io/" and one
distribution time series. It has 45 buckets, the first of which has bound 0.0.
"""
metric_textproto = (
'metric_descriptor { name: "grpc.io/server/server_latency" description: "Time '
"between first byte of request received to last byte of response sent, or "
'terminal error" unit: "ms" label_keys { key: "grpc_server_method" } label_keys'
' { key: "Component" } label_keys { key: "WorkerId" } label_keys { key: '
'"Version" } label_keys { key: "NodeAddress" } label_keys { key: "SessionName" '
"} } timeseries { start_timestamp { seconds: 1693693592 } label_values { value:"
' "ray.rpc.NodeInfoGcsService/RegisterNode" } label_values { value: '
'"gcs_server" } label_values { } label_values { value: "3.0.0.dev0" } '
'label_values { value: "127.0.0.1" } label_values { value: '
'"session_2023-09-02_15-26-32_589652_23265" } points { timestamp { seconds: '
"1693693602 } distribution_value { count: 1 sum: 0.266 bucket_options { "
"explicit { bounds: 0.0 bounds: 0.01 bounds: 0.05 bounds: 0.1 bounds: 0.3 "
"bounds: 0.6 bounds: 0.8 bounds: 1.0 bounds: 2.0 bounds: 3.0 bounds: 4.0 "
"bounds: 5.0 bounds: 6.0 bounds: 8.0 bounds: 10.0 bounds: 13.0 bounds: 16.0 "
"bounds: 20.0 bounds: 25.0 bounds: 30.0 bounds: 40.0 bounds: 50.0 bounds: 65.0 "
"bounds: 80.0 bounds: 100.0 bounds: 130.0 bounds: 160.0 bounds: 200.0 bounds: "
"250.0 bounds: 300.0 bounds: 400.0 bounds: 500.0 bounds: 650.0 bounds: 800.0 "
"bounds: 1000.0 bounds: 2000.0 bounds: 5000.0 bounds: 10000.0 bounds: 20000.0 "
"bounds: 50000.0 bounds: 100000.0 } } buckets { } buckets { } buckets { } "
"buckets { } buckets { count: 1 } buckets { } buckets { } buckets { } buckets {"
" } buckets { } buckets { } buckets { } buckets { } buckets { } buckets { } "
"buckets { } buckets { } buckets { } buckets { } buckets { } buckets { } "
"buckets { } buckets { } buckets { } buckets { } buckets { } buckets { } "
"buckets { } buckets { } buckets { } buckets { } buckets { } buckets { } "
"buckets { } buckets { } buckets { } buckets { } buckets { } buckets { } "
"buckets { } buckets { } buckets { } } } }"
)

metric = Metric()
text_format.Parse(metric_textproto, metric)

expected_fixed_metric = Metric()
expected_fixed_metric.CopyFrom(metric)
expected_fixed_metric.metric_descriptor.name = "grpc_io_server_server_latency"
expected_fixed_metric.timeseries[0].points[
0
].distribution_value.bucket_options.explicit.bounds[0] = 0.0000001

fix_grpc_metric(metric)
assert metric == expected_fixed_metric


@pytest.fixture
def enable_grpc_metrics_collection():
os.environ["RAY_enable_grpc_metrics_collection_for"] = "gcs"
yield
os.environ.pop("RAY_enable_grpc_metrics_collection_for", None)


@pytest.mark.skipif(prometheus_client is None, reason="prometheus_client not installed")
def test_prometheus_physical_stats_record(enable_test_module, shutdown_only):
def test_prometheus_physical_stats_record(
enable_grpc_metrics_collection, enable_test_module, shutdown_only
):
addresses = ray.init(include_dashboard=True, num_cpus=1)
metrics_export_port = addresses["metrics_export_port"]
addr = addresses["raylet_ip_address"]
@@ -184,6 +244,7 @@ def test_case_stats_exist():
"ray_node_network_received" in metric_names,
"ray_node_network_send_speed" in metric_names,
"ray_node_network_receive_speed" in metric_names,
"ray_grpc_io_client_sent_bytes_per_rpc_bucket" in metric_names,
]
if sys.platform == "linux" or sys.platform == "linux2":
predicates.append("ray_node_mem_shared_bytes" in metric_names)
41 changes: 41 additions & 0 deletions python/ray/_private/metrics_agent.py
@@ -1,6 +1,7 @@
import json
import logging
import os
import re
import threading
import time
import traceback
@@ -41,6 +42,7 @@
# this time, we treat workers as dead.
RAY_WORKER_TIMEOUT_S = "RAY_WORKER_TIMEOUT_S"
GLOBAL_COMPONENT_KEY = "CORE"
RE_NON_ALPHANUMS = re.compile(r"[^a-zA-Z0-9]")


class Gauge(View):
@@ -73,6 +75,44 @@ def name(self):
Record = namedtuple("Record", ["gauge", "value", "tags"])


def fix_grpc_metric(metric: Metric):
"""
Fix the inbound `opencensus.proto.metrics.v1.Metric` protos to make them acceptable
to opencensus.stats.DistributionAggregationData.
- metric name: gRPC OpenCensus metrics have names with slashes and dots, e.g.
`grpc.io/client/server_latency`[1]. However, Prometheus metric names only allow
alphanumerics, underscores, and colons[2]. We sanitize the name by replacing non-alphanumeric
chars with underscores, like the official OpenCensus Prometheus exporter[3].
- distribution bucket bounds: The Metric proto requires distribution bucket bounds to
be > 0 [4]. However, gRPC OpenCensus metrics have their first bucket bound == 0 [1].
This makes the `DistributionAggregationData` constructor raise exceptions. This
applies to all bytes and milliseconds (latencies). The fix: we update the initial 0
bound to 0.000_000_1. This does not affect the precision of the metrics, since
we don't expect any sub-byte sizes or sub-nanosecond latencies.
[1] https://github.com/census-instrumentation/opencensus-specs/blob/master/stats/gRPC.md#units # noqa: E501
[2] https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
[3] https://github.com/census-instrumentation/opencensus-cpp/blob/50eb5de762e5f87e206c011a4f930adb1a1775b1/opencensus/exporters/stats/prometheus/internal/prometheus_utils.cc#L39 # noqa: E501
[4] https://github.com/census-instrumentation/opencensus-proto/blob/master/src/opencensus/proto/metrics/v1/metrics.proto#L218 # noqa: E501
"""

if not metric.metric_descriptor.name.startswith("grpc.io/"):
return

metric.metric_descriptor.name = RE_NON_ALPHANUMS.sub(
"_", metric.metric_descriptor.name
)

for series in metric.timeseries:
for point in series.points:
if point.HasField("distribution_value"):
dist_value = point.distribution_value
bucket_bounds = dist_value.bucket_options.explicit.bounds
if len(bucket_bounds) > 0 and bucket_bounds[0] == 0:
bucket_bounds[0] = 0.000_000_1


class OpencensusProxyMetric:
def __init__(self, name: str, desc: str, unit: str, label_keys: List[str]):
"""Represents the OpenCensus metrics that will be proxy exported."""
@@ -175,6 +215,7 @@ def record(self, metrics: List[Metric]):
"""
self._last_reported_time = time.monotonic()
for metric in metrics:
fix_grpc_metric(metric)
descriptor = metric.metric_descriptor
name = descriptor.name
label_keys = [label_key.key for label_key in descriptor.label_keys]
11 changes: 11 additions & 0 deletions src/ray/common/ray_config_def.h
@@ -519,6 +519,17 @@ RAY_CONFIG(uint64_t, gcs_mark_task_failed_on_worker_dead_delay_ms, /* 1 secs */
/// Whether or not we enable metrics collection.
RAY_CONFIG(bool, enable_metrics_collection, true)

/// Comma-separated list of components for which we enable gRPC metrics collection.
/// Only effective if `enable_metrics_collection` is also true. Enabling this incurs
/// some performance overhead.
///
/// Valid fields: "gcs".
/// TODO: it only works for gcs now. The goal is to support "gcs,core_worker,raylet". The
/// problem is that we need this config field *before* any gRPC call, but raylet and
/// core_worker receive their configs from gcs and raylet respectively, so the configs are
/// only available *after* a gRPC call.
RAY_CONFIG(std::string, enable_grpc_metrics_collection_for, "")

// Max number bytes of inlined objects in a task rpc request/response.
RAY_CONFIG(int64_t, task_rpc_inlined_bytes_limit, 10 * 1024 * 1024)

2 changes: 2 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server_main.cc
@@ -69,6 +69,8 @@ int main(int argc, char *argv[]) {
// as soon as there is no more work to be processed.
boost::asio::io_service::work work(main_service);

ray::stats::enable_grpc_metrics_collection_if_needed("gcs");

const ray::stats::TagsType global_tags = {{ray::stats::ComponentKey, "gcs_server"},
{ray::stats::WorkerIdKey, ""},
{ray::stats::VersionKey, kRayVersion},
37 changes: 37 additions & 0 deletions src/ray/stats/metric_exporter.cc
@@ -15,10 +15,15 @@
#include "ray/stats/metric_exporter.h"

#include <future>
#include <string_view>

namespace ray {
namespace stats {

namespace {
inline constexpr std::string_view kGrpcIoMetricsNamePrefix = "grpc.io/";
}

template <>
void MetricPointExporter::ExportToPoints(
const opencensus::stats::ViewData::DataMap<opencensus::stats::Distribution>
@@ -103,9 +108,24 @@ void MetricPointExporter::ExportViewData(
break;
}
}
for (auto &point : points) {
addGlobalTagsToGrpcMetric(point);
}
metric_exporter_client_->ReportMetrics(points);
}

/// Hack. We want to add GlobalTags to all our metrics, but the gRPC OpenCensus plugin is
/// not configurable at all, so we have no chance to add our own tags. We use this hack to
/// append the tags at export time.
void MetricPointExporter::addGlobalTagsToGrpcMetric(MetricPoint &metric) {
if (std::string_view(metric.metric_name).substr(0, kGrpcIoMetricsNamePrefix.size()) ==
kGrpcIoMetricsNamePrefix) {
for (const auto &[key, value] : ray::stats::StatsConfig::instance().GetGlobalTags()) {
metric.tags[key.name()] = value;
}
}
}

OpenCensusProtoExporter::OpenCensusProtoExporter(const int port,
instrumented_io_context &io_service,
const std::string address,
@@ -115,6 +135,22 @@ OpenCensusProtoExporter::OpenCensusProtoExporter(const int port,
client_.reset(new rpc::MetricsAgentClient(address, port, client_call_manager_));
};

/// Hack. We want to add GlobalTags to all our metrics, but the gRPC OpenCensus plugin is
/// not configurable at all, so we have no chance to add our own tags. We use this hack to
/// append the tags at export time.
void OpenCensusProtoExporter::addGlobalTagsToGrpcMetric(
opencensus::proto::metrics::v1::Metric &metric) {
if (std::string_view(metric.metric_descriptor().name())
.substr(0, kGrpcIoMetricsNamePrefix.size()) == kGrpcIoMetricsNamePrefix) {
for (const auto &[key, value] : ray::stats::StatsConfig::instance().GetGlobalTags()) {
metric.mutable_metric_descriptor()->add_label_keys()->set_key(key.name());
for (auto &timeseries : *metric.mutable_timeseries()) {
timeseries.add_label_values()->set_value(value);
}
}
}
}

void OpenCensusProtoExporter::ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor,
opencensus::stats::ViewData>> &data) {
@@ -201,6 +237,7 @@ void OpenCensusProtoExporter::ExportViewData(
RAY_LOG(FATAL) << "Unknown view data type.";
break;
}
addGlobalTagsToGrpcMetric(*request_point_proto);
}

{
2 changes: 2 additions & 0 deletions src/ray/stats/metric_exporter.h
@@ -53,6 +53,7 @@ class MetricPointExporter final : public opencensus::stats::StatsExporter::Handl
void ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor,
opencensus::stats::ViewData>> &data) override;
void addGlobalTagsToGrpcMetric(MetricPoint &metric);

private:
template <class DTYPE>
@@ -113,6 +114,7 @@ class OpenCensusProtoExporter final : public opencensus::stats::StatsExporter::H
void ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor,
opencensus::stats::ViewData>> &data) override;
void addGlobalTagsToGrpcMetric(opencensus::proto::metrics::v1::Metric &metric);

private:
/// Call Manager for gRPC client.