Skip to content

Loading…

ISPN-1908 Remote prepares should be dealt with in DIST mode #1011

Closed
wants to merge 1 commit into from

2 participants

@galderz
Infinispan member
@tristantarrant
Infinispan member

Merging

@tristantarrant
Infinispan member

Merged. Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
View
6 core/src/main/java/org/infinispan/interceptors/CacheStoreInterceptor.java
@@ -159,7 +159,7 @@ public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command)
@Override
public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable {
if (!skip(ctx, command)) {
- if (getLog().isTraceEnabled()) getLog().trace("transactional so don't put stuff in the cloader yet.");
+ if (getLog().isTraceEnabled()) getLog().trace("Transactional so don't put stuff in the cache store yet.");
if (ctx.hasModifications()) {
GlobalTransaction tx = ctx.getGlobalTransaction();
// this is a rollback method
@@ -178,7 +178,7 @@ public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand comm
@Override
public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
if (!skip(ctx, command)) {
- if (getLog().isTraceEnabled()) getLog().trace("transactional so don't put stuff in the cloader yet.");
+ if (getLog().isTraceEnabled()) getLog().trace("Transactional so don't put stuff in the cache store yet.");
prepareCacheLoader(ctx, command.getGlobalTransaction(), ctx, command.isOnePhaseCommit());
}
return invokeNextInterceptor(ctx, command);
@@ -247,7 +247,7 @@ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) t
return returnValue;
}
- private void prepareCacheLoader(TxInvocationContext ctx, GlobalTransaction gtx, TxInvocationContext transactionContext, boolean onePhase) throws Throwable {
+ protected final void prepareCacheLoader(TxInvocationContext ctx, GlobalTransaction gtx, TxInvocationContext transactionContext, boolean onePhase) throws Throwable {
if (transactionContext == null) {
throw new Exception("transactionContext for transaction " + gtx + " not found in transaction table");
}
View
11 core/src/main/java/org/infinispan/interceptors/DistCacheStoreInterceptor.java
@@ -22,6 +22,7 @@
*/
package org.infinispan.interceptors;
+import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
@@ -29,6 +30,7 @@
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
+import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
@@ -137,6 +139,15 @@ public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command)
return returnValue;
}
+ @Override
+ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
+ if (!skip(ctx)) {
+ if (getLog().isTraceEnabled()) getLog().trace("Transactional so don't put stuff in the cache store yet.");
+ prepareCacheLoader(ctx, command.getGlobalTransaction(), ctx, command.isOnePhaseCommit());
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
/**
* Method that skips invocation if: - No store defined or, - The context contains Flag.SKIP_CACHE_STORE or, - The
* store is a shared one and node storing the key is not the 1st owner of the key or, - This is an L1 put operation.
View
119 core/src/test/java/org/infinispan/distribution/DistSyncTxCacheStoreSharedTest.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2012 Red Hat, Inc. and/or its affiliates.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA
+ */
+
+package org.infinispan.distribution;
+
+import org.infinispan.Cache;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.cache.LoaderConfigurationBuilder;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.loaders.CacheStore;
+import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.transaction.LockingMode;
+import org.infinispan.transaction.TransactionMode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tests distributed caches with shared cache stores under transactional
+ * environments.
+ *
+ * @author Thomas Fromm
+ * @since 5.1
+ */
+@Test(groups = "functional", testName = "loaders.ISPN1908")
+public class DistSyncTxCacheStoreSharedTest extends MultipleCacheManagersTest {
+
+ private ConfigurationBuilder getCB(){
+ ConfigurationBuilder cb = new ConfigurationBuilder();
+ cb.clustering()
+ .cacheMode(CacheMode.DIST_SYNC)
+ .sync().replTimeout(60000)
+ .stateTransfer().timeout(180000).fetchInMemoryState(true)
+ .hash().numOwners(1).numVirtualNodes(48);
+
+ // transactions
+
+ cb.transaction()
+ .transactionMode(TransactionMode.TRANSACTIONAL)
+ .lockingMode(LockingMode.PESSIMISTIC)
+ .syncCommitPhase(true)
+ .syncRollbackPhase(true);
+
+ // cb.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL);
+
+ cb.loaders().passivation(false).preload(true).shared(true);
+ // Make it really shared by adding the test's name as store name
+ LoaderConfigurationBuilder lb = cb.loaders().addCacheLoader().cacheLoader(
+ new DummyInMemoryCacheStore());
+ lb.addProperty("storeName", getClass().getSimpleName());
+ lb.async().disable();
+ return cb;
+ }
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ createCluster(getCB(), 1);
+ waitForClusterToForm();
+ }
+
+ @Test
+ public void testInvalidPut() throws Exception {
+ Cache cache = cacheManagers.get(0).getCache("P006");
+
+ // add 1st 4 elements
+ for(int i = 0; i < 4; i++){
+ cache.put(cacheManagers.get(0).getAddress().toString()+"-"+i, "42");
+ }
+
+ // lets check if all elements arrived
+ CacheStore cs1 = cache.getAdvancedCache().getComponentRegistry().getComponent(CacheLoaderManager.class).getCacheStore();
+ Set<Object> keys = cs1.loadAllKeys(null);
+
+ Assert.assertEquals(keys.size(), 4);
+
+ // now start 2nd node
+ addClusterEnabledCacheManager(getCB());
+ waitForClusterToForm("P006");
+
+ cache = cacheManagers.get(1).getCache("P006");
+
+ // add next 4 elements
+ for(int i = 0; i < 4; i++){
+ cache.put(cacheManagers.get(1).getAddress().toString()+"-"+i, "42");
+ }
+
+ Set mergedKeys = new HashSet();
+ // add keys from all cache stores
+ CacheStore cs2 = cache.getAdvancedCache().getComponentRegistry().getComponent(CacheLoaderManager.class).getCacheStore();
+ log.debugf("Load from cache store via cache 1");
+ mergedKeys.addAll(cs1.loadAllKeys(null));
+ log.debugf("Load from cache store via cache 2");
+ mergedKeys.addAll(cs2.loadAllKeys(null));
+
+ Assert.assertEquals(mergedKeys.size(), 8);
+
+ }
+
+}
View
5 core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java
@@ -201,7 +201,10 @@ public InternalCacheEntry load(Object key) {
record("loadAllKeys");
Set<Object> set = new HashSet<Object>();
for (Object key: store.keySet()) {
- if (keysToExclude == null || !keysToExclude.contains(key)) set.add(key);
+ if (keysToExclude == null || !keysToExclude.contains(key)) {
+ log.debugf("Load %s from store %s@%s", key, storeName, Util.hexIdHashCode(store));
+ set.add(key);
+ }
}
return set;
}
Something went wrong with that request. Please try again.