Skip to content

Commit

Permalink
#220 Change response of tryConsumeAndReturnRemaining for cases when r…
Browse files Browse the repository at this point in the history
…equested amount of tokens is greater than bandwidth capacity
  • Loading branch information
vladimir-bukhtoyarov committed Jan 28, 2022
1 parent 55016e1 commit 15154f3
Show file tree
Hide file tree
Showing 20 changed files with 62 additions and 172 deletions.
Expand Up @@ -41,7 +41,7 @@ public interface BucketState {

void consume(long toConsume);

long calculateDelayNanosAfterWillBePossibleToConsume(long tokensToConsume, long currentTimeNanos);
long calculateDelayNanosAfterWillBePossibleToConsume(long tokensToConsume, long currentTimeNanos, boolean checkTokensToConsumeShouldBeLessThenCapacity);

long calculateFullRefillingTime(long currentTimeNanos);

Expand Down
Expand Up @@ -295,12 +295,12 @@ public void consume(long toConsume) {
}

@Override
public long calculateDelayNanosAfterWillBePossibleToConsume(long tokensToConsume, long currentTimeNanos) {
public long calculateDelayNanosAfterWillBePossibleToConsume(long tokensToConsume, long currentTimeNanos, boolean checkTokensToConsumeShouldBeLessThenCapacity) {
Bandwidth[] bandwidths = configuration.getBandwidths();
long delayAfterWillBePossibleToConsume = calculateDelayNanosAfterWillBePossibleToConsume(0, bandwidths[0], tokensToConsume, currentTimeNanos);
long delayAfterWillBePossibleToConsume = calculateDelayNanosAfterWillBePossibleToConsume(0, bandwidths[0], tokensToConsume, currentTimeNanos, checkTokensToConsumeShouldBeLessThenCapacity);
for (int i = 1; i < bandwidths.length; i++) {
Bandwidth bandwidth = bandwidths[i];
long delay = calculateDelayNanosAfterWillBePossibleToConsume(i, bandwidth, tokensToConsume, currentTimeNanos);
long delay = calculateDelayNanosAfterWillBePossibleToConsume(i, bandwidth, tokensToConsume, currentTimeNanos, checkTokensToConsumeShouldBeLessThenCapacity);
delayAfterWillBePossibleToConsume = Math.max(delayAfterWillBePossibleToConsume, delay);
if (delay > delayAfterWillBePossibleToConsume) {
delayAfterWillBePossibleToConsume = delay;
Expand Down Expand Up @@ -497,7 +497,10 @@ private void resetBandwidth(int bandwidthIndex, long capacity) {
setRoundingError(bandwidthIndex, 0);
}

private long calculateDelayNanosAfterWillBePossibleToConsume(int bandwidthIndex, Bandwidth bandwidth, long tokens, long currentTimeNanos) {
private long calculateDelayNanosAfterWillBePossibleToConsume(int bandwidthIndex, Bandwidth bandwidth, long tokens, long currentTimeNanos, boolean checkTokensToConsumeShouldBeLessThenCapacity) {
if (checkTokensToConsumeShouldBeLessThenCapacity && tokens > bandwidth.capacity) {
return Long.MAX_VALUE;
}
long currentSize = getCurrentSize(bandwidthIndex);
if (tokens <= currentSize) {
return 0;
Expand Down
Expand Up @@ -230,12 +230,12 @@ private void refill(int bandwidthIndex, Bandwidth bandwidth, long currentTimeNan
}

@Override
public long calculateDelayNanosAfterWillBePossibleToConsume(long tokensToConsume, long currentTimeNanos) {
public long calculateDelayNanosAfterWillBePossibleToConsume(long tokensToConsume, long currentTimeNanos, boolean checkTokensToConsumeShouldBeLessThenCapacity) {
Bandwidth[] bandwidths = configuration.getBandwidths();
long delayAfterWillBePossibleToConsume = calculateDelayNanosAfterWillBePossibleToConsumeForBandwidth(0, bandwidths[0], tokensToConsume, currentTimeNanos);
long delayAfterWillBePossibleToConsume = calculateDelayNanosAfterWillBePossibleToConsumeForBandwidth(0, bandwidths[0], tokensToConsume, currentTimeNanos, checkTokensToConsumeShouldBeLessThenCapacity);
for (int i = 1; i < bandwidths.length; i++) {
Bandwidth bandwidth = bandwidths[i];
long delay = calculateDelayNanosAfterWillBePossibleToConsumeForBandwidth(i, bandwidth, tokensToConsume, currentTimeNanos);
long delay = calculateDelayNanosAfterWillBePossibleToConsumeForBandwidth(i, bandwidth, tokensToConsume, currentTimeNanos, checkTokensToConsumeShouldBeLessThenCapacity);
delayAfterWillBePossibleToConsume = Math.max(delayAfterWillBePossibleToConsume, delay);
}
return delayAfterWillBePossibleToConsume;
Expand Down Expand Up @@ -267,7 +267,10 @@ private long calculateFullRefillingTime(int bandwidthIndex, Bandwidth bandwidth,
return nanosToWait < Long.MAX_VALUE? (long) nanosToWait : Long.MAX_VALUE;
}

private long calculateDelayNanosAfterWillBePossibleToConsumeForBandwidth(int bandwidthIndex, Bandwidth bandwidth, long tokensToConsume, long currentTimeNanos) {
private long calculateDelayNanosAfterWillBePossibleToConsumeForBandwidth(int bandwidthIndex, Bandwidth bandwidth, long tokensToConsume, long currentTimeNanos, boolean checkTokensToConsumeShouldBeLessThenCapacity) {
if (checkTokensToConsumeShouldBeLessThenCapacity && tokensToConsume > bandwidth.capacity) {
return Long.MAX_VALUE;
}
double currentSize = tokens[bandwidthIndex];
if (tokensToConsume <= currentSize) {
return 0;
Expand Down
Expand Up @@ -97,8 +97,8 @@ public long calculateFullRefillingTime(long currentTimeNanos) {
return state.calculateFullRefillingTime(currentTimeNanos);
}

public long calculateDelayNanosAfterWillBePossibleToConsume(long tokensToConsume, long currentTimeNanos) {
return state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
public long calculateDelayNanosAfterWillBePossibleToConsume(long tokensToConsume, long currentTimeNanos, boolean checkTokensToConsumeShouldBeLessThenCapacity) {
return state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, checkTokensToConsumeShouldBeLessThenCapacity);
}

public void addTokens(long tokensToAdd) {
Expand Down
Expand Up @@ -86,7 +86,7 @@ public CommandResult<Long> execute(MutableBucketEntry mutableEntry, long current

RemoteBucketState state = mutableEntry.get();
state.refillAllBandwidth(currentTimeNanos);
long nanosToCloseDeficit = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToCloseDeficit = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, false);

if (nanosToCloseDeficit == Long.MAX_VALUE) {
return CommandResult.success(Long.MAX_VALUE, LONG_HANDLE);
Expand Down
Expand Up @@ -87,7 +87,7 @@ public CommandResult<EstimationProbe> execute(MutableBucketEntry mutableEntry, l
EstimationProbe estimationProbe = EstimationProbe.canBeConsumed(availableToConsume);
return CommandResult.success(estimationProbe, EstimationProbe.SERIALIZATION_HANDLE);
} else {
long nanosToWaitForRefill = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToWaitForRefill = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, true);
EstimationProbe estimationProbe = EstimationProbe.canNotBeConsumed(availableToConsume, nanosToWaitForRefill);
return CommandResult.success(estimationProbe, EstimationProbe.SERIALIZATION_HANDLE);
}
Expand Down
Expand Up @@ -88,7 +88,7 @@ public CommandResult<Long> execute(MutableBucketEntry mutableEntry, long current
RemoteBucketState state = mutableEntry.get();
state.refillAllBandwidth(currentTimeNanos);

long nanosToCloseDeficit = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToCloseDeficit = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, false);
if (nanosToCloseDeficit == Long.MAX_VALUE || nanosToCloseDeficit > waitIfBusyNanosLimit) {
return CommandResult.MAX_VALUE;
} else {
Expand Down
Expand Up @@ -93,7 +93,7 @@ public CommandResult<ConsumptionProbe> execute(MutableBucketEntry mutableEntry,
return CommandResult.success(probe, ConsumptionProbe.SERIALIZATION_HANDLE);
} else {
long nanosToWaitForReset = state.calculateFullRefillingTime(currentTimeNanos);
long nanosToWaitForRefill = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToWaitForRefill = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, true);
ConsumptionProbe probe = ConsumptionProbe.rejected(availableToConsume, nanosToWaitForRefill, nanosToWaitForReset);
return CommandResult.success(probe, ConsumptionProbe.SERIALIZATION_HANDLE);
}
Expand Down
Expand Up @@ -99,7 +99,7 @@ protected ConsumptionProbe tryConsumeAndReturnRemainingTokensImpl(long tokensToC
newState.refillAllBandwidth(currentTimeNanos);
long availableToConsume = newState.getAvailableTokens();
if (tokensToConsume > availableToConsume) {
long nanosToWaitForRefill = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToWaitForRefill = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, true);
long nanosToWaitForReset = newState.calculateFullRefillingTime(currentTimeNanos);
return ConsumptionProbe.rejected(availableToConsume, nanosToWaitForRefill, nanosToWaitForReset);
}
Expand All @@ -124,7 +124,7 @@ protected EstimationProbe estimateAbilityToConsumeImpl(long tokensToEstimate) {
newState.refillAllBandwidth(currentTimeNanos);
long availableToConsume = newState.getAvailableTokens();
if (tokensToEstimate > availableToConsume) {
long nanosToWaitForRefill = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToEstimate, currentTimeNanos);
long nanosToWaitForRefill = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToEstimate, currentTimeNanos, true);
return EstimationProbe.canNotBeConsumed(availableToConsume, nanosToWaitForRefill);
} else {
return EstimationProbe.canBeConsumed(availableToConsume);
Expand All @@ -139,7 +139,7 @@ protected long reserveAndCalculateTimeToSleepImpl(long tokensToConsume, long wai

while (true) {
newState.refillAllBandwidth(currentTimeNanos);
long nanosToCloseDeficit = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToCloseDeficit = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, false);
if (nanosToCloseDeficit == 0) {
newState.consume(tokensToConsume);
if (stateRef.compareAndSet(previousState, newState)) {
Expand Down Expand Up @@ -225,7 +225,7 @@ protected long consumeIgnoringRateLimitsImpl(long tokensToConsume) {

while (true) {
newState.refillAllBandwidth(currentTimeNanos);
long nanosToCloseDeficit = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToCloseDeficit = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, false);

if (nanosToCloseDeficit == INFINITY_DURATION) {
return nanosToCloseDeficit;
Expand Down Expand Up @@ -295,7 +295,7 @@ protected VerboseResult<ConsumptionProbe> tryConsumeAndReturnRemainingTokensVerb
newState.refillAllBandwidth(currentTimeNanos);
long availableToConsume = newState.getAvailableTokens();
if (tokensToConsume > availableToConsume) {
long nanosToWaitForRefill = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToWaitForRefill = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, true);
long nanosToWaitForReset = newState.calculateFullRefillingTime(currentTimeNanos);
ConsumptionProbe consumptionProbe = ConsumptionProbe.rejected(availableToConsume, nanosToWaitForRefill, nanosToWaitForReset);
return new VerboseResult<>(currentTimeNanos, consumptionProbe, newState);
Expand All @@ -321,7 +321,7 @@ protected VerboseResult<EstimationProbe> estimateAbilityToConsumeVerboseImpl(lon
newState.refillAllBandwidth(currentTimeNanos);
long availableToConsume = newState.getAvailableTokens();
if (tokensToEstimate > availableToConsume) {
long nanosToWaitForRefill = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToEstimate, currentTimeNanos);
long nanosToWaitForRefill = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToEstimate, currentTimeNanos, true);
EstimationProbe estimationProbe = EstimationProbe.canNotBeConsumed(availableToConsume, nanosToWaitForRefill);
return new VerboseResult<>(currentTimeNanos, estimationProbe, newState);
} else {
Expand Down Expand Up @@ -400,7 +400,7 @@ protected VerboseResult<Long> consumeIgnoringRateLimitsVerboseImpl(long tokensTo

while (true) {
newState.refillAllBandwidth(currentTimeNanos);
long nanosToCloseDeficit = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToCloseDeficit = newState.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, false);

if (nanosToCloseDeficit == INFINITY_DURATION) {
return new VerboseResult<>(currentTimeNanos, nanosToCloseDeficit, newState);
Expand Down
Expand Up @@ -101,7 +101,7 @@ protected ConsumptionProbe tryConsumeAndReturnRemainingTokensImpl(long tokensToC
state.refillAllBandwidth(currentTimeNanos);
long availableToConsume = state.getAvailableTokens();
if (tokensToConsume > availableToConsume) {
long nanosToWaitForRefill = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToWaitForRefill = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, true);
long nanosToWaitForReset = state.calculateFullRefillingTime(currentTimeNanos);
return ConsumptionProbe.rejected(availableToConsume, nanosToWaitForRefill, nanosToWaitForReset);
}
Expand All @@ -122,7 +122,7 @@ protected EstimationProbe estimateAbilityToConsumeImpl(long tokensToEstimate) {
state.refillAllBandwidth(currentTimeNanos);
long availableToConsume = state.getAvailableTokens();
if (tokensToEstimate > availableToConsume) {
long nanosToWaitForRefill = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToEstimate, currentTimeNanos);
long nanosToWaitForRefill = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToEstimate, currentTimeNanos, true);
return EstimationProbe.canNotBeConsumed(availableToConsume, nanosToWaitForRefill);
}
return EstimationProbe.canBeConsumed(availableToConsume);
Expand All @@ -137,7 +137,7 @@ protected long reserveAndCalculateTimeToSleepImpl(long tokensToConsume, long wai
lock.lock();
try {
state.refillAllBandwidth(currentTimeNanos);
long nanosToCloseDeficit = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToCloseDeficit = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, false);

if (nanosToCloseDeficit == Long.MAX_VALUE || nanosToCloseDeficit > waitIfBusyNanosLimit) {
return Long.MAX_VALUE;
Expand All @@ -156,7 +156,7 @@ protected long consumeIgnoringRateLimitsImpl(long tokensToConsume) {
lock.lock();
try {
state.refillAllBandwidth(currentTimeNanos);
long nanosToCloseDeficit = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToCloseDeficit = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, false);

if (nanosToCloseDeficit == INFINITY_DURATION) {
return nanosToCloseDeficit;
Expand Down Expand Up @@ -211,7 +211,7 @@ protected VerboseResult<ConsumptionProbe> tryConsumeAndReturnRemainingTokensVerb
state.refillAllBandwidth(currentTimeNanos);
long availableToConsume = state.getAvailableTokens();
if (tokensToConsume > availableToConsume) {
long nanosToWaitForRefill = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToWaitForRefill = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, true);
long nanosToWaitForReset = state.calculateFullRefillingTime(currentTimeNanos);
ConsumptionProbe probe = ConsumptionProbe.rejected(availableToConsume, nanosToWaitForRefill, nanosToWaitForReset);
return new VerboseResult<>(currentTimeNanos, probe, state.copy());
Expand All @@ -233,7 +233,7 @@ protected VerboseResult<EstimationProbe> estimateAbilityToConsumeVerboseImpl(lon
state.refillAllBandwidth(currentTimeNanos);
long availableToConsume = state.getAvailableTokens();
if (tokensToEstimate > availableToConsume) {
long nanosToWaitForRefill = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToEstimate, currentTimeNanos);
long nanosToWaitForRefill = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToEstimate, currentTimeNanos, true);
EstimationProbe estimationProbe = EstimationProbe.canNotBeConsumed(availableToConsume, nanosToWaitForRefill);
return new VerboseResult<>(currentTimeNanos, estimationProbe, state.copy());
}
Expand Down Expand Up @@ -303,7 +303,7 @@ protected VerboseResult<Long> consumeIgnoringRateLimitsVerboseImpl(long tokensTo
lock.lock();
try {
state.refillAllBandwidth(currentTimeNanos);
long nanosToCloseDeficit = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos);
long nanosToCloseDeficit = state.calculateDelayNanosAfterWillBePossibleToConsume(tokensToConsume, currentTimeNanos, false);

if (nanosToCloseDeficit == INFINITY_DURATION) {
return new VerboseResult<>(currentTimeNanos, nanosToCloseDeficit, state.copy());
Expand Down
Expand Up @@ -11,7 +11,9 @@ import io.github.bucket4j.mock.TimeMeterMock
import spock.lang.Specification
import spock.lang.Unroll

import java.time.Duration;
import java.time.Duration

import static java.lang.Long.MAX_VALUE;

class EstimateAbilityToConsumeSpecification extends Specification {

Expand Down Expand Up @@ -43,10 +45,10 @@ class EstimateAbilityToConsumeSpecification extends Specification {
2 | 1 | true | 0 | BucketConfiguration.builder().addLimit(Bandwidth.simple(100, Duration.ofNanos(100)).withInitialTokens(1)).build()
3 | 80 | false | 10 | BucketConfiguration.builder().addLimit(Bandwidth.simple(100, Duration.ofNanos(100)).withInitialTokens(70)).build()
4 | 10 | false | 10 | BucketConfiguration.builder().addLimit(Bandwidth.simple(100, Duration.ofNanos(100)).withInitialTokens(0)).build()
5 | 120 | false | 110 | BucketConfiguration.builder().addLimit(Bandwidth.simple(100, Duration.ofNanos(100)).withInitialTokens(10)).build()
5 | 120 | false | MAX_VALUE | BucketConfiguration.builder().addLimit(Bandwidth.simple(100, Duration.ofNanos(100)).withInitialTokens(10)).build()
6 | 80 | false | 100 | BucketConfiguration.builder().addLimit(Bandwidth.classic(100, Refill.intervally(100, Duration.ofNanos(100))).withInitialTokens(70)).build()
7 | 10 | false | 100 | BucketConfiguration.builder().addLimit(Bandwidth.classic(100, Refill.intervally(100, Duration.ofNanos(100))).withInitialTokens(0)).build()
8 | 120 | false | 200 | BucketConfiguration.builder().addLimit(Bandwidth.classic(100, Refill.intervally(100, Duration.ofNanos(100))).withInitialTokens(10)).build()
8 | 120 | false | MAX_VALUE | BucketConfiguration.builder().addLimit(Bandwidth.classic(100, Refill.intervally(100, Duration.ofNanos(100))).withInitialTokens(10)).build()
}

}

0 comments on commit 15154f3

Please sign in to comment.