Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion engine/artifacts/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

139 changes: 139 additions & 0 deletions engine/packages/api-peer/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use anyhow::Result;
use gas::prelude::*;
use rivet_api_builder::ApiCtx;
use rivet_api_types::actors::get_or_create::{
GetOrCreateQuery, GetOrCreateRequest, GetOrCreateResponse,
};
use rivet_error::RivetError;

#[tracing::instrument(skip_all)]
pub async fn get_or_create(
ctx: ApiCtx,
_path: (),
query: GetOrCreateQuery,
body: GetOrCreateRequest,
) -> Result<GetOrCreateResponse> {
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

// Check if actor already exists for the key
let existing = ctx
.op(pegboard::ops::actor::get_for_key::Input {
namespace_id: namespace.namespace_id,
name: body.name.clone(),
key: body.key.clone(),
})
.await?;

if let Some(actor) = existing.actor {
// Actor exists, return it
return Ok(GetOrCreateResponse {
actor,
created: false,
});
}

// Actor doesn't exist, create it
let actor_id = Id::new_v1(ctx.config().dc_label());

match ctx
.op(pegboard::ops::actor::create::Input {
actor_id,
namespace_id: namespace.namespace_id,
name: body.name.clone(),
key: Some(body.key.clone()),
runner_name_selector: body.runner_name_selector,
input: body.input.clone(),
crash_policy: body.crash_policy,
// NOTE: This can forward if the user attempts to create an actor with a target dc and this dc
// ends up forwarding to another.
forward_request: true,
// api-peer is always creating in its own datacenter
datacenter_name: None,
})
.await
{
Ok(res) => Ok(GetOrCreateResponse {
actor: res.actor,
created: true,
}),
Err(err) => {
// Check if this is a DuplicateKey error and extract the existing actor ID
if let Some(existing_actor_id) = extract_duplicate_key_error(&err) {
tracing::info!(
?existing_actor_id,
"received duplicate key error, fetching existing actor"
);

// Fetch the existing actor - it should be in this datacenter since
// the duplicate key error came from this datacenter
let res = ctx
.op(pegboard::ops::actor::get::Input {
actor_ids: vec![existing_actor_id],
})
.await?;

let actor = res
.actors
.into_iter()
.next()
.ok_or_else(|| pegboard::errors::Actor::NotFound.build())?;

return Ok(GetOrCreateResponse {
actor,
created: false,
});
}

// Re-throw the original error if it's not a DuplicateKey
Err(err)
}
}
}

/// Helper function to extract the existing actor ID from a duplicate key error
///
/// Returns Some(actor_id) if the error is a duplicate key error with metadata, None otherwise
pub fn extract_duplicate_key_error(err: &anyhow::Error) -> Option<Id> {
// Try to downcast to RivetError first (local calls)
let rivet_err = err.chain().find_map(|x| x.downcast_ref::<RivetError>());
if let Some(rivet_err) = rivet_err {
if rivet_err.group() == "actor" && rivet_err.code() == "duplicate_key" {
// Extract existing_actor_id from metadata
if let Some(metadata) = rivet_err.metadata() {
if let Some(actor_id_str) =
metadata.get("existing_actor_id").and_then(|v| v.as_str())
{
if let Ok(actor_id) = actor_id_str.parse::<Id>() {
return Some(actor_id);
}
}
}
}
}

// Try to downcast to RawErrorResponse (for remote API calls)
let raw_err = err
.chain()
.find_map(|x| x.downcast_ref::<rivet_api_builder::error_response::RawErrorResponse>());
if let Some(raw_err) = raw_err {
if raw_err.1.group == "actor" && raw_err.1.code == "duplicate_key" {
// Extract existing_actor_id from metadata (now available in ErrorResponse)
if let Some(metadata) = &raw_err.1.metadata {
if let Some(actor_id_str) =
metadata.get("existing_actor_id").and_then(|v| v.as_str())
{
if let Ok(actor_id) = actor_id_str.parse::<Id>() {
return Some(actor_id);
}
}
}
}
}

None
}
1 change: 1 addition & 0 deletions engine/packages/api-peer/src/actors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod create;
pub mod delete;
pub mod get_or_create;
pub mod kv_get;
pub mod list;
pub mod list_names;
1 change: 1 addition & 0 deletions engine/packages/api-peer/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub async fn router(
// MARK: Actors
.route("/actors", get(actors::list::list))
.route("/actors", post(actors::create::create))
.route("/actors", put(actors::get_or_create::get_or_create))
.route("/actors/{actor_id}", delete(actors::delete::delete))
.route("/actors/names", get(actors::list_names::list_names))
.route(
Expand Down
106 changes: 18 additions & 88 deletions engine/packages/api-public/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,13 @@ use rivet_api_builder::{
ApiError,
extract::{Extension, Json, Query},
};
use rivet_types::actors::CrashPolicy;
use rivet_util::Id;
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};
use rivet_api_types::actors::get_or_create::{
GetOrCreateQuery, GetOrCreateRequest, GetOrCreateResponse,
};
use rivet_api_util::request_remote_datacenter;

use crate::actors::utils;
use crate::ctx::ApiCtx;

#[derive(Debug, Deserialize, IntoParams)]
#[serde(deny_unknown_fields)]
#[into_params(parameter_in = Query)]
pub struct GetOrCreateQuery {
pub namespace: String,
}

#[derive(Deserialize, ToSchema)]
#[serde(deny_unknown_fields)]
#[schema(as = ActorsGetOrCreateRequest)]
pub struct GetOrCreateRequest {
pub datacenter: Option<String>,
pub name: String,
pub key: String,
pub input: Option<String>,
pub runner_name_selector: String,
pub crash_policy: CrashPolicy,
}

#[derive(Serialize, ToSchema)]
#[schema(as = ActorsGetOrCreateResponse)]
pub struct GetOrCreateResponse {
pub actor: rivet_types::actors::Actor,
pub created: bool,
}

/// ## Datacenter Round Trips
///
/// **If actor exists**
Expand Down Expand Up @@ -91,33 +64,13 @@ async fn get_or_create_inner(
) -> Result<GetOrCreateResponse> {
ctx.skip_auth();

// Resolve namespace
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

// Check if actor already exists for the key
// The get_for_key op uses global consistency and handles datacenter routing
let existing = ctx
.op(pegboard::ops::actor::get_for_key::Input {
namespace_id: namespace.namespace_id,
name: body.name.clone(),
key: body.key.clone(),
})
.await?;

if let Some(actor) = existing.actor {
// Actor exists, return it
return Ok(GetOrCreateResponse {
actor,
created: false,
});
}

// Actor doesn't exist for any key, create it
let target_dc_label = super::utils::find_dc_for_actor_creation(
&ctx,
namespace.namespace_id,
Expand All @@ -127,44 +80,21 @@ async fn get_or_create_inner(
)
.await?;

let actor_id = Id::new_v1(target_dc_label);
let query = GetOrCreateQuery {
namespace: query.namespace,
};

match ctx
.op(pegboard::ops::actor::create::Input {
actor_id,
namespace_id: namespace.namespace_id,
name: body.name.clone(),
key: Some(body.key.clone()),
runner_name_selector: body.runner_name_selector,
input: body.input.clone(),
crash_policy: body.crash_policy,
forward_request: true,
datacenter_name: body.datacenter.clone(),
})
if target_dc_label == ctx.config().dc_label() {
rivet_api_peer::actors::get_or_create::get_or_create(ctx.into(), (), query, body).await
} else {
request_remote_datacenter::<GetOrCreateResponse>(
ctx.config(),
target_dc_label,
"/actors",
axum::http::Method::PUT,
Some(&query),
Some(&body),
)
.await
{
Ok(res) => Ok(GetOrCreateResponse {
actor: res.actor,
created: true,
}),
Err(err) => {
// Check if this is a DuplicateKey error and extract the existing actor ID
if let Some(existing_actor_id) = utils::extract_duplicate_key_error(&err) {
tracing::info!(
?existing_actor_id,
"received duplicate key error, returning existing actor id"
);
let actor =
utils::fetch_actor_by_id(&ctx, existing_actor_id, query.namespace.clone())
.await?;
return Ok(GetOrCreateResponse {
actor,
created: false,
});
}

// Re-throw the original error if it's not a DuplicateKey
Err(err)
}
}
}
44 changes: 0 additions & 44 deletions engine/packages/api-public/src/actors/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use anyhow::Result;
use axum::http::Method;
use rivet_api_builder::ApiCtx;
use rivet_api_util::request_remote_datacenter;
use rivet_error::RivetError;
use rivet_types::actors::Actor;
use rivet_util::Id;
use std::collections::HashMap;
Expand Down Expand Up @@ -129,49 +128,6 @@ pub async fn fetch_actors_by_ids(
Ok(actors)
}

/// Helper function to extract the existing actor ID from a duplicate key error
///
/// Returns Some(actor_id) if the error is a duplicate key error with metadata, None otherwise
pub fn extract_duplicate_key_error(err: &anyhow::Error) -> Option<Id> {
// Try to downcast to RivetError first (local calls)
let rivet_err = err.chain().find_map(|x| x.downcast_ref::<RivetError>());
if let Some(rivet_err) = rivet_err {
if rivet_err.group() == "actor" && rivet_err.code() == "duplicate_key" {
// Extract existing_actor_id from metadata
if let Some(metadata) = rivet_err.metadata() {
if let Some(actor_id_str) =
metadata.get("existing_actor_id").and_then(|v| v.as_str())
{
if let Ok(actor_id) = actor_id_str.parse::<Id>() {
return Some(actor_id);
}
}
}
}
}

// Try to downcast to RawErrorResponse (for remote API calls)
let raw_err = err
.chain()
.find_map(|x| x.downcast_ref::<rivet_api_builder::error_response::RawErrorResponse>());
if let Some(raw_err) = raw_err {
if raw_err.1.group == "actor" && raw_err.1.code == "duplicate_key" {
// Extract existing_actor_id from metadata (now available in ErrorResponse)
if let Some(metadata) = &raw_err.1.metadata {
if let Some(actor_id_str) =
metadata.get("existing_actor_id").and_then(|v| v.as_str())
{
if let Ok(actor_id) = actor_id_str.parse::<Id>() {
return Some(actor_id);
}
}
}
}
}

None
}

/// Determine the datacenter label to create the actor in.
#[tracing::instrument(skip_all)]
pub async fn find_dc_for_actor_creation(
Expand Down
30 changes: 30 additions & 0 deletions engine/packages/api-types/src/actors/get_or_create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use serde::{Deserialize, Serialize};
use utoipa::{IntoParams, ToSchema};

#[derive(Debug, Serialize, Deserialize, IntoParams)]
#[serde(deny_unknown_fields)]
#[into_params(parameter_in = Query)]
pub struct GetOrCreateQuery {
pub namespace: String,
}

#[derive(Serialize, Deserialize, ToSchema)]
#[serde(deny_unknown_fields)]
#[schema(as = ActorsGetOrCreateRequest)]
pub struct GetOrCreateRequest {
// Ignored in api-peer
pub datacenter: Option<String>,
pub name: String,
pub key: String,
pub input: Option<String>,
pub runner_name_selector: String,
pub crash_policy: rivet_types::actors::CrashPolicy,
}

#[derive(Serialize, Deserialize, ToSchema)]
#[serde(deny_unknown_fields)]
#[schema(as = ActorsGetOrCreateResponse)]
pub struct GetOrCreateResponse {
pub actor: rivet_types::actors::Actor,
pub created: bool,
}
1 change: 1 addition & 0 deletions engine/packages/api-types/src/actors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod create;
pub mod get_or_create;
pub mod list;
pub mod list_names;
Loading
Loading