Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/noise: Update to futures-preview #1248

Merged
merged 34 commits into from
Oct 3, 2019

Conversation

mxinden
Copy link
Member

@mxinden mxinden commented Sep 18, 2019

This pull request updates protocols/noise to use futures-preview 0.3.0-alpha.17. It is ready for review but still has a couple of TODO comments that I would like to have your input on.

Copy link
Member

@tomaka tomaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't finished reviewing.

async fn recv_empty<T>(state: &mut State<T>) -> Result<(), NoiseError>
where
T: AsyncRead
T: AsyncReadExt + Unpin
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be AsyncRead, not AsyncReadExt. AsyncReadExt is automatically implemented on anything that is AsyncRead. Same everywhere else you do that.

@@ -18,6 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

#![feature(async_closure)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't want to use features!

@@ -145,55 +146,63 @@ fn run<T, U>(server_transport: T, client_transport: U, message1: Vec<u8>)
where
T: Transport<Output = Output>,
T::Dial: Send + 'static,
T::Listener: Send + 'static,
T::Listener: Send + Unpin + futures::stream::TryStreamExt + 'static,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TryStream. No Ext.

protocols/noise/src/io.rs Outdated Show resolved Hide resolved
protocols/noise/src/io.rs Show resolved Hide resolved
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let buffer = self.buffer.borrow_mut();
impl<T: AsyncRead + Unpin> AsyncRead for NoiseOutput<T> {
fn poll_read(mut self: core::pin::Pin<&mut Self>, cx: &mut futures::task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, futures::io::Error>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn poll_read(mut self: core::pin::Pin<&mut Self>, cx: &mut futures::task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, futures::io::Error>> {
fn poll_read(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, futures::io::Error>> {

I would probably also use futures::io as nio (for n on-blocking I/O, or fio (f utures I/O) if you like) and thus write e.g. nio::Error but that is just my personal preference, i.e. I like to read io::xxx and nio::xxx.

Copy link
Member

@tomaka tomaka Sep 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

futures::io::Error is simply a re-export of std::io::Error. I would just use std::io::Error everywhere to avoid confusions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a change to std::io::Error.
Also, please improve the formatting. We're supposed to be limited to column 100.

protocols/noise/src/io.rs Outdated Show resolved Hide resolved
protocols/noise/src/io.rs Outdated Show resolved Hide resolved

impl<T: AsyncRead> AsyncRead for NoiseOutput<T> {
unsafe fn prepare_uninitialized_buffer(&self, _: &mut [u8]) -> bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would still be good to implement unsafe fn initializer(&self) -> Initializer instead, with an implementation of Initializer::nop(), which should be the equivalent to this implementation of prepare_uninitialized_buffer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correctly that as of right now the buffer would be zeroed on each call to poll_read? Does someone have experience with how expensive that is? Is it worth including unsafe code to skip the zeroing?

After doing some basic benchmarking with criterion, not zeroing gives me a performance benefit order of magnitude smaller than anything network io related.

This is in no way a profound or strong opinion from my side. I guess I am mostly curious what you think / experienced.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See fa7d919.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reference, see #1107. I think this is in general just a low-hanging fruit if you know the AsyncRead impl does not read from the buffer, which is very unusual. I personally haven't done any benchmarking, but apparently @tomaka did.

Copy link
Member

@tomaka tomaka Sep 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some benchmarking of libp2p a looong time ago, and zero-ing memory before reading was approximately a third of the CPU usage during a typical work load.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in fa7d919.

protocols/noise/src/io.rs Outdated Show resolved Hide resolved
protocols/noise/src/io.rs Outdated Show resolved Hide resolved
{
// TODO: Do we want to `read` here? Don't we want to read nothing at all
// (empty)?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately I don't understand the question. We want to read a handshake message with an empty payload (hence &mut []). Can you elaborate?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, never mind. I forgot about the two byte frame length bytes that we read within read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not just that, the handshake messages transmit the key material used in the handshake. Payloads are entirely optional in the handshake messages. NoiseOutput reads the frame length and the opaque data of that length from the underlying I/O stream, passes that to snow::HandshakeState::read_message which maintains the handshake state and decrypts the payload, if any. NoiseOutput then copies the decrypted payload into the buf given to read (with intermediate buffering in buffer). Analogously for writing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioning discussion offline for future me:

fn read_exact(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>

read_exact borrows the asynchronous reader in addition it takes a reference to
the destination buffer [2]. The borrow of the reader (this.io) is not allowed to
outlive the poll function lifetime, but it has to, as the ReadExact future
needs to outlive multiple poll calls.

One could transform the future into a future that takes ownership of the underlying I/O resource. But in that situation concurrent reading and writing wouldn't be possible, given that the resource is owned either by a reading or a writing state.

[2] https://docs.rs/futures-preview/0.3.0-alpha.18/futures/io/trait.AsyncReadExt.html#method.read_exact

Handshake::rt1_responder(socket, session,
self.dh_keys.into_identity(),
IdentityExchange::Mutual)
Box::pin(async move {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it necessary to wrap these blocks in another future that just waits on the inner future?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't express the type of the inner future. Async functions are similar to closures in the way that the type they generate is totally opaque.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems preferable to me though to keep rt1_responder & co return an opaque, boxed Future, as was done before with the Handshake newtype, instead of declaring them as async fns. Then the re-wrapping here in the declaration of the handshake patterns shouldn't be necessary, leaving the code and types much as before, with type Future = Handshake<...>.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old code is very hard to upgrade to new futures, because all the reading/writing helper functions provided by the futures library (such as write_all or read_exact) now hold the socket by reference instead of taking ownership of it.

On the other hand, the async/await syntax makes it possible to write code that is very easy to read (a linear list of instructions, as opposed to a state machine). This is one of the main reasons why we're upgrading to stable futures.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, the async/await syntax makes it possible to write code that is very easy to read (a linear list of instructions, as opposed to a state machine). This is one of the main reasons why we're upgrading to stable futures.

I'm not suggesting not to use async/await for the implementation of rt1_responder et al, just to return an opaque Handshake future as before, to avoid the repeated re-wrapping in the definition of the handshake pattern configs. It may be my ignorance that I don't see why that wouldn't work and simplify things, maybe I have to try it myself to see what happens.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See mxinden#1.

The previous implementation of AsyncRead for NoiseOutput would operate
on uninitialized buffers, given that it properly returned the number of
bytest that were written to the buffer. With this patch the current
implementation operates on uninitialized buffers as well by returning an
Initializer::nop() in AsyncRead::initializer.
@mxinden
Copy link
Member Author

mxinden commented Sep 19, 2019

@tomaka @romanb thanks for the thorough review. I either addressed or replied to each of your comments. Would you mind taking another look?

@romanb
Copy link
Contributor

romanb commented Sep 21, 2019

I would still like to see the changes from mxinden#1 included and preferably the code commentary around mutably borrowing from Pin removed (i.e. this discussion). Otherwise it looks good to me!

Given that `let mut this = &mut *self` is not specific to a pinned self,
but follows the dereference coercion [1] happening at compile time when
trying to mutably borrow two distinct struct fields, this patch removes
the code comment.

[1]
```rust
let x = &mut self.deref_mut().x;
let y = &mut self.deref_mut().y; // error

// ---

let mut this = self.deref_mut();
let x = &mut this.x;
let y = &mut this.y; // ok
```
@mxinden
Copy link
Member Author

mxinden commented Sep 26, 2019

@tomaka any further comments from your side?

fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let buffer = self.buffer.borrow_mut();
impl<T: AsyncRead + Unpin> AsyncRead for NoiseOutput<T> {
fn poll_read(mut self: core::pin::Pin<&mut Self>, cx: &mut futures::task::Context<'_>, buf: &mut [u8]) -> Poll<Result<usize, futures::io::Error>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a change to std::io::Error.
Also, please improve the formatting. We're supposed to be limited to column 100.

fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let buffer = self.buffer.borrow_mut();
impl<T: AsyncWrite + Unpin> AsyncWrite for NoiseOutput<T> {
fn poll_write(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> futures::task::Poll<std::result::Result<usize, std::io::Error>>{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why all the namespaces?

Suggested change
fn poll_write(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &[u8]) -> futures::task::Poll<std::result::Result<usize, std::io::Error>>{
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize, std::io::Error>>{

}
WriteState::WriteData { len, ref mut off } => {
let n = self.io.write(&buffer.write_crypto[*off .. len])?;
let n = match ready!(Pin::new(&mut this.io).poll_write( cx, &buffer.write_crypto[*off .. len])) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let n = match ready!(Pin::new(&mut this.io).poll_write( cx, &buffer.write_crypto[*off .. len])) {
let n = match ready!(Pin::new(&mut this.io).poll_write(cx, &buffer.write_crypto[*off .. len])) {

}
}
}

fn flush(&mut self) -> io::Result<()> {
let buffer = self.buffer.borrow_mut();
fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> futures::task::Poll<std::result::Result<(), std::io::Error>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn poll_flush(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> futures::task::Poll<std::result::Result<(), std::io::Error>> {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {

false
}
}
fn poll_close(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> futures::task::Poll<std::result::Result<(), std::io::Error>>{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same

impl<T, C> Future for Handshake<T, C> {
type Output = Result<(RemoteIdentity<C>, NoiseOutput<T>), NoiseError>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {

FutureResult<Self::Output, Self::Error>,
fn((RemoteIdentity<C>, NoiseOutput<Negotiated<T>>)) -> FutureResult<Self::Output, Self::Error>
>;
type Future = Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Unpin>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type Future = Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Unpin>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

Copy link
Member Author

@mxinden mxinden Sep 26, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell @tomaka that would require core::upgrade::InboundUpgrade::Future to be Send. Should that be part of this pull request?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We definitely want all futures to be send, as they can move around between threads of a threads pool while they are being executed, so yes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. See 555c2df.

.and_then(|(remote, io)| future::result(match remote {
RemoteIdentity::IdentityKey(pk) => Ok((pk.into_peer_id(), io)),
_ => Err(NoiseError::AuthenticationFailed)
Box::new(self.config.upgrade_inbound(socket, info)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Box::new(self.config.upgrade_inbound(socket, info)
Box::pin(self.config.upgrade_inbound(socket, info)

FutureResult<Self::Output, Self::Error>,
fn((RemoteIdentity<C>, NoiseOutput<Negotiated<T>>)) -> FutureResult<Self::Output, Self::Error>
>;
type Future = Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Unpin>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type Future = Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Unpin>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

.and_then(|(remote, io)| future::result(match remote {
RemoteIdentity::IdentityKey(pk) => Ok((pk.into_peer_id(), io)),
_ => Err(NoiseError::AuthenticationFailed)
Box::new(self.config.upgrade_outbound(socket, info)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Box::new(self.config.upgrade_outbound(socket, info)
Box::pin(self.config.upgrade_outbound(socket, info)

@mxinden mxinden marked this pull request as ready for review September 26, 2019 13:07
Copy link
Member

@tomaka tomaka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the error when you add + Send to the Box<dyn Future> that noise generates? I don't get it.

@@ -49,7 +49,7 @@ impl<C, U, F, T> InboundUpgrade<C> for MapInboundUpgrade<U, F>
where
U: InboundUpgrade<C>,
U::Future: Unpin,
F: FnOnce(U::Output) -> T
F: FnOnce(U::Output) -> T + Send
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not what I was thinking of.

@@ -145,7 +145,7 @@ pub trait InboundUpgrade<C>: UpgradeInfo {
type Error;
/// Future that performs the handshake with the remote.
// TODO: remove Unpin
type Future: Future<Output = Result<Self::Output, Self::Error>> + Unpin;
type Future: Future<Output = Result<Self::Output, Self::Error>> + Unpin + Send;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This neither.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tomaka I am sorry, I must be misunderstanding something here.

You would like <NoiseAuthenticated<P, C, R> as InboundUpgrade<T>>::Future to be Send, right?

As far as I understood something can only be Send if everything below it is Send.

<NoiseAuthenticated<P, C, R> as InboundUpgrade<T>>::upgrade_inbound calls <NoiseConfig<P, C, R> as InboundUpgrade<T>>::upgrade_inbound which by our requirement above needs to return a Send Future, thus the added trait bound (Send) on the InboundUpgrade trait.

What am I missing?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you mentioned, the InboundUpgrade is Send if all its components are Send. So all you have to do is require Send on its components within the code of libp2p-noise.

As far as I understood something can only be Send if everything below it is Send.

The exception is Box<dyn Future> which is never send and that you have to replace with Box<dyn Future + Send>. That's the only change that you might have to do outside of libp2p-noise.

@mxinden
Copy link
Member Author

mxinden commented Sep 30, 2019

@tomaka thanks for bearing with me. Could you take another look?

@mxinden
Copy link
Member Author

mxinden commented Oct 3, 2019

I don't have permissions to merge into libp2p:stable-futures. Given that both of you gave their looks-good-to-me, could you hit the merge button @tomaka @romanb?

@romanb romanb merged commit 73aa278 into libp2p:stable-futures Oct 3, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants