
Conversation

@ntlm1686 (Contributor) commented Aug 2, 2022

Two features for Elastic Distributed Training are added to jobs launched by TorchX on a Ray cluster in this PR:

  1. Fault Tolerance - a node failure raises a RayActorError, which can be caught. Placement groups have built-in fault tolerance and can recover from node failures automatically.
  2. Elasticity - the execution of a placement group is a pending task that the GCS schedules when resources become available.

The logic of the new ray_driver.py is shown in the diagram below:

Drawing 2022-08-02 15 51 39 excalidraw
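Roughly, the loop in ray_driver.py behaves like the sketch below. The result marker classes (ActorScheduled, TaskCompleted) and the reschedule callback are illustrative stand-ins, not the exact names in the implementation:

from dataclasses import dataclass
from typing import Callable, List

import ray
from ray.exceptions import RayActorError


@dataclass
class ActorScheduled:  # stand-in: a command actor got its placement group slot
    actor_id: str


@dataclass
class TaskCompleted:  # stand-in: a command actor finished the user script
    actor_id: str


def drive(active_tasks: List[ray.ObjectRef], reschedule: Callable[[RayActorError], None]) -> None:
    """Wait on scheduling/execution tasks and react to what they return."""
    command_actors_count = 0
    while len(active_tasks) > 0:
        completed, active_tasks = ray.wait(active_tasks)
        for object_ref in completed:
            try:
                result = ray.get(object_ref)
                if isinstance(result, ActorScheduled):
                    command_actors_count += 1
                    # start exec_module on the placed actor and keep waiting on it
                elif isinstance(result, TaskCompleted):
                    command_actors_count -= 1
                    if command_actors_count == 0:
                        return  # every command actor has finished
            except RayActorError as err:
                # Node failure: the placement group recovers its resources, so
                # only the failed command actor itself needs to be rescheduled.
                command_actors_count -= 1
                reschedule(err)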

Test plan:

[Note]: This PR is related to the previous PR #559. All future changes will be submitted to this PR.

facebook-github-bot added the CLA Signed label Aug 2, 2022
@d4l3k (Member) left a comment


getting into a nicer shape :) I like how you used different return values as state machine inputs


name: str
image: str
nnodes_rep: Optional[str] = None
Member

nit: min_replicas: Optional[int] = None

self.reschedule_actor(failed_actor_id)


def parse_nnodes_rep(actors: List[RayActor]) -> Tuple[int, int]:
Member

nit: would be nice to get rid of this parsing by passing in min_replicas instead of rep

return min_nnodes, max_nnodes


def parse_actor_id_from_error(err: RayActorError) -> str:
Member

not a big fan of this but not sure if there's any better approach

@ntlm1686 (Contributor, Author) commented Aug 2, 2022

It seems actor_id is the only useful thing we can get from the exception without changing Ray's code; otherwise there are only error messages. https://github.com/ray-project/ray/blob/4c5c5763efff50bf9f76ae73a8c0073183dbb0cc/python/ray/exceptions.py#L233

except RayActorError as err:
# reschedule the failed command actor (node failure)
command_actors_count -= 1 # remove the failed actor
failed_actor_id: str = parse_actor_id_from_error(err)
Member

instead of parsing, can we use a map from the ObjectRef to the actor info? Seems like the ObjectRefs returned from wait should be equal to the ones passed in

https://github.com/ray-project/ray/blob/43aa2299e6623c8f8c7c4a1b80133459d0aa68b0/python/ray/includes/object_ref.pxi#L38
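A small sketch of that suggestion (the helper names and the ActorInfo stand-in are illustrative): keep a dict keyed by the ObjectRef handed to ray.wait, so a failed ref maps straight back to its actor without parsing the error message.

from typing import Any, Dict, List

import ray
from ray.exceptions import RayActorError

# ObjectRefs are hashable, and ray.wait returns the same refs it was given,
# so they can serve directly as dictionary keys.
actor_info_of_ref: Dict[ray.ObjectRef, Any] = {}  # Any stands in for an ActorInfo record
active_tasks: List[ray.ObjectRef] = []


def track(ref: ray.ObjectRef, info: Any) -> None:
    """Record which actor a launched task belongs to."""
    actor_info_of_ref[ref] = info
    active_tasks.append(ref)


def handle_completed(completed: List[ray.ObjectRef]) -> None:
    for object_ref in completed:
        try:
            ray.get(object_ref)
        except RayActorError:
            failed_info = actor_info_of_ref.pop(object_ref)  # no message parsing
            # ... reschedule the actor described by failed_info ...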

Contributor (Author)

I checked, but I don't think there is any actor-related information there. https://github.com/ray-project/ray/blob/4c5c5763efff50bf9f76ae73a8c0073183dbb0cc/python/ray/includes/object_ref.pxi#L38

But we can create issues for this one and for the exception one.

break # exit
else:
raise RuntimeError(
"Ray actor returns unkown type. This is most likely bug in torchx"
Member

nit spelling


def init_placement_groups(self) -> None:
"""Initialize all placement groups needed for this job"""
replica_ix_of_pg: List[int] = [0] + list(
Member

it might be better to explicitly make the calls here. + we need to wait for the first placement group before creating the remainder, otherwise the smaller groups might get scheduled first (unless ray does fifo queuing, which if it does we should document that)

initial = create_placement_group_async(self.replicas[0: self.min_nodes])
initial.wait(timeout_seconds=...)
self.groups = []
for i in range(self.min_nodes, self.max_nodes):
    self.groups.append(create_placement_group_async(self.replicas[i:i+1]))

Contributor (Author)

Ray doesn't do FIFO, will change.

if (
command_actors_count == 0
): # all the command actors have finished
break # exit
Member

this only breaks the inner loop -- is there a case where active_tasks >0 but command_actors_count == 0?

Contributor (Author)

Yes, active_tasks may contain some actors which haven't been scheduled successfully in a placement group, even after the job has finished. I guess I have to use return here.

return self.actor_info_of_id.pop(actor_id)

def reschedule_actor(self, actor_id: str) -> None:
"""Rescheule a failed actor"""
Member

we should check the role max_retries/retry_policy here and throw an error if any of the workers exits more than N times

Contributor (Author)

Actually, if we add a max_retries here, it would mean how many node failures we can tolerate, which is different from the max_retries in the Ray actor's context.

@ntlm1686 (Contributor, Author) commented Aug 2, 2022

As long as the number of working nodes is greater than min_nnodes, we should allow unlimited node failures, unless nodes are failing too frequently. That could be a retry_policy.
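A minimal sketch of how such a budget could sit inside reschedule_actor; node_failures and max_node_failures are hypothetical, and the exact policy (flat budget vs. failure-rate based, as discussed above) is still open:

def reschedule_actor(self, actor_id: str) -> None:
    """Reschedule a failed actor, giving up if nodes fail too many times."""
    self.node_failures += 1                          # hypothetical counter
    if self.node_failures > self.max_node_failures:  # hypothetical budget from a retry_policy
        raise RuntimeError(
            f"giving up after {self.node_failures} node failures"
        )
    actor_info = self.actor_info_of_id.pop(actor_id)
    # re-create the placement group / command actor for actor_info and add the
    # new scheduling task back to the driver's active tasks ...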

with self.assertRaisesRegex(
Exception, "RAY_ADDRESS env variable is expected"
):
self._scheduler.list()
Member

would be good to add some "state machine" mock tests that run through that driver loop step by step

need_more_actors: bool = True # if need more actors
command_actors_count: int = 0 # number of created command actors
# Await return result of remote ray function and initialize new command actors
while len(self.active_tasks) > 0:
Member

can we split the loop out of run so it has a "_step()" and a "run()" method, so we can manually step through the behavior from a mocked unit test?
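A rough sketch of what that split could look like; the class and method names here are illustrative, not the actual ray_driver.py API:

import ray
from ray.exceptions import RayActorError


class RayDriver:  # illustrative skeleton, not the real driver class
    def __init__(self, active_tasks):
        self.active_tasks = active_tasks
        self.command_actors_count = 0

    def _step(self) -> bool:
        """Process one ray.wait round; return False once the job should stop."""
        if not self.active_tasks:
            return False
        completed, self.active_tasks = ray.wait(self.active_tasks)
        for object_ref in completed:
            try:
                result = ray.get(object_ref)
                # ... interpret result: schedule more actors, or count a
                # finished command actor and stop when all of them are done ...
            except RayActorError:
                # node failure: reschedule the failed command actor
                self._reschedule(object_ref)
        return True

    def run(self) -> None:
        while self._step():
            pass

    def _reschedule(self, object_ref) -> None:
        ...  # placeholder for the rescheduling logic

A mock test could then patch ray.wait / ray.get and call _step() repeatedly, asserting the driver's state after each step.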

@atinsood commented Aug 3, 2022

@ljjsalt @d4l3k

I have been thinking more about this and I am wondering if ray.util.queue is a better way of implementing this.

you basically create 2 actors, PlacementGroupManager actor and CommandManager actor and exchange information between them using ray queue.

PGManager group actor is responsible for creating the placement group and then putting a message in the queue for the CommandManager actor to process which can then create the command actor. this helps us keep the knowledge of pg creation and command actor creation compartmentalized in these actors.

additionally, we can @Remote these fns so both of these can be run in parallel

https://docs.ray.io/en/releases-1.2.0/advanced.html#message-passing-using-ray-queue

@ntlm1686 (Contributor, Author) commented Aug 3, 2022

> @ljjsalt @d4l3k
>
> I have been thinking more about this and I am wondering if ray.util.queue is a better way of implementing this.
>
> you basically create 2 actors, PlacementGroupManager actor and CommandManager actor and exchange information between them using ray queue.
>
> PGManager group actor is responsible for creating the placement group and then putting a message in the queue for the CommandManager actor to process which can then create the command actor. this helps us keep the knowledge of pg creation and command actor creation compartmentalized in these actors.
>
> additionally, we can @Remote these fns so both of these can be run in parallel
>
> https://docs.ray.io/en/releases-1.2.0/advanced.html#message-passing-using-ray-queue

@atinsood

The reason we should use placement groups: if we only use ray actors here, we can rerun a failed actor after it throws a RayActorError, but the job could lose the compute resources it used to have and then fail even after the node recovers.

For example: there are 3 nodes, each with 1 CPU. Job 1 requires a minimum of 3 nodes and is running on all 3; job 2 is launched later, requires 1 CPU, and becomes a pending task. Once a node failure happens, we have to rerun job 1's actor, which becomes a pending task behind job 2; job 2 takes that node, and job 1 fails since there aren't enough nodes to restart it.

Since placement group rescheduling has the highest priority, this situation won't happen.

I didn't get why we would use a queue, since we don't use any inter-process communication here.

I understand your concern about using ray.wait here, so I wrapped the return values of the command (remote) actors in a RayResult class. If anything unexpected happens, it throws an error and we will know right away. But based on how ray.wait is supposed to work, I think it's right to use it here.

Besides, each command actor can be considered a finite state machine with four states (SCHEDULING, FAILING, RUNNING, TASK_COMPLETED).
Each command actor starts in the SCHEDULING state, and actor.schedule.remote() acts as an asynchronous step function. From SCHEDULING the next state is RUNNING; from RUNNING the next state is TASK_COMPLETED or FAILING; from FAILING the next state is SCHEDULING. A sketch of this state machine is below.
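A minimal sketch of that state machine; the enum and the transition table are illustrative, not the actual classes in this PR:

from enum import Enum, auto


class CommandActorState(Enum):
    SCHEDULING = auto()      # waiting for a placement group slot
    RUNNING = auto()         # exec_module is executing the script
    FAILING = auto()         # the node hosting the actor failed
    TASK_COMPLETED = auto()  # the script finished successfully


# Allowed transitions, driven by the results (or errors) returned by ray.wait:
TRANSITIONS = {
    CommandActorState.SCHEDULING: {CommandActorState.RUNNING},
    CommandActorState.RUNNING: {
        CommandActorState.TASK_COMPLETED,
        CommandActorState.FAILING,
    },
    CommandActorState.FAILING: {CommandActorState.SCHEDULING},
    CommandActorState.TASK_COMPLETED: set(),
}


def advance(state: CommandActorState, nxt: CommandActorState) -> CommandActorState:
    """Validate and apply a transition observed by the driver loop."""
    if nxt not in TRANSITIONS[state]:
        raise RuntimeError(f"invalid transition {state} -> {nxt}")
    return nxt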

@atinsood commented Aug 3, 2022

> The reason we should use placement groups

yeah, not disagreeing on that. I am just thinking about how we manage the interaction between pg creation and command actor creation

import ray
from ray.util.queue import Queue


@ray.remote
class PlacementGroupManager(object):
    def __init__(self, min_nodes, max_nodes, queue):
        self.queue = queue

    def run(self):
        # step 1. create a PG with min replicas

        # put the initial pg in the queue
        self.queue.put(ready)

        # step 2. while loop to keep creating the rest of the PGs incrementally

        # go through the rest of the pgs one by one and keep adding them to the queue
        # we can deal with the logic of pg failure here or when to stop pg creation if needed


@ray.remote
class CommandActorManager(object):
    def __init__(self, queue, active_workers):
        self.queue = queue
        self.active_workers = []

    def run(self):
        # some logic on when to stop the while loop, either a poison pill or a signal actor
        while True:
            self.queue.get()  # get notified that the pg was created
            # logic to create the command actor goes here

            self.active_workers.append(command_actor.exec_module.remote())  # or maybe use another queue, pretty sure there is a better way to deal with this

            # once you are done reading all the PGs from the queue or if the queue


def main() -> None:  # pragma: no cover
    actors: List[RayActor] = load_actor_json("actors.json")
    # pyre-fixme[16]: Module `worker` has no attribute `init`.
    ray.init(address="auto", namespace="torchx-ray")
    q = Queue()  # we can set the size of the queue upfront
    pg_manager = PlacementGroupManager.remote(2, 4, q)  # min, max and queue
    pg_manager.run.remote()
    cmd_actor_manager = CommandActorManager.remote(q, active_workers)
    cmd_actor_manager.run.remote()
    # Await return result of remote ray function
    while len(active_workers) > 0:
        _logger.info(f"running ray.wait on {active_workers}")

        # pyre-fixme[16]: Module `worker` has no attribute `wait`.
        completed_workers, active_workers = ray.wait(active_workers)
        # If a failure occurs the ObjectRef will be marked as completed.
        # Calling ray.get will expose the failure as a RayActorError.
        for object_ref in completed_workers:
            ray.get(object_ref)


I was thinking of something like this. This is not the correct code, but more of a thought process on how the logic that creates placement groups and the logic that creates command actors could interact while staying compartmentalized.

@ntlm1686 (Contributor, Author) commented Aug 3, 2022


> # step 2. while loop to keep creating the rest of the PGs incrementally
>
> # go through the rest of the pgs one by one and keep adding them to the queue
> # we can deal with the logic of pg failure here or when to stop pg creation if needed

Actually, this step is not necessary. First, increasing the PGs one by one can be a problem once the number of nodes is large.
Screen Shot 2022-08-03 at 12 31 14 AM
Please read the description of the X axis: creating the PGs one by one only makes the time until a PG creation event is added to the GCS pending queue longer. A sketch of creating them all up front is below.
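A rough sketch of creating every placement group up front and letting the GCS schedule them as resources arrive. The {"CPU": ...} bundle shape and the helper signature are assumptions for illustration; the real driver builds bundles from each actor's resource requirements:

from typing import List

from ray.util.placement_group import PlacementGroup, placement_group


def init_placement_groups(
    min_nnodes: int, max_nnodes: int, cpus_per_node: int = 1
) -> List[PlacementGroup]:
    # One group holding the minimum set of replicas, so the job can only start
    # once min_nnodes worth of resources exist ...
    groups = [placement_group([{"CPU": cpus_per_node}] * min_nnodes)]
    # ... plus one single-replica group per extra node, each of which the GCS
    # schedules independently whenever capacity shows up (elasticity).
    for _ in range(max_nnodes - min_nnodes):
        groups.append(placement_group([{"CPU": cpus_per_node}]))
    # No blocking wait here: each pg.ready() returns an ObjectRef that the
    # driver loop can ray.wait() on alongside the command-actor tasks.
    return groups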

The second change: instead of creating a command actor only after its placement group has been scheduled, we create all the command actors at the beginning as well (in the SCHEDULING state).

codecov bot commented Aug 3, 2022

Codecov Report

Merging #572 (3228b9c) into main (b051e3f) will increase coverage by 0.15%.
The diff coverage is 98.41%.

@@            Coverage Diff             @@
##             main     #572      +/-   ##
==========================================
+ Coverage   94.85%   95.00%   +0.15%     
==========================================
  Files          66       66              
  Lines        4042     4144     +102     
==========================================
+ Hits         3834     3937     +103     
+ Misses        208      207       -1     
Impacted Files Coverage Δ
torchx/schedulers/ray_scheduler.py 95.26% <ø> (ø)
torchx/schedulers/ray/ray_driver.py 97.10% <98.07%> (+1.26%) ⬆️
torchx/components/dist.py 96.42% <100.00%> (+7.06%) ⬆️
torchx/schedulers/ray/ray_common.py 100.00% <100.00%> (ø)
torchx/specs/api.py 98.40% <100.00%> (+<0.01%) ⬆️
torchx/runner/api.py 96.85% <0.00%> (+0.01%) ⬆️


@d4l3k (Member) commented Aug 3, 2022

If we did want to separate the concerns here with scheduling vs retries we could use the builtin task max_retries config though that has some other implications.

I'm not sure that using a queue with two actors would simplify this logic -- state machines that you can step through are nice from a testing perspective

@ntlm1686 (Contributor, Author) commented Aug 4, 2022

> If we did want to separate the concerns here with scheduling vs retries we could use the builtin task max_retries config though that has some other implications.
>
> I'm not sure that using a queue with two actors would simplify this logic -- state machines that you can step through are nice from a testing perspective

But we are just running the schedule function (which cannot go wrong) and the exec_module function (which cannot go wrong unless the script fails), so is it necessary to use max_retries here? Of course, the feature can easily be added.

@ntlm1686 requested a review from d4l3k August 4, 2022 15:16
@ntlm1686 changed the title from "[Ray] Add Elasticity and Fault tolerance features to jobs launched on ray cluster" to "[Ray] Add elasticity and fault tolerance features to jobs launched on ray cluster" Aug 4, 2022
facebook-github-bot pushed a commit that referenced this pull request Aug 23, 2022
Summary:
Elasticity - the execution of a placement group is a pending task that the GCS schedules when resources become available.

Related PR: #572

Pull Request resolved: #580

Test Plan: Mock cluster scaling with `ray.cluster_utils`.

Reviewed By: priyaramani

Differential Revision: D38838786

Pulled By: d4l3k

fbshipit-source-id: b27073fd6ad4822c121e07de729b839f6cf6291a
@d4l3k closed this Aug 23, 2022