Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/common/api-helper/build/src/macro_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ pub fn __deserialize_query<T: DeserializeOwned + Send>(route: &Url) -> GlobalRes
}

#[doc(hidden)]
#[tracing::instrument(skip_all, name="setup_ctx")]
pub async fn __with_ctx<
A: auth::ApiAuth + Send,
DB: chirp_workflow::db::Database + Sync + 'static,
Expand Down
1 change: 1 addition & 0 deletions packages/common/api-helper/build/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ pub fn as_auth_expired<T>(res: GlobalResult<T>) -> GlobalResult<T> {
}
}

#[tracing::instrument(skip_all)]
pub async fn basic_rate_limit(
config: &rivet_config::Config,
rate_limit_ctx: crate::auth::AuthRateLimitCtx<'_>,
Expand Down
76 changes: 59 additions & 17 deletions packages/common/cache/build/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
use moka::future::{Cache, CacheBuilder};
use redis::AsyncCommands;
use rivet_pools::prelude::*;
use tracing::Instrument;

use crate::{error::Error, metrics};

Expand All @@ -21,6 +22,7 @@ pub enum Driver {

impl Driver {
/// Fetch multiple values from cache at once
#[tracing::instrument(skip_all, fields(driver=%self))]
pub async fn fetch_values<'a>(
&'a self,
base_key: &'a str,
Expand All @@ -33,6 +35,7 @@ impl Driver {
}

/// Set multiple values in cache at once
#[tracing::instrument(skip_all, fields(driver=%self))]
pub async fn set_values<'a>(
&'a self,
base_key: &'a str,
Expand All @@ -45,6 +48,7 @@ impl Driver {
}

/// Delete multiple keys from cache
#[tracing::instrument(skip_all, fields(driver=%self))]
pub async fn delete_keys<'a>(
&'a self,
base_key: &'a str,
Expand Down Expand Up @@ -95,6 +99,7 @@ impl Driver {
}

/// Increment a rate limit counter and return the new count
#[tracing::instrument(skip_all, fields(driver=%self))]
pub async fn rate_limit_increment<'a>(
&'a self,
key: &'a str,
Expand Down Expand Up @@ -123,6 +128,15 @@ impl Driver {
}
}

impl std::fmt::Display for Driver {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Driver::Redis(_) => write!(f, "redis"),
Driver::InMemory(_) => write!(f, "in_memory"),
}
}
}

/// Redis cache driver implementation
#[derive(Clone)]
pub struct RedisDriver {
Expand Down Expand Up @@ -176,6 +190,7 @@ impl RedisDriver {

match mget_cmd
.query_async::<_, Vec<Option<CacheValue>>>(&mut redis_conn)
.instrument(tracing::info_span!("redis_query"))
.await
{
Ok(values) => {
Expand Down Expand Up @@ -215,7 +230,11 @@ impl RedisDriver {
.ignore();
}

match pipe.query_async(&mut redis_conn).await {
match pipe
.query_async(&mut redis_conn)
.instrument(tracing::info_span!("redis_query"))
.await
{
Ok(()) => {
tracing::trace!("successfully wrote to cache");
Ok(())
Expand All @@ -242,7 +261,11 @@ impl RedisDriver {
.with_label_values(&[&base_key])
.inc_by(redis_keys.len() as u64);

match redis_conn.del::<_, ()>(redis_keys).await {
match redis_conn
.del::<_, ()>(redis_keys)
.instrument(tracing::info_span!("redis_query"))
.await
{
Ok(_) => {
tracing::trace!("successfully deleted keys");
Ok(())
Expand Down Expand Up @@ -300,7 +323,11 @@ impl RedisDriver {
pipe.incr(key, 1);
pipe.pexpire(key, ttl_ms as usize).ignore();

match pipe.query_async::<_, (i64,)>(&mut redis_conn).await {
match pipe
.query_async::<_, (i64,)>(&mut redis_conn)
.instrument(tracing::info_span!("redis_query"))
.await
{
Ok((incr,)) => Ok(incr),
Err(err) => {
tracing::error!(?err, ?key, "failed to increment rate limit key");
Expand Down Expand Up @@ -415,9 +442,14 @@ impl InMemoryDriver {

let mut result = Vec::with_capacity(keys.len());

for key in keys {
result.push(cache.get(&key).await.map(|x| x.value.clone()));
// Async block for metrics
async {
for key in keys {
result.push(cache.get(&key).await.map(|x| x.value.clone()));
}
}
.instrument(tracing::info_span!("get"))
.await;

tracing::debug!(
cached_len = result.iter().filter(|x| x.is_some()).count(),
Expand All @@ -435,16 +467,21 @@ impl InMemoryDriver {
) -> Result<(), Error> {
let cache = self.cache.clone();

for (key, value, expire_at) in keys_values {
// Create an entry with the value and expiration time
let entry = ExpiringValue {
value,
expiry_time: expire_at,
};

// Store in cache - expiry will be handled by ValueExpiry
cache.insert(key, entry).await;
// Async block for metrics
async {
for (key, value, expire_at) in keys_values {
// Create an entry with the value and expiration time
let entry = ExpiringValue {
value,
expiry_time: expire_at,
};

// Store in cache - expiry will be handled by ValueExpiry
cache.insert(key, entry).await;
}
}
.instrument(tracing::info_span!("set"))
.await;

tracing::trace!("successfully wrote to in-memory cache with per-key expiry");
Ok(())
Expand All @@ -465,10 +502,15 @@ impl InMemoryDriver {
.with_label_values(&[&base_key])
.inc_by(keys.len() as u64);

for key in keys {
// Use remove instead of invalidate to ensure it's actually removed
cache.remove(&key).await;
// Async block for metrics
async {
for key in keys {
// Use remove instead of invalidate to ensure it's actually removed
cache.remove(&key).await;
}
}
.instrument(tracing::info_span!("remove"))
.await;

tracing::trace!("successfully deleted keys from in-memory cache");
Ok(())
Expand Down
1 change: 1 addition & 0 deletions packages/common/cache/build/src/rate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ impl CacheInner {
///
/// This is infallible in order to make sure that anything that depends on
/// this never fails.
#[tracing::instrument(skip_all)]
pub async fn rate_limit(
&self,
key: &impl CacheKey,
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api/actor/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub struct CheckOutput {

#[async_trait]
impl ApiAuth for Auth {
#[tracing::instrument(skip_all)]
async fn new(
config: rivet_config::Config,
api_token: Option<String>,
Expand All @@ -46,6 +47,7 @@ impl ApiAuth for Auth {
})
}

#[tracing::instrument(skip_all)]
async fn rate_limit(
config: &rivet_config::Config,
rate_limit_ctx: AuthRateLimitCtx<'_>,
Expand Down
10 changes: 5 additions & 5 deletions packages/core/api/actor/src/route/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ pub async fn create(
query.global.environment.as_deref(),
query.endpoint_type,
)
.instrument(tracing::info_span!("proxy request"))
.instrument(tracing::info_span!("proxy_request", base_path=%config.base_path))
.await
{
Ok(res) => Ok(res),
Expand Down Expand Up @@ -427,7 +427,7 @@ pub async fn destroy(
query.global.environment.as_deref(),
query.override_kill_timeout,
)
.instrument(tracing::info_span!("proxy request"))
.instrument(tracing::info_span!("proxy_request", base_path=%config.base_path))
.await
{
Ok(res) => Ok(res),
Expand Down Expand Up @@ -561,7 +561,7 @@ pub async fn upgrade(
query.project.as_deref(),
query.environment.as_deref(),
)
.instrument(tracing::info_span!("proxy request"))
.instrument(tracing::info_span!("proxy_request", base_path=%config.base_path))
.await
{
Ok(res) => Ok(res),
Expand Down Expand Up @@ -705,7 +705,7 @@ pub async fn upgrade_all(
query.project.as_deref(),
query.environment.as_deref(),
)
.instrument(tracing::info_span!("proxy request"))
.instrument(tracing::info_span!("proxy_request", base_path=%config.base_path))
.await
{
Ok(res) => Ok(res),
Expand Down Expand Up @@ -873,7 +873,7 @@ async fn list_actors_inner(
query.include_destroyed,
query.cursor.as_deref(),
)
.instrument(tracing::info_span!("proxy request")),
.instrument(tracing::info_span!("proxy_request", base_path=%config.base_path)),
)
.await;

Expand Down
1 change: 1 addition & 0 deletions packages/core/api/status/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub struct Auth {

#[async_trait]
impl ApiAuth for Auth {
#[tracing::instrument(skip_all)]
async fn new(
config: rivet_config::Config,
api_token: Option<String>,
Expand Down
6 changes: 3 additions & 3 deletions packages/core/api/status/src/route/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ pub async fn status(
Some(&system_test_env),
None,
)
.instrument(tracing::info_span!("actor create request"))
.instrument(tracing::info_span!("actor_create_request"))
.await?;
let actor_id = res.actor.id;

Expand Down Expand Up @@ -240,7 +240,7 @@ pub async fn status(
Some(&system_test_env),
None,
)
.instrument(tracing::info_span!("actor destroy request"))
.instrument(tracing::info_span!("actor_destroy_request"))
.await?;

// Unwrap res
Expand Down Expand Up @@ -350,7 +350,7 @@ async fn test_http(
client
.get(format!("{actor_origin}/health"))
.send()
.instrument(tracing::info_span!("health request"))
.instrument(tracing::info_span!("health_request"))
.await?
.error_for_status()?;

Expand Down
2 changes: 2 additions & 0 deletions packages/edge/api/actor/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct CheckOutput {

#[async_trait]
impl ApiAuth for Auth {
#[tracing::instrument(skip_all)]
async fn new(
config: rivet_config::Config,
api_token: Option<String>,
Expand All @@ -44,6 +45,7 @@ impl ApiAuth for Auth {
})
}

#[tracing::instrument(skip_all)]
async fn rate_limit(
config: &rivet_config::Config,
rate_limit_ctx: AuthRateLimitCtx<'_>,
Expand Down
Loading