Skip to content

Commit

Permalink
Merge 85619bf into 66407e6
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Dec 20, 2022
2 parents 66407e6 + 85619bf commit 69e892b
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/turbo-tasks-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ include_dir = { version = "0.7.2", features = ["nightly"] }
jsonc-parser = { version = "0.21.0", features = ["serde"] }
mime = "0.3.16"
notify = "4.0.17"
parking_lot = "0.12.1"
serde = { version = "1.0.136", features = ["rc"] }
serde_json = "1.0.85"
tokio = "1.21.2"
Expand Down
11 changes: 11 additions & 0 deletions crates/turbo-tasks-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod attach;
pub mod embed;
pub mod glob;
mod invalidator_map;
mod mutex_map;
mod read_glob;
mod retry;
pub mod rope;
Expand Down Expand Up @@ -51,6 +52,7 @@ use turbo_tasks::{
use turbo_tasks_hash::hash_xxh3_hash64;
use util::{join_path, normalize_path, sys_to_unix, unix_to_sys};

use self::mutex_map::MutexMap;
#[cfg(target_family = "windows")]
use crate::util::is_windows_raw_path;
use crate::{
Expand All @@ -77,6 +79,9 @@ pub struct DiskFileSystem {
pub name: String,
pub root: String,
#[turbo_tasks(debug_ignore, trace_ignore)]
#[serde(skip)]
mutex_map: MutexMap<PathBuf>,
#[turbo_tasks(debug_ignore, trace_ignore)]
invalidator_map: Arc<InvalidatorMap>,
#[turbo_tasks(debug_ignore, trace_ignore)]
dir_invalidator_map: Arc<InvalidatorMap>,
Expand Down Expand Up @@ -279,6 +284,7 @@ impl DiskFileSystemVc {
let instance = DiskFileSystem {
name,
root,
mutex_map: Default::default(),
invalidator_map: Arc::new(InvalidatorMap::new()),
dir_invalidator_map: Arc::new(InvalidatorMap::new()),
watcher: Mutex::new(None),
Expand All @@ -301,6 +307,7 @@ impl FileSystem for DiskFileSystem {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path, true);

let _lock = self.mutex_map.lock(full_path.clone()).await;
let content = match retry_future(|| File::from_path(full_path.clone())).await {
Ok(file) => FileContent::new(file),
Err(e) if e.kind() == ErrorKind::NotFound => FileContent::NotFound,
Expand Down Expand Up @@ -371,6 +378,7 @@ impl FileSystem for DiskFileSystem {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path, true);

let _lock = self.mutex_map.lock(full_path.clone()).await;
let link_path = match retry_future(|| fs::read_link(&full_path)).await {
Ok(res) => res,
Err(_) => return Ok(LinkContent::NotFound.cell()),
Expand Down Expand Up @@ -470,6 +478,7 @@ impl FileSystem for DiskFileSystem {
if *content == *old_content {
return Ok(CompletionVc::unchanged());
}
let _lock = self.mutex_map.lock(full_path.clone()).await;

let create_directory = *old_content == FileContent::NotFound;
match &*content {
Expand Down Expand Up @@ -550,6 +559,7 @@ impl FileSystem for DiskFileSystem {
})?;
}
}
let _lock = self.mutex_map.lock(full_path.clone()).await;
match &*target_link {
LinkContent::Link { target, link_type } => {
let link_type = *link_type;
Expand Down Expand Up @@ -601,6 +611,7 @@ impl FileSystem for DiskFileSystem {
let full_path = self.to_sys_path(fs_path).await?;
self.register_invalidator(&full_path, true);

let _lock = self.mutex_map.lock(full_path.clone()).await;
let meta = retry_future(|| fs::metadata(full_path.clone()))
.await
.with_context(|| format!("reading metadata for {}", full_path.display()))?;
Expand Down
108 changes: 108 additions & 0 deletions crates/turbo-tasks-fs/src/mutex_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use std::{
collections::{hash_map::Entry, HashMap},
hash::Hash,
marker::PhantomData,
};

use parking_lot::Mutex;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use turbo_tasks::event::Event;

pub struct MutexMap<K> {
map: Mutex<HashMap<K, Option<(Event, usize)>>>,
}

impl<K> Default for MutexMap<K> {
fn default() -> Self {
Self {
map: Mutex::new(HashMap::new()),
}
}
}

impl<'a, K: Eq + Hash + Clone> MutexMap<K> {
pub async fn lock(&'a self, key: K) -> MutexMapGuard<'a, K> {
let listener = {
let mut map = self.map.lock();
match map.entry(key.clone()) {
Entry::Occupied(mut e) => {
let state = e.get_mut();
Some(match state {
Some((event, count)) => {
*count += 1;
event.listen()
}
None => {
let event = Event::new(|| "MutexMap".to_string());
let listener = event.listen();
*state = Some((event, 0));
listener
}
})
}
Entry::Vacant(e) => {
e.insert(None);
None
}
}
};
if let Some(listener) = listener {
listener.await;
}
MutexMapGuard {
map: self,
key: Some(key),
}
}
}

pub struct MutexMapGuard<'a, K: Eq + Hash> {
map: &'a MutexMap<K>,
key: Option<K>,
}

impl<'a, K: Eq + Hash> Drop for MutexMapGuard<'a, K> {
fn drop(&mut self) {
if let Some(key) = self.key.take() {
let mut map = self.map.map.lock();
if let Entry::Occupied(mut e) = map.entry(key) {
let value = e.get_mut();
match value {
Some((event, count)) => {
event.notify(1);
if *count == 0 {
*value = None;
} else {
*count -= 1;
}
}
None => {
e.remove();
}
}
}
}
}
}

impl<K> Serialize for MutexMap<K> {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
serializer.serialize_unit()
}
}

impl<'de, K> Deserialize<'de> for MutexMap<K> {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
struct Visitor<K>(PhantomData<MutexMap<K>>);
impl<'de, K> serde::de::Visitor<'de> for Visitor<K> {
type Value = MutexMap<K>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("a unit")
}
fn visit_unit<E: serde::de::Error>(self) -> Result<Self::Value, E> {
Ok(MutexMap::default())
}
}
deserializer.deserialize_unit(Visitor(std::marker::PhantomData))
}
}
6 changes: 3 additions & 3 deletions crates/turbo-tasks/src/completion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ impl CompletionVc {
/// new invalidation.
pub fn unchanged() -> Self {
// This is the same code that CompletionVc::cell uses except that it
// in fact compares the cell (CompletionVc::cell opted-out of
// only updates the cell when it is empty (CompletionVc::cell opted-out of
// that via `#[turbo_tasks::value(cell = "new")]`)
let cell = turbo_tasks::macro_helpers::find_cell_by_type(*COMPLETIONS_VALUE_TYPE_ID);
cell.compare_and_update_shared(Completion);
let cell = turbo_tasks::macro_helpers::find_cell_by_type(*COMPLETION_VALUE_TYPE_ID);
cell.conditional_update_shared(|old| old.is_none().then_some(Completion));
let raw: RawVc = cell.into();
raw.into()
}
Expand Down

0 comments on commit 69e892b

Please sign in to comment.