Skip to content

Commit

Permalink
implement passive debouncer without blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
0xpr03 committed Jan 28, 2023
1 parent 85a231f commit 1cb521d
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 83 deletions.
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2021"

[dev-dependencies]
notify = { version = "5.1.0" }
notify-debouncer-mini = { version = "0.2.0" }
notify-debouncer-mini = { version = "0.3.0" }
futures = "0.3"

[[example]]
Expand Down
2 changes: 1 addition & 1 deletion examples/debounced.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fn main() {
let (tx, rx) = std::sync::mpsc::channel();

// No specific tickrate, max debounce time 2 seconds
let mut debouncer = new_debouncer(Duration::from_secs(2), None, tx).unwrap();
let mut debouncer = new_debouncer(Duration::from_secs(2), tx).unwrap();

debouncer
.watcher()
Expand Down
2 changes: 1 addition & 1 deletion examples/debounced_full_custom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn main() {
// setup debouncer
let (tx, rx) = std::sync::mpsc::channel();
// select backend via fish operator, here PollWatcher backend
let mut debouncer = new_debouncer_opt::<_,notify::PollWatcher>(Duration::from_secs(2), None, tx, Config::default()).unwrap();
let mut debouncer = new_debouncer_opt::<_,notify::PollWatcher>(Duration::from_secs(2), tx, Config::default()).unwrap();

debouncer
.watcher()
Expand Down
4 changes: 2 additions & 2 deletions notify-debouncer-mini/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "notify-debouncer-mini"
version = "0.2.1"
version = "0.3.0"
edition = "2021"
rust-version = "1.56"
description = "notify mini debouncer for events"
Expand All @@ -24,6 +24,6 @@ default = ["crossbeam"]
crossbeam = ["crossbeam-channel","notify/crossbeam-channel"]

[dependencies]
notify = "5.1.0"
notify = "5.0.0"
crossbeam-channel = { version = "0.5", optional = true }
serde = { version = "1.0.89", features = ["derive"], optional = true }
207 changes: 129 additions & 78 deletions notify-debouncer-mini/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
//! # fn main() {
//! // setup initial watcher backend config
//! let config = Config::default();
//!
//!
//! // Select recommended watcher for debouncer.
//! // Using a callback here, could also be a channel.
//! let mut debouncer = new_debouncer(Duration::from_secs(2), None, |res: DebounceEventResult| {
Expand Down Expand Up @@ -53,14 +53,13 @@ use std::{
collections::HashMap,
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
mpsc::{RecvTimeoutError},
},
time::{Duration, Instant},
};

pub use notify;
use notify::{Error, ErrorKind, Event, RecommendedWatcher, Watcher};
use notify::{Error, Event, RecommendedWatcher, Watcher};

/// The set of requirements for watcher debounce event handling functions.
///
Expand Down Expand Up @@ -165,32 +164,77 @@ impl DebouncedEvent {
}
}

type DebounceData = Arc<Mutex<DebounceDataInner>>;
enum InnerEvent {
NotifyEvent(Result<Event, Error>),
Shutdown,
}

#[derive(Default)]
struct DebounceDataInner {
/// Path -> Event data
d: HashMap<PathBuf, EventData>,
/// timeout used to compare all events against, config
timeout: Duration,
/// last debounce run
last_run: Instant,
/// oldest buffered event
oldest_update: Instant,
/// errors
e: Vec<crate::Error>,
}

impl DebounceDataInner {
pub fn new(timeout: Duration) -> Self {
let time = Instant::now();
Self {
timeout: timeout,
oldest_update: time,
last_run: time,
e: Default::default(),
d: Default::default(),
}
}

/// Returns a duration to wait for the next tick
#[inline]
pub fn next_tick(&self) -> Option<Duration> {
if self.has_tick_data() {
let next_run = self.oldest_update + self.timeout;
Some(Instant::now().duration_since(next_run))
} else {
None
}
}

/// Returns true if there are buffered events
#[inline]
pub fn has_tick_data(&self) -> bool {
!self.d.is_empty() && !self.e.is_empty()
}

/// Retrieve a vec of debounced events, removing them if not continuous
///
/// Updates the internal tracker for the next tick
pub fn debounced_events(&mut self) -> Vec<DebouncedEvent> {
let mut events_expired = Vec::with_capacity(self.d.len());
let mut data_back = HashMap::with_capacity(self.d.len());
// TODO: perfect fit for drain_filter https://github.com/rust-lang/rust/issues/59618
// reset oldest update
self.oldest_update = Instant::now();
for (k, v) in self.d.drain() {
if v.update.elapsed() >= self.timeout {
events_expired.push(DebouncedEvent::new(k, DebouncedEventKind::Any));
} else if v.insert.elapsed() >= self.timeout {
data_back.insert(k.clone(), v);
events_expired.push(DebouncedEvent::new(k, DebouncedEventKind::AnyContinuous));
} else {
if self.oldest_update > v.update {
self.oldest_update = v.update;
}
data_back.insert(k, v);
}
}
self.d = data_back;
self.last_run = Instant::now();
events_expired
}

Expand All @@ -202,12 +246,23 @@ impl DebounceDataInner {
}

/// Add an error entry to re-send later on
#[inline]
pub fn add_error(&mut self, e: crate::Error) {
self.e.push(e);
}

/// Push notify event (error/event)
#[inline]
pub fn push(&mut self, e: Result<Event,crate::Error>) {
match e {
Ok(e) => self.add_event(e),
Err(e) => self.add_error(e),
}
}

/// Add new event to debouncer cache
pub fn add_event(&mut self, e: Event) {
#[inline]
fn add_event(&mut self, e: Event) {
for path in e.paths.into_iter() {
if let Some(v) = self.d.get_mut(&path) {
v.update = Instant::now();
Expand All @@ -220,28 +275,29 @@ impl DebounceDataInner {

/// Debouncer guard, stops the debouncer on drop
pub struct Debouncer<T: Watcher> {
stop: Arc<AtomicBool>,
watcher: T,
debouncer_thread: Option<std::thread::JoinHandle<()>>,
stop_channel: std::sync::mpsc::Sender<InnerEvent>,
}

impl<T: Watcher> Debouncer<T> {
/// Stop the debouncer, waits for the event thread to finish.
/// May block for the duration of one tick_rate.
pub fn stop(mut self) {
self.set_stop();
self.send_stop();
if let Some(t) = self.debouncer_thread.take() {
let _ = t.join();
}
}

/// Stop the debouncer, does not wait for the event thread to finish.
pub fn stop_nonblocking(self) {
self.set_stop();
self.send_stop();
}

fn set_stop(&self) {
self.stop.store(true, Ordering::Relaxed);
fn send_stop(&self) {
// send error just means that it is stopped, can't do much else
let _ = self.stop_channel.send(InnerEvent::Shutdown);
}

/// Access to the internally used notify Watcher backend
Expand All @@ -253,87 +309,81 @@ impl<T: Watcher> Debouncer<T> {
impl<T: Watcher> Drop for Debouncer<T> {
fn drop(&mut self) {
// don't imitate c++ async futures and block on drop
self.set_stop();
self.send_stop();
}
}

/// Creates a new debounced watcher with custom configuration.
///
/// Timeout is the amount of time after which a debounced event is emitted or a continuous event is send, if there still are events incoming for the specific path.
///
/// If tick_rate is None, notify will select a tick rate that is less than the provided timeout.
pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher>(
timeout: Duration,
tick_rate: Option<Duration>,
mut event_handler: F,
config: notify::Config
config: notify::Config,
) -> Result<Debouncer<T>, Error> {
let data = DebounceData::default();

let stop = Arc::new(AtomicBool::new(false));

let tick_div = 4;
let tick = match tick_rate {
Some(v) => {
if v > timeout {
return Err(Error::new(ErrorKind::Generic(format!(
"Invalid tick_rate, tick rate {:?} > {:?} timeout!",
v, timeout
))));
}
v
}
None => timeout.checked_div(tick_div).ok_or_else(|| {
Error::new(ErrorKind::Generic(format!(
"Failed to calculate tick as {:?}/{}!",
timeout, tick_div
)))
})?,
};

{
let mut data_w = data.lock().unwrap();
data_w.timeout = timeout;
}
let (tx, rx) = std::sync::mpsc::channel();

let data_c = data.clone();
let stop_c = stop.clone();
let thread = std::thread::Builder::new()
.name("notify-rs debouncer loop".to_string())
.spawn(move || loop {
if stop_c.load(Ordering::Acquire) {
break;
}
std::thread::sleep(tick);
let send_data;
let errors: Vec<crate::Error>;
{
let mut lock = data_c.lock().expect("Can't lock debouncer data!");
send_data = lock.debounced_events();
errors = lock.errors();
}
if send_data.len() > 0 {
event_handler.handle_event(Ok(send_data));
}
if errors.len() > 0 {
event_handler.handle_event(Err(errors));
.spawn(move || {
let mut data = DebounceDataInner::new(timeout);
let mut run = true;
while run {
match data.next_tick() {
Some(timeout) => {
dbg!(timeout);
// wait for wakeup
match rx.recv_timeout(timeout) {
Ok(InnerEvent::NotifyEvent(e)) => data.push(e),
Ok(InnerEvent::Shutdown) => run = false,
Err(RecvTimeoutError::Disconnected) => run = false,
// TODO: wait the correct time?
Err(RecvTimeoutError::Timeout) => continue,
}
},
None => match rx.recv() {
Ok(InnerEvent::NotifyEvent(e)) => data.push(e),
Ok(InnerEvent::Shutdown) => run = false,
Err(_) => run = false,
}
}

// // drain events channel
// loop {
// match rx.try_recv() {
// Ok(InnerEvent::NotifyEvent(e)) => data.push(e),
// Ok(InnerEvent::Shutdown) => run = false,
// Err(TryRecvError::Disconnected) => run = false,
// // drained
// Err(TryRecvError::Empty) => break,
// }
// }
// debounce events
let send_data = data.debounced_events();
let errors = data.errors();
if send_data.len() > 0 {
event_handler.handle_event(Ok(send_data));
}
if errors.len() > 0 {
event_handler.handle_event(Err(errors));
}
}
})?;

let watcher = T::new(move |e: Result<Event, Error>| {
let mut lock = data.lock().expect("Can't lock debouncer data!");

match e {
Ok(e) => lock.add_event(e),
// can't have multiple TX, so we need to pipe that through our debouncer
Err(e) => lock.add_error(e),
}
}, config)?;
let tx_c = tx.clone();
let watcher = T::new(
move |e: Result<Event, Error>| {
// send failure can't be handled, would need a working channel to signal that
// also probably means that we're in the process of shutting down
let _ = tx_c.send(InnerEvent::NotifyEvent(e));
},
config,
)?;

let guard = Debouncer {
watcher,
debouncer_thread: Some(thread),
stop,
stop_channel: tx
};

Ok(guard)
Expand All @@ -342,12 +392,13 @@ pub fn new_debouncer_opt<F: DebounceEventHandler, T: Watcher>(
/// Short function to create a new debounced watcher with the recommended debouncer.
///
/// Timeout is the amount of time after which a debounced event is emitted or a continuous event is send, if there still are events incoming for the specific path.
///
/// If tick_rate is None, notify will select a tick rate that is less than the provided timeout.
pub fn new_debouncer<F: DebounceEventHandler>(
timeout: Duration,
tick_rate: Option<Duration>,
event_handler: F
event_handler: F,
) -> Result<Debouncer<RecommendedWatcher>, Error> {
new_debouncer_opt::<F, RecommendedWatcher>(timeout, tick_rate, event_handler, notify::Config::default())
new_debouncer_opt::<F, RecommendedWatcher>(
timeout,
event_handler,
notify::Config::default(),
)
}

0 comments on commit 1cb521d

Please sign in to comment.