From 3391b665e9e5a36d2981713c8d71f035e4aab482 Mon Sep 17 00:00:00 2001 From: Shayne Fletcher Date: Sat, 14 Jun 2025 23:22:10 -0700 Subject: [PATCH] [hyperactor_mesh]: actor_mesh: test: test_pingpong_full_mesh Differential Revision: [D76680489](https://our.internmc.facebook.com/intern/diff/D76680489/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D76680489/)! [ghstack-poisoned] --- hyperactor_mesh/src/actor_mesh.rs | 86 +++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/hyperactor_mesh/src/actor_mesh.rs b/hyperactor_mesh/src/actor_mesh.rs index f517d4e6a..22921d07a 100644 --- a/hyperactor_mesh/src/actor_mesh.rs +++ b/hyperactor_mesh/src/actor_mesh.rs @@ -622,6 +622,92 @@ mod tests { assert!(done_rx.recv().await.unwrap()); } + #[tokio::test] + async fn test_pingpong_full_mesh() { + use hyperactor::test_utils::pingpong::PingPongActor; + use hyperactor::test_utils::pingpong::PingPongActorParams; + use hyperactor::test_utils::pingpong::PingPongMessage; + + use futures::future::join_all; + + const X: usize = 3; + const Y: usize = 3; + const Z: usize = 3; + let alloc = $allocator + .allocate(AllocSpec { + shape: shape! { x = X, y = Y, z = Z }, + constraints: Default::default(), + }) + .await + .unwrap(); + + let proc_mesh = ProcMesh::allocate(alloc).await.unwrap(); + let (undeliverable_tx, _undeliverable_rx) = proc_mesh.client().open_port(); + let params = PingPongActorParams::new(undeliverable_tx.bind(), None); + let actor_mesh: RootActorMesh = + proc_mesh.spawn::("pingpong", ¶ms).await.unwrap(); + let shape = actor_mesh.shape(); + let slice = shape.slice(); + + // 6-neighbor offsets. + const NEIGHBORS: &[[isize; 3]] = &[ + [-1, 0, 0], + [1, 0, 0], + [0, -1, 0], + [0, 1, 0], + [0, 0, -1], + [0, 0, 1], + ]; + + // Collect futures for all OncePorts. + let mut futures = Vec::new(); + + for rank in slice.iter() { + let coords = slice.coordinates(rank).unwrap(); + let actor = actor_mesh.get(rank).unwrap(); + + for offset in NEIGHBORS { + let ndim = coords.len(); + let mut neighbor_coords = Vec::with_capacity(ndim); + let mut in_bounds = true; + + for dim in 0..ndim { + let c = coords[dim]; + let o = offset[dim]; + let size = slice.sizes()[dim]; + + let val = c as isize + o; + if val < 0 || val >= size as isize { + in_bounds = false; + break; + } + neighbor_coords.push(val as usize); + } + + if !in_bounds { + continue; + } + + if let Ok(neighbor_rank) = shape.slice().location(&neighbor_coords) { + let neighbor = actor_mesh.get(neighbor_rank).unwrap(); + let (done_tx, done_rx) = proc_mesh.client().open_once_port(); + actor + .send( + proc_mesh.client(), + PingPongMessage(4, neighbor.clone(), done_tx.bind()), + ) + .unwrap(); + futures.push(done_rx.recv()); + } + } + } + + let results = join_all(futures).await; + for result in results { + assert_eq!(result.unwrap(), true); + } + } + #[tokio::test] async fn test_cast() { let alloc = $allocator