diff --git a/packages/common/api-helper/build/src/macro_util.rs b/packages/common/api-helper/build/src/macro_util.rs index dd7021f78f..03d556e0e7 100644 --- a/packages/common/api-helper/build/src/macro_util.rs +++ b/packages/common/api-helper/build/src/macro_util.rs @@ -237,6 +237,7 @@ pub fn __deserialize_query(route: &Url) -> GlobalRes } #[doc(hidden)] +#[tracing::instrument(skip_all, name="setup_ctx")] pub async fn __with_ctx< A: auth::ApiAuth + Send, DB: chirp_workflow::db::Database + Sync + 'static, diff --git a/packages/common/api-helper/build/src/util.rs b/packages/common/api-helper/build/src/util.rs index f9687199e7..a5e74e2949 100644 --- a/packages/common/api-helper/build/src/util.rs +++ b/packages/common/api-helper/build/src/util.rs @@ -284,6 +284,7 @@ pub fn as_auth_expired(res: GlobalResult) -> GlobalResult { } } +#[tracing::instrument(skip_all)] pub async fn basic_rate_limit( config: &rivet_config::Config, rate_limit_ctx: crate::auth::AuthRateLimitCtx<'_>, diff --git a/packages/common/cache/build/src/driver.rs b/packages/common/cache/build/src/driver.rs index 8a4f804f86..95519ee8e0 100644 --- a/packages/common/cache/build/src/driver.rs +++ b/packages/common/cache/build/src/driver.rs @@ -6,6 +6,7 @@ use std::{ use moka::future::{Cache, CacheBuilder}; use redis::AsyncCommands; use rivet_pools::prelude::*; +use tracing::Instrument; use crate::{error::Error, metrics}; @@ -21,6 +22,7 @@ pub enum Driver { impl Driver { /// Fetch multiple values from cache at once + #[tracing::instrument(skip_all, fields(driver=%self))] pub async fn fetch_values<'a>( &'a self, base_key: &'a str, @@ -33,6 +35,7 @@ impl Driver { } /// Set multiple values in cache at once + #[tracing::instrument(skip_all, fields(driver=%self))] pub async fn set_values<'a>( &'a self, base_key: &'a str, @@ -45,6 +48,7 @@ impl Driver { } /// Delete multiple keys from cache + #[tracing::instrument(skip_all, fields(driver=%self))] pub async fn delete_keys<'a>( &'a self, base_key: &'a str, @@ -95,6 +99,7 @@ impl Driver { } /// Increment a rate limit counter and return the new count + #[tracing::instrument(skip_all, fields(driver=%self))] pub async fn rate_limit_increment<'a>( &'a self, key: &'a str, @@ -123,6 +128,15 @@ impl Driver { } } +impl std::fmt::Display for Driver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Driver::Redis(_) => write!(f, "redis"), + Driver::InMemory(_) => write!(f, "in_memory"), + } + } +} + /// Redis cache driver implementation #[derive(Clone)] pub struct RedisDriver { @@ -176,6 +190,7 @@ impl RedisDriver { match mget_cmd .query_async::<_, Vec>>(&mut redis_conn) + .instrument(tracing::info_span!("redis_query")) .await { Ok(values) => { @@ -215,7 +230,11 @@ impl RedisDriver { .ignore(); } - match pipe.query_async(&mut redis_conn).await { + match pipe + .query_async(&mut redis_conn) + .instrument(tracing::info_span!("redis_query")) + .await + { Ok(()) => { tracing::trace!("successfully wrote to cache"); Ok(()) @@ -242,7 +261,11 @@ impl RedisDriver { .with_label_values(&[&base_key]) .inc_by(redis_keys.len() as u64); - match redis_conn.del::<_, ()>(redis_keys).await { + match redis_conn + .del::<_, ()>(redis_keys) + .instrument(tracing::info_span!("redis_query")) + .await + { Ok(_) => { tracing::trace!("successfully deleted keys"); Ok(()) @@ -300,7 +323,11 @@ impl RedisDriver { pipe.incr(key, 1); pipe.pexpire(key, ttl_ms as usize).ignore(); - match pipe.query_async::<_, (i64,)>(&mut redis_conn).await { + match pipe + .query_async::<_, (i64,)>(&mut redis_conn) + .instrument(tracing::info_span!("redis_query")) + .await + { Ok((incr,)) => Ok(incr), Err(err) => { tracing::error!(?err, ?key, "failed to increment rate limit key"); @@ -415,9 +442,14 @@ impl InMemoryDriver { let mut result = Vec::with_capacity(keys.len()); - for key in keys { - result.push(cache.get(&key).await.map(|x| x.value.clone())); + // Async block for metrics + async { + for key in keys { + result.push(cache.get(&key).await.map(|x| x.value.clone())); + } } + .instrument(tracing::info_span!("get")) + .await; tracing::debug!( cached_len = result.iter().filter(|x| x.is_some()).count(), @@ -435,16 +467,21 @@ impl InMemoryDriver { ) -> Result<(), Error> { let cache = self.cache.clone(); - for (key, value, expire_at) in keys_values { - // Create an entry with the value and expiration time - let entry = ExpiringValue { - value, - expiry_time: expire_at, - }; - - // Store in cache - expiry will be handled by ValueExpiry - cache.insert(key, entry).await; + // Async block for metrics + async { + for (key, value, expire_at) in keys_values { + // Create an entry with the value and expiration time + let entry = ExpiringValue { + value, + expiry_time: expire_at, + }; + + // Store in cache - expiry will be handled by ValueExpiry + cache.insert(key, entry).await; + } } + .instrument(tracing::info_span!("set")) + .await; tracing::trace!("successfully wrote to in-memory cache with per-key expiry"); Ok(()) @@ -465,10 +502,15 @@ impl InMemoryDriver { .with_label_values(&[&base_key]) .inc_by(keys.len() as u64); - for key in keys { - // Use remove instead of invalidate to ensure it's actually removed - cache.remove(&key).await; + // Async block for metrics + async { + for key in keys { + // Use remove instead of invalidate to ensure it's actually removed + cache.remove(&key).await; + } } + .instrument(tracing::info_span!("remove")) + .await; tracing::trace!("successfully deleted keys from in-memory cache"); Ok(()) diff --git a/packages/common/cache/build/src/rate_limit.rs b/packages/common/cache/build/src/rate_limit.rs index 388ee3b4b4..7878c5cc9d 100644 --- a/packages/common/cache/build/src/rate_limit.rs +++ b/packages/common/cache/build/src/rate_limit.rs @@ -18,6 +18,7 @@ impl CacheInner { /// /// This is infallible in order to make sure that anything that depends on /// this never fails. + #[tracing::instrument(skip_all)] pub async fn rate_limit( &self, key: &impl CacheKey, diff --git a/packages/core/api/actor/src/auth.rs b/packages/core/api/actor/src/auth.rs index 6ce4239b32..b5a0fbc062 100644 --- a/packages/core/api/actor/src/auth.rs +++ b/packages/core/api/actor/src/auth.rs @@ -26,6 +26,7 @@ pub struct CheckOutput { #[async_trait] impl ApiAuth for Auth { + #[tracing::instrument(skip_all)] async fn new( config: rivet_config::Config, api_token: Option, @@ -46,6 +47,7 @@ impl ApiAuth for Auth { }) } + #[tracing::instrument(skip_all)] async fn rate_limit( config: &rivet_config::Config, rate_limit_ctx: AuthRateLimitCtx<'_>, diff --git a/packages/core/api/actor/src/route/actors.rs b/packages/core/api/actor/src/route/actors.rs index 5b7cf88618..3be629b32a 100644 --- a/packages/core/api/actor/src/route/actors.rs +++ b/packages/core/api/actor/src/route/actors.rs @@ -232,7 +232,7 @@ pub async fn create( query.global.environment.as_deref(), query.endpoint_type, ) - .instrument(tracing::info_span!("proxy request")) + .instrument(tracing::info_span!("proxy_request", base_path=%config.base_path)) .await { Ok(res) => Ok(res), @@ -427,7 +427,7 @@ pub async fn destroy( query.global.environment.as_deref(), query.override_kill_timeout, ) - .instrument(tracing::info_span!("proxy request")) + .instrument(tracing::info_span!("proxy_request", base_path=%config.base_path)) .await { Ok(res) => Ok(res), @@ -561,7 +561,7 @@ pub async fn upgrade( query.project.as_deref(), query.environment.as_deref(), ) - .instrument(tracing::info_span!("proxy request")) + .instrument(tracing::info_span!("proxy_request", base_path=%config.base_path)) .await { Ok(res) => Ok(res), @@ -705,7 +705,7 @@ pub async fn upgrade_all( query.project.as_deref(), query.environment.as_deref(), ) - .instrument(tracing::info_span!("proxy request")) + .instrument(tracing::info_span!("proxy_request", base_path=%config.base_path)) .await { Ok(res) => Ok(res), @@ -873,7 +873,7 @@ async fn list_actors_inner( query.include_destroyed, query.cursor.as_deref(), ) - .instrument(tracing::info_span!("proxy request")), + .instrument(tracing::info_span!("proxy_request", base_path=%config.base_path)), ) .await; diff --git a/packages/core/api/status/src/auth.rs b/packages/core/api/status/src/auth.rs index 77223ce791..7e19b45305 100644 --- a/packages/core/api/status/src/auth.rs +++ b/packages/core/api/status/src/auth.rs @@ -8,6 +8,7 @@ pub struct Auth { #[async_trait] impl ApiAuth for Auth { + #[tracing::instrument(skip_all)] async fn new( config: rivet_config::Config, api_token: Option, diff --git a/packages/core/api/status/src/route/actor.rs b/packages/core/api/status/src/route/actor.rs index 2fec99c2d4..fb4b6e46a7 100644 --- a/packages/core/api/status/src/route/actor.rs +++ b/packages/core/api/status/src/route/actor.rs @@ -208,7 +208,7 @@ pub async fn status( Some(&system_test_env), None, ) - .instrument(tracing::info_span!("actor create request")) + .instrument(tracing::info_span!("actor_create_request")) .await?; let actor_id = res.actor.id; @@ -240,7 +240,7 @@ pub async fn status( Some(&system_test_env), None, ) - .instrument(tracing::info_span!("actor destroy request")) + .instrument(tracing::info_span!("actor_destroy_request")) .await?; // Unwrap res @@ -350,7 +350,7 @@ async fn test_http( client .get(format!("{actor_origin}/health")) .send() - .instrument(tracing::info_span!("health request")) + .instrument(tracing::info_span!("health_request")) .await? .error_for_status()?; diff --git a/packages/edge/api/actor/src/auth.rs b/packages/edge/api/actor/src/auth.rs index fb07b65439..2c805fc411 100644 --- a/packages/edge/api/actor/src/auth.rs +++ b/packages/edge/api/actor/src/auth.rs @@ -25,6 +25,7 @@ pub struct CheckOutput { #[async_trait] impl ApiAuth for Auth { + #[tracing::instrument(skip_all)] async fn new( config: rivet_config::Config, api_token: Option, @@ -44,6 +45,7 @@ impl ApiAuth for Auth { }) } + #[tracing::instrument(skip_all)] async fn rate_limit( config: &rivet_config::Config, rate_limit_ctx: AuthRateLimitCtx<'_>,