Skip to content

Commit

Permalink
adapters ("drivers") can be specified per op template within a workload
Browse files Browse the repository at this point in the history
  • Loading branch information
jshook committed Jun 30, 2022
1 parent 8d3d929 commit c55bbfc
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 73 deletions.
Expand Up @@ -159,6 +159,7 @@ public NBConfigModel getConfigModel() {
.add(Param.optional("seq", String.class, "sequencing algorithm"))
.add(Param.optional("instrument", Boolean.class))
.add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file"))
.add(Param.optional("driver",String.class))
.asReadOnly();
}

Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.common.schema.KeyValueEncodingType;

import java.util.Map;
import java.util.Optional;

public class PulsarActivity extends SimpleActivity implements ActivityDefObserver {

Expand Down Expand Up @@ -136,7 +137,7 @@ public void initActivity() {
createPulsarSchemaFromConf();


this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache, this), false);
this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache, this), false, Optional.empty());
setDefaultsFromOpSequence(sequencer);
onActivityDefUpdate(activityDef);

Expand Down
Expand Up @@ -52,7 +52,7 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
private final Timer resultTimer;
private final Timer bindTimer;
private final NBErrorHandler errorHandler;
private final OpSequence<OpDispenser<? extends R>> opsequence;
private final OpSequence<OpDispenser<? extends Op>> opsequence;

public StandardAction(A activity, int slot) {
this.activity = activity;
Expand All @@ -69,7 +69,7 @@ public StandardAction(A activity, int slot) {
@Override
public int runCycle(long cycle) {

OpDispenser<? extends R> dispenser;
OpDispenser<? extends Op> dispenser;
Op op = null;

try (Timer.Context ct = bindTimer.time()) {
Expand Down
Expand Up @@ -26,16 +26,15 @@
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.annotations.ServiceSelector;
import io.nosqlbench.nb.api.config.standard.*;
import io.nosqlbench.nb.api.errors.OpConfigError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
* This is a typed activity which is expected to become the standard
Expand All @@ -48,38 +47,64 @@
public class StandardActivity<R extends Op, S> extends SimpleActivity implements SyntheticOpTemplateProvider {
private final static Logger logger = LogManager.getLogger("ACTIVITY");

private final DriverAdapter<R, S> adapter;
private final OpSequence<OpDispenser<? extends R>> sequence;
private final OpSequence<OpDispenser<? extends Op>> sequence;
private final NBConfigModel yamlmodel;
private final ConcurrentHashMap<String, DriverAdapter> adapters = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, OpMapper<Op>> mappers = new ConcurrentHashMap<>();

public StandardActivity(DriverAdapter<R, S> adapter, ActivityDef activityDef) {
public StandardActivity(ActivityDef activityDef) {
super(activityDef);
this.adapter = adapter;

if (adapter instanceof NBConfigurable configurable) {
NBConfigModel cmodel = configurable.getConfigModel();
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
if (yaml_loc.isPresent()) {
Map<String,Object> disposable = new LinkedHashMap<>(activityDef.getParams());
StmtsDocList workload = StatementsLoader.loadPath(logger, yaml_loc.get(), disposable, "activities");
yamlmodel = workload.getConfigModel();
}
else {
yamlmodel= ConfigModel.of(StandardActivity.class).asReadOnly();
}
NBConfigModel combinedModel = cmodel.add(yamlmodel);
NBConfiguration configuration = combinedModel.apply(activityDef.getParams());
configurable.applyConfig(configuration);
this.adapters.putAll(adapters);

Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
if (yaml_loc.isPresent()) {
Map<String,Object> disposable = new LinkedHashMap<>(activityDef.getParams());
StmtsDocList workload = StatementsLoader.loadPath(logger, yaml_loc.get(), disposable, "activities");
yamlmodel = workload.getConfigModel();
}
else {
yamlmodel= ConfigModel.of(StandardActivity.class).asReadOnly();
}

ServiceLoader<DriverAdapter> adapterLoader = ServiceLoader.load(DriverAdapter.class);
Optional<DriverAdapter> defaultAdapter = activityDef.getParams().getOptionalString("driver")
.map(s -> ServiceSelector.of(s, adapterLoader).getOne());

List<OpTemplate> opTemplates = loadOpTemplates(defaultAdapter);


List<ParsedOp> pops = new ArrayList<>();
for (OpTemplate ot : opTemplates) {
String driverName = ot.getOptionalStringParam("driver")
.or(() -> activityDef.getParams().getOptionalString("driver"))
.orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));

if (!adapters.containsKey(driverName)) {
DriverAdapter adapter = ServiceSelector.of(driverName, adapterLoader).get().orElseThrow(
() -> new OpConfigError("Unable to load driver adapter for name '" + driverName + "'")
);

NBConfigModel combinedModel = yamlmodel;
NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams());

if (adapter instanceof NBConfigurable configurable) {
NBConfigModel adapterModel = configurable.getConfigModel();
combinedModel = adapterModel.add(yamlmodel);
combinedConfig = combinedModel.matchConfig(activityDef.getParams());
configurable.applyConfig(combinedConfig);
}
adapters.put(driverName,adapter);
mappers.put(driverName,adapter.getOpMapper());
}

DriverAdapter adapter = adapters.get(driverName);
ParsedOp pop = new ParsedOp(ot,adapter.getConfiguration(),List.of(adapter.getPreprocessor()));
pops.add(pop);
}

try {
OpMapper<R> opmapper = adapter.getOpMapper();
Function<Map<String, Object>, Map<String, Object>> preprocessor = adapter.getPreprocessor();
boolean strict = activityDef.getParams().getOptionalBoolean("strict").orElse(false);
sequence = createOpSourceFromCommands(opmapper, adapter.getConfiguration(), List.of(preprocessor), strict);
sequence = createOpSourceFromParsedOps(adapters, mappers, pops);
} catch (Exception e) {
if (e instanceof OpConfigError) {
throw e;
Expand All @@ -95,40 +120,45 @@ public void initActivity() {
setDefaultsFromOpSequence(sequence);
}

public OpSequence<OpDispenser<? extends R>> getOpSequence() {
public OpSequence<OpDispenser<? extends Op>> getOpSequence() {
return sequence;
}

/**
* When an adapter needs to identify an error uniquely for the purposes of
* routing it to the correct error handler, or naming it in logs, or naming
* metrics, override this method in your activity.
*
* @return A function that can reliably and safely map an instance of Throwable to a stable name.
*/
@Override
public final Function<Throwable, String> getErrorNameMapper() {
return adapter.getErrorNameMapper();
}
// /**
// * When an adapter needs to identify an error uniquely for the purposes of
// * routing it to the correct error handler, or naming it in logs, or naming
// * metrics, override this method in your activity.
// *
// * @return A function that can reliably and safely map an instance of Throwable to a stable name.
// */
// @Override
// public final Function<Throwable, String> getErrorNameMapper() {
// return adapter.getErrorNameMapper();
// }

@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);

if (adapter instanceof NBReconfigurable configurable) {
NBConfigModel cfgModel = configurable.getReconfigModel();
NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams());
NBReconfigurable.applyMatching(cfg,List.of(configurable));
for (DriverAdapter adapter : adapters.values()) {
if (adapter instanceof NBReconfigurable configurable) {
NBConfigModel cfgModel = configurable.getReconfigModel();
NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams());
NBReconfigurable.applyMatching(cfg,List.of(configurable));
}
}
}

@Override
public List<OpTemplate> getSyntheticOpTemplates(StmtsDocList stmtsDocList, Map<String,Object> cfg) {
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
return sotp.getSyntheticOpTemplates(stmtsDocList, cfg);
} else {
return List.of();
List<OpTemplate> opTemplates = new ArrayList<>();
for (DriverAdapter adapter : adapters.values()) {
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
List<OpTemplate> newTemplates = sotp.getSyntheticOpTemplates(stmtsDocList, cfg);
opTemplates.addAll(newTemplates);
}
}
return opTemplates;
}

}
Expand Up @@ -150,7 +150,7 @@ private Optional<ActivityType> getDriverAdapter(String activityTypeName, Activit
if (oda.isPresent()) {
DriverAdapter<?, ?> driverAdapter = oda.get();

activityDef.getParams().remove("driver");
// activityDef.getParams().remove("driver");
// if (driverAdapter instanceof NBConfigurable) {
// NBConfigModel cfgModel = ((NBConfigurable) driverAdapter).getConfigModel();
// Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
Expand Down
Expand Up @@ -21,6 +21,7 @@
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.nb.annotations.Maturity;
Expand Down Expand Up @@ -319,27 +320,31 @@ private ActivityExecutor getActivityExecutor(ActivityDef activityDef, boolean cr
ActivityExecutor executor = activityExecutors.get(activityDef.getAlias());

if (executor == null && createIfMissing) {

ActivityType<?> activityType = new ActivityTypeLoader()
.setMaturity(this.minMaturity)
.load(activityDef)
.orElseThrow(
() -> new RuntimeException("Driver for '" + activityDef + "' was not found." +
"\nYou can use --list-drivers to see what drivers are supported in this runtime." +
ConfigSuggestions.suggestAlternates(
new ActivityTypeLoader().getAllSelectors(),activityDef.getActivityType(),4)
.orElse("")
)
if (activityDef.getParams().containsKey("driver")) {
ActivityType<?> activityType = new ActivityTypeLoader()
.setMaturity(this.minMaturity)
.load(activityDef)
.orElseThrow(
() -> new RuntimeException("Driver for '" + activityDef + "' was not found." +
"\nYou can use --list-drivers to see what drivers are supported in this runtime." +
ConfigSuggestions.suggestAlternates(
new ActivityTypeLoader().getAllSelectors(),activityDef.getActivityType(),4)
.orElse("")
)
);

executor = new ActivityExecutor(
activityType.getAssembledActivity(
activityDef,
getActivityMap()
),
this.sessionId
);
activityExecutors.put(activityDef.getAlias(), executor);
} else {
new StandardActivityType(activityDef);
}

executor = new ActivityExecutor(
activityType.getAssembledActivity(
activityDef,
getActivityMap()
),
this.sessionId
);
activityExecutors.put(activityDef.getAlias(), executor);
}
return executor;
}
Expand Down
Expand Up @@ -51,7 +51,7 @@ public static ScenarioResult runScenario(String scriptname, String... params) {
paramsMap.put(params[i], params[i + 1]);
}
String scenarioName = "scenario " + scriptname;
System.out.println("=".repeat(29) + " Running ASYNC integration test for: " + scenarioName);
System.out.println("=".repeat(29) + " Running integration test for example scenario: " + scenarioName);
ScenariosExecutor executor = new ScenariosExecutor(ScriptExampleTests.class.getSimpleName() + ":" + scriptname, 1);
Scenario s = new Scenario(scenarioName, Scenario.Engine.Graalvm,"stdout:300", Maturity.Any);

Expand Down Expand Up @@ -236,7 +236,7 @@ public void testShutdownHook() {
public void testExceptionPropagationFromActivityInit() {
ScenarioResult scenarioResult = runScenario("activityiniterror");
assertThat(scenarioResult.getException()).isPresent();
assertThat(scenarioResult.getException().get().getMessage()).contains("Unknown config parameter 'unknown_config'");
assertThat(scenarioResult.getException().get().getMessage()).contains("Unable to convert end cycle from invalid");
assertThat(scenarioResult.getException()).isNotNull();
}

Expand Down
Expand Up @@ -17,7 +17,7 @@
activitydef1 = {
"alias" : "erroring_activity_init",
"driver" : "diag",
"cycles" : "0..1500000",
"cycles" : "invalid",
"threads" : "1",
"targetrate" : "500",
"unknown_config" : "unparsable",
Expand Down

0 comments on commit c55bbfc

Please sign in to comment.