Skip to content

Commit

Permalink
*: replace Future impl with poll_next method
Browse files Browse the repository at this point in the history
Remove Future impl on all platform IfWatcher's, instead add `poll_next`
method. Implement `Stream` and `FusedStream` for user-facing IfWatcher.
  • Loading branch information
elenaf9 committed Aug 5, 2022
1 parent 2d53554 commit 466bd81
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 28 deletions.
5 changes: 3 additions & 2 deletions examples/if_watch.rs
@@ -1,12 +1,13 @@
use futures::StreamExt;
use if_watch::IfWatcher;
use std::pin::Pin;

fn main() {
env_logger::init();
futures::executor::block_on(async {
let mut set = IfWatcher::new().await.unwrap();
loop {
println!("Got event {:?}", Pin::new(&mut set).await);
let event = set.select_next_some().await;
println!("Got event {:?}", event);
}
});
}
6 changes: 1 addition & 5 deletions src/apple.rs
Expand Up @@ -59,12 +59,8 @@ impl IfWatcher {
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
while let Poll::Ready(_) = Pin::new(&mut self.rx).poll_next(cx) {
if let Err(error) = self.resync() {
return Poll::Ready(Err(error));
Expand Down
6 changes: 1 addition & 5 deletions src/fallback.rs
Expand Up @@ -48,12 +48,8 @@ impl IfWatcher {
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
Expand Down
24 changes: 18 additions & 6 deletions src/lib.rs
Expand Up @@ -2,8 +2,9 @@
#![deny(missing_docs)]
#![deny(warnings)]

use futures::stream::FusedStream;
use futures::Stream;
pub use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use std::future::Future;
use std::io::Result;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -63,25 +64,36 @@ impl IfWatcher {
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.0.iter()
}

/// Poll for an address change event.
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
Pin::new(&mut self.0).poll_next(cx)
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;
impl Stream for IfWatcher {
type Item = Result<IfEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_next(cx).map(Some)
}
}

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
impl FusedStream for IfWatcher {
fn is_terminated(&self) -> bool {
false
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;

#[test]
fn test_ip_watch() {
futures::executor::block_on(async {
let mut set = IfWatcher::new().await.unwrap();
let event = Pin::new(&mut set).await.unwrap();
let event = set.select_next_some().await.unwrap();
println!("Got event {:?}", event);
});
}
Expand Down
6 changes: 1 addition & 5 deletions src/linux.rs
Expand Up @@ -93,12 +93,8 @@ impl IfWatcher {
}
}
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
log::trace!("polling IfWatcher {:p}", self.deref_mut());
if Pin::new(&mut self.conn).poll(cx).is_ready() {
return Poll::Ready(Err(std::io::Error::new(
Expand Down
6 changes: 1 addition & 5 deletions src/win.rs
Expand Up @@ -68,12 +68,8 @@ impl IfWatcher {
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
self.waker.register(cx.waker());
if self.resync.swap(false, Ordering::Relaxed) {
if let Err(error) = self.resync() {
Expand Down

0 comments on commit 466bd81

Please sign in to comment.