diff --git a/examples/if_watch.rs b/examples/if_watch.rs index 9a0cc7e..49a3e3a 100644 --- a/examples/if_watch.rs +++ b/examples/if_watch.rs @@ -1,12 +1,13 @@ +use futures::StreamExt; use if_watch::IfWatcher; -use std::pin::Pin; fn main() { env_logger::init(); futures::executor::block_on(async { let mut set = IfWatcher::new().await.unwrap(); loop { - println!("Got event {:?}", Pin::new(&mut set).await); + let event = set.select_next_some().await; + println!("Got event {:?}", event); } }); } diff --git a/src/apple.rs b/src/apple.rs index aeb8972..5942ce2 100644 --- a/src/apple.rs +++ b/src/apple.rs @@ -59,12 +59,8 @@ impl IfWatcher { pub fn iter(&self) -> impl Iterator { self.addrs.iter() } -} - -impl Future for IfWatcher { - type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { while let Poll::Ready(_) = Pin::new(&mut self.rx).poll_next(cx) { if let Err(error) = self.resync() { return Poll::Ready(Err(error)); diff --git a/src/fallback.rs b/src/fallback.rs index 22d54f2..bc4886e 100644 --- a/src/fallback.rs +++ b/src/fallback.rs @@ -48,12 +48,8 @@ impl IfWatcher { pub fn iter(&self) -> impl Iterator { self.addrs.iter() } -} - -impl Future for IfWatcher { - type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { loop { if let Some(event) = self.queue.pop_front() { return Poll::Ready(Ok(event)); diff --git a/src/lib.rs b/src/lib.rs index 3198818..aad7077 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,8 +2,9 @@ #![deny(missing_docs)] #![deny(warnings)] +use futures::stream::FusedStream; +use futures::Stream; pub use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use std::future::Future; use std::io::Result; use std::pin::Pin; use std::task::{Context, Poll}; @@ -63,25 +64,36 @@ impl IfWatcher { pub fn iter(&self) -> impl Iterator { self.0.iter() } + + /// Poll for an address change event. + pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.0).poll_next(cx) + } } -impl Future for IfWatcher { - type Output = Result; +impl Stream for IfWatcher { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_next(cx).map(Some) + } +} - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - Pin::new(&mut self.0).poll(cx) +impl FusedStream for IfWatcher { + fn is_terminated(&self) -> bool { + false } } #[cfg(test)] mod tests { use super::*; + use futures::StreamExt; #[test] fn test_ip_watch() { futures::executor::block_on(async { let mut set = IfWatcher::new().await.unwrap(); - let event = Pin::new(&mut set).await.unwrap(); + let event = set.select_next_some().await.unwrap(); println!("Got event {:?}", event); }); } diff --git a/src/linux.rs b/src/linux.rs index c0afed1..ce0c85d 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -93,12 +93,8 @@ impl IfWatcher { } } } -} - -impl Future for IfWatcher { - type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { log::trace!("polling IfWatcher {:p}", self.deref_mut()); if Pin::new(&mut self.conn).poll(cx).is_ready() { return Poll::Ready(Err(std::io::Error::new( diff --git a/src/win.rs b/src/win.rs index c9c3ff7..d2b4f69 100644 --- a/src/win.rs +++ b/src/win.rs @@ -68,12 +68,8 @@ impl IfWatcher { pub fn iter(&self) -> impl Iterator { self.addrs.iter() } -} - -impl Future for IfWatcher { - type Output = Result; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { self.waker.register(cx.waker()); if self.resync.swap(false, Ordering::Relaxed) { if let Err(error) = self.resync() {