From fffa5ffadf692b353fb4359e92c69da07e07b3d9 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Jun 2022 12:56:11 +0200 Subject: [PATCH 01/25] Introduce `StreamMuxerEvent::OutboundSubstream` This variant is not yet constructed but we will need it very soon. --- core/src/either.rs | 4 ++-- core/src/muxing.rs | 11 +++++++++-- core/src/muxing/boxed.rs | 4 ++-- swarm/src/connection/substream.rs | 6 ++++++ 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/core/src/either.rs b/core/src/either.rs index 8ce9046f6f8..0822a71a47a 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -213,11 +213,11 @@ where EitherOutput::First(inner) => inner .poll_event(cx) .map_err(EitherError::A) - .map_ok(|event| event.map_inbound_stream(EitherOutput::First)), + .map_ok(|event| event.map_stream(EitherOutput::First)), EitherOutput::Second(inner) => inner .poll_event(cx) .map_err(EitherError::B) - .map_ok(|event| event.map_inbound_stream(EitherOutput::Second)), + .map_ok(|event| event.map_stream(EitherOutput::Second)), } } diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 050d8d1bd35..e5cff34ab82 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -139,6 +139,9 @@ pub enum StreamMuxerEvent { /// Remote has opened a new substream. Contains the substream in question. InboundSubstream(T), + /// We opened a new substream. Contains the substream in question. + OutboundSubstream(T), + /// Address to the remote has changed. The previous one is now obsolete. /// /// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes @@ -157,12 +160,16 @@ impl StreamMuxerEvent { } } - /// Map the stream within [`StreamMuxerEvent::InboundSubstream`] to a new type. - pub fn map_inbound_stream(self, map: impl FnOnce(T) -> O) -> StreamMuxerEvent { + /// Map the stream within [`StreamMuxerEvent::InboundSubstream`] and + /// [`StreamMuxerEvent::OutboundStream`] to a new type. + pub fn map_stream(self, map: impl FnOnce(T) -> O) -> StreamMuxerEvent { match self { StreamMuxerEvent::InboundSubstream(stream) => { StreamMuxerEvent::InboundSubstream(map(stream)) } + StreamMuxerEvent::OutboundSubstream(stream) => { + StreamMuxerEvent::OutboundSubstream(map(stream)) + } StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), } } diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index ad39ef0532d..d5717bc046f 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -50,8 +50,8 @@ where &self, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { - let event = ready!(self.inner.poll_event(cx).map_err(into_io_error)?) - .map_inbound_stream(SubstreamBox::new); + let event = + ready!(self.inner.poll_event(cx).map_err(into_io_error)?).map_stream(SubstreamBox::new); Poll::Ready(Ok(event)) } diff --git a/swarm/src/connection/substream.rs b/swarm/src/connection/substream.rs index 47d5d315b20..0947bde0ef3 100644 --- a/swarm/src/connection/substream.rs +++ b/swarm/src/connection/substream.rs @@ -140,6 +140,12 @@ where Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => { return Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })); } + Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(substream))) => { + return Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { + substream, + user_data: todo!(), + })); + } Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => { return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))) } From e8630d7ec0966ea0ea4bad63d205b4af46ff2ebf Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Jun 2022 16:07:25 +0200 Subject: [PATCH 02/25] Introduce `StreamMuxerEvent::map_outbound_substream` --- core/src/muxing.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/core/src/muxing.rs b/core/src/muxing.rs index e5cff34ab82..1211c6d9d66 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -160,6 +160,16 @@ impl StreamMuxerEvent { } } + /// If `self` is a [`StreamMuxerEvent::OutboundSubstream`], returns the content. Otherwise + /// returns `None`. + pub fn into_outbound_substream(self) -> Option { + if let StreamMuxerEvent::OutboundSubstream(s) = self { + Some(s) + } else { + None + } + } + /// Map the stream within [`StreamMuxerEvent::InboundSubstream`] and /// [`StreamMuxerEvent::OutboundStream`] to a new type. pub fn map_stream(self, map: impl FnOnce(T) -> O) -> StreamMuxerEvent { From 7eeae1b1e129304806043c9d7057db9c37b00c42 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Jun 2022 17:03:48 +0200 Subject: [PATCH 03/25] Remove "outbound" functions on `StreamMuxer` in favor of `OpenFlags` It is easier for clients to only call one poll function that can be configured, which substreams to open. This API change is also trying to plan ahead for muxers like QUIC which actually allow to only open substreams in one direction. --- core/Cargo.toml | 1 + core/src/either.rs | 53 ++--------------- core/src/muxing.rs | 51 +++++++--------- core/src/muxing/boxed.rs | 79 +++---------------------- core/src/muxing/singleton.rs | 46 +++++--------- core/src/transport/upgrade.rs | 1 - muxers/mplex/benches/split_send_size.rs | 3 +- muxers/mplex/src/lib.rs | 51 +++++++++------- muxers/mplex/tests/async_write.rs | 8 ++- muxers/mplex/tests/two_peers.rs | 20 ++++--- muxers/yamux/Cargo.toml | 1 + muxers/yamux/src/lib.rs | 51 ++++++++-------- swarm/src/connection/substream.rs | 63 +++++--------------- 13 files changed, 139 insertions(+), 289 deletions(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index deb6479e433..64bbbc21dd2 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -12,6 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] asn1_der = "0.7.4" +bitflags = "1.3.2" bs58 = "0.4.0" ed25519-dalek = "1.0.1" either = "1.5" diff --git a/core/src/either.rs b/core/src/either.rs index 0822a71a47a..9e952772708 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - muxing::{StreamMuxer, StreamMuxerEvent}, + muxing::{OpenFlags, StreamMuxer, StreamMuxerEvent}, transport::{ListenerEvent, Transport, TransportError}, Multiaddr, ProtocolName, }; @@ -202,63 +202,25 @@ where B: StreamMuxer, { type Substream = EitherOutput; - type OutboundSubstream = EitherOutbound; type Error = EitherError; fn poll_event( &self, + flags: OpenFlags, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { match self { EitherOutput::First(inner) => inner - .poll_event(cx) + .poll_event(flags, cx) .map_err(EitherError::A) .map_ok(|event| event.map_stream(EitherOutput::First)), EitherOutput::Second(inner) => inner - .poll_event(cx) + .poll_event(flags, cx) .map_err(EitherError::B) .map_ok(|event| event.map_stream(EitherOutput::Second)), } } - fn open_outbound(&self) -> Self::OutboundSubstream { - match self { - EitherOutput::First(inner) => EitherOutbound::A(inner.open_outbound()), - EitherOutput::Second(inner) => EitherOutbound::B(inner.open_outbound()), - } - } - - fn poll_outbound( - &self, - cx: &mut Context<'_>, - substream: &mut Self::OutboundSubstream, - ) -> Poll> { - match (self, substream) { - (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner - .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::First)) - .map_err(EitherError::A), - (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner - .poll_outbound(cx, substream) - .map(|p| p.map(EitherOutput::Second)) - .map_err(EitherError::B), - _ => panic!("Wrong API usage"), - } - } - - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - match self { - EitherOutput::First(inner) => match substream { - EitherOutbound::A(substream) => inner.destroy_outbound(substream), - _ => panic!("Wrong API usage"), - }, - EitherOutput::Second(inner) => match substream { - EitherOutbound::B(substream) => inner.destroy_outbound(substream), - _ => panic!("Wrong API usage"), - }, - } - } - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { match self { EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A), @@ -267,13 +229,6 @@ where } } -#[derive(Debug, Copy, Clone)] -#[must_use = "futures do nothing unless polled"] -pub enum EitherOutbound { - A(A::OutboundSubstream), - B(B::OutboundSubstream), -} - /// Implements `Stream` and dispatches all method calls to either `First` or `Second`. #[pin_project(project = EitherListenStreamProj)] #[derive(Debug, Copy, Clone)] diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 1211c6d9d66..7073f48af59 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -71,14 +71,14 @@ pub trait StreamMuxer { /// Type of the object that represents the raw substream where data can be read and written. type Substream: AsyncRead + AsyncWrite; - /// Future that will be resolved when the outgoing substream is open. - type OutboundSubstream; - /// Error type of the muxer type Error: std::error::Error; /// Polls for a connection-wide event. /// + /// Depending on the passed [`OpenFlags`], the muxer will either open a new outbound substream, + /// check for new incoming substreams or both. + /// /// This function behaves the same as a `Stream`. /// /// If `Pending` is returned, then the current task will be notified once the muxer @@ -91,35 +91,10 @@ pub trait StreamMuxer { /// An error can be generated if the connection has been closed. fn poll_event( &self, + flags: OpenFlags, cx: &mut Context<'_>, ) -> Poll, Self::Error>>; - /// Opens a new outgoing substream, and produces the equivalent to a future that will be - /// resolved when it becomes available. - /// - /// The API of `OutboundSubstream` is totally opaque, and the object can only be interfaced - /// through the methods on the `StreamMuxer` trait. - fn open_outbound(&self) -> Self::OutboundSubstream; - - /// Polls the outbound substream. - /// - /// If `Pending` is returned, then the current task will be notified once the substream - /// is ready to be polled, similar to the API of `Future::poll()`. - /// However, for each individual outbound substream, only the latest task that was used to - /// call this method may be notified. - /// - /// May panic or produce an undefined result if an earlier polling of the same substream - /// returned `Ready` or `Err`. - fn poll_outbound( - &self, - cx: &mut Context<'_>, - s: &mut Self::OutboundSubstream, - ) -> Poll>; - - /// Destroys an outbound substream future. Use this after the outbound substream has finished, - /// or if you want to interrupt it. - fn destroy_outbound(&self, s: Self::OutboundSubstream); - /// Closes this `StreamMuxer`. /// /// After this has returned `Poll::Ready(Ok(()))`, the muxer has become useless. All @@ -133,6 +108,24 @@ pub trait StreamMuxer { fn poll_close(&self, cx: &mut Context<'_>) -> Poll>; } +bitflags::bitflags! { + /// Tells the [`StreamMuxer`] which substreams it should open. + /// + /// How this is implemented may vary from muxer to muxer. Not all muxing protocols support + /// this on the lowest level so some muxers may for example still accept incoming substreams but + /// immediately drop them if the flags do not contain [`OpenFlags::INBOUND`]. + pub struct OpenFlags: u8 { + const INBOUND = 0b00000001; + const OUTBOUND = 0b00000010; + } +} + +impl Default for OpenFlags { + fn default() -> Self { + OpenFlags::INBOUND + } +} + /// Event about a connection, reported by an implementation of [`StreamMuxer`]. #[derive(Debug, Clone, PartialEq, Eq)] pub enum StreamMuxerEvent { diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index d5717bc046f..3a609a2be1f 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -1,23 +1,16 @@ -use crate::muxing::StreamMuxerEvent; +use crate::muxing::{OpenFlags, StreamMuxerEvent}; use crate::StreamMuxer; -use fnv::FnvHashMap; use futures::{ready, AsyncRead, AsyncWrite}; -use parking_lot::Mutex; use std::error::Error; use std::fmt; use std::io; use std::io::{IoSlice, IoSliceMut}; use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; /// Abstract `StreamMuxer`. pub struct StreamMuxerBox { - inner: Box< - dyn StreamMuxer - + Send - + Sync, - >, + inner: Box + Send + Sync>, } /// Abstract type for asynchronous reading and writing. @@ -31,8 +24,6 @@ where T: StreamMuxer, { inner: T, - outbound: Mutex>, - next_outbound: AtomicUsize, } impl StreamMuxer for Wrap @@ -42,50 +33,20 @@ where T::Error: Send + Sync + 'static, { type Substream = SubstreamBox; - type OutboundSubstream = usize; // TODO: use a newtype type Error = io::Error; #[inline] fn poll_event( &self, + flags: OpenFlags, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { - let event = - ready!(self.inner.poll_event(cx).map_err(into_io_error)?).map_stream(SubstreamBox::new); + let event = ready!(self.inner.poll_event(flags, cx).map_err(into_io_error)?) + .map_stream(SubstreamBox::new); Poll::Ready(Ok(event)) } - #[inline] - fn open_outbound(&self) -> Self::OutboundSubstream { - let outbound = self.inner.open_outbound(); - let id = self.next_outbound.fetch_add(1, Ordering::Relaxed); - self.outbound.lock().insert(id, outbound); - id - } - - #[inline] - fn poll_outbound( - &self, - cx: &mut Context<'_>, - substream: &mut Self::OutboundSubstream, - ) -> Poll> { - let mut list = self.outbound.lock(); - let stream = ready!(self - .inner - .poll_outbound(cx, list.get_mut(substream).unwrap()) - .map_err(into_io_error)?); - - Poll::Ready(Ok(SubstreamBox::new(stream))) - } - - #[inline] - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - let mut list = self.outbound.lock(); - self.inner - .destroy_outbound(list.remove(&substream).unwrap()) - } - #[inline] fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_close(cx).map_err(into_io_error) @@ -104,15 +65,10 @@ impl StreamMuxerBox { pub fn new(muxer: T) -> StreamMuxerBox where T: StreamMuxer + Send + Sync + 'static, - T::OutboundSubstream: Send, T::Substream: Send + Unpin + 'static, T::Error: Send + Sync + 'static, { - let wrap = Wrap { - inner: muxer, - outbound: Mutex::new(Default::default()), - next_outbound: AtomicUsize::new(0), - }; + let wrap = Wrap { inner: muxer }; StreamMuxerBox { inner: Box::new(wrap), @@ -122,34 +78,15 @@ impl StreamMuxerBox { impl StreamMuxer for StreamMuxerBox { type Substream = SubstreamBox; - type OutboundSubstream = usize; // TODO: use a newtype type Error = io::Error; #[inline] fn poll_event( &self, + flags: OpenFlags, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { - self.inner.poll_event(cx) - } - - #[inline] - fn open_outbound(&self) -> Self::OutboundSubstream { - self.inner.open_outbound() - } - - #[inline] - fn poll_outbound( - &self, - cx: &mut Context<'_>, - s: &mut Self::OutboundSubstream, - ) -> Poll> { - self.inner.poll_outbound(cx, s) - } - - #[inline] - fn destroy_outbound(&self, substream: Self::OutboundSubstream) { - self.inner.destroy_outbound(substream) + self.inner.poll_event(flags, cx) } #[inline] diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index c461ed00fc3..a97fbd07bc9 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -23,6 +23,7 @@ use crate::{ muxing::{StreamMuxer, StreamMuxerEvent}, }; +use crate::muxing::OpenFlags; use futures::prelude::*; use std::cell::Cell; use std::{io, task::Context, task::Poll}; @@ -52,56 +53,37 @@ impl SingletonMuxer { } } -/// Outbound substream attempt of the `SingletonMuxer`. -pub struct OutboundSubstream {} - impl StreamMuxer for SingletonMuxer where TSocket: AsyncRead + AsyncWrite + Unpin, { type Substream = TSocket; - type OutboundSubstream = OutboundSubstream; type Error = io::Error; fn poll_event( &self, + flags: OpenFlags, _: &mut Context<'_>, - ) -> Poll, io::Error>> { + ) -> Poll, Self::Error>> { + // these combinations will never emit a stream. match self.endpoint { - Endpoint::Dialer => return Poll::Pending, - Endpoint::Listener => {} + Endpoint::Dialer if flags == OpenFlags::INBOUND => return Poll::Pending, + Endpoint::Listener if flags == OpenFlags::OUTBOUND => return Poll::Pending, + _ => {} } - if let Some(stream) = self.inner.replace(None) { - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream))) - } else { - Poll::Pending - } - } - - fn open_outbound(&self) -> Self::OutboundSubstream { - OutboundSubstream {} - } + // can only emit the stream once + let socket = match self.inner.replace(None) { + None => return Poll::Pending, + Some(stream) => stream, + }; - fn poll_outbound( - &self, - _: &mut Context<'_>, - _: &mut Self::OutboundSubstream, - ) -> Poll> { match self.endpoint { - Endpoint::Listener => return Poll::Pending, - Endpoint::Dialer => {} - } - - if let Some(stream) = self.inner.replace(None) { - Poll::Ready(Ok(stream)) - } else { - Poll::Pending + Endpoint::Dialer => Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(socket))), + Endpoint::Listener => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(socket))), } } - fn destroy_outbound(&self, _: Self::OutboundSubstream) {} - fn poll_close(&self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 964045ad33f..bca8a778daf 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -301,7 +301,6 @@ impl Multiplexed { T::Error: Send + Sync, M: StreamMuxer + Send + Sync + 'static, M::Substream: Send + Unpin + 'static, - M::OutboundSubstream: Send + 'static, M::Error: Send + Sync + 'static, { boxed(self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))) diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index a15c15c3c92..d019c8113e2 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -26,6 +26,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughpu use futures::channel::oneshot; use futures::future::poll_fn; use futures::prelude::*; +use libp2p_core::muxing::OpenFlags; use libp2p_core::{ identity, multiaddr::multiaddr, muxing, transport, upgrade, Multiaddr, PeerId, StreamMuxer, Transport, @@ -105,7 +106,7 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd } transport::ListenerEvent::Upgrade { upgrade, .. } => { let (_peer, conn) = upgrade.await.unwrap(); - let mut s = poll_fn(|cx| conn.poll_event(cx)) + let mut s = poll_fn(|cx| conn.poll_event(OpenFlags::INBOUND, cx)) .await .expect("unexpected error") .into_inbound_substream() diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 80b1db16481..2d4edb3e414 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -28,7 +28,7 @@ use bytes::Bytes; use codec::LocalStreamId; use futures::{future, prelude::*, ready}; use libp2p_core::{ - muxing::StreamMuxerEvent, + muxing::{OpenFlags, StreamMuxerEvent}, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, StreamMuxer, }; @@ -87,33 +87,41 @@ where C: AsyncRead + AsyncWrite + Unpin, { type Substream = Substream; - type OutboundSubstream = OutboundSubstream; type Error = io::Error; fn poll_event( &self, + flags: OpenFlags, cx: &mut Context<'_>, - ) -> Poll>> { - let stream_id = ready!(self.io.lock().poll_next_stream(cx))?; - let stream = Substream::new(stream_id, self.io.clone()); - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream))) - } + ) -> Poll, Self::Error>> { + let mut io = self.io.lock(); - fn open_outbound(&self) -> Self::OutboundSubstream { - OutboundSubstream {} - } + loop { + if flags.contains(OpenFlags::OUTBOUND) { + if let Poll::Ready(stream_id) = io.poll_open_stream(cx)? { + return Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(Substream::new( + stream_id, + self.io.clone(), + )))); + } + } - fn poll_outbound( - &self, - cx: &mut Context<'_>, - _: &mut Self::OutboundSubstream, - ) -> Poll> { - let stream_id = ready!(self.io.lock().poll_open_stream(cx))?; - Poll::Ready(Ok(Substream::new(stream_id, self.io.clone()))) - } + let stream_id = ready!(io.poll_next_stream(cx))?; + + if !flags.contains(OpenFlags::INBOUND) { + log::debug!( + "Dropping inbound stream {stream_id} because OpenFlags::INBOUND is not present" + ); + io.drop_stream(stream_id); + + continue; + } - fn destroy_outbound(&self, _substream: Self::OutboundSubstream) { - // Nothing to do, since `open_outbound` creates no new local state. + return Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(Substream::new( + stream_id, + self.io.clone(), + )))); + } } fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { @@ -121,9 +129,6 @@ where } } -/// Active attempt to open an outbound substream. -pub struct OutboundSubstream {} - impl AsyncRead for Substream where C: AsyncRead + AsyncWrite + Unpin, diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index d59f8b279f9..1f27e2a4a55 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -20,6 +20,7 @@ use futures::future::poll_fn; use futures::{channel::oneshot, prelude::*}; +use libp2p_core::muxing::OpenFlags; use libp2p_core::{upgrade, StreamMuxer, Transport}; use libp2p_tcp::TcpConfig; use std::sync::Arc; @@ -61,9 +62,10 @@ fn async_write() { .await .unwrap(); - let mut outbound_token = client.open_outbound(); - let mut outbound = poll_fn(|cx| client.poll_outbound(cx, &mut outbound_token)) + let mut outbound = poll_fn(|cx| client.poll_event(OpenFlags::OUTBOUND, cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); let mut buf = Vec::new(); @@ -78,7 +80,7 @@ fn async_write() { let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); let mut inbound = loop { - if let Some(s) = poll_fn(|cx| client.poll_event(cx)) + if let Some(s) = poll_fn(|cx| client.poll_event(OpenFlags::INBOUND, cx)) .await .unwrap() .into_inbound_substream() diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index a6438feaff9..264481b9a4d 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -20,6 +20,7 @@ use futures::future::poll_fn; use futures::{channel::oneshot, prelude::*}; +use libp2p_core::muxing::OpenFlags; use libp2p_core::{upgrade, StreamMuxer, Transport}; use libp2p_tcp::TcpConfig; use std::sync::Arc; @@ -61,9 +62,10 @@ fn client_to_server_outbound() { .await .unwrap(); - let mut outbound_token = client.open_outbound(); - let mut outbound = poll_fn(|cx| client.poll_outbound(cx, &mut outbound_token)) + let mut outbound = poll_fn(|cx| client.poll_event(OpenFlags::OUTBOUND, cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); let mut buf = Vec::new(); @@ -78,7 +80,7 @@ fn client_to_server_outbound() { let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); let mut inbound = loop { - if let Some(s) = poll_fn(|cx| client.poll_event(cx)) + if let Some(s) = poll_fn(|cx| client.poll_event(OpenFlags::INBOUND, cx)) .await .unwrap() .into_inbound_substream() @@ -133,7 +135,7 @@ fn client_to_server_inbound() { ); let mut inbound = loop { - if let Some(s) = poll_fn(|cx| client.poll_event(cx)) + if let Some(s) = poll_fn(|cx| client.poll_event(OpenFlags::INBOUND, cx)) .await .unwrap() .into_inbound_substream() @@ -154,9 +156,10 @@ fn client_to_server_inbound() { let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); - let mut outbound_token = client.open_outbound(); - let mut outbound = poll_fn(|cx| client.poll_outbound(cx, &mut outbound_token)) + let mut outbound = poll_fn(|cx| client.poll_event(OpenFlags::OUTBOUND, cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); outbound.write_all(b"hello world").await.unwrap(); outbound.close().await.unwrap(); @@ -200,9 +203,10 @@ fn protocol_not_match() { .await .unwrap(); - let mut outbound_token = client.open_outbound(); - let mut outbound = poll_fn(|cx| client.poll_outbound(cx, &mut outbound_token)) + let mut outbound = poll_fn(|cx| client.poll_event(OpenFlags::OUTBOUND, cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); let mut buf = Vec::new(); diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index a7c55f08949..0f3d61f34ab 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -16,3 +16,4 @@ libp2p-core = { version = "0.34.0", path = "../../core", default-features = fals parking_lot = "0.12" thiserror = "1.0" yamux = "0.10.0" +log = "0.4" diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 8eb6fb3e895..38c6fe02390 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -27,7 +27,7 @@ use futures::{ ready, stream::{BoxStream, LocalBoxStream}, }; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; +use libp2p_core::muxing::{OpenFlags, StreamMuxer, StreamMuxerEvent}; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use parking_lot::Mutex; use std::{ @@ -36,6 +36,7 @@ use std::{ task::{Context, Poll}, }; use thiserror::Error; +use yamux::ConnectionError; /// A Yamux connection. pub struct Yamux(Mutex>); @@ -103,38 +104,38 @@ where S: Stream> + Unpin, { type Substream = yamux::Stream; - type OutboundSubstream = OpenSubstreamToken; type Error = YamuxError; fn poll_event( &self, - c: &mut Context<'_>, - ) -> Poll>> { + flags: OpenFlags, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { let mut inner = self.0.lock(); - match ready!(inner.incoming.poll_next_unpin(c)) { - Some(Ok(s)) => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(s))), - Some(Err(e)) => Poll::Ready(Err(e)), - None => Poll::Ready(Err(yamux::ConnectionError::Closed.into())), - } - } - fn open_outbound(&self) -> Self::OutboundSubstream { - OpenSubstreamToken(()) - } + loop { + if flags.contains(OpenFlags::OUTBOUND) { + if let Poll::Ready(stream) = Pin::new(&mut inner.control).poll_open_stream(cx)? { + return Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(stream))); + } + } - fn poll_outbound( - &self, - c: &mut Context<'_>, - _: &mut OpenSubstreamToken, - ) -> Poll> { - let mut inner = self.0.lock(); - Pin::new(&mut inner.control) - .poll_open_stream(c) - .map_err(YamuxError) - } + let stream = match ready!(inner.incoming.poll_next_unpin(cx)).transpose()? { + Some(stream) => stream, + None => return Poll::Ready(Err(YamuxError(ConnectionError::Closed))), + }; + + if !flags.contains(OpenFlags::INBOUND) { + log::debug!( + "Dropping inbound stream {stream} because OpenFlags::INBOUND is not present" + ); + mem::drop(stream); - fn destroy_outbound(&self, _: Self::OutboundSubstream) { - self.0.lock().control.abort_open_stream() + continue; + } + + return Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream))); + } } fn poll_close(&self, c: &mut Context<'_>) -> Poll> { diff --git a/swarm/src/connection/substream.rs b/swarm/src/connection/substream.rs index 0947bde0ef3..3aa3895dbbe 100644 --- a/swarm/src/connection/substream.rs +++ b/swarm/src/connection/substream.rs @@ -20,8 +20,8 @@ use futures::prelude::*; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::{StreamMuxer, StreamMuxerEvent}; -use smallvec::SmallVec; +use libp2p_core::muxing::{OpenFlags, StreamMuxer, StreamMuxerEvent}; +use std::collections::VecDeque; use std::sync::Arc; use std::{fmt, pin::Pin, task::Context, task::Poll}; @@ -46,7 +46,7 @@ where /// The muxer used to manage substreams. inner: Arc, /// List of substreams we are currently opening. - outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>, + outbound_substreams: VecDeque, } /// Future that signals the remote that we have closed the connection. @@ -95,7 +95,7 @@ where pub fn new(muxer: TMuxer) -> Self { Muxing { inner: Arc::new(muxer), - outbound_substreams: SmallVec::new(), + outbound_substreams: VecDeque::with_capacity(8), } } @@ -105,8 +105,7 @@ where /// `OutboundSubstream` event or an `OutboundClosed` event containing the user data that has /// been passed to this method. pub fn open_substream(&mut self, user_data: TUserData) { - let raw = self.inner.open_outbound(); - self.outbound_substreams.push((user_data, raw)); + self.outbound_substreams.push_back(user_data); } /// Destroys the node stream and returns all the pending outbound substreams, plus an object @@ -123,9 +122,8 @@ where /// Destroys all outbound streams and returns the corresponding user data. pub fn cancel_outgoing(&mut self) -> Vec { let mut out = Vec::with_capacity(self.outbound_substreams.len()); - for (user_data, outbound) in self.outbound_substreams.drain(..) { + for user_data in self.outbound_substreams.drain(..) { out.push(user_data); - self.inner.destroy_outbound(outbound); } out } @@ -135,15 +133,22 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll, TMuxer::Error>> { - // Polling inbound substream. - match self.inner.poll_event(cx) { + let mut flags = OpenFlags::default(); + if !self.outbound_substreams.is_empty() { + flags.insert(OpenFlags::OUTBOUND); + } + + match self.inner.poll_event(flags, cx) { Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => { return Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })); } Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(substream))) => { return Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { substream, - user_data: todo!(), + user_data: self + .outbound_substreams + .pop_front() + .expect("we checked that we are not empty"), })); } Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => { @@ -153,28 +158,6 @@ where Poll::Pending => {} } - // Polling outbound substreams. - // We remove each element from `outbound_substreams` one by one and add them back. - for n in (0..self.outbound_substreams.len()).rev() { - let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n); - match self.inner.poll_outbound(cx, &mut outbound) { - Poll::Ready(Ok(substream)) => { - self.inner.destroy_outbound(outbound); - return Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { - user_data, - substream, - })); - } - Poll::Pending => { - self.outbound_substreams.push((user_data, outbound)); - } - Poll::Ready(Err(err)) => { - self.inner.destroy_outbound(outbound); - return Poll::Ready(Err(err)); - } - } - } - // Nothing happened. Register our task to be notified and return. Poll::Pending } @@ -191,20 +174,6 @@ where } } -impl Drop for Muxing -where - TMuxer: StreamMuxer, -{ - fn drop(&mut self) { - // The substreams that were produced will continue to work, as the muxer is held in an Arc. - // However we will no longer process any further inbound or outbound substream, and we - // therefore close everything. - for (_, outbound) in self.outbound_substreams.drain(..) { - self.inner.destroy_outbound(outbound); - } - } -} - impl Future for Close where TMuxer: StreamMuxer, From 66dc07176627206b5922b6cb667107921136ff3e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Jun 2022 17:41:44 +0200 Subject: [PATCH 04/25] Remove `OutboundStreamId` It is unused. --- swarm/src/connection/substream.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/swarm/src/connection/substream.rs b/swarm/src/connection/substream.rs index 3aa3895dbbe..253073fccec 100644 --- a/swarm/src/connection/substream.rs +++ b/swarm/src/connection/substream.rs @@ -83,10 +83,6 @@ where AddressChange(Multiaddr), } -/// Identifier for a substream being opened. -#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct OutboundSubstreamId(usize); - impl Muxing where TMuxer: StreamMuxer, From f155e6be3fc64ac41229727cbf250b04e39947ff Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Jun 2022 17:54:08 +0200 Subject: [PATCH 05/25] Inline `Muxing` type into `Connection` --- swarm/src/connection.rs | 53 ++++--- swarm/src/connection/substream.rs | 223 ------------------------------ 2 files changed, 34 insertions(+), 242 deletions(-) delete mode 100644 swarm/src/connection/substream.rs diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index b09cb0480bd..77167c80794 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -21,7 +21,6 @@ mod error; mod handler_wrapper; mod listeners; -mod substream; pub(crate) mod pool; @@ -32,18 +31,19 @@ pub use error::{ pub use listeners::{ListenersEvent, ListenersStream}; pub use pool::{ConnectionCounters, ConnectionLimits}; pub use pool::{EstablishedConnection, PendingConnection}; -pub use substream::{Close, SubstreamEndpoint}; use crate::handler::ConnectionHandler; use crate::IntoConnectionHandler; +use futures::future::poll_fn; use handler_wrapper::HandlerWrapper; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::StreamMuxerBox; -use libp2p_core::upgrade; +use libp2p_core::muxing::{OpenFlags, StreamMuxerBox, StreamMuxerEvent}; use libp2p_core::PeerId; -use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll}; -use substream::{Muxing, SubstreamEvent}; +use libp2p_core::{upgrade, StreamMuxer}; +use std::collections::VecDeque; +use std::future::Future; +use std::{error::Error, fmt, io, pin::Pin, task::Context, task::Poll}; /// Information about a successfully established connection. #[derive(Debug, Clone, PartialEq, Eq)] @@ -54,6 +54,13 @@ pub struct Connected { pub peer_id: PeerId, } +/// Endpoint for a received substream. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum SubstreamEndpoint { + Dialer(TDialInfo), + Listener, +} + /// Event generated by a [`Connection`]. #[derive(Debug, Clone)] pub enum Event { @@ -69,19 +76,22 @@ where THandler: ConnectionHandler, { /// Node that handles the muxing. - muxing: substream::Muxing>, + muxing: StreamMuxerBox, /// Handler that processes substreams. handler: HandlerWrapper, + /// List of "open_info" that is waiting for new outbound substreams. + open_info: VecDeque>, } impl fmt::Debug for Connection where THandler: ConnectionHandler + fmt::Debug, + THandler::OutboundOpenInfo: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection") - .field("muxing", &self.muxing) .field("handler", &self.handler) + .field("open_info", &self.open_info) .finish() } } @@ -110,8 +120,9 @@ where max_negotiating_inbound_streams, ); Connection { - muxing: Muxing::new(muxer), + muxing: muxer, handler: wrapped_handler, + open_info: VecDeque::with_capacity(8), } } @@ -122,10 +133,10 @@ where /// Begins an orderly shutdown of the connection, returning the connection /// handler and a `Future` that resolves when connection shutdown is complete. - pub fn close(self) -> (THandler, Close) { + pub fn close(self) -> (THandler, impl Future>) { ( self.handler.into_connection_handler(), - self.muxing.close().0, + poll_fn(move |cx| self.muxing.poll_close(cx)), ) } @@ -140,7 +151,7 @@ where match self.handler.poll(cx)? { Poll::Pending => {} Poll::Ready(handler_wrapper::Event::OutboundSubstreamRequest(user_data)) => { - self.muxing.open_substream(user_data); + self.open_info.push_back(user_data); continue; } Poll::Ready(handler_wrapper::Event::Custom(event)) => { @@ -148,24 +159,28 @@ where } } + let mut flags = OpenFlags::default(); + if !self.open_info.is_empty() { + // Ensure queue is not empty. + flags.insert(OpenFlags::OUTBOUND); + } + // Perform I/O on the connection through the muxer, informing the handler // of new substreams. - match self.muxing.poll(cx)? { + match self.muxing.poll_event(flags, cx)? { Poll::Pending => {} - Poll::Ready(SubstreamEvent::InboundSubstream { substream }) => { + Poll::Ready(StreamMuxerEvent::InboundSubstream(substream)) => { self.handler .inject_substream(substream, SubstreamEndpoint::Listener); continue; } - Poll::Ready(SubstreamEvent::OutboundSubstream { - user_data, - substream, - }) => { + Poll::Ready(StreamMuxerEvent::OutboundSubstream(substream)) => { + let user_data = self.open_info.pop_front().expect("See (1); qed."); let endpoint = SubstreamEndpoint::Dialer(user_data); self.handler.inject_substream(substream, endpoint); continue; } - Poll::Ready(SubstreamEvent::AddressChange(address)) => { + Poll::Ready(StreamMuxerEvent::AddressChange(address)) => { self.handler.inject_address_change(&address); return Poll::Ready(Ok(Event::AddressChange(address))); } diff --git a/swarm/src/connection/substream.rs b/swarm/src/connection/substream.rs deleted file mode 100644 index 253073fccec..00000000000 --- a/swarm/src/connection/substream.rs +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use futures::prelude::*; -use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::{OpenFlags, StreamMuxer, StreamMuxerEvent}; -use std::collections::VecDeque; -use std::sync::Arc; -use std::{fmt, pin::Pin, task::Context, task::Poll}; - -/// Endpoint for a received substream. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum SubstreamEndpoint { - Dialer(TDialInfo), - Listener, -} - -/// Implementation of `Stream` that handles substream multiplexing. -/// -/// The stream will receive substreams and can be used to open new outgoing substreams. Destroying -/// the `Muxing` will **not** close the existing substreams. -/// -/// The stream will close once both the inbound and outbound channels are closed, and no more -/// outbound substream attempt is pending. -pub struct Muxing -where - TMuxer: StreamMuxer, -{ - /// The muxer used to manage substreams. - inner: Arc, - /// List of substreams we are currently opening. - outbound_substreams: VecDeque, -} - -/// Future that signals the remote that we have closed the connection. -pub struct Close { - /// Muxer to close. - muxer: Arc, -} - -/// Event that can happen on the `Muxing`. -pub enum SubstreamEvent -where - TMuxer: StreamMuxer, -{ - /// A new inbound substream arrived. - InboundSubstream { - /// The newly-opened substream. Will return EOF of an error if the `Muxing` is - /// destroyed or `close_graceful` is called. - substream: TMuxer::Substream, - }, - - /// An outbound substream has successfully been opened. - OutboundSubstream { - /// User data that has been passed to the `open_substream` method. - user_data: TUserData, - /// The newly-opened substream. Will return EOF of an error if the `Muxing` is - /// destroyed or `close_graceful` is called. - substream: TMuxer::Substream, - }, - - /// Address to the remote has changed. The previous one is now obsolete. - /// - /// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes - /// > can change their IP address while retaining the same QUIC connection. - AddressChange(Multiaddr), -} - -impl Muxing -where - TMuxer: StreamMuxer, -{ - /// Creates a new node events stream. - pub fn new(muxer: TMuxer) -> Self { - Muxing { - inner: Arc::new(muxer), - outbound_substreams: VecDeque::with_capacity(8), - } - } - - /// Starts the process of opening a new outbound substream. - /// - /// After calling this method, polling the stream should eventually produce either an - /// `OutboundSubstream` event or an `OutboundClosed` event containing the user data that has - /// been passed to this method. - pub fn open_substream(&mut self, user_data: TUserData) { - self.outbound_substreams.push_back(user_data); - } - - /// Destroys the node stream and returns all the pending outbound substreams, plus an object - /// that signals the remote that we shut down the connection. - #[must_use] - pub fn close(mut self) -> (Close, Vec) { - let substreams = self.cancel_outgoing(); - let close = Close { - muxer: self.inner.clone(), - }; - (close, substreams) - } - - /// Destroys all outbound streams and returns the corresponding user data. - pub fn cancel_outgoing(&mut self) -> Vec { - let mut out = Vec::with_capacity(self.outbound_substreams.len()); - for user_data in self.outbound_substreams.drain(..) { - out.push(user_data); - } - out - } - - /// Provides an API similar to `Future`. - pub fn poll( - &mut self, - cx: &mut Context<'_>, - ) -> Poll, TMuxer::Error>> { - let mut flags = OpenFlags::default(); - if !self.outbound_substreams.is_empty() { - flags.insert(OpenFlags::OUTBOUND); - } - - match self.inner.poll_event(flags, cx) { - Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => { - return Poll::Ready(Ok(SubstreamEvent::InboundSubstream { substream })); - } - Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(substream))) => { - return Poll::Ready(Ok(SubstreamEvent::OutboundSubstream { - substream, - user_data: self - .outbound_substreams - .pop_front() - .expect("we checked that we are not empty"), - })); - } - Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) => { - return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))) - } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => {} - } - - // Nothing happened. Register our task to be notified and return. - Poll::Pending - } -} - -impl fmt::Debug for Muxing -where - TMuxer: StreamMuxer, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Muxing") - .field("outbound_substreams", &self.outbound_substreams.len()) - .finish() - } -} - -impl Future for Close -where - TMuxer: StreamMuxer, -{ - type Output = Result<(), TMuxer::Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.muxer.poll_close(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(())) => Poll::Ready(Ok(())), - Poll::Ready(Err(err)) => Poll::Ready(Err(err)), - } - } -} - -impl fmt::Debug for Close -where - TMuxer: StreamMuxer, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - f.debug_struct("Close").finish() - } -} - -impl fmt::Debug for SubstreamEvent -where - TMuxer: StreamMuxer, - TMuxer::Substream: fmt::Debug, - TUserData: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - SubstreamEvent::InboundSubstream { substream } => f - .debug_struct("SubstreamEvent::OutboundClosed") - .field("substream", substream) - .finish(), - SubstreamEvent::OutboundSubstream { - user_data, - substream, - } => f - .debug_struct("SubstreamEvent::OutboundSubstream") - .field("user_data", user_data) - .field("substream", substream) - .finish(), - SubstreamEvent::AddressChange(address) => f - .debug_struct("SubstreamEvent::AddressChange") - .field("address", address) - .finish(), - } - } -} From 03861fca953abd4a360c3f54deae0497b43863e3 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Jun 2022 18:05:20 +0200 Subject: [PATCH 06/25] Remove obvious comment on how `poll` functions work --- core/src/muxing.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 7073f48af59..6aa37add171 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -79,12 +79,6 @@ pub trait StreamMuxer { /// Depending on the passed [`OpenFlags`], the muxer will either open a new outbound substream, /// check for new incoming substreams or both. /// - /// This function behaves the same as a `Stream`. - /// - /// If `Pending` is returned, then the current task will be notified once the muxer - /// is ready to be polled, similar to the API of `Stream::poll()`. - /// Only the latest task that was used to call this method may be notified. - /// /// It is permissible and common to use this method to perform background /// work, such as processing incoming packets and polling timers. /// From ed8f0b714ed8e7381bfbb1dfd4a0f902913e8396 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Wed, 22 Jun 2022 18:07:54 +0200 Subject: [PATCH 07/25] Fix benchmarks --- muxers/mplex/benches/split_send_size.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index d019c8113e2..10dd63eea50 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -135,9 +135,10 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd task::block_on(async move { let addr = addr_receiver.await.unwrap(); let (_peer, conn) = transport.dial(addr).unwrap().await.unwrap(); - let mut handle = conn.open_outbound(); - let mut stream = poll_fn(|cx| conn.poll_outbound(cx, &mut handle)) + let mut stream = poll_fn(|cx| conn.poll_event(OpenFlags::OUTBOUND, cx)) .await + .unwrap() + .into_outbound_substream() .unwrap(); let mut off = 0; loop { From 3c3bed3c89837d6a1841dd844584c5b0b6e5ac20 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Thu, 23 Jun 2022 08:47:06 +0200 Subject: [PATCH 08/25] Fix intra-doc link --- core/src/muxing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 6aa37add171..ce414edd0f5 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -158,7 +158,7 @@ impl StreamMuxerEvent { } /// Map the stream within [`StreamMuxerEvent::InboundSubstream`] and - /// [`StreamMuxerEvent::OutboundStream`] to a new type. + /// [`StreamMuxerEvent::OutboundSubstream`] to a new type. pub fn map_stream(self, map: impl FnOnce(T) -> O) -> StreamMuxerEvent { match self { StreamMuxerEvent::InboundSubstream(stream) => { From 90b7ad6aadfcb0b5b6e1616245488f6a45ca9ccd Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 24 Jun 2022 09:42:12 +0200 Subject: [PATCH 09/25] Fix documentation of StreamMuxer --- core/src/muxing.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/muxing.rs b/core/src/muxing.rs index ce414edd0f5..f5102b3bc0a 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -64,9 +64,8 @@ mod singleton; /// /// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and [`AsyncWrite`]. /// -/// Inbound substreams are reported via [`StreamMuxer::poll_event`]. -/// Outbound substreams can be opened via [`StreamMuxer::open_outbound`] and subsequent polling via -/// [`StreamMuxer::poll_outbound`]. +/// The process new incoming substreams and open new outbound ones, call the [`StreamMuxer::poll_event`] +/// function with the appropriate [`OpenFlags`]. pub trait StreamMuxer { /// Type of the object that represents the raw substream where data can be read and written. type Substream: AsyncRead + AsyncWrite; From 33c949127faa151a1908e9a99ab3b8ed8adeae35 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 24 Jun 2022 09:47:58 +0200 Subject: [PATCH 10/25] Minor import and comment cleanup --- core/src/muxing/singleton.rs | 3 +-- muxers/mplex/benches/split_send_size.rs | 5 ++--- swarm/src/connection.rs | 3 ++- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index a97fbd07bc9..9a035c6c2ce 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -20,10 +20,9 @@ use crate::{ connection::Endpoint, - muxing::{StreamMuxer, StreamMuxerEvent}, + muxing::{OpenFlags, StreamMuxer, StreamMuxerEvent}, }; -use crate::muxing::OpenFlags; use futures::prelude::*; use std::cell::Cell; use std::{io, task::Context, task::Poll}; diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index 10dd63eea50..a30e3725798 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -26,10 +26,9 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion, Throughpu use futures::channel::oneshot; use futures::future::poll_fn; use futures::prelude::*; -use libp2p_core::muxing::OpenFlags; use libp2p_core::{ - identity, multiaddr::multiaddr, muxing, transport, upgrade, Multiaddr, PeerId, StreamMuxer, - Transport, + identity, multiaddr::multiaddr, muxing, muxing::OpenFlags, transport, upgrade, Multiaddr, + PeerId, StreamMuxer, Transport, }; use libp2p_mplex as mplex; use libp2p_plaintext::PlainText2Config; diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 77167c80794..44094a4bcd6 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -160,8 +160,9 @@ where } let mut flags = OpenFlags::default(); + + // (1) Ensure queue is not empty. if !self.open_info.is_empty() { - // Ensure queue is not empty. flags.insert(OpenFlags::OUTBOUND); } From 546603cc5b2cc793e17e1c34af984e48af9971e5 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 24 Jun 2022 10:05:15 +0200 Subject: [PATCH 11/25] Remove `StreamMuxerEvent` in favor of individual poll functions --- core/src/either.rs | 40 ++++++++++----- core/src/muxing.rs | 95 ++++-------------------------------- core/src/muxing/boxed.rs | 53 +++++++++++--------- core/src/muxing/singleton.rs | 40 ++++++++------- muxers/mplex/src/lib.rs | 48 ++++++------------ muxers/yamux/src/lib.rs | 50 ++++++++----------- swarm/src/connection.rs | 34 +++++-------- 7 files changed, 135 insertions(+), 225 deletions(-) diff --git a/core/src/either.rs b/core/src/either.rs index 9e952772708..0efd8aed0e2 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{ - muxing::{OpenFlags, StreamMuxer, StreamMuxerEvent}, + muxing::StreamMuxer, transport::{ListenerEvent, Transport, TransportError}, Multiaddr, ProtocolName, }; @@ -204,20 +204,36 @@ where type Substream = EitherOutput; type Error = EitherError; - fn poll_event( - &self, - flags: OpenFlags, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { match self { EitherOutput::First(inner) => inner - .poll_event(flags, cx) - .map_err(EitherError::A) - .map_ok(|event| event.map_stream(EitherOutput::First)), + .poll_inbound(cx) + .map_ok(EitherOutput::First) + .map_err(EitherError::A), EitherOutput::Second(inner) => inner - .poll_event(flags, cx) - .map_err(EitherError::B) - .map_ok(|event| event.map_stream(EitherOutput::Second)), + .poll_inbound(cx) + .map_ok(EitherOutput::Second) + .map_err(EitherError::B), + } + } + + fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { + match self { + EitherOutput::First(inner) => inner + .poll_outbound(cx) + .map_ok(EitherOutput::First) + .map_err(EitherError::A), + EitherOutput::Second(inner) => inner + .poll_outbound(cx) + .map_ok(EitherOutput::Second) + .map_err(EitherError::B), + } + } + + fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll> { + match self { + EitherOutput::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A), + EitherOutput::Second(inner) => inner.poll_address_change(cx).map_err(EitherError::B), } } diff --git a/core/src/muxing.rs b/core/src/muxing.rs index f5102b3bc0a..4620c82845e 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -64,8 +64,7 @@ mod singleton; /// /// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and [`AsyncWrite`]. /// -/// The process new incoming substreams and open new outbound ones, call the [`StreamMuxer::poll_event`] -/// function with the appropriate [`OpenFlags`]. +/// TODO(docs) pub trait StreamMuxer { /// Type of the object that represents the raw substream where data can be read and written. type Substream: AsyncRead + AsyncWrite; @@ -73,20 +72,14 @@ pub trait StreamMuxer { /// Error type of the muxer type Error: std::error::Error; - /// Polls for a connection-wide event. - /// - /// Depending on the passed [`OpenFlags`], the muxer will either open a new outbound substream, - /// check for new incoming substreams or both. - /// - /// It is permissible and common to use this method to perform background - /// work, such as processing incoming packets and polling timers. - /// - /// An error can be generated if the connection has been closed. - fn poll_event( - &self, - flags: OpenFlags, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>>; + /// TODO(docs) + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll>; + + /// TODO(docs) + fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll>; + + /// TODO(docs) + fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll>; /// Closes this `StreamMuxer`. /// @@ -100,73 +93,3 @@ pub trait StreamMuxer { /// > immediately dropping the muxer. fn poll_close(&self, cx: &mut Context<'_>) -> Poll>; } - -bitflags::bitflags! { - /// Tells the [`StreamMuxer`] which substreams it should open. - /// - /// How this is implemented may vary from muxer to muxer. Not all muxing protocols support - /// this on the lowest level so some muxers may for example still accept incoming substreams but - /// immediately drop them if the flags do not contain [`OpenFlags::INBOUND`]. - pub struct OpenFlags: u8 { - const INBOUND = 0b00000001; - const OUTBOUND = 0b00000010; - } -} - -impl Default for OpenFlags { - fn default() -> Self { - OpenFlags::INBOUND - } -} - -/// Event about a connection, reported by an implementation of [`StreamMuxer`]. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum StreamMuxerEvent { - /// Remote has opened a new substream. Contains the substream in question. - InboundSubstream(T), - - /// We opened a new substream. Contains the substream in question. - OutboundSubstream(T), - - /// Address to the remote has changed. The previous one is now obsolete. - /// - /// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes - /// > can change their IP address while retaining the same QUIC connection. - AddressChange(Multiaddr), -} - -impl StreamMuxerEvent { - /// If `self` is a [`StreamMuxerEvent::InboundSubstream`], returns the content. Otherwise - /// returns `None`. - pub fn into_inbound_substream(self) -> Option { - if let StreamMuxerEvent::InboundSubstream(s) = self { - Some(s) - } else { - None - } - } - - /// If `self` is a [`StreamMuxerEvent::OutboundSubstream`], returns the content. Otherwise - /// returns `None`. - pub fn into_outbound_substream(self) -> Option { - if let StreamMuxerEvent::OutboundSubstream(s) = self { - Some(s) - } else { - None - } - } - - /// Map the stream within [`StreamMuxerEvent::InboundSubstream`] and - /// [`StreamMuxerEvent::OutboundSubstream`] to a new type. - pub fn map_stream(self, map: impl FnOnce(T) -> O) -> StreamMuxerEvent { - match self { - StreamMuxerEvent::InboundSubstream(stream) => { - StreamMuxerEvent::InboundSubstream(map(stream)) - } - StreamMuxerEvent::OutboundSubstream(stream) => { - StreamMuxerEvent::OutboundSubstream(map(stream)) - } - StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr), - } - } -} diff --git a/core/src/muxing/boxed.rs b/core/src/muxing/boxed.rs index 3a609a2be1f..80753813dcb 100644 --- a/core/src/muxing/boxed.rs +++ b/core/src/muxing/boxed.rs @@ -1,6 +1,6 @@ -use crate::muxing::{OpenFlags, StreamMuxerEvent}; use crate::StreamMuxer; -use futures::{ready, AsyncRead, AsyncWrite}; +use futures::{AsyncRead, AsyncWrite}; +use multiaddr::Multiaddr; use std::error::Error; use std::fmt; use std::io; @@ -36,20 +36,26 @@ where type Error = io::Error; #[inline] - fn poll_event( - &self, - flags: OpenFlags, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let event = ready!(self.inner.poll_event(flags, cx).map_err(into_io_error)?) - .map_stream(SubstreamBox::new); + fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_close(cx).map_err(into_io_error) + } - Poll::Ready(Ok(event)) + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { + self.inner + .poll_inbound(cx) + .map_ok(SubstreamBox::new) + .map_err(into_io_error) } - #[inline] - fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_close(cx).map_err(into_io_error) + fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { + self.inner + .poll_outbound(cx) + .map_ok(SubstreamBox::new) + .map_err(into_io_error) + } + + fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_address_change(cx).map_err(into_io_error) } } @@ -80,19 +86,22 @@ impl StreamMuxer for StreamMuxerBox { type Substream = SubstreamBox; type Error = io::Error; - #[inline] - fn poll_event( - &self, - flags: OpenFlags, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - self.inner.poll_event(flags, cx) - } - #[inline] fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_close(cx) } + + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_inbound(cx) + } + + fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_outbound(cx) + } + + fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_address_change(cx) + } } impl SubstreamBox { diff --git a/core/src/muxing/singleton.rs b/core/src/muxing/singleton.rs index 9a035c6c2ce..d67cb5e9825 100644 --- a/core/src/muxing/singleton.rs +++ b/core/src/muxing/singleton.rs @@ -18,12 +18,10 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{ - connection::Endpoint, - muxing::{OpenFlags, StreamMuxer, StreamMuxerEvent}, -}; +use crate::{connection::Endpoint, muxing::StreamMuxer}; use futures::prelude::*; +use multiaddr::Multiaddr; use std::cell::Cell; use std::{io, task::Context, task::Poll}; @@ -59,30 +57,30 @@ where type Substream = TSocket; type Error = io::Error; - fn poll_event( - &self, - flags: OpenFlags, - _: &mut Context<'_>, - ) -> Poll, Self::Error>> { - // these combinations will never emit a stream. + fn poll_inbound(&self, _: &mut Context<'_>) -> Poll> { match self.endpoint { - Endpoint::Dialer if flags == OpenFlags::INBOUND => return Poll::Pending, - Endpoint::Listener if flags == OpenFlags::OUTBOUND => return Poll::Pending, - _ => {} + Endpoint::Dialer => Poll::Pending, + Endpoint::Listener => match self.inner.replace(None) { + None => Poll::Pending, + Some(stream) => Poll::Ready(Ok(stream)), + }, } + } - // can only emit the stream once - let socket = match self.inner.replace(None) { - None => return Poll::Pending, - Some(stream) => stream, - }; - + fn poll_outbound(&self, _: &mut Context<'_>) -> Poll> { match self.endpoint { - Endpoint::Dialer => Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(socket))), - Endpoint::Listener => Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(socket))), + Endpoint::Listener => Poll::Pending, + Endpoint::Dialer => match self.inner.replace(None) { + None => Poll::Pending, + Some(stream) => Poll::Ready(Ok(stream)), + }, } } + fn poll_address_change(&self, _: &mut Context<'_>) -> Poll> { + Poll::Pending + } + fn poll_close(&self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 2d4edb3e414..d820be01b78 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -28,9 +28,8 @@ use bytes::Bytes; use codec::LocalStreamId; use futures::{future, prelude::*, ready}; use libp2p_core::{ - muxing::{OpenFlags, StreamMuxerEvent}, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, - StreamMuxer, + Multiaddr, StreamMuxer, }; use parking_lot::Mutex; use std::{cmp, iter, pin::Pin, sync::Arc, task::Context, task::Poll}; @@ -89,39 +88,22 @@ where type Substream = Substream; type Error = io::Error; - fn poll_event( - &self, - flags: OpenFlags, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let mut io = self.io.lock(); - - loop { - if flags.contains(OpenFlags::OUTBOUND) { - if let Poll::Ready(stream_id) = io.poll_open_stream(cx)? { - return Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(Substream::new( - stream_id, - self.io.clone(), - )))); - } - } - - let stream_id = ready!(io.poll_next_stream(cx))?; - - if !flags.contains(OpenFlags::INBOUND) { - log::debug!( - "Dropping inbound stream {stream_id} because OpenFlags::INBOUND is not present" - ); - io.drop_stream(stream_id); + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { + self.io + .lock() + .poll_next_stream(cx) + .map_ok(|stream_id| Substream::new(stream_id, self.io.clone())) + } - continue; - } + fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { + self.io + .lock() + .poll_open_stream(cx) + .map_ok(|stream_id| Substream::new(stream_id, self.io.clone())) + } - return Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(Substream::new( - stream_id, - self.io.clone(), - )))); - } + fn poll_address_change(&self, _: &mut Context<'_>) -> Poll> { + Poll::Pending } fn poll_close(&self, cx: &mut Context<'_>) -> Poll> { diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 38c6fe02390..3823c309fcd 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -24,11 +24,11 @@ use futures::{ future, prelude::*, - ready, stream::{BoxStream, LocalBoxStream}, }; -use libp2p_core::muxing::{OpenFlags, StreamMuxer, StreamMuxerEvent}; +use libp2p_core::muxing::StreamMuxer; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; +use libp2p_core::Multiaddr; use parking_lot::Mutex; use std::{ fmt, io, iter, mem, @@ -106,36 +106,26 @@ where type Substream = yamux::Stream; type Error = YamuxError; - fn poll_event( - &self, - flags: OpenFlags, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let mut inner = self.0.lock(); - - loop { - if flags.contains(OpenFlags::OUTBOUND) { - if let Poll::Ready(stream) = Pin::new(&mut inner.control).poll_open_stream(cx)? { - return Poll::Ready(Ok(StreamMuxerEvent::OutboundSubstream(stream))); - } - } - - let stream = match ready!(inner.incoming.poll_next_unpin(cx)).transpose()? { - Some(stream) => stream, - None => return Poll::Ready(Err(YamuxError(ConnectionError::Closed))), - }; - - if !flags.contains(OpenFlags::INBOUND) { - log::debug!( - "Dropping inbound stream {stream} because OpenFlags::INBOUND is not present" - ); - mem::drop(stream); + fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll> { + self.0 + .lock() + .incoming + .poll_next_unpin(cx) + .map(|maybe_stream| { + maybe_stream + .transpose()? + .ok_or(YamuxError(ConnectionError::Closed)) + }) + } - continue; - } + fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0.lock().control) + .poll_open_stream(cx) + .map_err(YamuxError) + } - return Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(stream))); - } + fn poll_address_change(&self, _: &mut Context<'_>) -> Poll> { + Poll::Pending } fn poll_close(&self, c: &mut Context<'_>) -> Poll> { diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 44094a4bcd6..66f6fc46c80 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -38,7 +38,7 @@ use futures::future::poll_fn; use handler_wrapper::HandlerWrapper; use libp2p_core::connection::ConnectedPoint; use libp2p_core::multiaddr::Multiaddr; -use libp2p_core::muxing::{OpenFlags, StreamMuxerBox, StreamMuxerEvent}; +use libp2p_core::muxing::StreamMuxerBox; use libp2p_core::PeerId; use libp2p_core::{upgrade, StreamMuxer}; use std::collections::VecDeque; @@ -159,32 +159,24 @@ where } } - let mut flags = OpenFlags::default(); - - // (1) Ensure queue is not empty. if !self.open_info.is_empty() { - flags.insert(OpenFlags::OUTBOUND); - } - - // Perform I/O on the connection through the muxer, informing the handler - // of new substreams. - match self.muxing.poll_event(flags, cx)? { - Poll::Pending => {} - Poll::Ready(StreamMuxerEvent::InboundSubstream(substream)) => { - self.handler - .inject_substream(substream, SubstreamEndpoint::Listener); - continue; - } - Poll::Ready(StreamMuxerEvent::OutboundSubstream(substream)) => { + if let Poll::Ready(substream) = self.muxing.poll_outbound(cx)? { let user_data = self.open_info.pop_front().expect("See (1); qed."); let endpoint = SubstreamEndpoint::Dialer(user_data); self.handler.inject_substream(substream, endpoint); continue; } - Poll::Ready(StreamMuxerEvent::AddressChange(address)) => { - self.handler.inject_address_change(&address); - return Poll::Ready(Ok(Event::AddressChange(address))); - } + } + + if let Poll::Ready(substream) = self.muxing.poll_inbound(cx)? { + self.handler + .inject_substream(substream, SubstreamEndpoint::Listener); + continue; + } + + if let Poll::Ready(address) = self.muxing.poll_address_change(cx)? { + self.handler.inject_address_change(&address); + return Poll::Ready(Ok(Event::AddressChange(address))); } return Poll::Pending; From 68b4c0b81b94425537886e0876cbfcaef2597428 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 24 Jun 2022 10:10:15 +0200 Subject: [PATCH 12/25] Fix mplex tests and benches --- muxers/mplex/benches/split_send_size.rs | 16 ++++------ muxers/mplex/tests/async_write.rs | 17 ++--------- muxers/mplex/tests/two_peers.rs | 39 ++++--------------------- 3 files changed, 12 insertions(+), 60 deletions(-) diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index a30e3725798..91dfe524e17 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -27,8 +27,8 @@ use futures::channel::oneshot; use futures::future::poll_fn; use futures::prelude::*; use libp2p_core::{ - identity, multiaddr::multiaddr, muxing, muxing::OpenFlags, transport, upgrade, Multiaddr, - PeerId, StreamMuxer, Transport, + identity, multiaddr::multiaddr, muxing, transport, upgrade, Multiaddr, PeerId, StreamMuxer, + Transport, }; use libp2p_mplex as mplex; use libp2p_plaintext::PlainText2Config; @@ -105,11 +105,9 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd } transport::ListenerEvent::Upgrade { upgrade, .. } => { let (_peer, conn) = upgrade.await.unwrap(); - let mut s = poll_fn(|cx| conn.poll_event(OpenFlags::INBOUND, cx)) + let mut s = poll_fn(|cx| conn.poll_inbound(cx)) .await - .expect("unexpected error") - .into_inbound_substream() - .expect("Unexpected muxer event"); + .expect("unexpected error"); let mut buf = vec![0u8; payload_len]; let mut off = 0; @@ -134,11 +132,7 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd task::block_on(async move { let addr = addr_receiver.await.unwrap(); let (_peer, conn) = transport.dial(addr).unwrap().await.unwrap(); - let mut stream = poll_fn(|cx| conn.poll_event(OpenFlags::OUTBOUND, cx)) - .await - .unwrap() - .into_outbound_substream() - .unwrap(); + let mut stream = poll_fn(|cx| conn.poll_inbound(cx)).await.unwrap(); let mut off = 0; loop { let n = poll_fn(|cx| Pin::new(&mut stream).poll_write(cx, &payload[off..])) diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index 1f27e2a4a55..9c0af7cfb86 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -20,7 +20,6 @@ use futures::future::poll_fn; use futures::{channel::oneshot, prelude::*}; -use libp2p_core::muxing::OpenFlags; use libp2p_core::{upgrade, StreamMuxer, Transport}; use libp2p_tcp::TcpConfig; use std::sync::Arc; @@ -62,11 +61,7 @@ fn async_write() { .await .unwrap(); - let mut outbound = poll_fn(|cx| client.poll_event(OpenFlags::OUTBOUND, cx)) - .await - .unwrap() - .into_outbound_substream() - .unwrap(); + let mut outbound = poll_fn(|cx| client.poll_outbound(cx)).await.unwrap(); let mut buf = Vec::new(); outbound.read_to_end(&mut buf).await.unwrap(); @@ -79,15 +74,7 @@ fn async_write() { .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); - let mut inbound = loop { - if let Some(s) = poll_fn(|cx| client.poll_event(OpenFlags::INBOUND, cx)) - .await - .unwrap() - .into_inbound_substream() - { - break s; - } - }; + let mut inbound = poll_fn(|cx| client.poll_inbound(cx)).await.unwrap(); inbound.write_all(b"hello world").await.unwrap(); // The test consists in making sure that this flushes the substream. diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index 264481b9a4d..b537bbad352 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -20,7 +20,6 @@ use futures::future::poll_fn; use futures::{channel::oneshot, prelude::*}; -use libp2p_core::muxing::OpenFlags; use libp2p_core::{upgrade, StreamMuxer, Transport}; use libp2p_tcp::TcpConfig; use std::sync::Arc; @@ -62,11 +61,7 @@ fn client_to_server_outbound() { .await .unwrap(); - let mut outbound = poll_fn(|cx| client.poll_event(OpenFlags::OUTBOUND, cx)) - .await - .unwrap() - .into_outbound_substream() - .unwrap(); + let mut outbound = poll_fn(|cx| client.poll_outbound(cx)).await.unwrap(); let mut buf = Vec::new(); outbound.read_to_end(&mut buf).await.unwrap(); @@ -79,15 +74,7 @@ fn client_to_server_outbound() { .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); - let mut inbound = loop { - if let Some(s) = poll_fn(|cx| client.poll_event(OpenFlags::INBOUND, cx)) - .await - .unwrap() - .into_inbound_substream() - { - break s; - } - }; + let mut inbound = poll_fn(|cx| client.poll_inbound(cx)).await.unwrap(); inbound.write_all(b"hello world").await.unwrap(); inbound.close().await.unwrap(); @@ -134,15 +121,7 @@ fn client_to_server_inbound() { .unwrap(), ); - let mut inbound = loop { - if let Some(s) = poll_fn(|cx| client.poll_event(OpenFlags::INBOUND, cx)) - .await - .unwrap() - .into_inbound_substream() - { - break s; - } - }; + let mut inbound = poll_fn(|cx| client.poll_inbound(cx)).await.unwrap(); let mut buf = Vec::new(); inbound.read_to_end(&mut buf).await.unwrap(); @@ -156,11 +135,7 @@ fn client_to_server_inbound() { let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); - let mut outbound = poll_fn(|cx| client.poll_event(OpenFlags::OUTBOUND, cx)) - .await - .unwrap() - .into_outbound_substream() - .unwrap(); + let mut outbound = poll_fn(|cx| client.poll_outbound(cx)).await.unwrap(); outbound.write_all(b"hello world").await.unwrap(); outbound.close().await.unwrap(); @@ -203,11 +178,7 @@ fn protocol_not_match() { .await .unwrap(); - let mut outbound = poll_fn(|cx| client.poll_event(OpenFlags::OUTBOUND, cx)) - .await - .unwrap() - .into_outbound_substream() - .unwrap(); + let mut outbound = poll_fn(|cx| client.poll_outbound(cx)).await.unwrap(); let mut buf = Vec::new(); outbound.read_to_end(&mut buf).await.unwrap(); From 4a243078bf036c2fdc67046c2771af2c544e6653 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 24 Jun 2022 12:16:54 +0200 Subject: [PATCH 13/25] Remove mentions of deleted event --- muxers/mplex/src/lib.rs | 3 --- muxers/yamux/src/lib.rs | 1 - 2 files changed, 4 deletions(-) diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index d820be01b78..59b38db1156 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -74,9 +74,6 @@ where } /// Multiplexer. Implements the `StreamMuxer` trait. -/// -/// This implementation isn't capable of detecting when the underlying socket changes its address, -/// and no [`StreamMuxerEvent::AddressChange`] event is ever emitted. pub struct Multiplex { io: Arc>>, } diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 3823c309fcd..5470786bd85 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -98,7 +98,6 @@ where pub type YamuxResult = Result; -/// > **Note**: This implementation never emits [`StreamMuxerEvent::AddressChange`] events. impl StreamMuxer for Yamux where S: Stream> + Unpin, From e742b8e3ff87b78721609da9e6ecfbce27492790 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 24 Jun 2022 12:19:20 +0200 Subject: [PATCH 14/25] Update changelog --- core/CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 7bea0f3f35a..c201234d037 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,13 +1,14 @@ # 0.34.0 - unreleased -- Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691]. - Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707]. - Replace `Into` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710]. +- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound` + and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724]. -[PR 2691]: https://github.com/libp2p/rust-libp2p/pull/2691 [PR 2707]: https://github.com/libp2p/rust-libp2p/pull/2707 [PR 2710]: https://github.com/libp2p/rust-libp2p/pull/2710 +[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724 # 0.33.0 From 2348a4cf45cd7c312ecba83c6a6e4c455518603d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 24 Jun 2022 12:23:12 +0200 Subject: [PATCH 15/25] Remove now unused bitflags dependency --- core/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/core/Cargo.toml b/core/Cargo.toml index 64bbbc21dd2..deb6479e433 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -12,7 +12,6 @@ categories = ["network-programming", "asynchronous"] [dependencies] asn1_der = "0.7.4" -bitflags = "1.3.2" bs58 = "0.4.0" ed25519-dalek = "1.0.1" either = "1.5" From 7695a808878adae7ffc0fa8d975ff6f569326f0e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 24 Jun 2022 12:25:25 +0200 Subject: [PATCH 16/25] Remove unused log dependency --- muxers/yamux/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index 0f3d61f34ab..a7c55f08949 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -16,4 +16,3 @@ libp2p-core = { version = "0.34.0", path = "../../core", default-features = fals parking_lot = "0.12" thiserror = "1.0" yamux = "0.10.0" -log = "0.4" From a086e6104e636cabdade42a0411edd43a2c46d72 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 24 Jun 2022 12:29:18 +0200 Subject: [PATCH 17/25] Better `expect` message --- swarm/src/connection.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 66f6fc46c80..1fd0c2ce0f2 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -161,7 +161,10 @@ where if !self.open_info.is_empty() { if let Poll::Ready(substream) = self.muxing.poll_outbound(cx)? { - let user_data = self.open_info.pop_front().expect("See (1); qed."); + let user_data = self + .open_info + .pop_front() + .expect("`open_info` is not empty"); let endpoint = SubstreamEndpoint::Dialer(user_data); self.handler.inject_substream(substream, endpoint); continue; From dd81ea0457592ac17d45b2c918b832c9dc7312f5 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 24 Jun 2022 12:33:27 +0200 Subject: [PATCH 18/25] Use correct function in benchmark --- muxers/mplex/benches/split_send_size.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index 91dfe524e17..ddf7e2a0a63 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -132,7 +132,7 @@ fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiadd task::block_on(async move { let addr = addr_receiver.await.unwrap(); let (_peer, conn) = transport.dial(addr).unwrap().await.unwrap(); - let mut stream = poll_fn(|cx| conn.poll_inbound(cx)).await.unwrap(); + let mut stream = poll_fn(|cx| conn.poll_outbound(cx)).await.unwrap(); let mut off = 0; loop { let n = poll_fn(|cx| Pin::new(&mut stream).poll_write(cx, &payload[off..])) From 3143a6379aa69efc7af541920bafdd9bdb3ce440 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 15 Jul 2022 14:52:10 +0200 Subject: [PATCH 19/25] Update changelog and version numbers --- Cargo.toml | 18 +++++++++--------- core/CHANGELOG.md | 11 +++++++---- core/Cargo.toml | 2 +- misc/keygen/Cargo.toml | 2 +- misc/metrics/CHANGELOG.md | 2 ++ misc/metrics/Cargo.toml | 2 +- muxers/mplex/CHANGELOG.md | 4 ++++ muxers/mplex/Cargo.toml | 4 ++-- muxers/yamux/CHANGELOG.md | 4 ++++ muxers/yamux/Cargo.toml | 4 ++-- protocols/autonat/CHANGELOG.md | 2 ++ protocols/autonat/Cargo.toml | 2 +- protocols/dcutr/CHANGELOG.md | 2 ++ protocols/dcutr/Cargo.toml | 2 +- protocols/floodsub/CHANGELOG.md | 2 ++ protocols/floodsub/Cargo.toml | 2 +- protocols/gossipsub/CHANGELOG.md | 2 ++ protocols/gossipsub/Cargo.toml | 2 +- protocols/identify/CHANGELOG.md | 2 ++ protocols/identify/Cargo.toml | 2 +- protocols/kad/CHANGELOG.md | 2 ++ protocols/kad/Cargo.toml | 2 +- protocols/mdns/CHANGELOG.md | 2 ++ protocols/mdns/Cargo.toml | 2 +- protocols/ping/CHANGELOG.md | 2 ++ protocols/ping/Cargo.toml | 2 +- protocols/relay/CHANGELOG.md | 2 ++ protocols/relay/Cargo.toml | 2 +- protocols/rendezvous/CHANGELOG.md | 2 ++ protocols/rendezvous/Cargo.toml | 2 +- protocols/request-response/CHANGELOG.md | 2 ++ protocols/request-response/Cargo.toml | 2 +- swarm/CHANGELOG.md | 2 ++ swarm/Cargo.toml | 2 +- transports/deflate/CHANGELOG.md | 4 ++++ transports/deflate/Cargo.toml | 4 ++-- transports/dns/CHANGELOG.md | 4 ++++ transports/dns/Cargo.toml | 4 ++-- transports/noise/CHANGELOG.md | 4 ++++ transports/noise/Cargo.toml | 4 ++-- transports/plaintext/Cargo.toml | 2 +- transports/tcp/Cargo.toml | 2 +- transports/uds/CHANGELOG.md | 4 ++++ transports/uds/Cargo.toml | 4 ++-- transports/wasm-ext/CHANGELOG.md | 4 ++++ transports/wasm-ext/Cargo.toml | 4 ++-- transports/websocket/CHANGELOG.md | 4 ++++ transports/websocket/Cargo.toml | 4 ++-- 48 files changed, 107 insertions(+), 46 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c12144717c2..c5577ceb90f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,14 +78,14 @@ instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature lazy_static = "1.2" libp2p-autonat = { version = "0.6.0", path = "protocols/autonat", optional = true } -libp2p-core = { version = "0.34.0", path = "core", default-features = false } +libp2p-core = { version = "0.35.0", path = "core", default-features = false } libp2p-dcutr = { version = "0.5.0", path = "protocols/dcutr", optional = true } libp2p-floodsub = { version = "0.38.0", path = "protocols/floodsub", optional = true } libp2p-identify = { version = "0.38.0", path = "protocols/identify", optional = true } libp2p-kad = { version = "0.39.0", path = "protocols/kad", optional = true } libp2p-metrics = { version = "0.8.0", path = "misc/metrics", optional = true } -libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true } -libp2p-noise = { version = "0.37.0", path = "transports/noise", optional = true } +libp2p-mplex = { version = "0.35.0", path = "muxers/mplex", optional = true } +libp2p-noise = { version = "0.38.0", path = "transports/noise", optional = true } libp2p-ping = { version = "0.38.0", path = "protocols/ping", optional = true } libp2p-plaintext = { version = "0.34.0", path = "transports/plaintext", optional = true } libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true } @@ -94,9 +94,9 @@ libp2p-rendezvous = { version = "0.8.0", path = "protocols/rendezvous", optional libp2p-request-response = { version = "0.20.0", path = "protocols/request-response", optional = true } libp2p-swarm = { version = "0.38.0", path = "swarm" } libp2p-swarm-derive = { version = "0.28.0", path = "swarm-derive" } -libp2p-uds = { version = "0.33.0", path = "transports/uds", optional = true } -libp2p-wasm-ext = { version = "0.34.0", path = "transports/wasm-ext", default-features = false, optional = true } -libp2p-yamux = { version = "0.38.0", path = "muxers/yamux", optional = true } +libp2p-uds = { version = "0.34.0", path = "transports/uds", optional = true } +libp2p-wasm-ext = { version = "0.35.0", path = "transports/wasm-ext", default-features = false, optional = true } +libp2p-yamux = { version = "0.39.0", path = "muxers/yamux", optional = true } multiaddr = { version = "0.14.0" } parking_lot = "0.12.0" pin-project = "1.0.0" @@ -104,11 +104,11 @@ rand = "0.7.3" # Explicit dependency to be used in `wasm-bindgen` feature smallvec = "1.6.1" [target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies] -libp2p-deflate = { version = "0.34.0", path = "transports/deflate", optional = true } -libp2p-dns = { version = "0.34.0", path = "transports/dns", optional = true, default-features = false } +libp2p-deflate = { version = "0.35.0", path = "transports/deflate", optional = true } +libp2p-dns = { version = "0.35.0", path = "transports/dns", optional = true, default-features = false } libp2p-mdns = { version = "0.39.0", path = "protocols/mdns", optional = true } libp2p-tcp = { version = "0.34.0", path = "transports/tcp", default-features = false, optional = true } -libp2p-websocket = { version = "0.36.0", path = "transports/websocket", optional = true } +libp2p-websocket = { version = "0.37.0", path = "transports/websocket", optional = true } [target.'cfg(not(target_os = "unknown"))'.dependencies] libp2p-gossipsub = { version = "0.40.0", path = "protocols/gossipsub", optional = true } diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 20a6147464b..d63f0315dd6 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.35.0 + +- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound` + and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724]. + +[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724 + # 0.34.0 - Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait @@ -8,14 +15,10 @@ Instead the `Transport` is polled directly via `Transport::poll`. The `Transport` is now responsible for driving its listeners. See [PR 2652]. -- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound` - and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724]. - [PR 2691]: https://github.com/libp2p/rust-libp2p/pull/2691 [PR 2707]: https://github.com/libp2p/rust-libp2p/pull/2707 [PR 2710]: https://github.com/libp2p/rust-libp2p/pull/2710 [PR 2652]: https://github.com/libp2p/rust-libp2p/pull/2652 -[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724 # 0.33.0 diff --git a/core/Cargo.toml b/core/Cargo.toml index deb6479e433..386dff09669 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-core" edition = "2021" rust-version = "1.56.1" description = "Core traits and structs of libp2p" -version = "0.34.0" +version = "0.35.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/misc/keygen/Cargo.toml b/misc/keygen/Cargo.toml index 614aa2e6bf2..4be54014f0a 100644 --- a/misc/keygen/Cargo.toml +++ b/misc/keygen/Cargo.toml @@ -13,5 +13,5 @@ clap = {version = "3.1.6", features = ["derive"]} zeroize = "1" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" -libp2p-core = { path = "../../core", default-features = false, version = "0.34.0"} +libp2p-core = { path = "../../core", default-features = false, version = "0.35.0"} base64 = "0.13.0" diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index 1b3053eaae7..c9cc21a06f1 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -14,6 +14,8 @@ - Track number of connected nodes supporting a specific protocol via the identify protocol. See [PR 2734]. +- Update to `libp2p-core` `v0.35.0`. + [PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ # 0.7.0 diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index f38e192947f..7fdfec733e0 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -19,7 +19,7 @@ relay = ["libp2p-relay"] dcutr = ["libp2p-dcutr"] [dependencies] -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-dcutr = { version = "0.5.0", path = "../../protocols/dcutr", optional = true } libp2p-identify = { version = "0.38.0", path = "../../protocols/identify", optional = true } libp2p-kad = { version = "0.39.0", path = "../../protocols/kad", optional = true } diff --git a/muxers/mplex/CHANGELOG.md b/muxers/mplex/CHANGELOG.md index add3d1ace0d..45f2c217ce0 100644 --- a/muxers/mplex/CHANGELOG.md +++ b/muxers/mplex/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.35.0 + +- Update to `libp2p-core` `v0.35.0` + # 0.34.0 - `Substream` now implements `AsyncRead` and `AsyncWrite`. See [PR 2706]. diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index 3b5a82cd959..ac053a5020f 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-mplex" edition = "2021" rust-version = "1.56.1" description = "Mplex multiplexing protocol for libp2p" -version = "0.34.0" +version = "0.35.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] bytes = "1" futures = "0.3.1" asynchronous-codec = "0.6" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4" nohash-hasher = "0.2" parking_lot = "0.12" diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index 95d01fbbfcd..5544ad15ab4 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.39.0 + +- Update to `libp2p-core` `v0.35.0` + # 0.38.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index a7c55f08949..02dfb832d8d 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-yamux" edition = "2021" rust-version = "1.56.1" description = "Yamux multiplexing protocol for libp2p" -version = "0.38.0" +version = "0.39.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } parking_lot = "0.12" thiserror = "1.0" yamux = "0.10.0" diff --git a/protocols/autonat/CHANGELOG.md b/protocols/autonat/CHANGELOG.md index eafaecff2a5..0c48b41951b 100644 --- a/protocols/autonat/CHANGELOG.md +++ b/protocols/autonat/CHANGELOG.md @@ -4,6 +4,8 @@ - Update to `libp2p-request-response` `v0.20.0`. +- Update to `libp2p-core` `v0.35.0`. + # 0.5.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/autonat/Cargo.toml b/protocols/autonat/Cargo.toml index 9e0d272785d..44f3c2b894d 100644 --- a/protocols/autonat/Cargo.toml +++ b/protocols/autonat/Cargo.toml @@ -18,7 +18,7 @@ async-trait = "0.1" futures = "0.3" futures-timer = "3.0" instant = "0.1" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } libp2p-request-response = { version = "0.20.0", path = "../request-response" } log = "0.4" diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 0416de5e9cb..f740b188455 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -4,6 +4,8 @@ - Expose `PROTOCOL_NAME`. See [PR 2734]. +- Update to `libp2p-core` `v0.35.0`. + [PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ # 0.4.0 diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml index 1786412cadf..6f8c4c284fd 100644 --- a/protocols/dcutr/Cargo.toml +++ b/protocols/dcutr/Cargo.toml @@ -17,7 +17,7 @@ either = "1.6.0" futures = "0.3.1" futures-timer = "3.0" instant = "0.1.11" -libp2p-core = { version = "0.34.0", path = "../../core" } +libp2p-core = { version = "0.35.0", path = "../../core" } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4" prost-codec = { version = "0.1", path = "../../misc/prost-codec" } diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index 491c87b99b5..89d4dd22b16 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -2,6 +2,8 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Update to `libp2p-core` `v0.35.0`. + # 0.37.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index 2556384f00a..d240ef66d07 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] cuckoofilter = "0.5.0" fnv = "1.0" futures = "0.3.1" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4" prost = "0.10" diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 0d0a5fa333f..f6b0902d306 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -2,6 +2,8 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Update to `libp2p-core` `v0.35.0`. + # 0.39.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 5e787076647..e992fb3b9ef 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] libp2p-swarm = { version = "0.38.0", path = "../../swarm" } -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } bytes = "1.0" byteorder = "1.3.4" fnv = "1.0.7" diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index 499af6e3a13..38acfad9b64 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -4,6 +4,8 @@ - Expose `PROTOCOL_NAME` and `PUSH_PROTOCOL_NAME`. See [PR 2734]. +- Update to `libp2p-core` `v0.35.0`. + [PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ # 0.37.0 diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 2fc604d1c25..1b3341df3f8 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] asynchronous-codec = "0.6" futures = "0.3.1" futures-timer = "3.0.2" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4.1" lru = "0.7.2" diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index 66730eecde5..6b78a6fcbce 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -2,6 +2,8 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Update to `libp2p-core` `v0.35.0`. + # 0.38.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 6686664f9b2..f33697f2e32 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -18,7 +18,7 @@ fnv = "1.0" asynchronous-codec = "0.6" futures = "0.3.1" log = "0.4" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } prost = "0.10" rand = "0.7.2" diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 320f8b13a2e..1bf4be8f6ae 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -2,6 +2,8 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Update to `libp2p-core` `v0.35.0`. + # 0.38.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index f97c185030a..eb32b7e9426 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -17,7 +17,7 @@ dns-parser = "0.8.0" futures = "0.3.13" if-watch = "1.0.0" lazy_static = "1.4.0" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4.14" rand = "0.8.3" diff --git a/protocols/ping/CHANGELOG.md b/protocols/ping/CHANGELOG.md index af9bb0a9690..e14934fe7a2 100644 --- a/protocols/ping/CHANGELOG.md +++ b/protocols/ping/CHANGELOG.md @@ -4,6 +4,8 @@ - Expose `PROTOCOL_NAME`. See [PR 2734]. +- Update to `libp2p-core` `v0.35.0`. + [PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ # 0.37.0 diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index 741cbf05fbc..14984332db7 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] futures = "0.3.1" futures-timer = "3.0.2" instant = "0.1.11" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4.1" rand = "0.7.2" diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index f03817080ea..85e066668bc 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -4,6 +4,8 @@ - Expose `HOP_PROTOCOL_NAME` and `STOP_PROTOCOL_NAME`. See [PR 2734]. +- Update to `libp2p-core` `v0.35.0`. + [PR 2734]: https://github.com/libp2p/rust-libp2p/pull/2734/ # 0.10.0 diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index 97d29ebdfd0..817025842f8 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -17,7 +17,7 @@ either = "1.6.0" futures = "0.3.1" futures-timer = "3" instant = "0.1.11" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4" pin-project = "1" diff --git a/protocols/rendezvous/CHANGELOG.md b/protocols/rendezvous/CHANGELOG.md index 11120e0be1b..c087f77b393 100644 --- a/protocols/rendezvous/CHANGELOG.md +++ b/protocols/rendezvous/CHANGELOG.md @@ -2,6 +2,8 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Update to `libp2p-core` `v0.35.0`. + # 0.7.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/rendezvous/Cargo.toml b/protocols/rendezvous/Cargo.toml index b8d81a87e20..745285b91bd 100644 --- a/protocols/rendezvous/Cargo.toml +++ b/protocols/rendezvous/Cargo.toml @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] asynchronous-codec = "0.6" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } prost = "0.10" void = "1" diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 8acc422ee40..bf556496fc1 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -2,6 +2,8 @@ - Update to `libp2p-swarm` `v0.38.0`. +- Update to `libp2p-core` `v0.35.0`. + # 0.19.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 010647fe6b3..91f95484f73 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1" bytes = "1" futures = "0.3.1" instant = "0.1.11" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } libp2p-swarm = { version = "0.38.0", path = "../../swarm" } log = "0.4.11" rand = "0.7" diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 174924889b0..02edf9beef4 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -2,6 +2,8 @@ - Update dial address concurrency factor to `8`, thus dialing up to 8 addresses concurrently for a single connection attempt. See `Swarm::dial_concurrency_factor` and [PR 2741]. +- Update to `libp2p-core` `v0.35.0`. + [PR 2741]: https://github.com/libp2p/rust-libp2p/pull/2741/ # 0.37.0 diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 2953c984368..e2829f80093 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -16,7 +16,7 @@ fnv = "1.0" futures = "0.3.1" futures-timer = "3.0.2" instant = "0.1.11" -libp2p-core = { version = "0.34.0", path = "../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../core", default-features = false } log = "0.4" pin-project = "1.0.0" rand = "0.7" diff --git a/transports/deflate/CHANGELOG.md b/transports/deflate/CHANGELOG.md index ebc2d811375..a2e4112caa2 100644 --- a/transports/deflate/CHANGELOG.md +++ b/transports/deflate/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.35.0 + +- Update to `libp2p-core` `v0.35.0`. + # 0.34.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/deflate/Cargo.toml b/transports/deflate/Cargo.toml index 82536d8acc3..4ac8661d0a3 100644 --- a/transports/deflate/Cargo.toml +++ b/transports/deflate/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-deflate" edition = "2021" rust-version = "1.56.1" description = "Deflate encryption protocol for libp2p" -version = "0.34.0" +version = "0.35.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } flate2 = "1.0" [dev-dependencies] diff --git a/transports/dns/CHANGELOG.md b/transports/dns/CHANGELOG.md index a32d5a95eb5..6c8a49af7b4 100644 --- a/transports/dns/CHANGELOG.md +++ b/transports/dns/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.35.0 + +- Update to `libp2p-core` `v0.35.0`. + # 0.34.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/dns/Cargo.toml b/transports/dns/Cargo.toml index 1aaa7a15302..46ca3aca1af 100644 --- a/transports/dns/Cargo.toml +++ b/transports/dns/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-dns" edition = "2021" rust-version = "1.56.1" description = "DNS transport implementation for libp2p" -version = "0.34.0" +version = "0.35.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4.1" futures = "0.3.1" async-std-resolver = { version = "0.21", optional = true } diff --git a/transports/noise/CHANGELOG.md b/transports/noise/CHANGELOG.md index ca830796b59..6b323eabd27 100644 --- a/transports/noise/CHANGELOG.md +++ b/transports/noise/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.38.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0`. + # 0.37.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/noise/Cargo.toml b/transports/noise/Cargo.toml index f0b8ef8996e..171a89a4715 100644 --- a/transports/noise/Cargo.toml +++ b/transports/noise/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-noise" edition = "2021" rust-version = "1.56.1" description = "Cryptographic handshake protocol using the noise framework." -version = "0.37.0" +version = "0.38.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ bytes = "1" curve25519-dalek = "3.0.0" futures = "0.3.1" lazy_static = "1.2" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4" prost = "0.10" rand = "0.8.3" diff --git a/transports/plaintext/Cargo.toml b/transports/plaintext/Cargo.toml index e5534f93c1e..4e51cf852ba 100644 --- a/transports/plaintext/Cargo.toml +++ b/transports/plaintext/Cargo.toml @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] bytes = "1" futures = "0.3.1" asynchronous-codec = "0.6" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4.8" prost = "0.10" unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index e72f3eaee56..0b86689fa32 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -18,7 +18,7 @@ if-watch = { version = "1.0.0", optional = true } if-addrs = { version = "0.7.0", optional = true } ipnet = "2.0.0" libc = "0.2.80" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4.11" socket2 = { version = "0.4.0", features = ["all"] } tokio-crate = { package = "tokio", version = "1.19.0", default-features = false, features = ["net"], optional = true } diff --git a/transports/uds/CHANGELOG.md b/transports/uds/CHANGELOG.md index 65c5da0559a..c27c12b8c3e 100644 --- a/transports/uds/CHANGELOG.md +++ b/transports/uds/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.34.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0`. + # 0.33.0 - Update dependencies. diff --git a/transports/uds/Cargo.toml b/transports/uds/Cargo.toml index 30d01c4f490..e00e6ae09f3 100644 --- a/transports/uds/Cargo.toml +++ b/transports/uds/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-uds" edition = "2021" rust-version = "1.56.1" description = "Unix domain sockets transport for libp2p" -version = "0.33.0" +version = "0.34.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [target.'cfg(all(unix, not(target_os = "emscripten")))'.dependencies] async-std = { version = "1.6.2", optional = true } -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4.1" futures = "0.3.1" tokio = { version = "1.15", default-features = false, features = ["net"], optional = true } diff --git a/transports/wasm-ext/CHANGELOG.md b/transports/wasm-ext/CHANGELOG.md index 65ff72d10bf..323ee80cbe0 100644 --- a/transports/wasm-ext/CHANGELOG.md +++ b/transports/wasm-ext/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.35.0 + +- Update to `libp2p-core` `v0.35.0`. + # 0.34.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/wasm-ext/Cargo.toml b/transports/wasm-ext/Cargo.toml index a0f67226513..34926d04b52 100644 --- a/transports/wasm-ext/Cargo.toml +++ b/transports/wasm-ext/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-wasm-ext" edition = "2021" rust-version = "1.56.1" description = "Allows passing in an external transport in a WASM environment" -version = "0.34.0" +version = "0.35.0" authors = ["Pierre Krieger "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" js-sys = "0.3.50" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } parity-send-wrapper = "0.1.0" wasm-bindgen = "0.2.42" wasm-bindgen-futures = "0.4.4" diff --git a/transports/websocket/CHANGELOG.md b/transports/websocket/CHANGELOG.md index 3783fef6c44..65f9eea1e96 100644 --- a/transports/websocket/CHANGELOG.md +++ b/transports/websocket/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.37.0 + +- Update to `libp2p-core` `v0.35.0`. + # 0.36.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index 624fc0cbe4f..49121c4f22b 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-websocket" edition = "2021" rust-version = "1.56.1" description = "WebSocket transport for libp2p" -version = "0.36.0" +version = "0.37.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] futures-rustls = "0.22" either = "1.5.3" futures = "0.3.1" -libp2p-core = { version = "0.34.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.35.0", path = "../../core", default-features = false } log = "0.4.8" parking_lot = "0.12.0" quicksink = "0.1" From bea8318dc693455b26727637190333957b9f5142 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 15 Jul 2022 14:53:46 +0200 Subject: [PATCH 20/25] Add unreleased label --- core/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index d63f0315dd6..0102096d780 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,4 +1,4 @@ -# 0.35.0 +# 0.35.0 [unreleased] - Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound` and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724]. From 8bbdbfa698d321e6bcd26f3349048e4a07b2dd4d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 15 Jul 2022 14:58:15 +0200 Subject: [PATCH 21/25] Add docs to `StreamMuxer` --- core/src/muxing.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 4620c82845e..a2bdfa80b37 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -63,8 +63,8 @@ mod singleton; /// Provides multiplexing for a connection by allowing users to open substreams. /// /// A substream created by a [`StreamMuxer`] is a type that implements [`AsyncRead`] and [`AsyncWrite`]. -/// -/// TODO(docs) +/// The [`StreamMuxer`] itself is modelled closely after [`AsyncWrite`]. It features `poll`-style +/// functions that allow the implementation to make progress on various tasks. pub trait StreamMuxer { /// Type of the object that represents the raw substream where data can be read and written. type Substream: AsyncRead + AsyncWrite; @@ -72,13 +72,15 @@ pub trait StreamMuxer { /// Error type of the muxer type Error: std::error::Error; - /// TODO(docs) + /// Poll for new inbound substreams. fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll>; - /// TODO(docs) + /// Poll for a new, outbound substream. fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll>; - /// TODO(docs) + /// Poll for an address change of the underlying connection. + /// + /// Not all implementations may support this feature. fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll>; /// Closes this `StreamMuxer`. From 9409250fd6b38d0fd5c7caac097c89016677acd2 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 15 Jul 2022 15:10:10 +0200 Subject: [PATCH 22/25] Fix changelog of yamux and mplex --- muxers/mplex/CHANGELOG.md | 2 +- muxers/yamux/CHANGELOG.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/muxers/mplex/CHANGELOG.md b/muxers/mplex/CHANGELOG.md index 45f2c217ce0..6b374e1b66c 100644 --- a/muxers/mplex/CHANGELOG.md +++ b/muxers/mplex/CHANGELOG.md @@ -1,4 +1,4 @@ -# 0.35.0 +# 0.35.0 [unreleased] - Update to `libp2p-core` `v0.35.0` diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index 5544ad15ab4..e8eded11836 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,4 +1,4 @@ -# 0.39.0 +# 0.39.0 [unreleased] - Update to `libp2p-core` `v0.35.0` From 4f7bd459ce4c13d09d280b2052d8de3df6da221e Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 15 Jul 2022 15:10:20 +0200 Subject: [PATCH 23/25] Explicitly return result --- muxers/yamux/src/lib.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 5470786bd85..a06e7934cf0 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -111,9 +111,11 @@ where .incoming .poll_next_unpin(cx) .map(|maybe_stream| { - maybe_stream + let stream = maybe_stream .transpose()? - .ok_or(YamuxError(ConnectionError::Closed)) + .ok_or(YamuxError(ConnectionError::Closed))?; + + Ok(stream) }) } From f20fd740c8e10329f75b1aa1246effccd3676d2d Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 15 Jul 2022 15:10:34 +0200 Subject: [PATCH 24/25] Add docs for poll loop in `Connection` --- swarm/src/connection.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/swarm/src/connection.rs b/swarm/src/connection.rs index 8785c181ac4..8d29ca53793 100644 --- a/swarm/src/connection.rs +++ b/swarm/src/connection.rs @@ -150,7 +150,7 @@ where Poll::Pending => {} Poll::Ready(handler_wrapper::Event::OutboundSubstreamRequest(user_data)) => { self.open_info.push_back(user_data); - continue; + continue; // Poll handler until exhausted. } Poll::Ready(handler_wrapper::Event::Custom(event)) => { return Poll::Ready(Ok(Event::Handler(event))); @@ -165,14 +165,14 @@ where .expect("`open_info` is not empty"); let endpoint = SubstreamEndpoint::Dialer(user_data); self.handler.inject_substream(substream, endpoint); - continue; + continue; // Go back to the top, handler can potentially make progress again. } } if let Poll::Ready(substream) = self.muxing.poll_inbound(cx)? { self.handler .inject_substream(substream, SubstreamEndpoint::Listener); - continue; + continue; // Go back to the top, handler can potentially make progress again. } if let Poll::Ready(address) = self.muxing.poll_address_change(cx)? { @@ -180,7 +180,7 @@ where return Poll::Ready(Ok(Event::AddressChange(address))); } - return Poll::Pending; + return Poll::Pending; // Nothing can make progress, return `Pending`. } } } From 3f776eff9e4e89366152c5c3b4fa2aa0f201aa7a Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 15 Jul 2022 15:19:56 +0200 Subject: [PATCH 25/25] Fixup a few more changelogs and manifests --- Cargo.toml | 2 +- transports/deflate/CHANGELOG.md | 2 +- transports/dns/CHANGELOG.md | 2 +- transports/plaintext/CHANGELOG.md | 4 ++++ transports/plaintext/Cargo.toml | 2 +- transports/wasm-ext/CHANGELOG.md | 2 +- transports/websocket/CHANGELOG.md | 2 +- 7 files changed, 10 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c5577ceb90f..9e6d6ea4786 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,7 @@ libp2p-metrics = { version = "0.8.0", path = "misc/metrics", optional = true } libp2p-mplex = { version = "0.35.0", path = "muxers/mplex", optional = true } libp2p-noise = { version = "0.38.0", path = "transports/noise", optional = true } libp2p-ping = { version = "0.38.0", path = "protocols/ping", optional = true } -libp2p-plaintext = { version = "0.34.0", path = "transports/plaintext", optional = true } +libp2p-plaintext = { version = "0.35.0", path = "transports/plaintext", optional = true } libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true } libp2p-relay = { version = "0.11.0", path = "protocols/relay", optional = true } libp2p-rendezvous = { version = "0.8.0", path = "protocols/rendezvous", optional = true } diff --git a/transports/deflate/CHANGELOG.md b/transports/deflate/CHANGELOG.md index a2e4112caa2..317a9ab78d3 100644 --- a/transports/deflate/CHANGELOG.md +++ b/transports/deflate/CHANGELOG.md @@ -1,4 +1,4 @@ -# 0.35.0 +# 0.35.0 [unreleased] - Update to `libp2p-core` `v0.35.0`. diff --git a/transports/dns/CHANGELOG.md b/transports/dns/CHANGELOG.md index 6c8a49af7b4..c9b9083cb3a 100644 --- a/transports/dns/CHANGELOG.md +++ b/transports/dns/CHANGELOG.md @@ -1,4 +1,4 @@ -# 0.35.0 +# 0.35.0 [unreleased] - Update to `libp2p-core` `v0.35.0`. diff --git a/transports/plaintext/CHANGELOG.md b/transports/plaintext/CHANGELOG.md index 560075bc0a2..7c5c389a0cd 100644 --- a/transports/plaintext/CHANGELOG.md +++ b/transports/plaintext/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.35.0 [unreleased] + +- Update to `libp2p-core` `v0.35.0`. + # 0.34.0 - Update to `libp2p-core` `v0.34.0`. diff --git a/transports/plaintext/Cargo.toml b/transports/plaintext/Cargo.toml index 4e51cf852ba..c8cd9395da0 100644 --- a/transports/plaintext/Cargo.toml +++ b/transports/plaintext/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-plaintext" edition = "2021" rust-version = "1.56.1" description = "Plaintext encryption dummy protocol for libp2p" -version = "0.34.0" +version = "0.35.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/transports/wasm-ext/CHANGELOG.md b/transports/wasm-ext/CHANGELOG.md index 323ee80cbe0..c3438b68b20 100644 --- a/transports/wasm-ext/CHANGELOG.md +++ b/transports/wasm-ext/CHANGELOG.md @@ -1,4 +1,4 @@ -# 0.35.0 +# 0.35.0 [unreleased] - Update to `libp2p-core` `v0.35.0`. diff --git a/transports/websocket/CHANGELOG.md b/transports/websocket/CHANGELOG.md index 65f9eea1e96..1a46978cd3b 100644 --- a/transports/websocket/CHANGELOG.md +++ b/transports/websocket/CHANGELOG.md @@ -1,4 +1,4 @@ -# 0.37.0 +# 0.37.0 [unreleased] - Update to `libp2p-core` `v0.35.0`.