diff --git a/engine/packages/pegboard/src/ops/runner_config/delete.rs b/engine/packages/pegboard/src/ops/runner_config/delete.rs index 7c19fc0ca4..b3d3c96b64 100644 --- a/engine/packages/pegboard/src/ops/runner_config/delete.rs +++ b/engine/packages/pegboard/src/ops/runner_config/delete.rs @@ -44,12 +44,18 @@ pub async fn pegboard_runner_config_delete(ctx: &OperationCtx, input: &Input) -> // Bump pool when a serverless config is modified if delete_pool { - ctx.signal(crate::workflows::runner_pool::Bump {}) + let res = ctx + .signal(crate::workflows::runner_pool::Bump {}) .to_workflow::() .tag("namespace_id", input.namespace_id) .tag("runner_name", input.name.clone()) + .graceful_not_found() .send() .await?; + + if res.is_none() { + tracing::debug!(namespace_id=?input.namespace_id, name=%input.name, "no runner pool workflow to bump"); + } } Ok(()) diff --git a/engine/packages/pegboard/src/ops/runner_config/upsert.rs b/engine/packages/pegboard/src/ops/runner_config/upsert.rs index 9c4f00ad72..09da081335 100644 --- a/engine/packages/pegboard/src/ops/runner_config/upsert.rs +++ b/engine/packages/pegboard/src/ops/runner_config/upsert.rs @@ -169,12 +169,27 @@ pub async fn pegboard_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> .dispatch() .await?; } else if input.config.affects_pool() { - ctx.signal(crate::workflows::runner_pool::Bump {}) + let res = ctx + .signal(crate::workflows::runner_pool::Bump {}) .to_workflow::() .tag("namespace_id", input.namespace_id) .tag("runner_name", input.name.clone()) + .graceful_not_found() .send() .await?; + + // Backfill + if res.is_none() { + ctx.workflow(crate::workflows::runner_pool::Input { + namespace_id: input.namespace_id, + runner_name: input.name.clone(), + }) + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.name.clone()) + .unique() + .dispatch() + .await?; + } } Ok(res.endpoint_config_changed) diff --git a/engine/packages/universalpubsub/src/chunking.rs b/engine/packages/universalpubsub/src/chunking.rs index 2d276233fc..03be93cf0b 100644 --- a/engine/packages/universalpubsub/src/chunking.rs +++ b/engine/packages/universalpubsub/src/chunking.rs @@ -103,7 +103,7 @@ impl ChunkTracker { .retain(|_, buffer| now.duration_since(buffer.last_chunk_ts) < CHUNK_BUFFER_MAX_AGE); let size_after = self.chunks_in_process.len(); - tracing::debug!( + tracing::trace!( ?size_before, ?size_after, "performed chunk buffer garbage collection"