Skip to content

Commit

Permalink
Ensure TCK passes with empty publisher that synchronously completes
Browse files Browse the repository at this point in the history
  • Loading branch information
jroper committed Jan 2, 2018
1 parent 8d68455 commit 61065a9
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -570,6 +571,8 @@ public void expectNone(long withinMillis, String errMsgPrefix) throws Interrupte

public static class ManualSubscriberWithSubscriptionSupport<T> extends ManualSubscriber<T> {

private final AtomicBoolean onSubscribeCalled = new AtomicBoolean();

public ManualSubscriberWithSubscriptionSupport(TestEnvironment env) {
super(env);
}
Expand All @@ -587,7 +590,7 @@ public void onNext(T element) {
@Override
public void onComplete() {
env.debug(this + "::onComplete()");
if (subscription.isCompleted()) {
if (onSubscribeCalled.get()) {
super.onComplete();
} else {
env.flop("Subscriber::onComplete() called before Subscriber::onSubscribe");
Expand All @@ -599,6 +602,7 @@ public void onSubscribe(Subscription s) {
env.debug(String.format("%s::onSubscribe(%s)", this, s));
if (!subscription.isCompleted()) {
subscription.complete(s);
onSubscribeCalled.set(true);
} else {
env.flop("Subscriber::onSubscribe called on an already-subscribed Subscriber");
}
Expand All @@ -607,7 +611,7 @@ public void onSubscribe(Subscription s) {
@Override
public void onError(Throwable cause) {
env.debug(String.format("%s::onError(%s)", this, cause));
if (subscription.isCompleted()) {
if (onSubscribeCalled.get()) {
super.onError(cause);
} else {
env.flop(cause, String.format("Subscriber::onError(%s) called before Subscriber::onSubscribe", cause));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,39 @@ public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete_shou
}, "Subscriber::onComplete() called before Subscriber::onSubscribe");
}

@Test
public void optional_spec105_emptyStreamMustTerminateBySignallingOnComplete_shouldPass_whenOnCompleteSynchronouslyInovked() throws Throwable {
final Publisher<Integer> synchronousOnCompletePublisher = new Publisher<Integer>() {
@Override public void subscribe(final Subscriber<? super Integer> s) {
s.onSubscribe(new Subscription() {
@Override public void request(long n) { }
@Override public void cancel() { }
});
s.onComplete();
}
};

requireOptionalTestPass(new ThrowingRunnable() {
@Override public void run() throws Throwable {
PublisherVerification<Integer> verification = new PublisherVerification<Integer>(newTestEnvironment()) {
@Override public Publisher<Integer> createPublisher(long elements) {
return synchronousOnCompletePublisher;
}

@Override public long maxElementsFromPublisher() {
return 0; // it is an "empty" Publisher
}

@Override public Publisher<Integer> createFailedPublisher() {
return null;
}
};

verification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete();
}
});
}

@Test
public void required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled_shouldFailForNotCompletingPublisher() throws Throwable {
requireTestFailure(new ThrowingRunnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ public void requireTestSkip(ThrowingRunnable run, String msgPart) {
throw new RuntimeException("Expected TCK to SKIP this test, instead if PASSed!");
}

public void requireOptionalTestPass(ThrowingRunnable run) {
try {
run.run();
} catch (SkipException skip) {
throw new RuntimeException("Expected TCK to PASS this test, instead it was SKIPPED", skip.getCause());
} catch (Throwable throwable) {
throw new RuntimeException(
String.format("Expected TCK to PASS this test, yet it threw %s(%s) instead!",
throwable.getClass().getName(), throwable.getMessage()), throwable);
}
}

/**
* This publisher does NOT fulfil all Publisher spec requirements.
* It's just the bare minimum to enable this test to fail the Subscriber tests.
Expand Down

0 comments on commit 61065a9

Please sign in to comment.