Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,9 @@ path = "packages/core/workflow-worker"
[workspace.dependencies.rivet-engine]
path = "packages/infra/engine"

[workspace.dependencies.rivet-cache-purge]
path = "packages/services/cache-purge"

[workspace.dependencies.epoxy]
path = "packages/services/epoxy"

Expand Down
1 change: 1 addition & 0 deletions packages/common/cache/build/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
universalpubsub.workspace = true
uuid.workspace = true

[dev-dependencies]
Expand Down
15 changes: 11 additions & 4 deletions packages/common/cache/build/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub type Cache = Arc<CacheInner>;
pub struct CacheInner {
service_name: String,
pub(crate) driver: Driver,
pub(crate) ups: Option<universalpubsub::PubSub>,
}

impl Debug for CacheInner {
Expand All @@ -23,24 +24,30 @@ impl CacheInner {
#[tracing::instrument(skip_all)]
pub fn from_env(
config: &rivet_config::Config,
_pools: rivet_pools::Pools,
pools: rivet_pools::Pools,
) -> Result<Cache, Error> {
let service_name = rivet_env::service_name();
let ups = pools.ups().ok();

match &config.cache().driver {
rivet_config::config::CacheDriver::Redis => todo!(),
rivet_config::config::CacheDriver::InMemory => {
Ok(Self::new_in_memory(service_name.to_string(), 1000))
Ok(Self::new_in_memory(service_name.to_string(), 1000, ups))
}
}
}

#[tracing::instrument]
pub fn new_in_memory(service_name: String, max_capacity: u64) -> Cache {
#[tracing::instrument(skip(ups))]
pub fn new_in_memory(
service_name: String,
max_capacity: u64,
ups: Option<universalpubsub::PubSub>,
) -> Cache {
let driver = Driver::InMemory(InMemoryDriver::new(service_name.clone(), max_capacity));
Arc::new(CacheInner {
service_name,
driver,
ups,
})
}
}
Expand Down
5 changes: 5 additions & 0 deletions packages/common/cache/build/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ impl_to_string!(i64);
impl_to_string!(i128);
impl_to_string!(isize);

/// A cache key that's already formatted and doesn't require escaping.
///
/// Unlike other types that implement `CacheKey` (which escape special characters like `:` and `\`),
/// `RawCacheKey` uses the provided string as-is. This is useful when you already have a properly
/// formatted cache key string or need to preserve the exact format without transformations.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct RawCacheKey(String);

Expand Down
2 changes: 2 additions & 0 deletions packages/common/cache/build/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod getter_ctx;
mod inner;
mod key;
mod metrics;
mod purge;
mod rate_limit;
mod req_config;

Expand All @@ -13,5 +14,6 @@ pub use errors::*;
pub use getter_ctx::*;
pub use inner::*;
pub use key::*;
pub use purge::*;
pub use rate_limit::*;
pub use req_config::*;
13 changes: 13 additions & 0 deletions packages/common/cache/build/src/purge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use serde::{Deserialize, Serialize};

use crate::RawCacheKey;

/// Topic for publishing cache purge messages via UniversalPubSub
pub const CACHE_PURGE_TOPIC: &str = "rivet.cache.purge";

/// Message format for cache purge requests
#[derive(Serialize, Deserialize)]
pub struct CachePurgeMessage {
pub base_key: String,
pub keys: Vec<RawCacheKey>,
}
24 changes: 23 additions & 1 deletion packages/common/cache/build/src/req_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,29 @@
return Ok(());
}

// Delete keys
// Publish cache purge message to all services via UPS
if let Some(ups) = &self.cache.ups {
let message = CachePurgeMessage {
base_key: base_key.to_string(),
keys: cache_keys
.iter()
.map(|k| RawCacheKey::from(k.clone()))
.collect(),
};

let payload = serde_json::to_vec(&message)?;

Check warning on line 303 in packages/common/cache/build/src/req_config.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/common/cache/build/src/req_config.rs

if let Err(err) = ups
.publish(CACHE_PURGE_TOPIC, &payload, universalpubsub::PublishOpts::broadcast())
.await
{
tracing::error!(?err, "failed to publish cache purge message");
} else {

Check warning on line 310 in packages/common/cache/build/src/req_config.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/common/cache/build/src/req_config.rs
tracing::debug!(base_key, keys_count = cache_keys.len(), "published cache purge message");
}
}

// Delete keys locally
match self.cache.driver.delete_keys(base_key, cache_keys).await {
Ok(_) => {
tracing::trace!("successfully deleted keys");
Expand Down
2 changes: 1 addition & 1 deletion packages/common/cache/build/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
use rand::{Rng, seq::IteratorRandom, thread_rng};

async fn build_in_memory_cache() -> rivet_cache::Cache {
rivet_cache::CacheInner::new_in_memory("cache-test".to_owned(), 1000)
rivet_cache::CacheInner::new_in_memory("cache-test".to_owned(), 1000, None)
}

async fn test_multiple_keys(cache: rivet_cache::Cache) {
Expand Down
18 changes: 18 additions & 0 deletions packages/core/api-public/src/runner_configs/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
}

Check warning on line 35 in packages/core/api-public/src/runner_configs/delete.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/runner_configs/delete.rs

#[tracing::instrument(skip_all)]
async fn delete_inner(
Expand Down Expand Up @@ -67,5 +67,23 @@
}
}

// Resolve namespace
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

// Purge cache
ctx.cache()
.clone()
.request()
.purge(
"namespace.runner_config.get",
vec![(namespace.namespace_id, path.runner_name.clone())],
)
.await?;

Ok(DeleteResponse {})
}
26 changes: 18 additions & 8 deletions packages/core/api-public/src/runner_configs/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,18 @@ async fn upsert_inner(
}
}

// Resolve namespace
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

// Update runner metadata
//
// This allows us to populate the actor names immediately upon configuring a serverless runner
if let Some((url, metadata_headers)) = serverless_config {
// Resolve namespace
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

if let Err(err) = utils::refresh_runner_config_metadata(
ctx.clone(),
namespace.namespace_id,
Expand All @@ -150,5 +150,15 @@ async fn upsert_inner(
}
}

// Purge cache
ctx.cache()
.clone()
.request()
.purge(
"namespace.runner_config.get",
vec![(namespace.namespace_id, path.runner_name.clone())],
)
.await?;

Ok(UpsertResponse {})
}
35 changes: 34 additions & 1 deletion packages/core/guard/core/src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub const X_FORWARDED_FOR: HeaderName = HeaderName::from_static("x-forwarded-for
pub const X_RIVET_ERROR: HeaderName = HeaderName::from_static("x-rivet-error");
const ROUTE_CACHE_TTL: Duration = Duration::from_secs(60 * 10); // 10 minutes
const PROXY_STATE_CACHE_TTL: Duration = Duration::from_secs(60 * 60); // 1 hour
const WEBSOCKET_CLOSE_LINGER: Duration = Duration::from_millis(100); // Keep TCP connection open briefly after WebSocket close

/// Response body type that can handle both streaming and buffered responses
#[derive(Debug)]
Expand Down Expand Up @@ -944,7 +945,7 @@ impl ProxyService {
if !err.is_connect() || attempts >= max_attempts {
tracing::error!(?err, "Request error after {} attempts", attempts);
return Err(errors::UpstreamError(
"failed to connect to runner. Make sure your runners are healthy and the provided runner address is reachable by Rivet."
"Failed to connect to runner. Make sure your runners are healthy and do not have any crash logs."
.to_string(),
)
.build());
Expand Down Expand Up @@ -1799,6 +1800,12 @@ impl ProxyService {
})))
.await?;

// Flush to ensure close frame is sent
ws_handle.flush().await?;

// Keep TCP connection open briefly to allow client to process close
tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await;

break;
}
Err(err) => {
Expand All @@ -1811,6 +1818,12 @@ impl ProxyService {
)))
.await?;

// Flush to ensure close frame is sent
ws_handle.flush().await?;

// Keep TCP connection open briefly to allow client to process close
tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await;

break;
} else {
let backoff = ProxyService::calculate_backoff(
Expand Down Expand Up @@ -1841,6 +1854,12 @@ impl ProxyService {
),
)))
.await?;

// Flush to ensure close frame is sent
ws_handle.flush().await?;

// Keep TCP connection open briefly to allow client to process close
tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await;
}
Ok(ResolveRouteOutput::Target(_)) => {
ws_handle
Expand All @@ -1850,6 +1869,13 @@ impl ProxyService {
),
)))
.await?;

// Flush to ensure close frame is sent
ws_handle.flush().await?;

// Keep TCP connection open briefly to allow client to process close
tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await;

break;
}
Err(err) => {
Expand All @@ -1858,6 +1884,13 @@ impl ProxyService {
err_to_close_frame(err),
)))
.await?;

// Flush to ensure close frame is sent
ws_handle.flush().await?;

// Keep TCP connection open briefly to allow client to process close
tokio::time::sleep(WEBSOCKET_CLOSE_LINGER).await;

break;
}
}
Expand Down
13 changes: 13 additions & 0 deletions packages/core/guard/core/src/websocket_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ impl WebSocketHandleInner {
}
}

pub async fn flush(&self) -> Result<()> {
let mut state = self.state.lock().await;
match &mut *state {
WebSocketState::Unaccepted { .. } | WebSocketState::Accepting => {
bail!("websocket has not been accepted");
}
WebSocketState::Split { ws_tx } => {
ws_tx.flush().await?;
Ok(())
}
}
}

async fn accept_inner(state: &mut WebSocketState) -> Result<WebSocketReceiver> {
if !matches!(*state, WebSocketState::Unaccepted { .. }) {
bail!("websocket already accepted")
Expand Down
1 change: 1 addition & 0 deletions packages/core/pegboard-serverless/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rivet-types.workspace = true
rivet-util.workspace = true
tracing.workspace = true
universaldb.workspace = true
universalpubsub.workspace = true
vbare.workspace = true

namespace.workspace = true
Expand Down
Loading
Loading