diff --git a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBBaseComponent.java b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBBaseComponent.java
index f45dc4e489..92ed6273cc 100644
--- a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBBaseComponent.java
+++ b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBBaseComponent.java
@@ -74,7 +74,7 @@ public NBComponent getParent() {
}
@Override
- public NBComponent attachChild(NBComponent... children) {
+ public synchronized NBComponent attachChild(NBComponent... children) {
for (NBComponent child : children) {
logger.debug(() -> "attaching " + child.description() + " to parent " + this.description());
diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Activity.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Activity.java
index 5ab967f9f3..e1eea06fd7 100644
--- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Activity.java
+++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Activity.java
@@ -106,22 +106,6 @@ default String getCycleSummary() {
*/
RateLimiter getCycleLimiter();
- /**
- * Set the cycle rate limiter for this activity. This method should only
- * be used non-concurrently. Otherwise, the supplier version
- * {@link #getCycleRateLimiter(Supplier)} should be used.
- * @param rateLimiter The cycle {@link RateLimiter} for this activity
- */
- void setCycleLimiter(RateLimiter rateLimiter);
-
- /**
- * Get or create the cycle rate limiter in a safe way. Implementations
- * should ensure that this method is synchronized or that each requester
- * gets the same cycle rate limiter for the activity.
- * @param supplier A {@link RateLimiter} {@link Supplier}
- * @return An extant or newly created cycle {@link RateLimiter}
- */
- RateLimiter getCycleRateLimiter(Supplier extends RateLimiter> supplier);
/**
* Get the current stride rate limiter for this activity.
@@ -131,23 +115,6 @@ default String getCycleSummary() {
*/
RateLimiter getStrideLimiter();
- /**
- * Set the stride rate limiter for this activity. This method should only
- * be used non-concurrently. Otherwise, the supplier version
- * {@link #getStrideRateLimiter(Supplier)}} should be used.
- * @param rateLimiter The stride {@link RateLimiter} for this activity.
- */
- void setStrideLimiter(RateLimiter rateLimiter);
-
- /**
- * Get or create the stride {@link RateLimiter} in a concurrent-safe
- * way. Implementations should ensure that this method is synchronized or
- * that each requester gets the same stride rate limiter for the activity.
- * @param supplier A {@link RateLimiter} {@link Supplier}
- * @return An extant or newly created stride {@link RateLimiter}
- */
- RateLimiter getStrideRateLimiter(Supplier extends RateLimiter> supplier);
-
/**
* Get or create the instrumentation needed for this activity. This provides
* a single place to find and manage, and document instrumentation that is
diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/simrate/SimRate.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/simrate/SimRate.java
index e2848b3fc7..9f260116a9 100644
--- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/simrate/SimRate.java
+++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/simrate/SimRate.java
@@ -79,7 +79,11 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
private long startTime;
public SimRate(NBComponent parent, SimRateSpec spec) {
- super(parent, NBLabels.forKV().and("rateType",
+ this(parent, spec, NBLabels.forKV());
+ }
+
+ public SimRate(NBComponent parent, SimRateSpec spec, NBLabels extraLabels) {
+ super(parent, extraLabels.and("rateType",
(spec instanceof CycleRateSpec? "cycle" : "stride")));
this.spec = spec;
initMetrics();
diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/simrate/SimRateSpec.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/simrate/SimRateSpec.java
index 6dfdca1c63..cf74bd6356 100644
--- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/simrate/SimRateSpec.java
+++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/simrate/SimRateSpec.java
@@ -18,10 +18,15 @@
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.nb.api.engine.util.Unit;
+import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
*
Rate Limiter Specifications
@@ -49,7 +54,6 @@
*
*
* Where:
- *
* rate is the ops per second, expressed as any positive floating point value.
* burst ratio is a floating point value greater than 1.0 which determines how much faster
* the rate limiter may go to catch up to the overall.
@@ -129,7 +133,13 @@ public class SimRateSpec {
public static final double DEFAULT_BURST_RATIO = 1.1D;
public static Verb DEFAULT_VERB = Verb.start;
+ public enum Scope {
+ thread,
+ activity
+ }
+
public ChronoUnit unit;
+ private Scope scope = Scope.activity;
/**
* Target rate in Operations Per Second
@@ -158,9 +168,9 @@ public int nanosToTicks(long newNanoTokens) {
// }
return switch (unit) {
case NANOS -> (int) newNanoTokens;
- case MICROS -> (int) (newNanoTokens/1_000L);
- case MILLIS -> (int) (newNanoTokens/1_000_000L);
- case SECONDS -> (int) (newNanoTokens/1_000_000_000L);
+ case MICROS -> (int) (newNanoTokens / 1_000L);
+ case MILLIS -> (int) (newNanoTokens / 1_000_000L);
+ case SECONDS -> (int) (newNanoTokens / 1_000_000_000L);
default -> throw new RuntimeException("invalid ChronoUnit for nanosToTicks:" + unit);
};
}
@@ -168,9 +178,9 @@ public int nanosToTicks(long newNanoTokens) {
public long ticksToNanos(int newTicks) {
return switch (unit) {
case NANOS -> newTicks;
- case MICROS -> newTicks*1_000L;
- case MILLIS -> newTicks*1_000_000L;
- case SECONDS -> newTicks*1_000_000_000L;
+ case MICROS -> newTicks * 1_000L;
+ case MILLIS -> newTicks * 1_000_000L;
+ case SECONDS -> newTicks * 1_000_000_000L;
default -> throw new RuntimeException("invalid ChronoUnit for ticksToNanos:" + unit);
};
}
@@ -213,15 +223,25 @@ public SimRateSpec(double opsPerSec, double burstRatio) {
this(opsPerSec, burstRatio, DEFAULT_VERB);
}
- public SimRateSpec(double opsPerSec, double burstRatio, Verb type) {
- apply(opsPerSec, burstRatio, verb);
+ public SimRateSpec(double opsPerSec, double burstRatio, Verb verb) {
+ apply(opsPerSec, burstRatio, verb, Scope.activity);
+ }
+
+ public SimRateSpec(double opsPerSec, double burstRatio, Scope scope) {
+ apply(opsPerSec, burstRatio, DEFAULT_VERB, scope);
}
- private void apply(double opsPerSec, double burstRatio, Verb verb) {
+ public SimRateSpec(double opsPerSec, double burstRatio, Verb verb, Scope scope) {
+ apply(opsPerSec, burstRatio, verb, scope);
+ }
+
+
+ private void apply(double opsPerSec, double burstRatio, Verb verb, Scope scope) {
this.opsPerSec = opsPerSec;
this.burstRatio = burstRatio;
this.verb = verb;
this.unit = chronoUnitFor(opsPerSec);
+ this.scope = scope;
// TODO: include burst into ticks calculation
}
@@ -245,25 +265,59 @@ public SimRateSpec(ParameterMap.NamedParameter tuple) {
public SimRateSpec(String spec) {
String[] specs = spec.split("[,:;]");
- Verb verb = Verb.start;
- double burstRatio = DEFAULT_BURST_RATIO;
+ int offset=0;
double opsPerSec;
- switch (specs.length) {
- case 3:
- verb = Verb.valueOf(specs[2].toLowerCase());
+ double burstRatio = DEFAULT_BURST_RATIO;
+ Verb verb = Verb.start;
+ Scope scope = Scope.activity;
+ String oprateSpec = specs[offset++];
+ opsPerSec = Unit.doubleCountFor(oprateSpec).orElseThrow(() -> new RuntimeException("Unparsable:" + oprateSpec));
+ if (specs.length >= 2) {
+ try {
+ burstRatio = Double.parseDouble(specs[1]);
+ offset++;
+ } catch (NumberFormatException ignored) {
+ }
+ }
+
+ for (int i = offset; i < specs.length; i++) {
+ String specword = specs[i].toLowerCase();
+
+ try {
+ scope = Scope.valueOf(specword);
+ specword = null;
+ logger.debug("selected rate limiter scope: " + scope);
+ continue;
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ try {
+ verb = Verb.valueOf(specword);
+ specword = null;
logger.debug("selected rate limiter type: " + verb);
- case 2:
- burstRatio = Double.valueOf(specs[1]);
- if (burstRatio < 1.0) {
- throw new RuntimeException("burst ratios less than 1.0 are invalid.");
- }
- case 1:
- opsPerSec = Unit.doubleCountFor(specs[0]).orElseThrow(() -> new RuntimeException("Unparsable:" + specs[0]));
- break;
- default:
- throw new RuntimeException("Rate specs must be either '' or ':' as in 5000.0 or 5000.0:1.0");
+ continue;
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ if (specword != null) {
+ String msg = """
+ Spec format 'SPECFORMAT' was not recognized for FORTYPE.
+ Use the format [,][,]
+ Examples:
+ 100 (100 ops per second)
+ 100,1.1 (with a burst ratio of 10% over)
+ 100,start (start the rate limiter automatically)
+ 100,thread (scope the rate limiter to each thread in an activity)
+ 100,1.1,start,thread (all of the above)
+ Defaults: burst_ratio=1.1 verb=start scope=activity
+ """
+ .replaceAll("SPECFORMAT", spec)
+ .replaceAll("FORTYPE", this.getClass().getSimpleName());
+ throw new BasicError(msg);
+ }
+
}
- apply(opsPerSec, burstRatio, verb);
+ apply(opsPerSec, burstRatio, verb, scope);
}
public String toString() {
@@ -293,6 +347,8 @@ public boolean equals(Object o) {
SimRateSpec simRateSpec = (SimRateSpec) o;
if (Double.compare(simRateSpec.opsPerSec, opsPerSec) != 0) return false;
+ if (verb!=simRateSpec.verb) return false;
+ if (scope!=simRateSpec.scope) return false;
return Double.compare(simRateSpec.burstRatio, burstRatio) == 0;
}
@@ -319,5 +375,9 @@ public boolean isRestart() {
return this.verb == Verb.restart;
}
+ public Scope getScope() {
+ return this.scope;
+ }
+
}
diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/simrate/ThreadLocalRateLimiters.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/simrate/ThreadLocalRateLimiters.java
new file mode 100644
index 0000000000..361a0dd963
--- /dev/null
+++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/simrate/ThreadLocalRateLimiters.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2022-2023 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.engine.api.activityapi.simrate;
+
+import io.nosqlbench.nb.api.components.core.NBComponent;
+import io.nosqlbench.nb.api.labels.NBLabels;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.function.Supplier;
+
+public class ThreadLocalRateLimiters {
+
+ private static final Logger logger = LogManager.getLogger(ThreadLocalRateLimiters.class);
+
+ public static synchronized ThreadLocal createOrUpdate(
+ final NBComponent parent,
+ final ThreadLocal extantSource,
+ final SimRateSpec spec
+ ) {
+ if (extantSource != null) {
+ RateLimiter rl = extantSource.get();
+ rl.applyRateSpec(spec);
+ return extantSource;
+ } else {
+ Supplier rls;
+ rls = switch (spec.getScope()) {
+ case activity -> {
+ SimRate rl = new SimRate(parent, spec);
+ yield () -> rl;
+ }
+ case thread -> () -> new SimRate(
+ parent,
+ spec,
+ NBLabels.forKV("thread", Thread.currentThread().getName())
+ );
+ };
+ return ThreadLocal.withInitial(rls);
+ }
+ }
+
+}
diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java
index 7d153b65aa..6e31f8c469 100644
--- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java
+++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java
@@ -18,6 +18,7 @@
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
+import io.nosqlbench.engine.api.activityapi.simrate.*;
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.events.ParamChange;
@@ -28,9 +29,6 @@
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
-import io.nosqlbench.engine.api.activityapi.simrate.RateLimiters;
-import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
-import io.nosqlbench.engine.api.activityapi.simrate.SimRateSpec;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.nb.api.components.status.NBStatusComponent;
@@ -42,12 +40,10 @@
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
-import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter;
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
-import io.nosqlbench.engine.api.activityapi.simrate.StrideRateSpec;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryRunOpDispenserWrapper;
@@ -78,8 +74,8 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
private OutputDispenser markerDispenser;
private IntPredicateDispenser resultFilterDispenser;
private RunState runState = RunState.Uninitialized;
- private RateLimiter strideLimiter;
- private RateLimiter cycleLimiter;
+ private ThreadLocal strideLimiterSource;
+ private ThreadLocal cycleLimiterSource;
private ActivityInstrumentation activityInstrumentation;
private PrintWriter console;
private long startedAtMillis;
@@ -229,41 +225,21 @@ public void closeAutoCloseables() {
@Override
public RateLimiter getCycleLimiter() {
- return this.cycleLimiter;
- }
-
- @Override
- public synchronized void setCycleLimiter(RateLimiter rateLimiter) {
- this.cycleLimiter = rateLimiter;
- }
-
- @Override
- public synchronized RateLimiter getCycleRateLimiter(Supplier extends RateLimiter> s) {
- if (null == this.cycleLimiter) {
- cycleLimiter = s.get();
+ if (cycleLimiterSource!=null) {
+ return cycleLimiterSource.get();
+ } else {
+ return null;
}
- return cycleLimiter;
}
-
@Override
public synchronized RateLimiter getStrideLimiter() {
- return this.strideLimiter;
- }
-
- @Override
- public synchronized void setStrideLimiter(RateLimiter rateLimiter) {
- this.strideLimiter = rateLimiter;
- }
-
- @Override
- public synchronized RateLimiter getStrideRateLimiter(Supplier extends RateLimiter> s) {
- if (null == this.strideLimiter) {
- strideLimiter = s.get();
+ if (strideLimiterSource!=null) {
+ return strideLimiterSource.get();
+ } else {
+ return null;
}
- return strideLimiter;
}
-
@Override
public synchronized ActivityInstrumentation getInstrumentation() {
if (null == this.activityInstrumentation) {
@@ -306,6 +282,8 @@ public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {
+// cycleratePerThread = activityDef.getParams().takeBoolOrDefault("cyclerate_per_thread", false);
+
activityDef.getParams().getOptionalNamedParameter("striderate")
.map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
@@ -315,13 +293,14 @@ public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {
}
public void createOrUpdateStrideLimiter(SimRateSpec spec) {
- strideLimiter = RateLimiters.createOrUpdate(this, strideLimiter, spec);
+ strideLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, strideLimiterSource, spec);
}
public void createOrUpdateCycleLimiter(SimRateSpec spec) {
- cycleLimiter = RateLimiters.createOrUpdate(this, cycleLimiter, spec);
+ cycleLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, cycleLimiterSource, spec);
}
+
/**
* Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been provided, based on the
* length of the sequence as determined by the provided ratios. Also, modify the ActivityDef with reasonable
@@ -344,10 +323,7 @@ public synchronized void setDefaultsFromOpSequence(OpSequence> seq) {
if (cyclesOpt.isEmpty()) {
String cycles = getParams().getOptionalString("stride").orElseThrow();
logger.info(() -> "defaulting cycles to " + cycles + " (the stride length)");
-// getParams().set("cycles", getParams().getOptionalString("stride").orElseThrow());
-// getParams().setSilently("cycles", getParams().getOptionalString("stride").orElseThrow());
this.getActivityDef().setCycles(getParams().getOptionalString("stride").orElseThrow());
-// getParams().set("cycles", getParams().getOptionalString("stride").orElseThrow());
} else {
if (0 == activityDef.getCycleCount()) {
throw new RuntimeException(
@@ -680,4 +656,22 @@ public RunStateTally getRunStateTally() {
public Map asResult() {
return Map.of("activity",this.getAlias());
}
+
+// private final ThreadLocal cycleLimiterThreadLocal = ThreadLocal.withInitial(() -> {
+// RateLimiters.createOrUpdate(this,null,new SimRateSpec()
+// if (cycleratePerThread) {
+// return RateLimiters.createOrUpdate(new NBThreadComponent(this),null,)
+// } else {
+// RateLimiters.createOrUpdate(new NBThreadComponent(this),null,activityDef)
+// }
+// if (getCycleLimiter() != null) {
+// return RateLimiters.createOrUpdate(
+// new NBThreadComponent(this),
+// getCycleLimiter(),
+// getCycleLimiter().getSpec());
+// } else {
+// return null;
+// }
+// });
+
}
diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java
index 8a6b9b0127..d3154d15d3 100644
--- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java
+++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java
@@ -54,16 +54,18 @@
* core of all new activity types. Extant NB drivers should also migrate
* to this when possible.
*
- * @param A type of runnable which wraps the operations for this type of driver.
- * @param The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph
+ * @param
+ * A type of runnable which wraps the operations for this type of driver.
+ * @param
+ * The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph
*/
public class StandardActivity extends SimpleActivity implements SyntheticOpTemplateProvider, ActivityDefObserver {
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final OpSequence> sequence;
- private final ConcurrentHashMap> adapters = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap> adapters = new ConcurrentHashMap<>();
public StandardActivity(NBComponent parent, ActivityDef activityDef) {
- super(parent,activityDef);
+ super(parent, activityDef);
OpsDocList workload;
Optional yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
@@ -77,9 +79,9 @@ public StandardActivity(NBComponent parent, ActivityDef activityDef) {
}
Optional defaultDriverName = activityDef.getParams().getOptionalString("driver");
- Optional> defaultAdapter = defaultDriverName
- .flatMap(name -> ServiceSelector.of(name,ServiceLoader.load(DriverAdapterLoader.class)).get())
- .map(l -> l.load(this,NBLabels.forKV()));
+ Optional> defaultAdapter = defaultDriverName
+ .flatMap(name -> ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class)).get())
+ .map(l -> l.load(this, NBLabels.forKV()));
if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) {
throw new BasicError("Unable to load default driver adapter '" + defaultDriverName.get() + '\'');
@@ -90,7 +92,7 @@ public StandardActivity(NBComponent parent, ActivityDef activityDef) {
List pops = new ArrayList<>();
- List> adapterlist = new ArrayList<>();
+ List> adapterlist = new ArrayList<>();
NBConfigModel supersetConfig = ConfigModel.of(StandardActivity.class).add(yamlmodel);
Optional defaultDriverOption = defaultDriverName;
@@ -107,16 +109,15 @@ public StandardActivity(NBComponent parent, ActivityDef activityDef) {
// .orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
-
// HERE
if (!adapters.containsKey(driverName)) {
- DriverAdapter,?> adapter = Optional.of(driverName)
+ DriverAdapter, ?> adapter = Optional.of(driverName)
.flatMap(
- name -> ServiceSelector.of(
- name,
- ServiceLoader.load(DriverAdapterLoader.class)
- )
+ name -> ServiceSelector.of(
+ name,
+ ServiceLoader.load(DriverAdapterLoader.class)
+ )
.get())
.map(
l -> l.load(
@@ -143,7 +144,7 @@ public StandardActivity(NBComponent parent, ActivityDef activityDef) {
supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
- DriverAdapter,?> adapter = adapters.get(driverName);
+ DriverAdapter, ?> adapter = adapters.get(driverName);
adapterlist.add(adapter);
ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
Optional discard = pop.takeOptionalStaticValue("driver", String.class);
@@ -166,24 +167,24 @@ public StandardActivity(NBComponent parent, ActivityDef activityDef) {
throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
}
- create().gauge(
+ create().gauge(
"ops_pending",
- () -> this.getProgressMeter().getSummary().pending(),
- MetricCategory.Core,
- "The current number of operations which have not been dispatched for processing yet."
- );
- create().gauge(
+ () -> this.getProgressMeter().getSummary().pending(),
+ MetricCategory.Core,
+ "The current number of operations which have not been dispatched for processing yet."
+ );
+ create().gauge(
"ops_active",
- () -> this.getProgressMeter().getSummary().current(),
- MetricCategory.Core,
- "The current number of operations which have been dispatched for processing, but which have not yet completed."
- );
- create().gauge(
+ () -> this.getProgressMeter().getSummary().current(),
+ MetricCategory.Core,
+ "The current number of operations which have been dispatched for processing, but which have not yet completed."
+ );
+ create().gauge(
"ops_complete",
- () -> this.getProgressMeter().getSummary().complete(),
- MetricCategory.Core,
- "The current number of operations which have been completed"
- );
+ () -> this.getProgressMeter().getSummary().complete(),
+ MetricCategory.Core,
+ "The current number of operations which have been completed"
+ );
}
@Override
@@ -213,7 +214,7 @@ public OpSequence> getOpSequence() {
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
- for (DriverAdapter,?> adapter : adapters.values()) {
+ for (DriverAdapter, ?> adapter : adapters.values()) {
if (adapter instanceof NBReconfigurable configurable) {
NBConfigModel cfgModel = configurable.getReconfigModel();
NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams());
@@ -244,7 +245,7 @@ public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
@Override
public List getSyntheticOpTemplates(OpsDocList opsDocList, Map cfg) {
List opTemplates = new ArrayList<>();
- for (DriverAdapter,?> adapter : adapters.values()) {
+ for (DriverAdapter, ?> adapter : adapters.values()) {
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
List newTemplates = sotp.getSyntheticOpTemplates(opsDocList, cfg);
opTemplates.addAll(newTemplates);
@@ -260,7 +261,7 @@ public List getSyntheticOpTemplates(OpsDocList opsDocList, Map> entry : adapters.entrySet()) {
+ for (Map.Entry> entry : adapters.entrySet()) {
String adapterName = entry.getKey();
DriverAdapter, ?> adapter = entry.getValue();
adapter.getSpaceCache().getElements().forEach((spaceName, space) -> {
@@ -284,7 +285,7 @@ public NBLabels getLabels() {
@Override
public void onEvent(NBEvent event) {
- switch(event) {
+ switch (event) {
case ParamChange> pc -> {
switch (pc.value()) {
case SetThreads st -> activityDef.setThreads(st.threads);
diff --git a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityapi/ratelimits/SimRateSpecTest.java b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityapi/ratelimits/SimRateSpecTest.java
index 1ca9a6c1c9..53b381b631 100644
--- a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityapi/ratelimits/SimRateSpecTest.java
+++ b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityapi/ratelimits/SimRateSpecTest.java
@@ -46,4 +46,33 @@ public void testTypeSelection() {
SimRateSpec c = new SimRateSpec("12345,1.1");
assertThat(c.verb).isEqualTo(SimRateSpec.Verb.start);
}
+
+ @Test
+ public void testVariantFormats() {
+ assertThat(new SimRateSpec("12345"))
+ .isEqualTo(new SimRateSpec(
+ 12345.0d, 1.1d, SimRateSpec.Verb.start, SimRateSpec.Scope.activity
+ ));
+ assertThat(new SimRateSpec("12345,1.4"))
+ .isEqualTo(new SimRateSpec(
+ 12345.0d, 1.4d, SimRateSpec.Verb.start, SimRateSpec.Scope.activity
+ ));
+ assertThat(new SimRateSpec("12345,configure"))
+ .isEqualTo(new SimRateSpec(
+ 12345.0d, 1.1d, SimRateSpec.Verb.configure, SimRateSpec.Scope.activity
+ ));
+ assertThat(new SimRateSpec("12345,thread"))
+ .isEqualTo(new SimRateSpec(
+ 12345.0d, 1.1d, SimRateSpec.Verb.start, SimRateSpec.Scope.thread
+ ));
+ assertThat(new SimRateSpec("12345,1.4,thread"))
+ .isEqualTo(new SimRateSpec(
+ 12345.0d, 1.4d, SimRateSpec.Verb.start, SimRateSpec.Scope.thread
+ ));
+ assertThat(new SimRateSpec("12345,configure,activity"))
+ .isEqualTo(new SimRateSpec(
+ 12345.0d, 1.1d, SimRateSpec.Verb.configure, SimRateSpec.Scope.activity
+ ));
+
+ }
}