From 327169f6ac7ebbcecb91fc4e5f798bfe86698dbe Mon Sep 17 00:00:00 2001 From: Matthew Owen Date: Thu, 21 Mar 2024 16:36:07 -0700 Subject: [PATCH] update the throughput calculation further, add comments to document Signed-off-by: Matthew Owen --- python/ray/data/_internal/stats.py | 72 ++++++++++++++++++++++++++---- 1 file changed, 63 insertions(+), 9 deletions(-) diff --git a/python/ray/data/_internal/stats.py b/python/ray/data/_internal/stats.py index 748b2dfd0a03f..7f7b4fcf29359 100644 --- a/python/ray/data/_internal/stats.py +++ b/python/ray/data/_internal/stats.py @@ -901,16 +901,30 @@ def to_string( out += "\nDataset memory:\n" out += "* Spilled to disk: {}MB\n".format(dataset_mb_spilled) + # For throughput, we compute both an observed Ray Data dataset throughput + # and an estimated single node dataset throughput. + + # The observed dataset throughput is computed by dividing the total number + # of rows produced by the total wall time of the dataset (i.e. from start to + # finish how long did the dataset take to be processed). With the recursive + # nature of the DatasetStatsSummary, we use get_total_wall_time to determine + # the total wall time (this finds the difference between the earliest start + # and latest end for any block in any operator). + + # The estimated single node dataset throughput is computed by dividing the + # total number of rows produced by the sum of the wall times across all blocks + # of all operators. This assumes that on a single node the work done would + # be equivalent, with no concurrency. 
output_num_rows = self.operators_stats[-1].output_num_rows total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0 - total_time = self.get_total_wall_time() + wall_time = self.get_total_wall_time() total_time_all_blocks = self.get_total_time_all_blocks() - if total_num_out_rows and total_time and total_time_all_blocks: + if total_num_out_rows and wall_time and total_time_all_blocks: out += "\n" out += "Dataset throughput:\n" out += ( "\t* Ray Data throughput:" - f" {total_num_out_rows / total_time} " + f" {total_num_out_rows / wall_time} " "rows/s\n" ) out += ( @@ -982,14 +996,39 @@ def __repr__(self, level=0) -> str: ) def get_total_wall_time(self) -> float: - parent_wall_times = [p.get_total_wall_time() for p in self.parents] - parent_max_wall_time = max(parent_wall_times) if parent_wall_times else 0 - return parent_max_wall_time + sum( - ss.wall_time.get("max", 0) if ss.wall_time else 0 - for ss in self.operators_stats - ) + """Calculate the total wall time for the dataset, this is done by finding + the earliest start time and latest end time for any block in any operator. + The wall time is the difference of these two times. 
+ """ + + def find_start_end(self) -> tuple[float, float]: + curr_earliest_start = min( + ss.earliest_start_time for ss in self.operators_stats + ) + curr_latest_end = max(ss.latest_end_time for ss in self.operators_stats) + parent_start_ends = [ + p.get_wall_time_helper() for p in self.parents if p.operators_stats + ] + parent_earliest_start = ( + min(p[0] for p in parent_start_ends) + if parent_start_ends + else float("inf") + ) + parent_latest_end = ( + max(p[1] for p in parent_start_ends) + if parent_start_ends + else float("-inf") + ) + return min(parent_earliest_start, curr_earliest_start), max( + parent_latest_end, curr_latest_end + ) + + earliest_start, latest_end = find_start_end() + wall_time = latest_end - earliest_start + return wall_time if abs(wall_time) != float("inf") else 0 def get_total_time_all_blocks(self) -> float: + """Calculate the sum of the wall times across all blocks of all operators.""" parent_total_times = [p.get_total_time_all_blocks() for p in self.parents] parent_time_total = sum(parent_total_times) if parent_total_times else 0 return parent_time_total + sum( @@ -1026,6 +1065,8 @@ class OperatorStatsSummary: # overall runtime of the operator, pulled from the stats actor, whereas the # computed walltimes in `self.wall_time` are calculated on a operator level. 
time_total_s: float + earliest_start_time: float + latest_end_time: float # String summarizing high-level statistics from executing the operator block_execution_summary_str: str # The fields below are dicts with stats aggregated across blocks @@ -1173,6 +1214,8 @@ def from_block_metadata( operator_name=operator_name, is_sub_operator=is_sub_operator, time_total_s=time_total_s, + earliest_start_time=earliest_start_time, + latest_end_time=latest_end_time, block_execution_summary_str=exec_summary_str, wall_time=wall_time_stats, cpu_time=cpu_stats, @@ -1280,6 +1323,17 @@ def __str__(self) -> str: node_count_stats["count"], ) if output_num_rows_stats and self.time_total_s and wall_time_stats: + # For throughput, we compute both an observed Ray Data operator throughput + # and an estimated single node operator throughput. + + # The observed Ray Data operator throughput is computed by dividing the + # total number of rows produced by the wall time of the operator, + # time_total_s. + + # The estimated single node operator throughput is computed by dividing the + # total number of rows produced by the sum of the wall times across all + # blocks of the operator. This assumes that on a single node the work done + # would be equivalent, with no concurrency. total_num_out_rows = output_num_rows_stats["sum"] out += indent out += "* Operator throughput:\n"