Skip to content

Commit

Permalink
WFLY-6224 ISE "transaction is not in a valid state" when attempting t…
Browse files Browse the repository at this point in the history
…o create batch, but non-active batch is still associated with current thread.
  • Loading branch information
pferraro committed Jun 19, 2017
1 parent 7ae0e2e commit 4d85b99
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 45 deletions.
Expand Up @@ -114,4 +114,21 @@ public void close() {
}
}
}

@Override
public int hashCode() {
return this.tx.hashCode();
}

@Override
public boolean equals(Object object) {
if (!(object instanceof InfinispanBatch)) return false;
InfinispanBatch batch = (InfinispanBatch) object;
return this.tx.equals(batch.tx);
}

@Override
public String toString() {
return this.tx.toString();
}
}
Expand Up @@ -31,6 +31,7 @@

import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.wildfly.clustering.ee.Batch;
import org.wildfly.clustering.ee.BatchContext;
import org.wildfly.clustering.ee.Batcher;

Expand Down Expand Up @@ -76,16 +77,28 @@ public TransactionBatch interpose() {
};

// Used to coalesce interposed transactions
static final ThreadLocal<TransactionBatch> CURRENT_BATCH = new ThreadLocal<>();
private static final ThreadLocal<TransactionBatch> CURRENT_BATCH = new ThreadLocal<>();

private static final Synchronization CURRENT_BATCH_REMOVER = new Synchronization() {
static TransactionBatch getCurrentBatch() {
return CURRENT_BATCH.get();
}

static void setCurrentBatch(TransactionBatch batch) {
if (batch != null) {
CURRENT_BATCH.set(batch);
} else {
CURRENT_BATCH.remove();
}
}

private static final Synchronization CURRENT_BATCH_SYNCHRONIZATION = new Synchronization() {
@Override
public void beforeCompletion() {
}

@Override
public void afterCompletion(int status) {
CURRENT_BATCH.remove();
setCurrentBatch(null);
}
};

Expand All @@ -102,17 +115,17 @@ public InfinispanBatcher(TransactionManager tm) {
@Override
public TransactionBatch createBatch() {
if (this.tm == null) return NON_TX_BATCH;
TransactionBatch batch = CURRENT_BATCH.get();
if (batch != null) {
return batch.interpose();
}
TransactionBatch batch = getCurrentBatch();
try {
if ((batch != null) && (batch.getState() == Batch.State.ACTIVE)) {
return batch.interpose();
}
this.tm.suspend();
this.tm.begin();
Transaction tx = this.tm.getTransaction();
tx.registerSynchronization(CURRENT_BATCH_REMOVER);
tx.registerSynchronization(CURRENT_BATCH_SYNCHRONIZATION);
batch = new InfinispanBatch(tx);
CURRENT_BATCH.set(batch);
setCurrentBatch(batch);
return batch;
} catch (RollbackException | SystemException | NotSupportedException e) {
throw new CacheException(e);
Expand All @@ -121,15 +134,15 @@ public TransactionBatch createBatch() {

@Override
public BatchContext resumeBatch(TransactionBatch batch) {
TransactionBatch existingBatch = CURRENT_BATCH.get();
TransactionBatch existingBatch = getCurrentBatch();
// Trivial case - nothing to suspend/resume
if (batch == existingBatch) return PASSIVE_BATCH_CONTEXT;
Transaction tx = (batch != null) ? batch.getTransaction() : null;
// Non-tx case, just swap thread local
// Non-tx case, just swap batch references
if ((batch == null) || (tx == null)) {
CURRENT_BATCH.set(batch);
setCurrentBatch(batch);
return () -> {
CURRENT_BATCH.set(existingBatch);
setCurrentBatch(existingBatch);
};
}
try {
Expand All @@ -140,22 +153,21 @@ public BatchContext resumeBatch(TransactionBatch batch) {
}
}
this.tm.resume(tx);
CURRENT_BATCH.set(batch);
setCurrentBatch(batch);
return () -> {
try {
this.tm.suspend();
if (existingBatch != null) {
try {
this.tm.resume(existingBatch.getTransaction());
CURRENT_BATCH.set(existingBatch);
} catch (InvalidTransactionException e) {
throw new CacheException(e);
}
} else {
CURRENT_BATCH.remove();
}
} catch (SystemException e) {
throw new CacheException(e);
} finally {
setCurrentBatch(existingBatch);
}
};
} catch (SystemException | InvalidTransactionException e) {
Expand All @@ -166,7 +178,7 @@ public BatchContext resumeBatch(TransactionBatch batch) {
@Override
public TransactionBatch suspendBatch() {
if (this.tm == null) return NON_TX_BATCH;
TransactionBatch batch = CURRENT_BATCH.get();
TransactionBatch batch = getCurrentBatch();
if (batch != null) {
try {
Transaction tx = this.tm.suspend();
Expand All @@ -176,7 +188,7 @@ public TransactionBatch suspendBatch() {
} catch (SystemException e) {
throw new CacheException(e);
} finally {
CURRENT_BATCH.remove();
setCurrentBatch(null);
}
}
return batch;
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.junit.After;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.wildfly.clustering.ee.Batch;
import org.wildfly.clustering.ee.BatchContext;
import org.wildfly.clustering.ee.Batcher;

Expand All @@ -45,14 +46,15 @@ public class InfinispanBatcherTestCase {

@After
public void destroy() {
InfinispanBatcher.CURRENT_BATCH.remove();
InfinispanBatcher.setCurrentBatch(null);
}

@Test
public void createExistingBatch() throws Exception {
public void createExistingActiveBatch() throws Exception {
TransactionBatch existingBatch = mock(TransactionBatch.class);

InfinispanBatcher.CURRENT_BATCH.set(existingBatch);
InfinispanBatcher.setCurrentBatch(existingBatch);
when(existingBatch.getState()).thenReturn(Batch.State.ACTIVE);
when(existingBatch.interpose()).thenReturn(existingBatch);

TransactionBatch result = this.batcher.createBatch();
Expand All @@ -63,6 +65,33 @@ public void createExistingBatch() throws Exception {
assertSame(existingBatch, result);
}

@Test
public void createExistingClosedBatch() throws Exception {
TransactionBatch existingBatch = mock(TransactionBatch.class);
Transaction tx = mock(Transaction.class);
ArgumentCaptor<Synchronization> capturedSync = ArgumentCaptor.forClass(Synchronization.class);

InfinispanBatcher.setCurrentBatch(existingBatch);
when(existingBatch.getState()).thenReturn(Batch.State.CLOSED);

when(this.tm.getTransaction()).thenReturn(tx);

try (TransactionBatch batch = this.batcher.createBatch()) {
verify(this.tm).begin();
verify(tx).registerSynchronization(capturedSync.capture());

assertSame(tx, batch.getTransaction());
assertSame(batch, InfinispanBatcher.getCurrentBatch());
} finally {
capturedSync.getValue().afterCompletion(Status.STATUS_COMMITTED);
}

verify(tx).commit();

assertNull(InfinispanBatcher.getCurrentBatch());
}


@Test
public void createBatchClose() throws Exception {
Transaction tx = mock(Transaction.class);
Expand All @@ -81,7 +110,7 @@ public void createBatchClose() throws Exception {

verify(tx).commit();

assertNull(InfinispanBatcher.CURRENT_BATCH.get());
assertNull(InfinispanBatcher.getCurrentBatch());
}

@Test
Expand All @@ -105,7 +134,7 @@ public void createBatchDiscard() throws Exception {
verify(tx, never()).commit();
verify(tx).rollback();

assertNull(InfinispanBatcher.CURRENT_BATCH.get());
assertNull(InfinispanBatcher.getCurrentBatch());
}

@Test
Expand Down Expand Up @@ -138,7 +167,7 @@ public void createNestedBatchClose() throws Exception {
verify(tx, never()).rollback();
verify(tx).commit();

assertNull(InfinispanBatcher.CURRENT_BATCH.get());
assertNull(InfinispanBatcher.getCurrentBatch());
}

@Test
Expand Down Expand Up @@ -173,7 +202,7 @@ public void createNestedBatchDiscard() throws Exception {
verify(tx).rollback();
verify(tx, never()).commit();

assertNull(InfinispanBatcher.CURRENT_BATCH.get());
assertNull(InfinispanBatcher.getCurrentBatch());
}

@SuppressWarnings("resource")
Expand Down Expand Up @@ -211,7 +240,7 @@ public void createOverlappingBatchClose() throws Exception {
verify(tx, never()).rollback();
verify(tx).commit();

assertNull(InfinispanBatcher.CURRENT_BATCH.get());
assertNull(InfinispanBatcher.getCurrentBatch());
}

@SuppressWarnings("resource")
Expand Down Expand Up @@ -251,34 +280,34 @@ public void createOverlappingBatchDiscard() throws Exception {
verify(tx).rollback();
verify(tx, never()).commit();

assertNull(InfinispanBatcher.CURRENT_BATCH.get());
assertNull(InfinispanBatcher.getCurrentBatch());
}

@Test
public void resumeNullBatch() throws Exception {
TransactionBatch batch = mock(TransactionBatch.class);
InfinispanBatcher.CURRENT_BATCH.set(batch);
InfinispanBatcher.setCurrentBatch(batch);

try (BatchContext context = this.batcher.resumeBatch(null)) {
verifyZeroInteractions(this.tm);
assertNull(InfinispanBatcher.CURRENT_BATCH.get());
assertNull(InfinispanBatcher.getCurrentBatch());
}
verifyZeroInteractions(this.tm);
assertSame(batch, InfinispanBatcher.CURRENT_BATCH.get());
assertSame(batch, InfinispanBatcher.getCurrentBatch());
}

@Test
public void resumeNonTxBatch() throws Exception {
TransactionBatch existingBatch = mock(TransactionBatch.class);
InfinispanBatcher.CURRENT_BATCH.set(existingBatch);
InfinispanBatcher.setCurrentBatch(existingBatch);
TransactionBatch batch = mock(TransactionBatch.class);

try (BatchContext context = this.batcher.resumeBatch(batch)) {
verifyZeroInteractions(this.tm);
assertSame(batch, InfinispanBatcher.CURRENT_BATCH.get());
assertSame(batch, InfinispanBatcher.getCurrentBatch());
}
verifyZeroInteractions(this.tm);
assertSame(existingBatch, InfinispanBatcher.CURRENT_BATCH.get());
assertSame(existingBatch, InfinispanBatcher.getCurrentBatch());
}

@Test
Expand All @@ -293,20 +322,20 @@ public void resumeBatch() throws Exception {
verify(this.tm).resume(tx);
reset(this.tm);

assertSame(batch, InfinispanBatcher.CURRENT_BATCH.get());
assertSame(batch, InfinispanBatcher.getCurrentBatch());
}

verify(this.tm).suspend();
verify(this.tm, never()).resume(any());

assertNull(InfinispanBatcher.CURRENT_BATCH.get());
assertNull(InfinispanBatcher.getCurrentBatch());
}

@Test
public void resumeBatchExisting() throws Exception {
TransactionBatch existingBatch = mock(TransactionBatch.class);
Transaction existingTx = mock(Transaction.class);
InfinispanBatcher.CURRENT_BATCH.set(existingBatch);
InfinispanBatcher.setCurrentBatch(existingBatch);
TransactionBatch batch = mock(TransactionBatch.class);
Transaction tx = mock(Transaction.class);

Expand All @@ -318,27 +347,27 @@ public void resumeBatchExisting() throws Exception {
verify(this.tm).resume(tx);
reset(this.tm);

assertSame(batch, InfinispanBatcher.CURRENT_BATCH.get());
assertSame(batch, InfinispanBatcher.getCurrentBatch());

when(this.tm.suspend()).thenReturn(tx);
}

verify(this.tm).resume(existingTx);

assertSame(existingBatch, InfinispanBatcher.CURRENT_BATCH.get());
assertSame(existingBatch, InfinispanBatcher.getCurrentBatch());
}

@Test
public void suspendBatch() throws Exception {
TransactionBatch batch = mock(TransactionBatch.class);
InfinispanBatcher.CURRENT_BATCH.set(batch);
InfinispanBatcher.setCurrentBatch(batch);

TransactionBatch result = this.batcher.suspendBatch();

verify(this.tm).suspend();

assertSame(batch, result);
assertNull(InfinispanBatcher.CURRENT_BATCH.get());
assertNull(InfinispanBatcher.getCurrentBatch());
}

@Test
Expand Down
Expand Up @@ -149,12 +149,16 @@ public Group getGroup() {
@Override
public Map<K, V> getEntries() {
Set<Node> nodes = this.group.getNodes().stream().collect(Collectors.toSet());
return this.cache.getAdvancedCache().getAll(nodes).values().stream().collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
try (Batch batch = this.batcher.createBatch()) {
return this.cache.getAdvancedCache().getAll(nodes).values().stream().collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
}
}

@Override
public Map.Entry<K, V> getEntry(Node node) {
return this.cache.get(node);
try (Batch batch = this.batcher.createBatch()) {
return this.cache.get(node);
}
}

@TopologyChanged
Expand Down
10 changes: 10 additions & 0 deletions ejb3/src/main/java/org/jboss/as/ejb3/cache/Contextual.java
Expand Up @@ -38,4 +38,14 @@ public interface Contextual<C> {
* @param context a cache context
*/
void setCacheContext(C context);

/**
* Removes any cache context associated with this cached object
* @return the removed cache context
*/
default C removeCacheContext() {
C context = this.getCacheContext();
this.setCacheContext(null);
return context;
}
}

0 comments on commit 4d85b99

Please sign in to comment.