Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/muxing: Generalise StreamMuxer::poll_address_change to poll #2797

Merged
merged 13 commits into from Aug 16, 2022
6 changes: 4 additions & 2 deletions core/CHANGELOG.md
@@ -1,16 +1,18 @@
# 0.35.0 [unreleased]

- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound`
and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724].
- Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776].
- Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775].
- Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765].
- Change `StreamMuxer` interface to be entirely poll-based. All functions on `StreamMuxer` now
require a `Context` and return `Poll`. This gives callers fine-grained control over what they
would like to make progress on. See [PR 2724] and [PR 2797].

[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724
[PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762
[PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775
[PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776
[PR 2765]: https://github.com/libp2p/rust-libp2p/pull/2765
[PR 2797]: https://github.com/libp2p/rust-libp2p/pull/2797

# 0.34.0

Expand Down
21 changes: 10 additions & 11 deletions core/src/either.rs
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::muxing::StreamMuxerEvent;
use crate::{
muxing::StreamMuxer,
transport::{ListenerId, Transport, TransportError, TransportEvent},
Expand Down Expand Up @@ -236,22 +237,20 @@ where
}
}

fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => {
inner.poll_address_change(cx).map_err(EitherError::B)
}
EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
EitherOutputProj::First(inner) => inner.poll(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll(cx).map_err(EitherError::B),
}
}
}
Expand Down
45 changes: 27 additions & 18 deletions core/src/muxing.rs
Expand Up @@ -75,6 +75,10 @@ pub trait StreamMuxer {
type Error: std::error::Error;

/// Poll for new inbound substreams.
///
/// This function should be called whenever callers are ready to accept more inbound streams. In
/// other words, callers may exercise back-pressure on incoming streams by not calling this
/// function if a certain limit is hit.
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -86,25 +90,33 @@ pub trait StreamMuxer {
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>;

/// Poll for an address change of the underlying connection.
/// Poll to close this [`StreamMuxer`].
///
/// Not all implementations may support this feature.
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>;

/// Closes this `StreamMuxer`.
///
/// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All
/// subsequent reads must return either `EOF` or an error. All subsequent writes, shutdowns,
/// or polls must generate an error or be ignored.
/// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless and may be safely
/// dropped.
///
/// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so
/// > that the remote is properly informed of the shutdown. However, apart from
/// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer.
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;

/// Poll to allow the underlying connection to make progress.
///
/// In contrast to all other `poll`-functions on [`StreamMuxer`], this function MUST be called
/// unconditionally. Because it will be called regardless, this function can be used by
/// implementations to return events about the underlying connection that the caller MUST deal
/// with.
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent, Self::Error>>;
}

/// An event produced by a [`StreamMuxer`].
pub enum StreamMuxerEvent {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any thoughts on just naming this Event and referring to is as muxing::Event?

I think there was a loose consensus around #2217 at some point but we haven't really made progress on this front.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Sounds good to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am in favor of just Event as long as we don't do use muxing::Event in other files, but instead just use muxing and then refer to it as muxing::Event.

/// The address of the remote has changed.
AddressChange(Multiaddr),
}

/// Extension trait for [`StreamMuxer`].
Expand All @@ -131,15 +143,12 @@ pub trait StreamMuxerExt: StreamMuxer + Sized {
Pin::new(self).poll_outbound(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_address_change`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_address_change_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>
/// Convenience function for calling [`StreamMuxer::poll`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<StreamMuxerEvent, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_address_change(cx)
Pin::new(self).poll(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`.
Expand Down
37 changes: 17 additions & 20 deletions core/src/muxing/boxed.rs
@@ -1,6 +1,6 @@
use crate::muxing::StreamMuxerEvent;
use crate::StreamMuxer;
use futures::{AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use pin_project::pin_project;
use std::error::Error;
use std::fmt;
Expand Down Expand Up @@ -38,11 +38,6 @@ where
type Substream = SubstreamBox;
type Error = io::Error;

#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(into_io_error)
}

fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -65,14 +60,16 @@ where
.map_err(into_io_error)
}

fn poll_address_change(
#[inline]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why inline this method but not the other ones, and why only inline it in this muxer? Just asking out of curiosity because I am not really familiar with #[inline].

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not on purpose to be honest. I thought I left everything as it was before.

I am not too familiar with inlining either but the rough advice I got was that the compiler tends to be smarter on when it is needed :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the many #[inline] attributes are from past premature optimizations. See #897.

the rough advice I got was that the compiler tends to be smarter on when it is needed :)

Yes. Unless proven through a benchmark, let the compiler make the decision and thus don't use #[inline].

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not on purpose to be honest. I thought I left everything as it was before.

The method was marked as #[inline] before this patch as well. I am fine with either removing it here or in a pull request in the future.

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(cx).map_err(into_io_error)
}

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
self.project()
.inner
.poll_address_change(cx)
.map_err(into_io_error)
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
self.project().inner.poll(cx).map_err(into_io_error)
}
}

Expand Down Expand Up @@ -109,11 +106,6 @@ impl StreamMuxer for StreamMuxerBox {
type Substream = SubstreamBox;
type Error = io::Error;

#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().poll_close(cx)
}

fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
Expand All @@ -128,11 +120,16 @@ impl StreamMuxer for StreamMuxerBox {
self.project().poll_outbound(cx)
}

fn poll_address_change(
#[inline]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().poll_close(cx)
}

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
self.project().poll_address_change(cx)
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
self.project().poll(cx)
}
}

Expand Down
14 changes: 7 additions & 7 deletions core/src/muxing/singleton.rs
Expand Up @@ -20,8 +20,8 @@

use crate::{connection::Endpoint, muxing::StreamMuxer};

use crate::muxing::StreamMuxerEvent;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::cell::Cell;
use std::pin::Pin;
use std::{io, task::Context, task::Poll};
Expand Down Expand Up @@ -88,14 +88,14 @@ where
}
}

fn poll_address_change(
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}

fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}

fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Poll::Ready(Ok(()))
}
}
7 changes: 4 additions & 3 deletions muxers/mplex/src/lib.rs
Expand Up @@ -27,9 +27,10 @@ pub use config::{MaxBufferBehaviour, MplexConfig};
use bytes::Bytes;
use codec::LocalStreamId;
use futures::{future, prelude::*, ready};
use libp2p_core::muxing::StreamMuxerEvent;
use libp2p_core::{
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
Multiaddr, StreamMuxer,
StreamMuxer,
};
use parking_lot::Mutex;
use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll};
Expand Down Expand Up @@ -105,10 +106,10 @@ where
.map_ok(|stream_id| Substream::new(stream_id, self.io.clone()))
}

fn poll_address_change(
fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}

Expand Down
7 changes: 3 additions & 4 deletions muxers/yamux/src/lib.rs
Expand Up @@ -26,9 +26,8 @@ use futures::{
prelude::*,
stream::{BoxStream, LocalBoxStream},
};
use libp2p_core::muxing::StreamMuxer;
use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_core::Multiaddr;
use std::{
fmt, io, iter, mem,
pin::Pin,
Expand Down Expand Up @@ -124,10 +123,10 @@ where
.map_err(YamuxError)
}

fn poll_address_change(
fn poll(
self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
) -> Poll<Result<StreamMuxerEvent, Self::Error>> {
Poll::Pending
}

Expand Down
43 changes: 26 additions & 17 deletions swarm/src/connection.rs
Expand Up @@ -35,7 +35,7 @@ use crate::IntoConnectionHandler;
use handler_wrapper::HandlerWrapper;
use libp2p_core::connection::ConnectedPoint;
use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt};
use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerEvent, StreamMuxerExt};
use libp2p_core::upgrade;
use libp2p_core::PeerId;
use std::collections::VecDeque;
Expand Down Expand Up @@ -153,27 +153,36 @@ where
}
}

if !self.open_info.is_empty() {
if let Poll::Ready(substream) = self.muxing.poll_outbound_unpin(cx)? {
let user_data = self
.open_info
.pop_front()
.expect("`open_info` is not empty");
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint);
continue; // Go back to the top, handler can potentially make progress again.
match self.muxing.poll_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(StreamMuxerEvent::AddressChange(address)) => {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
}
}

if let Poll::Ready(substream) = self.muxing.poll_inbound_unpin(cx)? {
self.handler
.inject_substream(substream, SubstreamEndpoint::Listener);
continue; // Go back to the top, handler can potentially make progress again.
if !self.open_info.is_empty() {
match self.muxing.poll_outbound_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(substream) => {
let user_data = self
.open_info
.pop_front()
.expect("`open_info` is not empty");
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint);
continue; // Go back to the top, handler can potentially make progress again.
}
}
}

if let Poll::Ready(address) = self.muxing.poll_address_change_unpin(cx)? {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
match self.muxing.poll_inbound_unpin(cx)? {
Poll::Pending => {}
Poll::Ready(substream) => {
self.handler
.inject_substream(substream, SubstreamEndpoint::Listener);
continue; // Go back to the top, handler can potentially make progress again.
}
}

return Poll::Pending; // Nothing can make progress, return `Pending`.
Expand Down