Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow configuring a timeout for Endpoint selection in EndpointGroup #4246

Merged
merged 17 commits into from Jul 4, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -83,10 +83,11 @@ public static ConsulEndpointGroupBuilder builder(URI consulUri, String serviceNa
@Nullable
private volatile ScheduledFuture<?> scheduledFuture;

ConsulEndpointGroup(EndpointSelectionStrategy selectionStrategy, ConsulClient consulClient,
ConsulEndpointGroup(EndpointSelectionStrategy selectionStrategy, boolean allowEmptyEndpoints,
long selectionTimeoutMillis, ConsulClient consulClient,
String serviceName, long registryFetchIntervalMillis, boolean useHealthyEndpoints,
@Nullable String datacenter, @Nullable String filter) {
super(selectionStrategy);
super(selectionStrategy, allowEmptyEndpoints, selectionTimeoutMillis);
this.consulClient = requireNonNull(consulClient, "consulClient");
this.serviceName = requireNonNull(serviceName, "serviceName");
this.registryFetchIntervalMillis = registryFetchIntervalMillis;
Expand Down
Expand Up @@ -21,8 +21,10 @@
import java.net.URI;
import java.time.Duration;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.AbstractDynamicEndpointGroupBuilder;
import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy;
import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.common.consul.ConsulConfigSetters;
Expand Down Expand Up @@ -58,6 +60,7 @@ public final class ConsulEndpointGroupBuilder
private String filter;

ConsulEndpointGroupBuilder(URI consulUri, String serviceName) {
super(Flags.defaultResponseTimeoutMillis());
this.serviceName = requireNonNull(serviceName, "serviceName");
consulClientBuilder = ConsulClient.builder(consulUri);
}
Expand Down Expand Up @@ -114,7 +117,8 @@ public ConsulEndpointGroupBuilder datacenter(String datacenter) {
}

/**
* Filters the endpoints using the Consul <a href="https://www.consul.io/api-docs/features/filtering">filter</a>.
* Filters the endpoints using the Consul
* <a href="https://www.consul.io/api-docs/features/filtering">filter</a>.
* If not set, all endpoints are returned.
*/
public ConsulEndpointGroupBuilder filter(String filter) {
Expand All @@ -138,12 +142,33 @@ public ConsulEndpointGroupBuilder consulToken(String consulToken) {
* Returns a newly-created {@link ConsulEndpointGroup}.
*/
public ConsulEndpointGroup build() {
return new ConsulEndpointGroup(selectionStrategy, consulClientBuilder.build(), serviceName,
registryFetchIntervalMillis, useHealthyEndpoints, datacenter, filter);
return new ConsulEndpointGroup(selectionStrategy, shouldAllowEmptyEndpoints(), selectionTimeoutMillis(),
consulClientBuilder.build(), serviceName, registryFetchIntervalMillis,
useHealthyEndpoints, datacenter, filter);
}

@Override
public ConsulEndpointGroupBuilder allowEmptyEndpoints(boolean allowEmptyEndpoints) {
return (ConsulEndpointGroupBuilder) super.allowEmptyEndpoints(allowEmptyEndpoints);
}

/**
* Sets the timeout to wait until a successful {@link Endpoint} selection.
* {@link Duration#ZERO} disables the timeout.
* If unspecified, {@link Flags#defaultResponseTimeoutMillis()} is used by default.
*/
@Override
public ConsulEndpointGroupBuilder selectionTimeout(Duration selectionTimeout) {
return (ConsulEndpointGroupBuilder) super.selectionTimeout(selectionTimeout);
}

/**
* Sets the timeout to wait until a successful {@link Endpoint} selection.
* {@code 0} disables the timeout.
* If unspecified, {@link Flags#defaultResponseTimeoutMillis()} is used by default.
*/
@Override
public ConsulEndpointGroupBuilder selectionTimeoutMillis(long selectionTimeoutMillis) {
return (ConsulEndpointGroupBuilder) super.selectionTimeoutMillis(selectionTimeoutMillis);
}
}
@@ -0,0 +1,46 @@
/*
* Copyright 2022 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.client.consul;

import static org.assertj.core.api.Assertions.assertThat;

import java.net.URI;

import org.junit.jupiter.api.Test;

import com.linecorp.armeria.common.Flags;

class ConsulEndpointGroupBuilderTest {

@Test
void selectionTimeout_default() {
try (ConsulEndpointGroup group = ConsulEndpointGroup.of(URI.create("http://127.0.0.1/node"),
"my-service")) {
assertThat(group.selectionTimeoutMillis()).isEqualTo(Flags.defaultResponseTimeoutMillis());
}
}

@Test
void selectionTimeout_custom() {
try (ConsulEndpointGroup group =
ConsulEndpointGroup.builder(URI.create("http://127.0.0.1/node"), "my-service")
.selectionTimeoutMillis(4000)
.build()) {
assertThat(group.selectionTimeoutMillis()).isEqualTo(4000);
}
}
}
Expand Up @@ -317,7 +317,7 @@ private CompletableFuture<Boolean> initEndpointGroup(EndpointGroup endpointGroup

// Use an arbitrary event loop for asynchronous Endpoint selection.
final EventLoop temporaryEventLoop = options().factory().eventLoopSupplier().get();
return endpointGroup.select(this, temporaryEventLoop, connectTimeoutMillis()).handle((e, cause) -> {
return endpointGroup.select(this, temporaryEventLoop).handle((e, cause) -> {
updateEndpoint(e);
acquireEventLoop(endpointGroup);

Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/com/linecorp/armeria/client/Endpoint.java
Expand Up @@ -268,16 +268,27 @@ public Endpoint selectNow(ClientRequestContext ctx) {
return this;
}

@Deprecated
@Override
public CompletableFuture<Endpoint> select(ClientRequestContext ctx,
ScheduledExecutorService executor,
long timeoutMillis) {
return select(ctx, executor);
}

@Override
public CompletableFuture<Endpoint> select(ClientRequestContext ctx, ScheduledExecutorService executor) {
if (selectFuture == null) {
selectFuture = UnmodifiableFuture.completedFuture(this);
}
return selectFuture;
}

@Override
public long selectionTimeoutMillis() {
return 0;
}

@Override
public CompletableFuture<List<Endpoint>> whenReady() {
if (whenReadyFuture == null) {
Expand Down
Expand Up @@ -15,7 +15,12 @@
*/
package com.linecorp.armeria.client.endpoint;

import static com.google.common.base.Preconditions.checkArgument;

import java.time.Duration;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.annotation.UnstableApi;

/**
Expand All @@ -25,11 +30,23 @@
public abstract class AbstractDynamicEndpointGroupBuilder implements DynamicEndpointGroupSetters {

private boolean allowEmptyEndpoints = true;
private long selectionTimeoutMillis;

/**
* Creates a new instance.
*/
protected AbstractDynamicEndpointGroupBuilder() {}
protected AbstractDynamicEndpointGroupBuilder() {
this(Flags.defaultConnectTimeoutMillis());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the default for DnsEndpointGroupBuilder will be connectTimeout instead of responseTimeout. I wanted to check if this is your intention.

If the intention is to set responseTimeout as the default for pre-defined EndpointGroups in armeria, what do you think of just removing the empty constructor?
We can catch these type of mistakes at compile-time this way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late response.

I think the default for DnsEndpointGroupBuilder will be connectTimeout instead of responseTimeout. I wanted to check if this is your intention.

Currently, the default select timeout for DnsEndpointGroup is the default connection timeout.

assertSelectionTimeout(endpointGroup).isEqualTo(Flags.defaultConnectTimeoutMillis());

DNS servers usually return a response quickly because they cache DNS records. Therefore, 3.2 seconds may be reasonable.

By the way, I checked the default timeout for a DNS query. It is 5 seconds that can be a more sensible default timeout for `DnsEndpointGroup/

what do you think of just removing the empty constructor?

As this class is a public API, that could cause a breaking change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this class is a public API, that could cause a breaking change.

AbstractDynamicEndpointGroupBuilder is part of unstable APIs. Will remove the default constructor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sounds reasonable to use the same timeout for DnsEndpointGroup and our DNS resolver.

}

/**
* Creates a new instance with the specified {@code selectionTimeoutMillis}.
*/
protected AbstractDynamicEndpointGroupBuilder(long selectionTimeoutMillis) {
checkArgument(selectionTimeoutMillis >= 0, "selectionTimeoutMillis: %s (expected: >= 0)",
selectionTimeoutMillis);
this.selectionTimeoutMillis = selectionTimeoutMillis;
}

// Note that we don't have `selectionStrategy()` here because some subclasses are delegating and
// thus they use the `EndpointSelectionStrategy` of the delegate.
Expand All @@ -47,4 +64,28 @@ public AbstractDynamicEndpointGroupBuilder allowEmptyEndpoints(boolean allowEmpt
protected boolean shouldAllowEmptyEndpoints() {
return allowEmptyEndpoints;
}

@Override
public AbstractDynamicEndpointGroupBuilder selectionTimeout(Duration selectionTimeout) {
return (AbstractDynamicEndpointGroupBuilder)
DynamicEndpointGroupSetters.super.selectionTimeout(selectionTimeout);
}

@Override
public AbstractDynamicEndpointGroupBuilder selectionTimeoutMillis(long selectionTimeoutMillis) {
checkArgument(selectionTimeoutMillis >= 0, "selectionTimeoutMillis: %s (expected: >= 0)",
selectionTimeoutMillis);
if (selectionTimeoutMillis == 0) {
selectionTimeoutMillis = Long.MAX_VALUE;
}
this.selectionTimeoutMillis = selectionTimeoutMillis;
return this;
}

/**
* Returns the timeout to wait until a successful {@link Endpoint} selection.
*/
protected long selectionTimeoutMillis() {
trustin marked this conversation as resolved.
Show resolved Hide resolved
return selectionTimeoutMillis;
}
}
Expand Up @@ -25,14 +25,16 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import com.google.common.annotations.VisibleForTesting;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.common.util.UnmodifiableFuture;

/**
* A skeletal {@link EndpointSelector} implementation. This abstract class implements the
* {@link #select(ClientRequestContext, ScheduledExecutorService, long)} method by listening to
* {@link #select(ClientRequestContext, ScheduledExecutorService)} method by listening to
* the change events emitted by {@link EndpointGroup} specified at construction time.
*/
public abstract class AbstractEndpointSelector implements EndpointSelector {
Expand All @@ -53,10 +55,17 @@ protected final EndpointGroup group() {
return endpointGroup;
}

@Deprecated
@Override
public final CompletableFuture<Endpoint> select(ClientRequestContext ctx,
ScheduledExecutorService executor,
long timeoutMillis) {
return select(ctx, executor);
}

@Override
public final CompletableFuture<Endpoint> select(ClientRequestContext ctx,
ScheduledExecutorService executor) {
Endpoint endpoint = selectNow(ctx);
if (endpoint != null) {
return UnmodifiableFuture.completedFuture(endpoint);
Expand All @@ -73,25 +82,41 @@ public final CompletableFuture<Endpoint> select(ClientRequestContext ctx,
return UnmodifiableFuture.completedFuture(endpoint);
}

final long selectionTimeoutMillis = endpointGroup.selectionTimeoutMillis();
if (selectionTimeoutMillis == 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Would it be better to use -1 to indicate StaticEndpointGroup?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to differentiate the timeout of StaticEndpointGroup from others.
A DynamincEndpointGroup may have 0 selection timeout if it is made from a static resource such as a file or an environment variable.

// A static EndpointGroup.
return UnmodifiableFuture.completedFuture(null);
}
long responseTimeoutMillis = ctx.responseTimeoutMillis();
if (responseTimeoutMillis == 0) {
responseTimeoutMillis = Long.MAX_VALUE;
}

final long timeoutMillis = Math.min(selectionTimeoutMillis, responseTimeoutMillis);

// Schedule the timeout task.
final ScheduledFuture<?> timeoutFuture =
executor.schedule(() -> listeningFuture.complete(null),
timeoutMillis, TimeUnit.MILLISECONDS);
listeningFuture.timeoutFuture = timeoutFuture;

// Cancel the timeout task if listeningFuture is done already.
// This guards against the following race condition:
// 1) (Current thread) Timeout task is scheduled.
// 2) ( Other thread ) listeningFuture is completed, but the timeout task is not cancelled
// 3) (Current thread) timeoutFuture is assigned to listeningFuture.timeoutFuture, but it's too late.
if (listeningFuture.isDone()) {
timeoutFuture.cancel(false);
if (timeoutMillis < Long.MAX_VALUE) {
final ScheduledFuture<?> timeoutFuture = executor.schedule(() -> {
listeningFuture.complete(null);
}, timeoutMillis, TimeUnit.MILLISECONDS);
listeningFuture.timeoutFuture = timeoutFuture;

// Cancel the timeout task if listeningFuture is done already.
// This guards against the following race condition:
// 1) (Current thread) Timeout task is scheduled.
// 2) ( Other thread ) listeningFuture is completed, but the timeout task is not cancelled
// 3) (Current thread) timeoutFuture is assigned to listeningFuture.timeoutFuture, but it's too
// late.
if (listeningFuture.isDone()) {
timeoutFuture.cancel(false);
}
}

return listeningFuture;
}

private class ListeningFuture extends CompletableFuture<Endpoint> implements Consumer<List<Endpoint>> {
@VisibleForTesting
class ListeningFuture extends CompletableFuture<Endpoint> implements Consumer<List<Endpoint>> {
ikhoon marked this conversation as resolved.
Show resolved Hide resolved
private final ClientRequestContext ctx;
private final Executor executor;
@Nullable
Expand Down Expand Up @@ -150,5 +175,11 @@ private void cleanup() {
timeoutFuture.cancel(false);
}
}

@Nullable
@VisibleForTesting
ScheduledFuture<?> timeoutFuture() {
return timeoutFuture;
}
}
}
Expand Up @@ -43,6 +43,7 @@ final class CompositeEndpointGroup extends AbstractEndpointGroup implements List

private final EndpointSelectionStrategy selectionStrategy;
private final EndpointSelector selector;
private final long selectionTimeoutMillis;

private final AsyncCloseableSupport closeable = AsyncCloseableSupport.of(this::closeAsync);

Expand All @@ -57,12 +58,16 @@ final class CompositeEndpointGroup extends AbstractEndpointGroup implements List
this.endpointGroups = ImmutableList.copyOf(requireNonNull(endpointGroups, "endpointGroups"));
dirty = new AtomicBoolean(true);

long selectionTimeoutMillis = 0;
for (EndpointGroup endpointGroup : endpointGroups) {
endpointGroup.addListener(unused -> {
dirty.set(true);
notifyListeners(endpoints());
});
selectionTimeoutMillis = Math.max(selectionTimeoutMillis,
endpointGroup.selectionTimeoutMillis());
}
this.selectionTimeoutMillis = selectionTimeoutMillis;

initialEndpointsFuture =
CompletableFuture.anyOf(this.endpointGroups.stream()
Expand Down Expand Up @@ -104,11 +109,22 @@ public Endpoint selectNow(ClientRequestContext ctx) {
return selector.selectNow(ctx);
}

@Deprecated
@Override
public CompletableFuture<Endpoint> select(ClientRequestContext ctx,
ScheduledExecutorService executor,
long timeoutMillis) {
return selector.select(ctx, executor, timeoutMillis);
return select(ctx, executor);
}

@Override
public CompletableFuture<Endpoint> select(ClientRequestContext ctx, ScheduledExecutorService executor) {
return selector.select(ctx, executor);
}

@Override
public long selectionTimeoutMillis() {
return selectionTimeoutMillis;
}

@Override
Expand Down