Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate from okhttp to armeria #2653

Merged
merged 27 commits into from
Jul 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e2314d0
Migrate more
anuraaga Jul 5, 2019
26eb587
Migrate more
anuraaga Jul 5, 2019
e476ab6
Migrate some tests.
anuraaga Jul 5, 2019
2e9e1f1
Finish migrating tests
anuraaga Jul 8, 2019
7b3031b
Update zipkin-server
anuraaga Jul 9, 2019
f0d6f72
Remove okhttp dependency
anuraaga Jul 9, 2019
3e844b9
benchmarks compile
anuraaga Jul 9, 2019
65c543e
Allow customizing connect timeout
anuraaga Jul 10, 2019
4090c8f
Integration tests pass.
anuraaga Jul 10, 2019
4a9f9de
Fix flake?
anuraaga Jul 10, 2019
d96d9ee
Disable HTTP/2 preface by default since elasticsearch v7 doesn't supp…
anuraaga Jul 10, 2019
fa39b6f
Workaround Armeria server broken when disabling HTTP/2 preface.
anuraaga Jul 11, 2019
f1ccb11
Missed one spot.
anuraaga Jul 11, 2019
b44792b
Fix ReadBuffer and do DNS round robin.
anuraaga Jul 11, 2019
88cc4c3
Update README some more.
anuraaga Jul 11, 2019
043b7b1
Wait for load balanced endpoints when starting.
anuraaga Jul 11, 2019
9eb0412
Log bodies.
anuraaga Jul 12, 2019
851d294
Copyright
anuraaga Jul 12, 2019
59eef1e
Fix decorator constructor.
anuraaga Jul 12, 2019
bb8143c
Fix health endpoint is blocking the event loop.
anuraaga Jul 12, 2019
b38f760
Don't use pooled buffer for span indexing since we copy back right aw…
anuraaga Jul 12, 2019
71e6e7f
Unregister endpoint group when closing.
anuraaga Jul 12, 2019
e4cfc4e
Work around bad interaction between recreating Storage object and Arm…
anuraaga Jul 12, 2019
0b3f903
Fuzz
anuraaga Jul 12, 2019
c2d4b30
Small cleanup
anuraaga Jul 12, 2019
76263f9
Don't reinitialize storage so much during IT.
anuraaga Jul 12, 2019
c1d3eac
Better toString
anuraaga Jul 13, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ public class BulkRequestBenchmarks {
@Benchmark public void buildAndWriteRequest_singleSpan() throws IOException {
BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
builder.build().call.request().body().writeTo(Okio.buffer(Okio.blackhole()));
Okio.buffer(Okio.blackhole()).write(builder.build().request.content().array());
}

@Benchmark public void buildAndWriteRequest_tenSpans() throws IOException {
BulkCallBuilder builder = new BulkCallBuilder(es, 6.7f, "index-span");
for (int i = 0; i < 10; i++) {
builder.index(spanIndex, "span", CLIENT_SPAN, BulkIndexWriter.SPAN);
}
builder.build().call.request().body().writeTo(Okio.buffer(Okio.blackhole()));
Okio.buffer(Okio.blackhole()).write(builder.build().request.content().array());
}

// Convenience main entry-point
Expand Down
22 changes: 6 additions & 16 deletions zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -190,17 +190,6 @@
<version>${project.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>logging-interceptor</artifactId>
<version>${okhttp.version}</version>
<optional>true</optional>
</dependency>

<!-- MySQL backend -->
<dependency>
Expand Down Expand Up @@ -295,6 +284,12 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>com.linecorp.armeria</groupId>
<artifactId>armeria-testing-junit4</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
Expand All @@ -309,11 +304,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.server.annotation.Get;
Expand Down Expand Up @@ -85,7 +85,7 @@ public ObjectNode fetchMetricsFromMicrometer() {
// Delegates the health endpoint from the Actuator to the root context path and can be deprecated
// in future in favour of Actuator endpoints
@Get("/health")
public HttpResponse getHealth() throws JsonProcessingException {
public AggregatedHttpResponse getHealth() throws JsonProcessingException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side q: will any of the other read methods end up blocking the event loop? ex getTraces etc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope they all return AggregatedHttpResponse so don't run on the event loop. Though I'm definitely interested in eventually migrating them to use enqueue instead of execute to be fully async :)

Health health = healthEndpoint.health();

Map<String, Object> healthJson = new LinkedHashMap<>();
Expand All @@ -96,6 +96,6 @@ public HttpResponse getHealth() throws JsonProcessingException {
ResponseHeaders headers = ResponseHeaders.builder(statusMapper.mapStatus(health.getStatus()))
.contentType(MediaType.JSON)
.setInt(HttpHeaderNames.CONTENT_LENGTH, body.length).build();
return HttpResponse.of(headers, HttpData.wrap(body));
return AggregatedHttpResponse.of(headers, HttpData.wrap(body));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,47 +13,73 @@
*/
package zipkin2.server.internal.elasticsearch;

import com.linecorp.armeria.client.Client;
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.SimpleDecoratingClient;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.util.Exceptions;
import com.squareup.moshi.JsonReader;
import io.netty.buffer.ByteBufHolder;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import okhttp3.Credentials;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import okio.Okio;
import zipkin2.internal.ReadBuffer;

import static zipkin2.elasticsearch.internal.JsonReaders.enterPath;

/**
* Adds basic auth username and password to every request per
* https://www.elastic.co/guide/en/x-pack/current/how-security-works.html
*/
final class BasicAuthInterceptor implements Interceptor {
final class BasicAuthInterceptor extends SimpleDecoratingClient<HttpRequest, HttpResponse> {

private String basicCredentials;

BasicAuthInterceptor(ZipkinElasticsearchStorageProperties es) {
basicCredentials = Credentials.basic(es.getUsername(), es.getPassword());
BasicAuthInterceptor(
Client<HttpRequest, HttpResponse> client,
ZipkinElasticsearchStorageProperties es) {
super(client);
String token = es.getUsername() + ':' + es.getPassword();
basicCredentials = Base64.getEncoder().encodeToString(token.getBytes(StandardCharsets.UTF_8));
}

@Override
public Response intercept(Chain chain) throws IOException {

Request input = chain.request();
Request requestWithCredentials = appendBasicAuthHeaderParameters(input);
Response response = chain.proceed(requestWithCredentials);
if (response.code() == 403) {
try (ResponseBody body = response.body()) {
JsonReader message = enterPath(JsonReader.of(body.source()), "message");
if (message != null) throw new IllegalStateException(message.nextString());
}
throw new IllegalStateException(response.toString());
}
return response;
}

private Request appendBasicAuthHeaderParameters(Request input) {

Request.Builder builder = input.newBuilder();
return builder.header("authorization", basicCredentials).build();
@Override public HttpResponse execute(ClientRequestContext ctx, HttpRequest req)
throws Exception {
ctx.addAdditionalRequestHeader(HttpHeaderNames.AUTHORIZATION, basicCredentials);
return HttpResponse.from(
delegate().execute(ctx, req).aggregateWithPooledObjects(ctx.eventLoop(), ctx.alloc())
.thenApply(msg -> {
HttpData content = msg.content();
if (!msg.status().equals(HttpStatus.FORBIDDEN) || content.isEmpty()) {
return HttpResponse.of(msg);
}
try {
final ByteBuffer buf;
if (content instanceof ByteBufHolder) {
buf = ((ByteBufHolder) content).content().nioBuffer();
} else {
buf = ByteBuffer.wrap(content.array());
}
try {
JsonReader message = enterPath(JsonReader.of(
Okio.buffer(Okio.source(ReadBuffer.wrapUnsafe(buf)))), "message");
if (message != null) throw new IllegalStateException(message.nextString());
} catch (IOException e) {
Exceptions.throwUnsafely(e);
throw new UncheckedIOException(e); // unreachable
}
throw new IllegalStateException(msg.toString());
} finally {
ReferenceCountUtil.safeRelease(content);
}
}));
}
}