Skip to content

Commit

Permalink
ISPN-5232 Entry iterator returns entries created in current TX but does
Browse files Browse the repository at this point in the history
not apply the filter/converter

* Transaction context values now check filter/converter
* Fixed issue when multiple context values were present we only chose 1
  • Loading branch information
wburns authored and anistor committed Mar 2, 2015
1 parent 44dd5bc commit cc502f4
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 36 deletions.
Expand Up @@ -45,4 +45,8 @@ public EntryIterable<K, V> perform(InvocationContext ctx) throws Throwable {
return new EntryIterableImpl<>(retriever, filter, flags != null ? EnumSet.copyOf(flags) :
EnumSet.noneOf(Flag.class), cache);
}

public KeyValueFilter<K, V> getFilter() {
return filter;
}
}
Expand Up @@ -325,7 +325,8 @@ public EntryIterable visitEntryRetrievalCommand(InvocationContext ctx, EntryRetr
// command and the iterator itself does not place read values into the context.
EntryIterable iterable = (EntryIterable) super.visitEntryRetrievalCommand(ctx, command);
if (ctx.isInTxScope()) {
return new TransactionAwareEntryIterable(iterable, (TxInvocationContext<LocalTransaction>) ctx, cache);
return new TransactionAwareEntryIterable(iterable, command.getFilter(),
(TxInvocationContext<LocalTransaction>) ctx, cache);
} else {
return iterable;
}
Expand Down
Expand Up @@ -13,15 +13,15 @@
* @author wburns
* @since 7.0
*/
public class RemovableEntryIterator<K, C> implements CloseableIterator<CacheEntry<K, C>> {
protected final CloseableIterator<CacheEntry<K, C>> realIterator;
protected final Cache<K, ?> cache;
public class RemovableEntryIterator<K, V, C> implements CloseableIterator<CacheEntry<K, C>> {
protected final CloseableIterator<CacheEntry<K, V>> realIterator;
protected final Cache<K, V> cache;

protected CacheEntry<K, C> previousValue;
protected CacheEntry<K, C> currentValue;

public RemovableEntryIterator(CloseableIterator<CacheEntry<K, C>> realIterator,
Cache<K, ?> cache, boolean initIterator) {
public RemovableEntryIterator(CloseableIterator<CacheEntry<K, V>> realIterator,
Cache<K, V> cache, boolean initIterator) {
this.realIterator = realIterator;
this.cache = cache;
if (initIterator) {
Expand All @@ -31,7 +31,7 @@ public RemovableEntryIterator(CloseableIterator<CacheEntry<K, C>> realIterator,

protected CacheEntry<K, C> getNextFromIterator() {
if (realIterator.hasNext()) {
return realIterator.next();
return (CacheEntry<K, C>) realIterator.next();
} else {
return null;
}
Expand Down
Expand Up @@ -5,6 +5,8 @@
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.filter.Converter;
import org.infinispan.filter.KeyValueFilter;
import org.infinispan.transaction.impl.LocalTransaction;

/**
Expand All @@ -14,16 +16,22 @@
* @author wburns
* @since 7.0
*/
public class TransactionAwareCloseableIterable<K, C> implements CloseableIterable<CacheEntry<K, C>> {
public class TransactionAwareCloseableIterable<K, V, C> implements CloseableIterable<CacheEntry<K, C>> {
protected final CloseableIterable<CacheEntry<K, C>> iterable;
protected final TxInvocationContext<LocalTransaction> ctx;
protected final Cache<K, ?> cache;
protected final KeyValueFilter<? super K, ? super V> filter;
protected final Converter<? super K, ? super V, ? extends C> converter;

public TransactionAwareCloseableIterable(CloseableIterable<CacheEntry<K, C>> iterable,
TxInvocationContext<LocalTransaction> ctx, Cache<K, ?> cache) {
public TransactionAwareCloseableIterable(CloseableIterable<CacheEntry<K, C>> iterable,
KeyValueFilter<? super K, ? super V> filter,
Converter<? super K, ? super V, ? extends C> converter,
TxInvocationContext<LocalTransaction> ctx, Cache<K, ?> cache) {
this.iterable = iterable;
this.ctx = ctx;
this.cache = cache;
this.filter = filter;
this.converter = converter;
}

@Override
Expand All @@ -33,6 +41,7 @@ public void close() {

@Override
public CloseableIterator<CacheEntry<K, C>> iterator() {
return new TransactionAwareCloseableIterator(iterable.iterator(), ctx, cache);
return new TransactionAwareCloseableIterator(iterable.iterator(), filter, converter,
ctx, cache);
}
}
Expand Up @@ -2,8 +2,12 @@

import org.infinispan.Cache;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.container.InternalEntryFactory;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.filter.Converter;
import org.infinispan.filter.KeyValueFilter;
import org.infinispan.filter.KeyValueFilterConverter;
import org.infinispan.transaction.impl.LocalTransaction;

import java.util.ArrayList;
Expand All @@ -19,45 +23,71 @@
* @author wburns
* @since 7.0
*/
public class TransactionAwareCloseableIterator<K, C> extends RemovableEntryIterator<K, C> {
public class TransactionAwareCloseableIterator<K, V, C> extends RemovableEntryIterator<K, V, C> {
private final TxInvocationContext<LocalTransaction> ctx;
// We store all the not yet seen context entries here. We rely on the fact that the cache entry reference is updated
// if a change occurs in between iterations to see updates.
private final List<CacheEntry> contextEntries;
private final Set<Object> seenContextKeys = new HashSet<>();
private final KeyValueFilter<? super K, ? super V> filter;
private final Converter<? super K, ? super V, ? extends C> converter;
private final InternalEntryFactory entryFactory;

public TransactionAwareCloseableIterator(CloseableIterator<CacheEntry<K, C>> realIterator,
TxInvocationContext<LocalTransaction> ctx, Cache<K, ?> cache) {
public TransactionAwareCloseableIterator(CloseableIterator<CacheEntry<K, V>> realIterator,
KeyValueFilter<? super K, ? super V> filter,
Converter<? super K, ? super V, ? extends C> converter,
TxInvocationContext<LocalTransaction> ctx, Cache<K, V> cache) {
super(realIterator, cache, false);
this.ctx = ctx;
this.filter = filter;
this.converter = checkForKeyValueFilterConverter(filter, converter);
this.entryFactory = cache.getAdvancedCache().getComponentRegistry().getComponent(
InternalEntryFactory.class);
contextEntries = new ArrayList<>(ctx.getLookedUpEntries().values());
currentValue = getNextFromIterator();
}

protected CacheEntry<K, V> filterEntry(CacheEntry<K, V> entry) {
if (converter == null && filter instanceof KeyValueFilterConverter) {
K key = entry.getKey();
C converted = ((KeyValueFilterConverter<K, V, C>)filter).filterAndConvert(
key, entry.getValue(), entry.getMetadata());
if (converted != null) {
entry = entryFactory.create(entry);
entry.setValue((V) converted);
return entry;
}
} else if (filter == null ||
filter.accept(entry.getKey(), entry.getValue(), entry.getMetadata())) {
return entry;
}
return null;
}

@Override
protected CacheEntry<K, C> getNextFromIterator() {
CacheEntry<K, C> returnedEntry = null;
CacheEntry<K, V> returnedEntry = null;
// We first have to exhaust all of our context entries
CacheEntry<K, C> entry;
while (!contextEntries.isEmpty() && (entry = contextEntries.remove(0)) != null) {
CacheEntry<K, V> entry;
while (returnedEntry == null && !contextEntries.isEmpty() &&
(entry = contextEntries.remove(0)) != null) {
seenContextKeys.add(entry.getKey());
if (!ctx.isEntryRemovedInContext(entry.getKey())) {
returnedEntry = entry;
returnedEntry = filterEntry(entry);
}
}
if (returnedEntry == null) {
while (realIterator.hasNext()) {
CacheEntry<K, C> iteratedEntry = realIterator.next();
CacheEntry<K, V> iteratedEntry = realIterator.next();
CacheEntry contextEntry;
// If the value was in the context then we ignore the stored value since we use the context value
if ((contextEntry = ctx.lookupEntry(iteratedEntry.getKey())) != null) {
if (seenContextKeys.add(contextEntry.getKey()) && !contextEntry.isRemoved()) {
returnedEntry = contextEntry;
if (seenContextKeys.add(contextEntry.getKey()) && !contextEntry.isRemoved() &&
(returnedEntry = filterEntry(contextEntry)) != null) {
break;
}

} else {
returnedEntry = iteratedEntry;
} else if ((returnedEntry = filterEntry(iteratedEntry)) != null) {
break;
}
}
Expand All @@ -75,6 +105,24 @@ protected CacheEntry<K, C> getNextFromIterator() {
}
}
}
return returnedEntry;
if (returnedEntry != null && converter != null) {
C newValue = converter.convert(returnedEntry.getKey(), returnedEntry.getValue(),
returnedEntry.getMetadata());
returnedEntry = entryFactory.create(returnedEntry);
returnedEntry.setValue((V) newValue);
}
return (CacheEntry<K, C>) returnedEntry;
}

protected <C> Converter<? super K, ? super V, ? extends C> checkForKeyValueFilterConverter(
KeyValueFilter<? super K, ? super V> filter, Converter<? super K, ? super V, ? extends C> converter) {
Converter<? super K, ? super V, ? extends C> usedConverter;
if (filter == converter && filter instanceof KeyValueFilterConverter) {
// If we were supplied an efficient KeyValueFilterConverter don't use a converter!
usedConverter = null;
} else {
usedConverter = converter;
}
return usedConverter;
}
}
Expand Up @@ -5,6 +5,7 @@
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.filter.Converter;
import org.infinispan.filter.KeyValueFilter;
import org.infinispan.iteration.EntryIterable;
import org.infinispan.transaction.impl.LocalTransaction;

Expand All @@ -14,17 +15,19 @@
* @author wburns
* @since 7.0
*/
public class TransactionAwareEntryIterable<K, V> extends TransactionAwareCloseableIterable<K, V> implements EntryIterable<K, V> {
public class TransactionAwareEntryIterable<K, V> extends TransactionAwareCloseableIterable<K, V, V> implements EntryIterable<K, V> {
private final EntryIterable<K, V> entryIterable;

public TransactionAwareEntryIterable(EntryIterable<K, V> entryIterable, TxInvocationContext<LocalTransaction> ctx,
public TransactionAwareEntryIterable(EntryIterable<K, V> entryIterable,
KeyValueFilter<? super K, ? super V> filter, TxInvocationContext<LocalTransaction> ctx,
Cache<K, V> cache) {
super(entryIterable, ctx, cache);
super(entryIterable, filter, null, ctx, cache);
this.entryIterable = entryIterable;
}

@Override
public <C> CloseableIterable<CacheEntry<K, C>> converter(Converter<? super K, ? super V, ? extends C> converter) {
return new TransactionAwareCloseableIterable<>(entryIterable.converter(converter), ctx, cache);
return new TransactionAwareCloseableIterable<>(entryIterable.converter(converter),
filter, converter, ctx, cache);
}
}
39 changes: 37 additions & 2 deletions core/src/test/java/org/infinispan/api/CacheAPITest.java
Expand Up @@ -183,7 +183,40 @@ public void testEntrySetEqualityInTx(Method m) throws Exception {
}
}

public void testEntrySetIterationInTx(Method m) throws Exception {
public void testEntrySetIterationBeforeInTx(Method m) throws Exception {
Map<Integer, String> dataIn = new HashMap<Integer, String>();
dataIn.put(1, v(m, 1));
dataIn.put(2, v(m, 2));

cache.putAll(dataIn);

Map<Object, Object> foundValues = new HashMap<>();
TransactionManager tm = cache.getAdvancedCache().getTransactionManager();
tm.begin();
try {
Set<Entry<Object, Object>> entries = cache.entrySet();

// Add an entry within tx
cache.put(3, v(m, 3));
cache.put(4, v(m, 4));

Iterator<Entry<Object, Object>> itr = entries.iterator();

while (itr.hasNext()) {
Entry<Object, Object> entry = itr.next();
foundValues.put(entry.getKey(), entry.getValue());
}
} finally {
tm.rollback();
}
assertEquals(4, foundValues.size());
assertEquals(v(m, 1), foundValues.get(1));
assertEquals(v(m, 2), foundValues.get(2));
assertEquals(v(m, 3), foundValues.get(3));
assertEquals(v(m, 4), foundValues.get(4));
}

public void testEntrySetIterationAfterInTx(Method m) throws Exception {
Map<Integer, String> dataIn = new HashMap<Integer, String>();
dataIn.put(1, v(m, 1));
dataIn.put(2, v(m, 2));
Expand All @@ -200,6 +233,7 @@ public void testEntrySetIterationInTx(Method m) throws Exception {

// Add an entry within tx
cache.put(3, v(m, 3));
cache.put(4, v(m, 4));

while (itr.hasNext()) {
Entry<Object, Object> entry = itr.next();
Expand All @@ -208,10 +242,11 @@ public void testEntrySetIterationInTx(Method m) throws Exception {
} finally {
tm.rollback();
}
assertEquals(3, foundValues.size());
assertEquals(4, foundValues.size());
assertEquals(v(m, 1), foundValues.get(1));
assertEquals(v(m, 2), foundValues.get(2));
assertEquals(v(m, 3), foundValues.get(3));
assertEquals(v(m, 4), foundValues.get(4));
}

public void testRollbackAfterPut() throws Exception {
Expand Down
Expand Up @@ -4,20 +4,16 @@
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.TransientMortalCacheEntry;
import org.infinispan.distribution.MagicKey;
import org.infinispan.filter.CollectionKeyFilter;
import org.infinispan.filter.KeyFilterAsKeyValueFilter;
import org.infinispan.iteration.impl.EntryRetriever;
import org.testng.annotations.Test;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;

Expand Down
@@ -1,6 +1,5 @@
package org.infinispan.iteration;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.container.entries.CacheEntry;
Expand Down
Expand Up @@ -56,7 +56,11 @@
@Test(groups = {"functional", "smoke"}, testName = "iteration.DistributedEntryRetrieverTest")
public class DistributedEntryRetrieverTest extends BaseClusteredEntryRetrieverTest {
public DistributedEntryRetrieverTest() {
super(false, CacheMode.DIST_SYNC);
this(false);
}

public DistributedEntryRetrieverTest(boolean tx) {
super(tx, CacheMode.DIST_SYNC);
// This is needed since we kill nodes
cleanup = CleanupPhase.AFTER_METHOD;
}
Expand Down

0 comments on commit cc502f4

Please sign in to comment.