Skip to content

Commit

Permalink
WIP: undoing deadlock avoidance to test non-macos platforms
Browse files Browse the repository at this point in the history
  • Loading branch information
mtak- committed Sep 25, 2019
1 parent 6eef798 commit 737b5d9
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 50 deletions.
4 changes: 2 additions & 2 deletions src/debounce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::time::Duration;

pub type OperationsBufferInner = HashMap<PathBuf, (Option<op::Op>, Option<PathBuf>, Option<u64>)>;
pub type OperationsBuffer = Arc<Mutex<OperationsBufferInner>>;
pub type OperationsBuffer =
Arc<Mutex<HashMap<PathBuf, (Option<op::Op>, Option<PathBuf>, Option<u64>)>>>;

pub enum EventTx {
Raw {
Expand Down
77 changes: 29 additions & 48 deletions src/debounce/timer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use super::super::{op, DebouncedEvent};

use std::collections::VecDeque;
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::{
Expand All @@ -11,7 +10,7 @@ use std::sync::{
use std::thread;
use std::time::{Duration, Instant};

use debounce::{OperationsBuffer, OperationsBufferInner};
use debounce::OperationsBuffer;

#[derive(PartialEq, Eq)]
struct ScheduledEvent {
Expand All @@ -31,26 +30,10 @@ struct ScheduleWorker {

impl ScheduleWorker {
fn fire_due_events(&self, now: Instant) -> Option<Instant> {
// simple deadlock avoidance loop.
let (mut events, mut op_buf) = loop {
let events = self.events.lock().unwrap();

// To avoid deadlock, we do a `try_lock`, and on `WouldBlock`, we unlock the
// events Mutex, and retry after yielding.
match self.operations_buffer.try_lock() {
Ok(op_buf) => break (events, op_buf),
Err(::std::sync::TryLockError::Poisoned {..}) => return None,
Err(::std::sync::TryLockError::WouldBlock) => {
// drop the lock before yielding to give other threads a chance to complete
// their work.
drop(events);
::std::thread::yield_now();
}
}
};
let mut events = self.events.lock().unwrap();
while let Some(event) = events.pop_front() {
if event.when <= now {
self.fire_event(event, &mut op_buf)
self.fire_event(event)
} else {
// not due yet, put it back
let next_when = event.when;
Expand All @@ -61,38 +44,36 @@ impl ScheduleWorker {
None
}

fn fire_event(
&self,
ev: ScheduledEvent,
op_buf: &mut impl DerefMut<Target = OperationsBufferInner>
) {
fn fire_event(&self, ev: ScheduledEvent) {
let ScheduledEvent { path, .. } = ev;
if let Some((op, from_path, _)) = op_buf.remove(&path) {
let is_partial_rename = from_path.is_none();
if let Some(from_path) = from_path {
self.tx
.send(DebouncedEvent::Rename(from_path, path.clone()))
.unwrap();
}
let message = match op {
Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)),
Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)),
Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)),
Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)),
Some(op::Op::RENAME) if is_partial_rename => {
if path.exists() {
Some(DebouncedEvent::Create(path))
} else {
Some(DebouncedEvent::Remove(path))
if let Ok(ref mut op_buf) = self.operations_buffer.lock() {
if let Some((op, from_path, _)) = op_buf.remove(&path) {
let is_partial_rename = from_path.is_none();
if let Some(from_path) = from_path {
self.tx
.send(DebouncedEvent::Rename(from_path, path.clone()))
.unwrap();
}
let message = match op {
Some(op::Op::CREATE) => Some(DebouncedEvent::Create(path)),
Some(op::Op::WRITE) => Some(DebouncedEvent::Write(path)),
Some(op::Op::CHMOD) => Some(DebouncedEvent::Chmod(path)),
Some(op::Op::REMOVE) => Some(DebouncedEvent::Remove(path)),
Some(op::Op::RENAME) if is_partial_rename => {
if path.exists() {
Some(DebouncedEvent::Create(path))
} else {
Some(DebouncedEvent::Remove(path))
}
}
_ => None,
};
if let Some(m) = message {
let _ = self.tx.send(m);
}
_ => None,
};
if let Some(m) = message {
let _ = self.tx.send(m);
} else {
// TODO error!("path not found in operations_buffer: {}", path.display())
}
} else {
// TODO error!("path not found in operations_buffer: {}", path.display())
}
}

Expand Down

0 comments on commit 737b5d9

Please sign in to comment.