From 8ced988f6d72f594f3dac99b3567395877a57dfd Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 17 Oct 2025 02:13:41 -0700 Subject: [PATCH 1/6] chore(epoxy): update peer urls when manually reconfiguring replicas --- .../coordinator/replica_status_change.rs | 37 +++++++++++++++++-- 1 file changed, 34 insertions(+), 3 deletions(-) diff --git a/packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs b/packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs index 6d8fde554d..7445616136 100644 --- a/packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs +++ b/packages/services/epoxy/src/workflows/coordinator/replica_status_change.rs @@ -38,9 +38,9 @@ pub async fn replica_status_change( } #[tracing::instrument(skip_all)] -pub async fn replica_reconfigure( - ctx: &mut WorkflowCtx, -) -> Result<()> { +pub async fn replica_reconfigure(ctx: &mut WorkflowCtx) -> Result<()> { + ctx.activity(UpdateReplicaUrlsInput {}).await?; + let notify_out = ctx.activity(NotifyAllReplicasInput {}).await?; let replica_id = ctx.config().epoxy_replica_id(); @@ -108,6 +108,37 @@ pub async fn increment_epoch(ctx: &ActivityCtx, _input: &IncrementEpochInput) -> Ok(()) } +#[derive(Debug, Clone, Serialize, Deserialize, Hash)] +pub struct UpdateReplicaUrlsInput {} + +#[activity(UpdateReplicaUrls)] +pub async fn update_replica_urls(ctx: &ActivityCtx, _input: &UpdateReplicaUrlsInput) -> Result<()> { + let mut state = ctx.state::()?; + + // Update URLs for all replicas based on topology + for replica in state.config.replicas.iter_mut() { + let Some(dc) = ctx.config().dc_for_label(replica.replica_id as u16) else { + tracing::warn!( + replica_id = ?replica.replica_id, + "datacenter not found for replica, skipping url update" + ); + continue; + }; + + replica.api_peer_url = dc.peer_url.to_string(); + replica.guard_url = dc.public_url.to_string(); + + tracing::info!( + replica_id = ?replica.replica_id, + api_peer_url = ?dc.peer_url, + guard_url = ?dc.public_url, + "updated replica urls" + ); + } + + Ok(()) +} + #[derive(Debug, Clone, Serialize, Deserialize, Hash)] pub struct NotifyAllReplicasInput {} From 1387d6a0794c28205c734c4488e04bf52fff2829 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 17 Oct 2025 02:14:12 -0700 Subject: [PATCH 2/6] chore(engine): move guard to api public service kind --- packages/infra/engine/src/run_config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/infra/engine/src/run_config.rs b/packages/infra/engine/src/run_config.rs index a149e9a257..6de1959538 100644 --- a/packages/infra/engine/src/run_config.rs +++ b/packages/infra/engine/src/run_config.rs @@ -6,7 +6,7 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { Service::new("api_peer", ServiceKind::ApiPeer, |config, pools| { Box::pin(rivet_api_peer::start(config, pools)) }), - Service::new("guard", ServiceKind::Standalone, |config, pools| { + Service::new("guard", ServiceKind::ApiPublic, |config, pools| { Box::pin(rivet_guard::start(config, pools)) }), Service::new( From 00b8c73dcab9063908292bee0780ce6f9c2a6a37 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 17 Oct 2025 02:15:06 -0700 Subject: [PATCH 3/6] chore(pegboard-serverless): auto-close runner ws --- Cargo.lock | 1 + packages/core/pegboard-serverless/Cargo.toml | 1 + packages/core/pegboard-serverless/src/lib.rs | 84 +++++++++++++++----- packages/infra/engine/src/commands/start.rs | 43 +++++----- packages/infra/engine/src/run_config.rs | 3 +- 5 files changed, 92 insertions(+), 
40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2918af3a57..c919d3b8b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3355,6 +3355,7 @@ dependencies = [ "rivet-util", "tracing", "universaldb", + "universalpubsub", "vbare", ] diff --git a/packages/core/pegboard-serverless/Cargo.toml b/packages/core/pegboard-serverless/Cargo.toml index ad985a1b4c..6112a75754 100644 --- a/packages/core/pegboard-serverless/Cargo.toml +++ b/packages/core/pegboard-serverless/Cargo.toml @@ -18,6 +18,7 @@ rivet-types.workspace = true rivet-util.workspace = true tracing.workspace = true universaldb.workspace = true +universalpubsub.workspace = true vbare.workspace = true namespace.workspace = true diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index 363de6ab7c..6ba8a5cd5f 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -19,6 +19,7 @@ use rivet_types::runner_configs::RunnerConfigKind; use tokio::{sync::oneshot, task::JoinHandle, time::Duration}; use universaldb::options::StreamingMode; use universaldb::utils::IsolationLevel::*; +use universalpubsub::PublishOpts; use vbare::OwnedVersionedData; const X_RIVET_ENDPOINT: HeaderName = HeaderName::from_static("x-rivet-endpoint"); @@ -27,6 +28,8 @@ const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-s const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name"); const X_RIVET_NAMESPACE_ID: HeaderName = HeaderName::from_static("x-rivet-namespace-id"); +const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(10); + struct OutboundConnection { handle: JoinHandle<()>, shutdown_tx: oneshot::Sender<()>, @@ -377,12 +380,14 @@ async fn outbound_handler( anyhow::Ok(()) }; + let sleep_until_drop = request_lifespan.saturating_sub(DRAIN_GRACE_PERIOD); tokio::select! 
{ res = stream_handler => return res.map_err(Into::into), - _ = tokio::time::sleep(request_lifespan) => {} + _ = tokio::time::sleep(sleep_until_drop) => {} _ = shutdown_rx => {} } + // Stop runner draining.store(true, Ordering::SeqCst); ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) @@ -394,34 +399,56 @@ async fn outbound_handler( } // Continue waiting on req while draining - while let Some(event) = source.next().await { - match event { - Ok(sse::Event::Open) => {} - Ok(sse::Event::Message(msg)) => { - tracing::debug!(%msg.data, "received outbound req message"); - - // If runner_id is none at this point it means we did not send the stopping signal yet, so - // send it now - if runner_id.is_none() { - let data = BASE64.decode(msg.data).context("invalid base64 message")?; - let payload = + let wait_for_shutdown_fut = async move { + while let Some(event) = source.next().await { + match event { + Ok(sse::Event::Open) => {} + Ok(sse::Event::Message(msg)) => { + tracing::debug!(%msg.data, "received outbound req message"); + + // If runner_id is none at this point it means we did not send the stopping signal yet, so + // send it now + if runner_id.is_none() { + let data = BASE64.decode(msg.data).context("invalid base64 message")?; + let payload = protocol::versioned::ToServerlessServer::deserialize_with_embedded_version( &data, ) .context("invalid payload")?; - match payload { - protocol::ToServerlessServer::ToServerlessServerInit(init) => { - let runner_id = - Id::parse(&init.runner_id).context("invalid runner id")?; - stop_runner(ctx, runner_id).await?; + match payload { + protocol::ToServerlessServer::ToServerlessServerInit(init) => { + let runner_id_local = + Id::parse(&init.runner_id).context("invalid runner id")?; + runner_id = Some(runner_id_local); + stop_runner(ctx, runner_id_local).await?; + } } } } + Err(sse::Error::StreamEnded) => break, + Err(err) => return Err(err.into()), } - Err(sse::Error::StreamEnded) => break, - Err(err) => return Err(err.into()), } + + Result::<()>::Ok(()) + }; + + // Wait for runner to shut down + tokio::select! { + res = wait_for_shutdown_fut => return res.map_err(Into::into), + _ = tokio::time::sleep(DRAIN_GRACE_PERIOD) => { + tracing::debug!("reached drain grace period before runner shut down") + } + + } + + // Close connection + // + // This will force the runner to stop the request in order to avoid hitting the serverless + // timeout threshold + if let Some(runner_id) = runner_id { + publish_to_client_stop(ctx, runner_id).await?; } tracing::debug!("outbound req stopped"); @@ -454,3 +481,22 @@ async fn stop_runner(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> { Ok(()) } + +/// Send a stop message to the client. +/// +/// This will close the runner's WebSocket.. +async fn publish_to_client_stop(ctx: &StandaloneCtx, runner_id: Id) -> Result<()> { + let receiver_subject = + pegboard::pubsub_subjects::RunnerReceiverSubject::new(runner_id).to_string(); + + let message_serialized = rivet_runner_protocol::versioned::ToClient::latest( + rivet_runner_protocol::ToClient::ToClientClose, + ) + .serialize_with_embedded_version(rivet_runner_protocol::PROTOCOL_VERSION)?; + + ctx.ups()? 
+ .publish(&receiver_subject, &message_serialized, PublishOpts::one()) + .await?; + + Ok(()) +} diff --git a/packages/infra/engine/src/commands/start.rs b/packages/infra/engine/src/commands/start.rs index d25735f9b8..5316072da8 100644 --- a/packages/infra/engine/src/commands/start.rs +++ b/packages/infra/engine/src/commands/start.rs @@ -14,7 +14,7 @@ pub struct Opts { /// Exclude the specified services instead of including them #[arg(long)] - exclude_services: bool, + except_services: Vec, } #[derive(clap::ValueEnum, Clone, PartialEq)] @@ -55,34 +55,37 @@ impl Opts { } // Select services to run - let services = if self.services.is_empty() { + let services = if self.services.is_empty() && self.except_services.is_empty() { // Run all services run_config.services.clone() + } else if !self.except_services.is_empty() { + // Exclude specified services + let except_service_kinds = self + .except_services + .iter() + .map(|x| x.clone().into()) + .collect::>(); + + run_config + .services + .iter() + .filter(|x| !except_service_kinds.iter().any(|y| y.eq(&x.kind))) + .cloned() + .collect::>() } else { - // Filter services + // Include only specified services let service_kinds = self .services .iter() .map(|x| x.clone().into()) .collect::>(); - if self.exclude_services { - // Exclude specified services - run_config - .services - .iter() - .filter(|x| !service_kinds.iter().any(|y| y.eq(&x.kind))) - .cloned() - .collect::>() - } else { - // Include only specified services - run_config - .services - .iter() - .filter(|x| service_kinds.iter().any(|y| y.eq(&x.kind))) - .cloned() - .collect::>() - } + run_config + .services + .iter() + .filter(|x| service_kinds.iter().any(|y| y.eq(&x.kind))) + .cloned() + .collect::>() }; // Start server diff --git a/packages/infra/engine/src/run_config.rs b/packages/infra/engine/src/run_config.rs index 6de1959538..e14d223c6a 100644 --- a/packages/infra/engine/src/run_config.rs +++ b/packages/infra/engine/src/run_config.rs @@ -19,7 +19,8 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { }), Service::new( "pegboard_serverless", - ServiceKind::Standalone, + // There should only be one of these, since it's auto-scaling requests + ServiceKind::Singleton, |config, pools| Box::pin(pegboard_serverless::start(config, pools)), ), Service::new( From 1ea1889466215a7a919692d25ed746197f733bd5 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 17 Oct 2025 02:16:09 -0700 Subject: [PATCH 4/6] fix(guard): fix ws close codes --- packages/core/guard/core/src/proxy_service.rs | 33 +++++++++++++++++++ .../core/guard/core/src/websocket_handle.rs | 13 ++++++++ 2 files changed, 46 insertions(+) diff --git a/packages/core/guard/core/src/proxy_service.rs b/packages/core/guard/core/src/proxy_service.rs index d3f2f22c83..a1c5b35859 100644 --- a/packages/core/guard/core/src/proxy_service.rs +++ b/packages/core/guard/core/src/proxy_service.rs @@ -37,6 +37,7 @@ pub const X_FORWARDED_FOR: HeaderName = HeaderName::from_static("x-forwarded-for pub const X_RIVET_ERROR: HeaderName = HeaderName::from_static("x-rivet-error"); const ROUTE_CACHE_TTL: Duration = Duration::from_secs(60 * 10); // 10 minutes const PROXY_STATE_CACHE_TTL: Duration = Duration::from_secs(60 * 60); // 1 hour +const WEBSOCKET_CLOSE_LINGER: Duration = Duration::from_millis(100); // Keep TCP connection open briefly after WebSocket close /// Response body type that can handle both streaming and buffered responses #[derive(Debug)] @@ -1799,6 +1800,12 @@ impl ProxyService { }))) .await?; + // Flush to ensure close frame is 
sent + ws_handle.flush().await?; + + // Keep TCP connection open briefly to allow client to process close + tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; + break; } Err(err) => { @@ -1811,6 +1818,12 @@ impl ProxyService { ))) .await?; + // Flush to ensure close frame is sent + ws_handle.flush().await?; + + // Keep TCP connection open briefly to allow client to process close + tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; + break; } else { let backoff = ProxyService::calculate_backoff( @@ -1841,6 +1854,12 @@ impl ProxyService { ), ))) .await?; + + // Flush to ensure close frame is sent + ws_handle.flush().await?; + + // Keep TCP connection open briefly to allow client to process close + tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; } Ok(ResolveRouteOutput::Target(_)) => { ws_handle @@ -1850,6 +1869,13 @@ impl ProxyService { ), ))) .await?; + + // Flush to ensure close frame is sent + ws_handle.flush().await?; + + // Keep TCP connection open briefly to allow client to process close + tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; + break; } Err(err) => { @@ -1858,6 +1884,13 @@ impl ProxyService { err_to_close_frame(err), ))) .await?; + + // Flush to ensure close frame is sent + ws_handle.flush().await?; + + // Keep TCP connection open briefly to allow client to process close + tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await; + break; } } diff --git a/packages/core/guard/core/src/websocket_handle.rs b/packages/core/guard/core/src/websocket_handle.rs index 0e6acd792b..bb17d2df3b 100644 --- a/packages/core/guard/core/src/websocket_handle.rs +++ b/packages/core/guard/core/src/websocket_handle.rs @@ -83,6 +83,19 @@ impl WebSocketHandleInner { } } + pub async fn flush(&self) -> Result<()> { + let mut state = self.state.lock().await; + match &mut *state { + WebSocketState::Unaccepted { .. } | WebSocketState::Accepting => { + bail!("websocket has not been accepted"); + } + WebSocketState::Split { ws_tx } => { + ws_tx.flush().await?; + Ok(()) + } + } + } + async fn accept_inner(state: &mut WebSocketState) -> Result { if !matches!(*state, WebSocketState::Unaccepted { .. }) { bail!("websocket already accepted") From 497077a89ede6afb5f16b10340771c99dcbaf695 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 17 Oct 2025 02:16:43 -0700 Subject: [PATCH 5/6] fix(pegboard): purge runner config cache on update --- .../api-public/src/runner_configs/delete.rs | 18 +++++++++++++ .../api-public/src/runner_configs/upsert.rs | 26 +++++++++++++------ packages/core/guard/core/src/proxy_service.rs | 2 +- 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/packages/core/api-public/src/runner_configs/delete.rs b/packages/core/api-public/src/runner_configs/delete.rs index 245903d6a1..411b43dc06 100644 --- a/packages/core/api-public/src/runner_configs/delete.rs +++ b/packages/core/api-public/src/runner_configs/delete.rs @@ -67,5 +67,23 @@ async fn delete_inner( } } + // Resolve namespace + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? 
+ .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + // Purge cache + ctx.cache() + .clone() + .request() + .purge( + "namespace.runner_config.get", + vec![(namespace.namespace_id, path.runner_name.clone())], + ) + .await?; + Ok(DeleteResponse {}) } diff --git a/packages/core/api-public/src/runner_configs/upsert.rs b/packages/core/api-public/src/runner_configs/upsert.rs index 27782bd8a7..0e225ba97b 100644 --- a/packages/core/api-public/src/runner_configs/upsert.rs +++ b/packages/core/api-public/src/runner_configs/upsert.rs @@ -125,18 +125,18 @@ async fn upsert_inner( } } + // Resolve namespace + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + // Update runner metadata // // This allows us to populate the actor names immediately upon configuring a serverless runner if let Some((url, metadata_headers)) = serverless_config { - // Resolve namespace - let namespace = ctx - .op(namespace::ops::resolve_for_name_global::Input { - name: query.namespace.clone(), - }) - .await? - .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; - if let Err(err) = utils::refresh_runner_config_metadata( ctx.clone(), namespace.namespace_id, @@ -150,5 +150,15 @@ async fn upsert_inner( } } + // Purge cache + ctx.cache() + .clone() + .request() + .purge( + "namespace.runner_config.get", + vec![(namespace.namespace_id, path.runner_name.clone())], + ) + .await?; + Ok(UpsertResponse {}) } diff --git a/packages/core/guard/core/src/proxy_service.rs b/packages/core/guard/core/src/proxy_service.rs index a1c5b35859..b8530f0aa4 100644 --- a/packages/core/guard/core/src/proxy_service.rs +++ b/packages/core/guard/core/src/proxy_service.rs @@ -945,7 +945,7 @@ impl ProxyService { if !err.is_connect() || attempts >= max_attempts { tracing::error!(?err, "Request error after {} attempts", attempts); return Err(errors::UpstreamError( - "failed to connect to runner. Make sure your runners are healthy and the provided runner address is reachable by Rivet." + "Failed to connect to runner. Make sure your runners are healthy and do not have any crash logs." 
.to_string(), ) .build()); From 8dbf1a2d55f58a039f4c00b08de9c046ca3ab757 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 17 Oct 2025 02:17:17 -0700 Subject: [PATCH 6/6] chore(cache): purge cache via ups --- Cargo.lock | 19 ++++++++ Cargo.toml | 3 ++ packages/common/cache/build/Cargo.toml | 1 + packages/common/cache/build/src/inner.rs | 15 ++++-- packages/common/cache/build/src/key.rs | 5 ++ packages/common/cache/build/src/lib.rs | 2 + packages/common/cache/build/src/purge.rs | 13 +++++ packages/common/cache/build/src/req_config.rs | 24 +++++++++- .../common/cache/build/tests/integration.rs | 2 +- packages/infra/engine/Cargo.toml | 1 + packages/infra/engine/src/run_config.rs | 8 +++- packages/services/cache-purge/Cargo.toml | 17 +++++++ packages/services/cache-purge/src/lib.rs | 48 +++++++++++++++++++ 13 files changed, 151 insertions(+), 7 deletions(-) create mode 100644 packages/common/cache/build/src/purge.rs create mode 100644 packages/services/cache-purge/Cargo.toml create mode 100644 packages/services/cache-purge/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index c919d3b8b4..7373052478 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4212,9 +4212,27 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tracing", + "universalpubsub", "uuid", ] +[[package]] +name = "rivet-cache-purge" +version = "0.1.0" +dependencies = [ + "anyhow", + "futures-util", + "gasoline", + "rivet-cache", + "rivet-config", + "rivet-pools", + "serde", + "serde_json", + "tokio", + "tracing", + "universalpubsub", +] + [[package]] name = "rivet-cache-result" version = "25.7.3" @@ -4290,6 +4308,7 @@ dependencies = [ "rivet-api-public", "rivet-bootstrap", "rivet-cache", + "rivet-cache-purge", "rivet-config", "rivet-guard", "rivet-logs", diff --git a/Cargo.toml b/Cargo.toml index 5a0183fc50..15980c769d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -369,6 +369,9 @@ path = "packages/core/workflow-worker" [workspace.dependencies.rivet-engine] path = "packages/infra/engine" +[workspace.dependencies.rivet-cache-purge] +path = "packages/services/cache-purge" + [workspace.dependencies.epoxy] path = "packages/services/epoxy" diff --git a/packages/common/cache/build/Cargo.toml b/packages/common/cache/build/Cargo.toml index 411e7a9beb..0bde7eedb3 100644 --- a/packages/common/cache/build/Cargo.toml +++ b/packages/common/cache/build/Cargo.toml @@ -21,6 +21,7 @@ serde_json.workspace = true thiserror.workspace = true tokio.workspace = true tracing.workspace = true +universalpubsub.workspace = true uuid.workspace = true [dev-dependencies] diff --git a/packages/common/cache/build/src/inner.rs b/packages/common/cache/build/src/inner.rs index 7c678e0cee..888d496d4a 100644 --- a/packages/common/cache/build/src/inner.rs +++ b/packages/common/cache/build/src/inner.rs @@ -9,6 +9,7 @@ pub type Cache = Arc; pub struct CacheInner { service_name: String, pub(crate) driver: Driver, + pub(crate) ups: Option, } impl Debug for CacheInner { @@ -23,24 +24,30 @@ impl CacheInner { #[tracing::instrument(skip_all)] pub fn from_env( config: &rivet_config::Config, - _pools: rivet_pools::Pools, + pools: rivet_pools::Pools, ) -> Result { let service_name = rivet_env::service_name(); + let ups = pools.ups().ok(); match &config.cache().driver { rivet_config::config::CacheDriver::Redis => todo!(), rivet_config::config::CacheDriver::InMemory => { - Ok(Self::new_in_memory(service_name.to_string(), 1000)) + Ok(Self::new_in_memory(service_name.to_string(), 1000, ups)) } } } - #[tracing::instrument] - pub fn new_in_memory(service_name: String, max_capacity: u64) -> 
Cache { + #[tracing::instrument(skip(ups))] + pub fn new_in_memory( + service_name: String, + max_capacity: u64, + ups: Option, + ) -> Cache { let driver = Driver::InMemory(InMemoryDriver::new(service_name.clone(), max_capacity)); Arc::new(CacheInner { service_name, driver, + ups, }) } } diff --git a/packages/common/cache/build/src/key.rs b/packages/common/cache/build/src/key.rs index 0290799b4a..b1e2646d24 100644 --- a/packages/common/cache/build/src/key.rs +++ b/packages/common/cache/build/src/key.rs @@ -80,6 +80,11 @@ impl_to_string!(i64); impl_to_string!(i128); impl_to_string!(isize); +/// A cache key that's already formatted and doesn't require escaping. +/// +/// Unlike other types that implement `CacheKey` (which escape special characters like `:` and `\`), +/// `RawCacheKey` uses the provided string as-is. This is useful when you already have a properly +/// formatted cache key string or need to preserve the exact format without transformations. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct RawCacheKey(String); diff --git a/packages/common/cache/build/src/lib.rs b/packages/common/cache/build/src/lib.rs index 69c35d180b..e8cb19eb67 100644 --- a/packages/common/cache/build/src/lib.rs +++ b/packages/common/cache/build/src/lib.rs @@ -4,6 +4,7 @@ mod getter_ctx; mod inner; mod key; mod metrics; +mod purge; mod rate_limit; mod req_config; @@ -13,5 +14,6 @@ pub use errors::*; pub use getter_ctx::*; pub use inner::*; pub use key::*; +pub use purge::*; pub use rate_limit::*; pub use req_config::*; diff --git a/packages/common/cache/build/src/purge.rs b/packages/common/cache/build/src/purge.rs new file mode 100644 index 0000000000..cd7e52636f --- /dev/null +++ b/packages/common/cache/build/src/purge.rs @@ -0,0 +1,13 @@ +use serde::{Deserialize, Serialize}; + +use crate::RawCacheKey; + +/// Topic for publishing cache purge messages via UniversalPubSub +pub const CACHE_PURGE_TOPIC: &str = "rivet.cache.purge"; + +/// Message format for cache purge requests +#[derive(Serialize, Deserialize)] +pub struct CachePurgeMessage { + pub base_key: String, + pub keys: Vec<RawCacheKey>, +} diff --git a/packages/common/cache/build/src/req_config.rs b/packages/common/cache/build/src/req_config.rs index e08469c8da..4383f7dfb0 100644 --- a/packages/common/cache/build/src/req_config.rs +++ b/packages/common/cache/build/src/req_config.rs @@ -290,7 +290,29 @@ impl RequestConfig { return Ok(()); } - // Delete keys + // Publish cache purge message to all services via UPS + if let Some(ups) = &self.cache.ups { + let message = CachePurgeMessage { + base_key: base_key.to_string(), + keys: cache_keys + .iter() + .map(|k| RawCacheKey::from(k.clone())) + .collect(), + }; + + let payload = serde_json::to_vec(&message)?; + + if let Err(err) = ups + .publish(CACHE_PURGE_TOPIC, &payload, universalpubsub::PublishOpts::broadcast()) + .await + { + tracing::error!(?err, "failed to publish cache purge message"); + } else { + tracing::debug!(base_key, keys_count = cache_keys.len(), "published cache purge message"); + } + } + + // Delete keys locally match self.cache.driver.delete_keys(base_key, cache_keys).await { Ok(_) => { tracing::trace!("successfully deleted keys"); diff --git a/packages/common/cache/build/tests/integration.rs b/packages/common/cache/build/tests/integration.rs index 8d356834f2..e2c11e0ed8 100644 --- a/packages/common/cache/build/tests/integration.rs +++ b/packages/common/cache/build/tests/integration.rs @@ -7,7 +7,7 @@ use std::{ use rand::{Rng, seq::IteratorRandom, thread_rng}; async fn
build_in_memory_cache() -> rivet_cache::Cache { - rivet_cache::CacheInner::new_in_memory("cache-test".to_owned(), 1000) + rivet_cache::CacheInner::new_in_memory("cache-test".to_owned(), 1000, None) } async fn test_multiple_keys(cache: rivet_cache::Cache) { diff --git a/packages/infra/engine/Cargo.toml b/packages/infra/engine/Cargo.toml index 7c464e92a5..7ec983317d 100644 --- a/packages/infra/engine/Cargo.toml +++ b/packages/infra/engine/Cargo.toml @@ -25,6 +25,7 @@ reqwest.workspace = true rivet-api-peer.workspace = true rivet-bootstrap.workspace = true rivet-cache.workspace = true +rivet-cache-purge.workspace = true rivet-config.workspace = true rivet-guard.workspace = true rivet-tracing-reconfigure.workspace = true diff --git a/packages/infra/engine/src/run_config.rs b/packages/infra/engine/src/run_config.rs index e14d223c6a..32a561b3e1 100644 --- a/packages/infra/engine/src/run_config.rs +++ b/packages/infra/engine/src/run_config.rs @@ -23,11 +23,17 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> { ServiceKind::Singleton, |config, pools| Box::pin(pegboard_serverless::start(config, pools)), ), + // Core services Service::new( "tracing_reconfigure", - ServiceKind::Standalone, + ServiceKind::Core, |config, pools| Box::pin(rivet_tracing_reconfigure::start(config, pools)), ), + Service::new( + "cache_purge", + ServiceKind::Core, + |config, pools| Box::pin(rivet_cache_purge::start(config, pools)), + ), ]; Ok(RunConfigData { services }) diff --git a/packages/services/cache-purge/Cargo.toml b/packages/services/cache-purge/Cargo.toml new file mode 100644 index 0000000000..52fd54cbf6 --- /dev/null +++ b/packages/services/cache-purge/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "rivet-cache-purge" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow.workspace = true +futures-util.workspace = true +gas.workspace = true +rivet-cache.workspace = true +rivet-config.workspace = true +rivet-pools.workspace = true +serde.workspace = true +serde_json.workspace = true +tokio.workspace = true +tracing.workspace = true +universalpubsub.workspace = true diff --git a/packages/services/cache-purge/src/lib.rs b/packages/services/cache-purge/src/lib.rs new file mode 100644 index 0000000000..0e59f8387d --- /dev/null +++ b/packages/services/cache-purge/src/lib.rs @@ -0,0 +1,48 @@ +use anyhow::Result; +use gas::prelude::*; +use rivet_cache::{CachePurgeMessage, CACHE_PURGE_TOPIC}; +use universalpubsub::NextOutput; + +#[tracing::instrument(skip_all)] +pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> { + tracing::info!("starting cache purge subscriber service"); + + // Subscribe to cache purge updates + let ups = pools.ups()?; + let mut sub = ups.subscribe(CACHE_PURGE_TOPIC).await?; + + tracing::info!(subject = ?CACHE_PURGE_TOPIC, "subscribed to cache purge updates"); + + // Get cache instance + let cache = rivet_cache::CacheInner::from_env(&config, pools)?; + + // Process incoming messages + while let Ok(NextOutput::Message(msg)) = sub.next().await { + match serde_json::from_slice::<CachePurgeMessage>(&msg.payload) { + Ok(purge_msg) => { + tracing::info!( + base_key = ?purge_msg.base_key, + keys_count = purge_msg.keys.len(), + "received cache purge request" + ); + + // Purge the cache locally + if let Err(err) = cache + .clone() + .request() + .purge(&purge_msg.base_key, purge_msg.keys) + .await + { + tracing::error!(?err, base_key = ?purge_msg.base_key, "failed to purge cache"); + } + } + Err(err) => { + tracing::error!(?err, "failed to deserialize cache purge
message"); + } + } + } + + tracing::warn!("cache purge subscriber service stopped"); + + Ok(()) +}