Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions packages/common/gasoline/core/src/builder/common/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@
}
}

// TODO: Get rid of this
// NOTE: This is a bad implementation because it disregards other errors that may have happened earlier
pub fn bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING(mut self) -> Self {

Check warning on line 46 in packages/common/gasoline/core/src/builder/common/signal.rs

View workflow job for this annotation

GitHub Actions / Test

method `bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING` should have a snake case name
if let Some(BuilderError::CannotDispatchFromOpInWorkflow) = &self.error {
self.error = None;
}

self
}

pub fn to_workflow_id(mut self, workflow_id: Id) -> Self {
if self.error.is_some() {
return self;
Expand Down
24 changes: 17 additions & 7 deletions packages/common/gasoline/core/src/ctx/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct StandaloneCtx {
pools: rivet_pools::Pools,
cache: rivet_cache::Cache,
msg_ctx: MessageCtx,
from_workflow: bool,
}

impl StandaloneCtx {
Expand Down Expand Up @@ -61,33 +62,42 @@ impl StandaloneCtx {
pools,
cache,
msg_ctx,
from_workflow: false,
})
}

#[tracing::instrument(skip_all)]
pub fn new_from_activity(ctx: &ActivityCtx, req_id: Id) -> WorkflowResult<Self> {
StandaloneCtx::new(
let mut ctx = StandaloneCtx::new(
ctx.db().clone(),
ctx.config().clone(),
ctx.pools().clone(),
ctx.cache().clone(),
ctx.name(),
ctx.ray_id(),
req_id,
)
)?;

ctx.from_workflow = true;

Ok(ctx)
}

#[tracing::instrument(skip_all)]
pub fn new_from_operation(ctx: &OperationCtx, req_id: Id) -> WorkflowResult<Self> {
StandaloneCtx::new(
let mut ctx = StandaloneCtx::new(
ctx.db().clone(),
ctx.config().clone(),
ctx.pools().clone(),
ctx.cache().clone(),
ctx.name(),
ctx.ray_id(),
req_id,
)
)?;

ctx.from_workflow = ctx.from_workflow;

Ok(ctx)
}
}

Expand All @@ -107,7 +117,7 @@ impl StandaloneCtx {
self.config.clone(),
self.ray_id,
input,
false,
self.from_workflow,
)
}

Expand All @@ -134,7 +144,7 @@ impl StandaloneCtx {
self.config.clone(),
self.ray_id,
body,
false,
self.from_workflow,
)
}

Expand All @@ -153,7 +163,7 @@ impl StandaloneCtx {
&self.pools,
&self.cache,
self.ray_id,
false,
self.from_workflow,
input,
)
.in_current_span()
Expand Down
2 changes: 1 addition & 1 deletion packages/common/universaldb/src/transaction.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{future::Future, ops::Deref, pin::Pin, sync::Arc};

use anyhow::{Context, Result};
use futures_util::StreamExt;

Check failure on line 4 in packages/common/universaldb/src/transaction.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `futures_util::StreamExt`

Check warning on line 4 in packages/common/universaldb/src/transaction.rs

View workflow job for this annotation

GitHub Actions / Test

unused import: `futures_util::StreamExt`

use crate::{
driver::TransactionDriver,
Expand Down Expand Up @@ -150,7 +150,7 @@
.map_err(Into::into)
}

pub fn atomic_op<'de, T: FormalKey + TuplePack + TupleUnpack<'de>>(
pub fn atomic_op<'de, T: std::fmt::Debug + FormalKey + TuplePack + TupleUnpack<'de>>(
&self,
key: &'de T,
param: &[u8],
Expand Down
7 changes: 5 additions & 2 deletions packages/core/bootstrap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
)?;

tokio::try_join!(
setup_epoxy_coordinator(&ctx),
setup_epoxy_replica(&ctx),
async {
// Replicas must exist before coordinator
setup_epoxy_replica(&ctx).await?;
setup_epoxy_coordinator(&ctx).await
},
create_default_namespace(&ctx),
)?;

Expand Down
6 changes: 3 additions & 3 deletions packages/core/pegboard-serverless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ async fn tick(

// Log warning and reset to 0 if negative
let adjusted_desired_slots = if *desired_slots < 0 {
tracing::warn!(
tracing::error!(
?ns_id,
?runner_name,
desired_slots = ?desired_slots,
"Negative desired_slots detected, resetting to 0"
?desired_slots,
"negative desired slots, scaling to 0"
);
0
} else {
Expand Down
2 changes: 2 additions & 0 deletions packages/services/epoxy/src/replica/message_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub async fn message_request(
replica_id: req.replica_id,
status: req.status.into(),
})
.bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING()
.to_workflow::<crate::workflows::coordinator::Workflow>()
.tag("replica", replica_id)
.send()
Expand All @@ -121,6 +122,7 @@ pub async fn message_request(
ctx.signal(crate::workflows::replica::BeginLearning {
config: req.config.clone().into(),
})
.bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING()
.to_workflow::<crate::workflows::replica::Workflow>()
.tag("replica", replica_id)
.send()
Expand Down
74 changes: 54 additions & 20 deletions packages/services/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
ctx,
input.actor_id,
state.generation,
state.runner_workflow_id,
state.runner_workflow_id.context(
"should have runner_workflow_id set if sleeping",
)?,
)
.await?;
}
Expand All @@ -311,7 +313,9 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
ctx,
input.actor_id,
state.generation,
state.runner_workflow_id,
state.runner_workflow_id.context(
"should have runner_workflow_id set if stopping",
)?,
)
.await?;
}
Expand All @@ -330,7 +334,9 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
.await?;

ctx.msg(Ready {
runner_id: state.runner_id,
runner_id: state
.runner_id
.context("should have runner_id set if running")?,
})
.tag("actor_id", input.actor_id)
.send()
Expand All @@ -355,20 +361,28 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
}
}
Main::Wake(_sig) => {
// Ignore wake if we are not sleeping. This is expected to happen under certain
// circumstances.
if state.sleeping {
state.alarm_ts = None;
state.sleeping = false;

if runtime::reschedule_actor(ctx, &input, state).await? {
// Destroyed early
return Ok(Loop::Break(runtime::LifecycleRes {
generation: state.generation,
// False here because if we received the destroy signal, it is
// guaranteed that we did not allocate another actor.
kill: false,
}));
if state.runner_id.is_none() {
state.alarm_ts = None;
state.sleeping = false;
state.will_wake = false;

if runtime::reschedule_actor(ctx, &input, state).await? {
// Destroyed early
return Ok(Loop::Break(runtime::LifecycleRes {
generation: state.generation,
// False here because if we received the destroy signal, it is
// guaranteed that we did not allocate another actor.
kill: false,
}));
}
} else {
state.will_wake = true;

tracing::debug!(
actor_id=?input.actor_id,
"cannot wake an actor that intends to sleep but has not stopped yet, deferring wake until after stop",
);
}
} else {
tracing::debug!(
Expand Down Expand Up @@ -447,27 +461,30 @@ async fn handle_stopped(
) -> Result<Option<runtime::LifecycleRes>> {
tracing::debug!(?code, "actor stopped");

// Reset retry count
// Reset retry count on successful exit
if let Some(protocol::StopCode::Ok) = code {
state.reschedule_state = Default::default();
}

// Clear stop gc timeout to prevent being marked as lost in the lifecycle loop
state.gc_timeout_ts = None;
state.runner_id = None;
state.runner_workflow_id = None;

ctx.activity(runtime::DeallocateInput {
actor_id: input.actor_id,
})
.await?;

// Allocate other pending actors from queue
// Allocate other pending actors from queue since a slot has now cleared
let res = ctx
.activity(AllocatePendingActorsInput {
namespace_id: input.namespace_id,
name: input.runner_name_selector.clone(),
})
.await?;

// Dispatch pending allocs
// Dispatch pending allocs (if any)
for alloc in res.allocations {
ctx.signal(alloc.signal)
.to_workflow::<Workflow>()
Expand All @@ -476,6 +493,7 @@ async fn handle_stopped(
.await?;
}

// Handle rescheduling if not marked as sleeping
if !state.sleeping {
let failed = matches!(code, None | Some(protocol::StopCode::Error));

Expand All @@ -487,7 +505,9 @@ async fn handle_stopped(
ctx,
input.actor_id,
state.generation,
state.runner_workflow_id,
state
.runner_workflow_id
.context("should have runner_workflow_id set if not sleeping")?,
)
.await?;
}
Expand Down Expand Up @@ -531,6 +551,20 @@ async fn handle_stopped(
}
}
}
// Rewake actor immediately after stopping if `will_wake` was set
else if state.will_wake {
state.will_wake = false;

if runtime::reschedule_actor(ctx, &input, state).await? {
// Destroyed early
return Ok(Some(runtime::LifecycleRes {
generation: state.generation,
// False here because if we received the destroy signal, it is
// guaranteed that we did not allocate another actor.
kill: false,
}));
}
}

Ok(None)
}
Expand Down
28 changes: 11 additions & 17 deletions packages/services/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ use super::{
pub struct LifecycleState {
pub generation: u32,

// TODO: Make these optional? These might not match the properties in the workflow state but it shouldn't
// matter for the functionality of the lifecycle loop
pub runner_id: Id,
pub runner_workflow_id: Id,
// Set when currently running (not rescheduling or sleeping)
pub runner_id: Option<Id>,
pub runner_workflow_id: Option<Id>,

pub sleeping: bool,
#[serde(default)]
pub will_wake: bool,
pub alarm_ts: Option<i64>,
pub gc_timeout_ts: Option<i64>,

Expand All @@ -36,9 +37,10 @@ impl LifecycleState {
pub fn new(runner_id: Id, runner_workflow_id: Id) -> Self {
LifecycleState {
generation: 0,
runner_id,
runner_workflow_id,
runner_id: Some(runner_id),
runner_workflow_id: Some(runner_workflow_id),
sleeping: false,
will_wake: false,
alarm_ts: None,
gc_timeout_ts: Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS),
reschedule_state: RescheduleState::default(),
Expand Down Expand Up @@ -352,6 +354,7 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()
tx.delete(&keys::actor::ConnectableKey::new(input.actor_id));

if let Some(runner_id) = runner_id {
// Only clear slot if we have a runner id
destroy::clear_slot(
input.actor_id,
namespace_id,
Expand All @@ -361,15 +364,6 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()
&tx,
)
.await?;
} else if for_serverless {
tx.atomic_op(
&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(
namespace_id,
runner_name_selector.clone(),
),
&(-1i64).to_le_bytes(),
MutationType::Add,
);
}

Ok(())
Expand Down Expand Up @@ -551,8 +545,8 @@ pub async fn reschedule_actor(
// Update loop state
if let Some((reschedule_state, res)) = res {
state.generation = next_generation;
state.runner_id = res.runner_id;
state.runner_workflow_id = res.runner_workflow_id;
state.runner_id = Some(res.runner_id);
state.runner_workflow_id = Some(res.runner_workflow_id);

// Save reschedule state in global state
state.reschedule_state = reschedule_state;
Expand Down
8 changes: 4 additions & 4 deletions scripts/api/add-serverless.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ if (!rivetToken) {

const endpoint =
process.env.RIVET_ENDPOINT ||
(await rl.question("Rivet Endpoint (default: https://api.rivet.gg): ")) ||
"https://api.rivet.gg";
(await rl.question("Rivet Endpoint (default: http://localhost:6420): ")) ||
"http://localhost:6420";
const namespace =
(await rl.question("Namespace (default: default): ")) || "default";
const runnerName =
(await rl.question("Runner name (default: serverless): ")) || "serverless";
const serverlessUrl =
(await rl.question(
"Serverless URL (default: http://localhost:8080/api/start): ",
)) || "http://localhost:8080/api/start";
"Serverless URL (default: http://localhost:3000/api/rivet/start): ",
)) || "http://localhost:3000/api/rivet/start";

rl.close();

Expand Down
4 changes: 2 additions & 2 deletions scripts/api/delete-run-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ if (!rivetToken) {

const endpoint =
process.env.RIVET_ENDPOINT ||
(await rl.question("Rivet Endpoint (default: https://api.rivet.gg): ")) ||
"https://api.rivet.gg";
(await rl.question("Rivet Endpoint (default: http://localhost:6420): ")) ||
"http://localhost:6420";
const namespace =
(await rl.question("Namespace (default: default): ")) || "default";
const runnerName = await rl.question("Runner name to delete: ");
Expand Down
Loading
Loading