From 7c65215c71016a0241ca02f10fbb05295ef3d424 Mon Sep 17 00:00:00 2001 From: William Burns Date: Tue, 14 Nov 2017 17:11:04 -0500 Subject: [PATCH] ISPN-8500 Don't return expired entries from off heap container --- .../container/DefaultDataContainer.java | 6 +- .../offheap/OffHeapDataContainer.java | 30 ++++--- .../offheap/OffHeapEntryFactory.java | 8 ++ .../offheap/OffHeapEntryFactoryImpl.java | 73 ++++++++++++++++ .../impl/ExpirationFunctionalTest.java | 83 ++++++++++++++++++- 5 files changed, 185 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/infinispan/container/DefaultDataContainer.java b/core/src/main/java/org/infinispan/container/DefaultDataContainer.java index aa75f4275a06..374e1d1cf166 100644 --- a/core/src/main/java/org/infinispan/container/DefaultDataContainer.java +++ b/core/src/main/java/org/infinispan/container/DefaultDataContainer.java @@ -553,8 +553,9 @@ public void executeTask(final KeyFilter filter, final BiConsumer value) -> { - if (filter.accept(key)) { + if (filter.accept(key) && !value.isExpired(now)) { action.accept(key, value); } }); @@ -572,8 +573,9 @@ public void executeTask(final KeyValueFilter filter, final if (action == null) throw new IllegalArgumentException("No action specified"); + long now = timeService.wallClockTime(); entries.forEach((K key, InternalCacheEntry value) -> { - if (filter.accept(key, value.getValue(), value.getMetadata())) { + if (filter.accept(key, value.getValue(), value.getMetadata()) && !value.isExpired(now)) { action.accept(key, value); } }); diff --git a/core/src/main/java/org/infinispan/container/offheap/OffHeapDataContainer.java b/core/src/main/java/org/infinispan/container/offheap/OffHeapDataContainer.java index c3031280b041..c3e1b573a846 100644 --- a/core/src/main/java/org/infinispan/container/offheap/OffHeapDataContainer.java +++ b/core/src/main/java/org/infinispan/container/offheap/OffHeapDataContainer.java @@ -161,7 +161,7 @@ private InternalCacheEntry peekOrGet(Object k, boole } /** - * Gets the actual address for the given key in the given bucket or 0 if it isn't present + * Gets the actual address for the given key in the given bucket or 0 if it isn't present or expired * @param bucketHeadAddress the starting address of the address hash * @param k the key to retrieve the address for it if matches * @return the address matching the key or 0 @@ -172,12 +172,15 @@ protected long performGet(long bucketHeadAddress, Object k) { while (address != 0) { long nextAddress = offHeapEntryFactory.getNext(address); if (offHeapEntryFactory.equalsKey(address, wrappedKey)) { - return address; + if (offHeapEntryFactory.isExpired(address)) { + address = 0; + } + break; } else { address = nextAddress; } } - return 0; + return address; } @Override @@ -309,7 +312,7 @@ public boolean containsKey(Object k) { while (address != 0) { long nextAddress = offHeapEntryFactory.getNext(address); if (offHeapEntryFactory.equalsKey(address, wba)) { - return true; + return !offHeapEntryFactory.isExpired(address); } address = nextAddress; } @@ -372,7 +375,7 @@ protected InternalCacheEntry performRemove(long buck // If the actualAddress was not known, check key equality otherwise just compare with the address removeThisAddress = actualAddress == 0 ? offHeapEntryFactory.equalsKey(address, wba) : actualAddress == address; if (removeThisAddress) { - if (requireReturn) { + if (requireReturn && !offHeapEntryFactory.isExpired(address)) { ice = offHeapEntryFactory.fromMemory(address); } entryRemoved(address); @@ -583,12 +586,15 @@ private void executeTask(Consumer lock.lock(); try { checkDeallocation(); + long now = timeService.wallClockTime(); for (int j = i; j < memoryAddressCount; j += lockCount) { long address = memoryLookup.getMemoryAddressOffset(j); while (address != 0) { long nextAddress = offHeapEntryFactory.getNext(address); InternalCacheEntry ice = offHeapEntryFactory.fromMemory(address); - consumer.accept(ice); + if (!ice.isExpired(now)) { + consumer.accept(ice); + } address = nextAddress; } } @@ -618,7 +624,7 @@ public void executeTask(KeyValueFilter> entryStream() { + private Stream> entryStreamIncludingExpired() { return IntStream.range(0, memoryAddressCount) .mapToObj(a -> { Lock lock = locks.getLockWithOffset(a % lockCount).readLock(); @@ -642,14 +648,18 @@ private Stream> entryStream() { }).flatMap(Function.identity()); } + private Stream> entryStream() { + long now = timeService.wallClockTime(); + return entryStreamIncludingExpired().filter(e -> !e.isExpired(now)); + } + @Override public Iterator> iterator() { - long time = timeService.wallClockTime(); - return entryStream().filter(e -> !e.isExpired(time)).iterator(); + return entryStream().iterator(); } @Override public Iterator> iteratorIncludingExpired() { - return entryStream().iterator(); + return entryStreamIncludingExpired().iterator(); } } diff --git a/core/src/main/java/org/infinispan/container/offheap/OffHeapEntryFactory.java b/core/src/main/java/org/infinispan/container/offheap/OffHeapEntryFactory.java index 01e3c0fa044d..adeba33d69a1 100644 --- a/core/src/main/java/org/infinispan/container/offheap/OffHeapEntryFactory.java +++ b/core/src/main/java/org/infinispan/container/offheap/OffHeapEntryFactory.java @@ -63,4 +63,12 @@ public interface OffHeapEntryFactory { * @return whether or not the keys are equal */ boolean equalsKey(long address, WrappedBytes wrappedBytes); + + + /** + * Returns whether entry is expired or not. + * @param address the address of the entry's key to check + * @return {@code true} if the entry is expired, {@code false} otherwise + */ + boolean isExpired(long address); } diff --git a/core/src/main/java/org/infinispan/container/offheap/OffHeapEntryFactoryImpl.java b/core/src/main/java/org/infinispan/container/offheap/OffHeapEntryFactoryImpl.java index f41e3fc5fcba..1d18d3f2a848 100644 --- a/core/src/main/java/org/infinispan/container/offheap/OffHeapEntryFactoryImpl.java +++ b/core/src/main/java/org/infinispan/container/offheap/OffHeapEntryFactoryImpl.java @@ -8,6 +8,7 @@ import org.infinispan.commons.marshall.WrappedBytes; import org.infinispan.configuration.cache.Configuration; import org.infinispan.container.InternalEntryFactory; +import org.infinispan.container.entries.ExpiryHelper; import org.infinispan.container.entries.InternalCacheEntry; import org.infinispan.container.versioning.EntryVersion; import org.infinispan.factories.annotations.Inject; @@ -359,6 +360,78 @@ public boolean equalsKey(long address, WrappedBytes wrappedBytes) { return true; } + /** + * Returns whether entry is expired. + * @param address the address of the entry to check + * @return {@code true} if the entry is expired, {@code false} otherwise + */ + @Override + public boolean isExpired(long address) { + // 16 bytes for eviction if needed (optional) + // 8 bytes for linked pointer + int offset = evictionEnabled ? 24 : 8; + + byte metadataType = MEMORY.getByte(address, offset); + if ((metadataType & IMMORTAL) != 0) { + return false; + } + // type + offset += 1; + // hashCode + offset += 4; + // key length + int keyLength = MEMORY.getInt(address, offset); + offset += 4; + + long now = timeService.wallClockTime(); + + byte[] metadataBytes; + if ((metadataType & CUSTOM) == CUSTOM) { + // TODO: this needs to be fixed in ISPN-8539 + return false; +// int metadataLength = MEMORY.getInt(address, offset); +// metadataBytes = new byte[metadataLength]; +// +// // value and keyLength +// offset += 4 + keyLength; +// +// MEMORY.getBytes(address, offset, metadataBytes, 0, metadataBytes.length); +// +// Metadata metadata; +// try { +// metadata = (Metadata) marshaller.objectFromByteBuffer(metadataBytes); +// // TODO: custom metadata is not implemented properly for expiration +// return false; +// } catch (IOException | ClassNotFoundException e) { +// throw new CacheException(e); +// } + } else { + // value and keyLength + offset += 4 + keyLength; + + switch (metadataType & 0xFC) { + case MORTAL: + metadataBytes = new byte[16]; + MEMORY.getBytes(address, offset, metadataBytes, 0, metadataBytes.length); + return ExpiryHelper.isExpiredMortal(Bits.getLong(metadataBytes, 0), Bits.getLong(metadataBytes, 8), now); + case TRANSIENT: + metadataBytes = new byte[16]; + MEMORY.getBytes(address, offset, metadataBytes, 0, metadataBytes.length); + return ExpiryHelper.isExpiredTransient(Bits.getLong(metadataBytes, 0), Bits.getLong(metadataBytes, 8), now); + case TRANSIENT_MORTAL: + metadataBytes = new byte[32]; + MEMORY.getBytes(address, offset, metadataBytes, 0, metadataBytes.length); + long lifespan = Bits.getLong(metadataBytes, 0); + long maxIdle = Bits.getLong(metadataBytes, 8); + long created = Bits.getLong(metadataBytes, 16); + long lastUsed = Bits.getLong(metadataBytes, 24); + return ExpiryHelper.isExpiredTransientMortal(maxIdle, lastUsed, lifespan, created, now); + default: + return false; + } + } + } + static private boolean requiresMetadataSize(byte type) { return (type & (CUSTOM | HAS_VERSION)) != 0; } diff --git a/core/src/test/java/org/infinispan/expiration/impl/ExpirationFunctionalTest.java b/core/src/test/java/org/infinispan/expiration/impl/ExpirationFunctionalTest.java index c2e6961396bb..fdcb7a69aaa1 100644 --- a/core/src/test/java/org/infinispan/expiration/impl/ExpirationFunctionalTest.java +++ b/core/src/test/java/org/infinispan/expiration/impl/ExpirationFunctionalTest.java @@ -1,10 +1,14 @@ package org.infinispan.expiration.impl; import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertNull; import java.util.concurrent.TimeUnit; import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.configuration.cache.StorageType; +import org.infinispan.filter.KeyFilter; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.test.SingleCacheManagerTest; import org.infinispan.test.TestingUtil; @@ -18,6 +22,20 @@ public class ExpirationFunctionalTest extends SingleCacheManagerTest { protected final int SIZE = 10; protected ControlledTimeService timeService = new ControlledTimeService(); + protected StorageType storage; + + public Object[] factory() { + return new Object[]{ + new ExpirationFunctionalTest().withStorage(StorageType.BINARY), + new ExpirationFunctionalTest().withStorage(StorageType.OBJECT), + new ExpirationFunctionalTest().withStorage(StorageType.OFF_HEAP) + }; + } + + @Override + protected String parameters() { + return "[" + storage + "]"; + } protected EmbeddedCacheManager createCacheManager() throws Exception { ConfigurationBuilder builder = TestCacheManagerFactory.getDefaultCacheConfiguration(false); @@ -34,15 +52,25 @@ protected EmbeddedCacheManager createCacheManager(ConfigurationBuilder builder) } protected void configure(ConfigurationBuilder config) { - config.expiration().disableReaper(); + config + .expiration().disableReaper() + .memory().storageType(storage); } protected void afterCacheCreated(EmbeddedCacheManager cm) { } - public void testSimpleExpirationLifespan() throws Exception { + public ExpirationFunctionalTest withStorage(StorageType storage) { + this.storage = storage; + return this; + } + public StorageType getStorageType() { + return storage; + } + + public void testSimpleExpirationLifespan() throws Exception { for (int i = 0; i < SIZE; i++) { cache.put("key-" + i, "value-" + i, 1, TimeUnit.MILLISECONDS); } @@ -51,11 +79,60 @@ public void testSimpleExpirationLifespan() throws Exception { } public void testSimpleExpirationMaxIdle() throws Exception { - for (int i = 0; i < SIZE; i++) { cache.put("key-" + i, "value-" + i,-1, null, 1, TimeUnit.MILLISECONDS); } timeService.advance(2); assertEquals(0, cache.size()); } + + public void testExpirationLifespanInOps() throws Exception { + for (int i = 0; i < SIZE; i++) { + cache.put("key-" + i, "value-" + i, 1, TimeUnit.MILLISECONDS); + } + timeService.advance(2); + + for (int i = 0; i < SIZE; i++) { + assertFalse(cache.containsKey("key-" + 1)); + assertNull(cache.get("key-" + i)); + assertNull(cache.remove("key-" + i)); + } + } + + public void testExpirationMaxIdleInOps() throws Exception { + for (int i = 0; i < SIZE; i++) { + cache.put("key-" + i, "value-" + i,-1, null, 1, TimeUnit.MILLISECONDS); + } + timeService.advance(2); + + for (int i = 0; i < SIZE; i++) { + assertFalse(cache.containsKey("key-" + 1)); + assertNull(cache.get("key-" + i)); + assertNull(cache.remove("key-" + i)); + } + } + + public void testExpirationLifespanInExec() throws Exception { + for (int i = 0; i < SIZE; i++) { + cache.put("key-" + i, "value-" + i, 1, TimeUnit.MILLISECONDS); + } + timeService.advance(2); + + for (int i = 0; i < SIZE; i++) { + cache.getAdvancedCache().getDataContainer().executeTask(KeyFilter.ACCEPT_ALL_FILTER, + (k, ice) -> { throw new RuntimeException("No task should be executed on expired entry"); }); + } + } + + public void testExpirationMaxIdleInExec() throws Exception { + for (int i = 0; i < SIZE; i++) { + cache.put("key-" + i, "value-" + i,-1, null, 1, TimeUnit.MILLISECONDS); + } + timeService.advance(2); + + for (int i = 0; i < SIZE; i++) { + cache.getAdvancedCache().getDataContainer().executeTask(KeyFilter.ACCEPT_ALL_FILTER, + (k, ice) -> { throw new RuntimeException("No task should be executed on expired entry"); }); + } + } }