Skip to content

Commit

Permalink
Update to Armeria 0.97.0 (#2943)
Browse files Browse the repository at this point in the history
* Update to Armeria 0.97.0

* Update benchmark too
  • Loading branch information
anuraaga committed Dec 8, 2019
1 parent 02a28eb commit 05bfa8f
Show file tree
Hide file tree
Showing 23 changed files with 89 additions and 140 deletions.
Expand Up @@ -15,7 +15,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Closer;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.WebClient;
import io.netty.handler.codec.http.QueryStringEncoder;
import java.io.File;
import java.nio.file.Files;
Expand Down Expand Up @@ -207,7 +207,7 @@ void runBenchmark(@Nullable GenericContainer<?> storage) throws Exception {
System.out.println("Benchmark complete, wrk output:");
System.out.println(wrk.getLogs().replace("\n\n", "\n"));

HttpClient prometheusClient = HttpClient.of(
WebClient prometheusClient = WebClient.of(
"h1c://" + prometheus.getContainerIpAddress() + ":" + prometheus.getFirstMappedPort());

System.out.println(String.format("Messages received: %s", prometheusValue(
Expand Down Expand Up @@ -321,22 +321,22 @@ static void printContainerMapping(GenericContainer<?> container) {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
}

static void printQuartiles(HttpClient prometheus, String metric) throws Exception {
static void printQuartiles(WebClient prometheus, String metric) throws Exception {
for (double quantile : Arrays.asList(0.0, 0.25, 0.5, 0.75, 1.0)) {
String value = prometheusValue(prometheus, "quantile(" + quantile + ", " + metric + ")");
System.out.println(String.format("%s[%s] = %s", metric, quantile, value));
}
}

static void printHistogram(HttpClient prometheus, String metric) throws Exception {
static void printHistogram(WebClient prometheus, String metric) throws Exception {
for (double quantile : Arrays.asList(0.5, 0.9, 0.99)) {
String value =
prometheusValue(prometheus, "histogram_quantile(" + quantile + ", " + metric + ")");
System.out.println(String.format("%s[%s] = %s", metric, quantile, value));
}
}

static String prometheusValue(HttpClient prometheus, String query) throws Exception {
static String prometheusValue(WebClient prometheus, String query) throws Exception {
QueryStringEncoder encoder = new QueryStringEncoder("/api/v1/query");
encoder.addParam("query", query);
String response = prometheus.get(encoder.toString()).aggregate().join().contentUtf8();
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -49,7 +49,7 @@

<!-- This allows you to test feature branches with jitpack -->
<armeria.groupId>com.linecorp.armeria</armeria.groupId>
<armeria.version>0.95.0</armeria.version>
<armeria.version>0.97.0</armeria.version>
<!-- This should only be used in tests, and be careful to avoid >= v20 apis -->
<guava.version>28.1-jre</guava.version>

Expand Down
Expand Up @@ -13,9 +13,9 @@
*/
package zipkin.minimal;

import com.linecorp.armeria.server.Server;
import java.io.IOException;
import java.util.Collections;
import com.linecorp.armeria.server.Server;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
Expand Down
Expand Up @@ -15,11 +15,8 @@

import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.MediaType;
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.Service;
import com.linecorp.armeria.server.cors.CorsServiceBuilder;
import com.linecorp.armeria.server.file.HttpFileBuilder;
import com.linecorp.armeria.server.metric.PrometheusExpositionService;
Expand Down Expand Up @@ -51,7 +48,7 @@ public class ZipkinHttpConfiguration {
@Value("${zipkin.query.timeout:11s}") Duration queryTimeout) {
return sb -> {
httpQuery.ifPresent(h -> {
Function<Service<HttpRequest, HttpResponse>, Service<HttpRequest, HttpResponse>>
Function<HttpService, HttpService>
timeoutDecorator = service -> (ctx, req) -> {
ctx.setRequestTimeout(queryTimeout);
return service.serve(ctx, req);
Expand Down
Expand Up @@ -13,9 +13,9 @@
*/
package zipkin2.server.internal.elasticsearch;

import com.linecorp.armeria.client.Client;
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.SimpleDecoratingClient;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.SimpleDecoratingHttpClient;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
Expand All @@ -26,11 +26,11 @@
/**
* Adds basic auth username and password to every request per https://www.elastic.co/guide/en/x-pack/current/how-security-works.html
*/
final class BasicAuthInterceptor extends SimpleDecoratingClient<HttpRequest, HttpResponse> {
final class BasicAuthInterceptor extends SimpleDecoratingHttpClient {

final String basicCredentials;

BasicAuthInterceptor(Client<HttpRequest, HttpResponse> client, String username, String password) {
BasicAuthInterceptor(HttpClient client, String username, String password) {
super(client);
if (username == null) throw new NullPointerException("username == null");
if (password == null) throw new NullPointerException("password == null");
Expand Down
Expand Up @@ -17,7 +17,7 @@
import com.linecorp.armeria.client.ClientOptions;
import com.linecorp.armeria.client.ClientOptionsBuilder;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.client.encoding.HttpDecodingClient;
import com.linecorp.armeria.client.logging.LoggingClientBuilder;
import com.linecorp.armeria.client.metric.MetricCollectingClient;
Expand All @@ -33,7 +33,7 @@
import zipkin2.server.internal.elasticsearch.ZipkinElasticsearchStorageProperties.HttpLogging;

// Exposed as a bean so that zipkin-aws can use this for api requests to get initial endpoints.
public class HttpClientFactory implements Function<Endpoint, HttpClient>, Closeable {
public class HttpClientFactory implements Function<Endpoint, WebClient>, Closeable {
final SessionProtocol protocol;
final ClientOptions options;
final ClientFactory delegate;
Expand Down Expand Up @@ -84,8 +84,8 @@ public class HttpClientFactory implements Function<Endpoint, HttpClient>, Closea
this.options = configureOptionsExceptLogging(options).build();
}

@Override public HttpClient apply(Endpoint endpoint) {
return HttpClient.of(delegate, protocol, endpoint, options);
@Override public WebClient apply(Endpoint endpoint) {
return WebClient.of(delegate, protocol, endpoint, options);
}

@Override public void close() {
Expand Down
Expand Up @@ -14,7 +14,7 @@
package zipkin2.server.internal.elasticsearch;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.client.endpoint.EndpointGroupRegistry;
import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroup;
Expand All @@ -37,7 +37,7 @@ final class LazyHttpClientImpl implements LazyHttpClient {
final int timeoutMillis;
final MeterRegistry meterRegistry;

volatile HttpClient result;
volatile WebClient result;

LazyHttpClientImpl(HttpClientFactory factory, SessionProtocol protocol,
Supplier<EndpointGroup> initialEndpoints, ZipkinElasticsearchStorageProperties es,
Expand All @@ -58,7 +58,7 @@ final class LazyHttpClientImpl implements LazyHttpClient {
}
}

@Override public HttpClient get() {
@Override public WebClient get() {
if (result == null) {
synchronized (this) {
if (result == null) {
Expand Down
Expand Up @@ -13,9 +13,9 @@
*/
package zipkin2.server.internal.elasticsearch;

import com.linecorp.armeria.client.Client;
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.SimpleDecoratingClient;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.SimpleDecoratingHttpClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
Expand All @@ -27,9 +27,9 @@
*/
// TODO: move upstream https://github.com/line/armeria/issues/1997
// TODO: unit test coverage
final class RawContentLoggingClient extends SimpleDecoratingClient<HttpRequest, HttpResponse> {
final class RawContentLoggingClient extends SimpleDecoratingHttpClient {

RawContentLoggingClient(Client<HttpRequest, HttpResponse> delegate) {
RawContentLoggingClient(HttpClient delegate) {
super(delegate);
}

Expand Down
Expand Up @@ -13,17 +13,16 @@
*/
package zipkin2.server.internal.prometheus;

import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.Request;
import com.linecorp.armeria.common.RequestContext;
import com.linecorp.armeria.common.Response;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.common.logging.RequestLogAvailability;
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.Route;
import com.linecorp.armeria.server.Service;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.SimpleDecoratingService;
import com.linecorp.armeria.server.SimpleDecoratingHttpService;
import com.linecorp.armeria.spring.ArmeriaServerConfigurator;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -94,7 +93,7 @@ public class ZipkinPrometheusMetricsConfiguration {
return serverBuilder -> serverBuilder.routeDecorator()
.pathPrefix("/zipkin/api")
.pathPrefix("/api")
.build(s -> new MetricCollectingService<>(s, registry, metricName));
.build(s -> new MetricCollectingService(s, registry, metricName));
}

// We need to make sure not-found requests are still handled by a service to be decorated for
Expand All @@ -108,19 +107,18 @@ ArmeriaServerConfigurator notFoundMetricCollector() {
(ctx, req) -> HttpResponse.of(HttpStatus.NOT_FOUND));
}

static final class MetricCollectingService<I extends Request, O extends Response>
extends SimpleDecoratingService<I, O> {
static final class MetricCollectingService extends SimpleDecoratingHttpService {
final MeterRegistry registry;
final String metricName;

MetricCollectingService(Service<I, O> delegate, MeterRegistry registry, String metricName) {
MetricCollectingService(HttpService delegate, MeterRegistry registry, String metricName) {
super(delegate);
this.registry = registry;
this.metricName = metricName;
}

@Override
public O serve(ServiceRequestContext ctx, I req) throws Exception {
public HttpResponse serve(ServiceRequestContext ctx, HttpRequest req) throws Exception {
setup(ctx, registry, metricName);
return delegate().serve(ctx, req);
}
Expand Down
Expand Up @@ -15,7 +15,7 @@

import com.linecorp.armeria.client.ClientFactory;
import com.linecorp.armeria.client.ClientFactoryBuilder;
import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.SessionProtocol;
Expand Down Expand Up @@ -77,7 +77,7 @@ public class ITZipkinServerSsl {
}

void callHealthEndpoint(SessionProtocol http) {
AggregatedHttpResponse response = HttpClient.of(clientFactory, baseUrl(server, http))
AggregatedHttpResponse response = WebClient.of(clientFactory, baseUrl(server, http))
.get("/health")
.aggregate().join();

Expand Down
Expand Up @@ -18,7 +18,6 @@
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Configuration;
import zipkin2.collector.activemq.ActiveMQCollector;
import zipkin2.server.internal.activemq.ZipkinActiveMQCollectorProperties;

/** opens package access for testing */
public final class Access {
Expand Down
Expand Up @@ -13,15 +13,9 @@
*/
package zipkin2.server.internal.elasticsearch;

import com.linecorp.armeria.client.Client;
import com.linecorp.armeria.client.ClientOption;
import com.linecorp.armeria.client.ClientOptionsBuilder;
import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.RequestHeaders;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.SessionProtocol;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
Expand All @@ -38,13 +32,6 @@
import zipkin2.elasticsearch.ElasticsearchStorage;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class ZipkinElasticsearchStorageConfigurationTest {
final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
Expand Down Expand Up @@ -288,8 +275,7 @@ public void dailyIndexFormat_overridingDateSeparator_invalidToBeMultiChar() {
}

@Test
public void doesntProvideBasicAuthInterceptor_whenBasicAuthUserNameandPasswordNotConfigured()
throws Exception {
public void doesntProvideBasicAuthInterceptor_whenBasicAuthUserNameandPasswordNotConfigured() {
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:127.0.0.1:1234")
Expand All @@ -298,29 +284,13 @@ public void doesntProvideBasicAuthInterceptor_whenBasicAuthUserNameandPasswordNo
context.refresh();

HttpClientFactory factory = context.getBean(HttpClientFactory.class);

Client<HttpRequest, HttpResponse> delegate = mock(Client.class);
Client<HttpRequest, HttpResponse> decorated =
factory.options.decoration().decorate(HttpRequest.class, HttpResponse.class, delegate);

HttpRequest req = HttpRequest.of(RequestHeaders.builder()
.method(HttpMethod.GET)
.scheme("http")
.authority("localhost")
.path("/")
.build()
);
// TODO(anuraaga): This can be cleaner after https://github.com/line/armeria/issues/1883
ClientRequestContext ctx = spy(ClientRequestContext.of(req));
when(delegate.execute(any(), any())).thenReturn(HttpResponse.of(HttpStatus.OK));

decorated.execute(ctx, req);

verify(ctx, never()).addAdditionalRequestHeader(eq(HttpHeaderNames.AUTHORIZATION), any());
WebClient client = WebClient.builder("http://127.0.0.1:1234")
.option(ClientOption.DECORATION, factory.options.decoration())
.build();
assertThat(client.as(BasicAuthInterceptor.class)).isEmpty();
}

@Test public void providesBasicAuthInterceptor_whenBasicAuthUserNameAndPasswordConfigured()
throws Exception {
@Test public void providesBasicAuthInterceptor_whenBasicAuthUserNameAndPasswordConfigured() {
TestPropertyValues.of(
"zipkin.storage.type:elasticsearch",
"zipkin.storage.elasticsearch.hosts:127.0.0.1:1234",
Expand All @@ -332,24 +302,10 @@ public void doesntProvideBasicAuthInterceptor_whenBasicAuthUserNameandPasswordNo

HttpClientFactory factory = context.getBean(HttpClientFactory.class);

Client<HttpRequest, HttpResponse> delegate = mock(Client.class);
Client<HttpRequest, HttpResponse> decorated = factory.options.decoration()
.decorate(HttpRequest.class, HttpResponse.class, delegate);

HttpRequest req = HttpRequest.of(RequestHeaders.builder()
.method(HttpMethod.GET)
.scheme("http")
.authority("localhost")
.path("/")
.build()
);
// TODO(anuraaga): This can be cleaner after https://github.com/line/armeria/issues/1883
ClientRequestContext ctx = spy(ClientRequestContext.of(req));
when(delegate.execute(any(), any())).thenReturn(HttpResponse.of(HttpStatus.OK));

decorated.execute(ctx, req);

verify(ctx).addAdditionalRequestHeader(eq(HttpHeaderNames.AUTHORIZATION), any());
WebClient client = WebClient.builder("http://127.0.0.1:1234")
.option(ClientOption.DECORATION, factory.options.decoration())
.build();
assertThat(client.as(BasicAuthInterceptor.class)).isPresent();
}

@Test public void searchEnabled_false() {
Expand Down
Expand Up @@ -13,7 +13,7 @@
*/
package zipkin2.server.internal.ui;

import com.linecorp.armeria.client.HttpClient;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpMethod;
Expand Down Expand Up @@ -143,7 +143,7 @@ private Response conditionalGet(String path, String etag) throws IOException {

private String getContentEncodingFromRequestThatAcceptsGzip(String path) {
// We typically use OkHttp in our tests, but that automatically unzips..
AggregatedHttpResponse response = HttpClient.of(url(server, "/"))
AggregatedHttpResponse response = WebClient.of(url(server, "/"))
.execute(RequestHeaders.of(HttpMethod.GET, path, HttpHeaderNames.ACCEPT_ENCODING, "gzip"))
.aggregate().join();

Expand Down

0 comments on commit 05bfa8f

Please sign in to comment.