diff --git a/packages/common/gasoline/core/src/builder/common/signal.rs b/packages/common/gasoline/core/src/builder/common/signal.rs index 7fedb6e7c4..7d601ab861 100644 --- a/packages/common/gasoline/core/src/builder/common/signal.rs +++ b/packages/common/gasoline/core/src/builder/common/signal.rs @@ -41,6 +41,16 @@ impl SignalBuilder { } } + // TODO: Get rid of this + // NOTE: This is a bad implementation because it disregards other errors that may have happened earlier + pub fn bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING(mut self) -> Self { + if let Some(BuilderError::CannotDispatchFromOpInWorkflow) = &self.error { + self.error = None; + } + + self + } + pub fn to_workflow_id(mut self, workflow_id: Id) -> Self { if self.error.is_some() { return self; diff --git a/packages/common/gasoline/core/src/ctx/standalone.rs b/packages/common/gasoline/core/src/ctx/standalone.rs index 089afc7428..25e08c796e 100644 --- a/packages/common/gasoline/core/src/ctx/standalone.rs +++ b/packages/common/gasoline/core/src/ctx/standalone.rs @@ -30,6 +30,7 @@ pub struct StandaloneCtx { pools: rivet_pools::Pools, cache: rivet_cache::Cache, msg_ctx: MessageCtx, + from_workflow: bool, } impl StandaloneCtx { @@ -61,12 +62,13 @@ impl StandaloneCtx { pools, cache, msg_ctx, + from_workflow: false, }) } #[tracing::instrument(skip_all)] pub fn new_from_activity(ctx: &ActivityCtx, req_id: Id) -> WorkflowResult { - StandaloneCtx::new( + let mut ctx = StandaloneCtx::new( ctx.db().clone(), ctx.config().clone(), ctx.pools().clone(), @@ -74,12 +76,16 @@ impl StandaloneCtx { ctx.name(), ctx.ray_id(), req_id, - ) + )?; + + ctx.from_workflow = true; + + Ok(ctx) } #[tracing::instrument(skip_all)] pub fn new_from_operation(ctx: &OperationCtx, req_id: Id) -> WorkflowResult { - StandaloneCtx::new( + let mut new_ctx = StandaloneCtx::new( ctx.db().clone(), ctx.config().clone(), ctx.pools().clone(), @@ -87,7 +93,11 @@ impl StandaloneCtx { ctx.name(), ctx.ray_id(), req_id, - ) + )?; + + new_ctx.from_workflow = 
ctx.from_workflow(); + + Ok(new_ctx) } } @@ -107,7 +117,7 @@ impl StandaloneCtx { self.config.clone(), self.ray_id, input, - false, + self.from_workflow, ) } @@ -134,7 +144,7 @@ impl StandaloneCtx { self.config.clone(), self.ray_id, body, - false, + self.from_workflow, ) } @@ -153,7 +163,7 @@ impl StandaloneCtx { &self.pools, &self.cache, self.ray_id, - false, + self.from_workflow, input, ) .in_current_span() diff --git a/packages/common/universaldb/src/transaction.rs b/packages/common/universaldb/src/transaction.rs index 7b63e90f0f..172284ed57 100644 --- a/packages/common/universaldb/src/transaction.rs +++ b/packages/common/universaldb/src/transaction.rs @@ -150,7 +150,7 @@ impl Transaction { .map_err(Into::into) } - pub fn atomic_op<'de, T: FormalKey + TuplePack + TupleUnpack<'de>>( + pub fn atomic_op<'de, T: std::fmt::Debug + FormalKey + TuplePack + TupleUnpack<'de>>( &self, key: &'de T, param: &[u8], diff --git a/packages/core/bootstrap/src/lib.rs b/packages/core/bootstrap/src/lib.rs index f2684557b6..35088c2d30 100644 --- a/packages/core/bootstrap/src/lib.rs +++ b/packages/core/bootstrap/src/lib.rs @@ -13,8 +13,11 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R )?; tokio::try_join!( - setup_epoxy_coordinator(&ctx), - setup_epoxy_replica(&ctx), + async { + // Replicas must exist before coordinator + setup_epoxy_replica(&ctx).await?; + setup_epoxy_coordinator(&ctx).await + }, create_default_namespace(&ctx), )?; diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index a1a57d8cb5..6be3b778b2 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -145,11 +145,11 @@ async fn tick( // Log warning and reset to 0 if negative let adjusted_desired_slots = if *desired_slots < 0 { - tracing::warn!( + tracing::error!( ?ns_id, ?runner_name, - desired_slots = ?desired_slots, - "Negative desired_slots detected, resetting to 0" + 
?desired_slots, + "negative desired slots, scaling to 0" ); 0 } else { diff --git a/packages/services/epoxy/src/replica/message_request.rs b/packages/services/epoxy/src/replica/message_request.rs index f3ca78426a..518df80bb6 100644 --- a/packages/services/epoxy/src/replica/message_request.rs +++ b/packages/services/epoxy/src/replica/message_request.rs @@ -104,6 +104,7 @@ pub async fn message_request( replica_id: req.replica_id, status: req.status.into(), }) + .bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING() .to_workflow::() .tag("replica", replica_id) .send() @@ -121,6 +122,7 @@ pub async fn message_request( ctx.signal(crate::workflows::replica::BeginLearning { config: req.config.clone().into(), }) + .bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING() .to_workflow::() .tag("replica", replica_id) .send() diff --git a/packages/services/pegboard/src/workflows/actor/mod.rs b/packages/services/pegboard/src/workflows/actor/mod.rs index 8a0153288e..494129398b 100644 --- a/packages/services/pegboard/src/workflows/actor/mod.rs +++ b/packages/services/pegboard/src/workflows/actor/mod.rs @@ -294,7 +294,9 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> ctx, input.actor_id, state.generation, - state.runner_workflow_id, + state.runner_workflow_id.context( + "should have runner_workflow_id set if sleeping", + )?, ) .await?; } @@ -311,7 +313,9 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> ctx, input.actor_id, state.generation, - state.runner_workflow_id, + state.runner_workflow_id.context( + "should have runner_workflow_id set if stopping", + )?, ) .await?; } @@ -330,7 +334,9 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .await?; ctx.msg(Ready { - runner_id: state.runner_id, + runner_id: state + .runner_id + .context("should have runner_id set if running")?, }) .tag("actor_id", input.actor_id) .send() @@ -355,20 +361,28 @@ pub async fn pegboard_actor(ctx: &mut 
WorkflowCtx, input: &Input) -> Result<()> } } Main::Wake(_sig) => { - // Ignore wake if we are not sleeping. This is expected to happen under certain - // circumstances. if state.sleeping { - state.alarm_ts = None; - state.sleeping = false; - - if runtime::reschedule_actor(ctx, &input, state).await? { - // Destroyed early - return Ok(Loop::Break(runtime::LifecycleRes { - generation: state.generation, - // False here because if we received the destroy signal, it is - // guaranteed that we did not allocate another actor. - kill: false, - })); + if state.runner_id.is_none() { + state.alarm_ts = None; + state.sleeping = false; + state.will_wake = false; + + if runtime::reschedule_actor(ctx, &input, state).await? { + // Destroyed early + return Ok(Loop::Break(runtime::LifecycleRes { + generation: state.generation, + // False here because if we received the destroy signal, it is + // guaranteed that we did not allocate another actor. + kill: false, + })); + } + } else { + state.will_wake = true; + + tracing::debug!( + actor_id=?input.actor_id, + "cannot wake an actor that intends to sleep but has not stopped yet, deferring wake until after stop", + ); } } else { tracing::debug!( @@ -447,19 +461,22 @@ async fn handle_stopped( ) -> Result> { tracing::debug!(?code, "actor stopped"); - // Reset retry count + // Reset retry count on successful exit if let Some(protocol::StopCode::Ok) = code { state.reschedule_state = Default::default(); } + // Clear stop gc timeout to prevent being marked as lost in the lifecycle loop state.gc_timeout_ts = None; + state.runner_id = None; + let prev_runner_workflow_id = state.runner_workflow_id.take(); ctx.activity(runtime::DeallocateInput { actor_id: input.actor_id, }) .await?; - // Allocate other pending actors from queue + // Allocate other pending actors from queue since a slot has now cleared let res = ctx .activity(AllocatePendingActorsInput { namespace_id: input.namespace_id, }) .await?; - // Dispatch pending allocs + // 
Dispatch pending allocs (if any) for alloc in res.allocations { ctx.signal(alloc.signal) .to_workflow::() .tag("actor_id", alloc.actor_id) .send() .await?; } + // Handle rescheduling if not marked as sleeping if !state.sleeping { let failed = matches!(code, None | Some(protocol::StopCode::Error)); @@ -487,7 +505,9 @@ ctx, input.actor_id, state.generation, - state.runner_workflow_id, + prev_runner_workflow_id.context( + "should have runner_workflow_id set if not sleeping", + )?, ) .await?; } @@ -531,6 +551,20 @@ } } } + // Rewake actor immediately after stopping if `will_wake` was set + else if state.will_wake { + state.will_wake = false; + + if runtime::reschedule_actor(ctx, &input, state).await? { + // Destroyed early + return Ok(Some(runtime::LifecycleRes { + generation: state.generation, + // False here because if we received the destroy signal, it is + // guaranteed that we did not allocate another actor. + kill: false, + })); + } + } Ok(None) } diff --git a/packages/services/pegboard/src/workflows/actor/runtime.rs b/packages/services/pegboard/src/workflows/actor/runtime.rs index b9228919af..c31556b133 100644 --- a/packages/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/services/pegboard/src/workflows/actor/runtime.rs @@ -20,12 +20,13 @@ use super::{ pub struct LifecycleState { pub generation: u32, - // TODO: Make these optional? 
These might not match the properties in the workflow state but it shouldn't - // matter for the functionality of the lifecycle loop - pub runner_id: Id, - pub runner_workflow_id: Id, + // Set when currently running (not rescheduling or sleeping) + pub runner_id: Option, + pub runner_workflow_id: Option, pub sleeping: bool, + #[serde(default)] + pub will_wake: bool, pub alarm_ts: Option, pub gc_timeout_ts: Option, @@ -36,9 +37,10 @@ impl LifecycleState { pub fn new(runner_id: Id, runner_workflow_id: Id) -> Self { LifecycleState { generation: 0, - runner_id, - runner_workflow_id, + runner_id: Some(runner_id), + runner_workflow_id: Some(runner_workflow_id), sleeping: false, + will_wake: false, alarm_ts: None, gc_timeout_ts: Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS), reschedule_state: RescheduleState::default(), @@ -352,6 +354,7 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<() tx.delete(&keys::actor::ConnectableKey::new(input.actor_id)); if let Some(runner_id) = runner_id { + // Only clear slot if we have a runner id destroy::clear_slot( input.actor_id, namespace_id, @@ -361,15 +364,6 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<() &tx, ) .await?; - } else if for_serverless { - tx.atomic_op( - &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new( - namespace_id, - runner_name_selector.clone(), - ), - &(-1i64).to_le_bytes(), - MutationType::Add, - ); } Ok(()) @@ -551,8 +545,8 @@ pub async fn reschedule_actor( // Update loop state if let Some((reschedule_state, res)) = res { state.generation = next_generation; - state.runner_id = res.runner_id; - state.runner_workflow_id = res.runner_workflow_id; + state.runner_id = Some(res.runner_id); + state.runner_workflow_id = Some(res.runner_workflow_id); // Save reschedule state in global state state.reschedule_state = reschedule_state; diff --git a/scripts/api/add-serverless.ts b/scripts/api/add-serverless.ts index 
8556009bf7..ff384bd8db 100755 --- a/scripts/api/add-serverless.ts +++ b/scripts/api/add-serverless.ts @@ -15,16 +15,16 @@ if (!rivetToken) { const endpoint = process.env.RIVET_ENDPOINT || - (await rl.question("Rivet Endpoint (default: https://api.rivet.gg): ")) || - "https://api.rivet.gg"; + (await rl.question("Rivet Endpoint (default: http://localhost:6420): ")) || + "http://localhost:6420"; const namespace = (await rl.question("Namespace (default: default): ")) || "default"; const runnerName = (await rl.question("Runner name (default: serverless): ")) || "serverless"; const serverlessUrl = (await rl.question( - "Serverless URL (default: http://localhost:8080/api/start): ", - )) || "http://localhost:8080/api/start"; + "Serverless URL (default: http://localhost:3000/api/rivet/start): ", + )) || "http://localhost:3000/api/rivet/start"; rl.close(); diff --git a/scripts/api/delete-run-config.ts b/scripts/api/delete-run-config.ts index b0ff49c389..3a53c4f109 100755 --- a/scripts/api/delete-run-config.ts +++ b/scripts/api/delete-run-config.ts @@ -15,8 +15,8 @@ if (!rivetToken) { const endpoint = process.env.RIVET_ENDPOINT || - (await rl.question("Rivet Endpoint (default: https://api.rivet.gg): ")) || - "https://api.rivet.gg"; + (await rl.question("Rivet Endpoint (default: http://localhost:6420): ")) || + "http://localhost:6420"; const namespace = (await rl.question("Namespace (default: default): ")) || "default"; const runnerName = await rl.question("Runner name to delete: "); diff --git a/scripts/api/list-run-config.ts b/scripts/api/list-run-config.ts index 9a82ede8ed..27f3c147f7 100755 --- a/scripts/api/list-run-config.ts +++ b/scripts/api/list-run-config.ts @@ -15,8 +15,8 @@ if (!rivetToken) { const endpoint = process.env.RIVET_ENDPOINT || - (await rl.question("Rivet Endpoint (default: https://api.rivet.gg): ")) || - "https://api.rivet.gg"; + (await rl.question("Rivet Endpoint (default: http://localhost:6420): ")) || + "http://localhost:6420"; const namespace = 
(await rl.question("Namespace (default: default): ")) || "default"; diff --git a/scripts/api/list-runners.ts b/scripts/api/list-runners.ts index 527ab19609..129ddb43c9 100755 --- a/scripts/api/list-runners.ts +++ b/scripts/api/list-runners.ts @@ -15,8 +15,8 @@ if (!rivetToken) { const endpoint = process.env.RIVET_ENDPOINT || - (await rl.question("Rivet Endpoint (default: https://api.rivet.gg): ")) || - "https://api.rivet.gg"; + (await rl.question("Rivet Endpoint (default: http://localhost:6420): ")) || + "http://localhost:6420"; const namespace = (await rl.question("Namespace (default: default): ")) || "default";