diff --git a/packages/core/guard/server/src/routing/pegboard_gateway.rs b/packages/core/guard/server/src/routing/pegboard_gateway.rs index 4a23425160..4718468e51 100644 --- a/packages/core/guard/server/src/routing/pegboard_gateway.rs +++ b/packages/core/guard/server/src/routing/pegboard_gateway.rs @@ -4,7 +4,6 @@ use anyhow::Result; use gas::prelude::*; use hyper::header::HeaderName; use rivet_guard_core::proxy_service::{RouteConfig, RouteTarget, RoutingOutput, RoutingTimeout}; -use universaldb::utils::IsolationLevel::*; use super::SEC_WEBSOCKET_PROTOCOL; use crate::{errors, shared_state::SharedState}; @@ -93,26 +92,6 @@ pub async fn route_request( }))); } - // Lookup actor - find_actor(ctx, shared_state, actor_id, path).await -} - -struct FoundActor { - workflow_id: Id, - sleeping: bool, - destroyed: bool, -} - -/// Find an actor by actor_id -#[tracing::instrument(skip_all, fields(%actor_id, %path))] -async fn find_actor( - ctx: &StandaloneCtx, - shared_state: &SharedState, - actor_id: Id, - path: &str, -) -> Result> { - // TODO: Optimize this down to a single FDB call - // Create subs before checking if actor exists/is not destroyed let mut ready_sub = ctx .subscribe::(("actor_id", actor_id)) @@ -124,37 +103,11 @@ async fn find_actor( .subscribe::(("actor_id", actor_id)) .await?; - let actor_res = tokio::time::timeout( - Duration::from_secs(5), - ctx.udb()? - .run(|tx| async move { - let tx = tx.with_subspace(pegboard::keys::subspace()); - - let workflow_id_key = pegboard::keys::actor::WorkflowIdKey::new(actor_id); - let sleep_ts_key = pegboard::keys::actor::SleepTsKey::new(actor_id); - let destroy_ts_key = pegboard::keys::actor::DestroyTsKey::new(actor_id); - - let (workflow_id_entry, sleeping, destroyed) = tokio::try_join!( - tx.read_opt(&workflow_id_key, Serializable), - tx.exists(&sleep_ts_key, Serializable), - tx.exists(&destroy_ts_key, Serializable), - )?; - - let Some(workflow_id) = workflow_id_entry else { - return Ok(None); - }; - - Ok(Some(FoundActor { - workflow_id, - sleeping, - destroyed, - })) - }) - .custom_instrument(tracing::info_span!("actor_exists_tx")), - ) - .await??; - - let Some(actor) = actor_res else { + // Fetch actor info + let Some(actor) = ctx + .op(pegboard::ops::actor::get_for_gateway::Input { actor_id }) + .await? + else { return Err(errors::ActorNotFound { actor_id }.build()); }; @@ -170,15 +123,8 @@ async fn find_actor( .await?; } - // Check if actor is connectable and get runner_id - let get_runner_fut = ctx.op(pegboard::ops::actor::get_runner::Input { - actor_ids: vec![actor_id], - }); - let res = tokio::time::timeout(Duration::from_secs(5), get_runner_fut).await??; - let actor = res.actors.into_iter().next().filter(|x| x.is_connectable); - - let runner_id = if let Some(actor) = actor { - actor.runner_id + let runner_id = if let (Some(runner_id), true) = (actor.runner_id, actor.connectable) { + runner_id } else { tracing::debug!(?actor_id, "waiting for actor to become ready"); diff --git a/packages/services/pegboard/src/keys/actor.rs b/packages/services/pegboard/src/keys/actor.rs index e85d5fc07a..ef2563e9ed 100644 --- a/packages/services/pegboard/src/keys/actor.rs +++ b/packages/services/pegboard/src/keys/actor.rs @@ -269,3 +269,47 @@ impl<'de> TupleUnpack<'de> for DestroyTsKey { Ok((input, v)) } } + +#[derive(Debug)] +pub struct NamespaceIdKey { + actor_id: Id, +} + +impl NamespaceIdKey { + pub fn new(actor_id: Id) -> Self { + NamespaceIdKey { actor_id } + } +} + +impl FormalKey for NamespaceIdKey { + type Value = Id; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(Id::from_slice(raw)?) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.as_bytes()) + } +} + +impl TuplePack for NamespaceIdKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = (ACTOR, DATA, self.actor_id, NAMESPACE_ID); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for NamespaceIdKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, actor_id, _)) = <(usize, usize, Id, usize)>::unpack(input, tuple_depth)?; + + let v = NamespaceIdKey { actor_id }; + + Ok((input, v)) + } +} diff --git a/packages/services/pegboard/src/ops/actor/get_for_gateway.rs b/packages/services/pegboard/src/ops/actor/get_for_gateway.rs new file mode 100644 index 0000000000..b63acbe817 --- /dev/null +++ b/packages/services/pegboard/src/ops/actor/get_for_gateway.rs @@ -0,0 +1,70 @@ +use gas::prelude::*; +use universaldb::utils::IsolationLevel::*; + +use crate::keys; + +#[derive(Debug)] +pub struct Input { + pub actor_id: Id, +} + +#[derive(Debug)] +pub struct Output { + pub namespace_id: Id, + pub workflow_id: Id, + pub sleeping: bool, + pub destroyed: bool, + pub connectable: bool, + pub runner_id: Option, +} + +#[operation] +#[timeout = 5] +pub async fn pegboard_actor_get_for_gateway( + ctx: &OperationCtx, + input: &Input, +) -> Result> { + ctx.udb()? + .run(|tx| async move { + let tx = tx.with_subspace(keys::subspace()); + + let namespace_id_key = keys::actor::NamespaceIdKey::new(input.actor_id); + let workflow_id_key = keys::actor::WorkflowIdKey::new(input.actor_id); + let sleep_ts_key = keys::actor::SleepTsKey::new(input.actor_id); + let destroy_ts_key = keys::actor::DestroyTsKey::new(input.actor_id); + let connectable_key = keys::actor::ConnectableKey::new(input.actor_id); + let runner_id_key = keys::actor::RunnerIdKey::new(input.actor_id); + + let ( + namespace_id_entry, + workflow_id_entry, + sleeping, + destroyed, + connectable, + runner_id, + ) = tokio::try_join!( + tx.read_opt(&namespace_id_key, Serializable), + tx.read_opt(&workflow_id_key, Serializable), + tx.exists(&sleep_ts_key, Serializable), + tx.exists(&destroy_ts_key, Serializable), + tx.exists(&connectable_key, Serializable), + tx.read_opt(&runner_id_key, Serializable), + )?; + + let (Some(namespace_id), Some(workflow_id)) = (namespace_id_entry, workflow_id_entry) + else { + return Ok(None); + }; + + Ok(Some(Output { + namespace_id, + workflow_id, + sleeping, + destroyed, + connectable, + runner_id, + })) + }) + .custom_instrument(tracing::info_span!("actor_get_for_gateway_tx")) + .await +} diff --git a/packages/services/pegboard/src/ops/actor/mod.rs b/packages/services/pegboard/src/ops/actor/mod.rs index 45e7391f9a..fec99d5eee 100644 --- a/packages/services/pegboard/src/ops/actor/mod.rs +++ b/packages/services/pegboard/src/ops/actor/mod.rs @@ -1,5 +1,6 @@ pub mod create; pub mod get; +pub mod get_for_gateway; pub mod get_for_key; pub mod get_reservation_for_key; pub mod get_runner; diff --git a/packages/services/pegboard/src/workflows/actor/setup.rs b/packages/services/pegboard/src/workflows/actor/setup.rs index 9ac87beeb2..313bc3b369 100644 --- a/packages/services/pegboard/src/workflows/actor/setup.rs +++ b/packages/services/pegboard/src/workflows/actor/setup.rs @@ -94,6 +94,10 @@ pub async fn insert_state_and_db(ctx: &ActivityCtx, input: &InitStateAndUdbInput &keys::actor::WorkflowIdKey::new(input.actor_id), ctx.workflow_id(), )?; + tx.write( + &keys::actor::NamespaceIdKey::new(input.actor_id), + input.namespace_id, + )?; Ok(()) })