
Reactive Streams impl #1282

Merged
merged 30 commits into helidon-io:master on Feb 24, 2020
Conversation

danielkec
Contributor

@danielkec danielkec commented Jan 9, 2020

Reactive Streams Operators

Reactive Streams implementation based on existing Helidon Common Reactive Library compliant with
Reactive streams for JVM.

Part of the implementation of /issues/1206

@danielkec danielkec self-assigned this Jan 9, 2020
@danielkec danielkec added this to the 2.0.0 milestone Jan 9, 2020
@danielkec danielkec added the enhancement New feature or request label Jan 9, 2020
@danielkec
Contributor Author

[image]

@danielkec
Contributor Author

Rebased on @tomas-langer 's native image changes in the master, sorry for force-push

@olotenko olotenko left a comment

Why is this declared R if onNext expects it to be X? (I know, this change is just adapting old to new, but still)

@olotenko olotenko left a comment

Atomic properties of these are not used. It's better to declare them volatile. (But I am not even convinced they have to be volatile - the onSubscribe/onNext/onComplete/onError protocol is single-threaded)

@olotenko olotenko left a comment

Hybrid* may be better expressed as an interface that implements both, with mutually-recursive default methods, then from* static methods construct concrete implementations that override either half of those mutually-recursive methods. This way you don't need to always check for what type of processor is set.
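olotenko's suggestion can be sketched with two toy mirrored APIs standing in for the real java.util.concurrent.Flow and org.reactivestreams interfaces. All names below (OldApi, NewApi, Hybrid, fromOld, fromNew) are hypothetical illustrations, not Helidon's actual Hybrid* types:

```java
// Two mirrored single-method APIs standing in for the real dual interfaces.
interface OldApi<T> {
    void onNext(T item);
}

interface NewApi<T> {
    void next(T item);
}

// The hybrid extends both, with mutually-recursive default methods: each
// half delegates to the other, so a concrete implementation only needs to
// override one of them - no runtime check for which kind of processor is set.
interface Hybrid<T> extends OldApi<T>, NewApi<T> {
    @Override
    default void onNext(T item) {
        next(item);
    }

    @Override
    default void next(T item) {
        onNext(item);
    }

    // from* factories build concrete instances that override exactly one
    // half of the recursion, which breaks the mutual cycle.
    static <T> Hybrid<T> fromOld(OldApi<T> delegate) {
        return new Hybrid<T>() {
            @Override
            public void onNext(T item) {
                delegate.onNext(item);
            }
        };
    }

    static <T> Hybrid<T> fromNew(NewApi<T> delegate) {
        return new Hybrid<T>() {
            @Override
            public void next(T item) {
                delegate.next(item);
            }
        };
    }
}

public class HybridDemo {
    public static void main(String[] args) {
        StringBuilder seen = new StringBuilder();
        // Wrap an OldApi; callers may still use either entry point.
        Hybrid<String> hybrid = Hybrid.fromOld(item -> seen.append(item).append(';'));
        hybrid.next("a");    // routed through the default method to onNext
        hybrid.onNext("b");  // direct
        System.out.println(seen); // a;b;
    }
}
```

Because each from* factory overrides one half of the mutual recursion, the other entry point always routes into the overridden half, so the wrapped delegate never needs an instanceof check.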

@danielkec
Contributor Author

@olotenko

Why is this declared R if onNext expects it to be X? (I know, this change is just adapting old to new, but still)

I can't see which file the comment is pointing to, but I guess this is about FlatMapProcessor; it's a fix of the previous PR #1260, and the comment is about its different/inner subscriber:

    public InnerSubscriber<? super X> executeMapper(U item) {
        ...
    }

    private class InnerSubscriber<R> implements Flow.Subscriber<R> {
        ...
        public void onNext(R o) {
            Objects.requireNonNull(o);
            MultiFlatMapProcessor.this.subscriber.onNext((X) o);
            ...

@olotenko

Right, that's the place. Note that X is still available in onNext of the InnerSubscriber<R>, so that onNext knows what X is and expects the item to be of type X. So it seems more suitable to declare it as implementing Flow.Subscriber<X> instead of introducing a new type parameter R.

Mind you, I am not blocking the merge, just suggesting what may be a good idea to review as a future improvement.
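The suggested signature change, sketched on a toy outer class (FlatMapSketch and its downstream list are illustrative stand-ins, not the actual MultiFlatMapProcessor): the inner subscriber implements Flow.Subscriber<X> against the enclosing type parameter, which removes the unchecked (X) cast.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;

public class FlatMapSketch<X> {
    final List<X> downstream = new ArrayList<>();

    // The inner class uses the enclosing X directly instead of a fresh R,
    // so onNext needs no unchecked (X) cast.
    private class InnerSubscriber implements Flow.Subscriber<X> {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(X item) {
            Objects.requireNonNull(item);
            downstream.add(item); // previously: subscriber.onNext((X) o)
        }

        @Override
        public void onError(Throwable throwable) {
        }

        @Override
        public void onComplete() {
        }
    }

    public static void main(String[] args) {
        FlatMapSketch<String> outer = new FlatMapSketch<>();
        Flow.Subscriber<String> inner = outer.new InnerSubscriber();
        inner.onNext("hello");
        System.out.println(outer.downstream); // [hello]
    }
}
```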

@danielkec
Contributor Author

@olotenko Thanks a lot, Hybrids with default methods are a great idea, why didn't I think of it! Also thanks for the generics in the flatMap.
You are right about the over-use of Atomic*, but I wouldn't get rid of the volatiles in the processors so fast. It's true that by the spec some of the signals must be executed serially, but reactive streams can incorporate third-party publishers/subscribers/processors into the stream, and those can be implemented in "various" ways. It seems better to be defensive in this case.

@danielkec
Contributor Author

Apologies for the force push, I needed to rebase on the shrinkwrap upgrade in master.

@danielkec
Contributor Author

This is not what has been discussed by email.

This is a nonsensical implementation of that rule in that spec.

It's just a quick fix to pass the TCK tests so we are able to move forward, not the final solution.

danielkec added a commit to danielkec/helidon that referenced this pull request Feb 3, 2020
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
@danielkec
Contributor Author

The implementation is not what has been discussed.

Please, split into smaller commits: one for changes to BaseProcessor, others for other changes.

60739a8#diff-ca92013a99c748b99a9e08660b9dea17R99 - racy. This should have been called only when this.subscription has been set.

It is:

    private void tryOnSubscribe() {
        if (Objects.nonNull(subscription) && subscriber.tryOnSubscribe(this)) {
            if (done) {
                tryComplete();
            }
        }
    }

@tomas-langer
Member

Hi Alex, is there any blocking issue that would prevent us from merging this pull request, or can we create a follow up issue (or issues) to fix some of your comments?
We need to move forward with the MP messaging work that depends on these changes.

@akarnokd
Collaborator

Why did you implement operators as Flow.Processors? Implementing one implies that you either want to multicast events to any number of downstream consumers or have to ensure there is at most one downstream consumer that can subscribe.

@danielkec
Contributor Author

danielkec commented Feb 19, 2020

Why did you implement operators as Flow.Processors? Implementing one implies that you either want to multicast events to any number of downstream consumers or have to ensure there is at most one downstream consumer that can subscribe.

Hi @akarnokd, sure, a second subscribe signals onError with an IllegalStateException to the second subscriber, which is initialized with an empty subscription.

EDIT: wow thanks a lot, totally missed that!
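The guard described above, sketched as a minimal self-contained publisher (class and field names are hypothetical, not Helidon's BaseProcessor): the second subscriber receives an empty, no-op subscription followed by onError with an IllegalStateException.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

public class SingleSubscriberPublisher<T> implements Flow.Publisher<T> {
    private final AtomicBoolean subscribed = new AtomicBoolean();

    // Empty subscription: both signals are no-ops.
    private static final Flow.Subscription EMPTY = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        if (subscribed.compareAndSet(false, true)) {
            subscriber.onSubscribe(EMPTY); // the real code wires an actual subscription here
        } else {
            // Late subscriber: per rule 1.9/2.12 spirit, onSubscribe first,
            // then reject with onError.
            subscriber.onSubscribe(EMPTY);
            subscriber.onError(new IllegalStateException("Only one subscriber allowed"));
        }
    }

    public static void main(String[] args) {
        SingleSubscriberPublisher<String> publisher = new SingleSubscriberPublisher<>();
        StringBuilder log = new StringBuilder();
        Flow.Subscriber<String> sub = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) { log.append("sub;"); }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable t) { log.append(t.getClass().getSimpleName()); }
            @Override public void onComplete() { }
        };
        publisher.subscribe(sub);
        publisher.subscribe(sub); // rejected
        System.out.println(log); // sub;sub;IllegalStateException
    }
}
```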

@akarnokd
Collaborator

Yes, I saw those.

Still, I don't see why the operators should be implemented as Flow.Processors. Do you intend to drive such chains via onNext calls imperatively somewhere?

With a processor chain, you trigger a subscribe() storm while you are still assembling maps and filters, which could trigger side effects, resource utilization or even failure before you are ready to consume the whole sequence with a Flow.Subscriber. This is similar to how composing CompletionStage operators can actually race with the values in each of those stages. In addition, you are not allowed to drive individual Flow.Processors in a chain externally, because that could violate the serial requirement of the onXXX calls.

You can simply have a chain of Flow.Publishers as operators and, if necessary, have the topmost source be a Flow.Processor for imperative item emission.

@akarnokd
Collaborator

If you need one example of why a chain of Flow.Processors doesn't work: repeating or retrying a chain is not possible, because the intermediate Flow.Processors will be in a terminal state. You'd have to recreate the entire chain from scratch with fresh Flow.Processors to allow a new run.

With Flow.Publishers, the operators hold only the blueprint of the chain, so a retry due to an error is straightforward and requires no manual chain recreation.
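The publisher-chain style akarnokd describes can be sketched as follows; MapPublisher and the demo source are illustrative, not Helidon's operators. The key property is that each subscribe() creates fresh per-subscriber state, so the same chain can be re-subscribed (e.g. by a retry) without rebuilding it:

```java
import java.util.concurrent.Flow;
import java.util.function.Function;

// Operators as Flow.Publishers: MapPublisher holds only the blueprint
// (upstream + mapper) and builds a fresh adapter per subscribe(), so the
// chain has no shared terminal state. Illustrative, not Helidon's code.
final class MapPublisher<T, R> implements Flow.Publisher<R> {
    private final Flow.Publisher<T> upstream;
    private final Function<? super T, ? extends R> mapper;

    MapPublisher(Flow.Publisher<T> upstream, Function<? super T, ? extends R> mapper) {
        this.upstream = upstream;
        this.mapper = mapper;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super R> downstream) {
        upstream.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(T item) { downstream.onNext(mapper.apply(item)); }
            @Override public void onError(Throwable t) { downstream.onError(t); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }
}

public class OperatorChainDemo {
    public static void main(String[] args) {
        // Trivial synchronous source emitting 1..3; it ignores backpressure
        // for brevity (a real source must honor request()).
        Flow.Publisher<Integer> source = sub -> {
            sub.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            for (int i = 1; i <= 3; i++) {
                sub.onNext(i);
            }
            sub.onComplete();
        };

        Flow.Publisher<Integer> doubled = new MapPublisher<>(source, i -> i * 2);

        StringBuilder out = new StringBuilder();
        Flow.Subscriber<Integer> sink = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { out.append(item).append(' '); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { out.append("done"); }
        };

        doubled.subscribe(sink);  // first run
        doubled.subscribe(sink);  // re-subscribing works: blueprint, not state
        System.out.println(out);  // 2 4 6 done2 4 6 done
    }
}
```

A Flow.Processor in the middle could not be re-subscribed this way once terminated; the publisher chain rebuilds its per-run state on every subscribe().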

  * Fix second subscriber cancellation in the BaseProcessor

Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
@tomas-langer tomas-langer left a comment

LGTM

@akarnokd
Collaborator

How is it not emitted serially?

@olotenko

olotenko commented Feb 24, 2020

request can arrive during onNext and can be concurrent with it. Emitting onError concurrently breaks the guarantee that all on* invocations are in a total order. Unguarded invocation of onNext/onError will violate the recursion limitations - the stack depth becomes unbounded.

@akarnokd
Collaborator

akarnokd commented Feb 24, 2020

Wrong. The atomic state transition of addRequest, which requires the transition from zero, ensures there is only one thread entering the emission section.
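The trick being described - only the caller that transitions the demand counter from zero enters the emission loop - can be sketched as follows (SerialEmitter is an illustrative toy, not the actual Helidon code):

```java
import java.util.concurrent.atomic.AtomicLong;

// The "transition from zero" trick: request() atomically adds demand, and
// only the caller that moves the counter off zero enters the emission loop,
// so emission stays serial even under concurrent request() calls.
public class SerialEmitter {
    private final AtomicLong requested = new AtomicLong();
    private int next = 1;                          // demo state: emits 1, 2, 3, ...
    final StringBuilder out = new StringBuilder();

    public void request(long n) {
        if (requested.getAndAdd(n) != 0) {
            return;                                // another thread is already emitting
        }
        long remaining = n;                        // we transitioned from zero: emit
        while (remaining != 0) {
            out.append(next++).append(' ');        // emit one item
            remaining = requested.addAndGet(-1);   // consume one unit of demand,
        }                                          // picking up any concurrent requests
    }

    public static void main(String[] args) {
        SerialEmitter emitter = new SerialEmitter();
        emitter.request(3);
        System.out.println(emitter.out.toString().trim()); // 1 2 3
    }
}
```

A concurrent request() that finds the counter non-zero simply adds its demand and returns; the thread already in the loop observes the extra demand via addAndGet and keeps emitting, so no two threads emit at once.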

@olotenko

olotenko commented Feb 24, 2020

Well, it's more subtle than that, because it also interacts with canceled, but OK, yes, it is serial then. The alternative that was in review used the same trick, just more obviously: olotenko@ea05dd2#diff-75074b35969a7ab9a07495bf20596b1fR102

@akarnokd
Collaborator

Yes, the TCK expects a clear failure for non-positive request amounts which means the Publisher should not fail or complete on its own or get cancelled before that. This is one of the pain points of the TCK.

@olotenko

olotenko commented Feb 24, 2020

That requirement ("should not fail or complete on its own or get cancelled before that") is non-enforceable, and the code you submitted does not guarantee that either.

E.g. cancel followed synchronously by request(-1): request wins and will produce onError, whereas the spec requires a cancelled Subscription to behave as a no-op. I treat this as not a bug, because there is no requirement for cancel and request to be observed in a total order.

But the part where you say "should not complete or error before that" is just not enforceable. If the Publisher has passed the branch where it determined !hasNext, then a request(-1) will not produce onError. I treat this as not a bug, just the nature of concurrent systems. But this means you can't enforce that claim.

All the try-catch blocks can, and possibly should, be fused into one: the only difference in behaviour is that if onNext throws, it will produce onError, whereas currently it won't - but this is not wrong; and if it doesn't throw, then fusing all the try-catch blocks does no harm. The intermediate checks for canceled are unnecessary for the above reasons. The spec allows cancel and request(...) to be observed "eventually"; there is no requirement to observe them immediately. So if it races against other on*, there is no requirement to prefer issuing the error produced by a bad value in request(...). After that you get pretty much the loop body that was in review: olotenko@ea05dd2#diff-75074b35969a7ab9a07495bf20596b1fR107-R133 - with a subtle difference in how we determine mutual exclusion of who executes the on*.

@akarnokd
Collaborator

Eg cancel followed synchronously by request(-1) - request wins, and will produce onError. Whereas the spec requires a cancelled Subscription to behave as no-op.

Yes, and such bad request amounts should not go unnoticed, because they mean there is a bug somewhere in the chain. Since there is currently no established approach in the module to not lose exceptions, this is the best that can be done while other operators are being rewritten. We are discussing the possibility of a global error consumer on Slack.

All the try-catch can, and possibly should, be fused into one. The checks for canceled are unnecessary for the above reasons.

With a general Iterator, hasNext and next could take an arbitrary time to return; if the sequence gets canceled due to a timeout, the code would enter the other method and wait even more. This setup bails out eagerly.

@olotenko

olotenko commented Feb 24, 2020

Yes and such bad request amounts should not go unnoticed

Yes, but you can't guarantee that after cancel has been fired (the spec requires no-op behaviour, and the same is true of all implementations: after canceled has been observed, negative request values will not be notified via onError), or after the Publisher has committed to fire onError/onComplete. In other cases it will be noticed. The time to execute hasNext/next is immaterial for correctness. Given these premises, there is no proof that eager notification is any better than lazy notification. On the contrary, there are good reasons to optimize the happy path at the expense of handling error cases somewhat slower.

@olotenko

A "global error consumer" is an "unhandled exception handler" and should be at the root of any thread.
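For reference, the JVM already offers this hook at the thread root; a minimal sketch using the standard uncaught-exception handler:

```java
// The JVM's standard hook for a per-thread "global error consumer":
// an uncaught-exception handler installed at the root of the thread.
public class UncaughtDemo {
    public static void main(String[] args) throws InterruptedException {
        StringBuilder seen = new StringBuilder();
        Thread worker = new Thread(() -> {
            throw new IllegalStateException("bad request amount");
        });
        // The handler sees any throwable that escapes the thread's run().
        worker.setUncaughtExceptionHandler((thread, ex) -> seen.append(ex.getMessage()));
        worker.start();
        worker.join();
        System.out.println(seen); // bad request amount
    }
}
```

Thread.setDefaultUncaughtExceptionHandler sets the same hook globally, which is the caveat raised below: it only helps if you control the threads touching the operators.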

@akarnokd
Collaborator

The reactive foundation of Helidon is currently incomplete and is not yet prepared for all corner cases within and outside the spec and the TCK. This implementation passes the TCK and enables the development of more operators.

The time to execute hasNext / next is immaterial for correctness.

They pose practical considerations.

"global error consumer" is a "unhandled exception handler" and should be at the root of any thread

If you control all the threads that may come into contact with the reactive operators.

@olotenko

olotenko commented Feb 24, 2020

They pose practical considerations.

:) what can be more practical than fencing off problematic implementations instead of punishing everyone? E.g. an EagerCancellationIterablePublisher, or a TimedIterableWrapper to be used with Iterables that are problematic.

3f61017#diff-6e58085854ee1341e169599585b70a21R117-R119 - you've already spent CPU cycles getting the value, so you may just as well let the Subscriber have it.

@akarnokd
Collaborator

I welcome you to benchmark implementations with and without those volatile reads.

@olotenko

olotenko commented Feb 24, 2020

:) simpler linear code with fewer things to think about, rather. And it's not just a volatile read here, either.

I get a 15% difference with an iterator that just produces a range of integers. Can you propose a test where it matters or does not matter?

@danielkec danielkec merged commit f7f1486 into helidon-io:master Feb 24, 2020
@danielkec
Contributor Author

Continuation of the alternative cancellation strategy moved to /issues/1441

danielkec added a commit that referenced this pull request Mar 16, 2020
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>