Skip to content

Commit

Permalink
Merge pull request #1928 from nosqlbench/nosqlbench-1323-tlrate
Browse files Browse the repository at this point in the history
Implement thread-local scope for rate limiters and improve spec format
  • Loading branch information
jshook committed Apr 16, 2024
2 parents 3dbbc13 + 68744f3 commit c2aa0bf
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 135 deletions.
Expand Up @@ -74,7 +74,7 @@ public NBComponent getParent() {
}

@Override
public NBComponent attachChild(NBComponent... children) {
public synchronized NBComponent attachChild(NBComponent... children) {

for (NBComponent child : children) {
logger.debug(() -> "attaching " + child.description() + " to parent " + this.description());
Expand Down
Expand Up @@ -106,22 +106,6 @@ default String getCycleSummary() {
*/
RateLimiter getCycleLimiter();

/**
* Set the cycle rate limiter for this activity. This method should only
* be used non-concurrently. Otherwise, the supplier version
* {@link #getCycleRateLimiter(Supplier)} should be used.
* @param rateLimiter The cycle {@link RateLimiter} for this activity
*/
void setCycleLimiter(RateLimiter rateLimiter);

/**
* Get or create the cycle rate limiter in a safe way. Implementations
* should ensure that this method is synchronized or that each requester
* gets the same cycle rate limiter for the activity.
* @param supplier A {@link RateLimiter} {@link Supplier}
* @return An extant or newly created cycle {@link RateLimiter}
*/
RateLimiter getCycleRateLimiter(Supplier<? extends RateLimiter> supplier);

/**
* Get the current stride rate limiter for this activity.
Expand All @@ -131,23 +115,6 @@ default String getCycleSummary() {
*/
RateLimiter getStrideLimiter();

/**
* Set the stride rate limiter for this activity. This method should only
* be used non-concurrently. Otherwise, the supplier version
* {@link #getStrideRateLimiter(Supplier)}} should be used.
* @param rateLimiter The stride {@link RateLimiter} for this activity.
*/
void setStrideLimiter(RateLimiter rateLimiter);

/**
* Get or create the stride {@link RateLimiter} in a concurrent-safe
* way. Implementations should ensure that this method is synchronized or
* that each requester gets the same stride rate limiter for the activity.
* @param supplier A {@link RateLimiter} {@link Supplier}
* @return An extant or newly created stride {@link RateLimiter}
*/
RateLimiter getStrideRateLimiter(Supplier<? extends RateLimiter> supplier);

/**
* Get or create the instrumentation needed for this activity. This provides
* a single place to find and manage, and document instrumentation that is
Expand Down
Expand Up @@ -79,7 +79,11 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
private long startTime;

public SimRate(NBComponent parent, SimRateSpec spec) {
super(parent, NBLabels.forKV().and("rateType",
this(parent, spec, NBLabels.forKV());
}

public SimRate(NBComponent parent, SimRateSpec spec, NBLabels extraLabels) {
super(parent, extraLabels.and("rateType",
(spec instanceof CycleRateSpec? "cycle" : "stride")));
this.spec = spec;
initMetrics();
Expand Down
Expand Up @@ -18,10 +18,15 @@

import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.nb.api.engine.util.Unit;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* <H2>Rate Limiter Specifications</H2>
Expand Down Expand Up @@ -49,7 +54,6 @@
* </UL>
* <p>
* Where:
*
* <EM>rate</EM> is the ops per second, expressed as any positive floating point value.
* <EM>burst ratio</EM> is a floating point value greater than 1.0 which determines how much faster
* the rate limiter may go to catch up to the overall.
Expand Down Expand Up @@ -129,7 +133,13 @@ public class SimRateSpec {
public static final double DEFAULT_BURST_RATIO = 1.1D;
public static Verb DEFAULT_VERB = Verb.start;

public enum Scope {
thread,
activity
}

public ChronoUnit unit;
private Scope scope = Scope.activity;

/**
* Target rate in Operations Per Second
Expand Down Expand Up @@ -158,19 +168,19 @@ public int nanosToTicks(long newNanoTokens) {
// }
return switch (unit) {
case NANOS -> (int) newNanoTokens;
case MICROS -> (int) (newNanoTokens/1_000L);
case MILLIS -> (int) (newNanoTokens/1_000_000L);
case SECONDS -> (int) (newNanoTokens/1_000_000_000L);
case MICROS -> (int) (newNanoTokens / 1_000L);
case MILLIS -> (int) (newNanoTokens / 1_000_000L);
case SECONDS -> (int) (newNanoTokens / 1_000_000_000L);
default -> throw new RuntimeException("invalid ChronoUnit for nanosToTicks:" + unit);
};
}

public long ticksToNanos(int newTicks) {
return switch (unit) {
case NANOS -> newTicks;
case MICROS -> newTicks*1_000L;
case MILLIS -> newTicks*1_000_000L;
case SECONDS -> newTicks*1_000_000_000L;
case MICROS -> newTicks * 1_000L;
case MILLIS -> newTicks * 1_000_000L;
case SECONDS -> newTicks * 1_000_000_000L;
default -> throw new RuntimeException("invalid ChronoUnit for ticksToNanos:" + unit);
};
}
Expand Down Expand Up @@ -213,15 +223,25 @@ public SimRateSpec(double opsPerSec, double burstRatio) {
this(opsPerSec, burstRatio, DEFAULT_VERB);
}

public SimRateSpec(double opsPerSec, double burstRatio, Verb type) {
apply(opsPerSec, burstRatio, verb);
public SimRateSpec(double opsPerSec, double burstRatio, Verb verb) {
apply(opsPerSec, burstRatio, verb, Scope.activity);
}

public SimRateSpec(double opsPerSec, double burstRatio, Scope scope) {
apply(opsPerSec, burstRatio, DEFAULT_VERB, scope);
}

private void apply(double opsPerSec, double burstRatio, Verb verb) {
public SimRateSpec(double opsPerSec, double burstRatio, Verb verb, Scope scope) {
apply(opsPerSec, burstRatio, verb, scope);
}


private void apply(double opsPerSec, double burstRatio, Verb verb, Scope scope) {
this.opsPerSec = opsPerSec;
this.burstRatio = burstRatio;
this.verb = verb;
this.unit = chronoUnitFor(opsPerSec);
this.scope = scope;

// TODO: include burst into ticks calculation
}
Expand All @@ -245,25 +265,59 @@ public SimRateSpec(ParameterMap.NamedParameter tuple) {

public SimRateSpec(String spec) {
String[] specs = spec.split("[,:;]");
Verb verb = Verb.start;
double burstRatio = DEFAULT_BURST_RATIO;
int offset=0;
double opsPerSec;
switch (specs.length) {
case 3:
verb = Verb.valueOf(specs[2].toLowerCase());
double burstRatio = DEFAULT_BURST_RATIO;
Verb verb = Verb.start;
Scope scope = Scope.activity;
String oprateSpec = specs[offset++];
opsPerSec = Unit.doubleCountFor(oprateSpec).orElseThrow(() -> new RuntimeException("Unparsable:" + oprateSpec));
if (specs.length >= 2) {
try {
burstRatio = Double.parseDouble(specs[1]);
offset++;
} catch (NumberFormatException ignored) {
}
}

for (int i = offset; i < specs.length; i++) {
String specword = specs[i].toLowerCase();

try {
scope = Scope.valueOf(specword);
specword = null;
logger.debug("selected rate limiter scope: " + scope);
continue;
} catch (IllegalArgumentException ignored) {
}

try {
verb = Verb.valueOf(specword);
specword = null;
logger.debug("selected rate limiter type: " + verb);
case 2:
burstRatio = Double.valueOf(specs[1]);
if (burstRatio < 1.0) {
throw new RuntimeException("burst ratios less than 1.0 are invalid.");
}
case 1:
opsPerSec = Unit.doubleCountFor(specs[0]).orElseThrow(() -> new RuntimeException("Unparsable:" + specs[0]));
break;
default:
throw new RuntimeException("Rate specs must be either '<rate>' or '<rate>:<burstRatio>' as in 5000.0 or 5000.0:1.0");
continue;
} catch (IllegalArgumentException ignored) {
}

if (specword != null) {
String msg = """
Spec format 'SPECFORMAT' was not recognized for FORTYPE.
Use the format <ops/s>[,<burst ratio][,<verb>][,<scope>]
Examples:
100 (100 ops per second)
100,1.1 (with a burst ratio of 10% over)
100,start (start the rate limiter automatically)
100,thread (scope the rate limiter to each thread in an activity)
100,1.1,start,thread (all of the above)
Defaults: burst_ratio=1.1 verb=start scope=activity
"""
.replaceAll("SPECFORMAT", spec)
.replaceAll("FORTYPE", this.getClass().getSimpleName());
throw new BasicError(msg);
}

}
apply(opsPerSec, burstRatio, verb);
apply(opsPerSec, burstRatio, verb, scope);
}

public String toString() {
Expand Down Expand Up @@ -293,6 +347,8 @@ public boolean equals(Object o) {
SimRateSpec simRateSpec = (SimRateSpec) o;

if (Double.compare(simRateSpec.opsPerSec, opsPerSec) != 0) return false;
if (verb!=simRateSpec.verb) return false;
if (scope!=simRateSpec.scope) return false;
return Double.compare(simRateSpec.burstRatio, burstRatio) == 0;
}

Expand All @@ -319,5 +375,9 @@ public boolean isRestart() {
return this.verb == Verb.restart;
}

public Scope getScope() {
return this.scope;
}


}
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nosqlbench.engine.api.activityapi.simrate;

import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.function.Supplier;

public class ThreadLocalRateLimiters {

private static final Logger logger = LogManager.getLogger(ThreadLocalRateLimiters.class);

public static synchronized ThreadLocal<RateLimiter> createOrUpdate(
final NBComponent parent,
final ThreadLocal<RateLimiter> extantSource,
final SimRateSpec spec
) {
if (extantSource != null) {
RateLimiter rl = extantSource.get();
rl.applyRateSpec(spec);
return extantSource;
} else {
Supplier<RateLimiter> rls;
rls = switch (spec.getScope()) {
case activity -> {
SimRate rl = new SimRate(parent, spec);
yield () -> rl;
}
case thread -> () -> new SimRate(
parent,
spec,
NBLabels.forKV("thread", Thread.currentThread().getName())
);
};
return ThreadLocal.withInitial(rls);
}
}

}

0 comments on commit c2aa0bf

Please sign in to comment.