From 76eca3f004760baa8bdb429c2e4a0a8f2081590d Mon Sep 17 00:00:00 2001 From: Dan Berindei Date: Thu, 27 Jun 2019 19:33:58 +0300 Subject: [PATCH] ISPN-10363 LazyInitializingExecutorService is not thread-safe * Remove cached executors from NamedExecutorFactory * Move the @Stop methods to the executors themselves * Don't allow lazy executors to start after stopping --- ...izingBlockingTaskAwareExecutorService.java | 34 ++++++-- .../LazyInitializingExecutorService.java | 29 +++++-- ...yInitializingScheduledExecutorService.java | 29 +++++-- .../factories/NamedExecutorsFactory.java | 83 ++----------------- .../BlockingTaskAwareExecutorServiceImpl.java | 2 + .../remoting/AsynchronousInvocationTest.java | 20 ++--- 6 files changed, 95 insertions(+), 102 deletions(-) diff --git a/core/src/main/java/org/infinispan/executors/LazyInitializingBlockingTaskAwareExecutorService.java b/core/src/main/java/org/infinispan/executors/LazyInitializingBlockingTaskAwareExecutorService.java index 93dfb3c7cb41..c82b8e31ee89 100644 --- a/core/src/main/java/org/infinispan/executors/LazyInitializingBlockingTaskAwareExecutorService.java +++ b/core/src/main/java/org/infinispan/executors/LazyInitializingBlockingTaskAwareExecutorService.java @@ -1,7 +1,6 @@ package org.infinispan.executors; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -13,9 +12,13 @@ import org.infinispan.commons.executors.ThreadPoolExecutorFactory; import org.infinispan.commons.time.TimeService; +import org.infinispan.factories.annotations.Stop; +import org.infinispan.factories.scopes.Scope; +import org.infinispan.factories.scopes.Scopes; import org.infinispan.util.concurrent.BlockingRunnable; import org.infinispan.util.concurrent.BlockingTaskAwareExecutorService; import org.infinispan.util.concurrent.BlockingTaskAwareExecutorServiceImpl; +import org.infinispan.util.concurrent.WithinThreadExecutor; /** * A delegating executor that lazily constructs and initializes the underlying executor. @@ -23,13 +26,22 @@ * @author Pedro Ruivo * @since 5.3 */ -public final class LazyInitializingBlockingTaskAwareExecutorService extends ManageableExecutorService implements BlockingTaskAwareExecutorService { +@Scope(Scopes.GLOBAL) +public final class LazyInitializingBlockingTaskAwareExecutorService + extends ManageableExecutorService implements BlockingTaskAwareExecutorService { + + private static final BlockingTaskAwareExecutorService STOPPED; + + static { + STOPPED = new BlockingTaskAwareExecutorServiceImpl("", new WithinThreadExecutor(), null); + STOPPED.shutdown(); + } private final ThreadPoolExecutorFactory executorFactory; private final ThreadFactory threadFactory; private final TimeService timeService; private final String controllerThreadName; - private volatile BlockingTaskAwareExecutorServiceImpl blockingExecutor; + private volatile BlockingTaskAwareExecutorService blockingExecutor; public LazyInitializingBlockingTaskAwareExecutorService(ThreadPoolExecutorFactory executorFactory, ThreadFactory threadFactory, @@ -55,15 +67,23 @@ public void checkForReadyTasks() { @Override public void shutdown() { - if (blockingExecutor != null) blockingExecutor.shutdown(); + synchronized (this) { + if (blockingExecutor == null) { + blockingExecutor = STOPPED; + } + blockingExecutor.shutdown(); + } } + @Stop @Override public List shutdownNow() { - if (blockingExecutor == null) - return Collections.emptyList(); - else + synchronized (this) { + if (blockingExecutor == null) { + blockingExecutor = STOPPED; + } return blockingExecutor.shutdownNow(); + } } @Override diff --git a/core/src/main/java/org/infinispan/executors/LazyInitializingExecutorService.java b/core/src/main/java/org/infinispan/executors/LazyInitializingExecutorService.java index e395fbab59c2..2cec683e840b 100644 --- a/core/src/main/java/org/infinispan/executors/LazyInitializingExecutorService.java +++ b/core/src/main/java/org/infinispan/executors/LazyInitializingExecutorService.java @@ -1,7 +1,6 @@ package org.infinispan.executors; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -12,6 +11,10 @@ import java.util.concurrent.TimeoutException; import org.infinispan.commons.executors.ThreadPoolExecutorFactory; +import org.infinispan.factories.annotations.Stop; +import org.infinispan.factories.scopes.Scope; +import org.infinispan.factories.scopes.Scopes; +import org.infinispan.util.concurrent.WithinThreadExecutor; /** * A delegating executor that lazily constructs and initializes the underlying executor, since unused JDK executors @@ -20,7 +23,15 @@ * @author Manik Surtani * @since 5.1 */ +@Scope(Scopes.GLOBAL) public final class LazyInitializingExecutorService extends ManageableExecutorService implements ExecutorService { + private static final ExecutorService STOPPED; + + static { + STOPPED = new WithinThreadExecutor(); + STOPPED.shutdown(); + } + private final ThreadPoolExecutorFactory executorFactory; private final ThreadFactory threadFactory; @@ -42,15 +53,23 @@ private void initIfNeeded() { @Override public void shutdown() { - if (executor != null) executor.shutdown(); + synchronized (this) { + if (executor == null) { + executor = STOPPED; + } + executor.shutdown(); + } } + @Stop @Override public List shutdownNow() { - if (executor == null) - return Collections.emptyList(); - else + synchronized (this) { + if (executor == null) { + executor = STOPPED; + } return executor.shutdownNow(); + } } @Override diff --git a/core/src/main/java/org/infinispan/executors/LazyInitializingScheduledExecutorService.java b/core/src/main/java/org/infinispan/executors/LazyInitializingScheduledExecutorService.java index bc74e32c0264..e4d6e135b77c 100644 --- a/core/src/main/java/org/infinispan/executors/LazyInitializingScheduledExecutorService.java +++ b/core/src/main/java/org/infinispan/executors/LazyInitializingScheduledExecutorService.java @@ -1,18 +1,21 @@ package org.infinispan.executors; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.infinispan.commons.executors.ThreadPoolExecutorFactory; +import org.infinispan.factories.annotations.Stop; +import org.infinispan.factories.scopes.Scope; +import org.infinispan.factories.scopes.Scopes; /** * A delegating scheduled executor that lazily constructs and initializes the underlying scheduled executor, since @@ -21,7 +24,15 @@ * @author Manik Surtani * @since 5.1 */ +@Scope(Scopes.GLOBAL) public class LazyInitializingScheduledExecutorService extends ManageableExecutorService implements ScheduledExecutorService { + private static final ScheduledExecutorService STOPPED; + + static { + STOPPED = new ScheduledThreadPoolExecutor(0); + STOPPED.shutdown(); + } + private final ThreadPoolExecutorFactory executorFactory; private final ThreadFactory threadFactory; @@ -43,15 +54,23 @@ private void initIfNeeded() { @Override public void shutdown() { - if (executor != null) executor.shutdown(); + synchronized (this) { + if (executor == null) { + executor = STOPPED; + } + executor.shutdown(); + } } + @Stop @Override public List shutdownNow() { - if (executor == null) - return Collections.emptyList(); - else + synchronized (this) { + if (executor == null) { + executor = STOPPED; + } return executor.shutdownNow(); + } } @Override diff --git a/core/src/main/java/org/infinispan/factories/NamedExecutorsFactory.java b/core/src/main/java/org/infinispan/factories/NamedExecutorsFactory.java index 592f8d29d2f8..59bba327e16b 100644 --- a/core/src/main/java/org/infinispan/factories/NamedExecutorsFactory.java +++ b/core/src/main/java/org/infinispan/factories/NamedExecutorsFactory.java @@ -12,7 +12,6 @@ import static org.infinispan.factories.KnownComponentNames.shortened; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import org.infinispan.commons.CacheConfigurationException; @@ -25,9 +24,7 @@ import org.infinispan.executors.LazyInitializingExecutorService; import org.infinispan.executors.LazyInitializingScheduledExecutorService; import org.infinispan.factories.annotations.DefaultFactoryFor; -import org.infinispan.factories.annotations.Stop; import org.infinispan.factories.threads.DefaultThreadFactory; -import org.infinispan.util.concurrent.BlockingTaskAwareExecutorService; /** * A factory that specifically knows how to create named executors. @@ -39,98 +36,47 @@ @DefaultFactoryFor(names = {ASYNC_TRANSPORT_EXECUTOR, ASYNC_NOTIFICATION_EXECUTOR, PERSISTENCE_EXECUTOR, ASYNC_OPERATIONS_EXECUTOR, EXPIRATION_SCHEDULED_EXECUTOR, REMOTE_COMMAND_EXECUTOR, STATE_TRANSFER_EXECUTOR, TIMEOUT_SCHEDULE_EXECUTOR}) public class NamedExecutorsFactory extends AbstractComponentFactory implements AutoInstantiableFactory { - - private ExecutorService notificationExecutor; - private ExecutorService asyncTransportExecutor; - private ExecutorService persistenceExecutor; - private BlockingTaskAwareExecutorService remoteCommandsExecutor; - private ScheduledExecutorService expirationExecutor; - private ExecutorService stateTransferExecutor; - private ExecutorService asyncOperationsExecutor; - private ScheduledExecutorService timeoutExecutor; - @Override - @SuppressWarnings("unchecked") public Object construct(String componentName) { try { // Construction happens only on startup of either CacheManager, or Cache, so // using synchronized protection does not have a great impact on app performance. if (componentName.equals(ASYNC_NOTIFICATION_EXECUTOR)) { - synchronized (this) { - if (notificationExecutor == null) { - notificationExecutor = createExecutorService( + return createExecutorService( globalConfiguration.listenerThreadPool(), ASYNC_NOTIFICATION_EXECUTOR, ExecutorServiceType.DEFAULT); - } - } - return notificationExecutor; } else if (componentName.equals(PERSISTENCE_EXECUTOR)) { - synchronized (this) { - if (persistenceExecutor == null) { - persistenceExecutor = createExecutorService( + return createExecutorService( globalConfiguration.persistenceThreadPool(), PERSISTENCE_EXECUTOR, ExecutorServiceType.DEFAULT); - } - } - return persistenceExecutor; } else if (componentName.equals(ASYNC_TRANSPORT_EXECUTOR)) { - synchronized (this) { - if (asyncTransportExecutor == null) { - asyncTransportExecutor = createExecutorService( + return createExecutorService( globalConfiguration.transport().transportThreadPool(), ASYNC_TRANSPORT_EXECUTOR, ExecutorServiceType.DEFAULT); - } - } - return asyncTransportExecutor; } else if (componentName.equals(EXPIRATION_SCHEDULED_EXECUTOR)) { - synchronized (this) { - if (expirationExecutor == null) { - expirationExecutor = createExecutorService( + return createExecutorService( globalConfiguration.expirationThreadPool(), EXPIRATION_SCHEDULED_EXECUTOR, ExecutorServiceType.SCHEDULED); - } - } - return expirationExecutor; } else if (componentName.equals(REMOTE_COMMAND_EXECUTOR)) { - synchronized (this) { - if (remoteCommandsExecutor == null) { - remoteCommandsExecutor = createExecutorService( + return createExecutorService( globalConfiguration.transport().remoteCommandThreadPool(), REMOTE_COMMAND_EXECUTOR, ExecutorServiceType.BLOCKING); - } - } - return remoteCommandsExecutor; } else if (componentName.equals(STATE_TRANSFER_EXECUTOR)) { - synchronized (this) { - if (stateTransferExecutor == null) { - stateTransferExecutor = createExecutorService( + return createExecutorService( globalConfiguration.stateTransferThreadPool(), STATE_TRANSFER_EXECUTOR, ExecutorServiceType.DEFAULT); - } - } - return stateTransferExecutor; } else if (componentName.equals(ASYNC_OPERATIONS_EXECUTOR)) { - synchronized (this) { - if (asyncOperationsExecutor == null) { - asyncOperationsExecutor = createExecutorService( + return createExecutorService( globalConfiguration.asyncThreadPool(), ASYNC_OPERATIONS_EXECUTOR, ExecutorServiceType.DEFAULT); - } - } - return asyncOperationsExecutor; } else if (componentName.endsWith(TIMEOUT_SCHEDULE_EXECUTOR)) { - synchronized (this) { - if (timeoutExecutor == null) { - timeoutExecutor = createExecutorService(null, TIMEOUT_SCHEDULE_EXECUTOR, ExecutorServiceType.SCHEDULED); - } - } - return timeoutExecutor; + return createExecutorService(null, TIMEOUT_SCHEDULE_EXECUTOR, ExecutorServiceType.SCHEDULED); } else { throw new CacheConfigurationException("Unknown named executor " + componentName); } @@ -141,19 +87,6 @@ public Object construct(String componentName) { } } - @Stop(priority = 999) - public void stop() { - // TODO Dan: awaitTermination()? - if (remoteCommandsExecutor != null) remoteCommandsExecutor.shutdownNow(); - if (notificationExecutor != null) notificationExecutor.shutdownNow(); - if (persistenceExecutor != null) persistenceExecutor.shutdownNow(); - if (asyncTransportExecutor != null) asyncTransportExecutor.shutdownNow(); - if (expirationExecutor != null) expirationExecutor.shutdownNow(); - if (stateTransferExecutor != null) stateTransferExecutor.shutdownNow(); - if (timeoutExecutor != null) timeoutExecutor.shutdownNow(); - if (asyncOperationsExecutor != null) asyncOperationsExecutor.shutdownNow(); - } - @SuppressWarnings("unchecked") private T createExecutorService(ThreadPoolConfiguration threadPoolConfiguration, String componentName, ExecutorServiceType type) { diff --git a/core/src/main/java/org/infinispan/util/concurrent/BlockingTaskAwareExecutorServiceImpl.java b/core/src/main/java/org/infinispan/util/concurrent/BlockingTaskAwareExecutorServiceImpl.java index c6951971e60b..91f9057a9866 100644 --- a/core/src/main/java/org/infinispan/util/concurrent/BlockingTaskAwareExecutorServiceImpl.java +++ b/core/src/main/java/org/infinispan/util/concurrent/BlockingTaskAwareExecutorServiceImpl.java @@ -66,6 +66,8 @@ public final void execute(BlockingRunnable runnable) { @Override public void shutdown() { shutdown = true; + controllerThread.interrupt(); + executorService.shutdown(); } @Override diff --git a/core/src/test/java/org/infinispan/remoting/AsynchronousInvocationTest.java b/core/src/test/java/org/infinispan/remoting/AsynchronousInvocationTest.java index 0ec091eb02f2..db1dfc6abe30 100644 --- a/core/src/test/java/org/infinispan/remoting/AsynchronousInvocationTest.java +++ b/core/src/test/java/org/infinispan/remoting/AsynchronousInvocationTest.java @@ -19,14 +19,14 @@ import org.infinispan.commands.CommandsFactory; import org.infinispan.commands.ReplicableCommand; import org.infinispan.commands.VisitableCommand; +import org.infinispan.commands.module.TestGlobalConfigurationBuilder; import org.infinispan.commands.remote.ClusteredGetCommand; import org.infinispan.commands.remote.SingleRpcCommand; import org.infinispan.commons.util.EnumUtil; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; -import org.infinispan.factories.GlobalComponentRegistry; +import org.infinispan.configuration.global.GlobalConfigurationBuilder; import org.infinispan.factories.KnownComponentNames; -import org.infinispan.factories.impl.BasicComponentRegistry; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.remoting.inboundhandler.DeliverOrder; import org.infinispan.remoting.inboundhandler.InboundInvocationHandler; @@ -81,21 +81,20 @@ private static C mockCommand(Class commandClass @BeforeClass public void setUp() throws Throwable { executorService = new DummyTaskCountExecutorService(); - final BlockingTaskAwareExecutorService remoteExecutorService = new BlockingTaskAwareExecutorServiceImpl("AsynchronousInvocationTest-Controller", executorService, - TIME_SERVICE); + BlockingTaskAwareExecutorService remoteExecutorService = + new BlockingTaskAwareExecutorServiceImpl("AsynchronousInvocationTest-Controller", executorService, + TIME_SERVICE); + GlobalConfigurationBuilder globalBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder(); + globalBuilder.addModule(TestGlobalConfigurationBuilder.class) + .testGlobalComponent(KnownComponentNames.REMOTE_COMMAND_EXECUTOR, remoteExecutorService); ConfigurationBuilder builder = getDefaultCacheConfiguration(false); builder.clustering().cacheMode(CacheMode.DIST_SYNC); - cacheManager = createClusteredCacheManager(builder); + cacheManager = createClusteredCacheManager(globalBuilder, builder); Cache cache = cacheManager.getCache(); ByteString cacheName = ByteString.fromString(cache.getName()); Transport transport = extractGlobalComponent(cacheManager, Transport.class); address = transport.getAddress(); invocationHandler = TestingUtil.extractGlobalComponent(cacheManager, InboundInvocationHandler.class); - GlobalComponentRegistry globalRegistry = cache.getCacheManager().getGlobalComponentRegistry(); - BasicComponentRegistry gbcr = globalRegistry.getComponent(BasicComponentRegistry.class); - gbcr.replaceComponent(KnownComponentNames.REMOTE_COMMAND_EXECUTOR, remoteExecutorService, false); - gbcr.rewire(); - globalRegistry.rewireNamedRegistries(); commandsFactory = extractCommandsFactory(cache); @@ -117,6 +116,7 @@ public void setUp() throws Throwable { @AfterClass public void tearDown() { if (cacheManager != null) { + // BlockingTaskAwareExecutorServiceImpl doesn't have a @Stop annotation so we need to stop it manually cacheManager.getGlobalComponentRegistry().getComponent(ExecutorService.class, KnownComponentNames.REMOTE_COMMAND_EXECUTOR).shutdownNow(); cacheManager.stop(); }