From e521101d9e2337f02ceb6bb562b19434086aea61 Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Thu, 11 Jan 2018 14:26:47 -0800 Subject: [PATCH 1/2] Added GrpcContextRule and NettyGrpcServerRule --- .../com/salesforce/grpc/contrib/Servers.java | 19 ++- grpc-testing-contrib/pom.xml | 54 +++++++ .../grpc/testing/contrib/GrpcContextRule.java | 45 ++++++ .../testing/contrib/NettyGrpcServerRule.java | 136 ++++++++++++++++ .../testing/contrib/GrpcContextRuleTest.java | 58 +++++++ .../contrib/NettyGrpcServerRuleTest.java | 148 ++++++++++++++++++ pom.xml | 17 +- 7 files changed, 465 insertions(+), 12 deletions(-) create mode 100644 grpc-testing-contrib/pom.xml create mode 100644 grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/GrpcContextRule.java create mode 100644 grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRule.java create mode 100644 grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/GrpcContextRuleTest.java create mode 100644 grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRuleTest.java diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/Servers.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/Servers.java index 707c5ad2..5a023cef 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/Servers.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/Servers.java @@ -28,13 +28,28 @@ public final class Servers { * @throws InterruptedException if waiting for termination is interrupted */ public static Server shutdownGracefully(Server server, long maxWaitTimeInMillis) throws InterruptedException { + return shutdownGracefully(server, maxWaitTimeInMillis, TimeUnit.MILLISECONDS); + } + + /** + * Attempt to {@link Server#shutdown()} the {@link Server} gracefully. If the max wait time is exceeded, give up and + * perform a hard {@link Server#shutdownNow()}. + * + * @param server the server to be shutdown + * @param timeout the max amount of time to wait for graceful shutdown to occur + * @param unit the time unit denominating the shutdown timeout + * @return the given server + * @throws InterruptedException if waiting for termination is interrupted + */ + public static Server shutdownGracefully(Server server, long timeout, TimeUnit unit) throws InterruptedException { Preconditions.checkNotNull(server, "server"); - Preconditions.checkArgument(maxWaitTimeInMillis > 0, "maxWaitTimeInMillis must be greater than 0"); + Preconditions.checkArgument(timeout > 0, "timeout must be greater than 0"); + Preconditions.checkNotNull(unit, "unit"); server.shutdown(); try { - server.awaitTermination(maxWaitTimeInMillis, TimeUnit.MILLISECONDS); + server.awaitTermination(timeout, unit); } finally { server.shutdownNow(); } diff --git a/grpc-testing-contrib/pom.xml b/grpc-testing-contrib/pom.xml new file mode 100644 index 00000000..2b25a913 --- /dev/null +++ b/grpc-testing-contrib/pom.xml @@ -0,0 +1,54 @@ + + + + + + grpc-contrib-parent + com.salesforce.servicelibs + 0.7.1-SNAPSHOT + + 4.0.0 + + grpc-testing-contrib + + + + io.grpc + grpc-core + + + io.grpc + grpc-stub + + + io.grpc + grpc-netty + + + com.salesforce.servicelibs + grpc-contrib + + + junit + junit + compile + + + org.assertj + assertj-core + test + + + io.grpc + grpc-testing-proto + test + + + \ No newline at end of file diff --git a/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/GrpcContextRule.java b/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/GrpcContextRule.java new file mode 100644 index 00000000..ca6a5e72 --- /dev/null +++ b/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/GrpcContextRule.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.testing.contrib; + +import io.grpc.Context; +import org.junit.Assert; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +import java.util.logging.Logger; + +/** + * {@code GrpcContextRule} is a JUnit {@link TestRule} that forcibly resets the gRPC + * {@link Context} to {@link Context#ROOT} between every unit test. + * + *

This rule makes it easier to correctly implement correct unit tests by preventing the + * accidental leakage of context state between tests. + */ +public class GrpcContextRule implements TestRule { + @Override + public Statement apply(final Statement base, final Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + // Reset the gRPC context between test executions + Context prev = Context.ROOT.attach(); + try { + base.evaluate(); + if (Context.current() != Context.ROOT) { + Assert.fail("Test is leaking context state between tests! Ensure proper " + + "attach()/detach() pairing."); + } + } finally { + Context.ROOT.detach(prev); + } + } + }; + } +} \ No newline at end of file diff --git a/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRule.java b/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRule.java new file mode 100644 index 00000000..b69ff510 --- /dev/null +++ b/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRule.java @@ -0,0 +1,136 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.testing.contrib; + +import com.salesforce.grpc.contrib.Servers; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.NettyServerBuilder; +import io.grpc.util.MutableHandlerRegistry; +import org.junit.rules.ExternalResource; + +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +/** + * {@code NettyGrpcServerRule} is a JUnit {@link org.junit.rules.TestRule} that starts a gRPC Netty service with + * a {@link MutableHandlerRegistry} for adding services. It is particularly useful for testing middleware and + * interceptors using the "real" gRPC wire protocol instead of the InProcess protocol. While InProcess testing works + * 99% of the time, the Netty and InProcess transports have different flow control and serialization semantics that + * can have an affect on low-level gRPC integrations. + * + *

An {@link io.grpc.stub.AbstractStub} can be created against this service by using the + * {@link ManagedChannel} provided by {@link NettyGrpcServerRule#getChannel()}. + */ +public class NettyGrpcServerRule extends ExternalResource { + + private ManagedChannel channel; + private Server server; + private MutableHandlerRegistry serviceRegistry; + private boolean useDirectExecutor; + private int port = 0; + + private Consumer configureServerBuilder = sb -> { }; + private Consumer configureChannelBuilder = cb -> { }; + + /** + * Provides a way to configure the {@code NettyServerBuilder} used for testing. + */ + public final NettyGrpcServerRule configureServerBuilder(Consumer configureServerBuilder) { + checkState(port == 0, "configureServerBuilder() can only be called at the rule instantiation"); + this.configureServerBuilder = checkNotNull(configureServerBuilder, "configureServerBuilder"); + return this; + } + + /** + * Provides a way to configure the {@code NettyChannelBuilder} used for testing. + */ + public final NettyGrpcServerRule configureChannelBuilder(Consumer configureChannelBuilder) { + checkState(port == 0, "configureChannelBuilder() can only be called at the rule instantiation"); + this.configureChannelBuilder = checkNotNull(configureChannelBuilder, "configureChannelBuilder"); + return this; + } + + /** + * Returns a {@link ManagedChannel} connected to this service. + */ + public final ManagedChannel getChannel() { + return channel; + } + + /** + * Returns the underlying gRPC {@link Server} for this service. + */ + public final Server getServer() { + return server; + } + + /** + * Returns the randomly generated TCP port for this service. + */ + public final int getPort() { + return port; + } + + /** + * Returns the service registry for this service. The registry is used to add service instances + * (e.g. {@link io.grpc.BindableService} or {@link io.grpc.ServerServiceDefinition} to the server. + */ + public final MutableHandlerRegistry getServiceRegistry() { + return serviceRegistry; + } + + /** + * Before the test has started, create the server and channel. + */ + @Override + protected void before() throws Throwable { + serviceRegistry = new MutableHandlerRegistry(); + + NettyServerBuilder serverBuilder = NettyServerBuilder + .forPort(0) + .fallbackHandlerRegistry(serviceRegistry); + + if (useDirectExecutor) { + serverBuilder.directExecutor(); + } + + configureServerBuilder.accept(serverBuilder); + server = serverBuilder.build().start(); + port = server.getPort(); + + NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress("localhost", port).usePlaintext(true); + configureChannelBuilder.accept(channelBuilder); + channel = channelBuilder.build(); + } + + /** + * After the test has completed, clean up the channel and server. + */ + @Override + protected void after() { + serviceRegistry = null; + + channel.shutdown(); + channel = null; + port = 0; + + try { + Servers.shutdownGracefully(server, 1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } finally { + server = null; + } + } +} diff --git a/grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/GrpcContextRuleTest.java b/grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/GrpcContextRuleTest.java new file mode 100644 index 00000000..3d0352d3 --- /dev/null +++ b/grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/GrpcContextRuleTest.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2018, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.testing.contrib; + +import io.grpc.Context; +import org.junit.Test; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.fail; + +public class GrpcContextRuleTest { + @Test + public void ruleSetsContextToRoot() { + Context.current().withValue(Context.key("foo"), "bar").run(() -> { + assertThat(Context.current()).isNotEqualTo(Context.ROOT); + + try { + GrpcContextRule rule = new GrpcContextRule(); + rule.apply(new Statement() { + @Override + public void evaluate() { + assertThat(Context.current()).isEqualTo(Context.ROOT); + } + }, Description.createTestDescription(GrpcContextRuleTest.class, "ruleSetsContextToRoot")) + .evaluate(); + } catch (Throwable throwable) { + fail(throwable.getMessage()); + } + }); + } + + @Test + public void ruleFailsIfContextLeaks() { + Context.current().withValue(Context.key("foo"), "bar").run(() -> { + assertThat(Context.current()).isNotEqualTo(Context.ROOT); + + assertThatThrownBy(() -> { + GrpcContextRule rule = new GrpcContextRule(); + rule.apply(new Statement() { + @Override + public void evaluate() { + // Leak context + Context.current().withValue(Context.key("cheese"), "baz").attach(); + } + }, Description.createTestDescription(GrpcContextRuleTest.class, "ruleSetsContextToRoot")) + .evaluate(); + }).isInstanceOf(AssertionError.class).hasMessageContaining("Test is leaking context"); + }); + } +} diff --git a/grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRuleTest.java b/grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRuleTest.java new file mode 100644 index 00000000..3c180016 --- /dev/null +++ b/grpc-testing-contrib/src/test/java/com/salesforce/grpc/testing/contrib/NettyGrpcServerRuleTest.java @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2018, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.testing.contrib; + +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.stub.StreamObserver; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runners.model.Statement; +import io.grpc.testing.protobuf.SimpleRequest; +import io.grpc.testing.protobuf.SimpleResponse; +import io.grpc.testing.protobuf.SimpleServiceGrpc; + +import java.util.Collection; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.assertj.core.api.Assertions.assertThat; + +@SuppressWarnings("Duplicates") +public class NettyGrpcServerRuleTest { + @Rule public final NettyGrpcServerRule grpcServerRule = new NettyGrpcServerRule(); + + @Test + public void serverAndChannelAreStarted_withoutDirectExecutor() { + assertThat(grpcServerRule.getServer().isShutdown()).isFalse(); + assertThat(grpcServerRule.getServer().isTerminated()).isFalse(); + + assertThat(grpcServerRule.getChannel().isShutdown()).isFalse(); + assertThat(grpcServerRule.getChannel().isTerminated()).isFalse(); + + assertThat(grpcServerRule.getPort()).isNotZero(); + assertThat(grpcServerRule.getServiceRegistry()).isNotNull(); + } + + @Test + public void serverAllowsServicesToBeAddedViaServiceRegistry_withoutDirectExecutor() { + TestServiceImpl testService = new TestServiceImpl(); + + grpcServerRule.getServiceRegistry().addService(testService); + + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub(grpcServerRule.getChannel()); + + SimpleRequest request1 = SimpleRequest.getDefaultInstance(); + + SimpleRequest request2 = SimpleRequest.newBuilder().build(); + + stub.unaryRpc(request1); + stub.unaryRpc(request2); + + assertThat(testService.unaryCallRequests).containsExactly(request1, request2); + } + + @Test + public void serviceIsNotRunOnSameThreadAsTest_withoutDirectExecutor() { + TestServiceImpl testService = new TestServiceImpl(); + + grpcServerRule.getServiceRegistry().addService(testService); + + SimpleServiceGrpc.SimpleServiceBlockingStub stub = + SimpleServiceGrpc.newBlockingStub(grpcServerRule.getChannel()); + + stub.serverStreamingRpc(SimpleRequest.getDefaultInstance()); + + assertThat(testService.lastServerStreamingRpcThread).isNotEqualTo(Thread.currentThread()); + } + + @Test + public void serverAndChannelAreShutdownAfterRule() throws Throwable { + NettyGrpcServerRule grpcServerRule = new NettyGrpcServerRule(); + + // Before the rule has been executed, all of its resources should be null. + assertThat(grpcServerRule.getChannel()).isNull(); + assertThat(grpcServerRule.getServer()).isNull(); + assertThat(grpcServerRule.getPort()).isZero(); + assertThat(grpcServerRule.getServiceRegistry()).isNull(); + + // The TestStatement stores the channel and server instances so that we can inspect them after + // the rule cleans up. + TestStatement statement = new TestStatement(grpcServerRule); + + grpcServerRule.apply(statement, null).evaluate(); + + // Ensure that the stored channel and server instances were shut down. + assertThat(statement.channel.isShutdown()).isTrue(); + assertThat(statement.server.isShutdown()).isTrue(); + + // All references to the resources that we created should be set to null. + assertThat(grpcServerRule.getChannel()).isNull(); + assertThat(grpcServerRule.getServer()).isNull(); + assertThat(grpcServerRule.getPort()).isZero(); + assertThat(grpcServerRule.getServiceRegistry()).isNull(); + } + + private static class TestStatement extends Statement { + + private final NettyGrpcServerRule grpcServerRule; + + private ManagedChannel channel; + private Server server; + + private TestStatement(NettyGrpcServerRule grpcServerRule) { + this.grpcServerRule = grpcServerRule; + } + + @Override + public void evaluate() throws Throwable { + channel = grpcServerRule.getChannel(); + server = grpcServerRule.getServer(); + } + } + + private static class TestServiceImpl extends SimpleServiceGrpc.SimpleServiceImplBase { + + private final Collection unaryCallRequests = + new ConcurrentLinkedQueue(); + + private volatile Thread lastServerStreamingRpcThread; + + @Override + public void serverStreamingRpc( + SimpleRequest request, StreamObserver responseObserver) { + + lastServerStreamingRpcThread = Thread.currentThread(); + + responseObserver.onNext(SimpleResponse.getDefaultInstance()); + + responseObserver.onCompleted(); + } + + @Override + public void unaryRpc( + SimpleRequest request, StreamObserver responseObserver) { + + unaryCallRequests.add(request); + + responseObserver.onNext(SimpleResponse.getDefaultInstance()); + + responseObserver.onCompleted(); + } + } +} diff --git a/pom.xml b/pom.xml index 4a025f18..9a666d06 100644 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ grpc-spring jprotoc-test demos/grpc-java-contrib-demo + grpc-testing-contrib @@ -107,16 +108,6 @@ grpc-spring ${project.version} - - com.salesforce.servicelibs - rxgrpc - ${project.version} - - - com.salesforce.servicelibs - rxgrpc-stub - ${project.version} - com.salesforce.servicelibs jprotoc @@ -182,6 +173,12 @@ ${grpc.version} test + + io.grpc + grpc-testing-proto + ${grpc.version} + test + junit junit From b4247b97dff53bf80899f057a7ce7879cb4870fa Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Thu, 11 Jan 2018 14:39:22 -0800 Subject: [PATCH 2/2] Checkstyle --- .../test/java/com/salesforce/grpc/contrib/ServersTest.java | 4 ++-- .../com/salesforce/grpc/testing/contrib/GrpcContextRule.java | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/ServersTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/ServersTest.java index c63f49e7..26a4fa66 100644 --- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/ServersTest.java +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/ServersTest.java @@ -34,7 +34,7 @@ public void shutdownGracefullyThrowsIfMaxWaitTimeInMillisIsZero() { assertThatThrownBy(() -> Servers.shutdownGracefully(server, 0)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("maxWaitTimeInMillis"); + .hasMessageContaining("timeout must be greater than 0"); } @Test @@ -44,7 +44,7 @@ public void shutdownGracefullyThrowsIfMaxWaitTimeInMillisIsLessThanZero() { assertThatThrownBy(() -> Servers.shutdownGracefully(server, maxWaitTimeInMillis)) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("maxWaitTimeInMillis"); + .hasMessageContaining("timeout must be greater than 0"); } @Test diff --git a/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/GrpcContextRule.java b/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/GrpcContextRule.java index ca6a5e72..a9652ba5 100644 --- a/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/GrpcContextRule.java +++ b/grpc-testing-contrib/src/main/java/com/salesforce/grpc/testing/contrib/GrpcContextRule.java @@ -13,8 +13,6 @@ import org.junit.runner.Description; import org.junit.runners.model.Statement; -import java.util.logging.Logger; - /** * {@code GrpcContextRule} is a JUnit {@link TestRule} that forcibly resets the gRPC * {@link Context} to {@link Context#ROOT} between every unit test.