Skip to content

Commit

Permalink
some refactorings
Browse files Browse the repository at this point in the history
  • Loading branch information
FeiWongReed committed Apr 28, 2015
1 parent 1a7ad33 commit 024864e
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 54 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
* @author <a href="mailto:ketoth.xupack@gmail.com">ketoth xupack</a>
* @since 2013-12-27 16:18
*/
class StatAccumulator {
final class StatAccumulator {
private final ConcurrentHashMap<Long, List<Entry<Long, Long>>> timesPerThread = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Class<?>, ConcurrentLinkedQueue<Exception>> errorStats = new ConcurrentHashMap<>();
private final AtomicReference<ActionResult> result = new AtomicReference<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,15 @@
import org.nohope.test.stress.actions.Action;
import org.nohope.test.stress.actions.NamedAction;
import org.nohope.test.stress.actions.PooledAction;
import org.nohope.test.stress.result.ActionResult;
import org.nohope.test.stress.result.StressResult;
import org.nohope.test.stress.util.Memory;
import org.nohope.test.stress.util.MetricsAccumulator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
* @author <a href="mailto:ketoth.xupack@gmail.com">ketoth xupack</a>
Expand All @@ -47,30 +40,34 @@ public PreparedMeasurement prepare(final int threadsNumber,
final int cycleCount,
final NamedAction... actions)
throws InterruptedException {
final ConcurrentMap<String, StatAccumulator> result =
new ConcurrentHashMap<>(16, 0.75f, threadsNumber);

final Map<String, SingleInvocationStatAccumulator> stats = new HashMap<>(actions.length);
for (final NamedAction action : actions) {
stats.put(action.getName(),
new SingleInvocationStatAccumulator(resolution, action, threadsNumber));
final List<MeasureProvider> providers = new ArrayList<>(threadsNumber * cycleCount);
for (int i = 0; i < threadsNumber; i++) {
for (int j = i * cycleCount; j < (i + 1) * cycleCount; j++) {
providers.add(new MeasureProvider(this, i, j, threadsNumber, result));
}
}

final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < threadsNumber; i++) {
final int k = i;
threads.add(new Thread(() -> {
for (int j = k * cycleCount; j < (k + 1) * cycleCount; j++) {
try {
for (final SingleInvocationStatAccumulator stat : stats.values()) {
stat.invoke(k, j);
for (NamedAction action : actions) {
try {
final MeasureProvider provider = providers.get(j);
provider.call(action.getName(), () -> action.doAction(provider));
} catch (final Exception e) {
// TODO: print skipped
}
} catch (final InvocationException e) {
// TODO
}
}
}, "stress-worker-" + k));
}

return new PreparedMeasurement(resolution, threadsNumber, cycleCount, Collections.emptyList(), stats.values(), threads);
return new PreparedMeasurement(resolution, threadsNumber, cycleCount, Collections.emptyList(), result.values(), threads);
}

public PreparedMeasurement prepare(final int threadsNumber,
Expand Down Expand Up @@ -106,22 +103,23 @@ public PreparedMeasurement prepare(final int threadsNumber,

}

public PreparedMeasurement prepare(final int threadsNumber,
public PreparedMeasurement prepare(final int threadsPerCoordinator,
final int cycleCount,
final int coordinateThreadsCount,
final PooledAction action)
throws InterruptedException {

final int concurrency = threadsPerCoordinator * coordinateThreadsCount;

final LoadingCache<String, ExecutorService> threadPools =
CacheBuilder.newBuilder()
.concurrencyLevel(threadsNumber)
.build(new PoolLoader(threadsNumber));
.concurrencyLevel(concurrency)
.build(new PoolLoader(threadsPerCoordinator));

final int concurrency = threadsNumber * coordinateThreadsCount;

final LoadingCache<String, StatAccumulator> calcPool =
CacheBuilder.newBuilder()
.concurrencyLevel(threadsNumber)
.concurrencyLevel(threadsPerCoordinator)
.build(new CacheLoader<String, StatAccumulator>() {
@Override
public StatAccumulator load(final String key) throws Exception {
Expand All @@ -130,14 +128,14 @@ public StatAccumulator load(final String key) throws Exception {
}
});

final List<PooledMeasureProvider> providers = new ArrayList<>(threadsNumber * cycleCount);
for (int i = 0; i < threadsNumber; i++) {
final List<PooledMeasureProvider> providers = new ArrayList<>(threadsPerCoordinator * cycleCount);
for (int i = 0; i < threadsPerCoordinator; i++) {
for (int j = i * cycleCount; j < (i + 1) * cycleCount; j++) {
providers.add(new PooledMeasureProvider(i, j, concurrency, calcPool, threadPools));
}
}

final int mul = threadsNumber / coordinateThreadsCount;
final int mul = threadsPerCoordinator / coordinateThreadsCount;
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < coordinateThreadsCount; i++) {
final int k = i;
Expand All @@ -152,7 +150,7 @@ public StatAccumulator load(final String key) throws Exception {
}, "stress-worker-" + k));
}

return new PreparedMeasurement(resolution, threadsNumber, cycleCount, threadPools.asMap().values(),
return new PreparedMeasurement(resolution, threadsPerCoordinator, cycleCount, threadPools.asMap().values(),
calcPool.asMap().values(), threads);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public void doAction(final MeasureProvider p) throws Exception {
final AtomicLong atomic = new AtomicLong();
final StressResult result =
StressScenario.of(TimerResolution.MILLISECONDS)
.prepare(500, 100, 10, new PooledAction() {
.prepare(50, 100, 10, new PooledAction() {
@Override
public void doAction(final PooledMeasureProvider p) throws Exception {
p.invoke("test1", () -> {
Expand Down

0 comments on commit 024864e

Please sign in to comment.