Skip to content
This repository has been archived by the owner on Sep 29, 2021. It is now read-only.

Commit

Permalink
allow client timeouts to be customized
Browse files Browse the repository at this point in the history
Allows HeliosClient and RetryingRequestDispatcher timeouts to be
configured when building either.
  • Loading branch information
mattnworb committed Feb 25, 2016
1 parent 9a662dd commit 3e01679
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 47 deletions.
Expand Up @@ -40,7 +40,6 @@
import com.spotify.helios.common.HeliosException; import com.spotify.helios.common.HeliosException;
import com.spotify.helios.common.Json; import com.spotify.helios.common.Json;
import com.spotify.helios.common.Resolver; import com.spotify.helios.common.Resolver;
import com.spotify.helios.common.SystemClock;
import com.spotify.helios.common.Version; import com.spotify.helios.common.Version;
import com.spotify.helios.common.VersionCompatibility; import com.spotify.helios.common.VersionCompatibility;
import com.spotify.helios.common.descriptors.Deployment; import com.spotify.helios.common.descriptors.Deployment;
Expand Down Expand Up @@ -404,7 +403,7 @@ public ListenableFuture<Map<JobId, JobStatus>> jobStatuses(final Set<JobId> jobs
final ConvertResponseToPojo<Map<JobId, JobStatus>> converter = ConvertResponseToPojo.create( final ConvertResponseToPojo<Map<JobId, JobStatus>> converter = ConvertResponseToPojo.create(
TypeFactory.defaultInstance().constructMapType(Map.class, JobId.class, JobStatus.class), TypeFactory.defaultInstance().constructMapType(Map.class, JobId.class, JobStatus.class),
ImmutableSet.of(HTTP_OK)); ImmutableSet.of(HTTP_OK));

return transform(request(uri("/jobs/statuses"), "POST", jobs), converter); return transform(request(uri("/jobs/statuses"), "POST", jobs), converter);
} }


Expand Down Expand Up @@ -514,6 +513,8 @@ public static class Builder {
private boolean sslHostnameVerification = true; private boolean sslHostnameVerification = true;
private ListeningScheduledExecutorService executorService; private ListeningScheduledExecutorService executorService;
private boolean shutDownExecutorOnClose = true; private boolean shutDownExecutorOnClose = true;
private int httpTimeout = 10000;
private long requestRetryTimeout = 60000;


private Builder() { private Builder() {
} }
Expand Down Expand Up @@ -576,6 +577,24 @@ public Builder setShutDownExecutorOnClose(final boolean shutDownExecutorOnClose)
return this; return this;
} }


/**
* Set the per-request HTTP connect/read timeout used when communicating with master. Default is
* 10 seconds.
*/
public Builder setHttpTimeout(final int timeout, TimeUnit unit) {
this.httpTimeout = (int) unit.toMillis(timeout);
return this;
}

/**
* Set the total amount of time for which the HeliosClient will retrying failed requests to the
* Helios masters.
*/
public Builder setRetryTimeout(final int timeout, TimeUnit unit) {
this.requestRetryTimeout = (int) unit.toMillis(timeout);
return this;
}

public HeliosClient build() { public HeliosClient build() {
return new HeliosClient(user, createDispatcher()); return new HeliosClient(user, createDispatcher());
} }
Expand All @@ -594,11 +613,10 @@ private RequestDispatcher createDispatcher() {
final RequestDispatcher dispatcher = new DefaultRequestDispatcher( final RequestDispatcher dispatcher = new DefaultRequestDispatcher(
createHttpConnector(sslHostnameVerification), executorService, shutDownExecutorOnClose); createHttpConnector(sslHostnameVerification), executorService, shutDownExecutorOnClose);


return new RetryingRequestDispatcher(dispatcher, return RetryingRequestDispatcher.forDispatcher(dispatcher)
executorService, .setExecutor(executorService)
new SystemClock(), .setRetryTimeout(requestRetryTimeout, TimeUnit.MILLISECONDS)
5, .build();
TimeUnit.SECONDS);
} }


private HttpConnector createHttpConnector(final boolean sslHostnameVerification) { private HttpConnector createHttpConnector(final boolean sslHostnameVerification) {
Expand All @@ -609,8 +627,8 @@ private HttpConnector createHttpConnector(final boolean sslHostnameVerification)
"no endpoints found to connect to, check your configuration"); "no endpoints found to connect to, check your configuration");
} }


final DefaultHttpConnector connector = new DefaultHttpConnector(endpointIterator, 10000, final DefaultHttpConnector connector =
sslHostnameVerification); new DefaultHttpConnector(endpointIterator, httpTimeout, sslHostnameVerification);


Optional<AgentProxy> agentProxyOpt = Optional.absent(); Optional<AgentProxy> agentProxyOpt = Optional.absent();
try { try {
Expand Down
Expand Up @@ -22,9 +22,11 @@
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;


import com.spotify.helios.common.Clock; import com.spotify.helios.common.Clock;
import com.spotify.helios.common.SystemClock;


import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand All @@ -34,51 +36,48 @@
import java.net.URI; import java.net.URI;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import static java.util.concurrent.TimeUnit.SECONDS;

/** /**
* A {@link RequestDispatcher} that retries. * A {@link RequestDispatcher} that retries.
*/ */
class RetryingRequestDispatcher implements RequestDispatcher { class RetryingRequestDispatcher implements RequestDispatcher {


private static final Logger log = LoggerFactory.getLogger(RetryingRequestDispatcher.class); private static final Logger log = LoggerFactory.getLogger(RetryingRequestDispatcher.class);


private static final long RETRY_TIMEOUT_MILLIS = SECONDS.toMillis(60);

private final ListeningScheduledExecutorService executorService; private final ListeningScheduledExecutorService executorService;
private final RequestDispatcher delegate; private final RequestDispatcher delegate;
private final Clock clock; private final Clock clock;
private final long delay; private final long retryTimeoutMillis;
private final TimeUnit delayTimeUnit; private final long delayMillis;


RetryingRequestDispatcher(final RequestDispatcher delegate, private RetryingRequestDispatcher(final RequestDispatcher delegate,
final ListeningScheduledExecutorService executorService, final ListeningScheduledExecutorService executorService,
final Clock clock, final Clock clock,
final long delay, final long retryTimeoutMillis,
final TimeUnit delayTimeUnit) { final long delayMillis) {
this.delegate = delegate; this.delegate = delegate;
this.executorService = executorService; this.executorService = executorService;
this.clock = clock; this.clock = clock;
this.delay = delay; this.retryTimeoutMillis = retryTimeoutMillis;
this.delayTimeUnit = delayTimeUnit; this.delayMillis = delayMillis;
} }


@Override @Override
public ListenableFuture<Response> request(final URI uri, public ListenableFuture<Response> request(final URI uri,
final String method, final String method,
final byte[] entityBytes, final byte[] entityBytes,
final Map<String, List<String>> headers) { final Map<String, List<String>> headers) {
final long deadline = clock.now().getMillis() + RETRY_TIMEOUT_MILLIS; final long deadline = clock.now().getMillis() + retryTimeoutMillis;
final SettableFuture<Response> future = SettableFuture.create(); final SettableFuture<Response> future = SettableFuture.create();
final Supplier<ListenableFuture<Response>> code = new Supplier<ListenableFuture<Response>>() { final Supplier<ListenableFuture<Response>> code = new Supplier<ListenableFuture<Response>>() {
@Override @Override
public ListenableFuture<Response> get() { public ListenableFuture<Response> get() {
return delegate.request(uri, method, entityBytes, headers); return delegate.request(uri, method, entityBytes, headers);
} }
}; };
startRetry(future, code, deadline, delay, delayTimeUnit); startRetry(future, code, deadline, delayMillis);
return future; return future;
} }


Expand All @@ -90,14 +89,13 @@ public void close() throws IOException {
private void startRetry(final SettableFuture<Response> future, private void startRetry(final SettableFuture<Response> future,
final Supplier<ListenableFuture<Response>> code, final Supplier<ListenableFuture<Response>> code,
final long deadline, final long deadline,
final long delay, final long delayMillis) {
final TimeUnit timeUnit) {


ListenableFuture<Response> codeFuture; ListenableFuture<Response> codeFuture;
try { try {
codeFuture = code.get(); codeFuture = code.get();
} catch (Exception e) { } catch (Exception e) {
handleFailure(future, code, deadline, delay, timeUnit, e); handleFailure(future, code, deadline, delayMillis, e);
return; return;
} }


Expand All @@ -110,32 +108,82 @@ public void onSuccess(Response result) {
@Override @Override
public void onFailure(@NotNull Throwable t) { public void onFailure(@NotNull Throwable t) {
log.warn("Failed to connect, retrying in {} seconds.", log.warn("Failed to connect, retrying in {} seconds.",
timeUnit.convert(delay, TimeUnit.SECONDS)); TimeUnit.MILLISECONDS.toSeconds(delayMillis));
log.debug("Specific reason for connection failure follows", t); log.debug("Specific reason for connection failure follows", t);
handleFailure(future, code, deadline, delay, timeUnit, t); handleFailure(future, code, deadline, delayMillis, t);
} }
}); });
} }


private void handleFailure(final SettableFuture<Response> future, private void handleFailure(final SettableFuture<Response> future,
final Supplier<ListenableFuture<Response>> code, final Supplier<ListenableFuture<Response>> code,
final long deadline, final long deadline,
final long delay, final long delayMillis,
final TimeUnit timeUnit,
final Throwable t) { final Throwable t) {
if (clock.now().getMillis() < deadline) { if (clock.now().getMillis() < deadline) {
if (delay > 0) { if (delayMillis > 0) {
executorService.schedule(new Runnable() { executorService.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
startRetry(future, code, deadline - 1, delay, timeUnit); startRetry(future, code, deadline - 1, delayMillis);
} }
}, delay, timeUnit); }, delayMillis, TimeUnit.MILLISECONDS);
} else { } else {
startRetry(future, code, deadline - 1, delay, timeUnit); startRetry(future, code, deadline - 1, delayMillis);
} }
} else { } else {
future.setException(t); future.setException(t);
} }
} }

static Builder forDispatcher(RequestDispatcher delegate) {
return new Builder(delegate);
}

public static final class Builder {

private final RequestDispatcher delegate;
private ListeningScheduledExecutorService executor;
private Clock clock = new SystemClock();
private long retryTimeoutMillis = 60000;
private long delayMillis = 5000;

private Builder(final RequestDispatcher delegate) {
this.delegate = delegate;
}

public Builder setExecutor(ScheduledExecutorService executorService) {
this.executor = MoreExecutors.listeningDecorator(executorService);
return this;
}

/** Defaults to SystemClock. */
public Builder setClock(Clock clock) {
this.clock = clock;
return this;
}

/**
* Set the total amount of time that the RetryingRequestDispatcher allows before giving up on
* the request. Defaults to 60 seconds.
*/
public Builder setRetryTimeout(long timeout, TimeUnit unit) {
this.retryTimeoutMillis = unit.toMillis(timeout);
return this;
}

/**
* Set how much time the RetryingRequestDispatcher waits to retry the request. Defaults to 5
* seconds.
*/
public Builder setDelayOnFailure(long delay, TimeUnit unit) {
this.delayMillis = unit.toMillis(delay);
return this;
}

public RetryingRequestDispatcher build() {
return new RetryingRequestDispatcher(
delegate, executor, clock, retryTimeoutMillis, delayMillis);
}
}
} }
Expand Up @@ -19,13 +19,12 @@


import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;


import com.spotify.helios.common.Clock; import com.spotify.helios.common.Clock;


import org.hamcrest.CoreMatchers; import org.hamcrest.CoreMatchers;
import org.joda.time.Instant; import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
Expand Down Expand Up @@ -55,17 +54,25 @@ public class RetryingRequestDispatcherTest {
private final RequestDispatcher delegate = mock(RequestDispatcher.class); private final RequestDispatcher delegate = mock(RequestDispatcher.class);
private final Clock clock = mock(Clock.class); private final Clock clock = mock(Clock.class);


private RetryingRequestDispatcher dispatcher;

@Before
public void setUp() {
dispatcher = RetryingRequestDispatcher.forDispatcher(delegate)
.setExecutor(newSingleThreadScheduledExecutor())
.setClock(clock)
.setDelayOnFailure(0, SECONDS)
.build();
}

@Test @Test
public void testSuccess() throws Exception { public void testSuccess() throws Exception {
when(delegate.request(any(URI.class), anyString(), any(byte[].class), when(delegate.request(any(URI.class), anyString(), any(byte[].class),
Matchers.<Map<String, List<String>>>any())) Matchers.<Map<String, List<String>>>any()))
.thenReturn(Futures.<Response>immediateFuture(null)); .thenReturn(Futures.<Response>immediateFuture(null));
final ListeningScheduledExecutorService executorService = mock(
ListeningScheduledExecutorService.class);
when(clock.now()).thenReturn(new Instant(0)); when(clock.now()).thenReturn(new Instant(0));


final RetryingRequestDispatcher dispatcher =
new RetryingRequestDispatcher(delegate, executorService, clock, 0, SECONDS);
dispatcher.request(new URI("http://example.com"), "GET", null, dispatcher.request(new URI("http://example.com"), "GET", null,
Collections.<String, List<String>>emptyMap()); Collections.<String, List<String>>emptyMap());


Expand All @@ -81,12 +88,8 @@ public void testSuccessOnRetry() throws Exception {
.thenReturn(Futures.<Response>immediateFailedFuture(new IOException())) .thenReturn(Futures.<Response>immediateFailedFuture(new IOException()))
.thenReturn(Futures.<Response>immediateFuture(null)); .thenReturn(Futures.<Response>immediateFuture(null));


final ListeningScheduledExecutorService executorService =
MoreExecutors.listeningDecorator(newSingleThreadScheduledExecutor());
when(clock.now()).thenReturn(new Instant(0)); when(clock.now()).thenReturn(new Instant(0));


final RetryingRequestDispatcher dispatcher =
new RetryingRequestDispatcher(delegate, executorService, clock, 0, SECONDS);
dispatcher.request(new URI("http://example.com"), "GET", null, dispatcher.request(new URI("http://example.com"), "GET", null,
Collections.<String, List<String>>emptyMap()); Collections.<String, List<String>>emptyMap());


Expand All @@ -103,12 +106,8 @@ public void testFailureOnTimeout() throws Exception {
.thenReturn(Futures.<Response>immediateFailedFuture(new IOException())) .thenReturn(Futures.<Response>immediateFailedFuture(new IOException()))
.thenReturn(Futures.<Response>immediateFuture(null)); .thenReturn(Futures.<Response>immediateFuture(null));


final ListeningScheduledExecutorService executorService =
MoreExecutors.listeningDecorator(newSingleThreadScheduledExecutor());
when(clock.now()).thenReturn(new Instant(0)).thenReturn(new Instant(80000)); when(clock.now()).thenReturn(new Instant(0)).thenReturn(new Instant(80000));


final RetryingRequestDispatcher dispatcher =
new RetryingRequestDispatcher(delegate, executorService, clock, 0, SECONDS);
final ListenableFuture<Response> future = dispatcher.request( final ListenableFuture<Response> future = dispatcher.request(
new URI("http://example.com"), "GET", null, Collections.<String, List<String>>emptyMap()); new URI("http://example.com"), "GET", null, Collections.<String, List<String>>emptyMap());


Expand Down

0 comments on commit 3e01679

Please sign in to comment.