Skip to content

Commit

Permalink
Convert tests and examples to compose() and as() form
Browse files Browse the repository at this point in the history
  • Loading branch information
rmichela committed Mar 8, 2018
1 parent 9d2b8cc commit 9c410b0
Show file tree
Hide file tree
Showing 18 changed files with 122 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ public static void main(String[] args) throws Exception {
// Prompt the user for their name
console.println("Press ctrl+D to quit");
String author = console.readLine("Who are you? > ");
stub.postMessage(toMessage(author, author + " joined.")).subscribe();
toMessage(author, author + " joined.").compose(stub::postMessage).subscribe();

// Subscribe to incoming messages
Disposable chatSubscription = stub.getMessages(Single.just(Empty.getDefaultInstance())).subscribe(
Disposable chatSubscription = Single.just(Empty.getDefaultInstance()).as(stub::getMessages).subscribe(
message -> {
// Don't re-print our own messages
if (!message.getAuthor().equals(author)) {
Expand Down Expand Up @@ -71,7 +71,7 @@ public static void main(String[] args) throws Exception {

// Wait for a signal to exit, then clean up
done.await();
stub.postMessage(toMessage(author, author + " left.")).subscribe();
toMessage(author, author + " left.").compose(stub::postMessage).subscribe();
chatSubscription.dispose();
channel.shutdown();
channel.awaitTermination(1, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* Subscriber tests from the Reactive Streams Technology Compatibility Kit.
* https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck
*/
@SuppressWarnings("Duplicates")
public class ReactorGrpcSubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVerification<Message> {
public ReactorGrpcSubscriberWhiteboxVerificationTest() {
super(new TestEnvironment());
Expand Down
20 changes: 18 additions & 2 deletions rx-java/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,27 @@ After installing the plugin, RxGrpc service stubs will be generated along with y
HelloRequest.newBuilder().setName("a").build(),
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build());
Flowable<HelloResponse> resp = stub.sayHelloBothStream(req);
Flowable<HelloResponse> resp = req.compose(stub::sayHelloBothStream);
resp.subscribe(...);
```

## Context propagation
## Don't break the chain
Used on their own, the generated RxGrpc stub methods do not cleanly chain with other RxJava operators.
Using the `compose()` and `as()` methods of `Single` and `Flowable` are preferred over direct invocation.

#### One→One, Many→Many
```java
Single<HelloResponse> singleResponse = singleRequest.compose(stub::sayHello);
Flowable<HelloResponse> flowableResponse = flowableRequest.compose(stub::sayHelloBothStream);
```

#### One→Many, Many→One
```java
Single<HelloResponse> singleResponse = flowableRequest.as(stub::sayHelloRequestStream);
Flowable<HelloResponse> flowableResponse = singleRequest.as(stub::sayHelloResponseStream);
```

## gRPC Context propagation
Because the non-blocking nature of RX, RX-Java tends to switch between threads a lot.
GRPC stores its context in the Thread context and is therefore often lost when RX
switches threads. To solve this problem, you can add a hook that makes the Context
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static void tearDown() throws Exception {
public Publisher<Message> createPublisher(long elements) {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Flowable<Message> request = Flowable.range(0, (int)elements).map(this::toMessage);
Publisher<Message> publisher = stub.manyToMany(request);
Publisher<Message> publisher = request.compose(stub::manyToMany);

return publisher;
}
Expand All @@ -65,7 +65,7 @@ public Publisher<Message> createPublisher(long elements) {
public Publisher<Message> createFailedPublisher() {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Flowable<Message> request = Flowable.just(toMessage(TckService.KABOOM));
Publisher<Message> publisher = stub.manyToMany(request);
Publisher<Message> publisher = request.compose(stub::manyToMany);

return publisher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public long maxElementsFromPublisher() {
public Publisher<Message> createPublisher(long elements) {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Flowable<Message> request = Flowable.range(0, (int)elements).map(this::toMessage);
Single<Message> publisher = stub.manyToOne(request);
Single<Message> publisher = request.as(stub::manyToOne);

return publisher.toFlowable();
}
Expand All @@ -71,7 +71,7 @@ public Publisher<Message> createPublisher(long elements) {
public Publisher<Message> createFailedPublisher() {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Flowable<Message> request = Flowable.just(toMessage(TckService.KABOOM));
Single<Message> publisher = stub.manyToOne(request);
Single<Message> publisher = request.as(stub::manyToOne);

return publisher.toFlowable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static void tearDown() throws Exception {
public Publisher<Message> createPublisher(long elements) {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Single<Message> request = Single.just(toMessage((int) elements));
Publisher<Message> publisher = stub.oneToMany(request);
Publisher<Message> publisher = request.as(stub::oneToMany);

return publisher;
}
Expand All @@ -65,7 +65,7 @@ public Publisher<Message> createPublisher(long elements) {
public Publisher<Message> createFailedPublisher() {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Single<Message> request = Single.just(toMessage(TckService.KABOOM));
Publisher<Message> publisher = stub.oneToMany(request);
Publisher<Message> publisher = request.as(stub::oneToMany);

return publisher;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public long maxElementsFromPublisher() {
public Publisher<Message> createPublisher(long elements) {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Single<Message> request = Single.just(toMessage((int) elements));
Single<Message> publisher = stub.oneToOne(request);
Single<Message> publisher = request.compose(stub::oneToOne);

return publisher.toFlowable();
}
Expand All @@ -70,7 +70,7 @@ public Publisher<Message> createPublisher(long elements) {
public Publisher<Message> createFailedPublisher() {
RxTckGrpc.RxTckStub stub = RxTckGrpc.newRxStub(channel);
Single<Message> request = Single.just(toMessage(TckService.KABOOM));
Single<Message> publisher = stub.oneToOne(request);
Single<Message> publisher = request.compose(stub::oneToOne);

return publisher.toFlowable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* Subscriber tests from the Reactive Streams Technology Compatibility Kit.
* https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck
*/
@SuppressWarnings("Duplicates")
public class RxGrpcSubscriberWhiteboxVerificationTest extends SubscriberWhiteboxVerification<Message> {
public RxGrpcSubscriberWhiteboxVerificationTest() {
super(new TestEnvironment());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static void stopServer() throws InterruptedException {
}

@Test
public void clientToServerBackpressure() throws InterruptedException {
public void clientToServerBackpressure() {
RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel);

Flowable<NumberProto.Number> rxRequest = Flowable
Expand All @@ -104,7 +104,7 @@ public void clientToServerBackpressure() throws InterruptedException {
.doOnNext(i -> updateNumberOfWaits(lastValueTime, numberOfWaits))
.map(BackpressureIntegrationTest::protoNum);

TestObserver<NumberProto.Number> rxResponse = stub.requestPressure(rxRequest).test();
TestObserver<NumberProto.Number> rxResponse = rxRequest.as(stub::requestPressure).test();

rxResponse.awaitTerminalEvent(5, TimeUnit.SECONDS);
rxResponse.assertComplete()
Expand All @@ -114,12 +114,12 @@ public void clientToServerBackpressure() throws InterruptedException {
}

@Test
public void serverToClientBackpressure() throws InterruptedException {
public void serverToClientBackpressure() {
RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel);

Single<Empty> rxRequest = Single.just(Empty.getDefaultInstance());

TestSubscriber<NumberProto.Number> rxResponse = stub.responsePressure(rxRequest)
TestSubscriber<NumberProto.Number> rxResponse = rxRequest.as(stub::responsePressure)
.doOnNext(n -> System.out.println(n.getNumber(0) + " <--"))
.doOnNext(n -> waitIfValuesAreEqual(n.getNumber(0), 3))
.test();
Expand All @@ -132,10 +132,11 @@ public void serverToClientBackpressure() throws InterruptedException {
}

@Test
public void bidiResponseBackpressure() throws InterruptedException {
public void bidiResponseBackpressure() {
RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel);

TestSubscriber<NumberProto.Number> rxResponse = stub.twoWayResponsePressure(Flowable.empty())
TestSubscriber<NumberProto.Number> rxResponse = Flowable.<NumberProto.Number>empty()
.compose(stub::twoWayResponsePressure)
.doOnNext(n -> System.out.println(n.getNumber(0) + " <--"))
.doOnNext(n -> waitIfValuesAreEqual(n.getNumber(0), 3))
.test();
Expand All @@ -148,7 +149,7 @@ public void bidiResponseBackpressure() throws InterruptedException {
}

@Test
public void bidiRequestBackpressure() throws InterruptedException {
public void bidiRequestBackpressure() {
RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel);

Flowable<NumberProto.Number> rxRequest = Flowable
Expand All @@ -157,7 +158,7 @@ public void bidiRequestBackpressure() throws InterruptedException {
.doOnNext(i -> updateNumberOfWaits(lastValueTime, numberOfWaits))
.map(BackpressureIntegrationTest::protoNum);

TestSubscriber<NumberProto.Number> rxResponse = stub.twoWayRequestPressure(rxRequest).test();
TestSubscriber<NumberProto.Number> rxResponse = rxRequest.compose(stub::twoWayRequestPressure).test();

rxResponse.awaitTerminalEvent(5, TimeUnit.SECONDS);
rxResponse.assertComplete()
Expand All @@ -180,7 +181,7 @@ private static void waitIfValuesAreEqual(int value, int other) {
if (value == other) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
} catch (InterruptedException e) {
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ private static class TestService extends RxNumbersGrpc.NumbersImplBase {
private AtomicBoolean wasCanceled = new AtomicBoolean(false);
private AtomicBoolean explicitCancel = new AtomicBoolean(false);

public void reset() {
void reset() {
lastNumberProduced.set(Integer.MIN_VALUE);
wasCanceled.set(false);
explicitCancel.set(false);
}

public int getLastNumberProduced() {
int getLastNumberProduced() {
return lastNumberProduced.get();
}

public boolean wasCanceled() {
boolean wasCanceled() {
return wasCanceled.get();
}

public void setExplicitCancel(boolean explicitCancel) {
void setExplicitCancel(boolean explicitCancel) {
this.explicitCancel.set(explicitCancel);
}

Expand Down Expand Up @@ -124,8 +124,8 @@ public static void stopServer() throws InterruptedException {
@Test
public void clientCanCancelServerStreamExplicitly() throws InterruptedException {
RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel);
TestSubscriber<NumberProto.Number> subscription = stub
.responsePressure(Single.just(Empty.getDefaultInstance()))
TestSubscriber<NumberProto.Number> subscription = Single.just(Empty.getDefaultInstance())
.as(stub::responsePressure)
.doOnNext(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.doOnComplete(() -> System.out.println("Completed"))
Expand All @@ -145,8 +145,8 @@ public void clientCanCancelServerStreamExplicitly() throws InterruptedException
@Test
public void clientCanCancelServerStreamImplicitly() throws InterruptedException {
RxNumbersGrpc.RxNumbersStub stub = RxNumbersGrpc.newRxStub(channel);
TestSubscriber<NumberProto.Number> subscription = stub
.responsePressure(Single.just(Empty.getDefaultInstance()))
TestSubscriber<NumberProto.Number> subscription = Single.just(Empty.getDefaultInstance())
.as(stub::responsePressure)
.doOnNext(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.doOnComplete(() -> System.out.println("Completed"))
Expand Down Expand Up @@ -186,8 +186,8 @@ public void serverCanCancelClientStreamImplicitly() {
System.out.println("Client canceled");
});

TestObserver<NumberProto.Number> observer = stub
.requestPressure(request)
TestObserver<NumberProto.Number> observer = request
.as(stub::requestPressure)
.doOnSuccess(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.test();
Expand Down Expand Up @@ -222,8 +222,8 @@ public void serverCanCancelClientStreamExplicitly() {
System.out.println("Client canceled");
});

TestObserver<NumberProto.Number> observer = stub
.requestPressure(request)
TestObserver<NumberProto.Number> observer = request
.as(stub::requestPressure)
.doOnSuccess(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.test();
Expand Down Expand Up @@ -257,8 +257,8 @@ public void serverCanCancelClientStreamImplicitlyBidi() {
System.out.println("Client canceled");
});

TestSubscriber<NumberProto.Number> observer = stub
.twoWayPressure(request)
TestSubscriber<NumberProto.Number> observer = request
.compose(stub::twoWayPressure)
.doOnNext(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.test();
Expand Down Expand Up @@ -291,8 +291,8 @@ public void serverCanCancelClientStreamExplicitlyBidi() {
System.out.println("Client canceled");
});

TestSubscriber<NumberProto.Number> observer = stub
.twoWayPressure(request)
TestSubscriber<NumberProto.Number> observer = request
.compose(stub::twoWayPressure)
.doOnNext(number -> System.out.println(number.getNumber(0)))
.doOnError(throwable -> System.out.println(throwable.getMessage()))
.test();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,30 @@ public void stopServer() throws InterruptedException {
public void servicesCanCallOtherServices() throws InterruptedException {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);

Single<HelloRequest> input = Single.just(request("X"));
Single<HelloRequest> one = stub.sayHello(input)
Single<String> chain = Single.just(request("X"))
// one -> one
.compose(stub::sayHello)
.map(ChainedCallIntegrationTest::bridge)
.doOnSuccess(System.out::println);
Flowable<HelloRequest> two = stub.sayHelloRespStream(one)
.doOnSuccess(System.out::println)
// one -> many
.as(stub::sayHelloRespStream)
.map(ChainedCallIntegrationTest::bridge)
.doOnNext(System.out::println);
Flowable<HelloRequest> three = stub.sayHelloBothStream(two)
.doOnNext(System.out::println)
// many -> many
.compose(stub::sayHelloBothStream)
.map(ChainedCallIntegrationTest::bridge)
.doOnNext(System.out::println);
Single<HelloRequest> four = stub.sayHelloReqStream(three)
.doOnNext(System.out::println)
// many -> one
.as(stub::sayHelloReqStream)
.map(ChainedCallIntegrationTest::bridge)
.doOnSuccess(System.out::println);
Single<String> five = stub.sayHello(four)
.doOnSuccess(System.out::println)
// one -> one
.compose(stub::sayHello)
.map(HelloResponse::getMessage)
.doOnSuccess(System.out::println);

TestObserver<String> test = five.test();

TestObserver<String> test = chain.test();

test.awaitTerminalEvent(2, TimeUnit.SECONDS);
test.assertComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void stopServer() throws InterruptedException {
public void oneToOne() {
RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
Single<HelloRequest> req = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
Single<HelloResponse> resp = stub.sayHello(req);
Single<HelloResponse> resp = req.compose(stub::sayHello);

AtomicReference<String> clientThreadName = new AtomicReference<>();

Expand All @@ -119,7 +119,7 @@ public void manyToMany() {

AtomicReference<String> clientThreadName = new AtomicReference<>();

Flowable<HelloResponse> resp = stub.sayHelloBothStream(req);
Flowable<HelloResponse> resp = req.compose(stub::sayHelloBothStream);

TestSubscriber<String> testSubscriber = resp
.map(HelloResponse::getMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,19 @@ public void fourKindsOfRequestAtOnce() throws Exception {
// == MAKE REQUESTS ==
// One to One
Single<HelloRequest> req1 = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
Single<HelloResponse> resp1 = stub.sayHello(req1);
Single<HelloResponse> resp1 = req1.compose(stub::sayHello);

// One to Many
Single<HelloRequest> req2 = Single.just(HelloRequest.newBuilder().setName("rxjava").build());
Flowable<HelloResponse> resp2 = stub.sayHelloRespStream(req2);
Flowable<HelloResponse> resp2 = req2.as(stub::sayHelloRespStream);

// Many to One
Flowable<HelloRequest> req3 = Flowable.just(
HelloRequest.newBuilder().setName("a").build(),
HelloRequest.newBuilder().setName("b").build(),
HelloRequest.newBuilder().setName("c").build());

Single<HelloResponse> resp3 = stub.sayHelloReqStream(req3);
Single<HelloResponse> resp3 = req3.as(stub::sayHelloReqStream);

// Many to Many
Flowable<HelloRequest> req4 = Flowable.just(
Expand All @@ -126,7 +126,7 @@ public void fourKindsOfRequestAtOnce() throws Exception {
HelloRequest.newBuilder().setName("d").build(),
HelloRequest.newBuilder().setName("e").build());

Flowable<HelloResponse> resp4 = stub.sayHelloBothStream(req4);
Flowable<HelloResponse> resp4 = req4.compose(stub::sayHelloBothStream);

// == VERIFY RESPONSES ==
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
Expand Down
Loading

0 comments on commit 9c410b0

Please sign in to comment.