Skip to content

Commit

Permalink
Append test.
Browse files Browse the repository at this point in the history
  • Loading branch information
kojilin committed Jun 18, 2018
1 parent a26697c commit f0579f4
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static void setupContext(FastThreadLocal<SpanInScopeWrapper> threadLocalS
return;
}
spanInScope.close();
final SpanInScopeWrapper previousScope = spanInScope.getPrevious();
final SpanInScopeWrapper previousScope = spanInScope.previous();
if (previousScope != null) {
threadLocalSpan.set(previousScope);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import brave.Tracer.SpanInScope;

/**
* SpanInScope Wrapper for keeping previous span scope.
* {@link SpanInScope} wrapper that keeps the previous {@link SpanInScope}.
*/
public final class SpanInScopeWrapper implements AutoCloseable {

Expand All @@ -36,7 +36,7 @@ public SpanInScopeWrapper(SpanInScope current, @Nullable SpanInScopeWrapper prev
}

@Nullable
public SpanInScopeWrapper getPrevious() {
public SpanInScopeWrapper previous() {
return previous;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,11 @@
import static com.linecorp.armeria.common.thrift.ThriftSerializationFormats.BINARY;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;

import org.apache.thrift.async.AsyncMethodCallback;
Expand Down Expand Up @@ -68,7 +64,6 @@
import brave.propagation.CurrentTraceContext;
import brave.sampler.Sampler;
import zipkin2.Span;
import zipkin2.reporter.Reporter;

public class HttpTracingIntegrationTest {

Expand Down Expand Up @@ -162,7 +157,7 @@ public void setupClients() {
.build(HelloService.Iface.class);
zipClient = new ClientBuilder(server.uri(BINARY, "/zip"))
.decorator(HttpRequest.class, HttpResponse.class,
HttpTracingClient.newDecorator(newTracing("client/zip")))
HttpTracingClient.newDecorator(newTracing("client/zip")))
.build(HelloService.Iface.class);
fooClientWithoutTracing = Clients.newClient(server.uri(BINARY, "/foo"), HelloService.Iface.class);
barClient = newClient("/bar");
Expand All @@ -177,7 +172,7 @@ public void tearDown() {

@After
public void shouldHaveNoExtraSpans() {
assertThat(spanReporter.spans).isEmpty();
assertThat(spanReporter.getSpans()).isEmpty();
}

private static HttpTracingService decorate(String name, Service<HttpRequest, HttpResponse> service) {
Expand Down Expand Up @@ -332,23 +327,4 @@ public void onError(Exception exception) {
}
}

private static class ReporterImpl implements Reporter<Span> {
private final BlockingQueue<Span> spans = new LinkedBlockingQueue<>();

@Override
public void report(Span span) {
spans.add(span);
}

Span[] take(int numSpans) throws InterruptedException {
final List<Span> taken = new ArrayList<>();
while (taken.size() < numSpans) {
taken.add(spans.take());
}

// Reverse the collected spans to sort the spans by request time.
Collections.reverse(taken);
return taken.toArray(new Span[numSpans]);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.it.tracing;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;
import static com.linecorp.armeria.common.HttpStatus.OK;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.with;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.common.AggregatedHttpMessage;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.util.SafeCloseable;
import com.linecorp.armeria.server.AbstractHttpService;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.Service;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.tracing.HttpTracingService;
import com.linecorp.armeria.testing.server.ServerRule;

import brave.Tracing;
import brave.propagation.CurrentTraceContext;
import brave.sampler.Sampler;
import zipkin2.Span;

public class HttpTracingNestedContextIntegrationTest {

private static final ReporterImpl spanReporter = new ReporterImpl();

private HttpClient poolHttpClient;

private final CountDownLatch waitCreateCache = new CountDownLatch(1);

@Rule
public final ServerRule server = new ServerRule() {
@Override
protected void configure(ServerBuilder sb) throws Exception {

final CountDownLatch countDownLatch = new CountDownLatch(2);
final AtomicReference<CompletableFuture<HttpStatus>> cache = new AtomicReference<>();

sb.service("/non-trace", new AbstractHttpService() {
@Override
protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req)
throws Exception {
if (Tracing.currentTracer().currentSpan() != null) {
return HttpResponse.of(INTERNAL_SERVER_ERROR);
}
return HttpResponse.of(OK);
}
});

sb.service("/create-cache", decorate("service/create-cache", new AbstractHttpService() {
@Override
protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req)
throws Exception {
final CompletableFuture<HttpStatus> future = CompletableFuture.supplyAsync(() -> {
try {
countDownLatch.await(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return OK;
}, RequestContext.current().contextAwareEventLoop());
cache.set(future);
waitCreateCache.countDown();
return HttpResponse.from(future.thenApply(status -> {
if (Tracing.currentTracer().currentSpan() == null) {
return HttpResponse.of(INTERNAL_SERVER_ERROR);
}
return HttpResponse.of(status);
}));
}
}));

sb.service("/read-cache", decorate("service/read-cache", new AbstractHttpService() {
@Override
protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) throws Exception {
try {
final RequestContext requestContext = RequestContext.current();
return HttpResponse.from(
cache.get().thenApply(status -> {
try (SafeCloseable ignored = RequestContext.push(requestContext)) {
if (Tracing.currentTracer().currentSpan() == null) {
return HttpResponse.of(INTERNAL_SERVER_ERROR);
}
return HttpResponse.of(status);
}
}));
} finally {
countDownLatch.countDown();
}
}
}));
}
};

@Before
public void setupClients() {
poolHttpClient = HttpClient.of(server.uri("/"));
}

@After
public void tearDown() {
Tracing.current().close();
}

@After
public void shouldHaveNoExtraSpans() {
assertThat(spanReporter.getSpans()).isEmpty();
}

private static HttpTracingService decorate(String name, Service<HttpRequest, HttpResponse> service) {
return HttpTracingService.newDecorator(newTracing(name)).apply(service);
}

private static Tracing newTracing(String name) {
return Tracing.newBuilder()
.currentTraceContext(CurrentTraceContext.Default.create())
.localServiceName(name)
.spanReporter(spanReporter)
.sampler(Sampler.ALWAYS_SAMPLE)
.build();
}

@Test(timeout = 20000)
public void testNestedRequestContext() throws Exception {
final CompletableFuture<AggregatedHttpMessage> create = poolHttpClient.get("/create-cache").aggregate();
waitCreateCache.await(3, SECONDS);
final CompletableFuture<AggregatedHttpMessage> read1 = poolHttpClient.get("/read-cache").aggregate();
final CompletableFuture<AggregatedHttpMessage> read2 = poolHttpClient.get("/read-cache").aggregate();

assertThat(create.get().status()).isEqualTo(OK);
assertThat(read1.get().status()).isEqualTo(OK);
assertThat(read2.get().status()).isEqualTo(OK);

final Span[] spans = spanReporter.take(3);
assertThat(Arrays.stream(spans).map(Span::traceId).collect(toImmutableSet())).hasSize(3);

try {
with().pollInterval(10, MILLISECONDS)
.then()
.atMost(10, SECONDS)
.untilAsserted(
() -> assertThat(poolHttpClient.get("/non-trace").aggregate().get().status())
.isEqualTo(INTERNAL_SERVER_ERROR));
fail("There is a leaked context.");
} catch (ConditionTimeoutException ignored) {
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2018 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.it.tracing;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import zipkin2.Span;
import zipkin2.reporter.Reporter;

class ReporterImpl implements Reporter<Span> {

private final BlockingQueue<Span> spans = new LinkedBlockingQueue<>();

@Override
public void report(Span span) {
spans.add(span);
}

BlockingQueue<Span> getSpans() {
return spans;
}

Span[] take(int numSpans) throws InterruptedException {
final List<Span> taken = new ArrayList<>();
while (taken.size() < numSpans) {
taken.add(spans.take());
}

// Reverse the collected spans to sort the spans by request time.
Collections.reverse(taken);
return taken.toArray(new Span[numSpans]);
}
}

0 comments on commit f0579f4

Please sign in to comment.