Redefine Stream/Sink/AsyncRead/AsyncWrite/etc on top of Future #1365

Open
Matthias247 opened this Issue Dec 8, 2018 · 10 comments

Matthias247 commented Dec 8, 2018

I think in the async/await world it might be preferable to redefine the traits and types like the ones mentioned in the title on top of Futures, and e.g. move from poll_something() signatures to methods that return Futures.

E.g.

trait Stream<T> {
    type Item: Future<Output = T>;
    fn read(&mut self) -> Self::Item;
    // or something along the following, whatever works best:
    // fn read(&mut self) -> impl Future<Output = T>;
}

The important reason for this is that it should be possible to implement those types on top of Futures as primitives, which isn't possible today.

As a motivating example I'm thinking about an MPMC channel, which can be built around some async primitives as shown below:

struct ChannelImpl<T> {
    state: AsyncMutex<Queue<T>>,
    has_space: AsyncManualResetEvent,
    has_element: AsyncManualResetEvent,
}

impl<T> ChannelImpl<T> {
    async fn write(&mut self, item: T) {
        loop {
            await!(self.has_space.wait());
            let mut queue = await!(self.state.lock());
            if queue.free_space() == 0 {
                // Another writer took the free slot first; wait again.
                continue;
            }
            queue.push(item);
            break;
        }
        self.has_element.set();
    }

    async fn read(&mut self) -> T {
        let mut result: Option<T> = None;
        loop {
            await!(self.has_element.wait());
            let mut queue = await!(self.state.lock());
            if queue.len() == 0 {
                // Another reader took the element first; wait again.
                continue;
            }
            result = Some(queue.pop());
            break;
        }
        self.has_space.set();
        result.unwrap()
    }
}

I'm not yet sure if the current state of async functions and traits allows all this (please educate me!), but I think this should be one of the goals that async programming in Rust should enable.

However, this kind of implementation currently cannot be wrapped in a Stream or Sink implementation.
We can't put this kind of code inside a poll_xyz method of a Stream/etc., since it requires temporary Futures, each carrying its own poll state, to be instantiated in the task. Those Futures must maintain their state between calls to poll(). E.g. polling an asynchronous mutex for the first time creates a registration in the waiting list, and dropping it cancels the registration. If we tried to use an asynchronous mutex inside a poll_next method, we would always have to drop it before poll_next() returns, which would cancel the registration and never allow us to acquire the mutex.
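The registration lifecycle described above can be sketched with the standard library alone. `Event`, `Wait`, and `wait` below are hypothetical stand-ins for something like `AsyncManualResetEvent`, not an existing API; the sketch only assumes that the first poll registers a waker and that dropping the future cancels the registration.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical event with a waiter list (illustration only).
#[derive(Default)]
pub struct Event {
    is_set: bool,
    next_id: usize,
    waiters: Vec<(usize, Waker)>,
}

/// Future returned by `wait`. It keeps its registration between polls.
pub struct Wait {
    event: Rc<RefCell<Event>>,
    registration: Option<usize>,
}

pub fn wait(event: &Rc<RefCell<Event>>) -> Wait {
    Wait { event: Rc::clone(event), registration: None }
}

impl Future for Wait {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut(); // `Wait` is `Unpin`
        let mut event = this.event.borrow_mut();
        if event.is_set {
            return Poll::Ready(());
        }
        // Register only once; later polls reuse the stored registration.
        if this.registration.is_none() {
            let id = event.next_id;
            event.next_id += 1;
            event.waiters.push((id, cx.waker().clone()));
            this.registration = Some(id);
        }
        Poll::Pending
    }
}

impl Drop for Wait {
    fn drop(&mut self) {
        // Dropping the future cancels its registration.
        if let Some(id) = self.registration {
            self.event.borrow_mut().waiters.retain(|(waiter, _)| *waiter != id);
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Number of registered waiters after the first poll, the second poll, and the drop.
pub fn registration_lifecycle() -> (usize, usize, usize) {
    let event = Rc::new(RefCell::new(Event::default()));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(wait(&event));
    let _ = fut.as_mut().poll(&mut cx);
    let after_first = event.borrow().waiters.len();
    let _ = fut.as_mut().poll(&mut cx);
    let after_second = event.borrow().waiters.len();
    drop(fut);
    let after_drop = event.borrow().waiters.len();
    (after_first, after_second, after_drop)
}
```

A `poll_next` method that created such a future and dropped it before returning would reach the third state on every call, which is the problem described above.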

I think the more general observation might be that, compared to the current Stream definitions, Futures allow persisting some additional temporary state per operation and provide additional lifecycle hooks per operation (the Future gets polled for the first time, and the Future gets dropped).

Another side effect is that currently Streams/etc. must all support pinning. With Streams that return Futures that wouldn't be the case, since it is the Futures that get pinned. The Streams might be pinned indirectly, because the respective Futures may hold a reference to the Stream with a certain lifetime, but that relationship is already checked through Rust's normal borrowing rules and doesn't put special pinning requirements on the implementation of the type. And often we want Streams to be movable after they have been partly consumed (e.g. a TCP socket), which is not allowed if they are pinned. Of course some of them might be Unpin, but this kind of change would allow moves again for all Streams.

Nemo157 commented Dec 9, 2018

Having the core traits return futures directly is probably blocked on GATs (or all the way to async methods, although I’m hopeful that there will be good cross-compatibility between async methods and their underlying GAT representation). While it is possible to have a stream that can produce the futures for more than one item at a time, I think the common case is that the stream can only produce a future for the next item. For example, a trait like

trait Stream {
    type Output;
    type Next<'a>: Future<Output = Option<Self::Output>> + 'a;

    fn next(self: Pin<&mut Self>) -> Self::Next<'_>;
}

I reintroduced Pin to the method as I have a use case where the stream must be pinned from the first call to next until it is dropped; doing the setup and teardown of the immovable part on every call to next would be too expensive. I’m not sure how common this might be, so I guess I could impl Stream for Pin<&mut MyStream> instead and eat the cost of a double indirection if pinning is dropped from the Stream API. Maybe if we have some way to construct streams via async fn this would be common enough to keep it as part of the trait.
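For readers following along today: generic associated types were stabilized in Rust 1.65, so a trait of this shape can be written on stable. The following is only a sketch under that assumption. `Countdown`, `NextItem`, `poll_once`, and `drain` are hypothetical names for illustration, and the `where Self: 'a` clause is an addition the compiler asks for on this kind of GAT.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub trait Stream {
    type Output;
    type Next<'a>: Future<Output = Option<Self::Output>> + 'a
    where
        Self: 'a;

    fn next(self: Pin<&mut Self>) -> Self::Next<'_>;
}

/// Hypothetical stream that counts down to zero.
pub struct Countdown {
    remaining: u32,
}

impl Countdown {
    pub fn new(remaining: u32) -> Self {
        Countdown { remaining }
    }
}

/// Future for one item; it borrows the stream until it is completed or dropped.
pub struct NextItem<'a> {
    stream: &'a mut Countdown,
}

impl Future for NextItem<'_> {
    type Output = Option<u32>;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        let stream = &mut *self.get_mut().stream;
        if stream.remaining == 0 {
            Poll::Ready(None)
        } else {
            stream.remaining -= 1;
            Poll::Ready(Some(stream.remaining))
        }
    }
}

impl Stream for Countdown {
    type Output = u32;
    type Next<'a> = NextItem<'a>;

    fn next(self: Pin<&mut Self>) -> Self::Next<'_> {
        NextItem { stream: self.get_mut() }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once with a waker that does nothing.
pub fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let result = fut.as_mut().poll(&mut cx);
    result
}

/// Collects all items; each `next()` borrow ends when its future is dropped.
pub fn drain(mut stream: Countdown) -> Vec<u32> {
    let mut items = Vec::new();
    loop {
        match poll_once(Pin::new(&mut stream).next()) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) | Poll::Pending => break,
        }
    }
    items
}
```

Because `NextItem<'a>` holds `&'a mut Countdown`, the borrow checker rejects a second `next()` call while the first future is still alive, which matches the "one outstanding item" case.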

Matthias247 commented Dec 9, 2018

> Having the core traits return futures directly is probably blocked on GATs (or all the way to async methods, although I’m hopeful that there will be good cross-compatibility between async methods and their underlying GAT representation).

I was afraid of that. I don't know yet what exactly GATs are, but I will try to read up on them. Can you elaborate on what a signature for a stream would need to look like in order to make it implementable by future composition as outlined above?

I would say that as long as that is not possible, we may face the general issue that it's not possible to define abstractions around types that are implemented by async functions. E.g. if I build an async-powered HTTP client, I can't build an interface/trait around it that I can use for unit testing and dependency injection in other code.

> While it is possible to have a stream that can produce the futures for more than one item at a time, I think the common case is that the stream can only produce a future for the next item.

You are right, most streams will only support producing a single item at a time. Supporting more would require internal queuing (which can easily be implemented through AsyncMutex in some situations, but is hard for manually implemented Streams). E.g. if we look at other implementations, glib only supports one concurrent outstanding operation per IO type. WinAPI/IOCP supports more (that's what the term Overlapped tries to describe), but there's barely a need for it.

I think that aspect could be most cleanly solved by the type system, if reading from the stream produces a Future and consumes the stream until the operation has finished, at which point we get it back. Another option is allowing the creation of multiple futures, and then either blocking in their implementation (via AsyncMutex) or reporting an error if no concurrent operations are allowed.
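A minimal sketch of the "consume the stream and get it back" idea, assuming a hypothetical `Countdown` type and the current `async fn` syntax rather than the `await!` macro used elsewhere in this thread. The `poll_ready` helper is also just for illustration and assumes the future completes on its first poll.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stream that counts down to zero.
pub struct Countdown {
    remaining: u32,
}

impl Countdown {
    /// Takes the stream by value and hands it back together with the item,
    /// so at most one read can be in flight at a time.
    pub async fn next(mut self) -> (Option<u32>, Countdown) {
        if self.remaining == 0 {
            (None, self)
        } else {
            self.remaining -= 1;
            (Some(self.remaining), self)
        }
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls once and expects completion; enough for this example only.
fn poll_ready<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let polled = fut.as_mut().poll(&mut cx);
    match polled {
        Poll::Ready(value) => value,
        Poll::Pending => panic!("example future should complete on first poll"),
    }
}

pub fn take_two() -> (Option<u32>, Option<u32>) {
    let stream = Countdown { remaining: 2 };
    // `stream` is moved into the future and only returned on completion.
    let (first, stream) = poll_ready(stream.next());
    let (second, _stream) = poll_ready(stream.next());
    (first, second)
}
```

The cost of this design is ergonomic: every caller has to thread the stream value through each read.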

> I reintroduced Pin to the method as I have a use case where the stream must be pinned from the first call to next until it is dropped; doing the setup and teardown of the immovable part on every call to next would be too expensive. I’m not sure how common this might be, so I guess I could impl Stream for Pin<&mut MyStream> instead and eat the cost of a double indirection if pinning is dropped from the Stream API

I can see that some Streams must have stable addresses, but I'm not sure whether that fact needs to be propagated or is an implementation detail. E.g. many streams will be implemented through Arc<Mutex<InnerStream>>, which effectively pins the inner stream, but that doesn't leak into the API and still leaves the outer type movable. What's your use-case for this?

Nemo157 commented Dec 10, 2018

GATs are generic associated types, that's the feature that allows parameterizing the Next associated type by the lifetime 'a in my example.

> I think that aspect could be most cleanly solved by the type system, if reading from the stream produces a Future and consumes the stream until the operation has finished, at which point we get it back.

This is where having GATs is useful, rather than consuming the stream and receiving it back once complete you can borrow the stream, and have that borrow last until the future is complete/dropped.

> What's your use-case for this?

My use case was reading a stream of values from a radio peripheral in a microcontroller. By using pinning, the stream can have an internal buffer that is written to by the radio using DMA; without pinning, this buffer would need to be externally allocated and passed in as a reference (or heap allocated, but this was for a no_std project).

Matthias247 commented Dec 10, 2018

> This is where having GATs is useful, rather than consuming the stream and receiving it back once complete you can borrow the stream, and have that borrow last until the future is complete/dropped.

Ah, that makes sense. So the following wouldn't work, because without GATs we can't put the lifetime on the item?

trait Stream {
    type Output;
    type Next: Future<Item = Self::Output>;

    fn next(self: &'a mut) -> Self::Next<'a>; // Or is that Self::Next + 'a ?
}

After reading about GATs I understand that they make lifetimes more flexible, but I wasn't sure whether something is possible without them. It should be OK for this use case if the lifetime of the returned Future is bound to the stream.

> My use case was reading a stream of values from a radio peripheral in a microcontroller. By using pinning, the stream can have an internal buffer that is written to by the radio using DMA; without pinning, this buffer would need to be externally allocated and passed in as a reference (or heap allocated, but this was for a no_std project).

Ah, cool idea to represent it as a stream. However, I think your requirement is more the typical pain point of an embedded system: one shouldn't dynamically allocate, and therefore ideally all buffers/etc. should be contained inside the type.

However, without e.g. const generics that's already painful, and even with them I don't find the embedding approach super appealing (there's an endless forwarding of type parameters).
I guess I would go with either the externally allocated route (which gives the stream a lifetime, but that can be 'static for a global buffer) or, maybe cleaner, create a simple bump allocator for the platform and change the strategy from "don't allocate at all" to "only allocate at startup".

Matthias247 commented Dec 11, 2018

I tried things out. It seems that only the following can be defined at the moment:

trait Stream<'a> {
    type Output;
    type Next: Future<Output = Self::Output> + 'a;

    fn next(&'a mut self) -> Self::Next;
}

which is arguably not nice, because the lifetime is needlessly on the trait.
I haven't yet tried out whether that's implementable through async function composition.
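As a rough check of the open question, here is a sketch of the lifetime-on-trait variant implemented with an `async` block, written with today's `Future<Output = ...>` and `async move` syntax. The future type of an `async` block cannot be named on stable Rust, so this sketch assumes boxing it as `Pin<Box<dyn Future>>`. `Counter` and `poll_ready` are hypothetical names for illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

pub trait Stream<'a> {
    type Output;
    type Next: Future<Output = Self::Output> + 'a;

    fn next(&'a mut self) -> Self::Next;
}

/// Hypothetical stream that yields 1, 2, 3, ...
pub struct Counter {
    count: u32,
}

impl<'a> Stream<'a> for Counter {
    type Output = u32;
    // The async block's type is anonymous, so it is boxed here (one allocation per call).
    type Next = Pin<Box<dyn Future<Output = u32> + 'a>>;

    fn next(&'a mut self) -> Self::Next {
        Box::pin(async move {
            self.count += 1;
            self.count
        })
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls once and expects completion; enough for this example only.
fn poll_ready<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let polled = fut.as_mut().poll(&mut cx);
    match polled {
        Poll::Ready(value) => value,
        Poll::Pending => panic!("example future should complete on first poll"),
    }
}

pub fn first_two() -> (u32, u32) {
    let mut counter = Counter { count: 0 };
    // Each call borrows `counter` only until its future is consumed.
    let first = poll_ready(counter.next());
    let second = poll_ready(counter.next());
    (first, second)
}
```

With a concrete type this works, but generic code over `Stream<'a>` would need a bound such as `for<'a> Stream<'a>`, which is part of why the lifetime on the trait is awkward.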

aturon commented Dec 13, 2018

cramertj commented Dec 13, 2018

I discussed this with @erickt elsewhere and explained my thoughts. As the ecosystem stands, it's not possible to make this work without making all of these traits non-object-safe, so I'm not interested in pursuing this at the moment.

seanmonstar commented Dec 13, 2018

One thing that comes to mind with regard to AsyncRead/AsyncWrite: if the poll_read/poll_write methods are removed and there are only async fns, then we can't really wait on both the read and the write future, since both will want to mutably borrow the transport and not release the borrow until complete.

Matthias247 commented Dec 13, 2018

@cramertj: would you mind sharing these publicly at some point? I guess that would be interesting for others too. The implications for the ecosystem are totally understood, although the current changes from 0.1 already include lots of those smaller renaming things, and this would mostly be another one. We can still keep the old things around, with adapters to the new variants. That direction can be bridged. However, it's not possible to move future-based things into the current signatures without costs (e.g. extra allocations).

@seanmonstar: I think this should still be possible, as all types that support read and write should also support a split method for getting individual halves in order to make them more flexible. Then you can get a future from the reader and one from the writer.
In lots of protocol implementations I would want to give ownership of the writer to some writing task and keep the reader in the reading task. Then I need split anyway.
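A small sketch of why split helps with the borrow problem: the two halves borrow disjoint fields, so a read future and a write future can exist at the same time. `Transport`, `ReadHalf`, `WriteHalf`, and the in-memory buffers are hypothetical and stand in for a real socket; the code uses the current `async fn` syntax.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical in-memory transport (illustration only).
pub struct Transport {
    inbound: VecDeque<u8>,
    outbound: Vec<u8>,
}

pub struct ReadHalf<'a> {
    inbound: &'a mut VecDeque<u8>,
}

pub struct WriteHalf<'a> {
    outbound: &'a mut Vec<u8>,
}

impl Transport {
    /// Borrows the two directions separately so both can be used at once.
    pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) {
        (
            ReadHalf { inbound: &mut self.inbound },
            WriteHalf { outbound: &mut self.outbound },
        )
    }
}

impl ReadHalf<'_> {
    pub async fn read_byte(&mut self) -> Option<u8> {
        self.inbound.pop_front()
    }
}

impl WriteHalf<'_> {
    pub async fn write_byte(&mut self, byte: u8) {
        self.outbound.push(byte);
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls once and expects completion; enough for this example only.
fn poll_ready<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let polled = fut.as_mut().poll(&mut cx);
    match polled {
        Poll::Ready(value) => value,
        Poll::Pending => panic!("example future should complete on first poll"),
    }
}

pub fn read_and_write_concurrently() -> (Option<u8>, Vec<u8>) {
    let mut transport = Transport {
        inbound: VecDeque::from(vec![7]),
        outbound: Vec::new(),
    };
    let read = {
        let (mut reader, mut writer) = transport.split();
        // Both futures are alive at the same time without a borrow conflict.
        let read_fut = reader.read_byte();
        let write_fut = writer.write_byte(42);
        let read = poll_ready(read_fut);
        poll_ready(write_fut);
        read
    };
    (read, transport.outbound)
}
```

Calling `read_byte` and `write_byte` directly on `&mut Transport` instead would hit exactly the double mutable borrow described in the comment above.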

cramertj commented Dec 13, 2018

@Matthias247 Oh sorry, I don't have a large writeup or anything, I just meant we had discussed it. If there were a way to return a dyn<size_of::<usize>> Future<Output = io::Result<()>> (a dynamic future with a maximum size of one pointer) I'd support that, but that feature doesn't exist yet.
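For context on the object-safety point, one workaround available today is to return a boxed future, which keeps the trait usable as `dyn Trait` at the cost of a heap allocation per call (the "extra allocations" mentioned earlier in the thread). This is a sketch only: `AsyncWriteBoxed`, `BoxFuture`, and `MemoryWriter` are hypothetical names, and this is not the inline one-pointer-sized future described above.

```rust
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Heap-allocated, type-erased future (a fat pointer, not an inline value).
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

/// Hypothetical future-returning write trait that stays object-safe.
pub trait AsyncWriteBoxed {
    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> BoxFuture<'a, io::Result<()>>;
}

/// Hypothetical writer that appends to an in-memory buffer.
pub struct MemoryWriter {
    pub data: Vec<u8>,
}

impl AsyncWriteBoxed for MemoryWriter {
    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> BoxFuture<'a, io::Result<()>> {
        // One allocation per call: the price of erasing the future's type.
        Box::pin(async move {
            self.data.extend_from_slice(buf);
            Ok(())
        })
    }
}

struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls once and expects completion; enough for this example only.
fn poll_ready<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    let polled = fut.as_mut().poll(&mut cx);
    match polled {
        Poll::Ready(value) => value,
        Poll::Pending => panic!("example future should complete on first poll"),
    }
}

pub fn write_through_trait_object() -> Vec<u8> {
    let mut writer = MemoryWriter { data: Vec::new() };
    {
        // The trait can be used behind `dyn` because the return type is concrete.
        let dynamic: &mut dyn AsyncWriteBoxed = &mut writer;
        let fut = dynamic.write_all(b"hello");
        poll_ready(fut).expect("in-memory write cannot fail");
    }
    writer.data
}
```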
