Skip to content

Commit

Permalink
ISPN-6491 Remove total order thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
pruivo authored and danberindei committed Jun 3, 2016
1 parent 25ee0ac commit c8d3df0
Show file tree
Hide file tree
Showing 14 changed files with 21 additions and 91 deletions.
Expand Up @@ -232,16 +232,6 @@ public String toString() {
'}';
}

/**
* @deprecated This method always returns null now.
* Look for a thread pool named as
* {@link TransportConfiguration#totalOrderThreadPool()} instead.
*/
@Deprecated
public ExecutorFactoryConfiguration totalOrderExecutor() {
return null;
}

public boolean isClustered() {
return transport().transport() != null;
}
Expand Down
Expand Up @@ -55,14 +55,12 @@ static AttributeSet attributeSet() {
private final AttributeSet attributes;
private final ThreadPoolConfiguration transportThreadPool;
private final ThreadPoolConfiguration remoteCommandThreadPool;
private final ThreadPoolConfiguration totalOrderThreadPool;

TransportConfiguration(AttributeSet attributes, ThreadPoolConfiguration transportThreadPool,
ThreadPoolConfiguration remoteCommandThreadPool, ThreadPoolConfiguration totalOrderThreadPool) {
ThreadPoolConfiguration remoteCommandThreadPool) {
this.attributes = attributes.checkProtection();
this.transportThreadPool = transportThreadPool;
this.remoteCommandThreadPool = remoteCommandThreadPool;
this.totalOrderThreadPool = totalOrderThreadPool;
clusterName = attributes.attribute(CLUSTER_NAME);
machineId = attributes.attribute(MACHINE_ID);
rackId = attributes.attribute(RACK_ID);
Expand Down Expand Up @@ -135,19 +133,14 @@ public ThreadPoolConfiguration remoteCommandThreadPool() {
return remoteCommandThreadPool;
}

public ThreadPoolConfiguration totalOrderThreadPool() {
return totalOrderThreadPool;
}

public AttributeSet attributes() {
return attributes;
}

@Override
public String toString() {
return "TransportConfiguration [attributes=" + attributes + ", transportThreadPool=" + transportThreadPool
+ ", remoteCommandThreadPool=" + remoteCommandThreadPool + ", totalOrderThreadPool=" + totalOrderThreadPool
+ "]";
+ ", remoteCommandThreadPool=" + remoteCommandThreadPool + "]";
}

@Override
Expand All @@ -156,7 +149,6 @@ public int hashCode() {
int result = 1;
result = prime * result + ((attributes == null) ? 0 : attributes.hashCode());
result = prime * result + ((remoteCommandThreadPool == null) ? 0 : remoteCommandThreadPool.hashCode());
result = prime * result + ((totalOrderThreadPool == null) ? 0 : totalOrderThreadPool.hashCode());
result = prime * result + ((transportThreadPool == null) ? 0 : transportThreadPool.hashCode());
return result;
}
Expand All @@ -180,11 +172,6 @@ public boolean equals(Object obj) {
return false;
} else if (!remoteCommandThreadPool.equals(other.remoteCommandThreadPool))
return false;
if (totalOrderThreadPool == null) {
if (other.totalOrderThreadPool != null)
return false;
} else if (!totalOrderThreadPool.equals(other.totalOrderThreadPool))
return false;
if (transportThreadPool == null) {
if (other.transportThreadPool != null)
return false;
Expand Down
Expand Up @@ -27,16 +27,13 @@ public class TransportConfigurationBuilder extends AbstractGlobalConfigurationBu

private final ThreadPoolConfigurationBuilder transportThreadPool;
private final ThreadPoolConfigurationBuilder remoteCommandThreadPool;
@Deprecated
private final ThreadPoolConfigurationBuilder totalOrderThreadPool;
private final AttributeSet attributes;

TransportConfigurationBuilder(GlobalConfigurationBuilder globalConfig) {
super(globalConfig);
attributes = TransportConfiguration.attributeSet();
transportThreadPool = new ThreadPoolConfigurationBuilder(globalConfig);
remoteCommandThreadPool = new ThreadPoolConfigurationBuilder(globalConfig);
totalOrderThreadPool = new ThreadPoolConfigurationBuilder(globalConfig);
}

/**
Expand Down Expand Up @@ -197,18 +194,10 @@ public ThreadPoolConfigurationBuilder remoteCommandThreadPool() {
return remoteCommandThreadPool;
}

@Deprecated
public ThreadPoolConfigurationBuilder totalOrderThreadPool() {
return totalOrderThreadPool;
}

@Override
public
void validate() {
for (Builder<?> validatable : asList(transportThreadPool,
remoteCommandThreadPool, totalOrderThreadPool)) {
validatable.validate();
}
asList(transportThreadPool, remoteCommandThreadPool).forEach(Builder::validate);
if(attributes.attribute(CLUSTER_NAME).get() == null){
throw new CacheConfigurationException("Transport clusterName cannot be null");
}
Expand All @@ -217,7 +206,7 @@ void validate() {
@Override
public
TransportConfiguration create() {
return new TransportConfiguration(attributes.protect(), transportThreadPool.create(), remoteCommandThreadPool.create(), totalOrderThreadPool.create());
return new TransportConfiguration(attributes.protect(), transportThreadPool.create(), remoteCommandThreadPool.create());
}

public TransportConfigurationBuilder defaultTransport() {
Expand All @@ -231,7 +220,6 @@ public TransportConfigurationBuilder defaultTransport() {
TransportConfigurationBuilder read(TransportConfiguration template) {
attributes.read(template.attributes());
this.remoteCommandThreadPool.read(template.remoteCommandThreadPool());
this.totalOrderThreadPool.read(template.totalOrderThreadPool());
this.transportThreadPool.read(template.transportThreadPool());
if (template.transport() != null) {
Transport transport = Util.getInstance(template.transport().getClass().getName(), template.transport().getClass().getClassLoader());
Expand All @@ -248,7 +236,7 @@ public Transport getTransport() {
@Override
public String toString() {
return "TransportConfigurationBuilder [transportThreadPool=" + transportThreadPool + ", remoteCommandThreadPool="
+ remoteCommandThreadPool + ", totalOrderThreadPool=" + totalOrderThreadPool + ", attributes=" + attributes
+ remoteCommandThreadPool + ", attributes=" + attributes
+ "]";
}

Expand Down
Expand Up @@ -842,9 +842,11 @@ private void parseTransport(XMLExtendedStreamReader reader, ConfigurationBuilder
break;
}
case TOTAL_ORDER_EXECUTOR: {
globalBuilder.transport().totalOrderThreadPool().read(
createThreadPoolConfiguration(value, TOTAL_ORDER_EXECUTOR));
break;
if (reader.getSchema().since(9, 0)) {
throw ParseUtils.unexpectedAttribute(reader, attribute.getLocalName());
} else {
log.ignoredAttribute("total order executor", "9.0", attribute.getLocalName(), reader.getLocation().getLineNumber());
}
}
case REMOTE_COMMAND_EXECUTOR: {
globalBuilder.transport().remoteCommandThreadPool().read(
Expand Down
Expand Up @@ -29,7 +29,6 @@ public class KnownComponentNames {
public static final String GLOBAL_MARSHALLER = "org.infinispan.marshaller.global";
public static final String CACHE_MARSHALLER = "org.infinispan.marshaller.cache";
public static final String CLASS_LOADER = "java.lang.ClassLoader";
public static final String TOTAL_ORDER_EXECUTOR = "org.infinispan.executors.totalOrderExecutor";
public static final String STATE_TRANSFER_EXECUTOR = "org.infinispan.executors.stateTransferExecutor";
public static final String TRANSACTION_VERSION_GENERATOR = "org.infinispan.transaction.versionGenerator";
public static final String ASYNC_OPERATIONS_EXECUTOR = "org.infinispan.executors.async";
Expand All @@ -41,7 +40,7 @@ public class KnownComponentNames {
ASYNC_TRANSPORT_EXECUTOR, ASYNC_NOTIFICATION_EXECUTOR, PERSISTENCE_EXECUTOR, ASYNC_OPERATIONS_EXECUTOR,
EXPIRATION_SCHEDULED_EXECUTOR,
MODULE_COMMAND_INITIALIZERS, MODULE_COMMAND_FACTORIES, GLOBAL_MARSHALLER, CACHE_MARSHALLER, CLASS_LOADER,
REMOTE_COMMAND_EXECUTOR, TOTAL_ORDER_EXECUTOR, STATE_TRANSFER_EXECUTOR, TRANSACTION_VERSION_GENERATOR,
REMOTE_COMMAND_EXECUTOR, STATE_TRANSFER_EXECUTOR, TRANSACTION_VERSION_GENERATOR,
TIMEOUT_SCHEDULE_EXECUTOR
);

Expand All @@ -57,7 +56,6 @@ public class KnownComponentNames {
DEFAULT_THREAD_COUNT.put(EXPIRATION_SCHEDULED_EXECUTOR, 1);
DEFAULT_THREAD_COUNT.put(PERSISTENCE_EXECUTOR, 4);
DEFAULT_THREAD_COUNT.put(REMOTE_COMMAND_EXECUTOR, 200);
DEFAULT_THREAD_COUNT.put(TOTAL_ORDER_EXECUTOR, 32);
DEFAULT_THREAD_COUNT.put(STATE_TRANSFER_EXECUTOR, 60);
DEFAULT_THREAD_COUNT.put(ASYNC_OPERATIONS_EXECUTOR, 25);

Expand All @@ -66,7 +64,6 @@ public class KnownComponentNames {
DEFAULT_QUEUE_SIZE.put(EXPIRATION_SCHEDULED_EXECUTOR, 0);
DEFAULT_QUEUE_SIZE.put(PERSISTENCE_EXECUTOR, 0);
DEFAULT_QUEUE_SIZE.put(REMOTE_COMMAND_EXECUTOR, 0);
DEFAULT_QUEUE_SIZE.put(TOTAL_ORDER_EXECUTOR, 0);
DEFAULT_QUEUE_SIZE.put(STATE_TRANSFER_EXECUTOR, 0);
DEFAULT_QUEUE_SIZE.put(ASYNC_OPERATIONS_EXECUTOR, 1000);

Expand All @@ -75,7 +72,6 @@ public class KnownComponentNames {
DEFAULT_THREAD_PRIORITY.put(EXPIRATION_SCHEDULED_EXECUTOR, Thread.MIN_PRIORITY);
DEFAULT_THREAD_PRIORITY.put(PERSISTENCE_EXECUTOR, Thread.NORM_PRIORITY);
DEFAULT_THREAD_PRIORITY.put(REMOTE_COMMAND_EXECUTOR, Thread.NORM_PRIORITY);
DEFAULT_THREAD_PRIORITY.put(TOTAL_ORDER_EXECUTOR, Thread.NORM_PRIORITY);
DEFAULT_THREAD_PRIORITY.put(STATE_TRANSFER_EXECUTOR, Thread.NORM_PRIORITY);
DEFAULT_THREAD_PRIORITY.put(ASYNC_OPERATIONS_EXECUTOR, Thread.NORM_PRIORITY);
DEFAULT_THREAD_PRIORITY.put(TIMEOUT_SCHEDULE_EXECUTOR, Thread.NORM_PRIORITY);
Expand Down
Expand Up @@ -27,7 +27,6 @@
import static org.infinispan.factories.KnownComponentNames.REMOTE_COMMAND_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.STATE_TRANSFER_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.TIMEOUT_SCHEDULE_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.TOTAL_ORDER_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.getDefaultThreadPrio;
import static org.infinispan.factories.KnownComponentNames.shortened;

Expand All @@ -47,7 +46,6 @@ public class NamedExecutorsFactory extends NamedComponentFactory implements Auto
private ExecutorService persistenceExecutor;
private BlockingTaskAwareExecutorService remoteCommandsExecutor;
private ScheduledExecutorService expirationExecutor;
private BlockingTaskAwareExecutorService totalOrderExecutor;
private ExecutorService stateTransferExecutor;
private ExecutorService asyncOperationsExecutor;
private ScheduledExecutorService timeoutExecutor;
Expand Down Expand Up @@ -108,16 +106,6 @@ public <T> T construct(Class<T> componentType, String componentName) {
}
}
return (T) remoteCommandsExecutor;
} else if (componentName.equals(TOTAL_ORDER_EXECUTOR)) {
synchronized (this) {
if (totalOrderExecutor == null) {
totalOrderExecutor = createExecutorService(
globalConfiguration.transport().totalOrderThreadPool(),
TOTAL_ORDER_EXECUTOR,
ExecutorServiceType.BLOCKING);
}
}
return (T) totalOrderExecutor;
} else if (componentName.equals(STATE_TRANSFER_EXECUTOR)) {
synchronized (this) {
if (stateTransferExecutor == null) {
Expand Down Expand Up @@ -161,7 +149,6 @@ public void stop() {
if (persistenceExecutor != null) persistenceExecutor.shutdownNow();
if (asyncTransportExecutor != null) asyncTransportExecutor.shutdownNow();
if (expirationExecutor != null) expirationExecutor.shutdownNow();
if (totalOrderExecutor != null) totalOrderExecutor.shutdownNow();
if (stateTransferExecutor != null) stateTransferExecutor.shutdownNow();
if (timeoutExecutor != null) timeoutExecutor.shutdownNow();
if (asyncOperationsExecutor != null) asyncOperationsExecutor.shutdownNow();
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/org/infinispan/util/logging/Log.java
Expand Up @@ -1423,4 +1423,7 @@ TimeoutException timeoutWaitingForView(int expectedViewId, int currentViewId,
@Message(value = "Global security authorization should be enabled if cache authorization enabled.", id = 414)
CacheConfigurationException globalSecurityAuthShouldBeEnabled();

@LogMessage(level = WARN)
@Message(value = "The %s is no longer supported since version %s. Attribute %s on line %d will be ignored.", id = 415)
void ignoredAttribute(String componentName, String version, String attributeName, int line);
}
5 changes: 0 additions & 5 deletions core/src/main/resources/schema/infinispan-config-9.0.xsd
Expand Up @@ -323,11 +323,6 @@
<xs:documentation>Defines the executor used for asynchronous transport communication.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="total-order-executor" type="xs:string" >
<xs:annotation>
<xs:documentation>Configuration for the total order executor service used to concurrent validate non conflicting transactions.</xs:documentation>
</xs:annotation>
</xs:attribute>
<xs:attribute name="remote-command-executor" type="xs:string" >
<xs:annotation>
<xs:documentation>Configuration for the executor service used to execute remote commands. Use org.infinispan.executors.WithinThreadExecutorFactory to disable.</xs:documentation>
Expand Down
Expand Up @@ -344,16 +344,6 @@ private void assertNamedCacheFile(EmbeddedCacheManager cm, boolean deprecated) {
assertEquals(TestCacheManagerFactory.STATE_TRANSFER_EXEC_QUEUE_SIZE, stateTransferThreadPool.queueLength());
assertEquals(TestCacheManagerFactory.KEEP_ALIVE, stateTransferThreadPool.keepAlive());

BlockingThreadPoolExecutorFactory totalOrderThreadPool =
cm.getCacheManagerConfiguration().transport().totalOrderThreadPool().threadPoolFactory();
assertEquals(16, totalOrderThreadPool.maxThreads());
assertEquals(1, totalOrderThreadPool.coreThreads());
assertEquals(1000, totalOrderThreadPool.keepAlive());
assertEquals(0, totalOrderThreadPool.queueLength());
DefaultThreadFactory totalOrderThreadFactory =
cm.getCacheManagerConfiguration().transport().totalOrderThreadPool().threadFactory();
assertEquals("TotalOrderValidatorThread", totalOrderThreadFactory.threadNamePattern());

DefaultThreadFactory evictionThreadFactory =
cm.getCacheManagerConfiguration().expirationThreadPool().threadFactory();
assertEquals("ExpirationThread", evictionThreadFactory.threadNamePattern());
Expand Down
Expand Up @@ -4,7 +4,6 @@
import org.infinispan.commons.CacheConfigurationException;
import org.infinispan.commons.equivalence.ByteArrayEquivalence;
import org.infinispan.commons.executors.BlockingThreadPoolExecutorFactory;
import org.infinispan.commons.executors.CachedThreadPoolExecutorFactory;
import org.infinispan.commons.executors.ScheduledThreadPoolExecutorFactory;
import org.infinispan.commons.executors.ThreadPoolExecutorFactory;
import org.infinispan.commons.marshall.AdvancedExternalizer;
Expand Down Expand Up @@ -203,12 +202,6 @@ private static void configurationCheck70(EmbeddedCacheManager cm) {
assertEquals(0, threadPool.queueLength());
assertEquals(0, threadPool.keepAlive());

assertTrue(cm.getCacheManagerConfiguration().transport().totalOrderThreadPool().threadPoolFactory() instanceof CachedThreadPoolExecutorFactory);
threadFactory = cm.getCacheManagerConfiguration().transport().totalOrderThreadPool().threadFactory();
assertEquals("infinispan", threadFactory.threadGroup().getName());
assertEquals("%G %i", threadFactory.threadNamePattern());
assertEquals(5, threadFactory.initialPriority());

assertTrue(cm.getCacheManagerConfiguration().expirationThreadPool().threadPoolFactory() instanceof ScheduledThreadPoolExecutorFactory);
threadFactory = cm.getCacheManagerConfiguration().expirationThreadPool().threadFactory();
assertEquals("infinispan", threadFactory.threadGroup().getName());
Expand Down
4 changes: 1 addition & 3 deletions core/src/test/resources/configs/named-cache-test.xml
Expand Up @@ -15,21 +15,19 @@
<thread-factory name="remote-factory" group-name="infinispan" thread-name-pattern="RemoteCommandThread"/>
<thread-factory name="expiration-factory" group-name="infinispan" thread-name-pattern="ExpirationThread"/>
<thread-factory name="replication-queue-factory" group-name="infinispan" thread-name-pattern="ReplicationQueueThread"/>
<thread-factory name="total-order-factory" group-name="infinispan" thread-name-pattern="TotalOrderValidatorThread"/>
<blocking-bounded-queue-thread-pool name="listener" thread-factory="listener-factory" max-threads="5" queue-length="10000"/>
<blocking-bounded-queue-thread-pool name="persistence" thread-factory="persistence-factory" max-threads="6" queue-length="10001"/>
<blocking-bounded-queue-thread-pool name="state-transfer" thread-factory="persistence-factory" max-threads="20" queue-length="5" keepalive-time="60000"/>
<blocking-bounded-queue-thread-pool name="transport" thread-factory="transport-factory" max-threads="25" queue-length="10000"/>
<blocking-bounded-queue-thread-pool name="remote" thread-factory="transport-factory" max-threads="30" core-threads="2" queue-length="10000" keepalive-time="10000"/>
<blocking-bounded-queue-thread-pool name="total" thread-factory="total-order-factory" max-threads="16" core-threads="1" queue-length="0" keepalive-time="1000"/>
<scheduled-thread-pool name="expiration" thread-factory="expiration-factory" />
<scheduled-thread-pool name="replication-queue" thread-factory="replication-queue-factory" />
</threads>

<cache-container default-cache="default" statistics="true" shutdown-hook="REGISTER"
listener-executor="listener" persistence-executor="persistence" state-transfer-executor="state-transfer" expiration-executor="expiration">
<transport stack="udp" cluster="infinispan-cluster" lock-timeout="50000" node-name="Jalapeno" machine="m1" rack="r1" site="s1"
executor="transport" total-order-executor="total" remote-command-executor="remote" />
executor="transport" remote-command-executor="remote" />
<serialization marshaller="org.infinispan.marshall.core.VersionAwareMarshaller" version="1.0">
<advanced-externalizer id="1234" class="org.infinispan.marshall.AdvancedExternalizerTest$IdViaConfigObj$Externalizer"/>
<advanced-externalizer class="org.infinispan.marshall.AdvancedExternalizerTest$IdViaAnnotationObj$Externalizer"/>
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/resources/configs/unified/9.0.xml
Expand Up @@ -20,7 +20,7 @@
<!-- state-transfer-executor -->
<blocking-bounded-queue-thread-pool name="infinispan-state-transfer" thread-factory="infinispan-factory"
core-threads="1" max-threads="60" queue-length="0" keepalive-time="0"/>
<!-- transport.total-order-executor, transport.remote-command-executor, persistence-executor -->
<!-- transport.remote-command-executor, persistence-executor -->
<cached-thread-pool name="infinispan-cached" thread-factory="infinispan-factory" />
<!-- expiration-executor -->
<scheduled-thread-pool name="infinispan-expiration" thread-factory="infinispan-factory" />
Expand All @@ -29,7 +29,7 @@
<cache-container name="maximal" aliases="alias1 alias2" default-cache="local" async-executor="infinispan-async" expiration-executor="infinispan-expiration"
jndi-name="java:global/infinispan/maximal" state-transfer-executor="infinispan-state-transfer" listener-executor="infinispan-listener"
persistence-executor="infinispan-cached" module="org.infinispan" statistics="true" shutdown-hook="DONT_REGISTER">
<transport cluster="maximal-cluster" executor="infinispan-transport" total-order-executor="infinispan-cached" remote-command-executor="infinispan-cached" lock-timeout="120000" stack="tcp" node-name="a-node" machine="a" rack="b" site="c"
<transport cluster="maximal-cluster" executor="infinispan-transport" remote-command-executor="infinispan-cached" lock-timeout="120000" stack="tcp" node-name="a-node" machine="a" rack="b" site="c"
initial-cluster-size="4" initial-cluster-timeout="30000" />
<serialization marshaller="org.infinispan.marshall.core.VersionAwareMarshaller" version="1.0">
<advanced-externalizer class="org.infinispan.marshall.AdvancedExternalizerTest$IdViaConfigObj$Externalizer" id="9001" />
Expand Down

0 comments on commit c8d3df0

Please sign in to comment.