Skip to content

Commit

Permalink
Assure thread affinity
Browse files Browse the repository at this point in the history
Closes testng-team#89, testng-team#1050, testng-team#1066, testng-team#1173, testng-team#1185

This PR aims at assuring that methods that fall
under the below two use cases, all run on the 
same thread when classes are being run in parallel:

* @test methods in a class are ordered by priority
* @test methods have a single dependency on 
another method using “dependsOnMethods” attribute.

The thread affinity feature is supposed to be 
“experimental” and it can be turned on via the
JVM argument : -Dtestng.thread.affinity=true

This feature is turned off by default just to 
ensure that we don’t have any users experiencing
un-usual behavior.
  • Loading branch information
krmahadevan committed May 14, 2018
1 parent 6ebfe94 commit 02ef8b4
Show file tree
Hide file tree
Showing 16 changed files with 411 additions and 63 deletions.
6 changes: 6 additions & 0 deletions CHANGES.txt
@@ -1,4 +1,10 @@
Current
New : TestNG now guarantees thread-affinity for methods that use either preserve-order (or) dependsOnMethods (Krishnan Mahadevan)
Fixed: GITHUB-1185: DependsOnMethods made parallel class use several threads (Krishnan Mahadevan)
Fixed: GITHUB-1173: Parallel="classes" executes methods of test class in different threads (Krishnan Mahadevan)
Fixed: GITHUB-1066: Regression is in priority. It broke parallel mode (Krishnan Mahadevan)
Fixed: GITHUB-1050: Parallel classes runs methods from one class in different threads, interleaves two classes in one thread (Krishnan Mahadevan)
Fixed: GITHUB-89: parallel="classes" is not forcing test methods from the same testClass to be run in the same thread as it is suposed to (Krishnan Mahadevan)
Fixed: GITHUB-1719: successPercentage does not work correctly for tests with dataProvider (Krishnan Mahadevan)
Fixed: GITHUB-1241: Streamline Retry Analyzer usage when same test is run multiple times (Krishnan Mahadevan)
Fixed: GITHUB-1777: ITestListener.onTestStart() not called after fail or skip from @BeforeMethod (Krishnan Mahadevan)
Expand Down
21 changes: 11 additions & 10 deletions src/main/java/org/testng/TestRunner.java
Expand Up @@ -57,6 +57,8 @@
import com.google.inject.Injector;
import com.google.inject.Module;

import javax.annotation.Nonnull;

import static org.testng.internal.MethodHelper.fixMethodsWithClass;

/**
Expand Down Expand Up @@ -147,7 +149,6 @@ public class TestRunner
private ClassMethodMap m_classMethodMap;
private TestNGClassFinder m_testClassFinder;
private IConfiguration m_configuration;
private IMethodInterceptor builtinInterceptor;

public enum PriorityWeight {
groupByInstance, preserveOrder, priority, dependsOnGroups, dependsOnMethods
Expand Down Expand Up @@ -212,7 +213,7 @@ private void init(IConfiguration configuration,
setVerbose(test.getVerbose());

boolean preserveOrder = test.getPreserveOrder();
builtinInterceptor = preserveOrder ? new PreserveOrderMethodInterceptor() : new InstanceOrderingMethodInterceptor();
IMethodInterceptor builtinInterceptor = preserveOrder ? new PreserveOrderMethodInterceptor() : new InstanceOrderingMethodInterceptor();
m_methodInterceptors = new ArrayList<>();
//Add the built in interceptor as the first interceptor. That way we let our users determine the final order
//by plugging in their own custom interceptors as well.
Expand Down Expand Up @@ -370,7 +371,7 @@ private void initMethods() {
List<ITestNGMethod> afterXmlTestMethods = Lists.newArrayList();

ClassInfoMap classMap = new ClassInfoMap(m_testClassesFromXml);
m_testClassFinder= new TestNGClassFinder(classMap,Maps.<Class<?>, List<Object>>newHashMap(),
m_testClassFinder= new TestNGClassFinder(classMap,Maps.newHashMap(),
m_configuration, this, m_dataProviderListeners);
ITestMethodFinder testMethodFinder = new TestNGMethodFinder(m_runInfo, m_annotationFinder, comparator);

Expand Down Expand Up @@ -570,7 +571,7 @@ public void run() {
IJUnitTestRunner tr= ClassHelper.createTestRunner(TestRunner.this);
tr.setInvokedMethodListeners(m_invokedMethodListeners);
try {
tr.run(tc, methods.toArray(new String[methods.size()]));
tr.run(tc, methods.toArray(new String[0]));
}
catch(Exception ex) {
ex.printStackTrace();
Expand All @@ -596,13 +597,13 @@ public int getPriority() {
}

@Override
public int compareTo(IWorker<ITestNGMethod> other) {
public int compareTo(@Nonnull IWorker<ITestNGMethod> other) {
return getPriority() - other.getPriority();
}
});

runJUnitWorkers(workers);
m_allTestMethods= runMethods.toArray(new ITestNGMethod[runMethods.size()]);
m_allTestMethods= runMethods.toArray(new ITestNGMethod[0]);
}

/**
Expand All @@ -625,7 +626,7 @@ private void privateRun(XmlTest xmlTest) {
GraphThreadPoolExecutor<ITestNGMethod> executor =
new GraphThreadPoolExecutor<>("test=" + xmlTest.getName(), graph, this,
threadCount, threadCount, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
new LinkedBlockingQueue<>());
executor.run();
try {
long timeOut = m_xmlTest.getTimeOut(XmlTest.DEFAULT_TIMEOUT_MS);
Expand Down Expand Up @@ -675,7 +676,7 @@ private ITestNGMethod[] intercept(ITestNGMethod[] methods) {
//so let's update the current classMethodMap object with the list of methods obtained from the interceptor.
this.m_classMethodMap = new ClassMethodMap(result, null);

ITestNGMethod[] resultArray = result.toArray(new ITestNGMethod[result.size()]);
ITestNGMethod[] resultArray = result.toArray(new ITestNGMethod[0]);

//Check if an interceptor had altered the effective test method count. If yes, then we need to
//update our configurationGroupMethod object with that information.
Expand Down Expand Up @@ -822,13 +823,13 @@ public IResultMap getFailedButWithinSuccessPercentageTests() {
@Override
public String[] getIncludedGroups() {
Map<String, String> ig= m_xmlMethodSelector.getIncludedGroups();
return ig.values().toArray(new String[ig.size()]);
return ig.values().toArray(new String[0]);
}

@Override
public String[] getExcludedGroups() {
Map<String, String> eg= m_xmlMethodSelector.getExcludedGroups();
return eg.values().toArray(new String[eg.size()]);
return eg.values().toArray(new String[0]);
}

@Override
Expand Down
42 changes: 28 additions & 14 deletions src/main/java/org/testng/internal/DynamicGraph.java
Expand Up @@ -83,6 +83,14 @@ public List<T> getFreeNodes() {
return finalResult;
}

public List<T> getDependenciesFor(T node) {
Map<T, Integer> data = m_edges.to(node);
if (data == null) {
return Lists.newArrayList();
}
return Lists.newArrayList(data.keySet());
}

/**
* Set the status for a set of nodes.
*/
Expand Down Expand Up @@ -130,6 +138,10 @@ public void setStatus(T node, Status status) {

m_edges.removeNode(node);
break;
case READY:
m_nodesReady.add(node);
m_nodesRunning.remove(node);
break;
default:
throw new IllegalArgumentException("Unsupported status: " + status);
}
Expand All @@ -139,28 +151,30 @@ public void setStatus(T node, Status status) {
* @return the number of nodes in this graph.
*/
public int getNodeCount() {
int result = m_nodesReady.size() + m_nodesRunning.size() + m_nodesFinished.size();
return result;
return m_nodesReady.size() + m_nodesRunning.size() + m_nodesFinished.size();
}

public int getNodeCountWithStatus(Status status) {
return getNodesWithStatus(status).size();
}

public Set<T> getNodesWithStatus(Status status) {
switch(status) {
case READY: return m_nodesReady.size();
case RUNNING: return m_nodesRunning.size();
case FINISHED: return m_nodesFinished.size();
case READY: return m_nodesReady;
case RUNNING: return m_nodesRunning;
case FINISHED: return m_nodesFinished;
default: throw new IllegalArgumentException();
}

}

@Override
public String toString() {
StringBuilder result = new StringBuilder("[DynamicGraph ");
result.append("\n Ready:").append(m_nodesReady);
result.append("\n Running:").append(m_nodesRunning);
result.append("\n Finished:").append(m_nodesFinished);
result.append("\n Edges:\n").append(m_edges);
result.append("]");
return result.toString();
return "[DynamicGraph " + "\n Ready:" + m_nodesReady +
"\n Running:" + m_nodesRunning +
"\n Finished:" + m_nodesFinished +
"\n Edges:\n" + m_edges +
"]";
}

private static <T> String dotShortName(T t) {
Expand Down Expand Up @@ -255,8 +269,8 @@ Map<T, Integer> to(T node) {
* Return the weight of the edge in the graph that is the reversed direction of edge. For example, if
* edge a -> b exists, and edge b -> a is passed in, then return a -> b.
*
* @param from
* @param to
* @param from - the from edge
* @param to - the to edge
* @return the weight of the reversed edge or null if edge does not exist
*/
private Integer findReversedEdge(T from, T to) {
Expand Down
16 changes: 15 additions & 1 deletion src/main/java/org/testng/internal/RuntimeBehavior.java
Expand Up @@ -8,6 +8,8 @@
public final class RuntimeBehavior {

public static final String TESTNG_LISTENERS_ALWAYSRUN = "testng.listeners.alwaysrun";
public static final String TESTNG_THREAD_AFFINITY = "testng.thread.affinity";
public static final String TESTNG_MODE_DRYRUN = "testng.mode.dryrun";

private RuntimeBehavior() {
}
Expand All @@ -17,7 +19,7 @@ private RuntimeBehavior() {
* <code>false</code> otherwise.
*/
public static boolean isDryRun() {
String value = System.getProperty("testng.mode.dryrun", "false");
String value = System.getProperty(TESTNG_MODE_DRYRUN, "false");
return Boolean.parseBoolean(value);
}

Expand All @@ -42,4 +44,16 @@ public static TimeZone getTimeZone() {
public static boolean invokeListenersForSkippedTests() {
return Boolean.parseBoolean(System.getProperty(TESTNG_LISTENERS_ALWAYSRUN, "false"));
}

/**
* @return - <code>true</code> if we would like to enforce Thread affinity when dealing with the
* below two variants of execution models:
* <ul>
* <li>Ordering priority
* <li>Ordering by dependsOnMethods (will not work with dependency on multiple methods)
* </ul>
*/
public static boolean enforceThreadAffinity() {
return Boolean.parseBoolean(System.getProperty(TESTNG_THREAD_AFFINITY, "false"));
}
}
49 changes: 35 additions & 14 deletions src/main/java/org/testng/internal/TestMethodWorker.java
Expand Up @@ -11,11 +11,14 @@
import org.testng.internal.thread.graph.IWorker;
import org.testng.xml.XmlSuite;

import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/**
* FIXME: reduce contention when this class is used through parallel invocation due to
Expand All @@ -39,6 +42,9 @@ public class TestMethodWorker implements IWorker<ITestNGMethod> {
private final ClassMethodMap m_classMethodMap;
private final ITestContext m_testContext;
private final List<IClassListener> m_listeners;
private long currentThreadId;
private long threadIdToRunOn = -1;
private boolean completed = true;

public TestMethodWorker(IInvoker invoker,
List<IMethodInstance> testMethods,
Expand Down Expand Up @@ -98,6 +104,12 @@ public String toString() {
*/
@Override
public void run() {
this.currentThreadId = Thread.currentThread().getId();
if (RuntimeBehavior.enforceThreadAffinity() && doesTaskHavePreRequistes() && currentThreadId != threadIdToRunOn) {
completed = false;
return;
}

for (IMethodInstance testMthdInst : m_methodInstances) {
ITestNGMethod testMethod = testMthdInst.getMethod();
ITestClass testClass = testMethod.getTestClass();
Expand All @@ -113,6 +125,10 @@ public void run() {
}
}

private boolean doesTaskHavePreRequistes() {
return threadIdToRunOn != -1;
}

protected void invokeTestMethods(ITestNGMethod tm, Object instance,
ITestContext testContext)
{
Expand All @@ -136,8 +152,6 @@ protected void invokeTestMethods(ITestNGMethod tm, Object instance,

/**
* Invoke the @BeforeClass methods if not done already
* @param testClass
* @param mi
*/
protected void invokeBeforeClassMethods(ITestClass testClass, IMethodInstance mi) {
// if no BeforeClass than return immediately
Expand Down Expand Up @@ -177,8 +191,6 @@ protected void invokeBeforeClassMethods(ITestClass testClass, IMethodInstance mi

/**
* Invoke the @AfterClass methods if not done already
* @param testClass
* @param mi
*/
protected void invokeAfterClassMethods(ITestClass testClass, IMethodInstance mi) {
// if no BeforeClass than return immediately
Expand Down Expand Up @@ -244,7 +256,7 @@ public List<ITestNGMethod> getTasks()
}

@Override
public int compareTo(IWorker<ITestNGMethod> other) {
public int compareTo(@Nonnull IWorker<ITestNGMethod> other) {
return getPriority() - other.getPriority();
}

Expand All @@ -257,6 +269,22 @@ public int getPriority() {
? m_methodInstances.get(0).getMethod().getPriority()
: 0;
}

@Override
public long getCurrentThreadId() {
return currentThreadId;
}

@Override
public void setThreadIdToRunOn(long threadIdToRunOn) {
this.threadIdToRunOn = threadIdToRunOn;
}

@Override
public boolean completed() {
return this.completed;
}

}

/**
Expand All @@ -265,8 +293,7 @@ public int getPriority() {
*/
class SingleTestMethodWorker extends TestMethodWorker {
private static final ConfigurationGroupMethods EMPTY_GROUP_METHODS =
new ConfigurationGroupMethods(new ITestNGMethod[0],
new HashMap<String, List<ITestNGMethod>>(), new HashMap<String, List<ITestNGMethod>>());
new ConfigurationGroupMethods(new ITestNGMethod[0], new HashMap<>(), new HashMap<>());

public SingleTestMethodWorker(IInvoker invoker,
IMethodInstance testMethod,
Expand All @@ -276,7 +303,7 @@ public SingleTestMethodWorker(IInvoker invoker,
List<IClassListener> listeners)
{
super(invoker,
asList(testMethod),
Collections.singletonList(testMethod),
suite,
parameters,
EMPTY_GROUP_METHODS,
Expand All @@ -285,11 +312,5 @@ public SingleTestMethodWorker(IInvoker invoker,
listeners);
}

//TODO Resorted to introducing this method to keep JDK7 happy. Can be removed once we move to JDK8
private static List<IMethodInstance> asList(IMethodInstance testMethod) {
List<IMethodInstance> methods = Lists.newLinkedList();
methods.add(testMethod);
return methods;
}

}

0 comments on commit 02ef8b4

Please sign in to comment.