Skip to content

Commit

Permalink
Merge 46ad70f into aca9c2e
Browse files Browse the repository at this point in the history
  • Loading branch information
storozhukBM committed May 26, 2018
2 parents aca9c2e + 46ad70f commit 573792f
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

/**
* A RateLimiter instance is thread-safe can be used to decorate multiple requests.
*
* <p>
* A RateLimiter distributes permits at a configurable rate. {@link #getPermission} blocks if necessary
* until a permit is available, and then takes it. Once acquired, permits need not be released.
*/
Expand Down Expand Up @@ -68,7 +68,7 @@ static RateLimiter of(String name, Supplier<RateLimiterConfig> rateLimiterConfig
/**
* Creates a RateLimiter with a default RateLimiterConfig configuration.
*
* @param name the name of the RateLimiter
* @param name the name of the RateLimiter
* @return The {@link RateLimiter}
*/
static RateLimiter ofDefaults(String name) {
Expand All @@ -79,8 +79,8 @@ static RateLimiter ofDefaults(String name) {
* Returns a supplier which is decorated by a rateLimiter.
*
* @param rateLimiter the rateLimiter
* @param supplier the original supplier
* @param <T> the type of the returned CompletionStage's result
* @param supplier the original supplier
* @param <T> the type of the returned CompletionStage's result
* @return a supplier which is decorated by a RateLimiter.
*/
static <T> Supplier<CompletionStage<T>> decorateCompletionStage(RateLimiter rateLimiter, Supplier<CompletionStage<T>> supplier) {
Expand All @@ -90,15 +90,15 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(RateLimiter rate
try {
waitForPermission(rateLimiter);
supplier.get()
.whenComplete(
(result, throwable) -> {
if (throwable != null) {
promise.completeExceptionally(throwable);
} else {
promise.complete(result);
}
}
);
.whenComplete(
(result, throwable) -> {
if (throwable != null) {
promise.completeExceptionally(throwable);
} else {
promise.complete(result);
}
}
);
} catch (Throwable throwable) {
promise.completeExceptionally(throwable);
}
Expand All @@ -111,7 +111,7 @@ static <T> Supplier<CompletionStage<T>> decorateCompletionStage(RateLimiter rate
*
* @param rateLimiter the RateLimiter
* @param supplier the original supplier
* @param <T> the type of results supplied supplier
* @param <T> the type of results supplied supplier
* @return a supplier which is restricted by a RateLimiter.
*/
static <T> CheckedFunction0<T> decorateCheckedSupplier(RateLimiter rateLimiter, CheckedFunction0<T> supplier) {
Expand Down Expand Up @@ -141,8 +141,8 @@ static CheckedRunnable decorateCheckedRunnable(RateLimiter rateLimiter, CheckedR
*
* @param rateLimiter the RateLimiter
* @param function the original function
* @param <T> the type of function argument
* @param <R> the type of function results
* @param <T> the type of function argument
* @param <R> the type of function results
* @return a function which is restricted by a RateLimiter.
*/
static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(RateLimiter rateLimiter, CheckedFunction1<T, R> function) {
Expand All @@ -157,7 +157,7 @@ static <T, R> CheckedFunction1<T, R> decorateCheckedFunction(RateLimiter rateLim
*
* @param rateLimiter the RateLimiter
* @param supplier the original supplier
* @param <T> the type of results supplied supplier
* @param <T> the type of results supplied supplier
* @return a supplier which is restricted by a RateLimiter.
*/
static <T> Supplier<T> decorateSupplier(RateLimiter rateLimiter, Supplier<T> supplier) {
Expand All @@ -179,7 +179,7 @@ static <T> Callable<T> decorateCallable(RateLimiter rateLimiter, Callable<T> cal
*
* @param rateLimiter the RateLimiter
* @param consumer the original consumer
* @param <T> the type of the input to the consumer
* @param <T> the type of the input to the consumer
* @return a consumer which is restricted by a RateLimiter.
*/
static <T> Consumer<T> decorateConsumer(RateLimiter rateLimiter, Consumer<T> consumer) {
Expand Down Expand Up @@ -209,8 +209,8 @@ static Runnable decorateRunnable(RateLimiter rateLimiter, Runnable runnable) {
*
* @param rateLimiter the RateLimiter
* @param function the original function
* @param <T> the type of the input to the function
* @param <R> the type of the result of the function
* @param <T> the type of the input to the function
* @param <R> the type of the result of the function
* @return a function which is restricted by a RateLimiter.
*/
static <T, R> Function<T, R> decorateFunction(RateLimiter rateLimiter, Function<T, R> function) {
Expand All @@ -224,7 +224,7 @@ static <T, R> Function<T, R> decorateFunction(RateLimiter rateLimiter, Function<
* Will wait for permission within default timeout duration.
*
* @param rateLimiter the RateLimiter to get permission from
* @throws RequestNotPermitted if waiting time elapsed before a permit was acquired.
* @throws RequestNotPermitted if waiting time elapsed before a permit was acquired.
* @throws IllegalStateException if thread was interrupted during permission wait
*/
static void waitForPermission(final RateLimiter rateLimiter) throws IllegalStateException, RequestNotPermitted {
Expand All @@ -243,6 +243,7 @@ static void waitForPermission(final RateLimiter rateLimiter) throws IllegalState
* Dynamic rate limiter configuration change.
* This method allows to change timeout duration of current limiter.
* NOTE! New timeout duration won't affect threads that are currently waiting for permission.
*
* @param timeoutDuration new timeout duration
*/
void changeTimeoutDuration(Duration timeoutDuration);
Expand All @@ -251,6 +252,7 @@ static void waitForPermission(final RateLimiter rateLimiter) throws IllegalState
* Dynamic rate limiter configuration change.
* This method allows to change count of permissions available during refresh period.
* NOTE! New limit won't affect current period permissions and will apply only from next one.
*
* @param limitForPeriod new permissions limit
*/
void changeLimitForPeriod(int limitForPeriod);
Expand All @@ -268,6 +270,16 @@ static void waitForPermission(final RateLimiter rateLimiter) throws IllegalState
*/
boolean getPermission(Duration timeoutDuration);

/**
* Reserves a permission from this rate limiter and returns nanoseconds you should wait for it.
* If returned long is negative, it means that you failed to reserve permission,
* possibly your {@code timeoutDuration} is less then time to wait for permission.
*
* @param timeoutDuration the maximum time you want to wait.
* @return {@code long} amount of nanoseconds you should wait for reserved permission. if negative, it means you failed to reserve.
*/
long reservePermission(Duration timeoutDuration);

/**
* Get the name of this RateLimiter
*
Expand Down Expand Up @@ -300,23 +312,22 @@ static void waitForPermission(final RateLimiter rateLimiter) throws IllegalState
* Decorates and executes the decorated Supplier.
*
* @param supplier the original Supplier
* @param <T> the type of results supplied by this supplier
* @param <T> the type of results supplied by this supplier
* @return the result of the decorated Supplier.
*/
default <T> T executeSupplier(Supplier<T> supplier){
default <T> T executeSupplier(Supplier<T> supplier) {
return decorateSupplier(this, supplier).get();
}

/**
* Decorates and executes the decorated Callable.
*
* @param callable the original Callable
*
* @param <T> the result type of callable
* @return the result of the decorated Callable.
* @param <T> the result type of callable
* @throws Exception if unable to compute a result
*/
default <T> T executeCallable(Callable<T> callable) throws Exception{
default <T> T executeCallable(Callable<T> callable) throws Exception {
return decorateCallable(this, callable).call();
}

Expand All @@ -325,7 +336,7 @@ default <T> T executeCallable(Callable<T> callable) throws Exception{
*
* @param runnable the original Runnable
*/
default void executeRunnable(Runnable runnable){
default void executeRunnable(Runnable runnable) {
decorateRunnable(this, runnable).run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) {

waitingThreads = new AtomicInteger(0);
state = new AtomicReference<>(new State(
rateLimiterConfig,0, rateLimiterConfig.getLimitForPeriod(), 0
rateLimiterConfig, 0, rateLimiterConfig.getLimitForPeriod(), 0
));
eventProcessor = new RateLimiterEventProcessor();
}
Expand Down Expand Up @@ -107,6 +107,30 @@ public boolean getPermission(final Duration timeoutDuration) {
return result;
}

/**
* {@inheritDoc}
*/
@Override
public long reservePermission(Duration timeoutDuration) {
long timeoutInNanos = timeoutDuration.toNanos();
State modifiedState = updateStateWithBackOff(timeoutInNanos);

boolean canAcquireImmediately = modifiedState.nanosToWait <= 0;
if (canAcquireImmediately) {
publishRateLimiterEvent(true);
return 0;
}

boolean canAcquireInTime = timeoutInNanos >= modifiedState.nanosToWait;
if (canAcquireInTime) {
publishRateLimiterEvent(true);
return modifiedState.nanosToWait;
}

publishRateLimiterEvent(false);
return -1;
}

/**
* Atomically updates the current {@link State} with the results of
* applying the {@link AtomicRateLimiter#calculateNextState}, returning the updated {@link State}.
Expand Down Expand Up @@ -188,10 +212,9 @@ private State calculateNextState(final long timeoutInNanos, final State activeSt
* Calculates time to wait for next permission as
* [time to the next cycle] + [duration of full cycles until reserved permissions expire]
*
*
* @param cyclePeriodInNanos current configuration values
* @param permissionsPerCycle current configuration values
*@param availablePermissions currently available permissions, can be negative if some permissions have been reserved
* @param cyclePeriodInNanos current configuration values
* @param permissionsPerCycle current configuration values
* @param availablePermissions currently available permissions, can be negative if some permissions have been reserved
* @param currentNanos current time in nanoseconds
* @param currentCycle current {@link AtomicRateLimiter} cycle @return nanoseconds to wait for the next permission
*/
Expand All @@ -210,7 +233,6 @@ private long nanosToWaitForPermission(final long cyclePeriodInNanos, final int p
* Determines whether caller can acquire permission before timeout or not and then creates corresponding {@link State}.
* Reserves permissions only if caller can successfully wait for permission.
*
*
* @param config
* @param timeoutInNanos max time that caller can wait for permission in nanoseconds
* @param cycle cycle for new {@link State}
Expand Down Expand Up @@ -303,11 +325,12 @@ public EventPublisher getEventPublisher() {
return eventProcessor;
}

@Override public String toString() {
@Override
public String toString() {
return "AtomicRateLimiter{" +
"name='" + name + '\'' +
", rateLimiterConfig=" + state.get().config +
'}';
"name='" + name + '\'' +
", rateLimiterConfig=" + state.get().config +
'}';
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,16 @@ public boolean getPermission(final Duration timeoutDuration) {
}
}

/**
* {@inheritDoc}
* SemaphoreBasedRateLimiter is totally blocking by it's nature. So this non-blocking API isn't supported.
* It will return negative numbers all the time.
*/
@Override
public long reservePermission(Duration timeoutDuration) {
return -1;
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,51 @@ public void notSpyRawTest() {
then(secondNoPermission).isFalse();
}

@Test
public void notSpyRawNonBlockingTest() {
AtomicRateLimiter rawLimiter = new AtomicRateLimiter("rawLimiter", rateLimiterConfig);
AtomicRateLimiter.AtomicRateLimiterMetrics rawDetailedMetrics = rawLimiter.getDetailedMetrics();

long firstCycle = rawDetailedMetrics.getCycle();
while (firstCycle == rawDetailedMetrics.getCycle()) {
System.out.print('.'); // wait for current cycle to pass
}

long firstPermission = rawLimiter.reservePermission(Duration.ZERO);
long nanosToWait = rawDetailedMetrics.getNanosToWait();
long startTime = System.nanoTime();
while(System.nanoTime() - startTime < nanosToWait) {
System.out.print('*'); // wait for permission renewal
}

long secondPermission = rawLimiter.reservePermission(Duration.ZERO);
long firstNoPermission = rawLimiter.reservePermission(Duration.ZERO);
long secondCycle = rawDetailedMetrics.getCycle();

rawLimiter.changeLimitForPeriod(PERMISSIONS_RER_CYCLE * 2);
nanosToWait = rawDetailedMetrics.getNanosToWait();
startTime = System.nanoTime();
while(System.nanoTime() - startTime < nanosToWait) {
System.out.print('^'); // wait for permission renewal
}
long thirdPermission = rawLimiter.reservePermission(Duration.ZERO);
long fourthPermission = rawLimiter.reservePermission(Duration.ZERO);
long secondNoPermission = rawLimiter.reservePermission(Duration.ZERO);
long thirdCycle = rawDetailedMetrics.getCycle();


then(secondCycle - firstCycle).isEqualTo(2);
then(thirdCycle - secondCycle).isEqualTo(1);

then(firstPermission).isZero();
then(secondPermission).isZero();
then(thirdPermission).isZero();
then(fourthPermission).isZero();

then(firstNoPermission).isNegative();
then(secondNoPermission).isNegative();
}

@Test
public void permissionsInFirstCycle() throws Exception {
setTimeOnNanos(CYCLE_IN_NANOS - 10);
Expand Down Expand Up @@ -179,6 +224,37 @@ public void reserveAndRefresh() throws Exception {
then(metrics.getNumberOfWaitingThreads()).isEqualTo(0);
}

@Test
public void reserveFewThenSkipCyclesBeforeRefreshNonBlocking() throws Exception {
setTimeOnNanos(CYCLE_IN_NANOS);
long permission = rateLimiter.reservePermission(Duration.ZERO);
then(permission).isZero();
then(metrics.getAvailablePermissions()).isEqualTo(0);
then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS);
then(metrics.getNumberOfWaitingThreads()).isEqualTo(0);

long reservation = rateLimiter.reservePermission(Duration.ofNanos(CYCLE_IN_NANOS));
then(reservation).isPositive();
then(reservation).isLessThanOrEqualTo(CYCLE_IN_NANOS);
then(metrics.getAvailablePermissions()).isEqualTo(-1);
then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS * 2);
then(metrics.getNumberOfWaitingThreads()).isEqualTo(0);


long additionalReservation = rateLimiter.reservePermission(Duration.ofNanos(CYCLE_IN_NANOS * 2));
then(additionalReservation).isPositive();
then(additionalReservation).isGreaterThan(CYCLE_IN_NANOS);
then(additionalReservation).isLessThanOrEqualTo(CYCLE_IN_NANOS * 2);
then(metrics.getAvailablePermissions()).isEqualTo(-2);
then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS * 3);
then(metrics.getNumberOfWaitingThreads()).isEqualTo(0);

setTimeOnNanos(CYCLE_IN_NANOS * 6 + 10);
then(metrics.getAvailablePermissions()).isEqualTo(1);
then(metrics.getNanosToWait()).isEqualTo(0L);
then(metrics.getNumberOfWaitingThreads()).isEqualTo(0);
}

@Test
public void reserveFewThenSkipCyclesBeforeRefresh() throws Exception {
setTimeOnNanos(CYCLE_IN_NANOS);
Expand Down Expand Up @@ -256,6 +332,27 @@ public void rejectedByTimeout() throws Exception {
then(metrics.getNumberOfWaitingThreads()).isEqualTo(0);
}

@Test
public void rejectedByTimeoutNonBlocking() throws Exception {
setTimeOnNanos(CYCLE_IN_NANOS);
long permission = rateLimiter.reservePermission(Duration.ZERO);
then(permission).isZero();
then(metrics.getAvailablePermissions()).isEqualTo(0);
then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS);
then(metrics.getNumberOfWaitingThreads()).isEqualTo(0);

long failedPermission = rateLimiter.reservePermission(Duration.ofNanos(CYCLE_IN_NANOS - 1));
then(failedPermission).isNegative();
then(metrics.getAvailablePermissions()).isEqualTo(0);
then(metrics.getNanosToWait()).isEqualTo(CYCLE_IN_NANOS);
then(metrics.getNumberOfWaitingThreads()).isEqualTo(0);

setTimeOnNanos(CYCLE_IN_NANOS * 2 - 1);
then(metrics.getAvailablePermissions()).isEqualTo(0);
then(metrics.getNanosToWait()).isEqualTo(1L);
then(metrics.getNumberOfWaitingThreads()).isEqualTo(0);
}

@Test
public void waitingThreadIsInterrupted() throws Exception {
setTimeOnNanos(CYCLE_IN_NANOS);
Expand Down
Loading

0 comments on commit 573792f

Please sign in to comment.