Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions packages/common/api-util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::{Context, Result};
use axum::{body::Body, response::Response};

Check warning on line 2 in packages/common/api-util/src/lib.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/common/api-util/src/lib.rs
use futures_util::StreamExt;
use rivet_api_builder::{ApiCtx, ErrorResponse, RawErrorResponse};
use serde::{de::DeserializeOwned, Serialize};
Expand All @@ -10,13 +10,12 @@
pub use axum::http::{HeaderMap, Method};

/// Generic function to make raw requests to remote datacenters by label (returns axum Response)
#[tracing::instrument(skip(ctx, headers, query, body))]
#[tracing::instrument(skip(ctx, query, body))]
pub async fn request_remote_datacenter_raw(
ctx: &ApiCtx,
dc_label: u16,
endpoint: &str,
method: Method,
headers: HeaderMap,
query: Option<&impl Serialize>,
body: Option<&impl Serialize>,
) -> Result<Response> {
Expand All @@ -35,7 +34,7 @@

tracing::debug!(%method, %url, "sending raw request to remote datacenter");

let mut request = client.request(method, url).headers(headers);
let mut request = client.request(method, url);

if let Some(b) = body {
request = request.json(b);
Expand All @@ -51,13 +50,12 @@
}

/// Generic function to make requests to a specific datacenter
#[tracing::instrument(skip(config, headers, query, body))]
#[tracing::instrument(skip(config, query, body))]
pub async fn request_remote_datacenter<T>(
config: &rivet_config::Config,
dc_label: u16,
endpoint: &str,
method: Method,
headers: HeaderMap,
query: Option<&impl Serialize>,
body: Option<&impl Serialize>,
) -> Result<T>
Expand All @@ -78,7 +76,7 @@

tracing::debug!(%method, %url, "sending request to remote datacenter");

let mut request = client.request(method, url).headers(headers);
let mut request = client.request(method, url);

if let Some(b) = body {
request = request.json(b);
Expand All @@ -95,10 +93,9 @@

/// Generic function to fanout requests to all datacenters and aggregate results
/// Returns aggregated results and errors only if all requests fail
#[tracing::instrument(skip(ctx, headers, query, local_handler, aggregator))]
#[tracing::instrument(skip(ctx, query, local_handler, aggregator))]
pub async fn fanout_to_datacenters<I, Q, F, Fut, A, R>(
ctx: ApiCtx,
headers: HeaderMap,
endpoint: &str,
query: Q,
local_handler: F,
Expand All @@ -116,7 +113,6 @@

let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| {
let ctx = ctx.clone();
let headers = headers.clone();
let query = query.clone();
let endpoint = endpoint.to_string();
let local_handler = local_handler.clone();
Expand All @@ -134,7 +130,6 @@
dc.datacenter_label,
&endpoint,
Method::GET,
headers,
Some(&query),
Option::<&()>::None,
)
Expand Down
10 changes: 2 additions & 8 deletions packages/core/api-public/src/actors/create.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::Result;
use axum::{
http::HeaderMap,
response::{IntoResponse, Response},
};
use axum::response::{IntoResponse, Response};
use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
Expand Down Expand Up @@ -49,11 +46,10 @@ pub struct CreateQuery {
)]
pub async fn create(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Query(query): Query<CreateQuery>,
Json(body): Json<CreateRequest>,
) -> Response {
match create_inner(ctx, headers, query, body).await {
match create_inner(ctx, query, body).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
Expand All @@ -62,7 +58,6 @@ pub async fn create(
#[tracing::instrument(skip_all)]
async fn create_inner(
ctx: ApiCtx,
headers: HeaderMap,
query: CreateQuery,
body: CreateRequest,
) -> Result<CreateResponse> {
Expand Down Expand Up @@ -96,7 +91,6 @@ async fn create_inner(
target_dc_label,
"/actors",
axum::http::Method::POST,
headers,
Some(&query),
Some(&body),
)
Expand Down
10 changes: 2 additions & 8 deletions packages/core/api-public/src/actors/delete.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::Result;
use axum::{
http::HeaderMap,
response::{IntoResponse, Response},
};
use axum::response::{IntoResponse, Response};
use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Path, Query},
Expand Down Expand Up @@ -52,20 +49,18 @@
#[tracing::instrument(skip_all)]
pub async fn delete(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Path(path): Path<DeletePath>,
Query(query): Query<DeleteQuery>,
) -> Response {
match delete_inner(ctx, headers, path, query).await {
match delete_inner(ctx, path, query).await {
Ok(response) => response,
Err(err) => ApiError::from(err).into_response(),
}
}

Check warning on line 59 in packages/core/api-public/src/actors/delete.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/actors/delete.rs

#[tracing::instrument(skip_all)]
async fn delete_inner(
ctx: ApiCtx,
headers: HeaderMap,
path: DeletePath,
query: DeleteQuery,
) -> Result<Response> {
Expand All @@ -87,7 +82,6 @@
path.actor_id.label(),
&format!("/actors/{}", path.actor_id),
axum::http::Method::DELETE,
headers,
Some(&query),
Option::<&()>::None,
)
Expand Down
12 changes: 4 additions & 8 deletions packages/core/api-public/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use anyhow::Result;
use axum::{
http::HeaderMap,
response::{IntoResponse, Response},

Check warning on line 4 in packages/core/api-public/src/actors/get_or_create.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/actors/get_or_create.rs
};
use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
ApiError,
};
use rivet_types::actors::CrashPolicy;
use rivet_util::Id;
Expand Down Expand Up @@ -159,13 +159,9 @@
?existing_actor_id,
"received duplicate key error, returning existing actor id"
);
let actor = utils::fetch_actor_by_id(
&ctx,
headers.clone(),
existing_actor_id,
query.namespace.clone(),
)
.await?;
let actor =
utils::fetch_actor_by_id(&ctx, existing_actor_id, query.namespace.clone())
.await?;
return Ok(GetOrCreateResponse {
actor,
created: false,
Expand Down
13 changes: 3 additions & 10 deletions packages/core/api-public/src/actors/list.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::{Context, Result};
use axum::{
http::HeaderMap,
response::{IntoResponse, Response},
};
use axum::response::{IntoResponse, Response};
use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
Expand Down Expand Up @@ -63,21 +60,20 @@
path = "/actors",
params(ListQuery),
responses(
(status = 200, body = ListResponse),

Check warning on line 63 in packages/core/api-public/src/actors/list.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/actors/list.rs
),
)]
pub async fn list(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Query(query): Query<ListQuery>,
) -> Response {
match list_inner(ctx, headers, query).await {
match list_inner(ctx, query).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
}

async fn list_inner(ctx: ApiCtx, headers: HeaderMap, query: ListQuery) -> Result<ListResponse> {
async fn list_inner(ctx: ApiCtx, query: ListQuery) -> Result<ListResponse> {
ctx.auth().await?;

// Parse query
Expand Down Expand Up @@ -132,7 +128,6 @@
// Fetch actors
let actors = fetch_actors_by_ids(
&ctx,
headers,
actor_ids,
query.namespace.clone(),
query.include_destroyed,
Expand Down Expand Up @@ -181,7 +176,6 @@
// Fetch actors
let actors = fetch_actors_by_ids(
&ctx,
headers,
vec![actor_id],
query.namespace.clone(),
query.include_destroyed,
Expand Down Expand Up @@ -226,7 +220,6 @@
Vec<rivet_types::actors::Actor>,
>(
ctx.into(),
headers,
"/actors",
peer_query,
|ctx, query| async move { rivet_api_peer::actors::list::list(ctx, (), query).await },
Expand Down
10 changes: 2 additions & 8 deletions packages/core/api-public/src/actors/list_names.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::Result;
use axum::{
http::HeaderMap,
response::{IntoResponse, Response},
};
use axum::response::{IntoResponse, Response};
use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
Expand Down Expand Up @@ -31,19 +28,17 @@
#[tracing::instrument(skip_all)]
pub async fn list_names(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Query(query): Query<ListNamesQuery>,
) -> Response {
match list_names_inner(ctx, headers, query).await {
match list_names_inner(ctx, query).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
}

Check warning on line 37 in packages/core/api-public/src/actors/list_names.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/actors/list_names.rs

#[tracing::instrument(skip_all)]
async fn list_names_inner(
ctx: ApiCtx,
headers: HeaderMap,
query: ListNamesQuery,
) -> Result<ListNamesResponse> {
ctx.auth().await?;
Expand All @@ -59,7 +54,6 @@
let mut all_names =
fanout_to_datacenters::<ListNamesResponse, _, _, _, _, Vec<(String, ActorName)>>(
ctx.into(),
headers,
"/actors/names",
peer_query,
|ctx, query| async move {
Expand Down
7 changes: 1 addition & 6 deletions packages/core/api-public/src/actors/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use axum::http::{HeaderMap, Method};
use axum::http::Method;
use rivet_api_builder::ApiCtx;
use rivet_api_util::request_remote_datacenter;
use rivet_error::RivetError;
Expand All @@ -7,12 +7,11 @@
use rivet_util::Id;
use std::collections::HashMap;

/// Helper function to fetch an actor by ID, automatically routing to the correct datacenter

Check warning on line 10 in packages/core/api-public/src/actors/utils.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/engine/engine/packages/core/api-public/src/actors/utils.rs
/// based on the actor ID's label.
#[tracing::instrument(skip_all)]
pub async fn fetch_actor_by_id(
ctx: &ApiCtx,
headers: HeaderMap,
actor_id: Id,
namespace: String,
) -> Result<Actor> {
Expand All @@ -39,7 +38,6 @@
actor_id.label(),
"/actors",
Method::GET,
headers,
Some(&list_query),
Option::<&()>::None,
)
Expand All @@ -59,7 +57,6 @@
#[tracing::instrument(skip_all)]
pub async fn fetch_actors_by_ids(
ctx: &ApiCtx,
headers: HeaderMap,
actor_ids: Vec<Id>,
namespace: String,
include_destroyed: Option<bool>,
Expand All @@ -81,7 +78,6 @@
// Fetch actors in batch from each datacenter
let fetch_futures = actors_by_dc.into_iter().map(|(dc_label, dc_actor_ids)| {
let ctx = ctx.clone();
let headers = headers.clone();
let namespace = namespace.clone();
let include_destroyed = include_destroyed;
let limit = limit;
Expand Down Expand Up @@ -116,7 +112,6 @@
dc_label,
"/actors",
Method::GET,
headers,
Some(&peer_query),
Option::<&()>::None,
)
Expand Down
16 changes: 4 additions & 12 deletions packages/core/api-public/src/namespaces.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use anyhow::Result;
use axum::{
http::HeaderMap,
response::{IntoResponse, Response},
};
use axum::response::{IntoResponse, Response};
use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
Expand All @@ -26,16 +23,15 @@ use crate::ctx::ApiCtx;
#[tracing::instrument(skip_all)]
pub async fn list(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Query(query): Query<ListQuery>,
) -> Response {
match list_inner(ctx, headers, query).await {
match list_inner(ctx, query).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
}

async fn list_inner(ctx: ApiCtx, headers: HeaderMap, query: ListQuery) -> Result<ListResponse> {
async fn list_inner(ctx: ApiCtx, query: ListQuery) -> Result<ListResponse> {
ctx.auth().await?;

if ctx.config().is_leader() {
Expand All @@ -47,7 +43,6 @@ async fn list_inner(ctx: ApiCtx, headers: HeaderMap, query: ListQuery) -> Result
leader_dc.datacenter_label,
"/namespaces",
axum::http::Method::GET,
headers,
Some(&query),
Option::<&()>::None,
)
Expand All @@ -68,10 +63,9 @@ async fn list_inner(ctx: ApiCtx, headers: HeaderMap, query: ListQuery) -> Result
#[tracing::instrument(skip_all)]
pub async fn create(
Extension(ctx): Extension<ApiCtx>,
headers: HeaderMap,
Json(body): Json<CreateRequest>,
) -> Response {
match create_inner(ctx, headers, body).await {
match create_inner(ctx, body).await {
Ok(response) => Json(response).into_response(),
Err(err) => ApiError::from(err).into_response(),
}
Expand All @@ -80,7 +74,6 @@ pub async fn create(
#[tracing::instrument(skip_all)]
async fn create_inner(
ctx: ApiCtx,
headers: HeaderMap,
body: CreateRequest,
) -> Result<CreateResponse> {
ctx.auth().await?;
Expand All @@ -94,7 +87,6 @@ async fn create_inner(
leader_dc.datacenter_label,
"/namespaces",
axum::http::Method::POST,
headers,
Option::<&()>::None,
Some(&body),
)
Expand Down
Loading
Loading