Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JBTM-1453] Handle exceptions from WorkerWorkload #708

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -31,6 +31,11 @@

package com.hp.mwtests.ts.jts.orbspecific.local.performance;

import com.arjuna.ArjunaOTS.ActiveThreads;
import com.arjuna.ArjunaOTS.ActiveTransaction;
import com.arjuna.ArjunaOTS.BadControl;
import com.arjuna.ArjunaOTS.Destroyed;
import com.arjuna.ats.jts.OTSManager;
import io.narayana.perf.Measurement;
import io.narayana.perf.Worker;
import io.narayana.perf.WorkerLifecycle;
Expand All @@ -56,6 +61,7 @@ public void test()
.numberOfThreads(numberOfThreads).batchSize(batchSize)
.numberOfWarmupCalls(warmUpCount).build().measure(worker, worker);

Assert.assertNull("Test exception: " + measurement.getException(), measurement.getException());
Assert.assertEquals(0, measurement.getNumberOfErrors());
Assert.assertFalse(measurement.getInfo(), measurement.shouldFail());

Expand All @@ -66,7 +72,6 @@ public void test()

Worker<Void> worker = new Worker<Void>() {
WorkerLifecycle<Void> lifecycle = new PerformanceWorkerLifecycle<>();
boolean doCommit = true;
TransactionFactoryImple factory = null;

@Override
Expand All @@ -83,18 +88,12 @@ public void fini() {
@Override
public Void doWork(Void context, int batchSize, Measurement<Void> measurement) {
for (int i = 0; i < batchSize; i++) {
Control control = factory.create(0);

try {
if (doCommit)
control.get_terminator().commit(true);
else
control.get_terminator().rollback();
} catch (org.omg.CORBA.UserException e) {
if (measurement.getNumberOfErrors() == 0)
e.printStackTrace();
Control control = factory.create(1);

measurement.incrementErrorCount();
control.get_terminator().commit(true);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}

Expand Down
30 changes: 26 additions & 4 deletions tools/src/main/java/io/narayana/perf/Measurement.java
Expand Up @@ -57,6 +57,7 @@ public class Measurement<T> implements Serializable {
int numberOfBatches = 0;
boolean regression;
boolean failOnRegression;
private Exception exception;

public Measurement(int numberOfThreads, int numberOfCalls) {
this(numberOfThreads, numberOfCalls, 10);
Expand Down Expand Up @@ -322,7 +323,7 @@ private Measurement<T> doWork(final WorkerWorkload<T> workload, final Measuremen
for (int i = 0; i < opts.getNumberOfThreads(); i++)
tasks.add(executor.submit(new Callable<Measurement<T>>() {
public Measurement<T> call() throws Exception {
Measurement<T> res = new Measurement<>(
Measurement<T> res = new Measurement<>(
opts.getMaxTestTime(), opts.getNumberOfThreads(), opts.getNumberOfCalls(), opts.getBatchSize());
int errorCount = 0;

Expand All @@ -331,11 +332,19 @@ public Measurement<T> call() throws Exception {
long start = System.nanoTime();

// all threads are ready - this thread gets more work in batch size chunks until there isn't anymore
while(count.decrementAndGet() >= 0) {
while (count.decrementAndGet() >= 0) {
res.setNumberOfCalls(opts.getBatchSize());
// ask the worker to do batchSize units or work
res.setContext(workload.doWork(res.getContext(), opts.getBatchSize(), res));
errorCount += res.getNumberOfErrors();
try {
res.setContext(workload.doWork(res.getContext(), opts.getBatchSize(), res));
errorCount += res.getNumberOfErrors();
} catch (Exception e) {
if (res.getException() == null)
e.printStackTrace();
res.setException(e);
errorCount += opts.getBatchSize();
res.setCancelled(true);
}

if (res.isCancelled()) {
for (Future<Measurement<T>> task : tasks) {
Expand All @@ -344,6 +353,8 @@ public Measurement<T> call() throws Exception {
}

opts.setContext(res.getContext());
if (res.getException() != null)
opts.setException(res.getException());

break;
}
Expand Down Expand Up @@ -403,6 +414,9 @@ public Measurement<T> call() throws Exception {
if (context != null)
opts.addContext(context);

if (outcome.getException() != null)
opts.setException(outcome.getException());

opts.incrementErrorCount(outcome.getNumberOfErrors());
} catch (CancellationException e) {
opts.incrementErrorCount(opts.getBatchSize());
Expand All @@ -422,6 +436,14 @@ public Measurement<T> call() throws Exception {
return opts;
}

public void setException(Exception exception) {
this.exception = exception;
}

public Exception getException() {
return exception;
}

public static final class Builder<T> {
private String name;
private int numberOfCalls = 10;
Expand Down
3 changes: 3 additions & 0 deletions tools/src/main/java/io/narayana/perf/WorkerWorkload.java
Expand Up @@ -31,6 +31,9 @@ public interface WorkerWorkload<T> {
* Perform a batch of work units. @see Measurement#measure begins a number of threads and each thread
* then invokes the doWork method in parallel until there is no more remaining work.
*
* If work throws an exception then whole test is cancelled and exception is reported via the
* (@link{Measurement.exception}) property
*
* @param context a thread specific instance that may have been returned by a previous invocation of the doWork
* method by this thread. This may be useful if the worker needs to save thread specific data
* for use by later invocations on the same thread
Expand Down
31 changes: 31 additions & 0 deletions tools/src/test/java/io/narayana/perf/PerformanceTest.java
Expand Up @@ -31,8 +31,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static junit.framework.Assert.assertNotNull;
import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.*;
Expand Down Expand Up @@ -395,4 +397,33 @@ public void testAverager() {
assertEquals("Outlier in data was not detected", size - 1, values.size());
}
}

/**
* Test that if a worker throws an exception that the test is canceled
*/
@Test
public void testCancelOnException() {
WorkerWorkload<Void> worker = new WorkerWorkload<Void>() {
AtomicBoolean cancelled = new AtomicBoolean(false);

@Override
public Void doWork(Void context, int niters, Measurement<Void> opts) {
if (!cancelled.getAndSet(true))
throw new RuntimeException("Testing throw exception");

return context;
}
@Override
public void finishWork(Measurement<Void> measurement) {
}
};

Measurement<Void> config = new Measurement<>(2, 100);

config.measure(worker);

assertNotNull("Test should have thrown an exception", config.getException());

System.out.printf("testCancelOnException!: %s%n", config);
}
}