[tune] remove get_alive_node_ips.
Will add another test that kills a node and checks that the failure is caught.
xwjiang2010 committed Jan 12, 2022
1 parent e9787a5 commit b11bf18
Showing 3 changed files with 48 additions and 97 deletions.
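The commit message defers node-failure coverage to a follow-up test that kills a node. Purely as an illustration of what such a test could look like (the callback, trainable, and cluster fixture below are assumptions, not code from this commit or from Ray's test suite), a minimal sketch:

import time

import ray
from ray import tune
from ray.cluster_utils import Cluster


def train(config):
    # Long-running dummy trainable so the node can be killed mid-trial.
    for i in range(60):
        time.sleep(1)
        tune.report(step=i)


class KillNodeCallback(tune.Callback):
    """Illustrative callback: removes a worker node after the first result."""

    def __init__(self, cluster, node):
        self._cluster = cluster
        self._node = node
        self._killed = False

    def on_trial_result(self, iteration, trials, trial, result, **info):
        if not self._killed:
            # Simulate losing the node that hosts the trial, then add a
            # replacement so the retried trial has somewhere to run.
            self._cluster.remove_node(self._node, allow_graceful=False)
            self._cluster.add_node(num_cpus=2)
            self._killed = True


if __name__ == "__main__":
    cluster = Cluster(
        initialize_head=True,
        connect=True,
        head_node_args={"num_cpus": 0})  # keep trials off the head node
    worker = cluster.add_node(num_cpus=2)

    # With the stale-IP check gone, node loss is expected to surface through
    # Tune's normal trial failure handling (max_failures / retries).
    tune.run(
        train,
        num_samples=2,
        max_failures=1,
        callbacks=[KillNodeCallback(cluster, worker)])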
32 changes: 0 additions & 32 deletions python/ray/tune/ray_trial_executor.py
@@ -632,38 +632,6 @@ def get_running_trials(self) -> List[Trial]:
"""Returns the running trials."""
return list(self._running.values())

def get_alive_node_ips(self):
now = time.time()
if now - self._last_ip_refresh < self._refresh_period:
return self._last_ip_addresses
logger.debug("Checking ips from Ray state.")
self._last_ip_refresh = now
nodes = ray.state.nodes()
ip_addresses = set()
for node in nodes:
if node["alive"]:
ip_addresses.add(node["NodeManagerAddress"])
self._last_ip_addresses = ip_addresses
return ip_addresses

def get_current_trial_ips(self):
return {t.node_ip for t in self.get_running_trials()}

def get_next_failed_trial(self) -> Optional[Trial]:
"""Gets the first trial found to be running on a node presumed dead.
Returns:
A Trial object that is ready for failure processing. None if
no failure detected.
"""
if ray.worker._mode() != ray.worker.LOCAL_MODE:
live_cluster_ips = self.get_alive_node_ips()
if live_cluster_ips - self.get_current_trial_ips():
for trial in self.get_running_trials():
if trial.node_ip and trial.node_ip not in live_cluster_ips:
return trial
return None

def get_next_available_trial(
self, timeout: Optional[float] = None) -> Optional[Trial]:
if not self._running:
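For reference, the job of the removed helper, collecting the addresses of alive nodes, can be reproduced with the public ray.nodes() API. A minimal sketch, assuming a running Ray cluster to connect to:

import ray

ray.init(address="auto")  # connect to the existing cluster

# ray.nodes() returns one dict per node known to the GCS; each entry carries
# an "Alive" flag and the node's IP under "NodeManagerAddress".
alive_ips = {
    node["NodeManagerAddress"]
    for node in ray.nodes()
    if node["Alive"]
}
print(alive_ips)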
7 changes: 0 additions & 7 deletions python/ray/tune/tests/test_multinode_sync.py
@@ -59,13 +59,6 @@ def testAutoscalingNewNode(self):
        When new nodes are added to the cluster while a Tune job is running,
        trials get scheduled on them immediately. However, Tune only
        updates its internal list of available nodes every 10 seconds.
-        The new trial is thus detected as living on a stale node and
-        errored out.
-        We fixed this behavior by disabling this check. This test
-        previously failed, and we add it now to make sure that autoscaling
-        works as expected.
        """
        self.cluster.update_config({
            "provider": {
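The race the deleted docstring lines describe can be shown with a toy snippet (purely illustrative, not Tune code): the cached set of alive IPs lags behind the real cluster by up to the refresh period, so a trial scheduled on a freshly added node looks like it is running on a dead node.

# Cached view of the cluster, refreshed only every 10 seconds.
cached_alive_ips = {"10.0.0.1"}

# The autoscaler just added 10.0.0.2 and a trial was scheduled there already.
running_trial_ips = {"10.0.0.1", "10.0.0.2"}

# The removed staleness check compared trial IPs against the cached set:
stale = [ip for ip in running_trial_ips if ip not in cached_alive_ips]
assert stale == ["10.0.0.2"]  # the healthy new trial is wrongly flagged stale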
106 changes: 48 additions & 58 deletions python/ray/tune/trial_runner.py
@@ -817,65 +817,55 @@ def _get_next_trial(self):
        return trial

    def _process_events(self, timeout: Optional[float] = None):
-        with warn_if_slow("get_next_failed_trial"):
-            failed_trial = self.trial_executor.get_next_failed_trial()
-        if failed_trial:
-            error_msg = (
-                "{} (IP: {}) detected as stale. This is likely because the "
-                "node was lost").format(failed_trial, failed_trial.node_ip)
-            logger.info(error_msg)
-            with warn_if_slow("process_failed_trial"):
-                self._process_trial_failure(failed_trial, error_msg=error_msg)
-        else:
-            # TODO(ujvl): Consider combining get_next_available_trial and
-            # fetch_result functionality so that we don't timeout on fetch.
-            trial = self.trial_executor.get_next_available_trial(
-                timeout=timeout)  # blocking
-            if not trial:
-                return
-            if trial.is_restoring:
-                with warn_if_slow("process_trial_restore"):
-                    self._process_trial_restore(trial)
-                with warn_if_slow("callbacks.on_trial_restore"):
-                    self._callbacks.on_trial_restore(
-                        iteration=self._iteration,
-                        trials=self._trials,
-                        trial=trial)
-            elif trial.is_saving:
-                with warn_if_slow("process_trial_save") as _profile:
-                    self._process_trial_save(trial)
-                with warn_if_slow("callbacks.on_trial_save"):
-                    self._callbacks.on_trial_save(
-                        iteration=self._iteration,
-                        trials=self._trials,
-                        trial=trial)
-                if _profile.too_slow and trial.sync_on_checkpoint:
-                    # TODO(ujvl): Suggest using cloud checkpointing once
-                    # API has converged.
-
-                    msg = (
-                        "Consider turning off forced head-worker trial "
-                        "checkpoint syncs by setting sync_on_checkpoint=False"
-                        ". Note that this may result in faulty trial "
-                        "restoration if a failure occurs while the checkpoint "
-                        "is being synced from the worker to the head node.")
-
-                    if trial.location.hostname and (trial.location.hostname !=
-                                                    get_node_ip_address()):
-                        if log_once("tune_head_worker_checkpoint"):
-                            logger.warning(msg)
+        # TODO(ujvl): Consider combining get_next_available_trial and
+        # fetch_result functionality so that we don't timeout on fetch.
+        trial = self.trial_executor.get_next_available_trial(
+            timeout=timeout)  # blocking
+        if not trial:
+            return
+        if trial.is_restoring:
+            with warn_if_slow("process_trial_restore"):
+                self._process_trial_restore(trial)
+            with warn_if_slow("callbacks.on_trial_restore"):
+                self._callbacks.on_trial_restore(
+                    iteration=self._iteration,
+                    trials=self._trials,
+                    trial=trial)
+        elif trial.is_saving:
+            with warn_if_slow("process_trial_save") as _profile:
+                self._process_trial_save(trial)
+            with warn_if_slow("callbacks.on_trial_save"):
+                self._callbacks.on_trial_save(
+                    iteration=self._iteration,
+                    trials=self._trials,
+                    trial=trial)
+            if _profile.too_slow and trial.sync_on_checkpoint:
+                # TODO(ujvl): Suggest using cloud checkpointing once
+                # API has converged.
+
+                msg = (
+                    "Consider turning off forced head-worker trial "
+                    "checkpoint syncs by setting sync_on_checkpoint=False"
+                    ". Note that this may result in faulty trial "
+                    "restoration if a failure occurs while the checkpoint "
+                    "is being synced from the worker to the head node.")
+
+                if trial.location.hostname and (trial.location.hostname !=
+                                                get_node_ip_address()):
+                    if log_once("tune_head_worker_checkpoint"):
+                        logger.warning(msg)

-            else:
-                with warn_if_slow("process_trial"):
-                    self._process_trial(trial)
-
-            # `self._queued_trial_decisions` now contains a final decision
-            # based on all results
-            if trial not in self._cached_trial_decisions:
-                final_decision = self._queued_trial_decisions.pop(
-                    trial.trial_id, None)
-                if final_decision:
-                    self._execute_action(trial, final_decision)
+        else:
+            with warn_if_slow("process_trial"):
+                self._process_trial(trial)
+
+        # `self._queued_trial_decisions` now contains a final decision
+        # based on all results
+        if trial not in self._cached_trial_decisions:
+            final_decision = self._queued_trial_decisions.pop(
+                trial.trial_id, None)
+            if final_decision:
+                self._execute_action(trial, final_decision)

    def _process_trial(self, trial):
        """Processes a trial result.
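With the proactive stale-node check removed, the expectation per the commit message is that losing a node now surfaces through the normal result path, for example as a RayActorError when results are fetched from the trial's actor, which then goes through the usual trial failure handling. A standalone sketch of that failure mode (illustrative only, not Tune internals):

import ray
from ray.exceptions import RayActorError

ray.init()


@ray.remote
class Worker:
    def step(self):
        return {"score": 1.0}


actor = Worker.remote()
ray.kill(actor)  # stand-in for losing the node that hosted the trial

try:
    ray.get(actor.step.remote())
except RayActorError:
    # In Tune, an error like this is routed to trial failure handling
    # (and retried up to max_failures) instead of the removed IP check.
    print("actor lost")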
