Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure the PoolBasedSequentialScheduledExecutorService does keep a minimum size #4288

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -29,16 +28,18 @@
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.core.internal.common.WrappedScheduledExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A ScheduledExecutorService that will sequentially perform the tasks like a
Expand All @@ -55,20 +56,73 @@
@NonNullByDefault
final class PoolBasedSequentialScheduledExecutorService implements ScheduledExecutorService {

private static final WeakHashMap<ScheduledThreadPoolExecutor, @NonNull AtomicInteger> PENDING_BY_POOL = new WeakHashMap<>();
static class BasePoolExecutor extends WrappedScheduledExecutorService {

protected final Logger logger = LoggerFactory.getLogger(BasePoolExecutor.class);

private final String threadPoolName;
private final AtomicInteger pending;
private volatile int minimumPoolSize;

public BasePoolExecutor(String threadPoolName, int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);

this.threadPoolName = threadPoolName;
// set to one does ensure at least one thread more than tasks running
this.pending = new AtomicInteger(1);
}

public synchronized void resizePool(int mandatoryPoolSize) {
int corePoolSize = getCorePoolSize();

if (minimumPoolSize > mandatoryPoolSize) {
mandatoryPoolSize = minimumPoolSize;
}

if (mandatoryPoolSize > corePoolSize) {
// two more than needed, they will time out if there is no work for them im time
setMaximumPoolSize(mandatoryPoolSize + 2);
setCorePoolSize(mandatoryPoolSize);
} else if (mandatoryPoolSize < corePoolSize) {
setCorePoolSize(mandatoryPoolSize);
// ensure we drop not needed threads, this is only needed under higher load when none of the
// started threads have a chance to timeout
setMaximumPoolSize(mandatoryPoolSize + 2);
}
}

public int getMinimumPoolSize() {
return minimumPoolSize;
}

public void setMinimumPoolSize(int minimumPoolSize) {
this.minimumPoolSize = minimumPoolSize;

resizePool(getCorePoolSize());
}

@Override
public void shutdown() {
logger.warn("shutdown() invoked on a shared thread pool '{}'. This is a bug, please submit a bug report",
threadPoolName, new IllegalStateException());
}

@Override
@NonNullByDefault({})
public List<Runnable> shutdownNow() {
logger.warn("shutdownNow() invoked on a shared thread pool '{}'. This is a bug, please submit a bug report",
threadPoolName, new IllegalStateException());
return List.of();
}
}

private final WorkQueueEntry empty;
private final ScheduledThreadPoolExecutor pool;
private final AtomicInteger pending;
private final BasePoolExecutor pool;
private final List<RunnableFuture<?>> scheduled;
private final ScheduledFuture<?> cleaner;
private @Nullable WorkQueueEntry tail;

public PoolBasedSequentialScheduledExecutorService(ScheduledThreadPoolExecutor pool) {
if (pool.getMaximumPoolSize() != Integer.MAX_VALUE) {
throw new IllegalArgumentException("the pool must scale unlimited to avoid potential dead locks!");
}

public PoolBasedSequentialScheduledExecutorService(BasePoolExecutor pool) {
this.pool = pool;

// prepare the WorkQueueEntry we are using when no tasks are pending
Expand All @@ -81,20 +135,6 @@ public PoolBasedSequentialScheduledExecutorService(ScheduledThreadPoolExecutor p

tail = empty;

// we need one pending counter per pool
synchronized (PENDING_BY_POOL) {
AtomicInteger fromCache = PENDING_BY_POOL.get(pool);

if (fromCache == null) {
// set to one does ensure at least one thread more than tasks running
fromCache = new AtomicInteger(1);

PENDING_BY_POOL.put(pool, fromCache);
}

pending = fromCache;
}

// clean up to ensure we do not keep references to old tasks
cleaner = this.scheduleWithFixedDelay(() -> {
synchronized (this) {
Expand Down Expand Up @@ -197,6 +237,10 @@ private <V> ScheduledFuture<V> schedule(
@Override
public void shutdown() {
synchronized (this) {
if (tail == null) {
return;
}

cleaner.cancel(false);
scheduled.removeIf((sf) -> {
sf.cancel(false);
Expand Down Expand Up @@ -302,7 +346,7 @@ private <T> CompletableFuture<T> submitToWorkQueue(@Nullable RunnableFuture<?> o
// a small hack to throw the Exception unchecked
throw PoolBasedSequentialScheduledExecutorService.unchecked(ex);
} finally {
pending.decrementAndGet();
pool.pending.decrementAndGet();
}
};

Expand All @@ -314,8 +358,8 @@ private <T> CompletableFuture<T> submitToWorkQueue(@Nullable RunnableFuture<?> o
throw new RejectedExecutionException("this scheduled executor has been shutdown before");
}

// set the core pool size even if it does not change, this triggers idle threads to stop
pool.setCorePoolSize(pending.incrementAndGet());
var mandatoryPoolSize = pool.pending.incrementAndGet();
pool.resizePool(mandatoryPoolSize);

// avoid waiting for one pool thread to finish inside a pool thread
runNow = inPool && tail.future.isDone();
Expand All @@ -335,7 +379,7 @@ private <T> CompletableFuture<T> submitToWorkQueue(@Nullable RunnableFuture<?> o
try {
cf.run();
} finally {
pending.decrementAndGet();
pool.pending.decrementAndGet();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.openhab.core.common.PoolBasedSequentialScheduledExecutorService.BasePoolExecutor;
import org.openhab.core.internal.common.WrappedScheduledExecutorService;
import org.osgi.framework.Constants;
import org.osgi.service.component.ComponentConstants;
Expand Down Expand Up @@ -100,7 +101,10 @@ protected void modified(Map<String, Object> properties) {
Integer poolSize = Integer.valueOf(string);
configs.put(poolName, poolSize);
ThreadPoolExecutor pool = (ThreadPoolExecutor) pools.get(poolName);
if (pool instanceof ScheduledThreadPoolExecutor) {
if (pool instanceof BasePoolExecutor basePool) {
basePool.setMinimumPoolSize(poolSize);
LOGGER.debug("Updated scheduled thread pool '{}' to minimum size {}", poolName, poolSize);
} else if (pool instanceof ScheduledThreadPoolExecutor) {
pool.setCorePoolSize(poolSize);
LOGGER.debug("Updated scheduled thread pool '{}' to size {}", poolName, poolSize);
} else if (pool instanceof QueueingThreadPoolExecutor) {
Expand Down Expand Up @@ -129,9 +133,22 @@ protected void modified(Map<String, Object> properties) {
public static ScheduledExecutorService getPoolBasedSequentialScheduledExecutorService(String poolName,
String threadName) {
if (configs.getOrDefault(poolName, 0) > 0) {
ScheduledThreadPoolExecutor pool = getScheduledPoolUnwrapped(poolName);

return new PoolBasedSequentialScheduledExecutorService((ScheduledThreadPoolExecutor) pool);
ExecutorService pool = pools.computeIfAbsent(poolName, name -> {
int cfg = getConfig(name);
ScheduledThreadPoolExecutor executor = new BasePoolExecutor(name, cfg,
new NamedThreadFactory(name, true, Thread.NORM_PRIORITY));
executor.setKeepAliveTime(THREAD_TIMEOUT, TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(true);
executor.setRemoveOnCancelPolicy(true);
LOGGER.debug("Created scheduled pool based thread pool '{}' of size {}", name, cfg);
return executor;
});

if (pool instanceof BasePoolExecutor service) {
return new PoolBasedSequentialScheduledExecutorService(service);
} else {
throw new IllegalArgumentException("Pool " + poolName + " is not a base pool!");
}
} else {
return Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(threadName));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void testSchedulingDoesSpawnNewThreads() throws InterruptedException {
block.countDown();
}, 20, TimeUnit.MILLISECONDS);

assertTrue(check.await(80, TimeUnit.MILLISECONDS));
assertTrue(check.await(800, TimeUnit.MILLISECONDS));
}
}

Expand Down