Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Export Ray Counter as Prometheus Counter metric #43795

Merged
merged 5 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions dashboard/modules/reporter/tests/test_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ def test_prometheus_physical_stats_record(
prom_addresses = [f"{addr}:{metrics_export_port}"]

def test_case_stats_exist():
components_dict, metric_names, metric_samples = fetch_prometheus(prom_addresses)
_, metric_descriptors, _ = fetch_prometheus(prom_addresses)
metric_names = metric_descriptors.keys()
predicates = [
"ray_node_cpu_utilization" in metric_names,
"ray_node_cpu_count" in metric_names,
Expand Down Expand Up @@ -255,7 +256,7 @@ def test_case_stats_exist():
return all(predicates)

def test_case_ip_correct():
components_dict, metric_names, metric_samples = fetch_prometheus(prom_addresses)
_, _, metric_samples = fetch_prometheus(prom_addresses)
raylet_proc = ray._private.worker._global_node.all_processes[
ray_constants.PROCESS_TYPE_RAYLET
][0]
Expand Down Expand Up @@ -292,7 +293,8 @@ def f():
ray.get(ret)

def test_worker_stats():
_, metric_names, metric_samples = fetch_prometheus(prom_addresses)
_, metric_descriptors, _ = fetch_prometheus(prom_addresses)
metric_names = metric_descriptors.keys()
expected_metrics = [
"ray_component_cpu_percentage",
"ray_component_rss_mb",
Expand Down
119 changes: 93 additions & 26 deletions python/ray/_private/metrics_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
HistogramMetricFamily,
)
from opencensus.metrics.export.value import ValueDouble
from opencensus.metrics.export.metric_descriptor import MetricDescriptorType
from opencensus.stats import aggregation
from opencensus.stats import measure as measure_module
from opencensus.stats.view_manager import ViewManager
Expand All @@ -24,6 +25,7 @@
CountAggregationData,
DistributionAggregationData,
LastValueAggregationData,
SumAggregationData,
)
from opencensus.stats.view import View
from opencensus.tags import tag_key as tag_key_module
Expand All @@ -34,6 +36,7 @@
from ray._raylet import GcsClient

from ray.core.generated.metrics_pb2 import Metric
from ray._private.ray_constants import env_bool

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -161,11 +164,22 @@ def record(self, metric: Metric):

# Aggregate points.
for point in series.points:
if point.HasField("int64_value"):
if (
metric.metric_descriptor.type
== MetricDescriptorType.CUMULATIVE_INT64
):
data = CountAggregationData(point.int64_value)
elif point.HasField("double_value"):
elif (
metric.metric_descriptor.type
== MetricDescriptorType.CUMULATIVE_DOUBLE
):
data = SumAggregationData(ValueDouble, point.double_value)
elif metric.metric_descriptor.type == MetricDescriptorType.GAUGE_DOUBLE:
data = LastValueAggregationData(ValueDouble, point.double_value)
elif point.HasField("distribution_value"):
elif (
metric.metric_descriptor.type
== MetricDescriptorType.CUMULATIVE_DISTRIBUTION
):
dist_value = point.distribution_value
counts_per_bucket = [bucket.count for bucket in dist_value.buckets]
bucket_bounds = dist_value.bucket_options.explicit.bounds
Expand Down Expand Up @@ -254,6 +268,10 @@ def __init__(self, namespace: str, component_timeout_s: int = 60):
# For other components (raylet, GCS),
# they contain the global key `GLOBAL_COMPONENT_KEY`.
self._components = {}
# Whether we want to export counter as gauge.
# This is for bug compatibility.
# See https://github.com/ray-project/ray/pull/43795.
self._export_counter_as_gauge = env_bool("RAY_EXPORT_COUNTER_AS_GAUGE", True)

def record(self, metrics: List[Metric], worker_id_hex: str = None):
"""Record the metrics reported from the component that reports it.
Expand Down Expand Up @@ -292,16 +310,16 @@ def clean_stale_components(self):
return stale_components

# TODO(sang): add start and end timestamp
def to_metric(
def to_metrics(
self,
metric_name: str,
metric_description: str,
label_keys: List[str],
metric_units: str,
label_values: Tuple[tag_value_module.TagValue],
agg_data: Any,
metrics_map: Dict[str, PrometheusMetric],
) -> PrometheusMetric:
metrics_map: Dict[str, List[PrometheusMetric]],
):
"""to_metric translate the data that OpenCensus create
to Prometheus format, using Prometheus Metric object.

Expand All @@ -315,9 +333,8 @@ def to_metric(
label_values: The values of `label_keys`.
agg_data: `opencensus.stats.aggregation_data.AggregationData` object.
Aggregated data that needs to be converted as Prometheus samples
metrics_map: The converted metric is added to this map.

Returns:
A Prometheus metric object
"""
assert self._components_lock.locked()
metric_name = f"{self._namespace}_{metric_name}"
Expand All @@ -328,17 +345,64 @@ def to_metric(
label_values = [tv if tv else "" for tv in label_values]

if isinstance(agg_data, CountAggregationData):
metric = metrics_map.get(metric_name)
if not metric:
metrics = metrics_map.get(metric_name)
if not metrics:
metric = CounterMetricFamily(
name=metric_name,
documentation=metric_description,
unit=metric_units,
labels=label_keys,
)
metrics_map[metric_name] = metric
metric.add_metric(labels=label_values, value=agg_data.count_data)
return metric
metrics = [metric]
metrics_map[metric_name] = metrics
metrics[0].add_metric(labels=label_values, value=agg_data.count_data)
return

if isinstance(agg_data, SumAggregationData):
# This should be emitted as prometheus counter
# but we used to emit it as prometheus gauge.
# To keep the backward compatibility
# (changing from gauge to counter changes the metric name
# since prometheus client will add "_total" suffix to counter
# per OpenMetrics specification),
# we now emit both counter and gauge and in the
# next major Ray release (3.0) we can stop emitting gauge.
# This leaves people enough time to migrate their dashboards.
# See https://github.com/ray-project/ray/pull/43795.
metrics = metrics_map.get(metric_name)
if not metrics:
metric = CounterMetricFamily(
name=metric_name,
documentation=metric_description,
labels=label_keys,
)
metrics = [metric]
metrics_map[metric_name] = metrics
metrics[0].add_metric(labels=label_values, value=agg_data.sum_data)

if not self._export_counter_as_gauge:
pass
elif metric_name.endswith("_total"):
# In this case, we only need to emit the prometheus counter:
# the metric name already ends with the _total suffix, so the
# prometheus client won't change it
# and there is no backward compatibility issue.
# See https://prometheus.github.io/client_python/instrumenting/counter/
pass
else:
if len(metrics) == 1:
metric = GaugeMetricFamily(
name=metric_name,
documentation=(
f"(DEPRECATED, use {metric_name}_total metric instead) "
f"{metric_description}"
),
labels=label_keys,
)
metrics.append(metric)
assert len(metrics) == 2
metrics[1].add_metric(labels=label_values, value=agg_data.sum_data)
return

elif isinstance(agg_data, DistributionAggregationData):

Expand All @@ -356,32 +420,34 @@ def to_metric(
# In OpenCensus we don't have +Inf in the bucket bounds so we need to
# append it here.
buckets.append(["+Inf", agg_data.count_data])
metric = metrics_map.get(metric_name)
if not metric:
metrics = metrics_map.get(metric_name)
if not metrics:
metric = HistogramMetricFamily(
name=metric_name,
documentation=metric_description,
labels=label_keys,
)
metrics_map[metric_name] = metric
metric.add_metric(
metrics = [metric]
metrics_map[metric_name] = metrics
metrics[0].add_metric(
labels=label_values,
buckets=buckets,
sum_value=agg_data.sum,
)
return metric
return

elif isinstance(agg_data, LastValueAggregationData):
metric = metrics_map.get(metric_name)
if not metric:
metrics = metrics_map.get(metric_name)
if not metrics:
metric = GaugeMetricFamily(
name=metric_name,
documentation=metric_description,
labels=label_keys,
)
metrics_map[metric_name] = metric
metric.add_metric(labels=label_values, value=agg_data.value)
return metric
metrics = [metric]
metrics_map[metric_name] = metrics
metrics[0].add_metric(labels=label_values, value=agg_data.value)
return

else:
raise ValueError(f"unsupported aggregation type {type(agg_data)}")
Expand All @@ -399,7 +465,7 @@ def collect(self): # pragma: NO COVER
for component in self._components.values():
for metric in component.metrics.values():
for label_values, data in metric.data.items():
self.to_metric(
self.to_metrics(
metric.name,
metric.desc,
metric.label_keys,
Expand All @@ -409,8 +475,9 @@ def collect(self): # pragma: NO COVER
metrics_map,
)

for metric in metrics_map.values():
yield metric
for metrics in metrics_map.values():
for metric in metrics:
yield metric


class MetricsAgent:
Expand Down
11 changes: 5 additions & 6 deletions python/ray/_private/prometheus_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ def to_metric(self, desc, tag_values, agg_data, metrics_map):
:class:`~prometheus_client.core.HistogramMetricFamily` or
:class:`~prometheus_client.core.UnknownMetricFamily` or
:class:`~prometheus_client.core.GaugeMetricFamily`
:returns: A Prometheus metric object
"""
metric_name = desc["name"]
metric_description = desc["documentation"]
Expand All @@ -160,7 +159,7 @@ def to_metric(self, desc, tag_values, agg_data, metrics_map):
)
metrics_map[metric_name] = metric
metric.add_metric(labels=tag_values, value=agg_data.count_data)
return metric
return

elif isinstance(agg_data, aggregation_data_module.DistributionAggregationData):

Expand Down Expand Up @@ -191,7 +190,7 @@ def to_metric(self, desc, tag_values, agg_data, metrics_map):
buckets=buckets,
sum_value=agg_data.sum,
)
return metric
return

elif isinstance(agg_data, aggregation_data_module.SumAggregationData):
metric = metrics_map.get(metric_name)
Expand All @@ -203,7 +202,7 @@ def to_metric(self, desc, tag_values, agg_data, metrics_map):
)
metrics_map[metric_name] = metric
metric.add_metric(labels=tag_values, value=agg_data.sum_data)
return metric
return

elif isinstance(agg_data, aggregation_data_module.LastValueAggregationData):
metric = metrics_map.get(metric_name)
Expand All @@ -215,7 +214,7 @@ def to_metric(self, desc, tag_values, agg_data, metrics_map):
)
metrics_map[metric_name] = metric
metric.add_metric(labels=tag_values, value=agg_data.value)
return metric
return

else:
raise ValueError(f"unsupported aggregation type {type(agg_data)}")
Expand All @@ -235,7 +234,7 @@ def collect(self): # pragma: NO COVER
desc = self.registered_views[v_name]
for tag_values in view_data.tag_value_aggregation_data_map:
agg_data = view_data.tag_value_aggregation_data_map[tag_values]
metric = self.to_metric(desc, tag_values, agg_data, metrics_map)
self.to_metric(desc, tag_values, agg_data, metrics_map)

for metric in metrics_map.values():
yield metric
Expand Down
19 changes: 9 additions & 10 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ def get_metric_check_condition(

def f():
for metric_pattern in metrics_to_check:
_, metric_names, metric_samples = fetch_prometheus([prom_addr])
_, _, metric_samples = fetch_prometheus([prom_addr])
for metric_sample in metric_samples:
if metric_pattern.matches(metric_sample):
break
Expand Down Expand Up @@ -1107,22 +1107,21 @@ def fetch_raw_prometheus(prom_addresses):

def fetch_prometheus(prom_addresses):
components_dict = {}
metric_names = set()
metric_descriptors = {}
metric_samples = []

for address in prom_addresses:
if address not in components_dict:
components_dict[address] = set()

for address, response in fetch_raw_prometheus(prom_addresses):
for line in response.split("\n"):
for family in text_string_to_metric_families(line):
for sample in family.samples:
metric_names.add(sample.name)
metric_samples.append(sample)
if "Component" in sample.labels:
components_dict[address].add(sample.labels["Component"])
return components_dict, metric_names, metric_samples
for metric in text_string_to_metric_families(response):
for sample in metric.samples:
metric_descriptors[sample.name] = metric
metric_samples.append(sample)
if "Component" in sample.labels:
components_dict[address].add(sample.labels["Component"])
return components_dict, metric_descriptors, metric_samples


def fetch_prometheus_metrics(prom_addresses: List[str]) -> Dict[str, List[Any]]:
Expand Down
Loading
Loading