Consider having polling an error represent the final Stream value #206

Closed
carllerche opened this Issue Oct 12, 2016 · 56 comments

Comments

@carllerche
Contributor

carllerche commented Oct 12, 2016

Consider having polling an error represent the final value. In other words, a poll that returns an error means that poll should never be called again. In the case of a Stream, this means that an error indicates the stream has terminated.

This issue is a placeholder for the associated discussion.

cc @aturon

Collaborator

aturon commented Oct 12, 2016

@alexcrichton and I debated this initially, and the two options are equally expressive, in that a given stream can be set up to behave either way in either approach (e.g. by using Result in the Item type). So the question is what is the most convenient/straightforward default.

I tend to think that it's much more common for stream errors to be "fatal" than recoverable, so I agree with @carllerche that we've probably chosen the wrong default here.

Member

alexcrichton commented Oct 12, 2016

I also think that this is largely a question of what the combinators do. For example the TCP listener probably won't fuse itself once an error happens but rather will continue to keep accepting sockets. The combinators, however, would be able to assume that when an error happens or Ok(None) happens that everything is terminated.

Sounds reasonable to switch to errors == "end of stream"

alexcrichton added this to the 0.2 release milestone Oct 12, 2016

Contributor

tmiasko commented Oct 12, 2016

Terminating a stream on an error would simplify the life of implementers of the
Stream trait, as they wouldn't have to ensure any particular behaviour of
further polls after returning an error (presuming that behaviour of poll after
termination is still left unspecified).

The main question does indeed seem to be what the combinators would do in that case.
They could be overspecified with respect to errors (edit: at least those that
short-circuit on error), and continue to behave exactly the same as they
currently do. In this case the difference would not be particularly important.

Contributor

tmiasko commented Oct 12, 2016

In a world with streams terminating on an error, would there still be a place
for combinators that recover from errors, like or_else and then?

They could recover from one error, but what next? Their only option seems to be
to terminate the stream, as otherwise they would be breaking the Stream
contract. Notice that it is the combinator that would be breaking the contract
on continued polling, not a downstream consumer.

This seems to be a weak point of stopping on errors. Error recovery is
possible, but only once. Alternatively, those combinators could have stronger
preconditions to work only with streams that permit further polling on errors.

Member

alexcrichton commented Oct 13, 2016

Hm yeah that's a very good point about the combinators. I think this would remove basically all of the "chaining" combinators like and_then, or_else, and then. More specifically:

  • or_else would only allow for recovery of one error, which seems odd
  • then similarly would only allow catching one error, which seems odd
  • and_then would maybe generate errors from futures, not the stream. This error would enter the stream and then oddly terminate it even though the original stream is just fine.

That... may slightly change my opinion here. @carllerche @aturon thoughts about the effect on these chaining combinators with streams?

Contributor

carllerche commented Oct 13, 2016

IMO this is an argument in favor of having the Stream error terminate the stream.

There are two different error classes with streams:

  1. A stream of values that are either OK or errors.
  2. An error in the production of values, which means that no further values may be produced due to that error.

The current behavior of stream errors implies the first category which makes the second class of errors difficult to model.

If, however, the stream error terminates the stream, this implies that a stream error falls under group #2.

TcpListener::incoming is group #1, in which case, I believe this is modeled much better with a Stream<Item = Result<T, E>, E2>

This lets you differentiate between producing an error value, and an error in value production (you can have potentially two different error types).

As pointed out, combinators like or_else, and_then, then, etc. don't make sense anymore, but a different set of combinators do:

incoming
  .map(my_or_else_fn)
  .take_while(Result::is_ok)

This would, for example, provide behavior similar to the current or_else.
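
A minimal, hedged sketch of that pattern against the futures 0.1 combinators as I recall them (assuming a version new enough to have stream::iter_ok; take_while's predicate returns an IntoFuture, hence the future::ok wrapper). The stream contents, the "transient failure" recovery rule, and my_or_else_fn's stand-in closure are invented for illustration:

extern crate futures;

use futures::{future, stream, Future, Stream};

fn main() {
    // Stand-in for something like TcpListener::incoming: item-level errors are
    // ordinary values of the stream, while the stream-level Error type (here ())
    // would be reserved for fatal, stream-terminating failures.
    let fallible_source = stream::iter_ok::<_, ()>(vec![
        Ok(1u32),
        Err("transient failure"),
        Ok(2),
    ]);

    // or_else-like recovery expressed with map + take_while: recover from the
    // item-level errors we understand, stop at the first one we do not.
    let recovered = fallible_source
        .map(|item| item.or_else(|e| if e == "transient failure" { Ok(0) } else { Err(e) }))
        .take_while(|item| future::ok(item.is_ok()))
        .map(Result::unwrap);

    assert_eq!(recovered.collect().wait().unwrap(), vec![1, 0, 2]);
}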

tilal6991 commented Oct 13, 2016

I would also favour the approach of termination of stream on errors. I find that an error which terminates the stream is much more common than one which is an "expected value" and is recoverable from.

(FYI, Rx also does this, and it has error-handling "operators" if you want to check how and_then et al. can be modelled in the new system.)

Member

alexcrichton commented Oct 14, 2016

Hm I wonder if we'd perhaps have:

fn and_then<F, R>(self, f: F) -> impl Stream<Result<R::Item, R::Error>, Self::Error>
    where F: FnMut(Self::Item) -> R,
          R: Future,

That is, @carllerche I agree that if we had these semantics then a TCP listener would be that form of a stream. I find and_then to be a very useful combinator, however, and it'd be a shame to lose it. If, however, and_then just transformed the Item type then perhaps that could work.

I'm still a little uneasy about the composition story though, it doesn't feel quite right...

Contributor

carllerche commented Oct 14, 2016

You could do something like this, which would have the additional benefit of being able to have different error types for the different categories of errors:

fn and_then<F, T, E, U, R>(self, f: F) -> impl Stream<Result<U, E>, Self::Error>
    where Self: Stream<Item = Result<T, E>> + Sized,
          F: FnMut(T) -> R,
          R: IntoFuture<Item = Result<U, E>, Error = Self::Error>
{
    // ...
}

... it is a lot of generics though...
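
For what it's worth, a hedged sketch of how that shape could be expressed on top of the existing futures 0.1 Stream::and_then, with the associated types spelled out. The free-function form, the name result_and_then, and the future::Either plumbing are mine, not something proposed in this thread:

extern crate futures;

use futures::future::{self, Either};
use futures::{IntoFuture, Stream};

/// Apply `f` to the Ok items of a stream of Results, passing item-level errors
/// (`E`) through untouched; stream-level errors (`S::Error`) still flow through
/// the stream's own error channel.
fn result_and_then<S, F, T, E, U, R>(
    stream: S,
    mut f: F,
) -> impl Stream<Item = Result<U, E>, Error = S::Error>
where
    S: Stream<Item = Result<T, E>>,
    F: FnMut(T) -> R,
    R: IntoFuture<Item = Result<U, E>, Error = S::Error>,
{
    stream.and_then(move |item| match item {
        // A value was produced: run the user's future for it.
        Ok(t) => Either::A(f(t).into_future()),
        // An item-level error: forward it without touching the stream itself.
        Err(e) => Either::B(future::ok(Err(e))),
    })
}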

Contributor

carllerche commented Oct 21, 2016

I think this PR is relevant to the discussion: https://github.com/alexcrichton/futures-rs/pull/199/files

Aka, if an error is passed through without terminating the stream it is unclear if it should count as an element or not.

carllerche changed the title from Consider having polling an error represent the final value to Consider having polling an error represent the final Stream value Nov 3, 2016

Member

alexcrichton commented Nov 18, 2016

If we go with this (which it seems like we will) we should update stream::iter to take an iterator of items, not an iterator of results.

Kixunil commented Jan 12, 2017

Shouldn't this be encoded in the types? Currently Stream::poll takes &mut self, which indicates self is always in a correct state after poll executes. The other case would be encoded by a move.

Maybe we could have two traits (names invented just for now):

enum StreamAsync<S: FallibleStream> {
    Ready(S::Item, S),
    NotReady(S),
}

trait FallibleStream {
    type Item;
    type Error;

    fn poll(self) -> Result<StreamAsync<Self>, Self::Error>;

    // combinators
}

trait InfallibleStream {
     type Item;
     // We don't need error here.

    fn poll(&mut self) -> Async<Self::Item>;
}

// This bridges the two:
impl<S: FallibleStream> InfallibleStream for Result<S, S::Error> {
    type Item = Result<S::Item, S::Error>;

    fn poll(&mut self) -> Async<Self::Item> {
        // Too lazy to write actual code, but the idea is to replace Self::Ok with Err if stream fails.
    }
}

I admit it'd somewhat complicate the API, but it might express the constraints more clearly and it'd be checked by the compiler.

What do you think?
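
To make the bridge concrete, here is a hedged, self-contained sketch of the same idea. The trait names follow the comment above, but the Done variant, the Option-based wrapper, the Async<Option<_>> return type (so end-of-stream has a representation), and the Fused name are additions of mine so that the example stands alone:

pub enum Async<T> {
    Ready(T),
    NotReady,
}

pub enum StreamAsync<S: FallibleStream> {
    Ready(S::Item, S),
    NotReady(S),
    Done,
}

pub trait FallibleStream: Sized {
    type Item;
    type Error;
    /// Polling consumes the stream; it is only handed back if it may be polled again.
    fn poll(self) -> Result<StreamAsync<Self>, Self::Error>;
}

pub trait InfallibleStream {
    type Item;
    fn poll(&mut self) -> Async<Option<Self::Item>>;
}

/// Bridge: a by-value fallible stream becomes a by-reference infallible stream
/// whose items are Results. Once the inner stream errors or finishes, the state
/// becomes None and every later poll reports termination.
pub struct Fused<S: FallibleStream>(pub Option<S>);

impl<S: FallibleStream> InfallibleStream for Fused<S> {
    type Item = Result<S::Item, S::Error>;

    fn poll(&mut self) -> Async<Option<Self::Item>> {
        match self.0.take() {
            None => Async::Ready(None), // already terminated
            Some(stream) => match stream.poll() {
                Ok(StreamAsync::Ready(item, rest)) => {
                    self.0 = Some(rest);
                    Async::Ready(Some(Ok(item)))
                }
                Ok(StreamAsync::NotReady(rest)) => {
                    self.0 = Some(rest);
                    Async::NotReady
                }
                Ok(StreamAsync::Done) => Async::Ready(None),
                // The error consumed the stream, so it is fused automatically.
                Err(e) => Async::Ready(Some(Err(e))),
            },
        }
    }
}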

Member

alexcrichton commented Jan 12, 2017

@Kixunil yes that was considered long ago in the initial early design stages of this library, but unfortunately it suffers a few problems.

  1. Usage of such an interface isn't always the most ergonomic. With so many types always moving there's lots of variable binding and inability to store values in structures.
  2. The Stream trait is no longer object safe, precluding type erasure and creating trait objects.
  3. Performance can decrease sometimes as moves aren't free and if everything is always moving it can cause unnecessary overhead.

Kixunil commented Jan 12, 2017

Good points. What exactly do you mean by "With so many types always moving there's lots of variable binding and inability to store values in structures"? I somehow can't picture what you mean.

Also, isn't 3 optimized away by the compiler? I thought that internally it avoids copying where possible.

Edit: Looking at the ASM of a simple example it seems like it doesn't...

Member

alexcrichton commented Jan 12, 2017

It basically means that every function consumes the receiver and then optionally returns it. That means that if you try to store partial state in a struct, for example, you'll have to deconstruct and reconstruct the struct frequently or have a lot of Option fields.

Kixunil commented Jan 13, 2017

Thank you!

With all these problems, I prefer to keep the semantics as is. Maybe provide some other interface to specify whether an error is final (e.g. an is_err_final(&self) -> bool method).

Contributor

carllerche commented Jan 13, 2017

@Kixunil could you explain why you would like the semantics to stay as is? If the semantics are changed, you can get the original semantics by having a stream over Result.

Kixunil commented Jan 16, 2017

@carllerche I think a mutable reference expresses "the value will stay valid". If an error indicated the end of the stream, what should happen when poll is called again? Panic? Return the same error?

Keeping it this way has another advantage of not breaking existing code. :)

Also, one interesting example would be writing to a file. The write can fail because of a full disk but succeed later. On the other hand, I can also imagine a fatal error.

I'd prefer it if the semantics could be defined by the implementor. Just a single fn is_err_final() may be fine. Or we may require Stream::Error: IsFatal too. Or create another associated type, ErrorSemantics, with one of Fatal, Restartable, Mixed (Mixed would require Stream::Error: IsFatal). I'm not sure what the best approach is here.

I'd love it if the constraint could be encoded in the type system somehow. To give an example, I dislike the way io::{Read, Write} hard-code the error type, so there's no way to statically enforce that writing to a Vec always succeeds. I'd love it if we could avoid creating a similar issue in the case of futures.
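
For illustration, the two variants mentioned here could look roughly like this; is_err_final and IsFatal come from the comment above, while the trait that holds is_err_final and the helper function are invented:

/// Variant 1: the stream itself reports whether the error it just returned is final.
trait StreamErrorState {
    fn is_err_final(&self) -> bool;
}

/// Variant 2: the error type carries the information (Stream::Error: IsFatal).
trait IsFatal {
    fn is_fatal(&self) -> bool;
}

/// A generic consumer could then decide whether another poll is allowed.
fn may_poll_again<E: IsFatal>(last_error: &E) -> bool {
    !last_error.is_fatal()
}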

robey commented Mar 16, 2017

I've played with streams in other languages, and found the idea (in this library) of "errors that don't terminate the stream" odd. Not bad, just odd. I agree with the comments above that you could get the same effect with a stream of Result, so the current behavior of making every item an implicit Result just complicates the common case.

Kixunil commented Mar 16, 2017

@robey we have these cases:

  • Stream can never fail
  • Stream can fail (no future operations are possible)
  • Stream can temporarily fail to produce a value but might be able to produce one later. The decision whether to terminate it is up to the caller

Same with Sink.

How would you model all of these?

robey commented Mar 16, 2017

@Kixunil I think @alexcrichton's idea from here would let all three cases work: #206 (comment)

The first two cases would be the new behavior, and the third case would be handled by making the stream be explicitly of type Stream<Result<A, E>> -- a stream of things that can each be an item or an error.

Kixunil commented Mar 16, 2017

@robey That sounds reasonable. The problem I see there is someone could accidentally mistake one for the other but that is probably unavoidable anyway.

Anyway, I think my comment still holds: we should specify exactly what should happen when someone calls poll() after the stream itself has failed.

living180 commented Apr 12, 2017

After implementing my own transport for use with tokio-proto, I agree that having an error terminate the stream makes the most sense, and that as @robey said, a non-fatal error can be handled with Stream<Result<A, E>>.

Regarding @Kixunil's question about poll() on a failed stream, the most robust way to handle it would be to have poll() take self instead of &mut self, as proposed in @Kixunil's comment. I know that @alexcrichton said that this was considered and ruled out, but that approach would make implementing a stream that can fail much nicer. As it stands right now, my implementation has to save any fatal error so that it can continue to return Ok(None) after an error occurs. The same would be true if the semantics were that calling poll() on a failed stream panics - it would still be necessary for me to maintain state to know that the panic should happen. My implementation would be much simpler if poll() could just consume the stream on completion/failure.

I understand the concern about the performance cost of the moves, but a very basic attempt seems to indicate that the necessary code changes wouldn't be that onerous. As an experiment I created a gist containing an implementation of ForEach::poll() when Stream::poll() takes self instead of &mut self and the necessary changes weren't terribly drastic. Perhaps other cases are less simple though.

Kixunil commented Apr 12, 2017

@living180 Thank you for considering my suggestion!

I'd like to revisit my suggestion again since I got into this very recently. I'm implementing an application where correctness > speed (but still it needs to be fast enough).

This is related to the Future trait, though. Currently, Future takes &mut self and I'd like to have one future produce another (different!) future. However, they need to pass along a non-Clone "bus". So the only way to implement it is something like this:

fn poll(&mut self) -> Poll<...> {
    match self.bus.take() {
        Some(mut bus) => match bus.poll() {
            Ok(Async::Ready(result)) => Ok(Async::Ready(MyNewFuture::new(result, bus))),
            Ok(Async::NotReady) => {
                self.bus = Some(bus);
                Ok(Async::NotReady)
            }
            Err(e) => Err(e),
        },
        None => panic!("poll() called twice"),
    }
}

This is weird, involves copying from/to Option<T> anyway (it probably can't be optimised away because of panic safety; a similar problem as with std::mem::replace_with() - see its RFC) and also risks panics that could've been statically type-checked. It would be much simpler to do with self instead of &mut self.

Member

alexcrichton commented Apr 12, 2017

@living180 note that using &mut self instead of self isn't purely for performance, the consumer ergonomics (e.g. being able to store in a struct field) and object safety (being able to create a trait object) are also quite important!

I do agree though that implementations would be easier with self; there are a number of combinators which have "empty" statements which just panic, intended for temporary state.

Kixunil commented Apr 12, 2017

@alexcrichton I feel like it's a repeating pattern. It begins to make more and more sense to me to use two traits and some glue between them, so implementors could use the self version and consumers the &mut self version. I'm not sure about naming right now, though.

Also, I'm not sure what the consumers are. I feel like it's only the reactor, etc. Could you clarify?

Update: I've created a quick, imperfect example of how that could work.

Member

alexcrichton commented Apr 12, 2017

@Kixunil that's imposing the Option-style cost at all layers of the stack vs only where necessary, though?

Kixunil commented Apr 12, 2017

@alexcrichton I guess it would only be used at the top layer (so you can spawn the future on the reactor). It's difficult to tell without actually trying. Of course, my implementation would have to add its own (optimized) combinators.

Kixunil commented Apr 18, 2017

I think I figured out how to restore "object safety". I guess this is needed for reactors. The basic idea is that users could create an instance of FutureObject<I, E>, which would take care of dynamic dispatch. It involves a bit of unsafe inside, but it provides a completely safe interface for users.

This would completely eliminate panics and usage mistakes, without the cost of branching (the match glue).

The cost of moving would remain, but I guess it could be optimised by inlining. Even if not, it might be a good choice for people who prefer safety over performance.

I'll post an implementation example later.

Edit: could you give me a real-world example of storing a future in a struct field?

Kixunil commented Apr 18, 2017

So, I did it. As I've written previously, if a user wants a "trait object" of a future, he can use FutureObject<I, E>. It allows everything trait objects do (storing in an array, not caring about implementation details, ...) while still being type safe.

As I mentioned, it involves a bit of unsafe. The unsafe is hidden behind a safe abstraction (idiomatic Rust) and isn't even public. Also, I believe it's simple enough to be correct. (The reasons are explained in the comments in great detail.)

The interesting thing is that the abstraction allows putting type-unsafe futures into FutureObject too. Another thing I noticed is that fuse() is not needed anymore, because the implementation statically prevents accidentally polling a resolved future.

The more I play with that code, the more I like it. I'm not saying it's the only good solution, but I'm confident it might be useful. I think that having both approaches would be useful, especially if they function well together.

ArtemGr commented Apr 18, 2017

I'm probably daft, but what IS the primary difference between Box<Future<Item=I, Error=E>> and FutureObject<I,E>? Why wouldn't Box<Future<Item=I, Error=E>> provide object safety?

Kixunil commented Apr 18, 2017

@ArtemGr Future as defined in the futures-typesafe crate is not object safe because it takes self and therefore requires Self: Sized. Trait objects aren't Sized, so such a trait can't be put inside a box like Box<Future<Item=I, Error=E>>.

Future as defined in this ("type-unsafe") crate is object safe because it only uses references. However, it's not type safe (it doesn't prevent users from using it incorrectly; that may lead to panics, etc.).

Object safety is required for some use cases, as far as I understand mostly reactors. FutureObject solves this by encapsulating Box<UnsafeFuture<Item=I, Error=E>> behind a safe abstraction; it provides a safe interface for it. UnsafeFuture is almost the same as Future in this crate, with one exception: calling poll() on a resolved future is undefined behaviour (UB).

In other words, Box<futures_typesafe::Future<Item=I, Error=E>> is invalid, while FutureObject<I,E> is valid and provides the same functionality one would expect.

ArtemGr commented Apr 18, 2017

@Kixunil Thanks, I mistook futures_typesafe::Future for futures::Future for a moment.

Member

alexcrichton commented Apr 20, 2017

Kixunil commented Apr 22, 2017

@alexcrichton a panic is not UB because there is a call to take(), which is identical to std::mem::replace(&mut self.0, None), so after a panic, drop is called on None and that is fine. Also, after a panic, the implementation of futures_typesafe::Future for FutureObject doesn't return, so there is no self to call that function with again.

The whole point of FutureObject is that it encapsulates UnsafeFuture in a way that guarantees a double poll won't happen. It implements futures_typesafe::Future, and that mandates consuming self, so it can't be called incorrectly by the caller. The implementation itself could be incorrect, but that's the point of unsafe blocks:

Rust doesn't end unsafety, it just builds a strong, high-visibility fence around it, with warning signs on the one gate to get inside. As opposed to C's approach, which was to have a sign on the periphery reading "lol good luck".

And I'm confident that my implementation is correct, because:

  • the unsafe poll() is called exactly once in the whole function
  • the function consumes self
  • the function returns self if and only if the unsafe poll() returned Async::NotReady (that accounts for panics)

Member

alexcrichton commented Apr 24, 2017

Ah yes I see now. If you'd like to pursue this I'd recommend moving it to a separate issue thread, as it's much more far reaching than just the title of this issue.

Kixunil commented Apr 25, 2017

@alexcrichton yeah, I felt like it was becoming off-topic. I've created a new issue.

Arnavion commented Jun 28, 2017

Just so I understand: if I'm implementing the futures v0.1 Stream for a data structure today, then given how existing combinators like or_else work, I must allow poll() to be called after it has returned an Err? That is, poll() cannot panic in this situation and must return Ready(None)?

Member

alexcrichton commented Jun 30, 2017

@Arnavion that's mostly right, yeah. It's valid for streams to have poll called again even after they return an error, and the stream must not panic (or at least not because of that).

Note, though, that you're allowed to do whatever you'd like after that. Once you return an error you can terminate the stream with Ready(None), you could yield more errors, more items, etc.
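
A hedged sketch of one way an implementor can satisfy that contract in futures 0.1: report the fatal error once, remember that it happened, and answer every later poll with Ok(Async::Ready(None)). The Counting stream and its failure condition are invented for illustration:

extern crate futures;

use futures::{Async, Poll, Stream};

struct Counting {
    next: u32,
    failed: bool,
}

impl Stream for Counting {
    type Item = u32;
    type Error = String;

    fn poll(&mut self) -> Poll<Option<u32>, String> {
        if self.failed {
            // The fatal error was already reported; behave like a terminated stream.
            return Ok(Async::Ready(None));
        }
        if self.next == 3 {
            self.failed = true;
            return Err("ran out of values".to_string());
        }
        self.next += 1;
        Ok(Async::Ready(Some(self.next)))
    }
}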

3n-mb commented Oct 17, 2017

I tend to think that it's much more common for stream errors to be "fatal" than recoverable, so I agree with @carllerche that we've probably chosen the wrong default here.

Very often a chain stream.op1(...).op2(...).op3(...) will have tiny and simple functions/closures that go into opN(...).
Above all else, it makes code simple to read. 🥇
But this also means that a closure in op1 is not equipped to deal with, say, 3 possible conditions besides a normal event. However trivial those runtime conditions may be at a big scale, from the point of view of a small section they are fatal. Of course, at the big scale, i.e. at the level of constructing the process-shaping operations, the developer just adds an appropriate catch and retry where desired.

3n-mb commented Oct 17, 2017

The above point is expressed around 13:20 in a C++ talk about observables: the judgment about an error is made downstream, at the most opportune and clean place. Again, the clean-code theme applies here.

Contributor

carllerche commented Oct 19, 2017

For the record, I have changed my mind since I opened this issue and I believe that the current behavior is the best.

3n-mb commented Oct 19, 2017

@carllerche why? However short your comment is, I cannot read your mind, and I'd love for you to share the experience that made you change your mind.

3n-mb commented Oct 20, 2017

There can be a middle ground here!

  1. Rx-style close-all on error can be handled by Stream::Error.

  2. The less dangerous errors, on which the stream merely "skips a beat" but doesn't close, can be accommodated via Stream::Item being a Result<T> instead of a bare T.

Aarowaim commented Oct 21, 2017

A stream returning a Result<Item> should not terminate.

However, when the resource underlying the stream returns an Err, the stream should be terminated. If the stream has terminated, combinators on it can only return Err unless some action re-establishes the stream's live state (such as jumping the data pointer back to 0 for a stream made from a file resource).

I believe I lean towards handling Err outside of the stream, as it is the only clean way to escalate a problem to something that can fix it. A stream gets closed when its lifetime ends; it is not necessary to close it when it becomes invalid and returns Err.

Aarowaim commented Oct 21, 2017

So to clarify, streams should be terminated (by the user), when the stream itself produces Err. The user can choose to allow object lifetimes to close the stream. They can also, if they know the cause, fix the problem and resume normal use of the stream.

3n-mb commented Oct 21, 2017

streams should be terminated (by the user), when the stream itself produces Err.

Consider the following real-life example from the use of Rx.
I use a library, or module, written by someone else, of course. This library lets me introduce a section of my streams.

Now an Err comes from that section. How do I know if the 3rd-party code will be OK with me skipping this error, treating it as a "skipped beat", instead of treating it like a "big error" for which I should drop the stream and restart its processing from scratch? How am I to know this?

😄 the answer is not "read 3rd party code documentation".

3n-mb commented Oct 21, 2017

I thought about the form. How about the following formulation:
let's have a completion with error and a natural one, and let's have an event that is either OK or a lost-beat error.

  • If a combinator has a bad error, it does a completion with error.
  • If a combinator has an error condition, but it may continue working with new events, then it produces a lost-beat error, continuing its operation as normal.
  • When the following combinator gets a lost-beat error, it knows itself whether such a thing is tolerated by its own algorithm. A tolerated lost-beat error is passed downstream; otherwise, a completion with error is done.
  • When a lost-beat error reaches the end, the developer can make her own logical decision about it, as usual.

Kixunil commented Nov 1, 2017

@3n-mb Very good question! I'm a big believer in the type system, and I believe we should have different types/traits for different behaviors. That way the compiler can check our code.

dvtomas commented Dec 14, 2017

Hi,
just my $0.02. I'm new to Rust, but in the past I used to work with RxScala and a little bit with another Scala library, Monix (better designed, IMO). Both strive to be compatible with the Reactive Streams spec. I'm not really sure how similar futures streams are, or want to be, to the Reactive Streams spec, but the spec considers a reactive stream done after encountering an error (no more items are emitted after an error), so I'm used to this behavior.

I'm not sure if this is relevant to the discussion as I'm VERY new to Rust and futures streams. Still, I wanted to mention Reactive Streams in case somebody finds it useful.

3n-mb commented Dec 21, 2017

Sounds like Reactive Streams deal with non-blocking backpressure.
This overflow notes that streams are pull-based, while observables (rx, or Reactive Extensions) are push-based.

How much are tokio streams pull-based, i.e. including backpressure control, versus observables (no inbuilt backpressure control)?
Or, where do tokio streams fit between reactive streams and observables?

Kixunil commented Jan 1, 2018

@3n-mb I think this is off-topic. Streams are pull-based and sinks are push-based (but they still have backpressure). There's also the unbounded channel, which has a push-based, non-blocking end.

luben commented Jan 15, 2018

@3n-mb the linked overview is about java-streams, not reactive streams. BTW the reactive streams are now in Java 9 as java.util.concurrent.Flow.

3n-mb commented Jan 15, 2018

@luben I assume you're talking about this link, and it compares the two:

There are significant differences between Observable and Stream:
...

@luben Yes! It is even in Java!
Both reactive streams and observables are recognized by many language communities as being useful for folks like me, i.e. regular developers, users of core libraries.
Rust? 😢

Collaborator

aturon commented Feb 12, 2018

@cramertj was there any particular decision here?

Collaborator

aturon commented Mar 19, 2018

This has been resolved.

aturon closed this Mar 19, 2018
