Skip to content

Commit

Permalink
nosqlbench-1323 Add support for cyclerate_per_thread in NB5
Browse files Browse the repository at this point in the history
  • Loading branch information
jshook committed Apr 16, 2024
1 parent 0046230 commit 35a6f60
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 119 deletions.
Expand Up @@ -74,7 +74,7 @@ public NBComponent getParent() {
}

@Override
public NBComponent attachChild(NBComponent... children) {
public synchronized NBComponent attachChild(NBComponent... children) {

for (NBComponent child : children) {
logger.debug(() -> "attaching " + child.description() + " to parent " + this.description());
Expand Down
Expand Up @@ -106,22 +106,6 @@ default String getCycleSummary() {
*/
RateLimiter getCycleLimiter();

/**
* Set the cycle rate limiter for this activity. This method should only
* be used non-concurrently. Otherwise, the supplier version
* {@link #getCycleRateLimiter(Supplier)} should be used.
* @param rateLimiter The cycle {@link RateLimiter} for this activity
*/
void setCycleLimiter(RateLimiter rateLimiter);

/**
* Get or create the cycle rate limiter in a safe way. Implementations
* should ensure that this method is synchronized or that each requester
* gets the same cycle rate limiter for the activity.
* @param supplier A {@link RateLimiter} {@link Supplier}
* @return An extant or newly created cycle {@link RateLimiter}
*/
RateLimiter getCycleRateLimiter(Supplier<? extends RateLimiter> supplier);

/**
* Get the current stride rate limiter for this activity.
Expand All @@ -131,23 +115,6 @@ default String getCycleSummary() {
*/
RateLimiter getStrideLimiter();

/**
* Set the stride rate limiter for this activity. This method should only
* be used non-concurrently. Otherwise, the supplier version
* {@link #getStrideRateLimiter(Supplier)}} should be used.
* @param rateLimiter The stride {@link RateLimiter} for this activity.
*/
void setStrideLimiter(RateLimiter rateLimiter);

/**
* Get or create the stride {@link RateLimiter} in a concurrent-safe
* way. Implementations should ensure that this method is synchronized or
* that each requester gets the same stride rate limiter for the activity.
* @param supplier A {@link RateLimiter} {@link Supplier}
* @return An extant or newly created stride {@link RateLimiter}
*/
RateLimiter getStrideRateLimiter(Supplier<? extends RateLimiter> supplier);

/**
* Get or create the instrumentation needed for this activity. This provides
* a single place to find and manage, and document instrumentation that is
Expand Down
Expand Up @@ -79,7 +79,11 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
private long startTime;

public SimRate(NBComponent parent, SimRateSpec spec) {
super(parent, NBLabels.forKV().and("rateType",
this(parent, spec, NBLabels.forKV());
}

public SimRate(NBComponent parent, SimRateSpec spec, NBLabels extraLabels) {
super(parent, extraLabels.and("rateType",
(spec instanceof CycleRateSpec? "cycle" : "stride")));
this.spec = spec;
initMetrics();
Expand Down
Expand Up @@ -49,7 +49,6 @@
* </UL>
* <p>
* Where:
*
* <EM>rate</EM> is the ops per second, expressed as any positive floating point value.
* <EM>burst ratio</EM> is a floating point value greater than 1.0 which determines how much faster
* the rate limiter may go to catch up to the overall.
Expand Down Expand Up @@ -128,8 +127,13 @@ public class SimRateSpec {
public static final double DEFAULT_RATE_OPS_S = 1.0D;
public static final double DEFAULT_BURST_RATIO = 1.1D;
public static Verb DEFAULT_VERB = Verb.start;
public enum Scope {
thread,
activity
}

public ChronoUnit unit;
private Scope scope = Scope.activity;

/**
* Target rate in Operations Per Second
Expand Down Expand Up @@ -212,16 +216,23 @@ public enum Verb {
public SimRateSpec(double opsPerSec, double burstRatio) {
this(opsPerSec, burstRatio, DEFAULT_VERB);
}

public SimRateSpec(double opsPerSec, double burstRatio, Verb type) {
apply(opsPerSec, burstRatio, verb);
public SimRateSpec(double opsPerSec, double burstRatio, Verb verb) {
apply(opsPerSec, burstRatio, verb, Scope.activity);
}
public SimRateSpec(double opsPerSec, double burstRatio, Scope scope) {
apply(opsPerSec, burstRatio, DEFAULT_VERB, scope);
}
public SimRateSpec(double opsPerSec, double burstRatio, Verb verb, Scope scope) {
apply(opsPerSec, burstRatio, verb, scope);
}


private void apply(double opsPerSec, double burstRatio, Verb verb) {
private void apply(double opsPerSec, double burstRatio, Verb verb, Scope scope) {
this.opsPerSec = opsPerSec;
this.burstRatio = burstRatio;
this.verb = verb;
this.unit = chronoUnitFor(opsPerSec);
this.scope = scope;

// TODO: include burst into ticks calculation
}
Expand All @@ -245,13 +256,21 @@ public SimRateSpec(ParameterMap.NamedParameter tuple) {

public SimRateSpec(String spec) {
String[] specs = spec.split("[,:;]");
Verb verb = Verb.start;
double burstRatio = DEFAULT_BURST_RATIO;
double opsPerSec;
double burstRatio = DEFAULT_BURST_RATIO;
Verb verb = Verb.start;
Scope scope = Scope.activity;
switch (specs.length) {
case 4:
scope = Scope.valueOf(specs[3].toLowerCase());
case 3:
verb = Verb.valueOf(specs[2].toLowerCase());
logger.debug("selected rate limiter type: " + verb);
try {
scope = Scope.valueOf(specs[2].toLowerCase());
logger.debug("selected rate limiter scope: " + scope);
} catch (IllegalArgumentException iae) {
verb = Verb.valueOf(specs[2].toLowerCase());
logger.debug("selected rate limiter type: " + verb);
}
case 2:
burstRatio = Double.valueOf(specs[1]);
if (burstRatio < 1.0) {
Expand All @@ -263,7 +282,7 @@ public SimRateSpec(String spec) {
default:
throw new RuntimeException("Rate specs must be either '<rate>' or '<rate>:<burstRatio>' as in 5000.0 or 5000.0:1.0");
}
apply(opsPerSec, burstRatio, verb);
apply(opsPerSec, burstRatio, verb, scope);
}

public String toString() {
Expand Down Expand Up @@ -319,5 +338,10 @@ public boolean isRestart() {
return this.verb == Verb.restart;
}

public Scope getScope() {
return this.scope;
}



}
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nosqlbench.engine.api.activityapi.simrate;

import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.function.Supplier;

public class ThreadLocalRateLimiters {

private static final Logger logger = LogManager.getLogger(ThreadLocalRateLimiters.class);

public static synchronized ThreadLocal<RateLimiter> createOrUpdate(
final NBComponent parent,
final ThreadLocal<RateLimiter> extantSource,
final SimRateSpec spec
) {
if (extantSource != null) {
RateLimiter rl = extantSource.get();
rl.applyRateSpec(spec);
return extantSource;
} else {
Supplier<RateLimiter> rls;
rls = switch (spec.getScope()) {
case activity -> {
SimRate rl = new SimRate(parent, spec);
yield () -> rl;
}
case thread -> () -> new SimRate(
parent,
spec,
NBLabels.forKV("thread", Thread.currentThread().getName())
);
};
return ThreadLocal.withInitial(rls);
}
}

}
Expand Up @@ -18,6 +18,7 @@

import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.engine.api.activityapi.simrate.*;
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.events.ParamChange;
Expand All @@ -28,9 +29,6 @@
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiters;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.simrate.SimRateSpec;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.nb.api.components.status.NBStatusComponent;
Expand All @@ -42,12 +40,10 @@
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter;
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
import io.nosqlbench.engine.api.activityapi.simrate.StrideRateSpec;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryRunOpDispenserWrapper;
Expand Down Expand Up @@ -78,8 +74,8 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
private OutputDispenser markerDispenser;
private IntPredicateDispenser resultFilterDispenser;
private RunState runState = RunState.Uninitialized;
private RateLimiter strideLimiter;
private RateLimiter cycleLimiter;
private ThreadLocal<RateLimiter> strideLimiterSource;
private ThreadLocal<RateLimiter> cycleLimiterSource;
private ActivityInstrumentation activityInstrumentation;
private PrintWriter console;
private long startedAtMillis;
Expand Down Expand Up @@ -229,41 +225,21 @@ public void closeAutoCloseables() {

@Override
public RateLimiter getCycleLimiter() {
return this.cycleLimiter;
}

@Override
public synchronized void setCycleLimiter(RateLimiter rateLimiter) {
this.cycleLimiter = rateLimiter;
}

@Override
public synchronized RateLimiter getCycleRateLimiter(Supplier<? extends RateLimiter> s) {
if (null == this.cycleLimiter) {
cycleLimiter = s.get();
if (cycleLimiterSource!=null) {
return cycleLimiterSource.get();
} else {
return null;
}
return cycleLimiter;
}

@Override
public synchronized RateLimiter getStrideLimiter() {
return this.strideLimiter;
}

@Override
public synchronized void setStrideLimiter(RateLimiter rateLimiter) {
this.strideLimiter = rateLimiter;
}

@Override
public synchronized RateLimiter getStrideRateLimiter(Supplier<? extends RateLimiter> s) {
if (null == this.strideLimiter) {
strideLimiter = s.get();
if (strideLimiterSource!=null) {
return strideLimiterSource.get();
} else {
return null;
}
return strideLimiter;
}


@Override
public synchronized ActivityInstrumentation getInstrumentation() {
if (null == this.activityInstrumentation) {
Expand Down Expand Up @@ -306,6 +282,8 @@ public synchronized void onActivityDefUpdate(ActivityDef activityDef) {

public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {

// cycleratePerThread = activityDef.getParams().takeBoolOrDefault("cyclerate_per_thread", false);

activityDef.getParams().getOptionalNamedParameter("striderate")
.map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));

Expand All @@ -315,13 +293,14 @@ public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {
}

public void createOrUpdateStrideLimiter(SimRateSpec spec) {
strideLimiter = RateLimiters.createOrUpdate(this, strideLimiter, spec);
strideLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, strideLimiterSource, spec);
}

public void createOrUpdateCycleLimiter(SimRateSpec spec) {
cycleLimiter = RateLimiters.createOrUpdate(this, cycleLimiter, spec);
cycleLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, cycleLimiterSource, spec);
}


/**
* Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been provided, based on the
* length of the sequence as determined by the provided ratios. Also, modify the ActivityDef with reasonable
Expand All @@ -344,10 +323,7 @@ public synchronized void setDefaultsFromOpSequence(OpSequence<?> seq) {
if (cyclesOpt.isEmpty()) {
String cycles = getParams().getOptionalString("stride").orElseThrow();
logger.info(() -> "defaulting cycles to " + cycles + " (the stride length)");
// getParams().set("cycles", getParams().getOptionalString("stride").orElseThrow());
// getParams().setSilently("cycles", getParams().getOptionalString("stride").orElseThrow());
this.getActivityDef().setCycles(getParams().getOptionalString("stride").orElseThrow());
// getParams().set("cycles", getParams().getOptionalString("stride").orElseThrow());
} else {
if (0 == activityDef.getCycleCount()) {
throw new RuntimeException(
Expand Down Expand Up @@ -680,4 +656,22 @@ public RunStateTally getRunStateTally() {
public Map<String, String> asResult() {
return Map.of("activity",this.getAlias());
}

// private final ThreadLocal<RateLimiter> cycleLimiterThreadLocal = ThreadLocal.withInitial(() -> {
// RateLimiters.createOrUpdate(this,null,new SimRateSpec()
// if (cycleratePerThread) {
// return RateLimiters.createOrUpdate(new NBThreadComponent(this),null,)
// } else {
// RateLimiters.createOrUpdate(new NBThreadComponent(this),null,activityDef)
// }
// if (getCycleLimiter() != null) {
// return RateLimiters.createOrUpdate(
// new NBThreadComponent(this),
// getCycleLimiter(),
// getCycleLimiter().getSpec());
// } else {
// return null;
// }
// });

}

0 comments on commit 35a6f60

Please sign in to comment.