Skip to content

Commit

Permalink
ISPN-5046 PartitionHandling: split during commit can leave the cache …
Browse files Browse the repository at this point in the history
…inconsistent after merge

* keep the transaction resources while the partition is not healed.
* when the merge happens, complete the pending transactions.
  • Loading branch information
pruivo authored and danberindei committed Jun 18, 2015
1 parent c1bb697 commit ce4f629
Show file tree
Hide file tree
Showing 65 changed files with 2,116 additions and 574 deletions.
8 changes: 2 additions & 6 deletions core/src/main/java/org/infinispan/cache/impl/CacheImpl.java
Expand Up @@ -948,11 +948,7 @@ public String getCacheStatus() {


@Override @Override
public AvailabilityMode getAvailability() { public AvailabilityMode getAvailability() {
if (partitionHandlingManager != null) { return partitionHandlingManager.getAvailabilityMode();
return partitionHandlingManager.getAvailabilityMode();
} else {
return AvailabilityMode.AVAILABLE;
}
} }


@Override @Override
Expand Down Expand Up @@ -1071,7 +1067,7 @@ public Stats getStats() {


@Override @Override
public XAResource getXAResource() { public XAResource getXAResource() {
return new TransactionXaAdapter(txTable, recoveryManager, txCoordinator, commandsFactory, rpcManager, null, config, name); return new TransactionXaAdapter(txTable, recoveryManager, txCoordinator, commandsFactory, rpcManager, null, config, name, partitionHandlingManager);
} }


@Override @Override
Expand Down
Expand Up @@ -114,7 +114,7 @@ public GlobalTransaction getGlobalTransaction() {
private void forwardCommandRemotely(RemoteTransaction remoteTx) { private void forwardCommandRemotely(RemoteTransaction remoteTx) {
Set<Object> affectedKeys = remoteTx.getAffectedKeys(); Set<Object> affectedKeys = remoteTx.getAffectedKeys();
log.tracef("Invoking forward of TxCompletionNotification for transaction %s. Affected keys: %s", gtx, affectedKeys); log.tracef("Invoking forward of TxCompletionNotification for transaction %s. Affected keys: %s", gtx, affectedKeys);
stateTransferManager.forwardCommandIfNeeded(this, affectedKeys, remoteTx.getGlobalTransaction().getAddress(), false); stateTransferManager.forwardCommandIfNeeded(this, affectedKeys, remoteTx.getGlobalTransaction().getAddress());
} }


@Override @Override
Expand Down
Expand Up @@ -2,6 +2,7 @@




import org.infinispan.factories.annotations.DefaultFactoryFor; import org.infinispan.factories.annotations.DefaultFactoryFor;
import org.infinispan.partitionhandling.impl.AvailablePartitionHandlingManager;
import org.infinispan.partitionhandling.impl.PartitionHandlingManager; import org.infinispan.partitionhandling.impl.PartitionHandlingManager;
import org.infinispan.partitionhandling.impl.PartitionHandlingManagerImpl; import org.infinispan.partitionhandling.impl.PartitionHandlingManagerImpl;


Expand All @@ -10,8 +11,8 @@
* @since 7.0 * @since 7.0
*/ */
@DefaultFactoryFor(classes = PartitionHandlingManager.class) @DefaultFactoryFor(classes = PartitionHandlingManager.class)
public class PartitionHandlingManagerFactory extends AbstractNamedCacheComponentFactory implements public class PartitionHandlingManagerFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
AutoInstantiableFactory {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T> T construct(Class<T> componentType) { public <T> T construct(Class<T> componentType) {
Expand All @@ -21,6 +22,6 @@ public <T> T construct(Class<T> componentType) {
return (T) new PartitionHandlingManagerImpl(); return (T) new PartitionHandlingManagerImpl();
} }
} }
return null; return (T) AvailablePartitionHandlingManager.getInstance();
} }
} }
186 changes: 102 additions & 84 deletions core/src/main/java/org/infinispan/interceptors/TxInterceptor.java

Large diffs are not rendered by default.

@@ -1,7 +1,6 @@
package org.infinispan.interceptors.distribution; package org.infinispan.interceptors.distribution;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
Expand Down Expand Up @@ -267,11 +266,9 @@ protected final Object handleNonTxWriteCommand(InvocationContext ctx, DataWriteC
throw new CacheException("Attempted execution of non-transactional write command in a transactional invocation context"); throw new CacheException("Attempted execution of non-transactional write command in a transactional invocation context");
} }


RecipientGenerator recipientGenerator = new SingleKeyRecipientGenerator(command.getKey());

// see if we need to load values from remote sources first // see if we need to load values from remote sources first
if (needValuesFromPreviousOwners(ctx, command)) { if (needValuesFromPreviousOwners(ctx, command)) {
remoteGetBeforeWrite(ctx, command, recipientGenerator); remoteGetBeforeWrite(ctx, command, command.getKey());
} }


// invoke the command locally, we need to know if it's successful or not // invoke the command locally, we need to know if it's successful or not
Expand Down Expand Up @@ -309,7 +306,7 @@ protected final Object handleNonTxWriteCommand(InvocationContext ctx, DataWriteC
log.tracef("Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", command); log.tracef("Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", command);
return localResult; return localResult;
} }
List<Address> recipients = recipientGenerator.generateRecipients(); List<Address> recipients = cdl.getOwners(command.getKey());
// Ignore the previous value on the backup owners // Ignore the previous value on the backup owners
command.setValueMatcher(ValueMatcher.MATCH_ALWAYS); command.setValueMatcher(ValueMatcher.MATCH_ALWAYS);
try { try {
Expand All @@ -327,7 +324,7 @@ protected final Object handleNonTxWriteCommand(InvocationContext ctx, DataWriteC
log.tracef("Skipping the replication of the command as it did not succeed on primary owner (%s).", command); log.tracef("Skipping the replication of the command as it did not succeed on primary owner (%s).", command);
return localResult; return localResult;
} }
List<Address> recipients = recipientGenerator.generateRecipients(); List<Address> recipients = cdl.getOwners(command.getKey());
log.tracef("I'm the primary owner, sending the command to all the backups (%s) in order to be applied.", log.tracef("I'm the primary owner, sending the command to all the backups (%s) in order to be applied.",
recipients); recipients);
// check if a single owner has been configured and the target for the key is the local address // check if a single owner has been configured and the target for the key is the local address
Expand Down Expand Up @@ -517,59 +514,5 @@ public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) t
*/ */
protected abstract boolean needValuesFromPreviousOwners(InvocationContext ctx, WriteCommand command); protected abstract boolean needValuesFromPreviousOwners(InvocationContext ctx, WriteCommand command);


protected abstract void remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, RecipientGenerator keygen) throws Throwable; protected abstract void remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, Object key) throws Throwable;

interface RecipientGenerator {

Collection<Object> getKeys();

List<Address> generateRecipients();
}

class SingleKeyRecipientGenerator implements RecipientGenerator {
private final Object key;
private final Set<Object> keys;
private List<Address> recipients = null;

SingleKeyRecipientGenerator(Object key) {
this.key = key;
keys = Collections.singleton(key);
}

@Override
public List<Address> generateRecipients() {
if (recipients == null) {
recipients = cdl.getOwners(key);
}
return recipients;
}

@Override
public Collection<Object> getKeys() {
return keys;
}
}

class MultipleKeysRecipientGenerator implements RecipientGenerator {

private final Collection<Object> keys;
private List<Address> recipients = null;

MultipleKeysRecipientGenerator(Collection<Object> keys) {
this.keys = keys;
}

@Override
public List<Address> generateRecipients() {
if (recipients == null) {
recipients = cdl.getOwners(keys);
}
return recipients;
}

@Override
public Collection<Object> getKeys() {
return keys;
}
}
} }
Expand Up @@ -204,13 +204,11 @@ public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command)
return handleNonTxWriteCommand(ctx, command); return handleNonTxWriteCommand(ctx, command);
} }


protected void remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, RecipientGenerator keygen) throws Throwable { protected void remoteGetBeforeWrite(InvocationContext ctx, WriteCommand command, Object key) throws Throwable {
for (Object k : keygen.getKeys()) { if (cdl.localNodeIsPrimaryOwner(key)) {
if (cdl.localNodeIsPrimaryOwner(k)) { // Then it makes sense to try a local get and wrap again. This will compensate the fact the the entry was not local
// Then it makes sense to try a local get and wrap again. This will compensate the fact the the entry was not local // earlier when the EntryWrappingInterceptor executed during current invocation context but it should be now.
// earlier when the EntryWrappingInterceptor executed during current invocation context but it should be now. localGetCacheEntry(ctx, key, true, command);
localGetCacheEntry(ctx, k, true, command);
}
} }
} }


Expand Down

0 comments on commit ce4f629

Please sign in to comment.