Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 77 additions & 39 deletions packages/services/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,25 +219,39 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
.send()
.await?;

let Some(allocate_res) = runtime::spawn_actor(ctx, input, 0).await? else {
// Destroyed early
ctx.workflow(destroy::Input {
namespace_id: input.namespace_id,
actor_id: input.actor_id,
name: input.name.clone(),
key: input.key.clone(),
generation: 0,
kill: false,
})
.output()
.await?;
let lifecycle_state = match runtime::spawn_actor(ctx, input, 0, false).await? {
runtime::SpawnActorOutput::Allocated {
runner_id,
runner_workflow_id,
} => runtime::LifecycleState::new(runner_id, runner_workflow_id),
runtime::SpawnActorOutput::Sleep => {
ctx.activity(runtime::SetSleepingInput {
actor_id: input.actor_id,
})
.await?;

return Ok(());
runtime::LifecycleState::new_sleeping()
}
runtime::SpawnActorOutput::Destroy => {
// Destroyed early
ctx.workflow(destroy::Input {
namespace_id: input.namespace_id,
actor_id: input.actor_id,
name: input.name.clone(),
key: input.key.clone(),
generation: 0,
kill: false,
})
.output()
.await?;

return Ok(());
}
};

let lifecycle_res = ctx
.loope(
runtime::LifecycleState::new(allocate_res.runner_id, allocate_res.runner_workflow_id),
lifecycle_state,
|ctx, state| {
let input = input.clone();

Expand All @@ -262,6 +276,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
} else {
tracing::debug!(actor_id=?input.actor_id, "actor wake");

state.wake_for_alarm = true;

// Fake signal
Main::Wake(Wake {})
}
Expand Down Expand Up @@ -370,16 +386,24 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
state.sleeping = false;
state.will_wake = false;

if runtime::reschedule_actor(ctx, &input, state).await? {
// Destroyed early
return Ok(Loop::Break(runtime::LifecycleRes {
generation: state.generation,
// False here because if we received the destroy signal, it is
// guaranteed that we did not allocate another actor.
kill: false,
}));
match runtime::reschedule_actor(ctx, &input, state).await? {
runtime::SpawnActorOutput::Allocated { .. } => {},
runtime::SpawnActorOutput::Sleep => {
state.sleeping = true;
}
runtime::SpawnActorOutput::Destroy => {
// Destroyed early
return Ok(Loop::Break(runtime::LifecycleRes {
generation: state.generation,
// False here because if we received the destroy signal, it is
// guaranteed that we did not allocate another actor.
kill: false,
}));
}
}
} else {

state.wake_for_alarm = false;
} else if !state.will_wake {
state.will_wake = true;

tracing::debug!(
Expand Down Expand Up @@ -525,14 +549,19 @@ async fn handle_stopped(
.await?;
}

if runtime::reschedule_actor(ctx, &input, state).await? {
// Destroyed early
return Ok(Some(runtime::LifecycleRes {
generation: state.generation,
// False here because if we received the destroy signal, it is
// guaranteed that we did not allocate another actor.
kill: false,
}));
match runtime::reschedule_actor(ctx, &input, state).await? {
runtime::SpawnActorOutput::Allocated { .. } => {}
// NOTE: Its not possible for `SpawnActorOutput::Sleep` to be returned here, the crash
// policy is `Restart`.
runtime::SpawnActorOutput::Sleep | runtime::SpawnActorOutput::Destroy => {
// Destroyed early
return Ok(Some(runtime::LifecycleRes {
generation: state.generation,
// False here because if we received the destroy signal, it is
// guaranteed that we did not allocate another actor.
kill: false,
}));
}
}
}
(true, CrashPolicy::Sleep) => {
Expand Down Expand Up @@ -566,17 +595,26 @@ async fn handle_stopped(
}
// Rewake actor immediately after stopping if `will_wake` was set
else if state.will_wake {
state.sleeping = false;
state.will_wake = false;

if runtime::reschedule_actor(ctx, &input, state).await? {
// Destroyed early
return Ok(Some(runtime::LifecycleRes {
generation: state.generation,
// False here because if we received the destroy signal, it is
// guaranteed that we did not allocate another actor.
kill: false,
}));
match runtime::reschedule_actor(ctx, &input, state).await? {
runtime::SpawnActorOutput::Allocated { .. } => {}
runtime::SpawnActorOutput::Sleep => {
state.sleeping = true;
}
runtime::SpawnActorOutput::Destroy => {
// Destroyed early
return Ok(Some(runtime::LifecycleRes {
generation: state.generation,
// False here because if we received the destroy signal, it is
// guaranteed that we did not allocate another actor.
kill: false,
}));
}
}

state.wake_for_alarm = false;
}

Ok(None)
Expand Down
Loading
Loading