Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion out/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 13 additions & 8 deletions packages/core/api-peer/src/runner_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ pub struct UpsertRequest(pub rivet_api_types::namespaces::runner_configs::Runner

#[derive(Deserialize, Serialize, ToSchema)]
#[schema(as = RunnerConfigsUpsertResponse)]
pub struct UpsertResponse {}
pub struct UpsertResponse {
pub endpoint_config_changed: bool,
}

#[tracing::instrument(skip_all)]
pub async fn upsert(
Expand All @@ -109,14 +111,17 @@ pub async fn upsert(
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

ctx.op(namespace::ops::runner_config::upsert::Input {
namespace_id: namespace.namespace_id,
name: path.runner_name,
config: body.0.into(),
})
.await?;
let endpoint_config_changed = ctx
.op(namespace::ops::runner_config::upsert::Input {
namespace_id: namespace.namespace_id,
name: path.runner_name,
config: body.0.into(),
})
.await?;

Ok(UpsertResponse {})
Ok(UpsertResponse {
endpoint_config_changed,
})
}

#[derive(Debug, Serialize, Deserialize, IntoParams)]
Expand Down
58 changes: 35 additions & 23 deletions packages/core/api-public/src/runner_configs/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,19 @@ async fn upsert_inner(

tracing::debug!(runner_name = ?path.runner_name, datacenters_count = body.datacenters.len(), "starting upsert");

// Resolve namespace
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

// Store serverless config before processing (since we'll remove from body.datacenters)
let serverless_config = body
.datacenters
.iter()
.filter_map(|(dc_name, runner_config)| {
.filter_map(|(_dc_name, runner_config)| {
if let rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Serverless {
url,
headers,
Expand All @@ -78,16 +86,17 @@ async fn upsert_inner(
.next();

// Apply config
let mut any_endpoint_config_changed = false;
for dc in &ctx.config().topology().datacenters {
if let Some(runner_config) = body.datacenters.remove(&dc.name) {
if ctx.config().dc_label() == dc.datacenter_label {
let response = if ctx.config().dc_label() == dc.datacenter_label {
rivet_api_peer::runner_configs::upsert(
ctx.clone().into(),
path.clone(),
query.clone(),
rivet_api_peer::runner_configs::UpsertRequest(runner_config),
)
.await?;
.await?
} else {
request_remote_datacenter::<UpsertResponse>(
ctx.config(),
Expand All @@ -97,7 +106,11 @@ async fn upsert_inner(
Some(&query),
Some(&runner_config),
)
.await?;
.await?
};

if response.endpoint_config_changed {
any_endpoint_config_changed = true;
}
} else {
if ctx.config().dc_label() == dc.datacenter_label {
Expand Down Expand Up @@ -125,28 +138,25 @@ async fn upsert_inner(
}
}

// Resolve namespace
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: query.namespace.clone(),
})
.await?
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())?;

// Update runner metadata
//
// This allows us to populate the actor names immediately upon configuring a serverless runner
if let Some((url, metadata_headers)) = serverless_config {
if let Err(err) = utils::refresh_runner_config_metadata(
ctx.clone(),
namespace.namespace_id,
path.runner_name.clone(),
url,
metadata_headers,
)
.await
{
tracing::warn!(?err, runner_name = ?path.runner_name, "failed to refresh runner config metadata");
if any_endpoint_config_changed {
tracing::debug!("endpoint config changed, refreshing metadata");
if let Err(err) = utils::refresh_runner_config_metadata(
ctx.clone(),
namespace.namespace_id,
path.runner_name.clone(),
url,
metadata_headers,
)
.await
{
tracing::warn!(?err, runner_name = ?path.runner_name, "failed to refresh runner config metadata");
}
} else {
tracing::debug!("endpoint config unchanged, skipping metadata refresh");
}
}

Expand All @@ -160,5 +170,7 @@ async fn upsert_inner(
)
.await?;

Ok(UpsertResponse {})
Ok(UpsertResponse {
endpoint_config_changed: any_endpoint_config_changed,
})
}
41 changes: 34 additions & 7 deletions packages/services/namespace/src/ops/runner_config/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,49 @@ pub struct Input {
}

#[operation]
pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result<()> {
ctx.udb()?
pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result<bool> {
let endpoint_config_changed = ctx
.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());

let runner_config_key =
keys::runner_config::DataKey::new(input.namespace_id, input.name.clone());

// Delete previous index
if let Some(existing_config) = tx.read_opt(&runner_config_key, Serializable).await? {
// Check if config changed (for serverless, compare URL and headers)
let endpoint_config_changed = if let Some(existing_config) =
tx.read_opt(&runner_config_key, Serializable).await?
{
// Delete previous index
tx.delete(&keys::runner_config::ByVariantKey::new(
input.namespace_id,
runner_config_variant(&existing_config),
input.name.clone(),
));
}

// Check if serverless endpoint config changed
match (&existing_config.kind, &input.config.kind) {
(
RunnerConfigKind::Serverless {
url: old_url,
headers: old_headers,
..
},
RunnerConfigKind::Serverless {
url: new_url,
headers: new_headers,
..
},
) => old_url != new_url || old_headers != new_headers,
_ => {
// Config type changed or is not serverless
true
}
}
} else {
// New config
true
};

// Write new config
tx.write(&runner_config_key, input.config.clone())?;
Expand Down Expand Up @@ -104,7 +131,7 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -
}
}

Ok(Ok(()))
Ok(Ok(endpoint_config_changed))
})
.custom_instrument(tracing::info_span!("runner_config_upsert_tx"))
.await?
Expand All @@ -117,5 +144,5 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -
.await?;
}

Ok(())
Ok(endpoint_config_changed)
}
Loading