
Elastic failure handling #23

Merged (69 commits, Dec 14, 2020)

Conversation

krfricke (Collaborator) commented Dec 7, 2020

Introduces elastic failure handling. Alive actors are not recreated on error, but stick around and don't have to load data again. We can also choose to not restart failed actors, continuing training with fewer actors.

Depends on #21.
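
For illustration, a minimal usage sketch of what elastic training enables from the caller's side. The elastic-related keyword arguments (`elastic_training`, `max_failed_actors`, `max_actor_restarts`) are assumed names, not necessarily the exact API introduced by this PR:

    # Rough sketch only; the elastic-related keyword arguments are assumed
    # names, not necessarily the exact API introduced by this PR.
    from sklearn.datasets import load_breast_cancer
    from xgboost_ray import RayDMatrix, train

    data, labels = load_breast_cancer(return_X_y=True)
    train_set = RayDMatrix(data, labels)

    bst = train(
        {"objective": "binary:logistic", "eval_metric": ["logloss", "error"]},
        train_set,
        evals=[(train_set, "train")],
        evals_result={},
        num_actors=4,            # start training with 4 remote actors
        max_actor_restarts=2,    # assumed: number of recovery attempts
        elastic_training=True,   # assumed: keep training with surviving actors
        max_failed_actors=1,     # assumed: tolerate up to 1 permanently lost actor
    )
    bst.save_model("model.xgb")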

krfricke (Collaborator, Author)

This should be ready for review @amogkam

amogkam (Contributor) commented Dec 10, 2020

Thanks, I'll review it today.

amogkam (Contributor) left a comment

This is a really cool feature and looks good to me overall! The main thing is that there are a lot of different concurrency components (multiprocessing, threading, asyncio.Event, rabit_tracker). I know you have some of this in the Google doc already but it would really help to add some comments here about what each component's responsibilities are and how they interact with each other.

krfricke (Collaborator, Author)

Thanks a bunch for the thorough review. I addressed all your comments and tried to answer your questions!

richardliaw (Collaborator) commented Dec 11, 2020 via email

richardliaw (Collaborator) left a comment

This looks pretty good overall! A couple comments

  1. The architecture/design could be better commented in the code. I left some comments at places that required a bit more digging.
  2. It'd be great to document this failure model in train().
  3. We will want to do a bit of refactoring to consolidate failure handling responsibilities in core functions, but I don't think that's a blocker for this PR.

Please ping when you've addressed the remaining comments!

Comment on lines +469 to +470
    for i in range(len(actors)):
        actor = actors[i]
richardliaw (Collaborator):

for actor in actors?

krfricke (Collaborator, Author) replied Dec 13, 2020:

Since we're overwriting the array item here:

        actors[i] = None

we need the index anyway, so I'll probably keep it as is.
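
For reference, a tiny standalone sketch (not the PR's actual code, placeholder objects only) of why the index is kept: the slot of a failed actor is overwritten in place, so even an enumerate-based loop still needs i:

    # Standalone sketch with hypothetical placeholder objects.
    actors = ["actor_0", "actor_1", "actor_2"]

    # Equivalent to `for i in range(len(actors)): actor = actors[i]`,
    # but slightly more idiomatic while still exposing the index:
    for i, actor in enumerate(actors):
        if actor == "actor_1":   # stand-in for "this actor just failed"
            actors[i] = None     # mark the slot as dead, keep the rank

    assert actors == ["actor_0", None, "actor_2"]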

    # Maybe we got a new queue actor
    wait_queue = [
        actor.set_queue.remote(_queue) for actor in _actors
        if actor is not None
richardliaw (Collaborator):

can there actually be actor is None here?

krfricke (Collaborator, Author):

Yes! After an actor fails, its entry in _actors is set to None in the train() function. _train() is then called again, so entries can be None.

richardliaw (Collaborator):

Hmm, don't all the None entries get filled in lines 586 to 603?

krfricke (Collaborator, Author):

Only those that are in _failed_actor_ranks, and this set is empty if we continue with fewer actors. See the previous comment in the thread below.
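
A stripped-down sketch of that situation (no Ray involved, hypothetical names): in elastic mode the dead slot stays None and is simply skipped when the queue handle is re-broadcast:

    class FakeActor:
        """Stand-in for a RayXGBoostActor handle."""
        def __init__(self, rank):
            self.rank = rank
            self.queue = None

        def set_queue(self, queue):
            self.queue = queue

    _actors = [FakeActor(0), None, FakeActor(2)]  # rank 1 died in a previous round
    _failed_actor_ranks = set()                   # cleared: continue with fewer actors

    # Maybe we got a new queue actor: only live actors receive it, None is skipped.
    _queue = object()
    for actor in _actors:
        if actor is not None:
            actor.set_queue(_queue)

    assert [a.rank for a in _actors if a is not None] == [0, 2]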

richardliaw (Collaborator)

Actually, one question I have is: when you have 4 actors on 4 nodes and one of the nodes dies, where do you check the number of cluster resources to resume with only 3 workers?

krfricke (Collaborator, Author) commented Dec 13, 2020

Since we re-use the existing actors, we currently don't check the cluster resources at all - we just invoke the local train() function on the remaining actors.

Re-scheduling actors once resources are available again will be part of a separate PR. I don't want to make this bigger than it already is.

Thanks a lot for the review by the way! I addressed the comments and just added a bunch of in-code documentation for better understanding of the code.

Comment on lines +586 to +591
    for i in list(_failed_actor_ranks):
        if _actors[i] is not None:
            raise RuntimeError(
                f"Trying to create actor with rank {i}, but it already "
                f"exists.")
        actor = _create_actor(
richardliaw (Collaborator):

Question - from what I understand, failed actors will have their rank added to _failed_actor_ranks. When _train is called, don't these actors get recreated?

I'm just specifically looking for the situation where a node dies and the new actor creation step is skipped.

krfricke (Collaborator, Author):

That's in line 848:

                 start_actor_ranks = set() 

Here the set is cleared (I guess we could just .clear() it instead). With it being empty, no actors are restarted.
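
As a side note (general Python behavior, illustrative only, not the PR's code): rebinding the name to a fresh set and calling .clear() differ only when the set object is shared with other code, e.g. a reference held elsewhere:

    shared = {0, 3, 7}   # stand-in for start_actor_ranks
    alias = shared       # another reference to the same set object

    shared = set()       # rebinds the name only; `alias` still sees {0, 3, 7}
    assert alias == {0, 3, 7}

    alias.clear()        # mutates the object; every reference now sees an empty set
    assert alias == set()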

richardliaw (Collaborator):

Ah, got it.

richardliaw (Collaborator)

Unfortunately, my cluster yaml (and snapshot) no longer works. One last comment/question about actor recovery -- I keep re-reading the recovery code and it seems like the actors are always restarted. Can you help explain the purpose/code flow of _failed_actor_ranks?

krfricke (Collaborator, Author)

I'll run a couple of tests on a cluster later and post the results here.

krfricke (Collaborator, Author) commented Dec 14, 2020

Re: _failed_actor_ranks: This set collects the ranks of failed actors, which are restarted on a call to _train(). Note that we have the same information in the _actors list, as we could just filter for None values. The difference, however, is that we want to be able to deliberately alter _failed_actor_ranks in order to restart only specific ranks.
For instance, for elastic training we currently clear the set, so no actors are restarted. In future failure handling modes we could start only those actors that have resources available. With this object we can thus specify exactly which ranks we want to start and which we don't.

In the train() function the set is called start_actor_ranks, as it semantically describes which ranks will be started by _train(). In _train() itself it is called _failed_actor_ranks because, after starting actors, it collects the ranks of failed actors. Would it be clearer if we renamed _failed_actor_ranks to _start_actor_ranks in _train() as well?

I can also add more comments for this, e.g. annotate the internal variables passed to _train().
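
To make that flow concrete, a condensed, self-contained sketch (hypothetical helper names; the real train()/_train() signatures differ):

    class ActorDied(RuntimeError):
        """Stand-in for a Ray actor failure."""

    _calls = {"n": 0}

    def _train(actors, _failed_actor_ranks):
        # Start an actor for every rank in the set, then clear it; from now on
        # the set only collects the ranks of actors that die in this round.
        for rank in list(_failed_actor_ranks):
            actors[rank] = f"actor-{rank}"
        _failed_actor_ranks.clear()
        _calls["n"] += 1
        if _calls["n"] == 1:            # simulate a single failure in round 1
            actors[1] = None
            _failed_actor_ranks.add(1)
            raise ActorDied("rank 1 died")
        return "booster"

    def train_with_retries(num_actors=4, max_restarts=2, elastic=True):
        actors = [None] * num_actors
        start_actor_ranks = set(range(num_actors))  # first round: start everyone
        for _ in range(max_restarts + 1):
            try:
                return _train(actors, start_actor_ranks)
            except ActorDied:
                if elastic:
                    start_actor_ranks = set()   # restart nothing, keep survivors
                # non-elastic: failed ranks stay in the set and get recreated
        raise RuntimeError("Training failed after retries")

    print(train_with_retries())  # -> "booster", with actors[1] left as None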

krfricke (Collaborator, Author)

Works fine in my tests. Here is the resulting log for killing 2 nodes:

(base) root@ip-172-31-18-125:/release_tests# python benchmark_cpu_gpu.py 10 100 200
2020-12-14 10:42:37,239 INFO worker.py:651 -- Connecting to existing Ray cluster at address: 172.31.18.125:6379
2020-12-14 10:42:37,992 INFO main.py:620 -- [RayXGBoost] Created 10 new actors (10 total actors).
2020-12-14 10:42:40,938 INFO main.py:638 -- [RayXGBoost] Starting XGBoost training.
(pid=4107, ip=172.31.20.197) [10:42:40] task [xgboost.ray]:140004211236752 got new rank 2
(pid=4038, ip=172.31.25.131) [10:42:40] task [xgboost.ray]:140546107764624 got new rank 5
(pid=4005, ip=172.31.24.220) [10:42:40] task [xgboost.ray]:140103861936592 got new rank 4
(pid=4006, ip=172.31.29.248) [10:42:40] task [xgboost.ray]:140411510034704 got new rank 7
(pid=4037, ip=172.31.17.26) [10:42:40] task [xgboost.ray]:139927338557264 got new rank 0
(pid=3993, ip=172.31.20.30) [10:42:40] task [xgboost.ray]:140255472527504 got new rank 3
(pid=4074, ip=172.31.26.109) [10:42:40] task [xgboost.ray]:140038723952528 got new rank 6
(pid=3939, ip=172.31.30.64) [10:42:40] task [xgboost.ray]:140290278254992 got new rank 9
(pid=4107, ip=172.31.30.251) [10:42:40] task [xgboost.ray]:140133743529872 got new rank 8
(pid=4011, ip=172.31.19.132) [10:42:40] task [xgboost.ray]:140660999946128 got new rank 1
E1214 10:42:53.648838  3040  3055 task_manager.cc:323] Task failed: IOError: 2: user code caused exit: Type=ACTOR_TASK, Language=PYTHON, Resources: {}
, function_descriptor={type=PythonFunctionDescriptor, module_name=xgboost_ray.main, class_name=RayXGBoostActor, function_name=train, function_hash=},
task_id=019554c512c2b9b51bd49a5609000000, task_name=RayXGBoostActor.train(), job_id=09000000, num_args=10, num_returns=2, actor_task_spec={actor_id=1b
d49a5609000000, actor_caller_id=ffffffffffffffffffffffff09000000, actor_counter=4}
E1214 10:42:53.650756  3040  3055 task_manager.cc:323] Task failed: IOError: cancelling all pending tasks of dead actor: Type=ACTOR_TASK, Language=PYT
HON, Resources: {}, function_descriptor={type=PythonFunctionDescriptor, module_name=xgboost_ray.main, class_name=RayXGBoostActor, function_name=pid, f
unction_hash=}, task_id=b9fed017052010151bd49a5609000000, task_name=RayXGBoostActor.pid(), job_id=09000000, num_args=0, num_returns=2, actor_task_spec
={actor_id=1bd49a5609000000, actor_caller_id=ffffffffffffffffffffffff09000000, actor_counter=5}
2020-12-14 10:42:53,729 INFO main.py:489 -- Actor status: 9 alive, 1 dead (10 total)
2020-12-14 10:42:53,729 WARNING main.py:851 -- A Ray actor died during training. Trying to restart and continue training from last checkpoint (restart
 1 of 2). This will use 9 existing actors and start 0 new actors.Sleeping for 10 seconds for cleanup.
2020-12-14 10:43:03,750 INFO main.py:620 -- [RayXGBoost] Created 0 new actors (9 total actors).
2020-12-14 10:43:03,758 INFO main.py:638 -- [RayXGBoost] Starting XGBoost training.
(pid=4107, ip=172.31.20.197) [10:43:03] task [xgboost.ray]:140004211236752 got new rank 2
(pid=4038, ip=172.31.25.131) [10:43:03] task [xgboost.ray]:140546107764624 got new rank 5
(pid=4011, ip=172.31.19.132) [10:43:03] task [xgboost.ray]:140660999946128 got new rank 1
(pid=4037, ip=172.31.17.26) [10:43:03] task [xgboost.ray]:139927338557264 got new rank 0
(pid=4006, ip=172.31.29.248) [10:43:03] task [xgboost.ray]:140411510034704 got new rank 6
(pid=3993, ip=172.31.20.30) [10:43:03] task [xgboost.ray]:140255472527504 got new rank 3
(pid=4005, ip=172.31.24.220) [10:43:03] task [xgboost.ray]:140103861936592 got new rank 4
(pid=4107, ip=172.31.30.251) [10:43:03] task [xgboost.ray]:140133743529872 got new rank 7
(pid=3939, ip=172.31.30.64) [10:43:03] task [xgboost.ray]:140290278254992 got new rank 8
E1214 10:43:15.269685  3040  3055 task_manager.cc:323] Task failed: IOError: 2: user code caused exit: Type=ACTOR_TASK, Language=PYTHON, Resources: {}
, function_descriptor={type=PythonFunctionDescriptor, module_name=xgboost_ray.main, class_name=RayXGBoostActor, function_name=train, function_hash=},
task_id=aff64b4aedc094a1f90e593909000000, task_name=RayXGBoostActor.train(), job_id=09000000, num_args=12, num_returns=2, actor_task_spec={actor_id=f9
0e593909000000, actor_caller_id=ffffffffffffffffffffffff09000000, actor_counter=10}
E1214 10:43:15.271399  3040  3055 task_manager.cc:323] Task failed: IOError: cancelling all pending tasks of dead actor: Type=ACTOR_TASK, Language=PYT
HON, Resources: {}, function_descriptor={type=PythonFunctionDescriptor, module_name=xgboost_ray.main, class_name=RayXGBoostActor, function_name=pid, f
unction_hash=}, task_id=b55dd8e99d574a31f90e593909000000, task_name=RayXGBoostActor.pid(), job_id=09000000, num_args=0, num_returns=2, actor_task_spec
={actor_id=f90e593909000000, actor_caller_id=ffffffffffffffffffffffff09000000, actor_counter=11}
2020-12-14 10:43:15,367 INFO main.py:489 -- Actor status: 8 alive, 2 dead (10 total)
2020-12-14 10:43:15,367 WARNING main.py:851 -- A Ray actor died during training. Trying to restart and continue training from last checkpoint (restart
 2 of 2). This will use 8 existing actors and start 0 new actors.Sleeping for 10 seconds for cleanup.
2020-12-14 10:43:25,388 INFO main.py:620 -- [RayXGBoost] Created 0 new actors (8 total actors).
2020-12-14 10:43:25,394 INFO main.py:638 -- [RayXGBoost] Starting XGBoost training.
(pid=4006, ip=172.31.29.248) [10:43:25] task [xgboost.ray]:140411510034704 got new rank 5
(pid=3993, ip=172.31.20.30) [10:43:25] task [xgboost.ray]:140255472527504 got new rank 2
(pid=4005, ip=172.31.24.220) [10:43:25] task [xgboost.ray]:140103861936592 got new rank 3
(pid=3939, ip=172.31.30.64) [10:43:25] task [xgboost.ray]:140290278254992 got new rank 7
(pid=4107, ip=172.31.30.251) [10:43:25] task [xgboost.ray]:140133743529872 got new rank 6
(pid=4037, ip=172.31.17.26) [10:43:25] task [xgboost.ray]:139927338557264 got new rank 0
(pid=4011, ip=172.31.19.132) [10:43:25] task [xgboost.ray]:140660999946128 got new rank 1
(pid=4038, ip=172.31.25.131) [10:43:25] task [xgboost.ray]:140546107764624 got new rank 4
(pid=4074, ip=172.31.26.109) [10:42:40] task [xgboost.ray]:140038723952528 got new rank 6
(pid=4074, ip=172.31.26.109) 2020-12-14 10:42:53,643    ERROR worker.py:384 -- SystemExit was raised from the worker
(pid=4074, ip=172.31.26.109) Traceback (most recent call last):
(pid=4074, ip=172.31.26.109)   File "python/ray/_raylet.pyx", line 551, in ray._raylet.task_execution_handler
(pid=4074, ip=172.31.26.109)   File "python/ray/_raylet.pyx", line 438, in ray._raylet.execute_task
(pid=4074, ip=172.31.26.109)   File "python/ray/_raylet.pyx", line 477, in ray._raylet.execute_task
(pid=4074, ip=172.31.26.109)   File "python/ray/_raylet.pyx", line 481, in ray._raylet.execute_task
(pid=4074, ip=172.31.26.109)   File "python/ray/_raylet.pyx", line 482, in ray._raylet.execute_task
(pid=4074, ip=172.31.26.109)   File "python/ray/_raylet.pyx", line 436, in ray._raylet.execute_task.function_executor
(pid=4074, ip=172.31.26.109)   File "/root/anaconda3/lib/python3.7/site-packages/ray/function_manager.py", line 553, in actor_method_executor
(pid=4074, ip=172.31.26.109)     return method(actor, *args, **kwargs)
(pid=4074, ip=172.31.26.109)   File "/root/anaconda3/lib/python3.7/site-packages/xgboost_ray/main.py", line 390, in train
(pid=4074, ip=172.31.26.109)     time.sleep(0.1)
(pid=4074, ip=172.31.26.109)   File "/root/anaconda3/lib/python3.7/site-packages/ray/worker.py", line 381, in sigterm_handler
(pid=4074, ip=172.31.26.109)     sys.exit(1)
(pid=4074, ip=172.31.26.109) SystemExit: 1
(pid=4107, ip=172.31.20.197) [10:42:40] task [xgboost.ray]:140004211236752 got new rank 2
(pid=4107, ip=172.31.20.197) [10:43:03] task [xgboost.ray]:140004211236752 got new rank 2
(pid=4107, ip=172.31.20.197) 2020-12-14 10:43:15,245    ERROR worker.py:384 -- SystemExit was raised from the worker
(pid=4107, ip=172.31.20.197) Traceback (most recent call last):
(pid=4107, ip=172.31.20.197)   File "python/ray/_raylet.pyx", line 551, in ray._raylet.task_execution_handler
(pid=4107, ip=172.31.20.197)   File "python/ray/_raylet.pyx", line 438, in ray._raylet.execute_task
(pid=4107, ip=172.31.20.197)   File "python/ray/_raylet.pyx", line 477, in ray._raylet.execute_task
(pid=4107, ip=172.31.20.197)   File "python/ray/_raylet.pyx", line 481, in ray._raylet.execute_task
(pid=4107, ip=172.31.20.197)   File "python/ray/_raylet.pyx", line 482, in ray._raylet.execute_task
(pid=4107, ip=172.31.20.197)   File "python/ray/_raylet.pyx", line 436, in ray._raylet.execute_task.function_executor
(pid=4107, ip=172.31.20.197)   File "/root/anaconda3/lib/python3.7/site-packages/ray/function_manager.py", line 553, in actor_method_executor
(pid=4107, ip=172.31.20.197)     return method(actor, *args, **kwargs)
(pid=4107, ip=172.31.20.197)   File "/root/anaconda3/lib/python3.7/site-packages/xgboost_ray/main.py", line 390, in train
(pid=4107, ip=172.31.20.197)     time.sleep(0.1)
(pid=4107, ip=172.31.20.197)   File "/root/anaconda3/lib/python3.7/site-packages/ray/worker.py", line 381, in sigterm_handler
(pid=4107, ip=172.31.20.197)     sys.exit(1)
(pid=4107, ip=172.31.20.197) SystemExit: 1
2020-12-14 10:45:12,824 INFO main.py:731 -- [RayXGBoost] Finished XGBoost training on training data with total N=160,000,000.
TRAIN TIME TAKEN: 155.57 seconds
Final training error: 0.4951
TOTAL TIME TAKEN: 155.58 seconds (0.03 for init)
(base) root@ip-172-31-18-125:/release_tests#

Note that the last two errors are sent to the driver some time after the nodes died.

I'll push some cosmetic changes, but other than that I think we should be ready to merge.

krfricke mentioned this pull request on Dec 14, 2020
richardliaw (Collaborator)

nice!

richardliaw merged commit 403262b into ray-project:master on Dec 14, 2020
krfricke deleted the failure-handling-polling branch on December 14, 2020 20:52