Skip to content

Commit

Permalink
ISPN-8500 Don't return expired entries from off heap container
Browse files Browse the repository at this point in the history
  • Loading branch information
wburns committed Nov 15, 2017
1 parent 76d53da commit 0e0ae79
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 15 deletions.
Expand Up @@ -553,8 +553,9 @@ public void executeTask(final KeyFilter<? super K> filter, final BiConsumer<? su
if (action == null)
throw new IllegalArgumentException("No action specified");

long now = timeService.wallClockTime();
entries.forEach((K key, InternalCacheEntry<K, V> value) -> {
if (filter.accept(key)) {
if (filter.accept(key) && !value.isExpired(now)) {
action.accept(key, value);
}
});
Expand All @@ -572,8 +573,9 @@ public void executeTask(final KeyValueFilter<? super K, ? super V> filter, final
if (action == null)
throw new IllegalArgumentException("No action specified");

long now = timeService.wallClockTime();
entries.forEach((K key, InternalCacheEntry<K, V> value) -> {
if (filter.accept(key, value.getValue(), value.getMetadata())) {
if (filter.accept(key, value.getValue(), value.getMetadata()) && !value.isExpired(now)) {
action.accept(key, value);
}
});
Expand Down
Expand Up @@ -158,7 +158,7 @@ private InternalCacheEntry<WrappedBytes, WrappedBytes> peekOrGet(Object k, boole
}

/**
* Gets the actual address for the given key in the given bucket or 0 if it isn't present
* Gets the actual address for the given key in the given bucket or 0 if it isn't present or expired
* @param bucketHeadAddress the starting address of the address hash
* @param k the key to retrieve the address for it if matches
* @return the address matching the key or 0
Expand All @@ -169,12 +169,15 @@ protected long performGet(long bucketHeadAddress, Object k) {
while (address != 0) {
long nextAddress = offHeapEntryFactory.getNext(address);
if (offHeapEntryFactory.equalsKey(address, wrappedKey)) {
return address;
if (offHeapEntryFactory.isExpired(address)) {
address = 0;
}
break;
} else {
address = nextAddress;
}
}
return 0;
return address;
}

@Override
Expand Down Expand Up @@ -302,7 +305,7 @@ public boolean containsKey(Object k) {
while (address != 0) {
long nextAddress = offHeapEntryFactory.getNext(address);
if (offHeapEntryFactory.equalsKey(address, wba)) {
return true;
return !offHeapEntryFactory.isExpired(address);
}
address = nextAddress;
}
Expand Down Expand Up @@ -361,7 +364,7 @@ protected InternalCacheEntry<WrappedBytes, WrappedBytes> performRemove(long buck
// If the actualAddress was not known check key equality otherwise just compare with the address
removeThisAddress = actualAddress == 0 ? offHeapEntryFactory.equalsKey(address, wba) : actualAddress == address;
if (removeThisAddress) {
if (requireReturn) {
if (requireReturn && !offHeapEntryFactory.isExpired(address)) {
ice = offHeapEntryFactory.fromMemory(address);
}
entryRemoved(address);
Expand Down Expand Up @@ -570,12 +573,15 @@ private void executeTask(Consumer<InternalCacheEntry<WrappedBytes, WrappedBytes>
lock.lock();
try {
checkDeallocation();
long now = timeService.wallClockTime();
for (int j = i; j < memoryAddressCount; j += lockCount) {
long address = memoryLookup.getMemoryAddressOffset(j);
while (address != 0) {
long nextAddress = offHeapEntryFactory.getNext(address);
InternalCacheEntry<WrappedBytes, WrappedBytes> ice = offHeapEntryFactory.fromMemory(address);
consumer.accept(ice);
if (!ice.isExpired(now)) {
consumer.accept(ice);
}
address = nextAddress;
}
}
Expand Down Expand Up @@ -605,7 +611,7 @@ public void executeTask(KeyValueFilter<? super WrappedBytes, ? super WrappedByte
});
}

private Stream<InternalCacheEntry<WrappedBytes, WrappedBytes>> entryStream() {
private Stream<InternalCacheEntry<WrappedBytes, WrappedBytes>> entryStreamIncludingExpired() {
return IntStream.range(0, memoryAddressCount)
.mapToObj(a -> {
Lock lock = locks.getLockWithOffset(a % lockCount).readLock();
Expand All @@ -629,14 +635,18 @@ private Stream<InternalCacheEntry<WrappedBytes, WrappedBytes>> entryStream() {
}).flatMap(Function.identity());
}

private Stream<InternalCacheEntry<WrappedBytes, WrappedBytes>> entryStream() {
long now = timeService.wallClockTime();
return entryStreamIncludingExpired().filter(e -> !e.isExpired(now));
}

@Override
public Iterator<InternalCacheEntry<WrappedBytes, WrappedBytes>> iterator() {
long time = timeService.wallClockTime();
return entryStream().filter(e -> !e.isExpired(time)).iterator();
return entryStream().iterator();
}

@Override
public Iterator<InternalCacheEntry<WrappedBytes, WrappedBytes>> iteratorIncludingExpired() {
return entryStream().iterator();
return entryStreamIncludingExpired().iterator();
}
}
Expand Up @@ -63,4 +63,12 @@ public interface OffHeapEntryFactory {
* @return whether or not the keys are equal
*/
boolean equalsKey(long address, WrappedBytes wrappedBytes);


/**
* Returns whether entry is expired or not.
* @param address the address of the entry's key to check
* @return {@code true} if the entry is expired, {@code false} otherwise
*/
boolean isExpired(long address);
}
Expand Up @@ -8,6 +8,7 @@
import org.infinispan.commons.marshall.WrappedBytes;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.InternalEntryFactory;
import org.infinispan.container.entries.ExpiryHelper;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.factories.annotations.Inject;
Expand Down Expand Up @@ -359,6 +360,83 @@ public boolean equalsKey(long address, WrappedBytes wrappedBytes) {
return true;
}

/**
* Returns whether entry is expired or not.
* @param address the address of the entry's key to check
* @return {@code true} if the entry is expired, {@code false} otherwise
*/
@Override
public boolean isExpired(long address) {
// 16 bytes for eviction if needed (optional)
// 8 bytes for linked pointer
int offset = evictionEnabled ? 24 : 8;

byte metadataType = MEMORY.getByte(address, offset);
if ((metadataType & IMMORTAL) != 0) {
return false;
}
// type
offset += 1;
// hashCode
offset += 4;
// key length
int keyLength = MEMORY.getInt(address, offset);
offset += 4;

long now = timeService.wallClockTime();

byte[] metadataBytes;
if ((metadataType & CUSTOM) == CUSTOM) {
// TODO: this needs to be fixed in ISPN-8539
return false;
// int metadataLength = MEMORY.getInt(address, offset);
// metadataBytes = new byte[metadataLength];
//
// // value and keyLength
// offset += 4 + keyLength;
//
// MEMORY.getBytes(address, offset, metadataBytes, 0, metadataBytes.length);
//
// Metadata metadata;
// try {
// metadata = (Metadata) marshaller.objectFromByteBuffer(metadataBytes);
// // TODO: custom metadata is not implemented properly for expiration
// return false;
// } catch (IOException | ClassNotFoundException e) {
// throw new CacheException(e);
// }
} else {
// value and keyLength
offset += 4 + keyLength;

// We don't actually care if it HAS_VERSION, just have to offset to read in the type properly
if ((metadataType & HAS_VERSION) == HAS_VERSION) {
// HAS_VERSION sets the metadata size
offset += 4;
}
switch (metadataType & 0xFC) {
case MORTAL:
metadataBytes = new byte[16];
MEMORY.getBytes(address, offset, metadataBytes, 0, metadataBytes.length);
return ExpiryHelper.isExpiredMortal(Bits.getLong(metadataBytes, 0), Bits.getLong(metadataBytes, 8), now);
case TRANSIENT:
metadataBytes = new byte[16];
MEMORY.getBytes(address, offset, metadataBytes, 0, metadataBytes.length);
return ExpiryHelper.isExpiredTransient(Bits.getLong(metadataBytes, 0), Bits.getLong(metadataBytes, 8), now);
case TRANSIENT_MORTAL:
metadataBytes = new byte[32];
MEMORY.getBytes(address, offset, metadataBytes, 0, metadataBytes.length);
long lifespan = Bits.getLong(metadataBytes, 0);
long maxIdle = Bits.getLong(metadataBytes, 8);
long created = Bits.getLong(metadataBytes, 16);
long lastUsed = Bits.getLong(metadataBytes, 24);
return ExpiryHelper.isExpiredTransientMortal(maxIdle, lastUsed, lifespan, created, now);
default:
return false;
}
}
}

static private boolean requiresMetadataSize(byte type) {
return (type & (CUSTOM | HAS_VERSION)) != 0;
}
Expand Down
@@ -1,10 +1,14 @@
package org.infinispan.expiration.impl;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNull;

import java.util.concurrent.TimeUnit;

import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.StorageType;
import org.infinispan.filter.KeyFilter;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.SingleCacheManagerTest;
import org.infinispan.test.TestingUtil;
Expand All @@ -18,6 +22,15 @@ public class ExpirationFunctionalTest extends SingleCacheManagerTest {

protected final int SIZE = 10;
protected ControlledTimeService timeService = new ControlledTimeService();
protected StorageType storage;

public Object[] factory() {
return new Object[]{
new ExpirationFunctionalTest().withStorage(StorageType.BINARY),
new ExpirationFunctionalTest().withStorage(StorageType.OBJECT),
new ExpirationFunctionalTest().withStorage(StorageType.OFF_HEAP)
};
}

protected EmbeddedCacheManager createCacheManager() throws Exception {
ConfigurationBuilder builder = TestCacheManagerFactory.getDefaultCacheConfiguration(false);
Expand All @@ -34,15 +47,25 @@ protected EmbeddedCacheManager createCacheManager(ConfigurationBuilder builder)
}

protected void configure(ConfigurationBuilder config) {
config.expiration().disableReaper();
config
.expiration().disableReaper()
.memory().storageType(storage);
}

protected void afterCacheCreated(EmbeddedCacheManager cm) {

}

public void testSimpleExpirationLifespan() throws Exception {
public ExpirationFunctionalTest withStorage(StorageType storage) {
this.storage = storage;
return this;
}

public StorageType getStorageType() {
return storage;
}

public void testSimpleExpirationLifespan() throws Exception {
for (int i = 0; i < SIZE; i++) {
cache.put("key-" + i, "value-" + i, 1, TimeUnit.MILLISECONDS);
}
Expand All @@ -51,11 +74,58 @@ public void testSimpleExpirationLifespan() throws Exception {
}

public void testSimpleExpirationMaxIdle() throws Exception {

for (int i = 0; i < SIZE; i++) {
cache.put("key-" + i, "value-" + i,-1, null, 1, TimeUnit.MILLISECONDS);
}
timeService.advance(2);
assertEquals(0, cache.size());
}

public void testExpirationLifespanInOps() throws Exception {
for (int i = 0; i < SIZE; i++) {
cache.put("key-" + i, "value-" + i, 1, TimeUnit.MILLISECONDS);
}
timeService.advance(2);

for (int i = 0; i < SIZE; i++) {
assertFalse(cache.containsKey("key-" + 1));
assertNull(cache.get("key-" + i));
assertNull(cache.remove("key-" + i));
}
}

public void testExpirationMaxIdleInOps() throws Exception {
for (int i = 0; i < SIZE; i++) {
cache.put("key-" + i, "value-" + i,-1, null, 1, TimeUnit.MILLISECONDS);
}
timeService.advance(2);

for (int i = 0; i < SIZE; i++) {
assertFalse(cache.containsKey("key-" + 1));
assertNull(cache.get("key-" + i));
assertNull(cache.remove("key-" + i));
}
}

public void testExpirationLifespanInExec() throws Exception {
for (int i = 0; i < SIZE; i++) {
cache.put("key-" + i, "value-" + i, 1, TimeUnit.MILLISECONDS);
}
timeService.advance(2);

for (int i = 0; i < SIZE; i++) {
cache.getAdvancedCache().getDataContainer().executeTask(KeyFilter.ACCEPT_ALL_FILTER, (k, ice) -> { throw new RuntimeException("No task should be executed on expired entry"); });
}
}

public void testExpirationMaxIdleInExec() throws Exception {
for (int i = 0; i < SIZE; i++) {
cache.put("key-" + i, "value-" + i,-1, null, 1, TimeUnit.MILLISECONDS);
}
timeService.advance(2);

for (int i = 0; i < SIZE; i++) {
cache.getAdvancedCache().getDataContainer().executeTask(KeyFilter.ACCEPT_ALL_FILTER, (k, ice) -> { throw new RuntimeException("No task should be executed on expired entry"); });
}
}
}

0 comments on commit 0e0ae79

Please sign in to comment.