Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions engine/packages/guard/src/routing/pegboard_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use rivet_guard_core::proxy_service::{RouteConfig, RouteTarget, RoutingOutput, R
use super::{SEC_WEBSOCKET_PROTOCOL, WS_PROTOCOL_ACTOR, WS_PROTOCOL_TOKEN, X_RIVET_TOKEN};
use crate::{errors, shared_state::SharedState};

const ACTOR_FORCE_WAKE_PENDING_TIMEOUT: i64 = util::duration::seconds(60);
const ACTOR_READY_TIMEOUT: Duration = Duration::from_secs(10);
pub const X_RIVET_ACTOR: HeaderName = HeaderName::from_static("x-rivet-actor");

Expand Down Expand Up @@ -169,10 +170,14 @@ async fn route_request_inner(
if actor.sleeping {
tracing::debug!(?actor_id, "actor sleeping, waking");

ctx.signal(pegboard::workflows::actor::Wake {})
.to_workflow_id(actor.workflow_id)
.send()
.await?;
ctx.signal(pegboard::workflows::actor::Wake {
allocation_override: pegboard::workflows::actor::AllocationOverride::DontSleep {
pending_timeout: Some(ACTOR_FORCE_WAKE_PENDING_TIMEOUT),
},
})
.to_workflow_id(actor.workflow_id)
.send()
.await?;
}

let runner_id = if let (Some(runner_id), true) = (actor.runner_id, actor.connectable) {
Expand All @@ -193,10 +198,14 @@ async fn route_request_inner(
tracing::debug!(?actor_id, ?wake_retries, "actor stopped while we were waiting for it to become ready, attempting rewake");
wake_retries += 1;

let res = ctx.signal(pegboard::workflows::actor::Wake {})
.to_workflow_id(actor.workflow_id)
.send()
.await;
let res = ctx.signal(pegboard::workflows::actor::Wake {
allocation_override: pegboard::workflows::actor::AllocationOverride::DontSleep {
pending_timeout: Some(ACTOR_FORCE_WAKE_PENDING_TIMEOUT),
},
})
.to_workflow_id(actor.workflow_id)
.send()
.await;

if let Some(WorkflowError::WorkflowNotFound) = res
.as_ref()
Expand Down
107 changes: 58 additions & 49 deletions engine/packages/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ mod keys;
mod runtime;
mod setup;

pub use runtime::AllocationOverride;

#[derive(Clone, Debug, Serialize, Deserialize, Hash)]
pub struct Input {
pub actor_id: Id,
Expand Down Expand Up @@ -211,38 +213,39 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
.send()
.await?;

let lifecycle_state = match runtime::spawn_actor(ctx, input, 0, false).await? {
runtime::SpawnActorOutput::Allocated {
runner_id,
runner_workflow_id,
} => runtime::LifecycleState::new(
runner_id,
runner_workflow_id,
ctx.config().pegboard().actor_start_threshold(),
),
runtime::SpawnActorOutput::Sleep => {
ctx.activity(runtime::SetSleepingInput {
actor_id: input.actor_id,
})
.await?;
let lifecycle_state =
match runtime::spawn_actor(ctx, input, 0, AllocationOverride::None).await? {
runtime::SpawnActorOutput::Allocated {
runner_id,
runner_workflow_id,
} => runtime::LifecycleState::new(
runner_id,
runner_workflow_id,
ctx.config().pegboard().actor_start_threshold(),
),
runtime::SpawnActorOutput::Sleep => {
ctx.activity(runtime::SetSleepingInput {
actor_id: input.actor_id,
})
.await?;

runtime::LifecycleState::new_sleeping()
}
runtime::SpawnActorOutput::Destroy => {
// Destroyed early
ctx.workflow(destroy::Input {
namespace_id: input.namespace_id,
actor_id: input.actor_id,
name: input.name.clone(),
key: input.key.clone(),
generation: 0,
})
.output()
.await?;
runtime::LifecycleState::new_sleeping()
}
runtime::SpawnActorOutput::Destroy => {
// Destroyed early
ctx.workflow(destroy::Input {
namespace_id: input.namespace_id,
actor_id: input.actor_id,
name: input.name.clone(),
key: input.key.clone(),
generation: 0,
})
.output()
.await?;

return Ok(());
}
};
return Ok(());
}
};

let lifecycle_res = ctx
.loope(
Expand Down Expand Up @@ -273,10 +276,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
} else {
tracing::debug!(actor_id=?input.actor_id, "actor wake");

state.wake_for_alarm = true;

// Fake signal
Main::Wake(Wake {})
Main::Wake(Wake { allocation_override: AllocationOverride::DontSleep { pending_timeout: None } })
}
} else {
// Listen for signal normally
Expand Down Expand Up @@ -404,14 +405,14 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
}
}
}
Main::Wake(_sig) => {
Main::Wake(sig) => {
if state.sleeping {
if state.runner_id.is_none() {
state.alarm_ts = None;
state.sleeping = false;
state.will_wake = false;

match runtime::reschedule_actor(ctx, &input, state, false)
match runtime::reschedule_actor(ctx, &input, state, sig.allocation_override)
.await?
{
runtime::SpawnActorOutput::Allocated { .. } => {}
Expand All @@ -425,8 +426,6 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
}));
}
}

state.wake_for_alarm = false;
} else if !state.will_wake {
state.will_wake = true;

Expand All @@ -440,8 +439,6 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
actor_id=?input.actor_id,
"cannot wake actor that is not sleeping",
);

state.wake_for_alarm = false;
}
}
Main::Lost(sig) => {
Expand Down Expand Up @@ -569,16 +566,15 @@ async fn handle_stopped(
tracing::debug!(?variant, "actor stopped");

let force_reschedule = match &variant {
StoppedVariant::Normal {
code: protocol::StopCode::Ok,
} => {
StoppedVariant::Normal { code } => {
// Reset retry count on successful exit
state.reschedule_state = Default::default();
if let protocol::StopCode::Ok = code {
state.reschedule_state = Default::default();
}

false
}
StoppedVariant::Lost { force_reschedule } => *force_reschedule,
_ => false,
};

// Clear stop gc timeout to prevent being marked as lost in the lifecycle loop
Expand Down Expand Up @@ -640,7 +636,16 @@ async fn handle_stopped(

// Reschedule no matter what
if force_reschedule {
match runtime::reschedule_actor(ctx, &input, state, true).await? {
match runtime::reschedule_actor(
ctx,
&input,
state,
AllocationOverride::DontSleep {
pending_timeout: None,
},
)
.await?
{
runtime::SpawnActorOutput::Allocated { .. } => {}
// NOTE: This should be unreachable because force_reschedule is true
runtime::SpawnActorOutput::Sleep => {
Expand All @@ -667,7 +672,9 @@ async fn handle_stopped(

match (input.crash_policy, graceful_exit) {
(CrashPolicy::Restart, false) => {
match runtime::reschedule_actor(ctx, &input, state, false).await? {
match runtime::reschedule_actor(ctx, &input, state, AllocationOverride::None)
.await?
{
runtime::SpawnActorOutput::Allocated { .. } => {}
// NOTE: It's not possible for `SpawnActorOutput::Sleep` to be returned here because the crash
// policy is `Restart`.
Expand Down Expand Up @@ -698,7 +705,7 @@ async fn handle_stopped(
else if state.will_wake {
state.sleeping = false;

match runtime::reschedule_actor(ctx, &input, state, false).await? {
match runtime::reschedule_actor(ctx, &input, state, AllocationOverride::None).await? {
runtime::SpawnActorOutput::Allocated { .. } => {}
runtime::SpawnActorOutput::Sleep => {
state.sleeping = true;
Expand All @@ -708,7 +715,6 @@ async fn handle_stopped(
}
}

state.wake_for_alarm = false;
state.will_wake = false;
state.going_away = false;

Expand Down Expand Up @@ -749,7 +755,10 @@ pub struct Event {
}

#[signal("pegboard_actor_wake")]
pub struct Wake {}
pub struct Wake {
#[serde(default)]
pub allocation_override: AllocationOverride,
}

#[derive(Debug)]
#[signal("pegboard_actor_lost")]
Expand Down
Loading
Loading