Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix segfault when watcher is moved. #33 #35

Merged
merged 2 commits into from
Oct 24, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 49 additions & 62 deletions src/fsevent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct FsEventWatcher {
flags: fs::FSEventStreamCreateFlags,
sender: Sender<Event>,
runloop: Arc<RwLock<Option<usize>>>,
context: Option<StreamContextInfo>,
context: Option<Box<StreamContextInfo>>,
}

fn translate_flags(flags: fse::StreamFlags) -> op::Op {
Expand Down Expand Up @@ -64,7 +64,7 @@ impl FsEventWatcher {
}

if let Ok(runloop) = self.runloop.read() {
if let Some(runloop) = runloop.clone() {
if let Some(runloop) = *runloop {
unsafe {
let runloop = runloop as *mut libc::c_void;
cf::CFRunLoopStop(runloop);
Expand All @@ -73,15 +73,14 @@ impl FsEventWatcher {
}

self.runloop = Arc::new(RwLock::new(None));
if let Some(ref context_info) = self.context {
if let Some(ref context_info) = self.context {
// sync done channel
match context_info.done.recv() {
Ok(()) => (),
Err(_) => panic!("the runloop may not be finished!"),
}
}


self.context = None;
}

Expand Down Expand Up @@ -120,37 +119,33 @@ impl FsEventWatcher {
done: done_rx
};

self.context = Some(info);
self.context = Some(Box::new(info));

let stream_context = fs::FSEventStreamContext{
version: 0,
info: unsafe { transmute::<_, *mut libc::c_void>(self.context.as_ref()) },
info: unsafe { transmute(self.context.as_ref().map(|ctx| & **ctx)) },
retain: cf::NULL,
copy_description: cf::NULL };

let cb = callback as *mut _;
unsafe {
let stream = fs::FSEventStreamCreate(cf::kCFAllocatorDefault,
cb,
&stream_context,
self.paths,
self.since_when,
self.latency,
self.flags);
let dummy = stream as u64;
let runloop = self.runloop.clone();

thread::spawn(move || {
let stream = dummy as *mut libc::c_void;
// fs::FSEventStreamShow(stream);
let stream = unsafe { fs::FSEventStreamCreate(cf::kCFAllocatorDefault,
cb,
&stream_context,
self.paths,
self.since_when,
self.latency,
self.flags) };

// move into thread
let dummy = stream as usize;
let runloop = self.runloop.clone();

thread::spawn(move || {
let stream = dummy as *mut libc::c_void;
unsafe {
let cur_runloop = cf::CFRunLoopGetCurrent();
{
let mut runloop = runloop.write().unwrap();
*runloop = Some(cur_runloop as *mut libc::c_void as usize);
}
fs::FSEventStreamScheduleWithRunLoop(stream,
cur_runloop,
cf::kCFRunLoopDefaultMode);
let _ = runloop.write().as_mut().map(|rl| **rl = Some(cur_runloop as *mut libc::c_void as usize)).unwrap();
fs::FSEventStreamScheduleWithRunLoop(stream, cur_runloop, cf::kCFRunLoopDefaultMode);

fs::FSEventStreamStart(stream);

Expand All @@ -159,64 +154,56 @@ impl FsEventWatcher {
fs::FSEventStreamStop(stream);
fs::FSEventStreamInvalidate(stream);
fs::FSEventStreamRelease(stream);
let _d = done_tx.send(()).unwrap();
});
}
}
done_tx.send(()).ok().expect("error while signal run loop is done");
});

Ok(())
}
}

#[allow(unused_variables)]
pub extern "C" fn callback(
pub unsafe extern "C" fn callback(
stream_ref: fs::FSEventStreamRef,
info: *mut libc::c_void,
num_events: libc::size_t, // size_t numEvents
num_events: libc::size_t, // size_t numEvents
event_paths: *const *const libc::c_char, // void *eventPaths
event_flags: *mut libc::c_void, // const FSEventStreamEventFlags eventFlags[]
event_ids: *mut libc::c_void, // const FSEventStreamEventId eventIds[]
event_flags: *mut libc::c_void, // const FSEventStreamEventFlags eventFlags[]
event_ids: *mut libc::c_void, // const FSEventStreamEventId eventIds[]
) {
let num = num_events as usize;
let e_ptr = event_flags as *mut u32;
let i_ptr = event_ids as *mut u64;
let info = unsafe { transmute::<_, *const StreamContextInfo>(info) };
let info = transmute::<_, *const StreamContextInfo>(info);

unsafe {
let paths: &[*const libc::c_char] = transmute(slice::from_raw_parts(event_paths, num));
let flags = from_raw_parts_mut(e_ptr, num);
let ids = from_raw_parts_mut(i_ptr, num);
let paths: &[*const libc::c_char] = transmute(slice::from_raw_parts(event_paths, num));
let flags = slice::from_raw_parts_mut(e_ptr, num);
let ids = slice::from_raw_parts_mut(i_ptr, num);

for p in (0..num) {
let i = CStr::from_ptr(paths[p]).to_bytes();
let flag = fse::StreamFlags::from_bits(flags[p] as u32)
.expect(format!("Unable to decode StreamFlags: {}", flags[p] as u32).as_ref());
for p in (0..num) {
let i = CStr::from_ptr(paths[p]).to_bytes();
let flag = fse::StreamFlags::from_bits(flags[p] as u32)
.expect(format!("Unable to decode StreamFlags: {}", flags[p] as u32).as_ref());

let path = PathBuf::from(from_utf8(i).ok().expect("Invalid UTF8 string."));
let event = Event{op: Ok(translate_flags(flag)), path: Some(path)};
let path = PathBuf::from(from_utf8(i).ok().expect("Invalid UTF8 string."));
let event = Event{op: Ok(translate_flags(flag)), path: Some(path)};

let _s = (*info).sender.send(event).unwrap();
}
(*info).sender.send(event).ok().expect("error while sending event");
}
}


impl Watcher for FsEventWatcher {
fn new(tx: Sender<Event>) -> Result<FsEventWatcher, Error> {
let fsevent: FsEventWatcher;

unsafe {
fsevent = FsEventWatcher {
paths: cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks),
since_when: fs::kFSEventStreamEventIdSinceNow,
latency: 0.0,
flags: fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer,
sender: tx,
runloop: Arc::new(RwLock::new(None)),
context: None,
};
}

Ok(fsevent)
Ok(FsEventWatcher {
paths: unsafe { cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks) },
since_when: fs::kFSEventStreamEventIdSinceNow,
latency: 0.0,
flags: fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer,
sender: tx,
runloop: Arc::new(RwLock::new(None)),
context: None,
})
}

fn watch<P: AsRef<Path>>(&mut self, path: P) -> Result<(), Error> {
Expand Down