[data] create StatsManager to manage _StatsActor remote calls #40913
Conversation
Should we still submit the stats update tasks on a different thread? If we're doing it every 10s, the overhead is a lot less significant.
Could you measure the latency for the task submission? You can use one of the release tests. We just have to be careful here because even if the % time is small, it can block GPU time in training scenarios (i.e. tail latency is important, not just average). If we put it on a background thread, we could also reduce the interval a bit to get more interactivity in the metrics.
Ran the release test. [benchmark results not captured]

Also timed the overhead for starting a thread on the same test: [results not captured]
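For reference, a minimal sketch of how such a submission-latency measurement could look. The `_Sink` actor and its payload are hypothetical stand-ins, not the real `_StatsActor` API:

```python
import time

import ray


@ray.remote
class _Sink:
    # Stand-in for the stats actor; we only care about submission cost.
    def update(self, payload):
        pass


ray.init()
sink = _Sink.remote()

# Time only the submission of the remote call (not its completion),
# since submission is what would block the iteration thread.
start = time.perf_counter()
ref = sink.update.remote({"rows": 1000})
print(f"submission overhead: {(time.perf_counter() - start) * 1e6:.1f} us")
ray.get(ref)  # ensure the call actually completed before exit
```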
Hmm, the 1% overhead is not too bad, but I think it's probably best to put it in a background thread still (to allow a shorter interval and make the code a bit more robust). No need to start new threads; you can put the stats update in the same loop as
Sounds good 👍.
What about the update done in
(force-pushed from eda3e67 to 8431a5d)
Ah, that one is probably okay since it's running on the driver. For latency-critical scenarios, the iter_batches loop is usually run on a different process. Also, the granularity in the scheduling loop is coarser - there can be lots of batches produced by one Data op task.
(force-pushed from 9b06ae1 to ea31013)
```python
for batch in formatted_batch_iter:
    yield batch
    # Update stats in here to avoid blocking main
    # iteration thread with task submission overhead.
    if stats_update_lock.acquire(blocking=False):
        if (
            time.time() - last_stats_update[0]
            >= STATS_ACTOR_UPDATE_INTERVAL_SECONDS
        ):
            update_stats_actor_iter_metrics(stats, metrics_tag)
            last_stats_update[0] = time.time()
        stats_update_lock.release()
```
I ended up adding this here since `format_batches` is also used in other places; please let me know if there's anything wrong with this.
it's kinda weird to couple stats updating code with batch formatting code. Especially the thread pool and lock would make the code hard to read. I'd prefer keeping it in the main thread given the 1% overhead, or using a dedicated component (e.g., StatsManager) with a dedicated thread to update the stats periodically.
I asked for this:
> Hmm the 1% overhead is not too bad, but I think it's probably best to put it in a background thread still (to allow shorter interval and make the code a bit more robust).
The robustness benefit is worth the code-readability cost to me, but happy to sync offline about it. I also think the readability concern is a bit subjective, and it seems fine to me at the moment.
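For illustration, a minimal sketch of the StatsManager direction proposed here (and described in the PR summary at the bottom): a process-wide singleton whose single daemon thread flushes buffered updates on an interval and stops itself after a period of inactivity. Apart from the `StatsManager` name and the 5s interval, every detail below is an assumed simplification, not Ray's actual implementation:

```python
import threading
import time

UPDATE_INTERVAL_S = 5    # per the PR description: report metrics every 5s
IDLE_SHUTDOWN_S = 30     # assumed value; the real inactivity threshold isn't quoted here


class StatsManager:
    """Hypothetical sketch of a singleton that batches stats-actor updates."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = {}  # tag -> latest metrics payload
        self._last_update_t = time.time()
        self._thread = None

    def update_iter_metrics(self, tag, payload):
        """Called from iteration code; cheap, never blocks on the actor."""
        with self._lock:
            self._pending[tag] = payload
            self._last_update_t = time.time()
            if self._thread is None or not self._thread.is_alive():
                # (Re)start the reporting thread after an idle shutdown.
                self._thread = threading.Thread(target=self._run, daemon=True)
                self._thread.start()

    def _flush(self, batch):
        # Placeholder: the real code would submit a remote call to _StatsActor.
        print(f"flushing {len(batch)} metric update(s)")

    def _run(self):
        while True:
            time.sleep(UPDATE_INTERVAL_S)
            with self._lock:
                batch, self._pending = self._pending, {}
                idle = time.time() - self._last_update_t > IDLE_SHUTDOWN_S
            if batch:
                self._flush(batch)
            if idle:
                # Stop the thread when inactive; a later update restarts it.
                # (A real implementation would guard the restart race here.)
                return


STATS_MANAGER = StatsManager()
```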
python/ray/data/_internal/stats.py (Outdated)
```python
def update_stats_actor_dataset(dataset_tag, state):
    global _stats_actor
    _check_cluster_stats_actor()
    _stats_actor.update_dataset.remote(dataset_tag, state)
```
This function is now merged into the general `update_stats_actor_metrics`.
```diff
-return (self._base_dataset._plan._dataset_name or "") + self._base_dataset._uuid
+return (
+    self._base_dataset._plan._dataset_name or "dataset"
+) + self._base_dataset._uuid
```
I forgot to update this part back when we started using the stats actor to generate dataset UUIDs.
```diff
@@ -162,6 +161,7 @@ def _async_iter_batches(
     batch_format=batch_format,
     collate_fn=collate_fn,
     num_threadpool_workers=prefetch_batches,
+    metrics_tag=metrics_tag,
```
(Not blocking this PR.) Does it make more sense to save this `metrics_tag` in `DatasetStats`, so we don't need to pass them around together?
> it's kinda weird to couple stats updating code with batch formatting code. Especially the thread pool and lock would make the code hard to read. I'd prefer keeping it in the main thread given the 1% overhead, or using a dedicated component (e.g., StatsManager) with a dedicated thread to update the stats periodically.
Nice!
python/ray/data/_internal/stats.py (Outdated)
```diff
-def clear_stats_actor_iter_metrics(tags: Dict[str, str]):
-    global _stats_actor
-    _check_cluster_stats_actor()
+def update_stats_actor_iter_metrics(
```
Is it possible for this to be called by multiple iterators at once?
In theory yes, but the common case is that iterators will be in different processes.
What can happen if they're in the same process?
This isn't thread-safe right now.
It's not a common practice, but theoretically possible. Can we pass in a tag to distinguish them? This tag will be useful for the dashboard as well. Basically, I think we need the dataset ID + execution ID + `StreamSplitDataIterator` index in this tag.
I added support for multiple iterators: we store the last call from each iterator, and the background thread sends all of them to the stats actor at once.

For multiple iterators of the same dataset in the same process, it should be uncommon for their iteration to overlap, right? If their iterations are non-overlapping, the separate iterations should be easy to see in the time-series view. And even if we do use the execution ID, is there a way for the user to get the ID themselves?

For the streaming split iterator, I added the index.
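For illustration, a sketch of the tag shape discussed in this thread; the helper name and key names are hypothetical:

```python
from typing import Dict, Optional


def make_iter_metrics_tag(
    dataset_id: str, split_index: Optional[int] = None
) -> Dict[str, str]:
    # The dataset id distinguishes datasets; for StreamSplitDataIterator,
    # the split index distinguishes concurrent iterators in one process.
    tag = {"dataset": dataset_id}
    if split_index is not None:
        tag["split_index"] = str(split_index)
    return tag
```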
(force-pushed from 2cd09a8 to dd4f333)
python/ray/data/_internal/stats.py (Outdated)

```python
def update_stats_actor_iter_metrics(
    self, stats: "DatasetStats", tags: Dict[str, str]
):
```
Should we also force-update when the iterator finishes?
The point of the force-update in the other codepath is to update `_StatsActor.datasets`, not to update the metrics. Since we clear the metrics right afterwards anyway, it's unlikely those last metrics will even be emitted, since Prometheus only samples every couple of seconds.
This is a separate issue that would require support for actually stopping the emission of a metric for a certain tag (right now we just set the metric to 0). A simple way to make sure that the last metrics get emitted is to wait 10s before clearing.
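A sketch of that "wait before clearing" idea; the constant and the `clear_fn` callback are assumptions, not the actual code:

```python
import threading

# Assumed grace period matching the suggestion above, so Prometheus
# (which scrapes every few seconds) can sample the final values.
METRICS_CLEAR_GRACE_PERIOD_S = 10.0


def clear_metrics_after_grace(clear_fn, tag):
    # clear_fn is a hypothetical callback that zeroes the metrics for `tag`.
    timer = threading.Timer(METRICS_CLEAR_GRACE_PERIOD_S, clear_fn, args=(tag,))
    timer.daemon = True
    timer.start()
    return timer
```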
```diff
@@ -449,6 +445,7 @@ def _debug_dump_topology(topology: Topology, log_to_stdout: bool = True) -> None
     for i, (op, state) in enumerate(topology.items()):
         logger.get_logger(log_to_stdout).info(
             f"{i}: {state.summary_str()}, "
-            f"Blocks Outputted: {state.num_completed_tasks}/{op.num_outputs_total()}"
+            f"Blocks Outputted: {state.num_completed_tasks}/{op.num_outputs_total()}\n"
+            f"{op.metrics.as_dict()}"
```
Nit: maybe print the metrics in a different loop; mixing them together may make the logs harder to read.
Also, now that we've moved the interval checking out of this part of the code, do we still want to log this on an interval?
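For illustration, the nit above could look like the following sketch, reusing the names from the diff; this is an assumed restructuring, not necessarily what was merged:

```python
# First loop: one compact summary line per operator.
for i, (op, state) in enumerate(topology.items()):
    logger.get_logger(log_to_stdout).info(
        f"{i}: {state.summary_str()}, "
        f"Blocks Outputted: {state.num_completed_tasks}/{op.num_outputs_total()}"
    )
# Second loop: the verbose per-op metrics, kept apart for readability.
for i, (op, state) in enumerate(topology.items()):
    logger.get_logger(log_to_stdout).info(f"{i} metrics: {op.metrics.as_dict()}")
```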
(force-pushed from b741a47 to dd03bb2)
Signed-off-by: Andrew Xue <andewzxue@gmail.com>
(force-pushed from a3db5ca to cb3bdaf)
Looks great after this iteration. I left some small comments.
…oject#40913)

Creates a `StatsManager` class to manage remote calls to `_StatsActor`. This singleton manager controls the time interval for reporting metrics to `_StatsActor`:
- Runs a single background thread that reports metrics to `_StatsActor` every 5s
- This thread is stopped after being inactive for too long, and will be restarted if there is a new update afterwards

Also logs op metrics for `_debug_dump_topology`.

Signed-off-by: Andrew Xue <andewzxue@gmail.com>
Why are these changes needed?

Creates a `StatsManager` class to manage remote calls to `_StatsActor`. This singleton manager controls the time interval for reporting metrics to `_StatsActor`:
- Runs a single background thread that reports metrics to `_StatsActor` every 5s
- This thread is stopped after being inactive for too long, and will be restarted if there is a new update afterwards

Also logs op metrics for `_debug_dump_topology`.

Related issue number
.Related issue number
Checks
git commit -s
) in this PR.scripts/format.sh
to lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/
under thecorresponding
.rst
file.