diff --git a/core/src/main/java/org/infinispan/xsite/BackupReceiver.java b/core/src/main/java/org/infinispan/xsite/BackupReceiver.java index 54adfaf05c65..b1fed09d3921 100644 --- a/core/src/main/java/org/infinispan/xsite/BackupReceiver.java +++ b/core/src/main/java/org/infinispan/xsite/BackupReceiver.java @@ -8,7 +8,7 @@ /** * Component present on a backup site that manages the backup information and logic. * - * @see BackupReceiverImpl + * @see ClusteredCacheBackupReceiver * @author Mircea Markus * @since 5.2 */ diff --git a/core/src/main/java/org/infinispan/xsite/BackupReceiverImpl.java b/core/src/main/java/org/infinispan/xsite/BackupReceiverImpl.java deleted file mode 100644 index 7037180b7e65..000000000000 --- a/core/src/main/java/org/infinispan/xsite/BackupReceiverImpl.java +++ /dev/null @@ -1,419 +0,0 @@ -package org.infinispan.xsite; - -import org.infinispan.AdvancedCache; -import org.infinispan.Cache; -import org.infinispan.commands.CommandsFactory; -import org.infinispan.commands.remote.CacheRpcCommand; -import org.infinispan.commons.util.concurrent.NotifyingFutureImpl; -import org.infinispan.commons.util.concurrent.NotifyingNotifiableFuture; -import org.infinispan.interceptors.locking.ClusteringDependentLogic; -import org.infinispan.metadata.Metadata; -import org.infinispan.commands.AbstractVisitor; -import org.infinispan.commands.VisitableCommand; -import org.infinispan.commands.tx.CommitCommand; -import org.infinispan.commands.tx.PrepareCommand; -import org.infinispan.commands.tx.RollbackCommand; -import org.infinispan.commands.write.ClearCommand; -import org.infinispan.commands.write.PutKeyValueCommand; -import org.infinispan.commands.write.PutMapCommand; -import org.infinispan.commands.write.RemoveCommand; -import org.infinispan.commands.write.ReplaceCommand; -import org.infinispan.commands.write.WriteCommand; -import org.infinispan.commons.CacheException; -import org.infinispan.context.Flag; -import org.infinispan.context.InvocationContext; -import org.infinispan.context.impl.TxInvocationContext; -import org.infinispan.remoting.LocalInvocation; -import org.infinispan.remoting.inboundhandler.DeliverOrder; -import org.infinispan.remoting.responses.CacheNotFoundResponse; -import org.infinispan.remoting.responses.Response; -import org.infinispan.remoting.rpc.RpcManager; -import org.infinispan.remoting.transport.Address; -import org.infinispan.transaction.TransactionMode; -import org.infinispan.transaction.impl.LocalTransaction; -import org.infinispan.transaction.impl.TransactionTable; -import org.infinispan.transaction.xa.GlobalTransaction; -import org.infinispan.util.logging.Log; -import org.infinispan.util.logging.LogFactory; -import org.infinispan.xsite.statetransfer.XSiteState; -import org.infinispan.xsite.statetransfer.XSiteStatePushCommand; -import org.infinispan.xsite.statetransfer.XSiteStateTransferControlCommand; - -import javax.transaction.TransactionManager; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -/** - * @author Mircea Markus - * @since 5.2 - */ -public class BackupReceiverImpl implements BackupReceiver { - - private static final Log log = LogFactory.getLog(BackupReceiverImpl.class); - private static final boolean trace = log.isDebugEnabled(); - private final Cache cache; - - private final BackupCacheUpdater siteUpdater; - - public BackupReceiverImpl(Cache cache) { - this.cache = cache; - siteUpdater = new BackupCacheUpdater(cache); - } - - @Override - public Cache getCache() { - return cache; - } - - @Override - public Object handleRemoteCommand(VisitableCommand command) throws Throwable { - return command.acceptVisitor(null, siteUpdater); - } - - @Override - public void handleStateTransferControl(XSiteStateTransferControlCommand command) throws Exception { - XSiteStateTransferControlCommand invokeCommand = command; - if (!command.getCacheName().equals(cache.getName())) { - //copy if the cache name is different - invokeCommand = command.copyForCache(cache.getName()); - } - invokeCommand.setSiteName(command.getOriginSite()); - invokeRemotelyInLocalSite(invokeCommand); - } - - @Override - public void handleStateTransferState(XSiteStatePushCommand cmd) throws Exception { - //split the state and forward it to the primary owners... - if (!cache.getStatus().allowInvocations()) { - throw new CacheException("Cache is stopping or terminated: " + cache.getStatus()); - } - final ClusteringDependentLogic clusteringDependentLogic = cache.getAdvancedCache().getComponentRegistry() - .getComponent(ClusteringDependentLogic.class); - final Map> primaryOwnersChunks = new HashMap<>(); - - if (trace) { - log.tracef("Received X-Site state transfer '%s'. Splitting by primary owner.", cmd); - } - - for (XSiteState state : cmd.getChunk()) { - final Address primaryOwner = clusteringDependentLogic.getPrimaryOwner(state.key()); - List primaryOwnerList = primaryOwnersChunks.get(primaryOwner); - if (primaryOwnerList == null) { - primaryOwnerList = new LinkedList<>(); - primaryOwnersChunks.put(primaryOwner, primaryOwnerList); - } - primaryOwnerList.add(state); - } - - final List localChunks = primaryOwnersChunks.remove(clusteringDependentLogic.getAddress()); - final List tasks = new ArrayList<>(primaryOwnersChunks.size()); - - for (Map.Entry> entry : primaryOwnersChunks.entrySet()) { - if (entry.getValue() == null || entry.getValue().isEmpty()) { - continue; - } - if (trace) { - log.tracef("Node '%s' will apply %s", entry.getKey(), entry.getValue()); - } - StatePushTask task = new StatePushTask(entry.getValue(), entry.getKey(), cache); - tasks.add(task); - task.executeRemote(); - } - - //help gc. this is safe because the chunks was already sent - primaryOwnersChunks.clear(); - - if (trace) { - log.tracef("Local node '%s' will apply %s", cache.getAdvancedCache().getRpcManager().getAddress(), - localChunks); - } - - if (localChunks != null) { - LocalInvocation.newInstanceFromCache(cache, newStatePushCommand(cache, localChunks)).call(); - //help gc - localChunks.clear(); - } - - if (trace) { - log.tracef("Waiting for the remote tasks..."); - } - - while (!tasks.isEmpty()) { - for (Iterator iterator = tasks.iterator(); iterator.hasNext(); ) { - awaitRemoteTask(cache, iterator.next()); - iterator.remove(); - } - } - //the put operation can fail silently. check in the end and it is better to resend the chunk than to lose keys. - if (!cache.getStatus().allowInvocations()) { - throw new CacheException("Cache is stopping or terminated: " + cache.getStatus()); - } - } - - private static void awaitRemoteTask(Cache cache, StatePushTask task) throws Exception { - try { - if (trace) { - log.tracef("Waiting reply from %s", task.address); - } - Map responseMap = task.awaitRemote(); - if (trace) { - log.tracef("Response received is %s", responseMap); - } - if (responseMap.size() > 1 || !responseMap.containsKey(task.address)) { - throw new IllegalStateException("Shouldn't happen. Map is " + responseMap); - } - Response response = responseMap.get(task.address); - if (response == CacheNotFoundResponse.INSTANCE) { - if (trace) { - log.tracef("Cache not found in node '%s'. Retrying locally!", task.address); - } - if (!cache.getStatus().allowInvocations()) { - throw new CacheException("Cache is stopping or terminated: " + cache.getStatus()); - } - task.executeLocal(); - } - } catch (Exception e) { - if (!cache.getStatus().allowInvocations()) { - throw new CacheException("Cache is stopping or terminated: " + cache.getStatus()); - } - if (cache.getAdvancedCache().getRpcManager().getMembers().contains(task.address)) { - if (trace) { - log.tracef(e, "An exception was sent by %s. Retrying!", task.address); - } - task.executeRemote(); //retry! - } else { - if (trace) { - log.tracef(e, "An exception was sent by %s. Retrying locally!", task.address); - } - //if the node left the cluster, we apply the missing state. This avoids the site provider to re-send the - //full chunk. - task.executeLocal(); - } - } - } - - private static XSiteStatePushCommand newStatePushCommand(Cache cache, List stateList) { - CommandsFactory commandsFactory = cache.getAdvancedCache().getComponentRegistry().getCommandsFactory(); - return commandsFactory.buildXSiteStatePushCommand(stateList.toArray(new XSiteState[stateList.size()])); - } - - private Map invokeRemotelyInLocalSite(CacheRpcCommand command) throws Exception { - final RpcManager rpcManager = cache.getAdvancedCache().getRpcManager(); - final NotifyingNotifiableFuture> remoteFuture = new NotifyingFutureImpl<>(); - final Map responseMap = new HashMap<>(); - rpcManager.invokeRemotelyInFuture(remoteFuture, null, command, rpcManager.getDefaultRpcOptions(true, DeliverOrder.NONE)); - responseMap.put(rpcManager.getAddress(), LocalInvocation.newInstanceFromCache(cache, command).call()); - //noinspection unchecked - responseMap.putAll(remoteFuture.get()); - return responseMap; - } - - public static final class BackupCacheUpdater extends AbstractVisitor { - - private static Log log = LogFactory.getLog(BackupCacheUpdater.class); - - private final ConcurrentMap remote2localTx; - - private final AdvancedCache backupCache; - - BackupCacheUpdater(Cache backup) { - //ignore return values on the backup - this.backupCache = backup.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES, Flag.SKIP_XSITE_BACKUP); - this.remote2localTx = new ConcurrentHashMap<>(); - } - - @Override - public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { - log.tracef("Processing a remote put %s", command); - if (command.isConditional()) { - return backupCache.putIfAbsent(command.getKey(), command.getValue(), command.getMetadata()); - } - return backupCache.put(command.getKey(), command.getValue(), command.getMetadata()); - } - - @Override - public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable { - if (command.isConditional()) { - return backupCache.remove(command.getKey(), command.getValue()); - } - return backupCache.remove(command.getKey()); - } - - @Override - public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable { - if (command.isConditional() && command.getOldValue() != null) { - return backupCache.replace(command.getKey(), command.getOldValue(), command.getNewValue(), - command.getMetadata()); - } - return backupCache.replace(command.getKey(), command.getNewValue(), - command.getMetadata()); - } - - @Override - public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable { - Metadata metadata = command.getMetadata(); - backupCache.putAll(command.getMap(), - metadata.lifespan(), TimeUnit.MILLISECONDS, - metadata.maxIdle(), TimeUnit.MILLISECONDS); - return null; - } - - @Override - public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable { - backupCache.clear(); - return null; - } - - @Override - public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable { - boolean isTransactional = isTransactional(); - if (isTransactional) { - - // Sanity check -- if the remote tx doesn't have modifications, it never should have been propagated! - if( !command.hasModifications() ) { - throw new IllegalStateException( "TxInvocationContext has no modifications!" ); - } - - replayModificationsInTransaction(command, command.isOnePhaseCommit()); - } else { - replayModifications(command); - } - return null; - } - - private boolean isTransactional() { - return backupCache.getCacheConfiguration().transaction().transactionMode() == TransactionMode.TRANSACTIONAL; - } - - @Override - public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable { - if (!isTransactional()) { - log.cannotRespondToCommit(command.getGlobalTransaction(), backupCache.getName()); - } else { - log.tracef("Committing remote transaction %s", command.getGlobalTransaction()); - completeTransaction(command.getGlobalTransaction(), true); - } - return null; - } - - @Override - public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable { - - if (!isTransactional()) { - log.cannotRespondToRollback(command.getGlobalTransaction(), backupCache.getName()); - } else { - log.tracef("Rolling back remote transaction %s", command.getGlobalTransaction()); - completeTransaction(command.getGlobalTransaction(), false); - } - return null; - } - - private void completeTransaction(GlobalTransaction globalTransaction, boolean commit) throws Throwable { - TransactionTable txTable = txTable(); - GlobalTransaction localTxId = remote2localTx.remove(globalTransaction); - if (localTxId == null) { - throw new CacheException("Couldn't find a local transaction corresponding to remote transaction " + globalTransaction); - } - LocalTransaction localTx = txTable.getLocalTransaction(localTxId); - if (localTx == null) { - throw new IllegalStateException("Local tx not found but present in the tx table!"); - } - TransactionManager txManager = txManager(); - txManager.resume(localTx.getTransaction()); - if (commit) { - txManager.commit(); - } else { - txManager.rollback(); - } - } - - private void replayModificationsInTransaction(PrepareCommand command, boolean onePhaseCommit) throws Throwable { - TransactionManager tm = txManager(); - boolean replaySuccessful = false; - try { - - tm.begin(); - replayModifications(command); - replaySuccessful = true; - } - finally { - LocalTransaction localTx = txTable().getLocalTransaction( tm.getTransaction() ); - if (localTx != null) { //possible for the tx to be null if we got an exception during applying modifications - localTx.setFromRemoteSite(true); - - if (onePhaseCommit) { - if( replaySuccessful ) { - log.tracef("Committing remotely originated tx %s as it is 1PC", command.getGlobalTransaction()); - tm.commit(); - } else { - log.tracef("Rolling back remotely originated tx %s", command.getGlobalTransaction()); - tm.rollback(); - } - } else { // Wait for a remote commit/rollback. - remote2localTx.put(command.getGlobalTransaction(), localTx.getGlobalTransaction()); - tm.suspend(); - } - } - } - } - - private TransactionManager txManager() { - return backupCache.getAdvancedCache().getTransactionManager(); - } - - public TransactionTable txTable() { - return backupCache.getComponentRegistry().getComponent(TransactionTable.class); - } - - private void replayModifications(PrepareCommand command) throws Throwable { - for (WriteCommand c : command.getModifications()) { - c.acceptVisitor(null, this); - } - } - } - - private static class StatePushTask { - private final List chunk; - private final Address address; - private final Cache cache; - private volatile Future> remoteFuture; - - - private StatePushTask(List chunk, Address address, Cache cache) { - this.chunk = chunk; - this.address = address; - this.cache = cache; - } - - public void executeRemote() { - final RpcManager rpcManager = cache.getAdvancedCache().getRpcManager(); - NotifyingNotifiableFuture> future = new NotifyingFutureImpl<>(); - remoteFuture = future; - rpcManager.invokeRemotelyInFuture(future, Collections.singletonList(address), newStatePushCommand(cache, chunk), - rpcManager.getDefaultRpcOptions(true)); - } - - public Response executeLocal() throws Exception { - return LocalInvocation.newInstanceFromCache(cache, newStatePushCommand(cache, chunk)).call(); - } - - public Map awaitRemote() throws Exception { - Future> future = remoteFuture; - if (future == null) { - throw new NullPointerException("Should not happen!"); - } - return future.get(); - } - - } -} diff --git a/core/src/main/java/org/infinispan/xsite/BackupReceiverRepositoryImpl.java b/core/src/main/java/org/infinispan/xsite/BackupReceiverRepositoryImpl.java index e5848edaf5b6..ca1f47ed7e65 100644 --- a/core/src/main/java/org/infinispan/xsite/BackupReceiverRepositoryImpl.java +++ b/core/src/main/java/org/infinispan/xsite/BackupReceiverRepositoryImpl.java @@ -1,8 +1,6 @@ package org.infinispan.xsite; import org.infinispan.Cache; -import org.infinispan.commands.VisitableCommand; -import org.infinispan.commands.remote.SingleRpcCommand; import org.infinispan.configuration.cache.Configuration; import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Start; @@ -13,9 +11,6 @@ import org.infinispan.notifications.cachemanagerlistener.event.CacheStoppedEvent; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; -import org.infinispan.xsite.statetransfer.XSiteStatePushCommand; -import org.infinispan.xsite.statetransfer.XSiteStateTransferControlCommand; -import org.jgroups.protocols.relay.SiteAddress; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -29,9 +24,8 @@ public class BackupReceiverRepositoryImpl implements BackupReceiverRepository { private static Log log = LogFactory.getLog(BackupReceiverRepositoryImpl.class); - private static boolean trace = log.isTraceEnabled(); - private final ConcurrentMap backupReceivers = new ConcurrentHashMap(); + private final ConcurrentMap backupReceivers = new ConcurrentHashMap<>(); public EmbeddedCacheManager cacheManager; @@ -79,7 +73,7 @@ public BackupReceiver getBackupReceiver(String remoteSite, String remoteCache) { Configuration dcc = cacheManager.getDefaultCacheConfiguration(); if (isBackupForRemoteCache(remoteSite, remoteCache, dcc, EmbeddedCacheManager.DEFAULT_CACHE_NAME)) { Cache cache = cacheManager.getCache(); - backupReceivers.putIfAbsent(toLookFor, new BackupReceiverImpl(cache)); + backupReceivers.putIfAbsent(toLookFor, createBackupReceiver(cache)); toLookFor.setLocalCacheName(EmbeddedCacheManager.DEFAULT_CACHE_NAME); return backupReceivers.get(toLookFor); } @@ -90,7 +84,7 @@ public BackupReceiver getBackupReceiver(String remoteSite, String remoteCache) { if (isBackupForRemoteCache(remoteSite, remoteCache, cacheConfiguration, name)) { Cache cache = cacheManager.getCache(name); toLookFor.setLocalCacheName(name); - backupReceivers.putIfAbsent(toLookFor, new BackupReceiverImpl(cache)); + backupReceivers.putIfAbsent(toLookFor, createBackupReceiver(cache)); return backupReceivers.get(toLookFor); } } @@ -98,7 +92,7 @@ public BackupReceiver getBackupReceiver(String remoteSite, String remoteCache) { remoteSite, remoteCache, remoteCache); Cache cache = cacheManager.getCache(remoteCache); - backupReceivers.putIfAbsent(toLookFor, new BackupReceiverImpl(cache)); + backupReceivers.putIfAbsent(toLookFor, createBackupReceiver(cache)); toLookFor.setLocalCacheName(cache.getName()); return backupReceivers.get(toLookFor); } @@ -164,4 +158,10 @@ public void replace(String site, String cache, BackupReceiver bcr) { public BackupReceiver get(String site, String cache) { return backupReceivers.get(new SiteCachePair(site, cache)); } + + private static BackupReceiver createBackupReceiver(Cache cache) { + return cache.getCacheConfiguration().clustering().cacheMode().isClustered() ? + new ClusteredCacheBackupReceiver(cache) : + new LocalCacheBackupReceiver(cache); + } } diff --git a/core/src/main/java/org/infinispan/xsite/BaseBackupReceiver.java b/core/src/main/java/org/infinispan/xsite/BaseBackupReceiver.java new file mode 100644 index 000000000000..4b9b1e8c9724 --- /dev/null +++ b/core/src/main/java/org/infinispan/xsite/BaseBackupReceiver.java @@ -0,0 +1,232 @@ +package org.infinispan.xsite; + +import org.infinispan.AdvancedCache; +import org.infinispan.Cache; +import org.infinispan.commands.AbstractVisitor; +import org.infinispan.commands.CommandsFactory; +import org.infinispan.commands.VisitableCommand; +import org.infinispan.commands.tx.CommitCommand; +import org.infinispan.commands.tx.PrepareCommand; +import org.infinispan.commands.tx.RollbackCommand; +import org.infinispan.commands.write.ClearCommand; +import org.infinispan.commands.write.PutKeyValueCommand; +import org.infinispan.commands.write.PutMapCommand; +import org.infinispan.commands.write.RemoveCommand; +import org.infinispan.commands.write.ReplaceCommand; +import org.infinispan.commands.write.WriteCommand; +import org.infinispan.commons.CacheException; +import org.infinispan.context.Flag; +import org.infinispan.context.InvocationContext; +import org.infinispan.context.impl.TxInvocationContext; +import org.infinispan.metadata.Metadata; +import org.infinispan.transaction.TransactionMode; +import org.infinispan.transaction.impl.LocalTransaction; +import org.infinispan.transaction.impl.TransactionTable; +import org.infinispan.transaction.xa.GlobalTransaction; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; +import org.infinispan.xsite.statetransfer.XSiteState; +import org.infinispan.xsite.statetransfer.XSiteStatePushCommand; + +import javax.transaction.TransactionManager; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +/** + * Common implementation logic for {@link org.infinispan.xsite.BackupReceiver} + * + * @author Mircea Markus + * @author Pedro Ruivo + * @since 7.1 + */ +public abstract class BaseBackupReceiver implements BackupReceiver { + + protected final Cache cache; + private final BackupCacheUpdater siteUpdater; + + protected BaseBackupReceiver(Cache cache) { + this.cache = cache; + siteUpdater = new BackupCacheUpdater(cache); + } + + protected static XSiteStatePushCommand newStatePushCommand(Cache cache, List stateList) { + CommandsFactory commandsFactory = cache.getAdvancedCache().getComponentRegistry().getCommandsFactory(); + return commandsFactory.buildXSiteStatePushCommand(stateList.toArray(new XSiteState[stateList.size()])); + } + + @Override + public final Cache getCache() { + return cache; + } + + @Override + public final Object handleRemoteCommand(VisitableCommand command) throws Throwable { + return command.acceptVisitor(null, siteUpdater); + } + + public static final class BackupCacheUpdater extends AbstractVisitor { + + private static Log log = LogFactory.getLog(BackupCacheUpdater.class); + + private final ConcurrentMap remote2localTx; + + private final AdvancedCache backupCache; + + BackupCacheUpdater(Cache backup) { + //ignore return values on the backup + this.backupCache = backup.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES, Flag.SKIP_XSITE_BACKUP); + this.remote2localTx = new ConcurrentHashMap<>(); + } + + @Override + public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { + log.tracef("Processing a remote put %s", command); + if (command.isConditional()) { + return backupCache.putIfAbsent(command.getKey(), command.getValue(), command.getMetadata()); + } + return backupCache.put(command.getKey(), command.getValue(), command.getMetadata()); + } + + @Override + public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable { + if (command.isConditional()) { + return backupCache.remove(command.getKey(), command.getValue()); + } + return backupCache.remove(command.getKey()); + } + + @Override + public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable { + if (command.isConditional() && command.getOldValue() != null) { + return backupCache.replace(command.getKey(), command.getOldValue(), command.getNewValue(), + command.getMetadata()); + } + return backupCache.replace(command.getKey(), command.getNewValue(), + command.getMetadata()); + } + + @Override + public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable { + Metadata metadata = command.getMetadata(); + backupCache.putAll(command.getMap(), + metadata.lifespan(), TimeUnit.MILLISECONDS, + metadata.maxIdle(), TimeUnit.MILLISECONDS); + return null; + } + + @Override + public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable { + backupCache.clear(); + return null; + } + + @Override + public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable { + boolean isTransactional = isTransactional(); + if (isTransactional) { + + // Sanity check -- if the remote tx doesn't have modifications, it never should have been propagated! + if (!command.hasModifications()) { + throw new IllegalStateException("TxInvocationContext has no modifications!"); + } + + replayModificationsInTransaction(command, command.isOnePhaseCommit()); + } else { + replayModifications(command); + } + return null; + } + + @Override + public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable { + if (!isTransactional()) { + log.cannotRespondToCommit(command.getGlobalTransaction(), backupCache.getName()); + } else { + log.tracef("Committing remote transaction %s", command.getGlobalTransaction()); + completeTransaction(command.getGlobalTransaction(), true); + } + return null; + } + + @Override + public Object visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) throws Throwable { + + if (!isTransactional()) { + log.cannotRespondToRollback(command.getGlobalTransaction(), backupCache.getName()); + } else { + log.tracef("Rolling back remote transaction %s", command.getGlobalTransaction()); + completeTransaction(command.getGlobalTransaction(), false); + } + return null; + } + + private TransactionTable txTable() { + return backupCache.getComponentRegistry().getComponent(TransactionTable.class); + } + + private boolean isTransactional() { + return backupCache.getCacheConfiguration().transaction().transactionMode() == TransactionMode.TRANSACTIONAL; + } + + private void completeTransaction(GlobalTransaction globalTransaction, boolean commit) throws Throwable { + TransactionTable txTable = txTable(); + GlobalTransaction localTxId = remote2localTx.remove(globalTransaction); + if (localTxId == null) { + throw new CacheException("Couldn't find a local transaction corresponding to remote transaction " + globalTransaction); + } + LocalTransaction localTx = txTable.getLocalTransaction(localTxId); + if (localTx == null) { + throw new IllegalStateException("Local tx not found but present in the tx table!"); + } + TransactionManager txManager = txManager(); + txManager.resume(localTx.getTransaction()); + if (commit) { + txManager.commit(); + } else { + txManager.rollback(); + } + } + + private void replayModificationsInTransaction(PrepareCommand command, boolean onePhaseCommit) throws Throwable { + TransactionManager tm = txManager(); + boolean replaySuccessful = false; + try { + + tm.begin(); + replayModifications(command); + replaySuccessful = true; + } finally { + LocalTransaction localTx = txTable().getLocalTransaction(tm.getTransaction()); + if (localTx != null) { //possible for the tx to be null if we got an exception during applying modifications + localTx.setFromRemoteSite(true); + + if (onePhaseCommit) { + if (replaySuccessful) { + log.tracef("Committing remotely originated tx %s as it is 1PC", command.getGlobalTransaction()); + tm.commit(); + } else { + log.tracef("Rolling back remotely originated tx %s", command.getGlobalTransaction()); + tm.rollback(); + } + } else { // Wait for a remote commit/rollback. + remote2localTx.put(command.getGlobalTransaction(), localTx.getGlobalTransaction()); + tm.suspend(); + } + } + } + } + + private TransactionManager txManager() { + return backupCache.getAdvancedCache().getTransactionManager(); + } + + private void replayModifications(PrepareCommand command) throws Throwable { + for (WriteCommand c : command.getModifications()) { + c.acceptVisitor(null, this); + } + } + } + +} diff --git a/core/src/main/java/org/infinispan/xsite/ClusteredCacheBackupReceiver.java b/core/src/main/java/org/infinispan/xsite/ClusteredCacheBackupReceiver.java new file mode 100644 index 000000000000..e60e2628d23b --- /dev/null +++ b/core/src/main/java/org/infinispan/xsite/ClusteredCacheBackupReceiver.java @@ -0,0 +1,211 @@ +package org.infinispan.xsite; + +import org.infinispan.Cache; +import org.infinispan.commands.remote.CacheRpcCommand; +import org.infinispan.commons.CacheException; +import org.infinispan.commons.util.concurrent.NotifyingFutureImpl; +import org.infinispan.commons.util.concurrent.NotifyingNotifiableFuture; +import org.infinispan.interceptors.locking.ClusteringDependentLogic; +import org.infinispan.remoting.LocalInvocation; +import org.infinispan.remoting.responses.CacheNotFoundResponse; +import org.infinispan.remoting.responses.Response; +import org.infinispan.remoting.rpc.RpcManager; +import org.infinispan.remoting.transport.Address; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; +import org.infinispan.xsite.statetransfer.XSiteState; +import org.infinispan.xsite.statetransfer.XSiteStatePushCommand; +import org.infinispan.xsite.statetransfer.XSiteStateTransferControlCommand; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +/** + * {@link org.infinispan.xsite.BackupReceiver} implementation for clustered caches. + * + * @author Pedro Ruivo + * @since 7.1 + */ +public class ClusteredCacheBackupReceiver extends BaseBackupReceiver { + + private static final Log log = LogFactory.getLog(ClusteredCacheBackupReceiver.class); + private static final boolean trace = log.isDebugEnabled(); + + public ClusteredCacheBackupReceiver(Cache cache) { + super(cache); + } + + private static void awaitRemoteTask(Cache cache, StatePushTask task) throws Exception { + try { + if (trace) { + log.tracef("Waiting reply from %s", task.address); + } + Map responseMap = task.awaitRemote(); + if (trace) { + log.tracef("Response received is %s", responseMap); + } + if (responseMap.size() > 1 || !responseMap.containsKey(task.address)) { + throw new IllegalStateException("Shouldn't happen. Map is " + responseMap); + } + Response response = responseMap.get(task.address); + if (response == CacheNotFoundResponse.INSTANCE) { + if (trace) { + log.tracef("Cache not found in node '%s'. Retrying locally!", task.address); + } + if (!cache.getStatus().allowInvocations()) { + throw new CacheException("Cache is stopping or terminated: " + cache.getStatus()); + } + task.executeLocal(); + } + } catch (Exception e) { + if (!cache.getStatus().allowInvocations()) { + throw new CacheException("Cache is stopping or terminated: " + cache.getStatus()); + } + if (cache.getAdvancedCache().getRpcManager().getMembers().contains(task.address)) { + if (trace) { + log.tracef(e, "An exception was sent by %s. Retrying!", task.address); + } + task.executeRemote(); //retry! + } else { + if (trace) { + log.tracef(e, "An exception was sent by %s. Retrying locally!", task.address); + } + //if the node left the cluster, we apply the missing state. This avoids the site provider to re-send the + //full chunk. + task.executeLocal(); + } + } + } + + @Override + public void handleStateTransferControl(XSiteStateTransferControlCommand command) throws Exception { + XSiteStateTransferControlCommand invokeCommand = command; + if (!command.getCacheName().equals(cache.getName())) { + //copy if the cache name is different + invokeCommand = command.copyForCache(cache.getName()); + } + invokeCommand.setSiteName(command.getOriginSite()); + invokeRemotelyInLocalSite(invokeCommand); + } + + @Override + public void handleStateTransferState(XSiteStatePushCommand cmd) throws Exception { + //split the state and forward it to the primary owners... + if (!cache.getStatus().allowInvocations()) { + throw new CacheException("Cache is stopping or terminated: " + cache.getStatus()); + } + final ClusteringDependentLogic clusteringDependentLogic = cache.getAdvancedCache().getComponentRegistry() + .getComponent(ClusteringDependentLogic.class); + final Map> primaryOwnersChunks = new HashMap<>(); + + if (trace) { + log.tracef("Received X-Site state transfer '%s'. Splitting by primary owner.", cmd); + } + + for (XSiteState state : cmd.getChunk()) { + final Address primaryOwner = clusteringDependentLogic.getPrimaryOwner(state.key()); + List primaryOwnerList = primaryOwnersChunks.get(primaryOwner); + if (primaryOwnerList == null) { + primaryOwnerList = new LinkedList<>(); + primaryOwnersChunks.put(primaryOwner, primaryOwnerList); + } + primaryOwnerList.add(state); + } + + final List localChunks = primaryOwnersChunks.remove(clusteringDependentLogic.getAddress()); + final List tasks = new ArrayList<>(primaryOwnersChunks.size()); + + for (Map.Entry> entry : primaryOwnersChunks.entrySet()) { + if (entry.getValue() == null || entry.getValue().isEmpty()) { + continue; + } + if (trace) { + log.tracef("Node '%s' will apply %s", entry.getKey(), entry.getValue()); + } + StatePushTask task = new StatePushTask(entry.getValue(), entry.getKey(), cache); + tasks.add(task); + task.executeRemote(); + } + + //help gc. this is safe because the chunks was already sent + primaryOwnersChunks.clear(); + + if (trace) { + log.tracef("Local node '%s' will apply %s", cache.getAdvancedCache().getRpcManager().getAddress(), + localChunks); + } + + if (localChunks != null) { + LocalInvocation.newInstanceFromCache(cache, newStatePushCommand(cache, localChunks)).call(); + //help gc + localChunks.clear(); + } + + if (trace) { + log.tracef("Waiting for the remote tasks..."); + } + + while (!tasks.isEmpty()) { + for (Iterator iterator = tasks.iterator(); iterator.hasNext(); ) { + awaitRemoteTask(cache, iterator.next()); + iterator.remove(); + } + } + //the put operation can fail silently. check in the end and it is better to resend the chunk than to lose keys. + if (!cache.getStatus().allowInvocations()) { + throw new CacheException("Cache is stopping or terminated: " + cache.getStatus()); + } + } + + private Map invokeRemotelyInLocalSite(CacheRpcCommand command) throws Exception { + final RpcManager rpcManager = cache.getAdvancedCache().getRpcManager(); + final NotifyingNotifiableFuture> remoteFuture = new NotifyingFutureImpl<>(); + final Map responseMap = new HashMap<>(); + rpcManager.invokeRemotelyInFuture(remoteFuture, null, command, rpcManager.getDefaultRpcOptions(true, false)); + responseMap.put(rpcManager.getAddress(), LocalInvocation.newInstanceFromCache(cache, command).call()); + //noinspection unchecked + responseMap.putAll(remoteFuture.get()); + return responseMap; + } + + private static class StatePushTask { + private final List chunk; + private final Address address; + private final Cache cache; + private volatile Future> remoteFuture; + + + private StatePushTask(List chunk, Address address, Cache cache) { + this.chunk = chunk; + this.address = address; + this.cache = cache; + } + + public void executeRemote() { + final RpcManager rpcManager = cache.getAdvancedCache().getRpcManager(); + NotifyingNotifiableFuture> future = new NotifyingFutureImpl<>(); + remoteFuture = future; + rpcManager.invokeRemotelyInFuture(future, Collections.singletonList(address), newStatePushCommand(cache, chunk), + rpcManager.getDefaultRpcOptions(true)); + } + + public Response executeLocal() throws Exception { + return LocalInvocation.newInstanceFromCache(cache, newStatePushCommand(cache, chunk)).call(); + } + + public Map awaitRemote() throws Exception { + Future> future = remoteFuture; + if (future == null) { + throw new NullPointerException("Should not happen!"); + } + return future.get(); + } + + } +} diff --git a/core/src/main/java/org/infinispan/xsite/LocalCacheBackupReceiver.java b/core/src/main/java/org/infinispan/xsite/LocalCacheBackupReceiver.java new file mode 100644 index 000000000000..e7271ed4aa73 --- /dev/null +++ b/core/src/main/java/org/infinispan/xsite/LocalCacheBackupReceiver.java @@ -0,0 +1,62 @@ +package org.infinispan.xsite; + +import org.infinispan.Cache; +import org.infinispan.commons.CacheException; +import org.infinispan.remoting.LocalInvocation; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; +import org.infinispan.xsite.statetransfer.XSiteState; +import org.infinispan.xsite.statetransfer.XSiteStatePushCommand; +import org.infinispan.xsite.statetransfer.XSiteStateTransferControlCommand; + +import java.util.Arrays; +import java.util.List; + +/** + * {@link org.infinispan.xsite.BackupReceiver} implementation for local caches. + * + * @author Pedro Ruivo + * @since 7.1 + */ +public class LocalCacheBackupReceiver extends BaseBackupReceiver { + + private static final Log log = LogFactory.getLog(LocalCacheBackupReceiver.class); + private static final boolean trace = log.isDebugEnabled(); + + public LocalCacheBackupReceiver(Cache cache) { + super(cache); + } + + @Override + public void handleStateTransferControl(XSiteStateTransferControlCommand command) throws Exception { + XSiteStateTransferControlCommand invokeCommand = command; + if (!command.getCacheName().equals(cache.getName())) { + //copy if the cache name is different + invokeCommand = command.copyForCache(cache.getName()); + } + invokeCommand.setSiteName(command.getOriginSite()); + LocalInvocation.newInstanceFromCache(cache, command).call(); + } + + @Override + public void handleStateTransferState(XSiteStatePushCommand cmd) throws Exception { + //split the state and forward it to the primary owners... + if (!cache.getStatus().allowInvocations()) { + throw new CacheException("Cache is stopping or terminated: " + cache.getStatus()); + } + + final List localChunks = Arrays.asList(cmd.getChunk()); + + if (trace) { + log.tracef("Local node will apply %s", localChunks); + } + + LocalInvocation.newInstanceFromCache(cache, newStatePushCommand(cache, localChunks)).call(); + + //the put operation can fail silently. check in the end and it is better to resend the chunk than to lose keys. + if (!cache.getStatus().allowInvocations()) { + throw new CacheException("Cache is stopping or terminated: " + cache.getStatus()); + } + } + +} diff --git a/core/src/test/java/org/infinispan/xsite/statetransfer/LocalCacheStateTransferTest.java b/core/src/test/java/org/infinispan/xsite/statetransfer/LocalCacheStateTransferTest.java new file mode 100644 index 000000000000..0dcddd748ee4 --- /dev/null +++ b/core/src/test/java/org/infinispan/xsite/statetransfer/LocalCacheStateTransferTest.java @@ -0,0 +1,163 @@ +package org.infinispan.xsite.statetransfer; + +import org.infinispan.Cache; +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.configuration.global.GlobalConfigurationBuilder; +import org.infinispan.context.Flag; +import org.infinispan.statetransfer.CommitManager; +import org.infinispan.test.fwk.TestCacheManagerFactory; +import org.infinispan.xsite.AbstractXSiteTest; +import org.infinispan.xsite.XSiteAdminOperations; +import org.testng.annotations.Test; + +import java.util.concurrent.TimeUnit; + +import static org.infinispan.test.TestingUtil.extractComponent; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertTrue; + +/** + * Tests the {@link org.infinispan.xsite.BackupReceiver} for the local caches. + * + * @author Pedro Ruivo + * @since 7.1 + */ +@Test(groups = "xsite", testName = "xsite.statetransfer.LocalCacheStateTransferTest") +public class LocalCacheStateTransferTest extends AbstractXSiteTest { + + + private static final String LON = "LON"; + private static final String NYC = "NYC"; + + public void testStateTransferWithClusterIdle() throws InterruptedException { + takeSiteOffline(LON, NYC); + assertOffline(LON, NYC); + + assertNoStateTransferInReceivingSite(NYC); + assertNoStateTransferInSendingSite(LON); + + //NYC is offline... lets put some initial data in + //we have 2 nodes in each site and the primary owner sends the state. Lets try to have more key than the chunk + //size in order to each site to send more than one chunk. + final int amountOfData = chunkSize(LON) * 4; + for (int i = 0; i < amountOfData; ++i) { + cache(LON, 0).put(key(i), value(0)); + } + + //check if NYC is empty (LON backup cache) + assertInSite(NYC, new AssertCondition() { + @Override + public void assertInCache(Cache cache) { + assertTrue(cache.isEmpty()); + } + }); + + //check if NYC is empty (default cache) + assertInSite(NYC, new AssertCondition() { + @Override + public void assertInCache(Cache cache) { + assertTrue(cache.isEmpty()); + } + }); + + startStateTransfer(LON, NYC); + + eventually(new Condition() { + @Override + public boolean isSatisfied() throws Exception { + return extractComponent(cache(LON, 0), XSiteAdminOperations.class).getRunningStateTransfer().isEmpty(); + } + }, TimeUnit.SECONDS.toMillis(30)); + + assertOnline(LON, NYC); + + //check if all data is visible (LON backup cache) + assertInSite(NYC, new AssertCondition() { + @Override + public void assertInCache(Cache cache) { + for (int i = 0; i < amountOfData; ++i) { + assertEquals(value(0), cache.get(key(i))); + } + } + }); + + //check if all data is visible NYC + assertInSite(NYC, new AssertCondition() { + @Override + public void assertInCache(Cache cache) { + for (int i = 0; i < amountOfData; ++i) { + assertEquals(value(0), cache.get(key(i))); + } + } + }); + + assertNoStateTransferInReceivingSite(NYC); + assertNoStateTransferInSendingSite(LON); + } + + @Override + protected void createSites() { + createSite(LON, 1, globalConfigurationBuilderForSite(LON), configurationBuilderForSite(NYC)); + createSite(NYC, 1, globalConfigurationBuilderForSite(NYC), TestCacheManagerFactory.getDefaultCacheConfiguration(false)); + } + + private GlobalConfigurationBuilder globalConfigurationBuilderForSite(String siteName) { + GlobalConfigurationBuilder builder = GlobalConfigurationBuilder.defaultClusteredBuilder(); + builder.site().localSite(siteName); + return builder; + } + + private ConfigurationBuilder configurationBuilderForSite(String backupSiteName) { + ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC); + builder.sites().addBackup().site(backupSiteName) + .stateTransfer().chunkSize(1); + return builder; + } + + private void startStateTransfer(String fromSite, String toSite) { + XSiteAdminOperations operations = extractComponent(cache(fromSite, 0), XSiteAdminOperations.class); + assertEquals(XSiteAdminOperations.SUCCESS, operations.pushState(toSite)); + } + + private void takeSiteOffline(String localSite, String remoteSite) { + XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); + assertEquals(XSiteAdminOperations.SUCCESS, operations.takeSiteOffline(remoteSite)); + } + + private void assertOffline(String localSite, String remoteSite) { + XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); + assertEquals(XSiteAdminOperations.OFFLINE, operations.siteStatus(remoteSite)); + } + + private void assertOnline(String localSite, String remoteSite) { + XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class); + assertEquals(XSiteAdminOperations.ONLINE, operations.siteStatus(remoteSite)); + } + + private int chunkSize(String site) { + return cache(site, 0).getCacheConfiguration().sites().allBackups().get(0).stateTransfer().chunkSize(); + } + + private void assertNoStateTransferInReceivingSite(String siteName) { + Cache cache = cache(siteName, 0); + CommitManager commitManager = extractComponent(cache, CommitManager.class); + assertFalse(commitManager.isTracking(Flag.PUT_FOR_STATE_TRANSFER)); + assertFalse(commitManager.isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER)); + assertTrue(commitManager.isEmpty()); + } + + private void assertNoStateTransferInSendingSite(String siteName) { + Cache cache = cache(siteName, 0); + assertTrue(extractComponent(cache, XSiteStateProvider.class).getCurrentStateSending().isEmpty()); + } + + private Object key(int index) { + return "key-" + index; + } + + private Object value(int index) { + return "value-" + index; + } +}