From dc15b4ef824b85c9cf0e08b772e1ee4ebb44d825 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 7 Mar 2024 14:10:16 -0800 Subject: [PATCH 1/5] Export Ray Counter as Prometheus Counter metric Signed-off-by: Jiajun Yao --- python/ray/_private/metrics_agent.py | 112 ++++++++++++++++++++------- src/ray/stats/metric_exporter.cc | 21 +++++ 2 files changed, 107 insertions(+), 26 deletions(-) diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py index 01308eec9d761..77251a3d13d18 100644 --- a/python/ray/_private/metrics_agent.py +++ b/python/ray/_private/metrics_agent.py @@ -14,6 +14,7 @@ HistogramMetricFamily, ) from opencensus.metrics.export.value import ValueDouble +from opencensus.metrics.export.metric_descriptor import MetricDescriptorType from opencensus.stats import aggregation from opencensus.stats import measure as measure_module from opencensus.stats.view_manager import ViewManager @@ -24,6 +25,7 @@ CountAggregationData, DistributionAggregationData, LastValueAggregationData, + SumAggregationData, ) from opencensus.stats.view import View from opencensus.tags import tag_key as tag_key_module @@ -161,11 +163,22 @@ def record(self, metric: Metric): # Aggregate points. for point in series.points: - if point.HasField("int64_value"): + if ( + metric.metric_descriptor.type + == MetricDescriptorType.CUMULATIVE_INT64 + ): data = CountAggregationData(point.int64_value) - elif point.HasField("double_value"): + elif ( + metric.metric_descriptor.type + == MetricDescriptorType.CUMULATIVE_DOUBLE + ): + data = SumAggregationData(ValueDouble, point.double_value) + elif metric.metric_descriptor.type == MetricDescriptorType.GAUGE_DOUBLE: data = LastValueAggregationData(ValueDouble, point.double_value) - elif point.HasField("distribution_value"): + elif ( + metric.metric_descriptor.type + == MetricDescriptorType.CUMULATIVE_DISTRIBUTION + ): dist_value = point.distribution_value counts_per_bucket = [bucket.count for bucket in dist_value.buckets] bucket_bounds = dist_value.bucket_options.explicit.bounds @@ -292,7 +305,7 @@ def clean_stale_components(self): return stale_components # TODO(sang): add start and end timestamp - def to_metric( + def to_metrics( self, metric_name: str, metric_description: str, @@ -300,8 +313,8 @@ def to_metric( metric_units: str, label_values: Tuple[tag_value_module.TagValue], agg_data: Any, - metrics_map: Dict[str, PrometheusMetric], - ) -> PrometheusMetric: + metrics_map: Dict[str, List[PrometheusMetric]], + ): """to_metric translate the data that OpenCensus create to Prometheus format, using Prometheus Metric object. @@ -315,9 +328,8 @@ def to_metric( label_values: The values of `label_keys`. agg_data: `opencensus.stats.aggregation_data.AggregationData` object. Aggregated data that needs to be converted as Prometheus samples + metrics_map: The converted metric is added to this map. - Returns: - A Prometheus metric object """ assert self._components_lock.locked() metric_name = f"{self._namespace}_{metric_name}" @@ -328,17 +340,62 @@ def to_metric( label_values = [tv if tv else "" for tv in label_values] if isinstance(agg_data, CountAggregationData): - metric = metrics_map.get(metric_name) - if not metric: + metrics = metrics_map.get(metric_name) + if not metrics: metric = CounterMetricFamily( name=metric_name, documentation=metric_description, unit=metric_units, labels=label_keys, ) - metrics_map[metric_name] = metric - metric.add_metric(labels=label_values, value=agg_data.count_data) - return metric + metrics = [metric] + metrics_map[metric_name] = metrics + metrics[0].add_metric(labels=label_values, value=agg_data.count_data) + return + + if isinstance(agg_data, SumAggregationData): + # This should be emitted as prometheus counter + # but we used to emit it as prometheus gauge. + # To keep the backward compatibility + # (changing from counter to gauge changes the metric name + # since prometheus client will add "_total" suffix to counter + # per OpenMetrics specification), + # we now emit both counter and gauge and in the + # next major Ray release (3.0) we can stop emitting gauge. + # This leaves people enough time to migrate their dashboards. + # See https://github.com/ray-project/ray/issues/37768. + metrics = metrics_map.get(metric_name) + if not metrics: + metric = CounterMetricFamily( + name=metric_name, + documentation=metric_description, + labels=label_keys, + ) + metrics = [metric] + metrics_map[metric_name] = metrics + metrics[0].add_metric(labels=label_values, value=agg_data.sum_data) + + if metric_name.endswith("_total"): + # In this case, we only need to emit prometheus counter + # since for metric name already ends with _total suffix + # prometheus client won't change it + # so there is no backward compatibility issue. + # See https://prometheus.github.io/client_python/instrumenting/counter/ + pass + else: + if len(metrics) == 1: + metric = GaugeMetricFamily( + name=metric_name, + documentation=( + f"(DEPRECATED, use {metric_name}_total metric instead) " + f"{metric_description}" + ), + labels=label_keys, + ) + metrics.append(metric) + assert len(metrics) == 2 + metrics[1].add_metric(labels=label_values, value=agg_data.sum_data) + return elif isinstance(agg_data, DistributionAggregationData): @@ -356,32 +413,34 @@ def to_metric( # In OpenCensus we don't have +Inf in the bucket bonds so need to # append it here. buckets.append(["+Inf", agg_data.count_data]) - metric = metrics_map.get(metric_name) - if not metric: + metrics = metrics_map.get(metric_name) + if not metrics: metric = HistogramMetricFamily( name=metric_name, documentation=metric_description, labels=label_keys, ) - metrics_map[metric_name] = metric - metric.add_metric( + metrics = [metric] + metrics_map[metric_name] = metrics + metrics[0].add_metric( labels=label_values, buckets=buckets, sum_value=agg_data.sum, ) - return metric + return elif isinstance(agg_data, LastValueAggregationData): - metric = metrics_map.get(metric_name) - if not metric: + metrics = metrics_map.get(metric_name) + if not metrics: metric = GaugeMetricFamily( name=metric_name, documentation=metric_description, labels=label_keys, ) - metrics_map[metric_name] = metric - metric.add_metric(labels=label_values, value=agg_data.value) - return metric + metrics = [metric] + metrics_map[metric_name] = metrics + metrics[0].add_metric(labels=label_values, value=agg_data.value) + return else: raise ValueError(f"unsupported aggregation type {type(agg_data)}") @@ -399,7 +458,7 @@ def collect(self): # pragma: NO COVER for component in self._components.values(): for metric in component.metrics.values(): for label_values, data in metric.data.items(): - self.to_metric( + self.to_metrics( metric.name, metric.desc, metric.label_keys, @@ -409,8 +468,9 @@ def collect(self): # pragma: NO COVER metrics_map, ) - for metric in metrics_map.values(): - yield metric + for metrics in metrics_map.values(): + for metric in metrics: + yield metric class MetricsAgent: diff --git a/src/ray/stats/metric_exporter.cc b/src/ray/stats/metric_exporter.cc index 00a4e1168fae1..5a3762ab45d0e 100644 --- a/src/ray/stats/metric_exporter.cc +++ b/src/ray/stats/metric_exporter.cc @@ -129,6 +129,27 @@ opencensus::proto::metrics::v1::Metric *addMetricProtoPayload( metric_descriptor_proto->set_description(measure_descriptor.description()); metric_descriptor_proto->set_unit(measure_descriptor.units()); + auto descriptor_type = opencensus::proto::metrics::v1::MetricDescriptor::UNSPECIFIED; + auto view_aggregation = view_descriptor.aggregation(); + switch (view_aggregation.type()) { + case opencensus::stats::Aggregation::Type::kCount: + descriptor_type = opencensus::proto::metrics::v1::MetricDescriptor::CUMULATIVE_INT64; + break; + case opencensus::stats::Aggregation::Type::kSum: + descriptor_type = opencensus::proto::metrics::v1::MetricDescriptor::CUMULATIVE_DOUBLE; + break; + case opencensus::stats::Aggregation::Type::kDistribution: + descriptor_type = + opencensus::proto::metrics::v1::MetricDescriptor::CUMULATIVE_DISTRIBUTION; + break; + case opencensus::stats::Aggregation::Type::kLastValue: + descriptor_type = opencensus::proto::metrics::v1::MetricDescriptor::GAUGE_DOUBLE; + break; + default: + break; + } + metric_descriptor_proto->set_type(descriptor_type); + for (const auto &tag_key : view_descriptor.columns()) { metric_descriptor_proto->add_label_keys()->set_key(tag_key.name()); }; From 59e449aa95cce1098b9f4c37ac43b1ed66375724 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Thu, 7 Mar 2024 21:58:32 -0800 Subject: [PATCH 2/5] up Signed-off-by: Jiajun Yao --- python/ray/_private/prometheus_exporter.py | 11 +++--- python/ray/tests/test_metrics_agent_2.py | 45 +++++++++++++++++++--- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/python/ray/_private/prometheus_exporter.py b/python/ray/_private/prometheus_exporter.py index 8086c9a415451..28d09861bee5a 100644 --- a/python/ray/_private/prometheus_exporter.py +++ b/python/ray/_private/prometheus_exporter.py @@ -137,7 +137,6 @@ def to_metric(self, desc, tag_values, agg_data, metrics_map): :class:`~prometheus_client.core.HistogramMetricFamily` or :class:`~prometheus_client.core.UnknownMetricFamily` or :class:`~prometheus_client.core.GaugeMetricFamily` - :returns: A Prometheus metric object """ metric_name = desc["name"] metric_description = desc["documentation"] @@ -160,7 +159,7 @@ def to_metric(self, desc, tag_values, agg_data, metrics_map): ) metrics_map[metric_name] = metric metric.add_metric(labels=tag_values, value=agg_data.count_data) - return metric + return elif isinstance(agg_data, aggregation_data_module.DistributionAggregationData): @@ -191,7 +190,7 @@ def to_metric(self, desc, tag_values, agg_data, metrics_map): buckets=buckets, sum_value=agg_data.sum, ) - return metric + return elif isinstance(agg_data, aggregation_data_module.SumAggregationData): metric = metrics_map.get(metric_name) @@ -203,7 +202,7 @@ def to_metric(self, desc, tag_values, agg_data, metrics_map): ) metrics_map[metric_name] = metric metric.add_metric(labels=tag_values, value=agg_data.sum_data) - return metric + return elif isinstance(agg_data, aggregation_data_module.LastValueAggregationData): metric = metrics_map.get(metric_name) @@ -215,7 +214,7 @@ def to_metric(self, desc, tag_values, agg_data, metrics_map): ) metrics_map[metric_name] = metric metric.add_metric(labels=tag_values, value=agg_data.value) - return metric + return else: raise ValueError(f"unsupported aggregation type {type(agg_data)}") @@ -235,7 +234,7 @@ def collect(self): # pragma: NO COVER desc = self.registered_views[v_name] for tag_values in view_data.tag_value_aggregation_data_map: agg_data = view_data.tag_value_aggregation_data_map[tag_values] - metric = self.to_metric(desc, tag_values, agg_data, metrics_map) + self.to_metric(desc, tag_values, agg_data, metrics_map) for metric in metrics_map.values(): yield metric diff --git a/python/ray/tests/test_metrics_agent_2.py b/python/ray/tests/test_metrics_agent_2.py index f4ae4bb69bb70..6d1550d15b9e1 100644 --- a/python/ray/tests/test_metrics_agent_2.py +++ b/python/ray/tests/test_metrics_agent_2.py @@ -12,6 +12,7 @@ from opencensus.stats.stats_recorder import StatsRecorder from opencensus.stats import execution_context from prometheus_client.core import REGISTRY +from opencensus.metrics.export.metric_descriptor import MetricDescriptorType from ray._private.metrics_agent import Gauge, MetricsAgent, Record, RAY_WORKER_TIMEOUT_S from ray._private.services import new_port from ray.core.generated.metrics_pb2 import ( @@ -61,6 +62,7 @@ def generate_protobuf_metric( name: str, desc: str, unit: str, + type: MetricDescriptorType, label_keys: List[str] = None, timeseries: List[TimeSeries] = None, ): @@ -74,6 +76,7 @@ def generate_protobuf_metric( name=name, description=desc, unit=unit, + type=type, label_keys=[LabelKey(key="a"), LabelKey(key="b")], ), timeseries=timeseries, @@ -199,7 +202,12 @@ def test_metrics_agent_proxy_record_and_export_basic(get_agent): # Test the basic case. m = generate_protobuf_metric( - "test", "desc", "", label_keys=["a", "b"], timeseries=[] + "test", + "desc", + "", + MetricDescriptorType.GAUGE_DOUBLE, + label_keys=["a", "b"], + timeseries=[], ) m.timeseries.append(generate_timeseries(["a", "b"], [1, 2, 3])) agent.proxy_export_metrics([m]) @@ -211,7 +219,12 @@ def test_metrics_agent_proxy_record_and_export_basic(get_agent): # Test new metric has proxyed. m = generate_protobuf_metric( - "test", "desc", "", label_keys=["a", "b"], timeseries=[] + "test", + "desc", + "", + MetricDescriptorType.GAUGE_DOUBLE, + label_keys=["a", "b"], + timeseries=[], ) m.timeseries.append(generate_timeseries(["a", "b"], [4])) agent.proxy_export_metrics([m]) @@ -223,7 +236,12 @@ def test_metrics_agent_proxy_record_and_export_basic(get_agent): # Test new metric with different tag is reported. m = generate_protobuf_metric( - "test", "desc", "", label_keys=["a", "b"], timeseries=[] + "test", + "desc", + "", + MetricDescriptorType.GAUGE_DOUBLE, + label_keys=["a", "b"], + timeseries=[], ) m.timeseries.append(generate_timeseries(["a", "c"], [5])) agent.proxy_export_metrics([m]) @@ -247,7 +265,12 @@ def test_metrics_agent_proxy_record_and_export_from_workers(get_agent): worker_id = WorkerID.from_random() m = generate_protobuf_metric( - "test", "desc", "", label_keys=["a", "b"], timeseries=[] + "test", + "desc", + "", + MetricDescriptorType.GAUGE_DOUBLE, + label_keys=["a", "b"], + timeseries=[], ) m.timeseries.append(generate_timeseries(["a", "b"], [1, 2, 3])) agent.proxy_export_metrics([m], worker_id_hex=worker_id.hex()) @@ -284,7 +307,12 @@ def test_metrics_agent_proxy_record_and_export_from_workers_complicated( metrics = [] for i in range(8): m = generate_protobuf_metric( - f"test_{i}", "desc", "", label_keys=["a", "b"], timeseries=[] + f"test_{i}", + "desc", + "", + MetricDescriptorType.GAUGE_DOUBLE, + label_keys=["a", "b"], + timeseries=[], ) m.timeseries.append(generate_timeseries(["a", str(i)], [3])) metrics.append(m) @@ -341,7 +369,12 @@ def test_metrics_agent_proxy_record_and_export_from_workers_delay(get_agent): # worker_id = WorkerID.from_random() m = generate_protobuf_metric( - "test", "desc", "", label_keys=["a", "b"], timeseries=[] + "test", + "desc", + "", + MetricDescriptorType.GAUGE_DOUBLE, + label_keys=["a", "b"], + timeseries=[], ) m.timeseries.append(generate_timeseries(["a", "b"], [1, 2, 3])) agent.proxy_export_metrics([m], worker_id_hex=worker_id.hex()) From 0ccf9836c6f346ec61a00227c501175d38b96d37 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Fri, 8 Mar 2024 02:22:49 -0800 Subject: [PATCH 3/5] up Signed-off-by: Jiajun Yao --- .../modules/reporter/tests/test_reporter.py | 8 +- python/ray/_private/metrics_agent.py | 11 +- python/ray/_private/test_utils.py | 19 ++-- python/ray/tests/test_metrics_agent.py | 106 ++++++++++++++++-- python/ray/tests/test_plasma_unlimited.py | 2 +- python/ray/util/metrics.py | 7 ++ 6 files changed, 129 insertions(+), 24 deletions(-) diff --git a/dashboard/modules/reporter/tests/test_reporter.py b/dashboard/modules/reporter/tests/test_reporter.py index 35ef9f5cbdc79..8dd25c8e5c6c3 100644 --- a/dashboard/modules/reporter/tests/test_reporter.py +++ b/dashboard/modules/reporter/tests/test_reporter.py @@ -222,7 +222,8 @@ def test_prometheus_physical_stats_record( prom_addresses = [f"{addr}:{metrics_export_port}"] def test_case_stats_exist(): - components_dict, metric_names, metric_samples = fetch_prometheus(prom_addresses) + _, metric_descriptors, _ = fetch_prometheus(prom_addresses) + metric_names = metric_descriptors.keys() predicates = [ "ray_node_cpu_utilization" in metric_names, "ray_node_cpu_count" in metric_names, @@ -255,7 +256,7 @@ def test_case_stats_exist(): return all(predicates) def test_case_ip_correct(): - components_dict, metric_names, metric_samples = fetch_prometheus(prom_addresses) + _, _, metric_samples = fetch_prometheus(prom_addresses) raylet_proc = ray._private.worker._global_node.all_processes[ ray_constants.PROCESS_TYPE_RAYLET ][0] @@ -292,7 +293,8 @@ def f(): ray.get(ret) def test_worker_stats(): - _, metric_names, metric_samples = fetch_prometheus(prom_addresses) + _, metric_descriptors, _ = fetch_prometheus(prom_addresses) + metric_names = metric_descriptors.keys() expected_metrics = [ "ray_component_cpu_percentage", "ray_component_rss_mb", diff --git a/python/ray/_private/metrics_agent.py b/python/ray/_private/metrics_agent.py index 77251a3d13d18..4f2623e194467 100644 --- a/python/ray/_private/metrics_agent.py +++ b/python/ray/_private/metrics_agent.py @@ -36,6 +36,7 @@ from ray._raylet import GcsClient from ray.core.generated.metrics_pb2 import Metric +from ray._private.ray_constants import env_bool logger = logging.getLogger(__name__) @@ -267,6 +268,10 @@ def __init__(self, namespace: str, component_timeout_s: int = 60): # For other components (raylet, GCS), # they contain the global key `GLOBAL_COMPONENT_KEY`. self._components = {} + # Whether we want to export counter as gauge. + # This is for bug compatibility. + # See https://github.com/ray-project/ray/pull/43795. + self._export_counter_as_gauge = env_bool("RAY_EXPORT_COUNTER_AS_GAUGE", True) def record(self, metrics: List[Metric], worker_id_hex: str = None): """Record the metrics reported from the component that reports it. @@ -363,7 +368,7 @@ def to_metrics( # we now emit both counter and gauge and in the # next major Ray release (3.0) we can stop emitting gauge. # This leaves people enough time to migrate their dashboards. - # See https://github.com/ray-project/ray/issues/37768. + # See https://github.com/ray-project/ray/pull/43795. metrics = metrics_map.get(metric_name) if not metrics: metric = CounterMetricFamily( @@ -375,7 +380,9 @@ def to_metrics( metrics_map[metric_name] = metrics metrics[0].add_metric(labels=label_values, value=agg_data.sum_data) - if metric_name.endswith("_total"): + if not self._export_counter_as_gauge: + pass + elif metric_name.endswith("_total"): # In this case, we only need to emit prometheus counter # since for metric name already ends with _total suffix # prometheus client won't change it diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 3e2a93e3dd9ad..00acc62690cc5 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -702,7 +702,7 @@ def get_metric_check_condition( def f(): for metric_pattern in metrics_to_check: - _, metric_names, metric_samples = fetch_prometheus([prom_addr]) + _, _, metric_samples = fetch_prometheus([prom_addr]) for metric_sample in metric_samples: if metric_pattern.matches(metric_sample): break @@ -1107,7 +1107,7 @@ def fetch_raw_prometheus(prom_addresses): def fetch_prometheus(prom_addresses): components_dict = {} - metric_names = set() + metric_descriptors = {} metric_samples = [] for address in prom_addresses: @@ -1115,14 +1115,13 @@ def fetch_prometheus(prom_addresses): components_dict[address] = set() for address, response in fetch_raw_prometheus(prom_addresses): - for line in response.split("\n"): - for family in text_string_to_metric_families(line): - for sample in family.samples: - metric_names.add(sample.name) - metric_samples.append(sample) - if "Component" in sample.labels: - components_dict[address].add(sample.labels["Component"]) - return components_dict, metric_names, metric_samples + for metric in text_string_to_metric_families(response): + for sample in metric.samples: + metric_descriptors[sample.name] = metric + metric_samples.append(sample) + if "Component" in sample.labels: + components_dict[address].add(sample.labels["Component"]) + return components_dict, metric_descriptors, metric_samples def fetch_prometheus_metrics(prom_addresses: List[str]) -> Dict[str, List[Any]]: diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index d827918491fd9..da23cbdb99569 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -4,6 +4,7 @@ import pathlib import sys import requests +from collections import defaultdict from pprint import pformat from unittest.mock import MagicMock @@ -182,9 +183,11 @@ def _setup_cluster_for_test(request, ray_start_cluster): extra_tags = {"ray_version": ray.__version__} - # Generate a metric in the driver. + # Generate metrics in the driver. counter = Counter("test_driver_counter", description="desc") counter.inc(tags=extra_tags) + gauge = Gauge("test_gauge", description="gauge") + gauge.set(1, tags=extra_tags) # Generate some metrics from actor & tasks. @ray.remote @@ -250,7 +253,10 @@ def test_metrics_export_end_to_end(_setup_cluster_for_test): ) = _setup_cluster_for_test def test_cases(): - components_dict, metric_names, metric_samples = fetch_prometheus(prom_addresses) + components_dict, metric_descriptors, metric_samples = fetch_prometheus( + prom_addresses + ) + metric_names = metric_descriptors.keys() session_name = ray._private.worker.global_worker.node.session_name # Raylet should be on every node @@ -266,9 +272,26 @@ def test_cases(): "core_worker" in components for components in components_dict.values() ) - # Make sure our user defined metrics exist - for metric_name in ["test_counter", "test_histogram", "test_driver_counter"]: - assert any(metric_name in full_name for full_name in metric_names) + # Make sure our user defined metrics exist and have the correct types + for metric_name in [ + "test_counter", + "test_counter_total", + "test_histogram_bucket", + "test_driver_counter", + "test_driver_counter_total", + "test_gauge", + ]: + metric_name = f"ray_{metric_name}" + assert metric_name in metric_names + if metric_name.endswith("_total"): + assert metric_descriptors[metric_name].type == "counter" + elif metric_name.endswith("_counter"): + # Make sure we emit counter as gauge for bug compatibility + assert metric_descriptors[metric_name].type == "gauge" + elif metric_name.endswith("_bucket"): + assert metric_descriptors[metric_name].type == "histogram" + elif metric_name.endswith("_gauge"): + assert metric_descriptors[metric_name].type == "gauge" # Make sure metrics are recorded. for metric in _METRICS: @@ -320,9 +343,10 @@ def test_cases(): assert grpc_sample.labels["Component"] != "core_worker" # Autoscaler metrics - _, autoscaler_metric_names, autoscaler_samples = fetch_prometheus( + (_, autoscaler_metric_descriptors, autoscaler_samples,) = fetch_prometheus( [autoscaler_export_addr] ) # noqa + autoscaler_metric_names = autoscaler_metric_descriptors.keys() for metric in _AUTOSCALER_METRICS: # Metric name should appear with some suffix (_count, _total, # etc...) in the list of all names @@ -333,7 +357,8 @@ def test_cases(): assert sample.labels["SessionName"] == session_name # Dashboard metrics - _, dashboard_metric_names, _ = fetch_prometheus([dashboard_export_addr]) + _, dashboard_metric_descriptors, _ = fetch_prometheus([dashboard_export_addr]) + dashboard_metric_names = dashboard_metric_descriptors.keys() for metric in _DASHBOARD_METRICS: # Metric name should appear with some suffix (_count, _total, # etc...) in the list of all names @@ -540,6 +565,70 @@ def verify(): wait_for_condition(verify, timeout=60) +@pytest.mark.skipif(sys.platform == "win32", reason="Not working in Windows.") +def test_counter(shutdown_only): + # Test to make sure Counter emits the right Prometheus metrics + context = ray.init() + + counter = Counter("test_counter", description="desc") + counter.inc(2.0) + counter.inc(3.0) + + counter_with_total_suffix = Counter("test_counter2_total", description="desc2") + counter_with_total_suffix.inc(1.5) + + def check_metrics(): + metrics_page = "localhost:{}".format( + context.address_info["metrics_export_port"] + ) + _, metric_descriptors, metric_samples = fetch_prometheus([metrics_page]) + metric_samples_by_name = defaultdict(list) + for metric_sample in metric_samples: + metric_samples_by_name[metric_sample.name].append(metric_sample) + + assert "ray_test_counter" in metric_descriptors + assert metric_descriptors["ray_test_counter"].type == "gauge" + assert metric_descriptors["ray_test_counter"].documentation == "(DEPRECATED, use ray_test_counter_total metric instead) desc" + assert metric_samples_by_name["ray_test_counter"][-1].value == 5.0 + + assert "ray_test_counter_total" in metric_descriptors + assert metric_descriptors["ray_test_counter_total"].type == "counter" + assert metric_descriptors["ray_test_counter_total"].documentation == "desc" + assert metric_samples_by_name["ray_test_counter_total"][-1].value == 5.0 + + assert "ray_test_counter2_total" in metric_descriptors + assert metric_descriptors["ray_test_counter2_total"].type == "counter" + assert metric_descriptors["ray_test_counter2_total"].documentation == "desc2" + assert metric_samples_by_name["ray_test_counter2_total"][-1].value == 1.5 + + return True + + wait_for_condition(check_metrics, timeout=60) + + +@pytest.mark.skipif(sys.platform == "win32", reason="Not working in Windows.") +def test_counter_without_export_counter_as_gauge(monkeypatch, shutdown_only): + # Test to make sure we don't export counter as gauge if RAY_EXPORT_COUNTER_AS_GAUGE is 0 + monkeypatch.setenv("RAY_EXPORT_COUNTER_AS_GAUGE", "0") + context = ray.init() + counter = Counter("test_counter", description="desc") + counter.inc(2.0) + + def check_metrics(): + metrics_page = "localhost:{}".format( + context.address_info["metrics_export_port"] + ) + _, metric_descriptors, _ = fetch_prometheus([metrics_page]) + print(metric_descriptors.keys()) + + assert "ray_test_counter" not in metric_descriptors + assert "ray_test_counter_total" in metric_descriptors + + return True + + wait_for_condition(check_metrics, timeout=20) + + @pytest.mark.skipif(sys.platform == "win32", reason="Not working in Windows.") def test_per_func_name_stats(shutdown_only): # Test operation stats are available when flag is on. @@ -900,7 +989,8 @@ def test_metrics_disablement(_setup_cluster_for_test): prom_addresses, autoscaler_export_addr, _ = _setup_cluster_for_test def verify_metrics_not_collected(): - components_dict, metric_names, _ = fetch_prometheus(prom_addresses) + components_dict, metric_descriptors, _ = fetch_prometheus(prom_addresses) + metric_names = metric_descriptors.keys() # Make sure no component is reported. for _, comp in components_dict.items(): if len(comp) > 0: diff --git a/python/ray/tests/test_plasma_unlimited.py b/python/ray/tests/test_plasma_unlimited.py index 38737b8600f27..60048f817ad86 100644 --- a/python/ray/tests/test_plasma_unlimited.py +++ b/python/ray/tests/test_plasma_unlimited.py @@ -330,7 +330,7 @@ def test_object_store_memory_metrics_reported_correctly(shutdown_only): check_spilled_mb(address, spilled=800, restored=800, fallback=400) def verify_used_object_store_memory(expected_mb): - components_dict, metric_names, metric_samples = fetch_prometheus([prom_addr]) + _, _, metric_samples = fetch_prometheus([prom_addr]) def in_mb(bytes): return int(bytes / 1024 / 1024) diff --git a/python/ray/util/metrics.py b/python/ray/util/metrics.py index e6f21da5a1512..c0d1861143b01 100644 --- a/python/ray/util/metrics.py +++ b/python/ray/util/metrics.py @@ -140,6 +140,13 @@ class Counter(Metric): This corresponds to Prometheus' counter metric: https://prometheus.io/docs/concepts/metric_types/#counter + Before Ray 2.10, this exports a Prometheus gauge metric instead of + a counter metric, which is wrong. + Since 2.10, this exports a Prometheus counter metric with `_total` suffix + and a deprecated Prometheus gauge metric for bug compatiblity. + Users can disable the behavior of exporting Prometheus gauge metric + by setting env var `RAY_EXPORT_COUNTER_AS_GAUGE=0` before starting Ray. + Args: name: Name of the metric. description: Description of the metric. From 5e9789535dec840fc41e87ed65c731c7ca8edf8f Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Fri, 8 Mar 2024 02:44:20 -0800 Subject: [PATCH 4/5] up Signed-off-by: Jiajun Yao --- python/ray/tests/test_metrics_agent.py | 28 +++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index da23cbdb99569..b4a2524b15e92 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -570,12 +570,17 @@ def test_counter(shutdown_only): # Test to make sure Counter emits the right Prometheus metrics context = ray.init() - counter = Counter("test_counter", description="desc") - counter.inc(2.0) - counter.inc(3.0) + @ray.remote + class Actor: + def __init__(self): + self.counter = Counter("test_counter", description="desc") + self.counter.inc(2.0) + self.counter.inc(3.0) - counter_with_total_suffix = Counter("test_counter2_total", description="desc2") - counter_with_total_suffix.inc(1.5) + self.counter_with_total_suffix = Counter("test_counter2_total", description="desc2") + self.counter_with_total_suffix.inc(1.5) + + actor = Actor.remote() def check_metrics(): metrics_page = "localhost:{}".format( @@ -611,22 +616,27 @@ def test_counter_without_export_counter_as_gauge(monkeypatch, shutdown_only): # Test to make sure we don't export counter as gauge if RAY_EXPORT_COUNTER_AS_GAUGE is 0 monkeypatch.setenv("RAY_EXPORT_COUNTER_AS_GAUGE", "0") context = ray.init() - counter = Counter("test_counter", description="desc") - counter.inc(2.0) + + @ray.remote + class Actor: + def __init__(self): + self.counter = Counter("test_counter", description="desc") + self.counter.inc(2.0) + + actor = Actor.remote() def check_metrics(): metrics_page = "localhost:{}".format( context.address_info["metrics_export_port"] ) _, metric_descriptors, _ = fetch_prometheus([metrics_page]) - print(metric_descriptors.keys()) assert "ray_test_counter" not in metric_descriptors assert "ray_test_counter_total" in metric_descriptors return True - wait_for_condition(check_metrics, timeout=20) + wait_for_condition(check_metrics, timeout=60) @pytest.mark.skipif(sys.platform == "win32", reason="Not working in Windows.") From e238459dfc8ca3e2b976da198486003838fe5805 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Fri, 8 Mar 2024 08:52:39 -0800 Subject: [PATCH 5/5] up Signed-off-by: Jiajun Yao --- python/ray/tests/test_metrics_agent.py | 16 +++++++++++----- python/ray/util/metrics.py | 7 +++---- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/python/ray/tests/test_metrics_agent.py b/python/ray/tests/test_metrics_agent.py index b4a2524b15e92..e1b35f4722c25 100644 --- a/python/ray/tests/test_metrics_agent.py +++ b/python/ray/tests/test_metrics_agent.py @@ -577,10 +577,12 @@ def __init__(self): self.counter.inc(2.0) self.counter.inc(3.0) - self.counter_with_total_suffix = Counter("test_counter2_total", description="desc2") + self.counter_with_total_suffix = Counter( + "test_counter2_total", description="desc2" + ) self.counter_with_total_suffix.inc(1.5) - actor = Actor.remote() + _ = Actor.remote() def check_metrics(): metrics_page = "localhost:{}".format( @@ -593,7 +595,10 @@ def check_metrics(): assert "ray_test_counter" in metric_descriptors assert metric_descriptors["ray_test_counter"].type == "gauge" - assert metric_descriptors["ray_test_counter"].documentation == "(DEPRECATED, use ray_test_counter_total metric instead) desc" + assert ( + metric_descriptors["ray_test_counter"].documentation + == "(DEPRECATED, use ray_test_counter_total metric instead) desc" + ) assert metric_samples_by_name["ray_test_counter"][-1].value == 5.0 assert "ray_test_counter_total" in metric_descriptors @@ -613,7 +618,8 @@ def check_metrics(): @pytest.mark.skipif(sys.platform == "win32", reason="Not working in Windows.") def test_counter_without_export_counter_as_gauge(monkeypatch, shutdown_only): - # Test to make sure we don't export counter as gauge if RAY_EXPORT_COUNTER_AS_GAUGE is 0 + # Test to make sure we don't export counter as gauge + # if RAY_EXPORT_COUNTER_AS_GAUGE is 0 monkeypatch.setenv("RAY_EXPORT_COUNTER_AS_GAUGE", "0") context = ray.init() @@ -623,7 +629,7 @@ def __init__(self): self.counter = Counter("test_counter", description="desc") self.counter.inc(2.0) - actor = Actor.remote() + _ = Actor.remote() def check_metrics(): metrics_page = "localhost:{}".format( diff --git a/python/ray/util/metrics.py b/python/ray/util/metrics.py index c0d1861143b01..526ad27ddb179 100644 --- a/python/ray/util/metrics.py +++ b/python/ray/util/metrics.py @@ -142,10 +142,9 @@ class Counter(Metric): Before Ray 2.10, this exports a Prometheus gauge metric instead of a counter metric, which is wrong. - Since 2.10, this exports a Prometheus counter metric with `_total` suffix - and a deprecated Prometheus gauge metric for bug compatiblity. - Users can disable the behavior of exporting Prometheus gauge metric - by setting env var `RAY_EXPORT_COUNTER_AS_GAUGE=0` before starting Ray. + Since 2.10, this exports both counter (with a suffix "_total") and + gauge metrics (for bug compatibility). + Use `RAY_EXPORT_COUNTER_AS_GAUGE=0` to disable exporting the gauge metric. Args: name: Name of the metric.