diff --git a/Cargo.lock b/Cargo.lock index 71d2ee1d42..d6b2b0a595 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4115,6 +4115,7 @@ dependencies = [ "serde_json", "tokio", "tracing", + "universalpubsub", "utoipa", "uuid", ] @@ -4310,6 +4311,7 @@ dependencies = [ "rivet-telemetry", "rivet-term", "rivet-test-deps", + "rivet-tracing-reconfigure", "rivet-util", "rivet-workflow-worker", "rstest", @@ -4536,6 +4538,7 @@ dependencies = [ name = "rivet-runtime" version = "25.7.3" dependencies = [ + "anyhow", "console-subscriber", "lazy_static", "opentelemetry", @@ -4628,6 +4631,24 @@ dependencies = [ "uuid", ] +[[package]] +name = "rivet-tracing-reconfigure" +version = "0.1.0" +dependencies = [ + "anyhow", + "futures-util", + "gasoline", + "rivet-config", + "rivet-metrics", + "rivet-pools", + "rivet-runtime", + "serde", + "serde_json", + "tokio", + "tracing", + "universalpubsub", +] + [[package]] name = "rivet-types" version = "25.7.3" diff --git a/Cargo.toml b/Cargo.toml index e0c0967c2a..5a0183fc50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -280,16 +280,16 @@ path = "packages/common/error/core" [workspace.dependencies.rivet-error-macros] path = "packages/common/error/macros" -[workspace.dependencies.gas] -package = "gasoline" -path = "packages/common/gasoline/core" - [workspace.dependencies.gasoline] path = "packages/common/gasoline/core" [workspace.dependencies.gasoline-macros] path = "packages/common/gasoline/macros" +[workspace.dependencies.gas] +package = "gasoline" +path = "packages/common/gasoline/core" + [workspace.dependencies.rivet-logs] path = "packages/common/logs" @@ -375,6 +375,9 @@ path = "packages/services/epoxy" [workspace.dependencies.internal] path = "packages/services/internal" +[workspace.dependencies.rivet-tracing-reconfigure] +path = "packages/services/tracing-reconfigure" + [workspace.dependencies.namespace] path = "packages/services/namespace" diff --git a/dev-docs/operate/TRACING_RECONFIGURE.md b/dev-docs/operate/TRACING_RECONFIGURE.md new file mode 100644 index 0000000000..e51d2bc4bf --- /dev/null +++ b/dev-docs/operate/TRACING_RECONFIGURE.md @@ -0,0 +1,78 @@ +# Dynamic Tracing Configuration + +Dynamically reconfigure log levels and OpenTelemetry sampling for all running services without restart. + +## Log Filter Configuration + +Control which log messages are displayed by setting filter directives (similar to `RUST_LOG`). + +**Set log filter to debug** + +```bash +rivet-engine tracing config -f debug + +# Or via HTTP API: +curl -X PUT http://localhost:6421/debug/tracing/config \ + -H "Content-Type: application/json" \ + -d '{"filter":"debug"}' +``` + +**Debug a specific package** + +```bash +rivet-engine tracing config -f "debug,rivet_api_peer=trace" + +# Or via HTTP API: +curl -X PUT http://localhost:6421/debug/tracing/config \ + -H "Content-Type: application/json" \ + -d '{"filter":"debug,rivet_api_peer=trace"}' +``` + +**Reset log filter to defaults** + +```bash +rivet-engine tracing config -f "" + +# Or via HTTP API: +curl -X PUT http://localhost:6421/debug/tracing/config \ + -H "Content-Type: application/json" \ + -d '{"filter":null}' +``` + +## OpenTelemetry Sampler Ratio + +Control what percentage of traces are sampled and sent to the OpenTelemetry collector. + +**Set sampler ratio to 10%** + +```bash +rivet-engine tracing config -s 0.1 + +# Or via HTTP API: +curl -X PUT http://localhost:6421/debug/tracing/config \ + -H "Content-Type: application/json" \ + -d '{"sampler_ratio":0.1}' +``` + +**Set sampler ratio to 100% (capture all traces)** + +```bash +rivet-engine tracing config -s 1.0 + +# Or via HTTP API: +curl -X PUT http://localhost:6421/debug/tracing/config \ + -H "Content-Type: application/json" \ + -d '{"sampler_ratio":1.0}' +``` + +**Reset sampler ratio to default** + +```bash +rivet-engine tracing config -s 0.001 + +# Or via HTTP API: +curl -X PUT http://localhost:6421/debug/tracing/config \ + -H "Content-Type: application/json" \ + -d '{"sampler_ratio":null}' +``` + diff --git a/out/openapi.json b/out/openapi.json index 7964d88067..1e34f8af55 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -324,31 +324,6 @@ ] } }, - "/health/fanout": { - "get": { - "tags": [ - "health" - ], - "operationId": "health_fanout", - "responses": { - "200": { - "description": "", - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/HealthFanoutResponse" - } - } - } - } - }, - "security": [ - { - "bearer_auth": [] - } - ] - } - }, "/namespaces": { "get": { "tags": [ diff --git a/packages/common/metrics/src/lib.rs b/packages/common/metrics/src/lib.rs index 91df1d927f..0095bec63e 100644 --- a/packages/common/metrics/src/lib.rs +++ b/packages/common/metrics/src/lib.rs @@ -5,4 +5,4 @@ mod buckets; pub use buckets::*; pub use opentelemetry as otel; pub use opentelemetry::KeyValue; -pub use providers::{OtelProviderGuard, init_otel_providers}; +pub use providers::{OtelProviderGuard, init_otel_providers, set_sampler_ratio}; diff --git a/packages/common/metrics/src/providers.rs b/packages/common/metrics/src/providers.rs index e8f5b08d78..15dec7201f 100644 --- a/packages/common/metrics/src/providers.rs +++ b/packages/common/metrics/src/providers.rs @@ -1,7 +1,9 @@ // Based off of https://github.com/tokio-rs/tracing-opentelemetry/blob/v0.1.x/examples/opentelemetry-otlp.rs // Based off of https://github.com/tokio-rs/tracing-opentelemetry/blob/v0.1.x/examples/opentelemetry-otlp.rs +use std::sync::{Arc, RwLock, OnceLock}; use opentelemetry::{KeyValue, global}; +use opentelemetry::trace::{SamplingResult, SpanKind}; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{ Resource, @@ -11,6 +13,65 @@ use opentelemetry_sdk::{ }; use opentelemetry_semantic_conventions::{SCHEMA_URL, attribute::SERVICE_VERSION}; +/// Dynamic sampler that can be updated at runtime +#[derive(Clone, Debug)] +struct DynamicSampler { + ratio: Arc>, +} + +impl DynamicSampler { + fn new(ratio: f64) -> Self { + Self { + ratio: Arc::new(RwLock::new(ratio)), + } + } + + fn set_ratio(&self, ratio: f64) { + if let Ok(mut r) = self.ratio.write() { + *r = ratio; + } + } +} + +impl opentelemetry_sdk::trace::ShouldSample for DynamicSampler { + fn should_sample( + &self, + parent_context: Option<&opentelemetry::Context>, + trace_id: opentelemetry::trace::TraceId, + _name: &str, + _span_kind: &SpanKind, + _attributes: &[KeyValue], + _links: &[opentelemetry::trace::Link], + ) -> SamplingResult { + let ratio = self.ratio.read().ok().map(|r| *r).unwrap_or(0.001); + + // Use TraceIdRatioBased sampling logic + let sampler = Sampler::TraceIdRatioBased(ratio); + sampler.should_sample( + parent_context, + trace_id, + _name, + _span_kind, + _attributes, + _links, + ) + } +} + +static SAMPLER: OnceLock = OnceLock::new(); + +/// Update the sampler ratio at runtime +pub fn set_sampler_ratio(ratio: f64) -> anyhow::Result<()> { + let sampler = SAMPLER + .get() + .ok_or_else(|| anyhow::anyhow!("sampler not initialized"))?; + + sampler.set_ratio(ratio); + tracing::info!(?ratio, "updated sampler ratio"); + + Ok(()) +} + fn resource() -> Resource { let mut resource = Resource::builder() .with_service_name(rivet_env::service_name()) @@ -48,14 +109,20 @@ fn init_tracer_provider() -> SdkTracerProvider { .build() .unwrap(); + // Create dynamic sampler with initial ratio from env + let initial_ratio = std::env::var("RIVET_OTEL_SAMPLER_RATIO") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(0.001); + + let dynamic_sampler = DynamicSampler::new(initial_ratio); + + // Store sampler globally for later updates + let _ = SAMPLER.set(dynamic_sampler.clone()); + SdkTracerProvider::builder() - // Customize sampling strategy - .with_sampler(Sampler::ParentBased(Box::new(Sampler::TraceIdRatioBased( - std::env::var("RIVET_OTEL_SAMPLER_RATIO") - .ok() - .and_then(|s| s.parse::().ok()) - .unwrap_or(0.001), - )))) + // Customize sampling strategy with parent-based sampling using our dynamic sampler + .with_sampler(Sampler::ParentBased(Box::new(dynamic_sampler))) // If export trace to AWS X-Ray, you can use XrayIdGenerator .with_id_generator(RandomIdGenerator::default()) .with_resource(resource()) diff --git a/packages/common/runtime/Cargo.toml b/packages/common/runtime/Cargo.toml index 3b922b4acc..73f92087fb 100644 --- a/packages/common/runtime/Cargo.toml +++ b/packages/common/runtime/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true edition.workspace = true [dependencies] +anyhow.workspace = true console-subscriber.workspace = true lazy_static.workspace = true rivet-metrics.workspace = true diff --git a/packages/common/runtime/src/lib.rs b/packages/common/runtime/src/lib.rs index 9fa53f930d..35f6090454 100644 --- a/packages/common/runtime/src/lib.rs +++ b/packages/common/runtime/src/lib.rs @@ -5,6 +5,8 @@ use tokio::sync::{Notify, OnceCell}; mod metrics; mod traces; +pub use traces::reload_log_filter; + static SHUTDOWN: OnceCell> = OnceCell::const_new(); /// Returns `None` if the runtime was shut down manually. diff --git a/packages/common/runtime/src/traces.rs b/packages/common/runtime/src/traces.rs index b931cc8073..32753ed235 100644 --- a/packages/common/runtime/src/traces.rs +++ b/packages/common/runtime/src/traces.rs @@ -2,13 +2,23 @@ use console_subscriber; use opentelemetry::trace::TracerProvider; -use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use rivet_metrics::OtelProviderGuard; +use std::sync::OnceLock; use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; -use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt}; +use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, reload, util::SubscriberInitExt}; + +type ReloadHandle = reload::Handle; + +static RELOAD_HANDLE: OnceLock = OnceLock::new(); /// Initialize tracing-subscriber pub fn init_tracing_subscriber(otel_providers: &Option) { + // Create reloadable env filter for RUST_LOG + let (reload_layer, reload_handle) = reload::Layer::new(build_filter_from_env_var("RUST_LOG")); + + // Store handle globally for later reloading + let _ = RELOAD_HANDLE.set(reload_handle); + let registry = tracing_subscriber::registry(); // Build and apply otel layers to the registry if otel is enabled @@ -17,10 +27,10 @@ pub fn init_tracing_subscriber(otel_providers: &Option) { let tracer = providers.tracer_provider.tracer("tracing-otel-subscriber"); let otel_trace_layer = - OpenTelemetryLayer::new(tracer).with_filter(env_filter("RUST_TRACE")); + OpenTelemetryLayer::new(tracer).with_filter(build_filter_from_env_var("RUST_TRACE")); let otel_metric_layer = MetricsLayer::new(providers.meter_provider.clone()) - .with_filter(env_filter("RUST_TRACE")); + .with_filter(build_filter_from_env_var("RUST_TRACE")); ( Some(otel_trace_layer), @@ -31,6 +41,7 @@ pub fn init_tracing_subscriber(otel_providers: &Option) { }; let registry = registry + .with(reload_layer) .with(otel_metric_layer) .with(otel_trace_layer); @@ -61,28 +72,50 @@ pub fn init_tracing_subscriber(otel_providers: &Option) { .with_location(std::env::var("RUST_LOG_LOCATION").map_or(false, |x| x == "1")) .with_module_path(std::env::var("RUST_LOG_MODULE_PATH").map_or(false, |x| x == "1")) .with_ansi_color(std::env::var("RUST_LOG_ANSI_COLOR").map_or(false, |x| x == "1")) - .layer() - .with_filter(env_filter("RUST_LOG")), + .layer(), ) .init() } -fn env_filter(env_var: &str) -> EnvFilter { - // Create env filter +/// Build an EnvFilter from a filter specification string +fn build_filter_from_spec(filter_spec: &str) -> anyhow::Result { + // Create env filter with defaults let mut env_filter = EnvFilter::default() // Default filter - .add_directive("info".parse().unwrap()) + .add_directive("info".parse()?) // Disable verbose logs - .add_directive("tokio_cron_scheduler=warn".parse().unwrap()) - .add_directive("tokio=warn".parse().unwrap()) - .add_directive("hyper=warn".parse().unwrap()) - .add_directive("h2=warn".parse().unwrap()); - - if let Ok(filter) = std::env::var(env_var) { - for s in filter.split(',').filter(|x| !x.is_empty()) { - env_filter = env_filter.add_directive(s.parse().expect("invalid env filter")); - } + .add_directive("tokio_cron_scheduler=warn".parse()?) + .add_directive("tokio=warn".parse()?) + .add_directive("hyper=warn".parse()?) + .add_directive("h2=warn".parse()?); + + // Add user-provided directives + for s in filter_spec.split(',').filter(|x| !x.is_empty()) { + env_filter = env_filter.add_directive(s.parse()?); } - env_filter + Ok(env_filter) +} + +/// Build an EnvFilter by reading from an environment variable +fn build_filter_from_env_var(env_var_name: &str) -> EnvFilter { + let filter_spec = std::env::var(env_var_name).unwrap_or_default(); + build_filter_from_spec(&filter_spec).expect("invalid env filter") +} + +/// Reload the log filter with a new specification +pub fn reload_log_filter(filter_spec: &str) -> anyhow::Result<()> { + let handle = RELOAD_HANDLE + .get() + .ok_or_else(|| anyhow::anyhow!("reload handle not initialized"))?; + + // Build the new filter + let env_filter = build_filter_from_spec(filter_spec)?; + + // Reload the filter + handle.reload(env_filter)?; + + tracing::info!(?filter_spec, "reloaded log filter"); + + Ok(()) } diff --git a/packages/core/api-peer/Cargo.toml b/packages/core/api-peer/Cargo.toml index 289585fbee..8b95d7e98a 100644 --- a/packages/core/api-peer/Cargo.toml +++ b/packages/core/api-peer/Cargo.toml @@ -27,5 +27,6 @@ tokio.workspace = true tracing.workspace = true namespace.workspace = true pegboard.workspace = true +universalpubsub.workspace = true uuid.workspace = true utoipa.workspace = true diff --git a/packages/core/api-peer/src/internal.rs b/packages/core/api-peer/src/internal.rs index 2ebfa04073..679c8a98c1 100644 --- a/packages/core/api-peer/src/internal.rs +++ b/packages/core/api-peer/src/internal.rs @@ -2,6 +2,7 @@ use anyhow::Result; use gas::prelude::*; use rivet_api_builder::ApiCtx; use serde::{Deserialize, Serialize}; +use universalpubsub::PublishOpts; #[derive(Serialize, Deserialize)] pub struct CachePurgeRequest { @@ -44,3 +45,38 @@ pub async fn bump_serverless_autoscaler( Ok(BumpServerlessAutoscalerResponse {}) } + +#[derive(Serialize, Deserialize)] +pub struct SetTracingConfigRequest { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub filter: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub sampler_ratio: Option>, +} + +#[derive(Serialize)] +pub struct SetTracingConfigResponse {} + +#[tracing::instrument(skip_all)] +pub async fn set_tracing_config( + ctx: ApiCtx, + _path: (), + _query: (), + body: SetTracingConfigRequest, +) -> Result { + // Broadcast message to all services via UPS + let subject = "rivet.debug.tracing.config"; + let message = serde_json::to_vec(&body)?; + + ctx.ups()? + .publish(subject, &message, PublishOpts::broadcast()) + .await?; + + tracing::info!( + filter = ?body.filter, + sampler_ratio = ?body.sampler_ratio, + "broadcasted tracing config update" + ); + + Ok(SetTracingConfigResponse {}) +} diff --git a/packages/core/api-peer/src/router.rs b/packages/core/api-peer/src/router.rs index 06c02aa7d1..6816732b9f 100644 --- a/packages/core/api-peer/src/router.rs +++ b/packages/core/api-peer/src/router.rs @@ -35,6 +35,8 @@ pub async fn router( "/bump-serverless-autoscaler", post(internal::bump_serverless_autoscaler), ) + // MARK: Debug + .route("/debug/tracing/config", put(internal::set_tracing_config)) }) .await } diff --git a/packages/infra/engine/Cargo.toml b/packages/infra/engine/Cargo.toml index ec65521d74..7c464e92a5 100644 --- a/packages/infra/engine/Cargo.toml +++ b/packages/infra/engine/Cargo.toml @@ -27,6 +27,7 @@ rivet-bootstrap.workspace = true rivet-cache.workspace = true rivet-config.workspace = true rivet-guard.workspace = true +rivet-tracing-reconfigure.workspace = true rivet-logs.workspace = true rivet-pools.workspace = true rivet-runtime.workspace = true diff --git a/packages/infra/engine/src/commands/mod.rs b/packages/infra/engine/src/commands/mod.rs index 911796ba8e..e89d509c3d 100644 --- a/packages/infra/engine/src/commands/mod.rs +++ b/packages/infra/engine/src/commands/mod.rs @@ -1,5 +1,6 @@ pub mod config; pub mod db; pub mod start; +pub mod tracing; pub mod udb; pub mod wf; diff --git a/packages/infra/engine/src/commands/tracing.rs b/packages/infra/engine/src/commands/tracing.rs new file mode 100644 index 0000000000..fee1cdde51 --- /dev/null +++ b/packages/infra/engine/src/commands/tracing.rs @@ -0,0 +1,86 @@ +use anyhow::*; +use clap::Parser; +use serde::{Deserialize, Serialize}; + +#[derive(Parser)] +pub enum SubCommand { + /// Configure tracing settings (log filter and sampler ratio) + Config { + /// Log filter (e.g., "debug", "info", "rivet_api_peer=trace") + /// Set to null to reset to defaults + #[clap(short, long)] + filter: Option, + + /// OpenTelemetry sampler ratio (0.0-1.0) + /// Set to null to reset to default + #[clap(short, long)] + sampler_ratio: Option, + + /// API peer endpoint + #[clap(long, default_value = "http://localhost:6421")] + endpoint: String, + }, +} + +#[derive(Serialize, Deserialize)] +struct SetTracingConfigRequest { + #[serde(skip_serializing_if = "Option::is_none")] + pub filter: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub sampler_ratio: Option>, +} + +impl SubCommand { + pub async fn execute(self, _config: rivet_config::Config) -> Result<()> { + match self { + Self::Config { + filter, + sampler_ratio, + endpoint, + } => { + // Build request body + let request = SetTracingConfigRequest { + filter: filter.map(|f| if f.is_empty() { None } else { Some(f) }), + sampler_ratio: sampler_ratio.map(Some), + }; + + // Send HTTP request + let client = rivet_pools::reqwest::client().await?; + let url = format!("{}/debug/tracing/config", endpoint); + + let response = client + .put(&url) + .json(&request) + .send() + .await + .context("failed to send request")?; + + if response.status().is_success() { + println!("Tracing configuration updated successfully"); + + if let Some(Some(f)) = &request.filter { + println!(" Filter: {}", f); + } else if let Some(None) = &request.filter { + println!(" Filter: reset to default"); + } + + if let Some(Some(r)) = request.sampler_ratio { + println!(" Sampler ratio: {}", r); + } else if let Some(None) = request.sampler_ratio { + println!(" Sampler ratio: reset to default (0.001)"); + } + } else { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + bail!( + "Failed to update tracing configuration: {} - {}", + status, + body + ); + } + + Ok(()) + } + } + } +} diff --git a/packages/infra/engine/src/lib.rs b/packages/infra/engine/src/lib.rs index 0a8a1ab685..21423c2d70 100644 --- a/packages/infra/engine/src/lib.rs +++ b/packages/infra/engine/src/lib.rs @@ -28,6 +28,11 @@ pub enum SubCommand { #[clap(subcommand)] command: config::SubCommand, }, + /// Manage tracing configuration + Tracing { + #[clap(subcommand)] + command: tracing::SubCommand, + }, /// Allows inspection of UDB data Udb(udb::Opts), } @@ -39,6 +44,7 @@ impl SubCommand { SubCommand::Database { command } => command.execute(config).await, SubCommand::Workflow { command } => command.execute(config).await, SubCommand::Config { command } => command.execute(config).await, + SubCommand::Tracing { command } => command.execute(config).await, SubCommand::Udb(opts) => opts.execute(config).await, } } diff --git a/packages/infra/engine/src/run_config.rs b/packages/infra/engine/src/run_config.rs index a0e90767f5..e1253e7e95 100644 --- a/packages/infra/engine/src/run_config.rs +++ b/packages/infra/engine/src/run_config.rs @@ -22,6 +22,9 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { ServiceKind::Standalone, |config, pools| Box::pin(pegboard_serverless::start(config, pools)), ), + Service::new("tracing_reconfigure", ServiceKind::Standalone, |config, pools| { + Box::pin(rivet_tracing_reconfigure::start(config, pools)) + }), ]; Ok(RunConfigData { services }) diff --git a/packages/services/tracing-reconfigure/Cargo.toml b/packages/services/tracing-reconfigure/Cargo.toml new file mode 100644 index 0000000000..d1995ba663 --- /dev/null +++ b/packages/services/tracing-reconfigure/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "rivet-tracing-reconfigure" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow.workspace = true +futures-util.workspace = true +gas.workspace = true +rivet-config.workspace = true +rivet-metrics.workspace = true +rivet-pools.workspace = true +rivet-runtime.workspace = true +serde.workspace = true +serde_json.workspace = true +tokio.workspace = true +tracing.workspace = true +universalpubsub.workspace = true diff --git a/packages/services/tracing-reconfigure/src/lib.rs b/packages/services/tracing-reconfigure/src/lib.rs new file mode 100644 index 0000000000..9151db195b --- /dev/null +++ b/packages/services/tracing-reconfigure/src/lib.rs @@ -0,0 +1,82 @@ +use anyhow::Result; +use gas::prelude::*; +use serde::{Deserialize, Serialize}; +use universalpubsub::NextOutput; + +#[derive(Serialize, Deserialize)] +pub struct SetTracingConfigMessage { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub filter: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub sampler_ratio: Option>, +} + +#[tracing::instrument(skip_all)] +pub async fn start(_config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> { + tracing::info!("starting tracing reconfigure subscriber service"); + + // Subscribe to tracing config updates + let ups = pools.ups()?; + let subject = "rivet.debug.tracing.config"; + let mut sub = ups.subscribe(subject).await?; + + tracing::info!(subject = ?subject, "subscribed to tracing config updates"); + + // Process incoming messages + while let Ok(NextOutput::Message(msg)) = sub.next().await { + match serde_json::from_slice::(&msg.payload) { + Ok(update_msg) => { + tracing::info!( + filter = ?update_msg.filter, + sampler_ratio = ?update_msg.sampler_ratio, + "received tracing config update" + ); + + // Apply the new log filter if provided + match &update_msg.filter { + Some(Some(filter)) => { + // Set to specific value + if let Err(err) = rivet_runtime::reload_log_filter(filter) { + tracing::error!(?err, "failed to reload log filter"); + } + } + Some(None) => { + // Reset to default (empty string) + if let Err(err) = rivet_runtime::reload_log_filter("") { + tracing::error!(?err, "failed to reload log filter to default"); + } + } + None => { + // Not provided, no change + } + } + + // Apply the new sampler ratio if provided + match update_msg.sampler_ratio { + Some(Some(ratio)) => { + // Set to specific value + if let Err(err) = rivet_metrics::set_sampler_ratio(ratio) { + tracing::error!(?err, "failed to reload sampler ratio"); + } + } + Some(None) => { + // Reset to default (0.001) + if let Err(err) = rivet_metrics::set_sampler_ratio(0.001) { + tracing::error!(?err, "failed to reload sampler ratio to default"); + } + } + None => { + // Not provided, no change + } + } + } + Err(err) => { + tracing::error!(?err, "failed to deserialize tracing config update message"); + } + } + } + + tracing::warn!("tracing reconfigure subscriber service stopped"); + + Ok(()) +}