In [2]:
# (c) Meta Platforms, Inc. and affiliates. Confidential and proprietary.

# @noautodeps
# pyre-ignore-all-errors

import argparse
import asyncio
import getpass
import json
import logging
import os
import pathlib
import sys
import math
import os
import socket

import cloudpickle
import torch
import torch.distributed as dist
import torch.nn.functional as F
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim

from monarch._rust_bindings.monarch_hyperactor.alloc import AllocConstraints, AllocSpec
from monarch.actor import ProcMesh
from monarch.tools import commands
from monarch.tools.components import hyperactor
from monarch.tools.config import Config, UnnamedAppDef
from monarch._src.actor.allocator import (
    RemoteAllocator,
    StaticRemoteAllocInitializer,
    TorchXRemoteAllocInitializer,
)
from monarch.actor import Actor, current_rank, current_size, endpoint
from monarch.actor import Actor, current_rank, endpoint, proc_mesh
from torch.nn.parallel import DistributedDataParallel as DDP
from example_actors.compute_world_size_actor import TestActor
from slurm.utils import get_appdef, get_server_info, create_proc_mesh


# Configure the root logger for the whole notebook/script. ``force=True``
# removes and closes any handlers already attached to the root logger (for
# instance from an earlier run of this cell) so this configuration applies.
logging.basicConfig(
    force=True,
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(name)s %(asctime)s %(levelname)s %(message)s",
)

# Logger for this module; records propagate to the root handler set up above.
logger: logging.Logger = logging.getLogger(__name__)


# TODO: Remove and replace with utils.setup_env_for_distributed once trunk is healthy.
def _find_free_port() -> int:
    """Return a TCP port number that was unused on the loopback interface.

    The OS chooses the port (bind to port 0). The probe socket is closed before
    returning, so the port is only *likely* still free when the caller binds it.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with probe:
        probe.bind(("localhost", 0))
        _host, free_port = probe.getsockname()
    return free_port


class _TorchDistributedInitActor(Actor):
    """Helper actor that prepares ``torch.distributed`` env vars on its proc.

    ``get_host_port`` reports a candidate rendezvous address for this proc, and
    ``setup_env`` writes torchrun-style variables (RANK, LOCAL_RANK, WORLD_SIZE,
    MASTER_ADDR, MASTER_PORT, ...) into this proc's ``os.environ``.
    """

    def __init__(self) -> None:
        # Global rank of this proc within its mesh.
        self.rank: int = current_rank().rank

    @endpoint
    def get_host_port(self) -> tuple[str, int]:
        """Return ``(hostname, port)`` where ``port`` was free on this host when probed."""
        host = socket.gethostname()
        port = _find_free_port()
        return (host, port)

    @endpoint
    def setup_env(self, master_addr: str, master_port: int) -> None:
        """Export distributed-training environment variables for this proc.

        Args:
            master_addr: address of the rendezvous master.
            master_port: port of the rendezvous master.
        """
        point = current_rank()
        # The last mesh dimension is treated as the "procs on one host"
        # dimension (an assumption about mesh layout), so its extent is the
        # local world size.
        per_host = point.size(point.shape.labels[-1])
        total = len(point)
        rank = point.rank
        # (index of this proc's host group, position within that group)
        host_idx, slot = divmod(rank, per_host)
        # Ceiling division: number of host groups needed for `total` procs.
        num_groups = (total + per_host - 1) // per_host
        rank_s = str(rank)
        total_s = str(total)
        os.environ.update(
            {
                "MASTER_ADDR": master_addr,
                "MASTER_PORT": str(master_port),
                "RANK": rank_s,
                # slot < per_host, so this clamp is defensive only.
                "LOCAL_RANK": str(min(total, slot)),
                "LOCAL_WORLD_SIZE": str(per_host),
                "GROUP_RANK": str(host_idx),
                "GROUP_WORLD_SIZE": str(num_groups),
                "ROLE_RANK": rank_s,
                "ROLE_WORLD_SIZE": total_s,
                "ROLE_NAME": "rank",
                "WORLD_SIZE": total_s,
            }
        )


async def setup_env_for_distributed(
    proc_mesh: ProcMesh,
    master_addr: str | None = None,
    master_port: int | None = None,
) -> None:
    """
    Sets up environment variables for PyTorch distributed on every proc in ``proc_mesh``.

    A ``_TorchDistributedInitActor`` is spawned on the mesh, and each proc then
    exports variables such as RANK, LOCAL_RANK, WORLD_SIZE, MASTER_ADDR and
    MASTER_PORT into its own environment.

    If ``master_addr`` and ``master_port`` are both None, the proc with rank 0
    of the flattened mesh is used as the master node. It reports its hostname
    and a port that was free on its host when probed.

    Args:
        proc_mesh: mesh whose procs should be configured.
        master_addr: rendezvous master address, or None to auto-select.
        master_port: rendezvous master port, or None to auto-select.

    Raises:
        ValueError: if exactly one of ``master_addr`` / ``master_port`` is given.
    """
    # Explicit check instead of ``assert`` so validation survives ``python -O``;
    # otherwise a lone master_addr would export MASTER_PORT="None".
    if (master_addr is None) != (master_port is None):
        raise ValueError(
            "Either both master_addr and master_port must be specified or neither must be specified."
        )
    am = await proc_mesh.spawn("_TorchDistributedInitActor", _TorchDistributedInitActor)
    if master_addr is None:
        # We use call instead of call_one because call_one can't handle tuple return types.
        vm = await am.flatten("rank").slice(rank=0).get_host_port.call()
        master_addr, master_port = vm.item()
    # Internal invariant (both were given, or both came from rank 0); also narrows the type.
    assert master_port is not None, "master_port should not be None here."
    await am.setup_env.call(master_addr, master_port)


class ToyModel(nn.Module):
    """A tiny two-layer MLP (Linear 10->10, ReLU, Linear 10->5) for the DDP demo."""

    def __init__(self):
        super().__init__()
        # Registration order fixes parameter order and the RNG draws made
        # during initialization: net1 first, then net2.
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 5)

    def forward(self, x):
        """Map an input whose last dimension is 10 to an output whose last dimension is 5."""
        hidden = self.net1(x)
        activated = self.relu(hidden)
        return self.net2(activated)


class DDPActor(Actor):
    """Monarch actor wrapping the basic use case from PyTorch's DDP tutorial.

    Each endpoint is one phase of the tutorial: joining the process group,
    running a single training step, and tearing the group down. A driver calls
    them on the whole mesh in that order.

    Adapted from: https://docs.pytorch.org/tutorials/intermediate/ddp_tutorial.html#basic-use-case
    """

    def __init__(self):
        # Global rank of this proc; reused as the torch.distributed rank.
        self.rank = current_rank().rank

    def _rprint(self, msg):
        """Print ``msg`` prefixed with this actor's rank."""
        print(f"{self.rank=} {msg}")

    @endpoint
    async def setup(self):
        """Join a gloo process group; requires WORLD_SIZE in the environment."""
        self._rprint("Initializing torch distributed")
        world_size = int(os.environ["WORLD_SIZE"])
        dist.init_process_group("gloo", rank=self.rank, world_size=world_size)
        self._rprint("Finished initializing torch distributed")

    @endpoint
    async def cleanup(self):
        """Destroy the process group created by ``setup``."""
        self._rprint("Cleaning up torch distributed")
        dist.destroy_process_group()

    @endpoint
    async def demo_basic(self):
        """Run one DDP forward/backward/optimizer step on random data."""
        self._rprint("Running basic DDP example")

        # LOCAL_RANK (not the global rank) selects the device index for this
        # proc's model replica.
        local_rank = int(os.environ["LOCAL_RANK"])
        self._rprint(f"{local_rank=}")
        ddp_model = DDP(ToyModel().to(local_rank), device_ids=[local_rank])

        criterion = nn.MSELoss()
        opt = optim.SGD(ddp_model.parameters(), lr=0.001)
        opt.zero_grad()

        # The input is created on CPU; this presumably relies on DDP moving
        # inputs to device_ids[0]. Labels are moved explicitly.
        predictions = ddp_model(torch.randn(20, 10))
        targets = torch.randn(20, 5).to(local_rank)
        criterion(predictions, targets).backward()
        opt.step()

        self._rprint("Finished running basic DDP example")


async def main(num_hosts: int = 2):
    """Run the DDP demo end-to-end on a SLURM-backed proc mesh.

    Steps:
    - Build an app definition for ``num_hosts`` hosts and obtain its server info.
    - Create a proc mesh and spawn one ``DDPActor`` per proc.
    - Export the torch.distributed environment on every proc.
    - Run setup -> demo -> cleanup on all actors.

    Once the server info is known, the SLURM job is killed whether or not the
    remaining steps succeed.

    Args:
        num_hosts: number of hosts to request. Defaults to 2, the previously
            hard-coded value.

    Raises:
        ValueError: if ``num_hosts`` is less than 1.
    """
    # Fail fast before any cluster resources are requested.
    if num_hosts < 1:
        raise ValueError(f"num_hosts must be >= 1, got {num_hosts}")
    appdef = await get_appdef(num_hosts)
    server_info = await get_server_info(appdef)

    try:
        proc_mesh = await create_proc_mesh(num_hosts, appdef, server_info)

        ddp_actor = await proc_mesh.spawn("ddp_actor", DDPActor)

        # Env vars (WORLD_SIZE, LOCAL_RANK, ...) must exist before DDPActor.setup
        # and DDPActor.demo_basic read them.
        await setup_env_for_distributed(proc_mesh)

        await ddp_actor.setup.call()
        await ddp_actor.demo_basic.call()
        await ddp_actor.cleanup.call()

        print("DDP example completed successfully!")

    finally:
        # Tear down the SLURM job even if mesh creation or training failed.
        commands.kill(f"slurm:///{server_info.name}")


if __name__ == "__main__":
    # This cell is written for Jupyter/IPython, which permits top-level
    # ``await``. In Jupyter the kernel's event loop is typically already
    # running, and ``asyncio.run`` raises RuntimeError in that case.
    # When running this as a plain ``python`` script, top-level ``await`` is a
    # SyntaxError: replace the line below with ``asyncio.run(main())``.
    await main()

torchx.schedulers.slurm_scheduler 2025-08-29 20:00:26 INFO unable to get job info for `monarch-ubuntu` with `squeue` (squeue: error: Invalid job id: monarch-ubuntu
), trying `sacct`
torchx.schedulers.slurm_scheduler 2025-08-29 20:00:26 INFO unable to get job info for `monarch-ubuntu` with `sacct` (sacct: fatal: Bad job/step specified: monarch-ubuntu
)
monarch.tools.commands 2025-08-29 20:00:26 INFO no existing RUNNING server `slurm:///monarch-ubuntu` creating new one...
torchx.runner.api 2025-08-29 20:00:26 INFO Tracker configurations: {}
torchx.runner.api 2025-08-29 20:00:26 INFO Checking for changes in workspace `/home/ubuntu/.monarch/out/tmpz8o72lub/workspace`...
torchx.runner.api 2025-08-29 20:00:26 INFO To disable workspaces pass: --workspace="" from CLI or workspace=None programmatically.
torchx.runner.api 2025-08-29 20:00:26 INFO Reusing original image `monarch_default_workspace:latest` for role[0]=mesh0. Either a patch was built or no changes to workspace was detected.
monarch.

Ahmad: {'requeue': None, 'ntasks-per-node': '1', 'cpus-per-task': '48', 'mem': '186777', 'gpus-per-task': '4', 'ntasks': '1'}
Ahmad: {'requeue': None, 'ntasks-per-node': '1', 'cpus-per-task': '48', 'mem': '186777', 'gpus-per-task': '4', 'ntasks': '1'}
Waiting for slurm:///392 to be RUNNING (current: PENDING); will check again in 5.0 seconds. Total wait time: 0:00:00.015990

slurm.utils 2025-08-29 20:00:31 INFO 
===== Server Info =====
{
  "name": "392",
  "server_handle": "slurm:///392",
  "state": "RUNNING",
  "meshes": {
    "mesh0": {
      "host_type": "__UNSET__",
      "hosts": 2,
      "gpus": -1,
      "hostnames": [
        "gpu-queue-st-gpu-compute-1",
        "gpu-queue-st-gpu-compute-2"
      ]
    }
  }
}
monarch._src.actor.allocator 2025-08-29 20:00:31 INFO no match label `procmesh.monarch.meta.com/name` specified in alloc constraints
monarch._src.actor.allocator 2025-08-29 20:00:31 INFO found a single proc mesh `mesh0` in slurm:///392, will allocate on it
monarch.tools.network 2025-08-29 20:00:31 INFO no AF_INET6 address that can bind TCP sockets for `gpu-queue-st-gpu-compute-1:26600` (error: [Errno -5] No address associated with hostname)
monarch.tools.network 2025-08-29 20:00:31 INFO resolved AF_INET address `10.0.2.236:26600` for `gpu-queue-st-gpu-compute-1:26600`
monarch.tools.network 2025-08-29 20:00:31 INFO no AF_INET6 address that ca

New job `slurm:///392` is ready to serve.


[-]E0829 20:00:31.750114  8536 hyperactor/src/channel/net.rs:620] session tcp:10.0.2.132:26600.1252649132718637493: failed while receiving ack: Connection reset by peer (os error 104)
[-]E0829 20:00:31.760992  8536 hyperactor/src/channel/net.rs:620] session tcp:10.0.2.236:26600.8446661579866624731: failed while receiving ack: Connection reset by peer (os error 104)
[-]E0829 20:00:31.803643  8536 hyperactor/src/channel/net.rs:620] session tcp:10.0.2.132:26600.1252649132718637493: failed while receiving ack: Connection reset by peer (os error 104)
[-]E0829 20:00:31.859766  8536 hyperactor/src/channel/net.rs:620] session tcp:10.0.2.132:26600.1252649132718637493: failed while receiving ack: Connection reset by peer (os error 104)
[-]E0829 20:00:31.867365  8536 hyperactor/src/channel/net.rs:620] session tcp:10.0.2.236:26600.8446661579866624731: failed while receiving ack: Connection reset by peer (os error 104)
[-]E0829 20:00:31.915078  8536 hyperactor/src/channel/net.rs:620] session tcp:10

Exception: no process has ever been allocated on 10.0.2.236 before the channel is closed; a common issue could be the channel was never established