Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow request-coordinating Processor implementations to pass the TCK #414

Merged
merged 6 commits into from Nov 30, 2017

Conversation

@akarnokd
Copy link
Contributor

@akarnokd akarnokd commented Nov 23, 2017

This PR fixes the TCK to support request-coordinating Processor implementations. These kinds of Processors may either

  • coordinate the request amounts of their Subscribers and request only from upstream when all Subscribers requested something; or
  • coordinate emissions, requesting a bounded amount upfront from the upstream and then emitting only when all Subscribers have requested something.

From the downstream Subscribers' perspective, both manifest as lack of emissions.

The IdentityProcessorVerification.doesCoordinatedEmission() added will affect

  • required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError
  • required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLongAgo

By adjusting the test request pattern to request on both Subscribers before asserting on events received.

The LockstepProcessorTest contains the LockstepProcessor implementation which coordinates emissions when there are multiple Subscribers.

Replaces #287.
Original issue #284.

* request amounts and only delivers onNext signals if all Subscribers have
* indicated (via their Subscription#request(long)) they are ready to receive elements.
*/
public boolean doesCoordinatedEmission() {

This comment has been minimized.

@viktorklang

viktorklang Nov 24, 2017
Contributor

Since most devs probably won't ever notice this method, or know that they should enable/disable it—what do you think about including a hint about it in case the tests which use it fail?

(Sorry, this comment was on the wrong line before)

This comment has been minimized.

@ktoso

ktoso Nov 29, 2017
Contributor

Documenting it more in the README of the TCK could be a good step?

This comment has been minimized.

@akarnokd

akarnokd Nov 29, 2017
Author Contributor

I can add some description to it in a couple of hours.

This comment has been minimized.

@akarnokd

akarnokd Nov 29, 2017
Author Contributor

Done.

sub1.request(1);
sub2.request(1);

sendError(ex);

This comment has been minimized.

@viktorklang

viktorklang Nov 26, 2017
Contributor

Perhaps put sendError(ex); after the branch (since it is the last statement in each "leg" of the branch.

expectNextElement(sub1, z);
expectNextElement(sub2, z);

if (totalRequests == 3) {

This comment has been minimized.

@viktorklang

viktorklang Nov 26, 2017
Contributor

Since this is the same behavior on each side of the branch, perhaps put it after the branch?

* <p>
* The test is not executed if {@link IdentityProcessorVerification#maxSupportedSubscribers()} is less than 2.
* <p>
* If this test fails, the following could be checked within the {@code Publisher} implementation:

This comment has been minimized.

@viktorklang

viktorklang Nov 27, 2017
Contributor

If we wanted to include a hint about doesCoordinatedEmission either when running the test case, or when the testcase fails, what would you propose?

This comment has been minimized.

@akarnokd

akarnokd Nov 27, 2017
Author Contributor

We could introduce an overload to expectNextElement that takes an error message that hints about that method.

This comment has been minimized.

@viktorklang

viktorklang Nov 27, 2017
Contributor

👍

This comment has been minimized.

@ktoso

ktoso Nov 29, 2017
Contributor

That sounds very good! (the hint override)

expectNextElement(sub2, x);

sub1.request(1);
sub2.request(1);

This comment has been minimized.

@viktorklang

viktorklang Nov 27, 2017
Contributor

Shouldn't expectRequest() work after these two?

This comment has been minimized.

@akarnokd

akarnokd Nov 27, 2017
Author Contributor

Yes, it should. I'll update.

@viktorklang
Copy link
Contributor

@viktorklang viktorklang commented Nov 27, 2017

@akarnokd Thanks for taking a stab at this issue!

@ktoso, @reactive-streams/contributors Anyone else have feedback on this?

final T z = sendNextTFromUpstream();
expectNextElement(sub1, z,
"If the Processor coordinates requests/emissions when having multiple Subscribers"
+ " at once, please override doesCoordinatedEmission() in this "

This comment has been minimized.

@viktorklang

viktorklang Nov 27, 2017
Contributor

"please override doesCoordinatedEmission() to return true in this"

This comment has been minimized.

@akarnokd

akarnokd Nov 27, 2017
Author Contributor

Updated.

@ktoso
Copy link
Contributor

@ktoso ktoso commented Nov 27, 2017

I do want to review this, please don't merge yet.
Esp. I'm worried about the use of coordinated omission -- I don't think this is an instance of that, and the term is used a bit misleadingly. Coordinated omission is about a measurement problem where the measuring-party is not measuring the real latencies due to backpressure from stressed system, I don't think that a Processor that handles many subscribers sis the same thing.

I want to review the thing in depth to be able to reply more precisely though

@viktorklang
Copy link
Contributor

@viktorklang viktorklang commented Nov 27, 2017

@ktoso Are you conflating "coordinated omission" with "coordinated emission"?

In any case, perhaps there's a better term, I'm all for improvements when it comes to names :)

@ktoso
Copy link
Contributor

@ktoso ktoso commented Nov 27, 2017

Not conflating, but misreading what was in the PR it seems 😉
Yeah emission is fine, but perhaps a bit too close to that one...

Anyway, want to review this but completely under-water today, hope to get to it tomorrow

@viktorklang
Copy link
Contributor

@viktorklang viktorklang commented Nov 27, 2017

@ktoso No worries! :-)

if (!elem.equals(expected)) {
env.flop(String.format("Received `onNext(%s)` on downstream but expected `onNext(%s)`", elem, expected));
}
}

This comment has been minimized.

@ktoso

ktoso Nov 29, 2017
Contributor

Nice 👍

@viktorklang
Copy link
Contributor

@viktorklang viktorklang commented Nov 29, 2017

@ktoso
ktoso approved these changes Nov 29, 2017
Copy link
Contributor

@ktoso ktoso left a comment

Looks good as far as I can see, thanks for implementing it @akarnokd 👍

@viktorklang
Copy link
Contributor

@viktorklang viktorklang commented Nov 30, 2017

VERY nice, @akarnokd! Merging!

@viktorklang viktorklang merged commit 43da5a9 into reactive-streams:master Nov 30, 2017
1 check passed
1 check passed
continuous-integration/travis-ci/pr The Travis CI build passed
Details
@viktorklang viktorklang added this to the 1.0.2 milestone Nov 30, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Linked issues

Successfully merging this pull request may close these issues.

None yet

3 participants