Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,16 +11,20 @@ | |
import org.apache.log4j.Level; | ||
import org.apache.log4j.LogManager; | ||
import org.hamcrest.Matchers; | ||
import org.mockito.Mock; | ||
import org.mockito.MockitoAnnotations; | ||
import org.testng.annotations.BeforeMethod; | ||
import org.testng.annotations.Test; | ||
import com.google.common.base.Function; | ||
import com.google.common.base.Stopwatch; | ||
import com.google.common.base.Throwables; | ||
import com.google.common.base.Ticker; | ||
import com.google.common.collect.Lists; | ||
import com.google.common.util.concurrent.Uninterruptibles; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.mockito.Mockito.*; | ||
|
||
/** | ||
* @author Patrick Huang <a | ||
|
@@ -29,32 +33,41 @@ | |
@Test(groups = "unit-tests") | ||
@Slf4j | ||
public class LeakyBucketTest { | ||
@Mock | ||
private Ticker ticker; | ||
|
||
@BeforeMethod | ||
public void beforeMethod() { | ||
// LogManager.getLogger(LeakyBucket.class.getPackage().getName()) | ||
// .setLevel(Level.DEBUG); | ||
LogManager.getLogger(LeakyBucket.class.getPackage().getName()) | ||
.setLevel(Level.DEBUG); | ||
This comment has been minimized.
Sorry, something went wrong. |
||
MockitoAnnotations.initMocks(this); | ||
} | ||
|
||
@Test | ||
public void willWaitUntilRefill() throws InterruptedException { | ||
int refillDuration = 20; | ||
TimeUnit refillTimeUnit = TimeUnit.MILLISECONDS; | ||
LeakyBucket bucket = new LeakyBucket(1, refillDuration, refillTimeUnit); | ||
LeakyBucket bucket = | ||
new LeakyBucket(1, refillDuration, refillTimeUnit, ticker); | ||
|
||
assertThat(bucket.tryAcquire(), Matchers.is(true)); | ||
assertThat(bucket.tryAcquire(), Matchers.is(false)); | ||
long timeOverRefillDuration = | ||
TimeUnit.NANOSECONDS.convert(refillDuration, refillTimeUnit); | ||
|
||
Thread.sleep(TimeUnit.MILLISECONDS.convert(refillDuration, refillTimeUnit)); | ||
// 3nd call when the ticker is read we mock the time to pass refill | ||
// duration | ||
when(ticker.read()).thenReturn(0L, 1L, timeOverRefillDuration); | ||
This comment has been minimized.
Sorry, something went wrong.
seanf
Contributor
|
||
|
||
assertThat(bucket.tryAcquire(), Matchers.is(true)); | ||
assertThat(bucket.tryAcquire(), Matchers.is(false)); | ||
assertThat(bucket.tryAcquire(), Matchers.is(true)); | ||
} | ||
|
||
@Test | ||
public void concurrentTest() throws InterruptedException { | ||
int refillDuration = 10; | ||
TimeUnit refillTimeUnit = TimeUnit.MILLISECONDS; | ||
final LeakyBucket bucket = | ||
new LeakyBucket(1, refillDuration, refillTimeUnit); | ||
new LeakyBucket(1, refillDuration, refillTimeUnit, ticker); | ||
Callable<Boolean> callable = new Callable<Boolean>() { | ||
|
||
@Override | ||
|
@@ -72,8 +85,12 @@ public Boolean call() throws Exception { | |
List<Boolean> result = getFutureResult(futures); | ||
assertThat(result, Matchers.containsInAnyOrder(true, false, false)); | ||
|
||
// wait enough time and try again | ||
Uninterruptibles.sleepUninterruptibly(refillDuration, refillTimeUnit); | ||
// by default mock will return 0 for ticker.read. | ||
// here we simulate that we have waited enough time and try again | ||
long timeOverRefillDuration = | ||
TimeUnit.NANOSECONDS.convert(refillDuration, refillTimeUnit); | ||
when(ticker.read()).thenReturn(timeOverRefillDuration); | ||
|
||
List<Future<Boolean>> callAgain = executorService.invokeAll(callables); | ||
assertThat(getFutureResult(callAgain), | ||
Matchers.containsInAnyOrder(true, false, false)); | ||
|
@@ -94,8 +111,7 @@ public Boolean apply(Future<Boolean> input) { | |
} | ||
|
||
@Test | ||
public void willMakeUpTheRefillWhenTimePassed() | ||
throws InterruptedException { | ||
public void willMakeUpTheRefillWhenTimePassed() throws InterruptedException { | ||
LeakyBucket bucket = new LeakyBucket(2, 20, TimeUnit.MILLISECONDS); | ||
|
||
assertThat(bucket.tryAcquire(), Matchers.is(true)); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,9 @@ | |
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.Semaphore; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
import javax.servlet.ServletException; | ||
|
||
|
@@ -51,22 +53,19 @@ public class RestCallLimiterTest { | |
|
||
@Mock | ||
private Runnable runnable; | ||
private CountBlockingSemaphore countBlockingSemaphore; | ||
|
||
@BeforeClass | ||
public void beforeClass() { | ||
// set logging to debug | ||
// testeeLogger.setLevel(Level.DEBUG); | ||
testeeLogger.setLevel(Level.DEBUG); | ||
testLogger.setLevel(Level.DEBUG); | ||
This comment has been minimized.
Sorry, something went wrong.
seanf
Contributor
|
||
} | ||
|
||
@BeforeMethod | ||
public void beforeMethod() { | ||
MockitoAnnotations.initMocks(this); | ||
limiter = new RestCallLimiter(maxConcurrent, maxActive); | ||
// the first time this method is executed it seems to cause 10-30ms | ||
// overhead by itself (moving things to heap and register classes maybe) | ||
// This will reduce that overhead for actual tests | ||
limiter.tryAcquireAndRun(runnable); | ||
} | ||
|
||
@Test | ||
|
@@ -97,7 +96,7 @@ public boolean apply(Boolean input) { | |
} | ||
}); | ||
assertThat(successRequest, | ||
Matchers.<Boolean> iterableWithSize(maxConcurrent)); | ||
Matchers.<Boolean>iterableWithSize(maxConcurrent)); | ||
// last request which exceeds the limit will fail to get permit | ||
assertThat(result, Matchers.hasItem(false)); | ||
} | ||
|
@@ -106,7 +105,6 @@ static <T> List<T> submitConcurrentTasksAndGetResult(Callable<T> task, | |
int numOfThreads) throws InterruptedException, ExecutionException { | ||
List<Callable<T>> tasks = Collections.nCopies(numOfThreads, task); | ||
|
||
|
||
ExecutorService service = Executors.newFixedThreadPool(numOfThreads); | ||
|
||
List<Future<T>> futures = service.invokeAll(tasks); | ||
|
@@ -115,8 +113,7 @@ static <T> List<T> submitConcurrentTasksAndGetResult(Callable<T> task, | |
public T apply(Future<T> input) { | ||
try { | ||
return input.get(); | ||
} | ||
catch (Exception e) { | ||
} catch (Exception e) { | ||
throw Throwables.propagate(e); | ||
} | ||
} | ||
|
@@ -126,28 +123,32 @@ public T apply(Future<T> input) { | |
@Test | ||
public void canOnlyHaveMaxActiveConcurrentRequest() | ||
throws InterruptedException, ExecutionException { | ||
countBlockingSemaphore = new CountBlockingSemaphore(maxActive); | ||
limiter = | ||
new RestCallLimiter(maxConcurrent, maxActive) | ||
.changeActiveSemaphore(countBlockingSemaphore); | ||
|
||
// Given: each thread will take some time to do its job | ||
final int timeSpentDoingWork = 20; | ||
runnableWillTakeTime(timeSpentDoingWork); | ||
|
||
// When: max concurrent threads are accessing simultaneously | ||
Callable<Long> callable = | ||
taskToAcquireAndMeasureTime(); | ||
Callable<Boolean> callable = new Callable<Boolean>() { | ||
|
||
@Override | ||
public Boolean call() throws Exception { | ||
return limiter.tryAcquireAndRun(runnable); | ||
} | ||
}; | ||
|
||
// Then: only max active threads will be served immediately while others | ||
// will block until them finish | ||
List<Long> timeBlockedInMillis = | ||
List<Boolean> requests = | ||
submitConcurrentTasksAndGetResult(callable, maxConcurrent); | ||
log.debug("result: {}", timeBlockedInMillis); | ||
Iterable<Long> blocked = | ||
Iterables.filter(timeBlockedInMillis, new Predicate<Long>() { | ||
@Override | ||
public boolean apply(Long input) { | ||
return input >= timeSpentDoingWork * 2; | ||
} | ||
}); | ||
assertThat(blocked, | ||
Matchers.<Long> iterableWithSize(maxConcurrent - maxActive)); | ||
log.debug("result: {}", requests); | ||
|
||
assertThat(countBlockingSemaphore.numOfBlockedThreads(), | ||
Matchers.equalTo(maxConcurrent - maxActive)); | ||
} | ||
|
||
void runnableWillTakeTime(final int timeSpentDoingWork) { | ||
|
@@ -219,8 +220,7 @@ public void changeMaxActiveLimitWhenHasBlockedThreads() | |
// 3 requests (each takes 20ms) and 1 request should block | ||
final int timeSpentDoingWork = 20; | ||
runnableWillTakeTime(timeSpentDoingWork); | ||
Callable<Long> callable = | ||
taskToAcquireAndMeasureTime(); | ||
Callable<Long> callable = taskToAcquireAndMeasureTime(); | ||
List<Callable<Long>> requests = Collections.nCopies(3, callable); | ||
// 1 task to update the active permit with 5ms delay | ||
// (so that it will happen while there is a blocked request) | ||
|
@@ -286,8 +286,7 @@ public void willReleaseSemaphoreWhenThereIsException() throws IOException, | |
|
||
@Test | ||
public void zeroPermitMeansNoLimit() { | ||
limiter = | ||
new RestCallLimiter(0, 0); | ||
limiter = new RestCallLimiter(0, 0); | ||
|
||
assertThat(limiter.tryAcquireAndRun(runnable), Matchers.is(true)); | ||
assertThat(limiter.tryAcquireAndRun(runnable), Matchers.is(true)); | ||
|
@@ -335,4 +334,27 @@ private static long roundToTens(long arg) { | |
return arg / 10 * 10; | ||
} | ||
|
||
private static class CountBlockingSemaphore extends Semaphore { | ||
private static final long serialVersionUID = 1L; | ||
private AtomicInteger blockCounter = new AtomicInteger(0); | ||
|
||
public CountBlockingSemaphore(Integer permits) { | ||
super(permits); | ||
} | ||
|
||
@Override | ||
public boolean tryAcquire(long timeout, TimeUnit unit) | ||
throws InterruptedException { | ||
boolean got = tryAcquire(); | ||
if (!got) { | ||
blockCounter.incrementAndGet(); | ||
} | ||
// we don't care the result | ||
return true; | ||
} | ||
|
||
public int numOfBlockedThreads() { | ||
return blockCounter.get(); | ||
} | ||
} | ||
} |
I don't think you want to check this in.