Skip to content

Commit

Permalink
diag progress on nb5
Browse files Browse the repository at this point in the history
  • Loading branch information
jshook committed Jun 1, 2022
1 parent c29e759 commit 7c9f427
Show file tree
Hide file tree
Showing 17 changed files with 938 additions and 0 deletions.
78 changes: 78 additions & 0 deletions adapter-diag/pom.xml
@@ -0,0 +1,78 @@
<!--
~ Copyright (c) 2022 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>


<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.15-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>

<artifactId>adapter-diag</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>A nosqlbench ActivityType (AT) driver module;
Provides a diagnostic activity that logs input at some interval
</description>

<dependencies>

<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-annotations</artifactId>
<version>4.17.15-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.15-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.17.15-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

</dependencies>

<!-- <build>-->
<!-- <plugins>-->
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-compiler-plugin</artifactId>-->
<!-- <configuration>-->
<!-- <annotationProcessorPaths>-->
<!-- <path>-->
<!-- <groupId>io.nosqlbench</groupId>-->
<!-- <artifactId>nb-annotations</artifactId>-->
<!-- <version>${version}</version>-->
<!-- </path>-->
<!-- </annotationProcessorPaths>-->
<!-- </configuration>-->
<!-- </plugin>-->
<!-- </plugins>-->

<!-- </build>-->

</project>
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nosqlbench.activitytype.diag;

public class DiagDummyError extends RuntimeException {
public DiagDummyError(String s) {
super(s);
}
}
@@ -0,0 +1,84 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nosqlbench.activitytype.diag;

import java.util.ArrayList;
import java.util.List;
import java.util.function.LongToIntFunction;

public class DiagOpData {

private final String description;
private final List<String> diaglog = new ArrayList<>();

private LongToIntFunction resultFunc;
private long simulatedDelayNanos;

public DiagOpData(String description) {
this.description = description;
}

/**
* If this function is provided, the result will be set to the value of the
* evaluated function with the op cycle.
*
* This is known as "resultfunc" in parameter space.
*
* The function must be thread-safe.
*
* @param resultFunc A function to map the cycle to the result value
* @return this, for method chaining
*/
public DiagOpData withResultFunction(LongToIntFunction resultFunc) {
this.resultFunc = resultFunc;
return this;
}

/**
* If this function is provided, the completion of the operation will be
* delayed until the system nanotime is at least the op start time in
* addition to the provided delay.
*
* This is controlled as "delayfunc" in parameter space.
*
* @param simulatedDelayNanos The amount of nanos ensure as a minimum
* of processing time for this op
*/
public DiagOpData setSimulatedDelayNanos(long simulatedDelayNanos) {
this.simulatedDelayNanos = simulatedDelayNanos;
return this;
}

public long getSimulatedDelayNanos() {
return simulatedDelayNanos;
}

@Override
public String toString() {
return super.toString() + ", description:'" + description;
}
public String getDescription() {
return description;
}
public void log(String logline) {
this.diaglog.add(logline);
}
public List<String> getDiagLog() {
return diaglog;
}

}
@@ -0,0 +1,20 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nosqlbench.activitytype.diag;

public class DiagResult {
}
@@ -0,0 +1,106 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nosqlbench.activitytype.diag;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

import java.security.InvalidParameterException;
import java.util.concurrent.atomic.AtomicLong;

public class SequenceBlocker {
private final static Logger logger = LogManager.getLogger(SequenceBlocker.class);
private final AtomicLong sequence;
private final AtomicLong waiting=new AtomicLong(0L);
private final boolean errorsAreFatal;
// private PriorityBlockingQueue<TakeANumber> queue = new PriorityBlockingQueue<>();
private Exception fatalError;

public SequenceBlocker(long start, boolean errorsAreFatal) {
this.sequence = new AtomicLong(start);
this.errorsAreFatal = errorsAreFatal;
}

public synchronized void awaitAndRun(long startAt, long endPlus, Runnable task) {
waiting.incrementAndGet();

if (fatalError != null) {
throw new RuntimeException("There was previously a fatal error, not allowing new tasks. Error=" + fatalError.getMessage());
}

// queue.add(new TakeANumber(startAt, sequencePlusCount, task));
while (sequence.get() != startAt) {
try {
wait(1_000);
} catch (InterruptedException ignored) {
}
}

try {
task.run();
} catch (Exception e) {
logger.error("Runnable errored in SequenceBlocker: " + e.getMessage());
if (errorsAreFatal) {
this.fatalError = e;
}
throw e;
} finally {
waiting.decrementAndGet();
if (!sequence.compareAndSet(startAt,endPlus)) {
throw new InvalidParameterException("Serious logic error in synchronizer. This should never fail.");
}
}
notifyAll();
}

public synchronized void awaitCompletion() {
while (waiting.get()>0)
try {
wait(60_000);
} catch (InterruptedException ignored) {
}
}

private final static class TakeANumber implements Comparable<TakeANumber> {
private final long start;
private final long endPlus;
private final Runnable task;

public TakeANumber(long start, long endPlus, Runnable task) {
this.start = start;
this.endPlus = endPlus;
this.task = task;
}

@Override
public int compareTo(TakeANumber o) {
return Long.compare(start, o.start);
}

public long getStart() {
return start;
}

public long getEndPlus() {
return endPlus;
}

public String toString() {
return "[" + getStart() + "-" + getEndPlus() + ")";
}
}
}
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nosqlbench.adapter.diag;


import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.function.Function;

@Service(value=DriverAdapter.class,selector="diag")
public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp,DiagSpace> {

private final static Logger logger = LogManager.getLogger(DiagDriverAdapter.class);

public DiagDriverAdapter() {
logger.debug("starting up");
}

@Override
public OpMapper<DiagOp> getOpMapper() {
return new DiagOpMapper();
}

@Override
public Function<String, ? extends DiagSpace> getSpaceInitializer(NBConfiguration cfg) {
return (String name) -> new DiagSpace(cfg);
}

@Override
public NBConfigModel getConfigModel() {
return super.getConfigModel().add(DiagSpace.getConfigModel());
}
}

0 comments on commit 7c9f427

Please sign in to comment.