Skip to content

Commit

Permalink
Test no thread leaks in all the ATQF
Browse files Browse the repository at this point in the history
  • Loading branch information
findepi committed May 20, 2024
1 parent 9acc6ed commit b737deb
Showing 1 changed file with 131 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,18 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;

import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -80,10 +87,12 @@
import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector;
import static io.trino.sql.SqlFormatter.formatSql;
import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy;
import static io.trino.testing.ThreadAssertions.reportLeakedThreads;
import static io.trino.testing.TransactionBuilder.transaction;
import static io.trino.testing.assertions.Assert.assertEventually;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -96,8 +105,10 @@
public abstract class AbstractTestQueryFramework
{
private static final SqlParser SQL_PARSER = new SqlParser();
private static final AtomicInteger SEQUENCE = new AtomicInteger();

private AutoCloseableCloser afterClassCloser;
private ThreadGroup queryRunnerThreadGroup;
private QueryRunner queryRunner;
private H2QueryRunner h2QueryRunner;
private io.trino.sql.query.QueryAssertions queryAssertions;
Expand All @@ -110,18 +121,123 @@ public void init()
logging.setLevel("org.testcontainers", Level.WARN);

afterClassCloser = AutoCloseableCloser.create();
queryRunner = afterClassCloser.register(createQueryRunner());

try {
// Init unnamed thread org.h2.engine.OnExitDatabaseCloser.INSTANCE
Class.forName("org.h2.engine.OnExitDatabaseCloser");
}
catch (ClassNotFoundException _) {
}

try {
// Init unnamed thread org.apache.hadoop.fs.FileSystem.Statistics.STATS_DATA_CLEANER
Class.forName("org.apache.hadoop.fs.FileSystem.Statistics");
}
catch (ClassNotFoundException _) {
}

try {
// Init reactor.core.scheduler.Schedulers.CACHED_BOUNDED_ELASTIC
Class<?> schedulers = Class.forName("reactor.core.scheduler.Schedulers");
int defaultBoundedElasticSize = (int) schedulers.getField("DEFAULT_BOUNDED_ELASTIC_SIZE").get(null);
Object scheduler = schedulers.getMethod("boundedElastic").invoke(null);
Method schedule = Class.forName("reactor.core.scheduler.Scheduler").getMethod("schedule", Runnable.class);
CyclicBarrier barrier = new CyclicBarrier(defaultBoundedElasticSize + 1);
for (int i = 0; i < defaultBoundedElasticSize; i++) {
schedule.invoke(scheduler, (Runnable) () -> {
try {
barrier.await(5, SECONDS);
}
catch (BrokenBarrierException | TimeoutException _) {
}
catch (Exception e) {
throw new RuntimeException(e);
}
});
}
try {
barrier.await(5, SECONDS);
}
catch (BrokenBarrierException | TimeoutException _) {
}
}
catch (ClassNotFoundException _) {
}

try {
// Init org.apache.iceberg.hadoop.HadoopFileIO#executorService
Object configuration = Class.forName("org.apache.hadoop.conf.Configuration").getConstructor().newInstance();
Object hadoopFileIo = Class.forName("org.apache.iceberg.hadoop.HadoopFileIO").getConstructor(configuration.getClass()).newInstance(configuration);
Method getOrCreateStaticExecutorService = hadoopFileIo.getClass().getDeclaredMethod("executorService");
Method deleteThreads = hadoopFileIo.getClass().getDeclaredMethod("deleteThreads");
getOrCreateStaticExecutorService.setAccessible(true);
deleteThreads.setAccessible(true);
CyclicBarrier barrier = new CyclicBarrier((int) deleteThreads.invoke(hadoopFileIo) + 1);
ExecutorService executorService = (ExecutorService) getOrCreateStaticExecutorService.invoke(hadoopFileIo);
for (int i = 0; i < 100; i++) {
executorService.submit(() -> {
barrier.await(5, SECONDS);
return null;
});
}
// we might not be the first to create the ExecutorService, so the max number of threads in the pool may be different than expected
try {
barrier.await(5, SECONDS);
}
catch (BrokenBarrierException | TimeoutException _) {
}
}
catch (ClassNotFoundException _) {
}

try {
// Init unnamed threads in io.prometheus.metrics.core.util.Scheduler.executor
Class<?> schedulerClass = Class.forName("io.prometheus.metrics.core.util.Scheduler");
schedulerClass.getMethod("awaitInitialization").invoke(null);
}
catch (ClassNotFoundException _) {
}

try {
// Init and fill org.apache.accumulo.core.util.threads.ThreadPools#SCHEDULED_FUTURE_CHECKER_POOL
Class.forName("org.apache.accumulo.core.util.threads.ThreadPools");
}
catch (ClassNotFoundException _) {
}

queryRunnerThreadGroup = new ThreadGroup("atqf-test-group-" + SEQUENCE.incrementAndGet());
Thread.Builder.OfPlatform threadBuilder = Thread.ofPlatform().group(queryRunnerThreadGroup).name("AbstractTestQueryFramework-", 0);
ThreadFactory threadFactory = (ThreadFactory) Thread.Builder.OfPlatform.class.getMethod("factory").invoke(threadBuilder); // Suppress modernizer
try (ExecutorService executor = newSingleThreadScheduledExecutor(threadFactory)) {
queryRunner = afterClassCloser.register(executor.submit(this::createQueryRunner).get());
}
h2QueryRunner = afterClassCloser.register(new H2QueryRunner());
queryAssertions = new io.trino.sql.query.QueryAssertions(queryRunner);
}

public static final class Testing
{
private Testing() {}

public static void main(String[] args)
throws Exception
{
Thread thread = new Thread(() -> {
System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName());
});
thread.start();
thread.join();
}
}

protected abstract QueryRunner createQueryRunner()
throws Exception;

@AfterAll
public final void close()
throws Exception
{
System.out.println("AbstractTestQueryFramework.close");
try (AutoCloseable ignored = afterClassCloser) {
checkQueryMemoryReleased();
checkQueryInfosFinal();
Expand All @@ -133,6 +249,20 @@ public final void close()
h2QueryRunner = null;
queryAssertions = null;
}

if (queryRunnerThreadGroup != null) {
boolean skipCheck =
getClass().getSimpleName().contains("Accumulo") ||
getClass().getSimpleName().contains("Nessie") ||
getClass().getSimpleName().contains("Kudu") ||
getClass().getSimpleName().contains("Phoenix") ||
getClass().getSimpleName().contains("Raptor");
if (!skipCheck) {
reportLeakedThreads(queryRunnerThreadGroup);
}
queryRunnerThreadGroup = null;
}
System.out.println("AbstractTestQueryFramework.close OK");
}

private void checkQueryMemoryReleased()
Expand Down

0 comments on commit b737deb

Please sign in to comment.