diff --git a/.cargo/config.toml b/.cargo/config.toml index 95976aaa800..9d5619d2c4e 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,3 @@ [alias] # Temporary solution to have clippy config in a single place until https://github.com/rust-lang/rust-clippy/blob/master/doc/roadmap-2021.md#lintstoml-configuration is shipped. -custom-clippy = "clippy -- -A clippy::type_complexity -A clippy::pedantic -D warnings" +custom-clippy = "clippy --all-features -- -A clippy::type_complexity -A clippy::pedantic -D warnings" diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 0102096d780..dc4fd0829c0 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -2,8 +2,15 @@ - Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound` and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724]. +- Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776]. +- Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775]. +- Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765]. [PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724 +[PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762 +[PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775 +[PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776 +[PR 2765]: https://github.com/libp2p/rust-libp2p/pull/2765 # 0.34.0 diff --git a/core/src/either.rs b/core/src/either.rs index 4b5c20b2929..42984519488 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -204,43 +204,54 @@ where type Substream = EitherOutput; type Error = EitherError; - fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { - match self { - EitherOutput::First(inner) => inner + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.project() { + EitherOutputProj::First(inner) => inner .poll_inbound(cx) .map_ok(EitherOutput::First) .map_err(EitherError::A), - EitherOutput::Second(inner) => inner + EitherOutputProj::Second(inner) => inner .poll_inbound(cx) .map_ok(EitherOutput::Second) .map_err(EitherError::B), } } - fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { - match self { - EitherOutput::First(inner) => inner + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.project() { + EitherOutputProj::First(inner) => inner .poll_outbound(cx) .map_ok(EitherOutput::First) .map_err(EitherError::A), - EitherOutput::Second(inner) => inner + EitherOutputProj::Second(inner) => inner .poll_outbound(cx) .map_ok(EitherOutput::Second) .map_err(EitherError::B), } } - fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll> { - match self { - EitherOutput::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A), - EitherOutput::Second(inner) => inner.poll_address_change(cx).map_err(EitherError::B), + fn poll_address_change( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.project() { + EitherOutputProj::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A), + EitherOutputProj::Second(inner) => { + inner.poll_address_change(cx).map_err(EitherError::B) + } } } - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - match self { - EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A), - EitherOutput::Second(inner) => inner.poll_close(cx).map_err(EitherError::B), + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.project() { + EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A), + EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B), } } } diff --git a/core/src/identity/ecdsa.rs b/core/src/identity/ecdsa.rs index b883243b13b..81dfec4b4e0 100644 --- a/core/src/identity/ecdsa.rs +++ b/core/src/identity/ecdsa.rs @@ -157,7 +157,7 @@ impl PublicKey { let buf = Self::del_asn1_header(k).ok_or_else(|| { DecodingError::new("failed to parse asn.1 encoded ecdsa p256 public key") })?; - Self::from_bytes(&buf) + Self::from_bytes(buf) } // ecPublicKey (ANSI X9.62 public key type) OID: 1.2.840.10045.2.1 @@ -198,8 +198,8 @@ impl PublicKey { if asn1_head[0] != 0x30 || asn1_head[2] != 0x30 || asn1_head[3] as usize != oids_len - || &oids_buf[..Self::EC_PUBLIC_KEY_OID.len()] != &Self::EC_PUBLIC_KEY_OID - || &oids_buf[Self::EC_PUBLIC_KEY_OID.len()..] != &Self::SECP_256_R1_OID + || oids_buf[..Self::EC_PUBLIC_KEY_OID.len()] != Self::EC_PUBLIC_KEY_OID + || oids_buf[Self::EC_PUBLIC_KEY_OID.len()..] != Self::SECP_256_R1_OID || bitstr_head[0] != 0x03 || bitstr_head[2] != 0x00 { diff --git a/core/src/muxing.rs b/core/src/muxing.rs index a2bdfa80b37..2d1e1068044 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -52,6 +52,8 @@ use futures::{task::Context, task::Poll, AsyncRead, AsyncWrite}; use multiaddr::Multiaddr; +use std::future::Future; +use std::pin::Pin; pub use self::boxed::StreamMuxerBox; pub use self::boxed::SubstreamBox; @@ -73,15 +75,24 @@ pub trait StreamMuxer { type Error: std::error::Error; /// Poll for new inbound substreams. - fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll>; + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>; /// Poll for a new, outbound substream. - fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll>; + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>; /// Poll for an address change of the underlying connection. /// /// Not all implementations may support this feature. - fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll>; + fn poll_address_change( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>; /// Closes this `StreamMuxer`. /// @@ -93,5 +104,105 @@ pub trait StreamMuxer { /// > that the remote is properly informed of the shutdown. However, apart from /// > properly informing the remote, there is no difference between this and /// > immediately dropping the muxer. - fn poll_close(&self, cx: &mut Context<'_>) -> Poll>; + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; +} + +/// Extension trait for [`StreamMuxer`]. +pub trait StreamMuxerExt: StreamMuxer + Sized { + /// Convenience function for calling [`StreamMuxer::poll_inbound`] for [`StreamMuxer`]s that are `Unpin`. + fn poll_inbound_unpin( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> + where + Self: Unpin, + { + Pin::new(self).poll_inbound(cx) + } + + /// Convenience function for calling [`StreamMuxer::poll_outbound`] for [`StreamMuxer`]s that are `Unpin`. + fn poll_outbound_unpin( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> + where + Self: Unpin, + { + Pin::new(self).poll_outbound(cx) + } + + /// Convenience function for calling [`StreamMuxer::poll_address_change`] for [`StreamMuxer`]s that are `Unpin`. + fn poll_address_change_unpin( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> + where + Self: Unpin, + { + Pin::new(self).poll_address_change(cx) + } + + /// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`. + fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll> + where + Self: Unpin, + { + Pin::new(self).poll_close(cx) + } + + /// Returns a future that resolves to the next inbound `Substream` opened by the remote. + fn next_inbound(&mut self) -> NextInbound<'_, Self> { + NextInbound(self) + } + + /// Returns a future that opens a new outbound `Substream` with the remote. + fn next_outbound(&mut self) -> NextOutbound<'_, Self> { + NextOutbound(self) + } + + /// Returns a future for closing this [`StreamMuxer`]. + fn close(self) -> Close { + Close(self) + } +} + +impl StreamMuxerExt for S where S: StreamMuxer {} + +pub struct NextInbound<'a, S>(&'a mut S); + +pub struct NextOutbound<'a, S>(&'a mut S); + +pub struct Close(S); + +impl<'a, S> Future for NextInbound<'a, S> +where + S: StreamMuxer + Unpin, +{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.0.poll_inbound_unpin(cx) + } +} + +impl<'a, S> Future for NextOutbound<'a, S> +where + S: StreamMuxer + Unpin, +{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.0.poll_outbound_unpin(cx) + } +} + +impl Future for Close +where + S: StreamMuxer + Unpin, +{ + type Output = Result<(), S::Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.0.poll_close_unpin(cx) + } } diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 80753813dcb..0f5b6e5822e 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -1,6 +1,7 @@ use crate::StreamMuxer; use futures::{AsyncRead, AsyncWrite}; use multiaddr::Multiaddr; +use pin_project::pin_project; use std::error::Error; use std::fmt; use std::io; @@ -10,52 +11,68 @@ use std::task::{Context, Poll}; /// Abstract `StreamMuxer`. pub struct StreamMuxerBox { - inner: Box + Send + Sync>, + inner: Pin + Send>>, } /// Abstract type for asynchronous reading and writing. /// /// A [`SubstreamBox`] erases the concrete type it is given and only retains its `AsyncRead` /// and `AsyncWrite` capabilities. -pub struct SubstreamBox(Box); +pub struct SubstreamBox(Pin>); +#[pin_project] struct Wrap where T: StreamMuxer, { + #[pin] inner: T, } impl StreamMuxer for Wrap where T: StreamMuxer, - T::Substream: Send + Unpin + 'static, + T::Substream: Send + 'static, T::Error: Send + Sync + 'static, { type Substream = SubstreamBox; type Error = io::Error; #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx).map_err(into_io_error) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_close(cx).map_err(into_io_error) } - fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { - self.inner + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project() + .inner .poll_inbound(cx) .map_ok(SubstreamBox::new) .map_err(into_io_error) } - fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { - self.inner + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project() + .inner .poll_outbound(cx) .map_ok(SubstreamBox::new) .map_err(into_io_error) } - fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_address_change(cx).map_err(into_io_error) + fn poll_address_change( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project() + .inner + .poll_address_change(cx) + .map_err(into_io_error) } } @@ -70,16 +87,22 @@ impl StreamMuxerBox { /// Turns a stream muxer into a `StreamMuxerBox`. pub fn new(muxer: T) -> StreamMuxerBox where - T: StreamMuxer + Send + Sync + 'static, - T::Substream: Send + Unpin + 'static, + T: StreamMuxer + Send + 'static, + T::Substream: Send + 'static, T::Error: Send + Sync + 'static, { let wrap = Wrap { inner: muxer }; StreamMuxerBox { - inner: Box::new(wrap), + inner: Box::pin(wrap), } } + + fn project( + self: Pin<&mut Self>, + ) -> Pin<&mut (dyn StreamMuxer + Send)> { + self.get_mut().inner.as_mut() + } } impl StreamMuxer for StreamMuxerBox { @@ -87,27 +110,36 @@ impl StreamMuxer for StreamMuxerBox { type Error = io::Error; #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().poll_close(cx) } - fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_inbound(cx) + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().poll_inbound(cx) } - fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_outbound(cx) + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().poll_outbound(cx) } - fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_address_change(cx) + fn poll_address_change( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().poll_address_change(cx) } } impl SubstreamBox { /// Construct a new [`SubstreamBox`] from something that implements [`AsyncRead`] and [`AsyncWrite`]. - pub fn new(stream: S) -> Self { - Self(Box::new(stream)) + pub fn new(stream: S) -> Self { + Self(Box::pin(stream)) } } @@ -118,7 +150,7 @@ impl fmt::Debug for SubstreamBox { } /// Workaround because Rust does not allow `Box`. -trait AsyncReadWrite: AsyncRead + AsyncWrite + Unpin { +trait AsyncReadWrite: AsyncRead + AsyncWrite { /// Helper function to capture the erased inner type. /// /// Used to make the [`Debug`] implementation of [`SubstreamBox`] more useful. @@ -127,7 +159,7 @@ trait AsyncReadWrite: AsyncRead + AsyncWrite + Unpin { impl AsyncReadWrite for S where - S: AsyncRead + AsyncWrite + Unpin, + S: AsyncRead + AsyncWrite, { fn type_name(&self) -> &'static str { std::any::type_name::() @@ -136,44 +168,44 @@ where impl AsyncRead for SubstreamBox { fn poll_read( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_read(cx, buf) + self.0.as_mut().poll_read(cx, buf) } fn poll_read_vectored( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_read_vectored(cx, bufs) + self.0.as_mut().poll_read_vectored(cx, bufs) } } impl AsyncWrite for SubstreamBox { fn poll_write( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_write(cx, buf) + self.0.as_mut().poll_write(cx, buf) } fn poll_write_vectored( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_write_vectored(cx, bufs) + self.0.as_mut().poll_write_vectored(cx, bufs) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_flush(cx) + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.0.as_mut().poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().0).poll_close(cx) + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.0.as_mut().poll_close(cx) } } diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index d67cb5e9825..193cfb6303f 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -23,6 +23,7 @@ use crate::{connection::Endpoint, muxing::StreamMuxer}; use futures::prelude::*; use multiaddr::Multiaddr; use std::cell::Cell; +use std::pin::Pin; use std::{io, task::Context, task::Poll}; /// Implementation of `StreamMuxer` that allows only one substream on top of a connection, @@ -57,31 +58,44 @@ where type Substream = TSocket; type Error = io::Error; - fn poll_inbound(&self, _: &mut Context<'_>) -> Poll> { - match self.endpoint { + fn poll_inbound( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + match this.endpoint { Endpoint::Dialer => Poll::Pending, - Endpoint::Listener => match self.inner.replace(None) { + Endpoint::Listener => match this.inner.replace(None) { None => Poll::Pending, Some(stream) => Poll::Ready(Ok(stream)), }, } } - fn poll_outbound(&self, _: &mut Context<'_>) -> Poll> { - match self.endpoint { + fn poll_outbound( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + let this = self.get_mut(); + + match this.endpoint { Endpoint::Listener => Poll::Pending, - Endpoint::Dialer => match self.inner.replace(None) { + Endpoint::Dialer => match this.inner.replace(None) { None => Poll::Pending, Some(stream) => Poll::Ready(Ok(stream)), }, } } - fn poll_address_change(&self, _: &mut Context<'_>) -> Poll> { + fn poll_address_change( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { Poll::Pending } - fn poll_close(&self, _cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index da87fb9dd6a..8fc0454794f 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -299,8 +299,8 @@ impl Multiplexed { T::Dial: Send + 'static, T::ListenerUpgrade: Send + 'static, T::Error: Send + Sync, - M: StreamMuxer + Send + Sync + 'static, - M::Substream: Send + Unpin + 'static, + M: StreamMuxer + Send + 'static, + M::Substream: Send + 'static, M::Error: Send + Sync + 'static, { boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))) diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index ecba64dfb2f..723a04b0780 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -18,8 +18,6 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -mod util; - use futures::prelude::*; use libp2p_core::identity; use libp2p_core::transport::{MemoryTransport, Transport}; @@ -91,11 +89,6 @@ fn upgrade_pipeline() { .apply(HelloUpgrade {}) .apply(HelloUpgrade {}) .multiplex(MplexConfig::default()) - .and_then(|(peer, mplex), _| { - // Gracefully close the connection to allow protocol - // negotiation to complete. - util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) - }) .boxed(); let dialer_keys = identity::Keypair::generate_ed25519(); @@ -110,11 +103,6 @@ fn upgrade_pipeline() { .apply(HelloUpgrade {}) .apply(HelloUpgrade {}) .multiplex(MplexConfig::default()) - .and_then(|(peer, mplex), _| { - // Gracefully close the connection to allow protocol - // negotiation to complete. - util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) - }) .boxed(); let listen_addr1 = Multiaddr::from(Protocol::Memory(random::())); diff --git a/core/tests/util.rs b/core/tests/util.rs deleted file mode 100644 index 7ca52188a52..00000000000 --- a/core/tests/util.rs +++ /dev/null @@ -1,47 +0,0 @@ -#![allow(dead_code)] - -use futures::prelude::*; -use libp2p_core::muxing::StreamMuxer; -use std::{pin::Pin, task::Context, task::Poll}; - -pub struct CloseMuxer { - state: CloseMuxerState, -} - -impl CloseMuxer { - pub fn new(m: M) -> CloseMuxer { - CloseMuxer { - state: CloseMuxerState::Close(m), - } - } -} - -pub enum CloseMuxerState { - Close(M), - Done, -} - -impl Future for CloseMuxer -where - M: StreamMuxer, - M::Error: From, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match std::mem::replace(&mut self.state, CloseMuxerState::Done) { - CloseMuxerState::Close(muxer) => { - if !muxer.poll_close(cx)?.is_ready() { - self.state = CloseMuxerState::Close(muxer); - return Poll::Pending; - } - return Poll::Ready(Ok(muxer)); - } - CloseMuxerState::Done => panic!(), - } - } - } -} - -impl Unpin for CloseMuxer {} diff --git a/examples/README.md b/examples/README.md index 225425e0ff7..6c16d77cf66 100644 --- a/examples/README.md +++ b/examples/README.md @@ -7,7 +7,7 @@ A set of examples showcasing how to use rust-libp2p. - [Ping](ping.rs) Small `ping` clone, sending a ping to a peer, expecting a pong as a response. See - [tutorial](../src/tutorial.rs) for a step-by-step guide building the example. + [tutorial](../src/tutorials/ping.rs) for a step-by-step guide building the example. ## Individual libp2p protocols diff --git a/misc/metrics/src/identify.rs b/misc/metrics/src/identify.rs index b3ae5a75910..730528167a8 100644 --- a/misc/metrics/src/identify.rs +++ b/misc/metrics/src/identify.rs @@ -223,7 +223,7 @@ impl EncodeMetric for Protocols { |mut acc, (_, protocols)| { for protocol in protocols { let count = acc.entry(protocol.to_string()).or_default(); - *count = *count + 1; + *count += 1; } acc }, diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index d536edf4c8a..f74bcd1046f 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -26,9 +26,9 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughpu use futures::future::poll_fn; use futures::prelude::*; use futures::{channel::oneshot, future::join}; +use libp2p_core::muxing::StreamMuxerExt; use libp2p_core::{ - identity, multiaddr::multiaddr, muxing, transport, upgrade, Multiaddr, PeerId, StreamMuxer, - Transport, + identity, multiaddr::multiaddr, muxing, transport, upgrade, Multiaddr, PeerId, Transport, }; use libp2p_mplex as mplex; use libp2p_plaintext::PlainText2Config; @@ -113,10 +113,8 @@ fn run( addr_sender.take().unwrap().send(listen_addr).unwrap(); } transport::TransportEvent::Incoming { upgrade, .. } => { - let (_peer, conn) = upgrade.await.unwrap(); - let mut s = poll_fn(|cx| conn.poll_inbound(cx)) - .await - .expect("unexpected error"); + let (_peer, mut conn) = upgrade.await.unwrap(); + let mut s = conn.next_inbound().await.expect("unexpected error"); let mut buf = vec![0u8; payload_len]; let mut off = 0; @@ -140,8 +138,8 @@ fn run( // Spawn and block on the sender, i.e. until all data is sent. let sender = async move { let addr = addr_receiver.await.unwrap(); - let (_peer, conn) = sender_trans.dial(addr).unwrap().await.unwrap(); - let mut stream = poll_fn(|cx| conn.poll_outbound(cx)).await.unwrap(); + let (_peer, mut conn) = sender_trans.dial(addr).unwrap().await.unwrap(); + let mut stream = conn.next_outbound().await.unwrap(); let mut off = 0; loop { let n = poll_fn(|cx| Pin::new(&mut stream).poll_write(cx, &payload[off..])) diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 59b38db1156..14f9cda65d9 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -85,25 +85,34 @@ where type Substream = Substream; type Error = io::Error; - fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { self.io .lock() .poll_next_stream(cx) .map_ok(|stream_id| Substream::new(stream_id, self.io.clone())) } - fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { self.io .lock() .poll_open_stream(cx) .map_ok(|stream_id| Substream::new(stream_id, self.io.clone())) } - fn poll_address_change(&self, _: &mut Context<'_>) -> Poll> { + fn poll_address_change( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { Poll::Pending } - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.io.lock().poll_close(cx) } } diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index 94d69cd7ff1..bfbabf0f776 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -18,11 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::future::poll_fn; use futures::{channel::oneshot, prelude::*}; -use libp2p_core::{upgrade, StreamMuxer, Transport}; +use libp2p_core::muxing::StreamMuxerExt; +use libp2p_core::{upgrade, Transport}; use libp2p_tcp::TcpTransport; -use std::sync::Arc; #[test] fn async_write() { @@ -50,7 +49,7 @@ fn async_write() { tx.send(addr).unwrap(); - let client = transport + let mut client = transport .next() .await .expect("some event") @@ -60,7 +59,7 @@ fn async_write() { .await .unwrap(); - let mut outbound = poll_fn(|cx| client.poll_outbound(cx)).await.unwrap(); + let mut outbound = client.next_outbound().await.unwrap(); let mut buf = Vec::new(); outbound.read_to_end(&mut buf).await.unwrap(); @@ -72,8 +71,9 @@ fn async_write() { let mut transport = TcpTransport::default() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); - let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); - let mut inbound = poll_fn(|cx| client.poll_inbound(cx)).await.unwrap(); + let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); + + let mut inbound = client.next_inbound().await.unwrap(); inbound.write_all(b"hello world").await.unwrap(); // The test consists in making sure that this flushes the substream. diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index 20812fde55c..d30fcc1063d 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -18,11 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::future::poll_fn; use futures::{channel::oneshot, prelude::*}; -use libp2p_core::{upgrade, StreamMuxer, Transport}; +use libp2p_core::muxing::StreamMuxerExt; +use libp2p_core::{upgrade, Transport}; use libp2p_tcp::TcpTransport; -use std::sync::Arc; #[test] fn client_to_server_outbound() { @@ -50,7 +49,7 @@ fn client_to_server_outbound() { tx.send(addr).unwrap(); - let client = transport + let mut client = transport .next() .await .expect("some event") @@ -60,7 +59,7 @@ fn client_to_server_outbound() { .await .unwrap(); - let mut outbound = poll_fn(|cx| client.poll_outbound(cx)).await.unwrap(); + let mut outbound = client.next_outbound().await.unwrap(); let mut buf = Vec::new(); outbound.read_to_end(&mut buf).await.unwrap(); @@ -73,8 +72,8 @@ fn client_to_server_outbound() { .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)) .boxed(); - let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); - let mut inbound = poll_fn(|cx| client.poll_inbound(cx)).await.unwrap(); + let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); + let mut inbound = client.next_inbound().await.unwrap(); inbound.write_all(b"hello world").await.unwrap(); inbound.close().await.unwrap(); @@ -108,19 +107,17 @@ fn client_to_server_inbound() { tx.send(addr).unwrap(); - let client = Arc::new( - transport - .next() - .await - .expect("some event") - .into_incoming() - .unwrap() - .0 - .await - .unwrap(), - ); + let mut client = transport + .next() + .await + .expect("some event") + .into_incoming() + .unwrap() + .0 + .await + .unwrap(); - let mut inbound = poll_fn(|cx| client.poll_inbound(cx)).await.unwrap(); + let mut inbound = client.next_inbound().await.unwrap(); let mut buf = Vec::new(); inbound.read_to_end(&mut buf).await.unwrap(); @@ -133,9 +130,9 @@ fn client_to_server_inbound() { .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)) .boxed(); - let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); + let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); - let mut outbound = poll_fn(|cx| client.poll_outbound(cx)).await.unwrap(); + let mut outbound = client.next_outbound().await.unwrap(); outbound.write_all(b"hello world").await.unwrap(); outbound.close().await.unwrap(); @@ -167,7 +164,7 @@ fn protocol_not_match() { tx.send(addr).unwrap(); - let client = transport + let mut client = transport .next() .await .expect("some event") @@ -177,7 +174,7 @@ fn protocol_not_match() { .await .unwrap(); - let mut outbound = poll_fn(|cx| client.poll_outbound(cx)).await.unwrap(); + let mut outbound = client.next_outbound().await.unwrap(); let mut buf = Vec::new(); outbound.read_to_end(&mut buf).await.unwrap(); diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index a06e7934cf0..07327e203b3 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -29,7 +29,6 @@ use futures::{ use libp2p_core::muxing::StreamMuxer; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::Multiaddr; -use parking_lot::Mutex; use std::{ fmt, io, iter, mem, pin::Pin, @@ -39,7 +38,12 @@ use thiserror::Error; use yamux::ConnectionError; /// A Yamux connection. -pub struct Yamux(Mutex>); +pub struct Yamux { + /// The [`futures::stream::Stream`] of incoming substreams. + incoming: S, + /// Handle to control the connection. + control: yamux::Control, +} impl fmt::Debug for Yamux { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -47,13 +51,6 @@ impl fmt::Debug for Yamux { } } -struct Inner { - /// The [`futures::stream::Stream`] of incoming substreams. - incoming: S, - /// Handle to control the connection. - control: yamux::Control, -} - /// A token to poll for an outbound substream. #[derive(Debug)] pub struct OpenSubstreamToken(()); @@ -66,14 +63,14 @@ where fn new(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { let conn = yamux::Connection::new(io, cfg, mode); let ctrl = conn.control(); - let inner = Inner { + + Yamux { incoming: Incoming { stream: yamux::into_stream(conn).err_into().boxed(), _marker: std::marker::PhantomData, }, control: ctrl, - }; - Yamux(Mutex::new(inner)) + } } } @@ -85,14 +82,14 @@ where fn local(io: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { let conn = yamux::Connection::new(io, cfg, mode); let ctrl = conn.control(); - let inner = Inner { + + Yamux { incoming: LocalIncoming { stream: yamux::into_stream(conn).err_into().boxed_local(), _marker: std::marker::PhantomData, }, control: ctrl, - }; - Yamux(Mutex::new(inner)) + } } } @@ -105,41 +102,44 @@ where type Substream = yamux::Stream; type Error = YamuxError; - fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { - self.0 - .lock() - .incoming - .poll_next_unpin(cx) - .map(|maybe_stream| { - let stream = maybe_stream - .transpose()? - .ok_or(YamuxError(ConnectionError::Closed))?; - - Ok(stream) - }) + fn poll_inbound( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.incoming.poll_next_unpin(cx).map(|maybe_stream| { + let stream = maybe_stream + .transpose()? + .ok_or(YamuxError(ConnectionError::Closed))?; + + Ok(stream) + }) } - fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0.lock().control) + fn poll_outbound( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.control) .poll_open_stream(cx) .map_err(YamuxError) } - fn poll_address_change(&self, _: &mut Context<'_>) -> Poll> { + fn poll_address_change( + self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { Poll::Pending } - fn poll_close(&self, c: &mut Context<'_>) -> Poll> { - let mut inner = self.0.lock(); - - if let Poll::Ready(()) = Pin::new(&mut inner.control) + fn poll_close(mut self: Pin<&mut Self>, c: &mut Context<'_>) -> Poll> { + if let Poll::Ready(()) = Pin::new(&mut self.control) .poll_close(c) .map_err(YamuxError)? { return Poll::Ready(Ok(())); } - while let Poll::Ready(maybe_inbound_stream) = inner.incoming.poll_next_unpin(c)? { + while let Poll::Ready(maybe_inbound_stream) = self.incoming.poll_next_unpin(c)? { match maybe_inbound_stream { Some(inbound_stream) => mem::drop(inbound_stream), None => return Poll::Ready(Ok(())), diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 893148890b7..5d93d90b339 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -65,6 +65,7 @@ pub enum UpgradeError { Handler(ConnectionHandlerUpgrErr), } +#[derive(Default)] pub struct Behaviour { /// Queue of actions to return when polled. queued_actions: VecDeque, @@ -145,40 +146,35 @@ impl NetworkBehaviour for Behaviour { handler: Self::ConnectionHandler, _error: &DialError, ) { - match handler { - handler::Prototype::DirectConnection { - relayed_connection_id, - role: handler::Role::Initiator { attempt }, - } => { - let peer_id = - peer_id.expect("Peer of `Prototype::DirectConnection` is always known."); - if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { - self.queued_actions.push_back(ActionBuilder::Connect { + if let handler::Prototype::DirectConnection { + relayed_connection_id, + role: handler::Role::Initiator { attempt }, + } = handler + { + let peer_id = peer_id.expect("Peer of `Prototype::DirectConnection` is always known."); + if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { + self.queued_actions.push_back(ActionBuilder::Connect { + peer_id, + handler: NotifyHandler::One(relayed_connection_id), + attempt: attempt + 1, + }); + } else { + self.queued_actions.extend([ + NetworkBehaviourAction::NotifyHandler { peer_id, handler: NotifyHandler::One(relayed_connection_id), - attempt: attempt + 1, - }); - } else { - self.queued_actions.extend([ - NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::One(relayed_connection_id), - event: Either::Left( - handler::relayed::Command::UpgradeFinishedDontKeepAlive, - ), - } - .into(), - NetworkBehaviourAction::GenerateEvent( - Event::DirectConnectionUpgradeFailed { - remote_peer_id: peer_id, - error: UpgradeError::Dial, - }, - ) - .into(), - ]); - } + event: Either::Left( + handler::relayed::Command::UpgradeFinishedDontKeepAlive, + ), + } + .into(), + NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + remote_peer_id: peer_id, + error: UpgradeError::Dial, + }) + .into(), + ]); } - _ => {} } } @@ -324,7 +320,6 @@ impl NetworkBehaviour for Behaviour { /// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] /// before being returned in [`Behaviour::poll`]. -#[allow(clippy::large_enum_variant)] enum ActionBuilder { Done(NetworkBehaviourAction), Connect { @@ -333,7 +328,7 @@ enum ActionBuilder { peer_id: PeerId, }, AcceptInboundConnect { - inbound_connect: protocol::inbound::PendingConnect, + inbound_connect: Box, handler: NotifyHandler, peer_id: PeerId, }, diff --git a/protocols/dcutr/src/handler/relayed.rs b/protocols/dcutr/src/handler/relayed.rs index 9f9e2e01c13..e172b8f6993 100644 --- a/protocols/dcutr/src/handler/relayed.rs +++ b/protocols/dcutr/src/handler/relayed.rs @@ -44,7 +44,7 @@ pub enum Command { }, AcceptInboundConnect { obs_addrs: Vec, - inbound_connect: protocol::inbound::PendingConnect, + inbound_connect: Box, }, /// Upgrading the relayed connection to a direct connection either failed for good or succeeded. /// There is no need to keep the relayed connection alive for the sake of upgrading to a direct @@ -76,7 +76,7 @@ impl fmt::Debug for Command { pub enum Event { InboundConnectRequest { - inbound_connect: protocol::inbound::PendingConnect, + inbound_connect: Box, remote_addr: Multiaddr, }, InboundNegotiationFailed { @@ -201,7 +201,7 @@ impl ConnectionHandler for Handler { }; self.queued_events.push_back(ConnectionHandlerEvent::Custom( Event::InboundConnectRequest { - inbound_connect, + inbound_connect: Box::new(inbound_connect), remote_addr, }, )); @@ -245,9 +245,10 @@ impl ConnectionHandler for Handler { inbound_connect, obs_addrs, } => { - if let Some(_) = self + if self .inbound_connect .replace(inbound_connect.accept(obs_addrs).boxed()) + .is_some() { log::warn!( "New inbound connect stream while still upgrading previous one. \ @@ -337,8 +338,7 @@ impl ConnectionHandler for Handler { _ => { // Anything else is considered a fatal error or misbehaviour of // the remote peer and results in closing the connection. - self.pending_error = - Some(error.map_upgrade_err(|e| e.map_err(|e| EitherError::B(e)))); + self.pending_error = Some(error.map_upgrade_err(|e| e.map_err(EitherError::B))); } } } diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 8d29ca53793..f92186618ae 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -32,13 +32,12 @@ pub use pool::{EstablishedConnection, PendingConnection}; use crate::handler::ConnectionHandler; use crate::IntoConnectionHandler; -use futures::future::poll_fn; use handler_wrapper::HandlerWrapper; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::StreamMuxerBox; +use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; +use libp2p_core::upgrade; use libp2p_core::PeerId; -use libp2p_core::{upgrade, StreamMuxer}; use std::collections::VecDeque; use std::future::Future; use std::{error::Error, fmt, io, pin::Pin, task::Context, task::Poll}; @@ -132,10 +131,7 @@ where /// Begins an orderly shutdown of the connection, returning the connection /// handler and a `Future` that resolves when connection shutdown is complete. pub fn close(self) -> (THandler, impl Future>) { - ( - self.handler.into_connection_handler(), - poll_fn(move |cx| self.muxing.poll_close(cx)), - ) + (self.handler.into_connection_handler(), self.muxing.close()) } /// Polls the handler and the substream, forwarding events from the former to the latter and @@ -158,7 +154,7 @@ where } if !self.open_info.is_empty() { - if let Poll::Ready(substream) = self.muxing.poll_outbound(cx)? { + if let Poll::Ready(substream) = self.muxing.poll_outbound_unpin(cx)? { let user_data = self .open_info .pop_front() @@ -169,13 +165,13 @@ where } } - if let Poll::Ready(substream) = self.muxing.poll_inbound(cx)? { + if let Poll::Ready(substream) = self.muxing.poll_inbound_unpin(cx)? { self.handler .inject_substream(substream, SubstreamEndpoint::Listener); continue; // Go back to the top, handler can potentially make progress again. } - if let Poll::Ready(address) = self.muxing.poll_address_change(cx)? { + if let Poll::Ready(address) = self.muxing.poll_address_change_unpin(cx)? { self.handler.inject_address_change(&address); return Poll::Ready(Ok(Event::AddressChange(address))); } diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 4bbdf9c4162..62e931e9510 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -38,7 +38,7 @@ use futures::{ stream::FuturesUnordered, }; use libp2p_core::connection::{ConnectionId, Endpoint, PendingPoint}; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox}; +use libp2p_core::muxing::{StreamMuxerBox, StreamMuxerExt}; use std::{ collections::{hash_map, HashMap}, convert::TryFrom as _, @@ -604,7 +604,7 @@ where match event { task::PendingConnectionEvent::ConnectionEstablished { id, - output: (obtained_peer_id, muxer), + output: (obtained_peer_id, mut muxer), outgoing, } => { let PendingConnectionInfo { @@ -692,7 +692,7 @@ where if let Err(error) = error { self.spawn( poll_fn(move |cx| { - if let Err(e) = ready!(muxer.poll_close(cx)) { + if let Err(e) = ready!(muxer.poll_close_unpin(cx)) { log::debug!( "Failed to close connection {:?} to peer {}: {:?}", id, diff --git a/transports/tcp/src/provider/tokio.rs b/transports/tcp/src/provider/tokio.rs index fa9ebe3b3ff..564eebfa48b 100644 --- a/transports/tcp/src/provider/tokio.rs +++ b/transports/tcp/src/provider/tokio.rs @@ -155,9 +155,9 @@ impl Provider for Tcp { #[derive(Debug)] pub struct TcpStream(pub tokio_crate::net::TcpStream); -impl Into for TcpStream { - fn into(self: TcpStream) -> tokio_crate::net::TcpStream { - self.0 +impl From for tokio_crate::net::TcpStream { + fn from(t: TcpStream) -> tokio_crate::net::TcpStream { + t.0 } }