From c86758718b38d966a336e5357b670cf589f7598b Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Sun, 1 Oct 2017 21:35:51 -0700 Subject: [PATCH 01/16] WIP --- .../grpc/contrib/AmbientContext.java | 18 +++ .../AmbientContextInterceptor.java | 113 ++++++++++++++++++ .../DefaultDeadlineInterceptor.java | 32 +++-- .../grpc/contrib/AmbientContextTest.java | 11 ++ .../DefaultDeadlineInterceptorTest.java | 7 +- 5 files changed, 171 insertions(+), 10 deletions(-) create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/AmbientContext.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/AmbientContextInterceptor.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/AmbientContextTest.java diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/AmbientContext.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/AmbientContext.java new file mode 100644 index 00000000..946a06ff --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/AmbientContext.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib; + +import io.grpc.*; + +public class AmbientContext { + public static final Context.Key KEY = Context.key("AmbientContext"); + + public static Metadata get() { + return KEY.get(); + } +} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/AmbientContextInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/AmbientContextInterceptor.java new file mode 100644 index 00000000..49984f17 --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/AmbientContextInterceptor.java @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.interceptor; + +import com.salesforce.grpc.contrib.AmbientContext; +import io.grpc.*; + +public class AmbientContextInterceptor { + public static class Client implements ClientInterceptor { + private String headerPrefix; + + public Client(String headerPrefix) { + this.headerPrefix = headerPrefix; + } + + public String getHeaderPrefix() { + return headerPrefix; + } + + public void setHeaderPrefix(String headerPrefix) { + this.headerPrefix = headerPrefix; + } + + @Override + public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + Metadata ctx = AmbientContext.get(); + if (ctx != null) { + for (String keyString : ctx.keys()) { + if (keyString.startsWith(headerPrefix)) { + Metadata.Key key = Metadata.Key.of(keyString, Metadata.ASCII_STRING_MARSHALLER); + String value = ctx.get(key); + if (value != null) { + headers.put(key, value); + } + } + } + } + super.start(responseListener, headers); + } + }; + } + } + + public static class Server implements ServerInterceptor { + private String headerPrefix; + + public Server(String headerPrefix) { + this.headerPrefix = headerPrefix; + } + + public String getHeaderPrefix() { + return headerPrefix; + } + + public void setHeaderPrefix(String headerPrefix) { + this.headerPrefix = headerPrefix; + } + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { + final Metadata ctx = new Metadata(); + for (String keyName : headers.keys()) { + if (!keyName.startsWith(headerPrefix)) { + continue; + } + + Metadata.Key key = Metadata.Key.of(keyName, Metadata.ASCII_STRING_MARSHALLER); + String value = headers.get(key); + if (value == null) { + continue; + } + + ctx.put(key, value); + } + + final ServerCall.Listener listener = next.startCall(call, headers); + return new ServerCall.Listener() { + @Override + public void onMessage(ReqT message) { + Context.current().withValue(AmbientContext.KEY, ctx).run(() -> listener.onMessage(message)); + } + + @Override + public void onHalfClose() { + Context.current().withValue(AmbientContext.KEY, ctx).run(listener::onHalfClose); + } + + @Override + public void onCancel() { + Context.current().withValue(AmbientContext.KEY, ctx).run(listener::onCancel); + } + + @Override + public void onComplete() { + Context.current().withValue(AmbientContext.KEY, ctx).run(listener::onComplete); + } + + @Override + public void onReady() { + Context.current().withValue(AmbientContext.KEY, ctx).run(listener::onReady); + } + }; + } + } +} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptor.java index ebcbde8a..bc8c6c88 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptor.java @@ -10,6 +10,7 @@ import com.google.common.base.Preconditions; import io.grpc.*; +import java.time.Duration; import java.util.concurrent.TimeUnit; /** @@ -18,21 +19,38 @@ * implicit deadline will be used instead. */ public class DefaultDeadlineInterceptor implements ClientInterceptor { - private final long duration; - private final TimeUnit timeUnit; + private Duration duration; - public DefaultDeadlineInterceptor(long duration, TimeUnit timeUnit) { - Preconditions.checkArgument(duration > 0, "duration must be greater than zero"); - Preconditions.checkNotNull(timeUnit, "timeUnit"); + public DefaultDeadlineInterceptor(Duration duration) { + Preconditions.checkNotNull(duration, "duration"); + Preconditions.checkArgument(!duration.isNegative(), "duration must be greater than zero"); this.duration = duration; - this.timeUnit = timeUnit; + } + + /** + * Get the current default deadline duration + * + * @return the current default deadline duration + */ + public Duration getDuration() { + return duration; + } + + /** + * Set a new default deadline duration + * + * @param duration the new default deadline duration + */ + public void setDuration(Duration duration) { + this.duration = duration; } @Override public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + // Only add a deadline if no other deadline has been set. if (callOptions.getDeadline() == null && Context.current().getDeadline() == null) { - callOptions = callOptions.withDeadlineAfter(duration, timeUnit); + callOptions = callOptions.withDeadlineAfter(duration.toMillis(), TimeUnit.MILLISECONDS); } return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/AmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/AmbientContextTest.java new file mode 100644 index 00000000..7b5ecd25 --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/AmbientContextTest.java @@ -0,0 +1,11 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib; + +public class AmbientContextTest { +} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptorTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptorTest.java index 87c89595..c77b2833 100644 --- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptorTest.java +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptorTest.java @@ -10,6 +10,7 @@ import io.grpc.*; import org.junit.Test; +import java.time.Duration; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -22,7 +23,7 @@ public class DefaultDeadlineInterceptorTest { public void interceptorShouldAddDeadlineWhenAbsent() { AtomicBoolean called = new AtomicBoolean(false); - DefaultDeadlineInterceptor interceptor = new DefaultDeadlineInterceptor(1, TimeUnit.HOURS); + DefaultDeadlineInterceptor interceptor = new DefaultDeadlineInterceptor(Duration.ofHours(1)); interceptor.interceptCall(null, CallOptions.DEFAULT, new Channel() { @Override @@ -45,7 +46,7 @@ public String authority() { public void interceptorShouldNotModifyExplicitDeadline() { AtomicBoolean called = new AtomicBoolean(false); - DefaultDeadlineInterceptor interceptor = new DefaultDeadlineInterceptor(1, TimeUnit.HOURS); + DefaultDeadlineInterceptor interceptor = new DefaultDeadlineInterceptor(Duration.ofHours(1)); interceptor.interceptCall(null, CallOptions.DEFAULT.withDeadlineAfter(10, TimeUnit.HOURS), new Channel() { @Override @@ -68,7 +69,7 @@ public String authority() { public void interceptorShouldNotModifyContextDeadline() throws Exception { AtomicBoolean called = new AtomicBoolean(false); - DefaultDeadlineInterceptor interceptor = new DefaultDeadlineInterceptor(1, TimeUnit.HOURS); + DefaultDeadlineInterceptor interceptor = new DefaultDeadlineInterceptor(Duration.ofHours(1)); Context.current().withDeadlineAfter(10, TimeUnit.HOURS, Executors.newSingleThreadScheduledExecutor()).run(() -> { interceptor.interceptCall(null, CallOptions.DEFAULT, new Channel() { From d4169b0cd719d1da639bbead22f37e1ba7f6a414 Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Mon, 2 Oct 2017 13:32:55 -0700 Subject: [PATCH 02/16] Basic AmbientContext --- grpc-contrib/pom.xml | 5 + .../grpc/contrib/AmbientContext.java | 18 -- .../salesforce/grpc/contrib/MoreFutures.java | 2 +- .../grpc/contrib/context/AmbientContext.java | 46 +++++ .../AmbientContextClientInterceptor.java | 56 +++++ .../AmbientContextServerInterceptor.java | 46 +++++ .../AmbientContextInterceptor.java | 113 ----------- .../DefaultDeadlineInterceptor.java | 4 +- .../grpc/contrib/AmbientContextTest.java | 191 ++++++++++++++++++ grpc-contrib/src/test/proto/helloworld.proto | 2 +- .../UnexpectedServerErrorIntegrationTest.java | 2 +- 11 files changed, 349 insertions(+), 136 deletions(-) delete mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/AmbientContext.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java delete mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/AmbientContextInterceptor.java diff --git a/grpc-contrib/pom.xml b/grpc-contrib/pom.xml index 6587f933..6545af7d 100644 --- a/grpc-contrib/pom.xml +++ b/grpc-contrib/pom.xml @@ -60,6 +60,11 @@ mockito-core test + + io.grpc + grpc-netty + test + diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/AmbientContext.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/AmbientContext.java deleted file mode 100644 index 946a06ff..00000000 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/AmbientContext.java +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright (c) 2017, salesforce.com, inc. - * All rights reserved. - * Licensed under the BSD 3-Clause license. - * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ - -package com.salesforce.grpc.contrib; - -import io.grpc.*; - -public class AmbientContext { - public static final Context.Key KEY = Context.key("AmbientContext"); - - public static Metadata get() { - return KEY.get(); - } -} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreFutures.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreFutures.java index bbc67a63..28c856cb 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreFutures.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreFutures.java @@ -202,7 +202,7 @@ public boolean completeExceptionally(Throwable ex) { } catch (RuntimeException | Error ex) { // the ListenableFuture failed with a REALLY BAD exception completableFuture.completeExceptionally(ex); } catch (InterruptedException ex) { - completableFuture.completeExceptionally(ex); // Won't happen since get() only called after completion + completableFuture.completeExceptionally(ex); // Won't happen since current() only called after completion } }; listenableFuture.addListener(callbackRunnable, MoreExecutors.directExecutor()); diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java new file mode 100644 index 00000000..64e89f4b --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.*; + +/** + * TODO. + */ +public final class AmbientContext { + private AmbientContext() { } + + static final Context.Key KEY = Context.key("AmbientContext"); + + /** + * TODO. + * @return + */ + public static Context initialize(Context context) { + return context.withValue(KEY, new Metadata()); + } + + /** + * TODO. + * @return + */ + public static Metadata current() { + return current(Context.current()); + } + + @VisibleForTesting + static Metadata current(Context context) { + Metadata current = KEY.get(context); + if (current == null) { + throw new IllegalStateException("AmbientContext has not yet been created in the scope of the current context"); + } else { + return KEY.get(); + } + } +} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java new file mode 100644 index 00000000..8e1907d6 --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.*; + +public class AmbientContextClientInterceptor implements ClientInterceptor { + private String headerPrefix; + + public AmbientContextClientInterceptor(String headerPrefix) { + this.headerPrefix = headerPrefix; + } + + public String getHeaderPrefix() { + return headerPrefix; + } + + public void setHeaderPrefix(String headerPrefix) { + this.headerPrefix = headerPrefix; + } + + @Override + public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + Metadata ctx = AmbientContext.current(); + if (ctx != null) { + for (String keyString : ctx.keys()) { + if (keyString.startsWith(headerPrefix)) { + if (keyString.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + Metadata.Key key = Metadata.Key.of(keyString, Metadata.BINARY_BYTE_MARSHALLER); + byte[] value = ctx.get(key); + if (value != null) { + headers.put(key, value); + } + } else { + Metadata.Key key = Metadata.Key.of(keyString, Metadata.ASCII_STRING_MARSHALLER); + String value = ctx.get(key); + if (value != null) { + headers.put(key, value); + } + } + } + } + } + super.start(responseListener, headers); + } + }; + } +} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java new file mode 100644 index 00000000..155e0371 --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.*; + +public class AmbientContextServerInterceptor implements ServerInterceptor { + private String headerPrefix; + + public AmbientContextServerInterceptor(String headerPrefix) { + this.headerPrefix = headerPrefix; + } + + public String getHeaderPrefix() { + return headerPrefix; + } + + public void setHeaderPrefix(String headerPrefix) { + this.headerPrefix = headerPrefix; + } + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { + final Metadata ctx = new Metadata(); + for (String keyName : headers.keys()) { + if (!keyName.startsWith(headerPrefix)) { + continue; + } + + Metadata.Key key = Metadata.Key.of(keyName, Metadata.ASCII_STRING_MARSHALLER); + String value = headers.get(key); + if (value == null) { + continue; + } + + ctx.put(key, value); + } + + return Contexts.interceptCall(Context.current().withValue(AmbientContext.KEY, ctx), call, headers, next); + } +} \ No newline at end of file diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/AmbientContextInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/AmbientContextInterceptor.java deleted file mode 100644 index 49984f17..00000000 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/AmbientContextInterceptor.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (c) 2017, salesforce.com, inc. - * All rights reserved. - * Licensed under the BSD 3-Clause license. - * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ - -package com.salesforce.grpc.contrib.interceptor; - -import com.salesforce.grpc.contrib.AmbientContext; -import io.grpc.*; - -public class AmbientContextInterceptor { - public static class Client implements ClientInterceptor { - private String headerPrefix; - - public Client(String headerPrefix) { - this.headerPrefix = headerPrefix; - } - - public String getHeaderPrefix() { - return headerPrefix; - } - - public void setHeaderPrefix(String headerPrefix) { - this.headerPrefix = headerPrefix; - } - - @Override - public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { - return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { - @Override - public void start(Listener responseListener, Metadata headers) { - Metadata ctx = AmbientContext.get(); - if (ctx != null) { - for (String keyString : ctx.keys()) { - if (keyString.startsWith(headerPrefix)) { - Metadata.Key key = Metadata.Key.of(keyString, Metadata.ASCII_STRING_MARSHALLER); - String value = ctx.get(key); - if (value != null) { - headers.put(key, value); - } - } - } - } - super.start(responseListener, headers); - } - }; - } - } - - public static class Server implements ServerInterceptor { - private String headerPrefix; - - public Server(String headerPrefix) { - this.headerPrefix = headerPrefix; - } - - public String getHeaderPrefix() { - return headerPrefix; - } - - public void setHeaderPrefix(String headerPrefix) { - this.headerPrefix = headerPrefix; - } - - @Override - public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { - final Metadata ctx = new Metadata(); - for (String keyName : headers.keys()) { - if (!keyName.startsWith(headerPrefix)) { - continue; - } - - Metadata.Key key = Metadata.Key.of(keyName, Metadata.ASCII_STRING_MARSHALLER); - String value = headers.get(key); - if (value == null) { - continue; - } - - ctx.put(key, value); - } - - final ServerCall.Listener listener = next.startCall(call, headers); - return new ServerCall.Listener() { - @Override - public void onMessage(ReqT message) { - Context.current().withValue(AmbientContext.KEY, ctx).run(() -> listener.onMessage(message)); - } - - @Override - public void onHalfClose() { - Context.current().withValue(AmbientContext.KEY, ctx).run(listener::onHalfClose); - } - - @Override - public void onCancel() { - Context.current().withValue(AmbientContext.KEY, ctx).run(listener::onCancel); - } - - @Override - public void onComplete() { - Context.current().withValue(AmbientContext.KEY, ctx).run(listener::onComplete); - } - - @Override - public void onReady() { - Context.current().withValue(AmbientContext.KEY, ctx).run(listener::onReady); - } - }; - } - } -} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptor.java index bc8c6c88..80e5c221 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptor.java @@ -29,7 +29,7 @@ public DefaultDeadlineInterceptor(Duration duration) { } /** - * Get the current default deadline duration + * Get the current default deadline duration. * * @return the current default deadline duration */ @@ -38,7 +38,7 @@ public Duration getDuration() { } /** - * Set a new default deadline duration + * Set a new default deadline duration. * * @param duration the new default deadline duration */ diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/AmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/AmbientContextTest.java index 7b5ecd25..6b5ec1b8 100644 --- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/AmbientContextTest.java +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/AmbientContextTest.java @@ -7,5 +7,196 @@ package com.salesforce.grpc.contrib; +import com.google.common.util.concurrent.ListenableFuture; +import com.salesforce.grpc.contrib.context.AmbientContext; +import com.salesforce.grpc.contrib.context.AmbientContextClientInterceptor; +import com.salesforce.grpc.contrib.context.AmbientContextServerInterceptor; +import io.grpc.*; +import io.grpc.stub.StreamObserver; +import org.awaitility.Duration; +import org.junit.Test; + +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; + +@SuppressWarnings("Duplicates") public class AmbientContextTest { + @Test + public void initializeAttachesContext() { + Context ctx = AmbientContext.initialize(Context.current()); + ctx.run(() -> assertThat(AmbientContext.current()).isNotNull()); + } + + @Test + public void uninitializedContextThrows() { + assertThatThrownBy(AmbientContext::current).isInstanceOf(IllegalStateException.class); + } + + @Test + public void contextTransfersOneHopSync() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue = "context-value"; + AtomicReference ctxValue = new AtomicReference<>(); + + // Service + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Plumbing + Server server = ServerBuilder + .forPort(0) + .addService(svc) + .intercept(new AmbientContextServerInterceptor("ctx-")) + .build() + .start(); + ManagedChannel channel = ManagedChannelBuilder + .forAddress("localhost", server.getPort()) + .usePlaintext(true) + .intercept(new AmbientContextClientInterceptor("ctx-")) + .build(); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel); + + // Test + try { + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + stub.sayHello(HelloRequest.newBuilder().setName("world").build()); + }); + + assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); + } finally { + channel.shutdownNow(); + server.shutdownNow(); + } + } + + @Test + public void contextTransfersTwoHopSync() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue = "context-value"; + AtomicReference ctxValue = new AtomicReference<>(); + + // Terminal service + GreeterGrpc.GreeterImplBase svc2 = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Terminal service plumbing + Server server2 = ServerBuilder + .forPort(0) + .addService(svc2) + .intercept(new AmbientContextServerInterceptor("ctx-")) + .build() + .start(); + ManagedChannel channel2 = ManagedChannelBuilder + .forAddress("localhost", server2.getPort()) + .usePlaintext(true) + .intercept(new AmbientContextClientInterceptor("ctx-")) + .build(); + GreeterGrpc.GreeterBlockingStub stub2 = GreeterGrpc.newBlockingStub(channel2); + + // Relay service + GreeterGrpc.GreeterImplBase svc1 = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + responseObserver.onNext(stub2.sayHello(request)); + responseObserver.onCompleted(); + } + }; + + // Relay service plumbing + Server server1 = ServerBuilder + .forPort(0) + .addService(svc2) + .intercept(new AmbientContextServerInterceptor("ctx-")) + .build() + .start(); + ManagedChannel channel1 = ManagedChannelBuilder + .forAddress("localhost", server1.getPort()) + .usePlaintext(true) + .intercept(new AmbientContextClientInterceptor("ctx-")) + .build(); + GreeterGrpc.GreeterBlockingStub stub1 = GreeterGrpc.newBlockingStub(channel2); + + // Test + try { + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + stub1.sayHello(HelloRequest.newBuilder().setName("world").build()); + }); + + assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); + } finally { + channel2.shutdownNow(); + server2.shutdownNow(); + channel1.shutdownNow(); + server1.shutdownNow(); + } + } + + @Test + public void contextTransfersOneHopAsync() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue = "context-value"; + AtomicReference ctxValue = new AtomicReference<>(); + + // Service + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Plumbing + Server server = ServerBuilder + .forPort(0) + .addService(svc) + .intercept(new AmbientContextServerInterceptor("ctx-")) + .build() + .start(); + ManagedChannel channel = ManagedChannelBuilder + .forAddress("localhost", server.getPort()) + .usePlaintext(true) + .intercept(new AmbientContextClientInterceptor("ctx-")) + .build(); + GreeterGrpc.GreeterFutureStub stub = GreeterGrpc.newFutureStub(channel); + + // Test + try { + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + ListenableFuture futureResponse = stub.sayHello(HelloRequest.newBuilder().setName("world").build()); + + // Verify response callbacks still have context + MoreFutures.onSuccess(futureResponse, response -> { + assertThat(AmbientContext.current().get(ctxKey)).isEqualTo(expectedCtxValue); + }, Context.currentContextExecutor(Executors.newSingleThreadExecutor())); + + await().atMost(Duration.ONE_SECOND).until(futureResponse::isDone); + }); + + assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); + } finally { + channel.shutdownNow(); + server.shutdownNow(); + } + } } diff --git a/grpc-contrib/src/test/proto/helloworld.proto b/grpc-contrib/src/test/proto/helloworld.proto index 1c441cd7..d1abddce 100644 --- a/grpc-contrib/src/test/proto/helloworld.proto +++ b/grpc-contrib/src/test/proto/helloworld.proto @@ -6,7 +6,7 @@ import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; option java_multiple_files = true; -option java_package = "com.salesforce.jprotoc"; +option java_package = "com.salesforce.grpc.contrib"; option java_outer_classname = "HelloWorldProto"; // The greeting service definition. diff --git a/rxgrpc/rxgrpc-test/src/test/java/com/salesforce/rxgrpc/UnexpectedServerErrorIntegrationTest.java b/rxgrpc/rxgrpc-test/src/test/java/com/salesforce/rxgrpc/UnexpectedServerErrorIntegrationTest.java index 59e8770b..2b523df9 100644 --- a/rxgrpc/rxgrpc-test/src/test/java/com/salesforce/rxgrpc/UnexpectedServerErrorIntegrationTest.java +++ b/rxgrpc/rxgrpc-test/src/test/java/com/salesforce/rxgrpc/UnexpectedServerErrorIntegrationTest.java @@ -100,7 +100,7 @@ public void manyToOne() { test.awaitTerminalEvent(3, TimeUnit.SECONDS); test.assertError(t -> t instanceof StatusRuntimeException); - // Flowable requests get canceled when unexpected errors happen + // Flowable requests current canceled when unexpected errors happen test.assertError(t -> ((StatusRuntimeException)t).getStatus().getCode() == Status.Code.CANCELLED); } From 6fd9fdaa8c0438577a77506e9c2794fe6461bdde Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Sat, 7 Oct 2017 19:17:06 -0500 Subject: [PATCH 03/16] More documentation and test refactoring --- .../grpc/contrib/context/AmbientContext.java | 26 ++- .../AmbientContextClientInterceptor.java | 4 + .../AmbientContextServerInterceptor.java | 4 +- .../grpc/contrib/context/package-info.java | 34 +++ .../grpc/contrib/AmbientContextTest.java | 202 ----------------- .../contrib/context/AmbientContextTest.java | 62 ++++++ .../context/AmbientContextTransferTest.java | 209 ++++++++++++++++++ .../context/BinaryAmbientContextTest.java | 19 ++ 8 files changed, 347 insertions(+), 213 deletions(-) create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java delete mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/AmbientContextTest.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java index 64e89f4b..1ca8af2c 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java @@ -8,6 +8,7 @@ package com.salesforce.grpc.contrib.context; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import io.grpc.*; /** @@ -19,16 +20,22 @@ private AmbientContext() { } static final Context.Key KEY = Context.key("AmbientContext"); /** - * TODO. - * @return + * Attaches an empty ambient context to the provided gRPC {@code Context}. + * + * @throws IllegalStateException if an ambient context has already been attached to the + * provided gRPC {@code Context}. */ public static Context initialize(Context context) { + Preconditions.checkNotNull(context, "context"); + Preconditions.checkState(KEY.get(context) == null, + "AmbientContext has already been created in the scope of the current context"); return context.withValue(KEY, new Metadata()); } /** - * TODO. - * @return + * Returns the ambient context attached to the current gRPC {@code Context}. + * + * @throws IllegalStateException if no ambient context is attached to the current gRPC {@code Context}. */ public static Metadata current() { return current(Context.current()); @@ -36,11 +43,10 @@ public static Metadata current() { @VisibleForTesting static Metadata current(Context context) { - Metadata current = KEY.get(context); - if (current == null) { - throw new IllegalStateException("AmbientContext has not yet been created in the scope of the current context"); - } else { - return KEY.get(); - } + Preconditions.checkNotNull(context, "context"); + Preconditions.checkState(KEY.get(context) != null, + "AmbientContext has not yet been created in the scope of the current context"); + + return KEY.get(context); } } diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java index 8e1907d6..8e2fcdb0 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java @@ -9,6 +9,10 @@ import io.grpc.*; +/** + * {@code AmbientContextClientInterceptor} transparently deserializes prefixed request headers into a well-known gRPC + * {@code Context} property. + */ public class AmbientContextClientInterceptor implements ClientInterceptor { private String headerPrefix; diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java index 155e0371..4e7b1453 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java @@ -26,7 +26,9 @@ public void setHeaderPrefix(String headerPrefix) { @Override public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { - final Metadata ctx = new Metadata(); + Metadata ctx = AmbientContext.KEY.get(); + // Only initialize ctx if not yet initialized + ctx = ctx != null ? ctx : new Metadata(); for (String keyName : headers.keys()) { if (!keyName.startsWith(headerPrefix)) { continue; diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java new file mode 100644 index 00000000..6221c7d8 --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +/** + * The classes in this package are used together to implement the Ambient Context pattern, where context values are + * transparently marshaled from service to service without user intervention. This pattern is particularly useful for + * transparently populating system information like Zipkin trace ids and log correlation ids on every service call in + * a call graph. + * + *

The Ambient Context pattern is implemented in gRPC using the + * {@link com.salesforce.grpc.contrib.context.AmbientContextServerInterceptor} and + * {@link com.salesforce.grpc.contrib.context.AmbientContextClientInterceptor}. Together these interceptors marshall + * context headers from the gRPC {@code Context} into outbound request {@code Metadata}, and from inbound request + * {@code Metadata} into the gRPC {@code Context}. + * + *

Within your service and infrastructure code, the {@link com.salesforce.grpc.contrib.context.AmbientContext} class + * provides utility methods for interacting with the ambient context. {@code AmbientContext}s API is similar to gRPC's + * {@code Metadata} API, with the addition of the {@code freeze()} and {@code thaw()} operations used to toggle + * the {@code AmbientContext}'s read-only status. + * + *

Freezing the ambient context is useful for protecting its contents when transitioning between platform + * infrastructure code and service implementation code. In many cases, platform infrastructure code and service + * implementations are written by different groups with different goals. Platform infrastructure implementors typically + * own inter-service functions like distributed tracing, log correlation, and service metrics, typically implemented + * using the gRPC interceptor chain. For these developers ambient context must be mutable. Service implementors, on + * the other hand, own the business logic for each service. For these developers it can make sense for the ambient + * context to be immutable to prevent unnecessary pollution of the ambient context. Freezing and thawing the context + * allows it to seamlessly transition between mutable and immutable modes as needed by each audience. + */ +package com.salesforce.grpc.contrib.context; \ No newline at end of file diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/AmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/AmbientContextTest.java deleted file mode 100644 index 6b5ec1b8..00000000 --- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/AmbientContextTest.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (c) 2017, salesforce.com, inc. - * All rights reserved. - * Licensed under the BSD 3-Clause license. - * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ - -package com.salesforce.grpc.contrib; - -import com.google.common.util.concurrent.ListenableFuture; -import com.salesforce.grpc.contrib.context.AmbientContext; -import com.salesforce.grpc.contrib.context.AmbientContextClientInterceptor; -import com.salesforce.grpc.contrib.context.AmbientContextServerInterceptor; -import io.grpc.*; -import io.grpc.stub.StreamObserver; -import org.awaitility.Duration; -import org.junit.Test; - -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.awaitility.Awaitility.await; - -@SuppressWarnings("Duplicates") -public class AmbientContextTest { - @Test - public void initializeAttachesContext() { - Context ctx = AmbientContext.initialize(Context.current()); - ctx.run(() -> assertThat(AmbientContext.current()).isNotNull()); - } - - @Test - public void uninitializedContextThrows() { - assertThatThrownBy(AmbientContext::current).isInstanceOf(IllegalStateException.class); - } - - @Test - public void contextTransfersOneHopSync() throws Exception { - Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); - String expectedCtxValue = "context-value"; - AtomicReference ctxValue = new AtomicReference<>(); - - // Service - GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { - @Override - public void sayHello(HelloRequest request, StreamObserver responseObserver) { - ctxValue.set(AmbientContext.current().get(ctxKey)); - responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); - responseObserver.onCompleted(); - } - }; - - // Plumbing - Server server = ServerBuilder - .forPort(0) - .addService(svc) - .intercept(new AmbientContextServerInterceptor("ctx-")) - .build() - .start(); - ManagedChannel channel = ManagedChannelBuilder - .forAddress("localhost", server.getPort()) - .usePlaintext(true) - .intercept(new AmbientContextClientInterceptor("ctx-")) - .build(); - GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel); - - // Test - try { - AmbientContext.initialize(Context.current()).run(() -> { - AmbientContext.current().put(ctxKey, expectedCtxValue); - stub.sayHello(HelloRequest.newBuilder().setName("world").build()); - }); - - assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); - } finally { - channel.shutdownNow(); - server.shutdownNow(); - } - } - - @Test - public void contextTransfersTwoHopSync() throws Exception { - Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); - String expectedCtxValue = "context-value"; - AtomicReference ctxValue = new AtomicReference<>(); - - // Terminal service - GreeterGrpc.GreeterImplBase svc2 = new GreeterGrpc.GreeterImplBase() { - @Override - public void sayHello(HelloRequest request, StreamObserver responseObserver) { - ctxValue.set(AmbientContext.current().get(ctxKey)); - responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); - responseObserver.onCompleted(); - } - }; - - // Terminal service plumbing - Server server2 = ServerBuilder - .forPort(0) - .addService(svc2) - .intercept(new AmbientContextServerInterceptor("ctx-")) - .build() - .start(); - ManagedChannel channel2 = ManagedChannelBuilder - .forAddress("localhost", server2.getPort()) - .usePlaintext(true) - .intercept(new AmbientContextClientInterceptor("ctx-")) - .build(); - GreeterGrpc.GreeterBlockingStub stub2 = GreeterGrpc.newBlockingStub(channel2); - - // Relay service - GreeterGrpc.GreeterImplBase svc1 = new GreeterGrpc.GreeterImplBase() { - @Override - public void sayHello(HelloRequest request, StreamObserver responseObserver) { - responseObserver.onNext(stub2.sayHello(request)); - responseObserver.onCompleted(); - } - }; - - // Relay service plumbing - Server server1 = ServerBuilder - .forPort(0) - .addService(svc2) - .intercept(new AmbientContextServerInterceptor("ctx-")) - .build() - .start(); - ManagedChannel channel1 = ManagedChannelBuilder - .forAddress("localhost", server1.getPort()) - .usePlaintext(true) - .intercept(new AmbientContextClientInterceptor("ctx-")) - .build(); - GreeterGrpc.GreeterBlockingStub stub1 = GreeterGrpc.newBlockingStub(channel2); - - // Test - try { - AmbientContext.initialize(Context.current()).run(() -> { - AmbientContext.current().put(ctxKey, expectedCtxValue); - stub1.sayHello(HelloRequest.newBuilder().setName("world").build()); - }); - - assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); - } finally { - channel2.shutdownNow(); - server2.shutdownNow(); - channel1.shutdownNow(); - server1.shutdownNow(); - } - } - - @Test - public void contextTransfersOneHopAsync() throws Exception { - Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); - String expectedCtxValue = "context-value"; - AtomicReference ctxValue = new AtomicReference<>(); - - // Service - GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { - @Override - public void sayHello(HelloRequest request, StreamObserver responseObserver) { - ctxValue.set(AmbientContext.current().get(ctxKey)); - responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); - responseObserver.onCompleted(); - } - }; - - // Plumbing - Server server = ServerBuilder - .forPort(0) - .addService(svc) - .intercept(new AmbientContextServerInterceptor("ctx-")) - .build() - .start(); - ManagedChannel channel = ManagedChannelBuilder - .forAddress("localhost", server.getPort()) - .usePlaintext(true) - .intercept(new AmbientContextClientInterceptor("ctx-")) - .build(); - GreeterGrpc.GreeterFutureStub stub = GreeterGrpc.newFutureStub(channel); - - // Test - try { - AmbientContext.initialize(Context.current()).run(() -> { - AmbientContext.current().put(ctxKey, expectedCtxValue); - ListenableFuture futureResponse = stub.sayHello(HelloRequest.newBuilder().setName("world").build()); - - // Verify response callbacks still have context - MoreFutures.onSuccess(futureResponse, response -> { - assertThat(AmbientContext.current().get(ctxKey)).isEqualTo(expectedCtxValue); - }, Context.currentContextExecutor(Executors.newSingleThreadExecutor())); - - await().atMost(Duration.ONE_SECOND).until(futureResponse::isDone); - }); - - assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); - } finally { - channel.shutdownNow(); - server.shutdownNow(); - } - } -} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java new file mode 100644 index 00000000..bfce4ac3 --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.Context; +import io.grpc.testing.GrpcServerRule; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.Assert.fail; + +@SuppressWarnings("Duplicates") +public class AmbientContextTest { + @Rule public GrpcServerRule serverRule1 = new GrpcServerRule(); + @Rule public GrpcServerRule serverRule2 = new GrpcServerRule(); + + @Before + public void setUp() throws Exception { + // Reset the gRPC context between test executions + Context.ROOT.attach(); + } + + @Test + public void initializeAttachesContext() { + Context ctx = AmbientContext.initialize(Context.current()); + ctx.run(() -> assertThat(AmbientContext.current()).isNotNull()); + } + + @Test + public void doubleInitializeThrows() { + Context ctx = AmbientContext.initialize(Context.current()); + assertThatThrownBy(() -> AmbientContext.initialize(ctx)).isInstanceOf(IllegalStateException.class); + } + + @Test + public void uninitializedContextThrows() { + assertThatThrownBy(AmbientContext::current).isInstanceOf(IllegalStateException.class); + } + + @Test + public void contextAccessMethods() { + fail("Not implemented"); + } + + @Test + public void contextFreezingWorks() { + fail("Not implemented"); + } + + @Test + public void contextThawingWorks() { + fail("Not implemented"); + } +} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java new file mode 100644 index 00000000..110693c4 --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import com.google.common.util.concurrent.ListenableFuture; +import com.salesforce.grpc.contrib.GreeterGrpc; +import com.salesforce.grpc.contrib.HelloRequest; +import com.salesforce.grpc.contrib.HelloResponse; +import com.salesforce.grpc.contrib.MoreFutures; +import io.grpc.Context; +import io.grpc.Metadata; +import io.grpc.ServerInterceptors; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import org.awaitility.Duration; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.await; + +public class AmbientContextTransferTest { + @Rule + public GrpcServerRule serverRule1 = new GrpcServerRule(); + @Rule public GrpcServerRule serverRule2 = new GrpcServerRule(); + + @Before + public void setUp() throws Exception { + // Reset the gRPC context between test executions + Context.ROOT.attach(); + } + + @Test + public void contextTransfersOneHopSync() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue = "context-value"; + AtomicReference ctxValue = new AtomicReference<>(); + + // Service + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Plumbing + serverRule1.getServiceRegistry().addService(ServerInterceptors + .intercept(svc, new AmbientContextServerInterceptor("ctx-"))); + + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule1.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + stub.sayHello(HelloRequest.newBuilder().setName("world").build()); + }); + + assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); + } + + @Test + public void multiValueContextTransfers() throws Exception { + fail("Not implemented"); + } + + @Test + public void contextTransfersTwoHopSync() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue = "context-value"; + AtomicReference ctxValue = new AtomicReference<>(); + + // Terminal service + GreeterGrpc.GreeterImplBase svc2 = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Terminal service plumbing + serverRule2.getServiceRegistry().addService(ServerInterceptors + .intercept(svc2, new AmbientContextServerInterceptor("ctx-"))); + GreeterGrpc.GreeterBlockingStub stub2 = GreeterGrpc + .newBlockingStub(serverRule2.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Relay service + GreeterGrpc.GreeterImplBase svc1 = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + responseObserver.onNext(stub2.sayHello(request)); + responseObserver.onCompleted(); + } + }; + + // Relay service plumbing + serverRule1.getServiceRegistry().addService(ServerInterceptors + .intercept(svc1, new AmbientContextServerInterceptor("ctx-"))); + GreeterGrpc.GreeterBlockingStub stub1 = GreeterGrpc + .newBlockingStub(serverRule1.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + stub1.sayHello(HelloRequest.newBuilder().setName("world").build()); + }); + + assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); + } + + @Test + public void contextTransfersOneHopAsync() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue = "context-value"; + AtomicReference ctxValue = new AtomicReference<>(); + + // Service + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Plumbing + serverRule1.getServiceRegistry().addService(ServerInterceptors + .intercept(svc, new AmbientContextServerInterceptor("ctx-"))); + GreeterGrpc.GreeterFutureStub stub = GreeterGrpc + .newFutureStub(serverRule1.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + ListenableFuture futureResponse = stub.sayHello(HelloRequest.newBuilder().setName("world").build()); + + // Verify response callbacks still have context + MoreFutures.onSuccess( + futureResponse, + response -> assertThat(AmbientContext.current().get(ctxKey)).isEqualTo(expectedCtxValue), + Context.currentContextExecutor(Executors.newSingleThreadExecutor())); + + await().atMost(Duration.ONE_SECOND).until(futureResponse::isDone); + }); + + assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); + } + + @Test + public void multipleContextTransfersOneHopSync() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + Metadata.Key l5dKey = Metadata.Key.of("l5d-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue = "context-value"; + AtomicReference ctxValue = new AtomicReference<>(); + AtomicReference l5dValue = new AtomicReference<>(); + + // Service + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + l5dValue.set(AmbientContext.current().get(l5dKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Plumbing + serverRule1.getServiceRegistry().addService(ServerInterceptors.intercept(svc, + new AmbientContextServerInterceptor("ctx-"), + new AmbientContextServerInterceptor("l5d-"))); + + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule1.getChannel()) + .withInterceptors( + new AmbientContextClientInterceptor("ctx-"), + new AmbientContextClientInterceptor("l5d-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + AmbientContext.current().put(l5dKey, expectedCtxValue); + stub.sayHello(HelloRequest.newBuilder().setName("world").build()); + }); + + assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); + assertThat(l5dValue.get()).isEqualTo(expectedCtxValue); + } +} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java new file mode 100644 index 00000000..31757f26 --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java @@ -0,0 +1,19 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import org.junit.Test; + +import static org.junit.Assert.fail; + +public class BinaryAmbientContextTest { + @Test + public void binaryContextValueTransfers() throws Exception { + fail("Not implemented"); + } +} From d10e3ffd01d0f14ed9212d285a8260d5f7d9e784 Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Sat, 7 Oct 2017 19:31:20 -0500 Subject: [PATCH 04/16] More documentation --- .../grpc/contrib/context/AmbientContext.java | 5 +++- .../AmbientContextClientInterceptor.java | 23 ++++++++++-------- .../AmbientContextServerInterceptor.java | 24 ++++++++++++------- 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java index 1ca8af2c..c02339e3 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java @@ -12,7 +12,10 @@ import io.grpc.*; /** - * TODO. + * {@code AmbientContext} is entry point for working with the ambient context managed by {@link AmbientContextClientInterceptor} + * and {@link AmbientContextServerInterceptor}. + * + * See package javadoc for more info. */ public final class AmbientContext { private AmbientContext() { } diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java index 8e2fcdb0..7151e14a 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java @@ -10,24 +10,27 @@ import io.grpc.*; /** - * {@code AmbientContextClientInterceptor} transparently deserializes prefixed request headers into a well-known gRPC - * {@code Context} property. + * {@code AmbientContextClientInterceptor} transparently deserializes prefixed request headers into an ambient context. + * Header values can be accessed using the {@link AmbientContext} class. + * + *

Each {@code AmbientContextClientInterceptor} marshals headers with a know prefix. If multiple prefixes are needed, + * add multiple {@code AmbientContextClientInterceptor} instances to the gRPC interceptor chain. + * + * See package javadoc for more info. */ public class AmbientContextClientInterceptor implements ClientInterceptor { private String headerPrefix; + /** + * Constructs an {@code AmbientContextClientInterceptor} that marshals request headers with a know prefix into the + * {@link AmbientContext}. + * + * @param headerPrefix the header prefix to marshal. + */ public AmbientContextClientInterceptor(String headerPrefix) { this.headerPrefix = headerPrefix; } - public String getHeaderPrefix() { - return headerPrefix; - } - - public void setHeaderPrefix(String headerPrefix) { - this.headerPrefix = headerPrefix; - } - @Override public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java index 4e7b1453..29b15904 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java @@ -9,21 +9,29 @@ import io.grpc.*; +/** + * {@code AmbientContextServerInterceptor} transparently serializes prefixed ambient context values into outbound request + * headers. + * + *

Each {@code AmbientContextServerInterceptor} marshals headers with a know prefix. If multiple prefixes are needed, + * add multiple {@code AmbientContextServerInterceptor} instances to the gRPC interceptor chain. + * + * See package javadoc for more info. + */ public class AmbientContextServerInterceptor implements ServerInterceptor { private String headerPrefix; + /** + * Constructs an {@code AmbientContextServerInterceptor} that marshals ambient context values with a know prefix + * into outbound request headers. + * {@link AmbientContext}. + * + * @param headerPrefix the header prefix to marshal. + */ public AmbientContextServerInterceptor(String headerPrefix) { this.headerPrefix = headerPrefix; } - public String getHeaderPrefix() { - return headerPrefix; - } - - public void setHeaderPrefix(String headerPrefix) { - this.headerPrefix = headerPrefix; - } - @Override public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { Metadata ctx = AmbientContext.KEY.get(); From b6d8ab9d4cd03230899af55a42b4b0588a95137b Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Sat, 7 Oct 2017 20:00:34 -0500 Subject: [PATCH 05/16] AmbientContext API --- .../grpc/contrib/context/AmbientContext.java | 115 ++++++++++++++++-- .../AmbientContextClientInterceptor.java | 2 +- .../AmbientContextServerInterceptor.java | 4 +- 3 files changed, 105 insertions(+), 16 deletions(-) diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java index c02339e3..91be4ff5 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java @@ -7,20 +7,25 @@ package com.salesforce.grpc.contrib.context; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import io.grpc.*; +import io.grpc.Context; +import io.grpc.Metadata; + +import javax.annotation.Nullable; +import java.util.Set; /** * {@code AmbientContext} is entry point for working with the ambient context managed by {@link AmbientContextClientInterceptor} - * and {@link AmbientContextServerInterceptor}. + * and {@link AmbientContextServerInterceptor}. The interface for this class is very similar to gRPC's {@code Metadata} + * class. * * See package javadoc for more info. */ public final class AmbientContext { private AmbientContext() { } - static final Context.Key KEY = Context.key("AmbientContext"); + static final Context.Key DATA_KEY = Context.key("AmbientContext"); + private static final AmbientContext instance = new AmbientContext(); /** * Attaches an empty ambient context to the provided gRPC {@code Context}. @@ -30,9 +35,9 @@ private AmbientContext() { } */ public static Context initialize(Context context) { Preconditions.checkNotNull(context, "context"); - Preconditions.checkState(KEY.get(context) == null, + Preconditions.checkState(DATA_KEY.get(context) == null, "AmbientContext has already been created in the scope of the current context"); - return context.withValue(KEY, new Metadata()); + return context.withValue(DATA_KEY, new Metadata()); } /** @@ -40,16 +45,100 @@ public static Context initialize(Context context) { * * @throws IllegalStateException if no ambient context is attached to the current gRPC {@code Context}. */ - public static Metadata current() { - return current(Context.current()); + public static AmbientContext current() { + internalCurrent(); + return instance; } - @VisibleForTesting - static Metadata current(Context context) { - Preconditions.checkNotNull(context, "context"); - Preconditions.checkState(KEY.get(context) != null, + private static Metadata internalCurrent() { + Preconditions.checkState(DATA_KEY.get() != null, "AmbientContext has not yet been created in the scope of the current context"); + return DATA_KEY.get(); + } + + + + /** + * Returns true if a value is defined for the given key. + * + *

This is done by linear search, so if it is followed by {@link #get} or {@link #getAll}, + * prefer calling them directly and checking the return value against {@code null}. + */ + public boolean containsKey(Metadata.Key key) { + return internalCurrent().containsKey(key); + } + + /** + * Remove all values for the given key without returning them. This is a minor performance + * optimization if you do not need the previous values. + */ + public void discardAll(Metadata.Key key) { + internalCurrent().discardAll(key); + } + + /** + * Returns the last metadata entry added with the name 'name' parsed as T. + * + * @return the parsed metadata entry or null if there are none. + */ + @Nullable + public T get(Metadata.Key key) { + return internalCurrent().get(key); + } + + /** + * Returns all the metadata entries named 'name', in the order they were received, parsed as T, or + * null if there are none. The iterator is not guaranteed to be "live." It may or may not be + * accurate if Metadata is mutated. + */ + @Nullable + public Iterable getAll(final Metadata.Key key) { + return internalCurrent().getAll(key); + } + + /** + * Returns set of all keys in store. + * + * @return unmodifiable Set of keys + */ + public Set keys() { + return internalCurrent().keys(); + } + + /** + * Adds the {@code key, value} pair. If {@code key} already has values, {@code value} is added to + * the end. Duplicate values for the same key are permitted. + * + * @throws NullPointerException if key or value is null + */ + public void put(Metadata.Key key, T value) { + internalCurrent().put(key, value); + } + + /** + * Removes the first occurrence of {@code value} for {@code key}. + * + * @param key key for value + * @param value value + * @return {@code true} if {@code value} removed; {@code false} if {@code value} was not present + * @throws NullPointerException if {@code key} or {@code value} is null + */ + public boolean remove(Metadata.Key key, T value) { + return internalCurrent().remove(key, value); + } + + /** Remove all values for the given key. If there were no values, {@code null} is returned. */ + public Iterable removeAll(Metadata.Key key) { + return internalCurrent().removeAll(key); + } - return KEY.get(context); + @Override + public String toString() { + Metadata ctx = DATA_KEY.get(); + if (ctx != null) { + return ctx.toString(); + } else { + return "[MISSING AMBIENT CONTEXT]"; + } } } diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java index 7151e14a..c154afe5 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java @@ -36,7 +36,7 @@ public ClientCall interceptCall(MethodDescriptor(next.newCall(method, callOptions)) { @Override public void start(Listener responseListener, Metadata headers) { - Metadata ctx = AmbientContext.current(); + AmbientContext ctx = AmbientContext.current(); if (ctx != null) { for (String keyString : ctx.keys()) { if (keyString.startsWith(headerPrefix)) { diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java index 29b15904..a65f163a 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java @@ -34,7 +34,7 @@ public AmbientContextServerInterceptor(String headerPrefix) { @Override public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { - Metadata ctx = AmbientContext.KEY.get(); + Metadata ctx = AmbientContext.DATA_KEY.get(); // Only initialize ctx if not yet initialized ctx = ctx != null ? ctx : new Metadata(); for (String keyName : headers.keys()) { @@ -51,6 +51,6 @@ public ServerCall.Listener interceptCall(ServerCall Date: Mon, 9 Oct 2017 13:05:23 -0700 Subject: [PATCH 06/16] WIP --- .../grpc/contrib/context/AmbientContext.java | 20 ++++++++++++++----- .../AmbientContextClientInterceptor.java | 2 +- .../AmbientContextServerInterceptor.java | 2 +- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java index 91be4ff5..019125f3 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java @@ -12,6 +12,7 @@ import io.grpc.Metadata; import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; import java.util.Set; /** @@ -19,12 +20,19 @@ * and {@link AmbientContextServerInterceptor}. The interface for this class is very similar to gRPC's {@code Metadata} * class. * - * See package javadoc for more info. + *

This class is not thread safe, implementations should ensure that ambient context reads and writes do + * not occur in multiple threads concurrently. + * + *

See package javadoc for more info. */ +@NotThreadSafe public final class AmbientContext { private AmbientContext() { } static final Context.Key DATA_KEY = Context.key("AmbientContext"); + private static final Metadata.Key FREEZE_KEY = + Metadata.Key.of("com.salesforce.grpc.contrib.context.frozen", Metadata.ASCII_STRING_MARSHALLER); + private static final AmbientContext instance = new AmbientContext(); /** @@ -77,7 +85,7 @@ public void discardAll(Metadata.Key key) { } /** - * Returns the last metadata entry added with the name 'name' parsed as T. + * Returns the last ambient context entry added with the name 'name' parsed as T. * * @return the parsed metadata entry or null if there are none. */ @@ -87,9 +95,9 @@ public T get(Metadata.Key key) { } /** - * Returns all the metadata entries named 'name', in the order they were received, parsed as T, or + * Returns all the ambient context entries named 'name', in the order they were received, parsed as T, or * null if there are none. The iterator is not guaranteed to be "live." It may or may not be - * accurate if Metadata is mutated. + * accurate if the ambient context is mutated. */ @Nullable public Iterable getAll(final Metadata.Key key) { @@ -127,7 +135,9 @@ public boolean remove(Metadata.Key key, T value) { return internalCurrent().remove(key, value); } - /** Remove all values for the given key. If there were no values, {@code null} is returned. */ + /** + * Remove all values for the given key. If there were no values, {@code null} is returned. + */ public Iterable removeAll(Metadata.Key key) { return internalCurrent().removeAll(key); } diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java index c154afe5..1a9028fc 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java @@ -16,7 +16,7 @@ *

Each {@code AmbientContextClientInterceptor} marshals headers with a know prefix. If multiple prefixes are needed, * add multiple {@code AmbientContextClientInterceptor} instances to the gRPC interceptor chain. * - * See package javadoc for more info. + *

See package javadoc for more info. */ public class AmbientContextClientInterceptor implements ClientInterceptor { private String headerPrefix; diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java index a65f163a..32ab64d7 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java @@ -16,7 +16,7 @@ *

Each {@code AmbientContextServerInterceptor} marshals headers with a know prefix. If multiple prefixes are needed, * add multiple {@code AmbientContextServerInterceptor} instances to the gRPC interceptor chain. * - * See package javadoc for more info. + *

See package javadoc for more info. */ public class AmbientContextServerInterceptor implements ServerInterceptor { private String headerPrefix; From afdaed391e532f7d4604c82f2978fcea7e53600e Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Mon, 9 Oct 2017 15:59:43 -0700 Subject: [PATCH 07/16] Freeze and thaw AmbientContext --- .../grpc/contrib/context/AmbientContext.java | 90 ++++++++++++------- .../AmbientContextServerInterceptor.java | 4 +- .../contrib/context/AmbientContextTest.java | 40 ++++++++- 3 files changed, 99 insertions(+), 35 deletions(-) diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java index 019125f3..1804d61c 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java @@ -7,7 +7,6 @@ package com.salesforce.grpc.contrib.context; -import com.google.common.base.Preconditions; import io.grpc.Context; import io.grpc.Metadata; @@ -15,6 +14,9 @@ import javax.annotation.concurrent.NotThreadSafe; import java.util.Set; +import static com.google.common.base.Preconditions.*; + + /** * {@code AmbientContext} is entry point for working with the ambient context managed by {@link AmbientContextClientInterceptor} * and {@link AmbientContextServerInterceptor}. The interface for this class is very similar to gRPC's {@code Metadata} @@ -27,13 +29,7 @@ */ @NotThreadSafe public final class AmbientContext { - private AmbientContext() { } - - static final Context.Key DATA_KEY = Context.key("AmbientContext"); - private static final Metadata.Key FREEZE_KEY = - Metadata.Key.of("com.salesforce.grpc.contrib.context.frozen", Metadata.ASCII_STRING_MARSHALLER); - - private static final AmbientContext instance = new AmbientContext(); + static final Context.Key DATA_KEY = Context.key("AmbientContext"); /** * Attaches an empty ambient context to the provided gRPC {@code Context}. @@ -42,10 +38,10 @@ private AmbientContext() { } * provided gRPC {@code Context}. */ public static Context initialize(Context context) { - Preconditions.checkNotNull(context, "context"); - Preconditions.checkState(DATA_KEY.get(context) == null, + checkNotNull(context, "context"); + checkState(DATA_KEY.get(context) == null, "AmbientContext has already been created in the scope of the current context"); - return context.withValue(DATA_KEY, new Metadata()); + return context.withValue(DATA_KEY, new AmbientContext()); } /** @@ -54,17 +50,43 @@ public static Context initialize(Context context) { * @throws IllegalStateException if no ambient context is attached to the current gRPC {@code Context}. */ public static AmbientContext current() { - internalCurrent(); - return instance; - } - - private static Metadata internalCurrent() { - Preconditions.checkState(DATA_KEY.get() != null, + checkState(DATA_KEY.get() != null, "AmbientContext has not yet been created in the scope of the current context"); return DATA_KEY.get(); } + private Metadata contextMetadata; + private Object freezeKey = null; + AmbientContext() { + this.contextMetadata = new Metadata(); + } + + /** + * Marks this AmbientContext as read-only, preventing any further modification + * + * @return + */ + public Object freeze() { + checkState(!isFrozen(), "AmbientContext already frozen. Cannot freeze() twice."); + freezeKey = new Object(); + return freezeKey; + } + + public void thaw(Object freezeKey) { + checkState(isFrozen(), "AmbientContext is not frozen. Cannot thaw()."); + checkState(this.freezeKey == freezeKey, + "The provided freezeKey is not the same object returned by freeze()"); + this.freezeKey = null; + } + + public boolean isFrozen() { + return freezeKey != null; + } + + private void checkFreeze() { + checkState(freezeKey == null, "AmbientContext cannot be modified while frozen"); + } /** * Returns true if a value is defined for the given key. @@ -73,15 +95,18 @@ private static Metadata internalCurrent() { * prefer calling them directly and checking the return value against {@code null}. */ public boolean containsKey(Metadata.Key key) { - return internalCurrent().containsKey(key); + return contextMetadata.containsKey(key); } /** * Remove all values for the given key without returning them. This is a minor performance * optimization if you do not need the previous values. + * + * @throws IllegalStateException if the AmbientContext is frozen */ public void discardAll(Metadata.Key key) { - internalCurrent().discardAll(key); + checkFreeze(); + contextMetadata.discardAll(key); } /** @@ -91,7 +116,7 @@ public void discardAll(Metadata.Key key) { */ @Nullable public T get(Metadata.Key key) { - return internalCurrent().get(key); + return contextMetadata.get(key); } /** @@ -101,7 +126,7 @@ public T get(Metadata.Key key) { */ @Nullable public Iterable getAll(final Metadata.Key key) { - return internalCurrent().getAll(key); + return contextMetadata.getAll(key); } /** @@ -110,7 +135,7 @@ public Iterable getAll(final Metadata.Key key) { * @return unmodifiable Set of keys */ public Set keys() { - return internalCurrent().keys(); + return contextMetadata.keys(); } /** @@ -118,9 +143,11 @@ public Set keys() { * the end. Duplicate values for the same key are permitted. * * @throws NullPointerException if key or value is null + * @throws IllegalStateException if the AmbientContext is frozen */ public void put(Metadata.Key key, T value) { - internalCurrent().put(key, value); + checkFreeze(); + contextMetadata.put(key, value); } /** @@ -129,26 +156,27 @@ public void put(Metadata.Key key, T value) { * @param key key for value * @param value value * @return {@code true} if {@code value} removed; {@code false} if {@code value} was not present + * * @throws NullPointerException if {@code key} or {@code value} is null + * @throws IllegalStateException if the AmbientContext is frozen */ public boolean remove(Metadata.Key key, T value) { - return internalCurrent().remove(key, value); + checkFreeze(); + return contextMetadata.remove(key, value); } /** * Remove all values for the given key. If there were no values, {@code null} is returned. + * + * @throws IllegalStateException if the AmbientContext is frozen */ public Iterable removeAll(Metadata.Key key) { - return internalCurrent().removeAll(key); + checkFreeze(); + return contextMetadata.removeAll(key); } @Override public String toString() { - Metadata ctx = DATA_KEY.get(); - if (ctx != null) { - return ctx.toString(); - } else { - return "[MISSING AMBIENT CONTEXT]"; - } + return (isFrozen() ? "[FROZEN] " : "[THAWED] ") + contextMetadata.toString(); } } diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java index 32ab64d7..b722976d 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java @@ -34,9 +34,9 @@ public AmbientContextServerInterceptor(String headerPrefix) { @Override public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { - Metadata ctx = AmbientContext.DATA_KEY.get(); + AmbientContext ctx = AmbientContext.DATA_KEY.get(); // Only initialize ctx if not yet initialized - ctx = ctx != null ? ctx : new Metadata(); + ctx = ctx != null ? ctx : new AmbientContext(); for (String keyName : headers.keys()) { if (!keyName.startsWith(headerPrefix)) { continue; diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java index bfce4ac3..4772f0fb 100644 --- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java @@ -8,6 +8,7 @@ package com.salesforce.grpc.contrib.context; import io.grpc.Context; +import io.grpc.Metadata; import io.grpc.testing.GrpcServerRule; import org.junit.Before; import org.junit.Rule; @@ -52,11 +53,46 @@ public void contextAccessMethods() { @Test public void contextFreezingWorks() { - fail("Not implemented"); + Metadata.Key key = Metadata.Key.of("key", Metadata.ASCII_STRING_MARSHALLER); + AmbientContext context = new AmbientContext(); + + assertThat(context.isFrozen()).isFalse(); + Object freezeKey = context.freeze(); + assertThat(context.isFrozen()).isTrue(); + assertThatThrownBy(() -> context.put(key, "foo")).isInstanceOf(IllegalStateException.class); + } + + @Test + public void contextDoubleFreezingThrows() { + AmbientContext context = new AmbientContext(); + context.freeze(); + assertThatThrownBy(context::freeze).isInstanceOf(IllegalStateException.class); } @Test public void contextThawingWorks() { - fail("Not implemented"); + AmbientContext context = new AmbientContext(); + + Object freezeKey = context.freeze(); + context.thaw(freezeKey); + assertThat(context.isFrozen()).isFalse(); + } + + @Test + public void contextThawingNotFrozenThrows() { + AmbientContext context = new AmbientContext(); + assertThatThrownBy(() -> context.thaw(new Object())) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("AmbientContext is not frozen."); + } + + @Test + public void contextThawingWrongKeyThrows() { + AmbientContext context = new AmbientContext(); + + Object freezeKey = context.freeze(); + assertThatThrownBy(() -> context.thaw(new Object())) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("The provided freezeKey"); } } From 07cbd9431c04e58d9d3d5f58368a1ebe9a6ca051 Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Mon, 9 Oct 2017 17:39:33 -0700 Subject: [PATCH 08/16] Multi-value AmbientContext --- .../AmbientContextClientInterceptor.java | 16 +++++---- .../AmbientContextServerInterceptor.java | 8 +++-- .../context/AmbientContextTransferTest.java | 34 ++++++++++++++++++- 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java index 1a9028fc..edd9e71f 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java @@ -42,15 +42,19 @@ public void start(Listener responseListener, Metadata headers) { if (keyString.startsWith(headerPrefix)) { if (keyString.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { Metadata.Key key = Metadata.Key.of(keyString, Metadata.BINARY_BYTE_MARSHALLER); - byte[] value = ctx.get(key); - if (value != null) { - headers.put(key, value); + Iterable values = ctx.getAll(key); + if (values != null) { + for (byte[] value : values) { + headers.put(key, value); + } } } else { Metadata.Key key = Metadata.Key.of(keyString, Metadata.ASCII_STRING_MARSHALLER); - String value = ctx.get(key); - if (value != null) { - headers.put(key, value); + Iterable values = ctx.getAll(key); + if (values != null) { + for (String value : values) { + headers.put(key, value); + } } } } diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java index b722976d..eddce14e 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java @@ -43,12 +43,14 @@ public ServerCall.Listener interceptCall(ServerCall key = Metadata.Key.of(keyName, Metadata.ASCII_STRING_MARSHALLER); - String value = headers.get(key); - if (value == null) { + Iterable values = headers.getAll(key); + if (values == null) { continue; } - ctx.put(key, value); + for (String value : values) { + ctx.put(key, value); + } } return Contexts.interceptCall(Context.current().withValue(AmbientContext.DATA_KEY, ctx), call, headers, next); diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java index 110693c4..495ac64d 100644 --- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java @@ -75,7 +75,39 @@ public void sayHello(HelloRequest request, StreamObserver respons @Test public void multiValueContextTransfers() throws Exception { - fail("Not implemented"); + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue1 = "context-value1"; + String expectedCtxValue2 = "context-value2"; + String expectedCtxValue3 = "context-value3"; + AtomicReference> ctxValue = new AtomicReference<>(); + + // Service + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().getAll(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Plumbing + serverRule1.getServiceRegistry().addService(ServerInterceptors + .intercept(svc, new AmbientContextServerInterceptor("ctx-"))); + + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule1.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue1); + AmbientContext.current().put(ctxKey, expectedCtxValue2); + AmbientContext.current().put(ctxKey, expectedCtxValue3); + stub.sayHello(HelloRequest.newBuilder().setName("world").build()); + }); + + assertThat(ctxValue.get()).containsExactlyInAnyOrder(expectedCtxValue1, expectedCtxValue2, expectedCtxValue3); } @Test From d60fd376de5b054d4de0764341d251bc4f67e636 Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Mon, 9 Oct 2017 18:01:04 -0700 Subject: [PATCH 09/16] Documentation --- .../grpc/contrib/context/AmbientContext.java | 27 +++++++++-- .../AmbientContextServerInterceptor.java | 26 +++++++--- .../contrib/context/AmbientContextTest.java | 13 +---- .../context/AmbientContextTransferTest.java | 5 +- .../context/BinaryAmbientContextTest.java | 48 ++++++++++++++++++- 5 files changed, 94 insertions(+), 25 deletions(-) diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java index 1804d61c..de82e2b6 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java @@ -63,9 +63,15 @@ public static AmbientContext current() { } /** - * Marks this AmbientContext as read-only, preventing any further modification + * Makes the AmbientContext as read-only, preventing any further modification. A "freeze key" is returned, which + * can be used to {@link #thaw(Object)} the AmbientContext in the future. * - * @return + *

{@code freeze()} and {@code thaw()} are typically used to mark the ambient context read-only when the + * interceptor chain completes. + * + * @return a "freeze key" that can be used passed to {@link #thaw(Object)} + * + * @throws IllegalStateException if the AmbientContext is already frozen */ public Object freeze() { checkState(!isFrozen(), "AmbientContext already frozen. Cannot freeze() twice."); @@ -73,13 +79,28 @@ public Object freeze() { return freezeKey; } + /** + * Makes the AmbientContext mutable again, after {@link #freeze()} has been called. A "freeze key" is needed to + * unfreeze the AmbientContext, ensuring only the code that froze the context can subsequently thaw it. + * + *

{@code freeze()} and {@code thaw()} are typically used to mark the ambient context read-only when the + * interceptor chain completes. + * + * @param freezeKey the "freeze key" returned by {@link #freeze()} + * + * @throws IllegalStateException if the AmbientContext has not yet been frozen + * @throws IllegalArgumentException if the {@code freezeKey} is incorrect + */ public void thaw(Object freezeKey) { checkState(isFrozen(), "AmbientContext is not frozen. Cannot thaw()."); - checkState(this.freezeKey == freezeKey, + checkArgument(this.freezeKey == freezeKey, "The provided freezeKey is not the same object returned by freeze()"); this.freezeKey = null; } + /** + * @return true of the AmbientContext has been frozen + */ public boolean isFrozen() { return freezeKey != null; } diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java index eddce14e..1bab855c 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java @@ -42,14 +42,26 @@ public ServerCall.Listener interceptCall(ServerCall key = Metadata.Key.of(keyName, Metadata.ASCII_STRING_MARSHALLER); - Iterable values = headers.getAll(key); - if (values == null) { - continue; - } + if (keyName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + Metadata.Key key = Metadata.Key.of(keyName, Metadata.BINARY_BYTE_MARSHALLER); + Iterable values = headers.getAll(key); + if (values == null) { + continue; + } + + for (byte[] value : values) { + ctx.put(key, value); + } + } else { + Metadata.Key key = Metadata.Key.of(keyName, Metadata.ASCII_STRING_MARSHALLER); + Iterable values = headers.getAll(key); + if (values == null) { + continue; + } - for (String value : values) { - ctx.put(key, value); + for (String value : values) { + ctx.put(key, value); + } } } diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java index 4772f0fb..16079ced 100644 --- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java @@ -46,11 +46,6 @@ public void uninitializedContextThrows() { assertThatThrownBy(AmbientContext::current).isInstanceOf(IllegalStateException.class); } - @Test - public void contextAccessMethods() { - fail("Not implemented"); - } - @Test public void contextFreezingWorks() { Metadata.Key key = Metadata.Key.of("key", Metadata.ASCII_STRING_MARSHALLER); @@ -81,9 +76,7 @@ public void contextThawingWorks() { @Test public void contextThawingNotFrozenThrows() { AmbientContext context = new AmbientContext(); - assertThatThrownBy(() -> context.thaw(new Object())) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("AmbientContext is not frozen."); + assertThatThrownBy(() -> context.thaw(new Object())).isInstanceOf(IllegalStateException.class); } @Test @@ -91,8 +84,6 @@ public void contextThawingWrongKeyThrows() { AmbientContext context = new AmbientContext(); Object freezeKey = context.freeze(); - assertThatThrownBy(() -> context.thaw(new Object())) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining("The provided freezeKey"); + assertThatThrownBy(() -> context.thaw(new Object())).isInstanceOf(IllegalArgumentException.class); } } diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java index 495ac64d..f749c86e 100644 --- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java @@ -30,9 +30,8 @@ import static org.awaitility.Awaitility.await; public class AmbientContextTransferTest { - @Rule - public GrpcServerRule serverRule1 = new GrpcServerRule(); - @Rule public GrpcServerRule serverRule2 = new GrpcServerRule(); + @Rule public final GrpcServerRule serverRule1 = new GrpcServerRule(); + @Rule public final GrpcServerRule serverRule2 = new GrpcServerRule(); @Before public void setUp() throws Exception { diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java index 31757f26..bef318cb 100644 --- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java @@ -7,13 +7,59 @@ package com.salesforce.grpc.contrib.context; +import com.google.common.io.BaseEncoding; +import com.salesforce.grpc.contrib.GreeterGrpc; +import com.salesforce.grpc.contrib.HelloRequest; +import com.salesforce.grpc.contrib.HelloResponse; +import io.grpc.Context; +import io.grpc.Metadata; +import io.grpc.ServerInterceptors; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import org.junit.Rule; import org.junit.Test; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.fail; public class BinaryAmbientContextTest { + @Rule + public final GrpcServerRule serverRule = new GrpcServerRule(); + @Test public void binaryContextValueTransfers() throws Exception { - fail("Not implemented"); + Metadata.Key ctxKey = Metadata.Key.of( + "ctx-context-key" + Metadata.BINARY_HEADER_SUFFIX, + Metadata.BINARY_BYTE_MARSHALLER); + byte[] expectedCtxValue = BaseEncoding.base16().decode("DEADBEEF"); + AtomicReference ctxValue = new AtomicReference<>(); + + // Service + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Plumbing + serverRule.getServiceRegistry().addService(ServerInterceptors + .intercept(svc, new AmbientContextServerInterceptor("ctx-"))); + + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + stub.sayHello(HelloRequest.newBuilder().setName("world").build()); + }); + + assertThat(ctxValue.get()).containsExactly(expectedCtxValue); } } From 39f13651fd904347e962e08f3ccae60c042c6cb9 Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Mon, 9 Oct 2017 23:43:44 -0700 Subject: [PATCH 10/16] MoreMetadata marshallers --- grpc-contrib/pom.xml | 5 + .../salesforce/grpc/contrib/MoreMetadata.java | 168 ++++++++++++++++++ .../grpc/contrib/MoreMetadataTest.java | 111 ++++++++++++ pom.xml | 7 + 4 files changed, 291 insertions(+) create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreMetadata.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/MoreMetadataTest.java diff --git a/grpc-contrib/pom.xml b/grpc-contrib/pom.xml index 6545af7d..6d23b8f6 100644 --- a/grpc-contrib/pom.xml +++ b/grpc-contrib/pom.xml @@ -35,6 +35,11 @@ io.grpc grpc-context + + com.google.code.gson + gson + provided + io.grpc grpc-testing diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreMetadata.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreMetadata.java new file mode 100644 index 00000000..eeac64d6 --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreMetadata.java @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib; + +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.protobuf.GeneratedMessageV3; +import com.google.protobuf.InvalidProtocolBufferException; +import io.grpc.Metadata; + +import java.io.IOException; +import java.io.StringWriter; +import java.io.Writer; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +/** + * {@code MoreMetadata} provides additional utilities for working with gRPC {@code Metadata}. + */ +//CHECKSTYLE:OFF: MethodName +//CHECKSTYLE:OFF: VisibilityModifier +public final class MoreMetadata { + private MoreMetadata() { } + + /** + * A metadata marshaller that encodes objects as JSON using the google-gson library. + * + *

All non-ascii characters are unicode escaped to comply with {@code AsciiMarshaller}'s character range + * requirements. + * + * @param clazz the type to serialize + * @param + */ + public static Metadata.AsciiMarshaller JSON_MARSHALLER(Class clazz) { + return new Metadata.AsciiMarshaller() { + TypeToken typeToken = TypeToken.of(clazz); + private Gson gson = new Gson(); + + @Override + public String toAsciiString(T value) { + try { + try (StringWriter sw = new StringWriter()) { + gson.toJson(value, typeToken.getType(), new UnicodeEscapingAsciiWriter(sw)); + return sw.toString(); + } + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + + @Override + public T parseAsciiString(String serialized) { + return gson.fromJson(serialized, typeToken.getType()); + } + }; + } + + /** + * See: https://github.com/google/gson/issues/388. + */ + private static final class UnicodeEscapingAsciiWriter extends Writer { + private final Writer out; + + private UnicodeEscapingAsciiWriter(Writer out) { + this.out = out; + } + + @Override public void write(char[] buffer, int offset, int count) throws IOException { + for (int i = 0; i < count; i++) { + char c = buffer[i + offset]; + if (c >= ' ' && c <= '~') { + out.write(c); + } else { + out.write(String.format("\\u%04x", (int) c)); + } + } + } + + @Override public void flush() throws IOException { + out.flush(); + } + + @Override public void close() throws IOException { + out.close(); + } + } + + /** + * A metadata marshaller that encodes objects as protobuf according to their proto IDL specification. + * + * @param clazz the type to serialize + * @param + */ + public static Metadata.BinaryMarshaller PROTOBUF_MARSHALLER(Class clazz) { + try { + Method defaultInstance = clazz.getMethod("getDefaultInstance"); + GeneratedMessageV3 instance = (GeneratedMessageV3) defaultInstance.invoke(null); + + return new Metadata.BinaryMarshaller() { + @Override + public byte[] toBytes(T value) { + return value.toByteArray(); + } + + @Override + public T parseBytes(byte[] serialized) { + try { + return (T) instance.getParserForType().parseFrom(serialized); + } catch (InvalidProtocolBufferException ipbe) { + throw new IllegalArgumentException(ipbe); + } + } + }; + } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ex) { + throw new IllegalStateException(ex); + } + } + + /** + * A metadata marshaller that encodes boolean values. + */ + public static Metadata.AsciiMarshaller BOOLEAN_MARSHALLER = new Metadata.AsciiMarshaller() { + @Override + public String toAsciiString(Boolean value) { + return value.toString(); + } + + @Override + public Boolean parseAsciiString(String serialized) { + return Boolean.parseBoolean(serialized); + } + }; + + /** + * A metadata marshaller that encodes integer-type values. + */ + public static Metadata.AsciiMarshaller LONG_MARSHALLER = new Metadata.AsciiMarshaller() { + @Override + public String toAsciiString(Long value) { + return value.toString(); + } + + @Override + public Long parseAsciiString(String serialized) { + return Long.parseLong(serialized); + } + }; + + /** + * A metadata marshaller that encodes floating-point-type values. + */ + public static Metadata.AsciiMarshaller DOUBLE_MARSHALLER = new Metadata.AsciiMarshaller() { + @Override + public String toAsciiString(Double value) { + return value.toString(); + } + + @Override + public Double parseAsciiString(String serialized) { + return Double.parseDouble(serialized); + } + }; +} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/MoreMetadataTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/MoreMetadataTest.java new file mode 100644 index 00000000..da6fb5ec --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/MoreMetadataTest.java @@ -0,0 +1,111 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib; + +import com.google.common.base.Objects; +import io.grpc.Metadata; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class MoreMetadataTest { + @Test + public void jsonMarshallerRoundtrip() { + Foo foo = new Foo(); + foo.country = "France"; + List bars = new ArrayList<>(); + Bar bar1 = new Bar(); + bar1.cheese = "Brë"; + bar1.age = 2; + bars.add(bar1); + Bar bar2 = new Bar(); + bar2.cheese = "Guda<>'"; + bar2.age = 4; + bars.add(bar2); + foo.bars = bars; + + Metadata.AsciiMarshaller marshaller = MoreMetadata.JSON_MARSHALLER(Foo.class); + String str = marshaller.toAsciiString(foo); + assertThat(str).doesNotContain("ë"); + + Foo foo2 = marshaller.parseAsciiString(str); + assertThat(foo2).isEqualTo(foo); + } + + private class Foo { + String country; + List bars; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Foo foo = (Foo) o; + return Objects.equal(country, foo.country) && + Objects.equal(bars, foo.bars); + } + } + + private class Bar { + String cheese; + int age; + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Bar bar = (Bar) o; + return age == bar.age && + Objects.equal(cheese, bar.cheese); + } + } + + @Test + public void protobufMarshallerRoundtrip() { + HelloRequest request = HelloRequest.newBuilder().setName("World").build(); + + Metadata.BinaryMarshaller marshaller = MoreMetadata.PROTOBUF_MARSHALLER(HelloRequest.class); + byte[] bytes = marshaller.toBytes(request); + HelloRequest request2 = marshaller.parseBytes(bytes); + + assertThat(request2).isEqualTo(request); + } + + @Test + public void booleanMarshallerRountrip() { + Metadata.AsciiMarshaller marshaller = MoreMetadata.BOOLEAN_MARSHALLER; + String s = marshaller.toAsciiString(Boolean.TRUE); + assertThat(s).isEqualTo("true"); + + Boolean b = marshaller.parseAsciiString(s); + assertThat(b).isTrue(); + } + + @Test + public void longMarshallerRountrip() { + Metadata.AsciiMarshaller marshaller = MoreMetadata.LONG_MARSHALLER; + String s = marshaller.toAsciiString(42L); + assertThat(s).isEqualTo("42"); + + Long l = marshaller.parseAsciiString(s); + assertThat(l).isEqualTo(42L); + } + + @Test + public void doubleMarshallerRountrip() { + Metadata.AsciiMarshaller marshaller = MoreMetadata.DOUBLE_MARSHALLER; + String s = marshaller.toAsciiString(42.42); + assertThat(s).isEqualTo("42.42"); + + Double d = marshaller.parseAsciiString(s); + assertThat(d).isEqualTo(42.42); + } +} diff --git a/pom.xml b/pom.xml index 3ad7ec63..2b7ac21c 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,7 @@ 1.6.1 3.3.0 + 2.7 0.9.4 4.2.0.RELEASE @@ -165,6 +166,12 @@ spring-webmvc ${spring.version} + + com.google.code.gson + gson + ${gson.version} + provided + io.grpc From 615fb09021587c6326fc99672bb08097baa11ffd Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Tue, 10 Oct 2017 15:25:53 -0700 Subject: [PATCH 11/16] Added AmbientContextEnforcer interceptors --- .../grpc/contrib/context/AmbientContext.java | 26 +++ .../AmbientContextClientInterceptor.java | 51 +++--- ...bientContextEnforcerClientInterceptor.java | 83 ++++++++++ ...bientContextEnforcerServerInterceptor.java | 73 +++++++++ .../AmbientContextServerInterceptor.java | 11 +- .../grpc/contrib/context/package-info.java | 3 + .../context/AmbientContextEnforcerTest.java | 149 ++++++++++++++++++ .../contrib/context/AmbientContextTest.java | 22 ++- 8 files changed, 388 insertions(+), 30 deletions(-) create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerClientInterceptor.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerServerInterceptor.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerTest.java diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java index de82e2b6..745a795c 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java @@ -55,6 +55,13 @@ public static AmbientContext current() { return DATA_KEY.get(); } + /** + * @return true if an {@code AmbientContext} is attached to the current gRPC context. + */ + public static boolean isPresent() { + return DATA_KEY.get() != null; + } + private Metadata contextMetadata; private Object freezeKey = null; @@ -62,6 +69,14 @@ public static AmbientContext current() { this.contextMetadata = new Metadata(); } + /** + * Copy constructor. + */ + AmbientContext(AmbientContext other) { + this(); + this.contextMetadata.merge(other.contextMetadata); + } + /** * Makes the AmbientContext as read-only, preventing any further modification. A "freeze key" is returned, which * can be used to {@link #thaw(Object)} the AmbientContext in the future. @@ -98,6 +113,17 @@ public void thaw(Object freezeKey) { this.freezeKey = null; } + /** + * Similar to {@link #initialize(Context)}, {@code fork()} attaches a shallow clone of this {@code AmbientContext} + * to a provided gRPC {@code Context}. Use {@code fork()} when you want create a temporary context scope. + * + * @param context + * @return + */ + public Context fork(Context context) { + return context.withValue(DATA_KEY, new AmbientContext(this)); + } + /** * @return true of the AmbientContext has been frozen */ diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java index edd9e71f..16c13e95 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java @@ -33,35 +33,40 @@ public AmbientContextClientInterceptor(String headerPrefix) { @Override public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { - return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { - @Override - public void start(Listener responseListener, Metadata headers) { - AmbientContext ctx = AmbientContext.current(); - if (ctx != null) { - for (String keyString : ctx.keys()) { - if (keyString.startsWith(headerPrefix)) { - if (keyString.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { - Metadata.Key key = Metadata.Key.of(keyString, Metadata.BINARY_BYTE_MARSHALLER); - Iterable values = ctx.getAll(key); - if (values != null) { - for (byte[] value : values) { - headers.put(key, value); + if (AmbientContext.isPresent()) { + return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + AmbientContext ctx = AmbientContext.current(); + if (ctx != null) { + for (String keyString : ctx.keys()) { + if (keyString.startsWith(headerPrefix)) { + if (keyString.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + Metadata.Key key = Metadata.Key.of(keyString, Metadata.BINARY_BYTE_MARSHALLER); + Iterable values = ctx.getAll(key); + if (values != null) { + for (byte[] value : values) { + headers.put(key, value); + } } - } - } else { - Metadata.Key key = Metadata.Key.of(keyString, Metadata.ASCII_STRING_MARSHALLER); - Iterable values = ctx.getAll(key); - if (values != null) { - for (String value : values) { - headers.put(key, value); + } else { + Metadata.Key key = Metadata.Key.of(keyString, Metadata.ASCII_STRING_MARSHALLER); + Iterable values = ctx.getAll(key); + if (values != null) { + for (String value : values) { + headers.put(key, value); + } } } } } } + super.start(responseListener, headers); } - super.start(responseListener, headers); - } - }; + }; + } else { + // Noop if ambient context is absent + return next.newCall(method, callOptions); + } } } diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerClientInterceptor.java new file mode 100644 index 00000000..f4972ea5 --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerClientInterceptor.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.*; + +/** + * Some uses of ambient context, like distributed tracing, break down if a service fails to propagate the context + * to downstream services. This typically happens when multi-threading code fails to correctly transfer the gRPC + * context to worker threads. + * + *

{@code AmbientContextEnforcerClientInterceptor} is used to enforce context propagation by catastrophically + * failing downstream service calls if the ambient context is missing or incomplete. + */ +public class AmbientContextEnforcerClientInterceptor implements ClientInterceptor { + private String[] requiredContextKeys = {}; + + /** + * Constructs an {@code AmbientContextEnforcerClientInterceptor} with no required context keys. + */ + public AmbientContextEnforcerClientInterceptor() { + } + + /** + * Constructs an {@code AmbientContextEnforcerClientInterceptor} with a set of required context keys. + */ + public AmbientContextEnforcerClientInterceptor(String... requiredContextKeys) { + this.requiredContextKeys = requiredContextKeys; + } + + /** + * MissingAmbientContextException is thrown to indicate a breakdown in context continuity between services. + */ + public static class MissingAmbientContextException extends RuntimeException { + public MissingAmbientContextException(String message) { + super(message); + } + } + + @Override + public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + // Throw if ambient context is missing + if (!AmbientContext.isPresent()) { + throw missingContextException(); + } + + // Throw if required context keys are missing + for (String requiredKey : requiredContextKeys) { + if (!AmbientContext.current().keys().contains(requiredKey)) { + throw incompleteContextException(requiredKey); + } + } + + return next.newCall(method, callOptions); + } + + /** + * Override this method to change the exception type or message thrown when the ambient context is missing + * entirely, perhaps to reference your own internal documentation. + * + * @return a RuntimeException + */ + protected RuntimeException missingContextException() { + return new MissingAmbientContextException("No AmbientContext is attached to the current gRPC Context. " + + "Make sure Context is correctly transferred between worker threads using Context.wrap() or " + + "Context.currentContextExecutor()."); + } + + /** + * Override this method to change the exception type or message thrown when the ambient context is missing a + * required context key, perhaps to reference your own internal documentation. + * + * @return a RuntimeException + */ + protected RuntimeException incompleteContextException(String missingKey) { + return new MissingAmbientContextException("The AmbientContext is missing a required context key: " + missingKey); + } +} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerServerInterceptor.java new file mode 100644 index 00000000..b0d6cf03 --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerServerInterceptor.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.*; + +/** + * Some uses of ambient context, like distributed tracing, break down if a service fails to propagate the context + * to downstream services. + * + *

{@code AmbientContextEnforcerServerInterceptor} is used to enforce context propagation by catastrophically + * failing a service call if the ambient context is missing or incomplete. + */ +public class AmbientContextEnforcerServerInterceptor implements ServerInterceptor { + private String[] requiredContextKeys = {}; + + /** + * Constructs an {@code AmbientContextEnforcerServerInterceptor} with no required context keys. + */ + public AmbientContextEnforcerServerInterceptor() { + } + + /** + * Constructs an {@code AmbientContextEnforcerServerInterceptor} with a set of required context keys. + */ + public AmbientContextEnforcerServerInterceptor(String... requiredContextKeys) { + this.requiredContextKeys = requiredContextKeys; + } + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { + // Throw if ambient context is missing + if (!AmbientContext.isPresent()) { + call.close(missingContextStatus(), new Metadata()); + return new ServerCall.Listener() { }; + } + + // Throw if required context keys are missing + for (String requiredKey : requiredContextKeys) { + if (!AmbientContext.current().keys().contains(requiredKey)) { + call.close(incompleteContextStatus(requiredKey), new Metadata()); + return new ServerCall.Listener() { }; + } + } + + return next.startCall(call, headers); + } + + /** + * Override this method to change the gRPC {@code Status} returned when the ambient context is missing + * entirely, perhaps to reference your own internal documentation. + * + * @return a gRPC Status + */ + protected Status missingContextStatus() { + return Status.FAILED_PRECONDITION.withDescription("Ambient context is required but not found"); + } + + /** + * Override this method to change the gRPC {@code Status} returned when the ambient context is missing a + * required context key, perhaps to reference your own internal documentation. + * + * @return a gRPC Status + */ + protected Status incompleteContextStatus(String missingKey) { + return Status.FAILED_PRECONDITION.withDescription("Required ambient context key " + missingKey + " was not found"); + } +} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java index 1bab855c..c732e86c 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java @@ -37,6 +37,8 @@ public ServerCall.Listener interceptCall(ServerCall ServerCall.Listener interceptCall(ServerCall + * The Ambient Context Design Pattern in .NET */ package com.salesforce.grpc.contrib.context; \ No newline at end of file diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerTest.java new file mode 100644 index 00000000..a91d26bd --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerTest.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import com.salesforce.grpc.contrib.GreeterGrpc; +import com.salesforce.grpc.contrib.HelloRequest; +import com.salesforce.grpc.contrib.HelloResponse; +import com.salesforce.grpc.contrib.Statuses; +import io.grpc.*; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import org.assertj.core.api.Condition; +import org.junit.Rule; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class AmbientContextEnforcerTest { + @Rule public final GrpcServerRule serverRule = new GrpcServerRule(); + + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + @Test + public void clientEnforcerPasses() { + // Plumbing + serverRule.getServiceRegistry().addService(svc); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextEnforcerClientInterceptor("ctx-test")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + Metadata.Key k = Metadata.Key.of("ctx-test", Metadata.ASCII_STRING_MARSHALLER); + AmbientContext.current().put(k, "v"); + + stub.sayHello(HelloRequest.newBuilder().setName("World").build()); + }); + } + + @Test + public void clientEnforcerFailsMissing() { + // Plumbing + serverRule.getServiceRegistry().addService(svc); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextEnforcerClientInterceptor("ctx-test")); + + // Test + assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build())) + .isInstanceOf(AmbientContextEnforcerClientInterceptor.MissingAmbientContextException.class); + } + + @Test + public void clientEnforcerFailsIncomplete() { + // Plumbing + serverRule.getServiceRegistry().addService(svc); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextEnforcerClientInterceptor("ctx-test")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build())) + .isInstanceOf(AmbientContextEnforcerClientInterceptor.MissingAmbientContextException.class) + .hasMessageContaining("ctx-test"); + }); + } + + @Test + public void serverEnforcerPasses() { + // Plumbing + serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc, + new AmbientContextServerInterceptor("ctx-"), + new AmbientContextEnforcerServerInterceptor("ctx-test") + )); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + Metadata.Key k = Metadata.Key.of("ctx-test", Metadata.ASCII_STRING_MARSHALLER); + AmbientContext.current().put(k, "v"); + + stub.sayHello(HelloRequest.newBuilder().setName("World").build()); + }); + } + + @Test + public void serverEnforcerFailsMissing() { + // Plumbing + serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc, + new AmbientContextServerInterceptor("ctx-"), + new AmbientContextEnforcerServerInterceptor("ctx-test") + )); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build())) + .isInstanceOfAny(StatusRuntimeException.class) + .has(new Condition() { + @Override + public boolean matches(Throwable throwable) { + return Statuses.hasStatusCode(throwable, Status.Code.FAILED_PRECONDITION); + } + }); + } + + @Test + public void serverEnforcerFailsIncomplete() { + // Plumbing + serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc, + new AmbientContextServerInterceptor("ctx-"), + new AmbientContextEnforcerServerInterceptor("ctx-test") + )); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + Metadata.Key k = Metadata.Key.of("ctx-unrelated", Metadata.ASCII_STRING_MARSHALLER); + AmbientContext.current().put(k, "v"); + + assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build())) + .isInstanceOfAny(StatusRuntimeException.class) + .hasMessageContaining("ctx-test") + .has(new Condition() { + @Override + public boolean matches(Throwable throwable) { + return Statuses.hasStatusCode(throwable, Status.Code.FAILED_PRECONDITION); + } + }); + }); + } +} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java index 16079ced..79da3837 100644 --- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java @@ -9,20 +9,14 @@ import io.grpc.Context; import io.grpc.Metadata; -import io.grpc.testing.GrpcServerRule; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.Assert.fail; @SuppressWarnings("Duplicates") public class AmbientContextTest { - @Rule public GrpcServerRule serverRule1 = new GrpcServerRule(); - @Rule public GrpcServerRule serverRule2 = new GrpcServerRule(); - @Before public void setUp() throws Exception { // Reset the gRPC context between test executions @@ -86,4 +80,20 @@ public void contextThawingWrongKeyThrows() { Object freezeKey = context.freeze(); assertThatThrownBy(() -> context.thaw(new Object())).isInstanceOf(IllegalArgumentException.class); } + + @Test + public void contextScopeStackingWorks() { + Metadata.Key key = Metadata.Key.of("k", Metadata.ASCII_STRING_MARSHALLER); + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(key, "outer"); + assertThat(AmbientContext.current().get(key)).isEqualTo("outer"); + + AmbientContext.current().fork(Context.current()).run(() -> { + AmbientContext.current().put(key, "inner"); + assertThat(AmbientContext.current().get(key)).isEqualTo("inner"); + }); + + assertThat(AmbientContext.current().get(key)).isEqualTo("outer"); + }); + } } From cd2cde67883e42431df75b8f6a439c58379b0bcc Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Tue, 10 Oct 2017 17:34:20 -0700 Subject: [PATCH 12/16] Added StopwatchInterceptor --- demos/grpc-java-contrib-demo/pom.xml | 3 +- grpc-contrib/pom.xml | 5 + .../grpc/contrib/context/package-info.java | 4 +- .../StopwatchClientInterceptor.java | 65 +++++++++++++ .../StopwatchServerInterceptor.java | 74 +++++++++++++++ .../context/BinaryAmbientContextTest.java | 3 +- .../interceptor/StopwatchInterceptorTest.java | 94 +++++++++++++++++++ pom.xml | 8 ++ 8 files changed, 251 insertions(+), 5 deletions(-) create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchClientInterceptor.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchServerInterceptor.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/StopwatchInterceptorTest.java diff --git a/demos/grpc-java-contrib-demo/pom.xml b/demos/grpc-java-contrib-demo/pom.xml index 10885f42..ad237ce7 100644 --- a/demos/grpc-java-contrib-demo/pom.xml +++ b/demos/grpc-java-contrib-demo/pom.xml @@ -44,7 +44,8 @@ org.slf4j slf4j-api - 1.7.21 + ${slf4j.version} + compile diff --git a/grpc-contrib/pom.xml b/grpc-contrib/pom.xml index 6d23b8f6..53dc4ccc 100644 --- a/grpc-contrib/pom.xml +++ b/grpc-contrib/pom.xml @@ -40,6 +40,11 @@ gson provided + + org.slf4j + slf4j-api + provided + io.grpc grpc-testing diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java index 4e6d81a8..1108a38c 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java @@ -8,7 +8,7 @@ /** * The classes in this package are used together to implement the Ambient Context pattern, where context values are * transparently marshaled from service to service without user intervention. This pattern is particularly useful for - * transparently populating system information like Zipkin trace ids and log correlation ids on every service call in + * transparently populating system information like Zipkin trace ids and logStop correlation ids on every service call in * a call graph. * *

The Ambient Context pattern is implemented in gRPC using the @@ -25,7 +25,7 @@ *

Freezing the ambient context is useful for protecting its contents when transitioning between platform * infrastructure code and service implementation code. In many cases, platform infrastructure code and service * implementations are written by different groups with different goals. Platform infrastructure implementors typically - * own inter-service functions like distributed tracing, log correlation, and service metrics, typically implemented + * own inter-service functions like distributed tracing, logStop correlation, and service metrics, typically implemented * using the gRPC interceptor chain. For these developers ambient context must be mutable. Service implementors, on * the other hand, own the business logic for each service. For these developers it can make sense for the ambient * context to be immutable to prevent unnecessary pollution of the ambient context. Freezing and thawing the context diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchClientInterceptor.java new file mode 100644 index 00000000..2b34e5ac --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchClientInterceptor.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.interceptor; + +import com.google.common.base.Stopwatch; +import io.grpc.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** + * {@code StopwatchClientInterceptor} logs the beginning and end of an outbound gRPC request, along with the total + * round-trip time. + * + *

Typical usage would override {@link #logStart(MethodDescriptor)} and {@link #logStop(MethodDescriptor, Duration)}. + */ +public class StopwatchClientInterceptor implements ClientInterceptor { + private final Logger logger = LoggerFactory.getLogger(StopwatchClientInterceptor.class); + + @Override + public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + logStart(method); + + return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { + Stopwatch stopwatch = Stopwatch.createStarted(); + + @Override + public void start(Listener responseListener, Metadata headers) { + super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener(responseListener) { + @Override + public void onClose(Status status, Metadata trailers) { + super.onClose(status, trailers); + logStop(method, Duration.ofNanos(stopwatch.stop().elapsed(TimeUnit.NANOSECONDS))); + } + }, headers); + } + }; + } + + /** + * Override this method to change how start messages are logged. Ex: use log4j. + * + * @param method The operation being called + */ + protected void logStart(MethodDescriptor method) { + logger.info("Begin call op:" + method.getFullMethodName()); + } + + /** + * Override this method to change how stop messages are logged. Ex: use log4j. + * + * @param method The operation being called + * @param duration Total round-trip time + */ + protected void logStop(MethodDescriptor method, Duration duration) { + logger.info("End call op:" + method.getFullMethodName() + " duration:" + duration); + } +} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchServerInterceptor.java new file mode 100644 index 00000000..be830e5d --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchServerInterceptor.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.interceptor; + +import com.google.common.base.Stopwatch; +import io.grpc.*; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +/** + * {@code StopwatchServerInterceptor} logs the beginning and end of an inbound gRPC request, along with the total + * processing time. + * + *

Typical usage would override {@link #logStart(MethodDescriptor)} and {@link #logStop(MethodDescriptor, Duration)}. + */ +public class StopwatchServerInterceptor implements ServerInterceptor { + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { + logStart(call.getMethodDescriptor()); + + return new ForwardingServerCallListener.SimpleForwardingServerCallListener(next.startCall(call, headers)) { + Stopwatch stopwatch = Stopwatch.createStarted(); + + @Override + public void onCancel() { + super.onCancel(); + logCancel(call.getMethodDescriptor(), Duration.ofNanos(stopwatch.stop().elapsed(TimeUnit.NANOSECONDS))); + } + + @Override + public void onComplete() { + super.onComplete(); + logStop(call.getMethodDescriptor(), Duration.ofNanos(stopwatch.stop().elapsed(TimeUnit.NANOSECONDS))); + } + }; + } + + /** + * Override this method to change how "start" messages are logged. Ex: use log4j. + * + * @param method The operation being called + */ + protected void logStart(MethodDescriptor method) { + System.out.println("Begin service op:" + method.getFullMethodName()); + } + + /** + * Override this method to change how "stop" messages are logged. Ex: use log4j. + * + * @param method The operation being called + * @param duration Total round-trip time + */ + protected void logStop(MethodDescriptor method, Duration duration) { + System.out.println("End service op:" + method.getFullMethodName() + " duration:" + duration); + } + + /** + * Override this method to change how "cancel" messages are logged. Ex: use log4j. + * + *

By default, this delegates to {@link #logStop(MethodDescriptor, Duration)}. + * + * @param method The operation being called + * @param duration Total round-trip time + */ + protected void logCancel(MethodDescriptor method, Duration duration) { + logStop(method, duration); + } +} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java index bef318cb..3dbb4722 100644 --- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java @@ -25,8 +25,7 @@ import static org.junit.Assert.fail; public class BinaryAmbientContextTest { - @Rule - public final GrpcServerRule serverRule = new GrpcServerRule(); + @Rule public final GrpcServerRule serverRule = new GrpcServerRule(); @Test public void binaryContextValueTransfers() throws Exception { diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/StopwatchInterceptorTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/StopwatchInterceptorTest.java new file mode 100644 index 00000000..02fb49a4 --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/StopwatchInterceptorTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.interceptor; + +import com.salesforce.grpc.contrib.GreeterGrpc; +import com.salesforce.grpc.contrib.HelloRequest; +import com.salesforce.grpc.contrib.HelloResponse; +import io.grpc.MethodDescriptor; +import io.grpc.ServerInterceptors; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import org.junit.Rule; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; + +public class StopwatchInterceptorTest { + @Rule public final GrpcServerRule serverRule = new GrpcServerRule(); + + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + @Test + public void clientStopwatchWorks() { + AtomicReference startDesc = new AtomicReference<>(); + AtomicReference stopDesc = new AtomicReference<>(); + AtomicReference stopDur = new AtomicReference<>(); + + //Setup + serverRule.getServiceRegistry().addService(svc); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new StopwatchClientInterceptor() { + @Override + protected void logStart(MethodDescriptor method) { + startDesc.set(method); + } + + @Override + protected void logStop(MethodDescriptor method, Duration duration) { + stopDesc.set(method); + stopDur.set(duration); + } + }); + + stub.sayHello(HelloRequest.newBuilder().setName("World").build()); + + assertThat(startDesc.get().getFullMethodName()).contains("SayHello"); + assertThat(startDesc.get().getFullMethodName()).contains("SayHello"); + assertThat(stopDur.get()).isGreaterThan(Duration.ZERO); + } + + @Test + public void serverStopwatchWorks() { + AtomicReference startDesc = new AtomicReference<>(); + AtomicReference stopDesc = new AtomicReference<>(); + AtomicReference stopDur = new AtomicReference<>(); + + //Setup + serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc, + new StopwatchServerInterceptor() { + @Override + protected void logStart(MethodDescriptor method) { + startDesc.set(method); + } + + @Override + protected void logStop(MethodDescriptor method, Duration duration) { + stopDesc.set(method); + stopDur.set(duration); + } + })); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel()); + + stub.sayHello(HelloRequest.newBuilder().setName("World").build()); + + assertThat(startDesc.get().getFullMethodName()).contains("SayHello"); + assertThat(startDesc.get().getFullMethodName()).contains("SayHello"); + assertThat(stopDur.get()).isGreaterThan(Duration.ZERO); + } +} diff --git a/pom.xml b/pom.xml index 2b7ac21c..fc3b5c16 100644 --- a/pom.xml +++ b/pom.xml @@ -83,6 +83,7 @@ 1.8 ${java.version} ${java.version} + 1.7.21 @@ -166,6 +167,13 @@ spring-webmvc ${spring.version} + + + org.slf4j + slf4j-api + ${slf4j.version} + provided + com.google.code.gson gson From de7521506d59b5814c5ae5541fc76e0f354ed3c4 Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Wed, 11 Oct 2017 10:33:27 -0700 Subject: [PATCH 13/16] Checkstyle --- .../grpc/contrib/interceptor/StopwatchClientInterceptor.java | 2 +- .../grpc/contrib/interceptor/StopwatchServerInterceptor.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchClientInterceptor.java index 2b34e5ac..9d332111 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchClientInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchClientInterceptor.java @@ -29,7 +29,7 @@ public ClientCall interceptCall(MethodDescriptor(next.newCall(method, callOptions)) { - Stopwatch stopwatch = Stopwatch.createStarted(); + private Stopwatch stopwatch = Stopwatch.createStarted(); @Override public void start(Listener responseListener, Metadata headers) { diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchServerInterceptor.java index be830e5d..2d27befc 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchServerInterceptor.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchServerInterceptor.java @@ -25,7 +25,7 @@ public ServerCall.Listener interceptCall(ServerCall(next.startCall(call, headers)) { - Stopwatch stopwatch = Stopwatch.createStarted(); + private Stopwatch stopwatch = Stopwatch.createStarted(); @Override public void onCancel() { From c11da77424f3519b66a0f32446a954fac205a022 Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Wed, 11 Oct 2017 10:41:20 -0700 Subject: [PATCH 14/16] Test determinism --- .../AmbientContextFreezeClientInterceptor.java | 17 +++++++++++++++++ .../interceptor/StopwatchInterceptorTest.java | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeClientInterceptor.java diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeClientInterceptor.java new file mode 100644 index 00000000..39cd9871 --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeClientInterceptor.java @@ -0,0 +1,17 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.*; + +public class AmbientContextFreezeClientInterceptor implements ClientInterceptor { + @Override + public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + return null; + } +} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/StopwatchInterceptorTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/StopwatchInterceptorTest.java index 02fb49a4..42ad757a 100644 --- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/StopwatchInterceptorTest.java +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/StopwatchInterceptorTest.java @@ -23,7 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; public class StopwatchInterceptorTest { - @Rule public final GrpcServerRule serverRule = new GrpcServerRule(); + @Rule public final GrpcServerRule serverRule = new GrpcServerRule().directExecutor(); GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { @Override From e9a9f1386abdff4adf9f5941692959b7d7e5b466 Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Wed, 11 Oct 2017 13:15:35 -0700 Subject: [PATCH 15/16] Added AmbientContextFreezeServerInterceptor --- ...AmbientContextFreezeClientInterceptor.java | 17 ------ ...AmbientContextFreezeServerInterceptor.java | 60 +++++++++++++++++++ .../AmbientContextFreezeInterceptorTest.java | 59 ++++++++++++++++++ 3 files changed, 119 insertions(+), 17 deletions(-) delete mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeClientInterceptor.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeServerInterceptor.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeInterceptorTest.java diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeClientInterceptor.java deleted file mode 100644 index 39cd9871..00000000 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeClientInterceptor.java +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright (c) 2017, salesforce.com, inc. - * All rights reserved. - * Licensed under the BSD 3-Clause license. - * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause - */ - -package com.salesforce.grpc.contrib.context; - -import io.grpc.*; - -public class AmbientContextFreezeClientInterceptor implements ClientInterceptor { - @Override - public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { - return null; - } -} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeServerInterceptor.java new file mode 100644 index 00000000..9841b1dd --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeServerInterceptor.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.*; + +/** + * {@code AmbientContextFreezeServerInterceptor} freezes the current {@link AmbientContext} for all downstream + * operations. The best place to put this interceptor is at the end of the gRPC interceptor chain. + */ +public class AmbientContextFreezeServerInterceptor implements ServerInterceptor { + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { + return new ForwardingServerCallListener.SimpleForwardingServerCallListener(next.startCall(call, headers)) { + @Override + public void onMessage(ReqT message) { + freezeAndThaw(() -> super.onMessage(message)); + } + + @Override + public void onHalfClose() { + freezeAndThaw(super::onHalfClose); + } + + @Override + public void onCancel() { + freezeAndThaw(super::onCancel); + } + + @Override + public void onComplete() { + freezeAndThaw(super::onComplete); + } + + @Override + public void onReady() { + freezeAndThaw(super::onReady); + } + + private void freezeAndThaw(Runnable delegate) { + if (AmbientContext.isPresent()) { + Object freezeKey = AmbientContext.current().freeze(); + try { + delegate.run(); + } finally { + AmbientContext.current().thaw(freezeKey); + } + } else { + + delegate.run(); + } + } + }; + } +} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeInterceptorTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeInterceptorTest.java new file mode 100644 index 00000000..2eccf9a1 --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeInterceptorTest.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import com.salesforce.grpc.contrib.GreeterGrpc; +import com.salesforce.grpc.contrib.HelloRequest; +import com.salesforce.grpc.contrib.HelloResponse; +import io.grpc.Context; +import io.grpc.Metadata; +import io.grpc.ServerInterceptors; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import org.junit.Rule; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class AmbientContextFreezeInterceptorTest { + @Rule public final GrpcServerRule serverRule = new GrpcServerRule().directExecutor(); + + private class TestService extends GreeterGrpc.GreeterImplBase { + boolean frozen; + + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + frozen = AmbientContext.current().isFrozen(); + + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + @Test + public void interceptorShouldFreezeContext() { + TestService svc = new TestService(); + + // Plumbing + serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc, + new AmbientContextServerInterceptor("ctx-"), + new AmbientContextFreezeServerInterceptor())); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + Metadata.Key key = Metadata.Key.of("ctx-k", Metadata.ASCII_STRING_MARSHALLER); + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(key, "value"); + stub.sayHello(HelloRequest.newBuilder().setName("World").build()); + }); + + assertThat(svc.frozen).isTrue(); + } +} From 97deb139aec00ecb34d9d413e4aff4f81a693279 Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Wed, 11 Oct 2017 13:54:30 -0700 Subject: [PATCH 16/16] More final --- .../java/com/salesforce/grpc/contrib/MoreMetadata.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreMetadata.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreMetadata.java index eeac64d6..1a3917ff 100644 --- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreMetadata.java +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreMetadata.java @@ -23,7 +23,6 @@ * {@code MoreMetadata} provides additional utilities for working with gRPC {@code Metadata}. */ //CHECKSTYLE:OFF: MethodName -//CHECKSTYLE:OFF: VisibilityModifier public final class MoreMetadata { private MoreMetadata() { } @@ -36,7 +35,7 @@ private MoreMetadata() { } * @param clazz the type to serialize * @param */ - public static Metadata.AsciiMarshaller JSON_MARSHALLER(Class clazz) { + public static final Metadata.AsciiMarshaller JSON_MARSHALLER(Class clazz) { return new Metadata.AsciiMarshaller() { TypeToken typeToken = TypeToken.of(clazz); private Gson gson = new Gson(); @@ -124,7 +123,7 @@ public T parseBytes(byte[] serialized) { /** * A metadata marshaller that encodes boolean values. */ - public static Metadata.AsciiMarshaller BOOLEAN_MARSHALLER = new Metadata.AsciiMarshaller() { + public static final Metadata.AsciiMarshaller BOOLEAN_MARSHALLER = new Metadata.AsciiMarshaller() { @Override public String toAsciiString(Boolean value) { return value.toString(); @@ -139,7 +138,7 @@ public Boolean parseAsciiString(String serialized) { /** * A metadata marshaller that encodes integer-type values. */ - public static Metadata.AsciiMarshaller LONG_MARSHALLER = new Metadata.AsciiMarshaller() { + public static final Metadata.AsciiMarshaller LONG_MARSHALLER = new Metadata.AsciiMarshaller() { @Override public String toAsciiString(Long value) { return value.toString(); @@ -154,7 +153,7 @@ public Long parseAsciiString(String serialized) { /** * A metadata marshaller that encodes floating-point-type values. */ - public static Metadata.AsciiMarshaller DOUBLE_MARSHALLER = new Metadata.AsciiMarshaller() { + public static final Metadata.AsciiMarshaller DOUBLE_MARSHALLER = new Metadata.AsciiMarshaller() { @Override public String toAsciiString(Double value) { return value.toString();