-
Notifications
You must be signed in to change notification settings - Fork 949
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core/muxing: Force StreamMuxer::Substream
to implement Async{Read,Write}
#2707
Conversation
loop { | ||
match this.shutdown_state { | ||
ShutdownState::Shutdown => match this.muxer.shutdown_substream(cx, s) { | ||
Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Flush, | ||
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), | ||
Poll::Pending => return Poll::Pending, | ||
}, | ||
ShutdownState::Flush => match this.muxer.flush_substream(cx, s) { | ||
Poll::Ready(Ok(())) => this.shutdown_state = ShutdownState::Done, | ||
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())), | ||
Poll::Pending => return Poll::Pending, | ||
}, | ||
ShutdownState::Done => { | ||
return Poll::Ready(Ok(())); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is important to focus on this bit of deleted code here.
Previously, it would have been very likely that a substream created by any StreamMuxer
implementation would have gone through this logic because using the StreamMuxer
interface without the help of SubstreamRef
would be extremely cumbersome.
With this PR, substreams are forced to implement AsyncRead
and AsyncWrite
and thus, users can directly use them. Even if we put this piece of logic into SubstreamBox
, it may be bypassed. The mplex implementation follows the pattern of "first close, then flush" anyway and is also separately tested.
My take here is: It is fine to delete this code as long as we document that all implementations of StreamMuxer
must create substreams that flush after closing. If we ever get to do #508, this could be part of that test suite. Perhaps we can extract such a test suite out of the mplex tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"first close, then flush" anyway and is also separately tested.
I am a bit confused here. My intuition would be: A call to poll_close
first flushes the substream before closing it. Same as documented e.g. in Tokio's `AsyncWrite:
Invocation of a shutdown implies an invocation of flush. Once this method returns Ready it implies that a flush successfully happened before the shutdown happened. That is, callers don’t need to call flush before calling shutdown. They can rely that by calling shutdown any pending buffered data will be written out.
https://docs.rs/tokio/1.6.0/tokio/io/trait.AsyncWrite.html#tymethod.poll_shutdown
Is this not the convention in libp2p? Based on the above it is not. Am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this not the convention in libp2p?
At least for mplex, no:
rust-libp2p/muxers/mplex/src/io.rs
Line 527 in ea487ae
/// > **Note**: As opposed to `poll_close()`, a flush it not implied. |
I am not 100% sure why this code was added on this abstraction level. With your quote from above, I think this PR is actually making things more correct then because the now added AsyncWrite
impl on libp2p_mplex::Substream
implies flush on close.
Maybe @tomaka can provide some background why this was solved on a generic level previously?
SubstreamBox
which implements Async{Read,Write}
StreamMuxer::Substream
to implement Async{Read,Write}
Hmm, I thought I am clever by stacking PRs but with our squash merge strategy, those first commits don't get dropped :/ |
…ite}` Instead of delegating reading and writing of the substream to the `StreamMuxer`, we introduce a `SubstreamBox` which implements `Async{Read,Write}`. This is used as the `Substream` associated type in `StreamMuxerBox`. This allows us to delete the `read_substream` etc functions from the `StreamMuxer` trait.
cdfa81e
to
f014c29
Compare
Rebased only to solve conflicts: https://github.com/libp2p/rust-libp2p/compare/cdfa81ef4d46ec47cf8f1602ee707b986d2569a5..f014c299064236cbf2af11debe57428771935e7b |
These are already implied through the associated type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just two comments. Otherwise this looks great to me. Very happy to see this happening.
Might want to give @tomaka a bit more time, in case he has opinions.
@@ -1,8 +1,11 @@ | |||
# 0.33.1 - unreleased | |||
# 0.34.0 - unreleased |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing version bump in core/Cargo.toml
. I prepared this in 6d4e09a with my very hacky script. Hope this is helpful @thomaseizinger.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, that seems to be based off a different version of master? I applied the commit but it generates a lot of merge conflicts.
Investingating ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, seems to be due to the recent release off master! I've resolved the conflicts :)
Co-authored-by: Max Inden <mail@max-inden.de>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feel free to hit the merge button any time.
…Write}` (libp2p#2707) Co-authored-by: Max Inden <mail@max-inden.de>
Description
Instead of delegating reading and writing of the substream to the
StreamMuxer
,we introduce a
SubstreamBox
which implementsAsync{Read,Write}
. This isused as the
Substream
associated type inStreamMuxerBox
. This allows us todelete the
read_substream
etc functions from theStreamMuxer
trait.Links to any relevant issues
Extracted out of #2648.
Open Questions
Change checklist
I have added tests that prove my fix is effective or that my feature works