[Train] Update run status and actor status for train runs. #46395
Co-authored-by: Alan Guo <aguo@aguo.software> Signed-off-by: Yunxuan Xiao <xiaoyunxuan1998@gmail.com>
Signed-off-by: yunxuanx <yunxuanx@anyscale.com>
…state_on_finish_and_error
try:
    from ray.train._internal.state.schema import ActorStatusEnum, RunStatusEnum
except ImportError:
    logger.exception(
        "Train is not installed. Please run `pip install ray[train]` "
        "when setting up Ray on your cluster."
    )
Do we need this in this method, since it's already checked at the start of get_train_runs? Also, if this ever does happen, it'll just error out when trying to use these imports later in the code.
Removed
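The reviewer's point, checking for the optional dependency once at the entry point rather than in every helper, could look roughly like the sketch below. This is not Ray's code: `load_optional` and `get_runs` are hypothetical names, and the module names passed in are only illustrative.

```python
import importlib
import logging
from typing import Any, Optional

logger = logging.getLogger(__name__)


def load_optional(module_name: str) -> Optional[Any]:
    """Return the imported module, or None (after logging) if it is missing."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        logger.exception("%s is not installed.", module_name)
        return None


def get_runs(module_name: str) -> str:
    # Hypothetical entry point: the only place where the guard lives.
    module = load_optional(module_name)
    if module is None:
        return "unavailable"
    # Helpers called from here on can assume the import succeeded,
    # so they do not need their own try/except ImportError.
    return "ok"
```

With the guard in one place, a helper that is only reachable through the entry point can import the schema types unconditionally.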
# If the controller died but the run status is not updated,
# mark the train run as aborted
controller_actor_status = actor_status_table.get(
    train_run.controller_actor_id, None
)
if (
    controller_actor_status == ActorStatusEnum.DEAD
    and train_run.run_status == RunStatusEnum.STARTED
):
    train_run.run_status = RunStatusEnum.ABORTED
nit: Feels a bit messy to update the run status here. If we keep this here, I'd at least rename the method to _add_actor_and_update_run_status or add a docstring that documents this behavior.
This one does some post-processing to handle abnormally terminated cases. Let me update the function name here.
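The post-processing discussed here can be sketched in isolation as follows. The enums and the `Run` dataclass are simplified stand-ins for the PR's `RunStatusEnum`, `ActorStatusEnum`, and `TrainRunInfo`, and `mark_aborted_runs` is a hypothetical name, not the method from the diff.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List


class RunStatus(str, Enum):  # stand-in for RunStatusEnum
    STARTED = "STARTED"
    FINISHED = "FINISHED"
    ERRORED = "ERRORED"
    ABORTED = "ABORTED"


class ActorStatus(str, Enum):  # stand-in for ActorStatusEnum
    ALIVE = "ALIVE"
    DEAD = "DEAD"


@dataclass
class Run:  # stand-in for TrainRunInfo, reduced to the two fields used here
    controller_actor_id: str
    run_status: RunStatus


def mark_aborted_runs(
    runs: List[Run], actor_status_table: Dict[str, ActorStatus]
) -> None:
    """Mark runs as ABORTED if their controller died while still STARTED."""
    for run in runs:
        controller_status = actor_status_table.get(run.controller_actor_id)
        if (
            controller_status == ActorStatus.DEAD
            and run.run_status == RunStatus.STARTED
        ):
            run.run_status = RunStatus.ABORTED
```

Runs that already reached FINISHED or ERRORED are left untouched, as are runs whose controller is alive or missing from the table.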
# If the controller died but the run status is not updated,
# mark the train run as aborted
controller_actor_status = actor_status_table.get(
    train_run.controller_actor_id, None
)
if (
    controller_actor_status == ActorStatusEnum.DEAD
    and train_run.run_status == RunStatusEnum.STARTED
):
    train_run.run_status = RunStatusEnum.ABORTED
Should there be a status_detail for this one?
ERRORED = "ERRORED"
ABORTED = "ABORTED"
What is the difference between ERRORED and ABORTED?
We only mark it as ERRORED when the Trainer detects an error and actively reports it to the StateActor.
We mark it as ABORTED when the TrainRun fails abnormally (e.g. due to node failure) and we passively set it in TrainHead.
status_detail: str = Field(
    description="Detailed information about the current run status, "
    "such as error messages."
)
What's the purpose of this? We only ever have one "User Error" message right now.
This is for tracking the error reason for now, and can be extended to track the details of the current run status in the future (e.g. scaling up/down or recovering during elastic training).
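As a rough illustration of the active path, where the trainer itself reports ERRORED together with a reason, consider the sketch below. `run_and_report` is a hypothetical helper, and the `status_detail` format used here (exception type plus message) is an assumption; the PR itself currently reports a single "User Error" message.

```python
from typing import Callable, Dict


def run_and_report(train_fn: Callable[[], None]) -> Dict[str, str]:
    """Run train_fn and return the status fields a trainer might report."""
    try:
        train_fn()
    except Exception as exc:
        # Active path: the trainer catches the failure and reports ERRORED,
        # carrying the reason in status_detail (format assumed here).
        return {
            "run_status": "ERRORED",
            "status_detail": f"{type(exc).__name__}: {exc}",
        }
    # Normal completion: nothing extra to explain.
    return {"run_status": "FINISHED", "status_detail": ""}
```

If the process running this code dies before the `except` branch executes, nothing is reported, which is the case the passive ABORTED handling covers.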
        self.train_run_info_dict = dict(
            id=run_id,
            job_id=job_id,
            name=run_name,
            controller_actor_id=controller_actor_id,
            workers=worker_info_list,
            datasets=dataset_info_list,
            start_time_ms=start_time_ms,
            run_status=run_status,
            status_detail=status_detail,
        )
        train_run_info = TrainRunInfo(**self.train_run_info_dict)
        ray.get(self.state_actor.register_train_run.remote(train_run_info))

    def update_train_run_info(self, updates: Dict[str, Any]) -> None:
        """Update specific fields of a registered TrainRunInfo instance."""
        self.train_run_info_dict.update(updates)
        train_run_info = TrainRunInfo(**self.train_run_info_dict)
        ray.get(self.state_actor.register_train_run.remote(train_run_info))
This can cause problems in the future if we're not careful, e.g. if somewhere we call update_train_run_info with a different id. One way to limit this is to restrict the public API and use a private method instead.
Something like this:
        updates = dict(
            id=run_id,
            job_id=job_id,
            name=run_name,
            controller_actor_id=controller_actor_id,
            workers=worker_info_list,
            datasets=dataset_info_list,
            start_time_ms=start_time_ms,
            run_status=run_status,
            status_detail=status_detail,
        )
        # you can first assert that the info dict is empty here I guess
        self._update_train_run_info(updates)

    def end_train_run(self, run_status, status_detail, end_time_ms):
        updates = dict(
            run_status=run_status,
            status_detail=status_detail,
            end_time_ms=end_time_ms,
        )
        self._update_train_run_info(updates)

    def _update_train_run_info(self, updates: Dict[str, Any]) -> None:
        """Update specific fields of a registered TrainRunInfo instance."""
        self.train_run_info_dict.update(updates)
        train_run_info = TrainRunInfo(**self.train_run_info_dict)
        ray.get(self.state_actor.register_train_run.remote(train_run_info))
This is a good point! Let me update this
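The suggestion above can be tried without a Ray cluster using an in-memory stand-in. In this sketch `FakeStateActor` and `RunStateManager` are hypothetical names, the info is a plain dict instead of `TrainRunInfo`, and the remote call is replaced by a direct method call.

```python
from typing import Any, Dict


class FakeStateActor:
    """In-memory stand-in for the remote state actor (hypothetical)."""

    def __init__(self) -> None:
        self.runs: Dict[str, Dict[str, Any]] = {}

    def register_train_run(self, info: Dict[str, Any]) -> None:
        # Store a copy so later mutations of the caller's dict are not shared.
        self.runs[info["id"]] = dict(info)


class RunStateManager:
    """Exposes narrow public methods; the generic update stays private."""

    def __init__(self, state_actor: FakeStateActor) -> None:
        self._state_actor = state_actor
        self._info: Dict[str, Any] = {}

    def register_train_run(self, run_id: str, name: str, start_time_ms: int) -> None:
        assert not self._info, "a run is already registered"
        self._update(
            dict(
                id=run_id,
                name=name,
                start_time_ms=start_time_ms,
                run_status="STARTED",
                status_detail="",
            )
        )

    def end_train_run(self, run_status: str, status_detail: str, end_time_ms: int) -> None:
        # The signature has no `id` parameter, so callers cannot
        # accidentally re-register the run under a different id.
        self._update(
            dict(
                run_status=run_status,
                status_detail=status_detail,
                end_time_ms=end_time_ms,
            )
        )

    def _update(self, updates: Dict[str, Any]) -> None:
        self._info.update(updates)
        self._state_actor.register_train_run(self._info)
```

Because only `register_train_run` sets `id`, every later update lands on the same entry in the state actor.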
        )

        ray.get(self.state_actor.register_train_run.remote(train_run_info))
        # Clear the cached info to avoid registering the same run twice
        self.train_run_info_dict[run_id].clear()
Will this populate the dictionary on the first call? The first line in _update_train_run_info will check if the run_id is present.
Yes, it will populate the dict. But yeah, I think it's a bit implicit; let me update it to self.train_run_info_dict[run_id] = {}
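The "implicit" behavior in question can be reproduced in plain Python, assuming `train_run_info_dict` is a `collections.defaultdict(dict)`; the reply implies this, but the diff shown here does not confirm it. The two helper functions are hypothetical and exist only to contrast the two spellings.

```python
from collections import defaultdict
from typing import Any, Dict


def implicit_reset(cache: Dict[str, Dict[str, Any]], run_id: str) -> None:
    # On a defaultdict(dict), looking up a missing key first creates an
    # empty dict for it; .clear() then runs on that new entry.
    cache[run_id].clear()


def explicit_reset(cache: Dict[str, Dict[str, Any]], run_id: str) -> None:
    # Works on any dict and states the intent directly.
    cache[run_id] = {}


implicit_cache: Dict[str, Dict[str, Any]] = defaultdict(dict)
implicit_reset(implicit_cache, "run-1")

explicit_cache: Dict[str, Dict[str, Any]] = {}
explicit_reset(explicit_cache, "run-1")
```

Both caches end up with an empty entry for the run, but the first form raises `KeyError` if the cache is ever changed to a plain `dict`, which is one reason to prefer the explicit assignment.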
Why are these changes needed?
Added two entries for TrainRunInfo:
STARTED, FINISHED, ERRORED
ALIVE, DEAD
ALIVE, DEAD
Update Train run status when training finished or errored.
Update controller_actor_status and worker status on demand while calling TrainHead endpoint.
Related issue number
Checks
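I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
I've run scripts/format.sh to lint the changes in this PR.
I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.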