Skip to content

Commit

Permalink
IGNITE-transactions-in-multiple-threads
Browse files Browse the repository at this point in the history
  • Loading branch information
voipp committed Mar 30, 2017
1 parent db21f73 commit aa3487b
Show file tree
Hide file tree
Showing 19 changed files with 389 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,9 @@ public class TestTransaction implements Transaction {
@Override public IgniteFuture<Void> rollbackAsync() throws IgniteException {
return null;
}

/** {@inheritDoc} */
@Override public void stop() {
// No-op.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,10 @@ public Transaction txStart(TransactionConcurrency concurrency, TransactionIsolat
* Resets transaction metrics.
*/
public void resetMetrics();

/**
* restarts transaction if it was stopped lately via {@link org.apache.ignite.transactions.Transaction#stop() }
* @param tx transaction to restart
*/
void txStart(Transaction tx);
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ public GridDistributedTxRemoteAdapter(
return txState;
}

/** {@inheritDoc} */
@Override public void setTxState(Object state) {
this.txState = (IgniteTxRemoteState) state;
}

/** {@inheritDoc} */
@Override public UUID eventNodeId() {
return nodeId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,11 @@ private static class TxProxy implements Transaction {
throw new UnsupportedOperationException();
}

/** {@inheritDoc} */
@Override public void stop() {
throw new UnsupportedOperationException();
}

/** {@inheritDoc} */
@Override public IgniteAsyncSupport withAsync() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ public enum FinalizationStatus {
*/
public IgniteTxState txState();

/**
* explicitly sets txState
* @param state
*/
public void setTxState(Object state);

/**
* @return {@code true} or {@code false} if the deployment is enabled or disabled for all active caches involved
* in this transaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.apache.ignite.transactions.TransactionMetrics;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.transactions.TransactionState.ACTIVE;
import static org.apache.ignite.transactions.TransactionState.STOPPED;

/**
* Grid transactions implementation.
*/
Expand Down Expand Up @@ -195,6 +198,19 @@ private GridNearTxLocal txStart0(
cctx.resetTxMetrics();
}

/** {@inheritDoc} */
@Override public void txStart(Transaction tx) {
assert tx.state().equals(STOPPED) : "transaction must be stopped before being reopened";
TransactionProxyImpl transactionProxy = (TransactionProxyImpl) tx;
IgniteInternalTx internalTx = transactionProxy.tx();
transactionProxy.bindToCurrentThread(cctx);
((IgniteTxLocalAdapter) transactionProxy.tx()).bindToCurrentThread(cctx);
this.cctx.tm().reopenTx(transactionProxy.tx());
internalTx.state(ACTIVE);
if (!internalTx.txState().empty())
((IgniteTxLocalState) internalTx.txState()).initReadWriteViews();
}

/**
* @param ctx Cache context.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,7 @@
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
import static org.apache.ignite.transactions.TransactionState.ACTIVE;
import static org.apache.ignite.transactions.TransactionState.COMMITTING;
import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK;
import static org.apache.ignite.transactions.TransactionState.PREPARED;
import static org.apache.ignite.transactions.TransactionState.PREPARING;
import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK;
import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK;
import static org.apache.ignite.transactions.TransactionState.*;

/**
* Managed transaction adapter.
Expand Down Expand Up @@ -971,7 +965,7 @@ protected boolean state(TransactionState state, boolean timedOut) {

switch (state) {
case ACTIVE: {
valid = false;
valid = prev == STOPPED;

break;
} // Active is initial state and cannot be transitioned to.
Expand Down Expand Up @@ -1031,6 +1025,11 @@ protected boolean state(TransactionState state, boolean timedOut) {

break;
}

case STOPPED: {
valid = prev == ACTIVE;
break;
}
}

if (valid) {
Expand Down Expand Up @@ -1986,6 +1985,11 @@ private static class TxShadow implements IgniteInternalTx {
return null;
}

@Override
public void setTxState(Object state) {
throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
}

/** {@inheritDoc} */
@Override public Collection<UUID> masterNodeIds() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
return cacheCtx;
}

/** {@inheritDoc} */
@Override public List<GridCacheContext> cacheContexts(GridCacheSharedContext cctx) {
return Collections.singletonList(cacheCtx);
}

/** {@inheritDoc} */
@Nullable @Override public Integer firstCacheId() {
return cacheCtx != null ? cacheCtx.cacheId() : null;
Expand Down Expand Up @@ -262,6 +267,12 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
return false;
}

/** {@inheritDoc} */
@Override
public void initReadWriteViews() {
// No-op.
}

/** {@inheritDoc} */
@Override public boolean initialized() {
return init;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ public void syncMode(CacheWriteSynchronizationMode syncMode) {
return txState;
}

/** {@inheritDoc} */
@Override
public void setTxState(Object state) {
txState = (IgniteTxLocalState) state;
}

/**
* Creates result instance.
*/
Expand Down Expand Up @@ -1726,4 +1732,13 @@ protected abstract class PostMissClosure<T> implements IgniteBiClosure<T, Except
*/
protected abstract IgniteInternalFuture<T> postMiss(T t) throws IgniteCheckedException;
}

/**
* updating shared context for new thread(as a result of Ioc absence)
* @param cctx
*/
public void bindToCurrentThread(GridCacheSharedContext cctx) {
this.cctx = cctx;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public interface IgniteTxLocalState extends IgniteTxState {
*/
public boolean init(int txSize);

/**
* initialization of read and write views with latest state
*/
void initReadWriteViews();

/**
* @return {@code True} if init method was called.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1387,7 +1387,7 @@ else if (log.isDebugEnabled())
/**
* @param tx Transaction to clear.
*/
private void clearThreadMap(IgniteInternalTx tx) {
public void clearThreadMap(IgniteInternalTx tx) {
if (tx.local() && !tx.dht()) {
if (!tx.system())
threadMap.remove(tx.threadId(), tx);
Expand Down Expand Up @@ -2207,6 +2207,34 @@ public Collection<IgniteInternalFuture<?>> deadlockDetectionFutures() {
return (Collection<IgniteInternalFuture<?>>)values;
}

/**
* binding transaction to another thread
* @param tx
*/
public void reopenTx(IgniteInternalTx tx) {
assert tx != null : "transaction must not be empty";

long threadId = Thread.currentThread().getId();

//transaction was started on the current node
if (tx.nodeId().equals(this.cctx.localNodeId())) {
Long oldThreadId = null;
for (Map.Entry<Long, IgniteInternalTx> txEntry : this.threadMap.entrySet()) {
if (txEntry.getValue().xid().equals(tx.xid())) {
oldThreadId = txEntry.getKey();
break;
}
}
if (oldThreadId != null && threadId != oldThreadId) {
threadMap.remove(oldThreadId);
assert !threadMap.values().contains(tx) : "transaction duplicates founded";
threadMap.put(threadId, tx);
}
} else {
threadMap.put(threadId, tx);
}
}

/**
* Timeout object for node failure handler.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.jetbrains.annotations.Nullable;

import java.util.List;

import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;

/**
Expand Down Expand Up @@ -102,6 +104,12 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
return null;
}

/** {@inheritDoc} */
@Override
public List<GridCacheContext> cacheContexts(GridCacheSharedContext cctx) {
return null;
}

/** {@inheritDoc} */
@Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) {
assert false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.transactions;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
Expand Down Expand Up @@ -56,6 +57,13 @@ public interface IgniteTxState {
*/
@Nullable public GridCacheContext singleCacheContext(GridCacheSharedContext cctx);

/**
* extract cache contexts from shared context
* @param cctx
* @return
*/
List<GridCacheContext> cacheContexts(GridCacheSharedContext cctx);

/**
* @param cctx Awaits for previous async operations on active caches to be completed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

package org.apache.ignite.internal.processors.cache.transactions;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.*;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
Expand Down Expand Up @@ -97,6 +97,17 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
return null;
}

/** {@inheritDoc} */
@Override
public List<GridCacheContext> cacheContexts(GridCacheSharedContext cctx) {

ArrayList<GridCacheContext> cacheContexts = new ArrayList<>();
for (long l : activeCacheIds.array()) {
cacheContexts.add(cctx.cacheContext((int) l));
}
return cacheContexts;
}

/** {@inheritDoc} */
@Override public void awaitLastFut(GridCacheSharedContext cctx) {
for (int i = 0; i < activeCacheIds.size(); i++) {
Expand Down Expand Up @@ -365,6 +376,12 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
return false;
}

/** {@inheritDoc} */
@Override public void initReadWriteViews() {
readView = new IgniteTxMap(txMap, CU.reads());
writeView = new IgniteTxMap(txMap, CU.writes());
}

/** {@inheritDoc} */
@Override public boolean initialized() {
return txMap != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ private void leave() {
}
}

/** {@inheritDoc} */
@Override public void stop() {
tx.state(TransactionState.STOPPED);
}

/**
* @param res Result to convert to finished future.
*/
Expand Down Expand Up @@ -377,4 +382,12 @@ private IgniteFuture<?> createFuture(IgniteInternalFuture<IgniteInternalTx> fut)
@Override public String toString() {
return S.toString(TransactionProxyImpl.class, this);
}

/**
* updating context
* @param cctx
*/
public void bindToCurrentThread(GridCacheSharedContext<K, V> cctx) {
this.cctx = cctx;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,9 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport {
* @throws IgniteException If rollback failed.
*/
public IgniteFuture<Void> rollbackAsync() throws IgniteException;

/**
* Stops transaction, preventing it from further commit. Until it is restarted via {@link IgniteTransactions#txStart(org.apache.ignite.transactions.Transaction)}
*/
void stop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ public enum TransactionState {
ROLLED_BACK,

/** Transaction rollback failed or is otherwise unknown state. */
UNKNOWN;
UNKNOWN,

/**
* Transaction has been stopped by user
*/
STOPPED;

/** Enumerated values. */
private static final TransactionState[] VALS = values();
Expand Down

0 comments on commit aa3487b

Please sign in to comment.