Skip to content

Commit

Permalink
#116: Implement method to reset bucket to initial state
Browse files Browse the repository at this point in the history
  • Loading branch information
vladimir-bukhtoyarov committed Apr 9, 2022
1 parent ee27e5c commit b2e7140
Show file tree
Hide file tree
Showing 19 changed files with 318 additions and 3 deletions.
9 changes: 9 additions & 0 deletions asciidoc/src/main/docs/asciidoc/basic/api-reference.adoc
Expand Up @@ -146,6 +146,15 @@ boolean tryConsume(long numTokens);
void forceAddTokens(long tokensToAdd);
----

===== reset
[source, java]
----
/**
* Reset all tokens up to maximum capacity.
*/
void reset();
----

===== getAvailableTokens
[source, java]
----
Expand Down
Expand Up @@ -63,6 +63,8 @@ public abstract class AbstractBucket implements Bucket, BlockingBucket, Scheduli

protected abstract VerboseResult<Nothing> forceAddTokensVerboseImpl(long tokensToAdd);

protected abstract VerboseResult<Nothing> resetVerboseImpl();

protected abstract VerboseResult<Nothing> replaceConfigurationVerboseImpl(BucketConfiguration newConfiguration, TokensInheritanceStrategy tokensInheritanceStrategy);

protected abstract VerboseResult<Long> consumeIgnoringRateLimitsVerboseImpl(long tokensToConsume);
Expand Down Expand Up @@ -157,6 +159,11 @@ public VerboseResult<Nothing> addTokens(long tokensToAdd) {
return addTokensVerboseImpl(tokensToAdd);
}

@Override
public VerboseResult<Nothing> reset() {
return resetVerboseImpl();
}

@Override
public VerboseResult<Nothing> forceAddTokens(long tokensToAdd) {
checkTokensToAdd(tokensToAdd);
Expand Down
5 changes: 5 additions & 0 deletions bucket4j-core/src/main/java/io/github/bucket4j/Bucket.java
Expand Up @@ -176,6 +176,11 @@ static LocalBucketBuilder builder() {
*/
void forceAddTokens(long tokensToAdd);

/**
* Reset all tokens up to maximum capacity.
*/
void reset();

/**
* Returns amount of available tokens in this bucket.
Expand Down
Expand Up @@ -49,6 +49,8 @@ public interface BucketState {

void addTokens(long tokensToAdd);

void reset();

void forceAddTokens(long tokensToAdd);

long getCurrentSize(int bandwidth);
Expand Down
Expand Up @@ -325,6 +325,14 @@ public void addTokens(long tokensToAdd) {
}
}

@Override
public void reset() {
Bandwidth[] bandwidths = configuration.getBandwidths();
for (int i = 0; i < bandwidths.length; i++) {
resetBandwidth(i, bandwidths[i].capacity);
}
}

@Override
public void forceAddTokens(long tokensToAdd) {
Bandwidth[] bandwidths = configuration.getBandwidths();
Expand Down
Expand Up @@ -179,6 +179,14 @@ public void addTokens(long tokensToAdd) {
}
}

@Override
public void reset() {
Bandwidth[] bandwidths = configuration.getBandwidths();
for (int i = 0; i < bandwidths.length; i++) {
tokens[i] = bandwidths[i].getCapacity();
}
}

@Override
public void refillAllBandwidth(long currentTimeNanos) {
Bandwidth[] limits = configuration.getBandwidths();
Expand Down
Expand Up @@ -65,6 +65,11 @@ public interface VerboseBucket {
*/
VerboseResult<Nothing> addTokens(long tokensToAdd);

/**
* Does the same that {@link Bucket#reset()}
*/
VerboseResult<Nothing> reset();

/**
* Does the same that {@link Bucket#forceAddTokens(long)}
*/
Expand Down
Expand Up @@ -275,6 +275,11 @@ public interface AsyncBucketProxy {
*/
CompletableFuture<Void> forceAddTokens(long tokensToAdd);

/**
* Reset all tokens up to maximum capacity.
*/
CompletableFuture<Void> reset();

/**
* Has the same semantic with {@link Bucket#replaceConfiguration(BucketConfiguration, TokensInheritanceStrategy)}
*/
Expand Down
Expand Up @@ -73,6 +73,11 @@ public CompletableFuture<VerboseResult<Nothing>> forceAddTokens(long tokensToAdd
return completedFuture(() -> target.asVerbose().forceAddTokens(tokensToAdd));
}

@Override
public CompletableFuture<VerboseResult<Nothing>> reset() {
return completedFuture(() -> target.asVerbose().reset());
}

@Override
public CompletableFuture<VerboseResult<Nothing>> replaceConfiguration(BucketConfiguration newConfiguration, TokensInheritanceStrategy tokensInheritanceStrategy) {
LimitChecker.checkConfiguration(newConfiguration);
Expand Down Expand Up @@ -156,6 +161,14 @@ public CompletableFuture<Void> forceAddTokens(long tokensToAdd) {
});
}

@Override
public CompletableFuture<Void> reset() {
return completedFuture(() -> {
target.reset();
return null;
});
}

@Override
public CompletableFuture<Void> replaceConfiguration(BucketConfiguration newConfiguration, TokensInheritanceStrategy tokensInheritanceStrategy) {
LimitChecker.checkConfiguration(newConfiguration);
Expand Down
Expand Up @@ -69,6 +69,11 @@ public interface AsyncVerboseBucket {
*/
CompletableFuture<VerboseResult<Nothing>> forceAddTokens(long tokensToAdd);

/**
* Does the same that {@link Bucket#reset()}
*/
CompletableFuture<VerboseResult<Nothing>> reset();

/**
* Does the same that {@link Bucket#replaceConfiguration(BucketConfiguration, TokensInheritanceStrategy)}
*/
Expand Down
Expand Up @@ -171,6 +171,12 @@ public CompletableFuture<VerboseResult<Nothing>> forceAddTokens(long tokensToAdd
return execute(verboseCommand).thenApply(RemoteVerboseResult::asLocal);
}

@Override
public CompletableFuture<VerboseResult<Nothing>> reset() {
VerboseCommand<Nothing> verboseCommand = new VerboseCommand<>(new ResetCommand());
return execute(verboseCommand).thenApply(RemoteVerboseResult::asLocal);
}

@Override
public CompletableFuture<VerboseResult<Nothing>> replaceConfiguration(BucketConfiguration newConfiguration, TokensInheritanceStrategy tokensInheritanceStrategy) {
checkConfiguration(newConfiguration);
Expand Down Expand Up @@ -345,6 +351,12 @@ public CompletableFuture<Void> forceAddTokens(long tokensToAdd) {
return future.thenApply(nothing -> null);
}

@Override
public CompletableFuture<Void> reset() {
CompletableFuture<Nothing> future = execute(new ResetCommand());
return future.thenApply(nothing -> null);
}

@Override
public CompletableFuture<Long> getAvailableTokens() {
return execute(new GetAvailableTokensCommand());
Expand Down
Expand Up @@ -118,6 +118,12 @@ protected long consumeIgnoringRateLimitsImpl(long tokensToConsume) {
return execute(command);
}

@Override
public void reset() {
ResetCommand command = new ResetCommand();
execute(command);
}

@Override
public long getAvailableTokens() {
return execute(new GetAvailableTokensCommand());
Expand Down Expand Up @@ -165,6 +171,12 @@ protected VerboseResult<Nothing> forceAddTokensVerboseImpl(long tokensToAdd) {
return execute(command.asVerbose()).asLocal();
}

@Override
protected VerboseResult<Nothing> resetVerboseImpl() {
ResetCommand command = new ResetCommand();
return execute(command.asVerbose()).asLocal();
}

@Override
protected VerboseResult<Nothing> replaceConfigurationVerboseImpl(BucketConfiguration newConfiguration, TokensInheritanceStrategy tokensInheritanceStrategy) {
ReplaceConfigurationCommand replaceConfigCommand = new ReplaceConfigurationCommand(newConfiguration, tokensInheritanceStrategy);
Expand Down
Expand Up @@ -109,6 +109,10 @@ public void forceAddTokens(long tokensToAdd) {
state.forceAddTokens(tokensToAdd);
}

public void reset() {
state.reset();
}

public BucketState copyBucketState() {
return state.copy();
}
Expand Down
@@ -0,0 +1,109 @@
/*-
* ========================LICENSE_START=================================
* Bucket4j
* %%
* Copyright (C) 2015 - 2020 Vladimir Bukhtoyarov
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* =========================LICENSE_END==================================
*/

package io.github.bucket4j.distributed.remote.commands;

import io.github.bucket4j.Nothing;
import io.github.bucket4j.distributed.remote.CommandResult;
import io.github.bucket4j.distributed.remote.MutableBucketEntry;
import io.github.bucket4j.distributed.remote.RemoteBucketState;
import io.github.bucket4j.distributed.remote.RemoteCommand;
import io.github.bucket4j.distributed.serialization.DeserializationAdapter;
import io.github.bucket4j.distributed.serialization.SerializationAdapter;
import io.github.bucket4j.distributed.serialization.SerializationHandle;
import io.github.bucket4j.distributed.versioning.Version;
import io.github.bucket4j.distributed.versioning.Versions;
import io.github.bucket4j.util.ComparableByContent;

import java.io.IOException;

import static io.github.bucket4j.distributed.versioning.Versions.v_7_0_0;


public class ResetCommand implements RemoteCommand<Nothing>, ComparableByContent<ResetCommand> {

public static final SerializationHandle<ResetCommand> SERIALIZATION_HANDLE = new SerializationHandle<ResetCommand>() {
@Override
public <S> ResetCommand deserialize(DeserializationAdapter<S> adapter, S input, Version backwardCompatibilityVersion) throws IOException {
int formatNumber = adapter.readInt(input);
Versions.check(formatNumber, v_7_0_0, v_7_0_0);
return new ResetCommand();
}

@Override
public <O> void serialize(SerializationAdapter<O> adapter, O output, ResetCommand command, Version backwardCompatibilityVersion) throws IOException {
adapter.writeInt(output, v_7_0_0.getNumber());
}

@Override
public int getTypeId() {
return 39;
}

@Override
public Class<ResetCommand> getSerializedType() {
return ResetCommand.class;
}

};

public ResetCommand() {

}

@Override
public CommandResult<Nothing> execute(MutableBucketEntry mutableEntry, long currentTimeNanos) {
if (!mutableEntry.exists()) {
return CommandResult.bucketNotFound();
}

RemoteBucketState state = mutableEntry.get();
state.refillAllBandwidth(currentTimeNanos);
state.reset();
mutableEntry.set(state);
return CommandResult.empty();
}

@Override
public SerializationHandle getSerializationHandle() {
return SERIALIZATION_HANDLE;
}

@Override
public boolean equalsByContent(ResetCommand other) {
return true;
}

@Override
public boolean isImmediateSyncRequired(long unsynchronizedTokens, long nanosSinceLastSync) {
return true;
}

@Override
public long estimateTokensToConsume() {
return 0;
}

@Override
public long getConsumedTokens(Nothing result) {
return 0;
}

}
Expand Up @@ -64,9 +64,8 @@ public interface SerializationHandle<T> {
VerboseCommand.SERIALIZATION_HANDLE, // 35
SyncCommand.SERIALIZATION_HANDLE, // 36
Request.SERIALIZATION_HANDLE, // 37
ForceAddTokensCommand.SERIALIZATION_HANDLE // 38


ForceAddTokensCommand.SERIALIZATION_HANDLE, // 38
ResetCommand.SERIALIZATION_HANDLE // 39
));

<I> T deserialize(DeserializationAdapter<I> adapter, I input, Version backwardCompatibilityVersion) throws IOException;
Expand Down
Expand Up @@ -199,6 +199,24 @@ protected void forceAddTokensImpl(long tokensToAdd) {
}
}

@Override
public void reset() {
BucketState previousState = stateRef.get();
BucketState newState = previousState.copy();
long currentTimeNanos = timeMeter.currentTimeNanos();

while (true) {
newState.refillAllBandwidth(currentTimeNanos);
newState.reset();
if (stateRef.compareAndSet(previousState, newState)) {
return;
} else {
previousState = stateRef.get();
newState.copyStateFrom(previousState);
}
}
}

@Override
protected void replaceConfigurationImpl(BucketConfiguration newConfiguration, TokensInheritanceStrategy tokensInheritanceStrategy) {
BucketState previousState = stateRef.get();
Expand Down Expand Up @@ -374,6 +392,24 @@ protected VerboseResult<Nothing> forceAddTokensVerboseImpl(long tokensToAdd) {
}
}

@Override
protected VerboseResult<Nothing> resetVerboseImpl() {
BucketState previousState = stateRef.get();
BucketState newState = previousState.copy();
long currentTimeNanos = timeMeter.currentTimeNanos();

while (true) {
newState.refillAllBandwidth(currentTimeNanos);
newState.reset();
if (stateRef.compareAndSet(previousState, newState)) {
return new VerboseResult<>(currentTimeNanos, Nothing.INSTANCE, newState.copy());
} else {
previousState = stateRef.get();
newState.copyStateFrom(previousState);
}
}
}

@Override
protected VerboseResult<Nothing> replaceConfigurationVerboseImpl(BucketConfiguration newConfiguration, TokensInheritanceStrategy tokensInheritanceStrategy) {
BucketState previousState = stateRef.get();
Expand Down

0 comments on commit b2e7140

Please sign in to comment.