Skip to content

Commit

Permalink
Fix StreamMuxer::Error to io::Error
Browse files Browse the repository at this point in the history
We are already enforcing that the associated type must convert to
`io::Error`. We might as well just make all functions return an
`io::Error` directly.
  • Loading branch information
thomaseizinger committed Jun 15, 2022
1 parent 04f31cd commit aa946c2
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 128 deletions.
45 changes: 21 additions & 24 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,23 +203,22 @@ where
{
type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>;
type Error = io::Error;

fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
) -> Poll<io::Result<StreamMuxerEvent<Self::Substream>>> {
match self {
EitherOutput::First(inner) => inner.poll_event(cx).map(|result| {
result.map_err(|e| e.into()).map(|event| match event {
result.map(|event| match event {
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
StreamMuxerEvent::InboundSubstream(substream) => {
StreamMuxerEvent::InboundSubstream(EitherOutput::First(substream))
}
})
}),
EitherOutput::Second(inner) => inner.poll_event(cx).map(|result| {
result.map_err(|e| e.into()).map(|event| match event {
result.map(|event| match event {
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
StreamMuxerEvent::InboundSubstream(substream) => {
StreamMuxerEvent::InboundSubstream(EitherOutput::Second(substream))
Expand All @@ -240,16 +239,14 @@ where
&self,
cx: &mut Context<'_>,
substream: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>> {
) -> Poll<io::Result<Self::Substream>> {
match (self, substream) {
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::First))
.map_err(|e| e.into()),
.map(|p| p.map(EitherOutput::First)),
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::Second))
.map_err(|e| e.into()),
.map(|p| p.map(EitherOutput::Second)),
_ => panic!("Wrong API usage"),
}
}
Expand All @@ -272,13 +269,13 @@ where
cx: &mut Context<'_>,
sub: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, Self::Error>> {
) -> Poll<io::Result<usize>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.read_substream(cx, sub, buf).map_err(|e| e.into())
inner.read_substream(cx, sub, buf)
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.read_substream(cx, sub, buf).map_err(|e| e.into())
inner.read_substream(cx, sub, buf)
}
_ => panic!("Wrong API usage"),
}
Expand All @@ -289,13 +286,13 @@ where
cx: &mut Context<'_>,
sub: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>> {
) -> Poll<io::Result<usize>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.write_substream(cx, sub, buf).map_err(|e| e.into())
inner.write_substream(cx, sub, buf)
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.write_substream(cx, sub, buf).map_err(|e| e.into())
inner.write_substream(cx, sub, buf)
}
_ => panic!("Wrong API usage"),
}
Expand All @@ -305,13 +302,13 @@ where
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
) -> Poll<io::Result<()>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.flush_substream(cx, sub).map_err(|e| e.into())
inner.flush_substream(cx, sub)
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.flush_substream(cx, sub).map_err(|e| e.into())
inner.flush_substream(cx, sub)
}
_ => panic!("Wrong API usage"),
}
Expand All @@ -321,13 +318,13 @@ where
&self,
cx: &mut Context<'_>,
sub: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
) -> Poll<io::Result<()>> {
match (self, sub) {
(EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => {
inner.shutdown_substream(cx, sub).map_err(|e| e.into())
inner.shutdown_substream(cx, sub)
}
(EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => {
inner.shutdown_substream(cx, sub).map_err(|e| e.into())
inner.shutdown_substream(cx, sub)
}
_ => panic!("Wrong API usage"),
}
Expand All @@ -346,10 +343,10 @@ where
}
}

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match self {
EitherOutput::First(inner) => inner.poll_close(cx).map_err(|e| e.into()),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(|e| e.into()),
EitherOutput::First(inner) => inner.poll_close(cx),
EitherOutput::Second(inner) => inner.poll_close(cx),
}
}
}
Expand Down
33 changes: 15 additions & 18 deletions core/src/muxing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ pub trait StreamMuxer {
/// Future that will be resolved when the outgoing substream is open.
type OutboundSubstream;

/// Error type of the muxer
type Error: Into<io::Error>;

/// Polls for a connection-wide event.
///
/// This function behaves the same as a `Stream`.
Expand All @@ -98,7 +95,7 @@ pub trait StreamMuxer {
fn poll_event(
&self,
cx: &mut Context<'_>,
) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>>;
) -> Poll<io::Result<StreamMuxerEvent<Self::Substream>>>;

/// Opens a new outgoing substream, and produces the equivalent to a future that will be
/// resolved when it becomes available.
Expand All @@ -120,7 +117,7 @@ pub trait StreamMuxer {
&self,
cx: &mut Context<'_>,
s: &mut Self::OutboundSubstream,
) -> Poll<Result<Self::Substream, Self::Error>>;
) -> Poll<io::Result<Self::Substream>>;

/// Destroys an outbound substream future. Use this after the outbound substream has finished,
/// or if you want to interrupt it.
Expand All @@ -142,7 +139,7 @@ pub trait StreamMuxer {
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, Self::Error>>;
) -> Poll<io::Result<usize>>;

/// Write data to a substream. The behaviour is the same as `futures::AsyncWrite::poll_write`.
///
Expand All @@ -160,7 +157,7 @@ pub trait StreamMuxer {
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>>;
) -> Poll<io::Result<usize>>;

/// Flushes a substream. The behaviour is the same as `futures::AsyncWrite::poll_flush`.
///
Expand All @@ -176,7 +173,7 @@ pub trait StreamMuxer {
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>>;
) -> Poll<io::Result<()>>;

/// Attempts to shut down the writing side of a substream. The behaviour is similar to
/// `AsyncWrite::poll_close`.
Expand All @@ -193,7 +190,7 @@ pub trait StreamMuxer {
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>>;
) -> Poll<io::Result<()>>;

/// Destroys a substream.
fn destroy_substream(&self, s: Self::Substream);
Expand All @@ -210,7 +207,7 @@ pub trait StreamMuxer {
/// > that the remote is properly informed of the shutdown. However, apart from
/// > properly informing the remote, there is no difference between this and
/// > immediately dropping the muxer.
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
fn poll_close(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}

/// Event about a connection, reported by an implementation of [`StreamMuxer`].
Expand Down Expand Up @@ -242,7 +239,7 @@ impl<T> StreamMuxerEvent<T> {
/// object that implements `Read`/`Write`/`AsyncRead`/`AsyncWrite`.
pub fn event_from_ref_and_wrap<P>(
muxer: P,
) -> impl Future<Output = Result<StreamMuxerEvent<SubstreamRef<P>>, <P::Target as StreamMuxer>::Error>>
) -> impl Future<Output = io::Result<StreamMuxerEvent<SubstreamRef<P>>>>
where
P: Deref + Clone,
P::Target: StreamMuxer,
Expand Down Expand Up @@ -281,7 +278,7 @@ where
P: Deref + Clone,
P::Target: StreamMuxer,
{
type Output = Result<SubstreamRef<P>, <P::Target as StreamMuxer>::Error>;
type Output = io::Result<SubstreamRef<P>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Future::poll(Pin::new(&mut self.inner), cx) {
Expand Down Expand Up @@ -330,7 +327,7 @@ where
P: Deref,
P::Target: StreamMuxer,
{
type Output = Result<<P::Target as StreamMuxer>::Substream, <P::Target as StreamMuxer>::Error>;
type Output = io::Result<<P::Target as StreamMuxer>::Substream>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// We use a `this` because the compiler isn't smart enough to allow mutably borrowing
Expand Down Expand Up @@ -419,7 +416,7 @@ where
let this = &mut *self;

let s = this.substream.as_mut().expect("substream was empty");
this.muxer.read_substream(cx, s, buf).map_err(|e| e.into())
this.muxer.read_substream(cx, s, buf)
}
}

Expand All @@ -438,7 +435,7 @@ where
let this = &mut *self;

let s = this.substream.as_mut().expect("substream was empty");
this.muxer.write_substream(cx, s, buf).map_err(|e| e.into())
this.muxer.write_substream(cx, s, buf)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
Expand All @@ -451,12 +448,12 @@ where
match this.shutdown_state {
ShutdownState::Shutdown => match this.muxer.shutdown_substream(cx, s) {
Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Flush,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
},
ShutdownState::Flush => match this.muxer.flush_substream(cx, s) {
Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Done,
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
Poll::Pending => return Poll::Pending,
},
ShutdownState::Done => {
Expand All @@ -472,7 +469,7 @@ where
let this = &mut *self;

let s = this.substream.as_mut().expect("substream was empty");
this.muxer.flush_substream(cx, s).map_err(|e| e.into())
this.muxer.flush_substream(cx, s)
}
}

Expand Down

0 comments on commit aa946c2

Please sign in to comment.