Skip to content

Commit

Permalink
prepared measurement, metrics stub
Browse files Browse the repository at this point in the history
  • Loading branch information
FeiWongReed committed Apr 28, 2015
1 parent 42f15b1 commit 1a7ad33
Show file tree
Hide file tree
Showing 12 changed files with 310 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@
*/
public final class MeasureProvider extends MeasureData {
private final StressScenario scenario;
private final ConcurrentMap<String, MultiInvocationStatCalculator> map;
private final Function<String, MultiInvocationStatCalculator> calculatorFunction;
private final ConcurrentMap<String, StatAccumulator> map;
private final Function<String, StatAccumulator> calculatorFunction;

protected MeasureProvider(final StressScenario scenario,
final int threadId,
final int operationNumber,
final int concurrency,
final ConcurrentMap<String, MultiInvocationStatCalculator> map) {
final ConcurrentMap<String, StatAccumulator> map) {
super(threadId, operationNumber, concurrency);
this.scenario = scenario;
this.map = map;
this.calculatorFunction = newName -> new MultiInvocationStatCalculator(
this.calculatorFunction = newName -> new StatAccumulator(
this.scenario.getResolution(), newName,
concurrency);
}
Expand All @@ -36,7 +36,7 @@ public void call(final String name, final Invoke invoke) throws Exception {
getStat(name).invoke(getThreadId(), invoke);
}

private MultiInvocationStatCalculator getStat(final String name) {
private StatAccumulator getStat(final String name) {
return map.computeIfAbsent(name, calculatorFunction);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
*/
public final class PooledMeasureProvider extends MeasureData {
private final LoadingCache<String, ExecutorService> poolCache;
private final LoadingCache<String, MultiInvocationStatCalculator> statCache;
private final LoadingCache<String, StatAccumulator> statCache;

protected PooledMeasureProvider(final int threadId,
final int operationNumber,
final int concurrency,
final LoadingCache<String, MultiInvocationStatCalculator> statCache,
final LoadingCache<String, StatAccumulator> statCache,
final LoadingCache<String, ExecutorService> poolCache) {
super(threadId, operationNumber, concurrency);
this.poolCache = poolCache;
Expand All @@ -31,12 +31,12 @@ public long getThreadId() {
}

public <T> Future<T> invoke(final String name, final Get<T> getter) throws Exception {
final MultiInvocationStatCalculator calc = statCache.get(name);
final StatAccumulator calc = statCache.get(name);
return poolCache.get(name).submit(() -> calc.invoke(getThreadId(), getter));
}

public void invoke(final String name, final Invoke invoke) throws Exception {
final MultiInvocationStatCalculator calc = statCache.get(name);
final StatAccumulator calc = statCache.get(name);
poolCache.get(name).submit(() -> {
try {
calc.invoke(getThreadId(), invoke);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package org.nohope.test.stress;

import org.nohope.test.stress.result.ActionResult;
import org.nohope.test.stress.result.StressResult;
import org.nohope.test.stress.util.Memory;
import org.nohope.test.stress.util.MetricsAccumulator;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final public class PreparedMeasurement {
private final int threadsNumber;
private final int cycleCount;
private final Iterable<ExecutorService> executors;
private final Iterable<? extends StatAccumulator> statAccumulators;
private final List<Thread> threads;
private final TimerResolution timerResolution;


PreparedMeasurement(final TimerResolution resolution, final int threadsNumber, final int cycleCount, final Iterable<ExecutorService> executors,
final Iterable<? extends StatAccumulator> statAccumulators, final List<Thread> threads) {
this.timerResolution = resolution;
this.threadsNumber = threadsNumber;
this.cycleCount = cycleCount;
this.executors = executors;
this.statAccumulators = statAccumulators;
this.threads = threads;
}


public int getThreadsNumber() {
return threadsNumber;
}


public int getCycleCount() {
return cycleCount;
}


public Iterable<ExecutorService> getExecutors() {
return executors;
}


public Iterable<? extends StatAccumulator> getStatAccumulators() {
return statAccumulators;
}


public List<Thread> getThreads() {
return threads;
}

public StressResult perform() throws InterruptedException {
return awaitResult(this);
}

private StressResult awaitResult(final PreparedMeasurement preparedMeasurement) throws InterruptedException {
final MetricsAccumulator metrics = new MetricsAccumulator();
metrics.start();

final Memory memoryStart = Memory.getCurrent();
final long overallStart = timerResolution.currentTime();

preparedMeasurement.getThreads().forEach(java.lang.Thread::start);

for (final Thread thread : preparedMeasurement.getThreads()) {
thread.join();
}

metrics.stop();

for (final ExecutorService service : preparedMeasurement.getExecutors()) {
try {
service.shutdown();
service.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
} catch (final InterruptedException ignored) {
}
}

final long overallEnd = timerResolution.currentTime();
final Memory memoryEnd = Memory.getCurrent();

final double runtime = overallEnd - overallStart;

final Map<String, ActionResult> results = new HashMap<>();
for (final StatAccumulator stats : preparedMeasurement.getStatAccumulators()) {
final ActionResult r = stats.getResult();
results.put(r.getName(), r);
}

return new StressResult(results, preparedMeasurement.getThreadsNumber(), preparedMeasurement.getCycleCount(), runtime, metrics.getMetrics(), memoryStart, memoryEnd);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
* @author <a href="mailto:ketoth.xupack@gmail.com">ketoth xupack</a>
* @since 2013-12-27 16:18
*/
class SingleInvocationStatCalculator extends StatCalculator {
class SingleInvocationStatAccumulator extends StatAccumulator {
private final NamedAction action;

public SingleInvocationStatCalculator(final TimerResolution resolution,
final NamedAction action,
final int concurrency) {
public SingleInvocationStatAccumulator(final TimerResolution resolution,
final NamedAction action,
final int concurrency) {
super(resolution, action.getName(), concurrency);
this.action = action;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.nohope.test.stress.functors.Get;
import org.nohope.test.stress.functors.Invoke;
import org.nohope.test.stress.result.ActionResult;

import javax.annotation.Nonnull;
import java.util.ArrayList;
Expand All @@ -11,7 +12,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Map.Entry;
Expand All @@ -20,19 +20,17 @@
* @author <a href="mailto:ketoth.xupack@gmail.com">ketoth xupack</a>
* @since 2013-12-27 16:18
*/
class StatCalculator {
class StatAccumulator {
private final ConcurrentHashMap<Long, List<Entry<Long, Long>>> timesPerThread = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Class<?>, ConcurrentLinkedQueue<Exception>> errorStats = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Class<?>, ConcurrentLinkedQueue<Throwable>> rootErrorStats = new ConcurrentHashMap<>();
private final AtomicReference<Result> result = new AtomicReference<>();
private final AtomicInteger fails = new AtomicInteger(0);
private final AtomicReference<ActionResult> result = new AtomicReference<>();
private final String name;
private final TimerResolution resolution;
private final int concurrency;


protected StatCalculator(final TimerResolution resolution,
final String name, final int concurrency) {
protected StatAccumulator(final TimerResolution resolution,
final String name, final int concurrency) {
this.resolution = resolution;
this.name = name;
this.concurrency = concurrency;
Expand All @@ -44,7 +42,7 @@ protected StatCalculator(final TimerResolution resolution,
}

@Nonnull
public Result getResult() {
public ActionResult getResult() {
if (result.get() == null) {
calculate();
}
Expand Down Expand Up @@ -84,7 +82,6 @@ protected final void invoke(final long threadId,
private void handleException(final Exception e) {
final Class<?> aClass = e.getClass();
errorStats.computeIfAbsent(aClass, clazz -> new ConcurrentLinkedQueue<>()).add(e);
fails.getAndIncrement();
}


Expand Down Expand Up @@ -114,7 +111,7 @@ private void calculate() {
}


result.set(new Result(
result.set(new ActionResult(
name,
timesPerThread,
eStats,
Expand Down
Loading

0 comments on commit 1a7ad33

Please sign in to comment.