diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/Servers.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/Servers.java
index 707c5ad2..5a023cef 100644
--- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/Servers.java
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/Servers.java
@@ -28,13 +28,28 @@ public final class Servers {
* @throws InterruptedException if waiting for termination is interrupted
*/
public static Server shutdownGracefully(Server server, long maxWaitTimeInMillis) throws InterruptedException {
+ return shutdownGracefully(server, maxWaitTimeInMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Attempt to {@link Server#shutdown()} the {@link Server} gracefully. If the max wait time is exceeded, give up and
+ * perform a hard {@link Server#shutdownNow()}.
+ *
+ * @param server the server to be shutdown
+ * @param timeout the max amount of time to wait for graceful shutdown to occur
+ * @param unit the time unit denominating the shutdown timeout
+ * @return the given server
+ * @throws InterruptedException if waiting for termination is interrupted
+ */
+ public static Server shutdownGracefully(Server server, long timeout, TimeUnit unit) throws InterruptedException {
Preconditions.checkNotNull(server, "server");
- Preconditions.checkArgument(maxWaitTimeInMillis > 0, "maxWaitTimeInMillis must be greater than 0");
+ Preconditions.checkArgument(timeout > 0, "timeout must be greater than 0");
+ Preconditions.checkNotNull(unit, "unit");
server.shutdown();
try {
- server.awaitTermination(maxWaitTimeInMillis, TimeUnit.MILLISECONDS);
+ server.awaitTermination(timeout, unit);
} finally {
server.shutdownNow();
}
diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/ServersTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/ServersTest.java
index c63f49e7..26a4fa66 100644
--- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/ServersTest.java
+++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/ServersTest.java
@@ -34,7 +34,7 @@ public void shutdownGracefullyThrowsIfMaxWaitTimeInMillisIsZero() {
assertThatThrownBy(() -> Servers.shutdownGracefully(server, 0))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("maxWaitTimeInMillis");
+ .hasMessageContaining("timeout must be greater than 0");
}
@Test
@@ -44,7 +44,7 @@ public void shutdownGracefullyThrowsIfMaxWaitTimeInMillisIsLessThanZero() {
assertThatThrownBy(() -> Servers.shutdownGracefully(server, maxWaitTimeInMillis))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("maxWaitTimeInMillis");
+ .hasMessageContaining("timeout must be greater than 0");
}
@Test
diff --git a/grpc-testing-contrib/pom.xml b/grpc-testing-contrib/pom.xml
new file mode 100644
index 00000000..2b25a913
--- /dev/null
+++ b/grpc-testing-contrib/pom.xml
@@ -0,0 +1,54 @@
+
+
+
+
+
+ grpc-contrib-parent
+ com.salesforce.servicelibs
+ 0.7.1-SNAPSHOT
+
+ 4.0.0
+
+ grpc-testing-contrib
+
+
+
+ io.grpc
+ grpc-core
+
+
+ io.grpc
+ grpc-stub
+
+
+ io.grpc
+ grpc-netty
+
+
+ com.salesforce.servicelibs
+ grpc-contrib
+
+
+ junit
+ junit
+ compile
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ io.grpc
+ grpc-testing-proto
+ test
+
+
+
\ No newline at end of file
diff --git a/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/GrpcContextRule.java b/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/GrpcContextRule.java
new file mode 100644
index 00000000..a9652ba5
--- /dev/null
+++ b/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/GrpcContextRule.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.testing.contrib;
+
+import io.grpc.Context;
+import org.junit.Assert;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * {@code GrpcContextRule} is a JUnit {@link TestRule} that forcibly resets the gRPC
+ * {@link Context} to {@link Context#ROOT} between every unit test.
+ *
+ *
This rule makes it easier to correctly implement correct unit tests by preventing the
+ * accidental leakage of context state between tests.
+ */
+public class GrpcContextRule implements TestRule {
+ @Override
+ public Statement apply(final Statement base, final Description description) {
+ return new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ // Reset the gRPC context between test executions
+ Context prev = Context.ROOT.attach();
+ try {
+ base.evaluate();
+ if (Context.current() != Context.ROOT) {
+ Assert.fail("Test is leaking context state between tests! Ensure proper " +
+ "attach()/detach() pairing.");
+ }
+ } finally {
+ Context.ROOT.detach(prev);
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRule.java b/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRule.java
new file mode 100644
index 00000000..b69ff510
--- /dev/null
+++ b/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRule.java
@@ -0,0 +1,136 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.testing.contrib;
+
+import com.salesforce.grpc.contrib.Servers;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.netty.NettyChannelBuilder;
+import io.grpc.netty.NettyServerBuilder;
+import io.grpc.util.MutableHandlerRegistry;
+import org.junit.rules.ExternalResource;
+
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * {@code NettyGrpcServerRule} is a JUnit {@link org.junit.rules.TestRule} that starts a gRPC Netty service with
+ * a {@link MutableHandlerRegistry} for adding services. It is particularly useful for testing middleware and
+ * interceptors using the "real" gRPC wire protocol instead of the InProcess protocol. While InProcess testing works
+ * 99% of the time, the Netty and InProcess transports have different flow control and serialization semantics that
+ * can have an affect on low-level gRPC integrations.
+ *
+ *
An {@link io.grpc.stub.AbstractStub} can be created against this service by using the
+ * {@link ManagedChannel} provided by {@link NettyGrpcServerRule#getChannel()}.
+ */
+public class NettyGrpcServerRule extends ExternalResource {
+
+ private ManagedChannel channel;
+ private Server server;
+ private MutableHandlerRegistry serviceRegistry;
+ private boolean useDirectExecutor;
+ private int port = 0;
+
+ private Consumer configureServerBuilder = sb -> { };
+ private Consumer configureChannelBuilder = cb -> { };
+
+ /**
+ * Provides a way to configure the {@code NettyServerBuilder} used for testing.
+ */
+ public final NettyGrpcServerRule configureServerBuilder(Consumer configureServerBuilder) {
+ checkState(port == 0, "configureServerBuilder() can only be called at the rule instantiation");
+ this.configureServerBuilder = checkNotNull(configureServerBuilder, "configureServerBuilder");
+ return this;
+ }
+
+ /**
+ * Provides a way to configure the {@code NettyChannelBuilder} used for testing.
+ */
+ public final NettyGrpcServerRule configureChannelBuilder(Consumer configureChannelBuilder) {
+ checkState(port == 0, "configureChannelBuilder() can only be called at the rule instantiation");
+ this.configureChannelBuilder = checkNotNull(configureChannelBuilder, "configureChannelBuilder");
+ return this;
+ }
+
+ /**
+ * Returns a {@link ManagedChannel} connected to this service.
+ */
+ public final ManagedChannel getChannel() {
+ return channel;
+ }
+
+ /**
+ * Returns the underlying gRPC {@link Server} for this service.
+ */
+ public final Server getServer() {
+ return server;
+ }
+
+ /**
+ * Returns the randomly generated TCP port for this service.
+ */
+ public final int getPort() {
+ return port;
+ }
+
+ /**
+ * Returns the service registry for this service. The registry is used to add service instances
+ * (e.g. {@link io.grpc.BindableService} or {@link io.grpc.ServerServiceDefinition} to the server.
+ */
+ public final MutableHandlerRegistry getServiceRegistry() {
+ return serviceRegistry;
+ }
+
+ /**
+ * Before the test has started, create the server and channel.
+ */
+ @Override
+ protected void before() throws Throwable {
+ serviceRegistry = new MutableHandlerRegistry();
+
+ NettyServerBuilder serverBuilder = NettyServerBuilder
+ .forPort(0)
+ .fallbackHandlerRegistry(serviceRegistry);
+
+ if (useDirectExecutor) {
+ serverBuilder.directExecutor();
+ }
+
+ configureServerBuilder.accept(serverBuilder);
+ server = serverBuilder.build().start();
+ port = server.getPort();
+
+ NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress("localhost", port).usePlaintext(true);
+ configureChannelBuilder.accept(channelBuilder);
+ channel = channelBuilder.build();
+ }
+
+ /**
+ * After the test has completed, clean up the channel and server.
+ */
+ @Override
+ protected void after() {
+ serviceRegistry = null;
+
+ channel.shutdown();
+ channel = null;
+ port = 0;
+
+ try {
+ Servers.shutdownGracefully(server, 1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } finally {
+ server = null;
+ }
+ }
+}
diff --git a/grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/GrpcContextRuleTest.java b/grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/GrpcContextRuleTest.java
new file mode 100644
index 00000000..3d0352d3
--- /dev/null
+++ b/grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/GrpcContextRuleTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2018, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.testing.contrib;
+
+import io.grpc.Context;
+import org.junit.Test;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.fail;
+
+public class GrpcContextRuleTest {
+ @Test
+ public void ruleSetsContextToRoot() {
+ Context.current().withValue(Context.key("foo"), "bar").run(() -> {
+ assertThat(Context.current()).isNotEqualTo(Context.ROOT);
+
+ try {
+ GrpcContextRule rule = new GrpcContextRule();
+ rule.apply(new Statement() {
+ @Override
+ public void evaluate() {
+ assertThat(Context.current()).isEqualTo(Context.ROOT);
+ }
+ }, Description.createTestDescription(GrpcContextRuleTest.class, "ruleSetsContextToRoot"))
+ .evaluate();
+ } catch (Throwable throwable) {
+ fail(throwable.getMessage());
+ }
+ });
+ }
+
+ @Test
+ public void ruleFailsIfContextLeaks() {
+ Context.current().withValue(Context.key("foo"), "bar").run(() -> {
+ assertThat(Context.current()).isNotEqualTo(Context.ROOT);
+
+ assertThatThrownBy(() -> {
+ GrpcContextRule rule = new GrpcContextRule();
+ rule.apply(new Statement() {
+ @Override
+ public void evaluate() {
+ // Leak context
+ Context.current().withValue(Context.key("cheese"), "baz").attach();
+ }
+ }, Description.createTestDescription(GrpcContextRuleTest.class, "ruleSetsContextToRoot"))
+ .evaluate();
+ }).isInstanceOf(AssertionError.class).hasMessageContaining("Test is leaking context");
+ });
+ }
+}
diff --git a/grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRuleTest.java b/grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRuleTest.java
new file mode 100644
index 00000000..3c180016
--- /dev/null
+++ b/grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRuleTest.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright (c) 2018, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.testing.contrib;
+
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.stub.StreamObserver;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runners.model.Statement;
+import io.grpc.testing.protobuf.SimpleRequest;
+import io.grpc.testing.protobuf.SimpleResponse;
+import io.grpc.testing.protobuf.SimpleServiceGrpc;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@SuppressWarnings("Duplicates")
+public class NettyGrpcServerRuleTest {
+ @Rule public final NettyGrpcServerRule grpcServerRule = new NettyGrpcServerRule();
+
+ @Test
+ public void serverAndChannelAreStarted_withoutDirectExecutor() {
+ assertThat(grpcServerRule.getServer().isShutdown()).isFalse();
+ assertThat(grpcServerRule.getServer().isTerminated()).isFalse();
+
+ assertThat(grpcServerRule.getChannel().isShutdown()).isFalse();
+ assertThat(grpcServerRule.getChannel().isTerminated()).isFalse();
+
+ assertThat(grpcServerRule.getPort()).isNotZero();
+ assertThat(grpcServerRule.getServiceRegistry()).isNotNull();
+ }
+
+ @Test
+ public void serverAllowsServicesToBeAddedViaServiceRegistry_withoutDirectExecutor() {
+ TestServiceImpl testService = new TestServiceImpl();
+
+ grpcServerRule.getServiceRegistry().addService(testService);
+
+ SimpleServiceGrpc.SimpleServiceBlockingStub stub =
+ SimpleServiceGrpc.newBlockingStub(grpcServerRule.getChannel());
+
+ SimpleRequest request1 = SimpleRequest.getDefaultInstance();
+
+ SimpleRequest request2 = SimpleRequest.newBuilder().build();
+
+ stub.unaryRpc(request1);
+ stub.unaryRpc(request2);
+
+ assertThat(testService.unaryCallRequests).containsExactly(request1, request2);
+ }
+
+ @Test
+ public void serviceIsNotRunOnSameThreadAsTest_withoutDirectExecutor() {
+ TestServiceImpl testService = new TestServiceImpl();
+
+ grpcServerRule.getServiceRegistry().addService(testService);
+
+ SimpleServiceGrpc.SimpleServiceBlockingStub stub =
+ SimpleServiceGrpc.newBlockingStub(grpcServerRule.getChannel());
+
+ stub.serverStreamingRpc(SimpleRequest.getDefaultInstance());
+
+ assertThat(testService.lastServerStreamingRpcThread).isNotEqualTo(Thread.currentThread());
+ }
+
+ @Test
+ public void serverAndChannelAreShutdownAfterRule() throws Throwable {
+ NettyGrpcServerRule grpcServerRule = new NettyGrpcServerRule();
+
+ // Before the rule has been executed, all of its resources should be null.
+ assertThat(grpcServerRule.getChannel()).isNull();
+ assertThat(grpcServerRule.getServer()).isNull();
+ assertThat(grpcServerRule.getPort()).isZero();
+ assertThat(grpcServerRule.getServiceRegistry()).isNull();
+
+ // The TestStatement stores the channel and server instances so that we can inspect them after
+ // the rule cleans up.
+ TestStatement statement = new TestStatement(grpcServerRule);
+
+ grpcServerRule.apply(statement, null).evaluate();
+
+ // Ensure that the stored channel and server instances were shut down.
+ assertThat(statement.channel.isShutdown()).isTrue();
+ assertThat(statement.server.isShutdown()).isTrue();
+
+ // All references to the resources that we created should be set to null.
+ assertThat(grpcServerRule.getChannel()).isNull();
+ assertThat(grpcServerRule.getServer()).isNull();
+ assertThat(grpcServerRule.getPort()).isZero();
+ assertThat(grpcServerRule.getServiceRegistry()).isNull();
+ }
+
+ private static class TestStatement extends Statement {
+
+ private final NettyGrpcServerRule grpcServerRule;
+
+ private ManagedChannel channel;
+ private Server server;
+
+ private TestStatement(NettyGrpcServerRule grpcServerRule) {
+ this.grpcServerRule = grpcServerRule;
+ }
+
+ @Override
+ public void evaluate() throws Throwable {
+ channel = grpcServerRule.getChannel();
+ server = grpcServerRule.getServer();
+ }
+ }
+
+ private static class TestServiceImpl extends SimpleServiceGrpc.SimpleServiceImplBase {
+
+ private final Collection unaryCallRequests =
+ new ConcurrentLinkedQueue();
+
+ private volatile Thread lastServerStreamingRpcThread;
+
+ @Override
+ public void serverStreamingRpc(
+ SimpleRequest request, StreamObserver responseObserver) {
+
+ lastServerStreamingRpcThread = Thread.currentThread();
+
+ responseObserver.onNext(SimpleResponse.getDefaultInstance());
+
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void unaryRpc(
+ SimpleRequest request, StreamObserver responseObserver) {
+
+ unaryCallRequests.add(request);
+
+ responseObserver.onNext(SimpleResponse.getDefaultInstance());
+
+ responseObserver.onCompleted();
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 4a025f18..9a666d06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
grpc-spring
jprotoc-test
demos/grpc-java-contrib-demo
+ grpc-testing-contrib
@@ -107,16 +108,6 @@
grpc-spring
${project.version}
-
- com.salesforce.servicelibs
- rxgrpc
- ${project.version}
-
-
- com.salesforce.servicelibs
- rxgrpc-stub
- ${project.version}
-
com.salesforce.servicelibs
jprotoc
@@ -182,6 +173,12 @@
${grpc.version}
test
+
+ io.grpc
+ grpc-testing-proto
+ ${grpc.version}
+ test
+
junit
junit