[RLlib] Add on_workers_recreated callback to Algorithm. #40354

Merged

@sven1977 sven1977 commented Oct 15, 2023

Add on_workers_recreated callback to DefaultCallbacks to be used in Algorithm.

Why are these changes needed?

Sometimes, when a local or remote worker is restarted, users want to immediately send some information to it (e.g. the global steps sampled, taken from the Algorithm's counters). This PR therefore adds a new callback event.

The new on_workers_recreated() callback is triggered after the Algorithm has checked, via the WorkerSet.restore_workers() API, whether any workers (training ones under Algorithm.workers and evaluation ones under Algorithm.evaluation_workers) have failed.

Here is the docstring of the new callback method. Note the need to use the .apply() method when dealing with a remote worker, as the callback is called from the local algorithm process and NOT by the worker process itself:

    @OverrideToImplementCustomLogic
    def on_workers_recreated(
        self,
        *,
        algorithm: "Algorithm",
        worker_set: "WorkerSet",
        worker_ids: List[int],
        is_evaluation: bool,
        **kwargs,
    ) -> None:
        """Callback run after one or more workers have been recreated.

        Note that any "worker" inside the algorithm's `self.workers` and
        `self.evaluation_workers` WorkerSets is an instance of a subclass of
        EnvRunner.

        You can access (and change) the worker(s) in question via the following code
        snippet inside your custom override of this method:

        .. code-block:: python

            from ray.rllib.algorithms.callbacks import DefaultCallbacks

            class MyCallbacks(DefaultCallbacks):
                def on_workers_recreated(
                    self,
                    *,
                    algorithm,
                    worker_set,
                    worker_ids,
                    is_evaluation,
                    **kwargs,
                ):
                    # Define what you would like to do on the recreated
                    # workers:
                    def func(w):
                        # Here, we just set some arbitrary property to 1.
                        if is_evaluation:
                            w._custom_property_for_evaluation = 1
                        else:
                            w._custom_property_for_training = 1

                    # Use the `foreach_worker` method of the worker set and
                    # only loop through those worker IDs that have been restarted.
                    # Note that we set `local_worker=False` to NOT include it (local
                    # workers are never recreated; if they fail, the entire Algorithm
                    # fails).
                    worker_set.foreach_worker(
                        func,
                        remote_worker_ids=worker_ids,
                        local_worker=False,
                    )

        Args:
            algorithm: Reference to the Algorithm instance.
            worker_set: The WorkerSet object in which the workers in question reside.
                You can use a `worker_set.foreach_worker(remote_worker_ids=...,
                local_worker=False)` method call to execute custom
                code on the recreated (remote) workers. Note that the local worker is
                never recreated as a failure of this would also crash the Algorithm.
            worker_ids: The list of (remote) worker IDs that have been recreated.
            is_evaluation: Whether `worker_set` is the evaluation WorkerSet (located
                in `Algorithm.evaluation_workers`) or not.
        """
        pass
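
To see why the function has to be handed to the workers rather than run directly in the callback, here is a minimal, Ray-free sketch of the pattern. `StandInWorker` and `StandInWorkerSet` are hypothetical stand-ins written for this illustration (they are not RLlib classes), and `global_timestep` and the `"steps_sampled"` counter key are assumed names. Only the `foreach_worker(func, remote_worker_ids=..., local_worker=...)` call shape mirrors the docstring above.

```python
from types import SimpleNamespace
from typing import Any, Callable, Dict, List


class StandInWorker:
    """Hypothetical stand-in for a remote worker (not an RLlib class)."""

    def __init__(self, worker_id: int) -> None:
        self.worker_id = worker_id
        self.global_timestep = 0  # assumed attribute, for illustration only

    def apply(self, func: Callable[["StandInWorker"], Any]) -> Any:
        # In a real remote setup, `func` would be shipped to and executed
        # inside the worker's own process. Here we simply call it.
        return func(self)


class StandInWorkerSet:
    """Hypothetical stand-in that mimics the `foreach_worker` call shape."""

    def __init__(self, worker_ids: List[int]) -> None:
        self._remote: Dict[int, StandInWorker] = {
            i: StandInWorker(i) for i in worker_ids
        }

    def foreach_worker(self, func, *, remote_worker_ids=None, local_worker=True):
        # This stand-in has no local worker, so `local_worker` is ignored.
        ids = list(self._remote) if remote_worker_ids is None else remote_worker_ids
        return [self._remote[i].apply(func) for i in ids]


def on_workers_recreated(*, algorithm, worker_set, worker_ids, is_evaluation, **kwargs):
    # Read the value in the algorithm process ...
    steps = algorithm._counters["steps_sampled"]  # assumed counter key

    # ... and push it to each recreated worker through a function.
    def sync(worker):
        worker.global_timestep = steps
        return worker.worker_id

    return worker_set.foreach_worker(
        sync, remote_worker_ids=worker_ids, local_worker=False
    )


algorithm = SimpleNamespace(_counters={"steps_sampled": 1000})
worker_set = StandInWorkerSet([1, 2, 3])
updated = on_workers_recreated(
    algorithm=algorithm, worker_set=worker_set, worker_ids=[2, 3], is_evaluation=False
)
```

In this sketch only workers 2 and 3 receive the new step count; worker 1, which was not recreated, is left untouched.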

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: sven1977 <svenmika1977@gmail.com>
…llback' into issue64_add_on_worker_created_callback
@sven1977 sven1977 changed the title [RLlib] Add on_worker_created callback to Algorithm/WorkerSet. [RLlib] Add on_workers_recreated callback to Algorithm. Oct 17, 2023
Note that any "worker" inside the algorithm's `self.worker` and
`self.evaluation_workers` WorkerSets are instances of a subclass of EnvRunner.

.. code-block:: python
Contributor

We don't use code-blocks anymore; we use testcode and testoutput these days!

Contributor Author

Oh no! :D Will fix.

Contributor Author

done

    for id_ in worker_ids_2:
        # A newly created worker: Its recreated counter should be 1.
        if id_ not in original_worker_ids:
            self.assertTrue(algo._counters[f"worker_{id_}_recreated"] == 1)
Contributor

Would this not become flaky if a worker gets recreated more than once? We iterate three times, so I'd expect that workers might be recreated after each iteration.

Contributor Author

I think it's fine: a recreated worker gets a new ID, which is why we check which worker IDs are the new healthy ones right before this loop.
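
The exchange above hinges on whether a recreated worker keeps its ID. A small simulation, assuming (as the reply suggests, and not verified here) that every recreation yields a fresh ID, shows why the `== 1` check would stay stable across iterations. The `recreate` helper and the ID scheme are hypothetical, and `counters` merely stands in for `algo._counters`.

```python
from collections import defaultdict
from itertools import count

counters = defaultdict(int)   # stands in for `algo._counters`
next_id = count(start=3)      # hypothetical: fresh IDs continue after 1 and 2
original_worker_ids = {1, 2}
healthy_ids = set(original_worker_ids)


def recreate(failed_id):
    """Replace a failed worker with one that carries a brand-new ID."""
    new_id = next(next_id)
    healthy_ids.discard(failed_id)
    healthy_ids.add(new_id)
    counters[f"worker_{new_id}_recreated"] += 1
    return new_id


# Three iterations; in each one the most recently created worker fails again.
victim = 1
for _ in range(3):
    victim = recreate(victim)

# The healthy workers that were not part of the original set.
new_ids = sorted(healthy_ids - original_worker_ids)
```

If IDs were reused instead, the same counter key would be incremented on every recreation and the assertion could fail, which is the concern raised in the question.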

# TODO(jungong) : switch to True once Algorithm is migrated.
healthy_only=False,
local_worker: bool = True,
healthy_only: bool = True,
Contributor

Just out of curiosity: why can we switch to True now, and which migration was Jun's comment referring to?

Contributor Author

Good one :) I think that, now that Algorithm uses the ActorManager (within WorkerSet), we can switch the default behavior to operate only on the healthy workers. That makes sense: you wouldn't want to "ping" an already failed worker with an apply() request, as this request would most certainly also fail or, even worse, cause a timeout or hang.
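
The reasoning for the new default can be illustrated with a small stand-in. `Actor`, `foreach`, and the `healthy` flag below are hypothetical and only mimic the idea of a `healthy_only` switch; they are not the actual `WorkerSet` or ActorManager implementation.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class Actor:
    """Hypothetical actor handle with a health flag."""

    actor_id: int
    healthy: bool = True

    def apply(self, func: Callable[["Actor"], int]) -> int:
        if not self.healthy:
            # A real failed actor might raise, time out, or hang instead.
            raise RuntimeError(f"actor {self.actor_id} is not reachable")
        return func(self)


def foreach(actors: List[Actor], func, *, healthy_only: bool = True) -> list:
    """Apply `func` to each actor, optionally skipping unhealthy ones."""
    targets = [a for a in actors if a.healthy or not healthy_only]
    return [a.apply(func) for a in targets]


actors = [Actor(1), Actor(2, healthy=False), Actor(3)]

# With the default, the failed actor is skipped and the call succeeds.
results = foreach(actors, lambda a: a.actor_id * 10)
```

Passing `healthy_only=False` in this sketch makes the call reach the failed actor and raise, which corresponds to the failure mode described in the reply.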

@sven1977
Contributor Author

All fixed, please take another look. Thanks a ton, @ArturNiederfahrenhorst!

…llback' into issue64_add_on_worker_created_callback

# Conflicts:
#	rllib/algorithms/callbacks.py
#	rllib/algorithms/tests/test_callbacks.py
Signed-off-by: Sven Mika <sven@anyscale.io>
@sven1977 sven1977 merged commit 23eb7f7 into ray-project:master Oct 20, 2023
18 of 27 checks passed
can-anyscale added a commit that referenced this pull request Oct 24, 2023
This is a pick of the part of #40354 that disables the rllib:test_a3c tests in the release branch. This particular test has been so flaky that it is blocking the release branch as well.

Signed-off-by: can <can@anyscale.com>