
Commit 327169f
update the throughput calculation further, add comments to document
Signed-off-by: Matthew Owen <mowen@anyscale.com>
omatthew98 committed Mar 21, 2024
1 parent 7c95ee4 commit 327169f
Showing 1 changed file with 63 additions and 9 deletions.
72 changes: 63 additions & 9 deletions python/ray/data/_internal/stats.py
@@ -901,16 +901,30 @@ def to_string(
out += "\nDataset memory:\n"
out += "* Spilled to disk: {}MB\n".format(dataset_mb_spilled)

# For throughput, we compute both an observed Ray Data dataset throughput
# and an estimated single node dataset throughput.

# The observed dataset throughput is computed by dividing the total number
# of rows produced by the total wall time of the dataset (i.e. how long the
# dataset took to process from start to finish). Because DatasetStatsSummary
# is recursive, we use get_total_wall_time to determine the total wall time
# (this finds the difference between the earliest start and latest end for
# any block in any operator).

# The estimated single node dataset throughput is computed by dividing the
# total number of rows produced by the sum of the wall times across all
# blocks of all operators. This assumes that on a single node the same work
# would be done serially, with no concurrency.
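# Illustrative example (hypothetical numbers, not taken from a real run): if a
# dataset produces 1,000,000 rows, starts at t=0s and finishes at t=50s, the
# observed Ray Data throughput is 1,000,000 / 50 = 20,000 rows/s. If the
# per-block wall times across all operators sum to 400s, the estimated single
# node throughput is 1,000,000 / 400 = 2,500 rows/s.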
output_num_rows = self.operators_stats[-1].output_num_rows
total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
- total_time = self.get_total_wall_time()
wall_time = self.get_total_wall_time()
total_time_all_blocks = self.get_total_time_all_blocks()
- if total_num_out_rows and total_time and total_time_all_blocks:
if total_num_out_rows and wall_time and total_time_all_blocks:
out += "\n"
out += "Dataset throughput:\n"
out += (
"\t* Ray Data throughput:"
f" {total_num_out_rows / total_time} "
f" {total_num_out_rows / wall_time} "
"rows/s\n"
)
out += (
@@ -982,14 +996,39 @@ def __repr__(self, level=0) -> str:
)

def get_total_wall_time(self) -> float:
- parent_wall_times = [p.get_total_wall_time() for p in self.parents]
- parent_max_wall_time = max(parent_wall_times) if parent_wall_times else 0
- return parent_max_wall_time + sum(
-     ss.wall_time.get("max", 0) if ss.wall_time else 0
-     for ss in self.operators_stats
- )
"""Calculate the total wall time for the dataset, this is done by finding
the earliest start time and latest end time for any block in any operator.
The wall time is the difference of these two times.
"""

def find_start_end(summary: "DatasetStatsSummary") -> tuple[float, float]:
    # Recursively find the earliest block start time and latest block end
    # time across this summary and all of its parents.
    curr_earliest_start = min(
        ss.earliest_start_time for ss in summary.operators_stats
    )
    curr_latest_end = max(ss.latest_end_time for ss in summary.operators_stats)
    parent_start_ends = [
        find_start_end(p) for p in summary.parents if p.operators_stats
    ]
    parent_earliest_start = (
        min(p[0] for p in parent_start_ends)
        if parent_start_ends
        else float("inf")
    )
    parent_latest_end = (
        max(p[1] for p in parent_start_ends)
        if parent_start_ends
        else float("-inf")
    )
    return min(parent_earliest_start, curr_earliest_start), max(
        parent_latest_end, curr_latest_end
    )

earliest_start, latest_end = find_start_end(self)
wall_time = latest_end - earliest_start
return wall_time if abs(wall_time) != float("inf") else 0

def get_total_time_all_blocks(self) -> float:
"""Calculate the sum of the wall times across all blocks of all operators."""
parent_total_times = [p.get_total_time_all_blocks() for p in self.parents]
parent_time_total = sum(parent_total_times) if parent_total_times else 0
return parent_time_total + sum(
@@ -1026,6 +1065,8 @@ class OperatorStatsSummary:
# overall runtime of the operator, pulled from the stats actor, whereas the
# computed wall times in `self.wall_time` are calculated on an operator level.
time_total_s: float
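# The earliest start time and latest end time of any block in the operator,
# used to compute the dataset-level wall time (see get_total_wall_time).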
earliest_start_time: float
latest_end_time: float
# String summarizing high-level statistics from executing the operator
block_execution_summary_str: str
# The fields below are dicts with stats aggregated across blocks
@@ -1173,6 +1214,8 @@ def from_block_metadata(
operator_name=operator_name,
is_sub_operator=is_sub_operator,
time_total_s=time_total_s,
earliest_start_time=earliest_start_time,
latest_end_time=latest_end_time,
block_execution_summary_str=exec_summary_str,
wall_time=wall_time_stats,
cpu_time=cpu_stats,
@@ -1280,6 +1323,17 @@ def __str__(self) -> str:
node_count_stats["count"],
)
if output_num_rows_stats and self.time_total_s and wall_time_stats:
# For throughput, we compute both an observed Ray Data operator throughput
# and an estimated single node operator throughput.

# The observed Ray Data operator throughput is computed by dividing the
# total number of rows produced by the wall time of the operator,
# time_total_s.

# The estimated single node operator throughput is computed by dividing the
# total number of rows produced by the sum of the wall times across all
# blocks of the operator. This assumes that on a single node the same work
# would be done serially, with no concurrency.
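# Illustrative example (hypothetical numbers): an operator that produces
# 500,000 rows with time_total_s = 25s has an observed throughput of
# 500,000 / 25 = 20,000 rows/s; if its per-block wall times sum to 200s, the
# estimated single node throughput is 500,000 / 200 = 2,500 rows/s.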
total_num_out_rows = output_num_rows_stats["sum"]
out += indent
out += "* Operator throughput:\n"
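For reference, a minimal sketch of how these summaries surface to users (the pipeline and figures below are illustrative assumptions, not taken from this commit): Dataset.stats() returns the formatted string assembled by to_string() above, including the new dataset and operator throughput sections.

import ray

ds = ray.data.range(1_000_000).map(lambda row: row)
ds = ds.materialize()  # execute the dataset so block stats are populated
print(ds.stats())      # formatted stats string, including throughput sections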
