Skip to content

Commit

Permalink
m: Bump to event-listener v4.0.0
Browse files Browse the repository at this point in the history
This also bumps to event-listener-strategy v0.4.0

Signed-off-by: John Nunley <dev@notgull.net>
  • Loading branch information
notgull committed Nov 21, 2023
1 parent f87b7a1 commit dab18de
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 17 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ exclude = ["/.*"]

[dependencies]
concurrent-queue = { version = "2", default-features = false }
event-listener = { version = "3.0.0", default-features = false }
event-listener-strategy = { version = "0.3.0", default-features = false }
event-listener = { version = "4.0.0", default-features = false }
event-listener-strategy = { version = "0.4.0", default-features = false }
futures-core = { version = "0.3.5", default-features = false }
pin-project-lite = "0.2.11"

Expand Down
28 changes: 13 additions & 15 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
channel: channel.clone(),
};
let r = Receiver {
listener: EventListener::new(&channel.stream_ops),
listener: EventListener::new(),
channel,
};
(s, r)
Expand Down Expand Up @@ -172,7 +172,7 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
channel: channel.clone(),
};
let r = Receiver {
listener: EventListener::new(&channel.stream_ops),
listener: EventListener::new(),
channel,
};
(s, r)
Expand Down Expand Up @@ -247,7 +247,7 @@ impl<T> Sender<T> {
Send::_new(SendInner {
sender: self,
msg: Some(msg),
listener: EventListener::new(&self.channel.send_ops),
listener: EventListener::new(),
})
}

Expand Down Expand Up @@ -567,7 +567,7 @@ impl<T> Receiver<T> {
pub fn recv(&self) -> Recv<'_, T> {
Recv::_new(RecvInner {
receiver: self,
listener: EventListener::new(&self.channel.recv_ops),
listener: EventListener::new(),
})
}

Expand Down Expand Up @@ -787,7 +787,7 @@ impl<T> Clone for Receiver<T> {

Receiver {
channel: self.channel.clone(),
listener: EventListener::new(&self.channel.stream_ops),
listener: EventListener::new(),
}
}
}
Expand All @@ -811,17 +811,13 @@ impl<T> Stream for Receiver<T> {
Ok(msg) => {
// The stream is not blocked on an event - drop the listener.
let mut this = self.project();
this.listener
.as_mut()
.set(EventListener::new(&this.channel.stream_ops));
this.listener.as_mut().set(EventListener::new());
return Poll::Ready(Some(msg));
}
Err(TryRecvError::Closed) => {
// The stream is not blocked on an event - drop the listener.
let mut this = self.project();
this.listener
.as_mut()
.set(EventListener::new(&this.channel.stream_ops));
this.listener.as_mut().set(EventListener::new());
return Poll::Ready(None);
}
Err(TryRecvError::Empty) => {}
Expand All @@ -833,7 +829,7 @@ impl<T> Stream for Receiver<T> {
// Go back to the outer loop to wait for a notification.
break;
} else {
this.listener.as_mut().listen();
this.listener.as_mut().listen(&this.channel.stream_ops);
}
}
}
Expand Down Expand Up @@ -918,7 +914,7 @@ impl<T> WeakReceiver<T> {
}
Ok(_) => Some(Receiver {
channel: self.channel.clone(),
listener: EventListener::new(&self.channel.stream_ops),
listener: EventListener::new(),
}),
}
}
Expand Down Expand Up @@ -1123,7 +1119,7 @@ impl<'a, T> EventListenerFuture for SendInner<'a, T> {
// Poll using the given strategy
ready!(S::poll(strategy, this.listener.as_mut(), context));
} else {
this.listener.as_mut().listen();
this.listener.as_mut().listen(&this.sender.channel.send_ops);
}
}
}
Expand Down Expand Up @@ -1171,7 +1167,9 @@ impl<'a, T> EventListenerFuture for RecvInner<'a, T> {
// Poll using the given strategy
ready!(S::poll(strategy, this.listener.as_mut(), cx));
} else {
this.listener.as_mut().listen();
this.listener
.as_mut()
.listen(&this.receiver.channel.recv_ops);
}
}
}
Expand Down

0 comments on commit dab18de

Please sign in to comment.