diff --git a/crates/turborepo-cache/src/async_cache.rs b/crates/turborepo-cache/src/async_cache.rs index b3e463c80848f..c28cd4eb6d08a 100644 --- a/crates/turborepo-cache/src/async_cache.rs +++ b/crates/turborepo-cache/src/async_cache.rs @@ -1,10 +1,7 @@ use std::sync::{atomic::AtomicU8, Arc}; use futures::{stream::FuturesUnordered, StreamExt}; -use tokio::{ - sync::{mpsc, Semaphore}, - task::JoinHandle, -}; +use tokio::sync::{mpsc, Semaphore}; use tracing::{warn, Instrument, Level}; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf, AnchoredSystemPathBuf}; use turborepo_analytics::AnalyticsSender; @@ -14,10 +11,10 @@ use crate::{multiplexer::CacheMultiplexer, CacheError, CacheHitMetadata, CacheOp const WARNING_CUTOFF: u8 = 4; +#[derive(Clone)] pub struct AsyncCache { real_cache: Arc, writer_sender: mpsc::Sender, - writer_thread: JoinHandle<()>, } enum WorkerRequest { @@ -50,7 +47,7 @@ impl AsyncCache { // start a task to manage workers let worker_real_cache = real_cache.clone(); - let writer_thread = tokio::spawn(async move { + tokio::spawn(async move { let semaphore = Arc::new(Semaphore::new(max_workers)); let mut workers = FuturesUnordered::new(); let real_cache = worker_real_cache; @@ -108,7 +105,6 @@ impl AsyncCache { Ok(AsyncCache { real_cache, writer_sender, - writer_thread, }) } @@ -163,11 +159,6 @@ impl AsyncCache { // Wait until flush callback is finished rx.await.ok(); } - - pub async fn shutdown(self) { - let Self { writer_thread, .. } = self; - writer_thread.await.unwrap(); - } } #[cfg(test)]