Skip to content

Commit

Permalink
[debounced] Make sure watcher terminates (#170)
Browse files Browse the repository at this point in the history
fixes #169
  • Loading branch information
vemoo authored and passcod committed Jan 19, 2019
1 parent 20c40f9 commit 6e211f7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
32 changes: 27 additions & 5 deletions src/debounce/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use debounce::OperationsBuffer;
enum Action {
Schedule(ScheduledEvent),
Ignore(u64),
Stop
}

#[derive(PartialEq, Eq)]
Expand Down Expand Up @@ -41,6 +42,7 @@ struct ScheduleWorker {
ignore: HashSet<u64>,
tx: mpsc::Sender<DebouncedEvent>,
operations_buffer: OperationsBuffer,
stopped: bool,
}

impl ScheduleWorker {
Expand All @@ -56,21 +58,28 @@ impl ScheduleWorker {
ignore: HashSet::new(),
tx: tx,
operations_buffer: operations_buffer,
stopped: false,
}
}

fn drain_request_queue(&mut self) {
while let Ok(action) = self.request_source.try_recv() {
match action {
Action::Schedule(event) => self.schedule.push(event),
Action::Ignore(ignore_id) => {
loop {
match self.request_source.try_recv(){
Ok(Action::Schedule(event)) => self.schedule.push(event),
Ok(Action::Ignore(ignore_id)) => {
for &ScheduledEvent { ref id, .. } in &self.schedule {
if *id == ignore_id {
self.ignore.insert(ignore_id);
break;
}
}
}
},
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected)
| Ok(Action::Stop) => {
self.stopped = true;
break;
},
}
}
}
Expand Down Expand Up @@ -144,6 +153,10 @@ impl ScheduleWorker {

let wait_duration = self.duration_until_next_event();

if self.stopped {
break;
}

// Unwrapping is safe because the mutex can't be poisoned,
// since we haven't shared it with another thread.
g = if let Some(wait_duration) = wait_duration {
Expand Down Expand Up @@ -205,3 +218,12 @@ impl WatchTimer {
.expect("Failed to send a request to the global scheduling worker");
}
}

impl std::ops::Drop for WatchTimer {
fn drop(&mut self) {
self.schedule_tx
.send(Action::Stop)
.expect("Failed to send a request to the global scheduling worker");
self.trigger.notify_one();
}
}
13 changes: 13 additions & 0 deletions tests/debounce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1111,3 +1111,16 @@ fn rename_rename_remove_temp_file() {
]);
}
}

#[test]
fn watcher_terminates() {
let (tx, rx) = mpsc::channel();
let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_millis(DELAY_MS)).expect("failed to create debounced watcher");
let thread = thread::spawn(move || {
for e in rx.into_iter() {
println!("{:?}", e);
}
});
drop(watcher);
thread.join().unwrap();
}

0 comments on commit 6e211f7

Please sign in to comment.