Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[13.0] Backport SIFS data files left around and multiple ThreadGroups #11575

Merged
merged 4 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import org.infinispan.client.hotrod.impl.ConfigurationProperties;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.ThreadGroups;
import org.infinispan.commons.executors.ExecutorFactory;
import org.infinispan.commons.executors.NonBlockingResource;

/**
* Default implementation for {@link org.infinispan.commons.executors.ExecutorFactory} based on an {@link
Expand All @@ -36,7 +36,7 @@ public ThreadPoolExecutor getExecutor(Properties p) {
int factoryIndex = DefaultAsyncExecutorFactory.factoryCounter.incrementAndGet();
String threadNamePrefix = cp.getDefaultExecutorFactoryThreadNamePrefix();
String threadNameSuffix = cp.getDefaultExecutorFactoryThreadNameSuffix();
ISPNNonBlockingThreadGroup nonBlockingThreadGroup = new ISPNNonBlockingThreadGroup(threadNamePrefix + "-group");
ThreadGroups.ISPNNonBlockingThreadGroup nonBlockingThreadGroup = ThreadGroups.NON_BLOCKING_GROUP;
ThreadFactory tf = r -> {
int threadIndex = threadCounter.incrementAndGet();
Thread th = new Thread(nonBlockingThreadGroup, r, threadNamePrefix + "-" + factoryIndex + "-" + threadIndex + threadNameSuffix);
Expand All @@ -53,9 +53,4 @@ public ThreadPoolExecutor getExecutor(Properties p) {
});
}

static final class ISPNNonBlockingThreadGroup extends ThreadGroup implements NonBlockingResource {
ISPNNonBlockingThreadGroup(String name) {
super(name);
}
}
}
22 changes: 22 additions & 0 deletions commons/all/src/main/java/org/infinispan/commons/ThreadGroups.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.infinispan.commons;

import org.infinispan.commons.executors.BlockingResource;
import org.infinispan.commons.executors.NonBlockingResource;

public interface ThreadGroups {

ISPNNonBlockingThreadGroup NON_BLOCKING_GROUP = new ISPNNonBlockingThreadGroup("ISPN-non-blocking-group");
ISPNBlockingThreadGroup BLOCKING_GROUP = new ISPNBlockingThreadGroup("ISPN-blocking-group");

final class ISPNNonBlockingThreadGroup extends ThreadGroup implements NonBlockingResource {
public ISPNNonBlockingThreadGroup(String name) {
super(name);
}
}

final class ISPNBlockingThreadGroup extends ThreadGroup implements BlockingResource {
public ISPNBlockingThreadGroup(String name) {
super(name);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.infinispan.commons.ThreadGroups;
import org.infinispan.commons.executors.SecurityAwareExecutorFactory;
import org.infinispan.commons.util.TypedProperties;
import org.infinispan.factories.threads.BlockingThreadFactory;
import org.infinispan.factories.threads.NonBlockingThreadFactory;

/**
* Default executor factory that creates executors using the JDK Executors service.
Expand Down Expand Up @@ -46,8 +45,8 @@ public ExecutorService getExecutor(Properties p, final AccessControlContext cont
if (blocking == null) {
threadGroup = Thread.currentThread().getThreadGroup();
} else {
threadGroup = Boolean.parseBoolean(blocking) ? new BlockingThreadFactory.ISPNBlockingThreadGroup(threadNamePrefix + "-group") :
new NonBlockingThreadFactory.ISPNNonBlockingThreadGroup(threadNamePrefix + "-group");
threadGroup = Boolean.parseBoolean(blocking) ? ThreadGroups.BLOCKING_GROUP :
ThreadGroups.NON_BLOCKING_GROUP;
}
BlockingQueue<Runnable> queue = queueSize == 0 ? new SynchronousQueue<>()
: new LinkedBlockingQueue<>(queueSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ private ThreadFactory createThreadFactoryWithDefaults(GlobalConfiguration global
ExecutorServiceType type) {
switch (type) {
case BLOCKING:
return new BlockingThreadFactory("ISPN-blocking-thread-group", getDefaultThreadPrio(componentName),
DefaultThreadFactory.DEFAULT_PATTERN, globalCfg.transport().nodeName(), shortened(componentName));
return new BlockingThreadFactory(getDefaultThreadPrio(componentName), DefaultThreadFactory.DEFAULT_PATTERN,
globalCfg.transport().nodeName(), shortened(componentName));
case NON_BLOCKING:
return new NonBlockingThreadFactory("ISPN-non-blocking-thread-group", getDefaultThreadPrio(componentName),
return new NonBlockingThreadFactory(getDefaultThreadPrio(componentName),
DefaultThreadFactory.DEFAULT_PATTERN, globalCfg.transport().nodeName(), shortened(componentName));
default:
// Use defaults
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
package org.infinispan.factories.threads;

import org.infinispan.commons.ThreadGroups;
import org.infinispan.commons.executors.BlockingResource;

public class BlockingThreadFactory extends DefaultThreadFactory implements BlockingResource {
public BlockingThreadFactory(String threadGroupName, int initialPriority, String threadNamePattern,
String node, String component) {
super(new ISPNBlockingThreadGroup(threadGroupName), initialPriority, threadNamePattern, node, component);

public BlockingThreadFactory(int initialPriority, String threadNamePattern, String node, String component) {
super(ThreadGroups.BLOCKING_GROUP, initialPriority, threadNamePattern, node, component);
}

public BlockingThreadFactory(String name, String threadGroupName, int initialPriority,
String threadNamePattern, String node, String component) {
super(name, new ISPNBlockingThreadGroup(threadGroupName), initialPriority, threadNamePattern, node, component);
super(name, new ThreadGroups.ISPNBlockingThreadGroup(threadGroupName), initialPriority, threadNamePattern, node, component);
}

public static final class ISPNBlockingThreadGroup extends ThreadGroup implements BlockingResource {
public ISPNBlockingThreadGroup(String name) {
super(name);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
package org.infinispan.factories.threads;

import org.infinispan.commons.ThreadGroups;
import org.infinispan.commons.executors.NonBlockingResource;

public class NonBlockingThreadFactory extends DefaultThreadFactory implements NonBlockingResource {
public NonBlockingThreadFactory(String threadGroupName, int initialPriority, String threadNamePattern,
String node, String component) {
super(new ISPNNonBlockingThreadGroup(threadGroupName), initialPriority, threadNamePattern, node, component);

public NonBlockingThreadFactory(int initialPriority, String threadNamePattern, String node, String component) {
super(ThreadGroups.NON_BLOCKING_GROUP, initialPriority, threadNamePattern, node, component);
}

public NonBlockingThreadFactory(String name, String threadGroupName, int initialPriority,
String threadNamePattern, String node, String component) {
super(name, new ISPNNonBlockingThreadGroup(threadGroupName), initialPriority, threadNamePattern, node, component);
super(name, new ThreadGroups.ISPNNonBlockingThreadGroup(threadGroupName), initialPriority, threadNamePattern, node, component);
}

public static final class ISPNNonBlockingThreadGroup extends ThreadGroup implements NonBlockingResource {
public ISPNNonBlockingThreadGroup(String name) {
super(name);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ public void accept(Object o) throws Throwable {
}
// Make sure we don't start another compaction for this file while performing expiration
stats.setScheduled();
// It is possible the log appender completed while we were compacting the file, if
// so we may need to resubmit the file to be compacted
if (stats.isCompleted() && stats.readyToBeScheduled(compactionThreshold, stats.free.get())) {
schedule(fileId, stats);
}
}
compactSingleFile(fileId, isLogFile, subscriber, currentTimeMilliseconds);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
import static org.testng.AssertJUnit.assertTrue;
import static org.testng.AssertJUnit.fail;
Expand Down Expand Up @@ -328,4 +329,48 @@ public void testExpirationStats(Method m, boolean extraRemovedEntry) throws Inte
TestingUtil.replaceComponent(cacheManager, BlockingManager.class, actualBlockingManager, true);
}
}

public void testExpirationCompactionOnLogFile(Method m) throws InterruptedException {
String cacheName = m.getName();

ControlledTimeService controlledTimeService = defineCacheConfigurationAndInjectTimeService(cacheName);
try {
Cache<String, Object> cache = cacheManager.getCache(cacheName);
cache.start();

cache.put("expired", "bar", 10, TimeUnit.SECONDS);

// This value will be overwritten in a loop later to cause log file to roll over
cache.put("replace-me", "1");

controlledTimeService.advance(TimeUnit.SECONDS.toMillis(11));

NonBlockingSoftIndexFileStore store = TestingUtil.getFirstStore(cache);

Compactor compactor = TestingUtil.extractField(store, "compactor");

// We use a sync queue so we can block the compaction and insert values
SynchronousQueue<Object> queue = new SynchronousQueue<>();

MyCompactionObserver myCompactionObserver = new MyCompactionObserver(queue);
// This will be blocked by the expired entry above
compactor.performExpirationCompaction(myCompactionObserver);

int increment = 2;
ConcurrentMap<Integer, Compactor.Stats> fileStats = compactor.getFileStats();
while (extractCompletedStat(fileStats) == null) {
cache.put("replace-me", String.valueOf(increment++));
}

// Let compaction expiration complete now
assertNotNull(queue.poll(10, TimeUnit.SECONDS));

// Compactor should clean up the old file and it should just be a single entry for the removed entry
eventually(() -> "File stats are: " + fileStats + " and data directory size is: " +
SoftIndexFileStoreTestUtils.dataDirectorySize(tmpDirectory, cacheName)
, () -> SoftIndexFileStoreTestUtils.dataDirectorySize(tmpDirectory, cacheName) < 1000L);
} finally {
TestingUtil.replaceComponent(cacheManager, TimeService.class, controlledTimeService.getActualTimeService(), true);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package org.infinispan.persistence.sifs;

import java.io.File;
import java.nio.file.FileSystems;
import java.nio.file.Path;

public class SoftIndexFileStoreTestUtils {

public static long dataDirectorySize(String tmpDirectory, String cacheName) {
Path dataPath = FileSystems.getDefault().getPath(tmpDirectory, "data", cacheName, "data");
File[] dataFiles = dataPath.toFile().listFiles();

long length = 0;
for (File file : dataFiles) {
length += file.length();
}
return length;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ void startMarshaller() {
nonBlockingExecutor = new ThreadPoolExecutor(0, ProcessorInfo.availableProcessors() * 2,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(KnownComponentNames.getDefaultQueueSize(KnownComponentNames.NON_BLOCKING_EXECUTOR)),
new NonBlockingThreadFactory("ISPN-non-blocking-thread-group", Thread.NORM_PRIORITY,
DefaultThreadFactory.DEFAULT_PATTERN, "Test", "non-blocking"),
new NonBlockingThreadFactory(Thread.NORM_PRIORITY, DefaultThreadFactory.DEFAULT_PATTERN, "Test", "non-blocking"),
NonBlockingRejectedExecutionHandler.getInstance());
blockingExecutor = new ThreadPoolExecutor(0, 150,
60L, TimeUnit.SECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public MigrationTask(String cacheName, Set<Integer> segments, int readBatch, int
@Override
public Integer apply(EmbeddedCacheManager embeddedCacheManager) {
AtomicInteger counter = new AtomicInteger(0);
DefaultThreadFactory threadFactory = new BlockingThreadFactory(null, 1, THREAD_NAME + "-%t", null, null);
DefaultThreadFactory threadFactory = new BlockingThreadFactory(1, THREAD_NAME + "-%t", null, null);
ExecutorService executorService = Executors.newFixedThreadPool(threads, threadFactory);
RemoveListener listener = null;
AdvancedCache<Object, Object> advancedCache = embeddedCacheManager.getCache(cacheName).getAdvancedCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public class NettyEventLoopFactory extends AbstractComponentFactory implements A
public Object construct(String componentName) {
ThreadFactory threadFactory = globalConfiguration.nonBlockingThreadPool().threadFactory();
if (threadFactory == null) {
threadFactory = new NonBlockingThreadFactory("ISPN-non-blocking-thread-group",
getDefaultThreadPrio(KnownComponentNames.NON_BLOCKING_EXECUTOR), DefaultThreadFactory.DEFAULT_PATTERN,
globalConfiguration.transport().nodeName(), shortened(KnownComponentNames.NON_BLOCKING_EXECUTOR));
threadFactory = new NonBlockingThreadFactory(getDefaultThreadPrio(KnownComponentNames.NON_BLOCKING_EXECUTOR),
DefaultThreadFactory.DEFAULT_PATTERN, globalConfiguration.transport().nodeName(),
shortened(KnownComponentNames.NON_BLOCKING_EXECUTOR));
}

ThreadPoolExecutorFactory<?> tpef = globalConfiguration.nonBlockingThreadPool().threadPoolFactory();
Expand Down