diff --git a/hyperactor_mesh/src/v1/actor_mesh.rs b/hyperactor_mesh/src/v1/actor_mesh.rs index 1fb9e76ff..05f29aa22 100644 --- a/hyperactor_mesh/src/v1/actor_mesh.rs +++ b/hyperactor_mesh/src/v1/actor_mesh.rs @@ -469,6 +469,7 @@ mod tests { use crate::v1::ActorMeshRef; use crate::v1::Name; use crate::v1::ProcMesh; + use crate::v1::proc_mesh::ACTOR_SPAWN_MAX_IDLE; use crate::v1::proc_mesh::GET_ACTOR_STATE_MAX_IDLE; use crate::v1::testactor; use crate::v1::testing; @@ -938,4 +939,97 @@ mod tests { n, count ); } + + /// Test that actors not responding within stop timeout are + /// forcibly aborted. This is the V1 equivalent of + /// hyperactor_multiprocess/src/proc_actor.rs::test_stop_timeout. + #[async_timed_test(timeout_secs = 30)] + #[cfg(fbcode_build)] + async fn test_actor_mesh_stop_timeout() { + hyperactor_telemetry::initialize_logging_for_test(); + + // Override ACTOR_SPAWN_MAX_IDLE to make test fast and + // deterministic. ACTOR_SPAWN_MAX_IDLE is the maximum idle + // time between status updates during mesh operations + // (spawn/stop). When stop() is called, it waits for actors to + // report they've stopped. If actors don't respond within this + // timeout, they're forcibly aborted via JoinHandle::abort(). + // We set this to 1 second (instead of default 30s) so hung + // actors (sleeping 5s in this test) get aborted quickly, + // making the test fast. 
+ let config = hyperactor::config::global::lock(); + let _guard = config.override_key(ACTOR_SPAWN_MAX_IDLE, std::time::Duration::from_secs(1)); + + let instance = testing::instance().await; + + // Create proc mesh with 2 replicas + let meshes = testing::proc_meshes(instance, extent!(replicas = 2)).await; + let proc_mesh = &meshes[1]; // Use ProcessAllocator version + + // Spawn SleepActors across the mesh that will block longer + // than timeout + let sleep_mesh = proc_mesh + .spawn::<testactor::SleepActor>(instance, "sleepers", &()) + .await + .unwrap(); + + // Send each actor a message to sleep for 5 seconds (longer + // than 1-second timeout) + for actor_ref in sleep_mesh.values() { + actor_ref + .send(instance, std::time::Duration::from_secs(5)) + .unwrap(); + } + + // Give actors time to start sleeping + RealClock.sleep(std::time::Duration::from_millis(200)).await; + + // Count how many actors we spawned (for verification later) + let expected_actors = sleep_mesh.values().count(); + + // Now stop the mesh - actors won't respond in time, should be + // aborted. Time this operation to verify abort behavior. + let stop_start = RealClock.now(); + let result = sleep_mesh.stop(instance).await; + let stop_duration = RealClock.now().duration_since(stop_start); + + // Stop will return an error because actors didn't stop within + // the timeout. This is expected - the actors were forcibly + // aborted, and V1 reports this as an error. 
+ match result { + Ok(_) => { + // It's possible actors stopped in time, but unlikely + // given 5-second sleep vs 1-second timeout + tracing::warn!("Actors stopped gracefully (unexpected but ok)"); + } + Err(ref e) => { + // Expected: timeout error indicating actors were aborted + let err_str = format!("{:?}", e); + assert!( + err_str.contains("Timeout"), + "Expected Timeout error, got: {:?}", + e + ); + tracing::info!( + "Stop timed out as expected for {} actors, they were aborted", + expected_actors + ); + } + } + + // Verify that stop completed quickly (~1-2 seconds for + // timeout + abort) rather than waiting the full 5 seconds for + // actors to finish sleeping. This proves actors were aborted, + // not waited for. + assert!( + stop_duration < std::time::Duration::from_secs(3), + "Stop took {:?}, expected < 3s (actors should have been aborted, not waited for)", + stop_duration + ); + assert!( + stop_duration >= std::time::Duration::from_millis(900), + "Stop took {:?}, expected >= 900ms (should have waited for timeout)", + stop_duration + ); + } } diff --git a/hyperactor_mesh/src/v1/testactor.rs b/hyperactor_mesh/src/v1/testactor.rs index df56443d4..49a15939d 100644 --- a/hyperactor_mesh/src/v1/testactor.rs +++ b/hyperactor_mesh/src/v1/testactor.rs @@ -29,9 +29,7 @@ use hyperactor::Named; use hyperactor::PortRef; use hyperactor::RefClient; use hyperactor::Unbind; -#[cfg(test)] use hyperactor::clock::Clock as _; -#[cfg(test)] use hyperactor::clock::RealClock; use hyperactor::config; use hyperactor::config::global::Source; @@ -159,6 +157,27 @@ impl Handler for TestActorWithSupervisionHandling { } } +/// A test actor that sleeps when it receives a Duration message. +/// Used for testing timeout and abort behavior. 
+#[derive(Actor, Default, Debug)] +#[hyperactor::export( + spawn = true, + handlers = [std::time::Duration], +)] +pub struct SleepActor; + +#[async_trait] +impl Handler<std::time::Duration> for SleepActor { + async fn handle( + &mut self, + _cx: &Context<Self>, + duration: std::time::Duration, + ) -> Result<(), anyhow::Error> { + RealClock.sleep(duration).await; + Ok(()) + } +} + /// A message to forward to a visit list of ports. /// Each port removes the next entry, and adds it to the /// 'visited' list. diff --git a/hyperactor_multiprocess/src/proc_actor.rs b/hyperactor_multiprocess/src/proc_actor.rs index 62725cacd..9ee0784f3 100644 --- a/hyperactor_multiprocess/src/proc_actor.rs +++ b/hyperactor_multiprocess/src/proc_actor.rs @@ -1063,16 +1063,14 @@ mod tests { } } - // V0 test - V1 needs equivalent coverage. Tests that actors not + // V0 test - V1 has equivalent coverage. Tests that actors not // responding within stop timeout are forcibly aborted // (JoinHandle::abort). Spawns SleepActors that block for 5 // seconds, calls stop() with 1-second timeout, verifies abort - // counts and "aborting JoinHandle" logs. V1 uses the same - // underlying mechanism (Proc::destroy_and_wait) but lacks test - // coverage. V1's ActorMesh::stop() uses global config timeout - // (ACTOR_SPAWN_MAX_IDLE) and doesn't expose stopped/aborted - // counts, but equivalent tests should verify timeout and abort - // behavior work correctly. + // counts and "aborting JoinHandle" logs. V1 equivalent: + // hyperactor_mesh/src/v1/actor_mesh.rs::test_actor_mesh_stop_timeout. + // Both use the same underlying mechanism (Proc::destroy_and_wait), + // but V1 returns Err(Timeout) instead of Ok with abort counts. 
#[tracing_test::traced_test] #[tokio::test] #[cfg_attr(not(fbcode_build), ignore)] diff --git a/hyperactor_multiprocess/src/system_actor.rs b/hyperactor_multiprocess/src/system_actor.rs index 2712a3950..f668b5a54 100644 --- a/hyperactor_multiprocess/src/system_actor.rs +++ b/hyperactor_multiprocess/src/system_actor.rs @@ -1890,6 +1890,14 @@ mod tests { } } + // V0-specific test - no V1 equivalent. Unit test for + // SystemSupervisionState which tracks proc health and failed + // actors centrally at world level. Tests heartbeat timeout + // detection (marks procs expired if no heartbeat within timeout) + // and failed actor aggregation. V1 does not have centralized + // supervision state - V1 uses local supervision where actors + // handle ActorSupervisionEvent locally rather than reporting to a + // central SystemActor for world-level health monitoring. #[tokio::test] async fn test_supervision_state() { let mut sv = SystemSupervisionState::new(Duration::from_secs(1)); @@ -1989,6 +1997,16 @@ mod tests { ); } + // V0-specific test - no V1 equivalent. Tests SystemActor world + // orchestration where hosts can join before world is created. + // Flow: hosts send Join messages → queued by SystemActor → + // UpsertWorld defines world topology → SystemActor sends + // SpawnProc messages telling each host which procs to spawn. + // Verifies correct proc assignment across hosts. V1 does not have + // this orchestration model - V1 uses coordinated ProcMesh + // allocation where meshes are allocated in one operation, not + // assembled from hosts independently joining a central + // SystemActor. #[tracing_test::traced_test] #[tokio::test] async fn test_host_join_before_world() { @@ -2064,6 +2082,14 @@ mod tests { } } + // V0-specific test - no V1 equivalent. Tests SystemActor world + // orchestration where world is created before hosts join (reverse + // order of test_host_join_before_world). 
Flow: UpsertWorld + // defines topology → hosts send Join messages → SystemActor + // immediately sends SpawnProc messages. Tests that join order + // doesn't matter. V1 does not have this orchestration model - V1 + // uses coordinated ProcMesh allocation where meshes are allocated + // in one operation. #[tokio::test] async fn test_host_join_after_world() { // Spins up a new world with 2 hosts, with 3 procs each. @@ -2138,6 +2164,12 @@ mod tests { } } + // V0-specific test - no V1 equivalent. Unit test for + // SystemSnapshotFilter which filters worlds by name and labels + // when querying SystemActor. Tests world_matches() and + // labels_match() logic. V1 does not have SystemActor or + // SystemSnapshot - V1 uses mesh-based iteration and state queries + // instead. #[test] fn test_snapshot_filter() { let test_world = World::new( @@ -2176,6 +2208,13 @@ mod tests { )); } + // V0-specific test - no V1 equivalent. Tests SystemActor + // supervision behavior when mailbox server crashes: undeliverable + // messages are handled AND system supervision detects the + // unhealthy world state. V1 does not have SystemActor or world + // supervision. V1 undeliverable message handling (without + // supervision) is tested in + // hyperactor_mesh/src/v1/actor_mesh.rs::test_undeliverable_message_return. #[tokio::test] async fn test_undeliverable_message_return() { // System can't send a message to a remote actor because the @@ -2349,6 +2388,13 @@ mod tests { )); } + // V0-specific test - no V1 equivalent. Tests SystemActor stop + // when system is empty (no worlds). Sends SystemMessage::Stop to + // central SystemActor which coordinates shutdown of all worlds. + // V1 does not have a central SystemActor - V1 uses mesh-level + // stop operations (ProcMesh::stop(), HostMesh::shutdown()) where + // you stop individual meshes rather than a system-wide + // coordinator. 
#[tokio::test] async fn test_stop_fast() -> Result<()> { let server_handle = System::serve( @@ -2380,6 +2426,12 @@ mod tests { Ok(()) } + // V0-specific test - no V1 equivalent. Tests ReportingRouter's + // UpdateAddress behavior in simnet mode. When messages are sent, + // post_update_address() sends MailboxAdminMessage::UpdateAddress + // to update address caches with simnet source routing info. V1 + // does not have ReportingRouter or dynamic address updates - V1 + // uses static/direct addressing. #[tokio::test] async fn test_update_sim_address() { simnet::start();