From 682d67430835e101e3c90074fb7e7a3e89f919a9 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 2 May 2025 12:03:10 +0200 Subject: [PATCH 1/6] flush key spaces when done with them --- turbopack/crates/turbo-persistence/src/lib.rs | 1 + .../turbo-persistence/src/write_batch.rs | 45 +++++++++++++++++-- .../src/database/noop_kv.rs | 8 ++++ .../turbo-tasks-backend/src/database/turbo.rs | 4 ++ .../src/database/write_batch.rs | 33 ++++++++++++++ .../src/kv_backing_storage.rs | 8 ++++ 6 files changed, 96 insertions(+), 3 deletions(-) diff --git a/turbopack/crates/turbo-persistence/src/lib.rs b/turbopack/crates/turbo-persistence/src/lib.rs index fd4473e6102f8..a1a75ca772647 100644 --- a/turbopack/crates/turbo-persistence/src/lib.rs +++ b/turbopack/crates/turbo-persistence/src/lib.rs @@ -1,6 +1,7 @@ #![feature(once_cell_try)] #![feature(new_zeroed_alloc)] #![feature(get_mut_unchecked)] +#![feature(sync_unsafe_cell)] mod arc_slice; mod collector; diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 7039280570889..c8ec3789a9ce9 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -1,5 +1,5 @@ use std::{ - cell::UnsafeCell, + cell::SyncUnsafeCell, fs::File, io::Write, mem::{replace, take}, @@ -68,7 +68,7 @@ pub struct WriteBatch { /// The current sequence number counter. Increased for every new SST file or blob file. current_sequence_number: AtomicU32, /// The thread local state. - thread_locals: ThreadLocal>>, + thread_locals: ThreadLocal>>, /// Collectors in use. The thread local collectors flush into these when they are full. collectors: [Mutex>; FAMILIES], /// The list of new SST files that have been created. @@ -109,7 +109,7 @@ impl WriteBatch { #[allow(clippy::mut_from_ref)] fn thread_local_state(&self) -> &mut ThreadLocalState { let cell = self.thread_locals.get_or(|| { - UnsafeCell::new(ThreadLocalState { + SyncUnsafeCell::new(ThreadLocalState { collectors: [const { None }; FAMILIES], new_blob_files: Vec::new(), }) @@ -219,6 +219,45 @@ impl WriteBatch { Ok(()) } + /// Flushes a family of the write batch, reducing the amount of buffered memory used. + /// Does not commit any data persistently. + /// + /// Safety: Caller must ensure that no concurrent put or delete operation is happening on the + /// flushed family. + pub unsafe fn flush(&self, family: u32) -> Result<()> { + let mut collectors = Vec::new(); + for cell in self.thread_locals.iter() { + let state = unsafe { &mut *cell.get() }; + if let Some(collector) = state.collectors[usize_from_u32(family)].take() { + if !collector.is_empty() { + collectors.push(collector); + } + } + } + + let shared_error = Mutex::new(Ok(())); + scope(|scope| { + for mut collector in collectors { + let this = &self; + let shared_error = &shared_error; + let span = Span::current(); + scope.spawn(move |_| { + let _span = span.entered(); + if let Err(err) = + this.flush_thread_local_collector(family as u32, &mut collector) + { + *shared_error.lock() = Err(err); + } + this.idle_thread_local_collectors.lock().push(collector); + }); + } + }); + + shared_error.into_inner()?; + + Ok(()) + } + /// Finishes the write batch by returning the new sequence number and the new SST files. This /// writes all outstanding thread local data to disk. pub(crate) fn finish(&mut self) -> Result { diff --git a/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs b/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs index 050140198e59c..fbbd310efbb71 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/noop_kv.rs @@ -90,6 +90,10 @@ impl SerialWriteBatch<'_> for NoopWriteBatch { fn delete(&mut self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> { Ok(()) } + + fn flush(&mut self, _key_space: KeySpace) -> Result<()> { + Ok(()) + } } impl ConcurrentWriteBatch<'_> for NoopWriteBatch { @@ -105,4 +109,8 @@ impl ConcurrentWriteBatch<'_> for NoopWriteBatch { fn delete(&self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> { Ok(()) } + + unsafe fn flush(&self, _key_space: KeySpace) -> Result<()> { + Ok(()) + } } diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs index 5a90e3e4bf663..1df9d269abf87 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo.rs @@ -160,6 +160,10 @@ impl<'a> ConcurrentWriteBatch<'a> for TurboWriteBatch<'a> { fn delete(&self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()> { self.batch.delete(key_space as u32, key.into_static()) } + + unsafe fn flush(&self, key_space: KeySpace) -> Result<()> { + self.batch.flush(key_space as u32) + } } impl KeyBase for WriteBuffer<'_> { diff --git a/turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs b/turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs index 6754a4ebb69a3..a69f47e8746b0 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/write_batch.rs @@ -66,11 +66,18 @@ pub trait SerialWriteBatch<'a>: BaseWriteBatch<'a> { value: WriteBuffer<'_>, ) -> Result<()>; fn delete(&mut self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()>; + fn flush(&mut self, key_space: KeySpace) -> Result<()>; } pub trait ConcurrentWriteBatch<'a>: BaseWriteBatch<'a> + Sync + Send { fn put(&self, key_space: KeySpace, key: WriteBuffer<'_>, value: WriteBuffer<'_>) -> Result<()>; fn delete(&self, key_space: KeySpace, key: WriteBuffer<'_>) -> Result<()>; + /// Flushes a key space of the write batch, reducing the amount of buffered memory used. + /// Does not commit any data persistently. + /// + /// Safety: Caller must ensure that no concurrent put or delete operation is happening on the + /// flushed key space. + unsafe fn flush(&self, key_space: KeySpace) -> Result<()>; } pub enum WriteBatch<'a, S, C> @@ -164,6 +171,16 @@ where WriteBatch::Concurrent(c, _) => c.delete(key_space, key), } } + + fn flush(&mut self, key_space: KeySpace) -> Result<()> { + match self { + WriteBatch::Serial(s) => s.flush(key_space), + WriteBatch::Concurrent(c, _) => { + // Safety: the &mut self ensures that no concurrent operation is happening + unsafe { c.flush(key_space) } + } + } + } } pub enum WriteBatchRef<'r, 'a, S, C> @@ -241,6 +258,16 @@ where WriteBatchRef::Concurrent(c, _) => c.delete(key_space, key), } } + + fn flush(&mut self, key_space: KeySpace) -> Result<()> { + match self { + WriteBatchRef::Serial(s) => s.flush(key_space), + WriteBatchRef::Concurrent(c, _) => { + // Safety: the &mut self ensures that no concurrent operation is happening + unsafe { c.flush(key_space) } + } + } + } } pub struct UnimplementedWriteBatch; @@ -275,6 +302,9 @@ impl SerialWriteBatch<'_> for UnimplementedWriteBatch { fn delete(&mut self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> { todo!() } + fn flush(&mut self, _key_space: KeySpace) -> Result<()> { + todo!() + } } impl ConcurrentWriteBatch<'_> for UnimplementedWriteBatch { @@ -289,4 +319,7 @@ impl ConcurrentWriteBatch<'_> for UnimplementedWriteBatch { fn delete(&self, _key_space: KeySpace, _key: WriteBuffer<'_>) -> Result<()> { todo!() } + unsafe fn flush(&self, _key_space: KeySpace) -> Result<()> { + todo!() + } } diff --git a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs index 76086f8a004bb..90c3b5c0334be 100644 --- a/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs +++ b/turbopack/crates/turbo-tasks-backend/src/kv_backing_storage.rs @@ -184,6 +184,13 @@ impl BackingStorage { let _span = tracing::trace_span!("update task data").entered(); process_task_data(snapshots, Some(batch))?; + [KeySpace::TaskMeta, KeySpace::TaskData] + .into_par_iter() + .try_for_each(|key_space| { + // Safety: We already finished all processing of the task data and task + // meta + unsafe { batch.flush(key_space) } + })?; } let mut next_task_id = get_next_free_task_id::< @@ -500,6 +507,7 @@ where ) .with_context(|| anyhow!("Unable to write operations"))?; } + batch.flush(KeySpace::Infra)?; Ok(()) } From cc146f43deeb1a485dfda97e022bc30afab7de7e Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 2 May 2025 16:05:25 +0200 Subject: [PATCH 2/6] flush fixup --- .../turbo-persistence/src/write_batch.rs | 68 +++++++++++++------ 1 file changed, 48 insertions(+), 20 deletions(-) diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index c8ec3789a9ce9..5c9d64f25af7d 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -185,7 +185,7 @@ impl WriteBatch { let sst = self.create_sst_file(family, global_collector.sorted())?; global_collector.clear(); self.new_sst_files.lock().push(sst); - self.idle_collectors.lock().push(global_collector); + self.dispose_collector(global_collector); } Ok(()) } @@ -197,6 +197,14 @@ impl WriteBatch { .unwrap_or_else(|| Collector::new()) } + fn dispose_collector(&self, collector: Collector) { + self.idle_collectors.lock().push(collector); + } + + fn dispose_thread_local_collector(&self, collector: Collector) { + self.idle_thread_local_collectors.lock().push(collector); + } + /// Puts a key-value pair into the write batch. pub fn put(&self, family: u32, key: K, value: ValueBuffer<'_>) -> Result<()> { let state = self.thread_local_state(); @@ -224,7 +232,9 @@ impl WriteBatch { /// /// Safety: Caller must ensure that no concurrent put or delete operation is happening on the /// flushed family. + #[tracing::instrument(level = "trace", skip(self))] pub unsafe fn flush(&self, family: u32) -> Result<()> { + // Flush the thread local collectors to the global collector. let mut collectors = Vec::new(); for cell in self.thread_locals.iter() { let state = unsafe { &mut *cell.get() }; @@ -235,25 +245,43 @@ impl WriteBatch { } } - let shared_error = Mutex::new(Ok(())); - scope(|scope| { - for mut collector in collectors { - let this = &self; - let shared_error = &shared_error; - let span = Span::current(); - scope.spawn(move |_| { - let _span = span.entered(); - if let Err(err) = - this.flush_thread_local_collector(family as u32, &mut collector) - { - *shared_error.lock() = Err(err); + let span = Span::current(); + collectors.into_par_iter().try_for_each(|mut collector| { + let _span = span.clone().entered(); + self.flush_thread_local_collector(family, &mut collector)?; + self.dispose_thread_local_collector(collector); + anyhow::Ok(()) + })?; + + // Now we flush the global collector(s). + let mut collector_state = self.collectors[usize_from_u32(family)].lock(); + match &mut *collector_state { + GlobalCollectorState::Unsharded(collector) => { + if !collector.is_empty() { + let sst = self.create_sst_file(family, collector.sorted())?; + collector.clear(); + self.new_sst_files.lock().push(sst); + } + } + GlobalCollectorState::Sharded(_) => { + let GlobalCollectorState::Sharded(shards) = replace( + &mut *collector_state, + GlobalCollectorState::Unsharded(self.get_new_collector()), + ) else { + unreachable!(); + }; + shards.into_par_iter().try_for_each(|mut collector| { + let _span = span.clone().entered(); + if !collector.is_empty() { + let sst = self.create_sst_file(family, collector.sorted())?; + collector.clear(); + self.new_sst_files.lock().push(sst); + self.dispose_collector(collector); } - this.idle_thread_local_collectors.lock().push(collector); - }); + anyhow::Ok(()) + })?; } - }); - - shared_error.into_inner()?; + } Ok(()) } @@ -288,7 +316,7 @@ impl WriteBatch { { *shared_error.lock() = Err(err); } - this.idle_thread_local_collectors.lock().push(collector); + this.dispose_thread_local_collector(collector); }); } } @@ -322,7 +350,7 @@ impl WriteBatch { if !collector.is_empty() { let sst = self.create_sst_file(family, collector.sorted())?; collector.clear(); - self.idle_collectors.lock().push(collector); + self.dispose_collector(collector); shared_new_sst_files.lock().push(sst); } anyhow::Ok(()) From fccae99b02ac108a1d1be022fd7a296da67bb406 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 2 May 2025 16:03:53 +0200 Subject: [PATCH 3/6] test for flush --- .../crates/turbo-persistence/src/tests.rs | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/turbopack/crates/turbo-persistence/src/tests.rs b/turbopack/crates/turbo-persistence/src/tests.rs index f74adf6e95ffd..08d2bbc1b878f 100644 --- a/turbopack/crates/turbo-persistence/src/tests.rs +++ b/turbopack/crates/turbo-persistence/src/tests.rs @@ -48,6 +48,28 @@ fn full_cycle() -> Result<()> { }, ); + test_case( + &mut test_cases, + "Many SST files", + |batch| { + for i in 10..100u8 { + batch.put(0, vec![i], vec![i].into())?; + unsafe { batch.flush(0)? }; + } + Ok(()) + }, + |db| { + let Some(value) = db.get(0, &[42u8])? else { + panic!("Value not found"); + }; + assert_eq!(&*value, &[42]); + assert_eq!(db.get(0, &[42u8, 42])?, None); + assert_eq!(db.get(0, &[1u8])?, None); + assert_eq!(db.get(0, &[255u8])?, None); + Ok(()) + }, + ); + test_case( &mut test_cases, "Families", From 13ba4aa0aeb54b76544da257cc850d6f367f7629 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 2 May 2025 16:11:06 +0200 Subject: [PATCH 4/6] undo tracing --- turbopack/crates/turbo-persistence/src/write_batch.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 5c9d64f25af7d..c6475baf8a40a 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -232,7 +232,6 @@ impl WriteBatch { /// /// Safety: Caller must ensure that no concurrent put or delete operation is happening on the /// flushed family. - #[tracing::instrument(level = "trace", skip(self))] pub unsafe fn flush(&self, family: u32) -> Result<()> { // Flush the thread local collectors to the global collector. let mut collectors = Vec::new(); From 41ad89755cbbda41452931525fbf1330e7852b55 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 6 May 2025 08:21:55 +0200 Subject: [PATCH 5/6] add missing import --- Cargo.lock | 1 + turbopack/crates/turbo-persistence/Cargo.toml | 1 + turbopack/crates/turbo-persistence/src/write_batch.rs | 1 + 3 files changed, 3 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 23f125873ee98..8a970397ac972 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9334,6 +9334,7 @@ dependencies = [ "smallvec", "tempfile", "thread_local", + "tracing", "twox-hash 2.1.0", "zstd", ] diff --git a/turbopack/crates/turbo-persistence/Cargo.toml b/turbopack/crates/turbo-persistence/Cargo.toml index abd656d3fcb87..c0ae5159467a8 100644 --- a/turbopack/crates/turbo-persistence/Cargo.toml +++ b/turbopack/crates/turbo-persistence/Cargo.toml @@ -25,6 +25,7 @@ rustc-hash = { workspace = true } serde = { workspace = true } smallvec = { workspace = true} thread_local = { workspace = true } +tracing = { workspace = true } twox-hash = { version = "2.0.1", features = ["xxhash64"] } zstd = { version = "0.13.2", features = ["zdict_builder"] } diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index c6475baf8a40a..6cbc91682fdee 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -17,6 +17,7 @@ use rayon::{ }; use smallvec::SmallVec; use thread_local::ThreadLocal; +use tracing::Span; use crate::{ collector::Collector, From 7749b8c10f5cc26e4382602d2f465480792f5eab Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 6 May 2025 09:00:08 +0200 Subject: [PATCH 6/6] clippy --- turbopack/crates/turbo-persistence/src/write_batch.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 6cbc91682fdee..2f234d2eac5b7 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -231,8 +231,10 @@ impl WriteBatch { /// Flushes a family of the write batch, reducing the amount of buffered memory used. /// Does not commit any data persistently. /// - /// Safety: Caller must ensure that no concurrent put or delete operation is happening on the - /// flushed family. + /// # Safety + /// + /// Caller must ensure that no concurrent put or delete operation is happening on the flushed + /// family. pub unsafe fn flush(&self, family: u32) -> Result<()> { // Flush the thread local collectors to the global collector. let mut collectors = Vec::new();