diff --git a/Cargo.lock b/Cargo.lock index 11bd2269ac..8e5a6b3311 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3357,6 +3357,7 @@ dependencies = [ "serde_bare", "serde_json", "strum", + "test-snapshot-gen", "tokio", "tracing", "tracing-subscriber", @@ -6135,11 +6136,14 @@ dependencies = [ "epoxy", "epoxy-protocol", "gasoline", + "namespace", + "pegboard", "portpicker", "rivet-api-builder", "rivet-config", "rivet-pools", "rivet-test-deps", + "rivet-types", "rivet-util", "serde", "serde_json", diff --git a/engine/packages/epoxy/src/consts.rs b/engine/packages/epoxy/src/consts.rs index fbead8e92f..1fa489080e 100644 --- a/engine/packages/epoxy/src/consts.rs +++ b/engine/packages/epoxy/src/consts.rs @@ -8,4 +8,3 @@ pub const REQUEST_TIMEOUT: Duration = Duration::from_secs(10); /// This keeps learner range reads bounded while still making steady progress through the /// immutable per-key commit history. pub const CHANGELOG_READ_COUNT: u64 = 5_000; - diff --git a/engine/packages/epoxy/src/http_routes.rs b/engine/packages/epoxy/src/http_routes.rs index 58dda80696..2cc9e24f06 100644 --- a/engine/packages/epoxy/src/http_routes.rs +++ b/engine/packages/epoxy/src/http_routes.rs @@ -16,7 +16,10 @@ pub fn mount_routes( ) -> axum::Router { router .route("/v{version}/epoxy/message", bin::post(message)) - .route("/v{version}/epoxy/changelog-read", bin::post(changelog_read)) + .route( + "/v{version}/epoxy/changelog-read", + bin::post(changelog_read), + ) } pub async fn message(ctx: ApiCtx, path: ProtocolPath, _query: (), body: Bytes) -> Result> { @@ -66,7 +69,9 @@ fn request_kind_label(kind: &protocol::RequestKind) -> &'static str { protocol::RequestKind::CommitRequest(_) => "commit", protocol::RequestKind::ChangelogReadRequest(_) => "changelog_read", protocol::RequestKind::HealthCheckRequest => "health_check", - protocol::RequestKind::CoordinatorUpdateReplicaStatusRequest(_) => "coordinator_update_replica_status", + protocol::RequestKind::CoordinatorUpdateReplicaStatusRequest(_) => { + "coordinator_update_replica_status" + } protocol::RequestKind::BeginLearningRequest(_) => "begin_learning", protocol::RequestKind::KvGetRequest(_) => "kv_get", protocol::RequestKind::KvPurgeCacheRequest(_) => "kv_purge_cache", diff --git a/engine/packages/epoxy/src/keys/keys.rs b/engine/packages/epoxy/src/keys/keys.rs index 35230892ab..7caa0c9598 100644 --- a/engine/packages/epoxy/src/keys/keys.rs +++ b/engine/packages/epoxy/src/keys/keys.rs @@ -336,8 +336,7 @@ impl TuplePack for ChangelogKey { impl<'de> TupleUnpack<'de> for ChangelogKey { fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { - let (input, (root, versionstamp)) = - <(usize, Versionstamp)>::unpack(input, tuple_depth)?; + let (input, (root, versionstamp)) = <(usize, Versionstamp)>::unpack(input, tuple_depth)?; if root != CHANGELOG { return Err(PackError::Message("expected CHANGELOG root".into())); } @@ -347,4 +346,3 @@ impl<'de> TupleUnpack<'de> for ChangelogKey { Ok((input, v)) } } - diff --git a/engine/packages/epoxy/src/metrics.rs b/engine/packages/epoxy/src/metrics.rs index d44f298f15..65fe312832 100644 --- a/engine/packages/epoxy/src/metrics.rs +++ b/engine/packages/epoxy/src/metrics.rs @@ -96,7 +96,9 @@ pub fn record_changelog_append() { } pub fn record_request(request_type: &str, result: &str, duration: std::time::Duration) { - REQUEST_TOTAL.with_label_values(&[request_type, result]).inc(); + REQUEST_TOTAL + .with_label_values(&[request_type, result]) + .inc(); REQUEST_DURATION .with_label_values(&[request_type]) 
 		.observe(duration.as_secs_f64());
diff --git a/engine/packages/epoxy/src/ops/kv/get_local.rs b/engine/packages/epoxy/src/ops/kv/get_local.rs
index ce86d043f6..1c9ec16d17 100644
--- a/engine/packages/epoxy/src/ops/kv/get_local.rs
+++ b/engine/packages/epoxy/src/ops/kv/get_local.rs
@@ -19,14 +19,10 @@ pub struct Output {
 
 #[operation]
 pub async fn epoxy_kv_get_local(ctx: &OperationCtx, input: &Input) -> Result<Output> {
-	let committed_value = read_value::read_local_value(
-		ctx,
-		input.replica_id,
-		input.key.clone(),
-		false,
-	)
-	.await?
-	.value;
+	let committed_value =
+		read_value::read_local_value(ctx, input.replica_id, input.key.clone(), false)
+			.await?
+			.value;
 
 	Ok(Output {
 		value: committed_value.as_ref().map(|value| value.value.clone()),
diff --git a/engine/packages/epoxy/src/ops/kv/get_optimistic.rs b/engine/packages/epoxy/src/ops/kv/get_optimistic.rs
index 0607607267..fe0b9a67af 100644
--- a/engine/packages/epoxy/src/ops/kv/get_optimistic.rs
+++ b/engine/packages/epoxy/src/ops/kv/get_optimistic.rs
@@ -123,13 +123,7 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul
 	};
 
 	if input.caching_behavior == protocol::CachingBehavior::Optimistic {
-		cache_fanout_value(
-			ctx,
-			input.replica_id,
-			input.key.clone(),
-			value.clone(),
-		)
-		.await?;
+		cache_fanout_value(ctx, input.replica_id, input.key.clone(), value.clone()).await?;
 	}
 
 	return Ok(Output {
@@ -162,7 +156,10 @@ async fn cache_fanout_value(
 	// This covers the race where a commit lands between the fanout read and
 	// the cache write.
 	if let Some(committed_value) = tx
-		.read_opt(&committed_key, universaldb::utils::IsolationLevel::Serializable)
+		.read_opt(
+			&committed_key,
+			universaldb::utils::IsolationLevel::Serializable,
+		)
 		.await?
 	{
 		if committed_value.version >= value_to_cache.version {
@@ -174,10 +171,7 @@
 	// prevents a slow fanout response from overwriting a fresher cache entry
 	// written by a concurrent request.
 	if let Some(existing_cache) = tx
-		.read_opt(
-			&cache_key,
-			universaldb::utils::IsolationLevel::Serializable,
-		)
+		.read_opt(&cache_key, universaldb::utils::IsolationLevel::Serializable)
 		.await?
{ if existing_cache.version > value_to_cache.version { diff --git a/engine/packages/epoxy/src/ops/kv/read_value.rs b/engine/packages/epoxy/src/ops/kv/read_value.rs index f85678196a..3a7875f032 100644 --- a/engine/packages/epoxy/src/ops/kv/read_value.rs +++ b/engine/packages/epoxy/src/ops/kv/read_value.rs @@ -3,7 +3,9 @@ use epoxy_protocol::protocol::ReplicaId; use gas::prelude::*; use universaldb::utils::{FormalKey, IsolationLevel::Serializable}; -use crate::keys::{self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey}; +use crate::keys::{ + self, CommittedValue, KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey, +}; #[derive(Debug)] pub(crate) struct LocalValueRead { diff --git a/engine/packages/epoxy/src/ops/propose.rs b/engine/packages/epoxy/src/ops/propose.rs index 395c1f5b15..badf62aabe 100644 --- a/engine/packages/epoxy/src/ops/propose.rs +++ b/engine/packages/epoxy/src/ops/propose.rs @@ -9,8 +9,9 @@ use std::collections::{BTreeSet, HashSet}; use std::time::{Duration, Instant}; use crate::{ - http_client, metrics, + http_client, keys::CommittedValue, + metrics, replica::{ ballot::{self, Ballot, BallotSelection}, commit_kv::{self, CommitKvOutcome}, @@ -101,13 +102,11 @@ impl SetProposal { key, expect_one_of, new_value: Some(value), - }) if expect_one_of.len() == 1 && matches!(expect_one_of.first(), Some(None)) => { - Ok(Self { - key: key.clone(), - value: value.clone(), - mutable, - }) - } + }) if expect_one_of.len() == 1 && matches!(expect_one_of.first(), Some(None)) => Ok(Self { + key: key.clone(), + value: value.clone(), + mutable, + }), _ => bail!( "epoxy v2 only supports single-key set-if-absent proposals with a concrete value" ), @@ -314,14 +313,14 @@ pub async fn epoxy_propose(ctx: &OperationCtx, input: &Input) -> Result { run_slow_path( @@ -444,16 +443,7 @@ async fn run_accept_path( version, mutable, } = value; - commit_kv::commit_kv( - &tx, - replica_id, - key, - value, - ballot, - mutable, - version, - ) - .await + commit_kv::commit_kv(&tx, replica_id, key, value, ballot, mutable, version).await } }) .custom_instrument(tracing::info_span!("commit_kv_tx")) @@ -472,16 +462,15 @@ async fn run_accept_path( let ballot = ballot; let purge_cache = purge_cache && proposal.mutable; async move { - if let Err(err) = - broadcast_commits( - &ctx, - &config, - replica_id, - key.clone(), - chosen_value.clone(), - ballot, - ) - .await + if let Err(err) = broadcast_commits( + &ctx, + &config, + replica_id, + key.clone(), + chosen_value.clone(), + ballot, + ) + .await { tracing::warn!(?err, "commit broadcast failed after local commit"); } @@ -550,7 +539,8 @@ async fn run_prepare_phase( } PrepareRoundOutcome::Retry { next_ballot } => { store_prepare_ballot(ctx, replica_id, key.clone(), next_ballot).await?; - let Some(retry_delay) = next_prepare_retry_delay(retry_count, &mut rand::thread_rng()) + let Some(retry_delay) = + next_prepare_retry_delay(retry_count, &mut rand::thread_rng()) else { tracing::warn!( %replica_id, @@ -592,31 +582,32 @@ async fn send_prepare_round( ballot: protocol::Ballot, ) -> Result { let target = utils::calculate_quorum(replica_ids.len(), utils::QuorumType::Slow); - let mut pending = futures_util::stream::iter(replica_ids.iter().copied().map(|to_replica_id| { - let key = key.clone(); - let proposed_value = proposed_value.clone(); - let ballot = ballot.clone(); - async move { - ( - to_replica_id, - tokio::time::timeout( - crate::consts::REQUEST_TIMEOUT, - send_prepare_request( - ctx, - config, - from_replica_id, - to_replica_id, - key, - 
proposed_value, - ballot, - ), + let mut pending = + futures_util::stream::iter(replica_ids.iter().copied().map(|to_replica_id| { + let key = key.clone(); + let proposed_value = proposed_value.clone(); + let ballot = ballot.clone(); + async move { + ( + to_replica_id, + tokio::time::timeout( + crate::consts::REQUEST_TIMEOUT, + send_prepare_request( + ctx, + config, + from_replica_id, + to_replica_id, + key, + proposed_value, + ballot, + ), + ) + .await, ) - .await, - ) - } - })) - .collect::>() - .await; + } + })) + .collect::>() + .await; let mut ok_responses = 0; let mut remaining = replica_ids.len(); @@ -655,7 +646,9 @@ async fn send_prepare_round( } (None, None) => {} _ => { - bail!("prepare response from replica {to_replica_id} returned partial accepted state"); + bail!( + "prepare response from replica {to_replica_id} returned partial accepted state" + ); } } @@ -720,31 +713,32 @@ async fn send_accept_round( accept_quorum: utils::QuorumType, ) -> Result { let target = utils::calculate_quorum(replica_ids.len(), accept_quorum); - let mut pending = futures_util::stream::iter(replica_ids.iter().copied().map(|to_replica_id| { - let key = key.clone(); - let value = value.clone(); - let ballot = ballot.clone(); - async move { - ( - to_replica_id, - tokio::time::timeout( - crate::consts::REQUEST_TIMEOUT, - send_accept_request( - ctx, - config, - from_replica_id, - to_replica_id, - key, - value, - ballot, - ), + let mut pending = + futures_util::stream::iter(replica_ids.iter().copied().map(|to_replica_id| { + let key = key.clone(); + let value = value.clone(); + let ballot = ballot.clone(); + async move { + ( + to_replica_id, + tokio::time::timeout( + crate::consts::REQUEST_TIMEOUT, + send_accept_request( + ctx, + config, + from_replica_id, + to_replica_id, + key, + value, + ballot, + ), + ) + .await, ) - .await, - ) - } - })) - .collect::>() - .await; + } + })) + .collect::>() + .await; let mut state = AcceptRoundState { target, @@ -754,9 +748,7 @@ async fn send_accept_round( while let Some((to_replica_id, response)) = pending.next().await { let observation = match response { - Ok(Ok(protocol::AcceptResponse::AcceptResponseOk(_))) => { - AcceptObservation::Ok - } + Ok(Ok(protocol::AcceptResponse::AcceptResponseOk(_))) => AcceptObservation::Ok, Ok(Ok(protocol::AcceptResponse::AcceptResponseAlreadyCommitted(committed))) => { AcceptObservation::AlreadyCommitted(committed.value) } diff --git a/engine/packages/epoxy/src/replica/ballot.rs b/engine/packages/epoxy/src/replica/ballot.rs index b3ec151e40..33e169178a 100644 --- a/engine/packages/epoxy/src/replica/ballot.rs +++ b/engine/packages/epoxy/src/replica/ballot.rs @@ -65,7 +65,9 @@ pub enum BallotSelection { value: CommittedValue, ballot: Ballot, }, - NeedsPrepare { ballot: Ballot }, + NeedsPrepare { + ballot: Ballot, + }, FreshBallot(Ballot), } @@ -88,12 +90,7 @@ pub async fn ballot_selection( let packed_legacy_v2_value_key = legacy_subspace.pack(&legacy_v2_value_key); let packed_ballot_key = subspace.pack(&ballot_key); - let ( - committed_value, - legacy_committed_value, - legacy_v2_committed_value, - current_ballot, - ) = tokio::try_join!( + let (committed_value, legacy_committed_value, legacy_v2_committed_value, current_ballot) = tokio::try_join!( async { let value = tx.get(&packed_value_key, Serializable).await?; if let Some(bytes) = value { @@ -136,9 +133,7 @@ pub async fn ballot_selection( mutable: false, }) }) - .or_else(|| { - legacy_v2_committed_value - }) + .or_else(|| legacy_v2_committed_value) { if !value.mutable || !mutable { 
return Ok(BallotSelection::AlreadyCommitted(value.value)); @@ -150,10 +145,7 @@ pub async fn ballot_selection( ballot_key, current_ballot.unwrap_or_else(|| Ballot::zero(replica_id)), )?; - return Ok(BallotSelection::AlreadyCommittedMutable { - value, - ballot, - }); + return Ok(BallotSelection::AlreadyCommittedMutable { value, ballot }); } let current_ballot = current_ballot.unwrap_or_else(|| Ballot::zero(replica_id)); diff --git a/engine/packages/epoxy/src/replica/changelog.rs b/engine/packages/epoxy/src/replica/changelog.rs index 060a4e7777..c86a654d77 100644 --- a/engine/packages/epoxy/src/replica/changelog.rs +++ b/engine/packages/epoxy/src/replica/changelog.rs @@ -5,13 +5,14 @@ use universaldb::{ KeySelector, RangeOption, Transaction, options::StreamingMode, tuple::Versionstamp, - utils::{ - FormalKey, IsolationLevel::Serializable, keys::CHANGELOG, - }, + utils::{FormalKey, IsolationLevel::Serializable, keys::CHANGELOG}, versionstamp::{generate_versionstamp, substitute_versionstamp}, }; -use crate::keys::{self, ChangelogKey, CommittedValue, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, KvValueKey}; +use crate::keys::{ + self, ChangelogKey, CommittedValue, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, + KvValueKey, +}; use crate::metrics; #[tracing::instrument(skip_all, fields(%replica_id, key = ?key))] @@ -52,15 +53,15 @@ pub async fn read( let replica_subspace = keys::subspace(replica_id); let changelog_subspace = replica_subspace.subspace(&(CHANGELOG,)); let mut range: RangeOption<'static> = (&changelog_subspace).into(); - range.limit = Some( - usize::try_from(req.count).context("changelog read count does not fit in usize")?, - ); + range.limit = + Some(usize::try_from(req.count).context("changelog read count does not fit in usize")?); range.mode = StreamingMode::WantAll; let mut last_versionstamp = req.after_versionstamp.clone().unwrap_or_default(); if let Some(after_versionstamp) = req.after_versionstamp { - let after_key = - replica_subspace.pack(&ChangelogKey::new(decode_versionstamp(&after_versionstamp)?)); + let after_key = replica_subspace.pack(&ChangelogKey::new(decode_versionstamp( + &after_versionstamp, + )?)); range.begin = KeySelector::first_greater_than(after_key); } @@ -144,4 +145,3 @@ fn decode_versionstamp(raw: &[u8]) -> Result { .context("expected 12-byte versionstamp cursor")?; Ok(Versionstamp::from(bytes)) } - diff --git a/engine/packages/epoxy/src/replica/commit_kv.rs b/engine/packages/epoxy/src/replica/commit_kv.rs index fe30533199..7bf98a66b3 100644 --- a/engine/packages/epoxy/src/replica/commit_kv.rs +++ b/engine/packages/epoxy/src/replica/commit_kv.rs @@ -1,9 +1,6 @@ use anyhow::Result; use epoxy_protocol::protocol; -use universaldb::{ - Transaction, - utils::IsolationLevel::Serializable, -}; +use universaldb::{Transaction, utils::IsolationLevel::Serializable}; use crate::{ keys::{self, CommittedValue, KvAcceptedKey, KvBallotKey, KvOptimisticCacheKey, KvValueKey}, diff --git a/engine/packages/epoxy/src/replica/messages/accept.rs b/engine/packages/epoxy/src/replica/messages/accept.rs index f0f59207fd..f388a31625 100644 --- a/engine/packages/epoxy/src/replica/messages/accept.rs +++ b/engine/packages/epoxy/src/replica/messages/accept.rs @@ -35,13 +35,11 @@ pub async fn accept( if let Some(committed_value) = committed_value { if !committed_value.mutable || !mutable || version <= committed_value.version { - return Ok( - protocol::AcceptResponse::AcceptResponseAlreadyCommitted( - protocol::AcceptResponseAlreadyCommitted { - value: committed_value.value, - 
}, - ), - ); + return Ok(protocol::AcceptResponse::AcceptResponseAlreadyCommitted( + protocol::AcceptResponseAlreadyCommitted { + value: committed_value.value, + }, + )); } } diff --git a/engine/packages/epoxy/src/replica/messages/commit.rs b/engine/packages/epoxy/src/replica/messages/commit.rs index fb90aad4e4..033b5a8274 100644 --- a/engine/packages/epoxy/src/replica/messages/commit.rs +++ b/engine/packages/epoxy/src/replica/messages/commit.rs @@ -27,9 +27,7 @@ pub async fn commit( protocol::CommitResponseAlreadyCommitted { value }, ) } - CommitKvOutcome::StaleBallot { .. } => { - protocol::CommitResponse::CommitResponseStaleCommit - } + CommitKvOutcome::StaleBallot { .. } => protocol::CommitResponse::CommitResponseStaleCommit, }; Ok(response) diff --git a/engine/packages/epoxy/src/replica/messages/mod.rs b/engine/packages/epoxy/src/replica/messages/mod.rs index 4aaa2cd76f..a49dff9014 100644 --- a/engine/packages/epoxy/src/replica/messages/mod.rs +++ b/engine/packages/epoxy/src/replica/messages/mod.rs @@ -1,7 +1,7 @@ -pub mod commit; pub mod accept; +pub mod commit; pub mod prepare; -pub use commit::commit; pub use accept::accept; +pub use commit::commit; pub use prepare::prepare; diff --git a/engine/packages/epoxy/src/types.rs b/engine/packages/epoxy/src/types.rs index e045405ac6..db3372f926 100644 --- a/engine/packages/epoxy/src/types.rs +++ b/engine/packages/epoxy/src/types.rs @@ -31,7 +31,11 @@ impl From for ClusterConfig { Self { coordinator_replica_id: config.coordinator_replica_id, epoch: config.epoch, - replicas: config.replicas.into_iter().map(ReplicaConfig::from).collect(), + replicas: config + .replicas + .into_iter() + .map(ReplicaConfig::from) + .collect(), } } } diff --git a/engine/packages/epoxy/src/utils.rs b/engine/packages/epoxy/src/utils.rs index 509ab51829..6c7ba3227e 100644 --- a/engine/packages/epoxy/src/utils.rs +++ b/engine/packages/epoxy/src/utils.rs @@ -107,13 +107,28 @@ mod tests { (7, 6, 4, 7, 1, 5, 3, 6, 1), ]; - for (n, fast, slow, all, any, fanout_fast, fanout_slow, fanout_all, fanout_any) in - expected + for (n, fast, slow, all, any, fanout_fast, fanout_slow, fanout_all, fanout_any) in expected { - assert_eq!(calculate_quorum(n, QuorumType::Fast), fast, "fast quorum for n={n}"); - assert_eq!(calculate_quorum(n, QuorumType::Slow), slow, "slow quorum for n={n}"); - assert_eq!(calculate_quorum(n, QuorumType::All), all, "all quorum for n={n}"); - assert_eq!(calculate_quorum(n, QuorumType::Any), any, "any quorum for n={n}"); + assert_eq!( + calculate_quorum(n, QuorumType::Fast), + fast, + "fast quorum for n={n}" + ); + assert_eq!( + calculate_quorum(n, QuorumType::Slow), + slow, + "slow quorum for n={n}" + ); + assert_eq!( + calculate_quorum(n, QuorumType::All), + all, + "all quorum for n={n}" + ); + assert_eq!( + calculate_quorum(n, QuorumType::Any), + any, + "any quorum for n={n}" + ); assert_eq!( calculate_fanout_quorum(n, QuorumType::Fast), fanout_fast, @@ -169,7 +184,10 @@ mod tests { ); if n >= 2 { - assert!(slow * 2 > n, "slow quorum must be a strict majority for n={n}"); + assert!( + slow * 2 > n, + "slow quorum must be a strict majority for n={n}" + ); assert!( (2 * fast) + slow > 2 * n, "fast quorum must satisfy the Fast Paxos intersection invariant for n={n}" diff --git a/engine/packages/epoxy/src/workflows/backfill.rs b/engine/packages/epoxy/src/workflows/backfill.rs index c15048630a..b0f86df8e8 100644 --- a/engine/packages/epoxy/src/workflows/backfill.rs +++ b/engine/packages/epoxy/src/workflows/backfill.rs @@ -7,7 +7,8 @@ use universaldb::{ 
 	KeySelector, RangeOption,
 	options::StreamingMode,
 	utils::{
-		FormalKey, IsolationLevel::Serializable,
+		FormalKey,
+		IsolationLevel::Serializable,
 		keys::{COMMITTED_VALUE, KV, VALUE},
 	},
 };
@@ -76,7 +77,10 @@ pub struct BackfillChunkOutput {
 }
 
 #[activity(BackfillChunk)]
-pub async fn backfill_chunk(ctx: &ActivityCtx, input: &BackfillChunkInput) -> Result<BackfillChunkOutput> {
+pub async fn backfill_chunk(
+	ctx: &ActivityCtx,
+	input: &BackfillChunkInput,
+) -> Result<BackfillChunkOutput> {
 	let replica_id = ctx.config().epoxy_replica_id();
 
 	ctx.udb()?
@@ -180,11 +184,9 @@ async fn migrate_legacy_key(
 	legacy_value: Option<Vec<u8>>,
 	legacy_committed_value: Option<Vec<u8>>,
 ) -> Result<bool> {
-	let Some(committed_value) = build_legacy_committed_value(
-		key.clone(),
-		legacy_value,
-		legacy_committed_value,
-	)? else {
+	let Some(committed_value) =
+		build_legacy_committed_value(key.clone(), legacy_value, legacy_committed_value)?
+	else {
 		return Ok(false);
 	};
 
diff --git a/engine/packages/epoxy/src/workflows/replica/setup.rs b/engine/packages/epoxy/src/workflows/replica/setup.rs
index 88583e0e76..7b8df65a30 100644
--- a/engine/packages/epoxy/src/workflows/replica/setup.rs
+++ b/engine/packages/epoxy/src/workflows/replica/setup.rs
@@ -15,10 +15,7 @@ pub async fn setup_replica(ctx: &mut WorkflowCtx, _input: &super::Input) -> Resu
 }
 
 #[tracing::instrument(skip_all, fields(replica_id = %ctx.config().epoxy_replica_id()))]
-pub async fn begin_learning(
-	ctx: &mut WorkflowCtx,
-	signal: &super::BeginLearning,
-) -> Result<()> {
+pub async fn begin_learning(ctx: &mut WorkflowCtx, signal: &super::BeginLearning) -> Result<()> {
 	ctx.activity(StoreConfigInput {
 		config: signal.config.clone(),
 	})
@@ -111,9 +108,7 @@ pub async fn catch_up_replica(ctx: &ActivityCtx, input: &CatchUpReplicaInput) ->
 		ctx.udb()?
 			.run(|tx| {
 				let entry = entry.clone();
-				async move {
-					crate::replica::changelog::apply_entry(&*tx, replica_id, entry).await
-				}
+				async move { crate::replica::changelog::apply_entry(&*tx, replica_id, entry).await }
 			})
 			.custom_instrument(tracing::info_span!("apply_changelog_entry_tx"))
 			.await?;
diff --git a/engine/packages/epoxy/tests/backfill.rs b/engine/packages/epoxy/tests/backfill.rs
index 2a690a2eca..78b3b1ebd2 100644
--- a/engine/packages/epoxy/tests/backfill.rs
+++ b/engine/packages/epoxy/tests/backfill.rs
@@ -21,7 +21,9 @@ async fn backfill_migrates_legacy_values_into_v2_and_changelog() {
 		.unwrap();
 
 	let workflow_id = ctx
-		.workflow(epoxy::workflows::backfill::Input { chunk_size: Some(1) })
+		.workflow(epoxy::workflows::backfill::Input {
+			chunk_size: Some(1),
+		})
 		.tag("replica", replica_id)
 		.dispatch()
 		.await
diff --git a/engine/packages/epoxy/tests/backfill_snapshot.rs b/engine/packages/epoxy/tests/backfill_snapshot.rs
index 83d0ac8b7a..e438d54b2b 100644
--- a/engine/packages/epoxy/tests/backfill_snapshot.rs
+++ b/engine/packages/epoxy/tests/backfill_snapshot.rs
@@ -137,7 +137,9 @@ async fn v1_snapshot_dual_read_mutate_and_backfill() {
 
 	// -- Phase 4: Run backfill and verify migration --
 	let workflow_id = ctx
-		.workflow(epoxy::workflows::backfill::Input { chunk_size: Some(10) })
+		.workflow(epoxy::workflows::backfill::Input {
+			chunk_size: Some(10),
+		})
 		.tag("replica", replica_id)
 		.dispatch()
 		.await
@@ -207,19 +209,17 @@ async fn v1_snapshot_dual_read_mutate_and_backfill() {
 	// The backfilled keys are immutable (version 0, mutable=false). A same-
 	// value re-proposal should be idempotent. A new-value proposal should
 	// be rejected since the v2 committed value now exists.
-	let result =
-		propose_local(ctx, replica_id, b"actor:def456", b"stopped", false)
-			.await
-			.unwrap();
+	let result = propose_local(ctx, replica_id, b"actor:def456", b"stopped", false)
+		.await
+		.unwrap();
 	assert!(
 		matches!(result, ProposalResult::Committed),
 		"replica {replica_id}: idempotent proposal on backfilled key should succeed: {result:?}",
 	);
 
-	let result =
-		propose_local(ctx, replica_id, b"actor:def456", b"changed", false)
-			.await
-			.unwrap();
+	let result = propose_local(ctx, replica_id, b"actor:def456", b"changed", false)
+		.await
+		.unwrap();
 	assert!(
 		matches!(
 			result,
diff --git a/engine/packages/epoxy/tests/common/mod.rs b/engine/packages/epoxy/tests/common/mod.rs
index 1ad6c1fb7f..03fc31220c 100644
--- a/engine/packages/epoxy/tests/common/mod.rs
+++ b/engine/packages/epoxy/tests/common/mod.rs
@@ -279,15 +279,18 @@ impl TestCtx {
 		for &other_replica_id in &replica_ids {
 			let metadata = &self.replica_metadata[&other_replica_id];
 			let name = format!("dc-{}", other_replica_id);
-			datacenters.insert(name.clone(), rivet_config::config::topology::Datacenter {
-				name: format!("dc-{}", other_replica_id),
-				datacenter_label: other_replica_id as u16,
-				is_leader: other_replica_id == self.leader_id,
-				peer_url: Url::parse(&format!("http://127.0.0.1:{}", metadata.api_peer_port))?,
-				public_url: Url::parse(&format!("http://127.0.0.1:{}", metadata.guard_port))?,
-				proxy_url: None,
-				valid_hosts: None,
-			});
+			datacenters.insert(
+				name.clone(),
+				rivet_config::config::topology::Datacenter {
+					name: format!("dc-{}", other_replica_id),
+					datacenter_label: other_replica_id as u16,
+					is_leader: other_replica_id == self.leader_id,
+					peer_url: Url::parse(&format!("http://127.0.0.1:{}", metadata.api_peer_port))?,
+					public_url: Url::parse(&format!("http://127.0.0.1:{}", metadata.guard_port))?,
+					proxy_url: None,
+					valid_hosts: None,
+				},
+			);
 		}
 
 		Ok(datacenters)
diff --git a/engine/packages/epoxy/tests/common/utils.rs b/engine/packages/epoxy/tests/common/utils.rs
index c0e942b5af..7c9fb5f36d 100644
--- a/engine/packages/epoxy/tests/common/utils.rs
+++ b/engine/packages/epoxy/tests/common/utils.rs
@@ -4,7 +4,9 @@ use epoxy::{
 		self, ChangelogKey, CommittedValue, KvAcceptedKey, KvAcceptedValue, KvBallotKey,
 		KvOptimisticCacheKey, KvValueKey, LegacyCommittedValueKey,
 	},
-	ops::propose::{self, CheckAndSetCommand, Command, CommandKind, Proposal, ProposalResult, SetCommand},
+	ops::propose::{
+		self, CheckAndSetCommand, Command, CommandKind, Proposal, ProposalResult, SetCommand,
+	},
 };
 use epoxy_protocol::protocol::{self, ReplicaId};
 use futures_util::TryStreamExt;
@@ -12,10 +14,7 @@
 use gas::prelude::TestCtx as WorkflowTestCtx;
 use universaldb::{
 	RangeOption,
 	options::StreamingMode,
-	utils::{
-		FormalKey, IsolationLevel::Serializable,
-		keys::CHANGELOG,
-	},
+	utils::{FormalKey, IsolationLevel::Serializable, keys::CHANGELOG},
 };
 
 #[allow(dead_code)]
@@ -74,7 +73,11 @@ pub async fn check_and_set_absent(
 }
 
 #[allow(dead_code)]
-pub async fn set_mutable(ctx: &WorkflowTestCtx, key: &[u8], value: &[u8]) -> Result<ProposalResult> {
+pub async fn set_mutable(
+	ctx: &WorkflowTestCtx,
+	key: &[u8],
+	value: &[u8],
+) -> Result<ProposalResult> {
 	let result = ctx
 		.op(propose::Input {
 			proposal: Proposal {
@@ -176,7 +179,7 @@ pub async fn write_v2_committed_value(
 				Ok(())
 			}
 		})
-	.await
+		.await
 }
 
 #[allow(dead_code)]
@@ -191,7 +194,8 @@ pub async fn read_legacy_value(
 		let key = key.clone();
 		async move {
 			let tx = tx.with_subspace(keys::legacy_subspace(replica_id));
-			tx.read_opt(&LegacyCommittedValueKey::new(key), Serializable).await
+			
tx.read_opt(&LegacyCommittedValueKey::new(key), Serializable) + .await } }) .await @@ -254,7 +258,8 @@ pub async fn read_cache_committed_value( let key = key.clone(); async move { let tx = tx.with_subspace(keys::subspace(replica_id)); - tx.read_opt(&KvOptimisticCacheKey::new(key), Serializable).await + tx.read_opt(&KvOptimisticCacheKey::new(key), Serializable) + .await } }) .await diff --git a/engine/packages/epoxy/tests/consensus_regressions.rs b/engine/packages/epoxy/tests/consensus_regressions.rs index 65b3d17938..34f93f0a97 100644 --- a/engine/packages/epoxy/tests/consensus_regressions.rs +++ b/engine/packages/epoxy/tests/consensus_regressions.rs @@ -5,11 +5,9 @@ use common::{ THREE_REPLICAS, TestCtx, utils::{read_accepted_value, read_v2_value, set_if_absent, write_ballot}, }; -use epoxy::{ - protocol::{ - self, AcceptRequest, AcceptResponse, CommitRequest, CommitResponse, PrepareRequest, - PrepareResponse, Request, RequestKind, ResponseKind, - }, +use epoxy::protocol::{ + self, AcceptRequest, AcceptResponse, CommitRequest, CommitResponse, PrepareRequest, + PrepareResponse, Request, RequestKind, ResponseKind, }; use epoxy_protocol::PROTOCOL_VERSION; @@ -34,11 +32,17 @@ async fn slow_path_recovery_uses_majority_quorum_after_prepare() { .await .unwrap(); - test_ctx.stop_replica(THREE_REPLICAS[2], false).await.unwrap(); + test_ctx + .stop_replica(THREE_REPLICAS[2], false) + .await + .unwrap(); let ctx = test_ctx.get_ctx(replica_id); let result = set_if_absent(ctx, key, b"committed").await.unwrap(); - assert!(matches!(result, epoxy::ops::propose::ProposalResult::Committed)); + assert!(matches!( + result, + epoxy::ops::propose::ProposalResult::Committed + )); assert_eq!( read_v2_value(ctx, replica_id, key).await.unwrap(), Some(b"committed".to_vec()), @@ -67,8 +71,8 @@ async fn equal_ballot_accepts_do_not_overwrite_accepted_state() { b"value-1", ballot.clone(), ) - .await - .unwrap(); + .await + .unwrap(); assert!(matches!( first_response, AcceptResponse::AcceptResponseOk(_) @@ -89,8 +93,8 @@ async fn equal_ballot_accepts_do_not_overwrite_accepted_state() { b"value-2", ballot.clone(), ) - .await - .unwrap(); + .await + .unwrap(); assert!(matches!( second_response, AcceptResponse::AcceptResponseHigherBallot(_) diff --git a/engine/packages/epoxy/tests/kv.rs b/engine/packages/epoxy/tests/kv.rs index 35fbdfc6f1..e0db4c4dcb 100644 --- a/engine/packages/epoxy/tests/kv.rs +++ b/engine/packages/epoxy/tests/kv.rs @@ -49,14 +49,18 @@ async fn test_kv_operations() { let cas_key = b"immutable-cas-key"; - let first_result = check_and_set_absent(ctx, cas_key, b"created").await.unwrap(); + let first_result = check_and_set_absent(ctx, cas_key, b"created") + .await + .unwrap(); assert!(matches!(first_result, ProposalResult::Committed)); assert_eq!( get_local(ctx, replica_id, cas_key).await.unwrap(), Some(b"created".to_vec()), ); - let same_value_result = check_and_set_absent(ctx, cas_key, b"created").await.unwrap(); + let same_value_result = check_and_set_absent(ctx, cas_key, b"created") + .await + .unwrap(); assert!(matches!(same_value_result, ProposalResult::Committed)); let different_value_result = check_and_set_absent(ctx, cas_key, b"other").await.unwrap(); @@ -105,26 +109,29 @@ async fn test_kv_operations() { )); let changelog_entries = read_changelog_entries(ctx, replica_id).await.unwrap(); - assert!(changelog_entries.contains(&epoxy::protocol::ChangelogEntry { - key: key.to_vec(), - value: b"value1".to_vec(), - version: 1, - mutable: true, - })); - 
assert!(changelog_entries.contains(&epoxy::protocol::ChangelogEntry { - key: key.to_vec(), - value: b"value2".to_vec(), - version: 2, - mutable: true, - })); + assert!( + changelog_entries.contains(&epoxy::protocol::ChangelogEntry { + key: key.to_vec(), + value: b"value1".to_vec(), + version: 1, + mutable: true, + }) + ); + assert!( + changelog_entries.contains(&epoxy::protocol::ChangelogEntry { + key: key.to_vec(), + value: b"value2".to_vec(), + version: 2, + mutable: true, + }) + ); for _ in 0..20 { let mut replicated = true; for replica_id in THREE_REPLICAS { if read_v2_value(test_ctx.get_ctx(*replica_id), *replica_id, key) .await - .unwrap() - != Some(b"value2".to_vec()) + .unwrap() != Some(b"value2".to_vec()) { replicated = false; break; diff --git a/engine/packages/epoxy/tests/kv_get_optimistic.rs b/engine/packages/epoxy/tests/kv_get_optimistic.rs index d19dc95676..05f9e25b84 100644 --- a/engine/packages/epoxy/tests/kv_get_optimistic.rs +++ b/engine/packages/epoxy/tests/kv_get_optimistic.rs @@ -49,7 +49,10 @@ async fn test_kv_get_optimistic_paths() { let result = set_if_absent(ctx, key, value).await.unwrap(); assert!(matches!(result, ProposalResult::Committed)); - assert_eq!(optimistic_get(ctx, replica_id, key).await, Some(value.to_vec())); + assert_eq!( + optimistic_get(ctx, replica_id, key).await, + Some(value.to_vec()) + ); test_ctx.shutdown().await.unwrap(); } @@ -110,7 +113,10 @@ async fn test_kv_get_optimistic_paths() { Some(remote_value.to_vec()), ); - test_ctx.stop_replica(writer_replica_id, false).await.unwrap(); + test_ctx + .stop_replica(writer_replica_id, false) + .await + .unwrap(); assert_eq!( optimistic_get( test_ctx.get_ctx(reader_replica_id), @@ -257,11 +263,9 @@ async fn test_kv_get_optimistic_paths() { if read_cache_value(follower_ctx, follower_replica_id, key) .await .unwrap() - .is_none() - && read_v2_value(follower_ctx, follower_replica_id, key) - .await - .unwrap() - == Some(b"value2".to_vec()) + .is_none() && read_v2_value(follower_ctx, follower_replica_id, key) + .await + .unwrap() == Some(b"value2".to_vec()) { test_ctx.shutdown().await.unwrap(); return; diff --git a/engine/packages/epoxy/tests/migration.rs b/engine/packages/epoxy/tests/migration.rs index 45b145659f..b5783bc43f 100644 --- a/engine/packages/epoxy/tests/migration.rs +++ b/engine/packages/epoxy/tests/migration.rs @@ -7,13 +7,11 @@ use common::{ write_legacy_value, }, }; -use epoxy::{ - ops::propose::{CommandError, ProposalResult}, -}; +use epoxy::ops::propose::{CommandError, ProposalResult}; #[tokio::test(flavor = "multi_thread")] async fn dual_read_fallback_reads_legacy_subspaces_without_migrating() { -let mut test_ctx = TestCtx::new_with(&[1_u64]).await.unwrap(); + let mut test_ctx = TestCtx::new_with(&[1_u64]).await.unwrap(); let replica_id = test_ctx.leader_id; let ctx = test_ctx.get_ctx(replica_id); let blocked_key = b"legacy-committed-key"; @@ -25,9 +23,14 @@ let mut test_ctx = TestCtx::new_with(&[1_u64]).await.unwrap(); get_local(ctx, replica_id, blocked_key).await.unwrap(), Some(blocked_value.to_vec()), ); - assert_eq!(read_v2_value(ctx, replica_id, blocked_key).await.unwrap(), None); + assert_eq!( + read_v2_value(ctx, replica_id, blocked_key).await.unwrap(), + None + ); - let blocked_result = set_if_absent(ctx, blocked_key, b"new-v2-value").await.unwrap(); + let blocked_result = set_if_absent(ctx, blocked_key, b"new-v2-value") + .await + .unwrap(); assert!(matches!( blocked_result, ProposalResult::CommandError(CommandError::ExpectedValueDoesNotMatch { @@ -41,9 +44,7 @@ let mut test_ctx 
= TestCtx::new_with(&[1_u64]).await.unwrap(); Some(blocked_value.to_vec()), ); assert_eq!( - read_v2_value(ctx, replica_id, blocked_key) - .await - .unwrap(), + read_v2_value(ctx, replica_id, blocked_key).await.unwrap(), None, ); @@ -56,7 +57,10 @@ let mut test_ctx = TestCtx::new_with(&[1_u64]).await.unwrap(); get_local(ctx, replica_id, migrated_key).await.unwrap(), Some(migrated_value.to_vec()), ); - assert_eq!(read_v2_value(ctx, replica_id, migrated_key).await.unwrap(), None); + assert_eq!( + read_v2_value(ctx, replica_id, migrated_key).await.unwrap(), + None + ); let fresh_key = b"fresh-v2-key"; let fresh_value = b"fresh-v2-value"; diff --git a/engine/packages/epoxy/tests/proposal.rs b/engine/packages/epoxy/tests/proposal.rs index c49f72b641..50e8c31ffc 100644 --- a/engine/packages/epoxy/tests/proposal.rs +++ b/engine/packages/epoxy/tests/proposal.rs @@ -4,10 +4,7 @@ use common::{ THREE_REPLICAS, TestCtx, utils::{get_local, read_ballot, set_if_absent, set_mutable, write_ballot}, }; -use epoxy::{ - metrics, - ops::propose::ProposalResult, -}; +use epoxy::{metrics, ops::propose::ProposalResult}; use epoxy_protocol::protocol; static TEST_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(()); @@ -52,15 +49,13 @@ async fn proposal_uses_fast_and_contention_paths() { assert_eq!( metrics::PROPOSAL_TOTAL .with_label_values(&["committed"]) - .get() - - committed_before, + .get() - committed_before, 1, ); assert_eq!( metrics::PROPOSAL_TOTAL .with_label_values(&["slow_path"]) - .get() - - slow_result_before, + .get() - slow_result_before, 0, ); @@ -106,8 +101,7 @@ async fn proposal_uses_fast_and_contention_paths() { assert_eq!( metrics::PROPOSAL_TOTAL .with_label_values(&["slow_path"]) - .get() - - slow_result_before, + .get() - slow_result_before, 1, ); let key = b"mutable-fast-path"; @@ -132,8 +126,7 @@ async fn proposal_uses_fast_and_contention_paths() { for replica_id in THREE_REPLICAS { if get_local(test_ctx.get_ctx(*replica_id), *replica_id, key) .await - .unwrap() - != Some(b"value2".to_vec()) + .unwrap() != Some(b"value2".to_vec()) { replicated = false; break; diff --git a/engine/packages/epoxy/tests/reconfigure.rs b/engine/packages/epoxy/tests/reconfigure.rs index fd1c3ec51b..44cd7188f2 100644 --- a/engine/packages/epoxy/tests/reconfigure.rs +++ b/engine/packages/epoxy/tests/reconfigure.rs @@ -190,7 +190,8 @@ async fn verify_changelog_catch_up( replica_id: ReplicaId, expected_keys: &[(Vec, Vec)], ) -> Result<()> { - let changelog_entries = read_changelog_entries(test_ctx.get_ctx(replica_id), replica_id).await?; + let changelog_entries = + read_changelog_entries(test_ctx.get_ctx(replica_id), replica_id).await?; assert_eq!( changelog_entries.len(), expected_keys.len(), diff --git a/engine/packages/guard/src/cache/mod.rs b/engine/packages/guard/src/cache/mod.rs index 24efec175d..6bcc19d744 100644 --- a/engine/packages/guard/src/cache/mod.rs +++ b/engine/packages/guard/src/cache/mod.rs @@ -9,10 +9,7 @@ use rivet_guard_core::{CacheKeyFn, request_context::RequestContext}; pub mod pegboard_gateway; -use crate::routing::{ - SEC_WEBSOCKET_PROTOCOL, WS_PROTOCOL_TARGET, X_RIVET_TARGET, - actor_path::{self, QueryActorPathInfo}, -}; +use crate::routing::{SEC_WEBSOCKET_PROTOCOL, WS_PROTOCOL_TARGET, X_RIVET_TARGET, actor_path}; /// Creates the main cache key function that handles all incoming requests #[tracing::instrument(skip_all)] @@ -39,7 +36,10 @@ pub fn create_cache_key_function() -> CacheKeyFn { // is excluded because it does not affect which actor the // request routes to. 
tracing::debug!("using query-path cache key for actor"); - return Ok(query_path_cache_key(&query_path_info, req_ctx)); + return Ok(pegboard_gateway::build_cache_key_query_based( + &query_path_info, + req_ctx, + )); } } } @@ -87,45 +87,3 @@ fn host_path_method_cache_key(req_ctx: &RequestContext) -> u64 { req_ctx.method().as_str().hash(&mut hasher); hasher.finish() } - -/// Build a cache key from only the routing-relevant fields of a query gateway -/// path. Token is intentionally excluded so requests with different tokens but -/// the same query resolve to the same cached route. -fn query_path_cache_key(info: &QueryActorPathInfo, req_ctx: &RequestContext) -> u64 { - use crate::routing::actor_path::QueryActorQuery; - - let mut hasher = DefaultHasher::new(); - match &info.query { - QueryActorQuery::Get { - namespace, - name, - key, - } => { - "get".hash(&mut hasher); - namespace.hash(&mut hasher); - name.hash(&mut hasher); - key.hash(&mut hasher); - } - QueryActorQuery::GetOrCreate { - namespace, - name, - runner_name, - key, - input, - region, - crash_policy, - } => { - "getOrCreate".hash(&mut hasher); - namespace.hash(&mut hasher); - name.hash(&mut hasher); - runner_name.hash(&mut hasher); - key.hash(&mut hasher); - input.hash(&mut hasher); - region.hash(&mut hasher); - crash_policy.hash(&mut hasher); - } - } - info.stripped_path.hash(&mut hasher); - req_ctx.method().as_str().hash(&mut hasher); - hasher.finish() -} diff --git a/engine/packages/guard/src/cache/pegboard_gateway.rs b/engine/packages/guard/src/cache/pegboard_gateway.rs index 9816e477a0..5492f1c58a 100644 --- a/engine/packages/guard/src/cache/pegboard_gateway.rs +++ b/engine/packages/guard/src/cache/pegboard_gateway.rs @@ -8,7 +8,8 @@ use gas::prelude::*; use rivet_guard_core::request_context::RequestContext; use crate::routing::{ - SEC_WEBSOCKET_PROTOCOL, WS_PROTOCOL_ACTOR, actor_path::ActorPathInfo, + SEC_WEBSOCKET_PROTOCOL, WS_PROTOCOL_ACTOR, + actor_path::{ActorPathInfo, QueryActorPathInfo}, pegboard_gateway::X_RIVET_ACTOR, }; @@ -99,3 +100,45 @@ pub fn build_cache_key_target_based(req_ctx: &RequestContext, target: &str) -> R Ok(hash) } + +/// Build a cache key from only the routing-relevant fields of a query gateway +/// path. Token is intentionally excluded so requests with different tokens but +/// the same query resolve to the same cached route. 
+pub fn build_cache_key_query_based(info: &QueryActorPathInfo, req_ctx: &RequestContext) -> u64 { + use crate::routing::actor_path::QueryActorQuery; + + let mut hasher = DefaultHasher::new(); + match &info.query { + QueryActorQuery::Get { + namespace, + name, + key, + } => { + "get".hash(&mut hasher); + namespace.hash(&mut hasher); + name.hash(&mut hasher); + key.hash(&mut hasher); + } + QueryActorQuery::GetOrCreate { + namespace, + name, + runner_name, + key, + input, + region, + crash_policy, + } => { + "getOrCreate".hash(&mut hasher); + namespace.hash(&mut hasher); + name.hash(&mut hasher); + runner_name.hash(&mut hasher); + key.hash(&mut hasher); + input.hash(&mut hasher); + region.hash(&mut hasher); + crash_policy.hash(&mut hasher); + } + } + info.stripped_path.hash(&mut hasher); + req_ctx.method().as_str().hash(&mut hasher); + hasher.finish() +} diff --git a/engine/packages/guard/src/errors.rs b/engine/packages/guard/src/errors.rs index 365034e1f9..45912a8922 100644 --- a/engine/packages/guard/src/errors.rs +++ b/engine/packages/guard/src/errors.rs @@ -165,4 +165,3 @@ pub struct QueryInvalidCborInput { pub struct QueryInvalidPercentEncoding { pub name: String, } - diff --git a/engine/packages/guard/src/routing/actor_path.rs b/engine/packages/guard/src/routing/actor_path.rs index 94e6b55b30..7b41fa3e5b 100644 --- a/engine/packages/guard/src/routing/actor_path.rs +++ b/engine/packages/guard/src/routing/actor_path.rs @@ -115,7 +115,11 @@ pub fn parse_actor_path(path: &str) -> Result> { Some(q) => format!("?{q}"), None => String::new(), }; - Ok(parse_direct_actor_path(base_path, &segments, &raw_query_string)) + Ok(parse_direct_actor_path( + base_path, + &segments, + &raw_query_string, + )) } fn parse_direct_actor_path( @@ -143,10 +147,7 @@ fn parse_direct_actor_path( let token = urlencoding::decode(raw_token).ok()?.into_owned(); (actor_id, Some(token)) } else { - ( - urlencoding::decode(actor_segment).ok()?.into_owned(), - None, - ) + (urlencoding::decode(actor_segment).ok()?.into_owned(), None) }; let remaining_path = build_remaining_path(base_path, raw_query_string, 2); @@ -252,13 +253,11 @@ fn build_actor_query(name: &str, rvt: RvtParams) -> Result { }) } "getOrCreate" => { - let runner_name = rvt.runner.ok_or_else(|| errors::QueryMissingRunnerName.build())?; + let runner_name = rvt + .runner + .ok_or_else(|| errors::QueryMissingRunnerName.build())?; - let input = rvt - .input - .as_deref() - .map(decode_query_input) - .transpose()?; + let input = rvt.input.as_deref().map(decode_query_input).transpose()?; let crash_policy = rvt .crash_policy diff --git a/engine/packages/guard/src/routing/kv_channel.rs b/engine/packages/guard/src/routing/kv_channel.rs index b0bdd46549..7f56971b0a 100644 --- a/engine/packages/guard/src/routing/kv_channel.rs +++ b/engine/packages/guard/src/routing/kv_channel.rs @@ -43,7 +43,11 @@ pub async fn route_request_path_based( .build() })?; - if token.as_bytes().ct_ne(auth.admin_token.read().as_bytes()).into() { + if token + .as_bytes() + .ct_ne(auth.admin_token.read().as_bytes()) + .into() + { return Err(rivet_api_builder::ApiForbidden.build()); } diff --git a/engine/packages/guard/src/routing/mod.rs b/engine/packages/guard/src/routing/mod.rs index bf227c9d1a..f8a26a7aca 100644 --- a/engine/packages/guard/src/routing/mod.rs +++ b/engine/packages/guard/src/routing/mod.rs @@ -7,8 +7,8 @@ use rivet_guard_core::{RoutingFn, request_context::RequestContext}; use crate::{errors, metrics, shared_state::SharedState}; -mod api_public; pub mod actor_path; +mod api_public; 
 mod envoy;
 mod kv_channel;
 pub mod pegboard_gateway;
@@ -27,9 +27,9 @@ pub(crate) const WS_PROTOCOL_TOKEN: &str = "rivet_token.";
 
 #[tracing::instrument(skip_all)]
 pub fn create_routing_function(ctx: &StandaloneCtx, shared_state: SharedState) -> RoutingFn {
 	let ctx = ctx.clone();
-	let kv_channel_handler = Arc::new(
-		pegboard_kv_channel::PegboardKvChannelCustomServe::new(ctx.clone()),
-	);
+	let kv_channel_handler = Arc::new(pegboard_kv_channel::PegboardKvChannelCustomServe::new(
+		ctx.clone(),
+	));
 	Arc::new(move |req_ctx| {
 		let ctx = ctx.with_ray(req_ctx.ray_id(), req_ctx.req_id()).unwrap();
 		let shared_state = shared_state.clone();
@@ -58,21 +58,12 @@ pub fn create_routing_function(ctx: &StandaloneCtx, shared_state: SharedState) -
 			// MARK: Path-based routing
 
 			// Route actor
-			if let Some(actor_path_info) = actor_path::parse_actor_path(req_ctx.path())? {
-				tracing::debug!(?actor_path_info, "routing using path-based actor routing");
-
-				if let Some(routing_output) = pegboard_gateway::route_request_path_based(
-					&ctx,
-					&shared_state,
-					req_ctx,
-					&actor_path_info,
-				)
-				.await?
-				{
-					metrics::ROUTE_TOTAL.with_label_values(&["gateway"]).inc();
+			if let Some(routing_output) =
+				pegboard_gateway::route_request_path_based(&ctx, &shared_state, req_ctx).await?
+			{
+				metrics::ROUTE_TOTAL.with_label_values(&["gateway"]).inc();
 
-					return Ok(routing_output);
-				}
+				return Ok(routing_output);
 			}
 
 			// Route runner
@@ -94,8 +85,7 @@ pub fn create_routing_function(ctx: &StandaloneCtx, shared_state: SharedState) -
 
 			// Route KV channel
 			if let Some(routing_output) =
-				kv_channel::route_request_path_based(&ctx, req_ctx, &kv_channel_handler)
-					.await?
+				kv_channel::route_request_path_based(&ctx, req_ctx, &kv_channel_handler).await?
 			{
 				metrics::ROUTE_TOTAL
 					.with_label_values(&["kv_channel"])
@@ -186,10 +176,7 @@ pub fn create_routing_function(ctx: &StandaloneCtx, shared_state: SharedState) -
 
 /// Validates that the request hostname is valid for the current datacenter.
 /// Returns an error if the host does not match a valid regional host.
-pub(crate) fn validate_regional_host(
-	ctx: &StandaloneCtx,
-	req_ctx: &RequestContext,
-) -> Result<()> {
+pub(crate) fn validate_regional_host(ctx: &StandaloneCtx, req_ctx: &RequestContext) -> Result<()> {
 	let current_dc = ctx.config().topology().current_dc()?;
 	if !current_dc.is_valid_regional_host(req_ctx.hostname()) {
 		tracing::warn!(
diff --git a/engine/packages/guard/src/routing/pegboard_gateway/mod.rs b/engine/packages/guard/src/routing/pegboard_gateway/mod.rs
index 6d444dd810..f7ac31b780 100644
--- a/engine/packages/guard/src/routing/pegboard_gateway/mod.rs
+++ b/engine/packages/guard/src/routing/pegboard_gateway/mod.rs
@@ -11,7 +11,7 @@ use super::{
 	SEC_WEBSOCKET_PROTOCOL, WS_PROTOCOL_ACTOR, WS_PROTOCOL_TOKEN, X_RIVET_TOKEN,
 	actor_path::ParsedActorPath,
 };
-use crate::{errors, shared_state::SharedState};
+use crate::{errors, routing::actor_path::parse_actor_path, shared_state::SharedState};
 use resolve_actor_query::resolve_query_actor_id;
 
 const ACTOR_FORCE_WAKE_PENDING_TIMEOUT: i64 = util::duration::seconds(60);
@@ -30,9 +30,14 @@ pub async fn route_request_path_based(
 	ctx: &StandaloneCtx,
 	shared_state: &SharedState,
 	req_ctx: &RequestContext,
-	actor_path: &ParsedActorPath,
 ) -> Result<Option<RoutingOutput>> {
-	let resolved_route = resolve_path_based_route(ctx, req_ctx, actor_path).await?;
+	let Some(actor_path) = parse_actor_path(req_ctx.path())? else {
+		return Ok(None);
+	};
+
+	tracing::debug!(?actor_path, "routing using path-based actor routing");
+
+	let resolved_route = resolve_path_based_route(ctx, req_ctx, &actor_path).await?;
 
 	route_request_inner(
 		ctx,
@@ -43,6 +48,7 @@
 		resolved_route.token.as_deref(),
 	)
 	.await
+	.map(Some)
 }
 
 /// Route requests to actor services based on headers
@@ -121,7 +127,9 @@ pub async fn route_request(
 	// Find actor to route to
 	let actor_id = Id::parse(&actor_id_str).context("invalid x-rivet-actor header")?;
 
-	route_request_inner(ctx, shared_state, req_ctx, actor_id, req_ctx.path(), token).await
+	route_request_inner(ctx, shared_state, req_ctx, actor_id, req_ctx.path(), token)
+		.await
+		.map(Some)
 }
 
 #[derive(Debug)]
@@ -197,7 +205,7 @@ async fn route_request_inner(
 	actor_id: Id,
 	stripped_path: &str,
 	_token: Option<&str>,
-) -> Result<Option<RoutingOutput>> {
+) -> Result<RoutingOutput> {
 	// NOTE: Token validation implemented in EE
 
 	// Route to peer dc where the actor lives
@@ -209,7 +217,7 @@
 			.dc_for_label(actor_id.label())
 			.context("dc with the given label not found")?;
 
-		return Ok(Some(RoutingOutput::Route(RouteConfig {
+		return Ok(RoutingOutput::Route(RouteConfig {
 			targets: vec![RouteTarget {
 				host: peer_dc
 					.proxy_url_host()
@@ -220,7 +228,7 @@
 					.context("bad peer dc proxy url port")?,
 				path: req_ctx.path().to_owned(),
 			}],
-		})));
+		}));
 	}
 
 	// Create subs before checking if actor exists/is not destroyed
@@ -278,7 +286,6 @@
 				destroy_sub2,
 			)
 			.await
-			.map(Some)
 		}
 		1 => {
 			handle_actor_v1(
@@ -298,7 +305,6 @@
 				destroy_sub2,
 			)
 			.await
-			.map(Some)
 		}
 		_ => bail!("unknown actor version"),
 	}
diff --git a/engine/packages/guard/src/routing/pegboard_gateway/resolve_actor_query.rs b/engine/packages/guard/src/routing/pegboard_gateway/resolve_actor_query.rs
index da4d1740ae..88b0216656 100644
--- a/engine/packages/guard/src/routing/pegboard_gateway/resolve_actor_query.rs
+++ b/engine/packages/guard/src/routing/pegboard_gateway/resolve_actor_query.rs
@@ -112,14 +112,9 @@ async fn resolve_query_get_or_create_actor_id(
 		return Ok(actor_id);
 	}
 
-	let target_dc_label = resolve_query_target_dc_label(
-		ctx,
-		namespace_id,
-		namespace_name,
-		runner_name,
-		region,
-	)
-	.await?;
+	let target_dc_label =
+		resolve_query_target_dc_label(ctx, namespace_id, namespace_name, runner_name, region)
+			.await?;
 
 	let encoded_input = input.map(|input| STANDARD.encode(input));
 	if target_dc_label == ctx.config().dc_label() {
@@ -157,14 +152,16 @@
 			Some(&rivet_api_types::actors::get_or_create::GetOrCreateQuery {
 				namespace: namespace_name.to_string(),
 			}),
-			Some(&rivet_api_types::actors::get_or_create::GetOrCreateRequest {
-				datacenter: None,
-				name: name.to_string(),
-				key: serialized_key,
-				input: encoded_input,
-				runner_name_selector: runner_name.to_string(),
-				crash_policy,
-			}),
+			Some(
+				&rivet_api_types::actors::get_or_create::GetOrCreateRequest {
+					datacenter: None,
+					name: name.to_string(),
+					key: serialized_key,
+					input: encoded_input,
+					runner_name_selector: runner_name.to_string(),
+					crash_policy,
+				},
+			),
 		)
 		.await?;
 	Ok(response.actor.actor_id)
@@ -190,10 +187,12 @@ async fn resolve_query_target_dc_label(
 	}
 
 	let res = ctx
-		.op(pegboard::ops::runner::list_runner_config_enabled_dcs::Input {
-			namespace_id,
-			runner_name: runner_name_selector.to_string(),
-		})
+		.op(
+			pegboard::ops::runner::list_runner_config_enabled_dcs::Input {
+				namespace_id,
+				runner_name: 
runner_name_selector.to_string(), + }, + ) .await?; if let Some(dc_label) = res.dc_labels.into_iter().next() { diff --git a/engine/packages/guard/src/routing/runner.rs b/engine/packages/guard/src/routing/runner.rs index 071715d214..c67551ddd7 100644 --- a/engine/packages/guard/src/routing/runner.rs +++ b/engine/packages/guard/src/routing/runner.rs @@ -83,7 +83,11 @@ async fn route_runner_internal( }; // Validate token - if token.as_bytes().ct_ne(auth.admin_token.read().as_bytes()).into() { + if token + .as_bytes() + .ct_ne(auth.admin_token.read().as_bytes()) + .into() + { return Err(rivet_api_builder::ApiForbidden.build()); } diff --git a/engine/packages/guard/tests/parse_actor_path.rs b/engine/packages/guard/tests/parse_actor_path.rs index b0486170d9..eab5d81144 100644 --- a/engine/packages/guard/tests/parse_actor_path.rs +++ b/engine/packages/guard/tests/parse_actor_path.rs @@ -47,8 +47,8 @@ fn parses_query_actor_get_paths() { #[test] fn parses_query_actor_get_or_create_paths_with_input_and_region() { let input_bytes = vec![ - 0xa2, 0x65, b'c', b'o', b'u', b'n', b't', 0x02, 0x67, b'e', b'n', b'a', b'b', b'l', - b'e', b'd', 0xf5, + 0xa2, 0x65, b'c', b'o', b'u', b'n', b't', 0x02, 0x67, b'e', b'n', b'a', b'b', b'l', b'e', + b'd', 0xf5, ]; let input = encode_cbor_base64url(&input_bytes); let path = format!( @@ -245,7 +245,8 @@ fn preserves_percent_encoding_in_actor_query_params() { #[test] fn preserves_plus_in_actor_query_params() { // Actor params should preserve + literally, not re-encode to %2B or decode to space. - let path = "/gateway/lobby/api?rvt-namespace=default&rvt-method=get&search=hello+world&tag=c%2B%2B"; + let path = + "/gateway/lobby/api?rvt-namespace=default&rvt-method=get&search=hello+world&tag=c%2B%2B"; let result = parse_actor_path(path).unwrap().unwrap(); match result { @@ -282,7 +283,8 @@ fn handles_interleaved_rvt_and_actor_params() { fn decodes_plus_as_space_in_rvt_values() { // rvt-* values should decode + as space (form-urlencoded), while actor // params preserve + literally. 
- let path = "/gateway/lobby/api?rvt-namespace=my+ns&rvt-method=get&rvt-key=hello+world&q=search+term"; + let path = + "/gateway/lobby/api?rvt-namespace=my+ns&rvt-method=get&rvt-key=hello+world&q=search+term"; let result = parse_actor_path(path).unwrap().unwrap(); match result { @@ -332,10 +334,7 @@ fn rejects_missing_method() { let err = parse_actor_path("/gateway/lobby?rvt-namespace=default") .unwrap_err() .to_string(); - assert!( - err.contains("method"), - "expected method error, got: {err}" - ); + assert!(err.contains("method"), "expected method error, got: {err}"); } #[test] @@ -351,11 +350,10 @@ fn rejects_invalid_query_method() { #[test] fn rejects_unknown_query_params() { - let err = parse_actor_path( - "/gateway/lobby?rvt-namespace=default&rvt-method=get&rvt-unknown=value", - ) - .unwrap_err() - .to_string(); + let err = + parse_actor_path("/gateway/lobby?rvt-namespace=default&rvt-method=get&rvt-unknown=value") + .unwrap_err() + .to_string(); assert!( err.contains("unknown field"), "expected unknown field error, got: {err}" @@ -406,10 +404,9 @@ fn rejects_invalid_cbor_input() { #[test] fn rejects_raw_at_token_syntax_in_query_paths() { - let err = - parse_actor_path("/gateway/lobby@token/connect?rvt-namespace=default&rvt-method=get") - .unwrap_err() - .to_string(); + let err = parse_actor_path("/gateway/lobby@token/connect?rvt-namespace=default&rvt-method=get") + .unwrap_err() + .to_string(); assert!(err.contains("query gateway paths must not use @token syntax")); } @@ -454,11 +451,10 @@ fn rejects_crash_policy_for_get_queries() { #[test] fn rejects_runner_for_get_queries() { - let err = parse_actor_path( - "/gateway/lobby?rvt-namespace=default&rvt-method=get&rvt-runner=default", - ) - .unwrap_err() - .to_string(); + let err = + parse_actor_path("/gateway/lobby?rvt-namespace=default&rvt-method=get&rvt-runner=default") + .unwrap_err() + .to_string(); assert!(err.contains( "query gateway method=get does not allow rvt-input, rvt-region, rvt-crash-policy, or rvt-runner params" )); @@ -466,13 +462,10 @@ fn rejects_runner_for_get_queries() { #[test] fn rejects_missing_runner_for_get_or_create_queries() { - let err = - parse_actor_path("/gateway/lobby?rvt-namespace=default&rvt-method=getOrCreate") - .unwrap_err() - .to_string(); - assert!(err.contains( - "query gateway method=getOrCreate requires rvt-runner param" - )); + let err = parse_actor_path("/gateway/lobby?rvt-namespace=default&rvt-method=getOrCreate") + .unwrap_err() + .to_string(); + assert!(err.contains("query gateway method=getOrCreate requires rvt-runner param")); } #[test] diff --git a/engine/packages/pegboard-envoy/src/conn.rs b/engine/packages/pegboard-envoy/src/conn.rs index 9f47c426c9..95e0fc1d7d 100644 --- a/engine/packages/pegboard-envoy/src/conn.rs +++ b/engine/packages/pegboard-envoy/src/conn.rs @@ -354,36 +354,28 @@ pub async fn handle_init( // Send missed commands if !missed_commands.is_empty() { let db = ctx.udb()?; - let msg = - { - for cmd_wrapper in &mut missed_commands { - if let protocol::Command::CommandStartActor(ref mut start) = - cmd_wrapper.inner - { - let actor_id = cmd_wrapper - .checkpoint - .actor_id - .parse::() - .context( - "failed to parse actor_id from missed envoy command", - )?; - let preloaded = - pegboard::actor_kv::preload::fetch_preloaded_kv( - &db, - pb, - actor_id, - conn.namespace_id, - &start.config.name, - ) - .await?; - start.preloaded_kv = preloaded; - } + let msg = { + for cmd_wrapper in &mut missed_commands { + if let protocol::Command::CommandStartActor(ref mut start) = 
cmd_wrapper.inner {
+					let actor_id = cmd_wrapper
+						.checkpoint
+						.actor_id
+						.parse::<Id>()
+						.context("failed to parse actor_id from missed envoy command")?;
+					let preloaded = pegboard::actor_kv::preload::fetch_preloaded_kv(
+						&db,
+						pb,
+						actor_id,
+						conn.namespace_id,
+						&start.config.name,
+					)
+					.await?;
+					start.preloaded_kv = preloaded;
 				}
+			}
 
-			versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(
-				missed_commands,
-			))
-		};
+			versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(missed_commands))
+		};
 		let msg_serialized = msg.serialize(conn.protocol_version)?;
 		conn.ws_handle
 			.send(Message::Binary(msg_serialized.into()))
diff --git a/engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs b/engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs
index 32f230fdff..0fdb4c1716 100644
--- a/engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs
+++ b/engine/packages/pegboard-envoy/src/tunnel_to_ws_task.rs
@@ -126,15 +126,11 @@ async fn handle_message(
 		protocol::ToEnvoyConn::ToEnvoyCommands(mut command_wrappers) => {
 			// TODO: Parallelize
 			for command_wrapper in &mut command_wrappers {
-				if let protocol::Command::CommandStartActor(start) =
-					&mut command_wrapper.inner
-				{
+				if let protocol::Command::CommandStartActor(start) = &mut command_wrapper.inner {
 					let actor_id = Id::parse(&command_wrapper.checkpoint.actor_id)?;
 					let actor_name = start.config.name.clone();
 					let ids = ctx
-						.op(pegboard::ops::actor::hibernating_request::list::Input {
-							actor_id,
-						})
+						.op(pegboard::ops::actor::hibernating_request::list::Input { actor_id })
 						.await?;
 
 					// Dynamically populate hibernating request ids
@@ -148,15 +144,14 @@
 
 					if start.preloaded_kv.is_none() {
 						let db = ctx.udb()?;
-						start.preloaded_kv =
-							pegboard::actor_kv::preload::fetch_preloaded_kv(
-								&db,
-								ctx.config().pegboard(),
-								actor_id,
-								conn.namespace_id,
-								&actor_name,
-							)
-							.await?;
+						start.preloaded_kv = pegboard::actor_kv::preload::fetch_preloaded_kv(
+							&db,
+							ctx.config().pegboard(),
+							actor_id,
+							conn.namespace_id,
+							&actor_name,
+						)
+						.await?;
 					}
 				}
 			}
diff --git a/engine/packages/pegboard-kv-channel/src/lib.rs b/engine/packages/pegboard-kv-channel/src/lib.rs
index 79a45fcf2a..d403838f4a 100644
--- a/engine/packages/pegboard-kv-channel/src/lib.rs
+++ b/engine/packages/pegboard-kv-channel/src/lib.rs
@@ -7,8 +7,8 @@ mod metrics;
 
 use std::collections::{HashMap, HashSet};
-use std::sync::atomic::{AtomicI64, Ordering};
 use std::sync::Arc;
+use std::sync::atomic::{AtomicI64, Ordering};
 use std::time::{Duration, Instant};
 
 use anyhow::{Context, Result};
@@ -21,8 +21,7 @@ use hyper::{Response, StatusCode};
 use hyper_tungstenite::tungstenite::Message;
 use pegboard::actor_kv;
 use rivet_guard_core::{
-	ResponseBody, WebSocketHandle, custom_serve::CustomServeTrait,
-	request_context::RequestContext,
+	ResponseBody, WebSocketHandle, custom_serve::CustomServeTrait, request_context::RequestContext,
 };
 use tokio::sync::{Mutex, mpsc, watch};
 use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
@@ -270,7 +269,7 @@ async fn message_loop(
 	// parallelism. Do not use tokio::spawn per request as that would break
 	// optimistic pipelining and journal write ordering.
 	// See docs-internal/engine/NATIVE_SQLITE_REVIEW_FINDINGS.md Finding 2.
- let mut actor_channels: HashMap> = + let mut actor_channels: HashMap> = HashMap::new(); let mut actor_tasks = tokio::task::JoinSet::new(); @@ -339,7 +338,7 @@ async fn handle_binary_message( open_actors: &Arc>>, last_pong_ts: &AtomicI64, data: &[u8], - actor_channels: &mut HashMap>, + actor_channels: &mut HashMap>, actor_tasks: &mut tokio::task::JoinSet<()>, ) -> Result<()> { let msg = match protocol::decode_to_server(data) { @@ -355,11 +354,11 @@ async fn handle_binary_message( }; match msg { - protocol::ToServer::ToServerPong(pong) => { + protocol::ToRivet::ToRivetPong(pong) => { last_pong_ts.store(util::timestamp::now(), Ordering::Relaxed); tracing::trace!(ts = pong.ts, "received pong"); } - protocol::ToServer::ToServerRequest(req) => { + protocol::ToRivet::ToRivetRequest(req) => { let is_close = matches!(req.data, protocol::RequestData::ActorCloseRequest); let actor_id = req.actor_id.clone(); let request_id = req.request_id; @@ -401,10 +400,7 @@ async fn handle_binary_message( send_response( ws_handle, request_id, - error_response( - "internal_error", - "internal error", - ), + error_response("internal_error", "internal error"), ) .await; } @@ -432,7 +428,7 @@ async fn actor_request_task( conn_id: Uuid, namespace_id: Id, open_actors: Arc>>, - mut rx: mpsc::Receiver, + mut rx: mpsc::Receiver, ) { // Cached actor resolution. Populated on first KV request, reused for all // subsequent requests. Actor name is immutable so this never goes stale. @@ -443,8 +439,7 @@ async fn actor_request_task( let response_data = match &req.data { // Open/close are lifecycle ops that don't need a resolved actor. - protocol::RequestData::ActorOpenRequest - | protocol::RequestData::ActorCloseRequest => { + protocol::RequestData::ActorOpenRequest | protocol::RequestData::ActorCloseRequest => { handle_request(&ctx, &state, conn_id, namespace_id, &open_actors, &req).await } // KV ops: resolve once, cache, reuse. @@ -453,15 +448,9 @@ async fn actor_request_task( if !is_open { let locks = state.actor_locks.lock().await; if locks.contains_key(&req.actor_id) { - error_response( - "actor_locked", - "actor is locked by another connection", - ) + error_response("actor_locked", "actor is locked by another connection") } else { - error_response( - "actor_not_open", - "actor is not opened on this connection", - ) + error_response("actor_not_open", "actor is not opened on this connection") } } else { // Lazy-resolve and cache. @@ -518,15 +507,9 @@ async fn actor_request_task( } /// Encode and send a response to the client. Logs warnings on failure. 
-async fn send_response( - ws_handle: &WebSocketHandle, - request_id: u32, - data: protocol::ResponseData, -) { - let response = protocol::ToClient::ToClientResponse(protocol::ToClientResponse { - request_id, - data, - }); +async fn send_response(ws_handle: &WebSocketHandle, request_id: u32, data: protocol::ResponseData) { + let response = + protocol::ToClient::ToClientResponse(protocol::ToClientResponse { request_id, data }); match protocol::encode_to_client(&response) { Ok(encoded) => { @@ -550,7 +533,7 @@ async fn handle_request( conn_id: Uuid, _namespace_id: Id, open_actors: &Arc>>, - req: &protocol::ToServerRequest, + req: &protocol::ToRivetRequest, ) -> protocol::ResponseData { match &req.data { protocol::RequestData::ActorOpenRequest => { @@ -577,9 +560,7 @@ async fn handle_actor_open( if current_count >= MAX_ACTORS_PER_CONNECTION { return error_response( "too_many_actors", - &format!( - "connection has too many open actors (max {MAX_ACTORS_PER_CONNECTION})" - ), + &format!("connection has too many open actors (max {MAX_ACTORS_PER_CONNECTION})"), ); } } @@ -636,8 +617,12 @@ async fn handle_kv_get( body: &protocol::KvGetRequest, ) -> protocol::ResponseData { let start = Instant::now(); - metrics::KV_CHANNEL_REQUESTS_TOTAL.with_label_values(&["get"]).inc(); - metrics::KV_CHANNEL_REQUEST_KEYS.with_label_values(&["get"]).observe(body.keys.len() as f64); + metrics::KV_CHANNEL_REQUESTS_TOTAL + .with_label_values(&["get"]) + .inc(); + metrics::KV_CHANNEL_REQUEST_KEYS + .with_label_values(&["get"]) + .observe(body.keys.len() as f64); if let Err(resp) = validate_keys(&body.keys) { return resp; @@ -654,7 +639,9 @@ async fn handle_kv_get( } Err(err) => internal_error(&err), }; - metrics::KV_CHANNEL_REQUEST_DURATION.with_label_values(&["get"]).observe(start.elapsed().as_secs_f64()); + metrics::KV_CHANNEL_REQUEST_DURATION + .with_label_values(&["get"]) + .observe(start.elapsed().as_secs_f64()); result } @@ -664,8 +651,12 @@ async fn handle_kv_put( body: &protocol::KvPutRequest, ) -> protocol::ResponseData { let start = Instant::now(); - metrics::KV_CHANNEL_REQUESTS_TOTAL.with_label_values(&["put"]).inc(); - metrics::KV_CHANNEL_REQUEST_KEYS.with_label_values(&["put"]).observe(body.keys.len() as f64); + metrics::KV_CHANNEL_REQUESTS_TOTAL + .with_label_values(&["put"]) + .inc(); + metrics::KV_CHANNEL_REQUEST_KEYS + .with_label_values(&["put"]) + .observe(body.keys.len() as f64); // Validate keys/values length match. 
if body.keys.len() != body.values.len() { @@ -687,7 +678,10 @@ async fn handle_kv_put( if key.len() + KEY_WRAPPER_OVERHEAD > MAX_KEY_SIZE { return error_response( "key_too_large", - &format!("key is too long (max {} bytes)", MAX_KEY_SIZE - KEY_WRAPPER_OVERHEAD), + &format!( + "key is too long (max {} bytes)", + MAX_KEY_SIZE - KEY_WRAPPER_OVERHEAD + ), ); } } @@ -700,7 +694,11 @@ async fn handle_kv_put( } } - let payload_size: usize = body.keys.iter().map(|k| k.len() + KEY_WRAPPER_OVERHEAD).sum::() + let payload_size: usize = body + .keys + .iter() + .map(|k| k.len() + KEY_WRAPPER_OVERHEAD) + .sum::() + body.values.iter().map(|v| v.len()).sum::(); if payload_size > MAX_PUT_PAYLOAD_SIZE { return error_response( @@ -717,7 +715,8 @@ async fn handle_kv_put( Err(err) => return internal_error(&err), }; - let result = match actor_kv::put(&*udb, recipient, body.keys.clone(), body.values.clone()).await { + let result = match actor_kv::put(&*udb, recipient, body.keys.clone(), body.values.clone()).await + { Ok(()) => protocol::ResponseData::KvPutResponse, Err(err) => { let rivet_err = rivet_error::RivetError::extract(&err); @@ -728,7 +727,9 @@ async fn handle_kv_put( } } }; - metrics::KV_CHANNEL_REQUEST_DURATION.with_label_values(&["put"]).observe(start.elapsed().as_secs_f64()); + metrics::KV_CHANNEL_REQUEST_DURATION + .with_label_values(&["put"]) + .observe(start.elapsed().as_secs_f64()); result } @@ -738,8 +739,12 @@ async fn handle_kv_delete( body: &protocol::KvDeleteRequest, ) -> protocol::ResponseData { let start = Instant::now(); - metrics::KV_CHANNEL_REQUESTS_TOTAL.with_label_values(&["delete"]).inc(); - metrics::KV_CHANNEL_REQUEST_KEYS.with_label_values(&["delete"]).observe(body.keys.len() as f64); + metrics::KV_CHANNEL_REQUESTS_TOTAL + .with_label_values(&["delete"]) + .inc(); + metrics::KV_CHANNEL_REQUEST_KEYS + .with_label_values(&["delete"]) + .observe(body.keys.len() as f64); if let Err(resp) = validate_keys(&body.keys) { return resp; @@ -754,7 +759,9 @@ async fn handle_kv_delete( Ok(()) => protocol::ResponseData::KvDeleteResponse, Err(err) => internal_error(&err), }; - metrics::KV_CHANNEL_REQUEST_DURATION.with_label_values(&["delete"]).observe(start.elapsed().as_secs_f64()); + metrics::KV_CHANNEL_REQUEST_DURATION + .with_label_values(&["delete"]) + .observe(start.elapsed().as_secs_f64()); result } @@ -764,17 +771,25 @@ async fn handle_kv_delete_range( body: &protocol::KvDeleteRangeRequest, ) -> protocol::ResponseData { let start = Instant::now(); - metrics::KV_CHANNEL_REQUESTS_TOTAL.with_label_values(&["delete_range"]).inc(); + metrics::KV_CHANNEL_REQUESTS_TOTAL + .with_label_values(&["delete_range"]) + .inc(); if body.start.len() + KEY_WRAPPER_OVERHEAD > MAX_KEY_SIZE { return error_response( "key_too_large", - &format!("start key is too long (max {} bytes)", MAX_KEY_SIZE - KEY_WRAPPER_OVERHEAD), + &format!( + "start key is too long (max {} bytes)", + MAX_KEY_SIZE - KEY_WRAPPER_OVERHEAD + ), ); } if body.end.len() + KEY_WRAPPER_OVERHEAD > MAX_KEY_SIZE { return error_response( "key_too_large", - &format!("end key is too long (max {} bytes)", MAX_KEY_SIZE - KEY_WRAPPER_OVERHEAD), + &format!( + "end key is too long (max {} bytes)", + MAX_KEY_SIZE - KEY_WRAPPER_OVERHEAD + ), ); } @@ -783,11 +798,20 @@ async fn handle_kv_delete_range( Err(err) => return internal_error(&err), }; - let result = match actor_kv::delete_range(&*udb, recipient, body.start.clone(), body.end.clone()).await { + let result = match actor_kv::delete_range( + &*udb, + recipient, + body.start.clone(), + body.end.clone(), + 
) + .await + { Ok(()) => protocol::ResponseData::KvDeleteResponse, Err(err) => internal_error(&err), }; - metrics::KV_CHANNEL_REQUEST_DURATION.with_label_values(&["delete_range"]).observe(start.elapsed().as_secs_f64()); + metrics::KV_CHANNEL_REQUEST_DURATION + .with_label_values(&["delete_range"]) + .observe(start.elapsed().as_secs_f64()); result } @@ -804,12 +828,8 @@ async fn resolve_actor( actor_id: &str, expected_namespace_id: Id, ) -> std::result::Result<(Id, String), protocol::ResponseData> { - let parsed_id = Id::parse(actor_id).map_err(|err| { - error_response( - "actor_not_found", - &format!("invalid actor id: {err}"), - ) - })?; + let parsed_id = Id::parse(actor_id) + .map_err(|err| error_response("actor_not_found", &format!("invalid actor id: {err}")))?; let actor = ctx .op(pegboard::ops::actor::get_for_runner::Input { @@ -847,7 +867,10 @@ fn validate_keys(keys: &[protocol::KvKey]) -> std::result::Result<(), protocol:: if key.len() + KEY_WRAPPER_OVERHEAD > MAX_KEY_SIZE { return Err(error_response( "key_too_large", - &format!("key is too long (max {} bytes)", MAX_KEY_SIZE - KEY_WRAPPER_OVERHEAD), + &format!( + "key is too long (max {} bytes)", + MAX_KEY_SIZE - KEY_WRAPPER_OVERHEAD + ), )); } } diff --git a/engine/packages/pegboard-outbound/src/lib.rs b/engine/packages/pegboard-outbound/src/lib.rs index d45a1726ec..3cf2d9ba58 100644 --- a/engine/packages/pegboard-outbound/src/lib.rs +++ b/engine/packages/pegboard-outbound/src/lib.rs @@ -167,7 +167,8 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()> tracing::debug!(?namespace_id, %pool_name, ?actor_id, ?generation, "received outbound request"); // Check pool - let (pool_res, namespace_res) = tokio::try_join!( + let db = ctx.udb()?; + let (pool_res, namespace_res, preloaded_kv) = tokio::try_join!( ctx.op(pegboard::ops::runner_config::get::Input { runners: vec![(namespace_id, pool_name.clone())], bypass_cache: false, @@ -175,6 +176,13 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()> ctx.op(namespace::ops::get_global::Input { namespace_ids: vec![namespace_id], }), + pegboard::actor_kv::preload::fetch_preloaded_kv( + &db, + ctx.config().pegboard(), + actor_id, + namespace_id, + &actor_config.name, + ), )?; let Some(pool) = pool_res.into_iter().next() else { tracing::debug!("pool does not exist, ending outbound handler"); @@ -192,16 +200,6 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()> return Ok(()); }; - let udb = ctx.udb()?; - let preloaded_kv = pegboard::actor_kv::preload::fetch_preloaded_kv( - &udb, - ctx.config().pegboard(), - actor_id, - namespace_id, - &actor_config.name, - ) - .await?; - let payload = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(vec![ protocol::CommandWrapper { checkpoint, diff --git a/engine/packages/pegboard-runner/src/lib.rs b/engine/packages/pegboard-runner/src/lib.rs index db97f070fd..fcd7a7f649 100644 --- a/engine/packages/pegboard-runner/src/lib.rs +++ b/engine/packages/pegboard-runner/src/lib.rs @@ -26,6 +26,7 @@ mod ws_to_tunnel_task; enum LifecycleResult { Closed, Aborted, + Evicted, } pub struct PegboardRunnerWsCustomServe { @@ -222,34 +223,40 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe { ); // Determine single result from all tasks - let lifecycle_res = match (tunnel_to_ws_res, ws_to_tunnel_res, ping_res) { + let mut lifecycle_res = match (tunnel_to_ws_res, ws_to_tunnel_res, ping_res) { // Prefer error (Err(err), _, _) => Err(err), (_, Err(err), _) => Err(err), 
(_, _, Err(err)) => Err(err), - // Prefer non aborted result if both succeed + // Prefer non aborted result (Ok(res), Ok(LifecycleResult::Aborted), _) => Ok(res), (Ok(LifecycleResult::Aborted), Ok(res), _) => Ok(res), // Unlikely case (res, _, _) => res, }; - // Make runner immediately ineligible when it disconnects - let update_alloc_res = self - .ctx - .op(pegboard::ops::runner::update_alloc_idx::Input { - runners: vec![pegboard::ops::runner::update_alloc_idx::Runner { - runner_id: conn.runner_id, - action: Action::ClearIdx, - }], - }) - .await; - if let Err(err) = update_alloc_res { - tracing::error!( - runner_id=?conn.runner_id, - ?err, - "critical: failed to evict runner from allocation index during disconnect" - ); + if let Ok(LifecycleResult::Evicted) = &lifecycle_res { + lifecycle_res = Err(errors::WsError::Eviction.build()); + } + // Clear alloc idx if not evicted + else { + // Make runner immediately ineligible when it disconnects + let update_alloc_res = self + .ctx + .op(pegboard::ops::runner::update_alloc_idx::Input { + runners: vec![pegboard::ops::runner::update_alloc_idx::Runner { + runner_id: conn.runner_id, + action: Action::ClearIdx, + }], + }) + .await; + if let Err(err) = update_alloc_res { + tracing::error!( + runner_id=?conn.runner_id, + ?err, + "failed to evict runner from allocation index during disconnect" + ); + } } tracing::debug!(%topic, "runner websocket closed"); diff --git a/engine/packages/pegboard-runner/src/metrics.rs b/engine/packages/pegboard-runner/src/metrics.rs index 931d0c9077..c5edab2c14 100644 --- a/engine/packages/pegboard-runner/src/metrics.rs +++ b/engine/packages/pegboard-runner/src/metrics.rs @@ -31,13 +31,13 @@ lazy_static::lazy_static! { ).unwrap(); pub static ref EVENT_MULTIPLEXER_COUNT: IntGauge = register_int_gauge_with_registry!( - "pegboard_event_multiplexer_count", + "pegboard_runner_event_multiplexer_count", "Number of active actor event multiplexers.", *REGISTRY ).unwrap(); pub static ref INGESTED_EVENTS_TOTAL: IntCounter = register_int_counter_with_registry!( - "pegboard_ingested_events_total", + "pegboard_runner_ingested_events_total", "Count of actor events.", *REGISTRY ).unwrap(); diff --git a/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs b/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs index 1c8f2168ba..6763e27519 100644 --- a/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs +++ b/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs @@ -66,7 +66,7 @@ async fn recv_msg( ]) .inc(); - return Err(errors::WsError::Eviction.build()); + return Ok(Err(LifecycleResult::Evicted)); } _ = tunnel_to_ws_abort_rx.changed() => { tracing::debug!("task aborted"); diff --git a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs index 861bf440ee..f3492a706f 100644 --- a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs @@ -6,8 +6,8 @@ use gas::prelude::*; use hyper_tungstenite::tungstenite::Message; use pegboard::actor_kv; use pegboard::pubsub_subjects::GatewayReceiverSubject; -use rivet_guard_core::websocket_handle::WebSocketReceiver; use rivet_envoy_protocol as ep; +use rivet_guard_core::websocket_handle::WebSocketReceiver; use rivet_runner_protocol::{self as protocol, PROTOCOL_MK2_VERSION, versioned}; use std::sync::{Arc, atomic::Ordering}; use tokio::sync::{Mutex, MutexGuard, watch}; @@ -106,7 +106,7 @@ async fn recv_msg( ]) .inc(); - return Err(errors::WsError::Eviction.build()); 
+ return Ok(Err(LifecycleResult::Evicted)); } _ = ws_to_tunnel_abort_rx.changed() => { tracing::debug!("task aborted"); @@ -242,9 +242,9 @@ async fn handle_message_mk2( values, metadata: metadata .into_iter() - .map(|x| protocol::mk2::KvMetadata { - version: x.version, - update_ts: x.update_ts, + .map(|m| protocol::mk2::KvMetadata { + version: m.version, + update_ts: m.update_ts, }) .collect(), }, @@ -277,16 +277,16 @@ async fn handle_message_mk2( protocol::mk2::KvListQuery::KvListAllQuery => { ep::KvListQuery::KvListAllQuery } - protocol::mk2::KvListQuery::KvListRangeQuery(q) => { + protocol::mk2::KvListQuery::KvListRangeQuery(x) => { ep::KvListQuery::KvListRangeQuery(ep::KvListRangeQuery { - start: q.start, - end: q.end, - exclusive: q.exclusive, + start: x.start, + end: x.end, + exclusive: x.exclusive, }) } - protocol::mk2::KvListQuery::KvListPrefixQuery(q) => { + protocol::mk2::KvListQuery::KvListPrefixQuery(x) => { ep::KvListQuery::KvListPrefixQuery(ep::KvListPrefixQuery { - key: q.key, + key: x.key, }) } }, @@ -310,9 +310,9 @@ async fn handle_message_mk2( values, metadata: metadata .into_iter() - .map(|x| protocol::mk2::KvMetadata { - version: x.version, - update_ts: x.update_ts, + .map(|m| protocol::mk2::KvMetadata { + version: m.version, + update_ts: m.update_ts, }) .collect(), }, @@ -636,10 +636,10 @@ async fn handle_message_mk1(ctx: &StandaloneCtx, conn: &Conn, msg: Bytes) -> Res } protocol::KvListQuery::KvListRangeQuery(q) => { ep::KvListQuery::KvListRangeQuery(ep::KvListRangeQuery { - start: q.start, - end: q.end, - exclusive: q.exclusive, - }) + start: q.start, + end: q.end, + exclusive: q.exclusive, + }) } protocol::KvListQuery::KvListPrefixQuery(q) => { ep::KvListQuery::KvListPrefixQuery(ep::KvListPrefixQuery { diff --git a/engine/packages/pegboard/Cargo.toml b/engine/packages/pegboard/Cargo.toml index f069963674..d04bf7f095 100644 --- a/engine/packages/pegboard/Cargo.toml +++ b/engine/packages/pegboard/Cargo.toml @@ -47,6 +47,7 @@ vbare.workspace = true [dev-dependencies] portpicker.workspace = true +test-snapshot-gen.workspace = true rivet-config.workspace = true rivet-test-deps.workspace = true tokio.workspace = true diff --git a/engine/packages/pegboard/src/actor_kv/mod.rs b/engine/packages/pegboard/src/actor_kv/mod.rs index d4689e4268..683f8ae6f4 100644 --- a/engine/packages/pegboard/src/actor_kv/mod.rs +++ b/engine/packages/pegboard/src/actor_kv/mod.rs @@ -49,101 +49,108 @@ pub async fn get( keys: Vec, ) -> Result<(Vec, Vec, Vec)> { let start = std::time::Instant::now(); -metrics::ACTOR_KV_KEYS_PER_OP.with_label_values(&["get"]).observe(keys.len() as f64); + metrics::ACTOR_KV_KEYS_PER_OP + .with_label_values(&["get"]) + .observe(keys.len() as f64); validate_keys(&keys)?; - let result = db.run(|tx| { - let keys = keys.clone(); - async move { - let tx = tx.with_subspace(keys::actor_kv::subspace(recipient.actor_id)); - - let mut stream = futures_util::stream::iter(keys) - .map(|key| { - let key_subspace = keys::actor_kv::subspace(recipient.actor_id) - .subspace(&keys::actor_kv::KeyWrapper(key)); - - // Get all sub keys in the key subspace - tx.get_ranges_keyvalues( - universaldb::RangeOption { - mode: universaldb::options::StreamingMode::WantAll, - ..key_subspace.range().into() - }, - Serializable, - ) - }) - .flatten(); - - let mut keys = Vec::new(); - let mut values = Vec::new(); - let mut metadata = Vec::new(); - let mut total_size = 0; - let mut current_entry: Option = None; - - loop { - let Some(entry) = stream.try_next().await? 
else { - break; - }; - - total_size += entry.key().len() + entry.value().len(); - - let key = tx.unpack::(&entry.key())?.key; - - let current_entry = if let Some(inner) = &mut current_entry { - if inner.key != key { - let (key, value, meta) = - std::mem::replace(inner, EntryBuilder::new(key)).build()?; + let result = db + .run(|tx| { + let keys = keys.clone(); + async move { + let tx = tx.with_subspace(keys::actor_kv::subspace(recipient.actor_id)); + + let mut stream = futures_util::stream::iter(keys) + .map(|key| { + let key_subspace = keys::actor_kv::subspace(recipient.actor_id) + .subspace(&keys::actor_kv::KeyWrapper(key)); + + // Get all sub keys in the key subspace + tx.get_ranges_keyvalues( + universaldb::RangeOption { + mode: universaldb::options::StreamingMode::WantAll, + ..key_subspace.range().into() + }, + Serializable, + ) + }) + .flatten(); + + let mut keys = Vec::new(); + let mut values = Vec::new(); + let mut metadata = Vec::new(); + let mut total_size = 0; + let mut current_entry: Option = None; + + loop { + let Some(entry) = stream.try_next().await? else { + break; + }; + + total_size += entry.key().len() + entry.value().len(); + + let key = tx.unpack::(&entry.key())?.key; + + let current_entry = if let Some(inner) = &mut current_entry { + if inner.key != key { + let (key, value, meta) = + std::mem::replace(inner, EntryBuilder::new(key)).build()?; + + keys.push(key); + values.push(value); + metadata.push(meta); + } - keys.push(key); - values.push(value); - metadata.push(meta); + inner + } else { + current_entry = Some(EntryBuilder::new(key)); + + current_entry.as_mut().expect("must be set") + }; + + if let Ok(chunk_key) = + tx.unpack::(&entry.key()) + { + current_entry.append_chunk(chunk_key.chunk, entry.value()); + } else if let Ok(metadata_key) = + tx.unpack::(&entry.key()) + { + let value = metadata_key.deserialize(entry.value())?; + + current_entry.append_metadata(value); + } else { + bail!("unexpected sub key"); } + } - inner - } else { - current_entry = Some(EntryBuilder::new(key)); - - current_entry.as_mut().expect("must be set") - }; - - if let Ok(chunk_key) = tx.unpack::(&entry.key()) - { - current_entry.append_chunk(chunk_key.chunk, entry.value()); - } else if let Ok(metadata_key) = - tx.unpack::(&entry.key()) - { - let value = metadata_key.deserialize(entry.value())?; + if let Some(inner) = current_entry { + let (key, value, meta) = inner.build()?; - current_entry.append_metadata(value); - } else { - bail!("unexpected sub key"); + keys.push(key); + values.push(value); + metadata.push(meta); } - } - if let Some(inner) = current_entry { - let (key, value, meta) = inner.build()?; - - keys.push(key); - values.push(value); - metadata.push(meta); + // Total read bytes (rounded up to nearest chunk) + let total_size_chunked = (total_size as u64) + .div_ceil(util::metric::KV_BILLABLE_CHUNK) + * util::metric::KV_BILLABLE_CHUNK; + namespace::keys::metric::inc( + &tx.with_subspace(namespace::keys::subspace()), + recipient.namespace_id, + namespace::keys::metric::Metric::KvRead(recipient.name.clone()), + total_size_chunked.try_into().unwrap_or_default(), + ); + + Ok((keys, values, metadata)) } - - // Total read bytes (rounded up to nearest chunk) - let total_size_chunked = (total_size as u64).div_ceil(util::metric::KV_BILLABLE_CHUNK) - * util::metric::KV_BILLABLE_CHUNK; - namespace::keys::metric::inc( - &tx.with_subspace(namespace::keys::subspace()), - recipient.namespace_id, - namespace::keys::metric::Metric::KvRead(recipient.name.clone()), - 
total_size_chunked.try_into().unwrap_or_default(), - ); - - Ok((keys, values, metadata)) - } - }) - .custom_instrument(tracing::info_span!("kv_get_tx")) - .await - .map_err(Into::::into); - metrics::ACTOR_KV_OPERATION_DURATION.with_label_values(&["get"]).observe(start.elapsed().as_secs_f64()); + }) + .custom_instrument(tracing::info_span!("kv_get_tx")) + .await + .map_err(Into::::into); + metrics::ACTOR_KV_OPERATION_DURATION + .with_label_values(&["get"]) + .observe(start.elapsed().as_secs_f64()); result } @@ -268,79 +275,85 @@ pub async fn put( values: Vec, ) -> Result<()> { let start = std::time::Instant::now(); -metrics::ACTOR_KV_KEYS_PER_OP.with_label_values(&["put"]).observe(keys.len() as f64); + metrics::ACTOR_KV_KEYS_PER_OP + .with_label_values(&["put"]) + .observe(keys.len() as f64); let keys = &keys; let values = &values; - let result = db.run(|tx| { - async move { - let total_size = estimate_kv_size(&tx, recipient.actor_id).await? as usize; - - validate_entries(&keys, &values, total_size)?; - - let subspace = &keys::actor_kv::subspace(recipient.actor_id); - let tx = tx.with_subspace(subspace.clone()); - let now = util::timestamp::now(); - - // TODO: Include metadata size? - // Total written bytes (rounded up to nearest chunk) - let total_size = keys.iter().fold(0, |s, key| s + key.len()) - + values.iter().fold(0, |s, value| s + value.len()); - let total_size_chunked = (total_size as u64).div_ceil(util::metric::KV_BILLABLE_CHUNK) - * util::metric::KV_BILLABLE_CHUNK; - namespace::keys::metric::inc( - &tx.with_subspace(namespace::keys::subspace()), - recipient.namespace_id, - namespace::keys::metric::Metric::KvWrite(recipient.name.clone()), - total_size_chunked.try_into().unwrap_or_default(), - ); - - futures_util::stream::iter(0..keys.len()) - .map(|i| { - let tx = tx.clone(); - async move { - // TODO: Costly clone - let key = keys::actor_kv::KeyWrapper( - keys.get(i).context("index should exist")?.clone(), - ); - let value = values.get(i).context("index should exist")?; - // Clear previous key data before setting - tx.clear_subspace_range(&subspace.subspace(&key)); - - // Set metadata - tx.write( - &keys::actor_kv::EntryMetadataKey::new(key.clone()), - ep::KvMetadata { - version: VERSION.as_bytes().to_vec(), - update_ts: now, - }, - )?; - - // Set key data in chunks - for start in (0..value.len()).step_by(VALUE_CHUNK_SIZE) { - let idx = start / VALUE_CHUNK_SIZE; - let end = (start + VALUE_CHUNK_SIZE).min(value.len()); - - tx.set( - &subspace.pack(&keys::actor_kv::EntryValueChunkKey::new( - key.clone(), - idx, - )), - &value.get(start..end).context("bad slice")?, + let result = db + .run(|tx| { + async move { + let total_size = estimate_kv_size(&tx, recipient.actor_id).await? as usize; + + validate_entries(&keys, &values, total_size)?; + + let subspace = &keys::actor_kv::subspace(recipient.actor_id); + let tx = tx.with_subspace(subspace.clone()); + let now = util::timestamp::now(); + + // TODO: Include metadata size? 
+ // Total written bytes (rounded up to nearest chunk) + let total_size = keys.iter().fold(0, |s, key| s + key.len()) + + values.iter().fold(0, |s, value| s + value.len()); + let total_size_chunked = (total_size as u64) + .div_ceil(util::metric::KV_BILLABLE_CHUNK) + * util::metric::KV_BILLABLE_CHUNK; + namespace::keys::metric::inc( + &tx.with_subspace(namespace::keys::subspace()), + recipient.namespace_id, + namespace::keys::metric::Metric::KvWrite(recipient.name.clone()), + total_size_chunked.try_into().unwrap_or_default(), + ); + + futures_util::stream::iter(0..keys.len()) + .map(|i| { + let tx = tx.clone(); + async move { + // TODO: Costly clone + let key = keys::actor_kv::KeyWrapper( + keys.get(i).context("index should exist")?.clone(), ); + let value = values.get(i).context("index should exist")?; + // Clear previous key data before setting + tx.clear_subspace_range(&subspace.subspace(&key)); + + // Set metadata + tx.write( + &keys::actor_kv::EntryMetadataKey::new(key.clone()), + ep::KvMetadata { + version: VERSION.as_bytes().to_vec(), + update_ts: now, + }, + )?; + + // Set key data in chunks + for start in (0..value.len()).step_by(VALUE_CHUNK_SIZE) { + let idx = start / VALUE_CHUNK_SIZE; + let end = (start + VALUE_CHUNK_SIZE).min(value.len()); + + tx.set( + &subspace.pack(&keys::actor_kv::EntryValueChunkKey::new( + key.clone(), + idx, + )), + &value.get(start..end).context("bad slice")?, + ); + } + + Ok(()) } - - Ok(()) - } - }) - .buffer_unordered(32) - .try_collect() - .await - } - }) - .custom_instrument(tracing::info_span!("kv_put_tx")) - .await - .map_err(Into::into); - metrics::ACTOR_KV_OPERATION_DURATION.with_label_values(&["put"]).observe(start.elapsed().as_secs_f64()); + }) + .buffer_unordered(32) + .try_collect() + .await + } + }) + .custom_instrument(tracing::info_span!("kv_put_tx")) + .await + .map_err(Into::into); + metrics::ACTOR_KV_OPERATION_DURATION + .with_label_values(&["put"]) + .observe(start.elapsed().as_secs_f64()); result } @@ -352,38 +365,44 @@ pub async fn delete( keys: Vec, ) -> Result<()> { let start = std::time::Instant::now(); -metrics::ACTOR_KV_KEYS_PER_OP.with_label_values(&["delete"]).observe(keys.len() as f64); + metrics::ACTOR_KV_KEYS_PER_OP + .with_label_values(&["delete"]) + .observe(keys.len() as f64); validate_keys(&keys)?; let keys = &keys; - let result = db.run(|tx| { - async move { - // Total written bytes (rounded up to nearest chunk) - let total_size = keys.iter().fold(0, |s, key| s + key.len()); - let total_size_chunked = (total_size as u64).div_ceil(util::metric::KV_BILLABLE_CHUNK) - * util::metric::KV_BILLABLE_CHUNK; - namespace::keys::metric::inc( - &tx.with_subspace(namespace::keys::subspace()), - recipient.namespace_id, - namespace::keys::metric::Metric::KvWrite(recipient.name.clone()), - total_size_chunked.try_into().unwrap_or_default(), - ); + let result = db + .run(|tx| { + async move { + // Total written bytes (rounded up to nearest chunk) + let total_size = keys.iter().fold(0, |s, key| s + key.len()); + let total_size_chunked = (total_size as u64) + .div_ceil(util::metric::KV_BILLABLE_CHUNK) + * util::metric::KV_BILLABLE_CHUNK; + namespace::keys::metric::inc( + &tx.with_subspace(namespace::keys::subspace()), + recipient.namespace_id, + namespace::keys::metric::Metric::KvWrite(recipient.name.clone()), + total_size_chunked.try_into().unwrap_or_default(), + ); + + for key in keys { + // TODO: Costly clone + let key_subspace = keys::actor_kv::subspace(recipient.actor_id) + .subspace(&keys::actor_kv::KeyWrapper(key.clone())); - for 
key in keys { - // TODO: Costly clone - let key_subspace = keys::actor_kv::subspace(recipient.actor_id) - .subspace(&keys::actor_kv::KeyWrapper(key.clone())); + tx.clear_subspace_range(&key_subspace); + } - tx.clear_subspace_range(&key_subspace); + Ok(()) } - - Ok(()) - } - }) - .custom_instrument(tracing::info_span!("kv_delete_tx")) - .await - .map_err(Into::into); - metrics::ACTOR_KV_OPERATION_DURATION.with_label_values(&["delete"]).observe(start.elapsed().as_secs_f64()); + }) + .custom_instrument(tracing::info_span!("kv_delete_tx")) + .await + .map_err(Into::into); + metrics::ACTOR_KV_OPERATION_DURATION + .with_label_values(&["delete"]) + .observe(start.elapsed().as_secs_f64()); result } @@ -396,45 +415,51 @@ pub async fn delete_range( end: ep::KvKey, ) -> Result<()> { let timer = std::time::Instant::now(); -validate_range(&start, &end)?; + validate_range(&start, &end)?; if start >= end { - metrics::ACTOR_KV_OPERATION_DURATION.with_label_values(&["delete_range"]).observe(timer.elapsed().as_secs_f64()); + metrics::ACTOR_KV_OPERATION_DURATION + .with_label_values(&["delete_range"]) + .observe(timer.elapsed().as_secs_f64()); return Ok(()); } - let result = db.run(|tx| { - let start = start.clone(); - let end = end.clone(); - async move { - // Total written bytes (rounded up to nearest chunk) - let total_size = start.len() + end.len(); - let total_size_chunked = (total_size as u64).div_ceil(util::metric::KV_BILLABLE_CHUNK) - * util::metric::KV_BILLABLE_CHUNK; - namespace::keys::metric::inc( - &tx.with_subspace(namespace::keys::subspace()), - recipient.namespace_id, - namespace::keys::metric::Metric::KvWrite(recipient.name.clone()), - total_size_chunked.try_into().unwrap_or_default(), - ); - - let subspace = keys::actor_kv::subspace(recipient.actor_id); - let begin = subspace - .subspace(&keys::actor_kv::KeyWrapper(start)) - .range() - .0; - let end = subspace - .subspace(&keys::actor_kv::KeyWrapper(end)) - .range() - .0; - tx.clear_range(&begin, &end); + let result = db + .run(|tx| { + let start = start.clone(); + let end = end.clone(); + async move { + // Total written bytes (rounded up to nearest chunk) + let total_size = start.len() + end.len(); + let total_size_chunked = (total_size as u64) + .div_ceil(util::metric::KV_BILLABLE_CHUNK) + * util::metric::KV_BILLABLE_CHUNK; + namespace::keys::metric::inc( + &tx.with_subspace(namespace::keys::subspace()), + recipient.namespace_id, + namespace::keys::metric::Metric::KvWrite(recipient.name.clone()), + total_size_chunked.try_into().unwrap_or_default(), + ); + + let subspace = keys::actor_kv::subspace(recipient.actor_id); + let begin = subspace + .subspace(&keys::actor_kv::KeyWrapper(start)) + .range() + .0; + let end = subspace + .subspace(&keys::actor_kv::KeyWrapper(end)) + .range() + .0; + tx.clear_range(&begin, &end); - Ok(()) - } - }) - .custom_instrument(tracing::info_span!("kv_delete_range_tx")) - .await - .map_err(Into::into); - metrics::ACTOR_KV_OPERATION_DURATION.with_label_values(&["delete_range"]).observe(timer.elapsed().as_secs_f64()); + Ok(()) + } + }) + .custom_instrument(tracing::info_span!("kv_delete_range_tx")) + .await + .map_err(Into::into); + metrics::ACTOR_KV_OPERATION_DURATION + .with_label_values(&["delete_range"]) + .observe(timer.elapsed().as_secs_f64()); result } diff --git a/engine/packages/pegboard/src/actor_kv/preload.rs b/engine/packages/pegboard/src/actor_kv/preload.rs index 3b7f4cd824..f22e28d27c 100644 --- a/engine/packages/pegboard/src/actor_kv/preload.rs +++ b/engine/packages/pegboard/src/actor_kv/preload.rs 
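Both the mod.rs hunks above and the preload.rs hunks below operate on the same storage layout: each value is stored as fixed-size chunks under per-chunk keys, and billed bytes are rounded up to a billing chunk. A self-contained sketch of those two calculations, with assumed stand-in constants for the crate's real VALUE_CHUNK_SIZE and util::metric::KV_BILLABLE_CHUNK:

// Assumed stand-in constants; the real values live in the crate.
const VALUE_CHUNK_SIZE: usize = 1024;
const KV_BILLABLE_CHUNK: u64 = 4096;

/// Split a value into (chunk_index, chunk_bytes) pairs, mirroring the
/// step_by loop in the put path.
fn split_into_chunks(value: &[u8]) -> Vec<(usize, &[u8])> {
    (0..value.len())
        .step_by(VALUE_CHUNK_SIZE)
        .map(|start| {
            let idx = start / VALUE_CHUNK_SIZE;
            let end = (start + VALUE_CHUNK_SIZE).min(value.len());
            (idx, &value[start..end])
        })
        .collect()
}

/// Round a byte count up to the nearest billable chunk, as done before the
/// namespace metric is incremented.
fn billable_bytes(total_size: usize) -> u64 {
    (total_size as u64).div_ceil(KV_BILLABLE_CHUNK) * KV_BILLABLE_CHUNK
}

Under these stand-in constants, a 2500-byte value splits into chunks of 1024, 1024, and 452 bytes, and billable_bytes(1) rounds up to 4096.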
@@ -99,8 +99,7 @@ pub(crate) async fn batch_preload( // Mark this key as scanned regardless of whether it exists in FDB. requested_get_keys.push(key.clone()); - let key_subspace = - subspace.subspace(&keys::actor_kv::KeyWrapper(key.clone())); + let key_subspace = subspace.subspace(&keys::actor_kv::KeyWrapper(key.clone())); let mut stream = tx.get_ranges_keyvalues( universaldb::RangeOption { mode: universaldb::options::StreamingMode::WantAll, @@ -113,9 +112,9 @@ pub(crate) async fn batch_preload( while let Some(fdb_kv) = stream.try_next().await? { if builder.is_none() { - let parsed_key = - tx.unpack::(&fdb_kv.key())? - .key; + let parsed_key = tx + .unpack::(&fdb_kv.key())? + .key; builder = Some(EntryBuilder::new(parsed_key)); } @@ -185,14 +184,14 @@ pub(crate) async fn batch_preload( let mut exceeded = false; while let Some(fdb_kv) = stream.try_next().await? { - let key = - tx.unpack::(&fdb_kv.key())?.key; + let key = tx + .unpack::(&fdb_kv.key())? + .key; let curr = if let Some(inner) = &mut current_entry { if inner.key != key { // Finalize the previous entry. - let prev = - std::mem::replace(inner, EntryBuilder::new(key)); + let prev = std::mem::replace(inner, EntryBuilder::new(key)); let (k, v, m) = prev.build()?; let size = entry_size(&k, &v, &m); @@ -301,10 +300,10 @@ pub async fn fetch_preloaded_kv( let metadata = db .run(|tx| { let tx = tx.with_subspace(keys::subspace()); - let name_key = - keys::ns::ActorNameKey::new(namespace_id, actor_name.to_string()); + let name_key = keys::ns::ActorNameKey::new(namespace_id, actor_name.to_string()); async move { tx.read_opt(&name_key, Snapshot).await } }) + .instrument(tracing::info_span!("read_actor_metadata_tx")) .await?; let metadata_map = metadata @@ -318,7 +317,8 @@ pub async fn fetch_preloaded_kv( return Ok(None); }; - if config.preload_max_total_bytes() == 0 { + let preload_max_total_bytes = config.preload_max_total_bytes(); + if preload_max_total_bytes == 0 { return Ok(None); }; @@ -337,7 +337,7 @@ pub async fn fetch_preloaded_kv( actor_id, preload_config.keys, prefix_requests, - config.preload_max_total_bytes(), + preload_max_total_bytes, ) .await?; diff --git a/engine/packages/pegboard/src/errors.rs b/engine/packages/pegboard/src/errors.rs index f59b03a2b6..4856a1cbdc 100644 --- a/engine/packages/pegboard/src/errors.rs +++ b/engine/packages/pegboard/src/errors.rs @@ -74,7 +74,10 @@ pub enum Actor { "Not enough space left in storage.", "Not enough space left in storage ({remaining} bytes remaining, current payload is {payload_size} bytes)." 
)] - KvStorageQuotaExceeded { remaining: usize, payload_size: usize }, + KvStorageQuotaExceeded { + remaining: usize, + payload_size: usize, + }, } #[derive(RivetError, Debug, Clone, Deserialize, Serialize)] diff --git a/engine/packages/pegboard/src/ops/actor/get_for_runner.rs b/engine/packages/pegboard/src/ops/actor/get_for_runner.rs index b34639ced2..4bc81fef81 100644 --- a/engine/packages/pegboard/src/ops/actor/get_for_runner.rs +++ b/engine/packages/pegboard/src/ops/actor/get_for_runner.rs @@ -41,11 +41,19 @@ pub async fn pegboard_actor_get_for_runner( tx.exists(&connectable_key, Serializable), )?; - let (Some(workflow_id), Some(namespace_id), Some(runner_id)) = (workflow_id, namespace_id, runner_id_entry) else { + let (Some(workflow_id), Some(namespace_id), Some(runner_id)) = + (workflow_id, namespace_id, runner_id_entry) + else { return Ok(None); }; - Ok(Some((workflow_id, name_entry, namespace_id, runner_id, is_connectable))) + Ok(Some(( + workflow_id, + name_entry, + namespace_id, + runner_id, + is_connectable, + ))) }) .custom_instrument(tracing::info_span!("actor_get_for_runner_tx")) .await?; diff --git a/engine/packages/pegboard/src/workflows/actor/keys.rs b/engine/packages/pegboard/src/workflows/actor/keys.rs index 22d89f70cf..e7e8a629be 100644 --- a/engine/packages/pegboard/src/workflows/actor/keys.rs +++ b/engine/packages/pegboard/src/workflows/actor/keys.rs @@ -1,5 +1,7 @@ use epoxy::{ - ops::propose::{CheckAndSetCommand, Command, CommandError, CommandKind, Proposal, ProposalResult}, + ops::propose::{ + CheckAndSetCommand, Command, CommandError, CommandKind, Proposal, ProposalResult, + }, protocol::ReplicaId, }; use futures_util::TryStreamExt; diff --git a/engine/packages/pegboard/src/workflows/actor2/keys.rs b/engine/packages/pegboard/src/workflows/actor2/keys.rs index 27006ef961..36f7a2f817 100644 --- a/engine/packages/pegboard/src/workflows/actor2/keys.rs +++ b/engine/packages/pegboard/src/workflows/actor2/keys.rs @@ -1,5 +1,7 @@ use epoxy::{ - ops::propose::{CheckAndSetCommand, Command, CommandError, CommandKind, Proposal, ProposalResult}, + ops::propose::{ + CheckAndSetCommand, Command, CommandError, CommandKind, Proposal, ProposalResult, + }, protocol::ReplicaId, }; use futures_util::TryStreamExt; diff --git a/engine/packages/pegboard/src/workflows/actor2/runtime.rs b/engine/packages/pegboard/src/workflows/actor2/runtime.rs index 5986573e15..129a884d93 100644 --- a/engine/packages/pegboard/src/workflows/actor2/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor2/runtime.rs @@ -966,66 +966,25 @@ pub async fn insert_and_send_commands( state.envoy_last_command_idx += input.commands.len() as i64; - // Fetch preloaded KV at send time for any start commands. Preloaded KV is - // never persisted in the command queue or workflow history. - let preloaded_kv = { - let has_start_cmd = input - .commands - .iter() - .any(|command| matches!(command, protocol::Command::CommandStartActor(_))); - if has_start_cmd { - let db = ctx.udb()?; - crate::actor_kv::preload::fetch_preloaded_kv( - &db, - ctx.config().pegboard(), - actor_id, - namespace_id, - &input - .commands - .iter() - .find_map(|command| match command { - protocol::Command::CommandStartActor(start) => { - Some(start.config.name.clone()) - } - _ => None, - }) - .unwrap_or_default(), - ) - .await? 
- } else { - None - } - }; - let receiver_subject = crate::pubsub_subjects::EnvoyReceiverSubject::new( state.namespace_id, input.envoy_key.clone(), ) .to_string(); - let mut preloaded_kv = preloaded_kv; let message_serialized = versioned::ToEnvoyConn::wrap_latest(protocol::ToEnvoyConn::ToEnvoyCommands( input .commands .iter() .enumerate() - .map(|(i, command)| { - let mut command = command.clone(); - if let protocol::Command::CommandStartActor(ref mut start) = - command - { - start.preloaded_kv = preloaded_kv.take(); - } - - protocol::CommandWrapper { - checkpoint: protocol::ActorCheckpoint { - actor_id: state.actor_id.to_string(), - generation: input.generation, - index: old_last_command_idx + i as i64 + 1, - }, - inner: command, - } + .map(|(i, command)| protocol::CommandWrapper { + checkpoint: protocol::ActorCheckpoint { + actor_id: state.actor_id.to_string(), + generation: input.generation, + index: old_last_command_idx + i as i64 + 1, + }, + inner: command.clone(), }) .collect(), )) diff --git a/engine/packages/pegboard/tests/actor_v1_pre_migration.rs b/engine/packages/pegboard/tests/actor_v1_pre_migration.rs new file mode 100644 index 0000000000..ca7768c3b3 --- /dev/null +++ b/engine/packages/pegboard/tests/actor_v1_pre_migration.rs @@ -0,0 +1,77 @@ +use std::time::Duration; + +use gas::prelude::*; +use pegboard::workflows::actor::AllocationOverride; +use test_snapshot::SnapshotTestCtx; +use universaldb::prelude::*; + +#[tokio::test] +async fn actor_v1_pre_migration() { + let test_ctx = SnapshotTestCtx::from_snapshot_with_coordinator("pb-actor-v1-pre-migration") + .await + .unwrap(); + let ctx = test_ctx.get_ctx(test_ctx.leader_id); + + let existing_namespace = ctx + .op(namespace::ops::resolve_for_name_local::Input { + name: "default".to_string(), + }) + .await + .unwrap() + .expect("default ns should exist"); + + let actors_res = ctx + .op(pegboard::ops::actor::list_for_ns::Input { + namespace_id: existing_namespace.namespace_id, + name: "test".to_string(), + key: None, + include_destroyed: true, + created_before: None, + limit: 1, + fetch_error: false, + }) + .await + .unwrap(); + let actor = actors_res + .actors + .into_iter() + .next() + .expect("actor should exist"); + + ctx.signal(pegboard::workflows::actor::Wake { + allocation_override: AllocationOverride::default(), + }) + .to_workflow::() + .tag("actor_id", actor.actor_id) + .send() + .await + .unwrap(); + + tokio::time::sleep(Duration::from_secs(3)).await; + + // Get workflow id + let workflow_id = ctx + .udb() + .unwrap() + .run(|tx| async move { + let tx = tx.with_subspace(pegboard::keys::subspace()); + + tx.read( + &pegboard::keys::actor::WorkflowIdKey::new(actor.actor_id), + Serializable, + ) + .await + }) + .await + .unwrap(); + + let wf = ctx + .get_workflows(vec![workflow_id]) + .await + .unwrap() + .into_iter() + .next() + .expect("workflow should exist"); + + assert!(!wf.is_dead()); +} diff --git a/engine/packages/pegboard/tests/kv_list_edge_cases.rs b/engine/packages/pegboard/tests/kv_list_edge_cases.rs index d18b0b5001..d333a56544 100644 --- a/engine/packages/pegboard/tests/kv_list_edge_cases.rs +++ b/engine/packages/pegboard/tests/kv_list_edge_cases.rs @@ -1,7 +1,7 @@ use anyhow::Result; use gas::prelude::*; use pegboard::actor_kv as kv; -use rivet_runner_protocol::mk2 as rp; +use rivet_envoy_protocol as ep; #[tokio::test] async fn test_list_edge_cases() -> Result<()> { @@ -12,15 +12,20 @@ async fn test_list_edge_cases() -> Result<()> { let test_id = Uuid::new_v4(); let dc_label = 1; - let datacenters = 
vec![rivet_config::config::topology::Datacenter { - name: "test-dc".to_string(), - datacenter_label: dc_label, - is_leader: true, - peer_url: url::Url::parse("http://127.0.0.1:8080")?, - public_url: url::Url::parse("http://127.0.0.1:8081")?, - proxy_url: None, - valid_hosts: None, - }]; + let datacenters = [( + "test-dc".to_string(), + rivet_config::config::topology::Datacenter { + name: "test-dc".to_string(), + datacenter_label: dc_label, + is_leader: true, + peer_url: url::Url::parse("http://127.0.0.1:8080")?, + public_url: url::Url::parse("http://127.0.0.1:8081")?, + proxy_url: None, + valid_hosts: None, + }, + )] + .into_iter() + .collect(); let api_peer_port = portpicker::pick_unused_port().expect("failed to pick api peer port"); let guard_port = portpicker::pick_unused_port().expect("failed to pick guard port"); @@ -45,7 +50,7 @@ async fn test_list_edge_cases() -> Result<()> { // Test 1: List when empty tracing::info!("test 1: list when empty"); let (empty_keys, _, _) = - kv::list(db, &recipient, rp::KvListQuery::KvListAllQuery, false, None).await?; + kv::list(db, &recipient, ep::KvListQuery::KvListAllQuery, false, None).await?; assert_eq!(empty_keys.len(), 0, "should return empty list"); // Test 2: Prefix that matches nothing @@ -61,7 +66,7 @@ async fn test_list_edge_cases() -> Result<()> { let (no_match, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { + ep::KvListQuery::KvListPrefixQuery(ep::KvListPrefixQuery { key: b"xyz".to_vec(), }), false, @@ -79,7 +84,7 @@ async fn test_list_edge_cases() -> Result<()> { let (backwards_range, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListRangeQuery(rp::KvListRangeQuery { + ep::KvListQuery::KvListRangeQuery(ep::KvListRangeQuery { start: b"z".to_vec(), end: b"a".to_vec(), exclusive: false, @@ -99,7 +104,7 @@ async fn test_list_edge_cases() -> Result<()> { let (same_inclusive, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListRangeQuery(rp::KvListRangeQuery { + ep::KvListQuery::KvListRangeQuery(ep::KvListRangeQuery { start: b"foo".to_vec(), end: b"foo".to_vec(), exclusive: false, @@ -117,7 +122,7 @@ async fn test_list_edge_cases() -> Result<()> { let (same_exclusive, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListRangeQuery(rp::KvListRangeQuery { + ep::KvListQuery::KvListRangeQuery(ep::KvListRangeQuery { start: b"foo".to_vec(), end: b"foo".to_vec(), exclusive: true, @@ -153,7 +158,7 @@ async fn test_list_edge_cases() -> Result<()> { let (null_prefix, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { + ep::KvListQuery::KvListPrefixQuery(ep::KvListPrefixQuery { key: vec![b'a', 0x00], }), false, @@ -198,7 +203,7 @@ async fn test_list_edge_cases() -> Result<()> { let (empty_prefix, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { key: vec![] }), + ep::KvListQuery::KvListPrefixQuery(ep::KvListPrefixQuery { key: vec![] }), false, None, ) @@ -214,7 +219,7 @@ async fn test_list_edge_cases() -> Result<()> { let (long_prefix, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { + ep::KvListQuery::KvListPrefixQuery(ep::KvListPrefixQuery { key: b"abcdefghijk".to_vec(), }), false, @@ -248,7 +253,7 @@ async fn test_list_edge_cases() -> Result<()> { let (prefix_match, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { + ep::KvListQuery::KvListPrefixQuery(ep::KvListPrefixQuery { key: b"key".to_vec(), }), 
false, @@ -271,7 +276,7 @@ async fn test_list_edge_cases() -> Result<()> { let (byte_range, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListRangeQuery(rp::KvListRangeQuery { + ep::KvListQuery::KvListRangeQuery(ep::KvListRangeQuery { start: b"key\x00".to_vec(), end: b"key\x02".to_vec(), exclusive: false, @@ -297,7 +302,7 @@ async fn test_list_edge_cases() -> Result<()> { let (zero_limit, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListAllQuery, + ep::KvListQuery::KvListAllQuery, false, Some(0), ) @@ -309,7 +314,7 @@ async fn test_list_edge_cases() -> Result<()> { let (one_limit, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListAllQuery, + ep::KvListQuery::KvListAllQuery, false, Some(1), ) @@ -321,7 +326,7 @@ async fn test_list_edge_cases() -> Result<()> { let (large_limit, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListAllQuery, + ep::KvListQuery::KvListAllQuery, false, Some(1000), ) @@ -347,7 +352,7 @@ async fn test_list_edge_cases() -> Result<()> { let (reverse_limited, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListAllQuery, + ep::KvListQuery::KvListAllQuery, true, Some(2), ) @@ -366,7 +371,7 @@ async fn test_list_edge_cases() -> Result<()> { let (prefix_reverse, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { key: vec![] }), + ep::KvListQuery::KvListPrefixQuery(ep::KvListPrefixQuery { key: vec![] }), true, None, ) diff --git a/engine/packages/pegboard/tests/kv_operations.rs b/engine/packages/pegboard/tests/kv_operations.rs index 631f245429..141d09c6ea 100644 --- a/engine/packages/pegboard/tests/kv_operations.rs +++ b/engine/packages/pegboard/tests/kv_operations.rs @@ -1,7 +1,7 @@ use anyhow::Result; use gas::prelude::*; use pegboard::actor_kv as kv; -use rivet_runner_protocol::mk2 as rp; +use rivet_envoy_protocol as ep; #[tokio::test] async fn test_kv_operations() -> Result<()> { @@ -13,15 +13,20 @@ async fn test_kv_operations() -> Result<()> { let test_id = Uuid::new_v4(); let dc_label = 1; - let datacenters = vec![rivet_config::config::topology::Datacenter { - name: "test-dc".to_string(), - datacenter_label: dc_label, - is_leader: true, - peer_url: url::Url::parse("http://127.0.0.1:8080")?, - public_url: url::Url::parse("http://127.0.0.1:8081")?, - proxy_url: None, - valid_hosts: None, - }]; + let datacenters = [( + "test-dc".to_string(), + rivet_config::config::topology::Datacenter { + name: "test-dc".to_string(), + datacenter_label: dc_label, + is_leader: true, + peer_url: url::Url::parse("http://127.0.0.1:8080")?, + public_url: url::Url::parse("http://127.0.0.1:8081")?, + proxy_url: None, + valid_hosts: None, + }, + )] + .into_iter() + .collect(); let api_peer_port = portpicker::pick_unused_port().expect("failed to pick api peer port"); let guard_port = portpicker::pick_unused_port().expect("failed to pick guard port"); @@ -98,7 +103,7 @@ async fn test_kv_operations() -> Result<()> { // Test 3: List all keys tracing::info!("test 3: listing all keys"); let (list_keys, list_values, list_metadata) = - kv::list(db, &recipient, rp::KvListQuery::KvListAllQuery, false, None).await?; + kv::list(db, &recipient, ep::KvListQuery::KvListAllQuery, false, None).await?; assert_eq!(list_keys.len(), 5, "should list 5 keys"); assert_eq!(list_values.len(), 5, "should list 5 values"); @@ -110,7 +115,7 @@ async fn test_kv_operations() -> Result<()> { let (limited_keys, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListAllQuery, + ep::KvListQuery::KvListAllQuery, false, Some(2), ) @@ -122,9 
+127,9 @@ async fn test_kv_operations() -> Result<()> { // Test 5: List with reverse tracing::info!("test 5: listing in reverse"); let (forward_keys, _, _) = - kv::list(db, &recipient, rp::KvListQuery::KvListAllQuery, false, None).await?; + kv::list(db, &recipient, ep::KvListQuery::KvListAllQuery, false, None).await?; let (reverse_keys, _, _) = - kv::list(db, &recipient, rp::KvListQuery::KvListAllQuery, true, None).await?; + kv::list(db, &recipient, ep::KvListQuery::KvListAllQuery, true, None).await?; assert_eq!(forward_keys.len(), reverse_keys.len()); // Keys should be in opposite order @@ -161,7 +166,7 @@ async fn test_kv_operations() -> Result<()> { let (users_keys, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { + ep::KvListQuery::KvListPrefixQuery(ep::KvListPrefixQuery { key: b"users:".to_vec(), }), false, @@ -177,7 +182,7 @@ async fn test_kv_operations() -> Result<()> { let (posts_keys, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListPrefixQuery(rp::KvListPrefixQuery { + ep::KvListQuery::KvListPrefixQuery(ep::KvListPrefixQuery { key: b"posts:".to_vec(), }), false, @@ -199,7 +204,7 @@ async fn test_kv_operations() -> Result<()> { let (range_keys, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListRangeQuery(rp::KvListRangeQuery { + ep::KvListQuery::KvListRangeQuery(ep::KvListRangeQuery { start: b"key1".to_vec(), end: b"key2".to_vec(), exclusive: false, @@ -223,7 +228,7 @@ async fn test_kv_operations() -> Result<()> { let (exclusive_range_keys, _, _) = kv::list( db, &recipient, - rp::KvListQuery::KvListRangeQuery(rp::KvListRangeQuery { + ep::KvListQuery::KvListRangeQuery(ep::KvListRangeQuery { start: b"key1".to_vec(), end: b"key2".to_vec(), exclusive: true, @@ -245,7 +250,7 @@ async fn test_kv_operations() -> Result<()> { // Verify keys are deleted let (remaining_keys, _, _) = - kv::list(db, &recipient, rp::KvListQuery::KvListAllQuery, false, None).await?; + kv::list(db, &recipient, ep::KvListQuery::KvListAllQuery, false, None).await?; assert_eq!(remaining_keys.len(), 3, "should have 3 keys remaining"); assert!(!remaining_keys.contains(&b"key1".to_vec())); assert!(!remaining_keys.contains(&b"key2".to_vec())); @@ -256,7 +261,7 @@ async fn test_kv_operations() -> Result<()> { kv::delete_range(db, &recipient, b"key3".to_vec(), b"key5".to_vec()).await?; let (post_range_keys, _, _) = - kv::list(db, &recipient, rp::KvListQuery::KvListAllQuery, false, None).await?; + kv::list(db, &recipient, ep::KvListQuery::KvListAllQuery, false, None).await?; assert_eq!(post_range_keys.len(), 1, "should have 1 key remaining"); assert_eq!( post_range_keys[0], @@ -271,7 +276,7 @@ async fn test_kv_operations() -> Result<()> { // Verify all keys are deleted let (all_keys, _, _) = - kv::list(db, &recipient, rp::KvListQuery::KvListAllQuery, false, None).await?; + kv::list(db, &recipient, ep::KvListQuery::KvListAllQuery, false, None).await?; assert_eq!(all_keys.len(), 0, "should have no keys remaining"); tracing::info!("successfully deleted all keys"); diff --git a/engine/packages/test-snapshot-gen/Cargo.toml b/engine/packages/test-snapshot-gen/Cargo.toml index 8850ce67e0..1fdbb18a17 100644 --- a/engine/packages/test-snapshot-gen/Cargo.toml +++ b/engine/packages/test-snapshot-gen/Cargo.toml @@ -18,17 +18,20 @@ anyhow.workspace = true async-trait.workspace = true axum.workspace = true clap.workspace = true -epoxy.workspace = true epoxy-protocol.workspace = true +epoxy.workspace = true gas.workspace = true +namespace.workspace = true 
+pegboard.workspace = true portpicker.workspace = true rivet-api-builder.workspace = true rivet-config.workspace = true rivet-pools.workspace = true rivet-test-deps.workspace = true +rivet-types.workspace = true rivet-util.workspace = true -serde.workspace = true serde_json.workspace = true +serde.workspace = true tokio.workspace = true tracing.workspace = true universaldb.workspace = true diff --git a/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/metadata.json b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/metadata.json new file mode 100644 index 0000000000..3c31d13aac --- /dev/null +++ b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/metadata.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e219c7a03b9c9722dc72666b9385a5af9fe58c93c9e2d2bab064084dde4fb4f0 +size 79 diff --git a/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/000008.log b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/000008.log new file mode 100644 index 0000000000..e69de29bb2 diff --git a/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/000009.sst b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/000009.sst new file mode 100644 index 0000000000..d22a2edba4 --- /dev/null +++ b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/000009.sst @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:34758e8b0294c9c264b56a7b38283a47aa4f66d7d857b2348983992ef7d8898a +size 10371 diff --git a/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/CURRENT b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/CURRENT new file mode 100644 index 0000000000..f8d5048625 --- /dev/null +++ b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/CURRENT @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:9c283f6e81028b9eb0760d918ee4bc0aa256ed3b926393c1734c760c4bd724fd +size 16 diff --git a/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/MANIFEST-000005 b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/MANIFEST-000005 new file mode 100644 index 0000000000..1a7f8b79cb --- /dev/null +++ b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/MANIFEST-000005 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:5f1ba367165061fdc717905d50320b368f4161fcc4daa510d46a9e0ca614297e +size 300 diff --git a/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/OPTIONS-000007 b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/OPTIONS-000007 new file mode 100644 index 0000000000..108379c184 --- /dev/null +++ b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-1/OPTIONS-000007 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:7e335d1f28391ae45a3b6b5acc70245581d7137ca22d213bddbcca518195aec8 +size 7749 diff --git a/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/000008.log b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/000008.log new file mode 100644 index 0000000000..e69de29bb2 diff --git a/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/000009.sst 
new file mode 100644
index 0000000000..3c57c2f524
--- /dev/null
+++ b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/000009.sst
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:632cfb41b8b61ceb5682d11f1b2146697dfadb2a7e00f9ebfd5f776887aaeabe
+size 2914
diff --git a/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/CURRENT b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/CURRENT
new file mode 100644
index 0000000000..f8d5048625
--- /dev/null
+++ b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/CURRENT
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:9c283f6e81028b9eb0760d918ee4bc0aa256ed3b926393c1734c760c4bd724fd
+size 16
diff --git a/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/MANIFEST-000005 b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/MANIFEST-000005
new file mode 100644
index 0000000000..6156fc863b
--- /dev/null
+++ b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/MANIFEST-000005
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:6dd5a89f06f320efc61e2f6a0846fc04c4d8aa5f836e4f542fb503d3c28e6e6f
+size 263
diff --git a/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/OPTIONS-000007 b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/OPTIONS-000007
new file mode 100644
index 0000000000..9a63c24680
--- /dev/null
+++ b/engine/packages/test-snapshot-gen/snapshots/pb-actor-v1-pre-migration/replica-2/OPTIONS-000007
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:aa1c4b471e8d7f80a1785ced50c1f422ddb237ccdc6ed354812a9cc435c6ad15
+size 7750
diff --git a/engine/packages/test-snapshot-gen/src/lib.rs b/engine/packages/test-snapshot-gen/src/lib.rs
index f8aa454989..7a70263b75 100644
--- a/engine/packages/test-snapshot-gen/src/lib.rs
+++ b/engine/packages/test-snapshot-gen/src/lib.rs
@@ -21,10 +21,7 @@ pub fn snapshot_dir(scenario: &str) -> PathBuf {
 ///
 /// Returns a map of `replica_id -> temp_path` where each temp_path is a
 /// copy of that replica's RocksDB data directory.
-pub fn load_snapshot(
-    scenario: &str,
-    test_id: uuid::Uuid,
-) -> Result<HashMap<ReplicaId, PathBuf>> {
+pub fn load_snapshot(scenario: &str, test_id: uuid::Uuid) -> Result<HashMap<ReplicaId, PathBuf>> {
     let dir = snapshot_dir(scenario);
     load_snapshot_from(&dir, test_id)
 }
@@ -128,8 +125,7 @@ impl SnapshotTestCtx {
         for &replica_id in &replica_ids {
             let api_peer_port =
                 portpicker::pick_unused_port().context("failed to pick API peer port")?;
-            let guard_port =
-                portpicker::pick_unused_port().context("failed to pick guard port")?;
+            let guard_port = portpicker::pick_unused_port().context("failed to pick guard port")?;
             ctx.replica_metadata.insert(
                 replica_id,
                 ReplicaMetadata {
@@ -224,7 +220,9 @@ impl SnapshotTestCtx {
         )
         .await?;
 
-        let reg = epoxy::registry()?;
+        let reg = epoxy::registry()?
+            .merge(namespace::registry()?)?
+            .merge(pegboard::registry()?)?;
         let test_ctx = WorkflowTestCtx::new_with_deps(reg, test_deps).await?;
 
         let api_handle = setup_api_server(
@@ -269,14 +267,8 @@ impl SnapshotTestCtx {
                 name: format!("dc-{}", id),
                 datacenter_label: id as u16,
                 is_leader: id == self.leader_id,
-                peer_url: Url::parse(&format!(
-                    "http://127.0.0.1:{}",
-                    metadata.api_peer_port
-                ))?,
-                public_url: Url::parse(&format!(
-                    "http://127.0.0.1:{}",
-                    metadata.guard_port
-                ))?,
+                peer_url: Url::parse(&format!("http://127.0.0.1:{}", metadata.api_peer_port))?,
+                public_url: Url::parse(&format!("http://127.0.0.1:{}", metadata.guard_port))?,
                 proxy_url: None,
                 valid_hosts: None,
             },
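For reference, a minimal sketch of a test consuming the `load_snapshot` helper above; the `test_snapshot_gen` crate path and the surrounding test body are assumptions for illustration, not part of this diff:

    // Restore the scenario's per-replica RocksDB dirs into temp copies.
    // (Fragment assumed to run inside a test returning anyhow::Result.)
    let test_id = uuid::Uuid::new_v4();
    let replica_dirs = test_snapshot_gen::load_snapshot("pb-actor-v1-pre-migration", test_id)?;
    for (replica_id, path) in &replica_dirs {
        // Each path is a temp copy of that replica's RocksDB data directory.
        println!("replica {}: {}", replica_id, path.display());
    }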
diff --git a/engine/packages/test-snapshot-gen/src/main.rs b/engine/packages/test-snapshot-gen/src/main.rs
index 6c913f8fec..4341b88cf4 100644
--- a/engine/packages/test-snapshot-gen/src/main.rs
+++ b/engine/packages/test-snapshot-gen/src/main.rs
@@ -104,11 +104,9 @@ async fn run_scenario(scenario: &dyn scenarios::Scenario) -> Result<()> {
 
     // Clean previous snapshot if it exists.
     if scenario_dir.exists() {
-        std::fs::remove_dir_all(&scenario_dir)
-            .context("failed to remove old snapshot")?;
+        std::fs::remove_dir_all(&scenario_dir).context("failed to remove old snapshot")?;
     }
-    std::fs::create_dir_all(&scenario_dir)
-        .context("failed to create snapshot directory")?;
+    std::fs::create_dir_all(&scenario_dir).context("failed to create snapshot directory")?;
 
     // Build the cluster.
     let replica_ids: Vec<ReplicaId> = (1..=scenario.replica_count() as u64).collect();
diff --git a/engine/packages/test-snapshot-gen/src/scenarios/mod.rs b/engine/packages/test-snapshot-gen/src/scenarios/mod.rs
index 47d7f4a8b5..5c7a74708d 100644
--- a/engine/packages/test-snapshot-gen/src/scenarios/mod.rs
+++ b/engine/packages/test-snapshot-gen/src/scenarios/mod.rs
@@ -4,6 +4,7 @@ use async_trait::async_trait;
 use crate::test_cluster::TestCluster;
 
 mod epoxy_keys;
+mod pb_actor_v1_pre_migration;
 
 #[async_trait(?Send)]
 pub trait Scenario {
@@ -19,5 +20,8 @@ pub trait Scenario {
 }
 
 pub fn all() -> Vec<Box<dyn Scenario>> {
-    vec![Box::new(epoxy_keys::EpoxyKeys)]
+    vec![
+        Box::new(epoxy_keys::EpoxyKeys),
+        Box::new(pb_actor_v1_pre_migration::PbActorV1PreMigration),
+    ]
 }
diff --git a/engine/packages/test-snapshot-gen/src/scenarios/pb_actor_v1_pre_migration.rs b/engine/packages/test-snapshot-gen/src/scenarios/pb_actor_v1_pre_migration.rs
new file mode 100644
index 0000000000..6f512b42f9
--- /dev/null
+++ b/engine/packages/test-snapshot-gen/src/scenarios/pb_actor_v1_pre_migration.rs
@@ -0,0 +1,56 @@
+use std::time::Duration;
+
+use anyhow::{Context, Result};
+use async_trait::async_trait;
+use gas::prelude::*;
+use rivet_types::actors::CrashPolicy;
+
+use crate::test_cluster::TestCluster;
+
+use super::Scenario;
+
+/// Scenario that creates a sleeping v1 actor from before envoys were introduced. When the
+/// snapshot is loaded after envoys were introduced, the workflow should not die when awoken.
+pub struct PbActorV1PreMigration;
+
+#[async_trait(?Send)]
+impl Scenario for PbActorV1PreMigration {
+    fn name(&self) -> &'static str {
+        "pb-actor-v1-pre-migration"
+    }
+
+    fn replica_count(&self) -> usize {
+        2
+    }
+
+    async fn populate(&self, cluster: &TestCluster) -> Result<()> {
+        let ctx = cluster.get_ctx(cluster.leader_id());
+
+        let existing_namespace = ctx
+            .op(namespace::ops::resolve_for_name_local::Input {
+                name: "default".to_string(),
+            })
+            .await?
+            .context("default ns should exist")?;
+
+        let actor_id = Id::new_v1(ctx.config().dc_label());
+
+        ctx.op(pegboard::ops::actor::create::Input {
+            actor_id,
+            namespace_id: existing_namespace.namespace_id,
+            name: "test".to_string(),
+            key: None,
+            runner_name_selector: "default".to_string(),
+            input: None,
+            crash_policy: CrashPolicy::Sleep,
+            forward_request: false,
+            datacenter_name: None,
+        })
+        .await?;
+
+        // Wait for the workflow to go to sleep.
+        tokio::time::sleep(Duration::from_secs(5)).await;
+
+        Ok(())
+    }
+}
diff --git a/engine/packages/test-snapshot-gen/src/test_cluster.rs b/engine/packages/test-snapshot-gen/src/test_cluster.rs
index 40e267eced..861de75cb5 100644
--- a/engine/packages/test-snapshot-gen/src/test_cluster.rs
+++ b/engine/packages/test-snapshot-gen/src/test_cluster.rs
@@ -41,8 +41,7 @@ impl TestCluster {
         for &replica_id in replica_ids {
             let api_peer_port =
                 portpicker::pick_unused_port().context("failed to pick API peer port")?;
-            let guard_port =
-                portpicker::pick_unused_port().context("failed to pick guard port")?;
+            let guard_port = portpicker::pick_unused_port().context("failed to pick guard port")?;
             cluster.replica_metadata.insert(
                 replica_id,
                 ReplicaMetadata {
@@ -92,13 +91,6 @@ impl TestCluster {
             .wf_ctx
     }
 
-    pub fn replica_ids(&self) -> Vec<ReplicaId> {
-        let mut ids = self.replica_metadata.keys().copied().collect::<Vec<_>>();
-        ids.sort_unstable();
-        ids
-    }
-
-    #[allow(dead_code)]
     pub fn leader_id(&self) -> ReplicaId {
         self.leader_id
     }
@@ -134,7 +126,9 @@ impl TestCluster {
         )
         .await?;
 
-        let reg = epoxy::registry()?;
+        let reg = epoxy::registry()?
+            .merge(namespace::registry()?)?
+            .merge(pegboard::registry()?)?;
         let test_ctx = WorkflowTestCtx::new_with_deps(reg, test_deps).await?;
 
         let api_handle = setup_api_server(
@@ -179,14 +173,8 @@ impl TestCluster {
                 name: format!("dc-{}", id),
                 datacenter_label: id as u16,
                 is_leader: id == self.leader_id,
-                peer_url: Url::parse(&format!(
-                    "http://127.0.0.1:{}",
-                    metadata.api_peer_port
-                ))?,
-                public_url: Url::parse(&format!(
-                    "http://127.0.0.1:{}",
-                    metadata.guard_port
-                ))?,
+                peer_url: Url::parse(&format!("http://127.0.0.1:{}", metadata.api_peer_port))?,
+                public_url: Url::parse(&format!("http://127.0.0.1:{}", metadata.guard_port))?,
                 proxy_url: None,
                 valid_hosts: None,
             },
diff --git a/engine/sdks/rust/kv-channel-protocol/src/lib.rs b/engine/sdks/rust/kv-channel-protocol/src/lib.rs
index 7502faf079..8851f713aa 100644
--- a/engine/sdks/rust/kv-channel-protocol/src/lib.rs
+++ b/engine/sdks/rust/kv-channel-protocol/src/lib.rs
@@ -5,13 +5,13 @@ pub use generated::v1::*;
 
 pub const PROTOCOL_VERSION: u32 = 1;
 
-/// Serialize a ToServer message to BARE bytes.
-pub fn encode_to_server(msg: &ToServer) -> Result<Vec<u8>, serde_bare::error::Error> {
+/// Serialize a ToRivet message to BARE bytes.
+pub fn encode_to_server(msg: &ToRivet) -> Result<Vec<u8>, serde_bare::error::Error> {
     serde_bare::to_vec(msg)
 }
 
-/// Deserialize a ToServer message from BARE bytes.
-pub fn decode_to_server(bytes: &[u8]) -> Result<ToServer, serde_bare::error::Error> {
+/// Deserialize a ToRivet message from BARE bytes.
+pub fn decode_to_server(bytes: &[u8]) -> Result<ToRivet, serde_bare::error::Error> {
     serde_bare::from_slice(bytes)
 }
 
@@ -33,7 +33,7 @@ mod tests {
     #[test]
     fn round_trip_to_server_request_actor_open() {
-        let msg = ToServer::ToServerRequest(ToServerRequest {
+        let msg = ToRivet::ToRivetRequest(ToRivetRequest {
             request_id: 1,
             actor_id: "abc".into(),
             data: RequestData::ActorOpenRequest,
@@ -45,7 +45,7 @@
     #[test]
     fn round_trip_to_server_request_kv_get() {
-        let msg = ToServer::ToServerRequest(ToServerRequest {
+        let msg = ToRivet::ToRivetRequest(ToRivetRequest {
             request_id: 3,
             actor_id: "actor1".into(),
             data: RequestData::KvGetRequest(KvGetRequest {
@@ -91,7 +91,7 @@
     #[test]
     fn bytes_to_server_request_actor_open() {
-        let msg = ToServer::ToServerRequest(ToServerRequest {
+        let msg = ToRivet::ToRivetRequest(ToRivetRequest {
             request_id: 1,
             actor_id: "abc".into(),
             data: RequestData::ActorOpenRequest,
@@ -105,7 +105,7 @@
     #[test]
     fn bytes_to_server_pong() {
-        let msg = ToServer::ToServerPong(ToServerPong { ts: 1234567890 });
+        let msg = ToRivet::ToRivetPong(ToRivetPong { ts: 1234567890 });
         let bytes = encode_to_server(&msg).unwrap();
         assert_eq!(
             bytes,
@@ -132,7 +132,10 @@
         let bytes = encode_to_client(&msg).unwrap();
         assert_eq!(
             bytes,
-            [0x00, 0x2A, 0x00, 0x00, 0x00, 0x03, 0x01, 0x02, 0x01, 0x02, 0x01, 0x03, 0x03, 0x04, 0x05]
+            [
+                0x00, 0x2A, 0x00, 0x00, 0x00, 0x03, 0x01, 0x02, 0x01, 0x02, 0x01, 0x03, 0x03, 0x04,
+                0x05
+            ]
         );
     }
 }
diff --git a/engine/sdks/schemas/kv-channel-protocol/v1.bare b/engine/sdks/schemas/kv-channel-protocol/v1.bare
index 1a393301d7..1130a406da 100644
--- a/engine/sdks/schemas/kv-channel-protocol/v1.bare
+++ b/engine/sdks/schemas/kv-channel-protocol/v1.bare
@@ -19,7 +19,7 @@ type Id str
 # the server sends an error response for the open and rejects
 # subsequent KV requests for that actor with "actor_locked".
 
-# actorId is on ToServerRequest, not on open/close. The outer
+# actorId is on ToRivetRequest, not on open/close. The outer
 # actorId is the single source of truth for routing.
 type ActorOpenRequest void
@@ -105,21 +105,21 @@ type ResponseData union {
     KvDeleteResponse
 }
 
-# MARK: To Server
+# MARK: To Rivet
 
-type ToServerRequest struct {
+type ToRivetRequest struct {
     requestId: u32
     actorId: Id
    data: RequestData
 }
 
-type ToServerPong struct {
+type ToRivetPong struct {
     ts: i64
 }
 
-type ToServer union {
-    ToServerRequest |
-    ToServerPong
+type ToRivet union {
+    ToRivetRequest |
+    ToRivetPong
 }
 
 # MARK: To Client
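The schema comment above states the invariant this rename preserves: the actor id appears exactly once, on the request envelope, never on the inner open/close payloads. A minimal round-trip sketch with the renamed Rust types, mirroring the tests in this diff (the `kv_channel_protocol` import path is an assumption):

    use kv_channel_protocol::{
        decode_to_server, encode_to_server, RequestData, ToRivet, ToRivetRequest,
    };

    // actor_id lives on the ToRivetRequest envelope; the inner open/close
    // payloads carry no id of their own.
    let msg = ToRivet::ToRivetRequest(ToRivetRequest {
        request_id: 7,
        actor_id: "actor-123".into(),
        data: RequestData::ActorOpenRequest,
    });
    let bytes = encode_to_server(&msg).expect("BARE encode");
    let decoded = decode_to_server(&bytes).expect("BARE decode");
    assert!(matches!(decoded, ToRivet::ToRivetRequest(_)));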
diff --git a/engine/sdks/typescript/kv-channel-protocol/src/index.ts b/engine/sdks/typescript/kv-channel-protocol/src/index.ts
index 229cde4a2f..fad75ec166 100644
--- a/engine/sdks/typescript/kv-channel-protocol/src/index.ts
+++ b/engine/sdks/typescript/kv-channel-protocol/src/index.ts
@@ -24,7 +24,7 @@ export function writeId(bc: bare.ByteCursor, x: Id): void {
 }
 
 /**
- * actorId is on ToServerRequest, not on open/close. The outer
+ * actorId is on ToRivetRequest, not on open/close. The outer
  * actorId is the single source of truth for routing.
  */
 export type ActorOpenRequest = null
@@ -332,13 +332,13 @@ export function writeResponseData(bc: bare.ByteCursor, x: ResponseData): void {
     }
 }
 
-export type ToServerRequest = {
+export type ToRivetRequest = {
     readonly requestId: u32
     readonly actorId: Id
     readonly data: RequestData
 }
 
-export function readToServerRequest(bc: bare.ByteCursor): ToServerRequest {
+export function readToRivetRequest(bc: bare.ByteCursor): ToRivetRequest {
     return {
         requestId: bare.readU32(bc),
         actorId: readId(bc),
@@ -346,38 +346,38 @@ export function readToServerRequest(bc: bare.ByteCursor): ToServerRequest {
     }
 }
 
-export function writeToServerRequest(bc: bare.ByteCursor, x: ToServerRequest): void {
+export function writeToRivetRequest(bc: bare.ByteCursor, x: ToRivetRequest): void {
     bare.writeU32(bc, x.requestId)
     writeId(bc, x.actorId)
     writeRequestData(bc, x.data)
 }
 
-export type ToServerPong = {
+export type ToRivetPong = {
     readonly ts: i64
 }
 
-export function readToServerPong(bc: bare.ByteCursor): ToServerPong {
+export function readToRivetPong(bc: bare.ByteCursor): ToRivetPong {
     return {
         ts: bare.readI64(bc),
     }
 }
 
-export function writeToServerPong(bc: bare.ByteCursor, x: ToServerPong): void {
+export function writeToRivetPong(bc: bare.ByteCursor, x: ToRivetPong): void {
     bare.writeI64(bc, x.ts)
 }
 
-export type ToServer =
-    | { readonly tag: "ToServerRequest"; readonly val: ToServerRequest }
-    | { readonly tag: "ToServerPong"; readonly val: ToServerPong }
+export type ToRivet =
+    | { readonly tag: "ToRivetRequest"; readonly val: ToRivetRequest }
+    | { readonly tag: "ToRivetPong"; readonly val: ToRivetPong }
 
-export function readToServer(bc: bare.ByteCursor): ToServer {
+export function readToRivet(bc: bare.ByteCursor): ToRivet {
     const offset = bc.offset
     const tag = bare.readU8(bc)
     switch (tag) {
         case 0:
-            return { tag: "ToServerRequest", val: readToServerRequest(bc) }
+            return { tag: "ToRivetRequest", val: readToRivetRequest(bc) }
         case 1:
-            return { tag: "ToServerPong", val: readToServerPong(bc) }
+            return { tag: "ToRivetPong", val: readToRivetPong(bc) }
         default: {
             bc.offset = offset
             throw new bare.BareError(offset, "invalid tag")
@@ -385,34 +385,34 @@ export function readToServer(bc: bare.ByteCursor): ToServer {
     }
 }
 
-export function writeToServer(bc: bare.ByteCursor, x: ToServer): void {
+export function writeToRivet(bc: bare.ByteCursor, x: ToRivet): void {
     switch (x.tag) {
-        case "ToServerRequest": {
+        case "ToRivetRequest": {
             bare.writeU8(bc, 0)
-            writeToServerRequest(bc, x.val)
+            writeToRivetRequest(bc, x.val)
             break
         }
-        case "ToServerPong": {
+        case "ToRivetPong": {
             bare.writeU8(bc, 1)
-            writeToServerPong(bc, x.val)
+            writeToRivetPong(bc, x.val)
             break
         }
     }
 }
 
-export function encodeToServer(x: ToServer, config?: Partial<bare.Config>): Uint8Array {
+export function encodeToRivet(x: ToRivet, config?: Partial<bare.Config>): Uint8Array {
     const fullConfig = config != null ? bare.Config(config) : DEFAULT_CONFIG
     const bc = new bare.ByteCursor(
         new Uint8Array(fullConfig.initialBufferLength),
         fullConfig,
     )
-    writeToServer(bc, x)
+    writeToRivet(bc, x)
     return new Uint8Array(bc.view.buffer, bc.view.byteOffset, bc.offset)
 }
 
-export function decodeToServer(bytes: Uint8Array): ToServer {
+export function decodeToRivet(bytes: Uint8Array): ToRivet {
     const bc = new bare.ByteCursor(bytes, DEFAULT_CONFIG)
-    const result = readToServer(bc)
+    const result = readToRivet(bc)
     if (bc.offset < bc.view.byteLength) {
         throw new bare.BareError(bc.offset, "remaining bytes")
     }
diff --git a/rivetkit-typescript/packages/rivetkit/src/manager/kv-channel.ts b/rivetkit-typescript/packages/rivetkit/src/manager/kv-channel.ts
index aa95b0ab22..709e2fcc41 100644
--- a/rivetkit-typescript/packages/rivetkit/src/manager/kv-channel.ts
+++ b/rivetkit-typescript/packages/rivetkit/src/manager/kv-channel.ts
@@ -8,12 +8,12 @@ import type { WSContext } from "hono/ws";
 
 import {
     PROTOCOL_VERSION,
-    type ToServer,
+    type ToRivet,
     type ToClient,
     type RequestData,
     type ResponseData,
-    type ToServerRequest,
-    decodeToServer,
+    type ToRivetRequest,
+    decodeToRivet,
     encodeToClient,
 } from "@rivetkit/engine-kv-channel-protocol";
 import { KvStorageQuotaExceededError } from "@/drivers/file-system/kv-limits";
@@ -125,8 +125,8 @@ export function createKvChannelManager(): KvChannelManager {
                 return;
             }
 
-            const msg = decodeToServer(bytes);
-            handleToServerMessage(state, conn, managerDriver, msg);
+            const msg = decodeToRivet(bytes);
+            handleToRivetMessage(state, conn, managerDriver, msg);
         } catch (err: unknown) {
             logger().error({
                 msg: "kv channel failed to decode message",
@@ -273,7 +273,7 @@ async function handleRequest(
     state: KvChannelManagerState,
     conn: KvChannelConnection,
     managerDriver: ManagerDriver,
-    request: ToServerRequest,
+    request: ToRivetRequest,
 ): Promise<void> {
     const { requestId, actorId, data } = request;
 
@@ -649,14 +649,14 @@ async function handleKvOperation(
     }
 }
 
-function handleToServerMessage(
+function handleToRivetMessage(
     state: KvChannelManagerState,
     conn: KvChannelConnection,
     managerDriver: ManagerDriver,
-    msg: ToServer,
+    msg: ToRivet,
 ): void {
     switch (msg.tag) {
-        case "ToServerRequest": {
+        case "ToRivetRequest": {
             const { actorId } = msg.val;
 
             // Chain requests per actor so they execute sequentially,
@@ -688,7 +688,7 @@ function handleToServerMessage(
             break;
         }
 
-        case "ToServerPong":
+        case "ToRivetPong":
             conn.lastPongTs = Date.now();
             break;
     }
 }
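The "chain requests per actor" comment above is the manager's sequencing guarantee: requests for one actor run in FIFO order while different actors proceed in parallel. The TypeScript manager chains promises per actor; a rough Rust analogue of the same design, with one drain task per actor, could look like the sketch below (all names here are illustrative, not from this codebase):

    use std::collections::HashMap;

    use tokio::sync::mpsc;

    /// Illustrative request payload; the real manager passes KV request data.
    struct Request {
        request_id: u32,
        actor_id: String,
    }

    /// One FIFO queue per actor. A dedicated task drains each queue, so
    /// requests for the same actor never interleave, while requests for
    /// different actors proceed concurrently.
    struct PerActorQueues {
        queues: HashMap<String, mpsc::UnboundedSender<Request>>,
    }

    impl PerActorQueues {
        fn dispatch(&mut self, req: Request) {
            // Must be called from within a tokio runtime (spawns the drain task
            // lazily the first time an actor is seen).
            let tx = self.queues.entry(req.actor_id.clone()).or_insert_with(|| {
                let (tx, mut rx) = mpsc::unbounded_channel::<Request>();
                tokio::spawn(async move {
                    while let Some(req) = rx.recv().await {
                        // Handle one request at a time for this actor.
                        handle(req).await;
                    }
                });
                tx
            });
            let _ = tx.send(req);
        }
    }

    async fn handle(req: Request) {
        println!("actor {} handled request {}", req.actor_id, req.request_id);
    }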
diff --git a/rivetkit-typescript/packages/sqlite-native/src/channel.rs b/rivetkit-typescript/packages/sqlite-native/src/channel.rs
index eff118a3f2..8848a693a3 100644
--- a/rivetkit-typescript/packages/sqlite-native/src/channel.rs
+++ b/rivetkit-typescript/packages/sqlite-native/src/channel.rs
@@ -24,7 +24,7 @@ use tokio_tungstenite::tungstenite::Message;
 
 use crate::protocol::{
     decode_to_client, encode_to_server, ErrorResponse, RequestData, ResponseData, ToClient,
-    ToServer, ToServerPong, ToServerRequest, PROTOCOL_VERSION,
+    ToRivet, ToRivetPong, ToRivetRequest, PROTOCOL_VERSION,
 };
 
 // MARK: Constants
@@ -324,7 +324,7 @@ impl KvChannel {
         };
 
         // Serialize the message.
-        let msg = ToServer::ToServerRequest(ToServerRequest {
+        let msg = ToRivet::ToRivetRequest(ToRivetRequest {
             request_id,
             actor_id: actor_id.to_string(),
             data,
@@ -483,7 +483,7 @@ async fn connection_loop(inner: Arc, mut shutdown_rx: watch::Receiver(
                     Ok(ToClient::ToClientPing(ping)) => {
                         // Respond with pong echoing the timestamp.
                         let pong =
-                            ToServer::ToServerPong(ToServerPong { ts: ping.ts });
+                            ToRivet::ToRivetPong(ToRivetPong { ts: ping.ts });
                         if let Ok(bytes) = encode_to_server(&pong) {
                             let tx_guard = inner.outgoing_tx.lock().await;
                             if let Some(tx) = tx_guard.as_ref() {
diff --git a/rivetkit-typescript/packages/sqlite-native/src/integration_tests.rs b/rivetkit-typescript/packages/sqlite-native/src/integration_tests.rs
index 12b558edf7..dfc6310ca1 100644
--- a/rivetkit-typescript/packages/sqlite-native/src/integration_tests.rs
+++ b/rivetkit-typescript/packages/sqlite-native/src/integration_tests.rs
@@ -180,7 +180,7 @@ async fn mock_accept_loop(listener: TcpListener, state: Arc) {
             msg = read.next() => {
                 match msg {
                     Some(Ok(Message::Binary(data))) => {
-                        if let Ok(ToServer::ToServerRequest(req)) = decode_to_server(&data) {
+                        if let Ok(ToRivet::ToRivetRequest(req)) = decode_to_server(&data) {
                             let state = state.clone();
                             let resp_tx = resp_tx.clone();
                             let open_actors = open_actors.clone();