Skip to content

Commit

Permalink
Revert "ISPN-7986 Hot RodTransaction (Server only)"
Browse files Browse the repository at this point in the history
Reverts all the commits from PR
  • Loading branch information
pruivo authored and tristantarrant committed Sep 29, 2017
1 parent 9a6e702 commit bad6b4b
Show file tree
Hide file tree
Showing 53 changed files with 153 additions and 3,716 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class TransactionImpl implements Transaction {

private static final Log log = LogFactory.getLog(TransactionImpl.class);
private static final String FORCE_ROLLBACK_MESSAGE = "Force rollback invoked. (debug mode)";
private static final boolean trace = log.isTraceEnabled();
private static boolean trace = log.isTraceEnabled();
private final List<Synchronization> syncs;
private final List<Map.Entry<XAResource, Integer>> resources;
private final Object xidLock = new Object();
Expand Down Expand Up @@ -345,7 +345,7 @@ public void runCommit(boolean forceRollback)
public String toString() {
return "EmbeddedTransaction{" +
"xid=" + xid +
", status=" + Util.transactionStatusToString(status) +
", status=" + statusToString() +
'}';
}

Expand Down Expand Up @@ -378,6 +378,33 @@ public final boolean equals(Object obj) {
return this == obj;
}

private String statusToString() {
switch (status) {
case Status.STATUS_ACTIVE:
return "ACTIVE";
case Status.STATUS_MARKED_ROLLBACK:
return "MARKED_ROLLBACK";
case Status.STATUS_PREPARED:
return "PREPARED";
case Status.STATUS_COMMITTED:
return "COMMITTED";
case Status.STATUS_ROLLEDBACK:
return "ROLLED_BACK";
case Status.STATUS_UNKNOWN:
return "UNKNOWN";
case Status.STATUS_NO_TRANSACTION:
return "NO_TRANSACTION";
case Status.STATUS_PREPARING:
return "PREPARING";
case Status.STATUS_COMMITTING:
return "COMMITTING";
case Status.STATUS_ROLLING_BACK:
return "ROLLING_BACK";
default:
return "unknown status (" + status + ")";
}
}

private void throwRollbackExceptionIfAny(boolean forceRollback) throws RollbackException {
if (firstRollbackException != null) {
if (forceRollback && FORCE_ROLLBACK_MESSAGE.equals(firstRollbackException.getMessage())) {
Expand Down
43 changes: 0 additions & 43 deletions commons/src/main/java/org/infinispan/commons/tx/Util.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public AdvancedCache<K, V> withFlags(final Flag... flags) {

@Override
public AdvancedCache<K, V> lockAs(Object lockOwner) {
Objects.requireNonNull(lockOwner);
Objects.nonNull(lockOwner);
if (lockOwner != this.lockOwner) {
return new DecoratedCache<>(cacheImplementation, lockOwner, flags);
}
Expand Down Expand Up @@ -456,7 +456,7 @@ public boolean containsKey(Object key) {

@Override
public boolean containsValue(Object value) {
Objects.requireNonNull(value);
Objects.nonNull(value);
return values().stream().anyMatch(StreamMarshalling.equalityPredicate(value));
}

Expand Down Expand Up @@ -636,15 +636,15 @@ public CacheEntry getCacheEntry(Object key) {
return cacheImplementation.getCacheEntry(key, flags, readContext(1));
}

protected InvocationContext readContext(int size) {
InvocationContext readContext(int size) {
InvocationContext ctx = cacheImplementation.invocationContextFactory.createInvocationContext(false, size);
if (lockOwner != null) {
ctx.setLockOwner(lockOwner);
}
return ctx;
}

protected InvocationContext writeContext(int size) {
InvocationContext writeContext(int size) {
InvocationContext ctx = cacheImplementation.getInvocationContextWithImplicitTransaction(false, size);
if (lockOwner != null) {
ctx.setLockOwner(lockOwner);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,11 +915,4 @@ public <C> void addFilteredListener(Object listener,
super.addFilteredListener(listener, filter, converter, filterAnnotations);
}
}

//HACK!
public EncoderCache<K, V> withCache(AdvancedCache<K, V> otherCache) {
EncoderCache<K, V> cache = new EncoderCache<>(otherCache, keyDataConversion, valueDataConversion);
initState(cache, this);
return cache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@
import org.infinispan.statetransfer.CommitManager;
import org.infinispan.statetransfer.StateTransferLock;
import org.infinispan.statetransfer.StateTransferLockImpl;
import org.infinispan.transaction.impl.ClusteredTransactionOriginatorChecker;
import org.infinispan.transaction.impl.TransactionCoordinator;
import org.infinispan.transaction.impl.TransactionOriginatorChecker;
import org.infinispan.transaction.totalorder.TotalOrderManager;
import org.infinispan.transaction.xa.TransactionFactory;
import org.infinispan.transaction.xa.recovery.RecoveryAdminOperations;
Expand Down Expand Up @@ -75,7 +73,7 @@
RemoteValueRetrievedListener.class, InvocationContextFactory.class, CommitManager.class,
XSiteStateTransferManager.class, XSiteStateConsumer.class, XSiteStateProvider.class,
FunctionalNotifier.class, CommandAckCollector.class, TriangleOrderManager.class,
OrderedUpdatesManager.class, ScatteredVersionManager.class, TransactionOriginatorChecker.class})
OrderedUpdatesManager.class, ScatteredVersionManager.class})
public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {

@Override
Expand Down Expand Up @@ -161,10 +159,6 @@ public <T> T construct(Class<T> componentType) {
return componentType.cast(new OrderedUpdatesManagerImpl());
} else if (componentType.equals(ScatteredVersionManager.class)) {
return componentType.cast(new ScatteredVersionManagerImpl());
} else if (componentType.equals(TransactionOriginatorChecker.class)) {
return configuration.clustering().cacheMode() == CacheMode.LOCAL ?
componentType.cast(TransactionOriginatorChecker.LOCAL) :
componentType.cast(new ClusteredTransactionOriginatorChecker());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.jmx.annotations.MeasurementType;
import org.infinispan.partitionhandling.impl.PartitionHandlingManager;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.stream.impl.interceptor.AbstractDelegatingEntryCacheSet;
import org.infinispan.stream.impl.interceptor.AbstractDelegatingKeyCacheSet;
Expand Down Expand Up @@ -99,24 +101,29 @@ public class TxInterceptor<K, V> extends DDAsyncInterceptor implements JmxStatis
private final AtomicLong commits = new AtomicLong(0);
private final AtomicLong rollbacks = new AtomicLong(0);

private RpcManager rpcManager;
private CommandsFactory commandsFactory;
private Cache<K, V> cache;
private RecoveryManager recoveryManager;
private TransactionTable txTable;
private PartitionHandlingManager partitionHandlingManager;

private boolean isTotalOrder;
private boolean useOnePhaseForAutoCommitTx;
private boolean useVersioning;
private boolean statisticsEnabled;

@Inject
public void init(TransactionTable txTable, Configuration configuration, RecoveryManager recoveryManager,
CommandsFactory commandsFactory, Cache<K, V> cache) {
public void init(TransactionTable txTable, Configuration configuration, RpcManager rpcManager,
RecoveryManager recoveryManager, CommandsFactory commandsFactory, Cache<K, V> cache,
PartitionHandlingManager partitionHandlingManager) {
this.cacheConfiguration = configuration;
this.txTable = txTable;
this.rpcManager = rpcManager;
this.recoveryManager = recoveryManager;
this.commandsFactory = commandsFactory;
this.cache = cache;
this.partitionHandlingManager = partitionHandlingManager;

statisticsEnabled = cacheConfiguration.jmxStatistics().enabled();
isTotalOrder = configuration.transaction().transactionProtocol().isTotalOrder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@
import org.infinispan.remoting.transport.Address;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.impl.LocalTransaction;
import org.infinispan.transaction.impl.TransactionOriginatorChecker;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.xa.CacheTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
Expand Down Expand Up @@ -71,7 +69,6 @@ public class StateProviderImpl implements StateProvider {
protected int chunkSize;
protected KeyPartitioner keyPartitioner;
protected StateConsumer stateConsumer;
private TransactionOriginatorChecker transactionOriginatorChecker;

/**
* A map that keeps track of current outbound state transfers by destination address. There could be multiple transfers
Expand All @@ -94,8 +91,7 @@ public void init(Cache cache,
TransactionTable transactionTable,
StateTransferLock stateTransferLock,
StateConsumer stateConsumer, InternalEntryFactory entryFactory,
KeyPartitioner keyPartitioner,
TransactionOriginatorChecker transactionOriginatorChecker) {
KeyPartitioner keyPartitioner) {
this.cacheName = cache.getName();
this.executorService = executorService;
this.configuration = configuration;
Expand All @@ -108,7 +104,6 @@ public void init(Cache cache,
this.stateTransferLock = stateTransferLock;
this.stateConsumer = stateConsumer;
this.entryFactory = entryFactory;
this.transactionOriginatorChecker = transactionOriginatorChecker;

timeout = configuration.clustering().stateTransfer().timeout();

Expand Down Expand Up @@ -233,15 +228,13 @@ private void collectTransactionsToTransfer(Address destination,
Collection<? extends CacheTransaction> transactions,
Set<Integer> segments, CacheTopology cacheTopology) {
int topologyId = cacheTopology.getTopologyId();
Set<Address> members = new HashSet<>(cacheTopology.getMembers());
List<Address> members = cacheTopology.getMembers();

// no need to filter out state transfer generated transactions because there should not be any such transactions running for any of the requested segments
for (CacheTransaction tx : transactions) {
final GlobalTransaction gtx = tx.getGlobalTransaction();
// Skip transactions whose originators left. The topology id check is needed for joiners.
// Also skip transactions that originates after state transfer starts.
if (tx.getTopologyId() == topologyId ||
(transactionOriginatorChecker.isOriginatorMissing(gtx, members))) {
if (tx.getTopologyId() == topologyId || !members.contains(tx.getGlobalTransaction().getAddress())) {
if (trace) log.tracef("Skipping transaction %s as it was started in the current topology or by a leaver", tx);
continue;
}
Expand Down Expand Up @@ -282,9 +275,9 @@ private void collectTransactionsToTransfer(Address destination,
LocalTransaction localTx = (LocalTransaction) tx;
localTx.locksAcquired(Collections.singleton(destination));
if (trace) log.tracef("Adding affected node %s to transferred transaction %s (keys %s)", destination,
gtx, filteredLockedKeys);
tx.getGlobalTransaction(), filteredLockedKeys);
}
transactionsToTransfer.add(new TransactionInfo(gtx, tx.getTopologyId(),
transactionsToTransfer.add(new TransactionInfo(tx.getGlobalTransaction(), tx.getTopologyId(),
modifications, filteredLockedKeys));
}
}
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit bad6b4b

Please sign in to comment.