Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/libp2p/rust-libp2p into c…
Browse files Browse the repository at this point in the history
…oding-guidelines
  • Loading branch information
mxinden committed Aug 4, 2022
2 parents 995fd8c + 028dece commit cc89ff5
Show file tree
Hide file tree
Showing 22 changed files with 376 additions and 265 deletions.
2 changes: 1 addition & 1 deletion .cargo/config.toml
@@ -1,3 +1,3 @@
[alias]
# Temporary solution to have clippy config in a single place until https://github.com/rust-lang/rust-clippy/blob/master/doc/roadmap-2021.md#lintstoml-configuration is shipped.
custom-clippy = "clippy -- -A clippy::type_complexity -A clippy::pedantic -D warnings"
custom-clippy = "clippy --all-features -- -A clippy::type_complexity -A clippy::pedantic -D warnings"
7 changes: 7 additions & 0 deletions core/CHANGELOG.md
Expand Up @@ -2,8 +2,15 @@

- Remove `StreamMuxer::poll_event` in favor of individual functions: `poll_inbound`, `poll_outbound`
and `poll_address_change`. Consequently, `StreamMuxerEvent` is also removed. See [PR 2724].
- Drop `Unpin` requirement from `SubstreamBox`. See [PR 2762] and [PR 2776].
- Drop `Sync` requirement on `StreamMuxer` for constructing `StreamMuxerBox`. See [PR 2775].
- Use `Pin<&mut Self>` as the receiver type for all `StreamMuxer` poll functions. See [PR 2765].

[PR 2724]: https://github.com/libp2p/rust-libp2p/pull/2724
[PR 2762]: https://github.com/libp2p/rust-libp2p/pull/2762
[PR 2775]: https://github.com/libp2p/rust-libp2p/pull/2775
[PR 2776]: https://github.com/libp2p/rust-libp2p/pull/2776
[PR 2765]: https://github.com/libp2p/rust-libp2p/pull/2765

# 0.34.0

Expand Down
43 changes: 27 additions & 16 deletions core/src/either.rs
Expand Up @@ -204,43 +204,54 @@ where
type Substream = EitherOutput<A::Substream, B::Substream>;
type Error = EitherError<A::Error, B::Error>;

fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => inner
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner
.poll_inbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
EitherOutput::Second(inner) => inner
EitherOutputProj::Second(inner) => inner
.poll_inbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
}
}

fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>> {
match self {
EitherOutput::First(inner) => inner
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::First)
.map_err(EitherError::A),
EitherOutput::Second(inner) => inner
EitherOutputProj::Second(inner) => inner
.poll_outbound(cx)
.map_ok(EitherOutput::Second)
.map_err(EitherError::B),
}
}

fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_address_change(cx).map_err(EitherError::B),
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_address_change(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => {
inner.poll_address_change(cx).map_err(EitherError::B)
}
}
}

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
EitherOutputProj::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutputProj::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/identity/ecdsa.rs
Expand Up @@ -157,7 +157,7 @@ impl PublicKey {
let buf = Self::del_asn1_header(k).ok_or_else(|| {
DecodingError::new("failed to parse asn.1 encoded ecdsa p256 public key")
})?;
Self::from_bytes(&buf)
Self::from_bytes(buf)
}

// ecPublicKey (ANSI X9.62 public key type) OID: 1.2.840.10045.2.1
Expand Down Expand Up @@ -198,8 +198,8 @@ impl PublicKey {
if asn1_head[0] != 0x30
|| asn1_head[2] != 0x30
|| asn1_head[3] as usize != oids_len
|| &oids_buf[..Self::EC_PUBLIC_KEY_OID.len()] != &Self::EC_PUBLIC_KEY_OID
|| &oids_buf[Self::EC_PUBLIC_KEY_OID.len()..] != &Self::SECP_256_R1_OID
|| oids_buf[..Self::EC_PUBLIC_KEY_OID.len()] != Self::EC_PUBLIC_KEY_OID
|| oids_buf[Self::EC_PUBLIC_KEY_OID.len()..] != Self::SECP_256_R1_OID
|| bitstr_head[0] != 0x03
|| bitstr_head[2] != 0x00
{
Expand Down
119 changes: 115 additions & 4 deletions core/src/muxing.rs
Expand Up @@ -52,6 +52,8 @@

use futures::{task::Context, task::Poll, AsyncRead, AsyncWrite};
use multiaddr::Multiaddr;
use std::future::Future;
use std::pin::Pin;

pub use self::boxed::StreamMuxerBox;
pub use self::boxed::SubstreamBox;
Expand All @@ -73,15 +75,24 @@ pub trait StreamMuxer {
type Error: std::error::Error;

/// Poll for new inbound substreams.
fn poll_inbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;
fn poll_inbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>;

/// Poll for a new, outbound substream.
fn poll_outbound(&self, cx: &mut Context<'_>) -> Poll<Result<Self::Substream, Self::Error>>;
fn poll_outbound(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>;

/// Poll for an address change of the underlying connection.
///
/// Not all implementations may support this feature.
fn poll_address_change(&self, cx: &mut Context<'_>) -> Poll<Result<Multiaddr, Self::Error>>;
fn poll_address_change(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>;

/// Closes this `StreamMuxer`.
///
Expand All @@ -93,5 +104,105 @@ pub trait StreamMuxer {
/// > that the remote is properly informed of the shutdown. However, apart from
/// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer.
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

/// Extension trait for [`StreamMuxer`].
pub trait StreamMuxerExt: StreamMuxer + Sized {
/// Convenience function for calling [`StreamMuxer::poll_inbound`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_inbound_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_inbound(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_outbound`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_outbound_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Substream, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_outbound(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_address_change`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_address_change_unpin(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<Multiaddr, Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_address_change(cx)
}

/// Convenience function for calling [`StreamMuxer::poll_close`] for [`StreamMuxer`]s that are `Unpin`.
fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_close(cx)
}

/// Returns a future that resolves to the next inbound `Substream` opened by the remote.
fn next_inbound(&mut self) -> NextInbound<'_, Self> {
NextInbound(self)
}

/// Returns a future that opens a new outbound `Substream` with the remote.
fn next_outbound(&mut self) -> NextOutbound<'_, Self> {
NextOutbound(self)
}

/// Returns a future for closing this [`StreamMuxer`].
fn close(self) -> Close<Self> {
Close(self)
}
}

impl<S> StreamMuxerExt for S where S: StreamMuxer {}

pub struct NextInbound<'a, S>(&'a mut S);

pub struct NextOutbound<'a, S>(&'a mut S);

pub struct Close<S>(S);

impl<'a, S> Future for NextInbound<'a, S>
where
S: StreamMuxer + Unpin,
{
type Output = Result<S::Substream, S::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_inbound_unpin(cx)
}
}

impl<'a, S> Future for NextOutbound<'a, S>
where
S: StreamMuxer + Unpin,
{
type Output = Result<S::Substream, S::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_outbound_unpin(cx)
}
}

impl<S> Future for Close<S>
where
S: StreamMuxer + Unpin,
{
type Output = Result<(), S::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_close_unpin(cx)
}
}

0 comments on commit cc89ff5

Please sign in to comment.