Skip to content

Commit

Permalink
ISPN-10363 LazyInitializingExecutorService is not thread-safe
Browse files Browse the repository at this point in the history
* Remove cached executors from NamedExecutorFactory
* Move the @Stop methods to the executors themselves
* Don't allow lazy executors to start after stopping
  • Loading branch information
danberindei authored and wburns committed Jun 28, 2019
1 parent b9397ea commit 76eca3f
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 102 deletions.
@@ -1,7 +1,6 @@
package org.infinispan.executors;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -13,23 +12,36 @@

import org.infinispan.commons.executors.ThreadPoolExecutorFactory;
import org.infinispan.commons.time.TimeService;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.util.concurrent.BlockingRunnable;
import org.infinispan.util.concurrent.BlockingTaskAwareExecutorService;
import org.infinispan.util.concurrent.BlockingTaskAwareExecutorServiceImpl;
import org.infinispan.util.concurrent.WithinThreadExecutor;

/**
* A delegating executor that lazily constructs and initializes the underlying executor.
*
* @author Pedro Ruivo
* @since 5.3
*/
public final class LazyInitializingBlockingTaskAwareExecutorService extends ManageableExecutorService<ExecutorService> implements BlockingTaskAwareExecutorService {
@Scope(Scopes.GLOBAL)
public final class LazyInitializingBlockingTaskAwareExecutorService
extends ManageableExecutorService<ExecutorService> implements BlockingTaskAwareExecutorService {

private static final BlockingTaskAwareExecutorService STOPPED;

static {
STOPPED = new BlockingTaskAwareExecutorServiceImpl("", new WithinThreadExecutor(), null);
STOPPED.shutdown();
}

private final ThreadPoolExecutorFactory<ExecutorService> executorFactory;
private final ThreadFactory threadFactory;
private final TimeService timeService;
private final String controllerThreadName;
private volatile BlockingTaskAwareExecutorServiceImpl blockingExecutor;
private volatile BlockingTaskAwareExecutorService blockingExecutor;

public LazyInitializingBlockingTaskAwareExecutorService(ThreadPoolExecutorFactory<ExecutorService> executorFactory,
ThreadFactory threadFactory,
Expand All @@ -55,15 +67,23 @@ public void checkForReadyTasks() {

@Override
public void shutdown() {
if (blockingExecutor != null) blockingExecutor.shutdown();
synchronized (this) {
if (blockingExecutor == null) {
blockingExecutor = STOPPED;
}
blockingExecutor.shutdown();
}
}

@Stop
@Override
public List<Runnable> shutdownNow() {
if (blockingExecutor == null)
return Collections.emptyList();
else
synchronized (this) {
if (blockingExecutor == null) {
blockingExecutor = STOPPED;
}
return blockingExecutor.shutdownNow();
}
}

@Override
Expand Down
@@ -1,7 +1,6 @@
package org.infinispan.executors;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -12,6 +11,10 @@
import java.util.concurrent.TimeoutException;

import org.infinispan.commons.executors.ThreadPoolExecutorFactory;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.util.concurrent.WithinThreadExecutor;

/**
* A delegating executor that lazily constructs and initializes the underlying executor, since unused JDK executors
Expand All @@ -20,7 +23,15 @@
* @author Manik Surtani
* @since 5.1
*/
@Scope(Scopes.GLOBAL)
public final class LazyInitializingExecutorService extends ManageableExecutorService<ExecutorService> implements ExecutorService {
private static final ExecutorService STOPPED;

static {
STOPPED = new WithinThreadExecutor();
STOPPED.shutdown();
}

private final ThreadPoolExecutorFactory<ExecutorService> executorFactory;
private final ThreadFactory threadFactory;

Expand All @@ -42,15 +53,23 @@ private void initIfNeeded() {

@Override
public void shutdown() {
if (executor != null) executor.shutdown();
synchronized (this) {
if (executor == null) {
executor = STOPPED;
}
executor.shutdown();
}
}

@Stop
@Override
public List<Runnable> shutdownNow() {
if (executor == null)
return Collections.emptyList();
else
synchronized (this) {
if (executor == null) {
executor = STOPPED;
}
return executor.shutdownNow();
}
}

@Override
Expand Down
@@ -1,18 +1,21 @@
package org.infinispan.executors;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.infinispan.commons.executors.ThreadPoolExecutorFactory;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;

/**
* A delegating scheduled executor that lazily constructs and initializes the underlying scheduled executor, since
Expand All @@ -21,7 +24,15 @@
* @author Manik Surtani
* @since 5.1
*/
@Scope(Scopes.GLOBAL)
public class LazyInitializingScheduledExecutorService extends ManageableExecutorService<ScheduledExecutorService> implements ScheduledExecutorService {
private static final ScheduledExecutorService STOPPED;

static {
STOPPED = new ScheduledThreadPoolExecutor(0);
STOPPED.shutdown();
}

private final ThreadPoolExecutorFactory<ScheduledExecutorService> executorFactory;
private final ThreadFactory threadFactory;

Expand All @@ -43,15 +54,23 @@ private void initIfNeeded() {

@Override
public void shutdown() {
if (executor != null) executor.shutdown();
synchronized (this) {
if (executor == null) {
executor = STOPPED;
}
executor.shutdown();
}
}

@Stop
@Override
public List<Runnable> shutdownNow() {
if (executor == null)
return Collections.emptyList();
else
synchronized (this) {
if (executor == null) {
executor = STOPPED;
}
return executor.shutdownNow();
}
}

@Override
Expand Down
Expand Up @@ -12,7 +12,6 @@
import static org.infinispan.factories.KnownComponentNames.shortened;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

import org.infinispan.commons.CacheConfigurationException;
Expand All @@ -25,9 +24,7 @@
import org.infinispan.executors.LazyInitializingExecutorService;
import org.infinispan.executors.LazyInitializingScheduledExecutorService;
import org.infinispan.factories.annotations.DefaultFactoryFor;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.threads.DefaultThreadFactory;
import org.infinispan.util.concurrent.BlockingTaskAwareExecutorService;

/**
* A factory that specifically knows how to create named executors.
Expand All @@ -39,98 +36,47 @@
@DefaultFactoryFor(names = {ASYNC_TRANSPORT_EXECUTOR, ASYNC_NOTIFICATION_EXECUTOR, PERSISTENCE_EXECUTOR, ASYNC_OPERATIONS_EXECUTOR,
EXPIRATION_SCHEDULED_EXECUTOR, REMOTE_COMMAND_EXECUTOR, STATE_TRANSFER_EXECUTOR, TIMEOUT_SCHEDULE_EXECUTOR})
public class NamedExecutorsFactory extends AbstractComponentFactory implements AutoInstantiableFactory {

private ExecutorService notificationExecutor;
private ExecutorService asyncTransportExecutor;
private ExecutorService persistenceExecutor;
private BlockingTaskAwareExecutorService remoteCommandsExecutor;
private ScheduledExecutorService expirationExecutor;
private ExecutorService stateTransferExecutor;
private ExecutorService asyncOperationsExecutor;
private ScheduledExecutorService timeoutExecutor;

@Override
@SuppressWarnings("unchecked")
public Object construct(String componentName) {
try {
// Construction happens only on startup of either CacheManager, or Cache, so
// using synchronized protection does not have a great impact on app performance.
if (componentName.equals(ASYNC_NOTIFICATION_EXECUTOR)) {
synchronized (this) {
if (notificationExecutor == null) {
notificationExecutor = createExecutorService(
return createExecutorService(
globalConfiguration.listenerThreadPool(),
ASYNC_NOTIFICATION_EXECUTOR,
ExecutorServiceType.DEFAULT);
}
}
return notificationExecutor;
} else if (componentName.equals(PERSISTENCE_EXECUTOR)) {
synchronized (this) {
if (persistenceExecutor == null) {
persistenceExecutor = createExecutorService(
return createExecutorService(
globalConfiguration.persistenceThreadPool(),
PERSISTENCE_EXECUTOR,
ExecutorServiceType.DEFAULT);
}
}
return persistenceExecutor;
} else if (componentName.equals(ASYNC_TRANSPORT_EXECUTOR)) {
synchronized (this) {
if (asyncTransportExecutor == null) {
asyncTransportExecutor = createExecutorService(
return createExecutorService(
globalConfiguration.transport().transportThreadPool(),
ASYNC_TRANSPORT_EXECUTOR,
ExecutorServiceType.DEFAULT);
}
}
return asyncTransportExecutor;
} else if (componentName.equals(EXPIRATION_SCHEDULED_EXECUTOR)) {
synchronized (this) {
if (expirationExecutor == null) {
expirationExecutor = createExecutorService(
return createExecutorService(
globalConfiguration.expirationThreadPool(),
EXPIRATION_SCHEDULED_EXECUTOR,
ExecutorServiceType.SCHEDULED);
}
}
return expirationExecutor;
} else if (componentName.equals(REMOTE_COMMAND_EXECUTOR)) {
synchronized (this) {
if (remoteCommandsExecutor == null) {
remoteCommandsExecutor = createExecutorService(
return createExecutorService(
globalConfiguration.transport().remoteCommandThreadPool(),
REMOTE_COMMAND_EXECUTOR,
ExecutorServiceType.BLOCKING);
}
}
return remoteCommandsExecutor;
} else if (componentName.equals(STATE_TRANSFER_EXECUTOR)) {
synchronized (this) {
if (stateTransferExecutor == null) {
stateTransferExecutor = createExecutorService(
return createExecutorService(
globalConfiguration.stateTransferThreadPool(),
STATE_TRANSFER_EXECUTOR,
ExecutorServiceType.DEFAULT);
}
}
return stateTransferExecutor;
} else if (componentName.equals(ASYNC_OPERATIONS_EXECUTOR)) {
synchronized (this) {
if (asyncOperationsExecutor == null) {
asyncOperationsExecutor = createExecutorService(
return createExecutorService(
globalConfiguration.asyncThreadPool(),
ASYNC_OPERATIONS_EXECUTOR, ExecutorServiceType.DEFAULT);
}
}
return asyncOperationsExecutor;
} else if (componentName.endsWith(TIMEOUT_SCHEDULE_EXECUTOR)) {
synchronized (this) {
if (timeoutExecutor == null) {
timeoutExecutor = createExecutorService(null, TIMEOUT_SCHEDULE_EXECUTOR, ExecutorServiceType.SCHEDULED);
}
}
return timeoutExecutor;
return createExecutorService(null, TIMEOUT_SCHEDULE_EXECUTOR, ExecutorServiceType.SCHEDULED);
} else {
throw new CacheConfigurationException("Unknown named executor " + componentName);
}
Expand All @@ -141,19 +87,6 @@ public Object construct(String componentName) {
}
}

@Stop(priority = 999)
public void stop() {
// TODO Dan: awaitTermination()?
if (remoteCommandsExecutor != null) remoteCommandsExecutor.shutdownNow();
if (notificationExecutor != null) notificationExecutor.shutdownNow();
if (persistenceExecutor != null) persistenceExecutor.shutdownNow();
if (asyncTransportExecutor != null) asyncTransportExecutor.shutdownNow();
if (expirationExecutor != null) expirationExecutor.shutdownNow();
if (stateTransferExecutor != null) stateTransferExecutor.shutdownNow();
if (timeoutExecutor != null) timeoutExecutor.shutdownNow();
if (asyncOperationsExecutor != null) asyncOperationsExecutor.shutdownNow();
}

@SuppressWarnings("unchecked")
private <T extends ExecutorService> T createExecutorService(ThreadPoolConfiguration threadPoolConfiguration,
String componentName, ExecutorServiceType type) {
Expand Down
Expand Up @@ -66,6 +66,8 @@ public final void execute(BlockingRunnable runnable) {
@Override
public void shutdown() {
shutdown = true;
controllerThread.interrupt();
executorService.shutdown();
}

@Override
Expand Down

0 comments on commit 76eca3f

Please sign in to comment.