Skip to content

Commit

Permalink
fix deadlock in macos backend
Browse files Browse the repository at this point in the history
  • Loading branch information
mtak- committed Sep 11, 2019
1 parent 2a580b2 commit 977ecd0
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 35 deletions.
92 changes: 57 additions & 35 deletions src/debounce/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use std::time::{Duration, Instant};

use debounce::OperationsBuffer;

struct TryLockError(ScheduledEvent);

#[derive(PartialEq, Eq)]
struct ScheduledEvent {
id: u64,
Expand All @@ -30,49 +32,69 @@ struct ScheduleWorker {

impl ScheduleWorker {
fn fire_due_events(&self, now: Instant) -> Option<Instant> {
let mut events = self.events.lock().unwrap();
while let Some(event) = events.pop_front() {
if event.when <= now {
self.fire_event(event)
} else {
// not due yet, put it back
let next_when = event.when;
events.push_front(event);
return Some(next_when);
'a: loop {
let mut events = self.events.lock().unwrap();
while let Some(event) = events.pop_front() {
if event.when <= now {
match self.fire_event(event) {
Ok(()) => {}
Err(TryLockError(event)) => {
events.push_front(event);
drop(events);
std::thread::yield_now();
continue 'a
}
}
} else {
// not due yet, put it back
let next_when = event.when;
events.push_front(event);
return Some(next_when);
}
}
break
}
None
}

fn fire_event(&self, ev: ScheduledEvent) {
let ScheduledEvent { path, .. } = ev;
if let Ok(ref mut op_buf) = self.operations_buffer.lock() {
if let Some((op, from_path, _)) = op_buf.remove(&path) {
let is_partial_rename = from_path.is_none();
if let Some(from_path) = from_path {
self.tx
.send(DebouncedEvent::Rename(from_path, path.clone()))
.unwrap();
}
let message = match op {
Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)),
Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)),
Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)),
Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)),
Some(op::Op::RENAME) if is_partial_rename => {
if path.exists() {
Some(DebouncedEvent::Create(path))
} else {
Some(DebouncedEvent::Remove(path))
fn fire_event(&self, ev: ScheduledEvent) -> Result<(), TryLockError> {
match self.operations_buffer.try_lock() {
Ok(ref mut op_buf) => {
let ScheduledEvent { path, .. } = ev;
if let Some((op, from_path, _)) = op_buf.remove(&path) {
let is_partial_rename = from_path.is_none();
if let Some(from_path) = from_path {
self.tx
.send(DebouncedEvent::Rename(from_path, path.clone()))
.unwrap();
}
let message = match op {
Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)),
Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)),
Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)),
Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)),
Some(op::Op::RENAME) if is_partial_rename => {
if path.exists() {
Some(DebouncedEvent::Create(path))
} else {
Some(DebouncedEvent::Remove(path))
}
}
_ => None,
};
if let Some(m) = message {
let _ = self.tx.send(m);
}
_ => None,
};
if let Some(m) = message {
let _ = self.tx.send(m);
} else {
// TODO error!("path not found in operations_buffer: {}", path.display())
}
} else {
// TODO error!("path not found in operations_buffer: {}", path.display())
Ok(())
}
Err(std::sync::TryLockError::Poisoned { .. }) => {
Ok(())
}
Err(std::sync::TryLockError::WouldBlock) => {
Err(TryLockError(ev))
}
}
}
Expand Down
30 changes: 30 additions & 0 deletions tests/debounce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1411,3 +1411,33 @@ fn one_file_many_events() {
);
io_thread.join().unwrap();
}

// https://github.com/passcod/notify/issues/205
#[test]
#[cfg_attr(not(target_os = "macos"), ignore)]
fn delay_zero() {
let tdir = TempDir::new("temp_dir").expect("failed to create temporary directory");

tdir.create("file1");

let (tx, rx) = mpsc::channel();
let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(0))
.expect("failed to create debounced watcher");
watcher
.watch(tdir.mkpath("."), RecursiveMode::Recursive)
.expect("failed to watch directory");

let thread = thread::spawn(move || {
for e in rx.into_iter() {
println!("{:?}", e);
}
});

for _ in 0..100 {
tdir.rename("file1", "file2");
tdir.rename("file2", "file1");
}

drop(watcher);
thread.join().unwrap();
}

0 comments on commit 977ecd0

Please sign in to comment.