[data] operator level metrics (#40805)
Emits data metrics at the operator level. Operator tags are created by appending each operator's position in the DAG topology to its name (`op.name`).

For example, `ray.data.range(...).map_batches(...).limit(...).map_batches(...).materialize()` produces the following per-operator series:
<img width="559" alt="Screenshot 2023-10-30 at 12 44 18 PM" src="https://github.com/ray-project/ray/assets/39287272/f8cbfc10-dd03-405c-8910-2d58e91190ab">
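A minimal sketch of the tagging scheme, using plain strings in place of the executor's operator objects (the real logic is `_get_metrics_tags` in the diff below; the first two names mirror the assertions in `test_stats_actor_metrics`, the rest are illustrative):

```python
# Tags are "<op.name><index in topology>", so repeated operator names
# (e.g. two MapBatches stages) still yield distinct metric series.
op_names = ["Input", "ReadRange->MapBatches(<lambda>)", "Limit", "MapBatches(<lambda>)"]
dataset_tag = "dataset" + "abc123"  # "dataset" + ds._uuid; uuid value invented

tags = [
    {"dataset": dataset_tag, "operator": f"{name}{i}"}
    for i, name in enumerate(op_names)
]
# operator tags: Input0, ReadRange->MapBatches(<lambda>)1, Limit2, MapBatches(<lambda>)3
```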
---------

Signed-off-by: Andrew Xue <andewzxue@gmail.com>
Zandew committed Oct 31, 2023
1 parent df6fe4c commit 10f4008
Showing 4 changed files with 80 additions and 65 deletions.
32 changes: 16 additions & 16 deletions dashboard/modules/metrics/dashboards/data_dashboard_panels.py
@@ -14,8 +14,8 @@
         unit="bytes",
         targets=[
             Target(
-                expr="sum(ray_data_spilled_bytes{{{global_filters}}}) by (dataset)",
-                legend="Bytes Spilled: {{dataset}}",
+                expr="sum(ray_data_spilled_bytes{{{global_filters}}}) by (dataset, operator)",
+                legend="Bytes Spilled: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -26,8 +26,8 @@
         unit="bytes",
         targets=[
             Target(
-                expr="sum(ray_data_allocated_bytes{{{global_filters}}}) by (dataset)",
-                legend="Bytes Allocated: {{dataset}}",
+                expr="sum(ray_data_allocated_bytes{{{global_filters}}}) by (dataset, operator)",
+                legend="Bytes Allocated: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -38,8 +38,8 @@
         unit="bytes",
         targets=[
             Target(
-                expr="sum(ray_data_freed_bytes{{{global_filters}}}) by (dataset)",
-                legend="Bytes Freed: {{dataset}}",
+                expr="sum(ray_data_freed_bytes{{{global_filters}}}) by (dataset, operator)",
+                legend="Bytes Freed: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -50,8 +50,8 @@
         unit="bytes",
         targets=[
             Target(
-                expr="sum(ray_data_current_bytes{{{global_filters}}}) by (dataset)",
-                legend="Current Usage: {{dataset}}",
+                expr="sum(ray_data_current_bytes{{{global_filters}}}) by (dataset, operator)",
+                legend="Current Usage: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -62,8 +62,8 @@
         unit="cores",
         targets=[
             Target(
-                expr="sum(ray_data_cpu_usage_cores{{{global_filters}}}) by (dataset)",
-                legend="CPU Usage: {{dataset}}",
+                expr="sum(ray_data_cpu_usage_cores{{{global_filters}}}) by (dataset, operator)",
+                legend="CPU Usage: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -74,8 +74,8 @@
         unit="cores",
         targets=[
             Target(
-                expr="sum(ray_data_gpu_usage_cores{{{global_filters}}}) by (dataset)",
-                legend="GPU Usage: {{dataset}}",
+                expr="sum(ray_data_gpu_usage_cores{{{global_filters}}}) by (dataset, operator)",
+                legend="GPU Usage: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -86,8 +86,8 @@
         unit="bytes",
         targets=[
             Target(
-                expr="sum(ray_data_output_bytes{{{global_filters}}}) by (dataset)",
-                legend="Bytes Outputted: {{dataset}}",
+                expr="sum(ray_data_output_bytes{{{global_filters}}}) by (dataset, operator)",
+                legend="Bytes Outputted: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -98,8 +98,8 @@
         unit="seconds",
         targets=[
             Target(
-                expr="sum(ray_data_block_generation_seconds{{{global_filters}}}) by (dataset)",
-                legend="Block Generation Time: {{dataset}}",
+                expr="sum(ray_data_block_generation_seconds{{{global_filters}}}) by (dataset, operator)",
+                legend="Block Generation Time: {{dataset}}, {{operator}}",
             )
         ],
     ),
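One non-obvious detail in these panel definitions: the triple braces are Python `str.format` escapes, not PromQL syntax, so `{{`/`}}` render as literal braces around the substituted filters. A quick sketch with an invented filter value:

```python
# "{{" and "}}" become literal PromQL braces; {global_filters} is replaced
# by the dashboard's filter string (the value below is made up).
expr_template = 'sum(ray_data_spilled_bytes{{{global_filters}}}) by (dataset, operator)'
rendered = expr_template.format(global_filters='SessionName="session_abc"')
print(rendered)
# sum(ray_data_spilled_bytes{SessionName="session_abc"}) by (dataset, operator)
```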
12 changes: 10 additions & 2 deletions python/ray/data/_internal/execution/streaming_executor.py
@@ -213,7 +213,7 @@ def run(self):
             self._output_node.outqueue.append(None)
             # Clears metrics for this dataset so that they do
             # not persist in the grafana dashboard after execution
-            clear_stats_actor_metrics({"dataset": self._dataset_tag})
+            clear_stats_actor_metrics(self._get_metrics_tags())
 
     def get_stats(self):
         """Return the stats object for the streaming execution.
@@ -295,7 +295,8 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
            op_state.refresh_progress_bar()
 
        update_stats_actor_metrics(
-            [op.metrics for op in self._topology], {"dataset": self._dataset_tag}
+            [op.metrics for op in self._topology],
+            self._get_metrics_tags(),
        )
 
        # Log metrics of newly completed operators.
@@ -348,6 +349,13 @@ def _report_current_usage(
         if self._global_info:
             self._global_info.set_description(resources_status)
 
+    def _get_metrics_tags(self):
+        """Returns a list of tags for operator-level metrics."""
+        return [
+            {"dataset": self._dataset_tag, "operator": f"{op.name}{i}"}
+            for i, op in enumerate(self._topology)
+        ]
+
 
 def _validate_dag(dag: PhysicalOperator, limits: ExecutionResources) -> None:
     """Raises an exception on invalid DAGs.
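The tag list from `_get_metrics_tags` is used twice: `_scheduling_loop_step` publishes per-operator values against it on every step, and `run` zeroes the same series once execution finishes. A toy, self-contained model of that lifecycle (plain dicts stand in for the stats actor's gauges; none of this is Ray API):

```python
from typing import Dict, List, Tuple

gauges: Dict[Tuple[str, str], float] = {}  # (dataset, operator) -> last value

def update(op_values: List[float], tags_list: List[Dict[str, str]]) -> None:
    # Mirrors StatsActor.update_metrics: one gauge write per operator.
    for value, tags in zip(op_values, tags_list):
        gauges[(tags["dataset"], tags["operator"])] = value

def clear(tags_list: List[Dict[str, str]]) -> None:
    # Mirrors StatsActor.clear_metrics: reset to 0 so a finished dataset
    # does not leave stale per-operator series on the dashboard.
    for tags in tags_list:
        gauges[(tags["dataset"], tags["operator"])] = 0.0

tags = [
    {"dataset": "ds1", "operator": "Input0"},
    {"dataset": "ds1", "operator": "ReadRange->MapBatches(<lambda>)1"},
]
update([128.0, 512.0], tags)  # during execution, every scheduling step
clear(tags)                   # after execution
assert all(v == 0 for v in gauges.values())
```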
86 changes: 44 additions & 42 deletions python/ray/data/_internal/stats.py
@@ -150,59 +150,60 @@ def __init__(self, max_stats=1000):
         # Ray Data dashboard metrics
         # Everything is a gauge because we need to reset all of
         # a dataset's metrics to 0 after each finishes execution.
-        tags_keys = ("dataset",)
+        op_tags_keys = ("dataset", "operator")
         self.bytes_spilled = Gauge(
             "data_spilled_bytes",
             description="""Bytes spilled by dataset operators.
             DataContext.enable_get_object_locations_for_metrics
             must be set to True to report this metric""",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.bytes_allocated = Gauge(
             "data_allocated_bytes",
             description="Bytes allocated by dataset operators",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.bytes_freed = Gauge(
             "data_freed_bytes",
             description="Bytes freed by dataset operators",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.bytes_current = Gauge(
             "data_current_bytes",
             description="Bytes currently in memory store used by dataset operators",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.cpu_usage = Gauge(
             "data_cpu_usage_cores",
             description="CPUs allocated to dataset operators",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.gpu_usage = Gauge(
             "data_gpu_usage_cores",
             description="GPUs allocated to dataset operators",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.bytes_outputted = Gauge(
             "data_output_bytes",
             description="Bytes outputted by dataset operators",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.block_generation_time = Gauge(
             "data_block_generation_seconds",
             description="Time spent generating blocks.",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
 
+        iter_tag_keys = ("dataset",)
         self.iter_total_blocked_s = Gauge(
             "data_iter_total_blocked_seconds",
             description="Seconds user thread is blocked by iter_batches()",
-            tag_keys=tags_keys,
+            tag_keys=iter_tag_keys,
         )
         self.iter_user_s = Gauge(
             "data_iter_user_seconds",
             description="Seconds spent in user code",
-            tag_keys=tags_keys,
+            tag_keys=iter_tag_keys,
         )
 
     def record_start(self, stats_uuid):
@@ -245,29 +246,35 @@ def get_dataset_id(self):
         self.next_dataset_id += 1
         return dataset_id
 
-    def update_metrics(self, stats: Dict[str, Union[int, float]], tags: Dict[str, str]):
-        self.bytes_spilled.set(stats["obj_store_mem_spilled"], tags)
-        self.bytes_allocated.set(stats["obj_store_mem_alloc"], tags)
-        self.bytes_freed.set(stats["obj_store_mem_freed"], tags)
-        self.bytes_current.set(stats["obj_store_mem_cur"], tags)
-        self.bytes_outputted.set(stats["bytes_outputs_generated"], tags)
-        self.cpu_usage.set(stats["cpu_usage"], tags)
-        self.gpu_usage.set(stats["gpu_usage"], tags)
-        self.block_generation_time.set(stats["block_generation_time"], tags)
+    def update_metrics(
+        self,
+        op_metrics: List[Dict[str, Union[int, float]]],
+        tags_list: List[Dict[str, str]],
+    ):
+        for stats, tags in zip(op_metrics, tags_list):
+            self.bytes_spilled.set(stats.get("obj_store_mem_spilled", 0), tags)
+            self.bytes_allocated.set(stats.get("obj_store_mem_alloc", 0), tags)
+            self.bytes_freed.set(stats.get("obj_store_mem_freed", 0), tags)
+            self.bytes_current.set(stats.get("obj_store_mem_cur", 0), tags)
+            self.bytes_outputted.set(stats.get("bytes_outputs_generated", 0), tags)
+            self.cpu_usage.set(stats.get("cpu_usage", 0), tags)
+            self.gpu_usage.set(stats.get("gpu_usage", 0), tags)
+            self.block_generation_time.set(stats.get("block_generation_time", 0), tags)
 
     def update_iter_metrics(self, stats: "DatasetStats", tags):
         self.iter_total_blocked_s.set(stats.iter_total_blocked_s.get(), tags)
         self.iter_user_s.set(stats.iter_user_s.get(), tags)
 
-    def clear_metrics(self, tags: Dict[str, str]):
-        self.bytes_spilled.set(0, tags)
-        self.bytes_allocated.set(0, tags)
-        self.bytes_freed.set(0, tags)
-        self.bytes_current.set(0, tags)
-        self.bytes_outputted.set(0, tags)
-        self.cpu_usage.set(0, tags)
-        self.gpu_usage.set(0, tags)
-        self.block_generation_time.set(0, tags)
+    def clear_metrics(self, tags_list: List[Dict[str, str]]):
+        for tags in tags_list:
+            self.bytes_spilled.set(0, tags)
+            self.bytes_allocated.set(0, tags)
+            self.bytes_freed.set(0, tags)
+            self.bytes_current.set(0, tags)
+            self.bytes_outputted.set(0, tags)
+            self.cpu_usage.set(0, tags)
+            self.gpu_usage.set(0, tags)
+            self.block_generation_time.set(0, tags)
 
     def clear_iter_metrics(self, tags: Dict[str, str]):
         self.iter_total_blocked_s.set(0, tags)
@@ -313,33 +320,28 @@ def _check_cluster_stats_actor():
 
 
 def update_stats_actor_metrics(
-    op_metrics: List[OpRuntimeMetrics], tags: Dict[str, str]
+    op_metrics: List[OpRuntimeMetrics], tags_list: List[Dict[str, str]]
 ):
     global _stats_actor
     _check_cluster_stats_actor()
 
-    metric_keys = OpRuntimeMetrics.get_metric_keys()
-    stats = {key: 0 for key in metric_keys}
-    for op_metric in op_metrics:
-        metric_dict = op_metric.as_dict(metrics_only=True)
-        for key in metric_keys:
-            stats[key] += metric_dict.get(key, 0)
-
-    _stats_actor.update_metrics.remote(stats, tags)
+    _stats_actor.update_metrics.remote(
+        [metric.as_dict() for metric in op_metrics], tags_list
+    )
 
 
-def update_stats_actor_iter_metrics(stats: "DatasetStats", tags: Dict[str, str]):
+def update_stats_actor_iter_metrics(stats: "DatasetStats", tags_list: Dict[str, str]):
     global _stats_actor
     _check_cluster_stats_actor()
 
-    _stats_actor.update_iter_metrics.remote(stats, tags)
+    _stats_actor.update_iter_metrics.remote(stats, tags_list)
 
 
-def clear_stats_actor_metrics(tags: Dict[str, str]):
+def clear_stats_actor_metrics(tags_list: List[Dict[str, str]]):
     global _stats_actor
     _check_cluster_stats_actor()
 
-    _stats_actor.clear_metrics.remote(tags)
+    _stats_actor.clear_metrics.remote(tags_list)
 
 
 def clear_stats_actor_iter_metrics(tags: Dict[str, str]):
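For reference, a minimal standalone sketch of the `ray.util.metrics.Gauge` pattern the actor builds on: a gauge declared with `tag_keys` is set per tag combination, and each distinct combination becomes its own time series (the metric name and values here are invented):

```python
import ray
from ray.util.metrics import Gauge

ray.init()

# Same shape as the StatsActor gauges: one metric, with the
# per-(dataset, operator) series selected via tags.
spilled = Gauge(
    "example_spilled_bytes",
    description="Example per-operator gauge.",
    tag_keys=("dataset", "operator"),
)
spilled.set(1024, tags={"dataset": "ds1", "operator": "Input0"})
spilled.set(0, tags={"dataset": "ds1", "operator": "Input0"})  # reset on finish
```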
15 changes: 10 additions & 5 deletions python/ray/data/tests/test_stats.py
@@ -1192,7 +1192,10 @@ def test_stats_actor_metrics():
     # There should be nothing in object store at the end of execution.
     assert final_metric.obj_store_mem_cur == 0
 
-    assert "dataset" + ds._uuid == update_fn.call_args_list[-1].args[1]["dataset"]
+    tags = update_fn.call_args_list[-1].args[1]
+    assert all([tag["dataset"] == "dataset" + ds._uuid for tag in tags])
+    assert tags[0]["operator"] == "Input0"
+    assert tags[1]["operator"] == "ReadRange->MapBatches(<lambda>)1"
 
     def sleep_three(x):
         import time
@@ -1231,31 +1234,33 @@ def test_dataset_name():
     with patch_update_stats_actor() as update_fn:
         mds = ds.materialize()
 
-    assert update_fn.call_args_list[-1].args[1]["dataset"] == "test_ds" + mds._uuid
+    assert update_fn.call_args_list[-1].args[1][0]["dataset"] == "test_ds" + mds._uuid
 
     # Names persist after an execution
     ds = ds.random_shuffle()
     assert ds._name == "test_ds"
     with patch_update_stats_actor() as update_fn:
         mds = ds.materialize()
 
-    assert update_fn.call_args_list[-1].args[1]["dataset"] == "test_ds" + mds._uuid
+    assert update_fn.call_args_list[-1].args[1][0]["dataset"] == "test_ds" + mds._uuid
 
     ds._set_name("test_ds_two")
     ds = ds.map_batches(lambda x: x)
     assert ds._name == "test_ds_two"
     with patch_update_stats_actor() as update_fn:
         mds = ds.materialize()
 
-    assert update_fn.call_args_list[-1].args[1]["dataset"] == "test_ds_two" + mds._uuid
+    assert (
+        update_fn.call_args_list[-1].args[1][0]["dataset"] == "test_ds_two" + mds._uuid
+    )
 
     ds._set_name(None)
     ds = ds.map_batches(lambda x: x)
     assert ds._name is None
     with patch_update_stats_actor() as update_fn:
         mds = ds.materialize()
 
-    assert update_fn.call_args_list[-1].args[1]["dataset"] == "dataset" + mds._uuid
+    assert update_fn.call_args_list[-1].args[1][0]["dataset"] == "dataset" + mds._uuid
 
     ds = ray.data.range(100, parallelism=20)
     ds._set_name("very_loooooooong_name")
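The reworked assertions index `update_fn.call_args_list[-1].args[1]` because the patched update function now receives the tag list as its second positional argument. A minimal, self-contained illustration of that mock-access pattern (all values invented):

```python
from unittest.mock import MagicMock

update_fn = MagicMock()
# args[0] would be the per-operator metrics; args[1] is the tag list.
update_fn(["op-metrics-placeholder"], [{"dataset": "ds1", "operator": "Input0"}])

tags = update_fn.call_args_list[-1].args[1]
assert tags[0]["operator"] == "Input0"
```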
