Skip to content

Commit

Permalink
ISPN-6405 Persistence configuration with clustered cache can cause
Browse files Browse the repository at this point in the history
duplicate expiration messages

* Fixed issue where reaper caused 2 expirations in cluster cache
  • Loading branch information
wburns authored and tristantarrant committed Apr 29, 2016
1 parent e23b912 commit 7e08e15
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 70 deletions.
Expand Up @@ -118,8 +118,8 @@ public byte getCommandId() {
public String toString() { public String toString() {
return "RemoveExpiredCommand{" + return "RemoveExpiredCommand{" +
"key=" + key + "key=" + key +
"value=" + value + ", value=" + value +
"lifespan=" + lifespan + ", lifespan=" + lifespan +
'}'; '}';
} }


Expand Down
Expand Up @@ -69,7 +69,7 @@ public void processExpiration() {
InternalCacheEntry<K, V> e = purgeCandidates.next(); InternalCacheEntry<K, V> e = purgeCandidates.next();
if (e.canExpire()) { if (e.canExpire()) {
if (ExpiryHelper.isExpiredMortal(e.getLifespan(), e.getCreated(), currentTimeMillis)) { if (ExpiryHelper.isExpiredMortal(e.getLifespan(), e.getCreated(), currentTimeMillis)) {
handleLifespanExpireEntry(e); handleLifespanExpireEntry(e, true);
} else if (ExpiryHelper.isExpiredTransient(e.getMaxIdle(), e.getLastUsed(), currentTimeMillis)) { } else if (ExpiryHelper.isExpiredTransient(e.getMaxIdle(), e.getLastUsed(), currentTimeMillis)) {
super.handleInMemoryExpiration(e, currentTimeMillis); super.handleInMemoryExpiration(e, currentTimeMillis);
} }
Expand All @@ -89,21 +89,26 @@ public void processExpiration() {
} }
} }


void handleLifespanExpireEntry(InternalCacheEntry<K, V> entry) { void handleLifespanExpireEntry(InternalCacheEntry<K, V> entry, boolean sync) {
K key = entry.getKey(); K key = entry.getKey();
// The most used case will be a miss so no extra read before // The most used case will be a miss so no extra read before
if (expiring.putIfAbsent(key, key) == null) { if (expiring.putIfAbsent(key, key) == null) {
long lifespan = entry.getLifespan(); long lifespan = entry.getLifespan();
if (trace) { if (trace) {
log.tracef("Submitting expiration removal for key %s which had lifespan of %s", key, lifespan); log.tracef("Submitting expiration removal for key %s which had lifespan of %s", key, lifespan);
} }
asyncExecutor.submit(() -> { Runnable runnable = () -> {
try { try {
removeExpired(key, entry.getValue(), lifespan); removeExpired(key, entry.getValue(), lifespan);
} finally { } finally {
expiring.remove(key); expiring.remove(key);
} }
}); };
if (sync) {
runnable.run();
} else {
asyncExecutor.submit(runnable);
}
} }
} }


Expand All @@ -117,7 +122,7 @@ public void handleInMemoryExpiration(InternalCacheEntry<K, V> entry, long curren
// so we can see both the new value and the metadata // so we can see both the new value and the metadata
synchronized (entry) { synchronized (entry) {
if (ExpiryHelper.isExpiredMortal(entry.getLifespan(), entry.getCreated(), currentTime)) { if (ExpiryHelper.isExpiredMortal(entry.getLifespan(), entry.getCreated(), currentTime)) {
handleLifespanExpireEntry(entry); handleLifespanExpireEntry(entry, false);
} else { } else {
super.handleInMemoryExpiration(entry, currentTime); super.handleInMemoryExpiration(entry, currentTime);
} }
Expand All @@ -126,25 +131,27 @@ public void handleInMemoryExpiration(InternalCacheEntry<K, V> entry, long curren


@Override @Override
public void handleInStoreExpiration(K key) { public void handleInStoreExpiration(K key) {
expiring.put(key, key); if (expiring.putIfAbsent(key, key) == null) {
// Unfortunately stores don't pull the entry so we can't tell exactly why it expired and thus we have to remove // Unfortunately stores don't pull the entry so we can't tell exactly why it expired and thus we have to remove
// the entire value. Unfortunately this could cause a concurrent write to be undone // the entire value. Unfortunately this could cause a concurrent write to be undone
try { try {
removeExpired(key, null, null); removeExpired(key, null, null);
} finally { } finally {
expiring.remove(key); expiring.remove(key);
}
} }
} }


@Override @Override
public void handleInStoreExpiration(MarshalledEntry<K, V> marshalledEntry) { public void handleInStoreExpiration(MarshalledEntry<K, V> marshalledEntry) {
K key = marshalledEntry.getKey(); K key = marshalledEntry.getKey();
expiring.put(key, key); if (expiring.putIfAbsent(key, key) == null) {
try { try {
InternalMetadata metadata = marshalledEntry.getMetadata(); InternalMetadata metadata = marshalledEntry.getMetadata();
removeExpired(key, marshalledEntry.getValue(), metadata.lifespan() == -1 ? null : metadata.lifespan()); removeExpired(key, marshalledEntry.getValue(), metadata.lifespan() == -1 ? null : metadata.lifespan());
} finally { } finally {
expiring.remove(key); expiring.remove(key);
}
} }
} }
} }
Expand Up @@ -659,48 +659,42 @@ private void mergeFreeEntries(List<FileEntry> entries) {


@Override @Override
public void purge(Executor threadPool, final PurgeListener task) { public void purge(Executor threadPool, final PurgeListener task) {

long now = timeService.wallClockTime();
threadPool.execute(new Runnable() { List<KeyValuePair<Object, FileEntry>> entriesToPurge = new ArrayList<KeyValuePair<Object, FileEntry>>();
@Override synchronized (entries) {
public void run() { for (Iterator<Map.Entry<K, FileEntry>> it = entries.entrySet().iterator(); it.hasNext(); ) {
long now = timeService.wallClockTime(); Map.Entry<K, FileEntry> next = it.next();
List<KeyValuePair<Object, FileEntry>> entriesToPurge = new ArrayList<KeyValuePair<Object, FileEntry>>(); FileEntry fe = next.getValue();
synchronized (entries) { if (fe.isExpired(now)) {
for (Iterator<Map.Entry<K, FileEntry>> it = entries.entrySet().iterator(); it.hasNext(); ) { it.remove();
Map.Entry<K, FileEntry> next = it.next(); entriesToPurge.add(new KeyValuePair<Object, FileEntry>(next.getKey(), fe));
FileEntry fe = next.getValue();
if (fe.isExpired(now)) {
it.remove();
entriesToPurge.add(new KeyValuePair<Object, FileEntry>(next.getKey(), fe));
}
}
} }
}
}


resizeLock.readLock().lock(); resizeLock.readLock().lock();
try { try {
for (Iterator<KeyValuePair<Object, FileEntry>> it = entriesToPurge.iterator(); it.hasNext(); ) { for (Iterator<KeyValuePair<Object, FileEntry>> it = entriesToPurge.iterator(); it.hasNext(); ) {
KeyValuePair<Object, FileEntry> next = it.next(); KeyValuePair<Object, FileEntry> next = it.next();
FileEntry fe = next.getValue(); FileEntry fe = next.getValue();
if (fe.isExpired(now)) { if (fe.isExpired(now)) {
it.remove(); it.remove();
try { try {
free(fe); free(fe);
} catch (Exception e) { } catch (Exception e) {
throw new PersistenceException(e); throw new PersistenceException(e);
}
if (task != null) task.entryPurged(next.getKey());
}
}

// Disk space optimizations
synchronized (freeList) {
processFreeEntries();
} }
} finally { if (task != null) task.entryPurged(next.getKey());
resizeLock.readLock().unlock();
} }
} }
});
// Disk space optimizations
synchronized (freeList) {
processFreeEntries();
}
} finally {
resizeLock.readLock().unlock();
}
} }


@Override @Override
Expand Down
Expand Up @@ -23,7 +23,10 @@ public interface AdvancedCacheWriter<K, V> extends CacheWriter<K, V> {
/** /**
* Using the thread in the pool, removed all the expired data from the persistence storage. For each removed entry, * Using the thread in the pool, removed all the expired data from the persistence storage. For each removed entry,
* the supplied listener is invoked. * the supplied listener is invoked.
* * <p>
* When this method returns all entries will be purged and no tasks will be running due to this loader in the
* provided executor. If however an exception is thrown there could be tasks still pending or running in the
* executor.
* @throws PersistenceException in case of an error, e.g. communicating with the external storage * @throws PersistenceException in case of an error, e.g. communicating with the external storage
*/ */
void purge(Executor threadPool, PurgeListener<? super K> listener); void purge(Executor threadPool, PurgeListener<? super K> listener);
Expand Down
Expand Up @@ -22,15 +22,19 @@ public class ExpirationFunctionalTest extends SingleCacheManagerTest {
protected EmbeddedCacheManager createCacheManager() throws Exception { protected EmbeddedCacheManager createCacheManager() throws Exception {
ConfigurationBuilder builder = TestCacheManagerFactory.getDefaultCacheConfiguration(false); ConfigurationBuilder builder = TestCacheManagerFactory.getDefaultCacheConfiguration(false);
configure(builder); configure(builder);
EmbeddedCacheManager cm = TestCacheManagerFactory.createCacheManager(builder); EmbeddedCacheManager cm = createCacheManager(builder);
TestingUtil.replaceComponent(cm, TimeService.class, timeService, true); TestingUtil.replaceComponent(cm, TimeService.class, timeService, true);
cache = cm.getCache(); cache = cm.getCache();
afterCacheCreated(cm); afterCacheCreated(cm);
return cm; return cm;
} }


protected void configure(ConfigurationBuilder config) { protected EmbeddedCacheManager createCacheManager(ConfigurationBuilder builder) {
return TestCacheManagerFactory.createCacheManager(builder);
}


protected void configure(ConfigurationBuilder config) {
config.expiration().disableReaper();
} }


protected void afterCacheCreated(EmbeddedCacheManager cm) { protected void afterCacheCreated(EmbeddedCacheManager cm) {
Expand Down
@@ -0,0 +1,24 @@
package org.infinispan.expiration.impl;

import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.testng.annotations.Test;

@Test(groups = "functional", testName = "expiration.impl.ExpirationSingleFileStoreDistListenerFunctionalTest")
public class ExpirationSingleFileStoreDistListenerFunctionalTest extends ExpirationStoreListenerFunctionalTest {
@Override
protected void configure(ConfigurationBuilder config) {
config
// Prevent the reaper from running, reaperEnabled(false) doesn't work when a store is present
.expiration().wakeUpInterval(Long.MAX_VALUE)
.clustering().cacheMode(CacheMode.DIST_SYNC)
.persistence().addSingleFileStore();
}

@Override
protected EmbeddedCacheManager createCacheManager(ConfigurationBuilder builder) {
return TestCacheManagerFactory.createClusteredCacheManager(builder);
}
}
@@ -0,0 +1,17 @@
package org.infinispan.expiration.impl;

import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder;
import org.infinispan.persistence.file.SingleFileStore;
import org.testng.annotations.Test;

@Test(groups = "functional", testName = "expiration.impl.ExpirationSingleFileStoreListenerFunctionalTest")
public class ExpirationSingleFileStoreListenerFunctionalTest extends ExpirationStoreListenerFunctionalTest {
@Override
protected void configure(ConfigurationBuilder config) {
config
// Prevent the reaper from running, reaperEnabled(false) doesn't work when a store is present
.expiration().wakeUpInterval(Long.MAX_VALUE)
.persistence().addSingleFileStore();
}
}
Expand Up @@ -9,6 +9,9 @@ public class ExpirationStoreFunctionalTest extends ExpirationFunctionalTest {


@Override @Override
protected void configure(ConfigurationBuilder config) { protected void configure(ConfigurationBuilder config) {
config.persistence().addStore(DummyInMemoryStoreConfigurationBuilder.class); config
// Prevent the reaper from running, reaperEnabled(false) doesn't work when a store is present
.expiration().wakeUpInterval(Long.MAX_VALUE)
.persistence().addStore(DummyInMemoryStoreConfigurationBuilder.class);
} }
} }
Expand Up @@ -4,6 +4,7 @@
import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.cachelistener.event.CacheEntryExpiredEvent; import org.infinispan.notifications.cachelistener.event.CacheEntryExpiredEvent;
import org.infinispan.notifications.cachelistener.event.Event; import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.persistence.spi.AdvancedCacheExpirationWriter;
import org.infinispan.test.TestingUtil; import org.infinispan.test.TestingUtil;
import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
Expand Down Expand Up @@ -56,18 +57,20 @@ public void testExpirationOfStoreWhenDataNotInMemory() throws Exception {


assertEquals(1, listener.getInvocationCount()); assertEquals(1, listener.getInvocationCount());


CacheEntryExpiredEvent event = listener.events.iterator().next(); CacheEntryExpiredEvent event = listener.getEvents().iterator().next();
assertEquals(Event.Type.CACHE_ENTRY_EXPIRED, event.getType()); assertEquals(Event.Type.CACHE_ENTRY_EXPIRED, event.getType());
assertEquals(cache, event.getCache()); assertEquals(cache, event.getCache());
assertFalse(event.isPre()); assertFalse(event.isPre());
assertNotNull(event.getKey()); assertNotNull(event.getKey());
// The dummy store produces value and metadata so lets make sure // The dummy store produces value and metadata so lets make sure
assertEquals("v", event.getValue()); if (TestingUtil.getCacheLoader(cache) instanceof AdvancedCacheExpirationWriter) {
assertNotNull(event.getMetadata()); assertEquals("v", event.getValue());
assertNotNull(event.getMetadata());
}
} }


private void assertExpiredEvents(int count) { private void assertExpiredEvents(int count) {
assertEquals(count, listener.getInvocationCount()); eventuallyEquals(count, () -> listener.getInvocationCount());
listener.getEvents().forEach(event -> { listener.getEvents().forEach(event -> {
assertEquals(Event.Type.CACHE_ENTRY_EXPIRED, event.getType()); assertEquals(Event.Type.CACHE_ENTRY_EXPIRED, event.getType());
assertEquals(cache, event.getCache()); assertEquals(cache, event.getCache());
Expand Down
Expand Up @@ -3,35 +3,41 @@
import org.infinispan.notifications.Listener; import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired; import org.infinispan.notifications.cachelistener.annotation.CacheEntryExpired;
import org.infinispan.notifications.cachelistener.event.CacheEntryExpiredEvent; import org.infinispan.notifications.cachelistener.event.CacheEntryExpiredEvent;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;


@Listener @Listener
public class ExpiredCacheListener { public class ExpiredCacheListener {
List<CacheEntryExpiredEvent> events = new ArrayList<>(); private final static Log log = LogFactory.getLog(ExpiredCacheListener.class);
int invocationCount; private final List<CacheEntryExpiredEvent> events = Collections.synchronizedList(new ArrayList<>());
private final AtomicInteger invocationCount = new AtomicInteger();


public void reset() { public void reset() {
events.clear(); events.clear();
invocationCount = 0; invocationCount.set(0);
} }


public List<CacheEntryExpiredEvent> getEvents() { public List<CacheEntryExpiredEvent> getEvents() {
return events; return events;
} }


public int getInvocationCount() { public int getInvocationCount() {
return invocationCount; return invocationCount.get();
} }




// handler // handler


@CacheEntryExpired @CacheEntryExpired
public void handle(CacheEntryExpiredEvent e) { public void handle(CacheEntryExpiredEvent e) {
log.trace("Received event: " + e);
events.add(e); events.add(e);


invocationCount++; invocationCount.incrementAndGet();
} }
} }

0 comments on commit 7e08e15

Please sign in to comment.