Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
rhbz988202 - encapsulate control branches (acquire and release or sem…
Browse files Browse the repository at this point in the history
…aphore etc) into RestCallLimiter
  • Loading branch information
Patrick Huang committed Mar 21, 2014
1 parent ce4f704 commit 33b9802
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 141 deletions.
@@ -1,9 +1,12 @@
package org.zanata.limits;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
Expand All @@ -15,6 +18,8 @@
import org.jboss.seam.servlet.ContextualHttpServletRequest;
import org.zanata.ApplicationConfiguration;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -27,9 +32,10 @@
*/
@Slf4j
public class RateLimitingProcessor extends ContextualHttpServletRequest {

// http://tools.ietf.org/html/rfc6585
public static final int TOO_MANY_REQUEST = 429;

private static final RateLimiter logLimiter = RateLimiter.create(1);
private final String apiKey;
private final FilterChain filterChain;
private final ServletRequest servletRequest;
Expand Down Expand Up @@ -71,18 +77,26 @@ public void process() throws Exception {

log.debug("check semaphore for {}", this);

if (rateLimiter.tryAcquire()) {
try {
filterChain.doFilter(servletRequest, servletResponse);
} finally {
log.debug("releasing semaphore for {}", apiKey);
rateLimiter.release();
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
filterChain.doFilter(servletRequest, servletResponse);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
catch (ServletException e) {
throw Throwables.propagate(e);
}
}
};
if (!rateLimiter.tryAcquireAndRun(runnable)) {
if (logLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
log.warn(
"{} has too many concurrent requests. Returning status 429",
apiKey);
}
} else {
// TODO pahuang rate limit the logging otherwise it may become excessive
log.warn(
"{} has too many concurrent requests. Returning status 429",
apiKey);
httpResponse.setStatus(TOO_MANY_REQUEST);
PrintWriter writer = httpResponse.getWriter();
writer.append(String.format(
Expand Down
137 changes: 86 additions & 51 deletions zanata-war/src/main/java/org/zanata/limits/RestCallLimiter.java
Expand Up @@ -4,6 +4,7 @@
import java.util.concurrent.TimeUnit;

import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import lombok.EqualsAndHashCode;
Expand All @@ -17,11 +18,12 @@
*/
@Slf4j
public class RestCallLimiter {
private Semaphore maxConcurrentSemaphore;
private Semaphore maxActiveSemaphore;
private volatile Semaphore maxConcurrentSemaphore;
private volatile Semaphore maxActiveSemaphore;
private RateLimiter rateLimiter;
private RateLimitConfig limitConfig;
private volatile ActiveLimitChange change;
private volatile LimitChange activeChange;
private volatile LimitChange concurrentChange;

public RestCallLimiter(RateLimitConfig limitConfig) {
this.limitConfig = limitConfig;
Expand All @@ -31,60 +33,89 @@ public RestCallLimiter(RateLimitConfig limitConfig) {
rateLimiter = RateLimiter.create(limitConfig.rateLimitPerSecond);
}

public boolean tryAcquire() {
log.debug("before try acquire concurrent semaphore:{}",
maxConcurrentSemaphore);
boolean got = maxConcurrentSemaphore.tryAcquire();
log.debug("get permit:{}", got);
if (got) {
acquireActiveAndRatePermit();
log.debug("got all permits and ready to go: {}", this);
public boolean tryAcquireAndRun(Runnable taskAfterAcquire) {
applyConcurrentPermitChangeIfApplicable();
boolean gotConcurrentPermit = maxConcurrentSemaphore.tryAcquire();
log.debug("try acquire [concurrent] permit:{}", gotConcurrentPermit);
if (gotConcurrentPermit) {
try {
if (acquireActiveAndRatePermit()) {
try {
taskAfterAcquire.run();
} finally {
log.debug("releasing active concurrent semaphore");
maxActiveSemaphore.release();
}
} else {
throw new RuntimeException(
"Couldn't get an [active] permit in time");
}
} finally {
log.debug("releasing max [concurrent] semaphore");
maxConcurrentSemaphore.release();
}
}
return gotConcurrentPermit;
}

private boolean acquireActiveAndRatePermit() {
applyActivePermitChangeIfApplicable();
log.debug("before acquire [active] semaphore:{}", maxActiveSemaphore);
try {
boolean gotActivePermit =
maxActiveSemaphore.tryAcquire(5, TimeUnit.MINUTES);
log.debug(
"got [active] semaphore [{}] and before acquire rate limit permit:{}",
gotActivePermit, rateLimiter);
if (gotActivePermit) {
rateLimiter.acquire();
}
return gotActivePermit;
} catch (InterruptedException e) {
throw Throwables.propagate(e);
}
}

private void applyConcurrentPermitChangeIfApplicable() {
if (concurrentChange != null) {
synchronized (this) {
if (concurrentChange != null) {
log.debug(
"change max [concurrent] semaphore with new permit ",
concurrentChange.newLimit);
maxConcurrentSemaphore =
new Semaphore(concurrentChange.newLimit, true);
concurrentChange = null;
}
}
}
return got;
}

private void acquireActiveAndRatePermit() {
if (change != null) {
private void applyActivePermitChangeIfApplicable() {
if (activeChange != null) {
synchronized (this) {
if (change != null) {
if (activeChange != null) {
// since this block is synchronized, there won't be new
// permit acquired from maxActiveSemaphore other than this
// thread. It ought to be the last and only one entering in
// this block. It will have to wait for all other previous
// blocked threads to complete before changing the semaphore
log.debug(
"detects max active permit change [{}]. Will sleep until all blocking threads [#{}] released.",
change, maxActiveSemaphore.getQueueLength());
while (maxActiveSemaphore.availablePermits() != change.oldLimit) {
"detects max [active] permit change [{}]. Will sleep until all blocking threads [#{}] released.",
activeChange, maxActiveSemaphore.getQueueLength());
while (maxActiveSemaphore.availablePermits() != activeChange.oldLimit) {
Uninterruptibles.sleepUninterruptibly(1,
TimeUnit.NANOSECONDS);
}
log.debug("change max active semaphore with new permit");
maxActiveSemaphore = new Semaphore(change.newLimit, true);
change = null;
log.debug(
"change max [active] semaphore with new permit {}",
activeChange.newLimit);
maxActiveSemaphore =
new Semaphore(activeChange.newLimit, true);
activeChange = null;
}
}
}
log.debug("before acquire active semaphore:{}", maxActiveSemaphore);
maxActiveSemaphore.acquireUninterruptibly();
// if we want to enable timeout here,
// we must ensure release is not called when it timed out
// try {
// boolean gotIt = maxActiveSemaphore.tryAcquire(30, TimeUnit.SECONDS);
// if (!gotIt) {
// // timed out
// throw new WebApplicationException(Response.status(
// Response.Status.SERVICE_UNAVAILABLE)
// .entity("System too busy").build());
// }
// }
// catch (InterruptedException e) {
// throw Throwables.propagate(e);
// }
log.debug(
"got active semaphore and before acquire rate limit permit:{}",
rateLimiter);
rateLimiter.acquire();
}

public void release() {
Expand All @@ -96,27 +127,29 @@ public void release() {

public void changeConfig(RateLimitConfig newLimitConfig) {
if (newLimitConfig.maxConcurrent != limitConfig.maxConcurrent) {
changeConcurrentLimit(newLimitConfig.maxConcurrent);
changeConcurrentLimit(limitConfig.maxConcurrent,
newLimitConfig.maxConcurrent);
}
if (newLimitConfig.rateLimitPerSecond != limitConfig.rateLimitPerSecond) {
changeRateLimitPermitsPerSecond(newLimitConfig.rateLimitPerSecond);
}
if (newLimitConfig.maxActive != limitConfig.maxActive) {
changeActiveLimit(limitConfig.maxActive, newLimitConfig.maxActive);
}
limitConfig = newLimitConfig;
}

protected synchronized void changeConcurrentLimit(int maxConcurrent) {
log.info("max concurrent limit changed: {}", maxConcurrent);
maxConcurrentSemaphore = new Semaphore(maxConcurrent);
protected synchronized void
changeConcurrentLimit(int oldLimit, int newLimit) {
this.concurrentChange = new LimitChange(oldLimit, newLimit);
}

protected synchronized void changeRateLimitPermitsPerSecond(double permits) {
log.info("rate limit changed: {}", permits);
rateLimiter.setRate(permits);
}

protected synchronized void changeActiveLimit(int oldLimit, int newLimit) {
this.change = new ActiveLimitChange(oldLimit, newLimit);
log.info("max active limit changed: {}", change);
this.activeChange = new LimitChange(oldLimit, newLimit);
}

public int availableConcurrentPermit() {
Expand All @@ -136,8 +169,10 @@ public String toString() {
return Objects
.toStringHelper(this)
.add("id", super.toString())
.add("maxConcurrent(available)", maxConcurrentSemaphore.availablePermits())
.add("maxActive(available)", maxActiveSemaphore.availablePermits())
.add("maxConcurrent(available)",
maxConcurrentSemaphore.availablePermits())
.add("maxActive(available)",
maxActiveSemaphore.availablePermits())
.add("maxActive(queue)", maxActiveSemaphore.getQueueLength())
.add("rateLimiter", rateLimiter).toString();
}
Expand All @@ -153,7 +188,7 @@ public static class RateLimitConfig {

@RequiredArgsConstructor
@ToString
private static class ActiveLimitChange {
private static class LimitChange {
private final int oldLimit;
private final int newLimit;
}
Expand Down
6 changes: 1 addition & 5 deletions zanata-war/src/test/java/org/zanata/ZanataRestTest.java
Expand Up @@ -159,11 +159,7 @@ protected void prepareProviders() {
protected void prepareSeamAutowire() {
seamAutowire
.reset()
.ignoreNonResolvable()
.use(SeamAutowire.getComponentName(JndiBackedConfig.class),
jndiBackedConfig)
.use(SeamAutowire.getComponentName(RateLimitManager.class),
new RateLimitManager());
.ignoreNonResolvable();
}

/**
Expand Down
Expand Up @@ -122,27 +122,5 @@ public Void call() throws Exception {
verify(response, atLeastOnce()).setStatus(429);
// one should go through
verify(filterChain).doFilter(request, response);
// semaphore is released
assertThat(rateLimitManager.getIfPresent(API_KEY)
.availableConcurrentPermit(), Matchers.equalTo(1));
}

@Test
public void willReleaseSemaphoreWhenThereIsException() throws IOException,
ServletException {
when(rateLimitManager.getLimitConfig()).thenReturn(
new RestCallLimiter.RateLimitConfig(1, 1, 100.0));
when(applicationConfiguration.getRateLimitSwitch()).thenReturn(true);
doThrow(new RuntimeException("bad")).when(filterChain).doFilter(
request, response);

try {
processor.process();
} catch (Exception e) {
// I know
}

assertThat(rateLimitManager.getIfPresent(API_KEY)
.availableConcurrentPermit(), Matchers.equalTo(1));
}
}

0 comments on commit 33b9802

Please sign in to comment.