Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions out/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions packages/common/api-builder/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ pub async fn http_logging_middleware(
&[
KeyValue::new("method", method_clone.to_string()),
KeyValue::new("path", path_clone.clone()),
KeyValue::new("watch", "false"),
],
);

Expand All @@ -214,7 +213,6 @@ pub async fn http_logging_middleware(
&[
KeyValue::new("method", method_clone.to_string()),
KeyValue::new("path", path_clone.clone()),
KeyValue::new("watch", "false"),
KeyValue::new("status", status.to_string()),
KeyValue::new("error_code", error_code.clone()),
],
Expand All @@ -226,7 +224,6 @@ pub async fn http_logging_middleware(
&[
KeyValue::new("method", method_clone.to_string()),
KeyValue::new("path", path_clone.clone()),
KeyValue::new("watch", "false"),
KeyValue::new("status", status.to_string()),
KeyValue::new("error_code", error_code),
],
Expand Down
11 changes: 10 additions & 1 deletion packages/common/api-util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use anyhow::{Context, Result};
use axum::{body::Body, response::Response};

Check warning on line 2 in packages/common/api-util/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/common/api-util/src/lib.rs
use futures_util::StreamExt;
use rivet_api_builder::{ApiCtx, ErrorResponse, RawErrorResponse};
use serde::{Serialize, de::DeserializeOwned};
use serde::{de::DeserializeOwned, Serialize};
use std::future::Future;

mod errors;

pub use axum::http::{HeaderMap, Method};

/// Generic function to make raw requests to remote datacenters by label (returns axum Response)
#[tracing::instrument(skip(ctx, headers, query, body))]
pub async fn request_remote_datacenter_raw(
ctx: &ApiCtx,
dc_label: u16,
Expand All @@ -32,6 +33,8 @@
url.set_query(Some(&serde_html_form::to_string(q)?));
}

tracing::debug!(%method, %url, "sending raw request to remote datacenter");

let mut request = client.request(method, url).headers(headers);

if let Some(b) = body {
Expand All @@ -48,6 +51,7 @@
}

/// Generic function to make requests to a specific datacenter
#[tracing::instrument(skip(config, headers, query, body))]
pub async fn request_remote_datacenter<T>(
config: &rivet_config::Config,
dc_label: u16,
Expand All @@ -72,6 +76,8 @@
url.set_query(Some(&serde_html_form::to_string(q)?));
}

tracing::debug!(%method, %url, "sending request to remote datacenter");

let mut request = client.request(method, url).headers(headers);

if let Some(b) = body {
Expand All @@ -89,6 +95,7 @@

/// Generic function to fanout requests to all datacenters and aggregate results
/// Returns aggregated results and errors only if all requests fail
#[tracing::instrument(skip(ctx, headers, query, local_handler, aggregator))]
pub async fn fanout_to_datacenters<I, Q, F, Fut, A, R>(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down Expand Up @@ -164,6 +171,7 @@
Ok(aggregated)
}

#[tracing::instrument(skip_all)]
pub async fn reqwest_to_axum_response(reqwest_response: reqwest::Response) -> Result<Response> {
let status = reqwest_response.status();
let headers = reqwest_response.headers().clone();
Expand All @@ -178,6 +186,7 @@
Ok(response)
}

#[tracing::instrument(skip_all)]
pub async fn parse_response<T: DeserializeOwned>(reqwest_response: reqwest::Response) -> Result<T> {
let status = reqwest_response.status();
let response_text = reqwest_response.text().await?;
Expand Down
11 changes: 3 additions & 8 deletions packages/common/runtime/src/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use console_subscriber;
use opentelemetry::trace::TracerProvider;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;

Check warning on line 5 in packages/common/runtime/src/traces.rs

View workflow job for this annotation

GitHub Actions / Test

unused import: `opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge`

Check failure on line 5 in packages/common/runtime/src/traces.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge`
use rivet_metrics::OtelProviderGuard;
use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer};
use tracing_subscriber::{EnvFilter, Layer, layer::SubscriberExt, util::SubscriberInitExt};
Expand All @@ -12,32 +12,27 @@
let registry = tracing_subscriber::registry();

// Build and apply otel layers to the registry if otel is enabled
let (otel_trace_layer, otel_metric_layer, otel_log_layer) = match otel_providers {
let (otel_trace_layer, otel_metric_layer) = match otel_providers {
Some(providers) => {
let tracer = providers.tracer_provider.tracer("tracing-otel-subscriber");

let otel_trace_layer =
OpenTelemetryLayer::new(tracer).with_filter(env_filter("RUST_TRACE"));

let otel_metric_layer = MetricsLayer::new(providers.meter_provider.clone())

Check warning on line 22 in packages/common/runtime/src/traces.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/common/runtime/src/traces.rs
.with_filter(env_filter("RUST_TRACE"));

let otel_log_layer = OpenTelemetryTracingBridge::new(&providers.logger_provider)
.with_filter(env_filter("RUST_LOG"));

(
Some(otel_trace_layer),
Some(otel_metric_layer),
Some(otel_log_layer),
)
}
None => (None, None, None),
None => (None, None),
};

Check warning on line 32 in packages/common/runtime/src/traces.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/common/runtime/src/traces.rs
let registry = registry
.with(otel_metric_layer)
.with(otel_trace_layer)
.with(otel_log_layer);
.with(otel_trace_layer);

// Check if tokio console is enabled
let enable_tokio_console = std::env::var("TOKIO_CONSOLE_ENABLE").map_or(false, |x| x == "1");
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-peer/src/actors/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use rivet_api_types::actors::create::{CreateQuery, CreateRequest, CreateResponse};

#[tracing::instrument(skip_all)]
pub async fn create(
ctx: ApiCtx,
_path: (),
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-peer/src/actors/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct DeletePath {
(status = 200, body = DeleteResponse),
),
)]
#[tracing::instrument(skip_all)]
pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result<DeleteResponse> {
// Get the actor first to verify it exists
let actors_res = ctx
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-peer/src/actors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use rivet_api_types::{actors::list::*, pagination::Pagination};
(status = 200, body = ListResponse),
),
)]
#[tracing::instrument(skip_all)]
pub async fn list(ctx: ApiCtx, _path: (), query: ListQuery) -> Result<ListResponse> {
let key = query.key;
let actor_ids = query.actor_ids.as_ref().map(|x| {
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-peer/src/actors/list_names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use rivet_types::actors::ActorName;
(status = 200, body = ListNamesResponse),
),
)]
#[tracing::instrument(skip_all)]
pub async fn list_names(
ctx: ApiCtx,
_path: (),
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-peer/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct CachePurgeRequest {
#[derive(Serialize)]
pub struct CachePurgeResponse {}

#[tracing::instrument(skip_all)]
pub async fn cache_purge(
ctx: ApiCtx,
_path: (),
Expand All @@ -30,6 +31,7 @@ pub async fn cache_purge(
#[derive(Serialize)]
pub struct BumpServerlessAutoscalerResponse {}

#[tracing::instrument(skip_all)]
pub async fn bump_serverless_autoscaler(
ctx: ApiCtx,
_path: (),
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-peer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod runners;

pub use router::router as create_router;

#[tracing::instrument(skip_all)]
pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
let host = config.api_peer().host();
let port = config.api_peer().port();
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-peer/src/namespaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

#[tracing::instrument(skip_all)]
pub async fn list(ctx: ApiCtx, _path: (), query: ListQuery) -> Result<ListResponse> {
let namespace_ids = query.namespace_ids.as_ref().map(|x| {
x.split(',')
Expand Down Expand Up @@ -72,6 +73,7 @@ pub struct CreateResponse {
pub namespace: rivet_types::namespaces::Namespace,
}

#[tracing::instrument(skip_all)]
pub async fn create(
ctx: ApiCtx,
_path: (),
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-peer/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use rivet_api_builder::{create_router, prelude::*};

use crate::{actors, internal, namespaces, runner_configs, runners};

#[tracing::instrument(skip_all)]
pub async fn router(
name: &'static str,
config: rivet_config::Config,
Expand Down
3 changes: 3 additions & 0 deletions packages/core/api-peer/src/runner_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use rivet_types::keys::namespace::runner_config::RunnerConfigVariant;
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};

#[tracing::instrument(skip_all)]
pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<ListResponse> {
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
Expand Down Expand Up @@ -94,6 +95,7 @@ pub struct UpsertRequest(pub rivet_api_types::namespaces::runner_configs::Runner
#[schema(as = RunnerConfigsUpsertResponse)]
pub struct UpsertResponse {}

#[tracing::instrument(skip_all)]
pub async fn upsert(
ctx: ApiCtx,
path: UpsertPath,
Expand Down Expand Up @@ -134,6 +136,7 @@ pub struct DeletePath {
#[schema(as = RunnerConfigsDeleteResponse)]
pub struct DeleteResponse {}

#[tracing::instrument(skip_all)]
pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result<DeleteResponse> {
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-peer/src/runners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use utoipa::{IntoParams, ToSchema};
(status = 200, body = ListResponse),
),
)]
#[tracing::instrument(skip_all)]
pub async fn list(ctx: ApiCtx, _path: (), query: ListQuery) -> Result<ListResponse> {
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
Expand Down Expand Up @@ -76,6 +77,7 @@ pub struct ListNamesResponse {
pub pagination: Pagination,
}

#[tracing::instrument(skip_all)]
pub async fn list_names(
ctx: ApiCtx,
_path: (),
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-public/src/actors/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub async fn create(
}
}

#[tracing::instrument(skip_all)]
async fn create_inner(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-public/src/actors/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct DeleteResponse {}
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn delete(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Expand All @@ -61,6 +62,7 @@ pub async fn delete(
}
}

#[tracing::instrument(skip_all)]
async fn delete_inner(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-public/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub async fn get_or_create(
}
}

#[tracing::instrument(skip_all)]
async fn get_or_create_inner(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-public/src/actors/list_names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::ctx::ApiCtx;
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn list_names(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Expand All @@ -39,6 +40,7 @@ pub async fn list_names(
}
}

#[tracing::instrument(skip_all)]
async fn list_names_inner(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down
3 changes: 3 additions & 0 deletions packages/core/api-public/src/actors/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::collections::HashMap;

/// Helper function to fetch an actor by ID, automatically routing to the correct datacenter
/// based on the actor ID's label.
#[tracing::instrument(skip_all)]
pub async fn fetch_actor_by_id(
ctx: &ApiCtx,
headers: HeaderMap,
Expand Down Expand Up @@ -55,6 +56,7 @@ pub async fn fetch_actor_by_id(

/// Helper function to fetch multiple actors by their IDs, automatically routing to the correct datacenters
/// based on each actor ID's label. This function batches requests by datacenter for efficiency.
#[tracing::instrument(skip_all)]
pub async fn fetch_actors_by_ids(
ctx: &ApiCtx,
headers: HeaderMap,
Expand Down Expand Up @@ -186,6 +188,7 @@ pub fn extract_duplicate_key_error(err: &anyhow::Error) -> Option<Id> {
}

/// Determine the datacenter label to create the actor in.
#[tracing::instrument(skip_all)]
pub async fn find_dc_for_actor_creation(
ctx: &ApiCtx,
namespace_id: Id,
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-public/src/datacenters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::ctx::ApiCtx;
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn list(Extension(ctx): Extension<ApiCtx>) -> Response {
match list_inner(ctx).await {
Ok(response) => Json(response).into_response(),
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-public/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Result;

Check warning on line 1 in packages/core/api-public/src/health.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/health.rs
use axum::{extract::Extension, response::IntoResponse, Json};
use futures_util::StreamExt;
use rivet_api_builder::ApiError;
Expand Down Expand Up @@ -47,6 +47,7 @@
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn fanout(Extension(ctx): Extension<ApiCtx>) -> impl IntoResponse {
match fanout_inner(ctx).await {
Ok(response) => Json(response).into_response(),
Expand Down Expand Up @@ -126,6 +127,7 @@
})
}

#[tracing::instrument(skip_all)]
async fn send_health_check(
ctx: &ApiCtx,
dc: &rivet_config::config::topology::Datacenter,
Expand All @@ -149,7 +151,7 @@
let response = res.json::<HealthResponse>().await?;
Ok(response)
} else {
anyhow::bail!("Health check returned status: {}", res.status())

Check warning on line 154 in packages/core/api-public/src/health.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/health.rs
}
}

1 change: 1 addition & 0 deletions packages/core/api-public/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use axum::response::IntoResponse;
use serde_json::json;

/// Returns metadata about the API including runtime and version
#[tracing::instrument(skip_all)]
pub async fn get_metadata() -> impl IntoResponse {
Json(json!({
"runtime": "engine",
Expand Down
Loading
Loading