Skip to content

Commit

Permalink
test-utils-stress: getting rid of NamedAction,
Browse files Browse the repository at this point in the history
  StatAccumulator simplification, new result introduced
  • Loading branch information
KetothXupack committed Apr 29, 2015
1 parent 8dea8eb commit 9bb9ef1
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,25 @@
* @author <a href="mailto:ketoth.xupack@gmail.com">Ketoth Xupack</a>
* @since 2013-12-29 18:39
*/
class InvocationException extends Exception {
public class InvocationException extends Exception {
private static final long serialVersionUID = 1L;

private final long startNanos;
private final long endNanos;

public InvocationException(final Exception e,
final long startNanos,
final long endNanos) {
super(e);
this.startNanos = startNanos;
this.endNanos = endNanos;
}

public long getStartNanos() {
return startNanos;
}

public long getEndNanos() {
return endNanos;
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
package org.nohope.test.stress;

import org.nohope.test.stress.result.ActionResult;
import org.nohope.test.stress.result.StressResult;
import org.nohope.test.stress.util.Memory;
import org.nohope.test.stress.util.MetricsAccumulator;

import java.util.HashMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public final class PreparedMeasurement {
private final int threadsNumber;
private final int cycleCount;
private final Iterable<ExecutorService> executors;
private final Iterable<StatAccumulator> statAccumulators;
private final Collection<StatAccumulator> statAccumulators;
private final List<Thread> threads;

PreparedMeasurement(final int threadsNumber,
final int cycleCount,
final Iterable<ExecutorService> executors,
final Iterable<StatAccumulator> statAccumulators,
final Collection<StatAccumulator> statAccumulators,
final List<Thread> threads) {
this.threadsNumber = threadsNumber;
this.cycleCount = cycleCount;
Expand Down Expand Up @@ -50,27 +47,22 @@ public List<Thread> getThreads() {
return threads;
}

public StressResult perform() throws InterruptedException {
return awaitResult(this);
}

private static StressResult awaitResult(final PreparedMeasurement preparedMeasurement)
throws InterruptedException {
public Result perform() throws InterruptedException {
final MetricsAccumulator metrics = new MetricsAccumulator();
metrics.start();

final Memory memoryStart = Memory.getCurrent();
final long overallStart = System.nanoTime();

preparedMeasurement.threads.parallelStream().forEach(Thread::start);
threads.parallelStream().forEach(Thread::start);

for (final Thread thread : preparedMeasurement.threads) {
for (final Thread thread : threads) {
thread.join();
}

metrics.stop();

for (final ExecutorService service : preparedMeasurement.executors) {
for (final ExecutorService service : executors) {
try {
service.shutdown();
service.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
Expand All @@ -81,20 +73,15 @@ private static StressResult awaitResult(final PreparedMeasurement preparedMeasur
final long overallEnd = System.nanoTime();
final Memory memoryEnd = Memory.getCurrent();

final double runtime = overallEnd - overallStart;

final Map<String, ActionResult> results = new HashMap<>();
for (final StatAccumulator stats : preparedMeasurement.statAccumulators) {
final ActionResult r = stats.getResult();
results.put(r.getName(), r);
}

return new StressResult(results,
preparedMeasurement.threadsNumber,
preparedMeasurement.cycleCount,
runtime,
return new Result(
threadsNumber,
cycleCount,
overallStart,
overallEnd,
statAccumulators,
metrics.getMetrics(),
memoryStart,
memoryEnd);
memoryEnd
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package org.nohope.test.stress;

import org.nohope.test.stress.result.ActionResult;
import org.nohope.test.stress.result.StressMetrics;
import org.nohope.test.stress.result.StressResult;
import org.nohope.test.stress.util.Measurement;
import org.nohope.test.stress.util.Memory;

import java.util.*;
import java.util.Map.Entry;

/**
* @author <a href="mailto:ketoth.xupack@gmail.com">Ketoth Xupack</a>
* @since 2015-04-29 15:05
*/
public class Result {
private final Collection<StatAccumulator> accumulators = new ArrayList<>();
private final Collection<StressMetrics> metrics = new ArrayList<>();

private final long startNanos;
private final long endNanos;
private final int threadsNumber;
private final int cycleCount;

private final Memory memoryStart;
private final Memory memoryEnd;

public Result(final int threadsNumber,
final int cycleCount,
final long startNanos,
final long endNanos,
final Collection<StatAccumulator> accumulators,
final Collection<StressMetrics> metrics,
final Memory memoryStart,
final Memory memoryEnd) {
this.startNanos = startNanos;
this.endNanos = endNanos;
this.threadsNumber = threadsNumber;
this.cycleCount = cycleCount;
this.accumulators.addAll(accumulators);
this.metrics.addAll(metrics);
this.memoryStart = memoryStart;
this.memoryEnd = memoryEnd;
}

public void visitError(final ErrorProcessor processor) {
for (final StatAccumulator accumulator : accumulators) {
final String name = accumulator.getName();
for (Entry<Long, Collection<InvocationException>> e : accumulator.getErrorStats().entrySet()) {
for (InvocationException m : e.getValue()) {
processor.process(name, e.getKey(), m.getCause(), m.getStartNanos(), m.getEndNanos());
}
}
}
}

public void visitResult(final ResultProcessor processor) {
for (final StatAccumulator accumulator : accumulators) {
final String name = accumulator.getName();
for (Entry<Long, Collection<Measurement>> e : accumulator.getTimesPerThread().entrySet()) {
for (Measurement m : e.getValue()) {
processor.process(name, e.getKey(), m.getStartNanos(), m.getEndNanos());
}
}
}
}

@Deprecated
public StressResult asResult() {
final double runtime = endNanos - startNanos;
final Map<String, ActionResult> results = new HashMap<>();
for (final StatAccumulator accumulator : accumulators) {
long maxTimeNanos = 0;
long minTimeNanos = Long.MAX_VALUE;
long totalDeltaNanos = 0L;

for (final Collection<Measurement> perThread : accumulator.getTimesPerThread().values()) {
for (final Measurement e : perThread) {
final long runtimeNanos = e.getEndNanos() - e.getStartNanos();
totalDeltaNanos += runtimeNanos;
if (maxTimeNanos < runtimeNanos) {
maxTimeNanos = runtimeNanos;
}
if (minTimeNanos > runtimeNanos) {
minTimeNanos = runtimeNanos;
}
}
}

final Map<Long, Collection<InvocationException>> errorStats = accumulator.getErrorStats();
final Map<Class<?>, Collection<Exception>> eStats = new HashMap<>();

for (final Entry<Long, Collection<InvocationException>> entry: errorStats.entrySet()) {
for (InvocationException ex : entry.getValue()) {
final Exception e = (Exception) ex.getCause();
eStats.computeIfAbsent(e.getClass(), x -> new ArrayList<>()).add(e);
}
}

results.put(accumulator.getName(), new ActionResult(
accumulator.getName(),
accumulator.getTimesPerThread(),
eStats,
totalDeltaNanos,
minTimeNanos,
maxTimeNanos));
}

return new StressResult(results, threadsNumber, cycleCount, runtime, metrics,
memoryStart, memoryEnd);
}

@FunctionalInterface
public interface ResultProcessor {
void process(final String name, final long threadId, long startNanos, long endNanos);
}

@FunctionalInterface
public interface ErrorProcessor {
void process(final String name, final long threadId, final Throwable e, long startNanos, long endNanos);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,113 +2,85 @@

import org.nohope.test.stress.functors.Call;
import org.nohope.test.stress.functors.Get;
import org.nohope.test.stress.result.ActionResult;
import org.nohope.test.stress.util.Measurement;

import javax.annotation.Nonnull;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static java.util.Map.Entry;

/**
* @author <a href="mailto:ketoth.xupack@gmail.com">ketoth xupack</a>
* @since 2013-12-27 16:18
*/
final class StatAccumulator {
private static final Function<Long, List<Measurement>> NEW_LIST =
x -> new ArrayList<>();
private static final Function<Class<?>, ConcurrentLinkedQueue<Exception>> NEW_QUEUE =
private static final Function<Long, Queue<Measurement>> NEW_LIST =
x -> new ConcurrentLinkedQueue<>();
private static final Function<Long, Queue<InvocationException>> NEW_QUEUE =
c -> new ConcurrentLinkedQueue<>();

private final ConcurrentHashMap<Long, List<Measurement>> timesPerThread = new ConcurrentHashMap<>();
private final Map<Class<?>, ConcurrentLinkedQueue<Exception>> errorStats = new ConcurrentHashMap<>();
private final AtomicReference<ActionResult> result = new AtomicReference<>();
private final Map<Long, Collection<Measurement>> timesPerThread = new ConcurrentHashMap<>();
private final Map<Long, Collection<InvocationException>> errorStats = new ConcurrentHashMap<>();
private final String name;

StatAccumulator(final String name) {
this.name = name;
}

@Nonnull
public ActionResult getResult() {
if (result.get() == null) {
calculate();
}
return result.get();
public Map<Long, Collection<Measurement>> getTimesPerThread() {
return timesPerThread;
}

public Map<Long, Collection<InvocationException>> getErrorStats() {
return errorStats;
}

public String getName() {
return name;
}

<T> T measure(final long threadId, final Get<T> invoke) throws InvocationException {
Optional<Measurement> times = Optional.empty();
Measurement times = null;
final long start = System.nanoTime();
try {
final long start = System.nanoTime();
final T result = invoke.get();
final long end = System.nanoTime();
times = Optional.of(Measurement.of(start, end));
times = Measurement.of(start, end);
return result;
} catch (final Exception e) {
handleException(e);
throw new InvocationException();
final InvocationException ex = new InvocationException(e, start, System.nanoTime());
handleException(threadId, ex);
throw ex;
} finally {
// it's safe to use ArrayList here, they are always modified by same thread!
times.ifPresent(timesPerThread.computeIfAbsent(threadId, NEW_LIST)::add);
final Collection<Measurement> list = timesPerThread.computeIfAbsent(threadId, NEW_LIST);
if (times != null) {
list.add(times);
}
}
}

void measure(final long threadId, final Call call) throws InvocationException {
Optional<Measurement> times = Optional.empty();
Measurement times = null;
final long start = System.nanoTime();
try {
final long start = System.nanoTime();
call.call();
final long end = System.nanoTime();
times = Optional.of(Measurement.of(start, end));
times = Measurement.of(start, end);
} catch (final Exception e) {
handleException(e);
throw new InvocationException();
final InvocationException ex = new InvocationException(e, start, System.nanoTime());
handleException(threadId, ex);
throw ex;
} finally {
// it's safe to use ArrayList here, they are always modified by same thread!
times.ifPresent(timesPerThread.computeIfAbsent(threadId, NEW_LIST)::add);
}
}

private void handleException(final Exception e) {
final Class<?> aClass = e.getClass();
errorStats.computeIfAbsent(aClass, NEW_QUEUE).add(e);
}

private void calculate() {
long maxTimeNanos = 0;
long minTimeNanos = Long.MAX_VALUE;
long totalDeltaNanos = 0L;

for (final List<Measurement> perThread : timesPerThread.values()) {
for (final Measurement e : perThread) {
final long runtimeNanos = e.getEndNanos() - e.getStartNanos();
totalDeltaNanos += runtimeNanos;
if (maxTimeNanos < runtimeNanos) {
maxTimeNanos = runtimeNanos;
}
if (minTimeNanos > runtimeNanos) {
minTimeNanos = runtimeNanos;
}
final Collection<Measurement> list = timesPerThread.computeIfAbsent(threadId, NEW_LIST);
if (times != null) {
list.add(times);
}
}
}

final Map<Class<?>, List<Exception>> eStats = new HashMap<>(errorStats.size());

for (final Entry<Class<?>, ConcurrentLinkedQueue<Exception>> entry: errorStats.entrySet()) {
eStats.put(entry.getKey(), new ArrayList<>(entry.getValue()));
}

result.set(new ActionResult(
name,
timesPerThread,
eStats,
totalDeltaNanos,
minTimeNanos,
maxTimeNanos));
private void handleException(final long threadId, final InvocationException e) {
errorStats.computeIfAbsent(threadId, NEW_QUEUE).add(e);
}
}
Loading

0 comments on commit 9bb9ef1

Please sign in to comment.