diff --git a/dashboard/modules/metrics/dashboards/data_dashboard_panels.py b/dashboard/modules/metrics/dashboards/data_dashboard_panels.py
index 8d69ddb6e01a2..2207b9b0f4c4a 100644
--- a/dashboard/modules/metrics/dashboards/data_dashboard_panels.py
+++ b/dashboard/modules/metrics/dashboards/data_dashboard_panels.py
@@ -14,8 +14,8 @@
         unit="bytes",
         targets=[
             Target(
-                expr="sum(ray_data_spilled_bytes{{{global_filters}}}) by (dataset)",
-                legend="Bytes Spilled: {{dataset}}",
+                expr="sum(ray_data_spilled_bytes{{{global_filters}}}) by (dataset, operator)",
+                legend="Bytes Spilled: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -26,8 +26,8 @@
         unit="bytes",
         targets=[
             Target(
-                expr="sum(ray_data_allocated_bytes{{{global_filters}}}) by (dataset)",
-                legend="Bytes Allocated: {{dataset}}",
+                expr="sum(ray_data_allocated_bytes{{{global_filters}}}) by (dataset, operator)",
+                legend="Bytes Allocated: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -38,8 +38,8 @@
         unit="bytes",
         targets=[
             Target(
-                expr="sum(ray_data_freed_bytes{{{global_filters}}}) by (dataset)",
-                legend="Bytes Freed: {{dataset}}",
+                expr="sum(ray_data_freed_bytes{{{global_filters}}}) by (dataset, operator)",
+                legend="Bytes Freed: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -50,8 +50,8 @@
         unit="bytes",
         targets=[
             Target(
-                expr="sum(ray_data_current_bytes{{{global_filters}}}) by (dataset)",
-                legend="Current Usage: {{dataset}}",
+                expr="sum(ray_data_current_bytes{{{global_filters}}}) by (dataset, operator)",
+                legend="Current Usage: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -62,8 +62,8 @@
         unit="cores",
         targets=[
             Target(
-                expr="sum(ray_data_cpu_usage_cores{{{global_filters}}}) by (dataset)",
-                legend="CPU Usage: {{dataset}}",
+                expr="sum(ray_data_cpu_usage_cores{{{global_filters}}}) by (dataset, operator)",
+                legend="CPU Usage: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -74,8 +74,8 @@
         unit="cores",
         targets=[
             Target(
-                expr="sum(ray_data_gpu_usage_cores{{{global_filters}}}) by (dataset)",
-                legend="GPU Usage: {{dataset}}",
+                expr="sum(ray_data_gpu_usage_cores{{{global_filters}}}) by (dataset, operator)",
+                legend="GPU Usage: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -86,8 +86,8 @@
         unit="bytes",
         targets=[
             Target(
-                expr="sum(ray_data_output_bytes{{{global_filters}}}) by (dataset)",
-                legend="Bytes Outputted: {{dataset}}",
+                expr="sum(ray_data_output_bytes{{{global_filters}}}) by (dataset, operator)",
+                legend="Bytes Outputted: {{dataset}}, {{operator}}",
             )
         ],
     ),
@@ -98,8 +98,8 @@
         unit="seconds",
         targets=[
             Target(
-                expr="sum(ray_data_block_generation_seconds{{{global_filters}}}) by (dataset)",
-                legend="Block Generation Time: {{dataset}}",
+                expr="sum(ray_data_block_generation_seconds{{{global_filters}}}) by (dataset, operator)",
+                legend="Block Generation Time: {{dataset}}, {{operator}}",
             )
         ],
     ),
diff --git a/python/ray/data/_internal/execution/streaming_executor.py b/python/ray/data/_internal/execution/streaming_executor.py
index a0bf044b8e266..db1d2fc347a88 100644
--- a/python/ray/data/_internal/execution/streaming_executor.py
+++ b/python/ray/data/_internal/execution/streaming_executor.py
@@ -213,7 +213,7 @@ def run(self):
             self._output_node.outqueue.append(None)
             # Clears metrics for this dataset so that they do
             # not persist in the grafana dashboard after execution
-            clear_stats_actor_metrics({"dataset": self._dataset_tag})
+            clear_stats_actor_metrics(self._get_metrics_tags())
 
     def get_stats(self):
         """Return the stats object for the streaming execution.
@@ -295,7 +295,8 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
             op_state.refresh_progress_bar()
 
         update_stats_actor_metrics(
-            [op.metrics for op in self._topology], {"dataset": self._dataset_tag}
+            [op.metrics for op in self._topology],
+            self._get_metrics_tags(),
         )
 
         # Log metrics of newly completed operators.
@@ -348,6 +349,13 @@ def _report_current_usage(
         if self._global_info:
             self._global_info.set_description(resources_status)
 
+    def _get_metrics_tags(self):
+        """Returns a list of tags for operator-level metrics."""
+        return [
+            {"dataset": self._dataset_tag, "operator": f"{op.name}{i}"}
+            for i, op in enumerate(self._topology)
+        ]
+
 
 def _validate_dag(dag: PhysicalOperator, limits: ExecutionResources) -> None:
     """Raises an exception on invalid DAGs.
diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py
index 883cf96756996..033f73affbf1e 100644
--- a/python/ray/data/_internal/stats.py
+++ b/python/ray/data/_internal/stats.py
@@ -150,59 +150,60 @@ def __init__(self, max_stats=1000):
         # Ray Data dashboard metrics
         # Everything is a gauge because we need to reset all of
         # a dataset's metrics to 0 after each finishes execution.
-        tags_keys = ("dataset",)
+        op_tags_keys = ("dataset", "operator")
         self.bytes_spilled = Gauge(
             "data_spilled_bytes",
             description="""Bytes spilled by dataset operators.
                 DataContext.enable_get_object_locations_for_metrics
                 must be set to True to report this metric""",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.bytes_allocated = Gauge(
             "data_allocated_bytes",
             description="Bytes allocated by dataset operators",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.bytes_freed = Gauge(
             "data_freed_bytes",
             description="Bytes freed by dataset operators",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.bytes_current = Gauge(
             "data_current_bytes",
             description="Bytes currently in memory store used by dataset operators",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.cpu_usage = Gauge(
             "data_cpu_usage_cores",
             description="CPUs allocated to dataset operators",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.gpu_usage = Gauge(
             "data_gpu_usage_cores",
             description="GPUs allocated to dataset operators",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.bytes_outputted = Gauge(
             "data_output_bytes",
             description="Bytes outputted by dataset operators",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
         self.block_generation_time = Gauge(
             "data_block_generation_seconds",
             description="Time spent generating blocks.",
-            tag_keys=tags_keys,
+            tag_keys=op_tags_keys,
         )
 
+        iter_tag_keys = ("dataset",)
         self.iter_total_blocked_s = Gauge(
             "data_iter_total_blocked_seconds",
             description="Seconds user thread is blocked by iter_batches()",
-            tag_keys=tags_keys,
+            tag_keys=iter_tag_keys,
         )
         self.iter_user_s = Gauge(
             "data_iter_user_seconds",
             description="Seconds spent in user code",
-            tag_keys=tags_keys,
+            tag_keys=iter_tag_keys,
         )
 
     def record_start(self, stats_uuid):
@@ -245,29 +246,35 @@ def get_dataset_id(self):
         self.next_dataset_id += 1
         return dataset_id
 
-    def update_metrics(self, stats: Dict[str, Union[int, float]], tags: Dict[str, str]):
-        self.bytes_spilled.set(stats["obj_store_mem_spilled"], tags)
-        self.bytes_allocated.set(stats["obj_store_mem_alloc"], tags)
-        self.bytes_freed.set(stats["obj_store_mem_freed"], tags)
-        self.bytes_current.set(stats["obj_store_mem_cur"], tags)
-        self.bytes_outputted.set(stats["bytes_outputs_generated"], tags)
-        self.cpu_usage.set(stats["cpu_usage"], tags)
-        self.gpu_usage.set(stats["gpu_usage"], tags)
-        self.block_generation_time.set(stats["block_generation_time"], tags)
+    def update_metrics(
+        self,
+        op_metrics: List[Dict[str, Union[int, float]]],
+        tags_list: List[Dict[str, str]],
+    ):
+        for stats, tags in zip(op_metrics, tags_list):
+            self.bytes_spilled.set(stats.get("obj_store_mem_spilled", 0), tags)
+            self.bytes_allocated.set(stats.get("obj_store_mem_alloc", 0), tags)
+            self.bytes_freed.set(stats.get("obj_store_mem_freed", 0), tags)
+            self.bytes_current.set(stats.get("obj_store_mem_cur", 0), tags)
+            self.bytes_outputted.set(stats.get("bytes_outputs_generated", 0), tags)
+            self.cpu_usage.set(stats.get("cpu_usage", 0), tags)
+            self.gpu_usage.set(stats.get("gpu_usage", 0), tags)
+            self.block_generation_time.set(stats.get("block_generation_time", 0), tags)
 
     def update_iter_metrics(self, stats: "DatasetStats", tags):
         self.iter_total_blocked_s.set(stats.iter_total_blocked_s.get(), tags)
         self.iter_user_s.set(stats.iter_user_s.get(), tags)
 
-    def clear_metrics(self, tags: Dict[str, str]):
-        self.bytes_spilled.set(0, tags)
-        self.bytes_allocated.set(0, tags)
-        self.bytes_freed.set(0, tags)
-        self.bytes_current.set(0, tags)
-        self.bytes_outputted.set(0, tags)
-        self.cpu_usage.set(0, tags)
-        self.gpu_usage.set(0, tags)
-        self.block_generation_time.set(0, tags)
+    def clear_metrics(self, tags_list: List[Dict[str, str]]):
+        for tags in tags_list:
+            self.bytes_spilled.set(0, tags)
+            self.bytes_allocated.set(0, tags)
+            self.bytes_freed.set(0, tags)
+            self.bytes_current.set(0, tags)
+            self.bytes_outputted.set(0, tags)
+            self.cpu_usage.set(0, tags)
+            self.gpu_usage.set(0, tags)
+            self.block_generation_time.set(0, tags)
 
     def clear_iter_metrics(self, tags: Dict[str, str]):
         self.iter_total_blocked_s.set(0, tags)
@@ -313,33 +320,28 @@ def _check_cluster_stats_actor():
 
 
 def update_stats_actor_metrics(
-    op_metrics: List[OpRuntimeMetrics], tags: Dict[str, str]
+    op_metrics: List[OpRuntimeMetrics], tags_list: List[Dict[str, str]]
 ):
     global _stats_actor
     _check_cluster_stats_actor()
 
-    metric_keys = OpRuntimeMetrics.get_metric_keys()
-    stats = {key: 0 for key in metric_keys}
-    for op_metric in op_metrics:
-        metric_dict = op_metric.as_dict(metrics_only=True)
-        for key in metric_keys:
-            stats[key] += metric_dict.get(key, 0)
-
-    _stats_actor.update_metrics.remote(stats, tags)
+    _stats_actor.update_metrics.remote(
+        [metric.as_dict() for metric in op_metrics], tags_list
+    )
 
 
 def update_stats_actor_iter_metrics(stats: "DatasetStats", tags: Dict[str, str]):
     global _stats_actor
     _check_cluster_stats_actor()
 
     _stats_actor.update_iter_metrics.remote(stats, tags)
 
 
-def clear_stats_actor_metrics(tags: Dict[str, str]):
+def clear_stats_actor_metrics(tags_list: List[Dict[str, str]]):
     global _stats_actor
     _check_cluster_stats_actor()
 
-    _stats_actor.clear_metrics.remote(tags)
+    _stats_actor.clear_metrics.remote(tags_list)
 
 
 def clear_stats_actor_iter_metrics(tags: Dict[str, str]):
diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py
index 54e462f31d460..41170ae282fb2 100644
--- a/python/ray/data/tests/test_stats.py
+++ b/python/ray/data/tests/test_stats.py
@@ -1192,7 +1192,10 @@ def test_stats_actor_metrics():
     # There should be nothing in object store at the end of execution.
     assert final_metric.obj_store_mem_cur == 0
 
-    assert "dataset" + ds._uuid == update_fn.call_args_list[-1].args[1]["dataset"]
+    tags = update_fn.call_args_list[-1].args[1]
+    assert all(tag["dataset"] == "dataset" + ds._uuid for tag in tags)
+    assert tags[0]["operator"] == "Input0"
+    assert tags[1]["operator"] == "ReadRange->MapBatches()1"
 
     def sleep_three(x):
         import time
@@ -1231,7 +1234,7 @@ def test_dataset_name():
     with patch_update_stats_actor() as update_fn:
         mds = ds.materialize()
 
-    assert update_fn.call_args_list[-1].args[1]["dataset"] == "test_ds" + mds._uuid
+    assert update_fn.call_args_list[-1].args[1][0]["dataset"] == "test_ds" + mds._uuid
 
     # Names persist after an execution
     ds = ds.random_shuffle()
@@ -1239,7 +1242,7 @@ def test_dataset_name():
     with patch_update_stats_actor() as update_fn:
         mds = ds.materialize()
 
-    assert update_fn.call_args_list[-1].args[1]["dataset"] == "test_ds" + mds._uuid
+    assert update_fn.call_args_list[-1].args[1][0]["dataset"] == "test_ds" + mds._uuid
 
     ds._set_name("test_ds_two")
     ds = ds.map_batches(lambda x: x)
@@ -1247,7 +1250,9 @@ def test_dataset_name():
     with patch_update_stats_actor() as update_fn:
         mds = ds.materialize()
 
-    assert update_fn.call_args_list[-1].args[1]["dataset"] == "test_ds_two" + mds._uuid
+    assert (
+        update_fn.call_args_list[-1].args[1][0]["dataset"] == "test_ds_two" + mds._uuid
+    )
 
     ds._set_name(None)
     ds = ds.map_batches(lambda x: x)
@@ -1255,7 +1260,7 @@ def test_dataset_name():
     with patch_update_stats_actor() as update_fn:
         mds = ds.materialize()
 
-    assert update_fn.call_args_list[-1].args[1]["dataset"] == "dataset" + mds._uuid
+    assert update_fn.call_args_list[-1].args[1][0]["dataset"] == "dataset" + mds._uuid
 
     ds = ray.data.range(100, parallelism=20)
     ds._set_name("very_loooooooong_name")