Skip to content

Commit

Permalink
[impl] Use crossbeam-channel instead of stdlib
Browse files Browse the repository at this point in the history
Only a straight find-replace was done. There are many places where std 
channels were placed in mutexes and arcs; this is not necessary anymore 
but needs more careful refactoring.

See #160
  • Loading branch information
passcod committed Mar 30, 2019
1 parent 48a2e06 commit 7f68c8e
Show file tree
Hide file tree
Showing 18 changed files with 231 additions and 211 deletions.
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ travis-ci = { repository = "passcod/notify", branch = "main" }
maintenance = { status = "passively-maintained" }

[dependencies]
anymap = "0.12.1"
bitflags = "^1.0.4"
libc = "^0.2.4"
crossbeam-channel = "0.3.8"
filetime = "^0.2.1"
walkdir = "^2.0.1"
anymap = "0.12.1"
libc = "^0.2.4"
serde = { version = "1.0.89", features = ["derive"], optional = true }
walkdir = "^2.0.1"

[target.'cfg(target_os="linux")'.dependencies]
inotify = { version = "^0.7", default-features = false }
Expand All @@ -37,8 +38,8 @@ fsevent = "^0.2.17"
fsevent-sys = "^0.1.3"

[target.'cfg(windows)'.dependencies]
winapi = "^0.3.5"
kernel32-sys = "^0.2.1"
winapi = "^0.3.5"

[dev-dependencies]
tempdir = "^0.3.4"
Expand Down
9 changes: 5 additions & 4 deletions examples/monitor_debounced.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
extern crate crossbeam_channel;
extern crate notify;

use crossbeam_channel::unbounded;
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use std::path::Path;
use std::sync::mpsc::channel;
use std::time::Duration;

fn watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {
// Create a channel to receive the events.
let (tx, rx) = channel();
let (tx, rx) = unbounded();

// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let mut watcher: RecommendedWatcher = try!(Watcher::new(tx, Duration::from_secs(2)));
let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(2))?;

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
try!(watcher.watch(path, RecursiveMode::Recursive));
watcher.watch(path, RecursiveMode::Recursive)?;

// This is a simple loop, but you may want to use more complex logic here,
// for example to handle I/O.
Expand Down
9 changes: 5 additions & 4 deletions examples/monitor_raw.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
extern crate crossbeam_channel;
extern crate notify;

use crossbeam_channel::unbounded;
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use std::path::Path;
use std::sync::mpsc::channel;

fn watch<P: AsRef<Path>>(path: P) -> notify::Result<()> {
// Create a channel to receive the events.
let (tx, rx) = channel();
let (tx, rx) = unbounded();

// Automatically select the best implementation for your platform.
// You can also access each implementation directly e.g. INotifyWatcher.
let mut watcher: RecommendedWatcher = try!(Watcher::new_raw(tx));
let mut watcher: RecommendedWatcher = Watcher::new_raw(tx)?;

// Add a path to be watched. All files and directories at that path and
// below will be monitored for changes.
try!(watcher.watch(path, RecursiveMode::Recursive));
watcher.watch(path, RecursiveMode::Recursive)?;

// This is a simple loop, but you may want to use more complex logic here,
// for example to handle I/O.
Expand Down
17 changes: 8 additions & 9 deletions src/debounce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,25 @@ mod timer;
use super::{op, Config, DebouncedEvent, RawEvent, Result};

use self::timer::WatchTimer;

use crossbeam_channel::Sender;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::{Arc, Mutex, mpsc::Sender};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

pub type OperationsBuffer =
Arc<Mutex<HashMap<PathBuf, (Option<op::Op>, Option<PathBuf>, Option<u64>)>>>;

pub enum EventTx {
Raw {
tx: mpsc::Sender<RawEvent>,
tx: Sender<RawEvent>,
},
Debounced {
tx: mpsc::Sender<DebouncedEvent>,
tx: Sender<DebouncedEvent>,
debounce: Debounce,
},
DebouncedTx {
tx: mpsc::Sender<DebouncedEvent>,
tx: Sender<DebouncedEvent>,
},
}

Expand Down Expand Up @@ -74,15 +73,15 @@ impl EventTx {
}

pub struct Debounce {
tx: mpsc::Sender<DebouncedEvent>,
tx: Sender<DebouncedEvent>,
operations_buffer: OperationsBuffer,
rename_path: Option<PathBuf>,
rename_cookie: Option<u32>,
timer: WatchTimer,
}

impl Debounce {
pub fn new(delay: Duration, tx: mpsc::Sender<DebouncedEvent>) -> Debounce {
pub fn new(delay: Duration, tx: Sender<DebouncedEvent>) -> Debounce {
let operations_buffer: OperationsBuffer = Arc::new(Mutex::new(HashMap::new()));

// spawns new thread
Expand Down Expand Up @@ -519,7 +518,7 @@ fn restart_timer(timer_id: &mut Option<u64>, path: PathBuf, timer: &mut WatchTim
fn handle_ongoing_write_event(
timer: &WatchTimer,
path: PathBuf,
tx: &mpsc::Sender<DebouncedEvent>,
tx: &Sender<DebouncedEvent>,
) {
let mut ongoing_write_event = timer.ongoing_write_event.lock().unwrap();
let mut event_details = Option::None;
Expand Down
7 changes: 3 additions & 4 deletions src/debounce/timer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use super::super::{op, DebouncedEvent, Error, Result};

use crossbeam_channel::Sender;
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::{
atomic::{self, AtomicBool},
Arc, Condvar, Mutex,
Expand All @@ -23,7 +22,7 @@ struct ScheduleWorker {
new_event_trigger: Arc<Condvar>,
stop_trigger: Arc<Condvar>,
events: Arc<Mutex<VecDeque<ScheduledEvent>>>,
tx: mpsc::Sender<DebouncedEvent>,
tx: Sender<DebouncedEvent>,
operations_buffer: OperationsBuffer,
stopped: Arc<AtomicBool>,
worker_ongoing_write_event: Arc<Mutex<Option<(Instant, PathBuf)>>>,
Expand Down Expand Up @@ -129,7 +128,7 @@ pub struct WatchTimer {

impl WatchTimer {
pub fn new(
tx: mpsc::Sender<DebouncedEvent>,
tx: Sender<DebouncedEvent>,
operations_buffer: OperationsBuffer,
delay: Duration,
) -> WatchTimer {
Expand Down
10 changes: 5 additions & 5 deletions src/fsevent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ extern crate fsevent as fse;

use super::debounce::{Debounce, EventTx};
use super::{op, Config, DebouncedEvent, Error, RawEvent, RecursiveMode, Result, Watcher};
use crossbeam_channel::{unbounded, Receiver, Sender};
use fsevent_sys::core_foundation as cf;
use fsevent_sys::fsevent as fs;
use libc;
Expand All @@ -25,7 +26,6 @@ use std::mem::transmute;
use std::path::{Path, PathBuf};
use std::slice;
use std::str::from_utf8;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -182,7 +182,7 @@ impl FsEventWatcher {
}

// done channel is used to sync quit status of runloop thread
let (done_tx, done_rx) = channel();
let (done_tx, done_rx) = unbounded();

let info = StreamContextInfo {
event_tx: self.event_tx.clone(),
Expand Down Expand Up @@ -215,7 +215,7 @@ impl FsEventWatcher {
// move into thread
let dummy = stream as usize;
// channel to pass runloop around
let (rl_tx, rl_rx) = channel();
let (rl_tx, rl_rx) = unbounded();

thread::spawn(move || {
let stream = dummy as *mut libc::c_void;
Expand Down Expand Up @@ -400,7 +400,7 @@ impl Watcher for FsEventWatcher {
}

fn configure(&mut self, config: Config) -> Result<bool> {
let (tx, rx) = channel();
let (tx, rx) = unbounded();

{
let mut debounced_event = self.event_tx.lock()?;
Expand Down Expand Up @@ -433,7 +433,7 @@ fn test_fsevent_watcher_drop() {
use super::*;
use std::time::Duration;

let (tx, rx) = channel();
let (tx, rx) = unbounded();

{
let mut watcher: RecommendedWatcher = Watcher::new_raw(tx).unwrap();
Expand Down
8 changes: 4 additions & 4 deletions src/inotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use self::inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
use self::walkdir::WalkDir;
use super::debounce::{Debounce, EventTx};
use super::{op, Config, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher};
use crossbeam_channel::{bounded, unbounded, Sender};
use mio;
use mio_extras;
use std::collections::HashMap;
Expand All @@ -21,7 +22,6 @@ use std::fs::metadata;
use std::mem;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{self, channel, Sender};
use std::sync::Mutex;
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -481,7 +481,7 @@ impl Watcher for INotifyWatcher {
let p = try!(env::current_dir().map_err(Error::Io));
p.join(path)
};
let (tx, rx) = mpsc::channel();
let (tx, rx) = unbounded();
let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);

// we expect the event loop to live and reply => unwraps must not panic
Expand All @@ -496,7 +496,7 @@ impl Watcher for INotifyWatcher {
let p = try!(env::current_dir().map_err(Error::Io));
p.join(path)
};
let (tx, rx) = mpsc::channel();
let (tx, rx) = unbounded();
let msg = EventLoopMsg::RemoveWatch(pb, tx);

// we expect the event loop to live and reply => unwraps must not panic
Expand All @@ -505,7 +505,7 @@ impl Watcher for INotifyWatcher {
}

fn configure(&mut self, config: Config) -> Result<bool> {
let (tx, rx) = channel();
let (tx, rx) = bounded(1);
self.0.lock()?.send(EventLoopMsg::Configure(config, tx))?;
rx.recv()?
}
Expand Down
Loading

0 comments on commit 7f68c8e

Please sign in to comment.