Skip to content

Commit

Permalink
ISPN-1137 - Locking optimization: only lock main data owner (dist only)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmarkus authored and Dan Berindei committed Nov 8, 2011
1 parent a1d85d4 commit 2167afb
Show file tree
Hide file tree
Showing 128 changed files with 4,833 additions and 2,135 deletions.
4 changes: 2 additions & 2 deletions core/src/main/java/org/infinispan/CacheImpl.java
Expand Up @@ -454,7 +454,7 @@ boolean lock(Collection<? extends K> keys, EnumSet<Flag> explicitFlags, ClassLoa
throw new IllegalArgumentException("Cannot lock empty list of keys");
}
InvocationContext ctx = getInvocationContextForWrite(explicitFlags, explicitClassLoader);
LockControlCommand command = commandsFactory.buildLockControlCommand(keys, false, ctx.getFlags());
LockControlCommand command = commandsFactory.buildLockControlCommand(keys, ctx.getFlags());
return (Boolean) invoker.invoke(ctx, command);
}

Expand Down Expand Up @@ -616,7 +616,7 @@ public Stats getStats() {

@Override
public XAResource getXAResource() {
return new TransactionXaAdapter(null, txTable, config, recoveryManager, txCoordinator);
return new TransactionXaAdapter(txTable, config, recoveryManager, txCoordinator, commandsFactory, rpcManager, null, config);
}

public final V put(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit idleTimeUnit) {
Expand Down
Expand Up @@ -124,27 +124,24 @@ public Object getKeyForAddress(Address address) {

try {
maxNumberInvariant.readLock().lock();
Object result;
Object result = null;
try {
// first try to take an element without waiting
result = queue.poll();
if (result == null) {
// there are no elements in the queue, make sure the producer is started
keyProducerStartLatch.open();
// our address might have been removed from the consistent hash
if (!address.equals(getAddressForKey(address)))
throw new IllegalStateException("Address " + address + " is no longer in the cluster");

result = queue.take();
while (result == null && !keyGenWorker.isStopped()) {
// first try to take an element without waiting
result = queue.poll();
if (result == null) {
// there are no elements in the queue, make sure the producer is started
keyProducerStartLatch.open();
// our address might have been removed from the consistent hash
if (!address.equals(getAddressForKey(address)))
throw new IllegalStateException("Address " + address + " is no longer in the cluster");
}
}
} finally {
maxNumberInvariant.readLock().unlock();
}
exitingNumberOfKeys.decrementAndGet();
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
if (queue.size() < maxNumberOfKeys.get() * THRESHOLD + 1) {
keyProducerStartLatch.open();
Expand Down Expand Up @@ -214,7 +211,7 @@ public void handleViewChange(TopologyChangedEvent vce) {
}

public boolean isKeyGeneratorThreadAlive() {
return keyGenWorker.isAlive() ;
return !keyGenWorker.isStopped();
}

public void handleCacheStopped(CacheStoppedEvent cse) {
Expand All @@ -228,27 +225,33 @@ public void handleCacheStopped(CacheStoppedEvent cse) {
public class KeyGeneratorWorker implements Runnable {

private volatile boolean isActive;
private boolean isAlive;
private volatile boolean isStopped = false;
private volatile Thread runner;

@Override
public void run() {
this.runner = Thread.currentThread();
isAlive = true;
while (true) {
if (waitToBeWakenUp()) break;
isActive = true;
if (log.isTraceEnabled()) {
log.trace("KeyGeneratorWorker marked as ACTIVE");
}
generateKeys();

isActive = false;
if (log.isTraceEnabled()) {
log.trace("KeyGeneratorWorker marked as INACTIVE");
try {
while (true) {
if (waitToBeWakenUp()) break;
isActive = true;
if (log.isTraceEnabled()) {
log.trace("KeyGeneratorWorker marked as ACTIVE");
}
generateKeys();

isActive = false;
if (log.isTraceEnabled()) {
log.trace("KeyGeneratorWorker marked as INACTIVE");
}
}
} finally {
isStopped = true;
}
isAlive = false;
}

public boolean isStopped() {
return isStopped;
}

private void generateKeys() {
Expand Down Expand Up @@ -302,10 +305,6 @@ public boolean isActive() {
return isActive;
}

public boolean isAlive() {
return isAlive;
}

public void stop() {
runner.interrupt();
}
Expand Down
24 changes: 14 additions & 10 deletions core/src/main/java/org/infinispan/commands/CommandsFactory.java
Expand Up @@ -38,7 +38,7 @@
import org.infinispan.commands.remote.recovery.CompleteTransactionCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTransactionsCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTxInfoCommand;
import org.infinispan.commands.remote.recovery.RemoveRecoveryInfoCommand;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
Expand Down Expand Up @@ -244,17 +244,22 @@ public interface CommandsFactory {

/**
* Builds a LockControlCommand to control explicit remote locking
*
* @param keys keys to lock
* @param implicit whether the lock command was implicit (triggered internally) or explicit (triggered by an API call)
* @param gtx
* @return a LockControlCommand
*/
LockControlCommand buildLockControlCommand(Collection keys, boolean implicit, Set<Flag> flags);
LockControlCommand buildLockControlCommand(Collection keys, boolean implicit, Set<Flag> flags, GlobalTransaction gtx);

/**
* Same as {@link #buildLockControlCommand(java.util.Collection, boolean, java.util.Set)} but for locking a single key
* vs a collection of keys.
* Same as {@link #buildLockControlCommand(Object, boolean, java.util.Set,
* org.infinispan.transaction.xa.GlobalTransaction)} but for locking a single key vs a collection of keys.
*/
LockControlCommand buildLockControlCommand(Object key, boolean implicit, Set<Flag> flags);
LockControlCommand buildLockControlCommand(Object key, boolean implicit, Set<Flag> flags, GlobalTransaction gtx);


LockControlCommand buildLockControlCommand(Collection keys, Set<Flag> flags);

/**
* Builds a RehashControlCommand for coordinating a rehash event. This version of this factory method creates a simple
Expand Down Expand Up @@ -285,9 +290,9 @@ StateTransferControlCommand buildStateTransferCommand(StateTransferControlComman
GetInDoubtTransactionsCommand buildGetInDoubtTransactionsCommand();

/**
* Builds a {@link org.infinispan.commands.remote.recovery.RemoveRecoveryInfoCommand}.
* Builds a {@link org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand}.
*/
RemoveRecoveryInfoCommand buildRemoveRecoveryInfoCommand(Xid xid);
TxCompletionNotificationCommand buildTxCompletionNotificationCommand(Xid xid, GlobalTransaction globalTransaction);

/**
* Builds a DistributedExecuteCommand used for migration and execution of distributed Callables and Runnables.
Expand Down Expand Up @@ -324,9 +329,9 @@ StateTransferControlCommand buildStateTransferCommand(StateTransferControlComman

/**
* @param internalId the internal id identifying the transaction to be removed.
* @see RemoveRecoveryInfoCommand
* @see org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand
*/
RemoveRecoveryInfoCommand buildRemoveRecoveryInfoCommand(long internalId);
TxCompletionNotificationCommand buildTxCompletionNotificationCommand(long internalId);


/**
Expand All @@ -336,5 +341,4 @@ StateTransferControlCommand buildStateTransferCommand(StateTransferControlComman
* @see ApplyDeltaCommand
*/
ApplyDeltaCommand buildApplyDeltaCommand(Object deltaAwareValueKey, Delta delta, Collection keys);

}
42 changes: 25 additions & 17 deletions core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java
Expand Up @@ -40,7 +40,7 @@
import org.infinispan.commands.remote.recovery.CompleteTransactionCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTransactionsCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTxInfoCommand;
import org.infinispan.commands.remote.recovery.RemoveRecoveryInfoCommand;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
Expand Down Expand Up @@ -75,6 +75,7 @@
import org.infinispan.transaction.xa.DldGlobalTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.transaction.xa.recovery.RecoveryManager;
import org.infinispan.util.concurrent.locks.LockManager;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

Expand Down Expand Up @@ -114,6 +115,7 @@ public class CommandsFactoryImpl implements CommandsFactory {
private Configuration configuration;
private RecoveryManager recoveryManager;
private StateTransferManager stateTransferManager;
private LockManager lockManager;

private Map<Byte, ModuleCommandInitializer> moduleCommandInitializers;

Expand All @@ -122,7 +124,7 @@ public void setupDependencies(DataContainer container, CacheNotifier notifier, C
InterceptorChain interceptorChain, DistributionManager distributionManager,
InvocationContextContainer icc, TransactionTable txTable, Configuration configuration,
@ComponentName(KnownComponentNames.MODULE_COMMAND_INITIALIZERS) Map<Byte, ModuleCommandInitializer> moduleCommandInitializers,
RecoveryManager recoveryManager, StateTransferManager stateTransferManager) {
RecoveryManager recoveryManager, StateTransferManager stateTransferManager, LockManager lockManager) {
this.dataContainer = container;
this.notifier = notifier;
this.cache = cache;
Expand All @@ -134,6 +136,7 @@ public void setupDependencies(DataContainer container, CacheNotifier notifier, C
this.moduleCommandInitializers = moduleCommandInitializers;
this.recoveryManager = recoveryManager;
this.stateTransferManager = stateTransferManager;
this.lockManager = lockManager;
}

@Start(priority = 1)
Expand Down Expand Up @@ -332,9 +335,9 @@ public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) {
GetInDoubtTransactionsCommand gptx = (GetInDoubtTransactionsCommand) c;
gptx.init(recoveryManager);
break;
case RemoveRecoveryInfoCommand.COMMAND_ID:
RemoveRecoveryInfoCommand ftx = (RemoveRecoveryInfoCommand) c;
ftx.init(recoveryManager);
case TxCompletionNotificationCommand.COMMAND_ID:
TxCompletionNotificationCommand ftx = (TxCompletionNotificationCommand) c;
ftx.init(txTable, lockManager, recoveryManager);
break;
case MapReduceCommand.COMMAND_ID:
MapReduceCommand mrc = (MapReduceCommand)c;
Expand Down Expand Up @@ -365,12 +368,17 @@ public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) {
}
}

public LockControlCommand buildLockControlCommand(Collection keys, boolean implicit, Set<Flag> flags) {
return new LockControlCommand(keys, cacheName, flags, implicit);
public LockControlCommand buildLockControlCommand(Collection keys, boolean implicit, Set<Flag> flags, GlobalTransaction gtx) {
return new LockControlCommand(keys, cacheName, flags, gtx);
}

public LockControlCommand buildLockControlCommand(Object key, boolean implicit, Set<Flag> flags) {
return new LockControlCommand(key, cacheName, flags, implicit);
public LockControlCommand buildLockControlCommand(Object key, boolean implicit, Set<Flag> flags, GlobalTransaction gtx) {
return new LockControlCommand(key, cacheName, flags, gtx);
}

@Override
public LockControlCommand buildLockControlCommand(Collection keys, Set<Flag> flags) {
return new LockControlCommand(keys, cacheName, flags, null);
}

public StateTransferControlCommand buildStateTransferCommand(StateTransferControlCommand.Type type, Address sender,
Expand All @@ -393,10 +401,15 @@ public GetInDoubtTransactionsCommand buildGetInDoubtTransactionsCommand() {
}

@Override
public RemoveRecoveryInfoCommand buildRemoveRecoveryInfoCommand(Xid xid) {
return new RemoveRecoveryInfoCommand(xid, cacheName);
public TxCompletionNotificationCommand buildTxCompletionNotificationCommand(Xid xid, GlobalTransaction globalTransaction) {
return new TxCompletionNotificationCommand(xid, globalTransaction, cacheName);
}


@Override
public TxCompletionNotificationCommand buildTxCompletionNotificationCommand(long internalId) {
return new TxCompletionNotificationCommand(internalId, cacheName);
}

@Override
public <T> DistributedExecuteCommand<T> buildDistributedExecuteCommand(Callable<T> callable, Address sender, Collection keys) {
return new DistributedExecuteCommand(keys, callable);
Expand All @@ -417,11 +430,6 @@ public CompleteTransactionCommand buildCompleteTransactionCommand(Xid xid, boole
return new CompleteTransactionCommand(cacheName, xid, commit);
}

@Override
public RemoveRecoveryInfoCommand buildRemoveRecoveryInfoCommand(long internalId) {
return new RemoveRecoveryInfoCommand(internalId, cacheName);
}

@Override
public ApplyDeltaCommand buildApplyDeltaCommand(Object deltaAwareValueKey, Delta delta, Collection keys) {
return new ApplyDeltaCommand(deltaAwareValueKey, delta, keys);
Expand Down
Expand Up @@ -38,7 +38,7 @@
import org.infinispan.commands.remote.recovery.CompleteTransactionCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTransactionsCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTxInfoCommand;
import org.infinispan.commands.remote.recovery.RemoveRecoveryInfoCommand;
import org.infinispan.commands.remote.recovery.TxCompletionNotificationCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
Expand Down Expand Up @@ -191,8 +191,8 @@ public CacheRpcCommand fromStream(byte id, Object[] parameters, byte type, Strin
case RemoveCacheCommand.COMMAND_ID:
command = new RemoveCacheCommand(cacheName, cacheManager, registry);
break;
case RemoveRecoveryInfoCommand.COMMAND_ID:
command = new RemoveRecoveryInfoCommand(cacheName);
case TxCompletionNotificationCommand.COMMAND_ID:
command = new TxCompletionNotificationCommand(cacheName);
break;
case GetInDoubtTransactionsCommand.COMMAND_ID:
command = new GetInDoubtTransactionsCommand(cacheName);
Expand Down

0 comments on commit 2167afb

Please sign in to comment.