
[Data] Time streaming split overhead #43477

Merged
merged 17 commits into ray-project:master on Mar 4, 2024

Conversation

@omatthew98 (Contributor) commented Feb 27, 2024

Why are these changes needed?

This adds timing for the overhead created by using the streaming_split iterator. There are two primary overheads that are measured:

  1. The OutputSplitter operator. This operator is responsible for distributing data to different splits. It runs in the scheduler thread.
  2. The SplitCoordinator actor. The overhead is mainly from the get calls and the barrier, which happen in background threads that handle the remote task requests.

The first overhead is tracked at the operator level in the OutputSplitter operator. The second overhead is tracked at the Dataset level in the SplitCoordinator actor.
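
A minimal sketch of the measurement pattern (illustrative only; the class and attribute names below, such as _split_overhead_s, are hypothetical and not the exact fields added by this PR):

    import time

    class OutputSplitterSketch:
        """Toy model of an operator that times its own dispatch overhead."""

        def __init__(self, n: int):
            self._num_output = [0 for _ in range(n)]
            self._split_overhead_s = 0.0  # accumulated overhead, in seconds

        def add_input(self, bundle) -> None:
            start = time.perf_counter()
            # ... decide which output split receives the bundle and enqueue it ...
            target = min(range(len(self._num_output)), key=self._num_output.__getitem__)
            self._num_output[target] += 1
            # Only the splitter's own bookkeeping counts toward the overhead.
            self._split_overhead_s += time.perf_counter() - start

The same wrap-and-accumulate pattern applies to the SplitCoordinator actor around its get and barrier handling, with the result surfaced through the Dataset-level stats.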

Related issue number

Closes #42802

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@scottjlee self-assigned this Feb 29, 2024
Comment on lines 239 to 243
stats = (
self._executor.get_stats()
if self._executor
else self._base_dataset._plan.stats()
)
Contributor:

maybe we can rewrite SplitCoordinator.stats() to return this instead of DatasetStatsSummary, and use the method here
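
If SplitCoordinator.stats() were rewritten as suggested, it might look roughly like this (a sketch of the reviewer's idea rather than the merged code; it assumes the coordinator holds the _executor and _base_dataset references used in the snippet above):

    def stats(self) -> "DatasetStats":
        # Return the raw DatasetStats instead of a DatasetStatsSummary so that
        # call sites like the snippet above can simply delegate to this method.
        if self._executor:
            return self._executor.get_stats()
        return self._base_dataset._plan.stats()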

@omatthew98 changed the title from "Time streaming split overhead" to "[Data] Time streaming split overhead" on Feb 29, 2024
@scottjlee (Contributor) left a comment:

could you include an example of the new additions to the output string in the PR description?
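
For illustration, the new additions would appear in the stats output roughly as follows (the values are made up; the exact label text comes from the diff below):

    * Num blocks unknown location: 0
    * Streaming split coordinator overhead time: 12.34ms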

@@ -1256,6 +1259,9 @@ def to_string(self) -> str:
         out += " * Num blocks unknown location: {}\n".format(
             self.iter_unknown_location
         )
+        if self.streaming_split_coord_time.get() != 0:
+            out += "* Streaming split coordinator overhead time: "
Contributor:

for formatting, should we omit the * since this is the start of a new section?
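
One way to apply this suggestion (a sketch only; fmt is assumed to be the module's existing time formatter):

    if self.streaming_split_coord_time.get() != 0:
        # New section, so no leading "*" on the header line.
        out += "Streaming split coordinator overhead time: "
        out += "{}\n".format(fmt(self.streaming_split_coord_time.get()))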

@@ -631,6 +631,9 @@ def __init__(
         self.global_bytes_restored: int = 0
         self.dataset_bytes_spilled: int = 0

+        # Streaming split iterator stats
Contributor:

Let's also note in the comment that this is measured at Dataset level (as opposed to operator level)

Suggested change
-        # Streaming split iterator stats
+        # Streaming split coordinator stats

@@ -47,6 +48,8 @@ def __init__(
         self._output_queue: deque[RefBundle] = deque()
         # The number of rows output to each output split so far.
         self._num_output: List[int] = [0 for _ in range(n)]
+        # The time of the overhead for the output splitter
Contributor:

Let's also note in the comment that this is measured at operator level (as opposed to Dataset level)

@omatthew98 marked this pull request as ready for review March 1, 2024 22:47
@@ -204,7 +208,7 @@ def get(

         This is intended to be called concurrently from multiple clients.
         """
+        start_time = time.perf_counter()
Contributor:

nit, consider creating a context manager or decorator util to make the code cleaner.

Contributor (Author):

I considered that, but I think it's best to leave this as is in case the stats object is None. For my previous PR, switching to a context manager caused some integration tests to fail (#43283). I could use the context manager only when stats is not None, but I'm not sure that would end up being cleaner.
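
For reference, the context-manager utility discussed here could look roughly like the following (a hypothetical sketch, not code from this PR; it assumes the stats sink exposes an add(seconds)-style counter and simply skips recording when no stats object is available):

    import time
    from contextlib import contextmanager
    from typing import Callable, Iterator, Optional

    @contextmanager
    def timed(record: Optional[Callable[[float], None]]) -> Iterator[None]:
        # Time the wrapped block and report the elapsed seconds to `record`,
        # doing nothing when no stats object is available (the None case above).
        start = time.perf_counter()
        try:
            yield
        finally:
            if record is not None:
                record(time.perf_counter() - start)

    # Hypothetical usage inside SplitCoordinator.get():
    # with timed(stats.streaming_split_coord_time.add if stats else None):
    #     bundle = self._next_bundle(split_idx)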

@omatthew98 added the release-blocker (P0: Issue that blocks the release) label on Mar 4, 2024
@raulchen merged commit 164fc16 into ray-project:master on Mar 4, 2024
8 of 9 checks passed
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Jun 7, 2024
This adds timing for the overhead created by using the [`streaming_split`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.streaming_split.html) iterator. There are two primary overheads that are measured:
1. The [OutputSplitter](https://github.com/ray-project/ray/blob/1cc18ea622a6e1899a17f4548aa3734c3656a90f/python/ray/data/_internal/execution/operators/output_splitter.py#L18) operator. This operator is responsible for distributing data to different splits. It runs in the scheduler thread.
2. The [SplitCoordinator actor](https://github.com/ray-project/ray/blob/783da640a20ddbd3b41b893485abf187f0f27223/python/ray/data/_internal/iterator/stream_split_iterator.py#L124). The overhead is mainly from the get and the barrier, which happen in some background threads that handle the remote task requests.

The first overhead is tracked at the operator level in the OutputSplitter operator. The second overhead is tracked at the Dataset level in the SplitCoordinator actor.

Labels: release-blocker (P0: Issue that blocks the release)
Projects: None yet
Development: Successfully merging this pull request may close these issues: [Data] Measure overhead from streaming split
3 participants