Skip to content

Commit

Permalink
Use proper executor for ConcurrencyLimiter
Browse files Browse the repository at this point in the history
  • Loading branch information
spkrka committed Aug 28, 2018
1 parent 714b8ee commit d79facc
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 24 deletions.
10 changes: 3 additions & 7 deletions src/main/java/com/spotify/futures/AsyncRetrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,9 @@ private <T> void handleFailure(final SettableFuture<T> future,
final Predicate<T> retryCondition,
Throwable t) {
if (retries > 0) {
if (delay > 0) {
executorService.schedule(() ->
startRetry(future, code, retries - 1, delay, timeUnit, retryCondition),
delay, timeUnit);
} else {
startRetry(future, code, retries - 1, delay, timeUnit, retryCondition);
}
executorService.schedule(() ->
startRetry(future, code, retries - 1, delay, timeUnit, retryCondition),
delay, timeUnit);
} else {
future.setException(t);
}
Expand Down
16 changes: 10 additions & 6 deletions src/main/java/com/spotify/futures/ConcurrencyLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;

/**
Expand All @@ -36,13 +36,15 @@
*/
public final class ConcurrencyLimiter<T> implements FutureJobInvoker<T> {

private final Executor executor;
private final BlockingQueue<Job<T>> queue;
private final Semaphore limit;
private final int maxQueueSize;

private final int maxConcurrency;

private ConcurrencyLimiter(int maxConcurrency, int maxQueueSize) {
private ConcurrencyLimiter(final Executor executor, int maxConcurrency, int maxQueueSize) {
this.executor = executor;
this.maxConcurrency = maxConcurrency;
this.maxQueueSize = maxQueueSize;
Preconditions.checkArgument(maxConcurrency > 0);
Expand All @@ -53,13 +55,15 @@ private ConcurrencyLimiter(int maxConcurrency, int maxQueueSize) {

/**
*
* @param executor the executor to run callables on.
* @param maxConcurrency maximum number of futures in progress,
* @param maxQueueSize maximum number of jobs in queue. This is a soft bound and may be
* temporarily exceeded if add() is called concurrently.
* @return a new concurrency limiter
*/
public static <T> ConcurrencyLimiter<T> create(int maxConcurrency, int maxQueueSize) {
return new ConcurrencyLimiter<>(maxConcurrency, maxQueueSize);
public static <T> ConcurrencyLimiter<T> create(
final Executor executor, int maxConcurrency, int maxQueueSize) {
return new ConcurrencyLimiter<>(executor, maxConcurrency, maxQueueSize);
}

/**
Expand All @@ -82,7 +86,7 @@ public ListenableFuture<T> add(Callable<? extends ListenableFuture<T>> callable)
final String message = "Queue size has reached capacity: " + maxQueueSize;
return Futures.immediateFailedFuture(new CapacityReachedException(message));
}
pump();
executor.execute(this::pump);
return response;
}

Expand Down Expand Up @@ -177,7 +181,7 @@ public void onFailure(Throwable t) {
response.setException(t);
pump();
}
}, MoreExecutors.directExecutor());
}, executor);
}

private static class Job<T> {
Expand Down
22 changes: 11 additions & 11 deletions src/test/java/com/spotify/futures/ConcurrencyLimiterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,23 @@ public class ConcurrencyLimiterTest {

@Test(expected = IllegalArgumentException.class)
public void testTooLowConcurrency() throws Exception {
ConcurrencyLimiter.create(0, 10);
ConcurrencyLimiter.create(MoreExecutors.directExecutor(), 0, 10);
}

@Test(expected = IllegalArgumentException.class)
public void testTooLowQueueSize() throws Exception {
ConcurrencyLimiter.create(10, 0);
ConcurrencyLimiter.create(MoreExecutors.directExecutor(), 10, 0);
}

@Test(expected = NullPointerException.class)
public void testNullJob() throws Exception {
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(1, 10);
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(MoreExecutors.directExecutor(), 1, 10);
limiter.add(null);
}

@Test
public void testJobReturnsNull() throws Exception {
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(1, 10);
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(MoreExecutors.directExecutor(), 1, 10);
final ListenableFuture<String> response = limiter.add(job(null));
assertTrue(response.isDone());
final Throwable exception = FuturesExtra.getException(response);
Expand All @@ -63,7 +63,7 @@ public void testJobReturnsNull() throws Exception {

@Test
public void testJobThrows() throws Exception {
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(1, 10);
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(MoreExecutors.directExecutor(), 1, 10);
final ListenableFuture<String> response = limiter.add(() -> {
throw new IllegalStateException();
});
Expand All @@ -75,7 +75,7 @@ public void testJobThrows() throws Exception {

@Test
public void testJobReturnsFailure() throws Exception {
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(1, 10);
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(MoreExecutors.directExecutor(), 1, 10);
final ListenableFuture<String> response = limiter.add(job(Futures.<String>immediateFailedFuture(new IllegalStateException())));

assertTrue(response.isDone());
Expand All @@ -85,7 +85,7 @@ public void testJobReturnsFailure() throws Exception {

@Test
public void testCancellation() throws Exception {
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(2, 10);
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(MoreExecutors.directExecutor(), 2, 10);
final SettableFuture<String> request1 = SettableFuture.create();
final SettableFuture<String> request2 = SettableFuture.create();

Expand Down Expand Up @@ -129,7 +129,7 @@ public void testCancellation() throws Exception {

@Test
public void testSimple() throws Exception {
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(2, 10);
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(MoreExecutors.directExecutor(), 2, 10);
final SettableFuture<String> request1 = SettableFuture.create();
final SettableFuture<String> request2 = SettableFuture.create();
final SettableFuture<String> request3 = SettableFuture.create();
Expand Down Expand Up @@ -169,7 +169,7 @@ public void testSimple() throws Exception {
public void testLongRunning() throws Exception {
final AtomicInteger activeCount = new AtomicInteger();
final AtomicInteger maxCount = new AtomicInteger();
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(10, 100000);
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(MoreExecutors.directExecutor(), 10, 100000);
List<CountingJob> jobs = Lists.newArrayList();
List<ListenableFuture<String>> responses = Lists.newArrayList();
for (int i = 0; i < 100000; i++) {
Expand Down Expand Up @@ -200,7 +200,7 @@ public void testLongRunning() throws Exception {

@Test
public void testQueueSize() throws Exception {
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(10, 10);
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(MoreExecutors.directExecutor(), 10, 10);
for (int i = 0; i < 20; i++) {
limiter.add(job(SettableFuture.<String>create()));
}
Expand All @@ -215,7 +215,7 @@ public void testQueueSize() throws Exception {
public void testQueueSizeCounter() throws Exception {
final SettableFuture<String> future = SettableFuture.create();

final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(10, 10);
final ConcurrencyLimiter<String> limiter = ConcurrencyLimiter.create(MoreExecutors.directExecutor(), 10, 10);
for (int i = 0; i < 20; i++) {
limiter.add(job(future));
}
Expand Down

0 comments on commit d79facc

Please sign in to comment.