Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make zipkin's current context can be nested #1262

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.linecorp.armeria.common.logging.RequestLogAvailability;
import com.linecorp.armeria.internal.tracing.AsciiStringKeyFactory;
import com.linecorp.armeria.internal.tracing.SpanContextUtil;
import com.linecorp.armeria.internal.tracing.SpanInScopeWrapper;

import brave.Span;
import brave.Span.Kind;
Expand All @@ -52,7 +53,7 @@
*/
public class HttpTracingClient extends SimpleDecoratingClient<HttpRequest, HttpResponse> {

private static final FastThreadLocal<SpanInScope> SPAN_IN_THREAD = new FastThreadLocal<>();
private static final FastThreadLocal<SpanInScopeWrapper> SPAN_IN_THREAD = new FastThreadLocal<>();

/**
* Creates a new tracing {@link Client} decorator using the specified {@link Tracing} instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,23 @@ public final class SpanContextUtil {
/**
* Sets up the {@link RequestContext} to push and pop the {@link Span} whenever it is entered/exited.
*/
public static void setupContext(FastThreadLocal<SpanInScope> threadLocalSpan, RequestContext ctx, Span span,
Tracer tracer) {
ctx.onEnter(unused -> threadLocalSpan.set(tracer.withSpanInScope(span)));
public static void setupContext(FastThreadLocal<SpanInScopeWrapper> threadLocalSpan, RequestContext ctx,
Span span, Tracer tracer) {
ctx.onEnter(unused -> {
final SpanInScopeWrapper current = threadLocalSpan.get();
final SpanInScope newScope = tracer.withSpanInScope(span);
threadLocalSpan.set(new SpanInScopeWrapper(newScope, current));
});
ctx.onExit(unused -> {
final SpanInScope spanInScope = threadLocalSpan.get();
if (spanInScope != null) {
spanInScope.close();
final SpanInScopeWrapper spanInScope = threadLocalSpan.get();
if (spanInScope == null) {
return;
}
spanInScope.close();
final SpanInScopeWrapper previousScope = spanInScope.previous();
if (previousScope != null) {
threadLocalSpan.set(previousScope);
} else {
threadLocalSpan.remove();
}
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.internal.tracing;

import javax.annotation.Nullable;

import brave.Tracer.SpanInScope;

/**
* {@link SpanInScope} wrapper that keeps the previous {@link SpanInScope}.
*/
public final class SpanInScopeWrapper implements AutoCloseable {

private final SpanInScope spanInScope;

@Nullable
private final SpanInScopeWrapper previous;

public SpanInScopeWrapper(SpanInScope current, @Nullable SpanInScopeWrapper previous) {
this.spanInScope = current;
this.previous = previous;
}

@Nullable
public SpanInScopeWrapper previous() {
return previous;
}

@Override
public void close() {
spanInScope.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.linecorp.armeria.common.logging.RequestLogAvailability;
import com.linecorp.armeria.internal.tracing.AsciiStringKeyFactory;
import com.linecorp.armeria.internal.tracing.SpanContextUtil;
import com.linecorp.armeria.internal.tracing.SpanInScopeWrapper;
import com.linecorp.armeria.server.Service;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.SimpleDecoratingService;
Expand All @@ -46,7 +47,7 @@
*/
public class HttpTracingService extends SimpleDecoratingService<HttpRequest, HttpResponse> {

private static final FastThreadLocal<SpanInScope> SPAN_IN_THREAD = new FastThreadLocal<>();
private static final FastThreadLocal<SpanInScopeWrapper> SPAN_IN_THREAD = new FastThreadLocal<>();

/**
* Creates a new tracing {@link Service} decorator using the specified {@link Tracing} instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,11 @@
import static com.linecorp.armeria.common.thrift.ThriftSerializationFormats.BINARY;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;

import org.apache.thrift.async.AsyncMethodCallback;
Expand Down Expand Up @@ -68,7 +64,6 @@
import brave.propagation.CurrentTraceContext;
import brave.sampler.Sampler;
import zipkin2.Span;
import zipkin2.reporter.Reporter;

public class HttpTracingIntegrationTest {

Expand Down Expand Up @@ -162,7 +157,7 @@ public void setupClients() {
.build(HelloService.Iface.class);
zipClient = new ClientBuilder(server.uri(BINARY, "/zip"))
.decorator(HttpRequest.class, HttpResponse.class,
HttpTracingClient.newDecorator(newTracing("client/zip")))
HttpTracingClient.newDecorator(newTracing("client/zip")))
.build(HelloService.Iface.class);
fooClientWithoutTracing = Clients.newClient(server.uri(BINARY, "/foo"), HelloService.Iface.class);
barClient = newClient("/bar");
Expand All @@ -177,7 +172,7 @@ public void tearDown() {

@After
public void shouldHaveNoExtraSpans() {
assertThat(spanReporter.spans).isEmpty();
assertThat(spanReporter.getSpans()).isEmpty();
}

private static HttpTracingService decorate(String name, Service<HttpRequest, HttpResponse> service) {
Expand Down Expand Up @@ -331,24 +326,4 @@ public void onError(Exception exception) {
resultHandler.onError(exception);
}
}

private static class ReporterImpl implements Reporter<Span> {
private final BlockingQueue<Span> spans = new LinkedBlockingQueue<>();

@Override
public void report(Span span) {
spans.add(span);
}

Span[] take(int numSpans) throws InterruptedException {
final List<Span> taken = new ArrayList<>();
while (taken.size() < numSpans) {
taken.add(spans.take());
}

// Reverse the collected spans to sort the spans by request time.
Collections.reverse(taken);
return taken.toArray(new Span[numSpans]);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.it.tracing;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
import static com.linecorp.armeria.common.HttpStatus.OK;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.with;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.common.AggregatedHttpMessage;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.server.AbstractHttpService;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.Service;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.tracing.HttpTracingService;
import com.linecorp.armeria.testing.server.ServerRule;

import brave.Tracing;
import brave.propagation.CurrentTraceContext;
import brave.sampler.Sampler;
import zipkin2.Span;

public class HttpTracingNestedContextIntegrationTest {

private static final ReporterImpl spanReporter = new ReporterImpl();

private HttpClient poolHttpClient;

private final CountDownLatch waitCreateCache = new CountDownLatch(1);

@Rule
public final ServerRule server = new ServerRule() {
@Override
protected void configure(ServerBuilder sb) throws Exception {

final CountDownLatch countDownLatch = new CountDownLatch(2);
final AtomicReference<CompletableFuture<HttpStatus>> cache = new AtomicReference<>();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use caffeine, so this is similar situation.


sb.service("/non-trace", new AbstractHttpService() {
@Override
protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req)
throws Exception {
if (Tracing.currentTracer().currentSpan() != null) {
return HttpResponse.of(INTERNAL_SERVER_ERROR);
}
return HttpResponse.of(OK);
}
});

sb.service("/create-cache", decorate("service/create-cache", new AbstractHttpService() {
@Override
protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req)
throws Exception {
final CompletableFuture<HttpStatus> future = CompletableFuture.supplyAsync(() -> {
try {
countDownLatch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return OK;
}, RequestContext.current().contextAwareEventLoop());
cache.set(future);
waitCreateCache.countDown();
return HttpResponse.from(future.thenApply(status -> {
if (Tracing.currentTracer().currentSpan() == null) {
return HttpResponse.of(INTERNAL_SERVER_ERROR);
}
return HttpResponse.of(status);
}));
}
}));

sb.service("/read-cache", decorate("service/read-cache", new AbstractHttpService() {
@Override
protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) throws Exception {
try {
final RequestContext requestContext = RequestContext.current();
return HttpResponse.from(
cache.get().thenApply(status -> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or the application dev must always know using thenApplyAsync with current evetloop.

try (SafeCloseable ignored = RequestContext.push(requestContext)) {
Copy link
Collaborator

@anuraaga anuraaga Jun 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, the example seems to have two unrelated server context's on the same request context stack. Is this allowed by the semantics of RequestContext? We have features like RequestContext.onChild, which I don't think support this sort of behavior, pushing a context here is like saying we're creating a new child context, so the unrelated "parent"'s onChildCallbacks would run too from what I can tell.

Copy link
Member Author

@kojilin kojilin Jun 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, right. Didn't notice about #onChild.
Also do you think I should use propagateContextIfNotPresent in here too? https://github.com/line/armeria/blob/master/rxjava/src/main/java/com/linecorp/armeria/common/rxjava/RequestContextCompletable.java#L38

Copy link
Collaborator

@anuraaga anuraaga Jun 18, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah going with propagate if not present seems safer for that sort of callback, so while new contexts don't get set up properly, at least old contexts won't be affected poorly which I guess is usually better. Maybe some sort of logging could be added for such a case.

FWIW, the reason I added this comment a long time ago is because in my experience it's extremely difficult if not impossible to get "context" to work in asynchronous code in Java (which can only try mapping them to ThreadLocal) without jumping threads. Perhaps we could call this out in the documentation better - wonder what others' thoughts are

https://github.com/line/armeria/blob/master/core/src/main/java/com/linecorp/armeria/common/RequestContext.java#L271

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, it's so easy to forget using Async callback so thats why we want to use this kind of hook for chaining sync/async operations.

I will close this PR and try to make RxJava hook strictly.

if (Tracing.currentTracer().currentSpan() == null) {
return HttpResponse.of(INTERNAL_SERVER_ERROR);
}
return HttpResponse.of(status);
}
}));
} finally {
countDownLatch.countDown();
}
}
}));
}
};

@Before
public void setupClients() {
poolHttpClient = HttpClient.of(server.uri("/"));
}

@After
public void tearDown() {
Tracing.current().close();
}

@After
public void shouldHaveNoExtraSpans() {
assertThat(spanReporter.getSpans()).isEmpty();
}

private static HttpTracingService decorate(String name, Service<HttpRequest, HttpResponse> service) {
return HttpTracingService.newDecorator(newTracing(name)).apply(service);
}

private static Tracing newTracing(String name) {
return Tracing.newBuilder()
.currentTraceContext(CurrentTraceContext.Default.create())
.localServiceName(name)
.spanReporter(spanReporter)
.sampler(Sampler.ALWAYS_SAMPLE)
.build();
}

@Test(timeout = 20000)
public void testNestedRequestContext() throws Exception {
final CompletableFuture<AggregatedHttpMessage> create = poolHttpClient.get("/create-cache").aggregate();
waitCreateCache.await(3, SECONDS);
final CompletableFuture<AggregatedHttpMessage> read1 = poolHttpClient.get("/read-cache").aggregate();
final CompletableFuture<AggregatedHttpMessage> read2 = poolHttpClient.get("/read-cache").aggregate();

assertThat(create.get().status()).isEqualTo(OK);
assertThat(read1.get().status()).isEqualTo(OK);
assertThat(read2.get().status()).isEqualTo(OK);

final Span[] spans = spanReporter.take(3);
assertThat(Arrays.stream(spans).map(Span::traceId).collect(toImmutableSet())).hasSize(3);

try {
with().pollInterval(10, MILLISECONDS)
.then()
.atMost(10, SECONDS)
.untilAsserted(
() -> assertThat(poolHttpClient.get("/non-trace").aggregate().get().status())
.isEqualTo(INTERNAL_SERVER_ERROR));
fail("There is a leaked context.");
} catch (ConditionTimeoutException ignored) {
}
}
}