From 005aa3067f60fe6e037353708682eb148abd64ed Mon Sep 17 00:00:00 2001 From: Allen Wang <9057208+allenwang28@users.noreply.github.com> Date: Tue, 14 Oct 2025 15:09:34 +0000 Subject: [PATCH 1/4] remove hostmesh v0 --- apps/grpo/main.py | 30 +++++++---------- src/forge/controller/launcher.py | 20 +++++------ src/forge/controller/provisioner.py | 43 +++++++++--------------- src/forge/env.py | 15 ++++++++- src/forge/observability/metric_actors.py | 8 ++--- 5 files changed, 53 insertions(+), 63 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index 6439ead85..10b110ee6 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -327,11 +327,6 @@ async def main(cfg: DictConfig): mlogger = await get_or_create_metric_logger() await mlogger.init_backends.call_one(metric_logging_cfg) - # In the host mesh v0 case, actors on remote hosts are not able to communicate - # with one another. Therefore we use the controller as our storage volume. - if not MONARCH_HOSTMESH_V1.get_value(): - await ts.initialize(strategy=ts.ControllerStorageVolumes()) - print("Torchstore successfully initialized with controller storage strategy") # ---- Setup services ---- # @@ -364,21 +359,19 @@ async def main(cfg: DictConfig): print("All services initialized successfully!") shutdown_event = asyncio.Event() - # In the HostMesh v1 case, we spawn a torchstore storage volume - # per trainer process. + # Here we spawn a torchstore storage volume per trainer process. # We initialize after service initialization because torchstore currently # requires access to the underlying proc meshes in the local rank strategy. # We should be able to hide this in the future. 
- if MONARCH_HOSTMESH_V1.get_value(): - # TODO: support multiple host meshes - trainer_num_procs = cfg.actors.trainer["procs"] - trainer_host_mesh_name = cfg.actors.trainer["mesh_name"] - trainer_hosts = provisioner.get_host_mesh(trainer_host_mesh_name) - await ts.initialize( - mesh=trainer_hosts.spawn_procs(per_host={"procs": trainer_num_procs}), - strategy=ts.LocalRankStrategy(), - ) - print("Torchstore successfully initialized with local rank strategy") + # TODO: support multiple host meshes + trainer_num_procs = cfg.actors.trainer["procs"] + trainer_host_mesh_name = cfg.actors.trainer["mesh_name"] + trainer_hosts = provisioner.get_host_mesh(trainer_host_mesh_name) + await ts.initialize( + mesh=trainer_hosts.spawn_procs(per_host={"procs": trainer_num_procs}), + strategy=ts.LocalRankStrategy(), + ) + print("Torchstore successfully initialized with local rank strategy") # ---- Core RL loops ---- # async def continuous_rollouts(): @@ -556,6 +549,9 @@ async def continuous_training(): if __name__ == "__main__": + # This is temporary measure, setting the correct environment variable to + # enable host mesh V1 in Monarch's APIs. + MONARCH_HOSTMESH_V1.override_with_default() @parse def _main(cfg): diff --git a/src/forge/controller/launcher.py b/src/forge/controller/launcher.py index 96f9b42af..65ece6597 100644 --- a/src/forge/controller/launcher.py +++ b/src/forge/controller/launcher.py @@ -26,8 +26,6 @@ from monarch.tools.components import hyperactor from monarch.tools.config import Config, Workspace -from forge.env import MONARCH_HOSTMESH_V1 - from forge.types import Launcher, LauncherConfig _MAST_AVAILABLE = False @@ -120,11 +118,10 @@ async def remote_setup(self, procs: ProcMesh) -> None: class Slurmlauncher(BaseLauncher): async def initialize(self) -> None: - if MONARCH_HOSTMESH_V1.get_value(): - # HostMeshV1 currently requires explicit configuration - # of the underlying transport from client to mesh. 
- # This can be removed in the future once this has been removed. - configure(default_transport=ChannelTransport.Tcp) + # HostMesh currently requires explicit configuration + # of the underlying transport from client to mesh. + # This call can be removed in the future once that requirement has been lifted. + configure(default_transport=ChannelTransport.Tcp) async def get_allocator(self, name: str, num_hosts: int) -> tuple[Any, Any, str]: appdef = hyperactor.host_mesh( @@ -180,11 +177,10 @@ def __init__(self, cfg: LauncherConfig | None = None): self.job_name = self.cfg.job_name or self.create_job_name() async def initialize(self) -> None: - if MONARCH_HOSTMESH_V1.get_value(): - # HostMeshV1 currently requires explicit configuration - # of the underlying transport from client to mesh. - # This can be removed in the future once this has been removed. - configure(default_transport=ChannelTransport.MetaTlsWithHostname) + # HostMesh currently requires explicit configuration + # of the underlying transport from client to mesh. + # This call can be removed in the future once that requirement has been lifted. 
+ configure(default_transport=ChannelTransport.MetaTlsWithHostname) await self.launch_mast_job() diff --git a/src/forge/controller/provisioner.py b/src/forge/controller/provisioner.py index 1bb340328..ff284fcc1 100644 --- a/src/forge/controller/provisioner.py +++ b/src/forge/controller/provisioner.py @@ -22,20 +22,13 @@ from forge.env import all_env_vars, FORGE_DISABLE_METRICS, MONARCH_HOSTMESH_V1 +from monarch.actor import HostMesh, this_host from forge.types import ProcessConfig, ProvisionerConfig logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -if MONARCH_HOSTMESH_V1.get_value(): - from monarch._src.actor.v1.host_mesh import HostMesh, this_host - - logger.info("Using Monarch HostMesh v1...") -else: - from monarch.actor import HostMesh, this_host - - def _get_port() -> str: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("localhost", 0)) @@ -159,27 +152,20 @@ async def create_host_mesh(self, name: str, num_hosts: int) -> HostMesh: name, num_hosts ) - if MONARCH_HOSTMESH_V1.get_value(): - # We are asking Monarch to allocate a single process on - # every host, reflected in the Extent we provide below. + # We are asking Monarch to allocate a single process on + # every host, reflected in the Extent we provide below. - # Technically, this is ["hosts", "procs"] but to reduce - # confusion on its relationship with procs elsewhere, - # we call it "no_dim". + # Technically, this is ["hosts", "procs"] but to reduce + # confusion on its relationship with procs elsewhere, + # we call it "no_dim". - # TODO - remove this once Monarch supports HostMesh without it. 
- host_mesh = HostMesh.allocate_nonblocking( - name=name, - extent=Extent(["hosts", "no_dim"], [num_hosts, 1]), - allocator=alloc, - alloc_constraints=alloc_constraints, - ) - else: - host_mesh = HostMesh( - Shape(["hosts"], NDSlice.new_row_major([num_hosts])), - allocator=alloc, - alloc_constraints=alloc_constraints, - ) + # TODO - remove this once Monarch supports HostMesh without it. + host_mesh = HostMesh.allocate_nonblocking( + name=name, + extent=Extent(["hosts", "no_dim"], [num_hosts, 1]), + allocator=alloc, + alloc_constraints=alloc_constraints, + ) return host_mesh, server_name def get_host_mesh(self, name: str) -> HostMesh: @@ -364,6 +350,9 @@ async def shutdown(self): async def init_provisioner(cfg: ProvisionerConfig | None = None): + # This is temporary measure, setting the correct environment variable to + # enable host mesh V1 in Monarch's APIs. + MONARCH_HOSTMESH_V1.override_with_default() global _provisioner if not _provisioner: _provisioner = Provisioner(cfg) diff --git a/src/forge/env.py b/src/forge/env.py index cdb08936a..86b0f36d4 100644 --- a/src/forge/env.py +++ b/src/forge/env.py @@ -9,6 +9,10 @@ import os from dataclasses import dataclass from typing import Any +import logging + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) @dataclass @@ -49,6 +53,15 @@ def get_value(self) -> Any: # Return as string for other types return value + def override_with_default(self): + """Override the environment value set with its default value.""" + original_value = os.environ.get(self.name, None) + if not original_value: + logging.info(f"Setting {self.name} to its default value {self.default}") + else: + logging.info(f"Overriding {self.name} from {original_value} to its default value {self.default}.") + os.environ[self.name] = str(self.default) + # Environment variable definitions DISABLE_PERF_METRICS = EnvVar( @@ -101,7 +114,7 @@ def get_value(self) -> Any: MONARCH_HOSTMESH_V1 = EnvVar( 
name="MONARCH_HOST_MESH_V1_REMOVE_ME_BEFORE_RELEASE", - default=False, + default=1, description="Whether or not to use Monarch's experimental hostmesh v1 APIs", ) diff --git a/src/forge/observability/metric_actors.py b/src/forge/observability/metric_actors.py index fae11556f..17dac2870 100644 --- a/src/forge/observability/metric_actors.py +++ b/src/forge/observability/metric_actors.py @@ -10,7 +10,7 @@ from monarch.actor import Actor, endpoint, ProcMesh -from forge.env import FORGE_DISABLE_METRICS, MONARCH_HOSTMESH_V1 +from forge.env import FORGE_DISABLE_METRICS from forge.observability.metrics import ( BackendRole, get_logger_backend_class, @@ -19,11 +19,7 @@ reduce_metrics_states, ) -if MONARCH_HOSTMESH_V1.get_value(): - from monarch._src.actor.v1.host_mesh import this_proc - from monarch._src.actor.v1.proc_mesh import get_or_spawn_controller -else: - from monarch.actor import get_or_spawn_controller, this_proc +from monarch.actor import get_or_spawn_controller, this_proc logger = logging.getLogger(__name__) From eb5c125d63c300c0f75f0c629a569523464de7e1 Mon Sep 17 00:00:00 2001 From: Allen Wang <9057208+allenwang28@users.noreply.github.com> Date: Tue, 14 Oct 2025 11:24:31 -0700 Subject: [PATCH 2/4] updates --- apps/grpo/main.py | 5 ----- scripts/install.sh | 3 +++ src/forge/controller/provisioner.py | 12 ++++-------- src/forge/env.py | 19 ------------------- 4 files changed, 7 insertions(+), 32 deletions(-) diff --git a/apps/grpo/main.py b/apps/grpo/main.py index 10b110ee6..ff46fea20 100644 --- a/apps/grpo/main.py +++ b/apps/grpo/main.py @@ -28,7 +28,6 @@ from forge.controller.actor import ForgeActor from forge.controller.provisioner import init_provisioner, shutdown from forge.data.rewards import MathReward, ThinkingReward -from forge.env import MONARCH_HOSTMESH_V1 from forge.observability.metric_actors import get_or_create_metric_logger from forge.observability.metrics import record_metric, Reduce from forge.observability.perf_tracker import Tracer @@ -327,7 
+326,6 @@ async def main(cfg: DictConfig): mlogger = await get_or_create_metric_logger() await mlogger.init_backends.call_one(metric_logging_cfg) - # ---- Setup services ---- # ( @@ -549,9 +547,6 @@ async def continuous_training(): if __name__ == "__main__": - # This is temporary measure, setting the correct environment variable to - # enable host mesh V1 in Monarch's APIs. - MONARCH_HOSTMESH_V1.override_with_default() @parse def _main(cfg): diff --git a/scripts/install.sh b/scripts/install.sh index 324373938..30dcf1ff5 100755 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -309,6 +309,9 @@ export PATH="${CUDA_HOME}/bin:$PATH" export CUDA_INCLUDE_DIRS=$CUDA_HOME/include export CUDA_CUDART_LIBRARY=$CUDA_HOME/lib64/libcudart.so +# Temporary measure until this environment variable is removed +export MONARCH_HOST_MESH_V1_REMOVE_ME_BEFORE_RELEASE=1 + # Add only CUDA compat libs to LD_LIBRARY_PATH (safe for system tools) if [ -n "${LD_LIBRARY_PATH:-}" ]; then export LD_LIBRARY_PATH="/usr/local/cuda-${CUDA_VERSION}/compat:${LD_LIBRARY_PATH}" diff --git a/src/forge/controller/provisioner.py b/src/forge/controller/provisioner.py index ff284fcc1..8f5a77f41 100644 --- a/src/forge/controller/provisioner.py +++ b/src/forge/controller/provisioner.py @@ -13,16 +13,15 @@ import socket import uuid -from monarch._src.actor.shape import Extent, NDSlice, Shape -from monarch.actor import Actor, endpoint, ProcMesh +from monarch._src.actor.shape import Extent + +from monarch.actor import Actor, endpoint, HostMesh, ProcMesh, this_host from monarch.tools import commands from forge.controller.launcher import BaseLauncher, get_launcher -from forge.env import all_env_vars, FORGE_DISABLE_METRICS, MONARCH_HOSTMESH_V1 - -from monarch.actor import HostMesh, this_host +from forge.env import all_env_vars, FORGE_DISABLE_METRICS from forge.types import ProcessConfig, ProvisionerConfig logger = logging.getLogger(__name__) @@ -350,9 +349,6 @@ async def shutdown(self): async def 
init_provisioner(cfg: ProvisionerConfig | None = None): - # This is temporary measure, setting the correct environment variable to - # enable host mesh V1 in Monarch's APIs. - MONARCH_HOSTMESH_V1.override_with_default() global _provisioner if not _provisioner: _provisioner = Provisioner(cfg) diff --git a/src/forge/env.py b/src/forge/env.py index 86b0f36d4..1478909da 100644 --- a/src/forge/env.py +++ b/src/forge/env.py @@ -9,10 +9,6 @@ import os from dataclasses import dataclass from typing import Any -import logging - -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) @dataclass @@ -53,15 +49,6 @@ def get_value(self) -> Any: # Return as string for other types return value - def override_with_default(self): - """Override the environment value set with its default value.""" - original_value = os.environ.get(self.name, None) - if not original_value: - logging.info(f"Setting {self.name} to its default value {self.default}") - else: - logging.info(f"Overriding {self.name} from {original_value} to its default value {self.default}.") - os.environ[self.name] = str(self.default) - # Environment variable definitions DISABLE_PERF_METRICS = EnvVar( @@ -112,12 +99,6 @@ def override_with_default(self): description="Sets the maximum frame length for Monarch's actor message delivery in bytes.", ) -MONARCH_HOSTMESH_V1 = EnvVar( - name="MONARCH_HOST_MESH_V1_REMOVE_ME_BEFORE_RELEASE", - default=1, - description="Whether or not to use Monarch's experimental hostmesh v1 APIs", -) - TORCHSTORE_USE_RDMA = EnvVar( name="TORCHSTORE_RDMA_ENABLED", default=0, From 003570c1c1490d35b344404933a4251166e5423a Mon Sep 17 00:00:00 2001 From: Allen Wang <9057208+allenwang28@users.noreply.github.com> Date: Tue, 14 Oct 2025 12:30:36 -0700 Subject: [PATCH 3/4] lint --- src/forge/observability/metric_actors.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/forge/observability/metric_actors.py b/src/forge/observability/metric_actors.py index 
17dac2870..2f9addfe6 100644 --- a/src/forge/observability/metric_actors.py +++ b/src/forge/observability/metric_actors.py @@ -8,9 +8,9 @@ import logging from typing import Any, Union -from monarch.actor import Actor, endpoint, ProcMesh +from monarch.actor import Actor, endpoint, get_or_spawn_controller, ProcMesh, this_proc -from forge.env import FORGE_DISABLE_METRICS +from forge.env import FORGE_DISABLE_METRICS from forge.observability.metrics import ( BackendRole, get_logger_backend_class, @@ -19,8 +19,6 @@ reduce_metrics_states, ) -from monarch.actor import get_or_spawn_controller, this_proc - logger = logging.getLogger(__name__) From d095a41d8194780977e40455d56ba66538835bed Mon Sep 17 00:00:00 2001 From: Allen Wang <9057208+allenwang28@users.noreply.github.com> Date: Tue, 14 Oct 2025 12:31:12 -0700 Subject: [PATCH 4/4] split into two, lint --- scripts/install.sh | 3 --- 1 file changed, 3 deletions(-) diff --git a/scripts/install.sh b/scripts/install.sh index 30dcf1ff5..324373938 100755 --- a/scripts/install.sh +++ b/scripts/install.sh @@ -309,9 +309,6 @@ export PATH="${CUDA_HOME}/bin:$PATH" export CUDA_INCLUDE_DIRS=$CUDA_HOME/include export CUDA_CUDART_LIBRARY=$CUDA_HOME/lib64/libcudart.so -# Temporary measure until this environment variable is removed -export MONARCH_HOST_MESH_V1_REMOVE_ME_BEFORE_RELEASE=1 - # Add only CUDA compat libs to LD_LIBRARY_PATH (safe for system tools) if [ -n "${LD_LIBRARY_PATH:-}" ]; then export LD_LIBRARY_PATH="/usr/local/cuda-${CUDA_VERSION}/compat:${LD_LIBRARY_PATH}"