In [11]:
from lightning_sdk import Machine, MMT, Studio

In [12]:
from utils.master_node import MasterNodeServer
# Look up the addresses of the node this notebook runs on. The public address
# is fetched through two different MasterNodeServer helpers so the results can
# be compared; `public_master_host_ip_address` is the one used later for the
# Hyperactor bootstrap address.
private_master_host_ip_address = MasterNodeServer.get_master_ip()
public_master_host_ip_address = MasterNodeServer.get_master_public_ip_curl()
public_master_host_ip_address_services = MasterNodeServer.get_master_public_ip()
print(f"private_master_host_ip_address = {private_master_host_ip_address}")
print(f"public_master_host_ip_address = {public_master_host_ip_address}")
# Use the variable's own name as the label so the two public lookups can be
# told apart in the cell output.
print(f"public_master_host_ip_address_services = {public_master_host_ip_address_services}")

private_master_host_ip_address = 10.192.10.43
public_master_host_ip_address = 34.201.107.243
public_master_host_ip_address = 34.201.107.243


In [5]:
# Notebook-wide settings. Everything a reader may want to tune lives here.
import os

# Lightning account details.
TEAMSPACE = "general"  # Replace with your teamspace
USER = "meta-ai"  # Replace with your username

# Cluster shape: machines in the job and accelerators on each machine.
# NUM_CPUS is only referenced by the commented-out CPU machine type.
NUM_NODES, NUM_CPUS, NUM_GPUS = 2, 2, 8
NUM_PROCS = NUM_NODES * NUM_GPUS  # one process per GPU across all nodes

# Ports. The HTTP server used for IP registration shares Monarch's default
# port (it was 8080 before).
MONARCH_DEFAULT_PORT = 26600
HTTP_SERVER_PORT = MONARCH_DEFAULT_PORT

# Name under which the multi-machine job is submitted.
MMT_JOB_NAME = f"Monarch-v1-Titan-{NUM_NODES}_nodes-port_override"

# Enable debug-level Monarch file logging for this (client) process.
os.environ["MONARCH_FILE_LOG"] = "debug"

In [6]:
def launch_mmt_job(num_nodes=2, teamspace="my-teamspace", user="my-user"):
    """
    Launch a multi-machine training job using Lightning SDK's MMT API.

    Installs the ``multi-machine-training`` plugin into the Studio returned by
    ``Studio()`` and starts ``process_allocator`` on ``num_nodes`` machines of
    type ``L40S_X_{NUM_GPUS}``. The job name comes from the module-level
    ``MMT_JOB_NAME``.

    Args:
        num_nodes: Number of machines to request for the job.
        teamspace: Currently unused; kept so existing callers keep working.
        user: Currently unused; kept so existing callers keep working.

    Returns:
        A ``(job, studio)`` tuple with the object returned by ``MMT.run`` and
        the Studio the job was launched from.
    """

    studio = Studio()

    # Install the MMT plugin before running the actual job
    studio.install_plugin("multi-machine-training")

    print(f"Launching MMT job with {num_nodes} nodes...")

    # Alternative machine types (swap in as needed):
    #   CPUs:     getattr(Machine, f"CPU_X_{NUM_CPUS}")
    #   T4 GPUs:  getattr(Machine, f"T4_X_{NUM_GPUS}")
    #   L4 GPUs:  getattr(Machine, f"L4_X_{NUM_GPUS}")
    # Machine with L40S GPUs
    machine_type = getattr(Machine, f"L40S_X_{NUM_GPUS}")

    # Expose exactly the GPUs the selected machine type provides, so the
    # device list follows NUM_GPUS instead of a hard-coded eight indices.
    visible_devices = ",".join(str(index) for index in range(NUM_GPUS))

    job = MMT.run(
        command="process_allocator",
        # command=f"tail -f /dev/null",
        name=MMT_JOB_NAME,
        machine=machine_type,
        studio=studio,
        num_machines=num_nodes,
        env={
            "CUDA_VISIBLE_DEVICES": visible_devices,  # Make all GPUs visible
            "MONARCH_FILE_LOG": "debug",
            "HYPERACTOR_REMOTE_ALLOC_ALLOWED_PORT_RANGE": "26601-26610",
            "HYPERACTOR_REMOTE_ALLOC_BIND_TO_INADDR_ANY": "true",
        },
    )

    print(f"Job started with ID: {job.name}")
    print(f"Job status: {job.status}")

    # Hand both objects back so the caller can monitor and stop them.
    return job, studio

In [7]:
# Submit the multi-machine job with the values from the configuration cell.
job, studio = launch_mmt_job(num_nodes=NUM_NODES, teamspace=TEAMSPACE, user=USER)

# Remind the reader how to follow up on the job from later cells.
print(
    "Job launched. You can monitor it using: job.status",
    "To stop the job: job.stop()",
    "To clean up: studio.stop()",
    sep="\n",
)

Launching MMT job with 2 nodes...


INFO - Multi-Machine Job was successfully launched. View it at https://lightning.ai/meta-ai/general/jobs/Monarch-v1-Titan-2_nodes-port_override-437zt?app_id=mmt


Job started with ID: Monarch-v1-Titan-2_nodes-port_override-437zt
Job status: Pending
Job launched. You can monitor it using: job.status
To stop the job: job.stop()
To clean up: studio.stop()


In [13]:
# Collect the public IP reported for every machine in the job.
ip_addresses_list = [machine.public_ip for machine in job.machines]
ip_addresses_set = set(ip_addresses_list)
print(f"{ip_addresses_list=}")
print(f"{ip_addresses_set=}")
# The addresses are usable only when there is at least one machine and every
# machine reports a non-empty address. Comparing the set against {''} would
# also accept "no machines" and "some machines still without an address".
ips_available = bool(ip_addresses_list) and all(ip_addresses_list)
print(f"IP addresses are available: {ips_available}")

ip_addresses_list=['3.150.40.243', '18.189.125.53']
ip_addresses_set={'18.189.125.53', '3.150.40.243'}
IP addresses are available: True


In [19]:
# Stop here while the machines have not reported their addresses yet.
if not ips_available:
    raise ValueError("IPs are not available yet!")

# One Monarch channel address per distinct machine IP, on the default port.
tcp_addresses = [
    f"tcp!{host_ip}:{MONARCH_DEFAULT_PORT}" for host_ip in ip_addresses_set
]
print(tcp_addresses)

['tcp!18.189.125.53:26600', 'tcp!3.150.40.243:26600']


In [20]:
import os
from monarch._src.actor.allocator import RemoteAllocator, StaticRemoteAllocInitializer
# from monarch._rust_bindings.monarch_hyperactor.alloc import AllocConstraints, AllocSpec
# from monarch.actor import ProcMesh
# tcp_addresses = ['tcp!3.21.117.93:26600', 'tcp!18.220.66.230:26600']

# Client-side Hyperactor/Monarch settings, applied in one go. The bootstrap
# address points at this node's public IP with port 0.
os.environ.update(
    {
        "HYPERACTOR_REMOTE_ALLOC_ALLOWED_PORT_RANGE": "26600-26610",
        "HYPERACTOR_REMOTE_ALLOC_BOOTSTRAP_ADDR": f"tcp!{public_master_host_ip_address}:0",
        "HYPERACTOR_REMOTE_ALLOC_BIND_TO_INADDR_ANY": "true",
        "MONARCH_HOST_MESH_V1_REMOVE_ME_BEFORE_RELEASE": "1",
    }
)

# Allocator that targets the fixed list of worker addresses built earlier.
allocator = RemoteAllocator(
    world_id="foo",
    initializer=StaticRemoteAllocInitializer(*tcp_addresses),
)

# Show the allocator object and the bootstrap address that was configured.
print(allocator, os.environ["HYPERACTOR_REMOTE_ALLOC_BOOTSTRAP_ADDR"], sep="\n")

<monarch._src.actor.allocator.RemoteAllocator object at 0x79c87419ad50>
tcp!34.201.107.243:0


In [21]:
from monarch.actor import HostMesh
from monarch._rust_bindings.monarch_hyperactor.shape import Extent

# Request a host mesh from the remote allocator without blocking the cell.
# NOTE(review): the "procs" extent is NUM_NODES * NUM_GPUS while the mesh
# already has a "hosts" dimension of NUM_NODES - confirm this is intended.
host_mesh = HostMesh.allocate_nonblocking(
    "hostmeshtest",
    extent=Extent(["hosts", "procs"], [NUM_NODES, NUM_PROCS]),
    allocator=allocator,
)

# Create NUM_GPUS processes along a "gpus" dimension on the host mesh.
proc_mesh = host_mesh.spawn_procs({"gpus": NUM_GPUS})

In [22]:
import getpass
def get_job_name(num_hosts: int, num_gpus_per_host: int) -> str:
    """Build the job name from the current user and the cluster shape.

    Args:
        num_hosts: Number of hosts to encode in the name.
        num_gpus_per_host: Number of GPUs per host to encode in the name.

    Returns:
        A string of the form ``monarch-<user>-hosts<N>-gpus<M>``, where
        ``<user>`` is the value of ``getpass.getuser()``.
    """
    username = getpass.getuser()
    return f"monarch-{username}-hosts{num_hosts}-gpus{num_gpus_per_host}"
# Preview the job name derived from the configured cluster shape.
print(get_job_name(NUM_NODES, NUM_GPUS))

monarch-alisol-hosts2-gpus8


In [23]:
import sys
import logging
from monarch.actor import ProcMesh, Actor, endpoint, current_rank
import socket
from torchtitan.tools.logging import logger
from torchtitan.train import Trainer
from typing import Optional
import torch
from torchtitan.config import JobConfig


class TitanTrainerWrapper(Actor):
    """Monarch actor that runs a torchtitan ``Trainer`` for one rank.

    Each instance stores the job configuration it was spawned with and the
    rank reported by ``current_rank()``.
    """

    def __init__(self, job_config: JobConfig):
        """Remember this actor's rank and the torchtitan job configuration."""
        self.rank = current_rank().rank
        self.job_config = job_config

    def _rprint(self, msg):
        """Helper method to print with rank information."""
        print(f"{self.rank=} {msg}")

    @endpoint
    def init(self):
        """Attach a stderr log handler and report where this actor runs."""
        logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))
        print(f"Initializing actor: {self.rank} {current_rank()=} {socket.gethostname()=}")

    @endpoint
    def train(self):
        """Run training, or create a seed checkpoint if the config asks for one.

        When ``config.checkpoint.create_seed_checkpoint`` is set, a step-0
        checkpoint is saved and no training is performed; otherwise
        ``Trainer.train()`` runs exactly once. The trainer is closed and the
        torch.distributed process group destroyed (if initialized) whether or
        not an exception occurred.

        Raises:
            AssertionError: If a seed checkpoint is requested with
                ``WORLD_SIZE`` other than 1 or with checkpointing disabled.
        """
        logger.info("Starting training")
        config = self.job_config
        trainer: Optional[Trainer] = None

        try:
            # Only construct the trainer here. Training is started in the
            # `else` branch below, so it neither runs twice nor runs ahead of
            # seed-checkpoint creation.
            trainer = Trainer(config)

            if config.checkpoint.create_seed_checkpoint:
                assert (
                    int(os.environ["WORLD_SIZE"]) == 1
                ), "Must create seed checkpoint using a single device, to disable sharding."
                assert (
                    # config.checkpoint.enable_checkpoint
                    config.checkpoint.enable
                ), "Must enable checkpointing when creating a seed checkpoint."
                trainer.checkpointer.save(curr_step=0, )
                logger.info("Created seed checkpoint")
            else:
                trainer.train()
        finally:
            # `trainer` stays None if the constructor itself raised.
            if trainer:
                trainer.close()

            if torch.distributed.is_initialized():
                torch.distributed.destroy_process_group()
                logger.info("Process group destroyed.")
        print("Done training")

In [24]:
from torch.xpu import stream
from torchtitan.config import JobConfig
from monarch.utils import setup_env_for_distributed

async def async_main(job_config: JobConfig):
    """Spawn the trainer actors on ``proc_mesh`` and run training on them.

    Uses the module-level ``proc_mesh`` created earlier in the notebook:
    sets up the distributed environment on it, enables log streaming to the
    client, spawns one ``TitanTrainerWrapper`` actor mesh and then calls its
    ``init`` and ``train`` endpoints in that order.

    Args:
        job_config: torchtitan job configuration passed to every actor.
    """
    torch.use_deterministic_algorithms(True)
    job_name = get_job_name(NUM_NODES, NUM_GPUS)

    await setup_env_for_distributed(proc_mesh)

    await proc_mesh.logging_option(stream_to_client=True, aggregate_window_sec=3)

    print(job_config)
    print(f"Spawning meshes on {job_name}")

    # Actors are created with `spawn(name, actor_class, *args)`. `spawn_procs`
    # is the host-mesh call that creates processes from a dict of dimensions,
    # so it cannot take an actor class.
    # NOTE(review): older Monarch releases returned an awaitable from `spawn`;
    # add `await` here if the installed version requires it.
    trainer_actor = proc_mesh.spawn("trainer_actor", TitanTrainerWrapper, job_config)

    await trainer_actor.init.call()
    await trainer_actor.train.call()

In [None]:
from torchtitan.config import ConfigManager
from torchtitan.tools.logging import init_logger
init_logger()
config_manager = ConfigManager()

job_name = get_job_name(NUM_NODES, NUM_GPUS)

manual_args = [
        "--job.config_file",
        os.path.expanduser("/teamspace/studios/this_studio/torchtitan/torchtitan/models/llama3/train_configs/llama3_8b.toml"),
        "--model.tokenizer-path",
        # f"{FUSE_DST}/Llama-3.1-8B",
        "/teamspace/studios/this_studio/torchtitan/assets/hf/Llama-3.1-8B",
        "--training.steps",
        "25",
        "--training.dataset_path",
        # f"{FUSE_DST}/c4",
        "/teamspace/studios/this_studio/torchtitan/tests/assets/c4_test",
        "--job.dump_folder",
        # f"{FUSE_DST}/outputs/" + job_name,
        "/teamspace/studios/this_studio/torchtitan/outputs/" + job_name,
        "--training.seq_len",
        "1024",
        # "8192",
    ]
config = config_manager.parse_args(manual_args)
await async_main(config)



In [None]:
from monarch.job import SlurmJob, JobTrait