
[Data] Fix throughput time calculations #44138

Merged · 13 commits · Mar 26, 2024

Conversation

omatthew98 (Contributor)

Why are these changes needed?

While documenting the new stats we added, I noticed some quirks in the dataset throughput reporting. Dataset throughput was often reported as worse through Ray Data than the single node approximation, but the single node approximation should be a floor for Ray Data throughput (i.e. the two should match only if a single node / single block were used).

We want to compute the Ray Data dataset throughput by dividing the total number of rows produced by the total wall time the operation took to run (i.e. from process start to process completion). We want to compute the single node approximation by dividing the total number of rows produced by the total wall time of all blocks across all operators (i.e. the total time if there were no concurrent processes).

Upon digging, I found that the calculation was incorrect. For the Ray Data dataset throughput, we were dividing by `self.time_total_s`, which is the total time for the last operator in the chain (sometimes correct, but incorrect with multiple non-fused operators). For the single node approximation, we were dividing by `self.get_total_wall_time()`, which is actually what we want for the Ray Data dataset throughput, but incorrect for the single node approximation because it does not sum time over the multiple blocks.
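The two intended formulas can be sketched as follows (a minimal illustration of the description above, not the actual DatasetStats code; all names here are hypothetical):

```python
def dataset_throughput(total_rows, wall_time_s):
    # Observed Ray Data throughput: rows divided by the end-to-end wall
    # time of the whole (possibly concurrent) execution.
    return total_rows / wall_time_s


def single_node_throughput(total_rows, block_wall_times_s):
    # Single node approximation: rows divided by the summed wall time of
    # every block across every operator, i.e. as if all work ran serially.
    return total_rows / sum(block_wall_times_s)


# 1000 rows, 10 s end-to-end, with 4 blocks of 10 s of work each running
# concurrently: observed throughput is 4x the single node estimate.
print(dataset_throughput(1000, 10.0))            # 100.0 rows/s
print(single_node_throughput(1000, [10.0] * 4))  # 25.0 rows/s
```

With concurrency, the end-to-end wall time is smaller than the summed block time, so the observed throughput should always be at least the single node estimate.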

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

scottjlee (Contributor) left a comment:

LGTM, maybe add a unit test to cover the case with multiple non-fused ops

Comment on lines 906 to 921
total_time = self.get_total_wall_time()
total_time_all_blocks = self.get_total_time_all_blocks()
Contributor:

can we add docstrings for get_total_wall_time() and get_total_time_all_blocks() to explain their return value / differentiate them?

omatthew98 (Author):

Will do and explain a bit more why we are doing the calculation the way we are for both this and the operator throughput.

Comment on lines 995 to 1037
return parent_time_total + sum(
ss.wall_time.get("sum", 0) if ss.wall_time else 0
for ss in self.operators_stats
)
Contributor:

this is really getting the total wall time across all operators, not across blocks right? would a more appropriate name for the method be get_total_time_all_operators?

omatthew98 (Author):

I think that name makes more sense, but note that because we use ss.wall_time.get("sum", 0), the value is already a sum across all blocks within each operator.
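The point in this exchange can be sketched as follows (hypothetical, simplified classes modeled on the snippet under review, not the real DatasetStats implementation): each operator's wall_time["sum"] already aggregates across that operator's blocks, so summing it over operators totals time over all operators and all blocks.

```python
class OperatorStats:
    """Stats for one operator; wall_time["sum"] aggregates its blocks."""

    def __init__(self, block_wall_times):
        self.wall_time = (
            {"sum": sum(block_wall_times), "max": max(block_wall_times)}
            if block_wall_times
            else None
        )


class DatasetStats:
    def __init__(self, operators_stats, parent_time_total=0.0):
        self.operators_stats = operators_stats
        self.parent_time_total = parent_time_total

    def get_total_time_all_blocks(self):
        # Sums wall time across all operators AND all blocks, since each
        # operator's wall_time["sum"] is itself a sum over its blocks.
        return self.parent_time_total + sum(
            ss.wall_time.get("sum", 0) if ss.wall_time else 0
            for ss in self.operators_stats
        )


# Two operators: one with blocks of 1 s and 2 s, one with a single 3 s block.
stats = DatasetStats([OperatorStats([1.0, 2.0]), OperatorStats([3.0])])
print(stats.get_total_time_all_blocks())  # 6.0
```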

Signed-off-by: Matthew Owen <mowen@anyscale.com>
Comment on lines +1322 to +1332
# For throughput, we compute both an observed Ray Data operator throughput
# and an estimated single node operator throughput.

# The observed Ray Data operator throughput is computed by dividing the
# total number of rows produced by the wall time of the operator,
# time_total_s.

# The estimated single node operator throughput is computed by dividing the
# total number of rows produced by the sum of the wall times across all
# blocks of the operator. This assumes that on a single node the work done
# would be equivalent, with no concurrency.
Contributor:

i think this comment is kind of copied from above? or should they be in both places?

omatthew98 (Author):

One is for the dataset throughput and the other is for operator throughput; they are similar but have some differences. Either way, I wanted some information about the throughput in both places where we calculate it, but if that feels redundant, a common place for the shared info makes sense too.

Comment on lines 1238 to 1239
total_time, total_percent = metrics_dict["Total"]
metrics_dict.pop("Total")
Contributor:

nit

Suggested change
total_time, total_percent = metrics_dict["Total"]
metrics_dict.pop("Total")
total_time, total_percent = metrics_dict.pop("Total")
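The suggested one-liner is equivalent because dict.pop returns the removed value; a quick check with illustrative values:

```python
# Hypothetical metrics_dict contents, just to show the equivalence.
metrics_dict = {"Total": (12.5, 100.0), "ReadRange": (4.0, 32.0)}

# pop() both removes the key and returns its value, so the lookup and
# the pop collapse into one statement.
total_time, total_percent = metrics_dict.pop("Total")
print(total_time, total_percent)  # 12.5 100.0
print("Total" in metrics_dict)    # False
```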

r"Operator (\d+).*?Ray Data throughput: (\d+\.\d+) rows/s.*?Estimated single node throughput: (\d+\.\d+) rows/s", # noqa: E501
re.DOTALL,
)
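The pattern above can be exercised against a mocked stats string (the sample text below is illustrative, not real Ray output); re.DOTALL lets the non-greedy `.*?` spans cross line breaks between the operator header and its throughput lines:

```python
import re

pattern = re.compile(
    r"Operator (\d+).*?Ray Data throughput: (\d+\.\d+) rows/s"
    r".*?Estimated single node throughput: (\d+\.\d+) rows/s",
    re.DOTALL,
)

# Mocked multi-line stats output for a single operator.
sample = (
    "Operator 1 ReadRange->MapBatches\n"
    "* Ray Data throughput: 120.50 rows/s\n"
    "* Estimated single node throughput: 30.25 rows/s\n"
)

match = pattern.search(sample)
print(match.groups())  # ('1', '120.50', '30.25')
```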

Contributor:

just adding a clarifying comment

Suggested change
# Ray data throughput should always be better than single node throughput for multi-cpu case.

Signed-off-by: Matthew Owen <mowen@anyscale.com>
@c21 c21 merged commit a7e3a2e into ray-project:master Mar 26, 2024
5 checks passed
stephanie-wang pushed a commit to stephanie-wang/ray that referenced this pull request Mar 27, 2024
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 7, 2024