Skip to content

Commit

Permalink
Reactor and RxJava RateLimiter operator try to reserve a permission a…
Browse files Browse the repository at this point in the history
…nd d… (ReactiveX#465)

* Reactor and RxJava RateLimiter operator try to reserve a permission and delay the subscription if necessary.
  • Loading branch information
RobWin committed May 29, 2019
1 parent 44c6af9 commit fb143f3
Show file tree
Hide file tree
Showing 23 changed files with 288 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,7 @@ static <T, R> Function<T, R> decorateFunction(RateLimiter rateLimiter, Function<
* @throws IllegalStateException if thread was interrupted during permission wait
*/
static void waitForPermission(final RateLimiter rateLimiter) {
RateLimiterConfig rateLimiterConfig = rateLimiter.getRateLimiterConfig();
Duration timeoutDuration = rateLimiterConfig.getTimeoutDuration();
boolean permission = rateLimiter.acquirePermission(timeoutDuration);
boolean permission = rateLimiter.acquirePermission();
if (Thread.interrupted()) {
throw new IllegalStateException("Thread was interrupted during permission wait");
}
Expand Down Expand Up @@ -275,28 +273,41 @@ static void waitForPermission(final RateLimiter rateLimiter) {
@Deprecated
boolean getPermission(Duration timeoutDuration);

/**
* @deprecated Use {@link RateLimiter#acquirePermission()} instead.
* @since 0.16.0
*/
@Deprecated
boolean acquirePermission(Duration timeoutDuration);

/**
* Acquires a permission from this rate limiter, blocking until one is available, or the thread is interrupted.
* Maximum wait time is {@link RateLimiterConfig#getTimeoutDuration()}
*
* <p>If the current thread is {@linkplain Thread#interrupt interrupted}
* while waiting for a permit then it won't throw {@linkplain InterruptedException},
* but its interrupt status will be set.
*
* @param timeoutDuration the maximum time to wait
* @return {@code true} if a permit was acquired and {@code false}
* if waiting timeoutDuration elapsed before a permit was acquired
*/
boolean acquirePermission(Duration timeoutDuration);
boolean acquirePermission();

/**
* @deprecated Use {@link RateLimiter#reservePermission()} instead.
* @since 0.16.0
*/
@Deprecated
long reservePermission(Duration timeoutDuration);

/**
* Reserves a permission from this rate limiter and returns nanoseconds you should wait for it.
* If returned long is negative, it means that you failed to reserve permission,
* possibly your {@code timeoutDuration} is less then time to wait for permission.
* possibly your {@link RateLimiterConfig#getTimeoutDuration()} is less then time to wait for permission.
*
* @param timeoutDuration the maximum time you want to wait.
* @return {@code long} amount of nanoseconds you should wait for reserved permission. if negative, it means you failed to reserve.
*/
long reservePermission(Duration timeoutDuration);
long reservePermission();

/**
* Get the name of this RateLimiter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public class AtomicRateLimiter implements RateLimiter {
private final AtomicReference<State> state;
private final RateLimiterEventProcessor eventProcessor;


public AtomicRateLimiter(String name, RateLimiterConfig rateLimiterConfig) {
this.name = name;

Expand Down Expand Up @@ -112,6 +111,11 @@ public boolean acquirePermission(Duration timeoutDuration) {
return result;
}

@Override
public boolean acquirePermission() {
return acquirePermission(state.get().config.getTimeoutDuration());
}

/**
* {@inheritDoc}
*/
Expand All @@ -136,6 +140,11 @@ public long reservePermission(Duration timeoutDuration) {
return -1;
}

@Override
public long reservePermission() {
return reservePermission(state.get().config.getTimeoutDuration());
}

/**
* Atomically updates the current {@link State} with the results of
* applying the {@link AtomicRateLimiter#calculateNextState}, returning the updated {@link State}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@

/**
* A RateLimiter implementation that consists of {@link Semaphore}
* and scheduler that will refresh permissions
* after each {@link RateLimiterConfig#limitRefreshPeriod}.
* and scheduler that will refresh permissions after each {@link RateLimiterConfig#getLimitRefreshPeriod()}.
*/
public class SemaphoreBasedRateLimiter implements RateLimiter {

Expand Down Expand Up @@ -149,6 +148,11 @@ public boolean acquirePermission(Duration timeoutDuration) {
}
}

@Override
public boolean acquirePermission() {
return acquirePermission(rateLimiterConfig.get().getTimeoutDuration());
}

/**
* {@inheritDoc}
* SemaphoreBasedRateLimiter is totally blocking by it's nature. So this non-blocking API isn't supported.
Expand All @@ -159,6 +163,16 @@ public long reservePermission(Duration timeoutDuration) {
return -1;
}

/**
* {@inheritDoc}
* SemaphoreBasedRateLimiter is totally blocking by it's nature. So this non-blocking API isn't supported.
* It will return negative numbers all the time.
*/
@Override
public long reservePermission() {
return -1;
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ public void decorateCheckedSupplier() throws Throwable {
CheckedFunction0 supplier = mock(CheckedFunction0.class);
CheckedFunction0 decorated = RateLimiter.decorateCheckedSupplier(limit, supplier);

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(false);

Try decoratedSupplierResult = Try.of(decorated);
then(decoratedSupplierResult.isFailure()).isTrue();
then(decoratedSupplierResult.getCause()).isInstanceOf(RequestNotPermitted.class);
verify(supplier, never()).apply();

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(true);
Try secondSupplierResult = Try.of(decorated);
then(secondSupplierResult.isSuccess()).isTrue();
Expand All @@ -95,15 +95,15 @@ public void decorateCheckedRunnable() throws Throwable {
CheckedRunnable runnable = mock(CheckedRunnable.class);
CheckedRunnable decorated = RateLimiter.decorateCheckedRunnable(limit, runnable);

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(false);

Try decoratedRunnableResult = Try.run(decorated);
then(decoratedRunnableResult.isFailure()).isTrue();
then(decoratedRunnableResult.getCause()).isInstanceOf(RequestNotPermitted.class);
verify(runnable, never()).run();

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(true);
Try secondRunnableResult = Try.run(decorated);
then(secondRunnableResult.isSuccess()).isTrue();
Expand All @@ -115,15 +115,15 @@ public void decorateCheckedFunction() throws Throwable {
CheckedFunction1<Integer, String> function = mock(CheckedFunction1.class);
CheckedFunction1<Integer, String> decorated = RateLimiter.decorateCheckedFunction(limit, function);

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(false);

Try<String> decoratedFunctionResult = Try.success(1).mapTry(decorated);
then(decoratedFunctionResult.isFailure()).isTrue();
then(decoratedFunctionResult.getCause()).isInstanceOf(RequestNotPermitted.class);
verify(function, never()).apply(any());

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(true);
Try secondFunctionResult = Try.success(1).mapTry(decorated);
then(secondFunctionResult.isSuccess()).isTrue();
Expand All @@ -135,15 +135,15 @@ public void decorateSupplier() throws Exception {
Supplier supplier = mock(Supplier.class);
Supplier decorated = RateLimiter.decorateSupplier(limit, supplier);

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(false);

Try decoratedSupplierResult = Try.success(decorated).map(Supplier::get);
then(decoratedSupplierResult.isFailure()).isTrue();
then(decoratedSupplierResult.getCause()).isInstanceOf(RequestNotPermitted.class);
verify(supplier, never()).get();

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(true);
Try secondSupplierResult = Try.success(decorated).map(Supplier::get);
then(secondSupplierResult.isSuccess()).isTrue();
Expand All @@ -155,15 +155,15 @@ public void decorateConsumer() throws Exception {
Consumer<Integer> consumer = mock(Consumer.class);
Consumer<Integer> decorated = RateLimiter.decorateConsumer(limit, consumer);

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(false);

Try<Integer> decoratedConsumerResult = Try.success(1).andThen(decorated);
then(decoratedConsumerResult.isFailure()).isTrue();
then(decoratedConsumerResult.getCause()).isInstanceOf(RequestNotPermitted.class);
verify(consumer, never()).accept(any());

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(true);
Try secondConsumerResult = Try.success(1).andThen(decorated);
then(secondConsumerResult.isSuccess()).isTrue();
Expand All @@ -175,15 +175,15 @@ public void decorateRunnable() throws Exception {
Runnable runnable = mock(Runnable.class);
Runnable decorated = RateLimiter.decorateRunnable(limit, runnable);

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(false);

Try decoratedRunnableResult = Try.success(decorated).andThen(Runnable::run);
then(decoratedRunnableResult.isFailure()).isTrue();
then(decoratedRunnableResult.getCause()).isInstanceOf(RequestNotPermitted.class);
verify(runnable, never()).run();

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(true);
Try secondRunnableResult = Try.success(decorated).andThen(Runnable::run);
then(secondRunnableResult.isSuccess()).isTrue();
Expand All @@ -195,15 +195,15 @@ public void decorateFunction() throws Exception {
Function<Integer, String> function = mock(Function.class);
Function<Integer, String> decorated = RateLimiter.decorateFunction(limit, function);

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(false);

Try<String> decoratedFunctionResult = Try.success(1).map(decorated);
then(decoratedFunctionResult.isFailure()).isTrue();
then(decoratedFunctionResult.getCause()).isInstanceOf(RequestNotPermitted.class);
verify(function, never()).apply(any());

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(true);
Try secondFunctionResult = Try.success(1).map(decorated);
then(secondFunctionResult.isSuccess()).isTrue();
Expand All @@ -218,7 +218,7 @@ public void decorateCompletionStage() throws Exception {

Supplier<CompletionStage<String>> decorated = RateLimiter.decorateCompletionStage(limit, completionStage);

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(false);

AtomicReference<Throwable> error = new AtomicReference<>(null);
Expand All @@ -232,7 +232,7 @@ public void decorateCompletionStage() throws Exception {
then(error.get()).isExactlyInstanceOf(RequestNotPermitted.class);
verify(supplier, never()).get();

when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(true);

AtomicReference<Throwable> shouldBeEmpty = new AtomicReference<>(null);
Expand All @@ -248,25 +248,25 @@ public void decorateCompletionStage() throws Exception {

@Test
public void waitForPermissionWithOne() throws Exception {
when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(true);
RateLimiter.waitForPermission(limit);
verify(limit, times(1))
.acquirePermission(config.getTimeoutDuration());
.acquirePermission();
}

@Test(expected = RequestNotPermitted.class)
public void waitForPermissionWithoutOne() throws Exception {
when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.thenReturn(false);
RateLimiter.waitForPermission(limit);
verify(limit, times(1))
.acquirePermission(config.getTimeoutDuration());
.acquirePermission();
}

@Test
public void waitForPermissionWithInterruption() throws Exception {
when(limit.acquirePermission(config.getTimeoutDuration()))
when(limit.acquirePermission())
.then(invocation -> {
LockSupport.parkNanos(5_000_000_000L);
return null;
Expand Down
Loading

0 comments on commit fb143f3

Please sign in to comment.