Skip to content

Commit

Permalink
improve rate specs
Browse files Browse the repository at this point in the history
  • Loading branch information
jshook committed Apr 16, 2024
1 parent 6effb27 commit 68744f3
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 39 deletions.
Expand Up @@ -18,10 +18,15 @@

import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.nb.api.engine.util.Unit;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* <H2>Rate Limiter Specifications</H2>
Expand Down Expand Up @@ -127,6 +132,7 @@ public class SimRateSpec {
public static final double DEFAULT_RATE_OPS_S = 1.0D;
public static final double DEFAULT_BURST_RATIO = 1.1D;
public static Verb DEFAULT_VERB = Verb.start;

public enum Scope {
thread,
activity
Expand Down Expand Up @@ -162,19 +168,19 @@ public int nanosToTicks(long newNanoTokens) {
// }
return switch (unit) {
case NANOS -> (int) newNanoTokens;
case MICROS -> (int) (newNanoTokens/1_000L);
case MILLIS -> (int) (newNanoTokens/1_000_000L);
case SECONDS -> (int) (newNanoTokens/1_000_000_000L);
case MICROS -> (int) (newNanoTokens / 1_000L);
case MILLIS -> (int) (newNanoTokens / 1_000_000L);
case SECONDS -> (int) (newNanoTokens / 1_000_000_000L);
default -> throw new RuntimeException("invalid ChronoUnit for nanosToTicks:" + unit);
};
}

public long ticksToNanos(int newTicks) {
return switch (unit) {
case NANOS -> newTicks;
case MICROS -> newTicks*1_000L;
case MILLIS -> newTicks*1_000_000L;
case SECONDS -> newTicks*1_000_000_000L;
case MICROS -> newTicks * 1_000L;
case MILLIS -> newTicks * 1_000_000L;
case SECONDS -> newTicks * 1_000_000_000L;
default -> throw new RuntimeException("invalid ChronoUnit for ticksToNanos:" + unit);
};
}
Expand Down Expand Up @@ -216,12 +222,15 @@ public enum Verb {
public SimRateSpec(double opsPerSec, double burstRatio) {
this(opsPerSec, burstRatio, DEFAULT_VERB);
}

public SimRateSpec(double opsPerSec, double burstRatio, Verb verb) {
apply(opsPerSec, burstRatio, verb, Scope.activity);
}

public SimRateSpec(double opsPerSec, double burstRatio, Scope scope) {
apply(opsPerSec, burstRatio, DEFAULT_VERB, scope);
}

public SimRateSpec(double opsPerSec, double burstRatio, Verb verb, Scope scope) {
apply(opsPerSec, burstRatio, verb, scope);
}
Expand Down Expand Up @@ -256,31 +265,57 @@ public SimRateSpec(ParameterMap.NamedParameter tuple) {

public SimRateSpec(String spec) {
String[] specs = spec.split("[,:;]");
int offset=0;
double opsPerSec;
double burstRatio = DEFAULT_BURST_RATIO;
Verb verb = Verb.start;
Scope scope = Scope.activity;
switch (specs.length) {
case 4:
scope = Scope.valueOf(specs[3].toLowerCase());
case 3:
try {
scope = Scope.valueOf(specs[2].toLowerCase());
logger.debug("selected rate limiter scope: " + scope);
} catch (IllegalArgumentException iae) {
verb = Verb.valueOf(specs[2].toLowerCase());
logger.debug("selected rate limiter type: " + verb);
}
case 2:
burstRatio = Double.valueOf(specs[1]);
if (burstRatio < 1.0) {
throw new RuntimeException("burst ratios less than 1.0 are invalid.");
}
case 1:
opsPerSec = Unit.doubleCountFor(specs[0]).orElseThrow(() -> new RuntimeException("Unparsable:" + specs[0]));
break;
default:
throw new RuntimeException("Rate specs must be either '<rate>' or '<rate>:<burstRatio>' as in 5000.0 or 5000.0:1.0");
String oprateSpec = specs[offset++];
opsPerSec = Unit.doubleCountFor(oprateSpec).orElseThrow(() -> new RuntimeException("Unparsable:" + oprateSpec));
if (specs.length >= 2) {
try {
burstRatio = Double.parseDouble(specs[1]);
offset++;
} catch (NumberFormatException ignored) {
}
}

for (int i = offset; i < specs.length; i++) {
String specword = specs[i].toLowerCase();

try {
scope = Scope.valueOf(specword);
specword = null;
logger.debug("selected rate limiter scope: " + scope);
continue;
} catch (IllegalArgumentException ignored) {
}

try {
verb = Verb.valueOf(specword);
specword = null;
logger.debug("selected rate limiter type: " + verb);
continue;
} catch (IllegalArgumentException ignored) {
}

if (specword != null) {
String msg = """
Spec format 'SPECFORMAT' was not recognized for FORTYPE.
Use the format <ops/s>[,<burst ratio][,<verb>][,<scope>]
Examples:
100 (100 ops per second)
100,1.1 (with a burst ratio of 10% over)
100,start (start the rate limiter automatically)
100,thread (scope the rate limiter to each thread in an activity)
100,1.1,start,thread (all of the above)
Defaults: burst_ratio=1.1 verb=start scope=activity
"""
.replaceAll("SPECFORMAT", spec)
.replaceAll("FORTYPE", this.getClass().getSimpleName());
throw new BasicError(msg);
}

}
apply(opsPerSec, burstRatio, verb, scope);
}
Expand Down Expand Up @@ -312,6 +347,8 @@ public boolean equals(Object o) {
SimRateSpec simRateSpec = (SimRateSpec) o;

if (Double.compare(simRateSpec.opsPerSec, opsPerSec) != 0) return false;
if (verb!=simRateSpec.verb) return false;
if (scope!=simRateSpec.scope) return false;
return Double.compare(simRateSpec.burstRatio, burstRatio) == 0;
}

Expand Down Expand Up @@ -343,5 +380,4 @@ public Scope getScope() {
}



}
Expand Up @@ -48,17 +48,31 @@ public void testTypeSelection() {
}

@Test
public void testScopeSelection() {
SimRateSpec asd = new SimRateSpec("12345,1.4");
assertThat(asd.getScope()).isEqualTo(SimRateSpec.Scope.activity);
SimRateSpec ts = new SimRateSpec("12345,1.4,start,thread");
assertThat(ts.getScope()).isEqualTo(SimRateSpec.Scope.thread);
SimRateSpec as = new SimRateSpec("12345,1.4,start,activity");
assertThat(as.getScope()).isEqualTo(SimRateSpec.Scope.activity);
SimRateSpec asa = new SimRateSpec("12345,1.4,activity");
assertThat(asa.getScope()).isEqualTo(SimRateSpec.Scope.activity);
SimRateSpec ast = new SimRateSpec("12345,1.4,thread");
assertThat(ast.getScope()).isEqualTo(SimRateSpec.Scope.thread);
public void testVariantFormats() {
assertThat(new SimRateSpec("12345"))
.isEqualTo(new SimRateSpec(
12345.0d, 1.1d, SimRateSpec.Verb.start, SimRateSpec.Scope.activity
));
assertThat(new SimRateSpec("12345,1.4"))
.isEqualTo(new SimRateSpec(
12345.0d, 1.4d, SimRateSpec.Verb.start, SimRateSpec.Scope.activity
));
assertThat(new SimRateSpec("12345,configure"))
.isEqualTo(new SimRateSpec(
12345.0d, 1.1d, SimRateSpec.Verb.configure, SimRateSpec.Scope.activity
));
assertThat(new SimRateSpec("12345,thread"))
.isEqualTo(new SimRateSpec(
12345.0d, 1.1d, SimRateSpec.Verb.start, SimRateSpec.Scope.thread
));
assertThat(new SimRateSpec("12345,1.4,thread"))
.isEqualTo(new SimRateSpec(
12345.0d, 1.4d, SimRateSpec.Verb.start, SimRateSpec.Scope.thread
));
assertThat(new SimRateSpec("12345,configure,activity"))
.isEqualTo(new SimRateSpec(
12345.0d, 1.1d, SimRateSpec.Verb.configure, SimRateSpec.Scope.activity
));

}
}

0 comments on commit 68744f3

Please sign in to comment.