Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] Time ray data tasks (total time and udf time) #43241

Merged
merged 16 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ def _map_task(
# TODO(Clark): Add input file propagation from input blocks.
m_out = BlockAccessor.for_block(b_out).get_metadata([], None)
m_out.exec_stats = stats.build()
m_out.exec_stats.udf_time_s = map_transformer.udf_time()
m_out.exec_stats.task_idx = ctx.task_idx
yield b_out
yield m_out
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import itertools
import time
from abc import abstractmethod
from enum import Enum
from typing import Any, Callable, Dict, Iterable, List, Optional, TypeVar, Union
Expand Down Expand Up @@ -33,6 +34,7 @@ def __init__(
self,
input_type: MapTransformFnDataType,
output_type: MapTransformFnDataType,
is_udf: bool = False,
):
"""
Args:
Expand All @@ -44,6 +46,7 @@ def __init__(
self._input_type = input_type
self._output_type = output_type
self._target_max_block_size = None
self._is_udf = is_udf

@abstractmethod
def __call__(
Expand Down Expand Up @@ -89,6 +92,7 @@ def __init__(
self.set_transform_fns(transform_fns)
self._init_fn = init_fn if init_fn is not None else lambda: None
self._target_max_block_size = None
self._udf_time = 0

def set_transform_fns(self, transform_fns: List[MapTransformFn]) -> None:
"""Set the transform functions."""
Expand Down Expand Up @@ -121,6 +125,18 @@ def init(self) -> None:
"""
self._init_fn()

def _udf_timed_iter(
self, input: Iterable[MapTransformFnData]
) -> Iterable[MapTransformFnData]:
while True:
try:
start = time.perf_counter()
output = next(input)
self._udf_time += time.perf_counter() - start
yield output
except StopIteration:
break

def apply_transform(
self,
input_blocks: Iterable[Block],
Expand All @@ -137,6 +153,8 @@ def apply_transform(
# Apply the transform functions sequentially to the input iterable.
for transform_fn in self._transform_fns:
iter = transform_fn(iter, ctx)
if transform_fn._is_udf:
iter = self._udf_timed_iter(iter)
return iter

def fuse(self, other: "MapTransformer") -> "MapTransformer":
Expand All @@ -162,6 +180,9 @@ def fused_init_fn():
transformer.set_target_max_block_size(target_max_block_size)
return transformer

def udf_time(self) -> float:
    """Return the total time (in seconds) spent executing UDF transform
    functions across all blocks processed by this transformer so far."""
    accumulated_seconds = self._udf_time
    return accumulated_seconds


def create_map_transformer_from_block_fn(
block_fn: MapTransformCallable[Block, Block],
Expand All @@ -185,11 +206,10 @@ def create_map_transformer_from_block_fn(
class RowMapTransformFn(MapTransformFn):
"""A rows-to-rows MapTransformFn."""

def __init__(self, row_fn: MapTransformCallable[Row, Row], is_udf: bool = False):
    """Create a rows-to-rows transform step.

    Args:
        row_fn: The row-wise transform callable to apply.
        is_udf: Whether ``row_fn`` is a user-defined function (so its
            execution time should be tracked separately).
    """
    self._row_fn = row_fn
    super().__init__(
        input_type=MapTransformFnDataType.Row,
        output_type=MapTransformFnDataType.Row,
        is_udf=is_udf,
    )

def __call__(self, input: Iterable[Row], ctx: TaskContext) -> Iterable[Row]:
Expand All @@ -202,11 +222,12 @@ def __repr__(self) -> str:
class BatchMapTransformFn(MapTransformFn):
"""A batch-to-batch MapTransformFn."""

def __init__(
    self, batch_fn: MapTransformCallable[DataBatch, DataBatch], is_udf: bool = False
):
    """Create a batch-to-batch transform step.

    Args:
        batch_fn: The batch-wise transform callable to apply.
        is_udf: Whether ``batch_fn`` is a user-defined function (so its
            execution time should be tracked separately).
    """
    self._batch_fn = batch_fn
    super().__init__(
        input_type=MapTransformFnDataType.Batch,
        output_type=MapTransformFnDataType.Batch,
        is_udf=is_udf,
    )

def __call__(
Expand Down
4 changes: 2 additions & 2 deletions python/ray/data/_internal/planner/plan_udf_map_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ def _create_map_transformer_for_map_batches_op(
zero_copy_batch=zero_copy_batch,
),
# Apply the UDF.
BatchMapTransformFn(batch_fn),
BatchMapTransformFn(batch_fn, is_udf=True),
# Convert output batches to blocks.
BuildOutputBlocksMapTransformFn.for_batches(),
]
Expand All @@ -296,7 +296,7 @@ def _create_map_transformer_for_row_based_map_op(
# Convert input blocks to rows.
BlocksToRowsMapTransformFn.instance(),
# Apply the UDF.
RowMapTransformFn(row_fn),
RowMapTransformFn(row_fn, is_udf=True),
# Convert output rows to blocks.
BuildOutputBlocksMapTransformFn.for_rows(),
]
Expand Down
24 changes: 20 additions & 4 deletions python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,7 @@ class OperatorStatsSummary:
# {"min": ..., "max": ..., "mean": ..., "sum": ...}
wall_time: Optional[Dict[str, float]] = None
cpu_time: Optional[Dict[str, float]] = None
udf_time: Optional[Dict[str, float]] = None
# memory: no "sum" stat
memory: Optional[Dict[str, float]] = None
output_num_rows: Optional[Dict[str, float]] = None
Expand Down Expand Up @@ -937,17 +938,14 @@ def from_block_metadata(
len(task_rows), exec_summary_str
)

wall_time_stats = None
wall_time_stats, cpu_stats, memory_stats, udf_stats = None, None, None, None
if exec_stats:
wall_time_stats = {
"min": min([e.wall_time_s for e in exec_stats]),
"max": max([e.wall_time_s for e in exec_stats]),
"mean": np.mean([e.wall_time_s for e in exec_stats]),
"sum": sum([e.wall_time_s for e in exec_stats]),
}

cpu_stats, memory_stats = None, None
if exec_stats:
cpu_stats = {
"min": min([e.cpu_time_s for e in exec_stats]),
"max": max([e.cpu_time_s for e in exec_stats]),
Expand All @@ -964,6 +962,13 @@ def from_block_metadata(
"mean": int(np.mean(memory_stats_mb)),
}

udf_stats = {
"min": min([e.udf_time_s for e in exec_stats]),
"max": max([e.udf_time_s for e in exec_stats]),
"mean": np.mean([e.udf_time_s for e in exec_stats]),
"sum": sum([e.udf_time_s for e in exec_stats]),
}

output_num_rows_stats = None
output_num_rows = [m.num_rows for m in block_metas if m.num_rows is not None]
if output_num_rows:
Expand Down Expand Up @@ -1007,6 +1012,7 @@ def from_block_metadata(
block_execution_summary_str=exec_summary_str,
wall_time=wall_time_stats,
cpu_time=cpu_stats,
udf_time=udf_stats,
memory=memory_stats,
output_num_rows=output_num_rows_stats,
output_size_bytes=output_size_bytes_stats,
Expand Down Expand Up @@ -1045,6 +1051,16 @@ def __str__(self) -> str:
fmt(cpu_stats["sum"]),
)

udf_stats = self.udf_time
if udf_stats:
out += indent
out += "* UDF time: {} min, {} max, {} mean, {} total\n".format(
fmt(udf_stats["min"]),
fmt(udf_stats["max"]),
fmt(udf_stats["mean"]),
fmt(udf_stats["sum"]),
)

memory_stats = self.memory
if memory_stats:
out += indent
Expand Down
2 changes: 2 additions & 0 deletions python/ray/data/block.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def __init__(self):
self.start_time_s: Optional[float] = None
self.end_time_s: Optional[float] = None
self.wall_time_s: Optional[float] = None
self.udf_time_s: Optional[float] = 0
self.cpu_time_s: Optional[float] = None
self.node_id = ray.runtime_context.get_runtime_context().get_node_id()
# Max memory usage. May be an overestimate since we do not
Expand All @@ -149,6 +150,7 @@ def __repr__(self):
{
"wall_time_s": self.wall_time_s,
"cpu_time_s": self.cpu_time_s,
"udf_time_s": self.udf_time_s,
"node_id": self.node_id,
}
)
Expand Down
2 changes: 2 additions & 0 deletions python/ray/data/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ def op_two_block():
"max_rss_bytes": [1024 * 1024 * 2, 1024 * 1024 * 1],
"wall_time": [5, 10],
"cpu_time": [1.2, 3.4],
"udf_time": [1.1, 1.7],
"node_id": ["a1", "b2"],
"task_idx": [0, 1],
}
Expand All @@ -424,6 +425,7 @@ def op_two_block():
)
block_exec_stats.wall_time_s = block_params["wall_time"][i]
block_exec_stats.cpu_time_s = block_params["cpu_time"][i]
block_exec_stats.udf_time_s = block_params["udf_time"][i]
block_exec_stats.node_id = block_params["node_id"][i]
block_exec_stats.max_rss_bytes = block_params["max_rss_bytes"][i]
block_exec_stats.task_idx = block_params["task_idx"][i]
Expand Down