diff --git a/zanata-war/src/main/java/org/zanata/limits/RateLimitManager.java b/zanata-war/src/main/java/org/zanata/limits/RateLimitManager.java index 0f182b8529..09b2eed411 100644 --- a/zanata-war/src/main/java/org/zanata/limits/RateLimitManager.java +++ b/zanata-war/src/main/java/org/zanata/limits/RateLimitManager.java @@ -42,9 +42,12 @@ public class RateLimitManager implements Introspectable { private final Cache activeCallers = CacheBuilder .newBuilder().maximumSize(100).build(); - @Getter(AccessLevel.PACKAGE) + @Getter(AccessLevel.PROTECTED) @VisibleForTesting - private RestCallLimiter.RateLimitConfig limitConfig; + private int maxConcurrent; + @Getter(AccessLevel.PROTECTED) + @VisibleForTesting + private int maxActive; public static RateLimitManager getInstance() { return (RateLimitManager) Component @@ -60,21 +63,29 @@ private void readRateLimitState() { ApplicationConfiguration appConfig = (ApplicationConfiguration) Component .getInstance("applicationConfiguration"); - int maxConcurrent = appConfig.getMaxConcurrentRequestsPerApiKey(); - int maxActive = appConfig.getMaxActiveRequestsPerApiKey(); - limitConfig = - new RestCallLimiter.RateLimitConfig(maxConcurrent, maxActive); + maxConcurrent = appConfig.getMaxConcurrentRequestsPerApiKey(); + maxActive = appConfig.getMaxActiveRequestsPerApiKey(); } @Observer({ ApplicationConfiguration.EVENT_CONFIGURATION_CHANGED }) public void configurationChanged() { - RestCallLimiter.RateLimitConfig old = limitConfig; + int oldConcurrent = maxConcurrent; + int oldActive = maxActive; + boolean changed = false; readRateLimitState(); - if (!Objects.equal(old, limitConfig)) { - log.info("application configuration changed. Old: {}, New: {}", - old, limitConfig); + if (oldConcurrent != maxConcurrent) { + log.info("application configuration changed. Old concurrent: {}, New concurrent: {}", + oldConcurrent, maxConcurrent); + changed = true; + } + if (oldActive != maxActive) { + log.info("application configuration changed. Old active: {}, New active: {}", + oldActive, maxActive); + changed = true; + } + if (changed) { for (RestCallLimiter restCallLimiter : activeCallers.asMap().values()) { - restCallLimiter.changeConfig(limitConfig); + restCallLimiter.changeConfig(maxConcurrent, maxActive); } } } @@ -116,29 +127,25 @@ private Iterable peekCurrentBuckets() { public RestCallLimiter getLimiter(String apiKey) { try { - return activeCallers.get(apiKey, new RestRateLimiterLoader( - getLimitConfig(), apiKey)); + return activeCallers.get(apiKey, new RestRateLimiterLoader(apiKey)); } catch (ExecutionException e) { throw new RuntimeException(e); } } - private static class RestRateLimiterLoader implements + private class RestRateLimiterLoader implements Callable { - private final RestCallLimiter.RateLimitConfig limitConfig; private final String apiKey; - public RestRateLimiterLoader( - RestCallLimiter.RateLimitConfig limitConfig, String apiKey) { - this.limitConfig = limitConfig; + public RestRateLimiterLoader(String apiKey) { this.apiKey = apiKey; } @Override public RestCallLimiter call() throws Exception { log.debug("creating rate limiter for api key: {}", apiKey); - return new RestCallLimiter(limitConfig); + return new RestCallLimiter(getMaxConcurrent(), getMaxActive()); } } } diff --git a/zanata-war/src/main/java/org/zanata/limits/RestCallLimiter.java b/zanata-war/src/main/java/org/zanata/limits/RestCallLimiter.java index a3439134cb..6246260b07 100644 --- a/zanata-war/src/main/java/org/zanata/limits/RestCallLimiter.java +++ b/zanata-war/src/main/java/org/zanata/limits/RestCallLimiter.java @@ -5,9 +5,6 @@ import com.google.common.base.Objects; import com.google.common.base.Throwables; -import lombok.EqualsAndHashCode; -import lombok.RequiredArgsConstructor; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; /** @@ -18,14 +15,14 @@ class RestCallLimiter { private volatile Semaphore maxConcurrentSemaphore; private volatile Semaphore maxActiveSemaphore; - private RateLimitConfig limitConfig; - private volatile LimitChange activeChange; - private volatile LimitChange concurrentChange; - - RestCallLimiter(RateLimitConfig limitConfig) { - this.limitConfig = limitConfig; - this.maxConcurrentSemaphore = makeSemaphore(limitConfig.maxConcurrent); - this.maxActiveSemaphore = makeSemaphore(limitConfig.maxActive); + private int maxConcurrent; + private int maxActive; + + RestCallLimiter(int maxConcurrent, int maxActive) { + this.maxConcurrent = maxConcurrent; + this.maxActive = maxActive; + this.maxConcurrentSemaphore = makeSemaphore(maxConcurrent); + this.maxActiveSemaphore = makeSemaphore(maxActive); } /** @@ -36,7 +33,6 @@ class RestCallLimiter { * task to perform after acquire */ public boolean tryAcquireAndRun(Runnable taskAfterAcquire) { - applyConcurrentPermitChangeIfApplicable(); // hang on to the semaphore, so that we can be certain of // releasing the same one we acquired final Semaphore concSem = maxConcurrentSemaphore; @@ -60,7 +56,6 @@ public boolean tryAcquireAndRun(Runnable taskAfterAcquire) { } private boolean acquireActiveAndRatePermit(Runnable taskAfterAcquire) { - applyActivePermitChangeIfApplicable(); log.debug("before acquire [active] semaphore:{}", maxActiveSemaphore); try { // hang on to the semaphore, so that we can be certain of @@ -84,60 +79,24 @@ private boolean acquireActiveAndRatePermit(Runnable taskAfterAcquire) { } } - private void applyConcurrentPermitChangeIfApplicable() { - if (concurrentChange != null) { - synchronized (this) { - if (concurrentChange != null) { - log.debug( - "change max [concurrent] semaphore with new permit ", - concurrentChange.newLimit); - maxConcurrentSemaphore = - makeSemaphore(concurrentChange.newLimit); - concurrentChange = null; - } - } + public synchronized void changeConfig(int maxConcurrent, int maxActive) { + if (maxConcurrent != this.maxConcurrent) { + log.debug( + "change max [concurrent] semaphore with new permit {}", + maxConcurrent); + maxConcurrentSemaphore = + makeSemaphore(maxConcurrent); + this.maxConcurrent = maxConcurrent; } - } - - private void applyActivePermitChangeIfApplicable() { - if (activeChange != null) { - synchronized (this) { - if (activeChange != null) { - // since this block is synchronized, there won't be new - // permit acquired from maxActiveSemaphore other than this - // thread. It ought to be the last and only one entering in - // this block. It will replace semaphore and old blocked - // threads will release on old semaphore - log.debug( - "change max [active] semaphore with new permit {}", - activeChange.newLimit); - maxActiveSemaphore = makeSemaphore(activeChange.newLimit); - activeChange = null; - } - } + if (maxActive != this.maxActive) { + log.debug( + "change max [active] semaphore with new permit {}", + maxActive); + maxActiveSemaphore = makeSemaphore(maxActive); + this.maxActive = maxActive; } } - public void changeConfig(RateLimitConfig newLimitConfig) { - if (newLimitConfig.maxConcurrent != limitConfig.maxConcurrent) { - changeConcurrentLimit(limitConfig.maxConcurrent, - newLimitConfig.maxConcurrent); - } - if (newLimitConfig.maxActive != limitConfig.maxActive) { - changeActiveLimit(limitConfig.maxActive, newLimitConfig.maxActive); - } - limitConfig = newLimitConfig; - } - - protected synchronized void - changeConcurrentLimit(int oldLimit, int newLimit) { - this.concurrentChange = new LimitChange(oldLimit, newLimit); - } - - protected synchronized void changeActiveLimit(int oldLimit, int newLimit) { - this.activeChange = new LimitChange(oldLimit, newLimit); - } - public int availableConcurrentPermit() { return maxConcurrentSemaphore.availablePermits(); } @@ -167,33 +126,23 @@ public String toString() { .toString(); } - @RequiredArgsConstructor - @EqualsAndHashCode - @ToString - public static class RateLimitConfig { - private final int maxConcurrent; - private final int maxActive; - } - - @RequiredArgsConstructor - @ToString - private static class LimitChange { - private final int oldLimit; - private final int newLimit; - } - /** * Overrides tryAcquire method to return true all the time. */ private static class NoLimitSemaphore extends Semaphore { private static final long serialVersionUID = 1L; private static final NoLimitSemaphore INSTANCE = - new NoLimitSemaphore(0); + new NoLimitSemaphore(); - private NoLimitSemaphore(int permits) { + private NoLimitSemaphore() { super(1); } + @Override + public void release() { + // do nothing + } + @Override public boolean tryAcquire() { return true; diff --git a/zanata-war/src/test/java/org/zanata/limits/RateLimitingProcessorTest.java b/zanata-war/src/test/java/org/zanata/limits/RateLimitingProcessorTest.java index 1bc04bc2ff..594715f7df 100644 --- a/zanata-war/src/test/java/org/zanata/limits/RateLimitingProcessorTest.java +++ b/zanata-war/src/test/java/org/zanata/limits/RateLimitingProcessorTest.java @@ -73,8 +73,8 @@ public void willSkipIfRateLimitAreAllZero() throws Exception { public void willFirstTryAcquire() throws InterruptedException, IOException, ServletException { - when(rateLimitManager.getLimitConfig()).thenReturn( - new RestCallLimiter.RateLimitConfig(1, 1)); + when(rateLimitManager.getMaxConcurrent()).thenReturn(1); + when(rateLimitManager.getMaxActive()).thenReturn(1); when(applicationConfiguration.getMaxConcurrentRequestsPerApiKey()).thenReturn(1); doAnswer(new Answer() { @Override diff --git a/zanata-war/src/test/java/org/zanata/limits/RestCallLimiterTest.java b/zanata-war/src/test/java/org/zanata/limits/RestCallLimiterTest.java index f1b4c5fc87..1f0a5c080f 100644 --- a/zanata-war/src/test/java/org/zanata/limits/RestCallLimiterTest.java +++ b/zanata-war/src/test/java/org/zanata/limits/RestCallLimiterTest.java @@ -50,7 +50,7 @@ public class RestCallLimiterTest { private Logger testeeLogger = LogManager.getLogger(RestCallLimiter.class); @Mock - private Runnable runntable; + private Runnable runnable; @BeforeClass public void beforeClass() { @@ -62,22 +62,18 @@ public void beforeClass() { @BeforeMethod public void beforeMethod() { MockitoAnnotations.initMocks(this); - limiter = - new RestCallLimiter(new RestCallLimiter.RateLimitConfig( - maxConcurrent, maxActive)); + limiter = new RestCallLimiter(maxConcurrent, maxActive); // the first time this method is executed it seems to cause 10-30ms // overhead by itself (moving things to heap and register classes maybe) // This will reduce that overhead for actual tests - limiter.tryAcquireAndRun(runntable); + limiter.tryAcquireAndRun(runnable); } @Test public void canOnlyHaveMaximumNumberOfConcurrentRequest() throws InterruptedException, ExecutionException { // we don't limit active requests - limiter = - new RestCallLimiter(new RestCallLimiter.RateLimitConfig( - maxConcurrent, maxConcurrent)); + limiter = new RestCallLimiter(maxConcurrent, maxConcurrent); // to ensure threads are actually running concurrently runnableWillTakeTime(20); @@ -85,7 +81,7 @@ public void canOnlyHaveMaximumNumberOfConcurrentRequest() Callable task = new Callable() { @Override public Boolean call() throws Exception { - return limiter.tryAcquireAndRun(runntable); + return limiter.tryAcquireAndRun(runnable); } }; int numOfThreads = maxConcurrent + 1; @@ -162,22 +158,21 @@ public Object answer(InvocationOnMock invocation) throws Throwable { TimeUnit.MILLISECONDS); return null; } - }).when(runntable).run(); + }).when(runnable).run(); } @Test - private void changeMaxConcurrentLimitWillTakeEffectImmediately() + public void changeMaxConcurrentLimitWillTakeEffectImmediately() throws ExecutionException, InterruptedException { runnableWillTakeTime(10); // we start off with only 1 concurrent permit - limiter = - new RestCallLimiter(new RestCallLimiter.RateLimitConfig(1, 10)); + limiter = new RestCallLimiter(1, 10); Callable task = new Callable() { @Override public Boolean call() throws Exception { - return limiter.tryAcquireAndRun(runntable); + return limiter.tryAcquireAndRun(runnable); } }; @@ -188,7 +183,7 @@ public Boolean call() throws Exception { assertThat(limiter.availableConcurrentPermit(), Matchers.is(1)); // change permit to match number of threads - limiter.changeConcurrentLimit(1, numOfThreads); + limiter.changeConfig(1, numOfThreads); List resultAfterChange = submitConcurrentTasksAndGetResult(task, numOfThreads); @@ -200,18 +195,17 @@ public Boolean call() throws Exception { @Test public void changeMaxActiveLimitWhenNoBlockedThreads() { - limiter = - new RestCallLimiter(new RestCallLimiter.RateLimitConfig(3, 3)); - limiter.tryAcquireAndRun(runntable); + limiter = new RestCallLimiter(3, 3); + limiter.tryAcquireAndRun(runnable); - limiter.changeActiveLimit(3, 2); + limiter.changeConfig(3, 2); // change won't happen until next request comes in - limiter.tryAcquireAndRun(runntable); + limiter.tryAcquireAndRun(runnable); assertThat(limiter.availableActivePermit(), Matchers.is(2)); - limiter.changeActiveLimit(2, 1); + limiter.changeConfig(2, 1); - limiter.tryAcquireAndRun(runntable); + limiter.tryAcquireAndRun(runnable); assertThat(limiter.availableActivePermit(), Matchers.is(1)); } @@ -219,8 +213,7 @@ public void changeMaxActiveLimitWhenNoBlockedThreads() { public void changeMaxActiveLimitWhenHasBlockedThreads() throws InterruptedException { // Given: only 2 active requests allowed - limiter = - new RestCallLimiter(new RestCallLimiter.RateLimitConfig(10, 2)); + limiter = new RestCallLimiter(10, 2); // When: below requests are fired simultaneously // 3 requests (each takes 20ms) and 1 request should block @@ -237,7 +230,7 @@ public void changeMaxActiveLimitWhenHasBlockedThreads() public Long call() throws Exception { // to ensure it happens when there is a blocked request Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS); - limiter.changeActiveLimit(2, 3); + limiter.changeConfig(2, 3); return -10L; } }; @@ -278,10 +271,10 @@ public Long call() throws Exception { @Test public void willReleaseSemaphoreWhenThereIsException() throws IOException, ServletException { - doThrow(new RuntimeException("bad")).when(runntable).run(); + doThrow(new RuntimeException("bad")).when(runnable).run(); try { - limiter.tryAcquireAndRun(runntable); + limiter.tryAcquireAndRun(runnable); } catch (Exception e) { // I know } @@ -294,12 +287,11 @@ public void willReleaseSemaphoreWhenThereIsException() throws IOException, @Test public void zeroPermitMeansNoLimit() { limiter = - new RestCallLimiter( - new RestCallLimiter.RateLimitConfig(0, 0)); + new RestCallLimiter(0, 0); - assertThat(limiter.tryAcquireAndRun(runntable), Matchers.is(true)); - assertThat(limiter.tryAcquireAndRun(runntable), Matchers.is(true)); - assertThat(limiter.tryAcquireAndRun(runntable), Matchers.is(true)); + assertThat(limiter.tryAcquireAndRun(runnable), Matchers.is(true)); + assertThat(limiter.tryAcquireAndRun(runnable), Matchers.is(true)); + assertThat(limiter.tryAcquireAndRun(runnable), Matchers.is(true)); } /** @@ -318,7 +310,7 @@ public Long call() throws Exception { private long tryAcquireAndMeasureTime() { Stopwatch stopwatch = new Stopwatch(); stopwatch.start(); - limiter.tryAcquireAndRun(runntable); + limiter.tryAcquireAndRun(runnable); stopwatch.stop(); long timeSpent = stopwatch.elapsedMillis(); log.debug("real time try acquire and run task takes: {}", timeSpent);