Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions packages/common/api-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ use anyhow::{Context, Result};
use axum::{body::Body, response::Response};
use futures_util::StreamExt;
use rivet_api_builder::{ApiCtx, ErrorResponse, RawErrorResponse};
use serde::{de::DeserializeOwned, Serialize};
use serde::{Serialize, de::DeserializeOwned};
use std::future::Future;

mod errors;

pub use axum::http::{HeaderMap, Method};

/// Generic function to make raw requests to remote datacenters by label (returns axum Response)
#[tracing::instrument(skip(ctx, headers, query, body))]
#[tracing::instrument(skip(ctx, query, body))]
pub async fn request_remote_datacenter_raw(
ctx: &ApiCtx,
dc_label: u16,
endpoint: &str,
method: Method,
headers: HeaderMap,
query: Option<&impl Serialize>,
body: Option<&impl Serialize>,
) -> Result<Response> {
Expand All @@ -35,7 +34,7 @@ pub async fn request_remote_datacenter_raw(

tracing::debug!(%method, %url, "sending raw request to remote datacenter");

let mut request = client.request(method, url).headers(headers);
let mut request = client.request(method, url);

if let Some(b) = body {
request = request.json(b);
Expand All @@ -51,13 +50,12 @@ pub async fn request_remote_datacenter_raw(
}

/// Generic function to make requests to a specific datacenter
#[tracing::instrument(skip(config, headers, query, body))]
#[tracing::instrument(skip(config, query, body))]
pub async fn request_remote_datacenter<T>(
config: &rivet_config::Config,
dc_label: u16,
endpoint: &str,
method: Method,
headers: HeaderMap,
query: Option<&impl Serialize>,
body: Option<&impl Serialize>,
) -> Result<T>
Expand All @@ -78,7 +76,7 @@ where

tracing::debug!(%method, %url, "sending request to remote datacenter");

let mut request = client.request(method, url).headers(headers);
let mut request = client.request(method, url);

if let Some(b) = body {
request = request.json(b);
Expand All @@ -95,10 +93,9 @@ where

/// Generic function to fanout requests to all datacenters and aggregate results
/// Returns aggregated results and errors only if all requests fail
#[tracing::instrument(skip(ctx, headers, query, local_handler, aggregator))]
#[tracing::instrument(skip(ctx, query, local_handler, aggregator))]
pub async fn fanout_to_datacenters<I, Q, F, Fut, A, R>(
ctx: ApiCtx,
headers: HeaderMap,
endpoint: &str,
query: Q,
local_handler: F,
Expand All @@ -116,7 +113,6 @@ where

let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| {
let ctx = ctx.clone();
let headers = headers.clone();
let query = query.clone();
let endpoint = endpoint.to_string();
let local_handler = local_handler.clone();
Expand All @@ -134,7 +130,6 @@ where
dc.datacenter_label,
&endpoint,
Method::GET,
headers,
Some(&query),
Option::<&()>::None,
)
Expand Down
5 changes: 1 addition & 4 deletions packages/common/runtime/src/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

// Build and apply otel layers to the registry if otel is enabled
let (otel_trace_layer, otel_metric_layer) = match otel_providers {
Some(providers) => {

Check warning on line 26 in packages/common/runtime/src/traces.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/common/runtime/src/traces.rs
let tracer = providers.tracer_provider.tracer("tracing-otel-subscriber");

let otel_trace_layer =
Expand All @@ -32,10 +32,7 @@
let otel_metric_layer = MetricsLayer::new(providers.meter_provider.clone())
.with_filter(build_filter_from_env_var("RUST_TRACE"));

(
Some(otel_trace_layer),
Some(otel_metric_layer),
)
(Some(otel_trace_layer), Some(otel_metric_layer))
}
None => (None, None),
};
Expand Down
10 changes: 2 additions & 8 deletions packages/core/api-public/src/actors/create.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::Result;
use axum::{
http::HeaderMap,
response::{IntoResponse, Response},
};
use axum::response::{IntoResponse, Response};
use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
Expand Down Expand Up @@ -49,11 +46,10 @@ pub struct CreateQuery {
)]
pub async fn create(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Query(query): Query<CreateQuery>,
Json(body): Json<CreateRequest>,
) -> Response {
match create_inner(ctx, headers, query, body).await {
match create_inner(ctx, query, body).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
Expand All @@ -62,7 +58,6 @@ pub async fn create(
#[tracing::instrument(skip_all)]
async fn create_inner(
ctx: ApiCtx,
headers: HeaderMap,
query: CreateQuery,
body: CreateRequest,
) -> Result<CreateResponse> {
Expand Down Expand Up @@ -96,7 +91,6 @@ async fn create_inner(
target_dc_label,
"/actors",
axum::http::Method::POST,
headers,
Some(&query),
Some(&body),
)
Expand Down
10 changes: 2 additions & 8 deletions packages/core/api-public/src/actors/delete.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::Result;
use axum::{
http::HeaderMap,
response::{IntoResponse, Response},
};
use axum::response::{IntoResponse, Response};
use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Path, Query},
Expand Down Expand Up @@ -52,20 +49,18 @@
#[tracing::instrument(skip_all)]
pub async fn delete(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Path(path): Path<DeletePath>,
Query(query): Query<DeleteQuery>,
) -> Response {
match delete_inner(ctx, headers, path, query).await {
match delete_inner(ctx, path, query).await {
Ok(response) => response,
Err(err) => ApiError::from(err).into_response(),
}
}

Check warning on line 59 in packages/core/api-public/src/actors/delete.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/actors/delete.rs

#[tracing::instrument(skip_all)]
async fn delete_inner(
ctx: ApiCtx,
headers: HeaderMap,
path: DeletePath,
query: DeleteQuery,
) -> Result<Response> {
Expand All @@ -87,7 +82,6 @@
path.actor_id.label(),
&format!("/actors/{}", path.actor_id),
axum::http::Method::DELETE,
headers,
Some(&query),
Option::<&()>::None,
)
Expand Down
12 changes: 4 additions & 8 deletions packages/core/api-public/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::Result;
use axum::{
http::HeaderMap,
response::{IntoResponse, Response},

Check warning on line 4 in packages/core/api-public/src/actors/get_or_create.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/actors/get_or_create.rs
};
use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
ApiError,
};
use rivet_types::actors::CrashPolicy;
use rivet_util::Id;
Expand Down Expand Up @@ -159,13 +159,9 @@
?existing_actor_id,
"received duplicate key error, returning existing actor id"
);
let actor = utils::fetch_actor_by_id(
&ctx,
headers.clone(),
existing_actor_id,
query.namespace.clone(),
)
.await?;
let actor =
utils::fetch_actor_by_id(&ctx, existing_actor_id, query.namespace.clone())
.await?;
return Ok(GetOrCreateResponse {
actor,
created: false,
Expand Down
13 changes: 3 additions & 10 deletions packages/core/api-public/src/actors/list.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::{Context, Result};
use axum::{
http::HeaderMap,
response::{IntoResponse, Response},
};
use axum::response::{IntoResponse, Response};
use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
Expand Down Expand Up @@ -63,21 +60,20 @@
path = "/actors",
params(ListQuery),
responses(
(status = 200, body = ListResponse),

Check warning on line 63 in packages/core/api-public/src/actors/list.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/actors/list.rs
),
)]
pub async fn list(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Query(query): Query<ListQuery>,
) -> Response {
match list_inner(ctx, headers, query).await {
match list_inner(ctx, query).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
}

async fn list_inner(ctx: ApiCtx, headers: HeaderMap, query: ListQuery) -> Result<ListResponse> {
async fn list_inner(ctx: ApiCtx, query: ListQuery) -> Result<ListResponse> {
ctx.auth().await?;

// Parse query
Expand Down Expand Up @@ -132,7 +128,6 @@
// Fetch actors
let actors = fetch_actors_by_ids(
&ctx,
headers,
actor_ids,
query.namespace.clone(),
query.include_destroyed,
Expand Down Expand Up @@ -181,7 +176,6 @@
// Fetch actors
let actors = fetch_actors_by_ids(
&ctx,
headers,
vec![actor_id],
query.namespace.clone(),
query.include_destroyed,
Expand Down Expand Up @@ -226,7 +220,6 @@
Vec<rivet_types::actors::Actor>,
>(
ctx.into(),
headers,
"/actors",
peer_query,
|ctx, query| async move { rivet_api_peer::actors::list::list(ctx, (), query).await },
Expand Down
10 changes: 2 additions & 8 deletions packages/core/api-public/src/actors/list_names.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::Result;
use axum::{
http::HeaderMap,
response::{IntoResponse, Response},
};
use axum::response::{IntoResponse, Response};
use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
Expand Down Expand Up @@ -31,19 +28,17 @@
#[tracing::instrument(skip_all)]
pub async fn list_names(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Query(query): Query<ListNamesQuery>,
) -> Response {
match list_names_inner(ctx, headers, query).await {
match list_names_inner(ctx, query).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
}

Check warning on line 37 in packages/core/api-public/src/actors/list_names.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/actors/list_names.rs

#[tracing::instrument(skip_all)]
async fn list_names_inner(
ctx: ApiCtx,
headers: HeaderMap,
query: ListNamesQuery,
) -> Result<ListNamesResponse> {
ctx.auth().await?;
Expand All @@ -59,7 +54,6 @@
let mut all_names =
fanout_to_datacenters::<ListNamesResponse, _, _, _, _, Vec<(String, ActorName)>>(
ctx.into(),
headers,
"/actors/names",
peer_query,
|ctx, query| async move {
Expand Down
7 changes: 1 addition & 6 deletions packages/core/api-public/src/actors/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use axum::http::{HeaderMap, Method};
use axum::http::Method;
use rivet_api_builder::ApiCtx;
use rivet_api_util::request_remote_datacenter;
use rivet_error::RivetError;
Expand All @@ -7,12 +7,11 @@
use rivet_util::Id;
use std::collections::HashMap;

/// Helper function to fetch an actor by ID, automatically routing to the correct datacenter

Check warning on line 10 in packages/core/api-public/src/actors/utils.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/actors/utils.rs
/// based on the actor ID's label.
#[tracing::instrument(skip_all)]
pub async fn fetch_actor_by_id(
ctx: &ApiCtx,
headers: HeaderMap,
actor_id: Id,
namespace: String,
) -> Result<Actor> {
Expand All @@ -39,7 +38,6 @@
actor_id.label(),
"/actors",
Method::GET,
headers,
Some(&list_query),
Option::<&()>::None,
)
Expand All @@ -59,7 +57,6 @@
#[tracing::instrument(skip_all)]
pub async fn fetch_actors_by_ids(
ctx: &ApiCtx,
headers: HeaderMap,
actor_ids: Vec<Id>,
namespace: String,
include_destroyed: Option<bool>,
Expand All @@ -81,7 +78,6 @@
// Fetch actors in batch from each datacenter
let fetch_futures = actors_by_dc.into_iter().map(|(dc_label, dc_actor_ids)| {
let ctx = ctx.clone();
let headers = headers.clone();
let namespace = namespace.clone();
let include_destroyed = include_destroyed;
let limit = limit;
Expand Down Expand Up @@ -116,7 +112,6 @@
dc_label,
"/actors",
Method::GET,
headers,
Some(&peer_query),
Option::<&()>::None,
)
Expand Down
2 changes: 1 addition & 1 deletion packages/core/api-public/src/health.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use axum::{extract::Extension, response::IntoResponse, Json};
use axum::{Json, extract::Extension, response::IntoResponse};
use futures_util::StreamExt;
use rivet_api_builder::ApiError;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -151,7 +151,7 @@
let response = res.json::<HealthResponse>().await?;
Ok(response)
} else {
anyhow::bail!("Health check returned status: {}", res.status())

Check warning on line 154 in packages/core/api-public/src/health.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/health.rs
}
}

Loading
Loading