diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index 2c930ba7..06d950f4 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -173,16 +173,30 @@ where tokio::select! { biased; - Some(batch_info) = self.batch_receiver.recv(), if self.futures.len() < DERIVATION_PIPELINE_WORKER_CONCURRENCY => { - let fut = self.derivation_future(batch_info); - self.futures.push_back(fut); + maybe_batch = self.batch_receiver.recv(), if self.futures.len() < DERIVATION_PIPELINE_WORKER_CONCURRENCY => { + match maybe_batch { + Some(batch_info) => { + let fut = self.derivation_future(batch_info); + self.futures.push_back(fut); + } + None => { + tracing::info!(target: "scroll::derivation_pipeline", "Batch channel closed, shutting down derivation pipeline worker"); + break; + } + } + } Some(result) = self.futures.next() => { match result { - Ok(res) => self.result_sender.send(res).expect("Failed to send batch derivation result"), + Ok(res) => { + if self.result_sender.send(res).is_err() { + tracing::info!(target: "scroll::derivation_pipeline", "Result channel closed, shutting down derivation pipeline worker"); + break; + } + } Err((batch_info, err)) => { - tracing::error!(target: "scroll::derivation_pipeline", ?batch_info, ?err, "Failed to derive payload attributes"); - self.futures.push_front(self.derivation_future(batch_info)); + tracing::error!(target: "scroll::derivation_pipeline", ?batch_info, ?err, "Failed to derive payload attributes"); + self.futures.push_front(self.derivation_future(batch_info)); } } }