diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreFutures.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreFutures.java
index bbc67a63..28c856cb 100644
--- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreFutures.java
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreFutures.java
@@ -202,7 +202,7 @@ public boolean completeExceptionally(Throwable ex) {
} catch (RuntimeException | Error ex) { // the ListenableFuture failed with a REALLY BAD exception
completableFuture.completeExceptionally(ex);
} catch (InterruptedException ex) {
- completableFuture.completeExceptionally(ex); // Won't happen since get() only called after completion
+ completableFuture.completeExceptionally(ex); // Won't happen since current() only called after completion
}
};
listenableFuture.addListener(callbackRunnable, MoreExecutors.directExecutor());
diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreMetadata.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreMetadata.java
new file mode 100644
index 00000000..1a3917ff
--- /dev/null
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/MoreMetadata.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.protobuf.GeneratedMessageV3;
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.grpc.Metadata;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * {@code MoreMetadata} provides additional utilities for working with gRPC {@code Metadata}.
+ */
+//CHECKSTYLE:OFF: MethodName
+public final class MoreMetadata {
+ private MoreMetadata() { }
+
+ /**
+ * A metadata marshaller that encodes objects as JSON using the google-gson library.
+ *
+ * All non-ascii characters are unicode escaped to comply with {@code AsciiMarshaller}'s character range
+ * requirements.
+ *
+ * @param clazz the type to serialize
+ * @param
+ */
+ public static final Metadata.AsciiMarshaller JSON_MARSHALLER(Class clazz) {
+ return new Metadata.AsciiMarshaller() {
+ TypeToken typeToken = TypeToken.of(clazz);
+ private Gson gson = new Gson();
+
+ @Override
+ public String toAsciiString(T value) {
+ try {
+ try (StringWriter sw = new StringWriter()) {
+ gson.toJson(value, typeToken.getType(), new UnicodeEscapingAsciiWriter(sw));
+ return sw.toString();
+ }
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ @Override
+ public T parseAsciiString(String serialized) {
+ return gson.fromJson(serialized, typeToken.getType());
+ }
+ };
+ }
+
+ /**
+ * See: https://github.com/google/gson/issues/388.
+ */
+ private static final class UnicodeEscapingAsciiWriter extends Writer {
+ private final Writer out;
+
+ private UnicodeEscapingAsciiWriter(Writer out) {
+ this.out = out;
+ }
+
+ @Override public void write(char[] buffer, int offset, int count) throws IOException {
+ for (int i = 0; i < count; i++) {
+ char c = buffer[i + offset];
+ if (c >= ' ' && c <= '~') {
+ out.write(c);
+ } else {
+ out.write(String.format("\\u%04x", (int) c));
+ }
+ }
+ }
+
+ @Override public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override public void close() throws IOException {
+ out.close();
+ }
+ }
+
+ /**
+ * A metadata marshaller that encodes objects as protobuf according to their proto IDL specification.
+ *
+ * @param clazz the type to serialize
+ * @param
+ */
+ public static Metadata.BinaryMarshaller PROTOBUF_MARSHALLER(Class clazz) {
+ try {
+ Method defaultInstance = clazz.getMethod("getDefaultInstance");
+ GeneratedMessageV3 instance = (GeneratedMessageV3) defaultInstance.invoke(null);
+
+ return new Metadata.BinaryMarshaller() {
+ @Override
+ public byte[] toBytes(T value) {
+ return value.toByteArray();
+ }
+
+ @Override
+ public T parseBytes(byte[] serialized) {
+ try {
+ return (T) instance.getParserForType().parseFrom(serialized);
+ } catch (InvalidProtocolBufferException ipbe) {
+ throw new IllegalArgumentException(ipbe);
+ }
+ }
+ };
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException ex) {
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ /**
+ * A metadata marshaller that encodes boolean values.
+ */
+ public static final Metadata.AsciiMarshaller BOOLEAN_MARSHALLER = new Metadata.AsciiMarshaller() {
+ @Override
+ public String toAsciiString(Boolean value) {
+ return value.toString();
+ }
+
+ @Override
+ public Boolean parseAsciiString(String serialized) {
+ return Boolean.parseBoolean(serialized);
+ }
+ };
+
+ /**
+ * A metadata marshaller that encodes integer-type values.
+ */
+ public static final Metadata.AsciiMarshaller LONG_MARSHALLER = new Metadata.AsciiMarshaller() {
+ @Override
+ public String toAsciiString(Long value) {
+ return value.toString();
+ }
+
+ @Override
+ public Long parseAsciiString(String serialized) {
+ return Long.parseLong(serialized);
+ }
+ };
+
+ /**
+ * A metadata marshaller that encodes floating-point-type values.
+ */
+ public static final Metadata.AsciiMarshaller DOUBLE_MARSHALLER = new Metadata.AsciiMarshaller() {
+ @Override
+ public String toAsciiString(Double value) {
+ return value.toString();
+ }
+
+ @Override
+ public Double parseAsciiString(String serialized) {
+ return Double.parseDouble(serialized);
+ }
+ };
+}
diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java
new file mode 100644
index 00000000..745a795c
--- /dev/null
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java
@@ -0,0 +1,229 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.context;
+
+import io.grpc.Context;
+import io.grpc.Metadata;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.*;
+
+
+/**
+ * {@code AmbientContext} is entry point for working with the ambient context managed by {@link AmbientContextClientInterceptor}
+ * and {@link AmbientContextServerInterceptor}. The interface for this class is very similar to gRPC's {@code Metadata}
+ * class.
+ *
+ * This class is not thread safe, implementations should ensure that ambient context reads and writes do
+ * not occur in multiple threads concurrently.
+ *
+ *
See package javadoc for more info.
+ */
+@NotThreadSafe
+public final class AmbientContext {
+ static final Context.Key DATA_KEY = Context.key("AmbientContext");
+
+ /**
+ * Attaches an empty ambient context to the provided gRPC {@code Context}.
+ *
+ * @throws IllegalStateException if an ambient context has already been attached to the
+ * provided gRPC {@code Context}.
+ */
+ public static Context initialize(Context context) {
+ checkNotNull(context, "context");
+ checkState(DATA_KEY.get(context) == null,
+ "AmbientContext has already been created in the scope of the current context");
+ return context.withValue(DATA_KEY, new AmbientContext());
+ }
+
+ /**
+ * Returns the ambient context attached to the current gRPC {@code Context}.
+ *
+ * @throws IllegalStateException if no ambient context is attached to the current gRPC {@code Context}.
+ */
+ public static AmbientContext current() {
+ checkState(DATA_KEY.get() != null,
+ "AmbientContext has not yet been created in the scope of the current context");
+ return DATA_KEY.get();
+ }
+
+ /**
+ * @return true if an {@code AmbientContext} is attached to the current gRPC context.
+ */
+ public static boolean isPresent() {
+ return DATA_KEY.get() != null;
+ }
+
+ private Metadata contextMetadata;
+ private Object freezeKey = null;
+
+ AmbientContext() {
+ this.contextMetadata = new Metadata();
+ }
+
+ /**
+ * Copy constructor.
+ */
+ AmbientContext(AmbientContext other) {
+ this();
+ this.contextMetadata.merge(other.contextMetadata);
+ }
+
+ /**
+ * Makes the AmbientContext as read-only, preventing any further modification. A "freeze key" is returned, which
+ * can be used to {@link #thaw(Object)} the AmbientContext in the future.
+ *
+ * {@code freeze()} and {@code thaw()} are typically used to mark the ambient context read-only when the
+ * interceptor chain completes.
+ *
+ * @return a "freeze key" that can be used passed to {@link #thaw(Object)}
+ *
+ * @throws IllegalStateException if the AmbientContext is already frozen
+ */
+ public Object freeze() {
+ checkState(!isFrozen(), "AmbientContext already frozen. Cannot freeze() twice.");
+ freezeKey = new Object();
+ return freezeKey;
+ }
+
+ /**
+ * Makes the AmbientContext mutable again, after {@link #freeze()} has been called. A "freeze key" is needed to
+ * unfreeze the AmbientContext, ensuring only the code that froze the context can subsequently thaw it.
+ *
+ *
{@code freeze()} and {@code thaw()} are typically used to mark the ambient context read-only when the
+ * interceptor chain completes.
+ *
+ * @param freezeKey the "freeze key" returned by {@link #freeze()}
+ *
+ * @throws IllegalStateException if the AmbientContext has not yet been frozen
+ * @throws IllegalArgumentException if the {@code freezeKey} is incorrect
+ */
+ public void thaw(Object freezeKey) {
+ checkState(isFrozen(), "AmbientContext is not frozen. Cannot thaw().");
+ checkArgument(this.freezeKey == freezeKey,
+ "The provided freezeKey is not the same object returned by freeze()");
+ this.freezeKey = null;
+ }
+
+ /**
+ * Similar to {@link #initialize(Context)}, {@code fork()} attaches a shallow clone of this {@code AmbientContext}
+ * to a provided gRPC {@code Context}. Use {@code fork()} when you want create a temporary context scope.
+ *
+ * @param context
+ * @return
+ */
+ public Context fork(Context context) {
+ return context.withValue(DATA_KEY, new AmbientContext(this));
+ }
+
+ /**
+ * @return true of the AmbientContext has been frozen
+ */
+ public boolean isFrozen() {
+ return freezeKey != null;
+ }
+
+ private void checkFreeze() {
+ checkState(freezeKey == null, "AmbientContext cannot be modified while frozen");
+ }
+
+ /**
+ * Returns true if a value is defined for the given key.
+ *
+ *
This is done by linear search, so if it is followed by {@link #get} or {@link #getAll},
+ * prefer calling them directly and checking the return value against {@code null}.
+ */
+ public boolean containsKey(Metadata.Key> key) {
+ return contextMetadata.containsKey(key);
+ }
+
+ /**
+ * Remove all values for the given key without returning them. This is a minor performance
+ * optimization if you do not need the previous values.
+ *
+ * @throws IllegalStateException if the AmbientContext is frozen
+ */
+ public void discardAll(Metadata.Key key) {
+ checkFreeze();
+ contextMetadata.discardAll(key);
+ }
+
+ /**
+ * Returns the last ambient context entry added with the name 'name' parsed as T.
+ *
+ * @return the parsed metadata entry or null if there are none.
+ */
+ @Nullable
+ public T get(Metadata.Key key) {
+ return contextMetadata.get(key);
+ }
+
+ /**
+ * Returns all the ambient context entries named 'name', in the order they were received, parsed as T, or
+ * null if there are none. The iterator is not guaranteed to be "live." It may or may not be
+ * accurate if the ambient context is mutated.
+ */
+ @Nullable
+ public Iterable getAll(final Metadata.Key key) {
+ return contextMetadata.getAll(key);
+ }
+
+ /**
+ * Returns set of all keys in store.
+ *
+ * @return unmodifiable Set of keys
+ */
+ public Set keys() {
+ return contextMetadata.keys();
+ }
+
+ /**
+ * Adds the {@code key, value} pair. If {@code key} already has values, {@code value} is added to
+ * the end. Duplicate values for the same key are permitted.
+ *
+ * @throws NullPointerException if key or value is null
+ * @throws IllegalStateException if the AmbientContext is frozen
+ */
+ public void put(Metadata.Key key, T value) {
+ checkFreeze();
+ contextMetadata.put(key, value);
+ }
+
+ /**
+ * Removes the first occurrence of {@code value} for {@code key}.
+ *
+ * @param key key for value
+ * @param value value
+ * @return {@code true} if {@code value} removed; {@code false} if {@code value} was not present
+ *
+ * @throws NullPointerException if {@code key} or {@code value} is null
+ * @throws IllegalStateException if the AmbientContext is frozen
+ */
+ public boolean remove(Metadata.Key key, T value) {
+ checkFreeze();
+ return contextMetadata.remove(key, value);
+ }
+
+ /**
+ * Remove all values for the given key. If there were no values, {@code null} is returned.
+ *
+ * @throws IllegalStateException if the AmbientContext is frozen
+ */
+ public Iterable removeAll(Metadata.Key key) {
+ checkFreeze();
+ return contextMetadata.removeAll(key);
+ }
+
+ @Override
+ public String toString() {
+ return (isFrozen() ? "[FROZEN] " : "[THAWED] ") + contextMetadata.toString();
+ }
+}
diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java
new file mode 100644
index 00000000..16c13e95
--- /dev/null
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.context;
+
+import io.grpc.*;
+
+/**
+ * {@code AmbientContextClientInterceptor} transparently deserializes prefixed request headers into an ambient context.
+ * Header values can be accessed using the {@link AmbientContext} class.
+ *
+ * Each {@code AmbientContextClientInterceptor} marshals headers with a know prefix. If multiple prefixes are needed,
+ * add multiple {@code AmbientContextClientInterceptor} instances to the gRPC interceptor chain.
+ *
+ *
See package javadoc for more info.
+ */
+public class AmbientContextClientInterceptor implements ClientInterceptor {
+ private String headerPrefix;
+
+ /**
+ * Constructs an {@code AmbientContextClientInterceptor} that marshals request headers with a know prefix into the
+ * {@link AmbientContext}.
+ *
+ * @param headerPrefix the header prefix to marshal.
+ */
+ public AmbientContextClientInterceptor(String headerPrefix) {
+ this.headerPrefix = headerPrefix;
+ }
+
+ @Override
+ public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) {
+ if (AmbientContext.isPresent()) {
+ return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) {
+ @Override
+ public void start(Listener responseListener, Metadata headers) {
+ AmbientContext ctx = AmbientContext.current();
+ if (ctx != null) {
+ for (String keyString : ctx.keys()) {
+ if (keyString.startsWith(headerPrefix)) {
+ if (keyString.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+ Metadata.Key key = Metadata.Key.of(keyString, Metadata.BINARY_BYTE_MARSHALLER);
+ Iterable values = ctx.getAll(key);
+ if (values != null) {
+ for (byte[] value : values) {
+ headers.put(key, value);
+ }
+ }
+ } else {
+ Metadata.Key key = Metadata.Key.of(keyString, Metadata.ASCII_STRING_MARSHALLER);
+ Iterable values = ctx.getAll(key);
+ if (values != null) {
+ for (String value : values) {
+ headers.put(key, value);
+ }
+ }
+ }
+ }
+ }
+ }
+ super.start(responseListener, headers);
+ }
+ };
+ } else {
+ // Noop if ambient context is absent
+ return next.newCall(method, callOptions);
+ }
+ }
+}
diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerClientInterceptor.java
new file mode 100644
index 00000000..f4972ea5
--- /dev/null
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerClientInterceptor.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.context;
+
+import io.grpc.*;
+
+/**
+ * Some uses of ambient context, like distributed tracing, break down if a service fails to propagate the context
+ * to downstream services. This typically happens when multi-threading code fails to correctly transfer the gRPC
+ * context to worker threads.
+ *
+ * {@code AmbientContextEnforcerClientInterceptor} is used to enforce context propagation by catastrophically
+ * failing downstream service calls if the ambient context is missing or incomplete.
+ */
+public class AmbientContextEnforcerClientInterceptor implements ClientInterceptor {
+ private String[] requiredContextKeys = {};
+
+ /**
+ * Constructs an {@code AmbientContextEnforcerClientInterceptor} with no required context keys.
+ */
+ public AmbientContextEnforcerClientInterceptor() {
+ }
+
+ /**
+ * Constructs an {@code AmbientContextEnforcerClientInterceptor} with a set of required context keys.
+ */
+ public AmbientContextEnforcerClientInterceptor(String... requiredContextKeys) {
+ this.requiredContextKeys = requiredContextKeys;
+ }
+
+ /**
+ * MissingAmbientContextException is thrown to indicate a breakdown in context continuity between services.
+ */
+ public static class MissingAmbientContextException extends RuntimeException {
+ public MissingAmbientContextException(String message) {
+ super(message);
+ }
+ }
+
+ @Override
+ public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) {
+ // Throw if ambient context is missing
+ if (!AmbientContext.isPresent()) {
+ throw missingContextException();
+ }
+
+ // Throw if required context keys are missing
+ for (String requiredKey : requiredContextKeys) {
+ if (!AmbientContext.current().keys().contains(requiredKey)) {
+ throw incompleteContextException(requiredKey);
+ }
+ }
+
+ return next.newCall(method, callOptions);
+ }
+
+ /**
+ * Override this method to change the exception type or message thrown when the ambient context is missing
+ * entirely, perhaps to reference your own internal documentation.
+ *
+ * @return a RuntimeException
+ */
+ protected RuntimeException missingContextException() {
+ return new MissingAmbientContextException("No AmbientContext is attached to the current gRPC Context. " +
+ "Make sure Context is correctly transferred between worker threads using Context.wrap() or " +
+ "Context.currentContextExecutor().");
+ }
+
+ /**
+ * Override this method to change the exception type or message thrown when the ambient context is missing a
+ * required context key, perhaps to reference your own internal documentation.
+ *
+ * @return a RuntimeException
+ */
+ protected RuntimeException incompleteContextException(String missingKey) {
+ return new MissingAmbientContextException("The AmbientContext is missing a required context key: " + missingKey);
+ }
+}
diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerServerInterceptor.java
new file mode 100644
index 00000000..b0d6cf03
--- /dev/null
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerServerInterceptor.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.context;
+
+import io.grpc.*;
+
+/**
+ * Some uses of ambient context, like distributed tracing, break down if a service fails to propagate the context
+ * to downstream services.
+ *
+ * {@code AmbientContextEnforcerServerInterceptor} is used to enforce context propagation by catastrophically
+ * failing a service call if the ambient context is missing or incomplete.
+ */
+public class AmbientContextEnforcerServerInterceptor implements ServerInterceptor {
+ private String[] requiredContextKeys = {};
+
+ /**
+ * Constructs an {@code AmbientContextEnforcerServerInterceptor} with no required context keys.
+ */
+ public AmbientContextEnforcerServerInterceptor() {
+ }
+
+ /**
+ * Constructs an {@code AmbientContextEnforcerServerInterceptor} with a set of required context keys.
+ */
+ public AmbientContextEnforcerServerInterceptor(String... requiredContextKeys) {
+ this.requiredContextKeys = requiredContextKeys;
+ }
+
+ @Override
+ public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) {
+ // Throw if ambient context is missing
+ if (!AmbientContext.isPresent()) {
+ call.close(missingContextStatus(), new Metadata());
+ return new ServerCall.Listener() { };
+ }
+
+ // Throw if required context keys are missing
+ for (String requiredKey : requiredContextKeys) {
+ if (!AmbientContext.current().keys().contains(requiredKey)) {
+ call.close(incompleteContextStatus(requiredKey), new Metadata());
+ return new ServerCall.Listener() { };
+ }
+ }
+
+ return next.startCall(call, headers);
+ }
+
+ /**
+ * Override this method to change the gRPC {@code Status} returned when the ambient context is missing
+ * entirely, perhaps to reference your own internal documentation.
+ *
+ * @return a gRPC Status
+ */
+ protected Status missingContextStatus() {
+ return Status.FAILED_PRECONDITION.withDescription("Ambient context is required but not found");
+ }
+
+ /**
+ * Override this method to change the gRPC {@code Status} returned when the ambient context is missing a
+ * required context key, perhaps to reference your own internal documentation.
+ *
+ * @return a gRPC Status
+ */
+ protected Status incompleteContextStatus(String missingKey) {
+ return Status.FAILED_PRECONDITION.withDescription("Required ambient context key " + missingKey + " was not found");
+ }
+}
diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeServerInterceptor.java
new file mode 100644
index 00000000..9841b1dd
--- /dev/null
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeServerInterceptor.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.context;
+
+import io.grpc.*;
+
+/**
+ * {@code AmbientContextFreezeServerInterceptor} freezes the current {@link AmbientContext} for all downstream
+ * operations. The best place to put this interceptor is at the end of the gRPC interceptor chain.
+ */
+public class AmbientContextFreezeServerInterceptor implements ServerInterceptor {
+ @Override
+ public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) {
+ return new ForwardingServerCallListener.SimpleForwardingServerCallListener(next.startCall(call, headers)) {
+ @Override
+ public void onMessage(ReqT message) {
+ freezeAndThaw(() -> super.onMessage(message));
+ }
+
+ @Override
+ public void onHalfClose() {
+ freezeAndThaw(super::onHalfClose);
+ }
+
+ @Override
+ public void onCancel() {
+ freezeAndThaw(super::onCancel);
+ }
+
+ @Override
+ public void onComplete() {
+ freezeAndThaw(super::onComplete);
+ }
+
+ @Override
+ public void onReady() {
+ freezeAndThaw(super::onReady);
+ }
+
+ private void freezeAndThaw(Runnable delegate) {
+ if (AmbientContext.isPresent()) {
+ Object freezeKey = AmbientContext.current().freeze();
+ try {
+ delegate.run();
+ } finally {
+ AmbientContext.current().thaw(freezeKey);
+ }
+ } else {
+
+ delegate.run();
+ }
+ }
+ };
+ }
+}
diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java
new file mode 100644
index 00000000..c732e86c
--- /dev/null
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.context;
+
+import io.grpc.*;
+
+/**
+ * {@code AmbientContextServerInterceptor} transparently serializes prefixed ambient context values into outbound request
+ * headers.
+ *
+ * Each {@code AmbientContextServerInterceptor} marshals headers with a know prefix. If multiple prefixes are needed,
+ * add multiple {@code AmbientContextServerInterceptor} instances to the gRPC interceptor chain.
+ *
+ *
See package javadoc for more info.
+ */
+public class AmbientContextServerInterceptor implements ServerInterceptor {
+ private String headerPrefix;
+
+ /**
+ * Constructs an {@code AmbientContextServerInterceptor} that marshals ambient context values with a know prefix
+ * into outbound request headers.
+ * {@link AmbientContext}.
+ *
+ * @param headerPrefix the header prefix to marshal.
+ */
+ public AmbientContextServerInterceptor(String headerPrefix) {
+ this.headerPrefix = headerPrefix;
+ }
+
+ @Override
+ public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) {
+ AmbientContext ctx = AmbientContext.DATA_KEY.get();
+ // Only initialize ctx if not yet initialized
+ ctx = ctx != null ? ctx : new AmbientContext();
+
+ boolean found = false;
+ for (String keyName : headers.keys()) {
+ if (!keyName.startsWith(headerPrefix)) {
+ continue;
+ }
+
+ if (keyName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+ Metadata.Key key = Metadata.Key.of(keyName, Metadata.BINARY_BYTE_MARSHALLER);
+ Iterable values = headers.getAll(key);
+ if (values == null) {
+ continue;
+ }
+
+ for (byte[] value : values) {
+ ctx.put(key, value);
+ }
+ } else {
+ Metadata.Key key = Metadata.Key.of(keyName, Metadata.ASCII_STRING_MARSHALLER);
+ Iterable values = headers.getAll(key);
+ if (values == null) {
+ continue;
+ }
+
+ for (String value : values) {
+ ctx.put(key, value);
+ }
+ }
+
+ found = true;
+ }
+
+ if (found) {
+ return Contexts.interceptCall(Context.current().withValue(AmbientContext.DATA_KEY, ctx), call, headers, next);
+ } else {
+ // Don't attach a context if there is nothing to attach
+ return next.startCall(call, headers);
+ }
+ }
+}
\ No newline at end of file
diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java
new file mode 100644
index 00000000..1108a38c
--- /dev/null
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+/**
+ * The classes in this package are used together to implement the Ambient Context pattern, where context values are
+ * transparently marshaled from service to service without user intervention. This pattern is particularly useful for
+ * transparently populating system information like Zipkin trace ids and logStop correlation ids on every service call in
+ * a call graph.
+ *
+ * The Ambient Context pattern is implemented in gRPC using the
+ * {@link com.salesforce.grpc.contrib.context.AmbientContextServerInterceptor} and
+ * {@link com.salesforce.grpc.contrib.context.AmbientContextClientInterceptor}. Together these interceptors marshall
+ * context headers from the gRPC {@code Context} into outbound request {@code Metadata}, and from inbound request
+ * {@code Metadata} into the gRPC {@code Context}.
+ *
+ *
Within your service and infrastructure code, the {@link com.salesforce.grpc.contrib.context.AmbientContext} class
+ * provides utility methods for interacting with the ambient context. {@code AmbientContext}s API is similar to gRPC's
+ * {@code Metadata} API, with the addition of the {@code freeze()} and {@code thaw()} operations used to toggle
+ * the {@code AmbientContext}'s read-only status.
+ *
+ *
Freezing the ambient context is useful for protecting its contents when transitioning between platform
+ * infrastructure code and service implementation code. In many cases, platform infrastructure code and service
+ * implementations are written by different groups with different goals. Platform infrastructure implementors typically
+ * own inter-service functions like distributed tracing, logStop correlation, and service metrics, typically implemented
+ * using the gRPC interceptor chain. For these developers ambient context must be mutable. Service implementors, on
+ * the other hand, own the business logic for each service. For these developers it can make sense for the ambient
+ * context to be immutable to prevent unnecessary pollution of the ambient context. Freezing and thawing the context
+ * allows it to seamlessly transition between mutable and immutable modes as needed by each audience.
+ *
+ * @see
+ * The Ambient Context Design Pattern in .NET
+ */
+package com.salesforce.grpc.contrib.context;
\ No newline at end of file
diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptor.java
index ebcbde8a..80e5c221 100644
--- a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptor.java
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptor.java
@@ -10,6 +10,7 @@
import com.google.common.base.Preconditions;
import io.grpc.*;
+import java.time.Duration;
import java.util.concurrent.TimeUnit;
/**
@@ -18,21 +19,38 @@
* implicit deadline will be used instead.
*/
public class DefaultDeadlineInterceptor implements ClientInterceptor {
- private final long duration;
- private final TimeUnit timeUnit;
+ private Duration duration;
- public DefaultDeadlineInterceptor(long duration, TimeUnit timeUnit) {
- Preconditions.checkArgument(duration > 0, "duration must be greater than zero");
- Preconditions.checkNotNull(timeUnit, "timeUnit");
+ public DefaultDeadlineInterceptor(Duration duration) {
+ Preconditions.checkNotNull(duration, "duration");
+ Preconditions.checkArgument(!duration.isNegative(), "duration must be greater than zero");
this.duration = duration;
- this.timeUnit = timeUnit;
+ }
+
+ /**
+ * Get the current default deadline duration.
+ *
+ * @return the current default deadline duration
+ */
+ public Duration getDuration() {
+ return duration;
+ }
+
+ /**
+ * Set a new default deadline duration.
+ *
+ * @param duration the new default deadline duration
+ */
+ public void setDuration(Duration duration) {
+ this.duration = duration;
}
@Override
public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) {
+ // Only add a deadline if no other deadline has been set.
if (callOptions.getDeadline() == null && Context.current().getDeadline() == null) {
- callOptions = callOptions.withDeadlineAfter(duration, timeUnit);
+ callOptions = callOptions.withDeadlineAfter(duration.toMillis(), TimeUnit.MILLISECONDS);
}
return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) {
diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchClientInterceptor.java
new file mode 100644
index 00000000..9d332111
--- /dev/null
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchClientInterceptor.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.interceptor;
+
+import com.google.common.base.Stopwatch;
+import io.grpc.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@code StopwatchClientInterceptor} logs the beginning and end of an outbound gRPC request, along with the total
+ * round-trip time.
+ *
+ * Typical usage would override {@link #logStart(MethodDescriptor)} and {@link #logStop(MethodDescriptor, Duration)}.
+ */
+public class StopwatchClientInterceptor implements ClientInterceptor {
+ private final Logger logger = LoggerFactory.getLogger(StopwatchClientInterceptor.class);
+
+ @Override
+ public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) {
+ logStart(method);
+
+ return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) {
+ private Stopwatch stopwatch = Stopwatch.createStarted();
+
+ @Override
+ public void start(Listener responseListener, Metadata headers) {
+ super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener(responseListener) {
+ @Override
+ public void onClose(Status status, Metadata trailers) {
+ super.onClose(status, trailers);
+ logStop(method, Duration.ofNanos(stopwatch.stop().elapsed(TimeUnit.NANOSECONDS)));
+ }
+ }, headers);
+ }
+ };
+ }
+
+ /**
+ * Override this method to change how start messages are logged. Ex: use log4j.
+ *
+ * @param method The operation being called
+ */
+ protected void logStart(MethodDescriptor method) {
+ logger.info("Begin call op:" + method.getFullMethodName());
+ }
+
+ /**
+ * Override this method to change how stop messages are logged. Ex: use log4j.
+ *
+ * @param method The operation being called
+ * @param duration Total round-trip time
+ */
+ protected void logStop(MethodDescriptor method, Duration duration) {
+ logger.info("End call op:" + method.getFullMethodName() + " duration:" + duration);
+ }
+}
diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchServerInterceptor.java
new file mode 100644
index 00000000..2d27befc
--- /dev/null
+++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/interceptor/StopwatchServerInterceptor.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.interceptor;
+
+import com.google.common.base.Stopwatch;
+import io.grpc.*;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@code StopwatchServerInterceptor} logs the beginning and end of an inbound gRPC request, along with the total
+ * processing time.
+ *
+ * Typical usage would override {@link #logStart(MethodDescriptor)} and {@link #logStop(MethodDescriptor, Duration)}.
+ */
+public class StopwatchServerInterceptor implements ServerInterceptor {
+ @Override
+ public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) {
+ logStart(call.getMethodDescriptor());
+
+ return new ForwardingServerCallListener.SimpleForwardingServerCallListener(next.startCall(call, headers)) {
+ private Stopwatch stopwatch = Stopwatch.createStarted();
+
+ @Override
+ public void onCancel() {
+ super.onCancel();
+ logCancel(call.getMethodDescriptor(), Duration.ofNanos(stopwatch.stop().elapsed(TimeUnit.NANOSECONDS)));
+ }
+
+ @Override
+ public void onComplete() {
+ super.onComplete();
+ logStop(call.getMethodDescriptor(), Duration.ofNanos(stopwatch.stop().elapsed(TimeUnit.NANOSECONDS)));
+ }
+ };
+ }
+
+ /**
+ * Override this method to change how "start" messages are logged. Ex: use log4j.
+ *
+ * @param method The operation being called
+ */
+ protected void logStart(MethodDescriptor method) {
+ System.out.println("Begin service op:" + method.getFullMethodName());
+ }
+
+ /**
+ * Override this method to change how "stop" messages are logged. Ex: use log4j.
+ *
+ * @param method The operation being called
+ * @param duration Total round-trip time
+ */
+ protected void logStop(MethodDescriptor method, Duration duration) {
+ System.out.println("End service op:" + method.getFullMethodName() + " duration:" + duration);
+ }
+
+ /**
+ * Override this method to change how "cancel" messages are logged. Ex: use log4j.
+ *
+ * By default, this delegates to {@link #logStop(MethodDescriptor, Duration)}.
+ *
+ * @param method The operation being called
+ * @param duration Total round-trip time
+ */
+ protected void logCancel(MethodDescriptor method, Duration duration) {
+ logStop(method, duration);
+ }
+}
diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/MoreMetadataTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/MoreMetadataTest.java
new file mode 100644
index 00000000..da6fb5ec
--- /dev/null
+++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/MoreMetadataTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib;
+
+import com.google.common.base.Objects;
+import io.grpc.Metadata;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class MoreMetadataTest {
+ @Test
+ public void jsonMarshallerRoundtrip() {
+ Foo foo = new Foo();
+ foo.country = "France";
+ List bars = new ArrayList<>();
+ Bar bar1 = new Bar();
+ bar1.cheese = "Brë";
+ bar1.age = 2;
+ bars.add(bar1);
+ Bar bar2 = new Bar();
+ bar2.cheese = "Guda<>'";
+ bar2.age = 4;
+ bars.add(bar2);
+ foo.bars = bars;
+
+ Metadata.AsciiMarshaller marshaller = MoreMetadata.JSON_MARSHALLER(Foo.class);
+ String str = marshaller.toAsciiString(foo);
+ assertThat(str).doesNotContain("ë");
+
+ Foo foo2 = marshaller.parseAsciiString(str);
+ assertThat(foo2).isEqualTo(foo);
+ }
+
+ private class Foo {
+ String country;
+ List bars;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Foo foo = (Foo) o;
+ return Objects.equal(country, foo.country) &&
+ Objects.equal(bars, foo.bars);
+ }
+ }
+
+ private class Bar {
+ String cheese;
+ int age;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Bar bar = (Bar) o;
+ return age == bar.age &&
+ Objects.equal(cheese, bar.cheese);
+ }
+ }
+
+ @Test
+ public void protobufMarshallerRoundtrip() {
+ HelloRequest request = HelloRequest.newBuilder().setName("World").build();
+
+ Metadata.BinaryMarshaller marshaller = MoreMetadata.PROTOBUF_MARSHALLER(HelloRequest.class);
+ byte[] bytes = marshaller.toBytes(request);
+ HelloRequest request2 = marshaller.parseBytes(bytes);
+
+ assertThat(request2).isEqualTo(request);
+ }
+
+ @Test
+ public void booleanMarshallerRountrip() {
+ Metadata.AsciiMarshaller marshaller = MoreMetadata.BOOLEAN_MARSHALLER;
+ String s = marshaller.toAsciiString(Boolean.TRUE);
+ assertThat(s).isEqualTo("true");
+
+ Boolean b = marshaller.parseAsciiString(s);
+ assertThat(b).isTrue();
+ }
+
+ @Test
+ public void longMarshallerRountrip() {
+ Metadata.AsciiMarshaller marshaller = MoreMetadata.LONG_MARSHALLER;
+ String s = marshaller.toAsciiString(42L);
+ assertThat(s).isEqualTo("42");
+
+ Long l = marshaller.parseAsciiString(s);
+ assertThat(l).isEqualTo(42L);
+ }
+
+ @Test
+ public void doubleMarshallerRountrip() {
+ Metadata.AsciiMarshaller marshaller = MoreMetadata.DOUBLE_MARSHALLER;
+ String s = marshaller.toAsciiString(42.42);
+ assertThat(s).isEqualTo("42.42");
+
+ Double d = marshaller.parseAsciiString(s);
+ assertThat(d).isEqualTo(42.42);
+ }
+}
diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerTest.java
new file mode 100644
index 00000000..a91d26bd
--- /dev/null
+++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerTest.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.context;
+
+import com.salesforce.grpc.contrib.GreeterGrpc;
+import com.salesforce.grpc.contrib.HelloRequest;
+import com.salesforce.grpc.contrib.HelloResponse;
+import com.salesforce.grpc.contrib.Statuses;
+import io.grpc.*;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcServerRule;
+import org.assertj.core.api.Condition;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class AmbientContextEnforcerTest {
+ @Rule public final GrpcServerRule serverRule = new GrpcServerRule();
+
+ GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
+ @Override
+ public void sayHello(HelloRequest request, StreamObserver responseObserver) {
+ responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build());
+ responseObserver.onCompleted();
+ }
+ };
+
+ @Test
+ public void clientEnforcerPasses() {
+ // Plumbing
+ serverRule.getServiceRegistry().addService(svc);
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
+ .newBlockingStub(serverRule.getChannel())
+ .withInterceptors(new AmbientContextEnforcerClientInterceptor("ctx-test"));
+
+ // Test
+ AmbientContext.initialize(Context.current()).run(() -> {
+ Metadata.Key k = Metadata.Key.of("ctx-test", Metadata.ASCII_STRING_MARSHALLER);
+ AmbientContext.current().put(k, "v");
+
+ stub.sayHello(HelloRequest.newBuilder().setName("World").build());
+ });
+ }
+
+ @Test
+ public void clientEnforcerFailsMissing() {
+ // Plumbing
+ serverRule.getServiceRegistry().addService(svc);
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
+ .newBlockingStub(serverRule.getChannel())
+ .withInterceptors(new AmbientContextEnforcerClientInterceptor("ctx-test"));
+
+ // Test
+ assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build()))
+ .isInstanceOf(AmbientContextEnforcerClientInterceptor.MissingAmbientContextException.class);
+ }
+
+ @Test
+ public void clientEnforcerFailsIncomplete() {
+ // Plumbing
+ serverRule.getServiceRegistry().addService(svc);
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
+ .newBlockingStub(serverRule.getChannel())
+ .withInterceptors(new AmbientContextEnforcerClientInterceptor("ctx-test"));
+
+ // Test
+ AmbientContext.initialize(Context.current()).run(() -> {
+ assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build()))
+ .isInstanceOf(AmbientContextEnforcerClientInterceptor.MissingAmbientContextException.class)
+ .hasMessageContaining("ctx-test");
+ });
+ }
+
+ @Test
+ public void serverEnforcerPasses() {
+ // Plumbing
+ serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc,
+ new AmbientContextServerInterceptor("ctx-"),
+ new AmbientContextEnforcerServerInterceptor("ctx-test")
+ ));
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
+ .newBlockingStub(serverRule.getChannel())
+ .withInterceptors(new AmbientContextClientInterceptor("ctx-"));
+
+ // Test
+ AmbientContext.initialize(Context.current()).run(() -> {
+ Metadata.Key k = Metadata.Key.of("ctx-test", Metadata.ASCII_STRING_MARSHALLER);
+ AmbientContext.current().put(k, "v");
+
+ stub.sayHello(HelloRequest.newBuilder().setName("World").build());
+ });
+ }
+
+ @Test
+ public void serverEnforcerFailsMissing() {
+ // Plumbing
+ serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc,
+ new AmbientContextServerInterceptor("ctx-"),
+ new AmbientContextEnforcerServerInterceptor("ctx-test")
+ ));
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
+ .newBlockingStub(serverRule.getChannel())
+ .withInterceptors(new AmbientContextClientInterceptor("ctx-"));
+
+ // Test
+ assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build()))
+ .isInstanceOfAny(StatusRuntimeException.class)
+ .has(new Condition() {
+ @Override
+ public boolean matches(Throwable throwable) {
+ return Statuses.hasStatusCode(throwable, Status.Code.FAILED_PRECONDITION);
+ }
+ });
+ }
+
+ @Test
+ public void serverEnforcerFailsIncomplete() {
+ // Plumbing
+ serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc,
+ new AmbientContextServerInterceptor("ctx-"),
+ new AmbientContextEnforcerServerInterceptor("ctx-test")
+ ));
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
+ .newBlockingStub(serverRule.getChannel())
+ .withInterceptors(new AmbientContextClientInterceptor("ctx-"));
+
+ // Test
+ AmbientContext.initialize(Context.current()).run(() -> {
+ Metadata.Key k = Metadata.Key.of("ctx-unrelated", Metadata.ASCII_STRING_MARSHALLER);
+ AmbientContext.current().put(k, "v");
+
+ assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build()))
+ .isInstanceOfAny(StatusRuntimeException.class)
+ .hasMessageContaining("ctx-test")
+ .has(new Condition() {
+ @Override
+ public boolean matches(Throwable throwable) {
+ return Statuses.hasStatusCode(throwable, Status.Code.FAILED_PRECONDITION);
+ }
+ });
+ });
+ }
+}
diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeInterceptorTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeInterceptorTest.java
new file mode 100644
index 00000000..2eccf9a1
--- /dev/null
+++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeInterceptorTest.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.context;
+
+import com.salesforce.grpc.contrib.GreeterGrpc;
+import com.salesforce.grpc.contrib.HelloRequest;
+import com.salesforce.grpc.contrib.HelloResponse;
+import io.grpc.Context;
+import io.grpc.Metadata;
+import io.grpc.ServerInterceptors;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcServerRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class AmbientContextFreezeInterceptorTest {
+ @Rule public final GrpcServerRule serverRule = new GrpcServerRule().directExecutor();
+
+ private class TestService extends GreeterGrpc.GreeterImplBase {
+ boolean frozen;
+
+ @Override
+ public void sayHello(HelloRequest request, StreamObserver responseObserver) {
+ frozen = AmbientContext.current().isFrozen();
+
+ responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build());
+ responseObserver.onCompleted();
+ }
+ };
+
+ @Test
+ public void interceptorShouldFreezeContext() {
+ TestService svc = new TestService();
+
+ // Plumbing
+ serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc,
+ new AmbientContextServerInterceptor("ctx-"),
+ new AmbientContextFreezeServerInterceptor()));
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
+ .newBlockingStub(serverRule.getChannel())
+ .withInterceptors(new AmbientContextClientInterceptor("ctx-"));
+
+ // Test
+ Metadata.Key key = Metadata.Key.of("ctx-k", Metadata.ASCII_STRING_MARSHALLER);
+ AmbientContext.initialize(Context.current()).run(() -> {
+ AmbientContext.current().put(key, "value");
+ stub.sayHello(HelloRequest.newBuilder().setName("World").build());
+ });
+
+ assertThat(svc.frozen).isTrue();
+ }
+}
diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java
new file mode 100644
index 00000000..79da3837
--- /dev/null
+++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.context;
+
+import io.grpc.Context;
+import io.grpc.Metadata;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+@SuppressWarnings("Duplicates")
+public class AmbientContextTest {
+ @Before
+ public void setUp() throws Exception {
+ // Reset the gRPC context between test executions
+ Context.ROOT.attach();
+ }
+
+ @Test
+ public void initializeAttachesContext() {
+ Context ctx = AmbientContext.initialize(Context.current());
+ ctx.run(() -> assertThat(AmbientContext.current()).isNotNull());
+ }
+
+ @Test
+ public void doubleInitializeThrows() {
+ Context ctx = AmbientContext.initialize(Context.current());
+ assertThatThrownBy(() -> AmbientContext.initialize(ctx)).isInstanceOf(IllegalStateException.class);
+ }
+
+ @Test
+ public void uninitializedContextThrows() {
+ assertThatThrownBy(AmbientContext::current).isInstanceOf(IllegalStateException.class);
+ }
+
+ @Test
+ public void contextFreezingWorks() {
+ Metadata.Key key = Metadata.Key.of("key", Metadata.ASCII_STRING_MARSHALLER);
+ AmbientContext context = new AmbientContext();
+
+ assertThat(context.isFrozen()).isFalse();
+ Object freezeKey = context.freeze();
+ assertThat(context.isFrozen()).isTrue();
+ assertThatThrownBy(() -> context.put(key, "foo")).isInstanceOf(IllegalStateException.class);
+ }
+
+ @Test
+ public void contextDoubleFreezingThrows() {
+ AmbientContext context = new AmbientContext();
+ context.freeze();
+ assertThatThrownBy(context::freeze).isInstanceOf(IllegalStateException.class);
+ }
+
+ @Test
+ public void contextThawingWorks() {
+ AmbientContext context = new AmbientContext();
+
+ Object freezeKey = context.freeze();
+ context.thaw(freezeKey);
+ assertThat(context.isFrozen()).isFalse();
+ }
+
+ @Test
+ public void contextThawingNotFrozenThrows() {
+ AmbientContext context = new AmbientContext();
+ assertThatThrownBy(() -> context.thaw(new Object())).isInstanceOf(IllegalStateException.class);
+ }
+
+ @Test
+ public void contextThawingWrongKeyThrows() {
+ AmbientContext context = new AmbientContext();
+
+ Object freezeKey = context.freeze();
+ assertThatThrownBy(() -> context.thaw(new Object())).isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void contextScopeStackingWorks() {
+ Metadata.Key key = Metadata.Key.of("k", Metadata.ASCII_STRING_MARSHALLER);
+ AmbientContext.initialize(Context.current()).run(() -> {
+ AmbientContext.current().put(key, "outer");
+ assertThat(AmbientContext.current().get(key)).isEqualTo("outer");
+
+ AmbientContext.current().fork(Context.current()).run(() -> {
+ AmbientContext.current().put(key, "inner");
+ assertThat(AmbientContext.current().get(key)).isEqualTo("inner");
+ });
+
+ assertThat(AmbientContext.current().get(key)).isEqualTo("outer");
+ });
+ }
+}
diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java
new file mode 100644
index 00000000..f749c86e
--- /dev/null
+++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.context;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.salesforce.grpc.contrib.GreeterGrpc;
+import com.salesforce.grpc.contrib.HelloRequest;
+import com.salesforce.grpc.contrib.HelloResponse;
+import com.salesforce.grpc.contrib.MoreFutures;
+import io.grpc.Context;
+import io.grpc.Metadata;
+import io.grpc.ServerInterceptors;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcServerRule;
+import org.awaitility.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+import static org.awaitility.Awaitility.await;
+
+public class AmbientContextTransferTest {
+ @Rule public final GrpcServerRule serverRule1 = new GrpcServerRule();
+ @Rule public final GrpcServerRule serverRule2 = new GrpcServerRule();
+
+ @Before
+ public void setUp() throws Exception {
+ // Reset the gRPC context between test executions
+ Context.ROOT.attach();
+ }
+
+ @Test
+ public void contextTransfersOneHopSync() throws Exception {
+ Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER);
+ String expectedCtxValue = "context-value";
+ AtomicReference ctxValue = new AtomicReference<>();
+
+ // Service
+ GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
+ @Override
+ public void sayHello(HelloRequest request, StreamObserver responseObserver) {
+ ctxValue.set(AmbientContext.current().get(ctxKey));
+ responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build());
+ responseObserver.onCompleted();
+ }
+ };
+
+ // Plumbing
+ serverRule1.getServiceRegistry().addService(ServerInterceptors
+ .intercept(svc, new AmbientContextServerInterceptor("ctx-")));
+
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
+ .newBlockingStub(serverRule1.getChannel())
+ .withInterceptors(new AmbientContextClientInterceptor("ctx-"));
+
+ // Test
+ AmbientContext.initialize(Context.current()).run(() -> {
+ AmbientContext.current().put(ctxKey, expectedCtxValue);
+ stub.sayHello(HelloRequest.newBuilder().setName("world").build());
+ });
+
+ assertThat(ctxValue.get()).isEqualTo(expectedCtxValue);
+ }
+
+ @Test
+ public void multiValueContextTransfers() throws Exception {
+ Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER);
+ String expectedCtxValue1 = "context-value1";
+ String expectedCtxValue2 = "context-value2";
+ String expectedCtxValue3 = "context-value3";
+ AtomicReference> ctxValue = new AtomicReference<>();
+
+ // Service
+ GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
+ @Override
+ public void sayHello(HelloRequest request, StreamObserver responseObserver) {
+ ctxValue.set(AmbientContext.current().getAll(ctxKey));
+ responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build());
+ responseObserver.onCompleted();
+ }
+ };
+
+ // Plumbing
+ serverRule1.getServiceRegistry().addService(ServerInterceptors
+ .intercept(svc, new AmbientContextServerInterceptor("ctx-")));
+
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
+ .newBlockingStub(serverRule1.getChannel())
+ .withInterceptors(new AmbientContextClientInterceptor("ctx-"));
+
+ // Test
+ AmbientContext.initialize(Context.current()).run(() -> {
+ AmbientContext.current().put(ctxKey, expectedCtxValue1);
+ AmbientContext.current().put(ctxKey, expectedCtxValue2);
+ AmbientContext.current().put(ctxKey, expectedCtxValue3);
+ stub.sayHello(HelloRequest.newBuilder().setName("world").build());
+ });
+
+ assertThat(ctxValue.get()).containsExactlyInAnyOrder(expectedCtxValue1, expectedCtxValue2, expectedCtxValue3);
+ }
+
+ @Test
+ public void contextTransfersTwoHopSync() throws Exception {
+ Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER);
+ String expectedCtxValue = "context-value";
+ AtomicReference ctxValue = new AtomicReference<>();
+
+ // Terminal service
+ GreeterGrpc.GreeterImplBase svc2 = new GreeterGrpc.GreeterImplBase() {
+ @Override
+ public void sayHello(HelloRequest request, StreamObserver responseObserver) {
+ ctxValue.set(AmbientContext.current().get(ctxKey));
+ responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build());
+ responseObserver.onCompleted();
+ }
+ };
+
+ // Terminal service plumbing
+ serverRule2.getServiceRegistry().addService(ServerInterceptors
+ .intercept(svc2, new AmbientContextServerInterceptor("ctx-")));
+ GreeterGrpc.GreeterBlockingStub stub2 = GreeterGrpc
+ .newBlockingStub(serverRule2.getChannel())
+ .withInterceptors(new AmbientContextClientInterceptor("ctx-"));
+
+ // Relay service
+ GreeterGrpc.GreeterImplBase svc1 = new GreeterGrpc.GreeterImplBase() {
+ @Override
+ public void sayHello(HelloRequest request, StreamObserver responseObserver) {
+ responseObserver.onNext(stub2.sayHello(request));
+ responseObserver.onCompleted();
+ }
+ };
+
+ // Relay service plumbing
+ serverRule1.getServiceRegistry().addService(ServerInterceptors
+ .intercept(svc1, new AmbientContextServerInterceptor("ctx-")));
+ GreeterGrpc.GreeterBlockingStub stub1 = GreeterGrpc
+ .newBlockingStub(serverRule1.getChannel())
+ .withInterceptors(new AmbientContextClientInterceptor("ctx-"));
+
+ // Test
+ AmbientContext.initialize(Context.current()).run(() -> {
+ AmbientContext.current().put(ctxKey, expectedCtxValue);
+ stub1.sayHello(HelloRequest.newBuilder().setName("world").build());
+ });
+
+ assertThat(ctxValue.get()).isEqualTo(expectedCtxValue);
+ }
+
+ @Test
+ public void contextTransfersOneHopAsync() throws Exception {
+ Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER);
+ String expectedCtxValue = "context-value";
+ AtomicReference ctxValue = new AtomicReference<>();
+
+ // Service
+ GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
+ @Override
+ public void sayHello(HelloRequest request, StreamObserver responseObserver) {
+ ctxValue.set(AmbientContext.current().get(ctxKey));
+ responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build());
+ responseObserver.onCompleted();
+ }
+ };
+
+ // Plumbing
+ serverRule1.getServiceRegistry().addService(ServerInterceptors
+ .intercept(svc, new AmbientContextServerInterceptor("ctx-")));
+ GreeterGrpc.GreeterFutureStub stub = GreeterGrpc
+ .newFutureStub(serverRule1.getChannel())
+ .withInterceptors(new AmbientContextClientInterceptor("ctx-"));
+
+ // Test
+ AmbientContext.initialize(Context.current()).run(() -> {
+ AmbientContext.current().put(ctxKey, expectedCtxValue);
+ ListenableFuture futureResponse = stub.sayHello(HelloRequest.newBuilder().setName("world").build());
+
+ // Verify response callbacks still have context
+ MoreFutures.onSuccess(
+ futureResponse,
+ response -> assertThat(AmbientContext.current().get(ctxKey)).isEqualTo(expectedCtxValue),
+ Context.currentContextExecutor(Executors.newSingleThreadExecutor()));
+
+ await().atMost(Duration.ONE_SECOND).until(futureResponse::isDone);
+ });
+
+ assertThat(ctxValue.get()).isEqualTo(expectedCtxValue);
+ }
+
+ @Test
+ public void multipleContextTransfersOneHopSync() throws Exception {
+ Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER);
+ Metadata.Key l5dKey = Metadata.Key.of("l5d-context-key", Metadata.ASCII_STRING_MARSHALLER);
+ String expectedCtxValue = "context-value";
+ AtomicReference ctxValue = new AtomicReference<>();
+ AtomicReference l5dValue = new AtomicReference<>();
+
+ // Service
+ GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
+ @Override
+ public void sayHello(HelloRequest request, StreamObserver responseObserver) {
+ ctxValue.set(AmbientContext.current().get(ctxKey));
+ l5dValue.set(AmbientContext.current().get(l5dKey));
+ responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build());
+ responseObserver.onCompleted();
+ }
+ };
+
+ // Plumbing
+ serverRule1.getServiceRegistry().addService(ServerInterceptors.intercept(svc,
+ new AmbientContextServerInterceptor("ctx-"),
+ new AmbientContextServerInterceptor("l5d-")));
+
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
+ .newBlockingStub(serverRule1.getChannel())
+ .withInterceptors(
+ new AmbientContextClientInterceptor("ctx-"),
+ new AmbientContextClientInterceptor("l5d-"));
+
+ // Test
+ AmbientContext.initialize(Context.current()).run(() -> {
+ AmbientContext.current().put(ctxKey, expectedCtxValue);
+ AmbientContext.current().put(l5dKey, expectedCtxValue);
+ stub.sayHello(HelloRequest.newBuilder().setName("world").build());
+ });
+
+ assertThat(ctxValue.get()).isEqualTo(expectedCtxValue);
+ assertThat(l5dValue.get()).isEqualTo(expectedCtxValue);
+ }
+}
diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java
new file mode 100644
index 00000000..3dbb4722
--- /dev/null
+++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.context;
+
+import com.google.common.io.BaseEncoding;
+import com.salesforce.grpc.contrib.GreeterGrpc;
+import com.salesforce.grpc.contrib.HelloRequest;
+import com.salesforce.grpc.contrib.HelloResponse;
+import io.grpc.Context;
+import io.grpc.Metadata;
+import io.grpc.ServerInterceptors;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcServerRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.fail;
+
+public class BinaryAmbientContextTest {
+ @Rule public final GrpcServerRule serverRule = new GrpcServerRule();
+
+ @Test
+ public void binaryContextValueTransfers() throws Exception {
+ Metadata.Key ctxKey = Metadata.Key.of(
+ "ctx-context-key" + Metadata.BINARY_HEADER_SUFFIX,
+ Metadata.BINARY_BYTE_MARSHALLER);
+ byte[] expectedCtxValue = BaseEncoding.base16().decode("DEADBEEF");
+ AtomicReference ctxValue = new AtomicReference<>();
+
+ // Service
+ GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
+ @Override
+ public void sayHello(HelloRequest request, StreamObserver responseObserver) {
+ ctxValue.set(AmbientContext.current().get(ctxKey));
+ responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build());
+ responseObserver.onCompleted();
+ }
+ };
+
+ // Plumbing
+ serverRule.getServiceRegistry().addService(ServerInterceptors
+ .intercept(svc, new AmbientContextServerInterceptor("ctx-")));
+
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
+ .newBlockingStub(serverRule.getChannel())
+ .withInterceptors(new AmbientContextClientInterceptor("ctx-"));
+
+ // Test
+ AmbientContext.initialize(Context.current()).run(() -> {
+ AmbientContext.current().put(ctxKey, expectedCtxValue);
+ stub.sayHello(HelloRequest.newBuilder().setName("world").build());
+ });
+
+ assertThat(ctxValue.get()).containsExactly(expectedCtxValue);
+ }
+}
diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptorTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptorTest.java
index 87c89595..c77b2833 100644
--- a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptorTest.java
+++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/DefaultDeadlineInterceptorTest.java
@@ -10,6 +10,7 @@
import io.grpc.*;
import org.junit.Test;
+import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -22,7 +23,7 @@ public class DefaultDeadlineInterceptorTest {
public void interceptorShouldAddDeadlineWhenAbsent() {
AtomicBoolean called = new AtomicBoolean(false);
- DefaultDeadlineInterceptor interceptor = new DefaultDeadlineInterceptor(1, TimeUnit.HOURS);
+ DefaultDeadlineInterceptor interceptor = new DefaultDeadlineInterceptor(Duration.ofHours(1));
interceptor.interceptCall(null, CallOptions.DEFAULT, new Channel() {
@Override
@@ -45,7 +46,7 @@ public String authority() {
public void interceptorShouldNotModifyExplicitDeadline() {
AtomicBoolean called = new AtomicBoolean(false);
- DefaultDeadlineInterceptor interceptor = new DefaultDeadlineInterceptor(1, TimeUnit.HOURS);
+ DefaultDeadlineInterceptor interceptor = new DefaultDeadlineInterceptor(Duration.ofHours(1));
interceptor.interceptCall(null, CallOptions.DEFAULT.withDeadlineAfter(10, TimeUnit.HOURS), new Channel() {
@Override
@@ -68,7 +69,7 @@ public String authority() {
public void interceptorShouldNotModifyContextDeadline() throws Exception {
AtomicBoolean called = new AtomicBoolean(false);
- DefaultDeadlineInterceptor interceptor = new DefaultDeadlineInterceptor(1, TimeUnit.HOURS);
+ DefaultDeadlineInterceptor interceptor = new DefaultDeadlineInterceptor(Duration.ofHours(1));
Context.current().withDeadlineAfter(10, TimeUnit.HOURS, Executors.newSingleThreadScheduledExecutor()).run(() -> {
interceptor.interceptCall(null, CallOptions.DEFAULT, new Channel() {
diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/StopwatchInterceptorTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/StopwatchInterceptorTest.java
new file mode 100644
index 00000000..42ad757a
--- /dev/null
+++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/interceptor/StopwatchInterceptorTest.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2017, salesforce.com, inc.
+ * All rights reserved.
+ * Licensed under the BSD 3-Clause license.
+ * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause
+ */
+
+package com.salesforce.grpc.contrib.interceptor;
+
+import com.salesforce.grpc.contrib.GreeterGrpc;
+import com.salesforce.grpc.contrib.HelloRequest;
+import com.salesforce.grpc.contrib.HelloResponse;
+import io.grpc.MethodDescriptor;
+import io.grpc.ServerInterceptors;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcServerRule;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class StopwatchInterceptorTest {
+ @Rule public final GrpcServerRule serverRule = new GrpcServerRule().directExecutor();
+
+ GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() {
+ @Override
+ public void sayHello(HelloRequest request, StreamObserver responseObserver) {
+ responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build());
+ responseObserver.onCompleted();
+ }
+ };
+
+ @Test
+ public void clientStopwatchWorks() {
+ AtomicReference startDesc = new AtomicReference<>();
+ AtomicReference stopDesc = new AtomicReference<>();
+ AtomicReference stopDur = new AtomicReference<>();
+
+ //Setup
+ serverRule.getServiceRegistry().addService(svc);
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc
+ .newBlockingStub(serverRule.getChannel())
+ .withInterceptors(new StopwatchClientInterceptor() {
+ @Override
+ protected void logStart(MethodDescriptor method) {
+ startDesc.set(method);
+ }
+
+ @Override
+ protected void logStop(MethodDescriptor method, Duration duration) {
+ stopDesc.set(method);
+ stopDur.set(duration);
+ }
+ });
+
+ stub.sayHello(HelloRequest.newBuilder().setName("World").build());
+
+ assertThat(startDesc.get().getFullMethodName()).contains("SayHello");
+ assertThat(startDesc.get().getFullMethodName()).contains("SayHello");
+ assertThat(stopDur.get()).isGreaterThan(Duration.ZERO);
+ }
+
+ @Test
+ public void serverStopwatchWorks() {
+ AtomicReference startDesc = new AtomicReference<>();
+ AtomicReference stopDesc = new AtomicReference<>();
+ AtomicReference stopDur = new AtomicReference<>();
+
+ //Setup
+ serverRule.getServiceRegistry().addService(ServerInterceptors.intercept(svc,
+ new StopwatchServerInterceptor() {
+ @Override
+ protected void logStart(MethodDescriptor method) {
+ startDesc.set(method);
+ }
+
+ @Override
+ protected void logStop(MethodDescriptor method, Duration duration) {
+ stopDesc.set(method);
+ stopDur.set(duration);
+ }
+ }));
+ GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(serverRule.getChannel());
+
+ stub.sayHello(HelloRequest.newBuilder().setName("World").build());
+
+ assertThat(startDesc.get().getFullMethodName()).contains("SayHello");
+ assertThat(startDesc.get().getFullMethodName()).contains("SayHello");
+ assertThat(stopDur.get()).isGreaterThan(Duration.ZERO);
+ }
+}
diff --git a/grpc-contrib/src/test/proto/helloworld.proto b/grpc-contrib/src/test/proto/helloworld.proto
index 1c441cd7..d1abddce 100644
--- a/grpc-contrib/src/test/proto/helloworld.proto
+++ b/grpc-contrib/src/test/proto/helloworld.proto
@@ -6,7 +6,7 @@ import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
option java_multiple_files = true;
-option java_package = "com.salesforce.jprotoc";
+option java_package = "com.salesforce.grpc.contrib";
option java_outer_classname = "HelloWorldProto";
// The greeting service definition.
diff --git a/pom.xml b/pom.xml
index 3ad7ec63..fc3b5c16 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,6 +69,7 @@
1.6.1
3.3.0
+ 2.7
0.9.4
4.2.0.RELEASE
@@ -82,6 +83,7 @@
1.8
${java.version}
${java.version}
+ 1.7.21
@@ -165,6 +167,19 @@
spring-webmvc
${spring.version}
+
+
+ org.slf4j
+ slf4j-api
+ ${slf4j.version}
+ provided
+
+
+ com.google.code.gson
+ gson
+ ${gson.version}
+ provided
+
io.grpc
diff --git a/rxgrpc/rxgrpc-test/src/test/java/com/salesforce/rxgrpc/UnexpectedServerErrorIntegrationTest.java b/rxgrpc/rxgrpc-test/src/test/java/com/salesforce/rxgrpc/UnexpectedServerErrorIntegrationTest.java
index 59e8770b..2b523df9 100644
--- a/rxgrpc/rxgrpc-test/src/test/java/com/salesforce/rxgrpc/UnexpectedServerErrorIntegrationTest.java
+++ b/rxgrpc/rxgrpc-test/src/test/java/com/salesforce/rxgrpc/UnexpectedServerErrorIntegrationTest.java
@@ -100,7 +100,7 @@ public void manyToOne() {
test.awaitTerminalEvent(3, TimeUnit.SECONDS);
test.assertError(t -> t instanceof StatusRuntimeException);
- // Flowable requests get canceled when unexpected errors happen
+ // Flowable requests current canceled when unexpected errors happen
test.assertError(t -> ((StatusRuntimeException)t).getStatus().getCode() == Status.Code.CANCELLED);
}