Skip to content
This repository has been archived by the owner on Nov 9, 2017. It is now read-only.

Commit

Permalink
rhbz988202 - new leaky bucket to control logging volumn and refactor …
Browse files Browse the repository at this point in the history
…to use single processor for all requests
  • Loading branch information
Patrick Huang committed Mar 25, 2014
1 parent e4fa529 commit 74c811f
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 50 deletions.
79 changes: 79 additions & 0 deletions zanata-war/src/main/java/org/zanata/limits/LeakyBucket.java
@@ -0,0 +1,79 @@
package org.zanata.limits;

import java.util.concurrent.TimeUnit;

import com.google.common.base.Objects;
import com.google.common.base.Ticker;
import lombok.extern.slf4j.Slf4j;

/**
* @author Patrick Huang <a
* href="mailto:pahuang@redhat.com">pahuang@redhat.com</a>
*/
@Slf4j
public class LeakyBucket {
private final long refillPeriod;
private final long capacity;
private final Ticker ticker;
private volatile long permit;
private volatile long lastRead;

/**
* Simple form leaky bucket. Initialized with a capacity and full. Each
* #tryAcquire() will deduct 1 permit. Permits is refilled on demand after
* set time period.
*
* @param capacity
* capacity
* @param refillDuration
* refill duration
* @param refillTimeUnit
* refill time unit
*/
public LeakyBucket(long capacity, int refillDuration,
TimeUnit refillTimeUnit) {
this.capacity = capacity;
permit = capacity;
ticker = Ticker.systemTicker();
refillPeriod =
TimeUnit.NANOSECONDS.convert(refillDuration, refillTimeUnit);
}

public synchronized boolean tryAcquire() {
onDemandRefill();
if (permit > 0) {
log.debug("deduct 1 permit and try acquire return true");
lastRead = ticker.read();
permit--;
return true;
} else {
return false;
}
}

private synchronized void onDemandRefill() {
if (permit == capacity) {
return;
}
long timePassed = ticker.read() - lastRead;
log.debug("time passed: {}", timePassed);
long permitsShouldAdd = timePassed / refillPeriod;
log.debug("permits should add: {}", permitsShouldAdd);
if (timePassed >= refillPeriod) {
permit = Math.min(capacity, permit + permitsShouldAdd);
log.debug("refilled and now with {} permits", permit);
}
}

@Override
public String toString() {
// @formatter:off
return Objects.toStringHelper(this)
.add("refillPeriod", refillPeriod)
.add("capacity", capacity)
.add("permit", permit)
.add("lastRead", lastRead)
.toString();
// @formatter:on
}
}
Expand Up @@ -9,8 +9,10 @@
import org.jboss.resteasy.spi.HttpResponse;
import org.jboss.seam.Component;
import org.zanata.ApplicationConfiguration;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -26,20 +28,18 @@ public class RateLimitingProcessor {
// http://tools.ietf.org/html/rfc6585
public static final int TOO_MANY_REQUEST = 429;

private static final RateLimiter logLimiter = RateLimiter.create(1);
private final String apiKey;
private final HttpResponse response;
private final Runnable taskToRun;

public RateLimitingProcessor(String apiKey,
HttpResponse response, Runnable taskToRun) {
this.apiKey = apiKey;

this.response = response;
this.taskToRun = taskToRun;
}

public void process() throws Exception {
private static final Cache<String, LeakyBucket> logLimiters = CacheBuilder
.newBuilder().build(

This comment has been minimized.

Copy link
@seanf

seanf Mar 26, 2014

Contributor

As I said for a previous commit, a single leaky bucket for all the log warnings would be fine. A cache keyed by api key would be nice, but this one is not configured to evict anything, ever.

Also, I don't think this needs to be static, now that there is a single RateLimitingProcessor.

This comment has been minimized.

Copy link
@huangp

huangp Mar 26, 2014

Collaborator

Good catch. Forgot to set a max size.

CacheLoader.from(new Function<String, LeakyBucket>() {
@Override
public LeakyBucket apply(String input) {
return new LeakyBucket(1, 5, TimeUnit.MINUTES);
}
}));

public void
process(String apiKey, HttpResponse response, Runnable taskToRun)
throws Exception {
ApplicationConfiguration appConfig = getApplicationConfiguration();

if (appConfig.getMaxConcurrentRequestsPerApiKey() == 0
Expand Down Expand Up @@ -67,14 +67,16 @@ public void process() throws Exception {
log.debug("check semaphore for {}", this);

if (!rateLimiter.tryAcquireAndRun(taskToRun)) {
if (logLimiter.tryAcquire(1, TimeUnit.SECONDS)) {
LeakyBucket bucket = logLimiters.getIfPresent(apiKey);
if (bucket != null && bucket.tryAcquire()) {
log.warn(
"{} has too many concurrent requests. Returning status 429",
apiKey);
}
String errorMessage = String.format(
"Too many concurrent request for this API key (maximum is %d)",
appConfig.getMaxConcurrentRequestsPerApiKey());
String errorMessage =
String.format(
"Too many concurrent request for this API key (maximum is %d)",
appConfig.getMaxConcurrentRequestsPerApiKey());
response.sendError(TOO_MANY_REQUEST, errorMessage);
}
}
Expand All @@ -90,12 +92,6 @@ protected ApplicationConfiguration getApplicationConfiguration() {
.getInstance("applicationConfiguration");
}

@Override
public String toString() {
return Objects.toStringHelper(this).add("id", super.toString())
.add("apiKey", apiKey).toString();
}

private static class RestRateLimiterLoader implements
Callable<RestCallLimiter> {
private final RestCallLimiter.RateLimitConfig limitConfig;
Expand Down
Expand Up @@ -6,6 +6,7 @@
import org.jboss.resteasy.core.SynchronousDispatcher;
import org.jboss.resteasy.spi.HttpRequest;
import org.jboss.resteasy.spi.HttpResponse;
import org.jboss.resteasy.spi.ResteasyProviderFactory;
import org.jboss.resteasy.spi.UnhandledException;
import org.jboss.seam.resteasy.SeamResteasyProviderFactory;
import org.zanata.limits.RateLimitingProcessor;
Expand All @@ -21,10 +22,22 @@
class RestLimitingSynchronousDispatcher extends SynchronousDispatcher {
static final String API_KEY_ABSENCE_WARNING =
"You must have a valid API key. You can create one by logging in to Zanata and visiting the settings page.";
private final RateLimitingProcessor processor;

public RestLimitingSynchronousDispatcher(
SeamResteasyProviderFactory providerFactory) {
super(providerFactory);
processor = new RateLimitingProcessor();
}

/**
* Test to use.
*/
RestLimitingSynchronousDispatcher(
ResteasyProviderFactory providerFactory,
RateLimitingProcessor processor) {
super(providerFactory);
this.processor = processor;
}

@Override
Expand All @@ -50,9 +63,8 @@ public void run() {
response);
}
};
RateLimitingProcessor processor =
createRateLimitingRequest(apiKey, response, taskToRun);
processor.process();

processor.process(apiKey, response, taskToRun);

} catch (UnhandledException e) {
Throwable cause = e.getCause();
Expand All @@ -77,12 +89,4 @@ public void run() {
throw Throwables.propagate(e);
}
}

/**
* Test override-able.
*/
protected RateLimitingProcessor createRateLimitingRequest(String apiKey,
HttpResponse response, Runnable runnable) {
return new RateLimitingProcessor(apiKey, response, runnable);
}
}
94 changes: 94 additions & 0 deletions zanata-war/src/test/java/org/zanata/limits/LeakyBucketTest.java
@@ -0,0 +1,94 @@
package org.zanata.limits;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.hamcrest.Matchers;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import lombok.extern.slf4j.Slf4j;

import static org.hamcrest.MatcherAssert.assertThat;

/**
* @author Patrick Huang <a
* href="mailto:pahuang@redhat.com">pahuang@redhat.com</a>
*/
@Test(groups = "unit-tests")
@Slf4j
public class LeakyBucketTest {
@BeforeMethod
public void beforeMethod() {
// LogManager.getLogger(LeakyBucket.class.getPackage().getName())
// .setLevel(Level.DEBUG);
}

@Test
public void willWaitUntilRefill() {
int refillDuration = 20;
TimeUnit refillTimeUnit = TimeUnit.MILLISECONDS;
LeakyBucket bucket = new LeakyBucket(1, refillDuration, refillTimeUnit);

assertThat(bucket.tryAcquire(), Matchers.is(true));
assertThat(bucket.tryAcquire(), Matchers.is(false));

Uninterruptibles.sleepUninterruptibly(refillDuration, refillTimeUnit);

assertThat(bucket.tryAcquire(), Matchers.is(true));
}

@Test
public void concurrentTest() throws InterruptedException {
int refillDuration = 10;
TimeUnit refillTimeUnit = TimeUnit.MILLISECONDS;
final LeakyBucket bucket =
new LeakyBucket(1, refillDuration, refillTimeUnit);
Callable<Boolean> callable = new Callable<Boolean>() {

@Override
public Boolean call() throws Exception {
return bucket.tryAcquire();
}
};

int threads = 3;
List<Callable<Boolean>> callables =
Collections.nCopies(threads, callable);
ExecutorService executorService = Executors.newFixedThreadPool(threads);
List<Future<Boolean>> futures = executorService.invokeAll(callables);

List<Boolean> result = getFutureResult(futures);
assertThat(result, Matchers.containsInAnyOrder(true, false, false));

// wait enough time and try again
Uninterruptibles.sleepUninterruptibly(refillDuration, refillTimeUnit);

This comment has been minimized.

Copy link
@seanf

seanf Mar 26, 2014

Contributor

Blocking interrupts should be very rare, and only used when necessary.

This comment has been minimized.

Copy link
@huangp

huangp Mar 26, 2014

Collaborator

This is in test. I try to avoid the try/catch block.

This comment has been minimized.

Copy link
@seanf

seanf Mar 27, 2014

Contributor

Even tests need to be interrupted sometimes. Besides, the method already throws InterruptedException.

List<Future<Boolean>> callAgain = executorService.invokeAll(callables);
assertThat(getFutureResult(callAgain),
Matchers.containsInAnyOrder(true, false, false));
}

private static List<Boolean> getFutureResult(List<Future<Boolean>> futures) {
return Lists.transform(futures,
new Function<Future<Boolean>, Boolean>() {
@Override
public Boolean apply(Future<Boolean> input) {
try {
return input.get();
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
});
}
}
@@ -1,8 +1,6 @@
package org.zanata.limits;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
Expand All @@ -13,8 +11,6 @@
import java.util.concurrent.TimeUnit;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.jboss.resteasy.spi.HttpResponse;
import org.mockito.Mock;
Expand Down Expand Up @@ -55,8 +51,7 @@ public void beforeMethod() throws IOException {
// so that we can verify its interaction
rateLimitManager = spy(new RateLimitManager());
processor =
spy(new RateLimitingProcessor(API_KEY, response,
runnable));
spy(new RateLimitingProcessor());

doReturn(applicationConfiguration).when(processor)
.getApplicationConfiguration();
Expand All @@ -70,7 +65,7 @@ public void willSkipIfRateLimitAreAllZero() throws Exception {
when(applicationConfiguration.getMaxConcurrentRequestsPerApiKey()).thenReturn(0);
when(applicationConfiguration.getRateLimitPerSecond()).thenReturn(0D);

processor.process();
processor.process(API_KEY, response, runnable);

verify(runnable).run();
verifyZeroInteractions(rateLimitManager);
Expand Down Expand Up @@ -98,7 +93,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {

@Override
public Void call() throws Exception {
processor.process();
processor.process(API_KEY, response, runnable);
return null;
}
};
Expand Down
Expand Up @@ -2,7 +2,6 @@

import java.io.IOException;
import javax.servlet.ServletException;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedMap;

import org.jboss.resteasy.core.ResourceInvoker;
Expand Down Expand Up @@ -54,9 +53,9 @@ public void beforeMethod() throws ServletException, IOException {
API_KEY);

dispatcher =
spy(new RestLimitingSynchronousDispatcher(providerFactory));
doReturn(processor).when(dispatcher).createRateLimitingRequest(
eq(API_KEY), same(response), taskCaptor.capture());
spy(new RestLimitingSynchronousDispatcher(providerFactory,
processor));

// this way we can verify the task actually called super.invoke()
doReturn(superInvoker).when(dispatcher).getInvoker(request);
doNothing().when(dispatcher).invoke(request, response, superInvoker);
Expand All @@ -81,7 +80,7 @@ public void willCallRateLimitingProcessorIfAllConditionsAreMet()
throws Exception {
dispatcher.invoke(request, response);

verify(processor).process();
verify(processor).process(same(API_KEY), same(response), taskCaptor.capture());

// verify task is calling super.invoke
Runnable task = taskCaptor.getValue();
Expand Down

3 comments on commit 74c811f

@seanf
Copy link
Contributor

@seanf seanf commented on 74c811f Mar 26, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@huangp Why didn't you just bring back https://github.com/bbeck/token-bucket ?

@huangp
Copy link
Collaborator

@huangp huangp commented on 74c811f Mar 26, 2014

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First I try to avoid adding another dependency if I can. Second the refill in his implementation is not strictly time based(Not that it matters for this particular case since we only have 1 permit max). But I used to have a test for it:

TokenBucket bucket = TokenBuckets
        .newFixedIntervalRefill(2, 1, 100, TimeUnit.MILLISECONDS);

bucket.tryConsume(); // true
bucket.tryConsume(); // false

// refill rate is 1 per 100 ms so after 200 ms it should've fill up.
Uninterruptibles.sleepUninterruptibly(200, TimeUnit.MILLISECONDS);

bucket.tryConsume(); // true
bucket.tryConsume(); // false. Refill will give you maximum 1 permit each call.

If we have some other needs and needed to customize it, we will have to modify. Since it's a simple class I'd use our own copy of implementation.

@seanf
Copy link
Contributor

@seanf seanf commented on 74c811f Mar 28, 2014 via email

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.