diff --git a/packages/common/metrics/src/providers.rs b/packages/common/metrics/src/providers.rs index 15dec7201f..04e0747eb7 100644 --- a/packages/common/metrics/src/providers.rs +++ b/packages/common/metrics/src/providers.rs @@ -1,9 +1,8 @@ // Based off of https://github.com/tokio-rs/tracing-opentelemetry/blob/v0.1.x/examples/opentelemetry-otlp.rs // Based off of https://github.com/tokio-rs/tracing-opentelemetry/blob/v0.1.x/examples/opentelemetry-otlp.rs -use std::sync::{Arc, RwLock, OnceLock}; -use opentelemetry::{KeyValue, global}; use opentelemetry::trace::{SamplingResult, SpanKind}; +use opentelemetry::{KeyValue, global}; use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{ Resource, @@ -12,6 +11,7 @@ use opentelemetry_sdk::{ trace::{RandomIdGenerator, Sampler, SdkTracerProvider}, }; use opentelemetry_semantic_conventions::{SCHEMA_URL, attribute::SERVICE_VERSION}; +use std::sync::{Arc, OnceLock, RwLock}; /// Dynamic sampler that can be updated at runtime #[derive(Clone, Debug)] diff --git a/packages/common/runtime/src/traces.rs b/packages/common/runtime/src/traces.rs index cd885392ff..9d10924bef 100644 --- a/packages/common/runtime/src/traces.rs +++ b/packages/common/runtime/src/traces.rs @@ -26,8 +26,8 @@ pub fn init_tracing_subscriber(otel_providers: &Option) { Some(providers) => { let tracer = providers.tracer_provider.tracer("tracing-otel-subscriber"); - let otel_trace_layer = - OpenTelemetryLayer::new(tracer).with_filter(build_filter_from_env_var("RUST_TRACE")); + let otel_trace_layer = OpenTelemetryLayer::new(tracer) + .with_filter(build_filter_from_env_var("RUST_TRACE")); let otel_metric_layer = MetricsLayer::new(providers.meter_provider.clone()) .with_filter(build_filter_from_env_var("RUST_TRACE")); diff --git a/packages/core/api-peer/src/internal.rs b/packages/core/api-peer/src/internal.rs index 679c8a98c1..14ad07d884 100644 --- a/packages/core/api-peer/src/internal.rs +++ b/packages/core/api-peer/src/internal.rs @@ -47,6 +47,7 @@ pub async fn bump_serverless_autoscaler( } #[derive(Serialize, Deserialize)] +#[serde(deny_unknown_fields)] pub struct SetTracingConfigRequest { #[serde(default, skip_serializing_if = "Option::is_none")] pub filter: Option>, @@ -55,6 +56,7 @@ pub struct SetTracingConfigRequest { } #[derive(Serialize)] +#[serde(deny_unknown_fields)] pub struct SetTracingConfigResponse {} #[tracing::instrument(skip_all)] diff --git a/packages/core/api-public/src/actors/get_or_create.rs b/packages/core/api-public/src/actors/get_or_create.rs index 7b113f564d..8f0403eef3 100644 --- a/packages/core/api-public/src/actors/get_or_create.rs +++ b/packages/core/api-public/src/actors/get_or_create.rs @@ -4,8 +4,8 @@ use axum::{ response::{IntoResponse, Response}, }; use rivet_api_builder::{ - extract::{Extension, Json, Query}, ApiError, + extract::{Extension, Json, Query}, }; use rivet_types::actors::CrashPolicy; use rivet_util::Id; diff --git a/packages/core/api-public/src/actors/utils.rs b/packages/core/api-public/src/actors/utils.rs index b6186fe5c7..3fa74e13f7 100644 --- a/packages/core/api-public/src/actors/utils.rs +++ b/packages/core/api-public/src/actors/utils.rs @@ -10,11 +10,7 @@ use std::collections::HashMap; /// Helper function to fetch an actor by ID, automatically routing to the correct datacenter /// based on the actor ID's label. #[tracing::instrument(skip_all)] -pub async fn fetch_actor_by_id( - ctx: &ApiCtx, - actor_id: Id, - namespace: String, -) -> Result { +pub async fn fetch_actor_by_id(ctx: &ApiCtx, actor_id: Id, namespace: String) -> Result { let list_query = rivet_api_types::actors::list::ListQuery { namespace, actor_ids: Some(actor_id.to_string()), diff --git a/packages/core/api-public/src/namespaces.rs b/packages/core/api-public/src/namespaces.rs index 1afa48b336..385ed54658 100644 --- a/packages/core/api-public/src/namespaces.rs +++ b/packages/core/api-public/src/namespaces.rs @@ -21,10 +21,7 @@ use crate::ctx::ApiCtx; security(("bearer_auth" = [])), )] #[tracing::instrument(skip_all)] -pub async fn list( - Extension(ctx): Extension, - Query(query): Query, -) -> Response { +pub async fn list(Extension(ctx): Extension, Query(query): Query) -> Response { match list_inner(ctx, query).await { Ok(response) => Json(response).into_response(), Err(err) => ApiError::from(err).into_response(), diff --git a/packages/core/api-public/src/runners.rs b/packages/core/api-public/src/runners.rs index 2b19109893..9d752fa3ec 100644 --- a/packages/core/api-public/src/runners.rs +++ b/packages/core/api-public/src/runners.rs @@ -22,10 +22,7 @@ use crate::ctx::ApiCtx; security(("bearer_auth" = [])), )] #[tracing::instrument(skip_all)] -pub async fn list( - Extension(ctx): Extension, - Query(query): Query, -) -> Response { +pub async fn list(Extension(ctx): Extension, Query(query): Query) -> Response { match list_inner(ctx, query).await { Ok(response) => Json(response).into_response(), Err(err) => ApiError::from(err).into_response(), diff --git a/packages/infra/engine/src/run_config.rs b/packages/infra/engine/src/run_config.rs index e1253e7e95..a149e9a257 100644 --- a/packages/infra/engine/src/run_config.rs +++ b/packages/infra/engine/src/run_config.rs @@ -22,9 +22,11 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result { ServiceKind::Standalone, |config, pools| Box::pin(pegboard_serverless::start(config, pools)), ), - Service::new("tracing_reconfigure", ServiceKind::Standalone, |config, pools| { - Box::pin(rivet_tracing_reconfigure::start(config, pools)) - }), + Service::new( + "tracing_reconfigure", + ServiceKind::Standalone, + |config, pools| Box::pin(rivet_tracing_reconfigure::start(config, pools)), + ), ]; Ok(RunConfigData { services }) diff --git a/packages/services/epoxy/src/http_client.rs b/packages/services/epoxy/src/http_client.rs index 5d94b230e1..779670c57c 100644 --- a/packages/services/epoxy/src/http_client.rs +++ b/packages/services/epoxy/src/http_client.rs @@ -5,6 +5,7 @@ use epoxy_protocol::{ versioned, }; use futures_util::{StreamExt, stream::FuturesUnordered}; +use gas::prelude::*; use rivet_api_builder::ApiCtx; use std::future::Future; use vbare::OwnedVersionedData; @@ -24,6 +25,7 @@ fn find_replica_address( .map(|r| r.api_peer_url.clone()) } +#[tracing::instrument(skip_all, fields(%from_replica_id, ?replica_ids, ?quorum_type))] pub async fn fanout_to_replicas( from_replica_id: ReplicaId, replica_ids: &[ReplicaId], @@ -93,6 +95,7 @@ where Ok(successful_responses) } +#[tracing::instrument(skip_all)] pub async fn send_message( ctx: &ApiCtx, config: &protocol::ClusterConfig, @@ -102,6 +105,7 @@ pub async fn send_message( send_message_to_address(ctx, replica_url, request).await } +#[tracing::instrument(skip_all, fields(%replica_url))] pub async fn send_message_to_address( ctx: &ApiCtx, replica_url: String, @@ -116,8 +120,7 @@ pub async fn send_message_to_address( "sending message to replica directly" ); - return crate::replica::message_request::message_request(&ctx, from_replica_id, request) - .await; + return crate::replica::message_request::message_request(&ctx, request).await; } let mut replica_url = url::Url::parse(&replica_url)?; @@ -139,6 +142,7 @@ pub async fn send_message_to_address( .post(replica_url.to_string()) .body(request.serialize()?) .send() + .custom_instrument(tracing::info_span!("http_request")) .await; let response = match response_result { diff --git a/packages/services/epoxy/src/http_routes.rs b/packages/services/epoxy/src/http_routes.rs index 50777228b3..79ba36d00d 100644 --- a/packages/services/epoxy/src/http_routes.rs +++ b/packages/services/epoxy/src/http_routes.rs @@ -28,8 +28,7 @@ pub async fn message(ctx: ApiCtx, path: VersionedPath, _query: (), body: Bytes) ); // Process message directly using ops - let response = - crate::replica::message_request::message_request(&ctx, current_replica_id, request).await?; + let response = crate::replica::message_request::message_request(&ctx, request).await?; versioned::Response::latest(response).serialize(path.version) } diff --git a/packages/services/epoxy/src/ops/kv/get_optimistic.rs b/packages/services/epoxy/src/ops/kv/get_optimistic.rs index a47236ac92..a05a84dd89 100644 --- a/packages/services/epoxy/src/ops/kv/get_optimistic.rs +++ b/packages/services/epoxy/src/ops/kv/get_optimistic.rs @@ -86,9 +86,7 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul // Request fanout to other datacenters, return first datacenter with any non-none value let config = ctx - .op(crate::ops::read_cluster_config::Input { - replica_id: input.replica_id, - }) + .op(crate::ops::read_cluster_config::Input {}) .await? .config; diff --git a/packages/services/epoxy/src/ops/read_cluster_config.rs b/packages/services/epoxy/src/ops/read_cluster_config.rs index 3440782a5c..62a23c2092 100644 --- a/packages/services/epoxy/src/ops/read_cluster_config.rs +++ b/packages/services/epoxy/src/ops/read_cluster_config.rs @@ -5,9 +5,7 @@ use gas::prelude::*; use crate::utils; #[derive(Debug)] -pub struct Input { - pub replica_id: ReplicaId, -} +pub struct Input {} #[derive(Debug)] pub struct Output { @@ -18,10 +16,7 @@ pub struct Output { pub async fn epoxy_read_cluster_config(ctx: &OperationCtx, input: &Input) -> Result { let config = ctx .udb()? - .run(|tx| { - let replica_id = input.replica_id; - async move { utils::read_config(&tx, replica_id).await } - }) + .run(|tx| async move { utils::read_config(&tx, ctx.config().epoxy_replica_id()).await }) .custom_instrument(tracing::info_span!("read_cluster_config_tx")) .await?; diff --git a/packages/services/epoxy/src/replica/message_request.rs b/packages/services/epoxy/src/replica/message_request.rs index dd63e9d4b3..4e41d1abf1 100644 --- a/packages/services/epoxy/src/replica/message_request.rs +++ b/packages/services/epoxy/src/replica/message_request.rs @@ -8,9 +8,10 @@ use crate::{ops, replica}; #[tracing::instrument(skip_all)] pub async fn message_request( ctx: &ApiCtx, - replica_id: ReplicaId, request: protocol::Request, ) -> Result { + let current_replica_id = ctx.config().epoxy_replica_id(); + let kind = match request.kind { protocol::RequestKind::UpdateConfigRequest(req) => { tracing::info!( @@ -23,7 +24,7 @@ pub async fn message_request( ctx.udb()? .run(|tx| { let req = req.clone(); - async move { replica::update_config::update_config(&*tx, replica_id, req) } + async move { replica::update_config::update_config(&*tx, current_replica_id, req) } }) .custom_instrument(tracing::info_span!("update_config_tx")) .await?; @@ -35,7 +36,7 @@ pub async fn message_request( .udb()? .run(|tx| { let req = req.clone(); - async move { replica::messages::pre_accept(&*tx, replica_id, req).await } + async move { replica::messages::pre_accept(&*tx, current_replica_id, req).await } }) .custom_instrument(tracing::info_span!("pre_accept_tx")) .await?; @@ -46,7 +47,7 @@ pub async fn message_request( .udb()? .run(|tx| { let req = req.clone(); - async move { replica::messages::accept(&*tx, replica_id, req).await } + async move { replica::messages::accept(&*tx, current_replica_id, req).await } }) .custom_instrument(tracing::info_span!("accept_tx")) .await?; @@ -58,7 +59,7 @@ pub async fn message_request( .run(|tx| { let req = req.clone(); async move { - replica::messages::commit(&*tx, replica_id, req, true).await?; + replica::messages::commit(&*tx, current_replica_id, req, true).await?; Result::Ok(()) } }) @@ -72,7 +73,7 @@ pub async fn message_request( .udb()? .run(|tx| { let req = req.clone(); - async move { replica::messages::prepare(&*tx, replica_id, req).await } + async move { replica::messages::prepare(&*tx, current_replica_id, req).await } }) .custom_instrument(tracing::info_span!("prepare_tx")) .await?; @@ -84,7 +85,9 @@ pub async fn message_request( .udb()? .run(|tx| { let req = req.clone(); - async move { replica::messages::download_instances(&*tx, replica_id, req).await } + async move { + replica::messages::download_instances(&*tx, current_replica_id, req).await + } }) .custom_instrument(tracing::info_span!("download_instances_tx")) .await?; @@ -101,9 +104,9 @@ pub async fn message_request( protocol::RequestKind::CoordinatorUpdateReplicaStatusRequest(req) => { // Send signal to coordinator workflow tracing::info!( - ?replica_id, - update_replica_id = ?req.replica_id, - update_status = ?req.status, + ?current_replica_id, + update_replica_id=?req.replica_id, + update_status=?req.status, "received coordinator update replica status request" ); @@ -113,7 +116,7 @@ pub async fn message_request( }) .bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING() .to_workflow::() - .tag("replica", replica_id) + .tag("replica", current_replica_id) .send() .await?; @@ -121,17 +124,14 @@ pub async fn message_request( } protocol::RequestKind::BeginLearningRequest(req) => { // Send signal to replica workflow - tracing::info!( - replica_id = ?replica_id, - "received begin learning request" - ); + tracing::info!(?current_replica_id, "received begin learning request"); ctx.signal(crate::workflows::replica::BeginLearning { config: req.config.clone().into(), }) .bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING() .to_workflow::() - .tag("replica", replica_id) + .tag("replica", current_replica_id) .send() .await?; @@ -141,7 +141,7 @@ pub async fn message_request( // Handle KV get request let result = ctx .op(ops::kv::get_local::Input { - replica_id, + replica_id: current_replica_id, key: req.key.clone(), }) .await?; diff --git a/scripts/decode_audit_entry.js b/scripts/decode_audit_entry.js index 9afa6123db..246e904707 100644 --- a/scripts/decode_audit_entry.js +++ b/scripts/decode_audit_entry.js @@ -13,7 +13,7 @@ const buffer = Buffer.from(hexString, 'hex'); const version = buffer.readUInt16LE(0); const dataBuffer = buffer.slice(2); -console.log('Version:', version); +console.log('Embedded VBARE Version:', version); class BareDecoder { constructor(buffer) { @@ -60,9 +60,17 @@ class BareDecoder { return readFn.call(this); } - readUnion(variants) { + readUnion(variants, tagNames) { const tag = this.readUint(); - return { tag, value: variants[tag].call(this) }; + const value = variants[tag].call(this); + const result = { + tag: tagNames ? tagNames[tag] : tag + }; + // Only include value if it's not void (undefined/null or the string representation) + if (value !== undefined && value !== null && value !== 'Any') { + result.value = value; + } + return result; } } @@ -70,7 +78,6 @@ class BareDecoder { const decoder = new BareDecoder(dataBuffer); console.log('Decoding audit entry from hex:', hexString); -console.log('Buffer length:', dataBuffer.length, 'bytes\n'); // Data struct const data = {}; @@ -83,7 +90,7 @@ const namespace = decoder.readUnion([ () => 'Any', // 0: Any (void) () => decoder.readData(), // 1: Id () => decoder.readData().toString('utf8') // 2: Name -]); +], ['Any', 'Id', 'Name']); data.request.namespace = namespace; // resource: ResourceKind enum @@ -94,7 +101,7 @@ data.request.resource = resourceKinds[decoder.readEnum()]; const target = decoder.readUnion([ () => 'Any', // 0: Any (void) () => decoder.readData() // 1: Id -]); +], ['Any', 'Id']); data.request.target = target; // operation: OperationKind enum