Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
Simplify configuration of RestCallLimiter
Browse files Browse the repository at this point in the history
  • Loading branch information
seanf committed Apr 3, 2014
1 parent ffc34b5 commit 66dfc2a
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 134 deletions.
45 changes: 26 additions & 19 deletions zanata-war/src/main/java/org/zanata/limits/RateLimitManager.java
Expand Up @@ -42,9 +42,12 @@ public class RateLimitManager implements Introspectable {
private final Cache<String, RestCallLimiter> activeCallers = CacheBuilder
.newBuilder().maximumSize(100).build();

@Getter(AccessLevel.PACKAGE)
@Getter(AccessLevel.PROTECTED)
@VisibleForTesting
private RestCallLimiter.RateLimitConfig limitConfig;
private int maxConcurrent;
@Getter(AccessLevel.PROTECTED)
@VisibleForTesting
private int maxActive;

public static RateLimitManager getInstance() {
return (RateLimitManager) Component
Expand All @@ -60,21 +63,29 @@ private void readRateLimitState() {
ApplicationConfiguration appConfig =
(ApplicationConfiguration) Component
.getInstance("applicationConfiguration");
int maxConcurrent = appConfig.getMaxConcurrentRequestsPerApiKey();
int maxActive = appConfig.getMaxActiveRequestsPerApiKey();
limitConfig =
new RestCallLimiter.RateLimitConfig(maxConcurrent, maxActive);
maxConcurrent = appConfig.getMaxConcurrentRequestsPerApiKey();
maxActive = appConfig.getMaxActiveRequestsPerApiKey();
}

@Observer({ ApplicationConfiguration.EVENT_CONFIGURATION_CHANGED })
public void configurationChanged() {
RestCallLimiter.RateLimitConfig old = limitConfig;
int oldConcurrent = maxConcurrent;
int oldActive = maxActive;
boolean changed = false;
readRateLimitState();
if (!Objects.equal(old, limitConfig)) {
log.info("application configuration changed. Old: {}, New: {}",
old, limitConfig);
if (oldConcurrent != maxConcurrent) {
log.info("application configuration changed. Old concurrent: {}, New concurrent: {}",
oldConcurrent, maxConcurrent);
changed = true;
}
if (oldActive != maxActive) {
log.info("application configuration changed. Old active: {}, New active: {}",
oldActive, maxActive);
changed = true;
}
if (changed) {
for (RestCallLimiter restCallLimiter : activeCallers.asMap().values()) {
restCallLimiter.changeConfig(limitConfig);
restCallLimiter.changeConfig(maxConcurrent, maxActive);
}
}
}
Expand Down Expand Up @@ -116,29 +127,25 @@ private Iterable<String> peekCurrentBuckets() {

public RestCallLimiter getLimiter(String apiKey) {
try {
return activeCallers.get(apiKey, new RestRateLimiterLoader(
getLimitConfig(), apiKey));
return activeCallers.get(apiKey, new RestRateLimiterLoader(apiKey));
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}

private static class RestRateLimiterLoader implements
private class RestRateLimiterLoader implements
Callable<RestCallLimiter> {
private final RestCallLimiter.RateLimitConfig limitConfig;
private final String apiKey;

public RestRateLimiterLoader(
RestCallLimiter.RateLimitConfig limitConfig, String apiKey) {
this.limitConfig = limitConfig;
public RestRateLimiterLoader(String apiKey) {
this.apiKey = apiKey;
}

@Override
public RestCallLimiter call() throws Exception {
log.debug("creating rate limiter for api key: {}", apiKey);
return new RestCallLimiter(limitConfig);
return new RestCallLimiter(getMaxConcurrent(), getMaxActive());
}
}
}
109 changes: 29 additions & 80 deletions zanata-war/src/main/java/org/zanata/limits/RestCallLimiter.java
Expand Up @@ -5,9 +5,6 @@

import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -18,14 +15,14 @@
class RestCallLimiter {
private volatile Semaphore maxConcurrentSemaphore;
private volatile Semaphore maxActiveSemaphore;
private RateLimitConfig limitConfig;
private volatile LimitChange activeChange;
private volatile LimitChange concurrentChange;

RestCallLimiter(RateLimitConfig limitConfig) {
this.limitConfig = limitConfig;
this.maxConcurrentSemaphore = makeSemaphore(limitConfig.maxConcurrent);
this.maxActiveSemaphore = makeSemaphore(limitConfig.maxActive);
private int maxConcurrent;
private int maxActive;

RestCallLimiter(int maxConcurrent, int maxActive) {
this.maxConcurrent = maxConcurrent;
this.maxActive = maxActive;
this.maxConcurrentSemaphore = makeSemaphore(maxConcurrent);
this.maxActiveSemaphore = makeSemaphore(maxActive);
}

/**
Expand All @@ -36,7 +33,6 @@ class RestCallLimiter {
* task to perform after acquire
*/
public boolean tryAcquireAndRun(Runnable taskAfterAcquire) {
applyConcurrentPermitChangeIfApplicable();
// hang on to the semaphore, so that we can be certain of
// releasing the same one we acquired
final Semaphore concSem = maxConcurrentSemaphore;
Expand All @@ -60,7 +56,6 @@ public boolean tryAcquireAndRun(Runnable taskAfterAcquire) {
}

private boolean acquireActiveAndRatePermit(Runnable taskAfterAcquire) {
applyActivePermitChangeIfApplicable();
log.debug("before acquire [active] semaphore:{}", maxActiveSemaphore);
try {
// hang on to the semaphore, so that we can be certain of
Expand All @@ -84,60 +79,24 @@ private boolean acquireActiveAndRatePermit(Runnable taskAfterAcquire) {
}
}

private void applyConcurrentPermitChangeIfApplicable() {
if (concurrentChange != null) {
synchronized (this) {
if (concurrentChange != null) {
log.debug(
"change max [concurrent] semaphore with new permit ",
concurrentChange.newLimit);
maxConcurrentSemaphore =
makeSemaphore(concurrentChange.newLimit);
concurrentChange = null;
}
}
public synchronized void changeConfig(int maxConcurrent, int maxActive) {
if (maxConcurrent != this.maxConcurrent) {
log.debug(
"change max [concurrent] semaphore with new permit {}",
maxConcurrent);
maxConcurrentSemaphore =
makeSemaphore(maxConcurrent);
this.maxConcurrent = maxConcurrent;
}
}

private void applyActivePermitChangeIfApplicable() {
if (activeChange != null) {
synchronized (this) {
if (activeChange != null) {
// since this block is synchronized, there won't be new
// permit acquired from maxActiveSemaphore other than this
// thread. It ought to be the last and only one entering in
// this block. It will replace semaphore and old blocked
// threads will release on old semaphore
log.debug(
"change max [active] semaphore with new permit {}",
activeChange.newLimit);
maxActiveSemaphore = makeSemaphore(activeChange.newLimit);
activeChange = null;
}
}
if (maxActive != this.maxActive) {
log.debug(
"change max [active] semaphore with new permit {}",
maxActive);
maxActiveSemaphore = makeSemaphore(maxActive);
this.maxActive = maxActive;
}
}

public void changeConfig(RateLimitConfig newLimitConfig) {
if (newLimitConfig.maxConcurrent != limitConfig.maxConcurrent) {
changeConcurrentLimit(limitConfig.maxConcurrent,
newLimitConfig.maxConcurrent);
}
if (newLimitConfig.maxActive != limitConfig.maxActive) {
changeActiveLimit(limitConfig.maxActive, newLimitConfig.maxActive);
}
limitConfig = newLimitConfig;
}

protected synchronized void
changeConcurrentLimit(int oldLimit, int newLimit) {
this.concurrentChange = new LimitChange(oldLimit, newLimit);
}

protected synchronized void changeActiveLimit(int oldLimit, int newLimit) {
this.activeChange = new LimitChange(oldLimit, newLimit);
}

public int availableConcurrentPermit() {
return maxConcurrentSemaphore.availablePermits();
}
Expand Down Expand Up @@ -167,33 +126,23 @@ public String toString() {
.toString();
}

@RequiredArgsConstructor
@EqualsAndHashCode
@ToString
public static class RateLimitConfig {
private final int maxConcurrent;
private final int maxActive;
}

@RequiredArgsConstructor
@ToString
private static class LimitChange {
private final int oldLimit;
private final int newLimit;
}

/**
* Overrides tryAcquire method to return true all the time.
*/
private static class NoLimitSemaphore extends Semaphore {
private static final long serialVersionUID = 1L;
private static final NoLimitSemaphore INSTANCE =
new NoLimitSemaphore(0);
new NoLimitSemaphore();

private NoLimitSemaphore(int permits) {
private NoLimitSemaphore() {
super(1);
}

@Override
public void release() {
// do nothing
}

@Override
public boolean tryAcquire() {
return true;
Expand Down
Expand Up @@ -73,8 +73,8 @@ public void willSkipIfRateLimitAreAllZero() throws Exception {
public void willFirstTryAcquire() throws InterruptedException, IOException,
ServletException {

when(rateLimitManager.getLimitConfig()).thenReturn(
new RestCallLimiter.RateLimitConfig(1, 1));
when(rateLimitManager.getMaxConcurrent()).thenReturn(1);
when(rateLimitManager.getMaxActive()).thenReturn(1);
when(applicationConfiguration.getMaxConcurrentRequestsPerApiKey()).thenReturn(1);
doAnswer(new Answer() {
@Override
Expand Down

0 comments on commit 66dfc2a

Please sign in to comment.