Skip to content

Commit

Permalink
[impl] Add a lock back in but streamline its use
Browse files Browse the repository at this point in the history
  • Loading branch information
passcod committed Apr 1, 2019
1 parent 1260473 commit 0eb4f2e
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 90 deletions.
63 changes: 49 additions & 14 deletions src/debounce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,69 @@ pub type OperationsBuffer =

#[derive(Clone)]
pub enum EventTx {
Raw {
Immediate {
tx: Sender<RawEvent>,
},
Debounced {
DebouncedTx {
tx: Sender<DebouncedEvent>,
debounce: Debounce,
},
DebouncedTx {
Debounced {
tx: Sender<DebouncedEvent>,
debounce: Arc<Mutex<Debounce>>,
},
}

impl EventTx {
pub fn send(&mut self, event: RawEvent) {
match *self {
EventTx::Raw { ref tx } => {
pub fn is_immediate(&self) -> bool {
match self {
EventTx::Immediate { .. } => true,
_ => false,
}
}

pub fn new_immediate(tx: Sender<RawEvent>) -> Self {
EventTx::Immediate { tx }
}

pub fn new_debounced_tx(tx: Sender<DebouncedEvent>) -> Self {
EventTx::DebouncedTx { tx }
}

pub fn new_debounced(tx: Sender<DebouncedEvent>, debounce: Debounce) -> Self {
EventTx::Debounced { tx, debounce: Arc::new(Mutex::new(debounce)) }
}

pub fn debounced_tx(&self) -> Self {
match self {
EventTx::Debounced { ref tx, .. } => Self::new_debounced_tx(tx.clone()),
_ => unreachable!(),
}
}

pub fn configure_if_debounced(&self, config: Config, tx: Sender<Result<bool>>) {
match self {
EventTx::Debounced { ref debounce, .. } => {
debounce.lock().unwrap().configure(config, tx);
},
_ => {}
}
}

pub fn send(&self, event: RawEvent) {
match self {
EventTx::Immediate { ref tx } => {
let _ = tx.send(event);
}
EventTx::Debounced {
ref tx,
ref mut debounce,
ref debounce,
} => {
match (event.path, event.op, event.cookie) {
(None, Ok(op::Op::RESCAN), None) => {
let _ = tx.send(DebouncedEvent::Rescan);
}
(Some(path), Ok(op), cookie) => {
debounce.event(path, op, cookie);
debounce.lock().unwrap().event(path, op, cookie);
}
(None, Ok(_op), _cookie) => {
// TODO panic!("path is None: {:?} ({:?})", _op, _cookie);
Expand Down Expand Up @@ -84,21 +119,21 @@ pub struct Debounce {

impl Debounce {
pub fn new(delay: Duration, tx: Sender<DebouncedEvent>) -> Debounce {
let operations_buffer: OperationsBuffer = Arc::new(Mutex::new(HashMap::new()));
let operations_buffer: OperationsBuffer = Arc::default();

// spawns new thread
let timer = WatchTimer::new(tx.clone(), operations_buffer.clone(), delay);

Debounce {
tx: tx,
operations_buffer: operations_buffer,
tx,
operations_buffer,
rename_path: None,
rename_cookie: None,
timer: timer,
timer,
}
}

pub fn configure_debounced_mode(&mut self, config: Config, tx: Sender<Result<bool>>) {
pub fn configure(&mut self, config: Config, tx: Sender<Result<bool>>) {
tx.send(match config {
Config::OngoingWrites(c) => self.timer.set_ongoing_write_duration(c),
_ => Ok(false),
Expand Down
26 changes: 10 additions & 16 deletions src/fsevent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ pub unsafe extern "C" fn callback(
let flags = slice::from_raw_parts_mut(e_ptr, num);
let ids = slice::from_raw_parts_mut(i_ptr, num);

let event_tx = &(*info).event_tx;
let mut rename_event: Option<RawEvent> = None;

for p in 0..num {
Expand Down Expand Up @@ -357,7 +358,7 @@ impl Watcher for FsEventWatcher {
since_when: fs::kFSEventStreamEventIdSinceNow,
latency: 0.0,
flags: fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer,
event_tx: Arc::new(EventTx::Raw { tx }),
event_tx: Arc::new(EventTx::new_immediate(tx)),
runloop: None,
context: None,
recursive_info: HashMap::new(),
Expand All @@ -372,10 +373,10 @@ impl Watcher for FsEventWatcher {
since_when: fs::kFSEventStreamEventIdSinceNow,
latency: 0.0,
flags: fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer,
event_tx: Arc::new(EventTx::Debounced {
tx: tx.clone(),
debounce: Debounce::new(delay, tx),
}),
event_tx: Arc::new(EventTx::new_debounced(
tx.clone(),
Debounce::new(delay, tx),
)),
runloop: None,
context: None,
recursive_info: HashMap::new(),
Expand All @@ -401,19 +402,12 @@ impl Watcher for FsEventWatcher {
fn configure(&mut self, config: Config) -> Result<bool> {
let (tx, rx) = unbounded();

{
let mut debounced_event = self.event_tx.lock()?;
if let EventTx::Debounced {
ref mut debounce,
tx: _,
} = *debounced_event
{
debounce.configure_debounced_mode(config, tx);
return rx.recv()?;
}
if self.event_tx.is_immediate() {
self.configure_raw_mode(config, tx);
} else {
self.event_tx.configure_if_debounced(config, tx);
}

self.configure_raw_mode(config, tx);
rx.recv()?
}
}
Expand Down
31 changes: 13 additions & 18 deletions src/inotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ enum EventLoopMsg {
}

#[inline]
fn send_pending_rename_event(rename_event: &mut Option<RawEvent>, event_tx: &mut EventTx) {
let event = mem::replace(rename_event, None);
if let Some(e) = event {
fn send_pending_rename_event(rename_event: &mut Option<RawEvent>, event_tx: &EventTx) {
if let Some(e) = mem::replace(rename_event, None) {
event_tx.send(RawEvent {
path: e.path,
op: Ok(op::Op::REMOVE),
Expand Down Expand Up @@ -199,18 +198,14 @@ impl EventLoop {
let current_cookie = self.rename_event.as_ref().and_then(|e| e.cookie);
// send pending rename event only if the rename event for which the timer has been created hasn't been handled already; otherwise ignore this timeout
if current_cookie == Some(cookie) {
send_pending_rename_event(&mut self.rename_event, &mut self.event_tx);
send_pending_rename_event(&mut self.rename_event, &self.event_tx);
}
}
EventLoopMsg::Configure(config, tx) => {
if let EventTx::Debounced {
ref mut debounce,
tx: _,
} = self.event_tx
{
debounce.configure_debounced_mode(config, tx);
} else {
if self.event_tx.is_immediate() {
self.configure_raw_mode(config, tx);
} else {
self.event_tx.configure_if_debounced(config, tx);
}
}
}
Expand Down Expand Up @@ -245,7 +240,7 @@ impl EventLoop {
};

if event.mask.contains(EventMask::MOVED_FROM) {
send_pending_rename_event(&mut self.rename_event, &mut self.event_tx);
send_pending_rename_event(&mut self.rename_event, &self.event_tx);
remove_watch_by_event(&path, &self.watches, &mut remove_watches);
self.rename_event = Some(RawEvent {
path: path,
Expand Down Expand Up @@ -296,7 +291,7 @@ impl EventLoop {
if !o.is_empty() {
send_pending_rename_event(
&mut self.rename_event,
&mut self.event_tx,
&self.event_tx,
);

self.event_tx.send(RawEvent {
Expand Down Expand Up @@ -456,7 +451,7 @@ fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry
impl Watcher for INotifyWatcher {
fn new_immediate(tx: Sender<RawEvent>) -> Result<INotifyWatcher> {
let inotify = Inotify::init()?;
let event_tx = EventTx::Raw { tx };
let event_tx = EventTx::new_immediate(tx);
let event_loop = EventLoop::new(inotify, event_tx)?;
let channel = event_loop.channel();
event_loop.run();
Expand All @@ -465,10 +460,10 @@ impl Watcher for INotifyWatcher {

fn new(tx: Sender<DebouncedEvent>, delay: Duration) -> Result<INotifyWatcher> {
let inotify = Inotify::init()?;
let event_tx = EventTx::Debounced {
tx: tx.clone(),
debounce: Debounce::new(delay, tx),
};
let event_tx = EventTx::new_debounced(
tx.clone(),
Debounce::new(delay, tx),
);
let event_loop = EventLoop::new(inotify, event_tx)?;
let channel = event_loop.channel();
event_loop.run();
Expand Down
28 changes: 16 additions & 12 deletions src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,27 @@ pub struct PollWatcher {
event_tx: EventTx,
watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
open: Arc<AtomicBool>,
delay: Duration,
}

impl PollWatcher {
/// Create a PollWatcher which polls every `delay` milliseconds
pub fn with_delay_ms(tx: Sender<RawEvent>, delay: u32) -> Result<PollWatcher> {
pub fn with_delay(tx: Sender<RawEvent>, delay: Duration) -> Result<PollWatcher> {
let event_tx = EventTx::new_immediate(tx);
let mut p = PollWatcher {
event_tx: EventTx::Raw { tx: tx.clone() },
event_tx: event_tx.clone(),
watches: Arc::new(Mutex::new(HashMap::new())),
open: Arc::new(AtomicBool::new(true)),
delay,
};
let event_tx = EventTx::Raw { tx };
p.run(Duration::from_millis(delay as u64), event_tx);
p.run(event_tx);
Ok(p)
}

fn run(&mut self, delay: Duration, mut event_tx: EventTx) {
fn run(&mut self, event_tx: EventTx) {
let watches = self.watches.clone();
let open = self.open.clone();
let delay = self.delay.clone();

thread::spawn(move || {
// In order of priority:
Expand Down Expand Up @@ -190,20 +193,21 @@ impl PollWatcher {

impl Watcher for PollWatcher {
fn new_immediate(tx: Sender<RawEvent>) -> Result<PollWatcher> {
PollWatcher::with_delay_ms(tx, 30_000)
PollWatcher::with_delay(tx, Duration::from_secs(30))
}

fn new(tx: Sender<DebouncedEvent>, delay: Duration) -> Result<PollWatcher> {
let event_tx = EventTx::new_debounced(
tx.clone(),
Debounce::new(delay, tx),
);
let mut p = PollWatcher {
event_tx: EventTx::DebouncedTx { tx: tx.clone() },
event_tx: event_tx.debounced_tx(),
watches: Arc::new(Mutex::new(HashMap::new())),
open: Arc::new(AtomicBool::new(true)),
delay,
};
let event_tx = EventTx::Debounced {
tx: tx.clone(),
debounce: Debounce::new(delay, tx),
};
p.run(delay, event_tx);
p.run(event_tx);
Ok(p)
}

Expand Down
28 changes: 6 additions & 22 deletions src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,22 +121,11 @@ impl ReadDirectoryChangesServer {
break;
}
Action::Configure(config, tx) => {
// TODO: We shouldn't need to obtain a lock on the tx just to figure out
// whether the watcher is in debounced mode or not. Let's store a boolean
// somewhere and be done with it. Irrespective of that, putting Senders in
// mutexes is useless and we really should use crossbeam-channel anyway.
{
if let EventTx::Debounced {
ref mut debounce,
tx: _,
} = &self.event_tx
{
debounce.configure_debounced_mode(config, tx);
break;
}
if self.event_tx.is_immediate() {
self.configure_raw_mode(config, tx);
} else {
self.event_tx.configure_if_debounced(config, tx);
}

self.configure_raw_mode(config, tx);
}
}
}
Expand Down Expand Up @@ -458,8 +447,7 @@ impl ReadDirectoryChangesWatcher {
));
}

let event_tx = EventTx::Raw { tx: tx };

let event_tx = EventTx::new_immediate(tx);
let action_tx = ReadDirectoryChangesServer::start(event_tx, meta_tx, cmd_tx, wakeup_sem);

Ok(ReadDirectoryChangesWatcher {
Expand All @@ -484,11 +472,7 @@ impl ReadDirectoryChangesWatcher {
));
}

let event_tx = EventTx::Debounced {
tx: tx.clone(),
debounce: Debounce::new(delay, tx),
};

let event_tx = EventTx::new_debounced(tx.clone(), Debounce::new(delay, tx));
let action_tx = ReadDirectoryChangesServer::start(event_tx, meta_tx, cmd_tx, wakeup_sem);

Ok(ReadDirectoryChangesWatcher {
Expand Down
Loading

0 comments on commit 0eb4f2e

Please sign in to comment.