Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 7 additions & 61 deletions packages/core/guard/server/src/routing/pegboard_gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use anyhow::Result;
use gas::prelude::*;
use hyper::header::HeaderName;
use rivet_guard_core::proxy_service::{RouteConfig, RouteTarget, RoutingOutput, RoutingTimeout};
use universaldb::utils::IsolationLevel::*;

use super::SEC_WEBSOCKET_PROTOCOL;
use crate::{errors, shared_state::SharedState};
Expand Down Expand Up @@ -93,26 +92,6 @@ pub async fn route_request(
})));
}

// Lookup actor
find_actor(ctx, shared_state, actor_id, path).await
}

struct FoundActor {
workflow_id: Id,
sleeping: bool,
destroyed: bool,
}

/// Find an actor by actor_id
#[tracing::instrument(skip_all, fields(%actor_id, %path))]
async fn find_actor(
ctx: &StandaloneCtx,
shared_state: &SharedState,
actor_id: Id,
path: &str,
) -> Result<Option<RoutingOutput>> {
// TODO: Optimize this down to a single FDB call

// Create subs before checking if actor exists/is not destroyed
let mut ready_sub = ctx
.subscribe::<pegboard::workflows::actor::Ready>(("actor_id", actor_id))
Expand All @@ -124,37 +103,11 @@ async fn find_actor(
.subscribe::<pegboard::workflows::actor::DestroyStarted>(("actor_id", actor_id))
.await?;

let actor_res = tokio::time::timeout(
Duration::from_secs(5),
ctx.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(pegboard::keys::subspace());

let workflow_id_key = pegboard::keys::actor::WorkflowIdKey::new(actor_id);
let sleep_ts_key = pegboard::keys::actor::SleepTsKey::new(actor_id);
let destroy_ts_key = pegboard::keys::actor::DestroyTsKey::new(actor_id);

let (workflow_id_entry, sleeping, destroyed) = tokio::try_join!(
tx.read_opt(&workflow_id_key, Serializable),
tx.exists(&sleep_ts_key, Serializable),
tx.exists(&destroy_ts_key, Serializable),
)?;

let Some(workflow_id) = workflow_id_entry else {
return Ok(None);
};

Ok(Some(FoundActor {
workflow_id,
sleeping,
destroyed,
}))
})
.custom_instrument(tracing::info_span!("actor_exists_tx")),
)
.await??;

let Some(actor) = actor_res else {
// Fetch actor info
let Some(actor) = ctx
.op(pegboard::ops::actor::get_for_gateway::Input { actor_id })
.await?
else {
return Err(errors::ActorNotFound { actor_id }.build());
};

Expand All @@ -170,15 +123,8 @@ async fn find_actor(
.await?;
}

// Check if actor is connectable and get runner_id
let get_runner_fut = ctx.op(pegboard::ops::actor::get_runner::Input {
actor_ids: vec![actor_id],
});
let res = tokio::time::timeout(Duration::from_secs(5), get_runner_fut).await??;
let actor = res.actors.into_iter().next().filter(|x| x.is_connectable);

let runner_id = if let Some(actor) = actor {
actor.runner_id
let runner_id = if let (Some(runner_id), true) = (actor.runner_id, actor.connectable) {
runner_id
} else {
tracing::debug!(?actor_id, "waiting for actor to become ready");

Expand Down
44 changes: 44 additions & 0 deletions packages/services/pegboard/src/keys/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,47 @@ impl<'de> TupleUnpack<'de> for DestroyTsKey {
Ok((input, v))
}
}

#[derive(Debug)]
pub struct NamespaceIdKey {
actor_id: Id,
}

impl NamespaceIdKey {
pub fn new(actor_id: Id) -> Self {
NamespaceIdKey { actor_id }
}
}

impl FormalKey for NamespaceIdKey {
type Value = Id;

fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
Ok(Id::from_slice(raw)?)
}

fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
Ok(value.as_bytes())
}
}

impl TuplePack for NamespaceIdKey {
fn pack<W: std::io::Write>(
&self,
w: &mut W,
tuple_depth: TupleDepth,
) -> std::io::Result<VersionstampOffset> {
let t = (ACTOR, DATA, self.actor_id, NAMESPACE_ID);
t.pack(w, tuple_depth)
}
}

impl<'de> TupleUnpack<'de> for NamespaceIdKey {
fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> {
let (input, (_, _, actor_id, _)) = <(usize, usize, Id, usize)>::unpack(input, tuple_depth)?;

let v = NamespaceIdKey { actor_id };

Ok((input, v))
}
}
70 changes: 70 additions & 0 deletions packages/services/pegboard/src/ops/actor/get_for_gateway.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use gas::prelude::*;
use universaldb::utils::IsolationLevel::*;

use crate::keys;

#[derive(Debug)]
pub struct Input {
pub actor_id: Id,
}

#[derive(Debug)]
pub struct Output {
pub namespace_id: Id,
pub workflow_id: Id,
pub sleeping: bool,
pub destroyed: bool,
pub connectable: bool,
pub runner_id: Option<Id>,
}

#[operation]
#[timeout = 5]
pub async fn pegboard_actor_get_for_gateway(
ctx: &OperationCtx,
input: &Input,
) -> Result<Option<Output>> {
ctx.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());

let namespace_id_key = keys::actor::NamespaceIdKey::new(input.actor_id);
let workflow_id_key = keys::actor::WorkflowIdKey::new(input.actor_id);
let sleep_ts_key = keys::actor::SleepTsKey::new(input.actor_id);
let destroy_ts_key = keys::actor::DestroyTsKey::new(input.actor_id);
let connectable_key = keys::actor::ConnectableKey::new(input.actor_id);
let runner_id_key = keys::actor::RunnerIdKey::new(input.actor_id);

let (
namespace_id_entry,
workflow_id_entry,
sleeping,
destroyed,
connectable,
runner_id,
) = tokio::try_join!(
tx.read_opt(&namespace_id_key, Serializable),
tx.read_opt(&workflow_id_key, Serializable),
tx.exists(&sleep_ts_key, Serializable),
tx.exists(&destroy_ts_key, Serializable),
tx.exists(&connectable_key, Serializable),
tx.read_opt(&runner_id_key, Serializable),
)?;

let (Some(namespace_id), Some(workflow_id)) = (namespace_id_entry, workflow_id_entry)
else {
return Ok(None);
};

Ok(Some(Output {
namespace_id,
workflow_id,
sleeping,
destroyed,
connectable,
runner_id,
}))
})
.custom_instrument(tracing::info_span!("actor_get_for_gateway_tx"))
.await
}
1 change: 1 addition & 0 deletions packages/services/pegboard/src/ops/actor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod create;
pub mod get;
pub mod get_for_gateway;
pub mod get_for_key;
pub mod get_reservation_for_key;
pub mod get_runner;
Expand Down
4 changes: 4 additions & 0 deletions packages/services/pegboard/src/workflows/actor/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ pub async fn insert_state_and_db(ctx: &ActivityCtx, input: &InitStateAndUdbInput
&keys::actor::WorkflowIdKey::new(input.actor_id),
ctx.workflow_id(),
)?;
tx.write(
&keys::actor::NamespaceIdKey::new(input.actor_id),
input.namespace_id,
)?;

Ok(())
})
Expand Down
Loading