From e4b0114fafd86ffeeeeaff2a25db8416526bf083 Mon Sep 17 00:00:00 2001 From: Sam Lurye Date: Fri, 24 Oct 2025 16:18:19 -0700 Subject: [PATCH] [monarch] Fix race condition during ProcMesh stop Await `ProcMesh._proc_mesh` before flushing logging manager so that we can be sure the logging client has been spawned and avoid throwing an assertion error. Differential Revision: [D85472096](https://our.internmc.facebook.com/intern/diff/D85472096/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D85472096/)! [ghstack-poisoned] --- python/monarch/_src/actor/v1/proc_mesh.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/python/monarch/_src/actor/v1/proc_mesh.py b/python/monarch/_src/actor/v1/proc_mesh.py index 00b56f37d..3d75c04c0 100644 --- a/python/monarch/_src/actor/v1/proc_mesh.py +++ b/python/monarch/_src/actor/v1/proc_mesh.py @@ -258,6 +258,9 @@ async def task( await pm._logging_manager.init(hy_proc_mesh, stream_log_to_client) + # If the user has passed the setup lambda, we need to call + # it here before any of the other python actors are spawned so + # that the environment variables are set up before cuda init. if setup_actor is not None: await setup_actor.setup.call() @@ -267,9 +270,9 @@ async def task( if setup is not None: from monarch._src.actor.proc_mesh import SetupActor # noqa - # If the user has passed the setup lambda, we need to call - # it here before any of the other actors are spawned so that - # the environment variables are set up before cuda init. + # The SetupActor needs to be spawned outside of `task` for now, + # since spawning a python actor requires a blocking call to + # pickle the proc mesh, and we can't do that from the tokio runtime. setup_actor = pm._spawn_nonblocking_on( hy_proc_mesh, "setup", SetupActor, setup ) @@ -402,8 +405,9 @@ def stop(self) -> Future[None]: instance = context().actor_instance._as_rust() async def _stop_nonblocking(instance: HyInstance) -> None: + pm = await self._proc_mesh await PythonTask.spawn_blocking(lambda: self._logging_manager.flush()) - await (await self._proc_mesh).stop_nonblocking(instance) + await pm.stop_nonblocking(instance) self._stopped = True return Future(coro=_stop_nonblocking(instance))