Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

ISPN-2635 and ISPN-2536: Support total order based commit protocol

  • Loading branch information...
commit ed480339df6186acfb8185f3144fcef51e3e3d6e 1 parent 45b3374
@pruivo pruivo authored danberindei committed
Showing with 6,044 additions and 433 deletions.
  1. +1 −1  cli/cli-server/src/main/java/org/infinispan/cli/interpreter/session/SessionImpl.java
  2. +21 −5 core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
  3. +20 −0 core/src/main/java/org/infinispan/commands/RemoteCommandsFactory.java
  4. +5 −1 core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java
  5. +61 −1 core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java
  6. +61 −0 core/src/main/java/org/infinispan/commands/tx/totalorder/TotalOrderCommitCommand.java
  7. +80 −0 core/src/main/java/org/infinispan/commands/tx/totalorder/TotalOrderNonVersionedPrepareCommand.java
  8. +62 −0 core/src/main/java/org/infinispan/commands/tx/totalorder/TotalOrderPrepareCommand.java
  9. +62 −0 core/src/main/java/org/infinispan/commands/tx/totalorder/TotalOrderRollbackCommand.java
  10. +61 −0 core/src/main/java/org/infinispan/commands/tx/totalorder/TotalOrderVersionedCommitCommand.java
  11. +77 −0 core/src/main/java/org/infinispan/commands/tx/totalorder/TotalOrderVersionedPrepareCommand.java
  12. +11 −0 core/src/main/java/org/infinispan/configuration/cache/Configurations.java
  13. +15 −1 core/src/main/java/org/infinispan/configuration/cache/TransactionConfiguration.java
  14. +29 −1 core/src/main/java/org/infinispan/configuration/cache/TransactionConfigurationBuilder.java
  15. +9 −1 core/src/main/java/org/infinispan/configuration/global/GlobalConfiguration.java
  16. +15 −4 core/src/main/java/org/infinispan/configuration/global/GlobalConfigurationBuilder.java
  17. +3 −1 core/src/main/java/org/infinispan/configuration/parsing/Attribute.java
  18. +2 −1  core/src/main/java/org/infinispan/configuration/parsing/Element.java
  19. +29 −113 core/src/main/java/org/infinispan/configuration/parsing/Parser53.java
  20. +1 −1  core/src/main/java/org/infinispan/distexec/DefaultExecutorService.java
  21. +2 −2 core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceTask.java
  22. +4 −4 core/src/main/java/org/infinispan/distribution/L1ManagerImpl.java
  23. +154 −0 core/src/main/java/org/infinispan/executors/LazyInitializingBlockingTaskAwareExecutorService.java
  24. +7 −0 core/src/main/java/org/infinispan/factories/ComponentRegistry.java
  25. +6 −1 core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
  26. +65 −18 core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java
  27. +8 −4 core/src/main/java/org/infinispan/factories/KnownComponentNames.java
  28. +29 −1 core/src/main/java/org/infinispan/factories/NamedExecutorsFactory.java
  29. +22 −6 core/src/main/java/org/infinispan/interceptors/EntryWrappingInterceptor.java
  30. +2 −2 core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
  31. +4 −4 core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
  32. +4 −2 core/src/main/java/org/infinispan/interceptors/TxInterceptor.java
  33. +2 −4 core/src/main/java/org/infinispan/interceptors/VersionedEntryWrappingInterceptor.java
  34. +1 −1  core/src/main/java/org/infinispan/interceptors/VersionedReplicationInterceptor.java
  35. +86 −2 core/src/main/java/org/infinispan/interceptors/base/BaseRpcInterceptor.java
  36. +1 −1  core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java
  37. +5 −5 core/src/main/java/org/infinispan/interceptors/distribution/NonTxConcurrentDistributionInterceptor.java
  38. +2 −2 core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java
  39. +5 −5 core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java
  40. +1 −1  core/src/main/java/org/infinispan/interceptors/distribution/VersionedDistributionInterceptor.java
  41. +48 −2 core/src/main/java/org/infinispan/interceptors/locking/ClusteringDependentLogic.java
  42. +1 −45 core/src/main/java/org/infinispan/interceptors/locking/OptimisticLockingInterceptor.java
  43. +1 −1  core/src/main/java/org/infinispan/interceptors/locking/PessimisticLockingInterceptor.java
  44. +35 −0 core/src/main/java/org/infinispan/interceptors/totalorder/RetryPrepareException.java
  45. +100 −0 core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderDistributionInterceptor.java
  46. +192 −0 core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderInterceptor.java
  47. +68 −0 core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderReplicationInterceptor.java
  48. +117 −0 core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderStateTransferInterceptor.java
  49. +118 −0 core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderVersionedDistributionInterceptor.java
  50. +116 −0 core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderVersionedEntryWrappingInterceptor.java
  51. +80 −0 core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderVersionedReplicationInterceptor.java
  52. +1 −1  core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java
  53. +1 −1  core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
  54. +8 −1 core/src/main/java/org/infinispan/marshall/exts/CacheRpcCommandExternalizer.java
  55. +50 −3 core/src/main/java/org/infinispan/remoting/InboundInvocationHandlerImpl.java
  56. +1 −1  core/src/main/java/org/infinispan/remoting/ReplicationQueueImpl.java
  57. +2 −2 core/src/main/java/org/infinispan/remoting/responses/IgnoreExtraResponsesValidityFilter.java
  58. +87 −0 core/src/main/java/org/infinispan/remoting/responses/KeysValidateFilter.java
  59. +56 −0 core/src/main/java/org/infinispan/remoting/responses/SelfDeliverFilter.java
  60. +19 −12 core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
  61. +24 −19 core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
  62. +11 −3 core/src/main/java/org/infinispan/remoting/transport/Transport.java
  63. +51 −21 core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
  64. +32 −12 core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
  65. +3 −3 core/src/main/java/org/infinispan/statetransfer/InboundTransferTask.java
  66. +1 −1  core/src/main/java/org/infinispan/statetransfer/OutboundTransferTask.java
  67. +5 −0 core/src/main/java/org/infinispan/statetransfer/StateConsumer.java
  68. +54 −7 core/src/main/java/org/infinispan/statetransfer/StateConsumerImpl.java
  69. +1 −1  core/src/main/java/org/infinispan/statetransfer/StateTransferLock.java
  70. +5 −0 core/src/main/java/org/infinispan/statetransfer/StateTransferManager.java
  71. +10 −4 core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java
  72. +20 −2 core/src/main/java/org/infinispan/topology/CacheJoinInfo.java
  73. +8 −0 core/src/main/java/org/infinispan/topology/ClusterCacheStatus.java
  74. +25 −5 core/src/main/java/org/infinispan/topology/ClusterTopologyManagerImpl.java
  75. +2 −2 core/src/main/java/org/infinispan/topology/LocalTopologyManagerImpl.java
  76. +6 −3 core/src/main/java/org/infinispan/transaction/AbstractEnlistmentAdapter.java
  77. +33 −0 core/src/main/java/org/infinispan/transaction/LocalTransaction.java
  78. +19 −0 core/src/main/java/org/infinispan/transaction/RemoteTransaction.java
  79. +254 −0 core/src/main/java/org/infinispan/transaction/TotalOrderRemoteTransactionState.java
  80. +19 −12 core/src/main/java/org/infinispan/transaction/TransactionCoordinator.java
  81. +40 −0 core/src/main/java/org/infinispan/transaction/TransactionProtocol.java
  82. +2 −2 core/src/main/java/org/infinispan/transaction/TransactionTable.java
  83. +32 −0 core/src/main/java/org/infinispan/transaction/WriteSkewHelper.java
  84. +50 −0 core/src/main/java/org/infinispan/transaction/totalorder/TotalOrderLatch.java
  85. +81 −0 core/src/main/java/org/infinispan/transaction/totalorder/TotalOrderLatchImpl.java
  86. +190 −0 core/src/main/java/org/infinispan/transaction/totalorder/TotalOrderManager.java
  87. +8 −1 core/src/main/java/org/infinispan/transaction/xa/GlobalTransaction.java
  88. +13 −2 core/src/main/java/org/infinispan/transaction/xa/TransactionXaAdapter.java
  89. +5 −5 core/src/main/java/org/infinispan/transaction/xa/recovery/RecoveryManagerImpl.java
  90. +42 −0 core/src/main/java/org/infinispan/util/concurrent/BlockingRunnable.java
  91. +48 −0 core/src/main/java/org/infinispan/util/concurrent/BlockingTaskAwareExecutorService.java
  92. +146 −0 core/src/main/java/org/infinispan/util/concurrent/BlockingTaskAwareExecutorServiceImpl.java
  93. +16 −0 core/src/main/java/org/infinispan/util/logging/Log.java
  94. +1 −1  core/src/main/java/org/infinispan/xsite/XSiteAdminOperations.java
  95. +2 −0  core/src/main/resources/jgroups-ec2.xml
  96. +2 −0  core/src/main/resources/jgroups-tcp.xml
  97. +2 −0  core/src/main/resources/jgroups-udp.xml
  98. +26 −0 core/src/main/resources/schema/infinispan-config-5.3.xsd
  99. +2 −1  core/src/test/java/org/infinispan/api/mvcc/PutForExternalReadTest.java
  100. +16 −0 core/src/test/java/org/infinispan/configuration/XmlFileParsingTest.java
  101. +4 −0 core/src/test/java/org/infinispan/container/versioning/VersionedDistStateTransferTest.java
  102. +5 −0 core/src/test/java/org/infinispan/container/versioning/VersionedReplStateTransferTest.java
  103. +142 −0 core/src/test/java/org/infinispan/executors/BlockingTaskAwareExecutorServiceTest.java
  104. +2 −2 core/src/test/java/org/infinispan/interceptors/ReplicationInterceptorTest.java
  105. +1 −1  core/src/test/java/org/infinispan/invalidation/BaseInvalidationTest.java
  106. +1 −1  core/src/test/java/org/infinispan/jmx/RpcManagerMBeanTest.java
  107. +2 −2 core/src/test/java/org/infinispan/lock/singlelock/AbstractCrashTest.java
  108. +21 −0 core/src/test/java/org/infinispan/marshall/VersionAwareMarshallerTest.java
  109. +2 −2 core/src/test/java/org/infinispan/remoting/MessageSentToLeaverTest.java
  110. +6 −6 core/src/test/java/org/infinispan/replication/ForceSyncAsyncFlagsTest.java
  111. +4 −4 core/src/test/java/org/infinispan/replication/SyncReplTest.java
  112. +4 −2 core/src/test/java/org/infinispan/statetransfer/StateConsumerTest.java
  113. +1 −1  core/src/test/java/org/infinispan/statetransfer/StateTransferFunctionalTest.java
  114. +2 −2 core/src/test/java/org/infinispan/statetransfer/StateTransferRestartTest.java
  115. +1 −1  core/src/test/java/org/infinispan/test/AbstractInfinispanTest.java
  116. +3 −1 core/src/test/java/org/infinispan/test/fwk/JGroupsConfigBuilder.java
  117. +14 −14 core/src/test/java/org/infinispan/tx/dld/ControlledRpcManager.java
  118. +294 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/dist/FullAsyncTotalOrderTest.java
  119. +38 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/dist/FullSyncTotalOrderTest.java
  120. +63 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/dist/FullSyncWriteSkewTotalOrderTest.java
  121. +59 −0 ...c/test/java/org/infinispan/tx/totalorder/simple/dist/FullSyncWriteSkewUseSynchronizationTotalOrderTest.java
  122. +288 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/dist/SingleNodeOnePhaseTotalOrderTest.java
  123. +34 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/dist/SingleNodeTwoPhaseTotalOrderTest.java
  124. +34 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/dist/SyncPrepareTotalOrderTest.java
  125. +59 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/dist/SyncPrepareWriteSkewTotalOrderTest.java
  126. +59 −0 ...est/java/org/infinispan/tx/totalorder/simple/dist/SyncPrepareWriteSkewUseSynchronizationTotalOrderTest.java
  127. +294 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/repl/FullAsyncTotalOrderTest.java
  128. +34 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/repl/FullSyncTotalOrderTest.java
  129. +63 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/repl/FullSyncWriteSkewTotalOrderTest.java
  130. +63 −0 ...c/test/java/org/infinispan/tx/totalorder/simple/repl/FullSyncWriteSkewUseSynchronizationTotalOrderTest.java
  131. +273 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/repl/SingleNodeOnePhaseTotalOrderTest.java
  132. +33 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/repl/SingleNodeTwoPhaseTotalOrderTest.java
  133. +34 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/repl/SyncPrepareTotalOrderTest.java
  134. +59 −0 core/src/test/java/org/infinispan/tx/totalorder/simple/repl/SyncPrepareWriteSkewTotalOrderTest.java
  135. +59 −0 ...est/java/org/infinispan/tx/totalorder/simple/repl/SyncPrepareWriteSkewUseSynchronizationTotalOrderTest.java
  136. +64 −0 .../src/test/java/org/infinispan/tx/totalorder/statetransfer/DistTotalOrderStateTransferFunctional1PcTest.java
  137. +35 −0 .../src/test/java/org/infinispan/tx/totalorder/statetransfer/DistTotalOrderStateTransferFunctional2PcTest.java
  138. +37 −0 core/src/test/java/org/infinispan/tx/totalorder/statetransfer/DistTotalOrderVersionedStateTransferTest.java
  139. +68 −0 .../src/test/java/org/infinispan/tx/totalorder/statetransfer/ReplTotalOrderStateTransferFunctional1PcTest.java
  140. +35 −0 .../src/test/java/org/infinispan/tx/totalorder/statetransfer/ReplTotalOrderStateTransferFunctional2PcTest.java
  141. +37 −0 core/src/test/java/org/infinispan/tx/totalorder/statetransfer/ReplTotalOrderVersionedStateTransferTest.java
  142. +70 −0 core/src/test/java/org/infinispan/tx/totalorder/writeskew/DistTotalOrderL1WriteSkewTest.java
  143. +70 −0 core/src/test/java/org/infinispan/tx/totalorder/writeskew/DistTotalOrderWriteSkewTest.java
  144. +67 −0 core/src/test/java/org/infinispan/tx/totalorder/writeskew/TotalOrderWriteSkewTest.java
  145. +14 −14 core/src/test/java/org/infinispan/util/CountingRpcManager.java
  146. +7 −2 core/src/test/java/org/infinispan/xsite/offline/DelegatingTransport.java
  147. +7 −2 core/src/test/resources/configs/all.xml
  148. +19 −0 core/src/test/resources/configs/named-cache-test.xml
  149. +2 −0  core/src/test/resources/stacks/tcp.xml
  150. +2 −0  core/src/test/resources/stacks/tcp_mping/tcp1.xml
  151. +2 −0  core/src/test/resources/stacks/tcp_mping/tcp2.xml
  152. +2 −0  core/src/test/resources/stacks/udp.xml
  153. +2 −2 query/src/main/java/org/infinispan/query/clustered/ClusteredQueryInvoker.java
  154. +1 −1  query/src/main/java/org/infinispan/query/indexmanager/InfinispanCommandsBackend.java
  155. +5 −1 spring/src/test/java/org/infinispan/spring/mock/MockTransport.java
View
2  cli/cli-server/src/main/java/org/infinispan/cli/interpreter/session/SessionImpl.java
@@ -118,7 +118,7 @@ public void createCache(String cacheName, String baseCacheName) {
CreateCacheCommand ccc = factory.buildCreateCacheCommand(cacheName, baseCacheName);
try {
- rpc.invokeRemotely(null, ccc, true, false);
+ rpc.invokeRemotely(null, ccc, true, false, false);
ccc.init(cacheManager);
ccc.perform(null);
} catch (Throwable e) {
View
26 core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
@@ -45,8 +45,13 @@
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
+import org.infinispan.commands.tx.totalorder.TotalOrderCommitCommand;
+import org.infinispan.commands.tx.totalorder.TotalOrderNonVersionedPrepareCommand;
import org.infinispan.commands.tx.VersionedCommitCommand;
import org.infinispan.commands.tx.VersionedPrepareCommand;
+import org.infinispan.commands.tx.totalorder.TotalOrderRollbackCommand;
+import org.infinispan.commands.tx.totalorder.TotalOrderVersionedCommitCommand;
+import org.infinispan.commands.tx.totalorder.TotalOrderVersionedPrepareCommand;
import org.infinispan.commands.write.ApplyDeltaCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.EvictCommand;
@@ -116,6 +121,7 @@
private CacheNotifier notifier;
private Cache<Object, Object> cache;
private String cacheName;
+ private boolean totalOrderProtocol;
// some stateless commands can be reused so that they aren't constructed again all the time.
private SizeCommand cachedSizeCommand;
@@ -171,6 +177,7 @@ public void setupDependencies(DataContainer container, CacheNotifier notifier, C
// needs to happen early on
public void start() {
cacheName = cache.getName();
+ this.totalOrderProtocol = configuration.transaction().transactionProtocol().isTotalOrder();
}
@Override
@@ -272,27 +279,31 @@ public EvictCommand buildEvictCommand(Object key, Set<Flag> flags) {
@Override
public PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List<WriteCommand> modifications, boolean onePhaseCommit) {
- return new PrepareCommand(cacheName, gtx, modifications, onePhaseCommit);
+ return totalOrderProtocol ? new TotalOrderNonVersionedPrepareCommand(cacheName, gtx, modifications) :
+ new PrepareCommand(cacheName, gtx, modifications, onePhaseCommit);
}
@Override
public VersionedPrepareCommand buildVersionedPrepareCommand(GlobalTransaction gtx, List<WriteCommand> modifications, boolean onePhase) {
- return new VersionedPrepareCommand(cacheName, gtx, modifications, onePhase);
+ return totalOrderProtocol ? new TotalOrderVersionedPrepareCommand(cacheName, gtx, modifications, onePhase) :
+ new VersionedPrepareCommand(cacheName, gtx, modifications, onePhase);
}
@Override
public CommitCommand buildCommitCommand(GlobalTransaction gtx) {
- return new CommitCommand(cacheName, gtx);
+ return totalOrderProtocol ? new TotalOrderCommitCommand(cacheName, gtx) :
+ new CommitCommand(cacheName, gtx);
}
@Override
public VersionedCommitCommand buildVersionedCommitCommand(GlobalTransaction gtx) {
- return new VersionedCommitCommand(cacheName, gtx);
+ return totalOrderProtocol ? new TotalOrderVersionedCommitCommand(cacheName, gtx) :
+ new VersionedCommitCommand(cacheName, gtx);
}
@Override
public RollbackCommand buildRollbackCommand(GlobalTransaction gtx) {
- return new RollbackCommand(cacheName, gtx);
+ return totalOrderProtocol ? new TotalOrderRollbackCommand(cacheName, gtx) : new RollbackCommand(cacheName, gtx);
}
@Override
@@ -355,6 +366,8 @@ public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) {
break;
case PrepareCommand.COMMAND_ID:
case VersionedPrepareCommand.COMMAND_ID:
+ case TotalOrderNonVersionedPrepareCommand.COMMAND_ID:
+ case TotalOrderVersionedPrepareCommand.COMMAND_ID:
PrepareCommand pc = (PrepareCommand) c;
pc.init(interceptorChain, icc, txTable);
pc.initialize(notifier, recoveryManager);
@@ -370,11 +383,14 @@ public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) {
break;
case CommitCommand.COMMAND_ID:
case VersionedCommitCommand.COMMAND_ID:
+ case TotalOrderCommitCommand.COMMAND_ID:
+ case TotalOrderVersionedCommitCommand.COMMAND_ID:
CommitCommand commitCommand = (CommitCommand) c;
commitCommand.init(interceptorChain, icc, txTable);
commitCommand.markTransactionAsRemote(isRemote);
break;
case RollbackCommand.COMMAND_ID:
+ case TotalOrderRollbackCommand.COMMAND_ID:
RollbackCommand rollbackCommand = (RollbackCommand) c;
rollbackCommand.init(interceptorChain, icc, txTable);
rollbackCommand.markTransactionAsRemote(isRemote);
View
20 core/src/main/java/org/infinispan/commands/RemoteCommandsFactory.java
@@ -43,6 +43,11 @@
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.tx.VersionedCommitCommand;
import org.infinispan.commands.tx.VersionedPrepareCommand;
+import org.infinispan.commands.tx.totalorder.TotalOrderCommitCommand;
+import org.infinispan.commands.tx.totalorder.TotalOrderNonVersionedPrepareCommand;
+import org.infinispan.commands.tx.totalorder.TotalOrderRollbackCommand;
+import org.infinispan.commands.tx.totalorder.TotalOrderVersionedCommitCommand;
+import org.infinispan.commands.tx.totalorder.TotalOrderVersionedPrepareCommand;
import org.infinispan.commands.write.ApplyDeltaCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.InvalidateCommand;
@@ -180,15 +185,30 @@ public CacheRpcCommand fromStream(byte id, Object[] parameters, byte type, Strin
case VersionedPrepareCommand.COMMAND_ID:
command = new VersionedPrepareCommand(cacheName);
break;
+ case TotalOrderNonVersionedPrepareCommand.COMMAND_ID:
+ command = new TotalOrderNonVersionedPrepareCommand(cacheName);
+ break;
+ case TotalOrderVersionedPrepareCommand.COMMAND_ID:
+ command = new TotalOrderVersionedPrepareCommand(cacheName);
+ break;
case CommitCommand.COMMAND_ID:
command = new CommitCommand(cacheName);
break;
case VersionedCommitCommand.COMMAND_ID:
command = new VersionedCommitCommand(cacheName);
break;
+ case TotalOrderCommitCommand.COMMAND_ID:
+ command = new TotalOrderCommitCommand(cacheName);
+ break;
+ case TotalOrderVersionedCommitCommand.COMMAND_ID:
+ command = new TotalOrderVersionedCommitCommand(cacheName);
+ break;
case RollbackCommand.COMMAND_ID:
command = new RollbackCommand(cacheName);
break;
+ case TotalOrderRollbackCommand.COMMAND_ID:
+ command = new TotalOrderRollbackCommand(cacheName);
+ break;
case MultipleRpcCommand.COMMAND_ID:
command = new MultipleRpcCommand(cacheName);
break;
View
6 core/src/main/java/org/infinispan/commands/tx/AbstractTransactionBoundaryCommand.java
@@ -105,7 +105,7 @@ protected Object invalidRemoteTxReturnValue() { //todo [anistor] no longer u
public Object perform(InvocationContext ctx) throws Throwable {
if (ctx != null) throw new IllegalStateException("Expected null context!");
markGtxAsRemote();
- RemoteTransaction transaction = txTable.getRemoteTransaction(globalTx);
+ RemoteTransaction transaction = getRemoteTransaction();
if (transaction == null) {
if (trace) log.tracef("Did not find a RemoteTransaction for %s", globalTx);
return invalidRemoteTxReturnValue();
@@ -122,6 +122,10 @@ protected void visitRemoteTransaction(RemoteTransaction tx) {
// to be overridden
}
+ protected RemoteTransaction getRemoteTransaction() {
+ return txTable.getRemoteTransaction(globalTx);
+ }
+
@Override
public Object[] getParameters() {
return new Object[]{globalTx};
View
62 core/src/main/java/org/infinispan/commands/tx/PrepareCommand.java
@@ -23,10 +23,16 @@
package org.infinispan.commands.tx;
import org.infinispan.commands.Visitor;
+import org.infinispan.commands.write.ApplyDeltaCommand;
+import org.infinispan.commands.write.ClearCommand;
+import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
+import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.commons.hash.Hash;
+import org.infinispan.commons.hash.MurmurHash3;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.RemoteTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
@@ -35,10 +41,12 @@
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.transaction.xa.recovery.RecoveryManager;
import org.infinispan.util.InfinispanCollections;
+import org.infinispan.util.TimSort;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -64,6 +72,16 @@
private transient boolean replayEntryWrapping = false;
private static final WriteCommand[] EMPTY_WRITE_COMMAND_ARRAY = new WriteCommand[0];
+ private static final Object[] EMPTY_ARRAY = new Object[0];
+ private static final Comparator<Object> KEY_COMPARATOR = new Comparator<Object>() {
+
+ private final Hash hash = new MurmurHash3();
+
+ @Override
+ public int compare(Object o1, Object o2) {
+ return Integer.valueOf(hash.hash(o1)).compareTo(hash.hash(o2));
+ }
+ };
public void initialize(CacheNotifier notifier, RecoveryManager recoveryManager) {
this.notifier = notifier;
@@ -103,7 +121,7 @@ public Object perform(InvocationContext ignored) throws Throwable {
}
// 1. first create a remote transaction (or get the existing one)
- RemoteTransaction remoteTransaction = txTable.getOrCreateRemoteTransaction(globalTx, modifications);
+ RemoteTransaction remoteTransaction = getRemoteTransaction();
//set the list of modifications anyway, as the transaction might have already been created by a previous
//LockControlCommand with null modifications.
if (hasModifications()) {
@@ -120,6 +138,11 @@ public Object perform(InvocationContext ignored) throws Throwable {
}
@Override
+ protected RemoteTransaction getRemoteTransaction() {
+ return txTable.getOrCreateRemoteTransaction(globalTx, modifications);
+ }
+
+ @Override
public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable {
return visitor.visitPrepareCommand((TxInvocationContext) ctx, this);
}
@@ -218,4 +241,41 @@ public boolean writesToASingleKey() {
public boolean isReturnValueExpected() {
return false;
}
+
+ /**
+ * It returns an array of keys affected by the WriteCommand in modifications.
+ *
+ * @param sort if {@code true}, the array returned is sorted by the key hash.
+ * @return an array of keys
+ */
+ public Object[] getAffectedKeysToLock(boolean sort) {
+ if (modifications == null) {
+ return EMPTY_ARRAY;
+ }
+ Set<Object> set = new HashSet<Object>(modifications.length);
+ for (WriteCommand wc : modifications) {
+ switch (wc.getCommandId()) {
+ case ClearCommand.COMMAND_ID:
+ return null;
+ case PutKeyValueCommand.COMMAND_ID:
+ case RemoveCommand.COMMAND_ID:
+ case ReplaceCommand.COMMAND_ID:
+ set.add(((DataWriteCommand) wc).getKey());
+ break;
+ case PutMapCommand.COMMAND_ID:
+ set.addAll(wc.getAffectedKeys());
+ break;
+ case ApplyDeltaCommand.COMMAND_ID:
+ ApplyDeltaCommand command = (ApplyDeltaCommand) wc;
+ Object[] compositeKeys = command.getCompositeKeys();
+ set.addAll(Arrays.asList(compositeKeys));
+ break;
+ }
+ }
+ Object[] sorted = set.toArray(new Object[set.size()]);
+ if (sort) {
+ TimSort.sort(sorted, KEY_COMPARATOR);
+ }
+ return sorted;
+ }
}
View
61 core/src/main/java/org/infinispan/commands/tx/totalorder/TotalOrderCommitCommand.java
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.commands.tx.totalorder;
+
+import org.infinispan.commands.tx.CommitCommand;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.RemoteTxInvocationContext;
+import org.infinispan.transaction.RemoteTransaction;
+import org.infinispan.transaction.xa.GlobalTransaction;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * Commit Command used in the 2nd phase of 2PC. This command is used when non versioned entries are needed
+ *
+ * @author Pedro Ruivo
+ * @since 5.3
+ */
+public class TotalOrderCommitCommand extends CommitCommand {
+
+ public static final byte COMMAND_ID = 35;
+ private static final Log log = LogFactory.getLog(TotalOrderCommitCommand.class);
+
+ public TotalOrderCommitCommand(String cacheName, GlobalTransaction gtx) {
+ super(cacheName, gtx);
+ }
+
+ public TotalOrderCommitCommand(String cacheName) {
+ super(cacheName);
+ }
+
+ private TotalOrderCommitCommand() {
+ super(null); // For command id uniqueness test
+ }
+
+ @Override
+ public byte getCommandId() {
+ return COMMAND_ID;
+ }
+
+ @Override
+ protected RemoteTransaction getRemoteTransaction() {
+ return txTable.getOrCreateRemoteTransaction(globalTx, null);
+ }
+}
View
80 ...main/java/org/infinispan/commands/tx/totalorder/TotalOrderNonVersionedPrepareCommand.java
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.commands.tx.totalorder;
+
+import org.infinispan.commands.tx.PrepareCommand;
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.transaction.TotalOrderRemoteTransactionState;
+import org.infinispan.transaction.xa.GlobalTransaction;
+
+import java.util.List;
+
+/**
+ * Command corresponding to the 1st phase of 2PC when Total Order based protocol is used. This command is used when non
+ * versioned entries are needed.
+ *
+ * @author Pedro Ruivo
+ * @since 5.3
+ */
+public class TotalOrderNonVersionedPrepareCommand extends PrepareCommand implements TotalOrderPrepareCommand {
+
+ public static final byte COMMAND_ID = 38;
+
+ public TotalOrderNonVersionedPrepareCommand(String cacheName, GlobalTransaction gtx, WriteCommand... modifications) {
+ super(cacheName, gtx, true, modifications);
+ }
+
+ public TotalOrderNonVersionedPrepareCommand(String cacheName, GlobalTransaction gtx, List<WriteCommand> commands) {
+ super(cacheName, gtx, commands, true);
+ }
+
+ public TotalOrderNonVersionedPrepareCommand(String cacheName) {
+ super(cacheName);
+ }
+
+ private TotalOrderNonVersionedPrepareCommand() {
+ super(null); // For command id uniqueness test
+ }
+
+ @Override
+ public byte getCommandId() {
+ return COMMAND_ID;
+ }
+
+ @Override
+ public void markAsOnePhaseCommit() {
+ //no-op. it is always one phase commit
+ }
+
+ @Override
+ public void markSkipWriteSkewCheck() {
+ //no-op. no write skew check in non versioned mode
+ }
+
+ @Override
+ public boolean skipWriteSkewCheck() {
+ return true; //no write skew check with non versioned mode
+ }
+
+ @Override
+ public TotalOrderRemoteTransactionState getOrCreateState() {
+ return getRemoteTransaction().getTransactionState();
+ }
+
+}
View
62 core/src/main/java/org/infinispan/commands/tx/totalorder/TotalOrderPrepareCommand.java
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.commands.tx.totalorder;
+
+import org.infinispan.commands.tx.TransactionBoundaryCommand;
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.transaction.TotalOrderRemoteTransactionState;
+
+/**
+ * Interface with the utilities methods that the prepare command must have when Total Order based protocol is used
+ *
+ * @author Pedro Ruivo
+ * @since 5.3
+ */
+public interface TotalOrderPrepareCommand extends TransactionBoundaryCommand {
+
+ /**
+ * marks the prepare phase as 1PC to apply immediately the modifications. It is used when the {@code
+ * org.infinispan.commands.tx.CommitCommand} is received before the {@code org.infinispan.commands.tx.PrepareCommand}.
+ */
+ void markAsOnePhaseCommit();
+
+ /**
+ * it signals that the write skew check is not needed (for versioned entries). It is used when the {@code
+ * org.infinispan.commands.tx.CommitCommand} is received before the {@code org.infinispan.commands.tx.PrepareCommand}.
+ */
+ void markSkipWriteSkewCheck();
+
+ /**
+ * @return {@code true} when the write skew check is not needed.
+ */
+ boolean skipWriteSkewCheck();
+
+ /**
+ * @return the modifications performed by this transaction
+ */
+ WriteCommand[] getModifications();
+
+ /**
+ * returns the {@link TotalOrderRemoteTransactionState} associated with this transaction, creating one if no one was
+ * associated to this transaction.
+ *
+ * @return returns the {@link TotalOrderRemoteTransactionState} associated with this transaction.
+ */
+ TotalOrderRemoteTransactionState getOrCreateState();
+}
View
62 core/src/main/java/org/infinispan/commands/tx/totalorder/TotalOrderRollbackCommand.java
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.commands.tx.totalorder;
+
+import org.infinispan.commands.tx.RollbackCommand;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.RemoteTxInvocationContext;
+import org.infinispan.transaction.RemoteTransaction;
+import org.infinispan.transaction.xa.GlobalTransaction;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * The 2nd phase command of 2PC, used when a transaction must be aborted. This implementation is used when Total Order
+ * based protocol is used
+ *
+ * @author Pedro Ruivo
+ * @since 5.3
+ */
+public class TotalOrderRollbackCommand extends RollbackCommand {
+
+ public static final byte COMMAND_ID = 37;
+ private static final Log log = LogFactory.getLog(TotalOrderRollbackCommand.class);
+
+ public TotalOrderRollbackCommand(String cacheName, GlobalTransaction globalTransaction) {
+ super(cacheName, globalTransaction);
+ }
+
+ public TotalOrderRollbackCommand(String cacheName) {
+ super(cacheName);
+ }
+
+ private TotalOrderRollbackCommand() {
+ super(null); // For command id uniqueness test
+ }
+
+ @Override
+ public byte getCommandId() {
+ return COMMAND_ID;
+ }
+
+ @Override
+ protected RemoteTransaction getRemoteTransaction() {
+ return txTable.getOrCreateRemoteTransaction(globalTx, null);
+ }
+}
View
61 ...src/main/java/org/infinispan/commands/tx/totalorder/TotalOrderVersionedCommitCommand.java
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.commands.tx.totalorder;
+
+import org.infinispan.commands.tx.VersionedCommitCommand;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.RemoteTxInvocationContext;
+import org.infinispan.transaction.RemoteTransaction;
+import org.infinispan.transaction.xa.GlobalTransaction;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * Command corresponding to the 2nd phase of 2PC. Used in Total Order based protocol when versioned entries are needed
+ *
+ * @author Pedro Ruivo
+ * @since 5.3
+ */
+public class TotalOrderVersionedCommitCommand extends VersionedCommitCommand {
+
+ public static final byte COMMAND_ID = 36;
+ private static final Log log = LogFactory.getLog(TotalOrderVersionedCommitCommand.class);
+
+ public TotalOrderVersionedCommitCommand(String cacheName, GlobalTransaction gtx) {
+ super(cacheName, gtx);
+ }
+
+ public TotalOrderVersionedCommitCommand(String cacheName) {
+ super(cacheName);
+ }
+
+ private TotalOrderVersionedCommitCommand() {
+ super(null); // For command id uniqueness test
+ }
+
+ @Override
+ public byte getCommandId() {
+ return COMMAND_ID;
+ }
+
+ @Override
+ protected RemoteTransaction getRemoteTransaction() {
+ return txTable.getOrCreateRemoteTransaction(globalTx, null);
+ }
+}
View
77 ...rc/main/java/org/infinispan/commands/tx/totalorder/TotalOrderVersionedPrepareCommand.java
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.commands.tx.totalorder;
+
+import org.infinispan.commands.tx.VersionedPrepareCommand;
+import org.infinispan.commands.write.WriteCommand;
+import org.infinispan.transaction.TotalOrderRemoteTransactionState;
+import org.infinispan.transaction.xa.GlobalTransaction;
+
+import java.util.List;
+
+/**
+ * Command corresponding to the 1st phase of 2PC when Total Order based protocol is used. This command is used when
+ * versioned entries are needed.
+ *
+ * @author Pedro Ruivo
+ * @since 5.3
+ */
+public class TotalOrderVersionedPrepareCommand extends VersionedPrepareCommand implements TotalOrderPrepareCommand {
+
+ public static final byte COMMAND_ID = 39;
+ private boolean skipWriteSkewCheck;
+
+ public TotalOrderVersionedPrepareCommand(String cacheName, GlobalTransaction gtx, List<WriteCommand> modifications, boolean onePhase) {
+ super(cacheName, gtx, modifications, onePhase);
+ }
+
+ public TotalOrderVersionedPrepareCommand(String cacheName) {
+ super(cacheName);
+ }
+
+ private TotalOrderVersionedPrepareCommand() {
+ super(null); // For command id uniqueness test
+ }
+
+ @Override
+ public byte getCommandId() {
+ return COMMAND_ID;
+ }
+
+ @Override
+ public void markAsOnePhaseCommit() {
+ this.onePhaseCommit = true;
+ }
+
+ @Override
+ public void markSkipWriteSkewCheck() {
+ this.skipWriteSkewCheck = true;
+ }
+
+ @Override
+ public boolean skipWriteSkewCheck() {
+ return skipWriteSkewCheck;
+ }
+
+ @Override
+ public TotalOrderRemoteTransactionState getOrCreateState() {
+ return getRemoteTransaction().getTransactionState();
+ }
+
+}
View
11 core/src/main/java/org/infinispan/configuration/cache/Configurations.java
@@ -25,6 +25,7 @@
* Helper configuration methods.
*
* @author Galder Zamarreño
+ * @author Pedro Ruivo
* @since 5.2
*/
public class Configurations {
@@ -50,4 +51,14 @@ public static boolean isStateTransferEnabled(Configuration cfg) {
|| (cfg.loaders().fetchPersistentState());
}
+ public static boolean isOnePhaseTotalOrderCommit(Configuration cfg) {
+ return cfg.transaction().transactionProtocol().isTotalOrder() && !isVersioningEnabled(cfg);
+ }
+
+ public static boolean isVersioningEnabled(Configuration cfg) {
+ return cfg.locking().writeSkewCheck() &&
+ cfg.transaction().lockingMode() == LockingMode.OPTIMISTIC &&
+ cfg.versioning().enabled();
+ }
+
}
View
16 core/src/main/java/org/infinispan/configuration/cache/TransactionConfiguration.java
@@ -20,6 +20,7 @@
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
+import org.infinispan.transaction.TransactionProtocol;
import org.infinispan.transaction.lookup.TransactionManagerLookup;
import org.infinispan.transaction.lookup.TransactionSynchronizationRegistryLookup;
@@ -27,6 +28,7 @@
* Defines transactional (JTA) characteristics of the cache.
*
* @author pmuir
+ * @author Pedro Ruivo
*
*/
public class TransactionConfiguration {
@@ -46,13 +48,14 @@
private final boolean use1PcForAutoCommitTransactions;
private final long reaperWakeUpInterval;
private final long completedTxTimeout;
+ private final TransactionProtocol transactionProtocol; //2PC or Total order protocol
TransactionConfiguration(boolean autoCommit, long cacheStopTimeout, boolean eagerLockingSingleNode, LockingMode lockingMode,
boolean syncCommitPhase, boolean syncRollbackPhase, TransactionManagerLookup transactionManagerLookup,
TransactionSynchronizationRegistryLookup transactionSynchronizationRegistryLookup, TransactionMode transactionMode,
boolean useEagerLocking, boolean useSynchronization, boolean use1PcForAutoCommitTransactions,
- long reaperWakeUpInterval, long completedTxTimeout, RecoveryConfiguration recovery) {
+ long reaperWakeUpInterval, long completedTxTimeout, RecoveryConfiguration recovery, TransactionProtocol transactionProtocol) {
this.autoCommit = autoCommit;
this.cacheStopTimeout = cacheStopTimeout;
this.eagerLockingSingleNode = eagerLockingSingleNode;
@@ -68,6 +71,7 @@
this.use1PcForAutoCommitTransactions = use1PcForAutoCommitTransactions;
this.reaperWakeUpInterval = reaperWakeUpInterval;
this.completedTxTimeout = completedTxTimeout;
+ this.transactionProtocol = transactionProtocol;
}
/**
@@ -324,6 +328,9 @@ public boolean equals(Object o) {
if (transactionMode != that.transactionMode) return false;
if (transactionSynchronizationRegistryLookup != null ? !transactionSynchronizationRegistryLookup.equals(that.transactionSynchronizationRegistryLookup) : that.transactionSynchronizationRegistryLookup != null)
return false;
+ if (transactionProtocol != that.transactionProtocol) {
+ return false;
+ }
return true;
}
@@ -343,7 +350,14 @@ public int hashCode() {
result = 31 * result + (useSynchronization ? 1 : 0);
result = 31 * result + (recovery != null ? recovery.hashCode() : 0);
result = 31 * result + (use1PcForAutoCommitTransactions ? 1 : 0);
+ result = 31 * result + (transactionProtocol != null ? transactionProtocol.hashCode() : 0);
return result;
}
+ /**
+ * @return the transaction protocol in use (2PC or Total Order)
+ */
+ public TransactionProtocol transactionProtocol() {
+ return transactionProtocol;
+ }
}
View
30 core/src/main/java/org/infinispan/configuration/cache/TransactionConfigurationBuilder.java
@@ -20,8 +20,10 @@
import org.infinispan.CacheConfigurationException;
import org.infinispan.configuration.Builder;
+import org.infinispan.config.ConfigurationException;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
+import org.infinispan.transaction.TransactionProtocol;
import org.infinispan.transaction.lookup.GenericTransactionManagerLookup;
import org.infinispan.transaction.lookup.TransactionManagerLookup;
import org.infinispan.transaction.lookup.TransactionSynchronizationRegistryLookup;
@@ -34,6 +36,7 @@
* Defines transactional (JTA) characteristics of the cache.
*
* @author pmuir
+ * @author Pedro Ruivo
*/
public class TransactionConfigurationBuilder extends AbstractConfigurationChildBuilder implements Builder<TransactionConfiguration> {
@@ -52,6 +55,7 @@
private boolean use1PcForAutoCommitTransactions = false;
private long reaperWakeUpInterval = 1000;
private long completedTxTimeout = 15000;
+ private TransactionProtocol transactionProtocol = TransactionProtocol.DEFAULT;
TransactionConfigurationBuilder(ConfigurationBuilder builder) {
@@ -254,6 +258,25 @@ public void validate() {
throw new CacheConfigurationException("reaperWakeUpInterval must be > 0, we got " + reaperWakeUpInterval);
if (completedTxTimeout < 0)
throw new CacheConfigurationException("completedTxTimeout must be > 0, we got " + reaperWakeUpInterval);
+ if(transactionProtocol == TransactionProtocol.TOTAL_ORDER) {
+ //total order only supports transactional caches
+ if(transactionMode != TransactionMode.TRANSACTIONAL) {
+ throw new ConfigurationException("Total Order based protocol not available in " + transactionMode +" cache");
+ }
+
+ //total order only supports replicated and distributed mode
+ if(!clustering().cacheMode().isReplicated() && !clustering().cacheMode().isDistributed()) {
+ throw new ConfigurationException(clustering().cacheMode().friendlyCacheModeString() + " is not supported by Total Order based protocol");
+ }
+
+ if (recovery.create().enabled()) {
+ throw new ConfigurationException("Total Order based protocol not available with recovery");
+ }
+
+ if (lockingMode != LockingMode.OPTIMISTIC) {
+ throw new ConfigurationException("Total Order based protocol not available with " + lockingMode);
+ }
+ }
}
@Override
@@ -267,7 +290,7 @@ else if (transactionMode == null)
transactionMode = TransactionMode.NON_TRANSACTIONAL;
return new TransactionConfiguration(autoCommit, cacheStopTimeout, eagerLockingSingleNode, lockingMode, syncCommitPhase,
syncRollbackPhase, transactionManagerLookup, transactionSynchronizationRegistryLookup, transactionMode,
- useEagerLocking, useSynchronization, use1PcForAutoCommitTransactions, reaperWakeUpInterval, completedTxTimeout, recovery.create());
+ useEagerLocking, useSynchronization, use1PcForAutoCommitTransactions, reaperWakeUpInterval, completedTxTimeout, recovery.create(), transactionProtocol);
}
@Override
@@ -287,6 +310,7 @@ public TransactionConfigurationBuilder read(TransactionConfiguration template) {
this.recovery.read(template.recovery());
this.reaperWakeUpInterval = template.reaperWakeUpInterval();
this.completedTxTimeout = template.completedTxTimeout();
+ this.transactionProtocol = template.transactionProtocol();
return this;
}
@@ -312,4 +336,8 @@ public String toString() {
'}';
}
+ public TransactionConfigurationBuilder transactionProtocol(TransactionProtocol transactionProtocol) {
+ this.transactionProtocol = transactionProtocol;
+ return this;
+ }
}
View
10 core/src/main/java/org/infinispan/configuration/global/GlobalConfiguration.java
@@ -44,6 +44,7 @@
* @author Mircea.Markus@jboss.com
* @author Galder Zamarreño
* @author Pete Muir
+ * @author Pedro Ruivo
* @since 5.1
*
* @see <a href="../../../config.html#ce_infinispan_global">Configuration reference</a>
@@ -61,6 +62,7 @@
private final ExecutorFactoryConfiguration asyncListenerExecutor;
private final ExecutorFactoryConfiguration asyncTransportExecutor;
private final ExecutorFactoryConfiguration remoteCommandsExecutor;
+ private final ExecutorFactoryConfiguration totalOrderExecutor;
private final ScheduledExecutorFactoryConfiguration evictionScheduledExecutor;
private final ScheduledExecutorFactoryConfiguration replicationQueueScheduledExecutor;
private final GlobalJmxStatisticsConfiguration globalJmxStatistics;
@@ -76,7 +78,7 @@
ScheduledExecutorFactoryConfiguration evictionScheduledExecutor,
ScheduledExecutorFactoryConfiguration replicationQueueScheduledExecutor, GlobalJmxStatisticsConfiguration globalJmxStatistics,
TransportConfiguration transport, SerializationConfiguration serialization, ShutdownConfiguration shutdown,
- List<?> modules, SiteConfiguration site,ClassLoader cl) {
+ List<?> modules, SiteConfiguration site,ClassLoader cl, ExecutorFactoryConfiguration totalOrderExecutor) {
this.asyncListenerExecutor = asyncListenerExecutor;
this.asyncTransportExecutor = asyncTransportExecutor;
this.remoteCommandsExecutor = remoteCommandsExecutor;
@@ -93,6 +95,7 @@
this.modules = Collections.unmodifiableMap(moduleMap);
this.site = site;
this.cl = new WeakReference<ClassLoader>(cl);
+ this.totalOrderExecutor = totalOrderExecutor;
}
public ExecutorFactoryConfiguration asyncListenerExecutor() {
@@ -166,6 +169,11 @@ public String toString() {
", modules=" + modules +
", site=" + site +
", cl=" + cl +
+ ", totalOrderExecutor=" + totalOrderExecutor +
'}';
}
+
+ public ExecutorFactoryConfiguration totalOrderExecutor() {
+ return totalOrderExecutor;
+ }
}
View
19 core/src/main/java/org/infinispan/configuration/global/GlobalConfigurationBuilder.java
@@ -43,6 +43,7 @@
private final ExecutorFactoryConfigurationBuilder asyncTransportExecutor;
private final ExecutorFactoryConfigurationBuilder asyncListenerExecutor;
private final ExecutorFactoryConfigurationBuilder remoteCommandsExecutor;
+ private final ExecutorFactoryConfigurationBuilder totalOrderExecutor;
private final ScheduledExecutorFactoryConfigurationBuilder evictionScheduledExecutor;
private final ScheduledExecutorFactoryConfigurationBuilder replicationQueueScheduledExecutor;
private final ShutdownConfigurationBuilder shutdown;
@@ -61,6 +62,8 @@ public GlobalConfigurationBuilder() {
this.replicationQueueScheduledExecutor = new ScheduledExecutorFactoryConfigurationBuilder(this);
this.shutdown = new ShutdownConfigurationBuilder(this);
this.site = new SiteConfigurationBuilder(this);
+ //set a new executor by default, that allows to set the core number of threads and the keep alive time
+ this.totalOrderExecutor = new ExecutorFactoryConfigurationBuilder(this);
}
/**
@@ -173,11 +176,15 @@ public SiteConfigurationBuilder site() {
}
}
+ public ExecutorFactoryConfigurationBuilder totalOrderExecutor() {
+ return totalOrderExecutor;
+ }
+
@SuppressWarnings("unchecked")
public void validate() {
for (AbstractGlobalConfigurationBuilder<?> validatable : asList(asyncListenerExecutor, asyncTransportExecutor,
remoteCommandsExecutor, evictionScheduledExecutor, replicationQueueScheduledExecutor, globalJmxStatistics, transport,
- serialization, shutdown, site)) {
+ serialization, shutdown, site, totalOrderExecutor)) {
validatable.validate();
}
for (Builder<?> m : modules) {
@@ -203,7 +210,8 @@ public GlobalConfiguration build() {
shutdown.create(),
modulesConfig,
site.create(),
- cl.get()
+ cl.get(),
+ totalOrderExecutor.create()
);
}
@@ -226,6 +234,7 @@ public GlobalConfigurationBuilder read(GlobalConfiguration template) {
shutdown.read(template.shutdown());
transport.read(template.transport());
site.read(template.sites());
+ totalOrderExecutor.read(template.totalOrderExecutor());
return this;
}
@@ -254,6 +263,7 @@ public String toString() {
", replicationQueueScheduledExecutor=" + replicationQueueScheduledExecutor +
", shutdown=" + shutdown +
", site=" + site +
+ ", totalOrderExecutor=" + totalOrderExecutor +
'}';
}
@@ -286,7 +296,7 @@ public boolean equals(Object o) {
if (transport != null ? !transport.equals(that.transport) : that.transport != null)
return false;
- return true;
+ return !(totalOrderExecutor != null ? !totalOrderExecutor.equals(that.totalOrderExecutor()) : that.totalOrderExecutor != null);
}
@Override
@@ -302,7 +312,8 @@ public int hashCode() {
result = 31 * result + (replicationQueueScheduledExecutor != null ? replicationQueueScheduledExecutor.hashCode() : 0);
result = 31 * result + (shutdown != null ? shutdown.hashCode() : 0);
result = 31 * result + (site != null ? site.hashCode() : 0);
+ result = 31 * result + (totalOrderExecutor != null ? totalOrderExecutor().hashCode() : 0);
return result;
}
-}
+}
View
4 core/src/main/java/org/infinispan/configuration/parsing/Attribute.java
@@ -149,7 +149,9 @@
MIN_TIME_TO_WAIT("minTimeToWait"),
USE_TWO_PHASE_COMMIT("useTwoPhaseCommit"),
REAPER_WAKE_UP_INTERVAL("reaperWakeUpInterval"),
- COMPLETED_TX_TIMEOUT("completedTxTimeout");
+ COMPLETED_TX_TIMEOUT("completedTxTimeout"),
+ TRANSACTION_PROTOCOL("transactionProtocol"),
+ ;
private final String name;
View
3  core/src/main/java/org/infinispan/configuration/parsing/Element.java
@@ -89,7 +89,8 @@
BACKUPS("backups"),
BACKUP("backup"),
BACKUP_FOR("backupFor"),
- TAKE_OFFLINE("takeOffline")
+ TAKE_OFFLINE("takeOffline"),
+ TOTAL_ORDER_EXECUTOR("totalOrderExecutor"),
;
private final String name;
View
142 core/src/main/java/org/infinispan/configuration/parsing/Parser53.java
@@ -23,7 +23,9 @@
import org.infinispan.configuration.cache.*;
import org.infinispan.configuration.cache.FileCacheStoreConfigurationBuilder.FsyncMode;
import org.infinispan.configuration.cache.InterceptorConfiguration.Position;
+import org.infinispan.configuration.global.ExecutorFactoryConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.configuration.global.ScheduledExecutorFactoryConfigurationBuilder;
import org.infinispan.configuration.global.ShutdownHookBehavior;
import org.infinispan.container.DataContainer;
import org.infinispan.distribution.ch.ConsistentHashFactory;
@@ -44,6 +46,7 @@
import org.infinispan.remoting.transport.Transport;
import org.infinispan.transaction.LockingMode;
import org.infinispan.transaction.TransactionMode;
+import org.infinispan.transaction.TransactionProtocol;
import org.infinispan.transaction.lookup.TransactionManagerLookup;
import org.infinispan.util.Util;
import org.infinispan.util.concurrent.IsolationLevel;
@@ -411,6 +414,9 @@ private void parseTransaction(final XMLExtendedStreamReader reader, final Config
case COMPLETED_TX_TIMEOUT:
builder.transaction().completedTxTimeout(Long.parseLong(value));
break;
+ case TRANSACTION_PROTOCOL:
+ builder.transaction().transactionProtocol(TransactionProtocol.valueOf(value));
+ break;
default:
throw ParseUtils.unexpectedAttribute(reader, i);
}
@@ -1479,19 +1485,23 @@ private void parseGlobal(final XMLExtendedStreamReader reader, final Configurati
Element element = Element.forName(reader.getLocalName());
switch (element) {
case ASYNC_LISTENER_EXECUTOR: {
- parseAsyncListenerExecutor(reader, holder);
+ parseExecutor(reader, holder.getGlobalConfigurationBuilder().asyncListenerExecutor(),
+ holder.getClassLoader());
break;
}
case ASYNC_TRANSPORT_EXECUTOR: {
- parseAsyncTransportExecutor(reader, holder);
+ parseExecutor(reader, holder.getGlobalConfigurationBuilder().asyncTransportExecutor(),
+ holder.getClassLoader());
break;
}
case REMOTE_COMMNAND_EXECUTOR: {
- parseRemoteCommandsExecutor(reader, holder);
+ parseExecutor(reader, holder.getGlobalConfigurationBuilder().remoteCommandsExecutor(),
+ holder.getClassLoader());
break;
}
case EVICTION_SCHEDULED_EXECUTOR: {
- parseEvictionScheduledExecutor(reader, holder);
+ parseScheduledExecutor(reader, holder.getGlobalConfigurationBuilder().evictionScheduledExecutor(),
+ holder.getClassLoader());
break;
}
case GLOBAL_JMX_STATISTICS: {
@@ -1503,7 +1513,8 @@ private void parseGlobal(final XMLExtendedStreamReader reader, final Configurati
break;
}
case REPLICATION_QUEUE_SCHEDULED_EXECUTOR: {
- parseReplicationQueueScheduledExecutor(reader, holder);
+ parseScheduledExecutor(reader, holder.getGlobalConfigurationBuilder().replicationQueueScheduledExecutor(),
+ holder.getClassLoader());
break;
}
case SERIALIZATION: {
@@ -1522,6 +1533,10 @@ private void parseGlobal(final XMLExtendedStreamReader reader, final Configurati
parseGlobalSites(reader, holder);
break;
}
+ case TOTAL_ORDER_EXECUTOR:
+ parseExecutor(reader, holder.getGlobalConfigurationBuilder().totalOrderExecutor(),
+ holder.getClassLoader());
+ break;
default: {
throw ParseUtils.unexpectedElement(reader);
}
@@ -1539,38 +1554,6 @@ private void parseGlobal(final XMLExtendedStreamReader reader, final Configurati
}
}
- private void parseRemoteCommandsExecutor(final XMLExtendedStreamReader reader, final ConfigurationBuilderHolder holder)
- throws XMLStreamException {
- GlobalConfigurationBuilder builder = holder.getGlobalConfigurationBuilder();
- for (int i = 0; i < reader.getAttributeCount(); i++) {
- ParseUtils.requireNoNamespaceAttribute(reader, i);
- String value = replaceProperties(reader.getAttributeValue(i));
- Attribute attribute = Attribute.forName(reader.getAttributeLocalName(i));
- switch (attribute) {
- case FACTORY: {
- builder.remoteCommandsExecutor().factory(Util.<ExecutorFactory> getInstance(value, holder.getClassLoader()));
- break;
- }
- default: {
- throw ParseUtils.unexpectedAttribute(reader, i);
- }
- }
- }
-
- while (reader.hasNext() && (reader.nextTag() != XMLStreamConstants.END_ELEMENT)) {
- Element element = Element.forName(reader.getLocalName());
- switch (element) {
- case PROPERTIES: {
- builder.remoteCommandsExecutor().withProperties(parseProperties(reader));
- break;
- }
- default: {
- throw ParseUtils.unexpectedElement(reader);
- }
- }
- }
- }
-
private void parseTransport(final XMLExtendedStreamReader reader, final ConfigurationBuilderHolder holder) throws XMLStreamException {
GlobalConfigurationBuilder builder = holder.getGlobalConfigurationBuilder();
for (int i = 0; i < reader.getAttributeCount(); i++) {
@@ -1736,38 +1719,6 @@ private void parseAdvancedExternalizers(final XMLExtendedStreamReader reader, fi
}
}
- private void parseReplicationQueueScheduledExecutor(final XMLExtendedStreamReader reader, final ConfigurationBuilderHolder holder)
- throws XMLStreamException {
- GlobalConfigurationBuilder builder = holder.getGlobalConfigurationBuilder();
- for (int i = 0; i < reader.getAttributeCount(); i++) {
- ParseUtils.requireNoNamespaceAttribute(reader, i);
- String value = replaceProperties(reader.getAttributeValue(i));
- Attribute attribute = Attribute.forName(reader.getAttributeLocalName(i));
- switch (attribute) {
- case FACTORY: {
- builder.replicationQueueScheduledExecutor().factory(Util.<ScheduledExecutorFactory> getInstance(value, holder.getClassLoader()));
- break;
- }
- default: {
- throw ParseUtils.unexpectedAttribute(reader, i);
- }
- }
- }
-
- while (reader.hasNext() && (reader.nextTag() != XMLStreamConstants.END_ELEMENT)) {
- Element element = Element.forName(reader.getLocalName());
- switch (element) {
- case PROPERTIES: {
- builder.replicationQueueScheduledExecutor().withProperties(parseProperties(reader));
- break;
- }
- default: {
- throw ParseUtils.unexpectedElement(reader);
- }
- }
- }
- }
-
private void parseGlobalJMXStatistics(final XMLExtendedStreamReader reader, final ConfigurationBuilderHolder holder)
throws XMLStreamException {
GlobalConfigurationBuilder builder = holder.getGlobalConfigurationBuilder();
@@ -1799,7 +1750,7 @@ private void parseGlobalJMXStatistics(final XMLExtendedStreamReader reader, fina
break;
}
case MBEAN_SERVER_LOOKUP: {
- builder.globalJmxStatistics().mBeanServerLookup(Util.<MBeanServerLookup> getInstance(value, holder.getClassLoader()));
+ builder.globalJmxStatistics().mBeanServerLookup(Util.<MBeanServerLookup>getInstance(value, holder.getClassLoader()));
break;
}
default: {
@@ -1822,16 +1773,15 @@ private void parseGlobalJMXStatistics(final XMLExtendedStreamReader reader, fina
}
}
- private void parseEvictionScheduledExecutor(final XMLExtendedStreamReader reader, final ConfigurationBuilderHolder holder)
- throws XMLStreamException {
- GlobalConfigurationBuilder builder = holder.getGlobalConfigurationBuilder();
+ private void parseScheduledExecutor(final XMLExtendedStreamReader reader, final ScheduledExecutorFactoryConfigurationBuilder factoryBuilder,
+ final ClassLoader classLoader) throws XMLStreamException {
for (int i = 0; i < reader.getAttributeCount(); i++) {
ParseUtils.requireNoNamespaceAttribute(reader, i);
String value = replaceProperties(reader.getAttributeValue(i));
Attribute attribute = Attribute.forName(reader.getAttributeLocalName(i));
switch (attribute) {
case FACTORY: {
- builder.evictionScheduledExecutor().factory(Util.<ScheduledExecutorFactory> getInstance(value, holder.getClassLoader()));
+ factoryBuilder.factory(Util.<ScheduledExecutorFactory> getInstance(value, classLoader));
break;
}
default: {
@@ -1844,7 +1794,7 @@ private void parseEvictionScheduledExecutor(final XMLExtendedStreamReader reader
Element element = Element.forName(reader.getLocalName());
switch (element) {
case PROPERTIES: {
- builder.evictionScheduledExecutor().withProperties(parseProperties(reader));
+ factoryBuilder.withProperties(parseProperties(reader));
break;
}
default: {
@@ -1854,16 +1804,15 @@ private void parseEvictionScheduledExecutor(final XMLExtendedStreamReader reader
}
}
- private void parseAsyncTransportExecutor(final XMLExtendedStreamReader reader, final ConfigurationBuilderHolder holder)
- throws XMLStreamException {
- GlobalConfigurationBuilder builder = holder.getGlobalConfigurationBuilder();
+ private void parseExecutor(final XMLExtendedStreamReader reader, final ExecutorFactoryConfigurationBuilder factoryBuilder,
+ final ClassLoader classLoader) throws XMLStreamException {
for (int i = 0; i < reader.getAttributeCount(); i++) {
ParseUtils.requireNoNamespaceAttribute(reader, i);
String value = replaceProperties(reader.getAttributeValue(i));
Attribute attribute = Attribute.forName(reader.getAttributeLocalName(i));
switch (attribute) {
case FACTORY: {
- builder .asyncTransportExecutor().factory(Util.<ExecutorFactory> getInstance(value, holder.getClassLoader()));
+ factoryBuilder.factory(Util.<ExecutorFactory>getInstance(value, classLoader));
break;
}
default: {
@@ -1876,7 +1825,7 @@ private void parseAsyncTransportExecutor(final XMLExtendedStreamReader reader, f
Element element = Element.forName(reader.getLocalName());
switch (element) {
case PROPERTIES: {
- builder.asyncTransportExecutor().withProperties(parseProperties(reader));
+ factoryBuilder.withProperties(parseProperties(reader));
break;
}
default: {
@@ -1886,39 +1835,6 @@ private void parseAsyncTransportExecutor(final XMLExtendedStreamReader reader, f
}
}
- private void parseAsyncListenerExecutor(final XMLExtendedStreamReader reader, final ConfigurationBuilderHolder holder)
- throws XMLStreamException {
- GlobalConfigurationBuilder builder = holder.getGlobalConfigurationBuilder();
- for (int i = 0; i < reader.getAttributeCount(); i++) {
- ParseUtils.requireNoNamespaceAttribute(reader, i);
- String value = replaceProperties(reader.getAttributeValue(i));
- Attribute attribute = Attribute.forName(reader.getAttributeLocalName(i));
- switch (attribute) {
- case FACTORY: {
- builder.asyncListenerExecutor().factory(Util.<ExecutorFactory> getInstance(value, holder.getClassLoader()));
- break;
- }
- default: {
- throw ParseUtils.unexpectedAttribute(reader, i);
- }
- }
- }
-
- while (reader.hasNext() && (reader.nextTag() != XMLStreamConstants.END_ELEMENT)) {
- Element element = Element.forName(reader.getLocalName());
- switch (element) {
- case PROPERTIES: {
- builder.asyncListenerExecutor().withProperties(parseProperties(reader));
- break;
- }
- default: {
- throw ParseUtils.unexpectedElement(reader);
- }
- }
- }
-
- }
-
public static Properties parseProperties(final XMLExtendedStreamReader reader) throws XMLStreamException {
ParseUtils.requireNoAttributes(reader);
View
2  core/src/main/java/org/infinispan/distexec/DefaultExecutorService.java
@@ -891,7 +891,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
log.couldNotExecuteCancellationLocally(e.getLocalizedMessage());
}
} else {
- rpc.invokeRemotely(Collections.singletonList(getExecutionTarget()), ccc, true);
+ rpc.invokeRemotely(Collections.singletonList(getExecutionTarget()), ccc, true, false);
}
cancelled = true;
done = true;
View
4 core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceTask.java
@@ -387,7 +387,7 @@ public Object call() throws Exception {
}
}
});
- rpc.invokeRemotely(cache.getRpcManager().getMembers(), ccc, true, false);
+ rpc.invokeRemotely(cache.getRpcManager().getMembers(), ccc, true, false, false);
}
protected Set<KOut> executeMapPhase(boolean useCompositeKeys) throws InterruptedException,
@@ -769,7 +769,7 @@ public boolean cancel(boolean mayInterruptIfRunning) {
}
} else {
rpc.invokeRemotely(Collections.singletonList(task.getExecutionTarget()), cc,
- true);
+ true, false);
}
cancelled = true;
done = true;
View
8 core/src/main/java/org/infinispan/distribution/L1ManagerImpl.java
@@ -176,7 +176,7 @@ public void addRequestor(Object key, Address origin) {
toExecute = new Runnable() {
@Override
public void run() {
- rpcManager.broadcastRpcCommand(rpcCommand, true);
+ rpcManager.broadcastRpcCommand(rpcCommand, true, false);
}
};
} else {
@@ -184,7 +184,7 @@ public void run() {
@Override
public void run() {
rpcManager.invokeRemotely(invalidationAddresses, rpcCommand,
- ResponseMode.SYNCHRONOUS, rpcTimeout, true);
+ ResponseMode.SYNCHRONOUS, rpcTimeout, true, false);
}
};
}
@@ -221,7 +221,7 @@ public void run() {
return asyncTransportExecutor.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
- rpcManager.broadcastRpcCommand(ic, true);
+ rpcManager.broadcastRpcCommand(ic, true, false);
return retval;
}
});
@@ -240,7 +240,7 @@ public Object call() throws Exception {
return asyncTransportExecutor.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
- rpcManager.invokeRemotely(invalidationAddresses, rpc, ResponseMode.SYNCHRONOUS, rpcTimeout, true);
+ rpcManager.invokeRemotely(invalidationAddresses, rpc, ResponseMode.SYNCHRONOUS, rpcTimeout, true, false);
return retval;
}
});
View
154 .../main/java/org/infinispan/executors/LazyInitializingBlockingTaskAwareExecutorService.java
@@ -0,0 +1,154 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2013 Red Hat Inc. and/or its affiliates and other contributors
+ * as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This copyrighted material is made available to anyone wishing to use,
+ * modify, copy, or redistribute it subject to the terms and conditions
+ * of the GNU Lesser General Public License, v. 2.1.
+ * This program is distributed in the hope that it will be useful, but WITHOUT A
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ * You should have received a copy of the GNU Lesser General Public License,
+ * v.2.1 along with this distribution; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
+ * MA 02110-1301, USA.
+ */
+
+package org.infinispan.executors;
+
+import org.infinispan.util.InfinispanCollections;
+import org.infinispan.util.concurrent.BlockingRunnable;
+import org.infinispan.util.concurrent.BlockingTaskAwareExecutorService;
+import org.infinispan.util.concurrent.BlockingTaskAwareExecutorServiceImpl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A delegating executor that lazily constructs and initializes the underlying executor.
+ *
+ * @author Pedro Ruivo
+ * @since 5.3
+ */
+public final class LazyInitializingBlockingTaskAwareExecutorService implements BlockingTaskAwareExecutorService {
+
+ private final ExecutorFactory factory;
+ private final Properties executorProperties;
+ private volatile BlockingTaskAwareExecutorService delegate;
+
+ public LazyInitializingBlockingTaskAwareExecutorService(ExecutorFactory factory, Properties executorProperties) {
+ this.factory = factory;
+ this.executorProperties = executorProperties;
+ }
+
+ @Override
+ public void execute(BlockingRunnable runnable) {
+ initIfNeeded();
+ delegate.execute(runnable);
+ }
+
+ @Override
+ public void checkForReadyTasks() {
+ if (delegate != null) {
+ delegate.checkForReadyTasks();
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ if (delegate != null) delegate.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ if (delegate == null)
+ return InfinispanCollections.emptyList();
+ else
+ return delegate.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return delegate == null || delegate.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return delegate == null || delegate.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ if (delegate == null)
+ return true;
+ else
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ initIfNeeded();
+ return delegate.submit(task);
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ initIfNeeded();
+ return delegate.submit(task, result);
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ initIfNeeded();
+ return delegate.submit(task);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ initIfNeeded();
+ return delegate.invokeAll(tasks);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ initIfNeeded();
+ return delegate.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ initIfNeeded();
+ return delegate.invokeAny(tasks);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ initIfNeeded();
+ return delegate.invokeAny(tasks, timeout, unit);
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ initIfNeeded();
+ delegate.execute(command);
+ }
+
+ private void initIfNeeded() {
+ if (delegate == null) {
+ synchronized (this) {
+ if (delegate == null) {
+ delegate = new BlockingTaskAwareExecutorServiceImpl(factory.getExecutor(executorProperties));
+ }
+ }
+ }
+ }
+}
View
7 core/src/main/java/org/infinispan/factories/ComponentRegistry.java
@@ -37,6 +37,7 @@
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.remoting.responses.ResponseGenerator;
import org.infinispan.statetransfer.StateTransferManager;
+import org.infinispan.transaction.totalorder.TotalOrderManager;
import org.infinispan.util.InfinispanCollections;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -64,6 +65,7 @@
private StateTransferManager stateTransferManager;
private ResponseGenerator responseGenerator;
private CommandsFactory commandsFactory;
+ private TotalOrderManager totalOrderManager;
protected final WeakReference<ClassLoader> defaultClassLoader;
@@ -289,6 +291,7 @@ public void cacheComponents() {
stateTransferManager = getOrCreateComponent(StateTransferManager.class);
responseGenerator = getOrCreateComponent(ResponseGenerator.class);
commandsFactory = getLocalComponent(CommandsFactory.class);
+ totalOrderManager = getOrCreateComponent(TotalOrderManager.class);
}
@Override
@@ -296,4 +299,8 @@ public ComponentMetadataRepo getComponentMetadataRepo() {
return globalComponents.getComponentMetadataRepo();
}
+ public final TotalOrderManager getTotalOrderManager() {
+ return totalOrderManager;
+ }
+
}
View
7 core/src/main/java/org/infinispan/factories/EmptyConstructorNamedCacheFactory.java
@@ -48,6 +48,7 @@
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.statetransfer.StateTransferLockImpl;
import org.infinispan.transaction.TransactionCoordinator;
+import org.infinispan.transaction.totalorder.TotalOrderManager;
import org.infinispan.transaction.xa.TransactionFactory;
import org.infinispan.transaction.xa.recovery.RecoveryAdminOperations;
import org.infinispan.util.concurrent.locks.containers.LockContainer;
@@ -64,6 +65,7 @@
* Simple factory that just uses reflection and an empty constructor of the component type.
*
* @author Manik Surtani (<a href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @author Pedro Ruivo
* @since 4.0
*/
@DefaultFactoryFor(classes = {CacheNotifier.class, CommandsFactory.class,
@@ -72,7 +74,8 @@
BatchContainer.class, EvictionManager.class,
TransactionCoordinator.class, RecoveryAdminOperations.class, StateTransferLock.class,
ClusteringDependentLogic.class, LockContainer.class,
- L1Manager.class, TransactionFactory.class, BackupSender.class})
+ L1Manager.class, TransactionFactory.class, BackupSender.class,
+ TotalOrderManager.class})
public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
@Override
@@ -130,6 +133,8 @@
return (T) new TransactionFactory();
} else if (componentType.equals(BackupSender.class)) {
return (T) new BackupSenderImpl(globalConfiguration.sites().localSite());
+ } else if (componentType.equals(TotalOrderManager.class)) {
+ return (T) new TotalOrderManager();
}
}
View
83 core/src/main/java/org/infinispan/factories/InterceptorChainFactory.java
@@ -26,6 +26,7 @@
import org.infinispan.CacheException;
import org.infinispan.config.ConfigurationException;
import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.Configurations;
import org.infinispan.configuration.cache.CustomInterceptorsConfiguration;
import org.infinispan.configuration.cache.InterceptorConfiguration;
import org.infinispan.configuration.cache.CacheLoaderConfiguration;
@@ -41,6 +42,13 @@
import org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor;
import org.infinispan.interceptors.locking.OptimisticLockingInterceptor;
import org.infinispan.interceptors.locking.PessimisticLockingInterceptor;
+import org.infinispan.interceptors.totalorder.TotalOrderDistributionInterceptor;
+import org.infinispan.interceptors.totalorder.TotalOrderInterceptor;
+import org.infinispan.interceptors.totalorder.TotalOrderReplicationInterceptor;
+import org.infinispan.interceptors.totalorder.TotalOrderStateTransferInterceptor;
+import org.infinispan.interceptors.totalorder.TotalOrderVersionedDistributionInterceptor;
+import org.infinispan.interceptors.totalorder.TotalOrderVersionedEntryWrappingInterceptor;
+import org.infinispan.interceptors.totalorder.TotalOrderVersionedReplicationInterceptor;
import org.infinispan.interceptors.xsite.NonTransactionalBackupInterceptor;
import org.infinispan.interceptors.xsite.OptimisticBackupInterceptor;
import org.infinispan.interceptors.xsite.PessimisticBackupInterceptor;
@@ -59,6 +67,7 @@
* @author <a href="mailto:manik@jboss.org">Manik Surtani (manik@jboss.org)</a>
* @author Mircea.Markus@jboss.com
* @author Marko Luksa
+ * @author Pedro Ruivo
* @since 4.0
*/
@DefaultFactoryFor(classes = InterceptorChain.class)
@@ -93,14 +102,15 @@ private boolean isUsingMarshalledValues(Configuration c) {
}
public InterceptorChain buildInterceptorChain() {
- boolean needsVersionAwareComponents = configuration.transaction().transactionMode().isTransactional() && configuration.locking().writeSkewCheck() &&
- configuration.transaction().lockingMode() == LockingMode.OPTIMISTIC && configuration.versioning().enabled();
+ boolean needsVersionAwareComponents = configuration.transaction().transactionMode().isTransactional() &&
+ Configurations.isVersioningEnabled(configuration);
InterceptorChain interceptorChain = new InterceptorChain(componentRegistry.getComponentMetadataRepo());
// add the interceptor chain to the registry first, since some interceptors may ask for it.
componentRegistry.registerComponent(interceptorChain, InterceptorChain.class);
boolean invocationBatching = configuration.invocationBatching().enabled();
+ boolean isTotalOrder = configuration.transaction().transactionProtocol().isTotalOrder();
// load the icInterceptor first
if (invocationBatching) {
interceptorChain.setFirstInChain(createInterceptor(new BatchingInterceptor(), BatchingInterceptor.class));
@@ -126,10 +136,20 @@ public InterceptorChain buildInterceptorChain() {
// the state transfer lock ensures that the cache member list is up-to-date
// so it's necessary even if state transfer is disabled
if (configuration.clustering().cacheMode().isDistributed() || configuration.clustering().cacheMode().isReplicated()) {
- interceptorChain.appendInterceptor(createInterceptor(new StateTransferInterceptor(), StateTransferInterceptor.class), false);
+ if (isTotalOrder) {
+ interceptorChain.appendInterceptor(createInterceptor(new TotalOrderStateTransferInterceptor(),
+ TotalOrderStateTransferInterceptor.class), false);
+ } else {
+ interceptorChain.appendInterceptor(createInterceptor(new StateTransferInterceptor(), StateTransferInterceptor.class), false);
+ }
interceptorChain.appendInterceptor(createInterceptor(new TransactionSynchronizerInterceptor(), TransactionSynchronizerInterceptor.class), false);
}
+ //load total order interceptor
+ if (isTotalOrder) {
+ interceptorChain.appendInterceptor(createInterceptor(new TotalOrderInterceptor(), TotalOrderInterceptor.class), false);
+ }
+
// load the tx interceptor
if (configuration.transaction().transactionMode().isTransactional())
interceptorChain.appendInterceptor(createInterceptor(new TxInterceptor(), TxInterceptor.class), false);
@@ -154,15 +174,18 @@ public InterceptorChain buildInterceptorChain() {
configuration.transaction().lockingMode(LockingMode.PESSIMISTIC);
}
- if (configuration.transaction().transactionMode().isTransactional()) {
- if (configuration.transaction().lockingMode() == LockingMode.PESSIMISTIC) {
- interceptorChain.appendInterceptor(createInterceptor(new PessimisticLockingInterceptor(), PessimisticLockingInterceptor.class), false);
+ //the total order protocol doesn't need locks
+ if (!isTotalOrder) {
+ if (configuration.transaction().transactionMode().isTransactional()) {
+ if (configuration.transaction().lockingMode() == LockingMode.PESSIMISTIC) {
+ interceptorChain.appendInterceptor(createInterceptor(new PessimisticLockingInterceptor(), PessimisticLockingInterceptor.class), false);
+ } else {
+ interceptorChain.appendInterceptor(createInterceptor(new OptimisticLockingInterceptor(), OptimisticLockingInterceptor.class), false);
+ }
} else {
- interceptorChain.appendInterceptor(createInterceptor(new OptimisticLockingInterceptor(), OptimisticLockingInterceptor.class), false);
+ if (configuration.locking().supportsConcurrentUpdates())
+ interceptorChain.appendInterceptor(createInterceptor(new NonTransactionalLockingInterceptor(), NonTransactionalLockingInterceptor.class), false);
}
- } else {
- if (configuration.locking().supportsConcurrentUpdates())
- interceptorChain.appendInterceptor(createInterceptor(new NonTransactionalLockingInterceptor(), NonTransactionalLockingInterceptor.class), false);
}
if (configuration.sites().hasEnabledBackups() && !configuration.sites().disableBackups()) {
@@ -177,9 +200,14 @@ public InterceptorChain buildInterceptorChain() {
}
}
- if (needsVersionAwareComponents && configuration.clustering().cacheMode().isClustered())
- interceptorChain.appendInterceptor(createInterceptor(new VersionedEntryWrappingInterceptor(), VersionedEntryWrappingInterceptor.class), false);
- else
+ if (needsVersionAwareComponents && configuration.clustering().cacheMode().isClustered()) {
+ if (isTotalOrder) {
+ interceptorChain.appendInterceptor(createInterceptor(new TotalOrderVersionedEntryWrappingInterceptor(),
+ TotalOrderVersionedEntryWrappingInterceptor.class), false);
+ } else {
+ interceptorChain.appendInterceptor(createInterceptor(new VersionedEntryWrappingInterceptor(), VersionedEntryWrappingInterceptor.class), false);
+ }
+ } else
interceptorChain.appendInterceptor(createInterceptor(new EntryWrappingInterceptor(), EntryWrappingInterceptor.class), false);
if (configuration.loaders().usingCacheLoaders()) {
@@ -206,7 +234,7 @@ public InterceptorChain buildInterceptorChain() {
}
}
- if (configuration.deadlockDetection().enabled()) {
+ if (configuration.deadlockDetection().enabled() && !isTotalOrder) {
interceptorChain.appendInterceptor(createInterceptor(new DeadlockDetectingInterceptor(), DeadlockDetectingInterceptor.class), false);
}
@@ -217,11 +245,21 @@ public InterceptorChain buildInterceptorChain() {
switch (configuration.clustering().cacheMode()) {
case REPL_SYNC:
if (needsVersionAwareComponents) {
- interceptorChain.appendInterceptor(createInterceptor(new VersionedReplicationInterceptor(), VersionedReplicationInterceptor.class), false);
+ //added custom interceptor to replace the original
+ if (isTotalOrder) {
+ interceptorChain.appendInterceptor(createInterceptor(new TotalOrderVersionedReplicationInterceptor(),
+ TotalOrderVersionedReplicationInterceptor.class), false);
+ } else {
+ interceptorChain.appendInterceptor(createInterceptor(new VersionedReplicationInterceptor(), VersionedReplicationInterceptor.class), false);
+ }
break;
}
case REPL_ASYNC:
- interceptorChain.appendInterceptor(createInterceptor(new ReplicationInterceptor(), ReplicationInterceptor.class), false);
+ if (isTotalOrder) {
+ interceptorChain.appendInterceptor(createInterceptor(new TotalOrderReplicationInterceptor(), TotalOrderReplicationInterceptor.class), false);
+ } else {
+ interceptorChain.appendInterceptor(createInterceptor(new ReplicationInterceptor(), ReplicationInterceptor.class), false);
+ }
break;
case INVALIDATION_SYNC:
case INVALIDATION_ASYNC:
@@ -229,12 +267,21 @@ public InterceptorChain buildInterceptorChain() {
break;
case DIST_SYNC:
if (needsVersionAwareComponents) {
- interceptorChain.appendInterceptor(createInterceptor(new VersionedDistributionInterceptor(), VersionedDistributionInterceptor.class), false);
+ if (isTotalOrder) {
+ interceptorChain.appendInterceptor(createInterceptor(new TotalOrderVersionedDistributionInterceptor(),
+ TotalOrderVersionedDistributionInterceptor.class), false);
+ } else {
+ interceptorChain.appendInterceptor(createInterceptor(new VersionedDistributionInterceptor(), VersionedDistributionInterceptor.class), false);
+ }
break;
}
case DIST_ASYNC:
if (configuration.transaction().transactionMode().isTransactional()) {
- interceptorChain.appendInterceptor(createInterceptor(new TxDistributionInterceptor(), TxDistributionInterceptor.class), false);
+ if (isTotalOrder) {
+ interceptorChain.appendInterceptor(createInterceptor(new TotalOrderDistributionInterceptor(), TotalOrderDistributionInterceptor.class), false);
+ } else {
+ interceptorChain.appendInterceptor(createInterceptor(new TxDistributionInterceptor(), TxDistributionInterceptor.class), false);
+ }
} else {
if (configuration.locking().supportsConcurrentUpdates()) {
interceptorChain.appendInterceptor(createInterceptor(new NonTxConcurrentDistributionInterceptor(), NonTxConcurrentDistributionInterceptor.class), false);
View
12 core/src/main/java/org/infinispan/factories/KnownComponentNames.java
@@ -45,30 +45,34 @@
public static final String GLOBAL_MARSHALLER = "org.infinispan.marshaller.global";
public static final String CACHE_MARSHALLER = "org.infinispan.marshaller.cache";
public static final String CLASS_LOADER = "java.lang.ClassLoader";
+ public static final String TOTAL_ORDER_EXECUTOR = "org.infinispan.executors.totalOrderExecutor";
// Please make sure this is kept up to date
public static final Collection<String> ALL_KNOWN_COMPONENT_NAMES = Arrays.asList(
ASYNC_TRANSPORT_EXECUTOR, ASYNC_NOTIFICATION_EXECUTOR, EVICTION_SCHEDULED_EXECUTOR, ASYNC_REPLICATION_QUEUE_EXECUTOR,
MODULE_COMMAND_INITIALIZERS, MODULE_COMMAND_FACTORIES, GLOBAL_MARSHALLER, CACHE_MARSHALLER, CLASS_LOADER,
- REMOTE_COMMAND_EXECUTOR
+ REMOTE_COMMAND_EXECUTOR, TOTAL_ORDER_EXECUTOR
);
- private static final Map<String, Integer> DEFAULT_THREADCOUNTS = new HashMap<String, Integer>(3);
- private static final Map<String, Integer> DEFAULT_QUEUE_SIZE = new HashMap<String, Integer>(3);
- private static final Map<String, Integer> DEFAULT_THREADPRIO = new HashMap<String, Integer>(5);
+ private static final Map<String, Integer> DEFAULT_THREADCOUNTS = new HashMap<String, Integer>(4);
+ private static final Map<String, Integer> DEFAULT_QUEUE_SIZE = new HashMap<String, Integer>(4);
+ private static final Map<String, Integer> DEFAULT_THREADPRIO = new HashMap<String, Integer>(6);
static {
DEFAULT_THREADCOUNTS.put(ASYNC_NOTIFICATION_EXECUTOR, 1);