Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions hyperactor_mesh/src/v1/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ mod tests {
use crate::v1::ActorMeshRef;
use crate::v1::Name;
use crate::v1::ProcMesh;
use crate::v1::proc_mesh::ACTOR_SPAWN_MAX_IDLE;
use crate::v1::proc_mesh::GET_ACTOR_STATE_MAX_IDLE;
use crate::v1::testactor;
use crate::v1::testing;
Expand Down Expand Up @@ -938,4 +939,97 @@ mod tests {
n, count
);
}

/// Test that actors not responding within stop timeout are
/// forcibly aborted. This is the V1 equivalent of
/// hyperactor_multiprocess/src/proc_actor.rs::test_stop_timeout.
#[async_timed_test(timeout_secs = 30)]
#[cfg(fbcode_build)]
async fn test_actor_mesh_stop_timeout() {
hyperactor_telemetry::initialize_logging_for_test();

// Override ACTOR_SPAWN_MAX_IDLE to make test fast and
// deterministic. ACTOR_SPAWN_MAX_IDLE is the maximum idle
// time between status updates during mesh operations
// (spawn/stop). When stop() is called, it waits for actors to
// report they've stopped. If actors don't respond within this
// timeout, they're forcibly aborted via JoinHandle::abort().
// We set this to 1 second (instead of default 30s) so hung
// actors (sleeping 5s in this test) get aborted quickly,
// making the test fast.
let config = hyperactor::config::global::lock();
let _guard = config.override_key(ACTOR_SPAWN_MAX_IDLE, std::time::Duration::from_secs(1));

let instance = testing::instance().await;

// Create proc mesh with 2 replicas
let meshes = testing::proc_meshes(instance, extent!(replicas = 2)).await;
let proc_mesh = &meshes[1]; // Use ProcessAllocator version

// Spawn SleepActors across the mesh that will block longer
// than timeout
let sleep_mesh = proc_mesh
.spawn::<testactor::SleepActor>(instance, "sleepers", &())
.await
.unwrap();

// Send each actor a message to sleep for 5 seconds (longer
// than 1-second timeout)
for actor_ref in sleep_mesh.values() {
actor_ref
.send(instance, std::time::Duration::from_secs(5))
.unwrap();
}

// Give actors time to start sleeping
RealClock.sleep(std::time::Duration::from_millis(200)).await;

// Count how many actors we spawned (for verification later)
let expected_actors = sleep_mesh.values().count();

// Now stop the mesh - actors won't respond in time, should be
// aborted. Time this operation to verify abort behavior.
let stop_start = RealClock.now();
let result = sleep_mesh.stop(instance).await;
let stop_duration = RealClock.now().duration_since(stop_start);

// Stop will return an error because actors didn't stop within
// the timeout. This is expected - the actors were forcibly
// aborted, and V1 reports this as an error.
match result {
Ok(_) => {
// It's possible actors stopped in time, but unlikely
// given 5-second sleep vs 1-second timeout
tracing::warn!("Actors stopped gracefully (unexpected but ok)");
}
Err(ref e) => {
// Expected: timeout error indicating actors were aborted
let err_str = format!("{:?}", e);
assert!(
err_str.contains("Timeout"),
"Expected Timeout error, got: {:?}",
e
);
tracing::info!(
"Stop timed out as expected for {} actors, they were aborted",
expected_actors
);
}
}

// Verify that stop completed quickly (~1-2 seconds for
// timeout + abort) rather than waiting the full 5 seconds for
// actors to finish sleeping. This proves actors were aborted,
// not waited for.
assert!(
stop_duration < std::time::Duration::from_secs(3),
"Stop took {:?}, expected < 3s (actors should have been aborted, not waited for)",
stop_duration
);
assert!(
stop_duration >= std::time::Duration::from_millis(900),
"Stop took {:?}, expected >= 900ms (should have waited for timeout)",
stop_duration
);
}
}
23 changes: 21 additions & 2 deletions hyperactor_mesh/src/v1/testactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ use hyperactor::Named;
use hyperactor::PortRef;
use hyperactor::RefClient;
use hyperactor::Unbind;
#[cfg(test)]
use hyperactor::clock::Clock as _;
#[cfg(test)]
use hyperactor::clock::RealClock;
use hyperactor::config;
use hyperactor::config::global::Source;
Expand Down Expand Up @@ -159,6 +157,27 @@ impl Handler<ActorSupervisionEvent> for TestActorWithSupervisionHandling {
}
}

/// A test actor that sleeps when it receives a Duration message.
/// Used for testing timeout and abort behavior.
#[derive(Actor, Default, Debug)]
#[hyperactor::export(
spawn = true,
handlers = [std::time::Duration],
)]
pub struct SleepActor;

#[async_trait]
impl Handler<std::time::Duration> for SleepActor {
async fn handle(
&mut self,
_cx: &Context<Self>,
duration: std::time::Duration,
) -> Result<(), anyhow::Error> {
RealClock.sleep(duration).await;
Ok(())
}
}

/// A message to forward to a visit list of ports.
/// Each port removes the next entry, and adds it to the
/// 'visited' list.
Expand Down
12 changes: 5 additions & 7 deletions hyperactor_multiprocess/src/proc_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1063,16 +1063,14 @@ mod tests {
}
}

// V0 test - V1 needs equivalent coverage. Tests that actors not
// V0 test - V1 has equivalent coverage. Tests that actors not
// responding within stop timeout are forcibly aborted
// (JoinHandle::abort). Spawns SleepActors that block for 5
// seconds, calls stop() with 1-second timeout, verifies abort
// counts and "aborting JoinHandle" logs. V1 uses the same
// underlying mechanism (Proc::destroy_and_wait) but lacks test
// coverage. V1's ActorMesh::stop() uses global config timeout
// (ACTOR_SPAWN_MAX_IDLE) and doesn't expose stopped/aborted
// counts, but equivalent tests should verify timeout and abort
// behavior work correctly.
// counts and "aborting JoinHandle" logs. V1 equivalent:
// hyperactor_mesh/src/v1/actor_mesh.rs::test_actor_mesh_stop_timeout.
// Both use the same underlying mechanism (Proc::destroy_and_wait),
// but V1 returns Err(Timeout) instead of Ok with abort counts.
#[tracing_test::traced_test]
#[tokio::test]
#[cfg_attr(not(fbcode_build), ignore)]
Expand Down
52 changes: 52 additions & 0 deletions hyperactor_multiprocess/src/system_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1890,6 +1890,14 @@ mod tests {
}
}

// V0-specific test - no V1 equivalent. Unit test for
// SystemSupervisionState which tracks proc health and failed
// actors centrally at world level. Tests heartbeat timeout
// detection (marks procs expired if no heartbeat within timeout)
// and failed actor aggregation. V1 does not have centralized
// supervision state - V1 uses local supervision where actors
// handle ActorSupervisionEvent locally rather than reporting to a
// central SystemActor for world-level health monitoring.
#[tokio::test]
async fn test_supervision_state() {
let mut sv = SystemSupervisionState::new(Duration::from_secs(1));
Expand Down Expand Up @@ -1989,6 +1997,16 @@ mod tests {
);
}

// V0-specific test - no V1 equivalent. Tests SystemActor world
// orchestration where hosts can join before world is created.
// Flow: hosts send Join messages → queued by SystemActor →
// UpsertWorld defines world topology → SystemActor sends
// SpawnProc messages telling each host which procs to spawn.
// Verifies correct proc assignment across hosts. V1 does not have
// this orchestration model - V1 uses coordinated ProcMesh
// allocation where meshes are allocated in one operation, not
// assembled from hosts independently joining a central
// SystemActor.
#[tracing_test::traced_test]
#[tokio::test]
async fn test_host_join_before_world() {
Expand Down Expand Up @@ -2064,6 +2082,14 @@ mod tests {
}
}

// V0-specific test - no V1 equivalent. Tests SystemActor world
// orchestration where world is created before hosts join (reverse
// order of test_host_join_before_world). Flow: UpsertWorld
// defines topology → hosts send Join messages → SystemActor
// immediately sends SpawnProc messages. Tests that join order
// doesn't matter. V1 does not have this orchestration model - V1
// uses coordinated ProcMesh allocation where meshes are allocated
// in one operation.
#[tokio::test]
async fn test_host_join_after_world() {
// Spins up a new world with 2 hosts, with 3 procs each.
Expand Down Expand Up @@ -2138,6 +2164,12 @@ mod tests {
}
}

// V0-specific test - no V1 equivalent. Unit test for
// SystemSnapshotFilter which filters worlds by name and labels
// when querying SystemActor. Tests world_matches() and
// labels_match() logic. V1 does not have SystemActor or
// SystemSnapshot - V1 uses mesh-based iteration and state queries
// instead.
#[test]
fn test_snapshot_filter() {
let test_world = World::new(
Expand Down Expand Up @@ -2176,6 +2208,13 @@ mod tests {
));
}

// V0-specific test - no V1 equivalent. Tests SystemActor
// supervision behavior when mailbox server crashes: undeliverable
// messages are handled AND system supervision detects the
// unhealthy world state. V1 does not have SystemActor or world
// supervision. V1 undeliverable message handling (without
// supervision) is tested in
// hyperactor_mesh/src/v1/actor_mesh.rs::test_undeliverable_message_return.
#[tokio::test]
async fn test_undeliverable_message_return() {
// System can't send a message to a remote actor because the
Expand Down Expand Up @@ -2349,6 +2388,13 @@ mod tests {
));
}

// V0-specific test - no V1 equivalent. Tests SystemActor stop
// when system is empty (no worlds). Sends SystemMessage::Stop to
// central SystemActor which coordinates shutdown of all worlds.
// V1 does not have a central SystemActor - V1 uses mesh-level
// stop operations (ProcMesh::stop(), HostMesh::shutdown()) where
// you stop individual meshes rather than a system-wide
// coordinator.
#[tokio::test]
async fn test_stop_fast() -> Result<()> {
let server_handle = System::serve(
Expand Down Expand Up @@ -2380,6 +2426,12 @@ mod tests {
Ok(())
}

// V0-specific test - no V1 equivalent. Tests ReportingRouter's
// UpdateAddress behavior in simnet mode. When messages are sent,
// post_update_address() sends MailboxAdminMessage::UpdateAddress
// to update address caches with simnet source routing info. V1
// does not have ReportingRouter or dynamic address updates - V1
// uses static/direct addressing.
#[tokio::test]
async fn test_update_sim_address() {
simnet::start();
Expand Down