diff --git a/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java b/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java index f2f2add32824..193470eb25c9 100644 --- a/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java +++ b/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/impl/async/DefaultAsyncExecutorFactory.java @@ -13,8 +13,8 @@ import org.infinispan.client.hotrod.impl.ConfigurationProperties; import org.infinispan.client.hotrod.logging.Log; import org.infinispan.client.hotrod.logging.LogFactory; +import org.infinispan.commons.ThreadGroups; import org.infinispan.commons.executors.ExecutorFactory; -import org.infinispan.commons.executors.NonBlockingResource; /** * Default implementation for {@link org.infinispan.commons.executors.ExecutorFactory} based on an {@link @@ -36,7 +36,7 @@ public ThreadPoolExecutor getExecutor(Properties p) { int factoryIndex = DefaultAsyncExecutorFactory.factoryCounter.incrementAndGet(); String threadNamePrefix = cp.getDefaultExecutorFactoryThreadNamePrefix(); String threadNameSuffix = cp.getDefaultExecutorFactoryThreadNameSuffix(); - ISPNNonBlockingThreadGroup nonBlockingThreadGroup = new ISPNNonBlockingThreadGroup(threadNamePrefix + "-group"); + ThreadGroups.ISPNNonBlockingThreadGroup nonBlockingThreadGroup = ThreadGroups.NON_BLOCKING_GROUP; ThreadFactory tf = r -> { int threadIndex = threadCounter.incrementAndGet(); Thread th = new Thread(nonBlockingThreadGroup, r, threadNamePrefix + "-" + factoryIndex + "-" + threadIndex + threadNameSuffix); @@ -53,9 +53,4 @@ public ThreadPoolExecutor getExecutor(Properties p) { }); } - static final class ISPNNonBlockingThreadGroup extends ThreadGroup implements NonBlockingResource { - ISPNNonBlockingThreadGroup(String name) { - super(name); - } - } } diff --git a/commons/all/src/main/java/org/infinispan/commons/ThreadGroups.java b/commons/all/src/main/java/org/infinispan/commons/ThreadGroups.java new file mode 100644 index 000000000000..32fac16cca0d --- /dev/null +++ b/commons/all/src/main/java/org/infinispan/commons/ThreadGroups.java @@ -0,0 +1,22 @@ +package org.infinispan.commons; + +import org.infinispan.commons.executors.BlockingResource; +import org.infinispan.commons.executors.NonBlockingResource; + +public interface ThreadGroups { + + ISPNNonBlockingThreadGroup NON_BLOCKING_GROUP = new ISPNNonBlockingThreadGroup("ISPN-non-blocking-group"); + ISPNBlockingThreadGroup BLOCKING_GROUP = new ISPNBlockingThreadGroup("ISPN-blocking-group"); + + final class ISPNNonBlockingThreadGroup extends ThreadGroup implements NonBlockingResource { + public ISPNNonBlockingThreadGroup(String name) { + super(name); + } + } + + final class ISPNBlockingThreadGroup extends ThreadGroup implements BlockingResource { + public ISPNBlockingThreadGroup(String name) { + super(name); + } + } +} diff --git a/core/src/main/java/org/infinispan/executors/DefaultExecutorFactory.java b/core/src/main/java/org/infinispan/executors/DefaultExecutorFactory.java index 5754e9ddc27e..ef51f5e2d344 100644 --- a/core/src/main/java/org/infinispan/executors/DefaultExecutorFactory.java +++ b/core/src/main/java/org/infinispan/executors/DefaultExecutorFactory.java @@ -11,10 +11,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.infinispan.commons.ThreadGroups; import org.infinispan.commons.executors.SecurityAwareExecutorFactory; import org.infinispan.commons.util.TypedProperties; -import org.infinispan.factories.threads.BlockingThreadFactory; -import org.infinispan.factories.threads.NonBlockingThreadFactory; /** * Default executor factory that creates executors using the JDK Executors service. @@ -46,8 +45,8 @@ public ExecutorService getExecutor(Properties p, final AccessControlContext cont if (blocking == null) { threadGroup = Thread.currentThread().getThreadGroup(); } else { - threadGroup = Boolean.parseBoolean(blocking) ? new BlockingThreadFactory.ISPNBlockingThreadGroup(threadNamePrefix + "-group") : - new NonBlockingThreadFactory.ISPNNonBlockingThreadGroup(threadNamePrefix + "-group"); + threadGroup = Boolean.parseBoolean(blocking) ? ThreadGroups.BLOCKING_GROUP : + ThreadGroups.NON_BLOCKING_GROUP; } BlockingQueue queue = queueSize == 0 ? new SynchronousQueue<>() : new LinkedBlockingQueue<>(queueSize); diff --git a/core/src/main/java/org/infinispan/factories/NamedExecutorsFactory.java b/core/src/main/java/org/infinispan/factories/NamedExecutorsFactory.java index 139031753613..207f9de669e8 100644 --- a/core/src/main/java/org/infinispan/factories/NamedExecutorsFactory.java +++ b/core/src/main/java/org/infinispan/factories/NamedExecutorsFactory.java @@ -108,10 +108,10 @@ private ThreadFactory createThreadFactoryWithDefaults(GlobalConfiguration global ExecutorServiceType type) { switch (type) { case BLOCKING: - return new BlockingThreadFactory("ISPN-blocking-thread-group", getDefaultThreadPrio(componentName), - DefaultThreadFactory.DEFAULT_PATTERN, globalCfg.transport().nodeName(), shortened(componentName)); + return new BlockingThreadFactory(getDefaultThreadPrio(componentName), DefaultThreadFactory.DEFAULT_PATTERN, + globalCfg.transport().nodeName(), shortened(componentName)); case NON_BLOCKING: - return new NonBlockingThreadFactory("ISPN-non-blocking-thread-group", getDefaultThreadPrio(componentName), + return new NonBlockingThreadFactory(getDefaultThreadPrio(componentName), DefaultThreadFactory.DEFAULT_PATTERN, globalCfg.transport().nodeName(), shortened(componentName)); default: // Use defaults diff --git a/core/src/main/java/org/infinispan/factories/threads/BlockingThreadFactory.java b/core/src/main/java/org/infinispan/factories/threads/BlockingThreadFactory.java index fe17c9904e1b..1c0c3f4588da 100644 --- a/core/src/main/java/org/infinispan/factories/threads/BlockingThreadFactory.java +++ b/core/src/main/java/org/infinispan/factories/threads/BlockingThreadFactory.java @@ -1,21 +1,17 @@ package org.infinispan.factories.threads; +import org.infinispan.commons.ThreadGroups; import org.infinispan.commons.executors.BlockingResource; public class BlockingThreadFactory extends DefaultThreadFactory implements BlockingResource { - public BlockingThreadFactory(String threadGroupName, int initialPriority, String threadNamePattern, - String node, String component) { - super(new ISPNBlockingThreadGroup(threadGroupName), initialPriority, threadNamePattern, node, component); + + public BlockingThreadFactory(int initialPriority, String threadNamePattern, String node, String component) { + super(ThreadGroups.BLOCKING_GROUP, initialPriority, threadNamePattern, node, component); } public BlockingThreadFactory(String name, String threadGroupName, int initialPriority, String threadNamePattern, String node, String component) { - super(name, new ISPNBlockingThreadGroup(threadGroupName), initialPriority, threadNamePattern, node, component); + super(name, new ThreadGroups.ISPNBlockingThreadGroup(threadGroupName), initialPriority, threadNamePattern, node, component); } - public static final class ISPNBlockingThreadGroup extends ThreadGroup implements BlockingResource { - public ISPNBlockingThreadGroup(String name) { - super(name); - } - } } diff --git a/core/src/main/java/org/infinispan/factories/threads/NonBlockingThreadFactory.java b/core/src/main/java/org/infinispan/factories/threads/NonBlockingThreadFactory.java index ee4af01b2b9c..467084129c2e 100644 --- a/core/src/main/java/org/infinispan/factories/threads/NonBlockingThreadFactory.java +++ b/core/src/main/java/org/infinispan/factories/threads/NonBlockingThreadFactory.java @@ -1,21 +1,17 @@ package org.infinispan.factories.threads; +import org.infinispan.commons.ThreadGroups; import org.infinispan.commons.executors.NonBlockingResource; public class NonBlockingThreadFactory extends DefaultThreadFactory implements NonBlockingResource { - public NonBlockingThreadFactory(String threadGroupName, int initialPriority, String threadNamePattern, - String node, String component) { - super(new ISPNNonBlockingThreadGroup(threadGroupName), initialPriority, threadNamePattern, node, component); + + public NonBlockingThreadFactory(int initialPriority, String threadNamePattern, String node, String component) { + super(ThreadGroups.NON_BLOCKING_GROUP, initialPriority, threadNamePattern, node, component); } public NonBlockingThreadFactory(String name, String threadGroupName, int initialPriority, String threadNamePattern, String node, String component) { - super(name, new ISPNNonBlockingThreadGroup(threadGroupName), initialPriority, threadNamePattern, node, component); + super(name, new ThreadGroups.ISPNNonBlockingThreadGroup(threadGroupName), initialPriority, threadNamePattern, node, component); } - public static final class ISPNNonBlockingThreadGroup extends ThreadGroup implements NonBlockingResource { - public ISPNNonBlockingThreadGroup(String name) { - super(name); - } - } } diff --git a/core/src/main/java/org/infinispan/persistence/sifs/Compactor.java b/core/src/main/java/org/infinispan/persistence/sifs/Compactor.java index 53ce182b6706..51ce37c9d05c 100644 --- a/core/src/main/java/org/infinispan/persistence/sifs/Compactor.java +++ b/core/src/main/java/org/infinispan/persistence/sifs/Compactor.java @@ -346,6 +346,11 @@ public void accept(Object o) throws Throwable { } // Make sure we don't start another compaction for this file while performing expiration stats.setScheduled(); + // It is possible the log appender completed while we were compacting the file, if + // so we may need to resubmit the file to be compacted + if (stats.isCompleted() && stats.readyToBeScheduled(compactionThreshold, stats.free.get())) { + schedule(fileId, stats); + } } compactSingleFile(fileId, isLogFile, subscriber, currentTimeMilliseconds); } diff --git a/core/src/test/java/org/infinispan/persistence/sifs/SoftIndexFileStoreFileStatsTest.java b/core/src/test/java/org/infinispan/persistence/sifs/SoftIndexFileStoreFileStatsTest.java index 112ea3de80aa..aac4efd4dca3 100644 --- a/core/src/test/java/org/infinispan/persistence/sifs/SoftIndexFileStoreFileStatsTest.java +++ b/core/src/test/java/org/infinispan/persistence/sifs/SoftIndexFileStoreFileStatsTest.java @@ -2,6 +2,7 @@ import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; import static org.testng.AssertJUnit.assertTrue; import static org.testng.AssertJUnit.fail; @@ -328,4 +329,48 @@ public void testExpirationStats(Method m, boolean extraRemovedEntry) throws Inte TestingUtil.replaceComponent(cacheManager, BlockingManager.class, actualBlockingManager, true); } } + + public void testExpirationCompactionOnLogFile(Method m) throws InterruptedException { + String cacheName = m.getName(); + + ControlledTimeService controlledTimeService = defineCacheConfigurationAndInjectTimeService(cacheName); + try { + Cache cache = cacheManager.getCache(cacheName); + cache.start(); + + cache.put("expired", "bar", 10, TimeUnit.SECONDS); + + // This value will be overwritten in a loop later to cause log file to roll over + cache.put("replace-me", "1"); + + controlledTimeService.advance(TimeUnit.SECONDS.toMillis(11)); + + NonBlockingSoftIndexFileStore store = TestingUtil.getFirstStore(cache); + + Compactor compactor = TestingUtil.extractField(store, "compactor"); + + // We use a sync queue so we can block the compaction and insert values + SynchronousQueue queue = new SynchronousQueue<>(); + + MyCompactionObserver myCompactionObserver = new MyCompactionObserver(queue); + // This will be blocked by the expired entry above + compactor.performExpirationCompaction(myCompactionObserver); + + int increment = 2; + ConcurrentMap fileStats = compactor.getFileStats(); + while (extractCompletedStat(fileStats) == null) { + cache.put("replace-me", String.valueOf(increment++)); + } + + // Let compaction expiration complete now + assertNotNull(queue.poll(10, TimeUnit.SECONDS)); + + // Compactor should clean up the old file and it should just be a single entry for the removed entry + eventually(() -> "File stats are: " + fileStats + " and data directory size is: " + + SoftIndexFileStoreTestUtils.dataDirectorySize(tmpDirectory, cacheName) + , () -> SoftIndexFileStoreTestUtils.dataDirectorySize(tmpDirectory, cacheName) < 1000L); + } finally { + TestingUtil.replaceComponent(cacheManager, TimeService.class, controlledTimeService.getActualTimeService(), true); + } + } } diff --git a/core/src/test/java/org/infinispan/persistence/sifs/SoftIndexFileStoreTestUtils.java b/core/src/test/java/org/infinispan/persistence/sifs/SoftIndexFileStoreTestUtils.java new file mode 100644 index 000000000000..e257a05e6f29 --- /dev/null +++ b/core/src/test/java/org/infinispan/persistence/sifs/SoftIndexFileStoreTestUtils.java @@ -0,0 +1,19 @@ +package org.infinispan.persistence.sifs; + +import java.io.File; +import java.nio.file.FileSystems; +import java.nio.file.Path; + +public class SoftIndexFileStoreTestUtils { + + public static long dataDirectorySize(String tmpDirectory, String cacheName) { + Path dataPath = FileSystems.getDefault().getPath(tmpDirectory, "data", cacheName, "data"); + File[] dataFiles = dataPath.toFile().listFiles(); + + long length = 0; + for (File file : dataFiles) { + length += file.length(); + } + return length; + } +} diff --git a/core/src/test/java/org/infinispan/stress/AsyncStoreStressTest.java b/core/src/test/java/org/infinispan/stress/AsyncStoreStressTest.java index 1215aa0ee40f..948037d7352c 100644 --- a/core/src/test/java/org/infinispan/stress/AsyncStoreStressTest.java +++ b/core/src/test/java/org/infinispan/stress/AsyncStoreStressTest.java @@ -108,8 +108,7 @@ void startMarshaller() { nonBlockingExecutor = new ThreadPoolExecutor(0, ProcessorInfo.availableProcessors() * 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(KnownComponentNames.getDefaultQueueSize(KnownComponentNames.NON_BLOCKING_EXECUTOR)), - new NonBlockingThreadFactory("ISPN-non-blocking-thread-group", Thread.NORM_PRIORITY, - DefaultThreadFactory.DEFAULT_PATTERN, "Test", "non-blocking"), + new NonBlockingThreadFactory(Thread.NORM_PRIORITY, DefaultThreadFactory.DEFAULT_PATTERN, "Test", "non-blocking"), NonBlockingRejectedExecutionHandler.getInstance()); blockingExecutor = new ThreadPoolExecutor(0, 150, 60L, TimeUnit.SECONDS, diff --git a/persistence/remote/src/main/java/org/infinispan/persistence/remote/upgrade/MigrationTask.java b/persistence/remote/src/main/java/org/infinispan/persistence/remote/upgrade/MigrationTask.java index 13a133889e74..f9cf5daa890b 100644 --- a/persistence/remote/src/main/java/org/infinispan/persistence/remote/upgrade/MigrationTask.java +++ b/persistence/remote/src/main/java/org/infinispan/persistence/remote/upgrade/MigrationTask.java @@ -80,7 +80,7 @@ public MigrationTask(String cacheName, Set segments, int readBatch, int @Override public Integer apply(EmbeddedCacheManager embeddedCacheManager) { AtomicInteger counter = new AtomicInteger(0); - DefaultThreadFactory threadFactory = new BlockingThreadFactory(null, 1, THREAD_NAME + "-%t", null, null); + DefaultThreadFactory threadFactory = new BlockingThreadFactory(1, THREAD_NAME + "-%t", null, null); ExecutorService executorService = Executors.newFixedThreadPool(threads, threadFactory); RemoveListener listener = null; AdvancedCache advancedCache = embeddedCacheManager.getCache(cacheName).getAdvancedCache(); diff --git a/server/core/src/main/java/org/infinispan/server/core/factories/NettyEventLoopFactory.java b/server/core/src/main/java/org/infinispan/server/core/factories/NettyEventLoopFactory.java index a95c006efa5e..0544fa199e8b 100644 --- a/server/core/src/main/java/org/infinispan/server/core/factories/NettyEventLoopFactory.java +++ b/server/core/src/main/java/org/infinispan/server/core/factories/NettyEventLoopFactory.java @@ -25,9 +25,9 @@ public class NettyEventLoopFactory extends AbstractComponentFactory implements A public Object construct(String componentName) { ThreadFactory threadFactory = globalConfiguration.nonBlockingThreadPool().threadFactory(); if (threadFactory == null) { - threadFactory = new NonBlockingThreadFactory("ISPN-non-blocking-thread-group", - getDefaultThreadPrio(KnownComponentNames.NON_BLOCKING_EXECUTOR), DefaultThreadFactory.DEFAULT_PATTERN, - globalConfiguration.transport().nodeName(), shortened(KnownComponentNames.NON_BLOCKING_EXECUTOR)); + threadFactory = new NonBlockingThreadFactory(getDefaultThreadPrio(KnownComponentNames.NON_BLOCKING_EXECUTOR), + DefaultThreadFactory.DEFAULT_PATTERN, globalConfiguration.transport().nodeName(), + shortened(KnownComponentNames.NON_BLOCKING_EXECUTOR)); } ThreadPoolExecutorFactory tpef = globalConfiguration.nonBlockingThreadPool().threadPoolFactory();