Skip to content

Commit

Permalink
Merge pull request #59 from dmlloyd/threadpool
Browse files Browse the repository at this point in the history
[MSC-194] Introduce new thread pool implementation
  • Loading branch information
ropalka committed Nov 3, 2017
2 parents 71d1722 + 17afef0 commit 9aacc7d
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 10 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Expand Up @@ -42,10 +42,16 @@
<version.jboss-logging>3.1.3.GA</version.jboss-logging>
<version.jboss-logging-processor>1.1.0.Final</version.jboss-logging-processor>
<version.jboss-modules>1.2.2.Final</version.jboss-modules>
<version.org.jboss.threads>2.3.0.Beta1</version.org.jboss.threads>
<version.junit>4.8.2</version.junit>
</properties>

<dependencies>
<dependency>
<groupId>org.jboss.threads</groupId>
<artifactId>jboss-threads</artifactId>
<version>${version.org.jboss.threads}</version>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>
Expand Down
96 changes: 86 additions & 10 deletions src/main/java/org/jboss/msc/service/ServiceContainerImpl.java
Expand Up @@ -44,14 +44,19 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import javax.management.MBeanServer;
Expand All @@ -68,6 +73,7 @@
import org.jboss.msc.service.management.ServiceContainerMXBean;
import org.jboss.msc.service.management.ServiceStatus;
import org.jboss.msc.value.InjectedValue;
import org.jboss.threads.EnhancedQueueExecutor;

/**
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
Expand Down Expand Up @@ -876,27 +882,97 @@ public ServiceThread run() {
}


final class ContainerExecutor extends ThreadPoolExecutor {
final class ContainerExecutor implements ExecutorService {

private final ExecutorService delegate;

ContainerExecutor(final int corePoolSize, final int maximumPoolSize, final long keepAliveTime, final TimeUnit unit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
final ThreadFactory threadFactory = new ThreadFactory() {
private final int id = executorSeq.getAndIncrement();
private final AtomicInteger threadSeq = new AtomicInteger(1);

public Thread newThread(final Runnable r) {
return doPrivileged(new ThreadAction(r, id, threadSeq));
}
}, POLICY);
}
};
if (EnhancedQueueExecutor.DISABLE_HINT) {
delegate = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(), threadFactory, POLICY) {
protected void afterExecute(final Runnable r, final Throwable t) {
super.afterExecute(r, t);
if (t != null) {
HANDLER.uncaughtException(Thread.currentThread(), t);
}
}

protected void afterExecute(final Runnable r, final Throwable t) {
super.afterExecute(r, t);
if (t != null) {
HANDLER.uncaughtException(Thread.currentThread(), t);
protected void terminated() {
shutdownComplete(shutdownInitiated);
}
};
} else {
delegate = new EnhancedQueueExecutor.Builder()
.setCorePoolSize(corePoolSize)
.setMaximumPoolSize(maximumPoolSize)
.setKeepAliveTime(keepAliveTime, unit)
.setTerminationTask(new Runnable() {
public void run() {
shutdownComplete(shutdownInitiated);
}
})
.setThreadFactory(threadFactory)
.build();
}
}

protected void terminated() {
shutdownComplete(shutdownInitiated);
public void shutdown() {
delegate.shutdown();
}

public List<Runnable> shutdownNow() {
return delegate.shutdownNow();
}

public boolean isShutdown() {
return delegate.isShutdown();
}

public boolean isTerminated() {
return delegate.isTerminated();
}

public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
return delegate.awaitTermination(timeout, unit);
}

public <T> Future<T> submit(final Callable<T> task) {
return delegate.submit(task);
}

public <T> Future<T> submit(final Runnable task, final T result) {
return delegate.submit(task, result);
}

public Future<?> submit(final Runnable task) {
return delegate.submit(task);
}

public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks) throws InterruptedException {
return delegate.invokeAll(tasks);
}

public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException {
return delegate.invokeAll(tasks, timeout, unit);
}

public <T> T invokeAny(final Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return delegate.invokeAny(tasks);
}

public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.invokeAny(tasks, timeout, unit);
}

public void execute(final Runnable command) {
delegate.execute(command);
}
}
}

0 comments on commit 9aacc7d

Please sign in to comment.