Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a way to fluently convert an response with WebClient and add BlockingWebClient #4021

Merged
merged 20 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ protected void closeClient(WebClient client) {

@Override
protected void get(WebClient client, String pathIncludingQuery) {
client.get(pathIncludingQuery).aggregate().join();
client.blocking().get(pathIncludingQuery);
}

@Override
Expand All @@ -172,12 +172,12 @@ protected void get(WebClient client, String path, BiConsumer<Integer, Throwable>

@Override
protected void post(WebClient client, String pathIncludingQuery, String body) {
client.post(pathIncludingQuery, body).aggregate().join();
client.blocking().post(pathIncludingQuery, body);
}

@Override
protected void options(WebClient client, String path) {
client.options(path).aggregate().join();
client.blocking().options(path);
}

static ServiceRequestContext serverContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import com.linecorp.armeria.client.BlockingWebClient;
import com.linecorp.armeria.client.ClientFactory;
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.client.InvalidResponseHeadersException;
Expand Down Expand Up @@ -263,29 +264,31 @@ private static Tracing newTracing(String name) {
void testTimingAnnotations() {
// Use separate client factory to make sure connection is created.
try (ClientFactory clientFactory = ClientFactory.builder().build()) {
final WebClient client = WebClient.builder(server.httpUri())
.factory(clientFactory)
.decorator(BraveClient.newDecorator(newTracing("timed-client")))
.build();
assertThat(client.get("/http").aggregate().join().status()).isEqualTo(HttpStatus.OK);
final BlockingWebClient client =
WebClient.builder(server.httpUri())
.factory(clientFactory)
.decorator(BraveClient.newDecorator(newTracing("timed-client")))
.build()
.blocking();
assertThat(client.get("/http").status()).isEqualTo(HttpStatus.OK);
final MutableSpan[] initialConnectSpans = spanHandler.take(1);
assertThat(initialConnectSpans[0].annotations())
.extracting(Map.Entry::getValue).containsExactlyInAnyOrder(
"connection-acquire.start",
"socket-connect.start",
"socket-connect.end",
"connection-acquire.end",
"ws",
"wr");
"connection-acquire.start",
"socket-connect.start",
"socket-connect.end",
"connection-acquire.end",
"ws",
"wr");

// Make another request which will reuse the connection so no connection timing.
assertThat(client.get("/http").aggregate().join().status()).isEqualTo(HttpStatus.OK);
assertThat(client.get("/http").status()).isEqualTo(HttpStatus.OK);

final MutableSpan[] secondConnectSpans = spanHandler.take(1);
assertThat(secondConnectSpans[0].annotations())
.extracting(Map.Entry::getValue).containsExactlyInAnyOrder(
"ws",
"wr");
"ws",
"wr");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.junit.Rule;
import org.junit.Test;

import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.client.BlockingWebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpHeaderNames;
import com.linecorp.armeria.common.HttpRequest;
Expand Down Expand Up @@ -90,8 +90,8 @@ protected void configure(ServerBuilder sb) throws Exception {

@Test
public void serve1() throws Exception {
final WebClient client = WebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response = client.get("/http-serve").aggregate().get();
final BlockingWebClient client = BlockingWebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response = client.get("/http-serve");
assertThat(response.status()).isEqualTo(HttpStatus.OK);

assertThat(response.headers().contains(HttpHeaderNames.RETRY_AFTER)).isFalse();
Expand All @@ -108,8 +108,8 @@ public void serve1() throws Exception {

@Test
public void throttle1() throws Exception {
final WebClient client = WebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle1").aggregate().get();
final BlockingWebClient client = BlockingWebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle1");
assertThat(response1.status()).isEqualTo(HttpStatus.OK);

assertThat(response1.headers().contains(HttpHeaderNames.RETRY_AFTER)).isFalse();
Expand All @@ -121,7 +121,7 @@ public void throttle1() throws Exception {
assertThat(reset1).isBetween(0L, 10L);
assertThat(response1.headers().contains("X-RateLimit-Limit")).isFalse();

final AggregatedHttpResponse response2 = client.get("/http-throttle1").aggregate().get();
final AggregatedHttpResponse response2 = client.get("/http-throttle1");
assertThat(response2.status()).isEqualTo(HttpStatus.TOO_MANY_REQUESTS);

assertThat(response2.headers().contains(HttpHeaderNames.RETRY_AFTER)).isTrue();
Expand All @@ -138,8 +138,8 @@ public void throttle1() throws Exception {

@Test
public void throttle2() throws Exception {
final WebClient client = WebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle2").aggregate().get();
final BlockingWebClient client = BlockingWebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle2");
assertThat(response1.status()).isEqualTo(HttpStatus.OK);

assertThat(response1.headers().contains(HttpHeaderNames.RETRY_AFTER)).isFalse();
Expand All @@ -151,7 +151,7 @@ public void throttle2() throws Exception {
assertThat(reset1).isBetween(0L, 10L);
assertThat(response1.headers().get("X-RateLimit-Limit")).isEqualTo("1, 1;window=10");

final AggregatedHttpResponse response2 = client.get("/http-throttle2").aggregate().get();
final AggregatedHttpResponse response2 = client.get("/http-throttle2");
assertThat(response2.status()).isEqualTo(HttpStatus.TOO_MANY_REQUESTS);

assertThat(response2.headers().contains(HttpHeaderNames.RETRY_AFTER, "15")).isTrue();
Expand All @@ -164,8 +164,8 @@ public void throttle2() throws Exception {

@Test
public void throttle3() throws Exception {
final WebClient client = WebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle3").aggregate().get();
final BlockingWebClient client = BlockingWebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle3");
assertThat(response1.status()).isEqualTo(HttpStatus.OK);

assertThat(response1.headers().contains(HttpHeaderNames.RETRY_AFTER)).isFalse();
Expand All @@ -174,7 +174,7 @@ public void throttle3() throws Exception {
assertThat(response1.headers().contains("X-RateLimit-Remaining")).isFalse();
assertThat(response1.headers().contains("X-RateLimit-Reset")).isFalse();

final AggregatedHttpResponse response2 = client.get("/http-throttle3").aggregate().get();
final AggregatedHttpResponse response2 = client.get("/http-throttle3");
assertThat(response2.status()).isEqualTo(HttpStatus.TOO_MANY_REQUESTS);

assertThat(response2.headers().contains(HttpHeaderNames.RETRY_AFTER)).isTrue();
Expand All @@ -188,8 +188,8 @@ public void throttle3() throws Exception {

@Test
public void throttle4() throws Exception {
final WebClient client = WebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle4").aggregate().get();
final BlockingWebClient client = BlockingWebClient.of(serverRule.httpUri());
final AggregatedHttpResponse response1 = client.get("/http-throttle4");
assertThat(response1.status()).isEqualTo(HttpStatus.OK);

assertThat(response1.headers().contains(HttpHeaderNames.RETRY_AFTER)).isFalse();
Expand All @@ -198,7 +198,7 @@ public void throttle4() throws Exception {
assertThat(response1.headers().contains("X-RateLimit-Remaining")).isFalse();
assertThat(response1.headers().contains("X-RateLimit-Reset")).isFalse();

final AggregatedHttpResponse response2 = client.get("/http-throttle4").aggregate().get();
final AggregatedHttpResponse response2 = client.get("/http-throttle4");
assertThat(response2.status()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE);

assertThat(response2.headers().contains(HttpHeaderNames.RETRY_AFTER)).isTrue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,23 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.CollectionType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpEntity;
import com.linecorp.armeria.common.QueryParams;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.internal.common.PercentEncoder;

/**
Expand All @@ -46,19 +43,16 @@
*/
final class CatalogClient {

private static final CollectionType collectionTypeForNode =
TypeFactory.defaultInstance().constructCollectionType(List.class, Node.class);
private static final TypeReference<List<Node>> collectionTypeForNode = new TypeReference<List<Node>>() {};

static CatalogClient of(ConsulClient consulClient) {
return new CatalogClient(consulClient);
}

private final WebClient client;
private final ObjectMapper mapper;

private CatalogClient(ConsulClient client) {
this.client = client.consulWebClient();
mapper = client.getObjectMapper();
}

/**
Expand Down Expand Up @@ -87,15 +81,11 @@ CompletableFuture<List<Node>> service(String serviceName, @Nullable String datac
if (!params.isEmpty()) {
path.append('?').append(params.toQueryString());
}
return client.get(path.toString())
.aggregate()
.thenApply(response -> {
try {
return mapper.readValue(response.content().array(), collectionTypeForNode);
} catch (IOException e) {
return Exceptions.throwUnsafely(e);
}
});
return client.prepare()
.get(path.toString())
.asJson(collectionTypeForNode)
.as(HttpEntity::content)
.execute();
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ class ConsulClientBuilderTest extends ConsulTestBase {
@Test
void gets403WhenNoToken() throws Exception {
final HttpStatus status = WebClient.of("http://localhost:" + consul().getHttpPort())
.get("/v1/agent/self").aggregate()
.get().status();
.blocking()
.get("/v1/agent/self")
.status();
assertThat(status).isEqualTo(HttpStatus.FORBIDDEN);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.AfterAll;
Expand All @@ -32,8 +31,8 @@

import com.fasterxml.jackson.core.JsonProcessingException;

import com.linecorp.armeria.client.BlockingWebClient;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.HttpMethod;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.internal.consul.ConsulTestBase;
Expand Down Expand Up @@ -119,67 +118,64 @@ void testThatDefaultCheckMethodIsHead() {
serverRef.set(server);
}).doesNotThrowAnyException();
});
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(
client().healthyEndpoints("testThatDefaultCheckMethodIsHead").join().size()
).isEqualTo(1));
await().untilAsserted(() -> {
assertThat(client().healthyEndpoints("testThatDefaultCheckMethodIsHead").join().size())
.isEqualTo(1);
});
serverRef.get().stop();
}

@Test
void testEndpointsCountOfListeningServiceWithAServerStopAndStart() {
// Checks sample endpoints created when initialized.
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(client().endpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints));
await().untilAsserted(() -> {
assertThat(client().endpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints);
});

// When we close one server then the listener deregister it automatically from consul agent.
servers.get(0).stop().join();

await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> {
final List<Endpoint> results = client().endpoints(serviceName).join();
assertThat(results).hasSize(sampleEndpoints.size() - 1);
});
await().untilAsserted(() -> {
final List<Endpoint> results = client().endpoints(serviceName).join();
assertThat(results).hasSize(sampleEndpoints.size() - 1);
});

// Endpoints increased after service restart.
servers.get(0).start().join();

await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(client().endpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints));
await().untilAsserted(() -> {
assertThat(client().endpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints);
});
}

@Test
void testHealthyServiceWithAdditionalCheckRule() {
// Checks sample endpoints created when initialized.
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(client().healthyEndpoints(serviceName).join())
.hasSameSizeAs(sampleEndpoints));
await().untilAsserted(() -> {
assertThat(client().healthyEndpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints);
});

// Make a service to produce 503 error for checking by consul.
final Endpoint firstEndpoint = sampleEndpoints.get(0);
final WebClient webClient = WebClient.of(firstEndpoint.toUri(SessionProtocol.HTTP));
webClient.post("echo", "503").aggregate().join();
final BlockingWebClient webClient = BlockingWebClient.of(firstEndpoint.toUri(SessionProtocol.HTTP));
webClient.post("echo", "503");

// And then, consul marks the service to an unhealthy state.
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(client().healthyEndpoints(serviceName).join())
.hasSize(sampleEndpoints.size() - 1));
await().untilAsserted(() -> {
assertThat(client().healthyEndpoints(serviceName).join())
.hasSize(sampleEndpoints.size() - 1);
});

// But, the size of endpoints does not changed.
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(client().endpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints));
await().untilAsserted(() -> {
assertThat(client().endpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints);
});

// Make a service to produce 200 OK for checking by consul.
webClient.post("echo", "200").aggregate().join();
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() ->
assertThat(client().healthyEndpoints(serviceName).join())
.hasSameSizeAs(sampleEndpoints));
webClient.post("echo", "200");
await().untilAsserted(() -> {
assertThat(client().healthyEndpoints(serviceName).join()).hasSameSizeAs(sampleEndpoints);
});
}

@Test
Expand All @@ -198,13 +194,11 @@ void testThatTagsAreAdded() {
.build();
server.addListener(listener);
server.start().join();
await().atMost(10, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(
client().healthyEndpoints("testThatTagsAreAdded", null,
"Service.Tags contains \"v1\"")
.join()
.size()
).isEqualTo(1));
await().untilAsserted(() -> {
assertThat(client().healthyEndpoints("testThatTagsAreAdded", null,
"Service.Tags contains \"v1\"").join())
.hasSize(1);
});
server.stop();
}
}