Skip to content

Commit

Permalink
[data] add rows outputted to data metrics (ray-project#40280)
Browse files Browse the repository at this point in the history
Exports rows outputted as a data metric to the ray dashboard.
Adds it to the ray data overview, replacing bytes outputted in the table.
Also adds CPU/GPU usage to the ray data overview.

Signed-off-by: Andrew Xue <andewzxue@gmail.com>
  • Loading branch information
Zandew authored and ujjawal-khare committed Nov 29, 2023
1 parent b5748ef commit 6ae3a7f
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 17 deletions.
33 changes: 24 additions & 9 deletions dashboard/client/src/components/DataOverviewTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
Table,
TableBody,
TableCell,
TableContainer,
TableHead,
TableRow,
TextField,
Expand Down Expand Up @@ -33,9 +34,9 @@ const columns = [
helpInfo: <Typography>Blocks outputted by output operator.</Typography>,
},
{ label: "State", align: "center" },
{ label: "Bytes Outputted" },
{ label: "Rows Outputted" },
{
label: "Memory Usage (Current / Max)",
label: "Memory Usage (current / max)",
helpInfo: (
<Typography>
Amount of object store memory used by a dataset. Includes spilled
Expand All @@ -53,6 +54,14 @@ const columns = [
</Typography>
),
},
{
label: "Logical CPU Cores (current / max)",
align: "center",
},
{
label: "Logical GPU Cores (current / max)",
align: "center",
},
{ label: "Start Time", align: "center" },
{ label: "End Time", align: "center" },
];
Expand Down Expand Up @@ -100,7 +109,7 @@ const DataOverviewTable = ({
<StateCounter type="task" list={datasetList} />
</div>
</div>
<div className={classes.tableContainer}>
<TableContainer>
<Table>
<TableHead>
<TableRow>
Expand All @@ -123,7 +132,7 @@ const DataOverviewTable = ({
</TableRow>
</TableHead>
<TableBody>
{list.map((dataset, index) => (
{list.map((dataset) => (
<DatasetTable
datasetMetrics={dataset}
isExpanded={expandedDatasets[dataset.dataset]}
Expand All @@ -139,7 +148,7 @@ const DataOverviewTable = ({
))}
</TableBody>
</Table>
</div>
</TableContainer>
</div>
);
};
Expand Down Expand Up @@ -196,7 +205,7 @@ const DataRow = ({
{isDatasetRow && datasetMetrics.dataset}
{isOperatorRow && operatorMetrics.operator}
</TableCell>
<TableCell align="right" size={"small"}>
<TableCell align="right" style={{ width: 200 }}>
<TaskProgressBar
showLegend={false}
numFinished={data.progress}
Expand All @@ -212,16 +221,22 @@ const DataRow = ({
<TableCell align="center">
<StatusChip type="task" status={data.state} />
</TableCell>
<TableCell align="right">
{memoryConverter(Number(data.ray_data_output_bytes.max))}
</TableCell>
<TableCell align="right">{data.ray_data_output_rows.max}</TableCell>
<TableCell align="right">
{memoryConverter(Number(data.ray_data_current_bytes.value))}/
{memoryConverter(Number(data.ray_data_current_bytes.max))}
</TableCell>
<TableCell align="right">
{memoryConverter(Number(data.ray_data_spilled_bytes.max))}
</TableCell>
<TableCell align="center" style={{ width: 200 }}>
{data.ray_data_cpu_usage_cores.value}/
{data.ray_data_cpu_usage_cores.max}
</TableCell>
<TableCell align="center" style={{ width: 200 }}>
{data.ray_data_gpu_usage_cores.value}/
{data.ray_data_gpu_usage_cores.max}
</TableCell>
<TableCell align="center">
{isDatasetRow && formatDateFromTimeMs(datasetMetrics.start_time * 1000)}
</TableCell>
Expand Down
38 changes: 33 additions & 5 deletions dashboard/client/src/pages/data/DataOverview.component.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe("DataOverview", () => {
total: 100,
start_time: 0,
end_time: undefined,
ray_data_output_bytes: {
ray_data_output_rows: {
max: 10,
},
ray_data_spilled_bytes: {
Expand All @@ -24,13 +24,21 @@ describe("DataOverview", () => {
value: 30,
max: 40,
},
ray_data_cpu_usage_cores: {
value: 50,
max: 60,
},
ray_data_gpu_usage_cores: {
value: 70,
max: 80,
},
operators: [
{
operator: "test_ds1_op1",
state: "RUNNING",
progress: 99,
total: 101,
ray_data_output_bytes: {
ray_data_output_rows: {
max: 11,
},
ray_data_spilled_bytes: {
Expand All @@ -40,6 +48,14 @@ describe("DataOverview", () => {
value: 31,
max: 41,
},
ray_data_cpu_usage_cores: {
value: 51,
max: 61,
},
ray_data_gpu_usage_cores: {
value: 71,
max: 81,
},
},
],
},
Expand All @@ -50,7 +66,7 @@ describe("DataOverview", () => {
total: 200,
start_time: 1,
end_time: 2,
ray_data_output_bytes: {
ray_data_output_rows: {
max: 50,
},
ray_data_spilled_bytes: {
Expand All @@ -60,6 +76,14 @@ describe("DataOverview", () => {
value: 70,
max: 80,
},
ray_data_cpu_usage_cores: {
value: 90,
max: 100,
},
ray_data_gpu_usage_cores: {
value: 110,
max: 120,
},
operators: [],
},
];
Expand All @@ -71,9 +95,11 @@ describe("DataOverview", () => {
expect(screen.getByText("test_ds1")).toBeVisible();
expect(screen.getByText("50 / 100")).toBeVisible();
expect(screen.getByText("1969/12/31 16:00:00")).toBeVisible();
expect(screen.getByText("10.0000B")).toBeVisible();
expect(screen.getByText("10")).toBeVisible();
expect(screen.getByText("20.0000B")).toBeVisible();
expect(screen.getByText("30.0000B/40.0000B")).toBeVisible();
expect(screen.getByText("50/60")).toBeVisible();
expect(screen.getByText("70/80")).toBeVisible();

// Operator dropdown
expect(screen.queryByText("test_ds1_op1")).toBeNull();
Expand All @@ -87,8 +113,10 @@ describe("DataOverview", () => {
expect(screen.getByText("200 / 200")).toBeVisible();
expect(screen.getByText("1969/12/31 16:00:01")).toBeVisible();
expect(screen.getByText("1969/12/31 16:00:02")).toBeVisible();
expect(screen.getByText("50.0000B")).toBeVisible();
expect(screen.getByText("50")).toBeVisible();
expect(screen.getByText("60.0000B")).toBeVisible();
expect(screen.getByText("70.0000B/80.0000B")).toBeVisible();
expect(screen.getByText("90/100")).toBeVisible();
expect(screen.getByText("110/120")).toBeVisible();
});
});
4 changes: 4 additions & 0 deletions dashboard/client/src/pages/metrics/Metrics.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ const DATA_METRICS_CONFIG: MetricsSectionConfig[] = [
title: "Bytes Outputted",
pathParams: "orgId=1&theme=light&panelId=7",
},
{
title: "Rows Outputted",
pathParams: "orgId=1&theme=light&panelId=11",
},
{
title: "Block Generation Time",
pathParams: "orgId=1&theme=light&panelId=8",
Expand Down
10 changes: 9 additions & 1 deletion dashboard/client/src/type/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@ export type DataMetrics = {
value: number;
max: number;
};
ray_data_output_bytes: {
ray_data_output_rows: {
max: number;
};
ray_data_spilled_bytes: {
max: number;
};
ray_data_cpu_usage_cores: {
value: number;
max: number;
};
ray_data_gpu_usage_cores: {
value: number;
max: number;
};
progress: number;
total: number;
};
4 changes: 3 additions & 1 deletion dashboard/modules/data/data_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ class PrometheusQuery(Enum):


DATASET_METRICS = {
"ray_data_output_bytes": (PrometheusQuery.MAX,),
"ray_data_output_rows": (PrometheusQuery.MAX,),
"ray_data_spilled_bytes": (PrometheusQuery.MAX,),
"ray_data_current_bytes": (PrometheusQuery.VALUE, PrometheusQuery.MAX),
"ray_data_cpu_usage_cores": (PrometheusQuery.VALUE, PrometheusQuery.MAX),
"ray_data_gpu_usage_cores": (PrometheusQuery.VALUE, PrometheusQuery.MAX),
}


Expand Down
4 changes: 3 additions & 1 deletion dashboard/modules/data/tests/test_data_head.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
"state",
"progress",
"total",
"ray_data_output_bytes",
"ray_data_output_rows",
"ray_data_spilled_bytes",
"ray_data_current_bytes",
"ray_data_cpu_usage_cores",
"ray_data_gpu_usage_cores",
]

RESPONSE_SCHEMA = [
Expand Down
12 changes: 12 additions & 0 deletions dashboard/modules/metrics/dashboards/data_dashboard_panels.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,18 @@
)
],
),
Panel(
id=11,
title="Rows Outputted",
description="Total rows outputted by dataset operators.",
unit="rows",
targets=[
Target(
expr="sum(ray_data_output_rows{{{global_filters}}}) by (dataset, operator)",
legend="Rows Outputted: {{dataset}}",
)
],
),
Panel(
id=8,
title="Block Generation Time",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class OpRuntimeMetrics:
bytes_outputs_generated: int = field(
default=0, metadata={"map_only": True, "export_metric": True}
)
# Number of rows of generated output blocks that are from finished tasks.
rows_outputs_generated: int = field(
default=0, metadata={"map_only": True, "export_metric": True}
)

# Number of output blocks that are already taken by the downstream.
num_outputs_taken: int = 0
Expand Down Expand Up @@ -218,6 +222,8 @@ def on_output_generated(self, task_index: int, output: RefBundle):
for block_ref, meta in output.blocks:
assert meta.exec_stats and meta.exec_stats.wall_time_s
self.block_generation_time += meta.exec_stats.wall_time_s
assert meta.num_rows is not None
self.rows_outputs_generated += meta.num_rows
trace_allocation(block_ref, "operator_output")

def on_task_finished(self, task_index: int):
Expand Down
7 changes: 7 additions & 0 deletions python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,11 @@ def __init__(self, max_stats=1000):
description="Bytes outputted by dataset operators",
tag_keys=op_tags_keys,
)
self.rows_outputted = Gauge(
"data_output_rows",
description="Rows outputted by dataset operators",
tag_keys=op_tags_keys,
)
self.block_generation_time = Gauge(
"data_block_generation_seconds",
description="Time spent generating blocks.",
Expand Down Expand Up @@ -260,6 +265,7 @@ def update_metrics(
self.bytes_freed.set(stats.get("obj_store_mem_freed", 0), tags)
self.bytes_current.set(stats.get("obj_store_mem_cur", 0), tags)
self.bytes_outputted.set(stats.get("bytes_outputs_generated", 0), tags)
self.rows_outputted.set(stats.get("rows_outputs_generated", 0), tags)
self.cpu_usage.set(stats.get("cpu_usage", 0), tags)
self.gpu_usage.set(stats.get("gpu_usage", 0), tags)
self.block_generation_time.set(stats.get("block_generation_time", 0), tags)
Expand All @@ -279,6 +285,7 @@ def clear_metrics(self, tags_list: List[Dict[str, str]]):
self.bytes_freed.set(0, tags)
self.bytes_current.set(0, tags)
self.bytes_outputted.set(0, tags)
self.rows_outputted.set(0, tags)
self.cpu_usage.set(0, tags)
self.gpu_usage.set(0, tags)
self.block_generation_time.set(0, tags)
Expand Down
3 changes: 3 additions & 0 deletions python/ray/data/tests/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def gen_expected_metrics(
"'bytes_inputs_processed': N",
"'num_outputs_generated': N",
"'bytes_outputs_generated': N",
"'rows_outputs_generated': N",
"'num_outputs_taken': N",
"'bytes_outputs_taken': N",
"'num_outputs_of_finished_tasks': N",
Expand Down Expand Up @@ -564,6 +565,7 @@ def check_stats():
" bytes_inputs_processed: N,\n"
" num_outputs_generated: N,\n"
" bytes_outputs_generated: N,\n"
" rows_outputs_generated: N,\n"
" num_outputs_taken: N,\n"
" bytes_outputs_taken: N,\n"
" num_outputs_of_finished_tasks: N,\n"
Expand Down Expand Up @@ -1193,6 +1195,7 @@ def test_stats_actor_metrics():
== ds._plan.stats().extra_metrics["obj_store_mem_freed"]
)
assert final_metric.bytes_outputs_generated == 1000 * 80 * 80 * 4 * 8 # 8B per int
assert final_metric.rows_outputs_generated == 1000 * 80 * 80 * 4
# There should be nothing in object store at the end of execution.
assert final_metric.obj_store_mem_cur == 0

Expand Down

0 comments on commit 6ae3a7f

Please sign in to comment.