[Data] Fix throughput time calculations #44138

Merged (13 commits) on Mar 26, 2024
99 changes: 81 additions & 18 deletions python/ray/data/_internal/stats.py
@@ -901,20 +901,35 @@ def to_string(
out += "\nDataset memory:\n"
out += "* Spilled to disk: {}MB\n".format(dataset_mb_spilled)

# For throughput, we compute both an observed Ray Data dataset throughput
# and an estimated single node dataset throughput.

# The observed dataset throughput is computed by dividing the total number
# of rows produced by the total wall time of the dataset (i.e. from start to
# finish how long did the dataset take to be processed). With the recursive
# nature of the DatasetStatsSummary, we use get_total_wall_time to determine
# the total wall time (this finds the difference between the earliest start
# and latest end for any block in any operator).

# The estimated single node dataset throughput is computed by dividing the
# total number of rows produced by the sum of the wall times across all blocks
# of all operators. This assumes that on a single node the work done would
# be equivalent, with no concurrency.
output_num_rows = self.operators_stats[-1].output_num_rows
total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
total_wall_time = self.get_total_wall_time()
if total_num_out_rows and self.time_total_s and total_wall_time:
wall_time = self.get_total_wall_time()
total_time_all_blocks = self.get_total_time_all_blocks()
if total_num_out_rows and wall_time and total_time_all_blocks:
out += "\n"
out += "Dataset throughput:\n"
out += (
"\t* Ray Data throughput:"
f" {total_num_out_rows / self.time_total_s} "
f" {total_num_out_rows / wall_time} "
"rows/s\n"
)
out += (
"\t* Estimated single node throughput:"
f" {total_num_out_rows / total_wall_time} "
f" {total_num_out_rows / total_time_all_blocks} "
"rows/s\n"
)
if verbose_stats_logs and add_global_stats:
@@ -923,29 +938,39 @@ def to_string(
return out
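As a standalone illustration of the two formulas described in the comment above, here is a minimal sketch with invented numbers (not part of the patch):

# Toy example: 1,000 output rows; the dataset ran from t=0s to t=10s wall
# clock, while the per-block wall times sum to 40s because blocks ran
# concurrently.
total_num_out_rows = 1_000
wall_time = 10.0               # latest block end minus earliest block start
total_time_all_blocks = 40.0   # sum of wall times over all blocks

ray_data_throughput = total_num_out_rows / wall_time                  # 100 rows/s
single_node_throughput = total_num_out_rows / total_time_all_blocks   # 25 rows/s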

@staticmethod
def _collect_parent_summaries(
def _collect_dataset_stats_summaries(
curr: "DatasetStatsSummary",
) -> List["DatasetStatsSummary"]:
summs = []
# TODO: Do operators ever have multiple parents? Do we need to deduplicate?
for p in curr.parents:
if p and p.parents:
summs.extend(DatasetStatsSummary._collect_parent_summaries(p))
summs.extend(DatasetStatsSummary._collect_dataset_stats_summaries(p))
return summs + [curr]

@staticmethod
def _find_start_and_end(summ: "DatasetStatsSummary") -> Tuple[float, float]:
earliest_start = min(ops.earliest_start_time for ops in summ.operators_stats)
latest_end = max(ops.latest_end_time for ops in summ.operators_stats)
return earliest_start, latest_end

def runtime_metrics(self) -> str:
total_wall_time = self.get_total_wall_time()

def fmt_line(name: str, time: float) -> str:
return f"* {name}: {fmt(time)} ({time / self.time_total_s * 100:.3f}%)\n"
return f"* {name}: {fmt(time)} ({time / total_wall_time * 100:.3f}%)\n"

summaries = DatasetStatsSummary._collect_parent_summaries(self)
summaries = DatasetStatsSummary._collect_dataset_stats_summaries(self)
out = "Runtime Metrics:\n"
for summ in summaries:
op_total_time = sum(
[op_stats.time_total_s for op_stats in summ.operators_stats]
)
out += fmt_line(summ.base_name, op_total_time)
if len(summ.operators_stats) > 0:
earliest_start, latest_end = DatasetStatsSummary._find_start_and_end(
summ
)
op_total_time = latest_end - earliest_start
out += fmt_line(summ.base_name, op_total_time)
out += fmt_line("Scheduling", self.streaming_exec_schedule_s)
out += fmt_line("Total", self.time_total_s)
out += fmt_line("Total", total_wall_time)
return out

def __repr__(self, level=0) -> str:
@@ -981,11 +1006,33 @@ def __repr__(self, level=0) -> str:
)

def get_total_wall_time(self) -> float:
parent_wall_times = [p.get_total_wall_time() for p in self.parents]
parent_max_wall_time = max(parent_wall_times) if parent_wall_times else 0
return parent_max_wall_time + sum(
ss.wall_time.get("max", 0) if ss.wall_time else 0
for ss in self.operators_stats
"""Calculate the total wall time for the dataset, this is done by finding
the earliest start time and latest end time for any block in any operator.
The wall time is the difference of these two times.
"""
start_ends = [
DatasetStatsSummary._find_start_and_end(summ)
for summ in DatasetStatsSummary._collect_dataset_stats_summaries(self)
if len(summ.operators_stats) > 0
]
if len(start_ends) == 0:
return 0
else:
earliest_start = min(start_end[0] for start_end in start_ends)
latest_end = max(start_end[1] for start_end in start_ends)
return latest_end - earliest_start

def get_total_time_all_blocks(self) -> float:
"""Calculate the sum of the wall times across all blocks of all operators."""
summaries = DatasetStatsSummary._collect_dataset_stats_summaries(self)
return sum(
(
sum(
ops.wall_time.get("sum", 0) if ops.wall_time else 0
for ops in summ.operators_stats
)
)
for summ in summaries
)
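
A minimal sketch (with invented block spans) of how the two aggregates above differ:

# Two operators with two blocks each; (start, end) times in seconds.
# Blocks overlap because they execute concurrently.
block_spans = [(0.0, 4.0), (1.0, 5.0), (4.0, 9.0), (5.0, 10.0)]

earliest_start = min(start for start, _ in block_spans)                  # 0.0
latest_end = max(end for _, end in block_spans)                          # 10.0
total_wall_time = latest_end - earliest_start                            # 10.0 s

total_time_all_blocks = sum(end - start for start, end in block_spans)   # 18.0 s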

def get_total_cpu_time(self) -> float:
@@ -1017,6 +1064,8 @@ class OperatorStatsSummary:
# overall runtime of the operator, pulled from the stats actor, whereas the
# computed wall times in `self.wall_time` are calculated on an operator level.
time_total_s: float
earliest_start_time: float
latest_end_time: float
# String summarizing high-level statistics from executing the operator
block_execution_summary_str: str
# The fields below are dicts with stats aggregated across blocks
@@ -1053,6 +1102,7 @@ def from_block_metadata(
exec_stats = [m.exec_stats for m in block_metas if m.exec_stats is not None]
rounded_total = 0
time_total_s = 0
earliest_start_time, latest_end_time = 0, 0

if exec_stats:
# Calculate the total execution time of operator as
@@ -1164,6 +1214,8 @@ def from_block_metadata(
operator_name=operator_name,
is_sub_operator=is_sub_operator,
time_total_s=time_total_s,
earliest_start_time=earliest_start_time,
latest_end_time=latest_end_time,
block_execution_summary_str=exec_summary_str,
wall_time=wall_time_stats,
cpu_time=cpu_stats,
@@ -1271,6 +1323,17 @@ def __str__(self) -> str:
node_count_stats["count"],
)
if output_num_rows_stats and self.time_total_s and wall_time_stats:
# For throughput, we compute both an observed Ray Data operator throughput
# and an estimated single node operator throughput.

# The observed Ray Data operator throughput is computed by dividing the
# total number of rows produced by the wall time of the operator,
# time_total_s.

# The estimated single node operator throughput is computed by dividing the
# total number of rows produced by the sum of the wall times across all
# blocks of the operator. This assumes that on a single node the work done
# would be equivalent, with no concurrency.
Comment on lines +1326 to +1336

Contributor:
i think this comment is kind of copied from above? or should they be in both places?

Contributor Author:
One is for the dataset throughput and the other is for operator throughput, they are similar but have some differences. Either way, I wanted to have some information about the throughput in the two places that we are calculating it, but if that feels redundant / there is a common place to put the shared info that makes sense too.

total_num_out_rows = output_num_rows_stats["sum"]
out += indent
out += "* Operator throughput:\n"
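For the operator-level counterpart described in the comment above, a minimal sketch with invented numbers:

# Invented operator stats: 1,000 output rows; the operator's overall runtime
# (time_total_s) is 5s, while its blocks' wall times sum to 15s.
total_num_out_rows = 1_000
time_total_s = 5.0
wall_time_sum = 15.0

operator_ray_data_throughput = total_num_out_rows / time_total_s       # 200 rows/s
operator_single_node_throughput = total_num_out_rows / wall_time_sum   # ~66.7 rows/s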
104 changes: 96 additions & 8 deletions python/ray/data/tests/test_stats.py
@@ -212,11 +212,15 @@ def dummy_map_batches(x):
return x


def map_batches_sleep(x, n):
"""Dummy function used in calls to map_batches below, which
simply sleeps for `n` seconds before returning the input batch."""
time.sleep(n)
return x
def dummy_map_batches_sleep(n):
"""Function used to create a function that sleeps for n seconds
to be used in map_batches below."""

def f(x):
time.sleep(n)
return x

return f


@contextmanager
@@ -1022,7 +1026,7 @@ def test_summarize_blocks(ray_start_regular_shared, op_two_block):
def test_get_total_stats(ray_start_regular_shared, op_two_block):
"""Tests a set of similar getter methods which pull aggregated
statistics values after calculating operator-level stats:
`DatasetStats.get_max_wall_time()`,
`DatasetStats.get_total_wall_time()`,
`DatasetStats.get_total_cpu_time()`,
`DatasetStats.get_max_heap_memory()`."""
block_params, block_meta_list = op_two_block
@@ -1033,8 +1037,18 @@ def test_get_total_stats(ray_start_regular_shared, op_two_block):

dataset_stats_summary = stats.to_summary()
op_stats = dataset_stats_summary.operators_stats[0]
wall_time_stats = op_stats.wall_time
assert dataset_stats_summary.get_total_wall_time() == wall_time_stats.get("max")

# simple case with only one block / summary, result should match difference between
# the start and end time
assert (
dataset_stats_summary.get_total_wall_time()
== op_stats.latest_end_time - op_stats.earliest_start_time
)

# total time across all blocks is sum of wall times of blocks
assert dataset_stats_summary.get_total_time_all_blocks() == sum(
block_params["wall_time"]
)

cpu_time_stats = op_stats.cpu_time
assert dataset_stats_summary.get_total_cpu_time() == cpu_time_stats.get("sum")
@@ -1185,10 +1199,84 @@ def f(x):
assert ds._plan.stats().extra_metrics["task_submission_backpressure_time"] > 0


def test_runtime_metrics(ray_start_regular_shared):
from math import isclose

def time_to_seconds(time_str):
if time_str.endswith("us"):
# Convert microseconds to seconds
return float(time_str[:-2]) / (1000 * 1000)
elif time_str.endswith("ms"):
# Convert milliseconds to seconds
return float(time_str[:-2]) / 1000
elif time_str.endswith("s"):
# Already in seconds, just remove the 's' and convert to float
return float(time_str[:-1])

f = dummy_map_batches_sleep(0.01)
ds = ray.data.range(100).map(f).materialize().map(f).materialize()
metrics_str = ds._plan.stats().runtime_metrics()

# Dictionary to store the metrics for testing
metrics_dict = {}

# Regular expression to match the pattern of each metric line
pattern = re.compile(r"\* (.+?): ([\d\.]+(?:ms|s)) \(([\d\.]+)%\)")

# Split the input string into lines and iterate over them
for line in metrics_str.split("\n"):
match = pattern.match(line)
if match:
# Extracting the operator name, time, and percentage
operator_name, time_str, percent_str = match.groups()
# Converting percentage to float and keeping time as string
metrics_dict[operator_name] = (
time_to_seconds(time_str),
float(percent_str),
)

total_time, total_percent = metrics_dict.pop("Total")
assert total_percent == 100

for time_s, percent in metrics_dict.values():
assert time_s < total_time
# Check the percentage; some loss of precision is expected due to rounding
# in the initial output.
assert isclose(percent, time_s / total_time * 100, rel_tol=0.01)


# NOTE: All tests above share a Ray cluster, while the tests below do not. These
# tests should only be carefully reordered to retain this invariant!


def test_dataset_throughput():
ray.shutdown()
ray.init(num_cpus=2)

f = dummy_map_batches_sleep(0.01)
ds = ray.data.range(100).map(f).materialize().map(f).materialize()

# Pattern to match operator throughput
operator_pattern = re.compile(
r"Operator (\d+).*?Ray Data throughput: (\d+\.\d+) rows/s.*?Estimated single node throughput: (\d+\.\d+) rows/s", # noqa: E501
re.DOTALL,
)

Contributor:
just adding a clarifying comment

Suggested change
# Ray data throughput should always be better than single node throughput for multi-cpu case.

# Ray data throughput should always be better than single node throughput for
# multi-cpu case.
for match in operator_pattern.findall(ds.stats()):
assert float(match[1]) >= float(match[2])

# Pattern to match dataset throughput
dataset_pattern = re.compile(
r"Dataset throughput:.*?Ray Data throughput: (\d+\.\d+) rows/s.*?Estimated single node throughput: (\d+\.\d+) rows/s", # noqa: E501
re.DOTALL,
)

dataset_match = dataset_pattern.search(ds.stats())
assert float(dataset_match[1]) >= float(dataset_match[2])
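
For reference, a self-contained sketch of the output shape these patterns are meant to match (the excerpt and numbers are invented; real values depend on the run):

import re

sample = (
    "Dataset throughput:\n"
    "\t* Ray Data throughput: 1200.5 rows/s\n"
    "\t* Estimated single node throughput: 480.2 rows/s\n"
)
pattern = re.compile(
    r"Dataset throughput:.*?Ray Data throughput: (\d+\.\d+) rows/s"
    r".*?Estimated single node throughput: (\d+\.\d+) rows/s",
    re.DOTALL,
)
m = pattern.search(sample)
assert m is not None and float(m.group(1)) >= float(m.group(2))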


def test_stats_actor_cap_num_stats(ray_start_cluster):
actor = _StatsActor.remote(3)
metadatas = []
Expand Down