Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/common/metrics/src/providers.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// Based off of https://github.com/tokio-rs/tracing-opentelemetry/blob/v0.1.x/examples/opentelemetry-otlp.rs
// Based off of https://github.com/tokio-rs/tracing-opentelemetry/blob/v0.1.x/examples/opentelemetry-otlp.rs

use std::sync::{Arc, RwLock, OnceLock};
use opentelemetry::{KeyValue, global};
use opentelemetry::trace::{SamplingResult, SpanKind};
use opentelemetry::{KeyValue, global};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
Resource,
Expand All @@ -12,6 +11,7 @@ use opentelemetry_sdk::{
trace::{RandomIdGenerator, Sampler, SdkTracerProvider},
};
use opentelemetry_semantic_conventions::{SCHEMA_URL, attribute::SERVICE_VERSION};
use std::sync::{Arc, OnceLock, RwLock};

/// Dynamic sampler that can be updated at runtime
#[derive(Clone, Debug)]
Expand Down
4 changes: 2 additions & 2 deletions packages/common/runtime/src/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub fn init_tracing_subscriber(otel_providers: &Option<OtelProviderGuard>) {
Some(providers) => {
let tracer = providers.tracer_provider.tracer("tracing-otel-subscriber");

let otel_trace_layer =
OpenTelemetryLayer::new(tracer).with_filter(build_filter_from_env_var("RUST_TRACE"));
let otel_trace_layer = OpenTelemetryLayer::new(tracer)
.with_filter(build_filter_from_env_var("RUST_TRACE"));

let otel_metric_layer = MetricsLayer::new(providers.meter_provider.clone())
.with_filter(build_filter_from_env_var("RUST_TRACE"));
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-peer/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub async fn bump_serverless_autoscaler(
}

#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct SetTracingConfigRequest {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub filter: Option<Option<String>>,
Expand All @@ -55,6 +56,7 @@ pub struct SetTracingConfigRequest {
}

#[derive(Serialize)]
#[serde(deny_unknown_fields)]
pub struct SetTracingConfigResponse {}

#[tracing::instrument(skip_all)]
Expand Down
2 changes: 1 addition & 1 deletion packages/core/api-public/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use axum::{
response::{IntoResponse, Response},
};
use rivet_api_builder::{
extract::{Extension, Json, Query},
ApiError,
extract::{Extension, Json, Query},
};
use rivet_types::actors::CrashPolicy;
use rivet_util::Id;
Expand Down
6 changes: 1 addition & 5 deletions packages/core/api-public/src/actors/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@ use std::collections::HashMap;
/// Helper function to fetch an actor by ID, automatically routing to the correct datacenter
/// based on the actor ID's label.
#[tracing::instrument(skip_all)]
pub async fn fetch_actor_by_id(
ctx: &ApiCtx,
actor_id: Id,
namespace: String,
) -> Result<Actor> {
pub async fn fetch_actor_by_id(ctx: &ApiCtx, actor_id: Id, namespace: String) -> Result<Actor> {
let list_query = rivet_api_types::actors::list::ListQuery {
namespace,
actor_ids: Some(actor_id.to_string()),
Expand Down
5 changes: 1 addition & 4 deletions packages/core/api-public/src/namespaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn list(
Extension(ctx): Extension<ApiCtx>,
Query(query): Query<ListQuery>,
) -> Response {
pub async fn list(Extension(ctx): Extension<ApiCtx>, Query(query): Query<ListQuery>) -> Response {
match list_inner(ctx, query).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
Expand Down Expand Up @@ -69,7 +66,7 @@
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
}

Check warning on line 69 in packages/core/api-public/src/namespaces.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/namespaces.rs

#[tracing::instrument(skip_all)]
async fn create_inner(
Expand Down
5 changes: 1 addition & 4 deletions packages/core/api-public/src/runners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn list(
Extension(ctx): Extension<ApiCtx>,
Query(query): Query<ListQuery>,
) -> Response {
pub async fn list(Extension(ctx): Extension<ApiCtx>, Query(query): Query<ListQuery>) -> Response {
match list_inner(ctx, query).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
Expand Down Expand Up @@ -102,7 +99,7 @@
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
}

Check warning on line 102 in packages/core/api-public/src/runners.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/runners.rs

#[tracing::instrument(skip_all)]
async fn list_names_inner(
Expand Down
8 changes: 5 additions & 3 deletions packages/infra/engine/src/run_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
ServiceKind::Standalone,
|config, pools| Box::pin(pegboard_serverless::start(config, pools)),
),
Service::new("tracing_reconfigure", ServiceKind::Standalone, |config, pools| {
Box::pin(rivet_tracing_reconfigure::start(config, pools))
}),
Service::new(
"tracing_reconfigure",
ServiceKind::Standalone,
|config, pools| Box::pin(rivet_tracing_reconfigure::start(config, pools)),
),
];

Ok(RunConfigData { services })
Expand Down
8 changes: 6 additions & 2 deletions packages/services/epoxy/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use epoxy_protocol::{
versioned,
};
use futures_util::{StreamExt, stream::FuturesUnordered};
use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use std::future::Future;
use vbare::OwnedVersionedData;
Expand All @@ -24,6 +25,7 @@ fn find_replica_address(
.map(|r| r.api_peer_url.clone())
}

#[tracing::instrument(skip_all, fields(%from_replica_id, ?replica_ids, ?quorum_type))]
pub async fn fanout_to_replicas<F, Fut, T>(
from_replica_id: ReplicaId,
replica_ids: &[ReplicaId],
Expand Down Expand Up @@ -93,6 +95,7 @@ where
Ok(successful_responses)
}

#[tracing::instrument(skip_all)]
pub async fn send_message(
ctx: &ApiCtx,
config: &protocol::ClusterConfig,
Expand All @@ -102,6 +105,7 @@ pub async fn send_message(
send_message_to_address(ctx, replica_url, request).await
}

#[tracing::instrument(skip_all, fields(%replica_url))]
pub async fn send_message_to_address(
ctx: &ApiCtx,
replica_url: String,
Expand All @@ -116,8 +120,7 @@ pub async fn send_message_to_address(
"sending message to replica directly"
);

return crate::replica::message_request::message_request(&ctx, from_replica_id, request)
.await;
return crate::replica::message_request::message_request(&ctx, request).await;
}

let mut replica_url = url::Url::parse(&replica_url)?;
Expand All @@ -139,6 +142,7 @@ pub async fn send_message_to_address(
.post(replica_url.to_string())
.body(request.serialize()?)
.send()
.custom_instrument(tracing::info_span!("http_request"))
.await;

let response = match response_result {
Expand Down
3 changes: 1 addition & 2 deletions packages/services/epoxy/src/http_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ pub async fn message(ctx: ApiCtx, path: VersionedPath, _query: (), body: Bytes)
);

// Process message directly using ops
let response =
crate::replica::message_request::message_request(&ctx, current_replica_id, request).await?;
let response = crate::replica::message_request::message_request(&ctx, request).await?;

versioned::Response::latest(response).serialize(path.version)
}
4 changes: 1 addition & 3 deletions packages/services/epoxy/src/ops/kv/get_optimistic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ pub async fn epoxy_kv_get_optimistic(ctx: &OperationCtx, input: &Input) -> Resul

// Request fanout to other datacenters, return first datacenter with any non-none value
let config = ctx
.op(crate::ops::read_cluster_config::Input {
replica_id: input.replica_id,
})
.op(crate::ops::read_cluster_config::Input {})
.await?
.config;

Expand Down
9 changes: 2 additions & 7 deletions packages/services/epoxy/src/ops/read_cluster_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use gas::prelude::*;
use crate::utils;

#[derive(Debug)]
pub struct Input {
pub replica_id: ReplicaId,
}
pub struct Input {}

#[derive(Debug)]
pub struct Output {
Expand All @@ -18,10 +16,7 @@ pub struct Output {
pub async fn epoxy_read_cluster_config(ctx: &OperationCtx, input: &Input) -> Result<Output> {
let config = ctx
.udb()?
.run(|tx| {
let replica_id = input.replica_id;
async move { utils::read_config(&tx, replica_id).await }
})
.run(|tx| async move { utils::read_config(&tx, ctx.config().epoxy_replica_id()).await })
.custom_instrument(tracing::info_span!("read_cluster_config_tx"))
.await?;

Expand Down
34 changes: 17 additions & 17 deletions packages/services/epoxy/src/replica/message_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use crate::{ops, replica};
#[tracing::instrument(skip_all)]
pub async fn message_request(
ctx: &ApiCtx,
replica_id: ReplicaId,
request: protocol::Request,
) -> Result<protocol::Response> {
let current_replica_id = ctx.config().epoxy_replica_id();

let kind = match request.kind {
protocol::RequestKind::UpdateConfigRequest(req) => {
tracing::info!(
Expand All @@ -23,7 +24,7 @@ pub async fn message_request(
ctx.udb()?
.run(|tx| {
let req = req.clone();
async move { replica::update_config::update_config(&*tx, replica_id, req) }
async move { replica::update_config::update_config(&*tx, current_replica_id, req) }
})
.custom_instrument(tracing::info_span!("update_config_tx"))
.await?;
Expand All @@ -35,7 +36,7 @@ pub async fn message_request(
.udb()?
.run(|tx| {
let req = req.clone();
async move { replica::messages::pre_accept(&*tx, replica_id, req).await }
async move { replica::messages::pre_accept(&*tx, current_replica_id, req).await }
})
.custom_instrument(tracing::info_span!("pre_accept_tx"))
.await?;
Expand All @@ -46,7 +47,7 @@ pub async fn message_request(
.udb()?
.run(|tx| {
let req = req.clone();
async move { replica::messages::accept(&*tx, replica_id, req).await }
async move { replica::messages::accept(&*tx, current_replica_id, req).await }
})
.custom_instrument(tracing::info_span!("accept_tx"))
.await?;
Expand All @@ -58,7 +59,7 @@ pub async fn message_request(
.run(|tx| {
let req = req.clone();
async move {
replica::messages::commit(&*tx, replica_id, req, true).await?;
replica::messages::commit(&*tx, current_replica_id, req, true).await?;
Result::Ok(())
}
})
Expand All @@ -72,7 +73,7 @@ pub async fn message_request(
.udb()?
.run(|tx| {
let req = req.clone();
async move { replica::messages::prepare(&*tx, replica_id, req).await }
async move { replica::messages::prepare(&*tx, current_replica_id, req).await }
})
.custom_instrument(tracing::info_span!("prepare_tx"))
.await?;
Expand All @@ -84,7 +85,9 @@ pub async fn message_request(
.udb()?
.run(|tx| {
let req = req.clone();
async move { replica::messages::download_instances(&*tx, replica_id, req).await }
async move {
replica::messages::download_instances(&*tx, current_replica_id, req).await
}
})
.custom_instrument(tracing::info_span!("download_instances_tx"))
.await?;
Expand All @@ -101,9 +104,9 @@ pub async fn message_request(
protocol::RequestKind::CoordinatorUpdateReplicaStatusRequest(req) => {
// Send signal to coordinator workflow
tracing::info!(
?replica_id,
update_replica_id = ?req.replica_id,
update_status = ?req.status,
?current_replica_id,
update_replica_id=?req.replica_id,
update_status=?req.status,
"received coordinator update replica status request"
);

Expand All @@ -113,25 +116,22 @@ pub async fn message_request(
})
.bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING()
.to_workflow::<crate::workflows::coordinator::Workflow>()
.tag("replica", replica_id)
.tag("replica", current_replica_id)
.send()
.await?;

protocol::ResponseKind::CoordinatorUpdateReplicaStatusResponse
}
protocol::RequestKind::BeginLearningRequest(req) => {
// Send signal to replica workflow
tracing::info!(
replica_id = ?replica_id,
"received begin learning request"
);
tracing::info!(?current_replica_id, "received begin learning request");

ctx.signal(crate::workflows::replica::BeginLearning {
config: req.config.clone().into(),
})
.bypass_signal_from_workflow_I_KNOW_WHAT_IM_DOING()
.to_workflow::<crate::workflows::replica::Workflow>()
.tag("replica", replica_id)
.tag("replica", current_replica_id)
.send()
.await?;

Expand All @@ -141,7 +141,7 @@ pub async fn message_request(
// Handle KV get request
let result = ctx
.op(ops::kv::get_local::Input {
replica_id,
replica_id: current_replica_id,
key: req.key.clone(),
})
.await?;
Expand Down
19 changes: 13 additions & 6 deletions scripts/decode_audit_entry.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const buffer = Buffer.from(hexString, 'hex');
const version = buffer.readUInt16LE(0);
const dataBuffer = buffer.slice(2);

console.log('Version:', version);
console.log('Embedded VBARE Version:', version);

class BareDecoder {
constructor(buffer) {
Expand Down Expand Up @@ -60,17 +60,24 @@ class BareDecoder {
return readFn.call(this);
}

readUnion(variants) {
readUnion(variants, tagNames) {
const tag = this.readUint();
return { tag, value: variants[tag].call(this) };
const value = variants[tag].call(this);
const result = {
tag: tagNames ? tagNames[tag] : tag
};
// Only include value if it's not void (undefined/null or the string representation)
if (value !== undefined && value !== null && value !== 'Any') {
result.value = value;
}
return result;
}
}

// Decode the audit entry
const decoder = new BareDecoder(dataBuffer);

console.log('Decoding audit entry from hex:', hexString);
console.log('Buffer length:', dataBuffer.length, 'bytes\n');

// Data struct
const data = {};
Expand All @@ -83,7 +90,7 @@ const namespace = decoder.readUnion([
() => 'Any', // 0: Any (void)
() => decoder.readData(), // 1: Id
() => decoder.readData().toString('utf8') // 2: Name
]);
], ['Any', 'Id', 'Name']);
data.request.namespace = namespace;

// resource: ResourceKind enum
Expand All @@ -94,7 +101,7 @@ data.request.resource = resourceKinds[decoder.readEnum()];
const target = decoder.readUnion([
() => 'Any', // 0: Any (void)
() => decoder.readData() // 1: Id
]);
], ['Any', 'Id']);
data.request.target = target;

// operation: OperationKind enum
Expand Down
Loading