From 17d427cfc1c88128da4e2f25f13aeff11d177c1d Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Mon, 21 Jul 2025 17:45:15 +0000 Subject: [PATCH] fix: clean up worker threads in the event of an error --- .../common/chirp-workflow/core/src/worker.rs | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/packages/common/chirp-workflow/core/src/worker.rs b/packages/common/chirp-workflow/core/src/worker.rs index 8b2b2520d0..0779323840 100644 --- a/packages/common/chirp-workflow/core/src/worker.rs +++ b/packages/common/chirp-workflow/core/src/worker.rs @@ -83,6 +83,18 @@ impl Worker { loop { tokio::select! { _ = tick_interval.tick() => {}, + res = wake_sub.next() => { + if res.is_none() { + // Cancel background tasks + gc_handle.abort(); + metrics_handle.abort(); + + return Err(WorkflowError::SubscriptionUnsubscribed.into()); + } + + tick_interval.reset(); + }, + res = &mut gc_handle => { tracing::error!(?res, "metrics task unexpectedly stopped"); break; @@ -91,18 +103,17 @@ impl Worker { tracing::error!(?res, "metrics task unexpectedly stopped"); break; }, - res = wake_sub.next() => { - if res.is_none() { - return Err(WorkflowError::SubscriptionUnsubscribed.into()); - } - - tick_interval.reset(); - }, _ = ctrl_c() => break, _ = sigterm.recv() => break, } - self.tick(&shared_client, &config, &pools, &cache).await?; + if let Err(err) = self.tick(&shared_client, &config, &pools, &cache).await { + // Cancel background tasks + gc_handle.abort(); + metrics_handle.abort(); + + return Err(err); + } } self.shutdown(sigterm).await; @@ -181,6 +192,7 @@ impl Worker { tracing::info!("shutdown complete"); + // This will stop all background tasks because the tokio runtime will stop rivet_runtime::shutdown().await; }