Skip to content

Commit

Permalink
ISPN-5526 Replication: The DELTA_WRITE flag should force a remote get…
Browse files Browse the repository at this point in the history
… during state transfer
  • Loading branch information
dereed authored and danberindei committed Jun 15, 2015
1 parent 2a87ec1 commit 13da8da
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 6 deletions.
Expand Up @@ -76,17 +76,19 @@ protected boolean isNeedReliableReturnValues(FlagAffectedCommand command) {
}

protected boolean needsRemoteGet(InvocationContext ctx, AbstractDataCommand command) {
if (command.hasFlag(Flag.CACHE_MODE_LOCAL)
if (!command.hasFlag(Flag.DELTA_WRITE)
&& (command.hasFlag(Flag.CACHE_MODE_LOCAL)
|| command.hasFlag(Flag.SKIP_REMOTE_LOOKUP)
|| command.hasFlag(Flag.IGNORE_RETURN_VALUES)) {
|| command.hasFlag(Flag.IGNORE_RETURN_VALUES))) {
return false;
}
boolean shouldFetchFromRemote = false;
CacheEntry entry = ctx.lookupEntry(command.getKey());
if (entry == null || entry.isNull() || entry.isLockPlaceholder()) {
if (entry == null || entry.isNull() || entry.isLockPlaceholder()
|| command.hasFlag(Flag.DELTA_WRITE)) {
Object key = command.getKey();
ConsistentHash ch = stateTransferManager.getCacheTopology().getReadConsistentHash();
shouldFetchFromRemote = ctx.isOriginLocal() && !ch.isKeyLocalToNode(rpcManager.getAddress(), key) && !dataContainer.containsKey(key);
shouldFetchFromRemote = (ctx.isOriginLocal() && !ch.isKeyLocalToNode(rpcManager.getAddress(), key) || command.hasFlag(Flag.DELTA_WRITE)) && !dataContainer.containsKey(key);
if (!shouldFetchFromRemote && getLog().isTraceEnabled()) {
getLog().tracef("Not doing a remote get for key %s since entry is mapped to current node (%s) or is in L1. Owners are %s", key, rpcManager.getAddress(), ch.locateOwners(key));
}
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.infinispan.configuration.cache.Configurations;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
Expand Down Expand Up @@ -296,7 +297,7 @@ public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command)
* time. If the operation didn't originate locally we won't do any replication either.
*/
private Object handleCrudMethod(InvocationContext ctx, WriteCommand command, boolean skipRemoteGet) throws Throwable {
if (!skipRemoteGet) {
if (!skipRemoteGet || command.hasFlag(Flag.DELTA_WRITE)) {
remoteGetBeforeWrite(ctx, command);
}

Expand All @@ -309,7 +310,7 @@ private Object handleCrudMethod(InvocationContext ctx, WriteCommand command, boo
}

private void remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command) throws Throwable {
if (command instanceof AbstractDataCommand && (isNeedReliableReturnValues(command) || command.isConditional())) {
if (command instanceof AbstractDataCommand && (isNeedReliableReturnValues(command) || command.isConditional()) || command.hasFlag(Flag.DELTA_WRITE)) {
AbstractDataCommand singleKeyCommand = (AbstractDataCommand) command;

Object returnValue = null;
Expand Down
@@ -0,0 +1,78 @@
package org.infinispan.atomic;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.distribution.MagicKey;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.statetransfer.StateResponseCommand;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.tx.dld.ControlledRpcManager;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static org.testng.AssertJUnit.assertEquals;

/**
* Test modifications to an AtomicMap during state transfer are consistent.
*
* @author Dan Berindei
* @since 7.0
*/
@Test(groups = "functional", testName = "atomic.AtomicMapStateTransferTestRepl")
public class AtomicMapStateTransferTestRepl extends MultipleCacheManagersTest {

protected void createCacheManagers() throws Throwable {
ConfigurationBuilder c = getConfigurationBuilder();
createClusteredCaches(1, "atomic", c);
}

private ConfigurationBuilder getConfigurationBuilder() {
ConfigurationBuilder c = new ConfigurationBuilder();
c.clustering().cacheMode(CacheMode.REPL_SYNC);
c.transaction().transactionMode(TransactionMode.TRANSACTIONAL);
return c;
}

public void testAtomicMapPutDuringJoin() throws ExecutionException, InterruptedException {
Cache cache = cache(0, "atomic");
ControlledRpcManager crm = new ControlledRpcManager(cache.getAdvancedCache().getRpcManager());
TestingUtil.replaceComponent(cache, RpcManager.class, crm, true);

//MagicKey atomicMapKey = new MagicKey("atomicMapKey", cache);
String atomicMapKey = "atomicMapKey";
AtomicMap atomicMap = AtomicMapLookup.getAtomicMap(cache, atomicMapKey);
atomicMap.put("key1", "value1");

crm.blockBefore(StateResponseCommand.class);

ConfigurationBuilder c = getConfigurationBuilder();
final EmbeddedCacheManager joiner = addClusterEnabledCacheManager(c);
Future<Cache> future = fork(new Callable<Cache>() {
@Override
public Cache call() throws Exception {
return joiner.getCache("atomic");
}
});

crm.waitForCommandToBlock();

// Now we know state transfer will try to create an AtomicMap(key1=value1) on cache2
// Insert another key in the atomic map, and check that cache2 has both keys after the state transfer
atomicMap.put("key2", "value2");

crm.stopBlocking();
Cache cache2 = future.get();

AtomicMap atomicMap2 = AtomicMapLookup.getAtomicMap(cache2, atomicMapKey);
assertEquals(new HashSet<String>(Arrays.asList("key1", "key2")), atomicMap2.keySet());
}
}

0 comments on commit 13da8da

Please sign in to comment.