Skip to content

Commit

Permalink
core/muxer: Deprecate StreamMuxerExt::next_{inbound,outbound} (#3002)
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Oct 12, 2022
1 parent eba2efe commit f711dd5
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 9 deletions.
3 changes: 3 additions & 0 deletions core/CHANGELOG.md
Expand Up @@ -5,8 +5,11 @@
- Remove default features. If you previously depended on `secp256k1` or `ecdsa` you need to enable these explicitly
now. See [PR 2918].

- Deprecate `StreamMuxerExt::next_{inbound,outbound}`. See [PR 3002].

[PR 2915]: https://github.com/libp2p/rust-libp2p/pull/2915
[PR 2918]: https://github.com/libp2p/rust-libp2p/pull/2918
[PR 3002]: https://github.com/libp2p/rust-libp2p/pull/3002

# 0.36.0

Expand Down
8 changes: 8 additions & 0 deletions core/src/muxing.rs
Expand Up @@ -160,11 +160,19 @@ pub trait StreamMuxerExt: StreamMuxer + Sized {
}

/// Returns a future that resolves to the next inbound `Substream` opened by the remote.
#[deprecated(
since = "0.37.0",
note = "This future violates the `StreamMuxer` contract because it doesn't call `StreamMuxer::poll`."
)]
fn next_inbound(&mut self) -> NextInbound<'_, Self> {
NextInbound(self)
}

/// Returns a future that opens a new outbound `Substream` with the remote.
#[deprecated(
since = "0.37.0",
note = "This future violates the `StreamMuxer` contract because it doesn't call `StreamMuxer::poll`."
)]
fn next_outbound(&mut self) -> NextOutbound<'_, Self> {
NextOutbound(self)
}
Expand Down
8 changes: 6 additions & 2 deletions muxers/mplex/benches/split_send_size.rs
Expand Up @@ -114,7 +114,10 @@ fn run(
}
transport::TransportEvent::Incoming { upgrade, .. } => {
let (_peer, mut conn) = upgrade.await.unwrap();
let mut s = conn.next_inbound().await.expect("unexpected error");
// Just calling `poll_inbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut s = poll_fn(|cx| conn.poll_inbound_unpin(cx))
.await
.expect("unexpected error");

let mut buf = vec![0u8; payload_len];
let mut off = 0;
Expand All @@ -139,7 +142,8 @@ fn run(
let sender = async move {
let addr = addr_receiver.await.unwrap();
let (_peer, mut conn) = sender_trans.dial(addr).unwrap().await.unwrap();
let mut stream = conn.next_outbound().await.unwrap();
// Just calling `poll_outbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut stream = poll_fn(|cx| conn.poll_outbound_unpin(cx)).await.unwrap();
let mut off = 0;
loop {
let n = poll_fn(|cx| Pin::new(&mut stream).poll_write(cx, &payload[off..]))
Expand Down
11 changes: 9 additions & 2 deletions muxers/mplex/tests/async_write.rs
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures::future::poll_fn;
use futures::{channel::oneshot, prelude::*};
use libp2p::core::muxing::StreamMuxerExt;
use libp2p::core::{upgrade, Transport};
Expand Down Expand Up @@ -59,7 +60,10 @@ fn async_write() {
.await
.unwrap();

let mut outbound = client.next_outbound().await.unwrap();
// Just calling `poll_outbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut outbound = poll_fn(|cx| client.poll_outbound_unpin(cx))
.await
.expect("unexpected error");

let mut buf = Vec::new();
outbound.read_to_end(&mut buf).await.unwrap();
Expand All @@ -73,7 +77,10 @@ fn async_write() {

let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();

let mut inbound = client.next_inbound().await.unwrap();
// Just calling `poll_inbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut inbound = poll_fn(|cx| client.poll_inbound_unpin(cx))
.await
.expect("unexpected error");
inbound.write_all(b"hello world").await.unwrap();

// The test consists in making sure that this flushes the substream.
Expand Down
26 changes: 21 additions & 5 deletions muxers/mplex/tests/two_peers.rs
Expand Up @@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures::future::poll_fn;
use futures::{channel::oneshot, prelude::*};
use libp2p::core::muxing::StreamMuxerExt;
use libp2p::core::{upgrade, Transport};
Expand Down Expand Up @@ -59,7 +60,10 @@ fn client_to_server_outbound() {
.await
.unwrap();

let mut outbound = client.next_outbound().await.unwrap();
// Just calling `poll_outbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut outbound = poll_fn(|cx| client.poll_outbound_unpin(cx))
.await
.expect("unexpected error");

let mut buf = Vec::new();
outbound.read_to_end(&mut buf).await.unwrap();
Expand All @@ -73,7 +77,10 @@ fn client_to_server_outbound() {
.boxed();

let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();
let mut inbound = client.next_inbound().await.unwrap();
// Just calling `poll_inbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut inbound = poll_fn(|cx| client.poll_inbound_unpin(cx))
.await
.expect("unexpected error");
inbound.write_all(b"hello world").await.unwrap();
inbound.close().await.unwrap();

Expand Down Expand Up @@ -117,7 +124,10 @@ fn client_to_server_inbound() {
.await
.unwrap();

let mut inbound = client.next_inbound().await.unwrap();
// Just calling `poll_inbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut inbound = poll_fn(|cx| client.poll_inbound_unpin(cx))
.await
.expect("unexpected error");

let mut buf = Vec::new();
inbound.read_to_end(&mut buf).await.unwrap();
Expand All @@ -132,7 +142,10 @@ fn client_to_server_inbound() {

let mut client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();

let mut outbound = client.next_outbound().await.unwrap();
// Just calling `poll_outbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut outbound = poll_fn(|cx| client.poll_outbound_unpin(cx))
.await
.expect("unexpected error");
outbound.write_all(b"hello world").await.unwrap();
outbound.close().await.unwrap();

Expand Down Expand Up @@ -174,7 +187,10 @@ fn protocol_not_match() {
.await
.unwrap();

let mut outbound = client.next_outbound().await.unwrap();
// Just calling `poll_outbound` without `poll` is fine here because mplex makes progress through all `poll_` functions. It is hacky though.
let mut outbound = poll_fn(|cx| client.poll_outbound_unpin(cx))
.await
.expect("unexpected error");

let mut buf = Vec::new();
outbound.read_to_end(&mut buf).await.unwrap();
Expand Down

0 comments on commit f711dd5

Please sign in to comment.