Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 54 additions & 31 deletions hyperactor/src/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,9 +736,7 @@ impl Proc {
RealClock
.timeout(
timeout,
root.wait_for(|state: &ActorStatus| {
matches!(*state, ActorStatus::Stopped)
}),
root.wait_for(|state: &ActorStatus| state.is_terminal()),
)
.await
.ok()
Expand Down Expand Up @@ -1073,6 +1071,21 @@ impl<A: Actor> Instance<A> {
#[track_caller]
fn change_status(&self, new: ActorStatus) {
let old = self.inner.status_tx.send_replace(new.clone());
// 2 cases are allowed:
// * non-terminal -> non-terminal
// * non-terminal -> terminal
// terminal -> terminal is not allowed unless it is the same status (no-op).
// terminal -> non-terminal is never allowed.
assert!(
!old.is_terminal() && !new.is_terminal()
|| !old.is_terminal() && new.is_terminal()
|| old == new,
"actor changing status illegally, only allow non-terminal -> non-terminal \
and non-terminal -> terminal statuses. actor_id={}, prev_status={}, status={}",
self.self_id(),
old,
new
);
// Actor status changes between Idle and Processing when handling every
// message. It creates too many logs if we want to log these 2 states.
// Therefore we skip the status changes between them.
Expand All @@ -1098,8 +1111,8 @@ impl<A: Actor> Instance<A> {
self.inner.status_tx.borrow().is_terminal()
}

fn is_stopped(&self) -> bool {
self.inner.status_tx.borrow().is_stopped()
fn is_stopping(&self) -> bool {
self.inner.status_tx.borrow().is_stopping()
}

/// This instance's actor ID.
Expand Down Expand Up @@ -1197,10 +1210,12 @@ impl<A: Actor> Instance<A> {
let result = self
.run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
.await;

assert!(self.is_stopping());
let event = match result {
Ok(_) => {
// actor should have been stopped by run_actor_tree
assert!(self.is_stopped());
// success exit case
self.change_status(ActorStatus::Stopped);
None
}
Err(err) => {
Expand Down Expand Up @@ -1296,15 +1311,10 @@ impl<A: Actor> Instance<A> {
}
};

match &result {
Ok(_) => assert!(self.is_stopped()),
Err(err) => {
tracing::error!("{}: actor failure: {}", self.self_id(), err);
assert!(!self.is_terminal());
// Send Stopping instead of Failed, because we still need to
// unlink child actors.
self.change_status(ActorStatus::Stopping);
}
assert!(!self.is_terminal());
self.change_status(ActorStatus::Stopping);
if let Err(err) = &result {
tracing::error!("{}: actor failure: {}", self.self_id(), err);
}

// After this point, we know we won't spawn any more children,
Expand Down Expand Up @@ -1447,7 +1457,6 @@ impl<A: Actor> Instance<A> {
}

if need_drain {
self.change_status(ActorStatus::Stopping);
let mut n = 0;
while let Ok(work) = work_rx.try_recv() {
if let Err(err) = work.handle(actor, self).await {
Expand All @@ -1461,7 +1470,6 @@ impl<A: Actor> Instance<A> {
tracing::debug!("drained {} messages", n);
}
tracing::debug!("exited actor loop: {}", self.self_id());
self.change_status(ActorStatus::Stopped);
Ok(())
}

Expand Down Expand Up @@ -2148,6 +2156,7 @@ mod tests {
use hyperactor_macros::export;
use maplit::hashmap;
use serde_json::json;
use timed_test::async_timed_test;
use tokio::sync::Barrier;
use tokio::sync::oneshot;
use tracing::Level;
Expand Down Expand Up @@ -2283,7 +2292,7 @@ mod tests {
}

#[tracing_test::traced_test]
#[tokio::test]
#[async_timed_test(timeout_secs = 30)]
async fn test_spawn_actor() {
let proc = Proc::local();
let handle = proc.spawn::<TestActor>("test", ()).await.unwrap();
Expand Down Expand Up @@ -2332,7 +2341,7 @@ mod tests {
assert_matches!(*state.borrow(), ActorStatus::Stopped);
}

#[tokio::test]
#[async_timed_test(timeout_secs = 30)]
async fn test_proc_actors_messaging() {
let proc = Proc::local();
let first = proc.spawn::<TestActor>("first", ()).await.unwrap();
Expand Down Expand Up @@ -2365,7 +2374,7 @@ mod tests {
}
}

#[tokio::test]
#[async_timed_test(timeout_secs = 30)]
async fn test_actor_lookup() {
let proc = Proc::local();
let (client, _handle) = proc.instance("client").unwrap();
Expand Down Expand Up @@ -2426,7 +2435,7 @@ mod tests {
}

#[tracing_test::traced_test]
#[tokio::test]
#[async_timed_test(timeout_secs = 30)]
async fn test_spawn_child() {
let proc = Proc::local();

Expand Down Expand Up @@ -2474,17 +2483,27 @@ mod tests {
assert!(first.cell().inner.parent.upgrade().is_none());

// Supervision tree is torn down correctly.
// Once each actor is stopped, it should have no linked children.
let third_cell = third.cell().clone();
third.drain_and_stop().unwrap();
third.await;
assert!(second.cell().inner.children.is_empty());
assert!(third_cell.inner.children.is_empty());
drop(third_cell);
validate_link(second.cell(), first.cell());

let second_cell = second.cell().clone();
second.drain_and_stop().unwrap();
second.await;
assert!(first.cell().inner.children.is_empty());
assert!(second_cell.inner.children.is_empty());
drop(second_cell);

let first_cell = first.cell().clone();
first.drain_and_stop().unwrap();
first.await;
assert!(first_cell.inner.children.is_empty());
}

#[tokio::test]
#[async_timed_test(timeout_secs = 30)]
async fn test_child_lifecycle() {
let proc = Proc::local();

Expand All @@ -2502,7 +2521,7 @@ mod tests {
}
}

#[tokio::test]
#[async_timed_test(timeout_secs = 30)]
async fn test_parent_failure() {
let proc = Proc::local();
// Need to set a supervison coordinator for this Proc because there will
Expand Down Expand Up @@ -2536,7 +2555,7 @@ mod tests {
assert_eq!(root_1.await, ActorStatus::Stopped);
}

#[tokio::test]
#[async_timed_test(timeout_secs = 30)]
async fn test_actor_ledger() {
async fn wait_until_idle(actor_handle: &ActorHandle<TestActor>) {
actor_handle
Expand Down Expand Up @@ -2600,6 +2619,8 @@ mod tests {

let root_1 = TestActor::spawn_child(&root).await;
wait_until_idle(&root_1).await;
// Wait until the root actor processes the message and is then idle again.
wait_until_idle(&root).await;
{
let snapshot = proc.state().ledger.snapshot();
assert_eq!(
Expand Down Expand Up @@ -2627,6 +2648,7 @@ mod tests {

let root_1_1 = TestActor::spawn_child(&root_1).await;
wait_until_idle(&root_1_1).await;
wait_until_idle(&root_1).await;
{
let snapshot = proc.state().ledger.snapshot();
assert_eq!(
Expand Down Expand Up @@ -2666,6 +2688,7 @@ mod tests {

let root_2 = TestActor::spawn_child(&root).await;
wait_until_idle(&root_2).await;
wait_until_idle(&root).await;
{
let snapshot = proc.state().ledger.snapshot();
assert_eq!(
Expand Down Expand Up @@ -2757,7 +2780,7 @@ mod tests {
}
}

#[tokio::test]
#[async_timed_test(timeout_secs = 30)]
async fn test_multi_handler() {
// TEMPORARY: This test is currently a bit awkward since we don't yet expose
// public interfaces to multi-handlers. This will be fixed shortly.
Expand Down Expand Up @@ -2816,7 +2839,7 @@ mod tests {
assert_eq!(state.load(Ordering::SeqCst), 123);
}

#[tokio::test]
#[async_timed_test(timeout_secs = 30)]
async fn test_actor_panic() {
// Need this custom hook to store panic backtrace in task_local.
panic_handler::set_panic_hook();
Expand Down Expand Up @@ -2849,7 +2872,7 @@ mod tests {
}
}

#[tokio::test]
#[async_timed_test(timeout_secs = 30)]
async fn test_local_supervision_propagation() {
#[derive(Debug)]
struct TestActor(Arc<AtomicBool>, bool);
Expand Down Expand Up @@ -2960,7 +2983,7 @@ mod tests {
);
}

#[tokio::test]
#[async_timed_test(timeout_secs = 30)]
async fn test_instance() {
#[derive(Debug, Default, Actor)]
struct TestActor;
Expand Down
2 changes: 1 addition & 1 deletion hyperactor_mesh/src/proc_mesh/mesh_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ impl MeshAgentMessageHandler for ProcMeshAgent {
match RealClock
.timeout(
tokio::time::Duration::from_millis(timeout_ms),
status.wait_for(|state: &ActorStatus| matches!(*state, ActorStatus::Stopped)),
status.wait_for(|state: &ActorStatus| state.is_terminal()),
)
.await
{
Expand Down