Skip to content

Commit

Permalink
Favour CompletableFuture instead of PoolService
Browse files Browse the repository at this point in the history
  • Loading branch information
krmahadevan committed Nov 28, 2023
1 parent ea53982 commit 6de4807
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 108 deletions.
16 changes: 4 additions & 12 deletions testng-core/src/main/java/org/testng/internal/ObjectBag.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package org.testng.internal;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.testng.ISuite;
import org.testng.log4testng.Logger;
Expand Down Expand Up @@ -42,16 +41,9 @@ public Object createIfRequired(Class<?> type, Supplier<Object> supplier) {

public void cleanup() {
bag.values().stream()
.filter(it -> it instanceof Closeable)
.map(it -> (Closeable) it)
.forEach(
it -> {
try {
it.close();
} catch (IOException e) {
logger.debug("Could not clean-up " + it, e);
}
});
.filter(it -> it instanceof ExecutorService)
.map(it -> (ExecutorService) it)
.forEach(ExecutorService::shutdown);
bag.clear();
}
}
77 changes: 0 additions & 77 deletions testng-core/src/main/java/org/testng/internal/PoolService.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
package org.testng.internal.invokers;

import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.supplyAsync;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.testng.ITestContext;
import org.testng.ITestResult;
import org.testng.collections.CollectionUtils;
import org.testng.collections.Lists;
import org.testng.internal.ObjectBag;
import org.testng.internal.Parameters;
import org.testng.internal.PoolService;
import org.testng.internal.invokers.ITestInvoker.FailureContext;
import org.testng.internal.invokers.TestMethodArguments.Builder;
import org.testng.internal.thread.ThreadUtil;
import org.testng.xml.XmlSuite;

public class MethodRunner implements IMethodRunner {
Expand Down Expand Up @@ -95,11 +105,13 @@ public List<ITestResult> runInParallel(
Iterator<Object[]> allParamValues,
boolean skipFailedInvocationCounts) {
XmlSuite suite = context.getSuite().getXmlSuite();
List<ITestResult> result = Lists.newArrayList();
List<TestMethodWithDataProviderMethodWorker> workers = Lists.newArrayList();
int parametersIndex = 0;
Iterable<Object[]> allParameterValues = CollectionUtils.asIterable(allParamValues);
for (Object[] next : allParameterValues) {
ObjectBag objectBag = ObjectBag.getInstance(context.getSuite());
boolean reUse = suite.isShareThreadPoolForDataProviders();

ExecutorService service = getOrCreate(reUse, suite.getDataProviderThreadCount(), objectBag);
List<CompletableFuture<List<ITestResult>>> all = new ArrayList<>();
for (Object[] next : CollectionUtils.asIterable(allParamValues)) {
if (next == null) {
// skipped value
parametersIndex += 1;
Expand All @@ -126,26 +138,43 @@ public List<ITestResult> runInParallel(
invocationCount.get(),
failure.count.get(),
testInvoker.getNotifier());
workers.add(w);
all.add(supplyAsync(w::call, service));
// testng387: increment the param index in the bag.
parametersIndex += 1;
}

ObjectBag objectBag = ObjectBag.getInstance(context.getSuite());
boolean sharedThreadPool = context.getSuite().getXmlSuite().isShareThreadPoolForDataProviders();
// don't block on execution of any of the completablefuture
CompletableFuture<Void> combined = allOf(all.toArray(new CompletableFuture[0]));

@SuppressWarnings("unchecked")
PoolService<List<ITestResult>> ps =
sharedThreadPool
? (PoolService<List<ITestResult>>)
objectBag.createIfRequired(
PoolService.class,
() -> new PoolService<>(suite.getDataProviderThreadCount(), false))
: new PoolService<>(suite.getDataProviderThreadCount());
List<List<ITestResult>> r = ps.submitTasksAndWait(workers);
for (List<ITestResult> l2 : r) {
result.addAll(l2);
// Now start processing the results of each of the CompletableFutures as and when they
// become available
List<ITestResult> result =
combined
.thenApply(
ignored ->
all.stream()
.map(CompletableFuture::join)
.flatMap(Collection::stream)
.collect(Collectors.toList()))
.join();
if (!reUse) {
service.shutdown();
}
return result;
}

private static ExecutorService getOrCreate(boolean reUse, int count, ObjectBag objectBag) {
if (reUse) {
return (ExecutorService)
objectBag.createIfRequired(
ExecutorService.class, () -> Executors.newFixedThreadPool(count, threadFactory()));
}
return Executors.newFixedThreadPool(count, threadFactory());
}

private static ThreadFactory threadFactory() {
AtomicInteger threadNumber = new AtomicInteger(0);
return r ->
new Thread(r, ThreadUtil.THREAD_NAME + "-PoolService-" + threadNumber.getAndIncrement());
}
}

0 comments on commit 6de4807

Please sign in to comment.