Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion packages/common/api-util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use anyhow::{Context, Result};
use axum::{body::Body, response::Response};

Check warning on line 2 in packages/common/api-util/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/common/api-util/src/lib.rs
use futures_util::StreamExt;
use rivet_api_builder::{ApiCtx, ErrorResponse, RawErrorResponse};
use serde::{Serialize, de::DeserializeOwned};
use serde::{de::DeserializeOwned, Serialize};
use std::future::Future;

mod errors;

pub use axum::http::{HeaderMap, Method};

/// Generic function to make raw requests to remote datacenters by label (returns axum Response)
#[tracing::instrument(skip(ctx, headers, query, body))]
pub async fn request_remote_datacenter_raw(
ctx: &ApiCtx,
dc_label: u16,
Expand All @@ -32,6 +33,8 @@
url.set_query(Some(&serde_html_form::to_string(q)?));
}

tracing::debug!(%method, %url, "sending raw request to remote datacenter");

let mut request = client.request(method, url).headers(headers);

if let Some(b) = body {
Expand All @@ -48,6 +51,7 @@
}

/// Generic function to make requests to a specific datacenter
#[tracing::instrument(skip(config, headers, query, body))]
pub async fn request_remote_datacenter<T>(
config: &rivet_config::Config,
dc_label: u16,
Expand All @@ -72,6 +76,8 @@
url.set_query(Some(&serde_html_form::to_string(q)?));
}

tracing::debug!(%method, %url, "sending request to remote datacenter");

let mut request = client.request(method, url).headers(headers);

if let Some(b) = body {
Expand All @@ -89,6 +95,7 @@

/// Generic function to fanout requests to all datacenters and aggregate results
/// Returns aggregated results and errors only if all requests fail
#[tracing::instrument(skip(ctx, headers, query, local_handler, aggregator))]
pub async fn fanout_to_datacenters<I, Q, F, Fut, A, R>(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down Expand Up @@ -164,6 +171,7 @@
Ok(aggregated)
}

#[tracing::instrument(skip_all)]
pub async fn reqwest_to_axum_response(reqwest_response: reqwest::Response) -> Result<Response> {
let status = reqwest_response.status();
let headers = reqwest_response.headers().clone();
Expand All @@ -178,6 +186,7 @@
Ok(response)
}

#[tracing::instrument(skip_all)]
pub async fn parse_response<T: DeserializeOwned>(reqwest_response: reqwest::Response) -> Result<T> {
let status = reqwest_response.status();
let response_text = reqwest_response.text().await?;
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-peer/src/actors/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use rivet_api_types::actors::create::{CreateQuery, CreateRequest, CreateResponse};

#[tracing::instrument(skip_all)]
pub async fn create(
ctx: ApiCtx,
_path: (),
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-peer/src/actors/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct DeletePath {
(status = 200, body = DeleteResponse),
),
)]
#[tracing::instrument(skip_all)]
pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result<DeleteResponse> {
// Get the actor first to verify it exists
let actors_res = ctx
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-peer/src/actors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use rivet_api_types::{actors::list::*, pagination::Pagination};
(status = 200, body = ListResponse),
),
)]
#[tracing::instrument(skip_all)]
pub async fn list(ctx: ApiCtx, _path: (), query: ListQuery) -> Result<ListResponse> {
let key = query.key;
let actor_ids = query.actor_ids.as_ref().map(|x| {
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-peer/src/actors/list_names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use rivet_types::actors::ActorName;
(status = 200, body = ListNamesResponse),
),
)]
#[tracing::instrument(skip_all)]
pub async fn list_names(
ctx: ApiCtx,
_path: (),
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-peer/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct CachePurgeRequest {
#[derive(Serialize)]
pub struct CachePurgeResponse {}

#[tracing::instrument(skip_all)]
pub async fn cache_purge(
ctx: ApiCtx,
_path: (),
Expand All @@ -30,6 +31,7 @@ pub async fn cache_purge(
#[derive(Serialize)]
pub struct BumpServerlessAutoscalerResponse {}

#[tracing::instrument(skip_all)]
pub async fn bump_serverless_autoscaler(
ctx: ApiCtx,
_path: (),
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-peer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod runners;

pub use router::router as create_router;

#[tracing::instrument(skip_all)]
pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
let host = config.api_peer().host();
let port = config.api_peer().port();
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-peer/src/namespaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::ToSchema;

#[tracing::instrument(skip_all)]
pub async fn list(ctx: ApiCtx, _path: (), query: ListQuery) -> Result<ListResponse> {
let namespace_ids = query.namespace_ids.as_ref().map(|x| {
x.split(',')
Expand Down Expand Up @@ -72,6 +73,7 @@ pub struct CreateResponse {
pub namespace: rivet_types::namespaces::Namespace,
}

#[tracing::instrument(skip_all)]
pub async fn create(
ctx: ApiCtx,
_path: (),
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-peer/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use rivet_api_builder::{create_router, prelude::*};

use crate::{actors, internal, namespaces, runner_configs, runners};

#[tracing::instrument(skip_all)]
pub async fn router(
name: &'static str,
config: rivet_config::Config,
Expand Down
3 changes: 3 additions & 0 deletions packages/core/api-peer/src/runner_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use rivet_types::keys::namespace::runner_config::RunnerConfigVariant;
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};

#[tracing::instrument(skip_all)]
pub async fn list(ctx: ApiCtx, _path: ListPath, query: ListQuery) -> Result<ListResponse> {
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
Expand Down Expand Up @@ -94,6 +95,7 @@ pub struct UpsertRequest(pub rivet_api_types::namespaces::runner_configs::Runner
#[schema(as = RunnerConfigsUpsertResponse)]
pub struct UpsertResponse {}

#[tracing::instrument(skip_all)]
pub async fn upsert(
ctx: ApiCtx,
path: UpsertPath,
Expand Down Expand Up @@ -134,6 +136,7 @@ pub struct DeletePath {
#[schema(as = RunnerConfigsDeleteResponse)]
pub struct DeleteResponse {}

#[tracing::instrument(skip_all)]
pub async fn delete(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result<DeleteResponse> {
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-peer/src/runners.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use utoipa::{IntoParams, ToSchema};
(status = 200, body = ListResponse),
),
)]
#[tracing::instrument(skip_all)]
pub async fn list(ctx: ApiCtx, _path: (), query: ListQuery) -> Result<ListResponse> {
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
Expand Down Expand Up @@ -76,6 +77,7 @@ pub struct ListNamesResponse {
pub pagination: Pagination,
}

#[tracing::instrument(skip_all)]
pub async fn list_names(
ctx: ApiCtx,
_path: (),
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-public/src/actors/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub async fn create(
}
}

#[tracing::instrument(skip_all)]
async fn create_inner(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-public/src/actors/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct DeleteResponse {}
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn delete(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Expand All @@ -61,6 +62,7 @@ pub async fn delete(
}
}

#[tracing::instrument(skip_all)]
async fn delete_inner(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-public/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ pub async fn get_or_create(
}
}

#[tracing::instrument(skip_all)]
async fn get_or_create_inner(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-public/src/actors/list_names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::ctx::ApiCtx;
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn list_names(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Expand All @@ -39,6 +40,7 @@ pub async fn list_names(
}
}

#[tracing::instrument(skip_all)]
async fn list_names_inner(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down
3 changes: 3 additions & 0 deletions packages/core/api-public/src/actors/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::collections::HashMap;

/// Helper function to fetch an actor by ID, automatically routing to the correct datacenter
/// based on the actor ID's label.
#[tracing::instrument(skip_all)]
pub async fn fetch_actor_by_id(
ctx: &ApiCtx,
headers: HeaderMap,
Expand Down Expand Up @@ -55,6 +56,7 @@ pub async fn fetch_actor_by_id(

/// Helper function to fetch multiple actors by their IDs, automatically routing to the correct datacenters
/// based on each actor ID's label. This function batches requests by datacenter for efficiency.
#[tracing::instrument(skip_all)]
pub async fn fetch_actors_by_ids(
ctx: &ApiCtx,
headers: HeaderMap,
Expand Down Expand Up @@ -186,6 +188,7 @@ pub fn extract_duplicate_key_error(err: &anyhow::Error) -> Option<Id> {
}

/// Determine the datacenter label to create the actor in.
#[tracing::instrument(skip_all)]
pub async fn find_dc_for_actor_creation(
ctx: &ApiCtx,
namespace_id: Id,
Expand Down
1 change: 1 addition & 0 deletions packages/core/api-public/src/datacenters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::ctx::ApiCtx;
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn list(Extension(ctx): Extension<ApiCtx>) -> Response {
match list_inner(ctx).await {
Ok(response) => Json(response).into_response(),
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-public/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Result;

Check warning on line 1 in packages/core/api-public/src/health.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/health.rs
use axum::{extract::Extension, response::IntoResponse, Json};
use futures_util::StreamExt;
use rivet_api_builder::ApiError;
Expand Down Expand Up @@ -47,6 +47,7 @@
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn fanout(Extension(ctx): Extension<ApiCtx>) -> impl IntoResponse {
match fanout_inner(ctx).await {
Ok(response) => Json(response).into_response(),
Expand Down Expand Up @@ -126,6 +127,7 @@
})
}

#[tracing::instrument(skip_all)]
async fn send_health_check(
ctx: &ApiCtx,
dc: &rivet_config::config::topology::Datacenter,
Expand All @@ -149,7 +151,7 @@
let response = res.json::<HealthResponse>().await?;
Ok(response)
} else {
anyhow::bail!("Health check returned status: {}", res.status())

Check warning on line 154 in packages/core/api-public/src/health.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/health.rs
}
}

1 change: 1 addition & 0 deletions packages/core/api-public/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use axum::response::IntoResponse;
use serde_json::json;

/// Returns metadata about the API including runtime and version
#[tracing::instrument(skip_all)]
pub async fn get_metadata() -> impl IntoResponse {
Json(json!({
"runtime": "engine",
Expand Down
3 changes: 3 additions & 0 deletions packages/core/api-public/src/namespaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::ctx::ApiCtx;
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn list(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Expand Down Expand Up @@ -64,6 +65,7 @@ async fn list_inner(ctx: ApiCtx, headers: HeaderMap, query: ListQuery) -> Result
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn create(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Expand All @@ -75,6 +77,7 @@ pub async fn create(
}
}

#[tracing::instrument(skip_all)]
async fn create_inner(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-public/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use axum::{
extract::Request,
middleware::{self, Next},

Check warning on line 3 in packages/core/api-public/src/router.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/router.rs
response::{IntoResponse, Redirect, Response},
};
use reqwest::header::{HeaderMap, AUTHORIZATION};
Expand Down Expand Up @@ -38,6 +38,7 @@
)]
pub struct ApiDoc;

#[tracing::instrument(skip_all)]
pub async fn router(
name: &'static str,
config: rivet_config::Config,
Expand Down Expand Up @@ -114,6 +115,7 @@

/// Middleware to wrap ApiCtx with auth handling capabilities and to throw an error if auth was not explicitly
// handled in an endpoint
#[tracing::instrument(skip_all)]
async fn auth_middleware(
headers: HeaderMap,
mut req: Request,
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-public/src/runner_configs/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::ctx::ApiCtx;
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn delete(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Expand All @@ -37,6 +38,7 @@ pub async fn delete(
}
}

#[tracing::instrument(skip_all)]
async fn delete_inner(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down
2 changes: 2 additions & 0 deletions packages/core/api-public/src/runner_configs/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct RunnerConfigDatacenters {
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn list(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Expand All @@ -54,6 +55,7 @@ pub async fn list(
}
}

#[tracing::instrument(skip_all)]
async fn list_inner(
ctx: ApiCtx,
headers: HeaderMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct RefreshMetadataResponse {}
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn refresh_metadata(
Extension(ctx): Extension<ApiCtx>,
Path(path): Path<RefreshMetadataPath>,
Expand All @@ -60,6 +61,7 @@ pub async fn refresh_metadata(
}
}

#[tracing::instrument(skip_all)]
async fn refresh_metadata_inner(
ctx: ApiCtx,
path: RefreshMetadataPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub enum ServerlessHealthCheckResponse {
),
security(("bearer_auth" = [])),
)]
#[tracing::instrument(skip_all)]
pub async fn serverless_health_check(
Extension(ctx): Extension<ApiCtx>,
Query(query): Query<ServerlessHealthCheckQuery>,
Expand Down
Loading
Loading