Skip to content

Commit

Permalink
Implementing renaming proposal for the interop with j.u.c.Flow (#402)
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorklang committed Nov 8, 2017
1 parent c24b476 commit a4abf38
Show file tree
Hide file tree
Showing 15 changed files with 39 additions and 39 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ subprojects {
"reactive-streams-tck",
"reactive-streams-tck-flow",
"reactive-streams-examples",
"reactive-streams-flow-bridge"]) {
"reactive-streams-flow-adapters"]) {
apply plugin: "maven"
apply plugin: "signing"

Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion flow-bridge/build.gradle → flow-adapters/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
description = 'reactive-streams-flow-bridge'
description = 'reactive-streams-flow-adapters'

dependencies {
compile project(':reactive-streams')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
/**
* Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API.
*/
public final class ReactiveStreamsFlowBridge {
public final class FlowAdapters {
/** Utility class. */
private ReactiveStreamsFlowBridge() {
private FlowAdapters() {
throw new IllegalStateException("No instances!");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ReactiveStreamsFlowBridgeTest {
public class FlowAdaptersTest {
@Test
public void reactiveToFlowNormal() {
MulticastPublisher<Integer> p = new MulticastPublisher<Integer>(new Executor() {
Expand All @@ -31,7 +31,7 @@ public void execute(Runnable command) {

TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

ReactiveStreamsFlowBridge.toFlowPublisher(p).subscribe(tc);
FlowAdapters.toFlowPublisher(p).subscribe(tc);

p.offer(1);
p.offer(2);
Expand All @@ -54,7 +54,7 @@ public void execute(Runnable command) {

TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

ReactiveStreamsFlowBridge.toFlowPublisher(p).subscribe(tc);
FlowAdapters.toFlowPublisher(p).subscribe(tc);

p.offer(1);
p.offer(2);
Expand All @@ -77,7 +77,7 @@ public void execute(Runnable command) {

TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

ReactiveStreamsFlowBridge.toPublisher(p).subscribe(tc);
FlowAdapters.toPublisher(p).subscribe(tc);

p.submit(1);
p.submit(2);
Expand All @@ -100,7 +100,7 @@ public void execute(Runnable command) {

TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

ReactiveStreamsFlowBridge.toPublisher(p).subscribe(tc);
FlowAdapters.toPublisher(p).subscribe(tc);

p.submit(1);
p.submit(2);
Expand All @@ -116,7 +116,7 @@ public void execute(Runnable command) {
public void reactiveStreamsToFlowSubscriber() {
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

Flow.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toFlowSubscriber(tc);
Flow.Subscriber<Integer> fs = FlowAdapters.toFlowSubscriber(tc);

final Object[] state = { null, null };

Expand Down Expand Up @@ -148,7 +148,7 @@ public void cancel() {
public void flowToReactiveStreamsSubscriber() {
TestEitherConsumer<Integer> tc = new TestEitherConsumer<Integer>();

org.reactivestreams.Subscriber<Integer> fs = ReactiveStreamsFlowBridge.toSubscriber(tc);
org.reactivestreams.Subscriber<Integer> fs = FlowAdapters.toSubscriber(tc);

final Object[] state = { null, null };

Expand Down Expand Up @@ -192,8 +192,8 @@ public void stableConversionForSubscriber() {
@Override public void onComplete() {};
};

Assert.assertSame(ReactiveStreamsFlowBridge.toSubscriber(ReactiveStreamsFlowBridge.toFlowSubscriber(rsSub)), rsSub);
Assert.assertSame(ReactiveStreamsFlowBridge.toFlowSubscriber(ReactiveStreamsFlowBridge.toSubscriber(fSub)), fSub);
Assert.assertSame(FlowAdapters.toSubscriber(FlowAdapters.toFlowSubscriber(rsSub)), rsSub);
Assert.assertSame(FlowAdapters.toFlowSubscriber(FlowAdapters.toSubscriber(fSub)), fSub);
}

@Test
Expand All @@ -214,8 +214,8 @@ public void stableConversionForProcessor() {
@Override public void subscribe(Flow.Subscriber s) {};
};

Assert.assertSame(ReactiveStreamsFlowBridge.toProcessor(ReactiveStreamsFlowBridge.toFlowProcessor(rsPro)), rsPro);
Assert.assertSame(ReactiveStreamsFlowBridge.toFlowProcessor(ReactiveStreamsFlowBridge.toProcessor(fPro)), fPro);
Assert.assertSame(FlowAdapters.toProcessor(FlowAdapters.toFlowProcessor(rsPro)), rsPro);
Assert.assertSame(FlowAdapters.toFlowProcessor(FlowAdapters.toProcessor(fPro)), fPro);
}

@Test
Expand All @@ -228,7 +228,7 @@ public void stableConversionForPublisher() {
@Override public void subscribe(Flow.Subscriber s) {};
};

Assert.assertSame(ReactiveStreamsFlowBridge.toPublisher(ReactiveStreamsFlowBridge.toFlowPublisher(rsPub)), rsPub);
Assert.assertSame(ReactiveStreamsFlowBridge.toFlowPublisher(ReactiveStreamsFlowBridge.toPublisher(fPub)), fPub);
Assert.assertSame(FlowAdapters.toPublisher(FlowAdapters.toFlowPublisher(rsPub)), rsPub);
Assert.assertSame(FlowAdapters.toFlowPublisher(FlowAdapters.toPublisher(fPub)), fPub);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ public void run() {
sp.close();
}
}).start();
return ReactiveStreamsFlowBridge.toPublisher(sp);
return FlowAdapters.toPublisher(sp);
}

@Override
public Publisher<Integer> createFailedPublisher() {
final SubmissionPublisher<Integer> sp = new SubmissionPublisher<Integer>();
sp.closeExceptionally(new IOException());
return ReactiveStreamsFlowBridge.toPublisher(sp);
return FlowAdapters.toPublisher(sp);
}

@Override
Expand Down
14 changes: 7 additions & 7 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,29 @@ try {
Class.forName("java.util.concurrent.Flow")
jdkFlow = true
println(ANSI_GREEN + " INFO: ------------------ JDK9 classes detected ---------------------------------" + ANSI_RESET)
println(ANSI_GREEN + " INFO: Java 9 Flow API found; Including [flow-bridge, tck-flow] in build. " + ANSI_RESET)
println(ANSI_GREEN + " INFO: Java 9 Flow API found; Including [flow-adapters, tck-flow] in build. " + ANSI_RESET)
println(ANSI_GREEN + " INFO: --------------------------------------------------------------------------" + ANSI_RESET)
} catch (Throwable ex) {
// Flow API not available
println(ANSI_RED + "WARNING: ------------------ JDK9 classes NOT detected -----------------------------" + ANSI_RESET)
println(ANSI_RED + "WARNING: Java 9 Flow API not found; Not including [flow-bridge, tck-flow] in build." + ANSI_RESET)
println(ANSI_RED + "WARNING: In order to execute the complete test-suite run the build using JDK9+. " + ANSI_RESET)
println(ANSI_RED + "WARNING: --------------------------------------------------------------------------" + ANSI_RESET)
println(ANSI_RED + "WARNING: -------------------- JDK9 classes NOT detected -----------------------------" + ANSI_RESET)
println(ANSI_RED + "WARNING: Java 9 Flow API not found; Not including [flow-adapters, tck-flow] in build." + ANSI_RESET)
println(ANSI_RED + "WARNING: In order to execute the complete test-suite run the build using JDK9+. " + ANSI_RESET)
println(ANSI_RED + "WARNING: ----------------------------------------------------------------------------" + ANSI_RESET)
}

include ':reactive-streams'
include ':reactive-streams-tck'
include ':reactive-streams-examples'

if (jdkFlow) {
include ':reactive-streams-flow-bridge'
include ':reactive-streams-flow-adapters'
include ':reactive-streams-tck-flow'
}

project(':reactive-streams').projectDir = "$rootDir/api" as File
project(':reactive-streams-tck').projectDir = "$rootDir/tck" as File
project(':reactive-streams-examples').projectDir = "$rootDir/examples" as File
if (jdkFlow) {
project(':reactive-streams-flow-bridge').projectDir = "$rootDir/flow-bridge" as File
project(':reactive-streams-flow-adapters').projectDir = "$rootDir/flow-adapters" as File
project(':reactive-streams-tck-flow').projectDir = "$rootDir/tck-flow" as File
}
2 changes: 1 addition & 1 deletion tck-flow/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ description = 'reactive-streams-tck-flow'
dependencies {
compile group: 'org.testng', name: 'testng', version:'5.14.10'
compile project(':reactive-streams-tck')
compile project(':reactive-streams-flow-bridge')
compile project(':reactive-streams-flow-adapters')
}
test.useTestNG()
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
package org.reactivestreams.tck.flow;

import org.reactivestreams.Publisher;
import org.reactivestreams.ReactiveStreamsFlowBridge;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;

Expand All @@ -36,7 +36,7 @@ public FlowPublisherVerification(TestEnvironment env) {
@Override
final public Publisher<T> createPublisher(long elements) {
final Flow.Publisher<T> flowPublisher = createFlowPublisher(elements);
return ReactiveStreamsFlowBridge.toPublisher(flowPublisher);
return FlowAdapters.toPublisher(flowPublisher);
}
/**
* This is the main method you must implement in your test incarnation.
Expand All @@ -49,7 +49,7 @@ final public Publisher<T> createPublisher(long elements) {
final public Publisher<T> createFailedPublisher() {
final Flow.Publisher<T> failed = createFailedFlowPublisher();
if (failed == null) return null; // because `null` means "SKIP" in createFailedPublisher
else return ReactiveStreamsFlowBridge.toPublisher(failed);
else return FlowAdapters.toPublisher(failed);
}
/**
* By implementing this method, additional TCK tests concerning a "failed" publishers will be run.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package org.reactivestreams.tck.flow;

import org.reactivestreams.ReactiveStreamsFlowBridge;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberBlackboxVerification;
Expand Down Expand Up @@ -40,7 +40,7 @@ protected FlowSubscriberBlackboxVerification(TestEnvironment env) {

@Override
public final void triggerRequest(Subscriber<? super T> subscriber) {
triggerFlowRequest(ReactiveStreamsFlowBridge.toFlowSubscriber(subscriber));
triggerFlowRequest(FlowAdapters.toFlowSubscriber(subscriber));
}
/**
* Override this method if the {@link java.util.concurrent.Flow.Subscriber} implementation you are verifying
Expand All @@ -54,7 +54,7 @@ public void triggerFlowRequest(Flow.Subscriber<? super T> subscriber) {

@Override
public final Subscriber<T> createSubscriber() {
return ReactiveStreamsFlowBridge.<T>toSubscriber(createFlowSubscriber());
return FlowAdapters.<T>toSubscriber(createFlowSubscriber());
}
/**
* This is the main method you must implement in your test incarnation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package org.reactivestreams.tck.flow;

import org.reactivestreams.ReactiveStreamsFlowBridge;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Subscriber;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.reactivestreams.tck.TestEnvironment;
Expand All @@ -35,7 +35,7 @@ protected FlowSubscriberWhiteboxVerification(TestEnvironment env) {

@Override
final public Subscriber<T> createSubscriber(WhiteboxSubscriberProbe<T> probe) {
return ReactiveStreamsFlowBridge.toSubscriber(createFlowSubscriber(probe));
return FlowAdapters.toSubscriber(createFlowSubscriber(probe));
}
/**
* This is the main method you must implement in your test incarnation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package org.reactivestreams.tck.flow;

import org.reactivestreams.ReactiveStreamsFlowBridge;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
import java.util.concurrent.Flow.Publisher;

Expand Down Expand Up @@ -41,7 +41,7 @@ public EmptyLazyFlowPublisherTest() {

@Override
public Publisher<Integer> createFlowPublisher(long elements) {
return ReactiveStreamsFlowBridge.toFlowPublisher(
return FlowAdapters.toFlowPublisher(
new AsyncIterablePublisher<Integer>(Collections.<Integer>emptyList(), ex)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;

import org.reactivestreams.ReactiveStreamsFlowBridge;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
import org.reactivestreams.tck.TestEnvironment;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -42,7 +42,7 @@ public SingleElementFlowPublisherTest() {

@Override
public Flow.Publisher<Integer> createFlowPublisher(long elements) {
return ReactiveStreamsFlowBridge.toFlowPublisher(new AsyncIterablePublisher<Integer>(Collections.singleton(1), ex));
return FlowAdapters.toFlowPublisher(new AsyncIterablePublisher<Integer>(Collections.singleton(1), ex));
}

@Override
Expand Down

0 comments on commit a4abf38

Please sign in to comment.