From 3e9e4651d85b2e3617312ff4697e393bbe98dcd2 Mon Sep 17 00:00:00 2001 From: Ryan Michela Date: Sat, 14 Oct 2017 11:39:02 -0700 Subject: [PATCH] Implemented ambient context --- .../grpc/contrib/context/AmbientContext.java | 229 +++++++++++++++++ .../AmbientContextClientInterceptor.java | 72 ++++++ ...bientContextEnforcerClientInterceptor.java | 83 ++++++ ...bientContextEnforcerServerInterceptor.java | 73 ++++++ ...AmbientContextFreezeServerInterceptor.java | 60 +++++ .../AmbientContextServerInterceptor.java | 79 ++++++ .../grpc/contrib/context/package-info.java | 37 +++ .../context/AmbientContextEnforcerTest.java | 149 +++++++++++ .../AmbientContextFreezeInterceptorTest.java | 59 +++++ .../contrib/context/AmbientContextTest.java | 99 ++++++++ .../context/AmbientContextTransferTest.java | 240 ++++++++++++++++++ .../context/BinaryAmbientContextTest.java | 64 +++++ grpc-contrib/src/test/proto/helloworld.proto | 2 +- 13 files changed, 1245 insertions(+), 1 deletion(-) create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerClientInterceptor.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerServerInterceptor.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeServerInterceptor.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java create mode 100644 grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerTest.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeInterceptorTest.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java create mode 100644 grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java new file mode 100644 index 00000000..745a795c --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContext.java @@ -0,0 +1,229 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.Context; +import io.grpc.Metadata; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; +import java.util.Set; + +import static com.google.common.base.Preconditions.*; + + +/** + * {@code AmbientContext} is entry point for working with the ambient context managed by {@link AmbientContextClientInterceptor} + * and {@link AmbientContextServerInterceptor}. The interface for this class is very similar to gRPC's {@code Metadata} + * class. + * + *

This class is not thread safe, implementations should ensure that ambient context reads and writes do + * not occur in multiple threads concurrently. + * + *

See package javadoc for more info. + */ +@NotThreadSafe +public final class AmbientContext { + static final Context.Key DATA_KEY = Context.key("AmbientContext"); + + /** + * Attaches an empty ambient context to the provided gRPC {@code Context}. + * + * @throws IllegalStateException if an ambient context has already been attached to the + * provided gRPC {@code Context}. + */ + public static Context initialize(Context context) { + checkNotNull(context, "context"); + checkState(DATA_KEY.get(context) == null, + "AmbientContext has already been created in the scope of the current context"); + return context.withValue(DATA_KEY, new AmbientContext()); + } + + /** + * Returns the ambient context attached to the current gRPC {@code Context}. + * + * @throws IllegalStateException if no ambient context is attached to the current gRPC {@code Context}. + */ + public static AmbientContext current() { + checkState(DATA_KEY.get() != null, + "AmbientContext has not yet been created in the scope of the current context"); + return DATA_KEY.get(); + } + + /** + * @return true if an {@code AmbientContext} is attached to the current gRPC context. + */ + public static boolean isPresent() { + return DATA_KEY.get() != null; + } + + private Metadata contextMetadata; + private Object freezeKey = null; + + AmbientContext() { + this.contextMetadata = new Metadata(); + } + + /** + * Copy constructor. + */ + AmbientContext(AmbientContext other) { + this(); + this.contextMetadata.merge(other.contextMetadata); + } + + /** + * Makes the AmbientContext as read-only, preventing any further modification. A "freeze key" is returned, which + * can be used to {@link #thaw(Object)} the AmbientContext in the future. + * + *

{@code freeze()} and {@code thaw()} are typically used to mark the ambient context read-only when the + * interceptor chain completes. + * + * @return a "freeze key" that can be used passed to {@link #thaw(Object)} + * + * @throws IllegalStateException if the AmbientContext is already frozen + */ + public Object freeze() { + checkState(!isFrozen(), "AmbientContext already frozen. Cannot freeze() twice."); + freezeKey = new Object(); + return freezeKey; + } + + /** + * Makes the AmbientContext mutable again, after {@link #freeze()} has been called. A "freeze key" is needed to + * unfreeze the AmbientContext, ensuring only the code that froze the context can subsequently thaw it. + * + *

{@code freeze()} and {@code thaw()} are typically used to mark the ambient context read-only when the + * interceptor chain completes. + * + * @param freezeKey the "freeze key" returned by {@link #freeze()} + * + * @throws IllegalStateException if the AmbientContext has not yet been frozen + * @throws IllegalArgumentException if the {@code freezeKey} is incorrect + */ + public void thaw(Object freezeKey) { + checkState(isFrozen(), "AmbientContext is not frozen. Cannot thaw()."); + checkArgument(this.freezeKey == freezeKey, + "The provided freezeKey is not the same object returned by freeze()"); + this.freezeKey = null; + } + + /** + * Similar to {@link #initialize(Context)}, {@code fork()} attaches a shallow clone of this {@code AmbientContext} + * to a provided gRPC {@code Context}. Use {@code fork()} when you want create a temporary context scope. + * + * @param context + * @return + */ + public Context fork(Context context) { + return context.withValue(DATA_KEY, new AmbientContext(this)); + } + + /** + * @return true of the AmbientContext has been frozen + */ + public boolean isFrozen() { + return freezeKey != null; + } + + private void checkFreeze() { + checkState(freezeKey == null, "AmbientContext cannot be modified while frozen"); + } + + /** + * Returns true if a value is defined for the given key. + * + *

This is done by linear search, so if it is followed by {@link #get} or {@link #getAll}, + * prefer calling them directly and checking the return value against {@code null}. + */ + public boolean containsKey(Metadata.Key key) { + return contextMetadata.containsKey(key); + } + + /** + * Remove all values for the given key without returning them. This is a minor performance + * optimization if you do not need the previous values. + * + * @throws IllegalStateException if the AmbientContext is frozen + */ + public void discardAll(Metadata.Key key) { + checkFreeze(); + contextMetadata.discardAll(key); + } + + /** + * Returns the last ambient context entry added with the name 'name' parsed as T. + * + * @return the parsed metadata entry or null if there are none. + */ + @Nullable + public T get(Metadata.Key key) { + return contextMetadata.get(key); + } + + /** + * Returns all the ambient context entries named 'name', in the order they were received, parsed as T, or + * null if there are none. The iterator is not guaranteed to be "live." It may or may not be + * accurate if the ambient context is mutated. + */ + @Nullable + public Iterable getAll(final Metadata.Key key) { + return contextMetadata.getAll(key); + } + + /** + * Returns set of all keys in store. + * + * @return unmodifiable Set of keys + */ + public Set keys() { + return contextMetadata.keys(); + } + + /** + * Adds the {@code key, value} pair. If {@code key} already has values, {@code value} is added to + * the end. Duplicate values for the same key are permitted. + * + * @throws NullPointerException if key or value is null + * @throws IllegalStateException if the AmbientContext is frozen + */ + public void put(Metadata.Key key, T value) { + checkFreeze(); + contextMetadata.put(key, value); + } + + /** + * Removes the first occurrence of {@code value} for {@code key}. + * + * @param key key for value + * @param value value + * @return {@code true} if {@code value} removed; {@code false} if {@code value} was not present + * + * @throws NullPointerException if {@code key} or {@code value} is null + * @throws IllegalStateException if the AmbientContext is frozen + */ + public boolean remove(Metadata.Key key, T value) { + checkFreeze(); + return contextMetadata.remove(key, value); + } + + /** + * Remove all values for the given key. If there were no values, {@code null} is returned. + * + * @throws IllegalStateException if the AmbientContext is frozen + */ + public Iterable removeAll(Metadata.Key key) { + checkFreeze(); + return contextMetadata.removeAll(key); + } + + @Override + public String toString() { + return (isFrozen() ? "[FROZEN] " : "[THAWED] ") + contextMetadata.toString(); + } +} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java new file mode 100644 index 00000000..16c13e95 --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextClientInterceptor.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.*; + +/** + * {@code AmbientContextClientInterceptor} transparently deserializes prefixed request headers into an ambient context. + * Header values can be accessed using the {@link AmbientContext} class. + * + *

Each {@code AmbientContextClientInterceptor} marshals headers with a know prefix. If multiple prefixes are needed, + * add multiple {@code AmbientContextClientInterceptor} instances to the gRPC interceptor chain. + * + *

See package javadoc for more info. + */ +public class AmbientContextClientInterceptor implements ClientInterceptor { + private String headerPrefix; + + /** + * Constructs an {@code AmbientContextClientInterceptor} that marshals request headers with a know prefix into the + * {@link AmbientContext}. + * + * @param headerPrefix the header prefix to marshal. + */ + public AmbientContextClientInterceptor(String headerPrefix) { + this.headerPrefix = headerPrefix; + } + + @Override + public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + if (AmbientContext.isPresent()) { + return new ForwardingClientCall.SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + AmbientContext ctx = AmbientContext.current(); + if (ctx != null) { + for (String keyString : ctx.keys()) { + if (keyString.startsWith(headerPrefix)) { + if (keyString.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + Metadata.Key key = Metadata.Key.of(keyString, Metadata.BINARY_BYTE_MARSHALLER); + Iterable values = ctx.getAll(key); + if (values != null) { + for (byte[] value : values) { + headers.put(key, value); + } + } + } else { + Metadata.Key key = Metadata.Key.of(keyString, Metadata.ASCII_STRING_MARSHALLER); + Iterable values = ctx.getAll(key); + if (values != null) { + for (String value : values) { + headers.put(key, value); + } + } + } + } + } + } + super.start(responseListener, headers); + } + }; + } else { + // Noop if ambient context is absent + return next.newCall(method, callOptions); + } + } +} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerClientInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerClientInterceptor.java new file mode 100644 index 00000000..f4972ea5 --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerClientInterceptor.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.*; + +/** + * Some uses of ambient context, like distributed tracing, break down if a service fails to propagate the context + * to downstream services. This typically happens when multi-threading code fails to correctly transfer the gRPC + * context to worker threads. + * + *

{@code AmbientContextEnforcerClientInterceptor} is used to enforce context propagation by catastrophically + * failing downstream service calls if the ambient context is missing or incomplete. + */ +public class AmbientContextEnforcerClientInterceptor implements ClientInterceptor { + private String[] requiredContextKeys = {}; + + /** + * Constructs an {@code AmbientContextEnforcerClientInterceptor} with no required context keys. + */ + public AmbientContextEnforcerClientInterceptor() { + } + + /** + * Constructs an {@code AmbientContextEnforcerClientInterceptor} with a set of required context keys. + */ + public AmbientContextEnforcerClientInterceptor(String... requiredContextKeys) { + this.requiredContextKeys = requiredContextKeys; + } + + /** + * MissingAmbientContextException is thrown to indicate a breakdown in context continuity between services. + */ + public static class MissingAmbientContextException extends RuntimeException { + public MissingAmbientContextException(String message) { + super(message); + } + } + + @Override + public ClientCall interceptCall(MethodDescriptor method, CallOptions callOptions, Channel next) { + // Throw if ambient context is missing + if (!AmbientContext.isPresent()) { + throw missingContextException(); + } + + // Throw if required context keys are missing + for (String requiredKey : requiredContextKeys) { + if (!AmbientContext.current().keys().contains(requiredKey)) { + throw incompleteContextException(requiredKey); + } + } + + return next.newCall(method, callOptions); + } + + /** + * Override this method to change the exception type or message thrown when the ambient context is missing + * entirely, perhaps to reference your own internal documentation. + * + * @return a RuntimeException + */ + protected RuntimeException missingContextException() { + return new MissingAmbientContextException("No AmbientContext is attached to the current gRPC Context. " + + "Make sure Context is correctly transferred between worker threads using Context.wrap() or " + + "Context.currentContextExecutor()."); + } + + /** + * Override this method to change the exception type or message thrown when the ambient context is missing a + * required context key, perhaps to reference your own internal documentation. + * + * @return a RuntimeException + */ + protected RuntimeException incompleteContextException(String missingKey) { + return new MissingAmbientContextException("The AmbientContext is missing a required context key: " + missingKey); + } +} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerServerInterceptor.java new file mode 100644 index 00000000..b0d6cf03 --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerServerInterceptor.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.*; + +/** + * Some uses of ambient context, like distributed tracing, break down if a service fails to propagate the context + * to downstream services. + * + *

{@code AmbientContextEnforcerServerInterceptor} is used to enforce context propagation by catastrophically + * failing a service call if the ambient context is missing or incomplete. + */ +public class AmbientContextEnforcerServerInterceptor implements ServerInterceptor { + private String[] requiredContextKeys = {}; + + /** + * Constructs an {@code AmbientContextEnforcerServerInterceptor} with no required context keys. + */ + public AmbientContextEnforcerServerInterceptor() { + } + + /** + * Constructs an {@code AmbientContextEnforcerServerInterceptor} with a set of required context keys. + */ + public AmbientContextEnforcerServerInterceptor(String... requiredContextKeys) { + this.requiredContextKeys = requiredContextKeys; + } + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { + // Throw if ambient context is missing + if (!AmbientContext.isPresent()) { + call.close(missingContextStatus(), new Metadata()); + return new ServerCall.Listener() { }; + } + + // Throw if required context keys are missing + for (String requiredKey : requiredContextKeys) { + if (!AmbientContext.current().keys().contains(requiredKey)) { + call.close(incompleteContextStatus(requiredKey), new Metadata()); + return new ServerCall.Listener() { }; + } + } + + return next.startCall(call, headers); + } + + /** + * Override this method to change the gRPC {@code Status} returned when the ambient context is missing + * entirely, perhaps to reference your own internal documentation. + * + * @return a gRPC Status + */ + protected Status missingContextStatus() { + return Status.FAILED_PRECONDITION.withDescription("Ambient context is required but not found"); + } + + /** + * Override this method to change the gRPC {@code Status} returned when the ambient context is missing a + * required context key, perhaps to reference your own internal documentation. + * + * @return a gRPC Status + */ + protected Status incompleteContextStatus(String missingKey) { + return Status.FAILED_PRECONDITION.withDescription("Required ambient context key " + missingKey + " was not found"); + } +} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeServerInterceptor.java new file mode 100644 index 00000000..9841b1dd --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeServerInterceptor.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.*; + +/** + * {@code AmbientContextFreezeServerInterceptor} freezes the current {@link AmbientContext} for all downstream + * operations. The best place to put this interceptor is at the end of the gRPC interceptor chain. + */ +public class AmbientContextFreezeServerInterceptor implements ServerInterceptor { + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { + return new ForwardingServerCallListener.SimpleForwardingServerCallListener(next.startCall(call, headers)) { + @Override + public void onMessage(ReqT message) { + freezeAndThaw(() -> super.onMessage(message)); + } + + @Override + public void onHalfClose() { + freezeAndThaw(super::onHalfClose); + } + + @Override + public void onCancel() { + freezeAndThaw(super::onCancel); + } + + @Override + public void onComplete() { + freezeAndThaw(super::onComplete); + } + + @Override + public void onReady() { + freezeAndThaw(super::onReady); + } + + private void freezeAndThaw(Runnable delegate) { + if (AmbientContext.isPresent()) { + Object freezeKey = AmbientContext.current().freeze(); + try { + delegate.run(); + } finally { + AmbientContext.current().thaw(freezeKey); + } + } else { + + delegate.run(); + } + } + }; + } +} diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java new file mode 100644 index 00000000..c732e86c --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/AmbientContextServerInterceptor.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.*; + +/** + * {@code AmbientContextServerInterceptor} transparently serializes prefixed ambient context values into outbound request + * headers. + * + *

Each {@code AmbientContextServerInterceptor} marshals headers with a know prefix. If multiple prefixes are needed, + * add multiple {@code AmbientContextServerInterceptor} instances to the gRPC interceptor chain. + * + *

See package javadoc for more info. + */ +public class AmbientContextServerInterceptor implements ServerInterceptor { + private String headerPrefix; + + /** + * Constructs an {@code AmbientContextServerInterceptor} that marshals ambient context values with a know prefix + * into outbound request headers. + * {@link AmbientContext}. + * + * @param headerPrefix the header prefix to marshal. + */ + public AmbientContextServerInterceptor(String headerPrefix) { + this.headerPrefix = headerPrefix; + } + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, ServerCallHandler next) { + AmbientContext ctx = AmbientContext.DATA_KEY.get(); + // Only initialize ctx if not yet initialized + ctx = ctx != null ? ctx : new AmbientContext(); + + boolean found = false; + for (String keyName : headers.keys()) { + if (!keyName.startsWith(headerPrefix)) { + continue; + } + + if (keyName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) { + Metadata.Key key = Metadata.Key.of(keyName, Metadata.BINARY_BYTE_MARSHALLER); + Iterable values = headers.getAll(key); + if (values == null) { + continue; + } + + for (byte[] value : values) { + ctx.put(key, value); + } + } else { + Metadata.Key key = Metadata.Key.of(keyName, Metadata.ASCII_STRING_MARSHALLER); + Iterable values = headers.getAll(key); + if (values == null) { + continue; + } + + for (String value : values) { + ctx.put(key, value); + } + } + + found = true; + } + + if (found) { + return Contexts.interceptCall(Context.current().withValue(AmbientContext.DATA_KEY, ctx), call, headers, next); + } else { + // Don't attach a context if there is nothing to attach + return next.startCall(call, headers); + } + } +} \ No newline at end of file diff --git a/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java new file mode 100644 index 00000000..1108a38c --- /dev/null +++ b/grpc-contrib/src/main/java/com/salesforce/grpc/contrib/context/package-info.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +/** + * The classes in this package are used together to implement the Ambient Context pattern, where context values are + * transparently marshaled from service to service without user intervention. This pattern is particularly useful for + * transparently populating system information like Zipkin trace ids and logStop correlation ids on every service call in + * a call graph. + * + *

The Ambient Context pattern is implemented in gRPC using the + * {@link com.salesforce.grpc.contrib.context.AmbientContextServerInterceptor} and + * {@link com.salesforce.grpc.contrib.context.AmbientContextClientInterceptor}. Together these interceptors marshall + * context headers from the gRPC {@code Context} into outbound request {@code Metadata}, and from inbound request + * {@code Metadata} into the gRPC {@code Context}. + * + *

Within your service and infrastructure code, the {@link com.salesforce.grpc.contrib.context.AmbientContext} class + * provides utility methods for interacting with the ambient context. {@code AmbientContext}s API is similar to gRPC's + * {@code Metadata} API, with the addition of the {@code freeze()} and {@code thaw()} operations used to toggle + * the {@code AmbientContext}'s read-only status. + * + *

Freezing the ambient context is useful for protecting its contents when transitioning between platform + * infrastructure code and service implementation code. In many cases, platform infrastructure code and service + * implementations are written by different groups with different goals. Platform infrastructure implementors typically + * own inter-service functions like distributed tracing, logStop correlation, and service metrics, typically implemented + * using the gRPC interceptor chain. For these developers ambient context must be mutable. Service implementors, on + * the other hand, own the business logic for each service. For these developers it can make sense for the ambient + * context to be immutable to prevent unnecessary pollution of the ambient context. Freezing and thawing the context + * allows it to seamlessly transition between mutable and immutable modes as needed by each audience. + * + * @see + * The Ambient Context Design Pattern in .NET + */ +package com.salesforce.grpc.contrib.context; \ No newline at end of file diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerTest.java new file mode 100644 index 00000000..a91d26bd --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextEnforcerTest.java @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import com.salesforce.grpc.contrib.GreeterGrpc; +import com.salesforce.grpc.contrib.HelloRequest; +import com.salesforce.grpc.contrib.HelloResponse; +import com.salesforce.grpc.contrib.Statuses; +import io.grpc.*; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import org.assertj.core.api.Condition; +import org.junit.Rule; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class AmbientContextEnforcerTest { + @Rule public final GrpcServerRule serverRule = new GrpcServerRule(); + + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + @Test + public void clientEnforcerPasses() { + // Plumbing + serverRule.getServiceRegistry().addService(svc); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextEnforcerClientInterceptor("ctx-test")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + Metadata.Key k = Metadata.Key.of("ctx-test", Metadata.ASCII_STRING_MARSHALLER); + AmbientContext.current().put(k, "v"); + + stub.sayHello(HelloRequest.newBuilder().setName("World").build()); + }); + } + + @Test + public void clientEnforcerFailsMissing() { + // Plumbing + serverRule.getServiceRegistry().addService(svc); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextEnforcerClientInterceptor("ctx-test")); + + // Test + assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build())) + .isInstanceOf(AmbientContextEnforcerClientInterceptor.MissingAmbientContextException.class); + } + + @Test + public void clientEnforcerFailsIncomplete() { + // Plumbing + serverRule.getServiceRegistry().addService(svc); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextEnforcerClientInterceptor("ctx-test")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build())) + .isInstanceOf(AmbientContextEnforcerClientInterceptor.MissingAmbientContextException.class) + .hasMessageContaining("ctx-test"); + }); + } + + @Test + public void serverEnforcerPasses() { + // Plumbing + serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc, + new AmbientContextServerInterceptor("ctx-"), + new AmbientContextEnforcerServerInterceptor("ctx-test") + )); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + Metadata.Key k = Metadata.Key.of("ctx-test", Metadata.ASCII_STRING_MARSHALLER); + AmbientContext.current().put(k, "v"); + + stub.sayHello(HelloRequest.newBuilder().setName("World").build()); + }); + } + + @Test + public void serverEnforcerFailsMissing() { + // Plumbing + serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc, + new AmbientContextServerInterceptor("ctx-"), + new AmbientContextEnforcerServerInterceptor("ctx-test") + )); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build())) + .isInstanceOfAny(StatusRuntimeException.class) + .has(new Condition() { + @Override + public boolean matches(Throwable throwable) { + return Statuses.hasStatusCode(throwable, Status.Code.FAILED_PRECONDITION); + } + }); + } + + @Test + public void serverEnforcerFailsIncomplete() { + // Plumbing + serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc, + new AmbientContextServerInterceptor("ctx-"), + new AmbientContextEnforcerServerInterceptor("ctx-test") + )); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + Metadata.Key k = Metadata.Key.of("ctx-unrelated", Metadata.ASCII_STRING_MARSHALLER); + AmbientContext.current().put(k, "v"); + + assertThatThrownBy(() -> stub.sayHello(HelloRequest.newBuilder().setName("World").build())) + .isInstanceOfAny(StatusRuntimeException.class) + .hasMessageContaining("ctx-test") + .has(new Condition() { + @Override + public boolean matches(Throwable throwable) { + return Statuses.hasStatusCode(throwable, Status.Code.FAILED_PRECONDITION); + } + }); + }); + } +} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeInterceptorTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeInterceptorTest.java new file mode 100644 index 00000000..2eccf9a1 --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextFreezeInterceptorTest.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import com.salesforce.grpc.contrib.GreeterGrpc; +import com.salesforce.grpc.contrib.HelloRequest; +import com.salesforce.grpc.contrib.HelloResponse; +import io.grpc.Context; +import io.grpc.Metadata; +import io.grpc.ServerInterceptors; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import org.junit.Rule; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class AmbientContextFreezeInterceptorTest { + @Rule public final GrpcServerRule serverRule = new GrpcServerRule().directExecutor(); + + private class TestService extends GreeterGrpc.GreeterImplBase { + boolean frozen; + + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + frozen = AmbientContext.current().isFrozen(); + + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + @Test + public void interceptorShouldFreezeContext() { + TestService svc = new TestService(); + + // Plumbing + serverRule.getServiceRegistry().addService(ServerInterceptors.interceptForward(svc, + new AmbientContextServerInterceptor("ctx-"), + new AmbientContextFreezeServerInterceptor())); + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + Metadata.Key key = Metadata.Key.of("ctx-k", Metadata.ASCII_STRING_MARSHALLER); + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(key, "value"); + stub.sayHello(HelloRequest.newBuilder().setName("World").build()); + }); + + assertThat(svc.frozen).isTrue(); + } +} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java new file mode 100644 index 00000000..79da3837 --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTest.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import io.grpc.Context; +import io.grpc.Metadata; +import org.junit.Before; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +@SuppressWarnings("Duplicates") +public class AmbientContextTest { + @Before + public void setUp() throws Exception { + // Reset the gRPC context between test executions + Context.ROOT.attach(); + } + + @Test + public void initializeAttachesContext() { + Context ctx = AmbientContext.initialize(Context.current()); + ctx.run(() -> assertThat(AmbientContext.current()).isNotNull()); + } + + @Test + public void doubleInitializeThrows() { + Context ctx = AmbientContext.initialize(Context.current()); + assertThatThrownBy(() -> AmbientContext.initialize(ctx)).isInstanceOf(IllegalStateException.class); + } + + @Test + public void uninitializedContextThrows() { + assertThatThrownBy(AmbientContext::current).isInstanceOf(IllegalStateException.class); + } + + @Test + public void contextFreezingWorks() { + Metadata.Key key = Metadata.Key.of("key", Metadata.ASCII_STRING_MARSHALLER); + AmbientContext context = new AmbientContext(); + + assertThat(context.isFrozen()).isFalse(); + Object freezeKey = context.freeze(); + assertThat(context.isFrozen()).isTrue(); + assertThatThrownBy(() -> context.put(key, "foo")).isInstanceOf(IllegalStateException.class); + } + + @Test + public void contextDoubleFreezingThrows() { + AmbientContext context = new AmbientContext(); + context.freeze(); + assertThatThrownBy(context::freeze).isInstanceOf(IllegalStateException.class); + } + + @Test + public void contextThawingWorks() { + AmbientContext context = new AmbientContext(); + + Object freezeKey = context.freeze(); + context.thaw(freezeKey); + assertThat(context.isFrozen()).isFalse(); + } + + @Test + public void contextThawingNotFrozenThrows() { + AmbientContext context = new AmbientContext(); + assertThatThrownBy(() -> context.thaw(new Object())).isInstanceOf(IllegalStateException.class); + } + + @Test + public void contextThawingWrongKeyThrows() { + AmbientContext context = new AmbientContext(); + + Object freezeKey = context.freeze(); + assertThatThrownBy(() -> context.thaw(new Object())).isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void contextScopeStackingWorks() { + Metadata.Key key = Metadata.Key.of("k", Metadata.ASCII_STRING_MARSHALLER); + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(key, "outer"); + assertThat(AmbientContext.current().get(key)).isEqualTo("outer"); + + AmbientContext.current().fork(Context.current()).run(() -> { + AmbientContext.current().put(key, "inner"); + assertThat(AmbientContext.current().get(key)).isEqualTo("inner"); + }); + + assertThat(AmbientContext.current().get(key)).isEqualTo("outer"); + }); + } +} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java new file mode 100644 index 00000000..f749c86e --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/AmbientContextTransferTest.java @@ -0,0 +1,240 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import com.google.common.util.concurrent.ListenableFuture; +import com.salesforce.grpc.contrib.GreeterGrpc; +import com.salesforce.grpc.contrib.HelloRequest; +import com.salesforce.grpc.contrib.HelloResponse; +import com.salesforce.grpc.contrib.MoreFutures; +import io.grpc.Context; +import io.grpc.Metadata; +import io.grpc.ServerInterceptors; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import org.awaitility.Duration; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.awaitility.Awaitility.await; + +public class AmbientContextTransferTest { + @Rule public final GrpcServerRule serverRule1 = new GrpcServerRule(); + @Rule public final GrpcServerRule serverRule2 = new GrpcServerRule(); + + @Before + public void setUp() throws Exception { + // Reset the gRPC context between test executions + Context.ROOT.attach(); + } + + @Test + public void contextTransfersOneHopSync() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue = "context-value"; + AtomicReference ctxValue = new AtomicReference<>(); + + // Service + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Plumbing + serverRule1.getServiceRegistry().addService(ServerInterceptors + .intercept(svc, new AmbientContextServerInterceptor("ctx-"))); + + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule1.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + stub.sayHello(HelloRequest.newBuilder().setName("world").build()); + }); + + assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); + } + + @Test + public void multiValueContextTransfers() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue1 = "context-value1"; + String expectedCtxValue2 = "context-value2"; + String expectedCtxValue3 = "context-value3"; + AtomicReference> ctxValue = new AtomicReference<>(); + + // Service + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().getAll(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Plumbing + serverRule1.getServiceRegistry().addService(ServerInterceptors + .intercept(svc, new AmbientContextServerInterceptor("ctx-"))); + + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule1.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue1); + AmbientContext.current().put(ctxKey, expectedCtxValue2); + AmbientContext.current().put(ctxKey, expectedCtxValue3); + stub.sayHello(HelloRequest.newBuilder().setName("world").build()); + }); + + assertThat(ctxValue.get()).containsExactlyInAnyOrder(expectedCtxValue1, expectedCtxValue2, expectedCtxValue3); + } + + @Test + public void contextTransfersTwoHopSync() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue = "context-value"; + AtomicReference ctxValue = new AtomicReference<>(); + + // Terminal service + GreeterGrpc.GreeterImplBase svc2 = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Terminal service plumbing + serverRule2.getServiceRegistry().addService(ServerInterceptors + .intercept(svc2, new AmbientContextServerInterceptor("ctx-"))); + GreeterGrpc.GreeterBlockingStub stub2 = GreeterGrpc + .newBlockingStub(serverRule2.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Relay service + GreeterGrpc.GreeterImplBase svc1 = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + responseObserver.onNext(stub2.sayHello(request)); + responseObserver.onCompleted(); + } + }; + + // Relay service plumbing + serverRule1.getServiceRegistry().addService(ServerInterceptors + .intercept(svc1, new AmbientContextServerInterceptor("ctx-"))); + GreeterGrpc.GreeterBlockingStub stub1 = GreeterGrpc + .newBlockingStub(serverRule1.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + stub1.sayHello(HelloRequest.newBuilder().setName("world").build()); + }); + + assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); + } + + @Test + public void contextTransfersOneHopAsync() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue = "context-value"; + AtomicReference ctxValue = new AtomicReference<>(); + + // Service + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Plumbing + serverRule1.getServiceRegistry().addService(ServerInterceptors + .intercept(svc, new AmbientContextServerInterceptor("ctx-"))); + GreeterGrpc.GreeterFutureStub stub = GreeterGrpc + .newFutureStub(serverRule1.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + ListenableFuture futureResponse = stub.sayHello(HelloRequest.newBuilder().setName("world").build()); + + // Verify response callbacks still have context + MoreFutures.onSuccess( + futureResponse, + response -> assertThat(AmbientContext.current().get(ctxKey)).isEqualTo(expectedCtxValue), + Context.currentContextExecutor(Executors.newSingleThreadExecutor())); + + await().atMost(Duration.ONE_SECOND).until(futureResponse::isDone); + }); + + assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); + } + + @Test + public void multipleContextTransfersOneHopSync() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of("ctx-context-key", Metadata.ASCII_STRING_MARSHALLER); + Metadata.Key l5dKey = Metadata.Key.of("l5d-context-key", Metadata.ASCII_STRING_MARSHALLER); + String expectedCtxValue = "context-value"; + AtomicReference ctxValue = new AtomicReference<>(); + AtomicReference l5dValue = new AtomicReference<>(); + + // Service + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + l5dValue.set(AmbientContext.current().get(l5dKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Plumbing + serverRule1.getServiceRegistry().addService(ServerInterceptors.intercept(svc, + new AmbientContextServerInterceptor("ctx-"), + new AmbientContextServerInterceptor("l5d-"))); + + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule1.getChannel()) + .withInterceptors( + new AmbientContextClientInterceptor("ctx-"), + new AmbientContextClientInterceptor("l5d-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + AmbientContext.current().put(l5dKey, expectedCtxValue); + stub.sayHello(HelloRequest.newBuilder().setName("world").build()); + }); + + assertThat(ctxValue.get()).isEqualTo(expectedCtxValue); + assertThat(l5dValue.get()).isEqualTo(expectedCtxValue); + } +} diff --git a/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java new file mode 100644 index 00000000..3dbb4722 --- /dev/null +++ b/grpc-contrib/src/test/java/com/salesforce/grpc/contrib/context/BinaryAmbientContextTest.java @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2017, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.txt file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ + +package com.salesforce.grpc.contrib.context; + +import com.google.common.io.BaseEncoding; +import com.salesforce.grpc.contrib.GreeterGrpc; +import com.salesforce.grpc.contrib.HelloRequest; +import com.salesforce.grpc.contrib.HelloResponse; +import io.grpc.Context; +import io.grpc.Metadata; +import io.grpc.ServerInterceptors; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import org.junit.Rule; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.fail; + +public class BinaryAmbientContextTest { + @Rule public final GrpcServerRule serverRule = new GrpcServerRule(); + + @Test + public void binaryContextValueTransfers() throws Exception { + Metadata.Key ctxKey = Metadata.Key.of( + "ctx-context-key" + Metadata.BINARY_HEADER_SUFFIX, + Metadata.BINARY_BYTE_MARSHALLER); + byte[] expectedCtxValue = BaseEncoding.base16().decode("DEADBEEF"); + AtomicReference ctxValue = new AtomicReference<>(); + + // Service + GreeterGrpc.GreeterImplBase svc = new GreeterGrpc.GreeterImplBase() { + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + ctxValue.set(AmbientContext.current().get(ctxKey)); + responseObserver.onNext(HelloResponse.newBuilder().setMessage("Hello " + request.getName()).build()); + responseObserver.onCompleted(); + } + }; + + // Plumbing + serverRule.getServiceRegistry().addService(ServerInterceptors + .intercept(svc, new AmbientContextServerInterceptor("ctx-"))); + + GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc + .newBlockingStub(serverRule.getChannel()) + .withInterceptors(new AmbientContextClientInterceptor("ctx-")); + + // Test + AmbientContext.initialize(Context.current()).run(() -> { + AmbientContext.current().put(ctxKey, expectedCtxValue); + stub.sayHello(HelloRequest.newBuilder().setName("world").build()); + }); + + assertThat(ctxValue.get()).containsExactly(expectedCtxValue); + } +} diff --git a/grpc-contrib/src/test/proto/helloworld.proto b/grpc-contrib/src/test/proto/helloworld.proto index 1c441cd7..d1abddce 100644 --- a/grpc-contrib/src/test/proto/helloworld.proto +++ b/grpc-contrib/src/test/proto/helloworld.proto @@ -6,7 +6,7 @@ import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; option java_multiple_files = true; -option java_package = "com.salesforce.jprotoc"; +option java_package = "com.salesforce.grpc.contrib"; option java_outer_classname = "HelloWorldProto"; // The greeting service definition.