From 5a79d60f4129782bb223e4a811bad2e91837954a Mon Sep 17 00:00:00 2001 From: minwoox Date: Tue, 25 Jun 2019 20:19:22 +0900 Subject: [PATCH] Add brave module --- brave/build.gradle | 6 + .../armeria/client/brave/BraveClient.java | 68 +++ .../armeria/client/brave/package-info.java | 24 + .../RequestContextCurrentTraceContext.java | 311 ++++++++++++ .../armeria/common/brave/package-info.java | 23 + .../armeria/server/brave/BraveService.java | 57 +++ .../armeria/server/brave/package-info.java | 24 + .../armeria/client/brave/BraveClientTest.java | 241 +++++++++ ...RequestContextCurrentTraceContextTest.java | 193 ++++++++ .../common/brave/SpanCollectingReporter.java | 37 ++ .../it/brave/BraveIntegrationTest.java | 466 ++++++++++++++++++ .../server/brave/BraveServiceTest.java | 195 ++++++++ brave/src/test/thrift/HelloService.thrift | 5 + settings.gradle | 1 + site/src/sphinx/advanced-zipkin.rst | 2 +- zipkin/build.gradle | 1 - .../client/tracing/HttpTracingClient.java | 40 +- .../armeria/client/tracing/TracingClient.java | 162 ------ .../RequestContextCurrentTraceContext.java | 9 +- .../internal/tracing/package-info.java | 1 + .../server/tracing/HttpTracingService.java | 26 +- .../server/tracing/TracingService.java | 106 ---- .../client/tracing/TracingClientTest.java | 7 +- .../tracing/HttpTracingIntegrationTest.java | 21 +- .../server/tracing/TracingServiceTest.java | 6 +- 25 files changed, 1726 insertions(+), 306 deletions(-) create mode 100644 brave/build.gradle create mode 100644 brave/src/main/java/com/linecorp/armeria/client/brave/BraveClient.java create mode 100644 brave/src/main/java/com/linecorp/armeria/client/brave/package-info.java create mode 100644 brave/src/main/java/com/linecorp/armeria/common/brave/RequestContextCurrentTraceContext.java create mode 100644 brave/src/main/java/com/linecorp/armeria/common/brave/package-info.java create mode 100644 brave/src/main/java/com/linecorp/armeria/server/brave/BraveService.java create mode 100644 brave/src/main/java/com/linecorp/armeria/server/brave/package-info.java create mode 100644 brave/src/test/java/com/linecorp/armeria/client/brave/BraveClientTest.java create mode 100644 brave/src/test/java/com/linecorp/armeria/common/brave/RequestContextCurrentTraceContextTest.java create mode 100644 brave/src/test/java/com/linecorp/armeria/common/brave/SpanCollectingReporter.java create mode 100644 brave/src/test/java/com/linecorp/armeria/it/brave/BraveIntegrationTest.java create mode 100644 brave/src/test/java/com/linecorp/armeria/server/brave/BraveServiceTest.java create mode 100644 brave/src/test/thrift/HelloService.thrift delete mode 100644 zipkin/src/main/java/com/linecorp/armeria/client/tracing/TracingClient.java delete mode 100644 zipkin/src/main/java/com/linecorp/armeria/server/tracing/TracingService.java diff --git a/brave/build.gradle b/brave/build.gradle new file mode 100644 index 00000000000..f03015acf56 --- /dev/null +++ b/brave/build.gradle @@ -0,0 +1,6 @@ +dependencies { + testCompile project(':thrift') + + compile project(':zipkin') + compile 'io.zipkin.brave:brave-instrumentation-http' +} diff --git a/brave/src/main/java/com/linecorp/armeria/client/brave/BraveClient.java b/brave/src/main/java/com/linecorp/armeria/client/brave/BraveClient.java new file mode 100644 index 00000000000..8dc662125d8 --- /dev/null +++ b/brave/src/main/java/com/linecorp/armeria/client/brave/BraveClient.java @@ -0,0 +1,68 @@ +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.brave; + +import static com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext.ensureScopeUsesRequestContext; + +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linecorp.armeria.client.Client; +import com.linecorp.armeria.client.tracing.HttpTracingClient; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext; + +import brave.Tracing; +import brave.http.HttpTracing; + +/** + * Decorates a {@link Client} to trace outbound {@link HttpRequest}s using + * Brave. + * + *

This decorator puts trace data into HTTP headers. The specifications of header names and its values + * correspond to Zipkin. + */ +public final class BraveClient extends HttpTracingClient { + + private static final Logger logger = LoggerFactory.getLogger(BraveClient.class); + + /** + * Creates a new tracing {@link Client} decorator using the specified {@link Tracing} instance. + */ + public static Function, BraveClient> newDecorator( + HttpTracing httpTracing) { + try { + ensureScopeUsesRequestContext(httpTracing.tracing()); + } catch (IllegalStateException e) { + logger.warn("{} - it is appropriate to ignore this warning if this client is not being used " + + "inside an Armeria server (e.g., this is a normal spring-mvc tomcat server).", + e.getMessage()); + } + return delegate -> new BraveClient(delegate, httpTracing); + } + + /** + * Creates a new instance. + */ + private BraveClient(Client delegate, HttpTracing httpTracing) { + super(delegate, httpTracing.tracing(), httpTracing.serverName(), + RequestContextCurrentTraceContext::copy); + } +} diff --git a/brave/src/main/java/com/linecorp/armeria/client/brave/package-info.java b/brave/src/main/java/com/linecorp/armeria/client/brave/package-info.java new file mode 100644 index 00000000000..ffa31e15084 --- /dev/null +++ b/brave/src/main/java/com/linecorp/armeria/client/brave/package-info.java @@ -0,0 +1,24 @@ +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Distributed tracing clients based on Brave, + * a Java tracing library compatible with Zipkin. + */ +@NonNullByDefault +package com.linecorp.armeria.client.brave; + +import com.linecorp.armeria.common.util.NonNullByDefault; diff --git a/brave/src/main/java/com/linecorp/armeria/common/brave/RequestContextCurrentTraceContext.java b/brave/src/main/java/com/linecorp/armeria/common/brave/RequestContextCurrentTraceContext.java new file mode 100644 index 00000000000..ccb9a3cdaf7 --- /dev/null +++ b/brave/src/main/java/com/linecorp/armeria/common/brave/RequestContextCurrentTraceContext.java @@ -0,0 +1,311 @@ +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.brave; + +import static java.util.Objects.requireNonNull; + +import java.util.Collections; +import java.util.function.Function; +import java.util.function.Supplier; + +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +import com.linecorp.armeria.client.brave.BraveClient; +import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.server.brave.BraveService; + +import brave.Tracing; +import brave.propagation.CurrentTraceContext; +import brave.propagation.TraceContext; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; + +/** + * {@linkplain Tracing.Builder#currentTraceContext(CurrentTraceContext) Tracing + * context} implemented with {@link RequestContext}. + * + *

This {@link CurrentTraceContext} stores/loads the trace context into/from a + * {@link RequestContext}'s attribute so that there's no need for thread local variables + * which can lead to unpredictable behavior in asynchronous programming. + */ +public final class RequestContextCurrentTraceContext extends CurrentTraceContext { + + /** + * Use this singleton when building a {@link Tracing} instance for use with + * {@link BraveService} or {@link BraveClient}. + * + *

If you need to customize the context, use {@link #builder()} instead. + * + * @see Tracing.Builder#currentTraceContext(CurrentTraceContext) + */ + public static final CurrentTraceContext DEFAULT = new RequestContextCurrentTraceContext(new Builder()); + + private static final Logger logger = LoggerFactory.getLogger(RequestContextCurrentTraceContext.class); + private static final AttributeKey TRACE_CONTEXT_KEY = + AttributeKey.valueOf(RequestContextCurrentTraceContext.class, "TRACE_CONTEXT"); + + // Thread-local for storing TraceContext when invoking callbacks off the request thread. + private static final ThreadLocal THREAD_LOCAL_CONTEXT = new ThreadLocal<>(); + + private static final Scope INCOMPLETE_CONFIGURATION_SCOPE = new Scope() { + @Override + public void close() { + } + + @Override + public String toString() { + return "IncompleteConfigurationScope"; + } + }; + + private static final Scope INITIAL_REQUEST_SCOPE = new Scope() { + @Override + public void close() { + // Don't remove the outer-most context (client or server request) + } + + @Override + public String toString() { + return "InitialRequestScope"; + } + }; + + static final class Builder extends CurrentTraceContext.Builder { + + @Override + public CurrentTraceContext build() { + return new RequestContextCurrentTraceContext(this); + } + } + + /** + * Use this when you need customizations such as log integration via + * {@linkplain Builder#addScopeDecorator(ScopeDecorator)}. + * + * @see Tracing.Builder#currentTraceContext(CurrentTraceContext) + */ + public static CurrentTraceContext.Builder builder() { + return new Builder(); + } + + /** + * Ensures the specified {@link Tracing} uses a {@link RequestContextCurrentTraceContext}. + * + * @throws IllegalStateException if {@code tracing} does not use {@link RequestContextCurrentTraceContext} + */ + public static void ensureScopeUsesRequestContext(Tracing tracing) { + requireNonNull(tracing, "tracing"); + final PingPongExtra extra = new PingPongExtra(); + // trace contexts are not recorded until Tracer.toSpan, so this won't end up as junk data + final TraceContext dummyContext = TraceContext.newBuilder().traceId(1).spanId(1) + .extra(Collections.singletonList(extra)).build(); + final boolean scopeUsesRequestContext; + try (Scope scope = tracing.currentTraceContext().newScope(dummyContext)) { + scopeUsesRequestContext = extra.isPong(); + } + if (!scopeUsesRequestContext) { + throw new IllegalStateException( + "Tracing.currentTraceContext is not a " + RequestContextCurrentTraceContext.class + .getSimpleName() + " scope. " + + "Please call Tracing.Builder.currentTraceContext(" + RequestContextCurrentTraceContext.class + .getSimpleName() + ".DEFAULT)."); + } + } + + /** + * Use this to ensure the trace context propagates to children. + * + *

Ex. + *

{@code
+     *  // Ensure the trace context propagates to children
+     * ctx.onChild(RequestContextCurrentTraceContext::copy);
+     * }
+ */ + public static void copy(RequestContext src, RequestContext dst) { + dst.attr(TRACE_CONTEXT_KEY).set(src.attr(TRACE_CONTEXT_KEY).get()); + } + + private RequestContextCurrentTraceContext(Builder builder) { + super(builder); + } + + @Override + @Nullable + public TraceContext get() { + final RequestContext ctx = getRequestContextOrWarnOnce(); + if (ctx == null) { + return null; + } + if (ctx.eventLoop().inEventLoop()) { + return ctx.attr(TRACE_CONTEXT_KEY).get(); + } else { + final TraceContext threadLocalContext = THREAD_LOCAL_CONTEXT.get(); + if (threadLocalContext != null) { + return threadLocalContext; + } + // First span on a non-request thread will use the request's TraceContext as a parent. + return ctx.attr(TRACE_CONTEXT_KEY).get(); + } + } + + @Override + public Scope newScope(@Nullable TraceContext currentSpan) { + // Handle inspection added to ensure we can fail-fast if this isn't installed. + if (currentSpan != null && PingPongExtra.maybeSetPong(currentSpan)) { + return Scope.NOOP; + } + + final RequestContext ctx = getRequestContextOrWarnOnce(); + if (ctx == null) { + return INCOMPLETE_CONFIGURATION_SCOPE; + } + + if (ctx.eventLoop().inEventLoop()) { + return createScopeForRequestThread(ctx, currentSpan); + } else { + // The RequestContext is the canonical thread-local storage for the thread processing the request. + // However, when creating spans on other threads (e.g., a thread-pool), we must use separate + // thread-local storage to prevent threads from replacing the same trace context. + return createScopeForNonRequestThread(currentSpan); + } + } + + private Scope createScopeForRequestThread(RequestContext ctx, @Nullable TraceContext currentSpan) { + final Attribute traceContextAttribute = ctx.attr(TRACE_CONTEXT_KEY); + + final TraceContext previous = traceContextAttribute.getAndSet(currentSpan); + + // Don't remove the outer-most context (client or server request) + if (previous == null) { + return decorateScope(currentSpan, INITIAL_REQUEST_SCOPE); + } + + // Removes sub-spans (i.e. local spans) from the current context when Brave's scope does. + // If an asynchronous sub-span, it may still complete later. + class RequestContextTraceContextScope implements Scope { + @Override + public void close() { + // re-lookup the attribute to avoid holding a reference to the request if this scope is leaked + getTraceContextAttributeOrWarnOnce().set(previous); + } + + @Override + public String toString() { + return "RequestContextTraceContextScope"; + } + } + + return decorateScope(currentSpan, new RequestContextTraceContextScope()); + } + + private Scope createScopeForNonRequestThread(@Nullable TraceContext currentSpan) { + final TraceContext previous = THREAD_LOCAL_CONTEXT.get(); + THREAD_LOCAL_CONTEXT.set(currentSpan); + class ThreadLocalScope implements Scope { + @Override + public void close() { + THREAD_LOCAL_CONTEXT.set(previous); + } + + @Override + public String toString() { + return "ThreadLocalScope"; + } + } + + return decorateScope(currentSpan, new ThreadLocalScope()); + } + + /** + * Armeria code should always have a request context available, and this won't work without it. + */ + @Nullable + private static RequestContext getRequestContextOrWarnOnce() { + return RequestContext.mapCurrent(Function.identity(), LogRequestContextWarningOnce.INSTANCE); + } + + /** + * Armeria code should always have a request context available, and this won't work without it. + */ + @Nullable + private static Attribute getTraceContextAttributeOrWarnOnce() { + final RequestContext ctx = getRequestContextOrWarnOnce(); + if (ctx == null) { + return null; + } + return ctx.attr(TRACE_CONTEXT_KEY); + } + + private enum LogRequestContextWarningOnce implements Supplier { + + INSTANCE; + + @Override + @Nullable + public RequestContext get() { + ClassLoaderHack.loadMe(); + return null; + } + + /** + * This won't be referenced until {@link #get()} is called. If there's only one classloader, the + * initializer will only be called once. + */ + private static final class ClassLoaderHack { + static void loadMe() {} + + static { + logger.warn("Attempted to propagate trace context, but no request context available. " + + "Did you forget to use RequestContext.contextAwareExecutor() or " + + "RequestContext.makeContextAware()?", new NoRequestContextException()); + } + } + + private static final class NoRequestContextException extends RuntimeException { + private static final long serialVersionUID = 2804189311774982052L; + } + } + + /** Hack to allow us to peek inside a current trace context implementation. */ + @VisibleForTesting + static final class PingPongExtra { + /** + * If the input includes only this extra, set {@link #isPong() pong = true}. + */ + static boolean maybeSetPong(TraceContext context) { + if (context.extra().size() == 1) { + final Object extra = context.extra().get(0); + if (extra instanceof PingPongExtra) { + ((PingPongExtra) extra).pong = true; + return true; + } + } + return false; + } + + private boolean pong; + + boolean isPong() { + return pong; + } + } +} diff --git a/brave/src/main/java/com/linecorp/armeria/common/brave/package-info.java b/brave/src/main/java/com/linecorp/armeria/common/brave/package-info.java new file mode 100644 index 00000000000..094194fae0b --- /dev/null +++ b/brave/src/main/java/com/linecorp/armeria/common/brave/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Common classes for distributed tracing support based on Brave. + */ +@NonNullByDefault +package com.linecorp.armeria.common.brave; + +import com.linecorp.armeria.common.util.NonNullByDefault; diff --git a/brave/src/main/java/com/linecorp/armeria/server/brave/BraveService.java b/brave/src/main/java/com/linecorp/armeria/server/brave/BraveService.java new file mode 100644 index 00000000000..709ea4fef92 --- /dev/null +++ b/brave/src/main/java/com/linecorp/armeria/server/brave/BraveService.java @@ -0,0 +1,57 @@ +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server.brave; + +import static com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext.ensureScopeUsesRequestContext; + +import java.util.function.Function; + +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext; +import com.linecorp.armeria.server.Service; +import com.linecorp.armeria.server.tracing.HttpTracingService; + +import brave.Tracing; +import brave.http.HttpTracing; + +/** + * Decorates a {@link Service} to trace inbound {@link HttpRequest}s using + * Brave. + * + *

This decorator retrieves trace data from HTTP headers. The specifications of header names and its values + * correspond to Zipkin. + */ +public final class BraveService extends HttpTracingService { + + /** + * Creates a new tracing {@link Service} decorator using the specified {@link HttpTracing} instance. + */ + public static Function, BraveService> newDecorator( + HttpTracing httpTracing) { + final Tracing tracing = httpTracing.tracing(); + ensureScopeUsesRequestContext(tracing); + return service -> new BraveService(service, tracing); + } + + /** + * Creates a new instance. + */ + private BraveService(Service delegate, Tracing tracing) { + super(delegate, tracing, RequestContextCurrentTraceContext::copy); + } +} diff --git a/brave/src/main/java/com/linecorp/armeria/server/brave/package-info.java b/brave/src/main/java/com/linecorp/armeria/server/brave/package-info.java new file mode 100644 index 00000000000..d5810023f20 --- /dev/null +++ b/brave/src/main/java/com/linecorp/armeria/server/brave/package-info.java @@ -0,0 +1,24 @@ +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * Distributed tracing services based on Brave, + * a Java tracing library compatible with Zipkin. + */ +@NonNullByDefault +package com.linecorp.armeria.server.brave; + +import com.linecorp.armeria.common.util.NonNullByDefault; diff --git a/brave/src/test/java/com/linecorp/armeria/client/brave/BraveClientTest.java b/brave/src/test/java/com/linecorp/armeria/client/brave/BraveClientTest.java new file mode 100644 index 00000000000..471448790a9 --- /dev/null +++ b/brave/src/test/java/com/linecorp/armeria/client/brave/BraveClientTest.java @@ -0,0 +1,241 @@ +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.client.brave; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.client.Client; +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.ClientRequestContextBuilder; +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.common.HttpHeaderNames; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.RpcRequest; +import com.linecorp.armeria.common.RpcResponse; +import com.linecorp.armeria.common.brave.HelloService; +import com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext; +import com.linecorp.armeria.common.brave.SpanCollectingReporter; +import com.linecorp.armeria.common.util.SafeCloseable; + +import brave.Tracing; +import brave.http.HttpTracing; +import brave.propagation.CurrentTraceContext; +import brave.propagation.CurrentTraceContext.ScopeDecorator; +import brave.sampler.Sampler; +import zipkin2.Span; +import zipkin2.Span.Kind; + +class BraveClientTest { + + private static final String TEST_SERVICE = "test-service"; + + private static final String TEST_SPAN = "hello"; + + @AfterEach + void tearDown() { + Tracing.current().close(); + } + + @Test + void newDecorator_shouldWorkWhenRequestContextCurrentTraceContextNotConfigured() { + BraveClient.newDecorator(HttpTracing.create(Tracing.newBuilder().build())); + } + + @Test + void newDecorator_shouldWorkWhenRequestContextCurrentTraceContextConfigured() { + BraveClient.newDecorator(HttpTracing.create( + Tracing.newBuilder().currentTraceContext(RequestContextCurrentTraceContext.DEFAULT).build())); + } + + @Test + void shouldSubmitSpanWhenSampled() throws Exception { + final SpanCollectingReporter reporter = new SpanCollectingReporter(); + + final Tracing tracing = Tracing.newBuilder() + .localServiceName(TEST_SERVICE) + .spanReporter(reporter) + .sampler(Sampler.create(1.0f)) + .build(); + testRemoteInvocation(HttpTracing.create(tracing)); + + // check span name + final Span span = reporter.spans().take(); + assertThat(span.name()).isEqualTo(TEST_SPAN); + + // check kind + assertThat(span.kind()).isSameAs(Kind.CLIENT); + + // only one span should be submitted + assertThat(reporter.spans().poll(1, TimeUnit.SECONDS)).isNull(); + + // check # of annotations (we add wire annotations) + assertThat(span.annotations()).hasSize(2); + assertTags(span); + + assertThat(span.traceId().length()).isEqualTo(16); + + // check service name + assertThat(span.localServiceName()).isEqualTo(TEST_SERVICE); + + // check remote service name + assertThat(span.remoteServiceName()).isEqualTo(null); + } + + @Test + void shouldSubmitSpanWithCustomRemoteName() throws Exception { + final SpanCollectingReporter reporter = new SpanCollectingReporter(); + + final Tracing tracing = Tracing.newBuilder() + .localServiceName(TEST_SERVICE) + .spanReporter(reporter) + .sampler(Sampler.create(1.0f)) + .build(); + testRemoteInvocation(HttpTracing.create(tracing).clientOf("fooService")); + + // check span name + final Span span = reporter.spans().take(); + + // check tags + assertThat(span.tags()).containsEntry("http.host", "foo.com") + .containsEntry("http.method", "POST") + .containsEntry("http.path", "/hello/armeria") + .containsEntry("http.status_code", "200") + .containsEntry("http.url", "http://foo.com/hello/armeria") + .containsEntry("http.protocol", "h2c"); + + // check service name + assertThat(span.localServiceName()).isEqualTo(TEST_SERVICE); + + // check remote service name, lower-cased + assertThat(span.remoteServiceName()).isEqualTo("fooservice"); + } + + @Test + void scopeDecorator() throws Exception { + final SpanCollectingReporter reporter = new SpanCollectingReporter(); + final AtomicInteger scopeDecoratorCallingCounter = new AtomicInteger(); + final ScopeDecorator scopeDecorator = (currentSpan, scope) -> { + scopeDecoratorCallingCounter.getAndIncrement(); + return scope; + }; + final CurrentTraceContext traceContext = + RequestContextCurrentTraceContext.builder() + .addScopeDecorator(scopeDecorator) + .build(); + + final Tracing tracing = Tracing.newBuilder() + .localServiceName(TEST_SERVICE) + .currentTraceContext(traceContext) + .spanReporter(reporter) + .sampler(Sampler.create(1.0f)) + .build(); + testRemoteInvocation(HttpTracing.create(tracing)); + + // check span name + final Span span = reporter.spans().take(); + + // check tags + assertTags(span); + + // check service name + assertThat(span.localServiceName()).isEqualTo(TEST_SERVICE); + assertThat(scopeDecoratorCallingCounter.get()).isEqualTo(1); + } + + @Test + void shouldNotSubmitSpanWhenNotSampled() throws Exception { + final SpanCollectingReporter reporter = new SpanCollectingReporter(); + final Tracing tracing = Tracing.newBuilder() + .localServiceName(TEST_SERVICE) + .spanReporter(reporter) + .sampler(Sampler.create(0.0f)) + .build(); + testRemoteInvocation(HttpTracing.create(tracing)); + + assertThat(reporter.spans().poll(1, TimeUnit.SECONDS)).isNull(); + } + + private static void testRemoteInvocation(HttpTracing httpTracing) + throws Exception { + + // prepare parameters + final HttpRequest req = HttpRequest.of(RequestHeaders.of(HttpMethod.POST, "/hello/armeria", + HttpHeaderNames.AUTHORITY, "foo.com")); + final RpcRequest rpcReq = RpcRequest.of(HelloService.Iface.class, "hello", "Armeria"); + final HttpResponse res = HttpResponse.of(HttpStatus.OK); + final RpcResponse rpcRes = RpcResponse.of("Hello, Armeria!"); + final ClientRequestContext ctx = + ClientRequestContextBuilder.of(req) + .endpoint(Endpoint.of("localhost", 8080)) + .build(); + + ctx.logBuilder().requestFirstBytesTransferred(); + ctx.logBuilder().requestContent(rpcReq, req); + ctx.logBuilder().endRequest(); + + try (SafeCloseable ignored = ctx.push()) { + @SuppressWarnings("unchecked") + final Client delegate = mock(Client.class); + when(delegate.execute(any(), any())).thenReturn(res); + + final BraveClient stub = BraveClient.newDecorator(httpTracing).apply(delegate); + // do invoke + final HttpResponse actualRes = stub.execute(ctx, req); + + assertThat(actualRes).isEqualTo(res); + + verify(delegate, times(1)).execute(same(ctx), argThat(arg -> { + final RequestHeaders headers = arg.headers(); + return headers.contains(HttpHeaderNames.of("x-b3-traceid")) && + headers.contains(HttpHeaderNames.of("x-b3-spanid")) && + headers.contains(HttpHeaderNames.of("x-b3-sampled")); + })); + } + + ctx.logBuilder().responseHeaders(ResponseHeaders.of(HttpStatus.OK)); + ctx.logBuilder().responseFirstBytesTransferred(); + ctx.logBuilder().responseContent(rpcRes, res); + ctx.logBuilder().endResponse(); + } + + private static void assertTags(Span span) { + assertThat(span.tags()).containsEntry("http.host", "foo.com") + .containsEntry("http.method", "POST") + .containsEntry("http.path", "/hello/armeria") + .containsEntry("http.status_code", "200") + .containsEntry("http.url", "http://foo.com/hello/armeria") + .containsEntry("http.protocol", "h2c"); + } +} diff --git a/brave/src/test/java/com/linecorp/armeria/common/brave/RequestContextCurrentTraceContextTest.java b/brave/src/test/java/com/linecorp/armeria/common/brave/RequestContextCurrentTraceContextTest.java new file mode 100644 index 00000000000..203ec0e74f4 --- /dev/null +++ b/brave/src/test/java/com/linecorp/armeria/common/brave/RequestContextCurrentTraceContextTest.java @@ -0,0 +1,193 @@ +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.brave; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.when; + +import java.util.Collections; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; +import org.mockito.stubbing.Answer; + +import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext.PingPongExtra; +import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.internal.DefaultAttributeMap; + +import brave.propagation.CurrentTraceContext; +import brave.propagation.CurrentTraceContext.Scope; +import brave.propagation.TraceContext; +import io.netty.channel.EventLoop; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; + +public class RequestContextCurrentTraceContextTest { + @Rule + public MockitoRule mockitoRule = MockitoJUnit.rule(); + + @Mock(answer = Answers.CALLS_REAL_METHODS) + RequestContext mockRequestContext; + @Mock(answer = Answers.CALLS_REAL_METHODS) + RequestContext mockRequestContext2; + @Mock + EventLoop eventLoop; + + final CurrentTraceContext currentTraceContext = RequestContextCurrentTraceContext.DEFAULT; + final DefaultAttributeMap attrs1 = new DefaultAttributeMap(); + final DefaultAttributeMap attrs2 = new DefaultAttributeMap(); + final TraceContext traceContext = TraceContext.newBuilder().traceId(1).spanId(1).build(); + + @Before + public void setup() { + when(mockRequestContext.eventLoop()).thenReturn(eventLoop); + when(mockRequestContext2.eventLoop()).thenReturn(eventLoop); + when(eventLoop.inEventLoop()).thenReturn(true); + + when(mockRequestContext.attr(isA(AttributeKey.class))).thenAnswer( + (Answer) invocation -> attrs1.attr(invocation.getArgument(0))); + when(mockRequestContext2.attr(isA(AttributeKey.class))).thenAnswer( + (Answer) invocation -> attrs2.attr(invocation.getArgument(0))); + } + + @Test + public void copy() { + try (SafeCloseable requestContextScope = mockRequestContext.push()) { + try (Scope traceContextScope = currentTraceContext.newScope(traceContext)) { + RequestContextCurrentTraceContext.copy(mockRequestContext, mockRequestContext2); + assertThat(attrs1.attrs().next().get()) + .isEqualTo(traceContext) + .isEqualTo(attrs2.attrs().next().get()); + } + } + } + + @Test + public void get_returnsNullWhenNoCurrentRequestContext() { + assertThat(currentTraceContext.get()).isNull(); + } + + @Test + public void get_returnsNullWhenCurrentRequestContext_hasNoTraceAttribute() { + try (SafeCloseable requestContextScope = mockRequestContext.push()) { + assertThat(currentTraceContext.get()).isNull(); + } + } + + @Test + public void newScope_doesNothingWhenNoCurrentRequestContext() { + try (Scope traceContextScope = currentTraceContext.newScope(traceContext)) { + assertThat(traceContextScope).hasToString("IncompleteConfigurationScope"); + assertThat(currentTraceContext.get()).isNull(); + } + } + + @Test + public void newScope_appliesWhenCurrentRequestContext() { + try (SafeCloseable requestContextScope = mockRequestContext.push()) { + try (Scope traceContextScope = currentTraceContext.newScope(traceContext)) { + assertThat(traceContextScope).hasToString("InitialRequestScope"); + assertThat(currentTraceContext.get()).isEqualTo(traceContext); + } + } + } + + @Test + public void newScope_closeDoesntClearFirstScope() { + final TraceContext traceContext2 = TraceContext.newBuilder().traceId(1).spanId(2).build(); + + try (SafeCloseable requestContextScope = mockRequestContext.push()) { + try (Scope traceContextScope = currentTraceContext.newScope(traceContext)) { + assertThat(traceContextScope).hasToString("InitialRequestScope"); + assertThat(currentTraceContext.get()).isEqualTo(traceContext); + + try (Scope traceContextScope2 = currentTraceContext.newScope(traceContext2)) { + assertThat(traceContextScope2).hasToString("RequestContextTraceContextScope"); + assertThat(currentTraceContext.get()).isEqualTo(traceContext2); + } + assertThat(currentTraceContext.get()).isEqualTo(traceContext); + } + // the first scope is attached to the request context and cleared when that's destroyed + assertThat(currentTraceContext.get()).isEqualTo(traceContext); + } + } + + @Test + public void newScope_notOnEventLoop() { + final TraceContext traceContext2 = TraceContext.newBuilder().traceId(1).spanId(2).build(); + + try (SafeCloseable requestContextScope = mockRequestContext.push()) { + try (Scope traceContextScope = currentTraceContext.newScope(traceContext)) { + assertThat(traceContextScope).hasToString("InitialRequestScope"); + assertThat(currentTraceContext.get()).isEqualTo(traceContext); + + when(eventLoop.inEventLoop()).thenReturn(false); + try (Scope traceContextScope2 = currentTraceContext.newScope(traceContext2)) { + assertThat(traceContextScope2).hasToString("ThreadLocalScope"); + assertThat(currentTraceContext.get()).isEqualTo(traceContext2); + } + when(eventLoop.inEventLoop()).thenReturn(true); + assertThat(currentTraceContext.get()).isEqualTo(traceContext); + } + // the first scope is attached to the request context and cleared when that's destroyed + assertThat(currentTraceContext.get()).isEqualTo(traceContext); + } + } + + @Test + public void newScope_canClearScope() { + try (SafeCloseable requestContextScope = mockRequestContext.push()) { + try (Scope traceContextScope = currentTraceContext.newScope(traceContext)) { + try (Scope traceContextScope2 = currentTraceContext.newScope(null)) { + assertThat(currentTraceContext.get()).isNull(); + } + assertThat(currentTraceContext.get()).isEqualTo(traceContext); + } + } + } + + @Test + public void newScope_respondsToPing() { + final PingPongExtra extra = new PingPongExtra(); + final TraceContext extraContext = TraceContext.newBuilder().traceId(1).spanId(1) + .extra(Collections.singletonList(extra)).build(); + + try (Scope traceContextScope = currentTraceContext.newScope(extraContext)) { + assertThat(traceContextScope).hasToString("NoopScope"); + assertThat(extra.isPong()).isTrue(); + } + } + + @Test + public void shouldSetPongIfOnlyExtra() { + final PingPongExtra extra = new PingPongExtra(); + + final TraceContext context = TraceContext.newBuilder().traceId(1).spanId(1) + .extra(Collections.singletonList(extra)).build(); + + PingPongExtra.maybeSetPong(context); + + assertThat(extra.isPong()).isTrue(); + } +} diff --git a/brave/src/test/java/com/linecorp/armeria/common/brave/SpanCollectingReporter.java b/brave/src/test/java/com/linecorp/armeria/common/brave/SpanCollectingReporter.java new file mode 100644 index 00000000000..03640560b89 --- /dev/null +++ b/brave/src/test/java/com/linecorp/armeria/common/brave/SpanCollectingReporter.java @@ -0,0 +1,37 @@ +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.common.brave; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedTransferQueue; + +import zipkin2.Span; +import zipkin2.reporter.Reporter; + +public final class SpanCollectingReporter implements Reporter { + + private final BlockingQueue spans = new LinkedTransferQueue<>(); + + @Override + public void report(Span span) { + spans.add(span); + } + + public BlockingQueue spans() { + return spans; + } +} diff --git a/brave/src/test/java/com/linecorp/armeria/it/brave/BraveIntegrationTest.java b/brave/src/test/java/com/linecorp/armeria/it/brave/BraveIntegrationTest.java new file mode 100644 index 00000000000..1ea74f138f0 --- /dev/null +++ b/brave/src/test/java/com/linecorp/armeria/it/brave/BraveIntegrationTest.java @@ -0,0 +1,466 @@ +/* + * Copyright 2016 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.it.brave; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.util.concurrent.Futures.allAsList; +import static com.google.common.util.concurrent.Futures.transformAsync; +import static com.linecorp.armeria.common.HttpStatus.OK; +import static com.linecorp.armeria.common.thrift.ThriftSerializationFormats.BINARY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.IntStream; + +import org.apache.thrift.async.AsyncMethodCallback; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +import com.linecorp.armeria.client.ClientBuilder; +import com.linecorp.armeria.client.Clients; +import com.linecorp.armeria.client.HttpClient; +import com.linecorp.armeria.client.InvalidResponseHeadersException; +import com.linecorp.armeria.client.ResponseTimeoutException; +import com.linecorp.armeria.client.brave.BraveClient; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.RequestContext; +import com.linecorp.armeria.common.brave.HelloService; +import com.linecorp.armeria.common.brave.HelloService.AsyncIface; +import com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext; +import com.linecorp.armeria.common.thrift.ThriftCompletableFuture; +import com.linecorp.armeria.server.AbstractHttpService; +import com.linecorp.armeria.server.ServerBuilder; +import com.linecorp.armeria.server.Service; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.brave.BraveService; +import com.linecorp.armeria.server.thrift.THttpService; +import com.linecorp.armeria.testing.junit4.server.ServerRule; + +import brave.ScopedSpan; +import brave.Tracer.SpanInScope; +import brave.Tracing; +import brave.http.HttpTracing; +import brave.propagation.CurrentTraceContext; +import brave.propagation.StrictScopeDecorator; +import brave.sampler.Sampler; +import zipkin2.Span; +import zipkin2.reporter.Reporter; + +public class BraveIntegrationTest { + + private static final ReporterImpl spanReporter = new ReporterImpl(); + + private HelloService.Iface fooClient; + private HelloService.Iface fooClientWithoutTracing; + private HelloService.Iface timeoutClient; + private HelloService.Iface timeoutClientClientTimesOut; + private HelloService.AsyncIface barClient; + private HelloService.AsyncIface quxClient; + private HelloService.Iface zipClient; + private HttpClient poolHttpClient; + + @Rule + public final ServerRule server = new ServerRule() { + @Override + protected void configure(ServerBuilder sb) throws Exception { + // Our test that triggers a timeout will take a second to run. Hopefully it doesn't cause flakiness + // for being too short. + sb.requestTimeout(Duration.ofSeconds(1)); + + sb.service("/foo", decorate("service/foo", THttpService.of( + (AsyncIface) (name, resultHandler) -> + barClient.hello("Miss. " + name, new DelegatingCallback(resultHandler))))); + + sb.service("/bar", decorate("service/bar", THttpService.of( + (AsyncIface) (name, resultHandler) -> { + if (name.startsWith("Miss. ")) { + name = "Ms. " + name.substring(6); + } + quxClient.hello(name, new DelegatingCallback(resultHandler)); + }))); + + sb.service("/zip", decorate("service/zip", THttpService.of( + (AsyncIface) (name, resultHandler) -> { + final ThriftCompletableFuture f1 = new ThriftCompletableFuture<>(); + final ThriftCompletableFuture f2 = new ThriftCompletableFuture<>(); + quxClient.hello(name, f1); + quxClient.hello(name, f2); + CompletableFuture.allOf(f1, f2).whenCompleteAsync((aVoid, throwable) -> { + resultHandler.onComplete(f1.join() + ", and " + f2.join()); + }, RequestContext.current().contextAwareExecutor()); + }))); + + sb.service("/qux", decorate("service/qux", THttpService.of( + (AsyncIface) (name, resultHandler) -> resultHandler.onComplete("Hello, " + name + '!')))); + + sb.service("/pool", decorate("service/pool", new AbstractHttpService() { + @Override + protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) + throws Exception { + final ListeningExecutorService executorService = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(2)); + final CountDownLatch countDownLatch = new CountDownLatch(2); + + final ListenableFuture> spanAware = allAsList(IntStream.range(1, 3).mapToObj( + i -> executorService.submit( + RequestContext.current().makeContextAware(() -> { + if (i == 2) { + countDownLatch.countDown(); + countDownLatch.await(); + } + brave.Span span = Tracing.currentTracer().nextSpan().start(); + try (SpanInScope spanInScope = + Tracing.currentTracer().withSpanInScope(span)) { + if (i == 1) { + countDownLatch.countDown(); + countDownLatch.await(); + // to wait second task get span. + Thread.sleep(1000L); + } + } finally { + span.finish(); + } + return null; + }))).collect(toImmutableList())); + + final CompletableFuture responseFuture = new CompletableFuture<>(); + final HttpResponse res = HttpResponse.from(responseFuture); + transformAsync(spanAware, + result -> allAsList(IntStream.range(1, 3).mapToObj( + i -> executorService.submit( + RequestContext.current().makeContextAware(() -> { + ScopedSpan span = Tracing.currentTracer() + .startScopedSpan("aloha"); + try { + return null; + } finally { + span.finish(); + } + }) + )).collect(toImmutableList())), + RequestContext.current().contextAwareExecutor()) + .addListener(() -> { + responseFuture.complete(HttpResponse.of(OK, MediaType.PLAIN_TEXT_UTF_8, "Lee")); + }, RequestContext.current().contextAwareExecutor()); + return res; + } + })); + + sb.service("/timeout", decorate("service/timeout", THttpService.of( + // This service never calls the handler and will timeout. + (AsyncIface) (name, resultHandler) -> { + }))); + } + }; + + @Before + public void setupClients() { + fooClient = new ClientBuilder(server.uri(BINARY, "/foo")) + .decorator(BraveClient.newDecorator(newTracing("client/foo"))) + .build(HelloService.Iface.class); + zipClient = new ClientBuilder(server.uri(BINARY, "/zip")) + .decorator(BraveClient.newDecorator(newTracing("client/zip"))) + .build(HelloService.Iface.class); + fooClientWithoutTracing = Clients.newClient(server.uri(BINARY, "/foo"), HelloService.Iface.class); + barClient = newClient("/bar"); + quxClient = newClient("/qux"); + poolHttpClient = HttpClient.of(server.uri("/")); + timeoutClient = new ClientBuilder(server.uri(BINARY, "/timeout")) + .decorator(BraveClient.newDecorator(newTracing("client/timeout"))) + .build(HelloService.Iface.class); + timeoutClientClientTimesOut = new ClientBuilder(server.uri(BINARY, "/timeout")) + .decorator(BraveClient.newDecorator(newTracing("client/timeout"))) + .responseTimeout(Duration.ofMillis(10)) + .build(HelloService.Iface.class); + } + + @After + public void tearDown() { + Tracing.current().close(); + } + + @After + public void shouldHaveNoExtraSpans() { + assertThat(spanReporter.spans).isEmpty(); + } + + private static BraveService decorate(String name, Service service) { + return BraveService.newDecorator(newTracing(name)).apply(service); + } + + private HelloService.AsyncIface newClient(String path) { + return new ClientBuilder(server.uri(BINARY, path)) + .decorator(BraveClient.newDecorator(newTracing("client" + path))) + .build(HelloService.AsyncIface.class); + } + + private static HttpTracing newTracing(String name) { + final CurrentTraceContext currentTraceContext = + RequestContextCurrentTraceContext.builder() + .addScopeDecorator(StrictScopeDecorator.create()) + .build(); + return HttpTracing.create(Tracing.newBuilder() + .currentTraceContext(currentTraceContext) + .localServiceName(name) + .spanReporter(spanReporter) + .sampler(Sampler.ALWAYS_SAMPLE) + .build()); + } + + @Test(timeout = 10000) + public void testServiceHasMultipleClientRequests() throws Exception { + assertThat(zipClient.hello("Lee")).isEqualTo("Hello, Lee!, and Hello, Lee!"); + + final Span[] spans = spanReporter.take(6); + final String traceId = spans[0].traceId(); + assertThat(spans).allMatch(s -> s.traceId().equals(traceId)); + } + + @Test(timeout = 10000) + public void testClientInitiatedTrace() throws Exception { + assertThat(fooClient.hello("Lee")).isEqualTo("Hello, Ms. Lee!"); + + final Span[] spans = spanReporter.take(6); + final String traceId = spans[0].traceId(); + assertThat(spans).allMatch(s -> s.traceId().equals(traceId)); + + // Find all spans. + final Span clientFooSpan = findSpan(spans, "client/foo"); + final Span serviceFooSpan = findSpan(spans, "service/foo"); + final Span clientBarSpan = findSpan(spans, "client/bar"); + final Span serviceBarSpan = findSpan(spans, "service/bar"); + final Span clientQuxSpan = findSpan(spans, "client/qux"); + final Span serviceQuxSpan = findSpan(spans, "service/qux"); + + // client/foo and service/foo should have no parents. + assertThat(clientFooSpan.parentId()).isNull(); + assertThat(serviceFooSpan.parentId()).isNull(); + + // client/foo and service/foo should have the ID values identical with their traceIds. + assertThat(clientFooSpan.id()).isEqualTo(traceId); + assertThat(serviceFooSpan.id()).isEqualTo(traceId); + + // The spans that do not cross the network boundary should have the same ID. + assertThat(clientFooSpan.id()).isEqualTo(serviceFooSpan.id()); + assertThat(clientBarSpan.id()).isEqualTo(serviceBarSpan.id()); + assertThat(clientQuxSpan.id()).isEqualTo(serviceQuxSpan.id()); + + // Check the parentIds. + assertThat(clientBarSpan.parentId()).isEqualTo(clientFooSpan.id()); + assertThat(serviceBarSpan.parentId()).isEqualTo(clientFooSpan.id()); + assertThat(clientQuxSpan.parentId()).isEqualTo(clientBarSpan.id()); + assertThat(serviceQuxSpan.parentId()).isEqualTo(clientBarSpan.id()); + + // Check the service names. + assertThat(clientFooSpan.localServiceName()).isEqualTo("client/foo"); + assertThat(serviceFooSpan.localServiceName()).isEqualTo("service/foo"); + assertThat(clientBarSpan.localServiceName()).isEqualTo("client/bar"); + assertThat(serviceBarSpan.localServiceName()).isEqualTo("service/bar"); + assertThat(clientQuxSpan.localServiceName()).isEqualTo("client/qux"); + assertThat(serviceQuxSpan.localServiceName()).isEqualTo("service/qux"); + + // Check the span names. + assertThat(spans).allMatch(s -> "hello".equals(s.name())); + + // Check wire times + final long clientStartTime = clientFooSpan.timestampAsLong(); + final long clientWireSendTime = clientFooSpan.annotations().stream() + .filter(a -> a.value().equals("ws")) + .findFirst().get().timestamp(); + final long clientWireReceiveTime = clientFooSpan.annotations().stream() + .filter(a -> a.value().equals("wr")) + .findFirst().get().timestamp(); + final long clientEndTime = clientStartTime + clientFooSpan.durationAsLong(); + + final long serverStartTime = serviceFooSpan.timestampAsLong(); + final long serverWireSendTime = serviceFooSpan.annotations().stream() + .filter(a -> a.value().equals("ws")) + .findFirst().get().timestamp(); + final long serverWireReceiveTime = serviceFooSpan.annotations().stream() + .filter(a -> a.value().equals("wr")) + .findFirst().get().timestamp(); + final long serverEndTime = serverStartTime + serviceFooSpan.durationAsLong(); + + // These values are taken at microsecond precision and should be reliable to compare to each other. + + // Because of the small deltas among these numbers in a unit test, a thread context switch can cause + // client - server values to not compare correctly. We go ahead and only verify values recorded from the + // same thread. + + assertThat(clientStartTime).isNotZero(); + assertThat(clientWireSendTime).isGreaterThanOrEqualTo(clientStartTime); + assertThat(clientWireReceiveTime).isGreaterThanOrEqualTo(clientWireSendTime); + assertThat(clientEndTime).isGreaterThanOrEqualTo(clientWireReceiveTime); + + // Server start time and wire receive time are essentially the same in our current model, and whether + // one is greater than the other is mostly an implementation detail, so we don't compare them to each + // other. + + assertThat(serverWireSendTime).isGreaterThanOrEqualTo(serverStartTime); + assertThat(serverWireSendTime).isGreaterThanOrEqualTo(serverWireReceiveTime); + assertThat(serverEndTime).isGreaterThanOrEqualTo(serverWireSendTime); + } + + @Test(timeout = 10000) + public void testServiceInitiatedTrace() throws Exception { + assertThat(fooClientWithoutTracing.hello("Lee")).isEqualTo("Hello, Ms. Lee!"); + + final Span[] spans = spanReporter.take(5); + final String traceId = spans[0].traceId(); + assertThat(spans).allMatch(s -> s.traceId().equals(traceId)); + + // Find all spans. + final Span serviceFooSpan = findSpan(spans, "service/foo"); + final Span clientBarSpan = findSpan(spans, "client/bar"); + final Span serviceBarSpan = findSpan(spans, "service/bar"); + final Span clientQuxSpan = findSpan(spans, "client/qux"); + final Span serviceQuxSpan = findSpan(spans, "service/qux"); + + // service/foo should have no parent. + assertThat(serviceFooSpan.parentId()).isNull(); + + // service/foo should have the ID value identical with its traceId. + assertThat(serviceFooSpan.id()).isEqualTo(traceId); + + // The spans that do not cross the network boundary should have the same ID. + assertThat(clientBarSpan.id()).isEqualTo(serviceBarSpan.id()); + assertThat(clientQuxSpan.id()).isEqualTo(serviceQuxSpan.id()); + + // Check the parentIds + assertThat(clientBarSpan.parentId()).isEqualTo(serviceFooSpan.id()); + assertThat(serviceBarSpan.parentId()).isEqualTo(serviceFooSpan.id()); + assertThat(clientQuxSpan.parentId()).isEqualTo(serviceBarSpan.id()); + assertThat(serviceQuxSpan.parentId()).isEqualTo(serviceBarSpan.id()); + + // Check the service names. + assertThat(serviceFooSpan.localServiceName()).isEqualTo("service/foo"); + assertThat(clientBarSpan.localServiceName()).isEqualTo("client/bar"); + assertThat(serviceBarSpan.localServiceName()).isEqualTo("service/bar"); + assertThat(clientQuxSpan.localServiceName()).isEqualTo("client/qux"); + assertThat(serviceQuxSpan.localServiceName()).isEqualTo("service/qux"); + + // Check the span names. + assertThat(spans).allMatch(s -> "hello".equals(s.name())); + } + + @Test(timeout = 10000) + public void testSpanInThreadPoolHasSameTraceId() throws Exception { + poolHttpClient.get("pool").aggregate().get(); + final Span[] spans = spanReporter.take(5); + assertThat(Arrays.stream(spans).map(Span::traceId).collect(toImmutableSet())).hasSize(1); + assertThat(Arrays.stream(spans).map(Span::parentId) + .filter(Objects::nonNull) + .collect(toImmutableSet())).hasSize(1); + } + + @Test(timeout = 10000) + public void testServerTimesOut() throws Exception { + assertThatThrownBy(() -> timeoutClient.hello("name")) + .isInstanceOf(InvalidResponseHeadersException.class); + final Span[] spans = spanReporter.take(2); + + final Span serverSpan = findSpan(spans, "service/timeout"); + final Span clientSpan = findSpan(spans, "client/timeout"); + + // Server timed out meaning it did still send a timeout response to the client and we have all + // annotations. + assertThat(serverSpan.annotations()).hasSize(2); + assertThat(clientSpan.annotations()).hasSize(2); + } + + @Test(timeout = 10000) + public void testClientTimesOut() throws Exception { + assertThatThrownBy(() -> timeoutClientClientTimesOut.hello("name")) + .isInstanceOf(ResponseTimeoutException.class); + final Span[] spans = spanReporter.take(2); + + final Span serverSpan = findSpan(spans, "service/timeout"); + final Span clientSpan = findSpan(spans, "client/timeout"); + + // Client timed out, so no response data was ever sent from the server. There is no wire send in the + // server and no wire receive in the client. + assertThat(serverSpan.annotations()).hasSize(1); + assertThat(clientSpan.annotations()).hasSize(1); + } + + private static Span findSpan(Span[] spans, String serviceName) { + return Arrays.stream(spans) + .filter(s -> serviceName.equals(s.localServiceName())) + .findAny() + .orElseThrow(() -> new AssertionError( + "Can't find a Span with service name: " + serviceName)); + } + + private static class DelegatingCallback implements AsyncMethodCallback { + private final AsyncMethodCallback resultHandler; + + @SuppressWarnings({ "unchecked", "rawtypes" }) + DelegatingCallback(AsyncMethodCallback resultHandler) { + this.resultHandler = resultHandler; + } + + @Override + public void onComplete(String response) { + resultHandler.onComplete(response); + } + + @Override + public void onError(Exception exception) { + resultHandler.onError(exception); + } + } + + private static class ReporterImpl implements Reporter { + private final BlockingQueue spans = new LinkedBlockingQueue<>(); + + @Override + public void report(Span span) { + spans.add(span); + } + + Span[] take(int numSpans) throws InterruptedException { + final List taken = new ArrayList<>(); + while (taken.size() < numSpans) { + taken.add(spans.take()); + } + + // Reverse the collected spans to sort the spans by request time. + Collections.reverse(taken); + return taken.toArray(new Span[numSpans]); + } + } +} diff --git a/brave/src/test/java/com/linecorp/armeria/server/brave/BraveServiceTest.java b/brave/src/test/java/com/linecorp/armeria/server/brave/BraveServiceTest.java new file mode 100644 index 00000000000..690ca42a634 --- /dev/null +++ b/brave/src/test/java/com/linecorp/armeria/server/brave/BraveServiceTest.java @@ -0,0 +1,195 @@ +/* + * Copyright 2019 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.server.brave; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import com.linecorp.armeria.common.HttpHeaderNames; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.RequestHeaders; +import com.linecorp.armeria.common.ResponseHeaders; +import com.linecorp.armeria.common.RpcRequest; +import com.linecorp.armeria.common.RpcResponse; +import com.linecorp.armeria.common.brave.HelloService; +import com.linecorp.armeria.common.brave.RequestContextCurrentTraceContext; +import com.linecorp.armeria.common.brave.SpanCollectingReporter; +import com.linecorp.armeria.common.logging.RequestLogBuilder; +import com.linecorp.armeria.common.util.SafeCloseable; +import com.linecorp.armeria.server.Service; +import com.linecorp.armeria.server.ServiceRequestContext; +import com.linecorp.armeria.server.ServiceRequestContextBuilder; + +import brave.Tracing; +import brave.http.HttpTracing; +import brave.propagation.CurrentTraceContext; +import brave.propagation.CurrentTraceContext.ScopeDecorator; +import brave.sampler.Sampler; +import zipkin2.Span; +import zipkin2.Span.Kind; + +class BraveServiceTest { + + private static final String TEST_SERVICE = "test-service"; + + private static final String TEST_METHOD = "hello"; + + @AfterEach + public void tearDown() { + Tracing.current().close(); + } + + @Test + void newDecorator_shouldFailFastWhenRequestContextCurrentTraceContextNotConfigured() { + assertThatThrownBy(() -> BraveService.newDecorator(HttpTracing.create(Tracing.newBuilder().build()))) + .isInstanceOf(IllegalStateException.class).hasMessage( + "Tracing.currentTraceContext is not a RequestContextCurrentTraceContext scope. " + + "Please call Tracing.Builder.currentTraceContext(RequestContextCurrentTraceContext.DEFAULT)." + ); + } + + @Test + void newDecorator_shouldWorkWhenRequestContextCurrentTraceContextConfigured() { + BraveService.newDecorator( + HttpTracing.create(Tracing.newBuilder() + .currentTraceContext(RequestContextCurrentTraceContext.DEFAULT) + .build())); + } + + @Test + void shouldSubmitSpanWhenRequestIsSampled() throws Exception { + final SpanCollectingReporter reporter = testServiceInvocation( + RequestContextCurrentTraceContext.DEFAULT, 1.0f); + + // check span name + final Span span = reporter.spans().take(); + assertThat(span.name()).isEqualTo(TEST_METHOD); + + // check kind + assertThat(span.kind()).isSameAs(Kind.SERVER); + + // only one span should be submitted + assertThat(reporter.spans().poll(1, TimeUnit.SECONDS)).isNull(); + + // check # of annotations (we add wire annotations) + assertThat(span.annotations()).hasSize(2); + + // check tags + assertTags(span); + + // check service name + assertThat(span.localServiceName()).isEqualTo(TEST_SERVICE); + } + + @Test + void shouldNotSubmitSpanWhenRequestIsNotSampled() throws Exception { + final SpanCollectingReporter reporter = testServiceInvocation( + RequestContextCurrentTraceContext.DEFAULT, 0.0f); + + // don't submit any spans + assertThat(reporter.spans().poll(1, TimeUnit.SECONDS)).isNull(); + } + + @Test + void scopeDecorator() throws Exception { + final AtomicInteger scopeDecoratorCallingCounter = new AtomicInteger(); + final ScopeDecorator scopeDecorator = (currentSpan, scope) -> { + scopeDecoratorCallingCounter.getAndIncrement(); + return scope; + }; + final CurrentTraceContext traceContext = + RequestContextCurrentTraceContext.builder() + .addScopeDecorator(scopeDecorator) + .build(); + + final SpanCollectingReporter reporter = testServiceInvocation(traceContext, 1.0f); + + // check span name + final Span span = reporter.spans().take(); + + // check tags + assertTags(span); + + // check service name + assertThat(span.localServiceName()).isEqualTo(TEST_SERVICE); + assertThat(scopeDecoratorCallingCounter.get()).isEqualTo(1); + } + + private static SpanCollectingReporter testServiceInvocation(CurrentTraceContext traceContext, + float samplingRate) throws Exception { + final SpanCollectingReporter reporter = new SpanCollectingReporter(); + + final HttpTracing httpTracing = HttpTracing.create(Tracing.newBuilder() + .localServiceName(TEST_SERVICE) + .spanReporter(reporter) + .currentTraceContext(traceContext) + .sampler(Sampler.create(samplingRate)) + .build()); + + final HttpRequest req = HttpRequest.of(RequestHeaders.of(HttpMethod.POST, "/hello/trustin", + HttpHeaderNames.AUTHORITY, "foo.com")); + final ServiceRequestContext ctx = ServiceRequestContextBuilder.of(req) + .build(); + final RpcRequest rpcReq = RpcRequest.of(HelloService.Iface.class, "hello", "trustin"); + final HttpResponse res = HttpResponse.of(HttpStatus.OK); + final RpcResponse rpcRes = RpcResponse.of("Hello, trustin!"); + final RequestLogBuilder logBuilder = ctx.logBuilder(); + logBuilder.requestContent(rpcReq, req); + logBuilder.endRequest(); + + try (SafeCloseable ignored = ctx.push()) { + @SuppressWarnings("unchecked") + final Service delegate = mock(Service.class); + final BraveService service = BraveService.newDecorator(httpTracing).apply(delegate); + when(delegate.serve(ctx, req)).thenReturn(res); + + // do invoke + service.serve(ctx, req); + + verify(delegate, times(1)).serve(eq(ctx), eq(req)); + } + + logBuilder.responseHeaders(ResponseHeaders.of(HttpStatus.OK)); + logBuilder.responseFirstBytesTransferred(); + logBuilder.responseContent(rpcRes, res); + logBuilder.endResponse(); + return reporter; + } + + private static void assertTags(Span span) { + assertThat(span.tags()).containsEntry("http.host", "foo.com") + .containsEntry("http.method", "POST") + .containsEntry("http.path", "/hello/trustin") + .containsEntry("http.status_code", "200") + .containsEntry("http.url", "http://foo.com/hello/trustin") + .containsEntry("http.protocol", "h2c"); + } +} diff --git a/brave/src/test/thrift/HelloService.thrift b/brave/src/test/thrift/HelloService.thrift new file mode 100644 index 00000000000..cf37c795dfc --- /dev/null +++ b/brave/src/test/thrift/HelloService.thrift @@ -0,0 +1,5 @@ +namespace java com.linecorp.armeria.common.brave + +service HelloService { + string hello(1:string name) +} diff --git a/settings.gradle b/settings.gradle index 174a294ba59..1964a7be51a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -6,6 +6,7 @@ apply from: "${rootDir}/gradle/scripts/settings-flags.gradle" includeWithFlags ':bom', 'bom' // Published Java projects +includeWithFlags ':brave', 'java', 'publish', 'relocate' includeWithFlags ':core', 'java', 'publish', 'shade', 'trim' includeWithFlags ':rxjava', 'java', 'publish', 'relocate' includeWithFlags ':grpc', 'java', 'publish', 'relocate' diff --git a/site/src/sphinx/advanced-zipkin.rst b/site/src/sphinx/advanced-zipkin.rst index 7b954e1d14c..b2bbf9e06f6 100644 --- a/site/src/sphinx/advanced-zipkin.rst +++ b/site/src/sphinx/advanced-zipkin.rst @@ -3,4 +3,4 @@ Zipkin integration ================== -TBW - See :api:`TracingService` and :api:`TracingClient` +TBW - See :api:`BraveService` and :api:`BraveClient` diff --git a/zipkin/build.gradle b/zipkin/build.gradle index 2d1121ad826..db6c4d8287c 100644 --- a/zipkin/build.gradle +++ b/zipkin/build.gradle @@ -3,5 +3,4 @@ dependencies { // Zipkin compile 'io.zipkin.brave:brave' - compile 'io.zipkin.brave:brave-instrumentation-http' } diff --git a/zipkin/src/main/java/com/linecorp/armeria/client/tracing/HttpTracingClient.java b/zipkin/src/main/java/com/linecorp/armeria/client/tracing/HttpTracingClient.java index 23b9db454c8..81187711306 100644 --- a/zipkin/src/main/java/com/linecorp/armeria/client/tracing/HttpTracingClient.java +++ b/zipkin/src/main/java/com/linecorp/armeria/client/tracing/HttpTracingClient.java @@ -16,11 +16,14 @@ package com.linecorp.armeria.client.tracing; -import static com.linecorp.armeria.client.tracing.TracingClient.checkTracing; +import static com.google.common.base.Strings.isNullOrEmpty; +import static com.linecorp.armeria.common.tracing.RequestContextCurrentTraceContext.ensureScopeUsesRequestContext; +import static java.util.Objects.requireNonNull; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.function.BiConsumer; import java.util.function.Function; import javax.annotation.Nullable; @@ -33,6 +36,7 @@ import com.linecorp.armeria.client.SimpleDecoratingClient; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.common.RequestHeadersBuilder; import com.linecorp.armeria.common.logging.RequestLog; import com.linecorp.armeria.common.logging.RequestLogAvailability; @@ -55,17 +59,17 @@ *

This decorator puts trace data into HTTP headers. The specifications of header names and its values * correspond to Zipkin. * - * @deprecated Use {@link TracingClient}. + * @deprecated Use {@code BraveClient} in the `armeria-brave` dependency. */ @Deprecated -public final class HttpTracingClient extends SimpleDecoratingClient { +public class HttpTracingClient extends SimpleDecoratingClient { private static final Logger logger = LoggerFactory.getLogger(HttpTracingClient.class); /** * Creates a new tracing {@link Client} decorator using the specified {@link Tracing} instance. * - * @deprecated Use {@link TracingClient#newDecorator(Tracing)}. + * @deprecated Use {@code BraveClient#newDecorator(httpTracing)} in the `armeria-brave` dependency. */ @Deprecated public static Function, HttpTracingClient> newDecorator(Tracing tracing) { @@ -76,35 +80,47 @@ public static Function, HttpTracingClient> new * Creates a new tracing {@link Client} decorator using the specified {@link Tracing} instance * and the remote service name. * - * @deprecated Use {@link TracingClient#newDecorator(Tracing, String)}. + * @deprecated Use {@code BraveClient#newDecorator(httpTracing)} in the `armeria-brave` dependency. */ @Deprecated public static Function, HttpTracingClient> newDecorator( Tracing tracing, @Nullable String remoteServiceName) { - checkTracing(tracing); - return delegate -> new HttpTracingClient(delegate, tracing, remoteServiceName); + try { + ensureScopeUsesRequestContext(tracing); + } catch (IllegalStateException e) { + logger.warn("{} - it is appropriate to ignore this warning if this client is not being used " + + "inside an Armeria server (e.g., this is a normal spring-mvc tomcat server).", + e.getMessage()); + } + return delegate -> new HttpTracingClient(delegate, tracing, remoteServiceName, + RequestContextCurrentTraceContext::copy); } private final Tracer tracer; private final TraceContext.Injector injector; @Nullable private final String remoteServiceName; + private final BiConsumer traceContextPropagator; /** * Creates a new instance. */ - HttpTracingClient(Client delegate, Tracing tracing, - @Nullable String remoteServiceName) { + protected HttpTracingClient(Client delegate, Tracing tracing, + @Nullable String remoteServiceName, + BiConsumer traceContextPropagator) { super(delegate); - tracer = tracing.tracer(); + tracer = requireNonNull(tracing, "tracing").tracer(); injector = tracing.propagationFactory().create(AsciiStringKeyFactory.INSTANCE) .injector(RequestHeadersBuilder::set); this.remoteServiceName = remoteServiceName; + this.traceContextPropagator = requireNonNull(traceContextPropagator, "traceContextPropagator"); } @Override public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Exception { + final Tracer tracer = this.tracer; final Span span = tracer.nextSpan(); + final TraceContext.Injector injector = this.injector; // Inject the headers. final RequestHeadersBuilder newHeaders = req.headers().toBuilder(); @@ -123,7 +139,7 @@ public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Ex RequestLogAvailability.REQUEST_START); // Ensure the trace context propagates to children - ctx.onChild(RequestContextCurrentTraceContext::copy); + ctx.onChild(traceContextPropagator); ctx.log().addListener(log -> { SpanTags.logWireSend(span, log.requestFirstBytesTransferredTimeNanos(), log); @@ -158,7 +174,7 @@ private void setRemoteEndpoint(Span span, RequestLog log) { address = null; port = 0; } - if (remoteServiceName != null) { + if (!isNullOrEmpty(remoteServiceName)) { span.remoteServiceName(remoteServiceName); } if (address != null) { diff --git a/zipkin/src/main/java/com/linecorp/armeria/client/tracing/TracingClient.java b/zipkin/src/main/java/com/linecorp/armeria/client/tracing/TracingClient.java deleted file mode 100644 index c0ab3c41bd4..00000000000 --- a/zipkin/src/main/java/com/linecorp/armeria/client/tracing/TracingClient.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Copyright 2019 LINE Corporation - * - * LINE Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.client.tracing; - -import static com.linecorp.armeria.common.tracing.RequestContextCurrentTraceContext.ensureScopeUsesRequestContext; - -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.util.function.Function; - -import javax.annotation.Nullable; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.linecorp.armeria.client.Client; -import com.linecorp.armeria.client.ClientRequestContext; -import com.linecorp.armeria.client.SimpleDecoratingClient; -import com.linecorp.armeria.common.HttpRequest; -import com.linecorp.armeria.common.HttpResponse; -import com.linecorp.armeria.common.RequestHeadersBuilder; -import com.linecorp.armeria.common.logging.RequestLog; -import com.linecorp.armeria.common.logging.RequestLogAvailability; -import com.linecorp.armeria.common.tracing.RequestContextCurrentTraceContext; -import com.linecorp.armeria.internal.tracing.AsciiStringKeyFactory; -import com.linecorp.armeria.internal.tracing.SpanContextUtil; -import com.linecorp.armeria.internal.tracing.SpanTags; - -import brave.Span; -import brave.Span.Kind; -import brave.Tracer; -import brave.Tracer.SpanInScope; -import brave.Tracing; -import brave.http.HttpTracing; -import brave.propagation.TraceContext; - -/** - * Decorates a {@link Client} to trace outbound {@link HttpRequest}s using - * Zipkin. - * - *

This decorator puts trace data into HTTP headers. The specifications of header names and its values - * correspond to Zipkin. - */ -public final class TracingClient extends SimpleDecoratingClient { - - private static final Logger logger = LoggerFactory.getLogger(TracingClient.class); - - /** - * Creates a new tracing {@link Client} decorator using the specified {@link HttpTracing} instance. - */ - public static Function, TracingClient> newDecorator( - HttpTracing httpTracing) { - checkTracing(httpTracing.tracing()); - return delegate -> new TracingClient(delegate, httpTracing); - } - - private final Tracer tracer; - private final TraceContext.Injector injector; - @Nullable - private final String remoteServiceName; - - /** - * Creates a new instance. - */ - private TracingClient(Client delegate, HttpTracing httpTracing) { - super(delegate); - final Tracing tracing = httpTracing.tracing(); - tracer = tracing.tracer(); - injector = tracing.propagationFactory().create(AsciiStringKeyFactory.INSTANCE) - .injector(RequestHeadersBuilder::set); - remoteServiceName = httpTracing.serverName(); - } - - @Override - public HttpResponse execute(ClientRequestContext ctx, HttpRequest req) throws Exception { - final Span span = tracer.nextSpan(); - - // Inject the headers. - final RequestHeadersBuilder newHeaders = req.headers().toBuilder(); - injector.inject(span.context(), newHeaders); - req = HttpRequest.of(req, newHeaders.build()); - ctx.updateRequest(req); - - // For no-op spans, we only need to inject into headers and don't set any other attributes. - if (span.isNoop()) { - return delegate().execute(ctx, req); - } - - final String method = ctx.method().name(); - span.kind(Kind.CLIENT).name(method); - ctx.log().addListener(log -> SpanContextUtil.startSpan(span, log), - RequestLogAvailability.REQUEST_START); - - // Ensure the trace context propagates to children - ctx.onChild(RequestContextCurrentTraceContext::copy); - - ctx.log().addListener(log -> { - SpanTags.logWireSend(span, log.requestFirstBytesTransferredTimeNanos(), log); - - // If the client timed-out the request, we will have never received any response data at all. - if (log.isAvailable(RequestLogAvailability.RESPONSE_FIRST_BYTES_TRANSFERRED)) { - SpanTags.logWireReceive(span, log.responseFirstBytesTransferredTimeNanos(), log); - } - - finishSpan(span, log); - }, RequestLogAvailability.COMPLETE); - - try (SpanInScope ignored = tracer.withSpanInScope(span)) { - return delegate().execute(ctx, req); - } - } - - private void finishSpan(Span span, RequestLog log) { - setRemoteEndpoint(span, log); - SpanContextUtil.closeSpan(span, log); - } - - private void setRemoteEndpoint(Span span, RequestLog log) { - final SocketAddress remoteAddress = log.context().remoteAddress(); - final InetAddress address; - final int port; - if (remoteAddress instanceof InetSocketAddress) { - final InetSocketAddress socketAddress = (InetSocketAddress) remoteAddress; - address = socketAddress.getAddress(); - port = socketAddress.getPort(); - } else { - address = null; - port = 0; - } - if (remoteServiceName != null) { - span.remoteServiceName(remoteServiceName); - } - if (address != null) { - span.remoteIpAndPort(address.getHostAddress(), port); - } - } - - static void checkTracing(Tracing tracing) { - try { - ensureScopeUsesRequestContext(tracing); - } catch (IllegalStateException e) { - logger.warn("{} - it is appropriate to ignore this warning if this client is not being used " + - "inside an Armeria server (e.g., this is a normal spring-mvc tomcat server).", - e.getMessage()); - } - } -} diff --git a/zipkin/src/main/java/com/linecorp/armeria/common/tracing/RequestContextCurrentTraceContext.java b/zipkin/src/main/java/com/linecorp/armeria/common/tracing/RequestContextCurrentTraceContext.java index a44dd35de03..071c76dc91e 100644 --- a/zipkin/src/main/java/com/linecorp/armeria/common/tracing/RequestContextCurrentTraceContext.java +++ b/zipkin/src/main/java/com/linecorp/armeria/common/tracing/RequestContextCurrentTraceContext.java @@ -29,9 +29,9 @@ import com.google.common.annotations.VisibleForTesting; -import com.linecorp.armeria.client.tracing.TracingClient; +import com.linecorp.armeria.client.tracing.HttpTracingClient; import com.linecorp.armeria.common.RequestContext; -import com.linecorp.armeria.server.tracing.TracingService; +import com.linecorp.armeria.server.tracing.HttpTracingService; import brave.Tracing; import brave.propagation.CurrentTraceContext; @@ -46,12 +46,15 @@ *

This {@link CurrentTraceContext} stores/loads the trace context into/from a * {@link RequestContext}'s attribute so that there's no need for thread local variables * which can lead to unpredictable behavior in asynchronous programming. + * + * @deprecated Use the same class in the `armeria-brave` dependency. */ +@Deprecated public final class RequestContextCurrentTraceContext extends CurrentTraceContext { /** * Use this singleton when building a {@link brave.Tracing} instance for use with - * {@link TracingService} or {@link TracingClient}. + * {@link HttpTracingService} or {@link HttpTracingClient}. * *

If you need to customize the context, use {@link #builder()} instead. * diff --git a/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/package-info.java b/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/package-info.java index a3e89565e72..7da23d29849 100644 --- a/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/package-info.java +++ b/zipkin/src/main/java/com/linecorp/armeria/internal/tracing/package-info.java @@ -14,6 +14,7 @@ * under the License. */ +// TODO(minwoox) Move this package into "internal.brave" in "brave" module. /** * Various classes used internally. Anything in this package can be changed or removed at any time. */ diff --git a/zipkin/src/main/java/com/linecorp/armeria/server/tracing/HttpTracingService.java b/zipkin/src/main/java/com/linecorp/armeria/server/tracing/HttpTracingService.java index f551874f7c0..b1afad6f464 100644 --- a/zipkin/src/main/java/com/linecorp/armeria/server/tracing/HttpTracingService.java +++ b/zipkin/src/main/java/com/linecorp/armeria/server/tracing/HttpTracingService.java @@ -17,12 +17,15 @@ package com.linecorp.armeria.server.tracing; import static com.linecorp.armeria.common.tracing.RequestContextCurrentTraceContext.ensureScopeUsesRequestContext; +import static java.util.Objects.requireNonNull; +import java.util.function.BiConsumer; import java.util.function.Function; import com.linecorp.armeria.common.HttpHeaders; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.RequestContext; import com.linecorp.armeria.common.logging.RequestLogAvailability; import com.linecorp.armeria.common.tracing.RequestContextCurrentTraceContext; import com.linecorp.armeria.internal.tracing.AsciiStringKeyFactory; @@ -47,7 +50,7 @@ *

This decorator retrieves trace data from HTTP headers. The specifications of header names and its values * correspond to Zipkin. * - * @deprecated Use {@link TracingService}. + * @deprecated Use {@code BraveService} in the `armeria-brave` dependency. */ @Deprecated public class HttpTracingService extends SimpleDecoratingService { @@ -55,7 +58,7 @@ public class HttpTracingService extends SimpleDecoratingService, HttpTracingService> @@ -66,18 +69,31 @@ public class HttpTracingService extends SimpleDecoratingService extractor; + private final BiConsumer traceContextPropagator; /** * Creates a new instance. * - * @deprecated Use {@link TracingService#newDecorator(Tracing)}. + * @deprecated Use {@code BraveService#newDecorator(httpTracing)} in the `armeria-brave` dependency. */ @Deprecated public HttpTracingService(Service delegate, Tracing tracing) { + this(delegate, tracing, RequestContextCurrentTraceContext::copy); + } + + /** + * Creates a new instance. + * + * @deprecated Use {@code BraveService#newDecorator(httpTracing)} in the `armeria-brave` dependency. + */ + @Deprecated + protected HttpTracingService(Service delegate, Tracing tracing, + BiConsumer traceContextPropagator) { super(delegate); - tracer = tracing.tracer(); + tracer = requireNonNull(tracing, "tracing").tracer(); extractor = tracing.propagationFactory().create(AsciiStringKeyFactory.INSTANCE) .extractor(HttpHeaders::get); + this.traceContextPropagator = requireNonNull(traceContextPropagator, "traceContextPropagator"); } @Override @@ -96,7 +112,7 @@ public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exc RequestLogAvailability.REQUEST_START); // Ensure the trace context propagates to children - ctx.onChild(RequestContextCurrentTraceContext::copy); + ctx.onChild(traceContextPropagator); ctx.log().addListener(log -> { SpanTags.logWireReceive(span, log.requestFirstBytesTransferredTimeNanos(), log); diff --git a/zipkin/src/main/java/com/linecorp/armeria/server/tracing/TracingService.java b/zipkin/src/main/java/com/linecorp/armeria/server/tracing/TracingService.java deleted file mode 100644 index 81e1bf79971..00000000000 --- a/zipkin/src/main/java/com/linecorp/armeria/server/tracing/TracingService.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2019 LINE Corporation - * - * LINE Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.armeria.server.tracing; - -import static com.linecorp.armeria.common.tracing.RequestContextCurrentTraceContext.ensureScopeUsesRequestContext; - -import java.util.function.Function; - -import com.linecorp.armeria.common.HttpHeaders; -import com.linecorp.armeria.common.HttpRequest; -import com.linecorp.armeria.common.HttpResponse; -import com.linecorp.armeria.common.logging.RequestLogAvailability; -import com.linecorp.armeria.common.tracing.RequestContextCurrentTraceContext; -import com.linecorp.armeria.internal.tracing.AsciiStringKeyFactory; -import com.linecorp.armeria.internal.tracing.SpanContextUtil; -import com.linecorp.armeria.internal.tracing.SpanTags; -import com.linecorp.armeria.server.Service; -import com.linecorp.armeria.server.ServiceRequestContext; -import com.linecorp.armeria.server.SimpleDecoratingService; - -import brave.Span; -import brave.Span.Kind; -import brave.Tracer; -import brave.Tracer.SpanInScope; -import brave.Tracing; -import brave.propagation.TraceContext; -import brave.propagation.TraceContextOrSamplingFlags; - -/** - * Decorates a {@link Service} to trace inbound {@link HttpRequest}s using - * Zipkin. - * - *

This decorator retrieves trace data from HTTP headers. The specifications of header names and its values - * correspond to Zipkin. - */ -public final class TracingService extends SimpleDecoratingService { - - /** - * Creates a new tracing {@link Service} decorator using the specified {@link Tracing} instance. - */ - public static Function, TracingService> newDecorator(Tracing tracing) { - ensureScopeUsesRequestContext(tracing); - return service -> new TracingService(service, tracing); - } - - private final Tracer tracer; - private final TraceContext.Extractor extractor; - - /** - * Creates a new instance. - */ - private TracingService(Service delegate, Tracing tracing) { - super(delegate); - tracer = tracing.tracer(); - extractor = tracing.propagationFactory().create(AsciiStringKeyFactory.INSTANCE) - .extractor(HttpHeaders::get); - } - - @Override - public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exception { - final TraceContextOrSamplingFlags contextOrFlags = extractor.extract(req.headers()); - final Span span = contextOrFlags.context() != null ? tracer.joinSpan(contextOrFlags.context()) - : tracer.nextSpan(contextOrFlags); - // For no-op spans, nothing special to do. - if (span.isNoop()) { - return delegate().serve(ctx, req); - } - - final String method = ctx.method().name(); - span.kind(Kind.SERVER).name(method); - ctx.log().addListener(log -> SpanContextUtil.startSpan(span, log), - RequestLogAvailability.REQUEST_START); - - // Ensure the trace context propagates to children - ctx.onChild(RequestContextCurrentTraceContext::copy); - - ctx.log().addListener(log -> { - SpanTags.logWireReceive(span, log.requestFirstBytesTransferredTimeNanos(), log); - - // If the client timed-out the request, we will have never sent any response data at all. - if (log.isAvailable(RequestLogAvailability.RESPONSE_FIRST_BYTES_TRANSFERRED)) { - SpanTags.logWireSend(span, log.responseFirstBytesTransferredTimeNanos(), log); - } - - SpanContextUtil.closeSpan(span, log); - }, RequestLogAvailability.COMPLETE); - - try (SpanInScope ignored = tracer.withSpanInScope(span)) { - return delegate().serve(ctx, req); - } - } -} diff --git a/zipkin/src/test/java/com/linecorp/armeria/client/tracing/TracingClientTest.java b/zipkin/src/test/java/com/linecorp/armeria/client/tracing/TracingClientTest.java index 2f9625774d1..e698b93db3e 100644 --- a/zipkin/src/test/java/com/linecorp/armeria/client/tracing/TracingClientTest.java +++ b/zipkin/src/test/java/com/linecorp/armeria/client/tracing/TracingClientTest.java @@ -71,12 +71,12 @@ void tearDown() { @Test void newDecorator_shouldWorkWhenRequestContextCurrentTraceContextNotConfigured() { - TracingClient.newDecorator(Tracing.newBuilder().build()); + HttpTracingClient.newDecorator(Tracing.newBuilder().build()); } @Test void newDecorator_shouldWorkWhenRequestContextCurrentTraceContextConfigured() { - TracingClient.newDecorator( + HttpTracingClient.newDecorator( Tracing.newBuilder().currentTraceContext(RequestContextCurrentTraceContext.DEFAULT).build()); } @@ -211,7 +211,8 @@ private static void testRemoteInvocation(Tracing tracing, @Nullable String remot final Client delegate = mock(Client.class); when(delegate.execute(any(), any())).thenReturn(res); - final TracingClient stub = TracingClient.newDecorator(tracing, remoteServiceName).apply(delegate); + final HttpTracingClient stub = HttpTracingClient.newDecorator(tracing, remoteServiceName) + .apply(delegate); // do invoke final HttpResponse actualRes = stub.execute(ctx, req); diff --git a/zipkin/src/test/java/com/linecorp/armeria/it/tracing/HttpTracingIntegrationTest.java b/zipkin/src/test/java/com/linecorp/armeria/it/tracing/HttpTracingIntegrationTest.java index d9a94e71f09..2e30bffbce0 100644 --- a/zipkin/src/test/java/com/linecorp/armeria/it/tracing/HttpTracingIntegrationTest.java +++ b/zipkin/src/test/java/com/linecorp/armeria/it/tracing/HttpTracingIntegrationTest.java @@ -53,7 +53,7 @@ import com.linecorp.armeria.client.HttpClient; import com.linecorp.armeria.client.InvalidResponseHeadersException; import com.linecorp.armeria.client.ResponseTimeoutException; -import com.linecorp.armeria.client.tracing.TracingClient; +import com.linecorp.armeria.client.tracing.HttpTracingClient; import com.linecorp.armeria.common.HttpRequest; import com.linecorp.armeria.common.HttpResponse; import com.linecorp.armeria.common.MediaType; @@ -67,7 +67,7 @@ import com.linecorp.armeria.server.Service; import com.linecorp.armeria.server.ServiceRequestContext; import com.linecorp.armeria.server.thrift.THttpService; -import com.linecorp.armeria.server.tracing.TracingService; +import com.linecorp.armeria.server.tracing.HttpTracingService; import com.linecorp.armeria.testing.junit4.server.ServerRule; import brave.ScopedSpan; @@ -181,27 +181,28 @@ protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) sb.service("/timeout", decorate("service/timeout", THttpService.of( // This service never calls the handler and will timeout. - (AsyncIface) (name, resultHandler) -> {}))); + (AsyncIface) (name, resultHandler) -> { + }))); } }; @Before public void setupClients() { fooClient = new ClientBuilder(server.uri(BINARY, "/foo")) - .decorator(TracingClient.newDecorator(newTracing("client/foo"))) + .decorator(HttpTracingClient.newDecorator(newTracing("client/foo"))) .build(HelloService.Iface.class); zipClient = new ClientBuilder(server.uri(BINARY, "/zip")) - .decorator(TracingClient.newDecorator(newTracing("client/zip"))) + .decorator(HttpTracingClient.newDecorator(newTracing("client/zip"))) .build(HelloService.Iface.class); fooClientWithoutTracing = Clients.newClient(server.uri(BINARY, "/foo"), HelloService.Iface.class); barClient = newClient("/bar"); quxClient = newClient("/qux"); poolHttpClient = HttpClient.of(server.uri("/")); timeoutClient = new ClientBuilder(server.uri(BINARY, "/timeout")) - .decorator(TracingClient.newDecorator(newTracing("client/timeout"))) + .decorator(HttpTracingClient.newDecorator(newTracing("client/timeout"))) .build(HelloService.Iface.class); timeoutClientClientTimesOut = new ClientBuilder(server.uri(BINARY, "/timeout")) - .decorator(TracingClient.newDecorator(newTracing("client/timeout"))) + .decorator(HttpTracingClient.newDecorator(newTracing("client/timeout"))) .responseTimeout(Duration.ofMillis(10)) .build(HelloService.Iface.class); } @@ -216,13 +217,13 @@ public void shouldHaveNoExtraSpans() { assertThat(spanReporter.spans).isEmpty(); } - private static TracingService decorate(String name, Service service) { - return TracingService.newDecorator(newTracing(name)).apply(service); + private static HttpTracingService decorate(String name, Service service) { + return HttpTracingService.newDecorator(newTracing(name)).apply(service); } private HelloService.AsyncIface newClient(String path) { return new ClientBuilder(server.uri(BINARY, path)) - .decorator(TracingClient.newDecorator(newTracing("client" + path))) + .decorator(HttpTracingClient.newDecorator(newTracing("client" + path))) .build(HelloService.AsyncIface.class); } diff --git a/zipkin/src/test/java/com/linecorp/armeria/server/tracing/TracingServiceTest.java b/zipkin/src/test/java/com/linecorp/armeria/server/tracing/TracingServiceTest.java index 7c7238d2b2e..f2e8a95d08e 100644 --- a/zipkin/src/test/java/com/linecorp/armeria/server/tracing/TracingServiceTest.java +++ b/zipkin/src/test/java/com/linecorp/armeria/server/tracing/TracingServiceTest.java @@ -68,7 +68,7 @@ public void tearDown() { @Test void newDecorator_shouldFailFastWhenRequestContextCurrentTraceContextNotConfigured() { - assertThatThrownBy(() -> TracingService.newDecorator(Tracing.newBuilder().build())) + assertThatThrownBy(() -> HttpTracingService.newDecorator(Tracing.newBuilder().build())) .isInstanceOf(IllegalStateException.class).hasMessage( "Tracing.currentTraceContext is not a RequestContextCurrentTraceContext scope. " + "Please call Tracing.Builder.currentTraceContext(RequestContextCurrentTraceContext.DEFAULT)." @@ -77,7 +77,7 @@ void newDecorator_shouldFailFastWhenRequestContextCurrentTraceContextNotConfigur @Test void newDecorator_shouldWorkWhenRequestContextCurrentTraceContextConfigured() { - TracingService.newDecorator( + HttpTracingService.newDecorator( Tracing.newBuilder().currentTraceContext(RequestContextCurrentTraceContext.DEFAULT).build()); } @@ -165,7 +165,7 @@ private static SpanCollectingReporter testServiceInvocation(CurrentTraceContext try (SafeCloseable ignored = ctx.push()) { @SuppressWarnings("unchecked") final Service delegate = mock(Service.class); - final TracingService service = TracingService.newDecorator(tracing).apply(delegate); + final HttpTracingService service = HttpTracingService.newDecorator(tracing).apply(delegate); when(delegate.serve(ctx, req)).thenReturn(res); // do invoke