diff --git a/checkstyle-header.txt b/checkstyle-header.txt
new file mode 100644
index 00000000..35348526
--- /dev/null
+++ b/checkstyle-header.txt
@@ -0,0 +1,6 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
\ No newline at end of file
diff --git a/checkstyle.xml b/checkstyle.xml
index b2babde5..2139a517 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -43,8 +43,7 @@
-
+
diff --git a/demos/RxChat/RxChat-Client/pom.xml b/demos/RxChat/RxChat-Client/pom.xml
deleted file mode 100644
index 0ad9fb47..00000000
--- a/demos/RxChat/RxChat-Client/pom.xml
+++ /dev/null
@@ -1,49 +0,0 @@
-
-
-
- RxChat
- com.salesforce.servicelibs
- 0.6.2-SNAPSHOT
-
- 4.0.0
-
- RxChat-Client
-
-
-
- jline
- jline
- 2.14.4
-
-
-
-
-
-
- maven-assembly-plugin
-
-
-
- com.salesforce.servicelibs.ChatClient
-
-
-
- jar-with-dependencies
-
- false
-
-
-
- make-assembly
- package
-
- single
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/demos/RxChat/RxChat-Client/src/main/java/com/salesforce/servicelibs/ChatClient.java b/demos/RxChat/RxChat-Client/src/main/java/com/salesforce/servicelibs/ChatClient.java
deleted file mode 100644
index 8c97168f..00000000
--- a/demos/RxChat/RxChat-Client/src/main/java/com/salesforce/servicelibs/ChatClient.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.servicelibs;
-
-import com.google.protobuf.Empty;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
-import io.reactivex.Observable;
-import io.reactivex.Single;
-import io.reactivex.disposables.Disposable;
-import jline.console.ConsoleReader;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static com.salesforce.servicelibs.ConsoleUtil.*;
-
-/**
- * Demonstrates building a gRPC streaming client using RxJava and RxGrpc.
- */
-public final class ChatClient {
- private static final int PORT = 9999;
-
- private ChatClient() { }
-
- public static void main(String[] args) throws Exception {
- // Connect to the sever
- ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", PORT).usePlaintext(true).build();
- RxChatGrpc.RxChatStub stub = RxChatGrpc.newRxStub(channel);
-
- CountDownLatch done = new CountDownLatch(1);
- ConsoleReader console = new ConsoleReader();
-
- // Prompt the user for their name
- console.println("Press ctrl+D to quit");
- String author = console.readLine("Who are you? > ");
- stub.postMessage(toMessage(author, author + " joined.")).subscribe();
-
- // Subscribe to incoming messages
- Disposable chatSubscription = stub.getMessages(Single.just(Empty.getDefaultInstance())).subscribe(
- message -> {
- // Don't re-print our own messages
- if (!message.getAuthor().equals(author)) {
- printLine(console, message.getAuthor(), message.getMessage());
- }
- },
- throwable -> {
- printLine(console, "ERROR", throwable.getMessage());
- done.countDown();
- },
- done::countDown
- );
-
- // Publish outgoing messages
- Observable.fromIterable(new ConsoleIterator(console, author + " > "))
- .map(msg -> toMessage(author, msg))
- .flatMapSingle(stub::postMessage)
- .subscribe(
- empty -> { },
- throwable -> {
- printLine(console, "ERROR", throwable.getMessage());
- done.countDown();
- },
- done::countDown
- );
-
- // Wait for a signal to exit, then clean up
- done.await();
- stub.postMessage(toMessage(author, author + " left.")).subscribe();
- chatSubscription.dispose();
- channel.shutdown();
- channel.awaitTermination(1, TimeUnit.SECONDS);
- console.getTerminal().restore();
- }
-
- private static Single toMessage(String author, String message) {
- return Single.just(
- ChatProto.ChatMessage.newBuilder()
- .setAuthor(author)
- .setMessage(message)
- .build()
- );
- }
-}
diff --git a/demos/RxChat/RxChat-Client/src/main/java/com/salesforce/servicelibs/ConsoleIterator.java b/demos/RxChat/RxChat-Client/src/main/java/com/salesforce/servicelibs/ConsoleIterator.java
deleted file mode 100644
index a3db7ea8..00000000
--- a/demos/RxChat/RxChat-Client/src/main/java/com/salesforce/servicelibs/ConsoleIterator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.servicelibs;
-
-import jline.console.ConsoleReader;
-
-import javax.annotation.Nonnull;
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * Adapts jLine to Iterator.
- */
-public class ConsoleIterator implements Iterable, Iterator {
- private ConsoleReader console;
- private String prompt;
- private String lastLine;
-
- ConsoleIterator(ConsoleReader console, String prompt) {
- this.console = console;
- this.prompt = prompt;
- }
-
- @Override
- @Nonnull
- public Iterator iterator() {
- return this;
- }
-
- @Override
- public boolean hasNext() {
- try {
- lastLine = console.readLine(prompt);
- return lastLine != null;
- } catch (IOException e) {
- return false;
- }
- }
-
- @Override
- public String next() {
- return lastLine;
- }
-}
diff --git a/demos/RxChat/RxChat-Client/src/main/java/com/salesforce/servicelibs/ConsoleUtil.java b/demos/RxChat/RxChat-Client/src/main/java/com/salesforce/servicelibs/ConsoleUtil.java
deleted file mode 100644
index a5d8ce0b..00000000
--- a/demos/RxChat/RxChat-Client/src/main/java/com/salesforce/servicelibs/ConsoleUtil.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.servicelibs;
-
-import jline.console.ConsoleReader;
-import jline.console.CursorBuffer;
-
-import java.io.IOException;
-
-/**
- * Utility methods for working with jLine.
- */
-public final class ConsoleUtil {
- private ConsoleUtil() { }
-
- public static void printLine(ConsoleReader console, String author, String message) throws IOException {
- CursorBuffer stashed = stashLine(console);
- console.println(author + " > " + message);
- unstashLine(console, stashed);
- console.flush();
- }
-
- public static CursorBuffer stashLine(ConsoleReader console) {
- CursorBuffer stashed = console.getCursorBuffer().copy();
- try {
- console.getOutput().write("\u001b[1G\u001b[K");
- console.flush();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return stashed;
- }
-
-
- public static void unstashLine(ConsoleReader console, CursorBuffer stashed) {
- try {
- console.resetPromptLine(console.getPrompt(), stashed.toString(), stashed.cursor);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-}
diff --git a/demos/RxChat/RxChat-Client/src/main/proto/Chat.proto b/demos/RxChat/RxChat-Client/src/main/proto/Chat.proto
deleted file mode 100644
index c1be978b..00000000
--- a/demos/RxChat/RxChat-Client/src/main/proto/Chat.proto
+++ /dev/null
@@ -1,20 +0,0 @@
-syntax = "proto3";
-
-package com.salesforce.servicelibs;
-
-option java_package = "com.salesforce.servicelibs";
-option java_outer_classname = "ChatProto";
-
-import "google/protobuf/empty.proto";
-import "google/protobuf/timestamp.proto";
-
-service Chat {
- rpc PostMessage (ChatMessage) returns (google.protobuf.Empty) {}
- rpc GetMessages (google.protobuf.Empty) returns (stream ChatMessage) {}
-}
-
-message ChatMessage {
- google.protobuf.Timestamp when = 1;
- string author = 2;
- string message = 3;
-}
\ No newline at end of file
diff --git a/demos/RxChat/RxChat-Client/src/main/resources/application.properties b/demos/RxChat/RxChat-Client/src/main/resources/application.properties
deleted file mode 100644
index f5f7e593..00000000
--- a/demos/RxChat/RxChat-Client/src/main/resources/application.properties
+++ /dev/null
@@ -1,2 +0,0 @@
-host=localhost
-port=9999
\ No newline at end of file
diff --git a/demos/RxChat/RxChat-Server/pom.xml b/demos/RxChat/RxChat-Server/pom.xml
deleted file mode 100644
index 69743f1a..00000000
--- a/demos/RxChat/RxChat-Server/pom.xml
+++ /dev/null
@@ -1,66 +0,0 @@
-
-
-
- RxChat
- com.salesforce.servicelibs
- 0.6.2-SNAPSHOT
-
- 4.0.0
-
- RxChat-Server
-
-
-
-
-
- org.springframework.boot
- spring-boot-dependencies
- 1.4.2.RELEASE
- pom
- import
-
-
-
-
-
-
- com.salesforce.servicelibs
- grpc-spring
-
-
- org.springframework.boot
- spring-boot
-
-
- org.springframework.boot
- spring-boot-autoconfigure
-
-
- org.springframework.boot
- spring-boot-starter-logging
-
-
- org.springframework
- spring-context
-
-
-
-
-
-
- org.springframework.boot
- spring-boot-maven-plugin
- 1.5.4.RELEASE
-
-
-
- repackage
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/demos/RxChat/RxChat-Server/src/main/java/com/salesforce/servicelibs/ChatImpl.java b/demos/RxChat/RxChat-Server/src/main/java/com/salesforce/servicelibs/ChatImpl.java
deleted file mode 100644
index b7f508ea..00000000
--- a/demos/RxChat/RxChat-Server/src/main/java/com/salesforce/servicelibs/ChatImpl.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.servicelibs;
-
-import com.google.protobuf.Empty;
-import com.salesforce.grpc.contrib.spring.GrpcService;
-import io.reactivex.BackpressureStrategy;
-import io.reactivex.Flowable;
-import io.reactivex.Single;
-import io.reactivex.subjects.PublishSubject;
-import io.reactivex.subjects.Subject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Demonstrates building a gRPC streaming server using RxJava and RxGrpc.
- */
-@GrpcService
-public class ChatImpl extends RxChatGrpc.ChatImplBase {
- private final Logger logger = LoggerFactory.getLogger(ChatImpl.class);
- private final Subject broadcast = PublishSubject.create();
-
- @Override
- public Single postMessage(Single request) {
- return request.map(this::broadcast);
- }
-
- private Empty broadcast(ChatProto.ChatMessage message) {
- logger.info(message.getAuthor() + ": " + message.getMessage());
- broadcast.onNext(message);
- return Empty.getDefaultInstance();
- }
-
- @Override
- public Flowable getMessages(Single request) {
- return broadcast.toFlowable(BackpressureStrategy.BUFFER);
- }
-}
diff --git a/demos/RxChat/RxChat-Server/src/main/java/com/salesforce/servicelibs/ChatServer.java b/demos/RxChat/RxChat-Server/src/main/java/com/salesforce/servicelibs/ChatServer.java
deleted file mode 100644
index b52492ab..00000000
--- a/demos/RxChat/RxChat-Server/src/main/java/com/salesforce/servicelibs/ChatServer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.servicelibs;
-
-import com.salesforce.grpc.contrib.spring.GrpcServerHost;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.Bean;
-
-/**
- * Demonstrates building a gRPC streaming server using RxJava, RxGrpc, grpc-spring, and Spring Boot.
- */
-@SpringBootApplication
-public class ChatServer {
- private final Logger logger = LoggerFactory.getLogger(ChatServer.class);
-
- public static void main(String[] args) throws Exception {
- SpringApplication.run(ChatServer.class, args);
- Thread.currentThread().join();
- }
-
- @Bean(initMethod = "start")
- public GrpcServerHost grpcServerHost(@Value("${port}") int port) {
- logger.info("Listening for gRPC on port " + port);
- GrpcServerHost host = new GrpcServerHost(port);
- return host;
- }
-
- @Bean
- public ChatGrpc.ChatImplBase chatImpl() {
- return new ChatImpl();
- }
-}
diff --git a/demos/RxChat/RxChat-Server/src/main/proto/Chat.proto b/demos/RxChat/RxChat-Server/src/main/proto/Chat.proto
deleted file mode 100644
index c1be978b..00000000
--- a/demos/RxChat/RxChat-Server/src/main/proto/Chat.proto
+++ /dev/null
@@ -1,20 +0,0 @@
-syntax = "proto3";
-
-package com.salesforce.servicelibs;
-
-option java_package = "com.salesforce.servicelibs";
-option java_outer_classname = "ChatProto";
-
-import "google/protobuf/empty.proto";
-import "google/protobuf/timestamp.proto";
-
-service Chat {
- rpc PostMessage (ChatMessage) returns (google.protobuf.Empty) {}
- rpc GetMessages (google.protobuf.Empty) returns (stream ChatMessage) {}
-}
-
-message ChatMessage {
- google.protobuf.Timestamp when = 1;
- string author = 2;
- string message = 3;
-}
\ No newline at end of file
diff --git a/demos/RxChat/RxChat-Server/src/main/resources/application.properties b/demos/RxChat/RxChat-Server/src/main/resources/application.properties
deleted file mode 100644
index 87f1d8f0..00000000
--- a/demos/RxChat/RxChat-Server/src/main/resources/application.properties
+++ /dev/null
@@ -1 +0,0 @@
-port=9999
\ No newline at end of file
diff --git a/demos/RxChat/pom.xml b/demos/RxChat/pom.xml
deleted file mode 100644
index 66adf6e9..00000000
--- a/demos/RxChat/pom.xml
+++ /dev/null
@@ -1,102 +0,0 @@
-
-
-
-
-
- grpc-contrib-parent
- com.salesforce.servicelibs
- 0.6.2-SNAPSHOT
- ../../pom.xml
-
- 4.0.0
-
- RxChat
- pom
-
-
- true
- true
-
-
-
- RxChat-Client
- RxChat-Server
-
-
-
-
- io.grpc
- grpc-netty
-
-
- com.salesforce.servicelibs
- rxgrpc
- provided
-
-
- com.salesforce.servicelibs
- rxgrpc-stub
-
-
- com.salesforce.servicelibs
- grpc-contrib
-
-
-
-
-
-
- kr.motd.maven
- os-maven-plugin
- 1.4.1.Final
-
-
-
-
- org.apache.maven.plugins
- maven-compiler-plugin
- 3.6.1
-
- 1.8
- 1.8
-
-
-
- org.xolstice.maven.plugins
- protobuf-maven-plugin
- 0.5.0
-
- com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
- grpc-java
- io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
-
-
-
-
- compile
- compile-custom
-
-
-
-
- rxgrpc
- com.salesforce.servicelibs
- rxgrpc
- ${project.version}
- com.salesforce.rxgrpc.RxGrpcGenerator
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 4e9bd24f..08815f6d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,11 +52,6 @@
grpc-springjprotoc-testdemos/grpc-java-contrib-demo
- demos/RxChat
- rxgrpc/rxgrpc
- rxgrpc/rxgrpc-test
- rxgrpc/rxgrpc-stub
- rxgrpc/rxgrpc-tck
diff --git a/rxgrpc/README.md b/rxgrpc/README.md
deleted file mode 100644
index f74a7939..00000000
--- a/rxgrpc/README.md
+++ /dev/null
@@ -1,88 +0,0 @@
-Overview
-========
-RxGrpc is a new set of gRPC bindings for reactive programming with [RxJava](https://github.com/ReactiveX/RxJava).
-RxGprc bindings support unary and streaming operations in both directions. RxGrpc also builds on top of gRPC's
-back-pressure support, to deliver end-to-end back-pressure-based flow control in line with RxJava's `Flowable`
-back-pressure model.
-
-Usage
-=====
-To use RxGrpc with the `protobuf-maven-plugin`, add a [custom protoc plugin configuration section](https://www.xolstice.org/protobuf-maven-plugin/examples/protoc-plugin.html).
-```xml
-
-
- rxgrpc
- com.salesforce.servicelibs
- rxgrpc
- [VERSION]
- com.salesforce.rxgrpc.RxGrpcGenerator
-
-
-```
-
-After installing the plugin, RxGrpc service stubs will be generated along with your gRPC service stubs.
-
-* To implement a service using an RxGrpc service, subclass `Rx[Name]Grpc.[Name]ImplBase` and override the RxJava-based
- methods.
-
- ```
- GreeterGrpc.GreeterImplBase svc = new RxGreeterGrpc.GreeterImplBase() {
- @Override
- public Single sayHello(Single rxRequest) {
- return rxRequest.map(protoRequest -> greet("Hello", protoRequest));
- }
-
- ...
-
- @Override
- public Flowable sayHelloBothStream(Flowable rxRequest) {
- return rxRequest
- .map(HelloRequest::getName)
- .buffer(2)
- .map(names -> greet("Hello", String.join(" and ", names)));
- }
- };
- ```
-* To call a service using an RxGrpc client, call `Rx[Name]Grpc.newRxStub(Channel channel)`.
-
- ```
- RxGreeterGrpc.RxGreeterStub stub = RxGreeterGrpc.newRxStub(channel);
- Flowable req = Flowable.just(
- HelloRequest.newBuilder().setName("a").build(),
- HelloRequest.newBuilder().setName("b").build(),
- HelloRequest.newBuilder().setName("c").build());
- Flowable resp = stub.sayHelloBothStream(req);
- resp.subscribe(...);
- ```
-
-Back-pressure
-=============
-RxGrpc stubs support bi-directional streaming with back-pressure. Under the hood, RxGrpc is built atop the vanilla
-gRPC service stubs generated by protoc. As such, they inherit gRPC's HTTP/2-based back-pressure model.
-
-Internally, gRPC and RxGrpc implement a pull-based back-pressure strategy. At the HTTP/2 layer, gRPC maintains a
-buffer of serialized protocol buffer messages. As frames are consumed on the consumer side, the producer is signaled
-to transmit more frames. If this producer-side transmit buffer fills, the HTTP/2 layer signals to the gRPC messaging
-layer to stop producing new messages in the stream. RxGrpc handles this signal, applying back-pressure to RxJava
-using the `Flowable` api. RxGrpc also implements `Flowable` back-pressure on the consumer side of a stream. As messages
-are consumed by the consumer-side `Flowable`, signals are sent down through gRPC and HTTP/2 to request more data.
-
-An example of back-pressure in action can be found in `BackpressureIntegrationTest.java`.
-
-Exception Handling
-==============
-Exception handling with RxGrpc is a little strange due to the way gRPC deals with errors. Servers that produce an error
-by calling `onError(Throwable)` will terminate the call with a `StatusRuntimeException`. The client will have its
-`onError(Throwable)` subscription handler called as expected.
-
-Exceptions going from client to server are a little less predictable. Depending on the timing, gRPC may cancel
-the request before sending any messages due to an exception in the outbound stream.
-
-Modules
-=======
-
-RxGrpc is broken down into three sub-modules:
-
-* _rxgrpc_ - a protoc generator for generating gRPC bindings for RxJava.
-* _rxgrpc-stub_ - stub classes supporting the generated RxGrpc bindings.
-* _rxgrpc-test_ - integration tests for RxGrpc.
diff --git a/rxgrpc/rxgrpc-stub/README.md b/rxgrpc/rxgrpc-stub/README.md
deleted file mode 100644
index 3c4f3d16..00000000
--- a/rxgrpc/rxgrpc-stub/README.md
+++ /dev/null
@@ -1,18 +0,0 @@
-[](https://maven-badges.herokuapp.com/maven-central/com.salesforce.servicelibs/rxgrpc-stub)
-
-Usage
-=====
-```xml
-
-
- ...
-
-
-
- com.salesforce.servicelibs
- rxgrpc-stub
- [VERSION]
-
-
-
-```
\ No newline at end of file
diff --git a/rxgrpc/rxgrpc-stub/pom.xml b/rxgrpc/rxgrpc-stub/pom.xml
deleted file mode 100644
index 57b273c5..00000000
--- a/rxgrpc/rxgrpc-stub/pom.xml
+++ /dev/null
@@ -1,65 +0,0 @@
-
-
-
-
-
- grpc-contrib-parent
- com.salesforce.servicelibs
- 0.6.2-SNAPSHOT
- ../../pom.xml
-
- 4.0.0
-
- rxgrpc-stub
-
-
-
- com.salesforce.servicelibs
- grpc-contrib
-
-
- io.reactivex.rxjava2
- rxjava
- 2.1.0
-
-
- io.grpc
- grpc-stub
-
-
- junit
- junit
- test
-
-
- org.assertj
- assertj-core
- test
-
-
- org.mockito
- mockito-core
- test
-
-
-
-
-
-
- org.apache.maven.plugins
- maven-checkstyle-plugin
-
- ../../checkstyle.xml
- ../../checkstyle_ignore.xml
-
-
-
-
-
\ No newline at end of file
diff --git a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/CancellableStreamObserver.java b/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/CancellableStreamObserver.java
deleted file mode 100644
index 41a7b53f..00000000
--- a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/CancellableStreamObserver.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.rxgrpc.stub;
-
-import io.grpc.Status;
-import io.grpc.StatusException;
-import io.grpc.StatusRuntimeException;
-import io.grpc.stub.ClientCallStreamObserver;
-import io.grpc.stub.ClientResponseObserver;
-
-/**
- * CancellableStreamObserver wraps a {@link io.grpc.stub.StreamObserver} and invokes an onCanceledHandler if
- * {@link io.grpc.stub.StreamObserver#onError(Throwable)} is invoked with a {@link StatusException} or
- * {@link StatusRuntimeException} of type {@link Status.Code#CANCELLED}. This class is used to hook gRPC server
- * cancellation events.
- *
- * @param
- * @param
- */
-public class CancellableStreamObserver implements ClientResponseObserver {
- private final ClientResponseObserver delegate;
- private final Runnable onCanceledHandler;
-
- public CancellableStreamObserver(ClientResponseObserver delegate, Runnable onCanceledHandler) {
- this.delegate = delegate;
- this.onCanceledHandler = onCanceledHandler;
- }
-
- @Override
- public void onNext(TResponse value) {
- delegate.onNext(value);
- }
-
- @Override
- public void onError(Throwable t) {
- if (t instanceof StatusException) {
- if (((StatusException) t).getStatus().getCode() == Status.Code.CANCELLED) {
- onCanceledHandler.run();
- }
- }
- if (t instanceof StatusRuntimeException) {
- if (((StatusRuntimeException) t).getStatus().getCode() == Status.Code.CANCELLED) {
- onCanceledHandler.run();
- }
- }
- delegate.onError(t);
- }
-
- @Override
- public void onCompleted() {
- delegate.onCompleted();
- }
-
- @Override
- public void beforeStart(ClientCallStreamObserver requestStream) {
- delegate.beforeStart(requestStream);
- }
-}
diff --git a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/ClientCalls.java b/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/ClientCalls.java
deleted file mode 100644
index 8fa097ab..00000000
--- a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/ClientCalls.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.rxgrpc.stub;
-
-import com.google.common.util.concurrent.Runnables;
-import com.salesforce.grpc.contrib.LambdaStreamObserver;
-import io.grpc.stub.StreamObserver;
-import io.reactivex.Flowable;
-import io.reactivex.Single;
-
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-
-/**
- * Utility functions for processing different client call idioms. We have one-to-one correspondence
- * between utilities in this class and the potential signatures in a generated stub client class so
- * that the runtime can vary behavior without requiring regeneration of the stub.
- */
-public final class ClientCalls {
- private ClientCalls() {
-
- }
-
- /**
- * Implements a unary -> unary call using {@link Single} -> {@link Single}.
- */
- public static Single oneToOne(
- Single rxRequest,
- BiConsumer> delegate) {
- try {
- return Single
- .create(emitter -> rxRequest.subscribe(
- request -> delegate.accept(request, new LambdaStreamObserver(
- emitter::onSuccess,
- emitter::onError,
- Runnables.doNothing()
- )),
- emitter::onError
- ))
- .lift(new SubscribeOnlyOnceSingleOperator<>());
- } catch (Throwable throwable) {
- return Single.error(throwable);
- }
- }
-
- /**
- * Implements a unary -> stream call as {@link Single} -> {@link Flowable}, where the server responds with a
- * stream of messages.
- */
- public static Flowable oneToMany(
- Single rxRequest,
- BiConsumer> delegate) {
- try {
- RxConsumerStreamObserver consumerStreamObserver = new RxConsumerStreamObserver<>();
- rxRequest.subscribe(request -> delegate.accept(request, consumerStreamObserver));
- return consumerStreamObserver
- .getRxConsumer()
- .lift(new SubscribeOnlyOnceFlowableOperator<>());
- } catch (Throwable throwable) {
- return Flowable.error(throwable);
- }
- }
-
- /**
- * Implements a stream -> unary call as {@link Flowable} -> {@link Single}, where the client transits a stream of
- * messages.
- */
- public static Single manyToOne(
- Flowable rxRequest,
- Function, StreamObserver> delegate) {
- try {
- return Single
- .create(emitter -> {
- RxProducerStreamObserver rxProducerStreamObserver = new RxProducerStreamObserver<>(
- rxRequest,
- emitter::onSuccess,
- emitter::onError,
- Runnables.doNothing());
- delegate.apply(
- new CancellableStreamObserver<>(rxProducerStreamObserver,
- rxProducerStreamObserver::cancel));
- rxProducerStreamObserver.rxSubscribe();
- }).lift(new SubscribeOnlyOnceSingleOperator<>());
- } catch (Throwable throwable) {
- return Single.error(throwable);
- }
- }
-
- /**
- * Implements a bidirectional stream -> stream call as {@link Flowable} -> {@link Flowable}, where both the client
- * and the server independently stream to each other.
- */
- public static Flowable manyToMany(
- Flowable rxRequest,
- Function, StreamObserver> delegate) {
- try {
- RxProducerConsumerStreamObserver consumerStreamObserver = new RxProducerConsumerStreamObserver<>(rxRequest);
- delegate.apply(new CancellableStreamObserver<>(consumerStreamObserver, consumerStreamObserver::cancel));
- consumerStreamObserver.rxSubscribe();
- return consumerStreamObserver
- .getRxConsumer()
- .lift(new SubscribeOnlyOnceFlowableOperator<>());
- } catch (Throwable throwable) {
- return Flowable.error(throwable);
- }
- }
-}
diff --git a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxConsumerStreamObserver.java b/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxConsumerStreamObserver.java
deleted file mode 100644
index ef33a61f..00000000
--- a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxConsumerStreamObserver.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.rxgrpc.stub;
-
-import com.google.common.base.Preconditions;
-import io.grpc.Status;
-import io.grpc.stub.ClientCallStreamObserver;
-import io.grpc.stub.ClientResponseObserver;
-import io.reactivex.Flowable;
-import io.reactivex.schedulers.Schedulers;
-
-import java.util.concurrent.CountDownLatch;
-
-/**
- * RxConsumerStreamObserver configures client-side manual flow control for the consuming end of a message stream.
- *
- * @param
- * @param
- */
-public class RxConsumerStreamObserver implements ClientResponseObserver {
- private RxStreamObserverPublisher publisher;
- private Flowable rxConsumer;
- private CountDownLatch beforeStartCalled = new CountDownLatch(1);
-
- public Flowable getRxConsumer() {
- try {
- beforeStartCalled.await();
- } catch (InterruptedException e) {
- throw Status.INTERNAL.withCause(e).asRuntimeException();
- }
- return rxConsumer;
- }
-
-
- @Override
- public void beforeStart(ClientCallStreamObserver requestStream) {
- publisher = new RxStreamObserverPublisher<>(Preconditions.checkNotNull(requestStream));
-
- rxConsumer = Flowable.unsafeCreate(publisher)
- .observeOn(Schedulers.from(RxExecutor.getSerializingExecutor()));
- beforeStartCalled.countDown();
- }
-
- @Override
- public void onNext(TResponse value) {
- Preconditions.checkState(publisher != null, "beforeStart() not yet called");
- publisher.onNext(Preconditions.checkNotNull(value));
- }
-
- @Override
- public void onError(Throwable throwable) {
- Preconditions.checkState(publisher != null, "beforeStart() not yet called");
- publisher.onError(Preconditions.checkNotNull(throwable));
- }
-
- @Override
- public void onCompleted() {
- Preconditions.checkState(publisher != null, "beforeStart() not yet called");
- publisher.onCompleted();
- }
-}
\ No newline at end of file
diff --git a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxExecutor.java b/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxExecutor.java
deleted file mode 100644
index ef733379..00000000
--- a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxExecutor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.rxgrpc.stub;
-
-import io.grpc.internal.GrpcUtil;
-import io.grpc.internal.SerializingExecutor;
-
-import java.util.concurrent.Executor;
-
-/**
- * RxExecutor holds a shared executor used by RxGrpc to marshall messages between RxJava and gRPC streams.
- */
-public final class RxExecutor {
- private RxExecutor() {
-
- }
-
- private static Executor executor = GrpcUtil.SHARED_CHANNEL_EXECUTOR.create();
-
- /**
- * Get the shared executor.
- */
- public static Executor getSerializingExecutor() {
- return new SerializingExecutor(executor);
- }
-}
diff --git a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxFlowableBackpressureOnReadyHandler.java b/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxFlowableBackpressureOnReadyHandler.java
deleted file mode 100644
index a511f07d..00000000
--- a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxFlowableBackpressureOnReadyHandler.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.rxgrpc.stub;
-
-import com.google.common.base.Preconditions;
-import io.grpc.Status;
-import io.grpc.StatusException;
-import io.grpc.StatusRuntimeException;
-import io.grpc.stub.CallStreamObserver;
-import io.grpc.stub.ClientCallStreamObserver;
-import io.grpc.stub.ServerCallStreamObserver;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import java.util.concurrent.CountDownLatch;
-
-/**
- * RxFlowableBackpressureOnReadyHandler bridges the manual flow control idioms of RxJava and gRPC. This class takes
- * messages off of a {@link org.reactivestreams.Publisher} and feeds them into a {@link CallStreamObserver}
- * while respecting backpressure. This class is the inverse of {@link RxStreamObserverPublisher}.
- *
- * When a gRPC publisher's transport wants more data to transmit, the {@link CallStreamObserver}'s onReady handler is
- * signaled. This handler must keep transmitting messages until {@link CallStreamObserver#isReady()} ceases to be true.
- *
- * When a {@link org.reactivestreams.Publisher} is subscribed to by a {@link Subscriber}, the
- * {@code Publisher} hands the {@code Subscriber} a {@link Subscription}. When the {@code Subscriber}
- * wants more messages from the {@code Publisher}, the {@code Subscriber} calls {@link Subscription#request(long)}.
- *
- * To bridge the two idioms: when gRPC wants more messages, the {@code onReadyHandler} is called and {@link #run()}
- * calls the {@code Subscription}'s {@code request()} method, asking the {@code Publisher} to produce another message.
- * Since this class is also registered as the {@code Publisher}'s {@code Subscriber}, the {@link #onNext(Object)}
- * method is called. {@code onNext()} passes the message to gRPC's {@link CallStreamObserver#onNext(Object)} method,
- * and then calls {@code request()} again if {@link CallStreamObserver#isReady()} is true. The loop of
- * request->pass->check is repeated until {@code isReady()} returns false, indicating that the outbound transmit buffer
- * is full and that backpressure must be applied.
- *
- * @param
- */
-public class RxFlowableBackpressureOnReadyHandler implements Subscriber, Runnable {
- private CallStreamObserver requestStream;
- private Subscription subscription;
- private boolean canceled = false;
- private CountDownLatch subscribed = new CountDownLatch(1);
-
- public RxFlowableBackpressureOnReadyHandler(ClientCallStreamObserver requestStream) {
- this.requestStream = Preconditions.checkNotNull(requestStream);
- requestStream.setOnReadyHandler(this);
- }
-
- public RxFlowableBackpressureOnReadyHandler(ServerCallStreamObserver requestStream) {
- this.requestStream = Preconditions.checkNotNull(requestStream);
- requestStream.setOnReadyHandler(this);
- requestStream.setOnCancelHandler(() -> subscription.cancel());
- }
-
- @Override
- public void run() {
- try {
- subscribed.await();
- } catch (InterruptedException e) {
-
- }
- Preconditions.checkState(subscription != null, "onSubscribe() not yet called");
- if (!isCanceled()) {
- // restart the pump
- subscription.request(1);
- }
- }
-
- public void cancel() {
- canceled = true;
- if (subscription != null) {
- subscription.cancel();
- subscription = null;
- }
- }
-
- public boolean isCanceled() {
- return canceled;
- }
-
- @Override
- public void onSubscribe(Subscription subscription) {
- if (this.subscription != null) {
- subscription.cancel();
- } else {
- this.subscription = Preconditions.checkNotNull(subscription);
- subscribed.countDown();
- }
- }
-
- @Override
- public void onNext(T t) {
- if (!isCanceled()) {
- requestStream.onNext(Preconditions.checkNotNull(t));
- if (requestStream.isReady()) {
- // keep the pump going
- subscription.request(1);
- }
- }
- }
-
- @Override
- public void onError(Throwable throwable) {
- requestStream.onError(prepareError(Preconditions.checkNotNull(throwable)));
- }
-
- @Override
- public void onComplete() {
- requestStream.onCompleted();
- }
-
- private static Throwable prepareError(Throwable throwable) {
- if (throwable instanceof StatusException || throwable instanceof StatusRuntimeException) {
- return throwable;
- } else {
- return Status.fromThrowable(throwable).asException();
- }
- }
-}
diff --git a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxProducerConsumerStreamObserver.java b/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxProducerConsumerStreamObserver.java
deleted file mode 100644
index b27ba4f2..00000000
--- a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxProducerConsumerStreamObserver.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.rxgrpc.stub;
-
-import com.google.common.base.Preconditions;
-import io.grpc.stub.ClientCallStreamObserver;
-import io.reactivex.Flowable;
-
-/**
- * RxProducerConsumerStreamObserver configures client-side manual flow control for when the client is both producing
- * and consuming streams of messages.
- *
- * @param
- * @param
- */
-public class RxProducerConsumerStreamObserver extends RxConsumerStreamObserver {
- private Flowable rxProducer;
- private RxFlowableBackpressureOnReadyHandler onReadyHandler;
-
- public RxProducerConsumerStreamObserver(Flowable rxProducer) {
- this.rxProducer = rxProducer;
- }
-
- @Override
- public void beforeStart(ClientCallStreamObserver requestStream) {
- super.beforeStart(Preconditions.checkNotNull(requestStream));
- onReadyHandler = new RxFlowableBackpressureOnReadyHandler<>(requestStream);
- }
-
- public void rxSubscribe() {
- rxProducer.subscribe(onReadyHandler);
- }
-
- public void cancel() {
- onReadyHandler.cancel();
- }
-}
diff --git a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxProducerStreamObserver.java b/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxProducerStreamObserver.java
deleted file mode 100644
index 24133395..00000000
--- a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxProducerStreamObserver.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.rxgrpc.stub;
-
-import com.google.common.base.Preconditions;
-import com.salesforce.grpc.contrib.LambdaStreamObserver;
-import io.grpc.stub.ClientCallStreamObserver;
-import io.grpc.stub.ClientResponseObserver;
-import io.reactivex.Flowable;
-
-import java.util.function.Consumer;
-
-/**
- * LambdaStreamObserver configures client-side manual flow control for the producing end of a message stream.
- *
- * @param
- * @param
- */
-public class RxProducerStreamObserver extends LambdaStreamObserver implements ClientResponseObserver {
- private Flowable rxProducer;
- private RxFlowableBackpressureOnReadyHandler onReadyHandler;
-
- public RxProducerStreamObserver(Flowable rxProducer, Consumer onNext, Consumer onError, Runnable onCompleted) {
- super(
- Preconditions.checkNotNull(onNext),
- Preconditions.checkNotNull(onError),
- Preconditions.checkNotNull(onCompleted)
- );
- this.rxProducer = Preconditions.checkNotNull(rxProducer);
- }
-
- @Override
- public void beforeStart(ClientCallStreamObserver producerStream) {
- Preconditions.checkNotNull(producerStream);
- // Subscribe to the rxProducer with an adapter to a gRPC StreamObserver that respects backpressure
- // signals from the underlying gRPC client transport.
- onReadyHandler = new RxFlowableBackpressureOnReadyHandler<>(producerStream);
- }
-
- public void rxSubscribe() {
- rxProducer.subscribe(onReadyHandler);
- }
-
- public void cancel() {
- onReadyHandler.cancel();
- }
-}
\ No newline at end of file
diff --git a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxStreamObserverPublisher.java b/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxStreamObserverPublisher.java
deleted file mode 100644
index fa685330..00000000
--- a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/RxStreamObserverPublisher.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.rxgrpc.stub;
-
-import com.google.common.base.Preconditions;
-import io.grpc.Status;
-import io.grpc.stub.CallStreamObserver;
-import io.grpc.stub.ClientCallStreamObserver;
-import io.grpc.stub.ServerCallStreamObserver;
-import io.grpc.stub.StreamObserver;
-import org.reactivestreams.Publisher;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import java.util.concurrent.CountDownLatch;
-
-/**
- * RxStreamObserverPublisher bridges the manual flow control idioms of gRPC and RxJava. This class takes
- * messages off of a {@link StreamObserver} and feeds them into a {@link Publisher} while respecting backpressure. This
- * class is the inverse of {@link RxFlowableBackpressureOnReadyHandler}.
- *
- * When a {@link Publisher} is subscribed to by a {@link Subscriber}, the {@code Publisher} hands the {@code Subscriber}
- * a {@link Subscription}. When the {@code Subscriber} wants more messages from the {@code Publisher}, the
- * {@code Subscriber} calls {@link Subscription#request(long)}.
- *
- * gRPC also uses the {@link CallStreamObserver#request(int)} idiom to request more messages from the stream.
- *
- * To bridge the two idioms: this class implements a {@code Publisher} which delegates calls to {@code request()} to
- * a {@link CallStreamObserver} set in the constructor. When a message is generated as a response, the message is
- * delegated in the reverse so the {@code Publisher} can announce it to RxJava.
- *
- * @param
- */
-public class RxStreamObserverPublisher implements Publisher, StreamObserver {
- private CallStreamObserver callStreamObserver;
- private Subscriber super T> subscriber;
- private volatile boolean isCanceled;
-
- // A gRPC server can sometimes send messages before subscribe() has been called and the consumer may not have
- // finished setting up the consumer pipeline. Use a countdown latch to prevent messages from processing before
- // subscribe() has been called.
- private CountDownLatch subscribed = new CountDownLatch(1);
-
- public RxStreamObserverPublisher(CallStreamObserver callStreamObserver) {
- Preconditions.checkNotNull(callStreamObserver);
- this.callStreamObserver = callStreamObserver;
- callStreamObserver.disableAutoInboundFlowControl();
- }
-
- @Override
- public void subscribe(Subscriber super T> subscriber) {
- Preconditions.checkNotNull(subscriber);
- subscriber.onSubscribe(new Subscription() {
- @Override
- public void request(long l) {
- // RxJava uses Long.MAX_VALUE to indicate "all messages"; gRPC uses Integer.MAX_VALUE.
- int i = (int) Long.min(l, Integer.MAX_VALUE);
-
- // Very rarely, request() gets called before the client has finished setting up its stream. If this
- // happens, wait momentarily and try again.
- try {
- callStreamObserver.request(i);
- } catch (IllegalStateException ex) {
- try {
- Thread.sleep(2);
- } catch (InterruptedException e) {
- // no-op
- }
- callStreamObserver.request(i);
- }
- }
-
- @Override
- public void cancel() {
- // Don't cancel twice if the server is already canceled
- if (callStreamObserver instanceof ServerCallStreamObserver && ((ServerCallStreamObserver) callStreamObserver).isCancelled()) {
- return;
- }
-
- isCanceled = true;
- if (callStreamObserver instanceof ClientCallStreamObserver) {
- ((ClientCallStreamObserver) callStreamObserver).cancel("Client canceled request", null);
- } else {
- callStreamObserver.onError(Status.CANCELLED.withDescription("Server canceled request").asRuntimeException());
- }
- }
- });
- this.subscriber = subscriber;
-
- subscribed.countDown();
- }
-
- @Override
- public void onNext(T value) {
- try {
- subscribed.await();
- } catch (InterruptedException e) {
-
- }
- subscriber.onNext(Preconditions.checkNotNull(value));
- }
-
- @Override
- public void onError(Throwable t) {
- try {
- subscribed.await();
- } catch (InterruptedException e) {
-
- }
-
- subscriber.onError(Preconditions.checkNotNull(t));
- // Release the subscriber, we don't need a reference to it anymore
- subscriber = null;
- }
-
- @Override
- public void onCompleted() {
- try {
- subscribed.await();
- } catch (InterruptedException e) {
-
- }
- subscriber.onComplete();
- // Release the subscriber, we don't need a reference to it anymore
- subscriber = null;
- }
-
- public boolean isCanceled() {
- return isCanceled;
- }
-}
diff --git a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/ServerCalls.java b/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/ServerCalls.java
deleted file mode 100644
index fddbeeca..00000000
--- a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/ServerCalls.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.rxgrpc.stub;
-
-import com.google.common.base.Preconditions;
-import com.salesforce.grpc.contrib.LambdaStreamObserver;
-import io.grpc.Status;
-import io.grpc.StatusException;
-import io.grpc.StatusRuntimeException;
-import io.grpc.stub.CallStreamObserver;
-import io.grpc.stub.ServerCallStreamObserver;
-import io.grpc.stub.StreamObserver;
-import io.reactivex.Flowable;
-import io.reactivex.Single;
-import io.reactivex.schedulers.Schedulers;
-import org.reactivestreams.Subscriber;
-
-import java.util.function.Function;
-
-/**
- * Utility functions for processing different server call idioms. We have one-to-one correspondence
- * between utilities in this class and the potential signatures in a generated server stub class so
- * that the runtime can vary behavior without requiring regeneration of the stub.
- */
-public final class ServerCalls {
- private ServerCalls() {
-
- }
-
- /**
- * Implements a unary -> unary call using {@link Single} -> {@link Single}.
- */
- public static void oneToOne(
- TRequest request, StreamObserver responseObserver,
- Function, Single> delegate) {
- try {
- Single rxRequest = Single.just(request);
-
- Single rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest));
- rxResponse.subscribe(
- value -> {
- // Don't try to respond if the server has already canceled the request
- if (responseObserver instanceof ServerCallStreamObserver && ((ServerCallStreamObserver) responseObserver).isCancelled()) {
- return;
- }
- responseObserver.onNext(value);
- responseObserver.onCompleted();
- },
- throwable -> responseObserver.onError(prepareError(throwable)));
- } catch (Throwable throwable) {
- responseObserver.onError(prepareError(throwable));
- }
- }
-
- /**
- * Implements a unary -> stream call as {@link Single} -> {@link Flowable}, where the server responds with a
- * stream of messages.
- */
- public static void oneToMany(
- TRequest request, StreamObserver responseObserver,
- Function, Flowable> delegate) {
- try {
- Single rxRequest = Single.just(request);
-
- Flowable rxResponse = Preconditions.checkNotNull(delegate.apply(rxRequest));
- rxResponse.subscribe(new RxFlowableBackpressureOnReadyHandler<>(
- (ServerCallStreamObserver) responseObserver));
- } catch (Throwable throwable) {
- responseObserver.onError(prepareError(throwable));
- }
- }
-
- /**
- * Implements a stream -> unary call as {@link Flowable} -> {@link Single}, where the client transits a stream of
- * messages.
- */
- public static StreamObserver manyToOne(
- StreamObserver responseObserver,
- Function, Single> delegate) {
- RxStreamObserverPublisher streamObserverPublisher =
- new RxStreamObserverPublisher<>((CallStreamObserver) responseObserver);
-
- try {
- Single rxResponse = Preconditions.checkNotNull(delegate.apply(
- Flowable.unsafeCreate(streamObserverPublisher)
- .observeOn(Schedulers.from(RxExecutor.getSerializingExecutor()))));
- rxResponse.subscribe(
- value -> {
- // Don't try to respond if the server has already canceled the request
- if (!streamObserverPublisher.isCanceled()) {
- responseObserver.onNext(value);
- responseObserver.onCompleted();
- }
- },
- throwable -> {
- // Don't try to respond if the server has already canceled the request
- if (!streamObserverPublisher.isCanceled()) {
- responseObserver.onError(prepareError(throwable));
- }
- }
- );
- } catch (Throwable throwable) {
- responseObserver.onError(prepareError(throwable));
- }
-
- return new LambdaStreamObserver<>(
- streamObserverPublisher::onNext,
- streamObserverPublisher::onError,
- streamObserverPublisher::onCompleted);
- }
-
- /**
- * Implements a bidirectional stream -> stream call as {@link Flowable} -> {@link Flowable}, where both the client
- * and the server independently stream to each other.
- */
- public static StreamObserver manyToMany(
- StreamObserver responseObserver,
- Function, Flowable> delegate) {
- RxStreamObserverPublisher streamObserverPublisher =
- new RxStreamObserverPublisher<>((CallStreamObserver) responseObserver);
-
- try {
- Flowable rxResponse = Preconditions.checkNotNull(delegate.apply(
- Flowable.unsafeCreate(streamObserverPublisher)
- .observeOn(Schedulers.from(RxExecutor.getSerializingExecutor()))));
- Subscriber subscriber = new RxFlowableBackpressureOnReadyHandler<>(
- (ServerCallStreamObserver) responseObserver);
- // Don't try to respond if the server has already canceled the request
- rxResponse.subscribe(
- tResponse -> {
- if (!streamObserverPublisher.isCanceled()) {
- subscriber.onNext(tResponse);
- }
- },
- throwable -> {
- if (!streamObserverPublisher.isCanceled()) {
- subscriber.onError(throwable);
- }
- },
- () -> {
- if (!streamObserverPublisher.isCanceled()) {
- subscriber.onComplete();
- }
- },
- subscriber::onSubscribe
- );
- } catch (Throwable throwable) {
- responseObserver.onError(prepareError(throwable));
- }
-
- return new LambdaStreamObserver<>(
- streamObserverPublisher::onNext,
- streamObserverPublisher::onError,
- streamObserverPublisher::onCompleted);
- }
-
- private static Throwable prepareError(Throwable throwable) {
- if (throwable instanceof StatusException || throwable instanceof StatusRuntimeException) {
- return throwable;
- } else {
- return Status.fromThrowable(throwable).asException();
- }
- }
-}
diff --git a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/SubscribeOnlyOnceFlowableOperator.java b/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/SubscribeOnlyOnceFlowableOperator.java
deleted file mode 100644
index 990cce2d..00000000
--- a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/SubscribeOnlyOnceFlowableOperator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.rxgrpc.stub;
-
-
-import io.reactivex.FlowableOperator;
-import org.reactivestreams.Subscriber;
-import org.reactivestreams.Subscription;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * SubscribeOnlyOnceFlowableOperator throws an exception if a user attempts to subscribe more than once to a
- * {@link io.reactivex.Flowable}.
- *
- * @param
- */
-public class SubscribeOnlyOnceFlowableOperator implements FlowableOperator {
- private AtomicBoolean subscribedOnce = new AtomicBoolean(false);
-
- @Override
- public Subscriber super T> apply(Subscriber super T> observer) throws Exception {
- return new Subscriber() {
- @Override
- public void onSubscribe(Subscription subscription) {
- if (subscribedOnce.getAndSet(true)) {
- throw new NullPointerException("You cannot directly subscribe to a gRPC service multiple times " +
- "concurrently. Use Flowable.share() instead.");
- } else {
- observer.onSubscribe(subscription);
- }
- }
-
- @Override
- public void onNext(T t) {
- observer.onNext(t);
- }
-
- @Override
- public void onError(Throwable throwable) {
- observer.onError(throwable);
- }
-
- @Override
- public void onComplete() {
- observer.onComplete();
- }
- };
- }
-}
diff --git a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/SubscribeOnlyOnceSingleOperator.java b/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/SubscribeOnlyOnceSingleOperator.java
deleted file mode 100644
index 4c3349b2..00000000
--- a/rxgrpc/rxgrpc-stub/src/main/java/com/salesforce/rxgrpc/stub/SubscribeOnlyOnceSingleOperator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.rxgrpc.stub;
-
-import io.reactivex.SingleObserver;
-import io.reactivex.SingleOperator;
-import io.reactivex.disposables.Disposable;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * SubscribeOnlyOnceSingleOperator throws an exception if a user attempts to subscribe more than once to a
- * {@link io.reactivex.Single}.
- *
- * @param
- */
-public class SubscribeOnlyOnceSingleOperator implements SingleOperator {
- private AtomicBoolean subscribedOnce = new AtomicBoolean(false);
-
- @Override
- public SingleObserver super T> apply(SingleObserver super T> observer) throws Exception {
- return new SingleObserver() {
- @Override
- public void onSubscribe(Disposable d) {
- if (subscribedOnce.getAndSet(true)) {
- throw new NullPointerException("You cannot directly subscribe to a gRPC service multiple times " +
- "concurrently. Use Flowable.share() instead.");
- } else {
- observer.onSubscribe(d);
- }
- }
-
- @Override
- public void onSuccess(T t) {
- observer.onSuccess(t);
- }
-
- @Override
- public void onError(Throwable e) {
- observer.onError(e);
- }
- };
- }
-}
diff --git a/rxgrpc/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/CancellableStreamObserverTest.java b/rxgrpc/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/CancellableStreamObserverTest.java
deleted file mode 100644
index 1e42ca77..00000000
--- a/rxgrpc/rxgrpc-stub/src/test/java/com/salesforce/rxgrpc/stub/CancellableStreamObserverTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Copyright (c) 2017, salesforce.com, inc.
- * All rights reserved.
- * Licensed under the BSD 3-Clause license.
- * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
- */
-
-package com.salesforce.rxgrpc.stub;
-
-import io.grpc.Status;
-import io.grpc.stub.ClientResponseObserver;
-import org.junit.Test;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.mockito.Mockito.*;
-import static org.assertj.core.api.Assertions.*;
-
-@SuppressWarnings("ALL")
-public class CancellableStreamObserverTest {
- @Test
- public void statusExceptionTriggersHandler() {
- ClientResponseObserver