diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java index 9e0dd046f6939..501d3f6e606ab 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestQueryFramework.java @@ -63,11 +63,18 @@ import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.parallel.Execution; +import java.lang.reflect.Method; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; import java.util.OptionalLong; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -80,10 +87,12 @@ import static io.trino.execution.querystats.PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector; import static io.trino.sql.SqlFormatter.formatSql; import static io.trino.sql.planner.OptimizerConfig.JoinReorderingStrategy; +import static io.trino.testing.ThreadAssertions.reportLeakedThreads; import static io.trino.testing.TransactionBuilder.transaction; import static io.trino.testing.assertions.Assert.assertEventually; import static java.lang.String.format; import static java.util.Collections.emptyList; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -96,8 +105,10 @@ public abstract class AbstractTestQueryFramework { private static final SqlParser SQL_PARSER = new SqlParser(); + private static final AtomicInteger SEQUENCE = new AtomicInteger(); private AutoCloseableCloser afterClassCloser; + private ThreadGroup queryRunnerThreadGroup; private QueryRunner queryRunner; private H2QueryRunner h2QueryRunner; private io.trino.sql.query.QueryAssertions queryAssertions; @@ -110,11 +121,115 @@ public void init() logging.setLevel("org.testcontainers", Level.WARN); afterClassCloser = AutoCloseableCloser.create(); - queryRunner = afterClassCloser.register(createQueryRunner()); + + try { + // Init unnamed thread org.h2.engine.OnExitDatabaseCloser.INSTANCE + Class.forName("org.h2.engine.OnExitDatabaseCloser"); + } + catch (ClassNotFoundException _) { + } + + try { + // Init unnamed thread org.apache.hadoop.fs.FileSystem.Statistics.STATS_DATA_CLEANER + Class.forName("org.apache.hadoop.fs.FileSystem.Statistics"); + } + catch (ClassNotFoundException _) { + } + + try { + // Init reactor.core.scheduler.Schedulers.CACHED_BOUNDED_ELASTIC + Class schedulers = Class.forName("reactor.core.scheduler.Schedulers"); + int defaultBoundedElasticSize = (int) schedulers.getField("DEFAULT_BOUNDED_ELASTIC_SIZE").get(null); + Object scheduler = schedulers.getMethod("boundedElastic").invoke(null); + Method schedule = Class.forName("reactor.core.scheduler.Scheduler").getMethod("schedule", Runnable.class); + CyclicBarrier barrier = new CyclicBarrier(defaultBoundedElasticSize + 1); + for (int i = 0; i < defaultBoundedElasticSize; i++) { + schedule.invoke(scheduler, (Runnable) () -> { + try { + barrier.await(5, SECONDS); + } + catch (BrokenBarrierException | TimeoutException _) { + } + catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + try { + barrier.await(5, SECONDS); + } + catch (BrokenBarrierException | TimeoutException _) { + } + } + catch (ClassNotFoundException _) { + } + + try { + // Init org.apache.iceberg.hadoop.HadoopFileIO#executorService + Object configuration = Class.forName("org.apache.hadoop.conf.Configuration").getConstructor().newInstance(); + Object hadoopFileIo = Class.forName("org.apache.iceberg.hadoop.HadoopFileIO").getConstructor(configuration.getClass()).newInstance(configuration); + Method getOrCreateStaticExecutorService = hadoopFileIo.getClass().getDeclaredMethod("executorService"); + Method deleteThreads = hadoopFileIo.getClass().getDeclaredMethod("deleteThreads"); + getOrCreateStaticExecutorService.setAccessible(true); + deleteThreads.setAccessible(true); + CyclicBarrier barrier = new CyclicBarrier((int) deleteThreads.invoke(hadoopFileIo) + 1); + ExecutorService executorService = (ExecutorService) getOrCreateStaticExecutorService.invoke(hadoopFileIo); + for (int i = 0; i < 100; i++) { + executorService.submit(() -> { + barrier.await(5, SECONDS); + return null; + }); + } + // we might not be the first to create the ExecutorService, so the max number of threads in the pool may be different than expected + try { + barrier.await(5, SECONDS); + } + catch (BrokenBarrierException | TimeoutException _) { + } + } + catch (ClassNotFoundException _) { + } + + try { + // Init unnamed threads in io.prometheus.metrics.core.util.Scheduler.executor + Class schedulerClass = Class.forName("io.prometheus.metrics.core.util.Scheduler"); + schedulerClass.getMethod("awaitInitialization").invoke(null); + } + catch (ClassNotFoundException _) { + } + + try { + // Init and fill org.apache.accumulo.core.util.threads.ThreadPools#SCHEDULED_FUTURE_CHECKER_POOL + Class.forName("org.apache.accumulo.core.util.threads.ThreadPools"); + } + catch (ClassNotFoundException _) { + } + + queryRunnerThreadGroup = new ThreadGroup("atqf-test-group-" + SEQUENCE.incrementAndGet()); + Thread.Builder.OfPlatform threadBuilder = Thread.ofPlatform().group(queryRunnerThreadGroup).name("AbstractTestQueryFramework-", 0); + ThreadFactory threadFactory = (ThreadFactory) Thread.Builder.OfPlatform.class.getMethod("factory").invoke(threadBuilder); // Suppress modernizer + try (ExecutorService executor = newSingleThreadScheduledExecutor(threadFactory)) { + queryRunner = afterClassCloser.register(executor.submit(this::createQueryRunner).get()); + } h2QueryRunner = afterClassCloser.register(new H2QueryRunner()); queryAssertions = new io.trino.sql.query.QueryAssertions(queryRunner); } + public static final class Testing + { + private Testing() {} + + public static void main(String[] args) + throws Exception + { + Thread thread = new Thread(() -> { + System.out.println("Thread.currentThread().getName() = " + Thread.currentThread().getName()); + }); + thread.start(); + thread.join(); + } + } + protected abstract QueryRunner createQueryRunner() throws Exception; @@ -122,6 +237,7 @@ protected abstract QueryRunner createQueryRunner() public final void close() throws Exception { + System.out.println("AbstractTestQueryFramework.close"); try (AutoCloseable ignored = afterClassCloser) { checkQueryMemoryReleased(); checkQueryInfosFinal(); @@ -133,6 +249,20 @@ public final void close() h2QueryRunner = null; queryAssertions = null; } + + if (queryRunnerThreadGroup != null) { + boolean skipCheck = + getClass().getSimpleName().contains("Accumulo") || + getClass().getSimpleName().contains("Nessie") || + getClass().getSimpleName().contains("Kudu") || + getClass().getSimpleName().contains("Phoenix") || + getClass().getSimpleName().contains("Raptor"); + if (!skipCheck) { + reportLeakedThreads(queryRunnerThreadGroup); + } + queryRunnerThreadGroup = null; + } + System.out.println("AbstractTestQueryFramework.close OK"); } private void checkQueryMemoryReleased()