Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

ISPN-2530 Clear command does not work for distributed transactional c…

…aches with pessimistic locking

* Add CacheTransaction.hasModification(Class clazz) to be able to test if current transaction has a modification of a certain type
* DistributionInterceptor.visitPrepareCommand sends the PrepareCommand to all cache members if the current TX has a ClearCommand modification.
* LocalTransaction.getCommitNodes(..) will return the whole list of members if the current transaction has a ClearCommand
* LocalTransaction.getCommitNodes(..) can accept an empty list of recipients without failing
* Add functional tests for clear operation on distributed cache with non-tx, tx with pessimistic locking, tx with optimistic locking
  • Loading branch information...
commit d6b06c8461a825e2116236b78ad6f3a355658f63 1 parent 1429643
@anistor anistor authored danberindei committed
View
1  core/src/main/java/org/infinispan/commands/write/ClearCommand.java
@@ -32,7 +32,6 @@
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.util.InfinispanCollections;
-import java.util.Collections;
import java.util.Set;
/**
View
3  core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
@@ -374,7 +374,8 @@ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand comman
if (shouldInvokeRemoteTxCommand(ctx)) {
if (command.isOnePhaseCommit()) flushL1Caches(ctx); // if we are one-phase, don't block on this future.
- Collection<Address> recipients = dm.getAffectedNodes(ctx.getAffectedKeys());
+ boolean affectsAllNodes = ctx.getCacheTransaction().hasModification(ClearCommand.class);
+ Collection<Address> recipients = affectsAllNodes ? dm.getConsistentHash().getMembers() : dm.getAffectedNodes(ctx.getAffectedKeys());
prepareOnAffectedNodes(ctx, command, recipients, defaultSynchronous);
((LocalTxInvocationContext) ctx).remoteLocksAcquired(recipients);
View
11 core/src/main/java/org/infinispan/transaction/AbstractCacheTransaction.java
@@ -107,6 +107,17 @@ public void setModifications(WriteCommand[] modifications) {
this.modifications = Collections.synchronizedList(new ArrayList<WriteCommand>(Arrays.asList(modifications)));
}
+ public boolean hasModification(Class modificationClass) {
+ if (modifications != null) {
+ for (WriteCommand mod : modifications) {
+ if (modificationClass.isAssignableFrom(mod.getClass())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
@Override
public Map<Object, CacheEntry> getLookedUpEntries() {
return lookedUpEntries;
View
5 core/src/main/java/org/infinispan/transaction/AbstractEnlistmentAdapter.java
@@ -83,10 +83,7 @@ private void removeTransactionInfoRemotely(LocalTransaction localTransaction, Gl
if (mayHaveRemoteLocks(localTransaction) && isClustered() && !isSecondPhaseAsync) {
final TxCompletionNotificationCommand command = commandsFactory.buildTxCompletionNotificationCommand(null, gtx);
final Collection<Address> owners = clusteringLogic.getOwners(localTransaction.getAffectedKeys());
- Collection<Address> commitNodes = null;
- if (owners != null) {
- commitNodes = localTransaction.getCommitNodes(owners, rpcManager.getTopologyId(), rpcManager.getTransport().getMembers());
- }
+ Collection<Address> commitNodes = localTransaction.getCommitNodes(owners, rpcManager.getTopologyId(), rpcManager.getTransport().getMembers());
log.tracef("About to invoke tx completion notification on commitNodes: %s", commitNodes);
rpcManager.invokeRemotely(commitNodes, command, false, true);
}
View
9 core/src/main/java/org/infinispan/transaction/LocalTransaction.java
@@ -24,6 +24,7 @@
package org.infinispan.transaction;
import org.infinispan.CacheException;
+import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.remoting.transport.Address;
@@ -198,9 +199,15 @@ public void setFromRemoteSite(boolean fromRemoteSite) {
* this method returns the reunion between 'recipients' and {@link #getRemoteLocksAcquired()} from which it discards
* the members that left.
*/
- public Collection<Address> getCommitNodes(Collection<Address> recipients, int currentTopologyId, List<Address> members) {
+ public Collection<Address> getCommitNodes(Collection<Address> recipients, int currentTopologyId, Collection<Address> members) {
if (trace) log.tracef("getCommitNodes recipients=%s, currentTopologyId=%s, members=%s, txTopologyId=%s",
recipients, currentTopologyId, members, getTopologyId());
+ if (hasModification(ClearCommand.class)) {
+ return members;
+ }
+ if (recipients == null) {
+ return null;
+ }
Set<Address> allRecipients = new HashSet<Address>(getRemoteLocksAcquired());
allRecipients.addAll(recipients);
allRecipients.retainAll(members);
View
1  core/src/main/java/org/infinispan/transaction/xa/CacheTransaction.java
@@ -48,6 +48,7 @@
*/
List<WriteCommand> getModifications();
+ boolean hasModification(Class modificationClass);
CacheEntry lookupEntry(Object key);
View
113 core/src/test/java/org/infinispan/api/BaseDistClearTest.java
@@ -0,0 +1,113 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2012 Red Hat Inc. and/or its affiliates and other
+ * contributors as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.api;
+
+import org.infinispan.AdvancedCache;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.container.DataContainer;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.transaction.LockingMode;
+import org.infinispan.transaction.TransactionMode;
+import org.infinispan.transaction.lookup.DummyTransactionManagerLookup;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests if clear operation actually succeeds in removing all keys from all nodes of a distributed cluster.
+ * See https://issues.jboss.org/browse/ISPN-2530.
+ *
+ * @author anistor@redhat.com
+ * @since 5.2
+ */
+@Test(groups = "functional")
+public abstract class BaseDistClearTest extends MultipleCacheManagersTest {
+
+ protected final Log log = LogFactory.getLog(getClass());
+
+ protected AdvancedCache<Integer, String> c0;
+ protected AdvancedCache<Integer, String> c1;
+ protected AdvancedCache<Integer, String> c2;
+
+ private final ConfigurationBuilder builder;
+
+ protected BaseDistClearTest(boolean transactional, boolean optimistic) {
+ builder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, transactional, transactional);
+ builder.clustering().hash().numSegments(3).numOwners(2)
+ .stateTransfer().fetchInMemoryState(true)
+ .locking().lockAcquisitionTimeout(1000l);
+
+ if (transactional) {
+ builder.transaction().transactionMode(TransactionMode.TRANSACTIONAL)
+ .transactionManagerLookup(new DummyTransactionManagerLookup())
+ .syncCommitPhase(true).syncRollbackPhase(true)
+ .lockingMode(optimistic ? LockingMode.OPTIMISTIC : LockingMode.PESSIMISTIC);
+ }
+ }
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ createCluster(builder, 3);
+ waitForClusterToForm();
+
+ c0 = advancedCache(0);
+ c1 = advancedCache(1);
+ c2 = advancedCache(2);
+ }
+
+ public void testClear() throws Exception {
+ final int numKeys = 5;
+ log.infof("Putting %d keys into cache ..", numKeys);
+ for (int i = 0; i < numKeys; i++) {
+ String value = "val_" + i;
+ c0.put(i, value);
+
+ // force all values into L1 of the other nodes
+ assertEquals(value, c0.get(i));
+ assertEquals(value, c1.get(i));
+ assertEquals(value, c2.get(i));
+ }
+ log.info("Finished putting keys");
+
+ DataContainer dc0 = c0.getDataContainer();
+ DataContainer dc1 = c1.getDataContainer();
+ DataContainer dc2 = c2.getDataContainer();
+
+ assertTrue(dc0.size() > 0);
+ assertTrue(dc1.size() > 0);
+ assertTrue(dc2.size() > 0);
+
+ log.info("Clearing cache ..");
+ c0.clear();
+ log.info("Finished clearing cache");
+
+ assertEquals(0, dc0.size());
+ assertEquals(0, dc1.size());
+ assertEquals(0, dc2.size());
+ }
+}
View
38 core/src/test/java/org/infinispan/api/DistNonTxClearTest.java
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2012 Red Hat Inc. and/or its affiliates and other
+ * contributors as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.api;
+
+import org.testng.annotations.Test;
+
+/**
+ * @author anistor@redhat.com
+ * @since 5.2
+ */
+@Test(groups = "functional", testName = "api.DistNonTxClearTest")
+public class DistNonTxClearTest extends BaseDistClearTest {
+
+ public DistNonTxClearTest() {
+ super(false, false);
+ }
+}
View
38 core/src/test/java/org/infinispan/api/DistOptimisticTxClearTest.java
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2012 Red Hat Inc. and/or its affiliates and other
+ * contributors as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.api;
+
+import org.testng.annotations.Test;
+
+/**
+ * @author anistor@redhat.com
+ * @since 5.2
+ */
+@Test(groups = "functional", testName = "api.DistOptimisticTxClearTest")
+public class DistOptimisticTxClearTest extends BaseDistClearTest {
+
+ public DistOptimisticTxClearTest() {
+ super(true, true);
+ }
+}
View
38 core/src/test/java/org/infinispan/api/DistPessimisticTxClearTest.java
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2012 Red Hat Inc. and/or its affiliates and other
+ * contributors as indicated by the @author tags. All rights reserved.
+ * See the copyright.txt in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.infinispan.api;
+
+import org.testng.annotations.Test;
+
+/**
+ * @author anistor@redhat.com
+ * @since 5.2
+ */
+@Test(groups = "functional", testName = "api.DistPessimisticTxClearTest")
+public class DistPessimisticTxClearTest extends BaseDistClearTest {
+
+ public DistPessimisticTxClearTest() {
+ super(true, false);
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.