From ed31e712fd43d03e7b97957a7741e58547d70727 Mon Sep 17 00:00:00 2001 From: Colin Taylor Date: Fri, 7 Nov 2025 12:49:23 -0800 Subject: [PATCH] allow process allocator to take 1 dimension only (#1569) Summary: P1995406113 allow process allocator to take 1 dimension only updated based on mariusae feedback to use "So we should just make that explicit, and expose two different MAST allocators (the only difference is how they interpret the extent). But at least then we have clear and well-defined behavior." Reviewed By: mariusae Differential Revision: D84844137 --- hyperactor_mesh/benches/main.rs | 2 + hyperactor_mesh/examples/sieve.rs | 1 + hyperactor_mesh/src/actor_mesh.rs | 29 +- hyperactor_mesh/src/alloc.rs | 32 ++ hyperactor_mesh/src/alloc/process.rs | 1 + hyperactor_mesh/src/alloc/remoteprocess.rs | 303 +++++++++++++----- hyperactor_mesh/src/alloc/sim.rs | 1 + hyperactor_mesh/src/bootstrap.rs | 1 + hyperactor_mesh/src/comm.rs | 2 + hyperactor_mesh/src/proc_mesh.rs | 4 + hyperactor_mesh/src/reference.rs | 2 + hyperactor_mesh/src/v1/testing.rs | 5 + .../test/hyperactor_mesh_proxy_test.rs | 2 + .../process_allocator_test_bin.rs | 1 + hyperactor_mesh/test/remote_process_alloc.rs | 1 + monarch_hyperactor/src/alloc.rs | 1 + .../src/bin/process_allocator/common.rs | 4 + monarch_hyperactor/src/code_sync/manager.rs | 1 + monarch_hyperactor/src/code_sync/rsync.rs | 1 + monarch_hyperactor/src/proc_mesh.rs | 3 + .../tests/code_sync/auto_reload.rs | 1 + .../cuda_ping_pong/src/cuda_ping_pong.rs | 2 + .../parameter_server/src/parameter_server.rs | 2 + monarch_rdma/src/test_utils.rs | 2 + python/monarch/_src/job/meta.py | 13 +- 25 files changed, 326 insertions(+), 91 deletions(-) diff --git a/hyperactor_mesh/benches/main.rs b/hyperactor_mesh/benches/main.rs index 499f0da50..a56e9c0e1 100644 --- a/hyperactor_mesh/benches/main.rs +++ b/hyperactor_mesh/benches/main.rs @@ -48,6 +48,7 @@ fn bench_actor_scaling(c: &mut Criterion) { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -147,6 +148,7 @@ fn bench_actor_mesh_message_sizes(c: &mut Criterion) { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); diff --git a/hyperactor_mesh/examples/sieve.rs b/hyperactor_mesh/examples/sieve.rs index c0f9c8742..fa7da5502 100644 --- a/hyperactor_mesh/examples/sieve.rs +++ b/hyperactor_mesh/examples/sieve.rs @@ -112,6 +112,7 @@ async fn main() -> Result { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await?; diff --git a/hyperactor_mesh/src/actor_mesh.rs b/hyperactor_mesh/src/actor_mesh.rs index f7d003774..071cccc5e 100644 --- a/hyperactor_mesh/src/actor_mesh.rs +++ b/hyperactor_mesh/src/actor_mesh.rs @@ -783,6 +783,7 @@ pub(crate) mod test_util { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -888,7 +889,8 @@ mod tests { extent: extent! { replica = 1 }, constraints: Default::default(), proc_name: None, - transport: default_transport() + transport: default_transport(), + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -913,7 +915,8 @@ mod tests { extent: extent!(replica = 4), constraints: Default::default(), proc_name: None, - transport: default_transport() + transport: default_transport(), + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -942,6 +945,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: default_transport(), + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -980,6 +984,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: default_transport(), + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1026,6 +1031,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: default_transport(), + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1070,6 +1076,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: default_transport(), + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1104,6 +1111,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: default_transport(), + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1153,6 +1161,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: default_transport(), + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1185,6 +1194,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: default_transport(), + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1216,6 +1226,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: default_transport(), + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1282,6 +1293,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1352,6 +1364,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1421,6 +1434,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1535,6 +1549,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1620,6 +1635,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1783,7 +1799,8 @@ mod tests { extent, constraints: Default::default(), proc_name: None, - transport: ChannelTransport::Local + transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), })) .unwrap(); let instance = runtime.block_on(crate::v1::testing::instance()); @@ -1816,7 +1833,8 @@ mod tests { extent: extent.clone(), constraints: Default::default(), proc_name: None, - transport: ChannelTransport::Local + transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), })) .unwrap(); let instance = runtime.block_on(crate::v1::testing::instance()); @@ -1890,7 +1908,8 @@ mod tests { extent, constraints: Default::default(), proc_name: None, - transport: ChannelTransport::Local + transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), })) .unwrap(); let instance = runtime.block_on(crate::v1::testing::instance()); diff --git a/hyperactor_mesh/src/alloc.rs b/hyperactor_mesh/src/alloc.rs index 3676beaeb..b13848748 100644 --- a/hyperactor_mesh/src/alloc.rs +++ b/hyperactor_mesh/src/alloc.rs @@ -133,6 +133,27 @@ pub struct AllocConstraints { pub match_labels: HashMap, } +/// Specifies how to interpret the extent dimensions for allocation. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] +pub enum ProcAllocationMode { + /// Proc-level allocation: splits extent to allocate multiple processes per host. + /// Requires at least 2 dimensions (e.g., [hosts: N, gpus: M]). + /// Splits by second-to-last dimension, creating N regions with M processes each. + /// Used by MastAllocator. + ProcLevel, + /// Host-level allocation: each point in the extent is a host (no sub-host splitting). + /// For extent!(region = 2, host = 4), create 8 regions, each representing 1 host. + /// Used by MastHostAllocator. + HostLevel, +} + +impl Default for ProcAllocationMode { + fn default() -> Self { + // Default to ProcLevel for backward compatibility + Self::ProcLevel + } +} + /// A specification (desired state) of an alloc. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AllocSpec { @@ -151,6 +172,15 @@ pub struct AllocSpec { /// The transport to use for the procs in this alloc. pub transport: ChannelTransport, + + /// Specifies how to interpret the extent dimensions for allocation. + /// Defaults to ProcLevel for backward compatibility. + #[serde(default = "default_proc_allocation_mode")] + pub proc_allocation_mode: ProcAllocationMode, +} + +fn default_proc_allocation_mode() -> ProcAllocationMode { + ProcAllocationMode::ProcLevel } /// The core allocator trait, implemented by all allocators. @@ -767,6 +797,7 @@ pub(crate) mod testing { constraints: Default::default(), proc_name: None, transport: default_transport(), + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -919,6 +950,7 @@ pub(crate) mod testing { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await .unwrap(); diff --git a/hyperactor_mesh/src/alloc/process.rs b/hyperactor_mesh/src/alloc/process.rs index b56d3df0a..5ec79150c 100644 --- a/hyperactor_mesh/src/alloc/process.rs +++ b/hyperactor_mesh/src/alloc/process.rs @@ -710,6 +710,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await .unwrap(); diff --git a/hyperactor_mesh/src/alloc/remoteprocess.rs b/hyperactor_mesh/src/alloc/remoteprocess.rs index a49868755..317ac2709 100644 --- a/hyperactor_mesh/src/alloc/remoteprocess.rs +++ b/hyperactor_mesh/src/alloc/remoteprocess.rs @@ -41,6 +41,7 @@ use hyperactor::reference::Reference; use hyperactor::serde_json; use mockall::automock; use ndslice::Region; +use ndslice::Slice; use ndslice::View; use ndslice::ViewExt; use ndslice::view::Extent; @@ -249,6 +250,7 @@ impl RemoteProcessAllocator { constraints, proc_name: None, // TODO(meriksen, direct addressing): we need to pass the addressing mode here transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }; match process_allocator.allocate(spec.clone()).await { @@ -761,96 +763,230 @@ impl RemoteProcessAlloc { let hostnames: Vec<_> = hosts.iter().map(|e| e.hostname.clone()).collect(); tracing::info!("obtained {} hosts for this allocation", hostnames.len()); - // We require at least a dimension for hosts, and one for sub-host (e.g., GPUs) - anyhow::ensure!( - self.spec.extent.len() >= 2, - "invalid extent: {}, expected at least 2 dimensions", - self.spec.extent - ); + // Split the extent into regions, one per host. + use crate::alloc::ProcAllocationMode; + + // For HostLevel, pre-compute regions. For ProcLevel, skip this step. + let regions: Option> = match self.spec.proc_allocation_mode { + ProcAllocationMode::ProcLevel => { + // We require at least a dimension for hosts, and one for sub-host (e.g., GPUs) + anyhow::ensure!( + self.spec.extent.len() >= 2, + "invalid extent: {}, expected at least 2 dimensions", + self.spec.extent + ); + None + } + ProcAllocationMode::HostLevel => Some({ + // HostLevel: each point is a host, create a region for each point + let num_points = self.spec.extent.num_ranks(); + anyhow::ensure!( + hosts.len() >= num_points, + "HostLevel allocation mode requires {} hosts (one per point in extent {}), but only {} hosts were provided", + num_points, + self.spec.extent, + hosts.len() + ); - // We group by the innermost dimension of the extent. - let split_dim = &self.spec.extent.labels()[self.spec.extent.len() - 1]; - for (i, region) in self.spec.extent.group_by(split_dim)?.enumerate() { - let host = &hosts[i]; - tracing::debug!("allocating: {} for host: {}", region, host.id); + // For HostLevel, create a single-point region for each rank + // Each region contains one point that maps to the correct global rank + let labels = self.spec.extent.labels().to_vec(); - let remote_addr = match self.spec.transport { - ChannelTransport::MetaTls(_) => { - format!("metatls!{}:{}", host.hostname, self.remote_allocator_port) - } - ChannelTransport::Tcp(TcpMode::Localhost) => { - // TODO: @rusch see about moving over to config for this - format!("tcp![::1]:{}", self.remote_allocator_port) - } - ChannelTransport::Tcp(TcpMode::Hostname) => { - format!("tcp!{}:{}", host.hostname, self.remote_allocator_port) + // Compute strides for row-major layout: strides[i] = product of sizes[i+1..n] + let extent_sizes = self.spec.extent.sizes(); + let mut parent_strides = vec![1; extent_sizes.len()]; + for i in (0..extent_sizes.len() - 1).rev() { + parent_strides[i] = parent_strides[i + 1] * extent_sizes[i + 1]; } - // Used only for testing. - ChannelTransport::Unix => host.hostname.clone(), - _ => { - anyhow::bail!( - "unsupported transport for host {}: {:?}", - host.id, - self.spec.transport, + + (0..num_points) + .map(|rank| { + // Create a slice containing only this rank + // Use parent's strides so local point [0,0,...] maps to the correct global rank + let sizes = vec![1; labels.len()]; + Region::new( + labels.clone(), + Slice::new(rank, sizes, parent_strides.clone()).unwrap(), + ) + }) + .collect() + }), + }; + + match self.spec.proc_allocation_mode { + ProcAllocationMode::ProcLevel => { + // We group by the innermost dimension of the extent. + let split_dim = &self.spec.extent.labels()[self.spec.extent.len() - 1]; + for (i, region) in self.spec.extent.group_by(split_dim)?.enumerate() { + let host = &hosts[i]; + tracing::debug!("allocating: {} for host: {}", region, host.id); + + let remote_addr = match self.spec.transport { + ChannelTransport::MetaTls(_) => { + format!("metatls!{}:{}", host.hostname, self.remote_allocator_port) + } + ChannelTransport::Tcp(TcpMode::Localhost) => { + // TODO: @rusch see about moving over to config for this + format!("tcp![::1]:{}", self.remote_allocator_port) + } + ChannelTransport::Tcp(TcpMode::Hostname) => { + format!("tcp!{}:{}", host.hostname, self.remote_allocator_port) + } + // Used only for testing. + ChannelTransport::Unix => host.hostname.clone(), + _ => { + anyhow::bail!( + "unsupported transport for host {}: {:?}", + host.id, + self.spec.transport, + ); + } + }; + + tracing::debug!("dialing remote: {} for host {}", remote_addr, host.id); + let remote_addr = remote_addr.parse::()?; + let tx = channel::dial(remote_addr.clone()) + .map_err(anyhow::Error::from) + .context(format!( + "failed to dial remote {} for host {}", + remote_addr, host.id + ))?; + + // Possibly we could use the HostId directly here. + let alloc_key = ShortUuid::generate(); + assert!( + self.alloc_to_host + .insert(alloc_key.clone(), host.id.clone()) + .is_none() + ); + + let trace_id = hyperactor_telemetry::trace::get_or_create_trace_id(); + let client_context = Some(ClientContext { trace_id }); + let message = RemoteProcessAllocatorMessage::Allocate { + alloc_key: alloc_key.clone(), + extent: region.extent(), + bootstrap_addr: self.bootstrap_addr.clone(), + hosts: hostnames.clone(), + client_context, + // Make sure allocator's forwarder uses the same IP address + // which is known to alloc. This is to avoid allocator picks + // its host's private IP address, while its known addres to + // alloc is a public IP address. In some environment, that + // could lead to port unreachable error. + forwarder_addr: with_unspecified_port_or_any(&remote_addr), + }; + tracing::info!( + name = message.as_ref(), + "sending allocate message to workers" + ); + tx.post(message); + + self.host_states.insert( + host.id.clone(), + RemoteProcessAllocHostState { + alloc_key, + host_id: host.id.clone(), + tx, + active_procs: HashSet::new(), + region, + world_id: None, + failed: false, + allocated: false, + }, + remote_addr, ); } - }; - tracing::debug!("dialing remote: {} for host {}", remote_addr, host.id); - let remote_addr = remote_addr.parse::()?; - let tx = channel::dial(remote_addr.clone()) - .map_err(anyhow::Error::from) - .context(format!( - "failed to dial remote {} for host {}", - remote_addr, host.id - ))?; - - // Possibly we could use the HostId directly here. - let alloc_key = ShortUuid::generate(); - assert!( - self.alloc_to_host - .insert(alloc_key.clone(), host.id.clone()) - .is_none() - ); + self.ordered_hosts = hosts; + } + ProcAllocationMode::HostLevel => { + let regions = regions.unwrap(); + let num_regions = regions.len(); + for (i, region) in regions.into_iter().enumerate() { + let host = &hosts[i]; + tracing::debug!("allocating: {} for host: {}", region, host.id); + + let remote_addr = match self.spec.transport { + ChannelTransport::MetaTls(_) => { + format!("metatls!{}:{}", host.hostname, self.remote_allocator_port) + } + ChannelTransport::Tcp(TcpMode::Localhost) => { + // TODO: @rusch see about moving over to config for this + format!("tcp![::1]:{}", self.remote_allocator_port) + } + ChannelTransport::Tcp(TcpMode::Hostname) => { + format!("tcp!{}:{}", host.hostname, self.remote_allocator_port) + } + // Used only for testing. + ChannelTransport::Unix => host.hostname.clone(), + _ => { + anyhow::bail!( + "unsupported transport for host {}: {:?}", + host.id, + self.spec.transport, + ); + } + }; + + tracing::debug!("dialing remote: {} for host {}", remote_addr, host.id); + let remote_addr = remote_addr.parse::()?; + let tx = channel::dial(remote_addr.clone()) + .map_err(anyhow::Error::from) + .context(format!( + "failed to dial remote {} for host {}", + remote_addr, host.id + ))?; + + // Possibly we could use the HostId directly here. + let alloc_key = ShortUuid::generate(); + assert!( + self.alloc_to_host + .insert(alloc_key.clone(), host.id.clone()) + .is_none() + ); - let trace_id = hyperactor_telemetry::trace::get_or_create_trace_id(); - let client_context = Some(ClientContext { trace_id }); - let message = RemoteProcessAllocatorMessage::Allocate { - alloc_key: alloc_key.clone(), - extent: region.extent(), - bootstrap_addr: self.bootstrap_addr.clone(), - hosts: hostnames.clone(), - client_context, - // Make sure allocator's forwarder uses the same IP address - // which is known to alloc. This is to avoid allocator picks - // its host's private IP address, while its known addres to - // alloc is a public IP address. In some environment, that - // could lead to port unreachable error. - forwarder_addr: with_unspecified_port_or_any(&remote_addr), - }; - tracing::info!( - name = message.as_ref(), - "sending allocate message to workers" - ); - tx.post(message); - - self.host_states.insert( - host.id.clone(), - RemoteProcessAllocHostState { - alloc_key, - host_id: host.id.clone(), - tx, - active_procs: HashSet::new(), - region, - world_id: None, - failed: false, - allocated: false, - }, - remote_addr, - ); - } + let trace_id = hyperactor_telemetry::trace::get_or_create_trace_id(); + let client_context = Some(ClientContext { trace_id }); + let message = RemoteProcessAllocatorMessage::Allocate { + alloc_key: alloc_key.clone(), + extent: region.extent(), + bootstrap_addr: self.bootstrap_addr.clone(), + hosts: hostnames.clone(), + client_context, + // Make sure allocator's forwarder uses the same IP address + // which is known to alloc. This is to avoid allocator picks + // its host's private IP address, while its known addres to + // alloc is a public IP address. In some environment, that + // could lead to port unreachable error. + forwarder_addr: with_unspecified_port_or_any(&remote_addr), + }; + tracing::info!( + name = message.as_ref(), + "sending allocate message to workers" + ); + tx.post(message); - self.ordered_hosts = hosts; + self.host_states.insert( + host.id.clone(), + RemoteProcessAllocHostState { + alloc_key, + host_id: host.id.clone(), + tx, + active_procs: HashSet::new(), + region, + world_id: None, + failed: false, + allocated: false, + }, + remote_addr, + ); + } + + // Only store hosts that were actually used for regions + // If num_regions < hosts.len(), we only use the first num_regions hosts + self.ordered_hosts = hosts.into_iter().take(num_regions).collect(); + } + } self.start_comm_watcher().await; self.started = true; @@ -2076,6 +2212,7 @@ mod test_alloc { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }; let world_id = WorldId("test_world_id".to_string()); @@ -2205,6 +2342,7 @@ mod test_alloc { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }; let world_id = WorldId("test_world_id".to_string()); @@ -2336,6 +2474,7 @@ mod test_alloc { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }; let world_id = WorldId("test_world_id".to_string()); diff --git a/hyperactor_mesh/src/alloc/sim.rs b/hyperactor_mesh/src/alloc/sim.rs index 983425f6f..e14ec6997 100644 --- a/hyperactor_mesh/src/alloc/sim.rs +++ b/hyperactor_mesh/src/alloc/sim.rs @@ -196,6 +196,7 @@ mod tests { }, proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await .unwrap(); diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index cd55b59fd..89fd863d4 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -3558,6 +3558,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await .unwrap(); diff --git a/hyperactor_mesh/src/comm.rs b/hyperactor_mesh/src/comm.rs index 34b32822e..9539153a5 100644 --- a/hyperactor_mesh/src/comm.rs +++ b/hyperactor_mesh/src/comm.rs @@ -824,6 +824,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1061,6 +1062,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); diff --git a/hyperactor_mesh/src/proc_mesh.rs b/hyperactor_mesh/src/proc_mesh.rs index a02f75c92..6ae6210c6 100644 --- a/hyperactor_mesh/src/proc_mesh.rs +++ b/hyperactor_mesh/src/proc_mesh.rs @@ -1127,6 +1127,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1145,6 +1146,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1180,6 +1182,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -1237,6 +1240,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); diff --git a/hyperactor_mesh/src/reference.rs b/hyperactor_mesh/src/reference.rs index 7768825fe..a1b3b8d10 100644 --- a/hyperactor_mesh/src/reference.rs +++ b/hyperactor_mesh/src/reference.rs @@ -324,6 +324,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -333,6 +334,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); diff --git a/hyperactor_mesh/src/v1/testing.rs b/hyperactor_mesh/src/v1/testing.rs index b67e2e298..851e982f9 100644 --- a/hyperactor_mesh/src/v1/testing.rs +++ b/hyperactor_mesh/src/v1/testing.rs @@ -55,6 +55,7 @@ pub async fn proc_meshes(cx: &impl context::Actor, extent: Extent) -> Vec Vec Vec> { constraints: Default::default(), proc_name: None, transport: default_transport(), + proc_allocation_mode: Default::default(), }; vec![ @@ -121,6 +124,7 @@ pub async fn local_proc_mesh(extent: Extent) -> (ProcMesh, Instance<()>, DialMai constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -144,6 +148,7 @@ pub async fn host_mesh(extent: Extent) -> HostMesh { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await .unwrap(); diff --git a/hyperactor_mesh/test/hyperactor_mesh_proxy_test.rs b/hyperactor_mesh/test/hyperactor_mesh_proxy_test.rs index 4d9a78a3d..13f5b6737 100644 --- a/hyperactor_mesh/test/hyperactor_mesh_proxy_test.rs +++ b/hyperactor_mesh/test/hyperactor_mesh_proxy_test.rs @@ -122,6 +122,7 @@ impl Actor for ProxyActor { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -163,6 +164,7 @@ async fn run_client(exe_path: PathBuf, keep_alive: bool) -> Result<(), anyhow::E constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await .unwrap(); diff --git a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs index 18b32ba2b..1322c96ea 100644 --- a/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs +++ b/hyperactor_mesh/test/process_allocator_cleanup/process_allocator_test_bin.rs @@ -49,6 +49,7 @@ async fn main() -> Result<(), Box> { constraints: AllocConstraints::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await?; diff --git a/hyperactor_mesh/test/remote_process_alloc.rs b/hyperactor_mesh/test/remote_process_alloc.rs index 4cf971b88..f009b9da2 100644 --- a/hyperactor_mesh/test/remote_process_alloc.rs +++ b/hyperactor_mesh/test/remote_process_alloc.rs @@ -93,6 +93,7 @@ async fn main() { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }, WorldId("test_world_id".to_string()), 0, diff --git a/monarch_hyperactor/src/alloc.rs b/monarch_hyperactor/src/alloc.rs index 1cfedd2db..492e2a9b1 100644 --- a/monarch_hyperactor/src/alloc.rs +++ b/monarch_hyperactor/src/alloc.rs @@ -228,6 +228,7 @@ impl PyAllocSpec { constraints: constraints.inner.clone(), proc_name: None, transport: default_transport(), + proc_allocation_mode: Default::default(), }, transport: None, }) diff --git a/monarch_hyperactor/src/bin/process_allocator/common.rs b/monarch_hyperactor/src/bin/process_allocator/common.rs index b8a325cd1..0f32fa1c1 100644 --- a/monarch_hyperactor/src/bin/process_allocator/common.rs +++ b/monarch_hyperactor/src/bin/process_allocator/common.rs @@ -136,6 +136,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }; let mut initializer = remoteprocess::MockRemoteProcessAllocInitializer::new(); @@ -198,6 +199,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }; let mut initializer = remoteprocess::MockRemoteProcessAllocInitializer::new(); @@ -253,6 +255,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }; let mut initializer = remoteprocess::MockRemoteProcessAllocInitializer::new(); @@ -324,6 +327,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }; let mut initializer = remoteprocess::MockRemoteProcessAllocInitializer::new(); diff --git a/monarch_hyperactor/src/code_sync/manager.rs b/monarch_hyperactor/src/code_sync/manager.rs index 504221050..bcad94185 100644 --- a/monarch_hyperactor/src/code_sync/manager.rs +++ b/monarch_hyperactor/src/code_sync/manager.rs @@ -652,6 +652,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await?; diff --git a/monarch_hyperactor/src/code_sync/rsync.rs b/monarch_hyperactor/src/code_sync/rsync.rs index d41c8c668..f37c08bed 100644 --- a/monarch_hyperactor/src/code_sync/rsync.rs +++ b/monarch_hyperactor/src/code_sync/rsync.rs @@ -505,6 +505,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await?; diff --git a/monarch_hyperactor/src/proc_mesh.rs b/monarch_hyperactor/src/proc_mesh.rs index 08e793413..938495f62 100644 --- a/monarch_hyperactor/src/proc_mesh.rs +++ b/monarch_hyperactor/src/proc_mesh.rs @@ -559,6 +559,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await?; @@ -608,6 +609,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await?; @@ -673,6 +675,7 @@ mod tests { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await?; diff --git a/monarch_hyperactor/tests/code_sync/auto_reload.rs b/monarch_hyperactor/tests/code_sync/auto_reload.rs index 689cb45b6..ad71b7276 100644 --- a/monarch_hyperactor/tests/code_sync/auto_reload.rs +++ b/monarch_hyperactor/tests/code_sync/auto_reload.rs @@ -51,6 +51,7 @@ CONSTANT = "initial_constant" constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await?; diff --git a/monarch_rdma/examples/cuda_ping_pong/src/cuda_ping_pong.rs b/monarch_rdma/examples/cuda_ping_pong/src/cuda_ping_pong.rs index 2a65d02a1..e4e283807 100644 --- a/monarch_rdma/examples/cuda_ping_pong/src/cuda_ping_pong.rs +++ b/monarch_rdma/examples/cuda_ping_pong/src/cuda_ping_pong.rs @@ -696,6 +696,7 @@ pub async fn run() -> Result<(), anyhow::Error> { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await?, ) @@ -709,6 +710,7 @@ pub async fn run() -> Result<(), anyhow::Error> { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await?, ) diff --git a/monarch_rdma/examples/parameter_server/src/parameter_server.rs b/monarch_rdma/examples/parameter_server/src/parameter_server.rs index 659e32fc3..2d694d5fe 100644 --- a/monarch_rdma/examples/parameter_server/src/parameter_server.rs +++ b/monarch_rdma/examples/parameter_server/src/parameter_server.rs @@ -481,6 +481,7 @@ pub async fn run(num_workers: usize, num_steps: usize) -> Result<(), anyhow::Err constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await?, ) @@ -509,6 +510,7 @@ pub async fn run(num_workers: usize, num_steps: usize) -> Result<(), anyhow::Err constraints: Default::default(), proc_name: None, transport: ChannelTransport::Unix, + proc_allocation_mode: Default::default(), }) .await?, ) diff --git a/monarch_rdma/src/test_utils.rs b/monarch_rdma/src/test_utils.rs index d63a0902c..113ad9a60 100644 --- a/monarch_rdma/src/test_utils.rs +++ b/monarch_rdma/src/test_utils.rs @@ -329,6 +329,7 @@ pub mod test_utils { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); @@ -347,6 +348,7 @@ pub mod test_utils { constraints: Default::default(), proc_name: None, transport: ChannelTransport::Local, + proc_allocation_mode: Default::default(), }) .await .unwrap(); diff --git a/python/monarch/_src/job/meta.py b/python/monarch/_src/job/meta.py index 31d00ccc1..0adcdcef5 100644 --- a/python/monarch/_src/job/meta.py +++ b/python/monarch/_src/job/meta.py @@ -22,12 +22,13 @@ from monarch._src.actor.allocator import AllocateMixin from monarch._src.actor.host_mesh import host_mesh_from_alloc from monarch._src.actor.meta.allocator import ( - MastAllocator, MastAllocatorBase, MastAllocatorConfig, + MastHostAllocator, + MastHostAllocatorBase, ) -from monarch._src.job.job import BatchJob, JobState, JobTrait +from monarch._src.job.job import BatchJob, enable_transport, JobState, JobTrait from monarch.tools.commands import create, info, kill from monarch.tools.components.meta import hyperactor @@ -39,6 +40,8 @@ from torchx.specs import AppState from torchx.specs.fb.component_helpers import Packages +enable_transport("metatls") + class _MASTSpec(NamedTuple): hpcIdentity: str @@ -56,7 +59,7 @@ class _MASTSpec(NamedTuple): class _MASTAllocator(AllocateMixin): def __init__(self, config: MastAllocatorConfig, job_start: Shared[None]): - self._mast = MastAllocatorBase(config) + self._mast = MastHostAllocatorBase(config) self._job_start = job_start def allocate_nonblocking(self, spec: AllocSpec) -> "PythonTask[Alloc]": @@ -171,7 +174,9 @@ def _state(self) -> JobState: ), job_started, ) - constraints = AllocConstraints({MastAllocator.ALLOC_LABEL_TASK_GROUP: name}) + constraints = AllocConstraints( + {MastHostAllocator.ALLOC_LABEL_TASK_GROUP: name} + ) host_meshes[name] = host_mesh_from_alloc( name, Extent(["hosts"], [num_host]), allocator, constraints )