Skip to content

Commit

Permalink
ISPN-902, ISPN-493, ISPN-914 combined into one. Details:
Browse files Browse the repository at this point in the history
* Change the DistSyncTimeout default from 1 minute to 4 minutes
* Update RehashControlCommand to contain 3 more phases, for draining and applying transaction logs from providers onto joiners
* Add a RemoteTransactionLogDetails object to encapsulate a state map and incomplete transaction log
* Add externalizer hooks for the above
* Add a RemoteTransactionLog facade around a TransactionLog, to allow joiners to control transaction log draining on a provider in an easy manner.  The RemoteTransactionLog just issues RehashControlCommands, but presents this to the join task in a more readable manner.
* Clean up ConsistentHash implementations (logging, redundant methods, etc)
* Add Configuration and ComponentRegistry to CacheRpcCommands, so that InboundInvocationHandler doesn't need to look these components up every time it retries a command.
* CommitCommand now has a return type that can request a resend of the prepare if the global transaction is not recognised.  This can happen if a node sees a commit but not the prepare, for example during a rehash when a prepared node fails and the commit is sent to a new data owner.
* TransactionLogger now blocks and holds all new transactions from starting when applying state on a node during a LEAVE, to prevent overwriting.  Note that this doesn't affect the state provider, just the recipient.
* TransactionLogger's logic for determining what shouldDrainWithoutLock() returns is much more sophisticated now: it watches for progress being made during a non-blocking drain.
* DistTxInterceptor's checks on transaction logging are reordered now, to provide more accurate semantics and to minimise races.  Note that races are NOT eliminated, and will need to be handled during the commit-retry process.  But this minimises them, since transaction logging is faster and cheaper.
* Update DistributionInterceptor to deal with retries when committing, and also to broadcast commit messages to freshly calculated target nodes as well as nodes that the prepare had previously been sent to, as this could be a disjoint set.
* CommandAwareRpcDispatcher now doesn't handle retries or acquiring processing locks with the DistributedSync.  Instead this is handled by the InboundInvocationHandler, with the retry logic encapsulated in the RetryQueue inner class.  This is a far more robust approach than relying on the sender to re-send a message.
* Rehash tasks should not block on invalidations
* Rehash state pulls should happen in parallel.
  • Loading branch information
maniksurtani committed Feb 16, 2011
1 parent 4921368 commit 0b87cdb
Show file tree
Hide file tree
Showing 29 changed files with 1,315 additions and 571 deletions.
10 changes: 8 additions & 2 deletions core/src/main/java/org/infinispan/commands/CommandsFactory.java
Expand Up @@ -259,7 +259,7 @@ public interface CommandsFactory {
* Builds a RehashControlCommand for coordinating a rehash event. This version of this factory method creates a
* control command with a sender and a payload - a transaction log of writes that occurred during the generation and
* delivery of state. The {@link org.infinispan.commands.control.RehashControlCommand.Type}
* of this command is {@link org.infinispan.commands.control.RehashControlCommand.Type#DRAIN_TX}.
* of this command is {@link org.infinispan.commands.control.RehashControlCommand.Type#LEAVE_DRAIN_TX}.
*
* @param sender sender's Address
* @param state list of writes
Expand All @@ -271,7 +271,7 @@ public interface CommandsFactory {
* Builds a RehashControlCommand for coordinating a rehash event. This version of this factory method creates a
* control command with a sender and a payload - a transaction log of pending prepares that occurred during the generation
* and delivery of state. The {@link org.infinispan.commands.control.RehashControlCommand.Type}
* of this command is {@link org.infinispan.commands.control.RehashControlCommand.Type#DRAIN_TX_PREPARES}.
* of this command is {@link org.infinispan.commands.control.RehashControlCommand.Type#LEAVE_DRAIN_TX_PREPARES}.
*
* @param sender sender's Address
* @param state list of pending prepares
Expand All @@ -287,4 +287,10 @@ public interface CommandsFactory {
RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type subtype,
Address sender, Map<Object, InternalCacheValue> state, ConsistentHash oldCH,
ConsistentHash newCH, List<Address> leaversHandled);

/**
* Retrieves the cache name this CommandsFactory is set up to construct commands for.
* @return the name of the cache this CommandsFactory is set up to construct commands for.
*/
String getCacheName();
}
Expand Up @@ -25,8 +25,6 @@
import org.infinispan.commands.control.LockControlCommand;
import org.infinispan.commands.control.RehashControlCommand;
import org.infinispan.commands.control.StateTransferControlCommand;
import static org.infinispan.commands.control.RehashControlCommand.Type.DRAIN_TX_PREPARES;
import static org.infinispan.commands.control.RehashControlCommand.Type.DRAIN_TX;
import org.infinispan.commands.read.EntrySetCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.read.KeySetCommand;
Expand All @@ -52,8 +50,8 @@
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContextContainer;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.InterceptorChain;
Expand All @@ -71,6 +69,9 @@
import java.util.Map;
import java.util.Set;

import static org.infinispan.commands.control.RehashControlCommand.Type.LEAVE_DRAIN_TX;
import static org.infinispan.commands.control.RehashControlCommand.Type.LEAVE_DRAIN_TX_PREPARES;

/**
* @author Mircea.Markus@jboss.com
* @author Galder Zamarreño
Expand Down Expand Up @@ -289,7 +290,7 @@ public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) {
break;
case ClusteredGetCommand.COMMAND_ID:
ClusteredGetCommand clusteredGetCommand = (ClusteredGetCommand) c;
clusteredGetCommand.initialize(dataContainer, icc, this, interceptorChain);
clusteredGetCommand.initialize(icc, this, interceptorChain, distributionManager);
break;
case LockControlCommand.COMMAND_ID:
LockControlCommand lcc = (LockControlCommand) c;
Expand Down Expand Up @@ -330,16 +331,20 @@ public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type
}

public RehashControlCommand buildRehashControlCommandTxLog(Address sender, List<WriteCommand> commands) {
return new RehashControlCommand(cacheName, DRAIN_TX, sender, commands, null, this);
return new RehashControlCommand(cacheName, LEAVE_DRAIN_TX, sender, commands, null, this);
}

public RehashControlCommand buildRehashControlCommandTxLogPendingPrepares(Address sender, List<PrepareCommand> commands) {
return new RehashControlCommand(cacheName, DRAIN_TX_PREPARES, sender, null, commands, this);
return new RehashControlCommand(cacheName, LEAVE_DRAIN_TX_PREPARES, sender, null, commands, this);
}

/**
 * Builds a RehashControlCommand of the requested type for this factory's cache. The sender, state map, old and
 * new consistent hashes and the list of leavers are handed to the command unchanged, together with this factory.
 *
 * @param type    the rehash control command type to create
 * @param sender  sender's Address
 * @param state   state map to carry in the command
 * @param oldCH   the old consistent hash
 * @param newCH   the new consistent hash
 * @param leavers list of leaver addresses passed on to the command
 * @return the newly constructed command
 */
public RehashControlCommand buildRehashControlCommand(RehashControlCommand.Type type,
      Address sender, Map<Object, InternalCacheValue> state, ConsistentHash oldCH,
      ConsistentHash newCH, List<Address> leavers) {
   final RehashControlCommand command =
         new RehashControlCommand(cacheName, type, sender, state, oldCH, newCH, leavers, this);
   return command;
}

/**
 * @return the cache name held by this factory, which it also passes to the commands it builds
 */
public String getCacheName() {
   return this.cacheName;
}
}
Expand Up @@ -11,8 +11,10 @@
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.RemoteTransactionLogDetails;
import org.infinispan.distribution.TransactionLogger;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.NodeTopologyInfo;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
Expand All @@ -22,17 +24,19 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Arrays;

/**
* A control command to coordinate rehashes that may occur when nodes join or leave a cluster, when DIST is used as a
* cache mode. This complex command coordinates the various phases of a rehash event when a joiner joins or a leaver
* leaves a cluster running in "distribution" mode.
* <p />
* It may break up into several commands in future.
* <p />
* In its current form, it is documented on <a href="http://community.jboss.org/wiki/DesignOfDynamicRehashing">this wiki page</a>.
*
* @author Manik Surtani
* @author Vladimir Blagojevic
Expand All @@ -42,8 +46,19 @@ public class RehashControlCommand extends BaseRpcCommand {

public static final int COMMAND_ID = 17;

/* For a detailed description of the interactions involved here, please visit http://community.jboss.org/wiki/DesignOfDynamicRehashing */
public enum Type {
JOIN_REQ, JOIN_REHASH_START, JOIN_REHASH_END, JOIN_ABORT, PULL_STATE_JOIN, PULL_STATE_LEAVE, LEAVE_REHASH_END, DRAIN_TX, DRAIN_TX_PREPARES
JOIN_REQ,
JOIN_REHASH_START,
JOIN_REHASH_END,
PULL_STATE_JOIN,
PULL_STATE_LEAVE,
LEAVE_REHASH_END,
LEAVE_DRAIN_TX,
LEAVE_DRAIN_TX_PREPARES,
JOIN_TX_LOG_REQ,
JOIN_TX_FINAL_LOG_REQ,
JOIN_TX_LOG_CLOSE
}

Type type;
Expand Down Expand Up @@ -118,25 +133,49 @@ public Object perform(InvocationContext ctx) throws Throwable {
case JOIN_REHASH_END:
distributionManager.informRehashOnJoin(sender, false, nodeTopologyInfo);
return null;
case LEAVE_REHASH_END:
case LEAVE_REHASH_END: // Signalled when a node 'applies' changes it has requested.
distributionManager.informRehashOnLeave(sender);
return null;
case PULL_STATE_JOIN:
return pullStateForJoin();
case PULL_STATE_LEAVE:
return pullStateForLeave();
case DRAIN_TX:
case LEAVE_DRAIN_TX: // used for a LEAVE ONLY!!
distributionManager.applyRemoteTxLog(txLogCommands);
return null;
case DRAIN_TX_PREPARES:
case LEAVE_DRAIN_TX_PREPARES:
for (PrepareCommand pc : pendingPrepares) pc.perform(null);
return null;
case JOIN_TX_LOG_REQ:
return drainTxLog();
case JOIN_TX_FINAL_LOG_REQ:
return lockAndDrainTxLog();
case JOIN_TX_LOG_CLOSE:
unlockAndCloseTxLog();
return null;
}
throw new CacheException("Unknown rehash control command type " + type);
}

/**
 * Handles a JOIN_TX_LOG_REQ: drains the distribution manager's transaction logger and returns the drained
 * modifications in a RemoteTransactionLogDetails, with no pending prepares (null).
 */
private RemoteTransactionLogDetails drainTxLog() {
   final TransactionLogger txLogger = distributionManager.getTransactionLogger();
   // The drain must happen before shouldDrainWithoutLock() is queried; keep this order.
   final List<WriteCommand> drained = txLogger.drain();
   final boolean drainWithoutLock = txLogger.shouldDrainWithoutLock();
   return new RemoteTransactionLogDetails(drainWithoutLock, drained, null);
}

/**
 * Handles a JOIN_TX_FINAL_LOG_REQ: calls drainAndLock(sender) on the transaction logger, then reads its pending
 * prepares, and returns both in a RemoteTransactionLogDetails whose boolean flag is false.
 */
private RemoteTransactionLogDetails lockAndDrainTxLog() {
   final TransactionLogger txLogger = distributionManager.getTransactionLogger();
   // drainAndLock(...) is invoked before the pending prepares are read; keep this order.
   final List<WriteCommand> drained = txLogger.drainAndLock(sender);
   return new RemoteTransactionLogDetails(false, drained, txLogger.getPendingPrepares());
}

/**
 * Handles a JOIN_TX_LOG_CLOSE: calls unlockAndDisable(sender) on the distribution manager's transaction logger.
 */
private void unlockAndCloseTxLog() {
   distributionManager.getTransactionLogger().unlockAndDisable(sender);
}

public Map<Object, InternalCacheValue> pullStateForJoin() throws CacheLoaderException {

distributionManager.getTransactionLogger().enable();
Map<Object, InternalCacheValue> state = new HashMap<Object, InternalCacheValue>();
for (InternalCacheEntry ice : dataContainer) {
Object k = ice.getKey();
Expand Down Expand Up @@ -218,6 +257,10 @@ final boolean shouldTransferOwnershipToJoinNode(Object k) {
return false;
}

/**
 * @return the {@link Type} held by this rehash control command
 */
public Type getType() {
   return this.type;
}

/**
 * @return this command's id, {@link #COMMAND_ID}, as a byte
 */
public byte getCommandId() {
   return (byte) COMMAND_ID;
}
Expand Down Expand Up @@ -250,12 +293,12 @@ public String toString() {
return "RehashControlCommand{" +
"type=" + type +
", sender=" + sender +
", state=" + state +
", state=" + (state == null ? "N/A" : state.size()) +
", oldConsistentHash=" + oldCH +
", nodesLeft=" + nodesLeft +
", consistentHash=" + newCH +
", txLogCommands=" + txLogCommands +
", pendingPrepares=" + pendingPrepares +
", txLogCommands=" + (txLogCommands == null ? "N/A" : txLogCommands.size()) +
", pendingPrepares=" + (pendingPrepares == null ? "N/A" : pendingPrepares.size()) +
", nodeTopologyInfo=" + nodeTopologyInfo +
'}';
}
Expand Down
@@ -1,8 +1,28 @@
package org.infinispan.commands.remote;

import org.infinispan.config.Configuration;
import org.infinispan.factories.ComponentRegistry;

public abstract class BaseRpcCommand implements CacheRpcCommand {
protected String cacheName;

protected Configuration configuration;
protected ComponentRegistry componentRegistry;

/**
 * Stores the given configuration and component registry on this command so that they can be read back later via
 * {@link #getConfiguration()} and {@link #getComponentRegistry()}.
 *
 * @param configuration     configuration to remember
 * @param componentRegistry component registry to remember
 */
public void injectComponents(Configuration configuration, ComponentRegistry componentRegistry) {
   this.componentRegistry = componentRegistry;
   this.configuration = configuration;
}

/**
 * @return the configuration previously supplied through injectComponents, or null if none has been set
 */
public Configuration getConfiguration() {
   return this.configuration;
}

/**
 * @return the component registry previously supplied through injectComponents, or null if none has been set
 */
public ComponentRegistry getComponentRegistry() {
   return this.componentRegistry;
}


/**
 * Creates a command bound to the given cache name. Only the cache name is set here; the configuration and
 * component registry fields stay unset until injectComponents is called.
 *
 * @param cacheName name of the cache, stored on this command
 */
protected BaseRpcCommand(String cacheName) {
this.cacheName = cacheName;
}
Expand Down
@@ -1,6 +1,8 @@
package org.infinispan.commands.remote;

import org.infinispan.commands.ReplicableCommand;
import org.infinispan.config.Configuration;
import org.infinispan.factories.ComponentRegistry;

/**
* The {@link org.infinispan.remoting.rpc.RpcManager} only replicates commands wrapped in a {@link CacheRpcCommand}.
Expand All @@ -16,4 +18,24 @@ public interface CacheRpcCommand extends ReplicableCommand {
* intended for.
*/
String getCacheName();

/**
* Sets up some more context for the invocation of this command, so that these components do not need to be looked
* up again later.
* @param cfg configuration of the named cache associated with this command
* @param cr component registry of the named cache associated with this command
*/
void injectComponents(Configuration cfg, ComponentRegistry cr);

/**
* Retrieves the configuration associated with this command
* @return a Configuration instance
*/
Configuration getConfiguration();

/**
* Retrieves the component registry associated with this command
* @return a component registry
*/
ComponentRegistry getComponentRegistry();
}
Expand Up @@ -21,13 +21,10 @@
*/
package org.infinispan.commands.remote;

import java.util.Collections;
import java.util.Set;

import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.container.DataContainer;
import org.infinispan.config.Configuration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
Expand All @@ -37,10 +34,14 @@
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextContainer;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import java.util.Collections;
import java.util.Set;

/**
* Issues a remote get call. This is not a {@link org.infinispan.commands.VisitableCommand} and hence not passed up the
* {@link org.infinispan.interceptors.base.CommandInterceptor} chain.
Expand All @@ -58,13 +59,28 @@ public class ClusteredGetCommand implements CacheRpcCommand, FlagAffectedCommand
private Object key;
private String cacheName;

private DataContainer dataContainer;
private InvocationContextContainer icc;
private CommandsFactory commandsFactory;
private InterceptorChain invoker;

private DistributionManager distributionManager;
private Set<Flag> flags;
protected Configuration configuration;
protected ComponentRegistry componentRegistry;

private DistributionManager distributionManager;

/**
 * Stores the given configuration and component registry on this command so that they can be read back later via
 * {@link #getConfiguration()} and {@link #getComponentRegistry()}.
 *
 * @param configuration     configuration to remember
 * @param componentRegistry component registry to remember
 */
public void injectComponents(Configuration configuration, ComponentRegistry componentRegistry) {
   this.componentRegistry = componentRegistry;
   this.configuration = configuration;
}

/**
 * @return the configuration previously supplied through injectComponents, or null if none has been set
 */
public Configuration getConfiguration() {
   return this.configuration;
}

/**
 * @return the component registry previously supplied through injectComponents, or null if none has been set
 */
public ComponentRegistry getComponentRegistry() {
   return this.componentRegistry;
}

public ClusteredGetCommand() {
}
Expand All @@ -79,8 +95,9 @@ public ClusteredGetCommand(Object key, String cacheName) {
this(key, cacheName, Collections.<Flag>emptySet());
}

public void initialize(DataContainer dataContainer, InvocationContextContainer icc, CommandsFactory commandsFactory, InterceptorChain interceptorChain) {
this.dataContainer = dataContainer;
public void initialize(InvocationContextContainer icc, CommandsFactory commandsFactory,
InterceptorChain interceptorChain, DistributionManager distributionManager) {
this.distributionManager = distributionManager;
this.icc = icc;
this.commandsFactory = commandsFactory;
this.invoker = interceptorChain;
Expand Down

0 comments on commit 0b87cdb

Please sign in to comment.