From 76582e93cead3cf040e012d300503de3ebe93851 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 20 Dec 2022 16:21:32 +0100 Subject: [PATCH 1/2] restrict concurrency to 1 read/write operation per file per filesystem this avoids that a file is read while it is still being written --- Cargo.lock | 1 + crates/turbo-tasks-fs/Cargo.toml | 1 + crates/turbo-tasks-fs/src/lib.rs | 11 +++ crates/turbo-tasks-fs/src/mutex_map.rs | 108 +++++++++++++++++++++++++ 4 files changed, 121 insertions(+) create mode 100644 crates/turbo-tasks-fs/src/mutex_map.rs diff --git a/Cargo.lock b/Cargo.lock index 7ff3e38c1e46a..94fcbc7be2a58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7196,6 +7196,7 @@ dependencies = [ "jsonc-parser", "mime", "notify", + "parking_lot", "rstest", "serde", "serde_json", diff --git a/crates/turbo-tasks-fs/Cargo.toml b/crates/turbo-tasks-fs/Cargo.toml index c2d7c4ae2881e..821a7d48f13ce 100644 --- a/crates/turbo-tasks-fs/Cargo.toml +++ b/crates/turbo-tasks-fs/Cargo.toml @@ -24,6 +24,7 @@ include_dir = { version = "0.7.2", features = ["nightly"] } jsonc-parser = { version = "0.21.0", features = ["serde"] } mime = "0.3.16" notify = "4.0.17" +parking_lot = "0.12.1" serde = { version = "1.0.136", features = ["rc"] } serde_json = "1.0.85" tokio = "1.21.2" diff --git a/crates/turbo-tasks-fs/src/lib.rs b/crates/turbo-tasks-fs/src/lib.rs index 82fcd725867bf..397f6f38b1e71 100644 --- a/crates/turbo-tasks-fs/src/lib.rs +++ b/crates/turbo-tasks-fs/src/lib.rs @@ -9,6 +9,7 @@ pub mod attach; pub mod embed; pub mod glob; mod invalidator_map; +mod mutex_map; mod read_glob; mod retry; pub mod rope; @@ -51,6 +52,7 @@ use turbo_tasks::{ use turbo_tasks_hash::hash_xxh3_hash64; use util::{join_path, normalize_path, sys_to_unix, unix_to_sys}; +use self::mutex_map::MutexMap; #[cfg(target_family = "windows")] use crate::util::is_windows_raw_path; use crate::{ @@ -77,6 +79,9 @@ pub struct DiskFileSystem { pub name: String, pub root: String, #[turbo_tasks(debug_ignore, trace_ignore)] + #[serde(skip)] + mutex_map: MutexMap, + #[turbo_tasks(debug_ignore, trace_ignore)] invalidator_map: Arc, #[turbo_tasks(debug_ignore, trace_ignore)] dir_invalidator_map: Arc, @@ -279,6 +284,7 @@ impl DiskFileSystemVc { let instance = DiskFileSystem { name, root, + mutex_map: Default::default(), invalidator_map: Arc::new(InvalidatorMap::new()), dir_invalidator_map: Arc::new(InvalidatorMap::new()), watcher: Mutex::new(None), @@ -301,6 +307,7 @@ impl FileSystem for DiskFileSystem { let full_path = self.to_sys_path(fs_path).await?; self.register_invalidator(&full_path, true); + let _lock = self.mutex_map.lock(full_path.clone()).await; let content = match retry_future(|| File::from_path(full_path.clone())).await { Ok(file) => FileContent::new(file), Err(e) if e.kind() == ErrorKind::NotFound => FileContent::NotFound, @@ -371,6 +378,7 @@ impl FileSystem for DiskFileSystem { let full_path = self.to_sys_path(fs_path).await?; self.register_invalidator(&full_path, true); + let _lock = self.mutex_map.lock(full_path.clone()).await; let link_path = match retry_future(|| fs::read_link(&full_path)).await { Ok(res) => res, Err(_) => return Ok(LinkContent::NotFound.cell()), @@ -470,6 +478,7 @@ impl FileSystem for DiskFileSystem { if *content == *old_content { return Ok(CompletionVc::unchanged()); } + let _lock = self.mutex_map.lock(full_path.clone()).await; let create_directory = *old_content == FileContent::NotFound; match &*content { @@ -550,6 +559,7 @@ impl FileSystem for DiskFileSystem { })?; } } + let _lock = self.mutex_map.lock(full_path.clone()).await; match &*target_link { LinkContent::Link { target, link_type } => { let link_type = *link_type; @@ -601,6 +611,7 @@ impl FileSystem for DiskFileSystem { let full_path = self.to_sys_path(fs_path).await?; self.register_invalidator(&full_path, true); + let _lock = self.mutex_map.lock(full_path.clone()).await; let meta = retry_future(|| fs::metadata(full_path.clone())) .await .with_context(|| format!("reading metadata for {}", full_path.display()))?; diff --git a/crates/turbo-tasks-fs/src/mutex_map.rs b/crates/turbo-tasks-fs/src/mutex_map.rs new file mode 100644 index 0000000000000..4103247c6a409 --- /dev/null +++ b/crates/turbo-tasks-fs/src/mutex_map.rs @@ -0,0 +1,108 @@ +use std::{ + collections::{hash_map::Entry, HashMap}, + hash::Hash, + marker::PhantomData, +}; + +use parking_lot::Mutex; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use turbo_tasks::event::Event; + +pub struct MutexMap { + map: Mutex>>, +} + +impl Default for MutexMap { + fn default() -> Self { + Self { + map: Mutex::new(HashMap::new()), + } + } +} + +impl<'a, K: Eq + Hash + Clone> MutexMap { + pub async fn lock(&'a self, key: K) -> MutexMapGuard<'a, K> { + let listener = { + let mut map = self.map.lock(); + match map.entry(key.clone()) { + Entry::Occupied(mut e) => { + let state = e.get_mut(); + Some(match state { + Some((event, count)) => { + *count += 1; + event.listen() + } + None => { + let event = Event::new(|| "MutexMap".to_string()); + let listener = event.listen(); + *state = Some((event, 0)); + listener + } + }) + } + Entry::Vacant(e) => { + e.insert(None); + None + } + } + }; + if let Some(listener) = listener { + listener.await; + } + MutexMapGuard { + map: self, + key: Some(key), + } + } +} + +pub struct MutexMapGuard<'a, K: Eq + Hash> { + map: &'a MutexMap, + key: Option, +} + +impl<'a, K: Eq + Hash> Drop for MutexMapGuard<'a, K> { + fn drop(&mut self) { + if let Some(key) = self.key.take() { + let mut map = self.map.map.lock(); + if let Entry::Occupied(mut e) = map.entry(key) { + let value = e.get_mut(); + match value { + Some((event, count)) => { + event.notify(1); + if *count == 0 { + *value = None; + } else { + *count -= 1; + } + } + None => { + e.remove(); + } + } + } + } + } +} + +impl Serialize for MutexMap { + fn serialize(&self, serializer: S) -> Result { + serializer.serialize_unit() + } +} + +impl<'de, K> Deserialize<'de> for MutexMap { + fn deserialize>(deserializer: D) -> Result { + struct Visitor(PhantomData>); + impl<'de, K> serde::de::Visitor<'de> for Visitor { + type Value = MutexMap; + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a unit") + } + fn visit_unit(self) -> Result { + Ok(MutexMap::default()) + } + } + deserializer.deserialize_unit(Visitor(std::marker::PhantomData)) + } +} From 85619bf1d8275834dcd3080bf621f51b9f96aad7 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Tue, 20 Dec 2022 16:22:01 +0100 Subject: [PATCH 2/2] fix CompletionVc::unchanged typo COMPLETIONS -> COMPLETION --- crates/turbo-tasks/src/completion.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/turbo-tasks/src/completion.rs b/crates/turbo-tasks/src/completion.rs index d7a23b0e3ae6c..23e998324e9fa 100644 --- a/crates/turbo-tasks/src/completion.rs +++ b/crates/turbo-tasks/src/completion.rs @@ -25,10 +25,10 @@ impl CompletionVc { /// new invalidation. pub fn unchanged() -> Self { // This is the same code that CompletionVc::cell uses except that it - // in fact compares the cell (CompletionVc::cell opted-out of + // only updates the cell when it is empty (CompletionVc::cell opted-out of // that via `#[turbo_tasks::value(cell = "new")]`) - let cell = turbo_tasks::macro_helpers::find_cell_by_type(*COMPLETIONS_VALUE_TYPE_ID); - cell.compare_and_update_shared(Completion); + let cell = turbo_tasks::macro_helpers::find_cell_by_type(*COMPLETION_VALUE_TYPE_ID); + cell.conditional_update_shared(|old| old.is_none().then_some(Completion)); let raw: RawVc = cell.into(); raw.into() }