diff --git a/engine/packages/gasoline/src/ctx/workflow.rs b/engine/packages/gasoline/src/ctx/workflow.rs index ed0b1b066e..870f2ba146 100644 --- a/engine/packages/gasoline/src/ctx/workflow.rs +++ b/engine/packages/gasoline/src/ctx/workflow.rs @@ -536,7 +536,7 @@ impl WorkflowCtx { pub(crate) fn check_stop(&self) -> WorkflowResult<()> { if self.stop.has_changed().unwrap_or(true) { - Err(WorkflowError::WorkflowStopped) + Err(WorkflowError::WorkflowEvicted) } else { Ok(()) } @@ -546,7 +546,7 @@ impl WorkflowCtx { // We have to clone here because this function can't have a mutable reference to self. The state of // the stop channel doesn't matter because it only ever receives one message let _ = self.stop.clone().changed().await; - Err(WorkflowError::WorkflowStopped) + Err(WorkflowError::WorkflowEvicted) } } diff --git a/engine/packages/gasoline/src/error.rs b/engine/packages/gasoline/src/error.rs index 7553046375..49ba8def6e 100644 --- a/engine/packages/gasoline/src/error.rs +++ b/engine/packages/gasoline/src/error.rs @@ -28,8 +28,8 @@ pub enum WorkflowError { #[error("workflow not found")] WorkflowNotFound, - #[error("workflow stopped")] - WorkflowStopped, + #[error("workflow evicted")] + WorkflowEvicted, #[error("history diverged: {0}")] HistoryDiverged(String), @@ -180,7 +180,7 @@ pub enum WorkflowError { impl WorkflowError { pub(crate) fn wake_immediate(&self) -> bool { - matches!(self, WorkflowError::WorkflowStopped) + matches!(self, WorkflowError::WorkflowEvicted) } /// Returns the next deadline for a workflow to be woken up again based on the error. @@ -225,7 +225,7 @@ impl WorkflowError { | WorkflowError::NoSignalFoundAndSleep(_, _) | WorkflowError::SubWorkflowIncomplete(_) | WorkflowError::Sleep(_) - | WorkflowError::WorkflowStopped => true, + | WorkflowError::WorkflowEvicted => true, _ => false, } } diff --git a/engine/packages/pegboard-serverless/src/lib.rs b/engine/packages/pegboard-serverless/src/lib.rs index a26b2a9408..32dd954bbf 100644 --- a/engine/packages/pegboard-serverless/src/lib.rs +++ b/engine/packages/pegboard-serverless/src/lib.rs @@ -217,6 +217,8 @@ async fn tick_runner_config( let drain_count = curr.len().saturating_sub(desired_count); let start_count = desired_count.saturating_sub(curr.len()); + tracing::debug!(%namespace_name, %runner_name, %desired_count, %drain_count, %start_count, "scaling"); + if drain_count != 0 { // TODO: Implement smart logic of draining runners with the lowest allocated actors let draining_connections = curr.split_off(desired_count); @@ -305,6 +307,8 @@ async fn outbound_handler( shutdown_rx: oneshot::Receiver<()>, draining: Arc, ) -> Result<()> { + tracing::debug!(%url, "sending outbound req"); + let current_dc = ctx.config().topology().current_dc()?; let client = rivet_pools::reqwest::client_no_timeout().await?; diff --git a/engine/sdks/typescript/runner/src/mod.ts b/engine/sdks/typescript/runner/src/mod.ts index ddbe239565..b735bf7d49 100644 --- a/engine/sdks/typescript/runner/src/mod.ts +++ b/engine/sdks/typescript/runner/src/mod.ts @@ -246,7 +246,7 @@ export class Runner { if (this.#started) throw new Error("Cannot call runner.start twice"); this.#started = true; - logger()?.info({ msg: "starting runner", runnerId: this.runnerId }); + logger()?.info({ msg: "starting runner" }); this.#tunnel.start(); @@ -266,12 +266,21 @@ export class Runner { logger()?.debug("received SIGINT"); this.shutdown(false, true); }); + + logger()?.debug({ + msg: "added SIGTERM listeners", + }); } + + logger()?.debug({ + msg: "current listeners", + listeners: process.listeners("SIGINT"), + }); } // MARK: Shutdown async shutdown(immediate: boolean, exit: boolean = false) { - logger()?.info({ msg: "starting shutdown...", runnerId: this.runnerId, immediate }); + logger()?.info({ msg: "starting shutdown", runnerId: this.runnerId, immediate }); this.#shutdown = true; // Clear reconnect timeout @@ -418,7 +427,7 @@ export class Runner { this.#pegboardWebSocket = ws; ws.addEventListener("open", () => { - logger()?.info({ msg: "Connected", runnerId: this.runnerId }); + logger()?.info({ msg: "Connected" }); // Reset reconnect attempt counter on successful connection this.#reconnectAttempt = 0;