
how to implement stream r/w in parallel? #1108

Closed

kingluo opened this issue May 25, 2019 · 37 comments
Labels
A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. M-net Module: tokio/net

Comments

@kingluo commented May 25, 2019

As is well known, ownership rules mean we cannot read and write the stream at the same time.

The split in tokio seems like a fake split, because it uses a mutex for r/w.

Behind the scenes, split ensures that if we both try to read and write at the same time, only one of them happen at a time.

We do not block on syscalls, which is the whole point of async programming, but the syscalls themselves are serialized by that mutex, i.e. while a read() syscall is in progress, we cannot issue a write() syscall. That seems very wasteful, with an obvious performance impact. At the syscall level, reads and writes could run in parallel, which matters for full-duplex application protocols (e.g. HTTP/1.1 with pipelining, and HTTP/2).

Could we do a real split? For example, we could clone the mio::net::TcpStream (https://docs.rs/mio/0.6.18/mio/net/struct.TcpStream.html#method.try_clone) while sharing the registration and other state. That way we would have a real zero-cost abstraction.

@vojtechkral (Contributor)

I've run into this as well. So far, I've written this (very ugly) workaround for a non-locking split:

// Imports assumed for this snippet (tokio 0.1 / futures 0.1 era).
use std::cell::UnsafeCell;
use std::io::{self, Read, Write};
use std::net::Shutdown;
use std::sync::Arc;

use futures::Poll;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;

// Read half: shares the underlying stream through an UnsafeCell.
#[derive(Debug)]
pub struct TcpStreamRecv(Arc<UnsafeCell<TcpStream>>);

unsafe impl Send for TcpStreamRecv {}
unsafe impl Sync for TcpStreamRecv {}

impl TcpStreamRecv {
    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
        unsafe { &*self.0.get() }.shutdown(how)
    }
}

impl io::Read for TcpStreamRecv {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        unsafe { &mut *self.0.get() }.read(buf)
    }
}

impl AsyncRead for TcpStreamRecv {
    unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
        (&*self.0.get()).prepare_uninitialized_buffer(buf)
    }

    fn read_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
        unsafe { &mut *self.0.get() }.read_buf(buf)
    }
}


#[derive(Debug)]
pub struct TcpStreamSend(Arc<UnsafeCell<TcpStream>>);

unsafe impl Send for TcpStreamSend {}
unsafe impl Sync for TcpStreamSend {}

impl TcpStreamSend {
    pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
        unsafe { &*self.0.get() }.shutdown(how)
    }
}

impl io::Write for TcpStreamSend {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        unsafe { &mut *self.0.get() }.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        unsafe { &mut *self.0.get() }.flush()
    }
}


impl AsyncWrite for TcpStreamSend {
    fn shutdown(&mut self) -> Poll<(), io::Error> {
        unsafe { &mut *self.0.get() }.shutdown()
    }

    fn write_buf<B: bytes::Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
        unsafe { &mut *self.0.get() }.write_buf(buf)
    }
}

pub fn tcp_split(stream: TcpStream) -> (TcpStreamSend, TcpStreamRecv) {
    let inner = Arc::new(UnsafeCell::new(stream));
    let send = TcpStreamSend(inner.clone());
    let recv = TcpStreamRecv(inner);
    (send, recv)
}

Like I said, it's ugly, but I don't think it can be much prettier without some sort of explicit support...
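
For context, here is roughly how these halves could be used with futures 0.1 combinators (an illustrative sketch only; tokio::io::copy and tokio::spawn as in the tokio 0.1 API of the time):

use futures::Future;
use tokio::io::copy;

// Illustrative sketch: each proxy direction becomes its own spawned task,
// so reads and writes on the same peer can make progress independently.
fn proxy(client: TcpStream, server: TcpStream) {
    let (client_tx, client_rx) = tcp_split(client);
    let (server_tx, server_rx) = tcp_split(server);

    let upstream = copy(client_rx, server_tx)
        .map(|_| ())
        .map_err(|e| eprintln!("upstream error: {}", e));
    let downstream = copy(server_rx, client_tx)
        .map(|_| ())
        .map_err(|e| eprintln!("downstream error: {}", e));

    // Must be called from within a running tokio 0.1 runtime.
    tokio::spawn(upstream);
    tokio::spawn(downstream);
}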

@quininer (Member) commented Aug 9, 2019

This cannot be implemented in tokio-rustls because we have to lock the ClientSession to share state.

Perhaps we need a full-duplex rustls implementation? @ctz

@vojtechkral (Contributor)

@quininer afaik this is fixed in tokio master, so you shouldn't need custom hacks and can just wait for the next release, I think? :)

@carllerche (Member)

Non-locking splits should be possible even w/ tokio-rustls using the split_mut strategy TcpStream uses now.

@carllerche added this to the v0.2 milestone Aug 9, 2019
@quininer (Member) commented Aug 9, 2019

Yes, but we still have to use a lock for the ClientSession/ServerSession of rustls. Maybe I should open an issue in rustls.

@carllerche (Member)

@quininer why do you need a lock?

@quininer (Member)

I want the TlsStream of tokio-rustls to support parallel reads and writes just like TcpStream, but TlsStream reads and writes require mutable Session state, which makes that impossible.

Ideally, the Session would be split into a pair of ReadSession and WriteSession to achieve full duplex, but this needs to be implemented inside rustls.

@blckngm (Contributor) commented Aug 10, 2019

I want the TlsStream of tokio-rustls to support parallel reads and writes just like TcpStream, but TlsStream reads and writes require mutable Session state, which makes that impossible.

Ideally, the Session would be split into a pair of ReadSession and WriteSession to achieve full duplex, but this needs to be implemented inside rustls.

The generic AsyncRead::split already takes care of the locking. It is not a problem if you cannot implement a more optimized split like TcpStream.
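
For illustration, here is a minimal sketch of using the generic split (written against the futures 0.3-style AsyncReadExt::split, which wraps the stream in a BiLock); it is not specific to tokio-rustls:

use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

// Sketch: echo whatever is read back to the peer, with the two halves
// owned separately after the split.
async fn echo<S>(stream: S) -> std::io::Result<()>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    // split() wraps the stream in a BiLock; each half takes the lock only
    // for the duration of a single poll.
    let (mut reader, mut writer) = stream.split();
    let mut buf = [0u8; 1024];
    loop {
        let n = reader.read(&mut buf).await?;
        if n == 0 {
            return Ok(());
        }
        writer.write_all(&buf[..n]).await?;
    }
}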

@quininer (Member)

The generic AsyncRead::split already takes care of the locking. It is not a problem if you cannot implement a more optimized split like TcpStream.

Yes, I want to implement this optimization for TlsStream. I want TlsStream to be full duplex.

@carllerche (Member)

It is possible to implement a zero cost split_mut that takes &mut self and provides a read and write handle. I plan on adding it soonish.
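
Roughly the shape in mind, as an illustrative sketch (names and signatures here are not final):

// Illustrative sketch of a borrowing split: no lock is needed because the
// halves just hold shared references, but they borrow the stream and so
// cannot be moved into separately spawned tasks.
impl TcpStream {
    pub fn split_mut(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
        (ReadHalf(&*self), WriteHalf(&*self))
    }
}

pub struct ReadHalf<'a>(&'a TcpStream);  // would implement AsyncRead
pub struct WriteHalf<'a>(&'a TcpStream); // would implement AsyncWrite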

@blckngm (Contributor) commented Aug 10, 2019

It is possible to implement a zero cost split_mut that takes &mut self and provides a read and write handle. I plan on adding it soonish.

How would that work for TLS streams here? The two handles can have two & references, but not two &mut references.

@blckngm (Contributor) commented Aug 10, 2019

I want TlsStream to be full duplex.

There might be a bit of misunderstanding here. The generic split does give you two handles that are full-duplex. The lock is taken only during polling. A blocked read operation will not block a write operation, and vice versa.

@quininer (Member)

A blocked read operation will not block a write operation, and vice versa.

I don't think this is true; otherwise you could write to and read from a Vec at the same time.

I didn't find a generic split implementation in tokio, but in the futures-rs implementation it does lock the entire read operation with BiLock.

@DoumanAsh (Contributor) commented Aug 10, 2019

I think right now tokio only has split for concrete I/O types (like TcpStream), with no generic split lock.
As a plus, it doesn't require locking to synchronize between the Read and Write halves.

@blckngm (Contributor) commented Aug 11, 2019

A blocked read operation will not block a write operation, and vice versa.

I don't think this is true; otherwise you could write to and read from a Vec at the same time.

I didn't find a generic split implementation in tokio, but in the futures-rs implementation it does lock the entire read operation with BiLock.

A "blocked" poll_read would return Pending, and the lock is then released, and the other task can take the lock and poll_write. Suppose that a read task (R) calls read(...).await, and a write task (W) calls write(...).await:

R: poll_lock -> Ready
W: poll_lock -> Pending
R: poll_read -> Pending
R: unlock
W: poll_lock -> Ready
W: poll_write -> Ready
W: unlock
...
R: poll_lock -> Ready
R: poll_read -> Ready
R: unlock
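
For concreteness, a rough sketch of how a BiLock-based read half's poll might look (futures 0.1 style; simplified, not the actual futures-rs/tokio-io source):

use std::io;
use futures::{Async, Poll};
use futures::sync::BiLock;
use tokio::io::AsyncRead;

struct ReadHalf<T> {
    handle: BiLock<T>,
}

impl<T: AsyncRead> ReadHalf<T> {
    fn poll_read(&mut self, buf: &mut [u8]) -> Poll<usize, io::Error> {
        // Try to take the shared lock; if the write half currently holds
        // it, this task is parked until the guard is dropped.
        let mut guard = match self.handle.poll_lock() {
            Async::Ready(guard) => guard,
            Async::NotReady => return Ok(Async::NotReady),
        };
        // The lock is held only for this single poll. If the socket has no
        // data, this returns NotReady, the guard is dropped on return, and
        // the write half is free to lock and poll_write immediately.
        guard.poll_read(buf)
    }
}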

@quininer (Member) commented Aug 11, 2019

A "blocked" poll_read would return Pending, and the lock is then released, and the other task can take the lock and poll_write.

I think this is half duplex.

You could call it full-duplex emulation. Anyway, I don't want to argue over definitions. I want to be able to write from one thread while reading in another, and have them not block each other.

@kingluo (Author) commented Oct 7, 2019

The current split does no locking (https://github.com/tokio-rs/tokio/blob/master/tokio-net/src/tcp/split.rs), but it depends on the r/w methods being orthogonal to each other and not causing any data race on inner members, and on all methods taking an immutable reference to TcpStream.
However, ReadHalf and WriteHalf only work within the same spawned task (and thus a single thread) as the TcpStream, because they contain references to the stream.
That's how the current proxy example works (https://github.com/tokio-rs/tokio/blob/master/tokio/examples/proxy.rs).

IMO, real concurrency means we could call tokio::spawn twice and give each task a half of the same TcpStream, which means it must be Sync. That way, the r/w could run in parallel on two threads.

Do the tokio developers have any ideas here?
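
For illustration, this is the kind of usage I mean, written in async/await style (the into_split name and the owned halves here are hypothetical, not an existing tokio API):

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

// Hypothetical owned, Send + 'static halves, so each direction can live in
// its own spawned task (and therefore potentially run on its own thread).
async fn handle(stream: TcpStream) {
    let (mut rd, mut wr) = stream.into_split();

    let reader = tokio::spawn(async move {
        let mut buf = [0u8; 4096];
        while let Ok(n) = rd.read(&mut buf).await {
            if n == 0 { break; }
            // decode / post-process here, independently of the writer
        }
    });

    let writer = tokio::spawn(async move {
        // encode and write here, independently of the reader
        let _ = wr.write_all(b"hello").await;
    });

    let _ = reader.await;
    let _ = writer.await;
}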

@blckngm (Contributor) commented Oct 7, 2019 via email

@kingluo (Author) commented Oct 7, 2019

@sopium Then why not Arc?

@blckngm (Contributor) commented Oct 7, 2019

It was replaced by split_mut in #1521.

@kingluo (Author) commented Oct 7, 2019

It was replaced by split_mut in #1521.

But that still uses mutual locking.
Parallel r/w is sound and feasible (that's why the specialized split works).
So why doesn't the specialized split use Arc, without locking, to hold the stream for multi-threaded usage? That version could be named split2, if someone cares about the zero cost.

@vojtechkral (Contributor)

To me it's questionable that we even need the Arc to begin with. I think on Linux/epoll the stream could be split simply with dup(); I think the epoll docs even document this as working. Not sure about kqueue, but I'd expect it to work there too. On Windows there's WSADuplicateSocket, which sounds like it should work if care is taken, but I'm really not very experienced with WSA, so I'm not sure there... Also, mio does some extra stuff on Windows which I'm not sure wouldn't interfere.

@blckngm (Contributor) commented Oct 8, 2019

To me it's questionable that we even need the Arc to begin with. I think on Linux/epoll the stream could be split simply with dup()

I don't think it really works. At least not with the current design of reactors and registrations, etc. See #774, #824, #1307, etc.

@vojtechkral (Contributor)

@sopium Ok, interesting. I think #1307 is the (most) relevant one, esp Carl's comment:

Tokio cannot support cloning as the OS selectors do not allow storing different data for FD clones. Because of this, we cannot properly dispatch events if the FD is cloned.

So, if I read that right, it means that, for example in epoll, the userdata field in struct epoll_event is shared for dup-ed fds. (Which would make sense given how epoll works.) I can see how this makes a zero-cost owning socket split a problem: the mapping from a mio Token to an Evented suddenly becomes ambiguous.

One idea that comes to mind is to not map to Evented based on the Token alone, but to basically map (Token, fd) → Evented. This would be a pretty significant change, but it seems to me it would allow "more zero-costness" / performance gains (though my viewpoint is maybe somewhat superficial).

@kingluo (Author) commented Oct 8, 2019

Now I think there is no need to clone anything: the r/w methods are orthogonal, the low-level poll_read_priv and poll_write_priv take an immutable reference to TcpStream as the receiver, and shutdown can even shut down the read and write directions individually, so an Arc without locking is already enough to achieve parallel r/w.

Check my patch on the split:
https://github.com/kingluo/tokio/commit/65827cfb98563ee84cbd05890d0dd2789a72465b

And here is my example app (a SOCKS5 server) that uses parallel r/w:
https://github.com/kingluo/tokio-socks5/blob/master/src/lib.rs#L200

Note that I spawn two tasks (which may run on two threads) to do the r/w.

In fact, async-std also takes this approach to parallel r/w:
https://github.com/async-rs/async-std/blob/master/examples/a-chat/server.rs

@blckngm (Contributor) commented Oct 8, 2019

Now I think there is no need to clone anything: the r/w methods are orthogonal, the low-level poll_read_priv and poll_write_priv take an immutable reference to TcpStream as the receiver, and shutdown can even shut down the read and write directions individually, so an Arc without locking is already enough to achieve parallel r/w.

Check my patch on the split:
kingluo@65827cf

TcpStream::split was just like that before #1521.

In fact, async-std also takes this approach to parallel r/w:
https://github.com/async-rs/async-std/blob/master/examples/a-chat/server.rs

In async-std Read and Write are implemented for &TcpStream, so there is no need for split.
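
For illustration, the async-std pattern looks roughly like this (a simplified sketch of the linked chat example: one Arc clone per task, reading and writing through &TcpStream):

use std::sync::Arc;
use async_std::net::TcpStream;
use async_std::prelude::*; // brings the read/write extension methods into scope
use async_std::task;

// Simplified sketch: Read and Write are implemented for &TcpStream, so an
// Arc clone per task is enough and no split type is needed.
async fn handle(stream: TcpStream) -> std::io::Result<()> {
    let stream = Arc::new(stream);
    let reader = Arc::clone(&stream);

    let read_task = task::spawn(async move {
        let mut buf = [0u8; 1024];
        // Read through a shared reference while another task writes.
        (&*reader).read(&mut buf).await
    });

    (&*stream).write_all(b"hello").await?;
    read_task.await?;
    Ok(())
}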

@vojtechkral (Contributor)

@kingluo Arc is off-limits as it breaks zero-costness according to @carllerche in #1521, presumably because of the extra pointer indirection implied by Arc.

@kingluo (Author) commented Oct 8, 2019

In async-std Read and Write are implemented for &TcpStream, so there is no need for split.

Yes, but it still needs Arc. And in fact, like async-std, tokio also uses an immutable reference as the receiver for the private r/w methods. Most importantly, the mio stream objects also use immutable references. So Arc is OK for tokio too.

Arc is off-limits as it breaks zero-costness according to @carllerche in #1521, presumably because of the extra pointer indirection implied by Arc.

However, we should not focus only on the zero cost of the split itself. Without Arc, the current specialized split needs references to the TcpStream, so it's impossible to put r/w into separate threads. That is also a performance trade-off for some apps, e.g. an app that needs post-processing such as encoding/decoding after r/w.

@vojtechkral (Contributor)

@kingluo Sure, I'm not disagreeing, and in fact the encode/decode thing is exactly what we do in the software we're using Tokio for :) The Arc-based split would still be a benefit for us too. Still, I thought it would be interesting to consider a truly zero-cost solution...

@carllerche (Member)

@sopium implementing AsyncRead / AsyncWrite on &TcpStream would enable either dead-locked tasks or memory leaks. This is why Tokio does not provide that implementation.

@blckngm (Contributor) commented Oct 17, 2019

@sopium implementing AsyncRead / AsyncWrite on &TcpStream would enable either dead-locked tasks or memory leaks. This is why Tokio does not provide that implementation.

I understand it does not work with the current PollEvented out of the box. But with some locking I don't think it's impossible:

struct UdpSocket {
    socket: PollEvented<mio::net::UdpSocket>,
    send_lock: tokio::sync::Mutex<()>,
    recv_lock: tokio::sync::Mutex<()>,
}

impl UdpSocket {
    pub async fn send_to(&self, ...) -> ... {
        let _send_lock_guard = self.send_lock.lock().await;
        poll_fn(...).await
    }
}

(Or are you saying this will deadlock?)

For streams it's trickier because they need to implement poll_read(/write), not async fn read, and there is no obvious place to store the lock guard.

I'm not saying that tokio should implement it this way. Just want to discuss maybe some alternative API designs.

@scalexm commented Feb 21, 2020

@carllerche would you accept a PR introducing a split_arc method on TcpStream, with appropriate ReadHalfArc/WriteHalfArc types?

It seems like an easy way to have both the zero-cost wrapper and the 'static one, but maybe you’re looking for a more general solution.

It’s unfortunate that we cannot move the two halves into separate tasks right now.

@simonbuchan

Just checking, is this the same situation for Windows named pipes? E.g. splitting tokio::fs::OpenOptions::new().read(true).write(true).open(r"\\?\pipe\some\name")? I guess you could simply open the pipe twice in that case.

@Darksonn added the A-tokio (Area: The main tokio crate) label Apr 23, 2020
@Darksonn (Contributor)

Note that #2270 has been merged.

@Darksonn added the M-net (Module: tokio/net) label Apr 23, 2020
@stuhood commented May 11, 2020

@Darksonn: Would we say that #2270 resolves this? It seems to...

@simonbuchan

Depends on whether there are other streams that could be split, I guess?

After digging into the named pipe issue I was hitting (writes block waiting for a read), avoiding the mutex when splitting File wouldn't help, because two more levels would have to change: File itself queues I/O requests, so the behavior is close to the same, and the underlying Windows file is not opened or accessed in overlapped mode, so Windows itself queues the requests too (I ended up implementing something hacky on winapi). So that seems like a whole separate issue.

@Darksonn (Contributor)

I guess it does. We still need to make the split methods more consistent, but that has to wait for v0.3 due to backwards compatibility.
