Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from okhttp to armeria #2653

Merged
merged 27 commits into from
Jul 14, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e2314d0
Migrate more
anuraaga Jul 5, 2019
26eb587
Migrate more
anuraaga Jul 5, 2019
e476ab6
Migrate some tests.
anuraaga Jul 5, 2019
2e9e1f1
Finish migrating tests
anuraaga Jul 8, 2019
7b3031b
Update zipkin-server
anuraaga Jul 9, 2019
f0d6f72
Remove okhttp dependency
anuraaga Jul 9, 2019
3e844b9
benchmarks compile
anuraaga Jul 9, 2019
65c543e
Allow customizing connect timeout
anuraaga Jul 10, 2019
4090c8f
Integration tests pass.
anuraaga Jul 10, 2019
4a9f9de
Fix flake?
anuraaga Jul 10, 2019
d96d9ee
Disable HTTP/2 preface by default since elasticsearch v7 doesn't supp…
anuraaga Jul 10, 2019
fa39b6f
Workaround Armeria server broken when disabling HTTP/2 preface.
anuraaga Jul 11, 2019
f1ccb11
Missed one spot.
anuraaga Jul 11, 2019
b44792b
Fix ReadBuffer and do DNS round robin.
anuraaga Jul 11, 2019
88cc4c3
Update README some more.
anuraaga Jul 11, 2019
043b7b1
Wait for load balanced endpoints when starting.
anuraaga Jul 11, 2019
9eb0412
Log bodies.
anuraaga Jul 12, 2019
851d294
Copyright
anuraaga Jul 12, 2019
59eef1e
Fix decorator constructor.
anuraaga Jul 12, 2019
bb8143c
Fix health endpoint is blocking the event loop.
anuraaga Jul 12, 2019
b38f760
Don't use pooled buffer for span indexing since we copy back right aw…
anuraaga Jul 12, 2019
71e6e7f
Unregister endpoint group when closing.
anuraaga Jul 12, 2019
e4cfc4e
Work around bad interaction between recreating Storage object and Arm…
anuraaga Jul 12, 2019
0b3f903
Fuzz
anuraaga Jul 12, 2019
c2d4b30
Small cleanup
anuraaga Jul 12, 2019
76263f9
Don't reinitialize storage so much during IT.
anuraaga Jul 12, 2019
c1d3eac
Better toString
anuraaga Jul 13, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ public class BulkRequestBenchmarks {
@Benchmark public void buildAndWriteRequest_singleSpan() throws IOException {
BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
builder.build().call.request().body().writeTo(Okio.buffer(Okio.blackhole()));
Okio.buffer(Okio.blackhole()).write(builder.build().request.content().array());
}

@Benchmark public void buildAndWriteRequest_tenSpans() throws IOException {
BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
for (int i = 0; i < 10; i++) {
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
}
builder.build().call.request().body().writeTo(Okio.buffer(Okio.blackhole()));
Okio.buffer(Okio.blackhole()).write(builder.build().request.content().array());
}

// Convenience main entry-point
Expand Down
22 changes: 6 additions & 16 deletions zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,6 @@
<version>${project.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>logging-interceptor</artifactId>
<version>${okhttp.version}</version>
<optional>true</optional>
</dependency>

<!-- MySQL backend -->
<dependency>
Expand Down Expand Up @@ -295,6 +284,12 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.linecorp.armeria</groupId>
<artifactId>armeria-testing-junit4</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand All @@ -309,11 +304,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,73 @@
*/
package zipkin2.server.internal.elasticsearch;

import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import com.linecorp.armeria.client.Client;
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.SimpleDecoratingClient;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.util.Exceptions;
import com.squareup.moshi.JsonReader;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import okhttp3.Credentials;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import okio.Okio;

import static zipkin2.elasticsearch.internal.JsonReaders.enterPath;

/**
* Adds basic auth username and password to every request per
* https://www.elastic.co/guide/en/x-pack/current/how-security-works.html
*/
final class BasicAuthInterceptor implements Interceptor {
final class BasicAuthInterceptor extends SimpleDecoratingClient<HttpRequest, HttpResponse> {

private String basicCredentials;

BasicAuthInterceptor(ZipkinElasticsearchStorageProperties es) {
basicCredentials = Credentials.basic(es.getUsername(), es.getPassword());
BasicAuthInterceptor(
Client<HttpRequest, HttpResponse> client,
ZipkinElasticsearchStorageProperties es) {
super(client);
String token = es.getUsername() + ':' + es.getPassword();
basicCredentials = Base64.getEncoder().encodeToString(token.getBytes(StandardCharsets.UTF_8));
}

@Override
public Response intercept(Chain chain) throws IOException {

Request input = chain.request();
Request requestWithCredentials = appendBasicAuthHeaderParameters(input);
Response response = chain.proceed(requestWithCredentials);
if (response.code() == 403) {
try (ResponseBody body = response.body()) {
JsonReader message = enterPath(JsonReader.of(body.source()), "message");
if (message != null) throw new IllegalStateException(message.nextString());
}
throw new IllegalStateException(response.toString());
}
return response;
}

private Request appendBasicAuthHeaderParameters(Request input) {

Request.Builder builder = input.newBuilder();
return builder.header("authorization", basicCredentials).build();
@Override public HttpResponse execute(ClientRequestContext ctx, HttpRequest req)
throws Exception {
ctx.addAdditionalRequestHeader(HttpHeaderNames.AUTHORIZATION, basicCredentials);
return HttpResponse.from(
delegate().execute(ctx, req).aggregateWithPooledObjects(ctx.eventLoop(), ctx.alloc())
.thenApply(msg -> {
HttpData content = msg.content();
try {
if (!msg.status().equals(HttpStatus.FORBIDDEN) || content.isEmpty()) {
return HttpResponse.of(msg);
}
final ByteBuffer buf;
if (content instanceof ByteBufHolder) {
buf = ((ByteBufHolder) content).content().nioBuffer();
} else {
buf = ByteBuffer.wrap(content.array());
}
try {
JsonReader message = enterPath(JsonReader.of(
Okio.buffer(Okio.source(new ByteBufferBackedInputStream(buf)))), "message");
if (message != null) throw new IllegalStateException(message.nextString());
} catch (IOException e) {
Exceptions.throwUnsafely(e);
throw new UncheckedIOException(e); // unreachable
}
throw new IllegalStateException(msg.toString());
} finally {
ReferenceCountUtil.safeRelease(content);
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,18 @@
*/
package zipkin2.server.internal.elasticsearch;

import brave.ScopedSpan;
import brave.Tracer;
import brave.Tracing;
import brave.http.HttpTracing;
import brave.okhttp3.TracingInterceptor;
import brave.propagation.CurrentTraceContext;
import com.linecorp.armeria.common.RequestContext;
import java.io.IOException;
import java.util.Collections;
import com.linecorp.armeria.client.ClientFactoryBuilder;
import com.linecorp.armeria.client.HttpClientBuilder;
import com.linecorp.armeria.client.brave.BraveClient;
import com.linecorp.armeria.client.logging.LoggingClientBuilder;
import com.linecorp.armeria.common.HttpHeaders;
import com.linecorp.armeria.common.logging.LogLevel;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import okhttp3.Dispatcher;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.logging.HttpLoggingInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Optional;
import java.util.function.Consumer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
Expand All @@ -49,7 +37,6 @@
import zipkin2.elasticsearch.ElasticsearchStorage;
import zipkin2.elasticsearch.ElasticsearchStorage.HostsSupplier;
import zipkin2.server.internal.ConditionalOnSelfTracing;
import zipkin2.server.internal.WrappingExecutorService;
import zipkin2.storage.StorageComponent;

@Configuration
Expand All @@ -59,126 +46,79 @@
public class ZipkinElasticsearchStorageAutoConfiguration {
static final String QUALIFIER = "zipkinElasticsearchHttp";

// allows extensions like zipkin-storage-elasticsearch-aws to intercept requests
@Autowired(required = false) @Qualifier(QUALIFIER)
List<Interceptor> zipkinElasticsearchHttpNetworkInterceptors = Collections.emptyList();
// allows extensions like zipkin-storage-elasticsearch-aws to control host resolution
@Autowired(required = false) HostsSupplier hostsSupplier;

// Allows us to trace the elasticsearch client
@Bean @Qualifier(QUALIFIER)
OkHttpClient.Builder zipkinElasticsearchHttpBuilder() {
return new OkHttpClient.Builder();
@Bean @Qualifier(QUALIFIER) Consumer<HttpClientBuilder> zipkinElasticsearchHttp(
@Value("${zipkin.storage.elasticsearch.timeout:10000}") int timeout) {
return client -> client.responseTimeoutMillis(timeout).writeTimeoutMillis(timeout);
}

@Bean @Qualifier(QUALIFIER) OkHttpClient zipkinElasticsearchHttp(
OkHttpClient.Builder zipkinElasticsearchHttpBuilder,
@Bean @Qualifier(QUALIFIER) Consumer<ClientFactoryBuilder> zipkinElasticsearchClientFactory(
@Value("${zipkin.storage.elasticsearch.timeout:10000}") int timeout) {
for (Interceptor interceptor : zipkinElasticsearchHttpNetworkInterceptors) {
zipkinElasticsearchHttpBuilder.addNetworkInterceptor(interceptor);
}
return zipkinElasticsearchHttpBuilder.connectTimeout(timeout, TimeUnit.MILLISECONDS)
.readTimeout(timeout, TimeUnit.MILLISECONDS)
.writeTimeout(timeout, TimeUnit.MILLISECONDS).build();
return factory -> factory.connectTimeoutMillis(timeout);
}


@Bean @Qualifier(QUALIFIER) @Conditional(HttpLoggingSet.class)
Interceptor zipkinElasticsearchHttpLoggingInterceptor(ZipkinElasticsearchStorageProperties es) {
Logger logger = Logger.getLogger(ElasticsearchStorage.class.getName());
return new HttpLoggingInterceptor(logger::info).setLevel(es.getHttpLogging());
Consumer<HttpClientBuilder> zipkinElasticsearchHttpLogging(
anuraaga marked this conversation as resolved.
Show resolved Hide resolved
ZipkinElasticsearchStorageProperties es) {
LoggingClientBuilder builder = new LoggingClientBuilder()
.requestLogLevel(LogLevel.INFO)
.successfulResponseLogLevel(LogLevel.INFO);

switch (es.getHttpLogging()) {
case HEADERS:
builder.contentSanitizer(unused -> "");
break;
case BASIC:
builder.contentSanitizer(unused -> "");
builder.headersSanitizer(unused -> HttpHeaders.of());
break;
case BODY:
default:
break;
}

return client -> client.decorator(builder.newDecorator());
}

@Bean @Qualifier(QUALIFIER) @Conditional(BasicAuthRequired.class)
Interceptor zipkinElasticsearchHttpBasicAuthInterceptor(ZipkinElasticsearchStorageProperties es) {
return new BasicAuthInterceptor(es);
Consumer<HttpClientBuilder> zipkinElasticsearchHttpBasicAuth(
ZipkinElasticsearchStorageProperties es) {
return client -> client.decorator(delegate -> new BasicAuthInterceptor(delegate, es));
}

@Bean @ConditionalOnMissingBean StorageComponent storage(
ZipkinElasticsearchStorageProperties elasticsearch,
@Qualifier(QUALIFIER) OkHttpClient zipkinElasticsearchHttp,
@Qualifier(QUALIFIER) List<Consumer<HttpClientBuilder>> zipkinElasticsearchHttpCustomizers,
@Qualifier(QUALIFIER) List<Consumer<ClientFactoryBuilder>>
zipkinElasticsearchClientFactoryCustomizers,
Optional<HostsSupplier> hostsSupplier,
@Value("${zipkin.query.lookback:86400000}") int namesLookback,
@Value("${zipkin.storage.strict-trace-id:true}") boolean strictTraceId,
@Value("${zipkin.storage.search-enabled:true}") boolean searchEnabled,
@Value("${zipkin.storage.autocomplete-keys:}") List<String> autocompleteKeys,
@Value("${zipkin.storage.autocomplete-ttl:3600000}") int autocompleteTtl,
@Value("${zipkin.storage.autocomplete-cardinality:20000}") int autocompleteCardinality) {
ElasticsearchStorage.Builder result = elasticsearch
.toBuilder(zipkinElasticsearchHttp)
.toBuilder()
.clientCustomizer(client -> zipkinElasticsearchHttpCustomizers.forEach(c -> c.accept(client)))
.clientFactoryCustomizer(factory ->
zipkinElasticsearchClientFactoryCustomizers.forEach(c -> c.accept(factory)))
.namesLookback(namesLookback)
.strictTraceId(strictTraceId)
.searchEnabled(searchEnabled)
.autocompleteKeys(autocompleteKeys)
.autocompleteTtl(autocompleteTtl)
.autocompleteCardinality(autocompleteCardinality);
if (hostsSupplier != null) result.hostsSupplier(hostsSupplier);
hostsSupplier.ifPresent(result::hostsSupplier);
return result.build();
}

// our elasticsearch impl uses an instance of OkHttpClient, not Call.Factory, so we have to trace
// carefully the pieces inside OkHttpClient
@Configuration
@ConditionalOnSelfTracing
static class TracingOkHttpClientBuilderEnhancer implements BeanPostProcessor {

@Autowired(required = false) HttpTracing httpTracing;
@Autowired(required = false) Tracing tracing;

@Override public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean;
}

@Override public Object postProcessAfterInitialization(Object bean, String beanName) {
if (httpTracing == null || !"zipkinElasticsearchHttpBuilder".equals(beanName)) return bean;
Tracer tracer = tracing.tracer();

OkHttpClient.Builder builder = (OkHttpClient.Builder) bean;
builder.addInterceptor(new Interceptor() {
/** create a local span with the same name as the request tag */
@Override public Response intercept(Chain chain) throws IOException {
// don't start new traces (to prevent amplifying writes to local storage)
if (tracer.currentSpan() == null) return chain.proceed(chain.request());

Request request = chain.request();
ScopedSpan span = tracer.startScopedSpan(request.tag().toString());
try {
return chain.proceed(request);
} catch (RuntimeException | IOException | Error e) {
span.error(e);
throw e;
} finally {
span.finish();
}
}
});
builder.addNetworkInterceptor(
TracingInterceptor.create(httpTracing.clientOf("elasticsearch")));
ExecutorService delegate = new Dispatcher().executorService();
builder.dispatcher(new Dispatcher(makeContextAware(delegate, tracing.currentTraceContext())));
return builder;
}
}

/**
* Decorates the input such that the {@link RequestContext#current() current request context} and
* the and the {@link CurrentTraceContext#get() current trace context} at assembly time is made
* current when task is executed.
*/
static ExecutorService makeContextAware(ExecutorService delegate, CurrentTraceContext traceCtx) {
class TracingCurrentRequestContextExecutorService extends WrappingExecutorService {

@Override protected ExecutorService delegate() {
return delegate;
}

@Override protected <C> Callable<C> wrap(Callable<C> task) {
return RequestContext.current().makeContextAware(traceCtx.wrap(task));
}

@Override protected Runnable wrap(Runnable task) {
return RequestContext.current().makeContextAware(traceCtx.wrap(task));
}
@Bean @Qualifier(QUALIFIER) @ConditionalOnSelfTracing Consumer<HttpClientBuilder>
elasticsearchTracing(Optional<Tracing> tracing) {
anuraaga marked this conversation as resolved.
Show resolved Hide resolved
if (!tracing.isPresent()) {
return client -> {};
}
return new TracingCurrentRequestContextExecutorService();
return client -> client.decorator(BraveClient.newDecorator(tracing.get(), "elasticsearch"));
anuraaga marked this conversation as resolved.
Show resolved Hide resolved
}

static final class HttpLoggingSet implements Condition {
Expand Down