Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

ISPN-2808 Make Infinispan use its own thread pool for sending message…

…s in order to avoid thread deadlocks
  • Loading branch information...
commit d200b9ac78886c347d642564243f2792f5cd41f7 1 parent 436fe5f
@pruivo pruivo authored
Showing with 778 additions and 53 deletions.
  1. +4 −0 core/src/main/java/org/infinispan/commands/CancelCommand.java
  2. +5 −0 core/src/main/java/org/infinispan/commands/CreateCacheCommand.java
  3. +5 −0 core/src/main/java/org/infinispan/commands/RemoveCacheCommand.java
  4. +13 −1 core/src/main/java/org/infinispan/commands/ReplicableCommand.java
  5. +5 −0 core/src/main/java/org/infinispan/commands/read/AbstractDataCommand.java
  6. +4 −0 core/src/main/java/org/infinispan/commands/read/AbstractLocalCommand.java
  7. +5 −0 core/src/main/java/org/infinispan/commands/read/DistributedExecuteCommand.java
  8. +5 −0 core/src/main/java/org/infinispan/commands/read/MapCombineCommand.java
  9. +5 −0 core/src/main/java/org/infinispan/commands/read/ReduceCommand.java
  10. +5 −0 core/src/main/java/org/infinispan/commands/remote/ClusteredGetCommand.java
  11. +11 −1 core/src/main/java/org/infinispan/commands/remote/MultipleRpcCommand.java
  12. +5 −0 core/src/main/java/org/infinispan/commands/remote/SingleRpcCommand.java
  13. +6 −0 core/src/main/java/org/infinispan/commands/remote/recovery/CompleteTransactionCommand.java
  14. +5 −0 core/src/main/java/org/infinispan/commands/remote/recovery/RecoveryCommand.java
  15. +6 −0 core/src/main/java/org/infinispan/commands/remote/recovery/TxCompletionNotificationCommand.java
  16. +6 −0 core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java
  17. +5 −0 core/src/main/java/org/infinispan/commands/write/AbstractDataWriteCommand.java
  18. +5 −0 core/src/main/java/org/infinispan/commands/write/ClearCommand.java
  19. +1 −1  core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
  20. +5 −0 core/src/main/java/org/infinispan/commands/write/PutMapCommand.java
  21. +5 −0 core/src/main/java/org/infinispan/configuration/global/AbstractGlobalConfigurationBuilder.java
  22. +9 −1 core/src/main/java/org/infinispan/configuration/global/GlobalConfiguration.java
  23. +14 −1 core/src/main/java/org/infinispan/configuration/global/GlobalConfigurationBuilder.java
  24. +2 −0  core/src/main/java/org/infinispan/configuration/global/GlobalConfigurationChildBuilder.java
  25. +1 −0  core/src/main/java/org/infinispan/configuration/parsing/Element.java
  26. +36 −0 core/src/main/java/org/infinispan/configuration/parsing/Parser53.java
  27. +8 −4 core/src/main/java/org/infinispan/executors/DefaultExecutorFactory.java
  28. +39 −0 core/src/main/java/org/infinispan/executors/WithinThreadExecutorFactory.java
  29. +16 −3 core/src/main/java/org/infinispan/factories/KnownComponentNames.java
  30. +16 −0 core/src/main/java/org/infinispan/factories/NamedExecutorsFactory.java
  31. +6 −3 core/src/main/java/org/infinispan/remoting/InboundInvocationHandler.java
  32. +49 −13 core/src/main/java/org/infinispan/remoting/InboundInvocationHandlerImpl.java
  33. +72 −14 core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
  34. +5 −1 core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
  35. +6 −1 core/src/main/java/org/infinispan/statetransfer/StateRequestCommand.java
  36. +5 −0 core/src/main/java/org/infinispan/statetransfer/StateResponseCommand.java
  37. +5 −0 core/src/main/java/org/infinispan/topology/CacheTopologyControlCommand.java
  38. +5 −0 core/src/main/java/org/infinispan/xsite/XSiteAdminCommand.java
  39. +7 −0 core/src/main/resources/schema/infinispan-config-5.3.xsd
  40. +18 −2 core/src/test/java/org/infinispan/configuration/XmlFileParsingTest.java
  41. +2 −4 core/src/test/java/org/infinispan/distribution/rehash/OngoingTransactionsAndJoinTest.java
  42. +264 −0 core/src/test/java/org/infinispan/remoting/AsynchronousInvocationTest.java
  43. +16 −3 core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java
  44. +5 −0 core/src/test/resources/configs/all.xml
  45. +11 −0 core/src/test/resources/configs/named-cache-test.xml
  46. +5 −0 query/src/main/java/org/infinispan/query/clustered/ClusteredQueryCommand.java
  47. +5 −0 query/src/main/java/org/infinispan/query/indexmanager/IndexUpdateCommand.java
  48. +35 −0 spring/src/main/java/org/infinispan/spring/AbstractEmbeddedCacheManagerFactory.java
View
4 core/src/main/java/org/infinispan/commands/CancelCommand.java
@@ -128,4 +128,8 @@ public String toString() {
return "CancelCommand [uuid=" + commandToCancel + "]";
}
+ @Override
+ public boolean canBlock() {
+ return false;
+ }
}
View
5 core/src/main/java/org/infinispan/commands/CreateCacheCommand.java
@@ -202,4 +202,9 @@ public String toString() {
public boolean isReturnValueExpected() {
return true;
}
+
+ @Override
+ public boolean canBlock() {
+ return true;
+ }
}
View
5 core/src/main/java/org/infinispan/commands/RemoveCacheCommand.java
@@ -92,4 +92,9 @@ public void setParameters(int commandId, Object[] parameters) {
public boolean isReturnValueExpected() {
return false;
}
+
+ @Override
+ public boolean canBlock() {
+ return true;
+ }
}
View
14 core/src/main/java/org/infinispan/commands/ReplicableCommand.java
@@ -73,4 +73,16 @@
* @return true or false
*/
boolean isReturnValueExpected();
-}
+
+ /**
+ * If true, the command is processed asynchronously in a thread provided by an Infinispan thread pool. Otherwise,
+ * the command is processed directly in the JGroups thread.
+ * <p/>
+ * This feature allows to avoid keep a JGroups thread busy that can originate discard of messages and
+ * retransmissions. So, the commands that can block (waiting for some state, acquiring locks, etc.) should return
+ * true.
+ *
+ * @return {@code true} if the command can block/wait, {@code false} otherwise
+ */
+ boolean canBlock();
+}
View
5 core/src/main/java/org/infinispan/commands/read/AbstractDataCommand.java
@@ -112,4 +112,9 @@ public String toString() {
public boolean isReturnValueExpected() {
return true;
}
+
+ @Override
+ public boolean canBlock() {
+ return false;
+ }
}
View
4 core/src/main/java/org/infinispan/commands/read/AbstractLocalCommand.java
@@ -64,4 +64,8 @@ public boolean ignoreCommandOnStatus(ComponentStatus status) {
public boolean isReturnValueExpected() {
return false;
}
+
+ public boolean canBlock() {
+ return false;
+ }
}
View
5 core/src/main/java/org/infinispan/commands/read/DistributedExecuteCommand.java
@@ -203,4 +203,9 @@ public boolean isReturnValueExpected() {
return true;
}
+ @Override
+ public boolean canBlock() {
+ return true;
+ }
+
}
View
5 core/src/main/java/org/infinispan/commands/read/MapCombineCommand.java
@@ -164,6 +164,11 @@ public boolean isReturnValueExpected() {
}
@Override
+ public boolean canBlock() {
+ return true;
+ }
+
+ @Override
public int hashCode() {
final int prime = 31;
int result = 1;
View
5 core/src/main/java/org/infinispan/commands/read/ReduceCommand.java
@@ -141,6 +141,11 @@ public boolean isReturnValueExpected() {
return true;
}
+ @Override
+ public boolean canBlock() {
+ return true;
+ }
+
@SuppressWarnings("rawtypes")
@Override
public boolean equals(Object obj) {
View
5 core/src/main/java/org/infinispan/commands/remote/ClusteredGetCommand.java
@@ -232,6 +232,11 @@ public boolean isReturnValueExpected() {
}
@Override
+ public boolean canBlock() {
+ return false;
+ }
+
+ @Override
public int getTopologyId() {
return topologyId;
}
View
12 core/src/main/java/org/infinispan/commands/remote/MultipleRpcCommand.java
@@ -140,4 +140,14 @@ public String toString() {
public boolean isReturnValueExpected() {
return false;
}
-}
+
+ @Override
+ public boolean canBlock() {
+ for (ReplicableCommand command : commands) {
+ if (command.canBlock()) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
View
5 core/src/main/java/org/infinispan/commands/remote/SingleRpcCommand.java
@@ -109,4 +109,9 @@ public ReplicableCommand getCommand() {
public boolean isReturnValueExpected() {
return command.isReturnValueExpected();
}
+
+ @Override
+ public boolean canBlock() {
+ return command.canBlock();
+ }
}
View
6 core/src/main/java/org/infinispan/commands/remote/recovery/CompleteTransactionCommand.java
@@ -86,6 +86,12 @@ public void setParameters(int commandId, Object[] parameters) {
}
@Override
+ public boolean canBlock() {
+ //this command performs the 2PC commit.
+ return true;
+ }
+
+ @Override
public String toString() {
return getClass().getSimpleName() +
"{ xid=" + xid +
View
5 core/src/main/java/org/infinispan/commands/remote/recovery/RecoveryCommand.java
@@ -51,4 +51,9 @@ public void init(RecoveryManager rm) {
public boolean isReturnValueExpected() {
return true;
}
+
+ @Override
+ public boolean canBlock() {
+ return false;
+ }
}
View
6 ...rc/main/java/org/infinispan/commands/remote/recovery/TxCompletionNotificationCommand.java
@@ -152,6 +152,12 @@ public void setParameters(int commandId, Object[] parameters) {
}
@Override
+ public boolean canBlock() {
+ //this command can be forwarded (state transfer)
+ return true;
+ }
+
+ @Override
public String toString() {
return getClass().getSimpleName() +
"{ xid=" + xid +
View
6 core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java
@@ -180,4 +180,10 @@ public void setOrigin(Address origin) {
public boolean isReturnValueExpected() {
return true;
}
+
+ @Override
+ public final boolean canBlock() {
+ //all tx commands must wait for the correct topology
+ return true;
+ }
}
View
5 core/src/main/java/org/infinispan/commands/write/AbstractDataWriteCommand.java
@@ -69,4 +69,9 @@ public final void setPreviousRead(boolean value) {
public final boolean wasPreviousRead() {
return previousRead;
}
+
+ @Override
+ public boolean canBlock() {
+ return true;
+ }
}
View
5 core/src/main/java/org/infinispan/commands/write/ClearCommand.java
@@ -132,6 +132,11 @@ public boolean isReturnValueExpected() {
}
@Override
+ public boolean canBlock() {
+ return true;
+ }
+
+ @Override
public boolean ignoreCommandOnStatus(ComponentStatus status) {
return false;
}
View
2  core/src/main/java/org/infinispan/commands/write/InvalidateL1Command.java
@@ -161,7 +161,7 @@ public boolean equals(Object o) {
@Override
public Object[] getParameters() {
if (keys == null || keys.length == 0) {
- return new Object[]{forRehash, writeOrigin};
+ return new Object[]{forRehash, writeOrigin, 0};
} else if (keys.length == 1) {
return new Object[]{forRehash, writeOrigin, 1, keys[0]};
} else {
View
5 core/src/main/java/org/infinispan/commands/write/PutMapCommand.java
@@ -199,6 +199,11 @@ public boolean isReturnValueExpected() {
return false;
}
+ @Override
+ public boolean canBlock() {
+ return true;
+ }
+
public long getLifespanMillis() {
return lifespanMillis;
}
View
5 ...src/main/java/org/infinispan/configuration/global/AbstractGlobalConfigurationBuilder.java
@@ -58,6 +58,11 @@ public ExecutorFactoryConfigurationBuilder asyncTransportExecutor() {
}
@Override
+ public ExecutorFactoryConfigurationBuilder remoteCommandsExecutor() {
+ return globalConfig.remoteCommandsExecutor();
+ }
+
+ @Override
public ScheduledExecutorFactoryConfigurationBuilder evictionScheduledExecutor() {
return globalConfig.evictionScheduledExecutor();
}
View
10 core/src/main/java/org/infinispan/configuration/global/GlobalConfiguration.java
@@ -60,6 +60,7 @@
private final ExecutorFactoryConfiguration asyncListenerExecutor;
private final ExecutorFactoryConfiguration asyncTransportExecutor;
+ private final ExecutorFactoryConfiguration remoteCommandsExecutor;
private final ScheduledExecutorFactoryConfiguration evictionScheduledExecutor;
private final ScheduledExecutorFactoryConfiguration replicationQueueScheduledExecutor;
private final GlobalJmxStatisticsConfiguration globalJmxStatistics;
@@ -71,12 +72,14 @@
private final WeakReference<ClassLoader> cl;
GlobalConfiguration(ExecutorFactoryConfiguration asyncListenerExecutor,
- ExecutorFactoryConfiguration asyncTransportExecutor, ScheduledExecutorFactoryConfiguration evictionScheduledExecutor,
+ ExecutorFactoryConfiguration asyncTransportExecutor, ExecutorFactoryConfiguration remoteCommandsExecutor,
+ ScheduledExecutorFactoryConfiguration evictionScheduledExecutor,
ScheduledExecutorFactoryConfiguration replicationQueueScheduledExecutor, GlobalJmxStatisticsConfiguration globalJmxStatistics,
TransportConfiguration transport, SerializationConfiguration serialization, ShutdownConfiguration shutdown,
List<?> modules, SiteConfiguration site,ClassLoader cl) {
this.asyncListenerExecutor = asyncListenerExecutor;
this.asyncTransportExecutor = asyncTransportExecutor;
+ this.remoteCommandsExecutor = remoteCommandsExecutor;
this.evictionScheduledExecutor = evictionScheduledExecutor;
this.replicationQueueScheduledExecutor = replicationQueueScheduledExecutor;
this.globalJmxStatistics = globalJmxStatistics;
@@ -100,6 +103,10 @@ public ExecutorFactoryConfiguration asyncTransportExecutor() {
return asyncTransportExecutor;
}
+ public ExecutorFactoryConfiguration remoteCommandsExecutor() {
+ return remoteCommandsExecutor;
+ }
+
public ScheduledExecutorFactoryConfiguration evictionScheduledExecutor() {
return evictionScheduledExecutor;
}
@@ -149,6 +156,7 @@ public String toString() {
return "GlobalConfiguration{" +
"asyncListenerExecutor=" + asyncListenerExecutor +
", asyncTransportExecutor=" + asyncTransportExecutor +
+ ", remoteCommandsExecutor=" + remoteCommandsExecutor +
", evictionScheduledExecutor=" + evictionScheduledExecutor +
", replicationQueueScheduledExecutor=" + replicationQueueScheduledExecutor +
", globalJmxStatistics=" + globalJmxStatistics +
View
15 core/src/main/java/org/infinispan/configuration/global/GlobalConfigurationBuilder.java
@@ -42,6 +42,7 @@
private final SerializationConfigurationBuilder serialization;
private final ExecutorFactoryConfigurationBuilder asyncTransportExecutor;
private final ExecutorFactoryConfigurationBuilder asyncListenerExecutor;
+ private final ExecutorFactoryConfigurationBuilder remoteCommandsExecutor;
private final ScheduledExecutorFactoryConfigurationBuilder evictionScheduledExecutor;
private final ScheduledExecutorFactoryConfigurationBuilder replicationQueueScheduledExecutor;
private final ShutdownConfigurationBuilder shutdown;
@@ -55,6 +56,7 @@ public GlobalConfigurationBuilder() {
this.serialization = new SerializationConfigurationBuilder(this);
this.asyncListenerExecutor = new ExecutorFactoryConfigurationBuilder(this);
this.asyncTransportExecutor = new ExecutorFactoryConfigurationBuilder(this);
+ this.remoteCommandsExecutor = new ExecutorFactoryConfigurationBuilder(this);
this.evictionScheduledExecutor = new ScheduledExecutorFactoryConfigurationBuilder(this);
this.replicationQueueScheduledExecutor = new ScheduledExecutorFactoryConfigurationBuilder(this);
this.shutdown = new ShutdownConfigurationBuilder(this);
@@ -127,6 +129,11 @@ public ExecutorFactoryConfigurationBuilder asyncListenerExecutor() {
}
@Override
+ public ExecutorFactoryConfigurationBuilder remoteCommandsExecutor() {
+ return remoteCommandsExecutor;
+ }
+
+ @Override
public ScheduledExecutorFactoryConfigurationBuilder evictionScheduledExecutor() {
return evictionScheduledExecutor;
}
@@ -169,7 +176,7 @@ public SiteConfigurationBuilder site() {
@SuppressWarnings("unchecked")
public void validate() {
for (AbstractGlobalConfigurationBuilder<?> validatable : asList(asyncListenerExecutor, asyncTransportExecutor,
- evictionScheduledExecutor, replicationQueueScheduledExecutor, globalJmxStatistics, transport,
+ remoteCommandsExecutor, evictionScheduledExecutor, replicationQueueScheduledExecutor, globalJmxStatistics, transport,
serialization, shutdown, site)) {
validatable.validate();
}
@@ -187,6 +194,7 @@ public GlobalConfiguration build() {
return new GlobalConfiguration(
asyncListenerExecutor.create(),
asyncTransportExecutor.create(),
+ remoteCommandsExecutor.create(),
evictionScheduledExecutor.create(),
replicationQueueScheduledExecutor.create(),
globalJmxStatistics.create(),
@@ -210,6 +218,7 @@ public GlobalConfigurationBuilder read(GlobalConfiguration template) {
asyncListenerExecutor.read(template.asyncListenerExecutor());
asyncTransportExecutor.read(template.asyncTransportExecutor());
+ remoteCommandsExecutor.read(template.remoteCommandsExecutor());
evictionScheduledExecutor.read(template.evictionScheduledExecutor());
globalJmxStatistics.read(template.globalJmxStatistics());
replicationQueueScheduledExecutor.read(template.replicationQueueScheduledExecutor());
@@ -240,6 +249,7 @@ public String toString() {
", globalJmxStatistics=" + globalJmxStatistics +
", serialization=" + serialization +
", asyncTransportExecutor=" + asyncTransportExecutor +
+ ", remoteCommandsExecutor=" + remoteCommandsExecutor +
", evictionScheduledExecutor=" + evictionScheduledExecutor +
", replicationQueueScheduledExecutor=" + replicationQueueScheduledExecutor +
", shutdown=" + shutdown +
@@ -258,6 +268,8 @@ public boolean equals(Object o) {
return false;
if (asyncTransportExecutor != null ? !asyncTransportExecutor.equals(that.asyncTransportExecutor) : that.asyncTransportExecutor != null)
return false;
+ if (remoteCommandsExecutor != null ? !remoteCommandsExecutor.equals(that.remoteCommandsExecutor) : that.remoteCommandsExecutor != null)
+ return false;
if (cl != null ? !cl.equals(that.cl) : that.cl != null) return false;
if (evictionScheduledExecutor != null ? !evictionScheduledExecutor.equals(that.evictionScheduledExecutor) : that.evictionScheduledExecutor != null)
return false;
@@ -285,6 +297,7 @@ public int hashCode() {
result = 31 * result + (serialization != null ? serialization.hashCode() : 0);
result = 31 * result + (asyncTransportExecutor != null ? asyncTransportExecutor.hashCode() : 0);
result = 31 * result + (asyncListenerExecutor != null ? asyncListenerExecutor.hashCode() : 0);
+ result = 31 * result + (remoteCommandsExecutor != null ? remoteCommandsExecutor.hashCode() : 0);
result = 31 * result + (evictionScheduledExecutor != null ? evictionScheduledExecutor.hashCode() : 0);
result = 31 * result + (replicationQueueScheduledExecutor != null ? replicationQueueScheduledExecutor.hashCode() : 0);
result = 31 * result + (shutdown != null ? shutdown.hashCode() : 0);
View
2  core/src/main/java/org/infinispan/configuration/global/GlobalConfigurationChildBuilder.java
@@ -29,6 +29,8 @@
ExecutorFactoryConfigurationBuilder asyncTransportExecutor();
+ ExecutorFactoryConfigurationBuilder remoteCommandsExecutor();
+
ScheduledExecutorFactoryConfigurationBuilder evictionScheduledExecutor();
ScheduledExecutorFactoryConfigurationBuilder replicationQueueScheduledExecutor();
View
1  core/src/main/java/org/infinispan/configuration/parsing/Element.java
@@ -40,6 +40,7 @@
ASYNC("async"),
ASYNC_LISTENER_EXECUTOR("asyncListenerExecutor"),
ASYNC_TRANSPORT_EXECUTOR("asyncTransportExecutor"),
+ REMOTE_COMMNAND_EXECUTOR("remoteCommandsExecutor"),
CLUSTERING("clustering"),
CLUSTER_LOADER("clusterLoader"),
CUSTOM_INTERCEPTORS("customInterceptors"),
View
36 core/src/main/java/org/infinispan/configuration/parsing/Parser53.java
@@ -1486,6 +1486,10 @@ private void parseGlobal(final XMLExtendedStreamReader reader, final Configurati
parseAsyncTransportExecutor(reader, holder);
break;
}
+ case REMOTE_COMMNAND_EXECUTOR: {
+ parseRemoteCommandsExecutor(reader, holder);
+ break;
+ }
case EVICTION_SCHEDULED_EXECUTOR: {
parseEvictionScheduledExecutor(reader, holder);
break;
@@ -1535,6 +1539,38 @@ private void parseGlobal(final XMLExtendedStreamReader reader, final Configurati
}
}
+ private void parseRemoteCommandsExecutor(final XMLExtendedStreamReader reader, final ConfigurationBuilderHolder holder)
+ throws XMLStreamException {
+ GlobalConfigurationBuilder builder = holder.getGlobalConfigurationBuilder();
+ for (int i = 0; i < reader.getAttributeCount(); i++) {
+ ParseUtils.requireNoNamespaceAttribute(reader, i);
+ String value = replaceProperties(reader.getAttributeValue(i));
+ Attribute attribute = Attribute.forName(reader.getAttributeLocalName(i));
+ switch (attribute) {
+ case FACTORY: {
+ builder.remoteCommandsExecutor().factory(Util.<ExecutorFactory> getInstance(value, holder.getClassLoader()));
+ break;
+ }
+ default: {
+ throw ParseUtils.unexpectedAttribute(reader, i);
+ }
+ }
+ }
+
+ while (reader.hasNext() && (reader.nextTag() != XMLStreamConstants.END_ELEMENT)) {
+ Element element = Element.forName(reader.getLocalName());
+ switch (element) {
+ case PROPERTIES: {
+ builder.remoteCommandsExecutor().withProperties(parseProperties(reader));
+ break;
+ }
+ default: {
+ throw ParseUtils.unexpectedElement(reader);
+ }
+ }
+ }
+ }
+
private void parseTransport(final XMLExtendedStreamReader reader, final ConfigurationBuilderHolder holder) throws XMLStreamException {
GlobalConfigurationBuilder builder = holder.getGlobalConfigurationBuilder();
for (int i = 0; i < reader.getAttributeCount(); i++) {
View
12 core/src/main/java/org/infinispan/executors/DefaultExecutorFactory.java
@@ -27,8 +27,10 @@
import org.infinispan.util.TypedProperties;
import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -48,9 +50,13 @@ public ExecutorService getExecutor(Properties p) {
TypedProperties tp = TypedProperties.toTypedProperties(p);
int maxThreads = tp.getIntProperty("maxThreads", 1);
int queueSize = tp.getIntProperty("queueSize", 100000);
+ int coreThreads = queueSize == 0 ? 1 : tp.getIntProperty("coreThreads", maxThreads);
+ long keepAliveTime = tp.getLongProperty("keepAliveTime", 60000);
final int threadPrio = tp.getIntProperty("threadPriority", Thread.MIN_PRIORITY);
final String threadNamePrefix = tp.getProperty("threadNamePrefix", tp.getProperty("componentName", "Thread"));
final String threadNameSuffix = tp.getProperty("threadNameSuffix", "");
+ BlockingQueue<Runnable> queue = queueSize == 0 ? new SynchronousQueue<Runnable>() :
+ new LinkedBlockingQueue<Runnable>(queueSize);
ThreadFactory tf = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@@ -62,9 +68,7 @@ public Thread newThread(Runnable r) {
}
};
- return new ThreadPoolExecutor(maxThreads, maxThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(queueSize),
- tf);
+ return new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS, queue, tf,
+ new ThreadPoolExecutor.CallerRunsPolicy());
}
}
View
39 core/src/main/java/org/infinispan/executors/WithinThreadExecutorFactory.java
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.executors;
+
+import org.infinispan.util.concurrent.WithinThreadExecutor;
+
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Executor factory that creates WithinThreadExecutor. This executor executes the tasks in the caller thread.
+ *
+ * @author Pedro Ruivo
+ * @since 5.3
+ */
+public class WithinThreadExecutorFactory implements ExecutorFactory {
+
+ @Override
+ public ExecutorService getExecutor(Properties p) {
+ return new WithinThreadExecutor();
+ }
+}
View
19 core/src/main/java/org/infinispan/factories/KnownComponentNames.java
@@ -36,6 +36,7 @@
*/
public class KnownComponentNames {
public static final String ASYNC_TRANSPORT_EXECUTOR = "org.infinispan.executors.transport";
+ public static final String REMOTE_COMMAND_EXECUTOR = "org.infinispan.executors.remote";
public static final String ASYNC_NOTIFICATION_EXECUTOR = "org.infinispan.executors.notification";
public static final String EVICTION_SCHEDULED_EXECUTOR = "org.infinispan.executors.eviction";
public static final String ASYNC_REPLICATION_QUEUE_EXECUTOR = "org.infinispan.executors.replicationQueue";
@@ -48,18 +49,26 @@
// Please make sure this is kept up to date
public static final Collection<String> ALL_KNOWN_COMPONENT_NAMES = Arrays.asList(
ASYNC_TRANSPORT_EXECUTOR, ASYNC_NOTIFICATION_EXECUTOR, EVICTION_SCHEDULED_EXECUTOR, ASYNC_REPLICATION_QUEUE_EXECUTOR,
- MODULE_COMMAND_INITIALIZERS, MODULE_COMMAND_FACTORIES, GLOBAL_MARSHALLER, CACHE_MARSHALLER, CLASS_LOADER
+ MODULE_COMMAND_INITIALIZERS, MODULE_COMMAND_FACTORIES, GLOBAL_MARSHALLER, CACHE_MARSHALLER, CLASS_LOADER,
+ REMOTE_COMMAND_EXECUTOR
);
- private static final Map<String, Integer> DEFAULT_THREADCOUNTS = new HashMap<String, Integer>(2);
- private static final Map<String, Integer> DEFAULT_THREADPRIO = new HashMap<String, Integer>(4);
+ private static final Map<String, Integer> DEFAULT_THREADCOUNTS = new HashMap<String, Integer>(3);
+ private static final Map<String, Integer> DEFAULT_QUEUE_SIZE = new HashMap<String, Integer>(3);
+ private static final Map<String, Integer> DEFAULT_THREADPRIO = new HashMap<String, Integer>(5);
static {
DEFAULT_THREADCOUNTS.put(ASYNC_NOTIFICATION_EXECUTOR, 1);
DEFAULT_THREADCOUNTS.put(ASYNC_TRANSPORT_EXECUTOR, 25);
+ DEFAULT_THREADCOUNTS.put(REMOTE_COMMAND_EXECUTOR, 32);
+
+ DEFAULT_QUEUE_SIZE.put(ASYNC_NOTIFICATION_EXECUTOR, 100000);
+ DEFAULT_QUEUE_SIZE.put(ASYNC_TRANSPORT_EXECUTOR, 100000);
+ DEFAULT_QUEUE_SIZE.put(REMOTE_COMMAND_EXECUTOR, 0);
DEFAULT_THREADPRIO.put(ASYNC_NOTIFICATION_EXECUTOR, Thread.MIN_PRIORITY);
DEFAULT_THREADPRIO.put(ASYNC_TRANSPORT_EXECUTOR, Thread.NORM_PRIORITY);
+ DEFAULT_THREADPRIO.put(REMOTE_COMMAND_EXECUTOR, Thread.NORM_PRIORITY);
DEFAULT_THREADPRIO.put(EVICTION_SCHEDULED_EXECUTOR, Thread.MIN_PRIORITY);
DEFAULT_THREADPRIO.put(ASYNC_REPLICATION_QUEUE_EXECUTOR, Thread.NORM_PRIORITY);
}
@@ -71,4 +80,8 @@ public static int getDefaultThreads(String componentName) {
public static int getDefaultThreadPrio(String componentName) {
return DEFAULT_THREADPRIO.get(componentName);
}
+
+ public static int getDefaultQueueSize(String componentName) {
+ return DEFAULT_QUEUE_SIZE.get(componentName);
+ }
}
View
16 core/src/main/java/org/infinispan/factories/NamedExecutorsFactory.java
@@ -48,6 +48,7 @@
private ExecutorService notificationExecutor;
private ExecutorService asyncTransportExecutor;
+ private ExecutorService remoteCommandsExecutor;
private ScheduledExecutorService evictionExecutor;
private ScheduledExecutorService asyncReplicationExecutor;
@@ -96,6 +97,15 @@
}
}
return (T) asyncReplicationExecutor;
+ } else if (componentName.equals(REMOTE_COMMAND_EXECUTOR)) {
+ synchronized (this) {
+ if (remoteCommandsExecutor == null) {
+ remoteCommandsExecutor = buildAndConfigureExecutorService(
+ globalConfiguration.remoteCommandsExecutor().factory(),
+ globalConfiguration.remoteCommandsExecutor().properties(), componentName, nodeName);
+ }
+ }
+ return (T) remoteCommandsExecutor;
} else {
throw new ConfigurationException("Unknown named executor " + componentName);
}
@@ -108,6 +118,7 @@
@Stop(priority = 999)
public void stop() {
+ if (remoteCommandsExecutor != null) remoteCommandsExecutor.shutdownNow();
if (notificationExecutor != null) notificationExecutor.shutdownNow();
if (asyncTransportExecutor != null) asyncTransportExecutor.shutdownNow();
if (asyncReplicationExecutor != null) asyncReplicationExecutor.shutdownNow();
@@ -122,6 +133,7 @@ private ExecutorService buildAndConfigureExecutorService(ExecutorFactory f, Prop
setComponentName(componentName, props);
setDefaultThreads(KnownComponentNames.getDefaultThreads(componentName), props);
setDefaultThreadPrio(KnownComponentNames.getDefaultThreadPrio(componentName), props);
+ setDefaultQueueSize(KnownComponentNames.getDefaultQueueSize(componentName), props);
return new LazyInitializingExecutorService(f, props);
}
@@ -142,6 +154,10 @@ private void setThreadSuffix(String nodeName, Properties props) {
}
}
+ private void setDefaultQueueSize(int queueSize, Properties props) {
+ if (!props.containsKey("queueSize")) props.setProperty("queueSize", String.valueOf(queueSize));
+ }
+
private void setDefaultThreadPrio(int prio, Properties props) {
if (!props.containsKey("threadPriority")) props.setProperty("threadPriority", String.valueOf(prio));
}
View
9 core/src/main/java/org/infinispan/remoting/InboundInvocationHandler.java
@@ -25,8 +25,8 @@
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
-import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.transport.Address;
+import org.jgroups.blocks.Response;
/**
* A globally scoped component, that is able to locate named caches and invoke remotely originating calls on the
@@ -42,9 +42,12 @@
/**
* Invokes a command on the cache, from a remote source.
*
+ *
* @param command command to invoke
- * @return results, if any, from the invocation
+ * @param response the asynchronous request reference from {@code org.infinispan.remoting.transport.Transport}.
+ * A {@code null} value means that the request does not expect a return value.
+ * @param preserveOrder
* @throws Throwable in the event of problems executing the command
*/
- Response handle(CacheRpcCommand command, Address origin) throws Throwable;
+ void handle(CacheRpcCommand command, Address origin, Response response, boolean preserveOrder) throws Throwable;
}
View
62 core/src/main/java/org/infinispan/remoting/InboundInvocationHandlerImpl.java
@@ -22,6 +22,7 @@
*/
package org.infinispan.remoting;
+import org.infinispan.CacheException;
import org.infinispan.commands.CancellableCommand;
import org.infinispan.commands.CancellationService;
import org.infinispan.commands.CommandsFactory;
@@ -29,6 +30,8 @@
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.GlobalComponentRegistry;
+import org.infinispan.factories.KnownComponentNames;
+import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
@@ -42,6 +45,8 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
+import java.util.concurrent.ExecutorService;
+
/**
* Sets the cache interceptor chain on an RPCCommand before calling it to perform
*
@@ -56,18 +61,21 @@
private GlobalConfiguration globalConfiguration;
private Transport transport;
private CancellationService cancelService;
+ private ExecutorService remoteCommandsExecutor;
@Inject
public void inject(GlobalComponentRegistry gcr, Transport transport,
+ @ComponentName(KnownComponentNames.REMOTE_COMMAND_EXECUTOR) ExecutorService remoteCommandsExecutor,
GlobalConfiguration globalConfiguration, CancellationService cancelService) {
this.gcr = gcr;
this.transport = transport;
this.globalConfiguration = globalConfiguration;
this.cancelService = cancelService;
+ this.remoteCommandsExecutor = remoteCommandsExecutor;
}
@Override
- public Response handle(final CacheRpcCommand cmd, Address origin) throws Throwable {
+ public void handle(final CacheRpcCommand cmd, Address origin, org.jgroups.blocks.Response response, boolean preserveOrder) throws Throwable {
cmd.setOrigin(origin);
String cacheName = cmd.getCacheName();
@@ -76,22 +84,21 @@ public Response handle(final CacheRpcCommand cmd, Address origin) throws Throwab
if (cr == null) {
if (!globalConfiguration.transport().strictPeerToPeer()) {
if (trace) log.tracef("Strict peer to peer off, so silently ignoring that %s cache is not defined", cacheName);
- return null;
+ reply(response, null);
+ return;
}
log.namedCacheDoesNotExist(cacheName);
- return new ExceptionResponse(new NamedCacheNotFoundException(cacheName, "Cache has not been started on node " + transport.getAddress()));
+ Response retVal = new ExceptionResponse(new NamedCacheNotFoundException(cacheName, "Cache has not been started on node " + transport.getAddress()));
+ reply(response, retVal);
+ return;
}
- return handleWithWaitForBlocks(cmd, cr);
+ handleWithWaitForBlocks(cmd, cr, response, preserveOrder);
}
private Response handleInternal(final CacheRpcCommand cmd, final ComponentRegistry cr) throws Throwable {
- CommandsFactory commandsFactory = cr.getCommandsFactory();
-
- // initialize this command with components specific to the intended cache instance
- commandsFactory.initializeReplicableCommand(cmd, true);
try {
if (trace) log.tracef("Calling perform() on %s", cmd);
ResponseGenerator respGen = cr.getResponseGenerator();
@@ -112,13 +119,37 @@ private Response handleInternal(final CacheRpcCommand cmd, final ComponentRegist
}
}
- private Response handleWithWaitForBlocks(final CacheRpcCommand cmd, final ComponentRegistry cr) throws Throwable {
+ private void handleWithWaitForBlocks(final CacheRpcCommand cmd, final ComponentRegistry cr, final org.jgroups.blocks.Response response, boolean preserveOrder) throws Throwable {
StateTransferManager stm = cr.getStateTransferManager();
// We must have completed the join before handling commands
// (even if we didn't complete the initial state transfer)
- if (!stm.isJoinComplete())
- return null;
+ if (!stm.isJoinComplete()) {
+ reply(response, null);
+ return;
+ }
+
+ CommandsFactory commandsFactory = cr.getCommandsFactory();
+ // initialize this command with components specific to the intended cache instance
+ commandsFactory.initializeReplicableCommand(cmd, true);
+
+ if (!preserveOrder && cmd.canBlock()) {
+ remoteCommandsExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ Response resp;
+ try {
+ resp = handleInternal(cmd, cr);
+ } catch (Throwable throwable) {
+ log.warnf(throwable, "Problems invoking command %s", cmd);
+ resp = new ExceptionResponse(new CacheException("Problems invoking command.", throwable));
+ }
+ //the ResponseGenerated is null in this case because the return value is a Response
+ reply(response, resp);
+ }
+ });
+ return;
+ }
Response resp = handleInternal(cmd, cr);
// A null response is valid and OK ...
@@ -126,8 +157,13 @@ private Response handleWithWaitForBlocks(final CacheRpcCommand cmd, final Compon
// invalid response
log.tracef("Unable to execute command, got invalid response %s", resp);
}
-
- return resp;
+ reply(response, resp);
+ }
+
+ private void reply(org.jgroups.blocks.Response response, Object retVal) {
+ if (response != null) {
+ response.send(retVal, false);
+ }
}
}
View
86 core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
@@ -78,11 +78,13 @@
* A JGroups RPC dispatcher that knows how to deal with {@link ReplicableCommand}s.
*
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @author Pedro Ruivo
* @since 4.0
*/
public class CommandAwareRpcDispatcher extends RpcDispatcher {
private final ExecutorService asyncExecutor;
+ private final ExecutorService remoteCommandsExecutor;
private final InboundInvocationHandler inboundInvocationHandler;
private static final Log log = LogFactory.getLog(CommandAwareRpcDispatcher.class);
private static final boolean trace = log.isTraceEnabled();
@@ -94,10 +96,12 @@
public CommandAwareRpcDispatcher(Channel channel,
JGroupsTransport transport,
ExecutorService asyncExecutor,
+ ExecutorService remoteCommandsExecutor,
InboundInvocationHandler inboundInvocationHandler,
GlobalComponentRegistry gcr, BackupReceiverRepository backupReceiverRepository) {
this.server_obj = transport;
this.asyncExecutor = asyncExecutor;
+ this.remoteCommandsExecutor = remoteCommandsExecutor;
this.inboundInvocationHandler = inboundInvocationHandler;
this.transport = transport;
this.gcr = gcr;
@@ -114,6 +118,7 @@ public CommandAwareRpcDispatcher(Channel channel,
mux.setDefaultHandler(this.prot_adapter);
}
channel.addChannelListener(this);
+ asyncDispatching(true);
}
private boolean isValid(Message req) {
@@ -208,49 +213,95 @@ private boolean containsOnlyNulls(RspList<Object> l) {
* Message contains a Command. Execute it against *this* object and return result.
*/
@Override
- public Object handle(Message req) {
+ public void handle(Message req, org.jgroups.blocks.Response response) throws Exception {
if (isValid(req)) {
+ boolean preserveOrder = !req.isFlagSet(Message.Flag.OOB);
ReplicableCommand cmd = null;
try {
cmd = (ReplicableCommand) req_marshaller.objectFromBuffer(req.getRawBuffer(), req.getOffset(), req.getLength());
if (cmd == null) throw new NullPointerException("Unable to execute a null command! Message was " + req);
if (req.getSrc() instanceof SiteAddress) {
- return executeCommandFromRemoteSite(cmd, (SiteAddress)req.getSrc());
+ executeCommandFromRemoteSite(cmd, (SiteAddress) req.getSrc(), response, preserveOrder);
} else {
- return executeCommandFromLocalCluster(cmd, req);
+ executeCommandFromLocalCluster(cmd, req, response, preserveOrder);
}
} catch (InterruptedException e) {
log.warnf("Shutdown while handling command %s", cmd);
- return new ExceptionResponse(new CacheException("Cache is shutting down"));
+ reply(response, new ExceptionResponse(new CacheException("Cache is shutting down")));
} catch (Throwable x) {
if (cmd == null)
log.warnf(x, "Problems unmarshalling remote command from byte buffer");
else
log.warnf(x, "Problems invoking command %s", cmd);
- return new ExceptionResponse(new CacheException("Problems invoking command.", x));
+ reply(response, new ExceptionResponse(new CacheException("Problems invoking command.", x)));
}
} else {
- return null;
+ reply(response, null);
}
}
- private Object executeCommandFromRemoteSite(ReplicableCommand cmd, SiteAddress src) throws Throwable {
+ private void executeCommandFromRemoteSite(final ReplicableCommand cmd, final SiteAddress src, final org.jgroups.blocks.Response response, boolean preserveOrder) throws Throwable {
if (! (cmd instanceof SingleRpcCommand)) {
throw new IllegalStateException("Only CacheRpcCommand commands expected as a result of xsite calls but got " + cmd.getClass().getName());
}
- return backupReceiverRepository.handleRemoteCommand((SingleRpcCommand) cmd, src);
+
+ if (preserveOrder) {
+ reply(response, backupReceiverRepository.handleRemoteCommand((SingleRpcCommand) cmd, src));
+ return;
+ }
+
+ //the remote site commands may need to be forwarded to the appropriate owners
+ remoteCommandsExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Object retVal = backupReceiverRepository.handleRemoteCommand((SingleRpcCommand) cmd, src);
+ reply(response, retVal);
+ } catch (InterruptedException e) {
+ log.warnf("Shutdown while handling command %s", cmd);
+ reply(response, new ExceptionResponse(new CacheException("Cache is shutting down")));
+ } catch (Throwable throwable) {
+ log.warnf(throwable, "Problems invoking command %s", cmd);
+ reply(response, new ExceptionResponse(new CacheException("Problems invoking command.", throwable)));
+ }
+ }
+ });
}
- private Object executeCommandFromLocalCluster(ReplicableCommand cmd, Message req) throws Throwable {
+ private void executeCommandFromLocalCluster(final ReplicableCommand cmd, final Message req, final org.jgroups.blocks.Response response, boolean preserveOrder) throws Throwable {
if (cmd instanceof CacheRpcCommand) {
if (trace) log.tracef("Attempting to execute command: %s [sender=%s]", cmd, req.getSrc());
- return inboundInvocationHandler.handle((CacheRpcCommand) cmd, fromJGroupsAddress(req.getSrc()));
+ inboundInvocationHandler.handle((CacheRpcCommand) cmd, fromJGroupsAddress(req.getSrc()), response, preserveOrder);
} else {
- if (trace) log.tracef("Attempting to execute non-CacheRpcCommand command: %s [sender=%s]", cmd, req.getSrc());
- gcr.wireDependencies(cmd);
+ if (!preserveOrder && cmd.canBlock()) {
+ remoteCommandsExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ if (trace)
+ log.tracef("Attempting to execute non-CacheRpcCommand command: %s [sender=%s]", cmd, req.getSrc());
+ gcr.wireDependencies(cmd);
+
+ //todo [anistor] the call to perform() should be wrapped in try/catch and any exception should be wrapped in an ExceptionResponse, as it happens for commands that go through InboundInvocationHandler
+ Object retVal = cmd.perform(null); //todo [anistor] here we should provide an InvocationContext that at least is able to provide the Address of the origin
+ reply(response, retVal);
+ } catch (InterruptedException e) {
+ log.warnf("Shutdown while handling command %s", cmd);
+ reply(response, new ExceptionResponse(new CacheException("Cache is shutting down")));
+ } catch (Throwable throwable) {
+ log.warnf(throwable, "Problems invoking command %s", cmd);
+ reply(response, new ExceptionResponse(new CacheException("Problems invoking command.", throwable)));
+ }
+ }
+ });
+ } else {
+ if (trace) log.tracef("Attempting to execute non-CacheRpcCommand command: %s [sender=%s]", cmd, req.getSrc());
+ gcr.wireDependencies(cmd);
- //todo [anistor] the call to perform() should be wrapped in try/catch and any exception should be wrapped in an ExceptionResponse, as it happens for commands that go through InboundInvocationHandler
- return cmd.perform(null); //todo [anistor] here we should provide an InvocationContext that at least is able to provide the Address of the origin
+ //todo [anistor] the call to perform() should be wrapped in try/catch and any exception should be wrapped in an ExceptionResponse, as it happens for commands that go through InboundInvocationHandler
+ Object retVal = cmd.perform(null); //todo [anistor] here we should provide an InvocationContext that at least is able to provide the Address of the origin
+ reply(response, retVal);
+ }
}
}
@@ -258,6 +309,13 @@ private Object executeCommandFromLocalCluster(ReplicableCommand cmd, Message req
public String toString() {
return getClass().getSimpleName() + "[Outgoing marshaller: " + req_marshaller + "; incoming marshaller: " + rsp_marshaller + "]";
}
+
+ private void reply(org.jgroups.blocks.Response response, Object retVal) {
+ if (response != null) {
+ //exceptionThrown is always false because the exceptions are wrapped in an ExceptionResponse
+ response.send(retVal, false);
+ }
+ }
protected static Message constructMessage(Buffer buf, Address recipient, boolean oob, ResponseMode mode, boolean rsvp) {
Message msg = new Message();
View
6 core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
@@ -82,6 +82,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import static org.infinispan.factories.KnownComponentNames.REMOTE_COMMAND_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.GLOBAL_MARSHALLER;
@@ -119,6 +120,7 @@
protected InboundInvocationHandler inboundInvocationHandler;
protected StreamingMarshaller marshaller;
protected ExecutorService asyncExecutor;
+ protected ExecutorService remoteCommandsExecutor;
protected CacheManagerNotifier notifier;
private GlobalComponentRegistry gcr;
private BackupReceiverRepository backupReceiverRepository;
@@ -177,10 +179,12 @@ public Log getLog() {
@Inject
public void initialize(@ComponentName(GLOBAL_MARSHALLER) StreamingMarshaller marshaller,
@ComponentName(ASYNC_TRANSPORT_EXECUTOR) ExecutorService asyncExecutor,
+ @ComponentName(REMOTE_COMMAND_EXECUTOR) ExecutorService remoteCommandsExecutor,
InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier,
GlobalComponentRegistry gcr, BackupReceiverRepository backupReceiverRepository) {
this.marshaller = marshaller;
this.asyncExecutor = asyncExecutor;
+ this.remoteCommandsExecutor = remoteCommandsExecutor;
this.inboundInvocationHandler = inboundInvocationHandler;
this.notifier = notifier;
this.gcr = gcr;
@@ -320,7 +324,7 @@ protected void initChannel() {
private void initChannelAndRPCDispatcher() throws CacheException {
initChannel();
- dispatcher = new CommandAwareRpcDispatcher(channel, this, asyncExecutor, inboundInvocationHandler, gcr, backupReceiverRepository);
+ dispatcher = new CommandAwareRpcDispatcher(channel, this, asyncExecutor, remoteCommandsExecutor, inboundInvocationHandler, gcr, backupReceiverRepository);
MarshallerAdapter adapter = new MarshallerAdapter(marshaller);
dispatcher.setRequestMarshaller(adapter);
dispatcher.setResponseMarshaller(adapter);
View
7 core/src/main/java/org/infinispan/statetransfer/StateRequestCommand.java
@@ -50,7 +50,7 @@
public static final byte COMMAND_ID = 15;
- private Type type;
+ private Type type = Type.CANCEL_STATE_TRANSFER; //default value for org.infinispan.remoting.AsynchronousInvocationTest
private int topologyId;
@@ -110,6 +110,11 @@ public boolean isReturnValueExpected() {
return type != Type.CANCEL_STATE_TRANSFER;
}
+ @Override
+ public boolean canBlock() {
+ return type == Type.GET_TRANSACTIONS || type == Type.START_STATE_TRANSFER;
+ }
+
public Type getType() {
return type;
}
View
5 core/src/main/java/org/infinispan/statetransfer/StateResponseCommand.java
@@ -96,6 +96,11 @@ public boolean isReturnValueExpected() {
}
@Override
+ public boolean canBlock() {
+ return true;
+ }
+
+ @Override
public byte getCommandId() {
return COMMAND_ID;
}
View
5 core/src/main/java/org/infinispan/topology/CacheTopologyControlCommand.java
@@ -247,4 +247,9 @@ public String toString() {
public boolean isReturnValueExpected() {
return true;
}
+
+ @Override
+ public boolean canBlock() {
+ return true;
+ }
}
View
5 core/src/main/java/org/infinispan/xsite/XSiteAdminCommand.java
@@ -128,6 +128,11 @@ public final boolean isReturnValueExpected() {
}
@Override
+ public boolean canBlock() {
+ return false;
+ }
+
+ @Override
public String toString() {
return "XSiteAdminCommand{" +
"siteName='" + siteName + '\'' +
View
7 core/src/main/resources/schema/infinispan-config-5.3.xsd
@@ -50,6 +50,13 @@
</xs:documentation>
</xs:annotation>
</xs:element>
+ <xs:element name="remoteCommandsExecutor" type="tns:executorFactory" minOccurs="0">
+ <xs:annotation>
+ <xs:documentation>
+ Configuration for the executor service used to execute remote commands. Use org.infinispan.executors.WithinThreadExecutorFactory to disable.
+ </xs:documentation>
+ </xs:annotation>
+ </xs:element>
<xs:element name="evictionScheduledExecutor" type="tns:scheduledExecutorFactory" minOccurs="0">
<xs:annotation>
<xs:documentation>
View
20 core/src/test/java/org/infinispan/configuration/XmlFileParsingTest.java
@@ -28,6 +28,7 @@
import static org.infinispan.test.TestingUtil.INFINISPAN_START_TAG_NO_SCHEMA;
import static org.infinispan.test.TestingUtil.withCacheManager;
import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
@@ -290,13 +291,28 @@ private void assertNamedCacheFile(EmbeddedCacheManager cm, boolean deprecated) {
assert gc.asyncListenerExecutor().factory() instanceof DefaultExecutorFactory;
assert gc.asyncListenerExecutor().properties().getProperty("maxThreads").equals("5");
+ if (!deprecated) {
+ assertEquals("10000", gc.asyncListenerExecutor().properties().getProperty("queueSize"));
+ }
assert gc.asyncListenerExecutor().properties().getProperty("threadNamePrefix").equals("AsyncListenerThread");
assert gc.asyncTransportExecutor().factory() instanceof DefaultExecutorFactory;
// Should be 25, but it's overriden by the test cache manager factory
- assertEquals("6", gc.asyncTransportExecutor().properties().getProperty("maxThreads"));
+ assertEquals(String.valueOf(TestCacheManagerFactory.MAX_ASYNC_EXEC_THREADS), gc.asyncTransportExecutor().properties().getProperty("maxThreads"));
+ if (!deprecated) {
+ assertEquals(String.valueOf(TestCacheManagerFactory.ASYNC_EXEC_QUEUE_SIZE), gc.asyncTransportExecutor().properties().getProperty("queueSize"));
+ }
assert gc.asyncTransportExecutor().properties().getProperty("threadNamePrefix").equals("AsyncSerializationThread");
+ if (!deprecated) {
+ assertTrue(gc.remoteCommandsExecutor().factory() instanceof DefaultExecutorFactory);
+ assertEquals(String.valueOf(TestCacheManagerFactory.MAX_REQ_EXEC_THREADS),
+ gc.remoteCommandsExecutor().properties().getProperty("maxThreads"));
+ assertEquals("RemoteCommandThread", gc.remoteCommandsExecutor().properties().getProperty("threadNamePrefix"));
+ assertEquals("2", gc.remoteCommandsExecutor().properties().getProperty("coreThreads"));
+ assertEquals(String.valueOf(TestCacheManagerFactory.KEEP_ALIVE), gc.remoteCommandsExecutor().properties().getProperty("keepAliveTime"));
+ }
+
assert gc.evictionScheduledExecutor().factory() instanceof DefaultScheduledExecutorFactory;
assert gc.evictionScheduledExecutor().properties().getProperty("threadNamePrefix").equals("EvictionThread");
@@ -485,4 +501,4 @@ private void assertReaperAndTimeoutInfo(Configuration defaultCfg) {
assertEquals(3123, defaultCfg.transaction().completedTxTimeout());
}
-}
+}
View
6 core/src/test/java/org/infinispan/distribution/rehash/OngoingTransactionsAndJoinTest.java
@@ -34,7 +34,6 @@
import org.infinispan.manager.CacheContainer;
import org.infinispan.remoting.InboundInvocationHandler;
import org.infinispan.remoting.InboundInvocationHandlerImpl;
-import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.jgroups.CommandAwareRpcDispatcher;
@@ -308,7 +307,7 @@ public ListeningHandler(InboundInvocationHandler delegate, CountDownLatch txsRea
}
@Override
- public Response handle(CacheRpcCommand cmd, Address origin) throws Throwable {
+ public void handle(CacheRpcCommand cmd, Address origin, org.jgroups.blocks.Response response, boolean preserveOrder) throws Throwable {
boolean notifyRehashStarted = false;
if (cmd instanceof CacheTopologyControlCommand) {
CacheTopologyControlCommand rcc = (CacheTopologyControlCommand) cmd;
@@ -325,9 +324,8 @@ public Response handle(CacheRpcCommand cmd, Address origin) throws Throwable {
}
}
- Response r = delegate.handle(cmd, origin);
+ delegate.handle(cmd, origin, response, preserveOrder);
if (notifyRehashStarted) rehashStarted.countDown();
- return r;
}
}
View
264 core/src/test/java/org/infinispan/remoting/AsynchronousInvocationTest.java
@@ -0,0 +1,264 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.remoting;
+
+import org.infinispan.Cache;
+import org.infinispan.commands.CommandsFactory;
+import org.infinispan.commands.ReplicableCommand;
+import org.infinispan.commands.read.GetKeyValueCommand;
+import org.infinispan.commands.read.ReduceCommand;
+import org.infinispan.commands.remote.ClusteredGetCommand;
+import org.infinispan.commands.remote.MultipleRpcCommand;
+import org.infinispan.commands.remote.SingleRpcCommand;
+import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.context.Flag;
+import org.infinispan.executors.ExecutorFactory;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.Transport;
+import org.infinispan.remoting.transport.jgroups.CommandAwareRpcDispatcher;
+import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
+import org.infinispan.test.AbstractInfinispanTest;
+import org.infinispan.topology.CacheTopologyControlCommand;
+import org.infinispan.util.InfinispanCollections;
+import org.jgroups.Address;
+import org.jgroups.Message;
+import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.util.Buffer;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.infinispan.test.TestingUtil.extractCommandsFactory;
+import static org.infinispan.test.TestingUtil.extractGlobalComponent;
+import static org.infinispan.test.fwk.TestCacheManagerFactory.createClusteredCacheManager;
+
+/**
+ * Tests the Asynchronous Invocation API and checks if the commands are correctly processed (or JGroups or Infinispan
+ * thread pool)
+ *
+ * @author Pedro Ruivo
+ * @since 5.3
+ */
+@Test(groups = "functional", testName = "remoting.AsynchronousInvocationTest")
+public class AsynchronousInvocationTest extends AbstractInfinispanTest {
+
+ private EmbeddedCacheManager cacheManager;
+ private DummyTaskCountExecutorService executorService;
+ private CommandAwareRpcDispatcher commandAwareRpcDispatcher;
+ private Address address;
+ private RpcDispatcher.Marshaller marshaller;
+ private CommandsFactory commandsFactory;
+ private ReplicableCommand blockingCacheRpcCommand;
+ private ReplicableCommand nonBlockingCacheRpcCommand;
+ private ReplicableCommand blockingNonCacheRpcCommand;
+ private ReplicableCommand nonBlockingNonCacheRpcCommand;
+ private ReplicableCommand blockingSingleRpcCommand;
+ private ReplicableCommand nonBlockingSingleRpcCommand;
+ private ReplicableCommand blockingMultipleRpcCommand;
+ private ReplicableCommand blockingMultipleRpcCommand2;
+ private ReplicableCommand nonBlockingMultipleRpcCommand;
+
+ @BeforeClass(alwaysRun = true)
+ public void setUp() {
+ GlobalConfigurationBuilder globalConfigurationBuilder = GlobalConfigurationBuilder.defaultClusteredBuilder();
+ ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
+ DummyExecutorFactory factory = new DummyExecutorFactory();
+
+ globalConfigurationBuilder.remoteCommandsExecutor().factory(factory);
+ configurationBuilder.clustering().cacheMode(CacheMode.DIST_SYNC);
+
+ cacheManager = createClusteredCacheManager(globalConfigurationBuilder, configurationBuilder);
+ Cache<Object, Object> cache = cacheManager.getCache();
+ String cacheName = cache.getName();
+ Transport transport = extractGlobalComponent(cacheManager, Transport.class);
+ if (transport instanceof JGroupsTransport) {
+ commandAwareRpcDispatcher = ((JGroupsTransport) transport).getCommandAwareRpcDispatcher();
+ address = ((JGroupsTransport) transport).getChannel().getAddress();
+ marshaller = commandAwareRpcDispatcher.getMarshaller();
+ } else {
+ Assert.fail("Expected a JGroups Transport");
+ }
+ commandsFactory = extractCommandsFactory(cache);
+ executorService = factory.getExecutorService();
+
+ GetKeyValueCommand getKeyValueCommand = new GetKeyValueCommand("key", InfinispanCollections.<Flag>emptySet());
+ PutKeyValueCommand putKeyValueCommand = new PutKeyValueCommand("key", "value", false, null, -1, -1, InfinispanCollections.<Flag>emptySet());
+
+ //populate commands
+ blockingCacheRpcCommand = new ReduceCommand<Object, Object>(cacheName);
+ nonBlockingCacheRpcCommand = new ClusteredGetCommand(cacheName);
+ blockingNonCacheRpcCommand = new CacheTopologyControlCommand();
+ //the GetKeyValueCommand is not replicated, but I only need a command that returns false in canBlock()
+ nonBlockingNonCacheRpcCommand = new GetKeyValueCommand("key", InfinispanCollections.<Flag>emptySet());
+ blockingSingleRpcCommand = new SingleRpcCommand(cacheName, putKeyValueCommand);
+ nonBlockingSingleRpcCommand = new SingleRpcCommand(cacheName, getKeyValueCommand);
+ blockingMultipleRpcCommand = new MultipleRpcCommand(Arrays.<ReplicableCommand>asList(putKeyValueCommand, putKeyValueCommand), cacheName);
+ blockingMultipleRpcCommand2 = new MultipleRpcCommand(Arrays.<ReplicableCommand>asList(putKeyValueCommand, getKeyValueCommand), cacheName);
+ nonBlockingMultipleRpcCommand = new MultipleRpcCommand(Arrays.<ReplicableCommand>asList(getKeyValueCommand, getKeyValueCommand), cacheName);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ if (cacheManager != null) {
+ cacheManager.stop();
+ }
+ }
+
+ public void testCommands() {
+ //if some of these tests fails, we need to pick another command to make the assertions true
+ Assert.assertTrue(blockingCacheRpcCommand.canBlock());
+ Assert.assertTrue(blockingNonCacheRpcCommand.canBlock());
+ Assert.assertTrue(blockingSingleRpcCommand.canBlock());
+ Assert.assertTrue(blockingMultipleRpcCommand.canBlock());
+ Assert.assertTrue(blockingMultipleRpcCommand2.canBlock());
+
+ Assert.assertFalse(nonBlockingCacheRpcCommand.canBlock());
+ Assert.assertFalse(nonBlockingNonCacheRpcCommand.canBlock());
+ Assert.assertFalse(nonBlockingSingleRpcCommand.canBlock());
+ Assert.assertFalse(nonBlockingMultipleRpcCommand.canBlock());
+ }
+
+ public void testCacheRpcCommands() throws Exception {
+ assertDispatchForCommand(blockingCacheRpcCommand, true);
+ assertDispatchForCommand(nonBlockingCacheRpcCommand, false);
+ }
+
+ public void testSingleRpcCommand() throws Exception {
+ assertDispatchForCommand(blockingSingleRpcCommand, true);
+ assertDispatchForCommand(nonBlockingSingleRpcCommand, false);
+ }
+
+ public void testMultipleRpcCommand() throws Exception {
+ assertDispatchForCommand(blockingMultipleRpcCommand, true);
+ assertDispatchForCommand(blockingMultipleRpcCommand2, true);
+ assertDispatchForCommand(nonBlockingMultipleRpcCommand, false);
+ }
+
+ public void testNonCacheRpcCommands() throws Exception {
+ assertDispatchForCommand(blockingNonCacheRpcCommand, true);
+ assertDispatchForCommand(nonBlockingNonCacheRpcCommand, false);
+ }
+
+ private void assertDispatchForCommand(ReplicableCommand command, boolean expected) throws Exception {
+ log.debugf("Testing " + command.getClass().getCanonicalName());
+ commandsFactory.initializeReplicableCommand(command, true);
+ Message oobRequest = serialize(command, true, address);
+ if (oobRequest == null) {
+ log.debugf("Don't test " + command.getClass() + ". it is not Serializable");
+ return;
+ }
+ executorService.reset();
+ commandAwareRpcDispatcher.handle(oobRequest, null);
+ Assert.assertEquals(executorService.hasExecutedCommand, expected,
+ "Command " + command.getClass() + " dispatched wrongly.");
+
+ Message nonOobRequest = serialize(command, false, address);
+ if (nonOobRequest == null) {
+ log.debugf("Don't test " + command.getClass() + ". it is not Serializable");
+ return;
+ }
+ executorService.reset();
+ commandAwareRpcDispatcher.handle(nonOobRequest, null);
+ Assert.assertFalse(executorService.hasExecutedCommand, "Command " + command.getClass() + " dispatched wrongly.");
+ }
+
+ private Message serialize(ReplicableCommand command, boolean oob, Address from) {
+ Buffer buffer;
+ try {
+ buffer = marshaller.objectToBuffer(command);
+ } catch (Exception e) {
+ //ignore, it will not be replicated
+ return null;
+ }
+ Message message = new Message(null, from, buffer.getBuf(), buffer.getOffset(), buffer.getLength());
+ if (oob) {
+ message.setFlag(Message.Flag.OOB);
+ }
+ return message;
+ }
+
+ private class DummyExecutorFactory implements ExecutorFactory {
+
+ private final DummyTaskCountExecutorService executorService;
+
+ private DummyExecutorFactory() {
+ executorService = new DummyTaskCountExecutorService();
+ }
+
+ @Override
+ public ExecutorService getExecutor(Properties p) {
+ return executorService;
+ }
+
+ public DummyTaskCountExecutorService getExecutorService() {
+ return executorService;
+ }
+ }
+
+ private class DummyTaskCountExecutorService extends AbstractExecutorService {
+
+ private volatile boolean hasExecutedCommand;
+
+ @Override
+ public void execute(Runnable command) {
+ hasExecutedCommand = true;
+ }
+
+ public void reset() {
+ hasExecutedCommand = false;
+ }
+
+ @Override
+ public void shutdown() {
+ //no-op
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return InfinispanCollections.emptyList(); //no-op
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return false; //no-op
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return false; //no-op
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return false; //no-op
+ }
+ }
+}
View
19 core/src/test/java/org/infinispan/test/fwk/TestCacheManagerFactory.java
@@ -45,7 +45,9 @@
import org.infinispan.config.GlobalConfiguration;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.cache.LegacyConfigurationAdaptor;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.configuration.global.LegacyGlobalConfigurationAdaptor;
import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
import org.infinispan.configuration.parsing.ConfigurationParser;
import org.infinispan.configuration.parsing.Namespace;
@@ -75,7 +77,11 @@
*/
public class TestCacheManagerFactory {
- private static final int MAX_ASYNC_EXEC_THREADS = 6;
+ public static final int MAX_ASYNC_EXEC_THREADS = 6;
+ public static final int ASYNC_EXEC_QUEUE_SIZE = 10000;
+ public static final int MAX_REQ_EXEC_THREADS = 6;
+ public static final int REQ_EXEC_QUEUE_SIZE = 0;
+ public static final int KEEP_ALIVE = 30000;
public static final String MARSHALLER = LegacyKeySupportSystemProperties.getProperty("infinispan.test.marshaller.class", "infinispan.marshaller.class");
private static final Log log = LogFactory.getLog(TestCacheManagerFactory.class);
@@ -535,7 +541,12 @@ public static void minimizeThreads(GlobalConfiguration gc) {
}
public static void minimizeThreads(GlobalConfigurationBuilder builder) {
- builder.asyncTransportExecutor().addProperty("maxThreads", String.valueOf(MAX_ASYNC_EXEC_THREADS));
+ builder.asyncTransportExecutor().addProperty("maxThreads", String.valueOf(MAX_ASYNC_EXEC_THREADS))
+ .addProperty("queueSize", String.valueOf(ASYNC_EXEC_QUEUE_SIZE))
+ .addProperty("keepAliveTime", String.valueOf(KEEP_ALIVE));
+ builder.remoteCommandsExecutor().addProperty("maxThreads", String.valueOf(MAX_REQ_EXEC_THREADS))
+ .addProperty("queueSize", String.valueOf(REQ_EXEC_QUEUE_SIZE))
+ .addProperty("keepAliveTime", String.valueOf(KEEP_ALIVE));
}
public static void amendMarshaller(GlobalConfiguration configuration) {
@@ -566,7 +577,9 @@ public static void amendMarshaller(GlobalConfigurationBuilder builder) {
}
private static DefaultCacheManager newDefaultCacheManager(boolean start, GlobalConfiguration gc, Configuration c) {
- DefaultCacheManager defaultCacheManager = new DefaultCacheManager(gc, c, start);
+ GlobalConfigurationBuilder builder = new GlobalConfigurationBuilder().read(LegacyGlobalConfigurationAdaptor.adapt(gc));
+ minimizeThreads(builder);
+ DefaultCacheManager defaultCacheManager = new DefaultCacheManager(builder.build(), LegacyConfigurationAdaptor.adapt(c), start);
return addThreadCacheManager(defaultCacheManager);
}
View
5 core/src/test/resources/configs/all.xml
@@ -37,6 +37,11 @@
<property name="foo" value="bar"/>
</properties>
</asyncTransportExecutor>
+ <remoteCommandsExecutor factory="com.acme.Factory">
+ <properties>
+ <property name="foo" value="bar"/>
+ </properties>
+ </remoteCommandsExecutor>
<evictionScheduledExecutor factory="com.acme.Factory">
<properties>
<property name="foo" value="bar"/>
View
11 core/src/test/resources/configs/named-cache-test.xml
@@ -31,6 +31,7 @@
<asyncListenerExecutor factory="org.infinispan.executors.DefaultExecutorFactory">
<properties>
<property name="maxThreads" value="5"/>
+ <property name="queueSize" value="10000"/>
<property name="threadNamePrefix" value="AsyncListenerThread"/>
</properties>
</asyncListenerExecutor>
@@ -38,10 +39,20 @@
<asyncTransportExecutor factory="org.infinispan.executors.DefaultExecutorFactory">
<properties>
<property name="maxThreads" value="25"/>
+ <property name="queueSize" value="10000"/>
<property name="threadNamePrefix" value="AsyncSerializationThread"/>
</properties>
</asyncTransportExecutor>
+ <remoteCommandsExecutor factory="org.infinispan.executors.DefaultExecutorFactory">
+ <properties>
+ <property name="maxThreads" value="30"/>
+ <property name="coreThreads" value="2"/>
+ <property name="keepAliveTime" value="10000"/>
+ <property name="threadNamePrefix" value="RemoteCommandThread"/>
+ </properties>
+ </remoteCommandsExecutor>
+
<evictionScheduledExecutor factory="org.infinispan.executors.DefaultScheduledExecutorFactory">
<properties>
<property name="threadNamePrefix" value="EvictionThread"/>
View
5 query/src/main/java/org/infinispan/query/clustered/ClusteredQueryCommand.java
@@ -191,4 +191,9 @@ public boolean equals(Object obj) {
public boolean isReturnValueExpected() {
return true;
}
+
+ @Override
+ public boolean canBlock() {
+ return true;
+ }
}
View
5 query/src/main/java/org/infinispan/query/indexmanager/IndexUpdateCommand.java
@@ -111,6 +111,11 @@ public boolean isReturnValueExpected() {
return false;
}
+ @Override
+ public boolean canBlock() {
+ return true;
+ }
+
/**
* This is invoked only on the receiving node, before {@link #perform(InvocationContext)}
*/
View
35 spring/src/main/java/org/infinispan/spring/AbstractEmbeddedCacheManagerFactory.java
@@ -212,6 +212,14 @@ public void setAsyncTransportExecutorFactoryClass(final String asyncTransportExe
}
/**
+ * @param remoteCommandsExecutorFactoryClass
+ * @see org.infinispan.config.GlobalConfiguration#setRemoteCommandsExecutorFactoryClass(java.lang.String)
+ */
+ public void setRemoteCommandsExecutorFactoryClass(final String remoteCommandsExecutorFactoryClass) {
+ this.globalConfigurationOverrides.remoteCommandsExecutorFactoryClass = remoteCommandsExecutorFactoryClass;
+ }
+
+ /**
* @param evictionScheduledExecutorFactoryClass
* @see org.infinispan.config.GlobalConfiguration#setEvictionScheduledExecutorFactoryClass(java.lang.String)
*/
@@ -318,6 +326,14 @@ public void setAsyncTransportExecutorProperties(final Properties asyncTransportE
}
/**
+ * @param remoteCommandsExecutorProperties
+ * @see org.infinispan.config.GlobalConfiguration#setRemoteCommandsExecutorProperties(java.util.Properties)
+ */
+ public void setRemoteCommandsExecutorProperties(final Properties remoteCommandsExecutorProperties) {
+ this.globalConfigurationOverrides.remoteCommandsExecutorProperties = remoteCommandsExecutorProperties;
+ }
+
+ /**
* @param evictionScheduledExecutorProperties
* @see org.infinispan.config.GlobalConfiguration#setEvictionScheduledExecutorProperties(java.util.Properties)
*/
@@ -810,6 +826,8 @@ public void setCustomInterceptors(final List<CustomInterceptorConfig> customInte
private String asyncTransportExecutorFactoryClass;
+ private String remoteCommandsExecutorFactoryClass;
+
private String evictionScheduledExecutorFactoryClass;
private String replicationQueueScheduledExecutorFactoryClass;
@@ -824,6 +842,8 @@ public void setCustomInterceptors(final List<CustomInterceptorConfig> customInte
private Properties asyncTransportExecutorProperties;
+ private Properties remoteCommandsExecutorProperties;
+
private Properties evictionScheduledExecutorProperties;
private Properties replicationQueueScheduledExecutorProperties;
@@ -927,6 +947,14 @@ public void applyOverridesTo(final GlobalConfigurationBuilder globalConfiguratio
Util.<ExecutorFactory>getInstance(this.asyncTransportExecutorFactoryClass,
Thread.currentThread().getContextClassLoader()));
}
+ if (this.remoteCommandsExecutorFactoryClass != null) {
+ this.logger
+ .debug("Overriding property [remoteCommandsExecutorFactoryClass] with new value ["
+ + this.remoteCommandsExecutorFactoryClass + "]");
+ globalConfigurationToOverride.remoteCommandsExecutor().factory(
+ Util.<ExecutorFactory>getInstance(this.remoteCommandsExecutorFactoryClass,
+ Thread.currentThread().getContextClassLoader()));
+ }
if (this.evictionScheduledExecutorFactoryClass != null) {
this.logger
.debug("Overriding property [evictionScheduledExecutorFactoryClass] with new value ["
@@ -975,6 +1003,13 @@ public void applyOverridesTo(final GlobalConfigurationBuilder globalConfiguratio
globalConfigurationToOverride.asyncTransportExecutor().withProperties(
this.asyncTransportExecutorProperties);
}
+ if (this.remoteCommandsExecutorProperties != null) {
+ this.logger
+ .debug("Overriding property [remoteCommandsExecutorProperties] with new value ["
+ + this.remoteCommandsExecutorProperties + "]");
+ globalConfigurationToOverride.remoteCommandsExecutor().withProperties(
+ this.remoteCommandsExecutorProperties);
+ }
if (this.evictionScheduledExecutorProperties != null) {
this.logger
.debug("Overriding property [evictionScheduledExecutorProperties] with new value ["
Please sign in to comment.
Something went wrong with that request. Please try again.