Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions hyperactor_mesh/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# @generated by autocargo from //monarch/hyperactor_mesh:[hyperactor_mesh,hyperactor_mesh_test_bootstrap]
# @generated by autocargo from //monarch/hyperactor_mesh:[hyperactor_mesh,hyperactor_mesh_test_bootstrap,process_allocator_cleanup,process_allocator_test_bin,process_allocator_test_bootstrap]

[package]
name = "hyperactor_mesh"
Expand All @@ -11,11 +11,24 @@ license = "BSD-3-Clause"
name = "hyperactor_mesh_test_bootstrap"
path = "test/bootstrap.rs"

[[bin]]
name = "process_allocator_test_bin"
path = "test/process_allocator_cleanup/process_allocator_test_bin.rs"

[[bin]]
name = "process_allocator_test_bootstrap"
path = "test/process_allocator_cleanup/process_allocator_test_bootstrap.rs"

[[test]]
name = "process_allocator_cleanup"
path = "test/process_allocator_cleanup/process_allocator_cleanup.rs"

[dependencies]
anyhow = "1.0.98"
async-trait = "0.1.86"
bincode = "1.3.3"
bitmaps = "3.2.1"
buck-resources = "1"
enum-as-inner = "0.6.0"
erased-serde = "0.3.27"
futures = { version = "0.3.30", features = ["async-await", "compat"] }
Expand All @@ -28,15 +41,16 @@ nix = { version = "0.29.0", features = ["dir", "event", "hostname", "inotify", "
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1.0.185", features = ["derive", "rc"] }
serde_bytes = "0.11"
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "unbounded_depth"] }
tempfile = "3.15"
thiserror = "2.0.12"
tokio = { version = "1.45.0", features = ["full", "test-util", "tracing"] }
tokio-stream = { version = "0.1.17", features = ["fs", "io-util", "net", "signal", "sync", "time"] }
tokio-util = { version = "0.7.15", features = ["full"] }
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
tracing-subscriber = { version = "0.3.19", features = ["chrono", "env-filter", "json", "local-time", "parking_lot", "registry"] }

[dev-dependencies]
buck-resources = "1"
dir-diff = "0.3"
maplit = "1.0"
timed_test = { version = "0.0.0", path = "../timed_test" }
Expand Down
1 change: 1 addition & 0 deletions hyperactor_mesh/src/alloc/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ impl Alloc for LocalAlloc {
let created = ProcState::Created {
proc_id: proc_id.clone(),
coords,
pid: std::process::id(),
};
self.queue.push_back(ProcState::Running {
proc_id,
Expand Down
17 changes: 13 additions & 4 deletions hyperactor_mesh/src/alloc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ pub enum ProcState {
proc_id: ProcId,
/// Its assigned coordinates (in the alloc's shape).
coords: Vec<usize>,
/// The system process ID of the created child process.
pid: u32,
},
/// A proc was started.
Running {
Expand Down Expand Up @@ -130,16 +132,21 @@ pub enum ProcState {
impl fmt::Display for ProcState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ProcState::Created { proc_id, coords } => {
ProcState::Created {
proc_id,
coords,
pid,
} => {
write!(
f,
"{}: created at ({})",
"{}: created at ({}) with PID {}",
proc_id,
coords
.iter()
.map(|c| c.to_string())
.collect::<Vec<_>>()
.join(",")
.join(","),
pid
)
}
ProcState::Running { proc_id, addr, .. } => {
Expand Down Expand Up @@ -334,7 +341,9 @@ pub(crate) mod testing {
let mut running = HashSet::new();
while running.len() != 4 {
match alloc.next().await.unwrap() {
ProcState::Created { proc_id, coords } => {
ProcState::Created {
proc_id, coords, ..
} => {
procs.insert(proc_id, coords);
}
ProcState::Running { proc_id, .. } => {
Expand Down
37 changes: 22 additions & 15 deletions hyperactor_mesh/src/alloc/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,22 +329,29 @@ impl ProcessAlloc {
description: message,
})
}
Ok(mut process) => match self.ranks.assign(index) {
Err(_index) => {
tracing::info!("could not assign rank to {}", proc_id);
let _ = process.kill().await;
None
}
Ok(rank) => {
let (handle, monitor) = Child::monitored(process);
self.children.spawn(async move { (index, monitor.await) });
self.active.insert(index, handle);
// Adjust for shape slice offset for non-zero shapes (sub-shapes).
let rank = rank + self.spec.shape.slice().offset();
let coords = self.spec.shape.slice().coordinates(rank).unwrap();
Some(ProcState::Created { proc_id, coords })
Ok(mut process) => {
let pid = process.id().unwrap_or(0);
match self.ranks.assign(index) {
Err(_index) => {
tracing::info!("could not assign rank to {}", proc_id);
let _ = process.kill().await;
None
}
Ok(rank) => {
let (handle, monitor) = Child::monitored(process);
self.children.spawn(async move { (index, monitor.await) });
self.active.insert(index, handle);
// Adjust for shape slice offset for non-zero shapes (sub-shapes).
let rank = rank + self.spec.shape.slice().offset();
let coords = self.spec.shape.slice().coordinates(rank).unwrap();
Some(ProcState::Created {
proc_id,
coords,
pid,
})
}
}
},
}
}
}

Expand Down
64 changes: 35 additions & 29 deletions hyperactor_mesh/src/alloc/remoteprocess.rs
Original file line number Diff line number Diff line change
Expand Up @@ -936,29 +936,24 @@ impl Alloc for RemoteProcessAlloc {
}

break match update {
Some(ProcState::Created { proc_id, coords }) => {
match self.project_proc_into_global_shape(&proc_id, &coords) {
Ok(global_coords) => {
tracing::debug!(
"reprojected coords: {:?} -> {:?}",
coords,
global_coords
);
Some(ProcState::Created {
proc_id,
coords: global_coords,
})
}
Err(e) => {
tracing::error!(
"failed to project coords for proc: {}: {}",
proc_id,
e
);
None
}
Some(ProcState::Created {
proc_id,
coords,
pid,
}) => match self.project_proc_into_global_shape(&proc_id, &coords) {
Ok(global_coords) => {
tracing::debug!("reprojected coords: {:?} -> {:?}", coords, global_coords);
Some(ProcState::Created {
proc_id,
coords: global_coords,
pid,
})
}
}
Err(e) => {
tracing::error!("failed to project coords for proc: {}: {}", proc_id, e);
None
}
},

Some(ProcState::Failed {
world_id: _,
Expand Down Expand Up @@ -1059,10 +1054,13 @@ mod test {
for i in 0..alloc_len {
let proc_id = format!("test[{}]", i).parse().unwrap();
let coords = shape.slice().coordinates(i).unwrap();
alloc
.expect_next()
.times(1)
.return_once(|| Some(ProcState::Created { proc_id, coords }));
alloc.expect_next().times(1).return_once(|| {
Some(ProcState::Created {
proc_id,
coords,
pid: 0,
})
});
}
for i in 0..alloc_len {
let proc_id = format!("test[{}]", i).parse().unwrap();
Expand Down Expand Up @@ -1155,7 +1153,11 @@ mod test {
while i < alloc_len {
let m = rx.recv().await.unwrap();
match m {
RemoteProcessProcStateMessage::Update(ProcState::Created { proc_id, coords }) => {
RemoteProcessProcStateMessage::Update(ProcState::Created {
proc_id,
coords,
..
}) => {
let expected_proc_id = format!("test[{}]", i).parse().unwrap();
let expected_coords = spec.shape.slice().coordinates(i).unwrap();
assert_eq!(proc_id, expected_proc_id);
Expand Down Expand Up @@ -1660,7 +1662,9 @@ mod test_alloc {
let proc_state = alloc.next().await.unwrap();
tracing::debug!("test got message: {:?}", proc_state);
match proc_state {
ProcState::Created { proc_id, coords } => {
ProcState::Created {
proc_id, coords, ..
} => {
procs.insert(proc_id);
proc_coords.insert(coords);
}
Expand Down Expand Up @@ -1904,7 +1908,9 @@ mod test_alloc {
let proc_state = alloc.next().await.unwrap();
tracing::debug!("test got message: {:?}", proc_state);
match proc_state {
ProcState::Created { proc_id, coords } => {
ProcState::Created {
proc_id, coords, ..
} => {
procs.insert(proc_id);
proc_coords.insert(coords);
}
Expand Down
4 changes: 3 additions & 1 deletion hyperactor_mesh/src/proc_mesh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ impl ProcMesh {
};

match state {
ProcState::Created { proc_id, coords } => {
ProcState::Created {
proc_id, coords, ..
} => {
let rank = shape
.slice()
.location(&coords)
Expand Down
Loading