Skip to content

Commit

Permalink
fix throughput time calculations
Browse files Browse the repository at this point in the history
Signed-off-by: Matthew Owen <mowen@anyscale.com>
  • Loading branch information
omatthew98 committed Mar 19, 2024
1 parent a28dfdd commit c0065ae
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions python/ray/data/_internal/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -903,18 +903,19 @@ def to_string(

output_num_rows = self.operators_stats[-1].output_num_rows
total_num_out_rows = output_num_rows["sum"] if output_num_rows else 0
total_wall_time = self.get_total_wall_time()
if total_num_out_rows and self.time_total_s and total_wall_time:
total_time = self.get_total_wall_time()
total_time_all_blocks = self.get_total_time_all_blocks()
if total_num_out_rows and total_time and total_time_all_blocks:
out += "\n"
out += "Dataset throughput:\n"
out += (
"\t* Ray Data throughput:"
f" {total_num_out_rows / self.time_total_s} "
f" {total_num_out_rows / total_time} "
"rows/s\n"
)
out += (
"\t* Estimated single node throughput:"
f" {total_num_out_rows / total_wall_time} "
f" {total_num_out_rows / total_time_all_blocks} "
"rows/s\n"
)
if verbose_stats_logs and add_global_stats:
Expand Down Expand Up @@ -988,6 +989,14 @@ def get_total_wall_time(self) -> float:
for ss in self.operators_stats
)

def get_total_time_all_blocks(self) -> float:
parent_total_times = [p.get_total_time_all_blocks() for p in self.parents]
parent_time_total = sum(parent_total_times) if parent_total_times else 0
return parent_time_total + sum(
ss.wall_time.get("sum", 0) if ss.wall_time else 0
for ss in self.operators_stats
)

def get_total_cpu_time(self) -> float:
parent_sum = sum(p.get_total_cpu_time() for p in self.parents)
return parent_sum + sum(
Expand Down

0 comments on commit c0065ae

Please sign in to comment.