Permalink
Browse files

ISPN-2856 Implement support for JSR-107 expired entry notification

* Use an interceptor to detect when an entry is expired, and based
on that send the notification to any registered listeners.
  • Loading branch information...
1 parent 1aa2554 commit 088eebae91a0f00b1e8c341be6efa5fe8226c5b9 @galderz galderz committed with maniksurtani Feb 26, 2013
@@ -45,7 +45,7 @@
private static final boolean trace = log.isTraceEnabled();
private InternalCacheEntry remotelyFetchedValue;
- public GetKeyValueCommand(Object key, Set<Flag> flags) {
+ public GetKeyValueCommand(Object key, Set<Flag> flags) {
this.key = key;
this.flags = flags;
}
@@ -67,15 +67,15 @@
private final AdvancedCache<K, V> skipListenerCache;
private final CacheStatisticsMXBean stats;
private final CacheMXBean mxBean;
- private final JCacheNotifier<K, V> notifier = new JCacheNotifier<K, V>();
+ private final JCacheNotifier<K, V> notifier;
private final ExpiryPolicy<? super K, ? super V> expiryPolicy;
private final ExecutorService executorService = Executors.newFixedThreadPool(1);
private final LockContainer processorLocks = new ReentrantPerEntryLockContainer(32);
private final long lockTimeout; // milliseconds
- public JCache(AdvancedCache<K, V> cache,
- JCacheManager cacheManager, Configuration<K, V> c) {
+ public JCache(AdvancedCache<K, V> cache, JCacheManager cacheManager,
+ JCacheNotifier<K, V> notifier, Configuration<K, V> c) {
super();
this.cache = cache;
this.ignoreReturnValuesCache = cache.withFlags(Flag.IGNORE_RETURN_VALUES);
@@ -91,7 +91,7 @@ public JCache(AdvancedCache<K, V> cache,
this.expiryPolicy = configuration.getExpiryPolicy();
this.lockTimeout = cache.getCacheConfiguration()
.locking().lockAcquisitionTimeout();
-
+ this.notifier = notifier;
for (CacheEntryListenerRegistration<? super K, ? super V> r
: c.getCacheEntryListenerRegistrations()) {
notifier.addListener(r);
@@ -110,7 +110,8 @@ public void start() {
cache.start();
// Add listener as they were wiped out on stop
- cache.addListener(new JCacheListenerAdapter<K, V>(this));
+ // TODO: Why not add listener only when a listener is actually registered?
+ cache.addListener(new JCacheListenerAdapter<K, V>(this, notifier));
}
@Override
@@ -608,10 +609,6 @@ public long size() {
return cache.size();
}
- public JCacheNotifier<K, V> getNotifier() {
- return notifier;
- }
-
private void checkStarted() {
if (!getStatus().equals(Status.STARTED)) {
throw new IllegalStateException("Cache is in " + getStatus() + " state");
@@ -18,7 +18,7 @@
*/
package org.infinispan.jcache;
-import javax.cache.event.CacheEntryEvent;
+import javax.cache.Cache;
import org.infinispan.jcache.logging.Log;
import org.infinispan.notifications.Listener;
@@ -47,60 +47,44 @@
private static final boolean isTrace = log.isTraceEnabled();
- private final JCache<K, V> cache;
+ private final Cache<K, V> cache;
+ private final JCacheNotifier<K, V> notifier;
- public JCacheListenerAdapter(JCache<K, V> cache) {
+ public JCacheListenerAdapter(Cache<K, V> cache, JCacheNotifier<K, V> notifier) {
this.cache = cache;
+ this.notifier = notifier;
}
@CacheEntryCreated
@SuppressWarnings("unused")
public void handleCacheEntryCreatedEvent(CacheEntryCreatedEvent<K, V> e) {
// JCache listeners notified only once, so do it after the event
- if (!e.isPre()) {
- JCacheNotifier<K, V> notifier = cache.getNotifier();
- CacheEntryEvent<? extends K, ? extends V> event =
- new RICacheEntryEvent<K, V>(cache, e.getKey(), e.getValue());
- if (isTrace) log.tracef("Received event: %s", e);
- notifier.notifyEntryCreated(event);
- }
+ if (!e.isPre())
+ notifier.notifyEntryCreated(cache, e.getKey(), e.getValue());
}
@CacheEntryModified
@SuppressWarnings("unused")
public void handleCacheEntryModifiedEvent(CacheEntryModifiedEvent<K, V> e) {
// JCache listeners notified only once, so do it after the event
- if (!e.isPre() && !e.isCreated()) {
- JCacheNotifier<K, V> notifier = cache.getNotifier();
- CacheEntryEvent<? extends K, ? extends V> event =
- new RICacheEntryEvent<K, V>(cache, e.getKey(), e.getValue());
- if (isTrace) log.tracef("Received event: %s", e);
- notifier.notifyEntryUpdated(event);
- }
+ if (!e.isPre() && !e.isCreated())
+ notifier.notifyEntryUpdated(cache, e.getKey(), e.getValue());
}
@CacheEntryRemoved
@SuppressWarnings("unused")
public void handleCacheEntryRemovedEvent(CacheEntryRemovedEvent<K, V> e) {
// JCache listeners notified only once, so do it after the event
- if (!e.isPre()) {
- JCacheNotifier<K, V> notifier = cache.getNotifier();
- CacheEntryEvent<? extends K, ? extends V> event =
- new RICacheEntryEvent<K, V>(cache, e.getKey(), e.getOldValue());
- notifier.notifyEntryRemoved(event);
- }
+ if (!e.isPre())
+ notifier.notifyEntryRemoved(cache, e.getKey(), e.getOldValue());
}
@CacheEntryVisited
@SuppressWarnings("unused")
public void handleCacheEntryVisitedEvent(CacheEntryVisitedEvent<K, V> e) {
// JCache listeners notified only once, so do it after the event
- if (!e.isPre()) {
- JCacheNotifier<K, V> notifier = cache.getNotifier();
- CacheEntryEvent<? extends K, ? extends V> event =
- new RICacheEntryEvent<K, V>(cache, e.getKey(), e.getValue());
- notifier.notifyEntryRead(event);
- }
+ if (!e.isPre())
+ notifier.notifyEntryRead(cache, e.getKey(), e.getValue());
}
}
@@ -43,6 +43,8 @@
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
import org.infinispan.configuration.parsing.ParserRegistry;
+import org.infinispan.interceptors.EntryWrappingInterceptor;
+import org.infinispan.jcache.interceptor.ExpirationTrackingInterceptor;
import org.infinispan.jcache.logging.Log;
import org.infinispan.loaders.CacheLoaderManager;
import org.infinispan.manager.DefaultCacheManager;
@@ -106,7 +108,7 @@ public JCacheManager(String name, ClassLoader classLoader) {
for (String cacheName : cacheNames) {
caches.put(cacheName, new JCache<Object, Object>(
cm.getCache(cacheName).getAdvancedCache(),
- this, null));
+ this, new JCacheNotifier<Object, Object>(), null));
}
status = Status.STARTED;
@@ -193,7 +195,13 @@ public Status getStatus() {
ispnCacheStore.setCacheWriter(cacheWriter);
}
- cache = new JCache<K, V>(ispnCache, this, c);
+ JCacheNotifier<K, V> notifier = new JCacheNotifier<K, V>();
+ cache = new JCache<K, V>(ispnCache, this, notifier, c);
+
+ ispnCache.addInterceptorBefore(new ExpirationTrackingInterceptor(
+ ispnCache.getDataContainer(), cache, notifier),
+ EntryWrappingInterceptor.class);
+
cache.start();
caches.put(cache.getName(), cache);
} else {
@@ -26,9 +26,11 @@
import org.infinispan.jcache.logging.Log;
import org.infinispan.util.logging.LogFactory;
+import javax.cache.Cache;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
+import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
import javax.cache.event.CacheEntryListenerRegistration;
import javax.cache.event.CacheEntryReadListener;
@@ -68,6 +70,9 @@
private final List<CacheEntryListenerRegistration<? super K, ? super V>> readListeners =
new CopyOnWriteArrayList<CacheEntryListenerRegistration<? super K, ? super V>>();
+ private final List<CacheEntryListenerRegistration<? super K, ? super V>> expiredListeners =
+ new CopyOnWriteArrayList<CacheEntryListenerRegistration<? super K, ? super V>>();
+
public void addListener(CacheEntryListenerRegistration<? super K, ? super V> reg) {
addListener(reg, false);
}
@@ -90,49 +95,70 @@ public boolean removeListener(CacheEntryListener<?, ?> listener) {
if (listener instanceof CacheEntryReadListener)
removed = removeListener(listener, readListeners);
+ if (listener instanceof CacheEntryExpiredListener)
+ removed = removeListener(listener, expiredListeners);
+
return removed;
}
@SuppressWarnings("unchecked")
- public void notifyEntryCreated(CacheEntryEvent<? extends K, ? extends V> event) {
- List<CacheEntryEvent<? extends K, ? extends V>> events =
- Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event);
- for (CacheEntryListenerRegistration<? super K, ? super V> reg : createdListeners) {
- ((CacheEntryCreatedListener<K, V>) reg.getCacheEntryListener())
- .onCreated(getEntryIterable(events, reg));
+ public void notifyEntryCreated(Cache<K, V> cache, K key, V value) {
+ if (!createdListeners.isEmpty()) {
+ List<CacheEntryEvent<? extends K, ? extends V>> events =
+ createEvent(cache, key, value);
+ for (CacheEntryListenerRegistration<? super K, ? super V> reg : createdListeners) {
+ ((CacheEntryCreatedListener<K, V>) reg.getCacheEntryListener())
+ .onCreated(getEntryIterable(events, reg));
+ }
}
}
@SuppressWarnings("unchecked")
- public void notifyEntryUpdated(CacheEntryEvent<? extends K, ? extends V> event) {
- List<CacheEntryEvent<? extends K, ? extends V>> events =
- Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event);
- if (isTrace) log.tracef("Registered update listeners: %s", updatedListeners);
- for (CacheEntryListenerRegistration<? super K, ? super V> reg : updatedListeners) {
- CacheEntryUpdatedListener<K, V> listener =
- (CacheEntryUpdatedListener<K, V>) reg.getCacheEntryListener();
- if (isTrace) log.tracef("Executing onUpdated for listener %s", listener);
- listener.onUpdated(getEntryIterable(events, reg));
+ public void notifyEntryUpdated(Cache<K, V> cache, K key, V value) {
+ if (!updatedListeners.isEmpty()) {
+ List<CacheEntryEvent<? extends K, ? extends V>> events =
+ createEvent(cache, key, value);
+ for (CacheEntryListenerRegistration<? super K, ? super V> reg : updatedListeners) {
+ CacheEntryUpdatedListener<K, V> listener =
+ (CacheEntryUpdatedListener<K, V>) reg.getCacheEntryListener();
+ listener.onUpdated(getEntryIterable(events, reg));
+ }
}
}
@SuppressWarnings("unchecked")
- public void notifyEntryRemoved(CacheEntryEvent<? extends K, ? extends V> event) {
- List<CacheEntryEvent<? extends K, ? extends V>> events =
- Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event);
- for (CacheEntryListenerRegistration<? super K, ? super V> reg : removedListeners) {
- ((CacheEntryRemovedListener<K, V>) reg.getCacheEntryListener())
- .onRemoved(getEntryIterable(events, reg));
+ public void notifyEntryRemoved(Cache<K, V> cache, K key, V value) {
+ if (!removedListeners.isEmpty()) {
+ List<CacheEntryEvent<? extends K, ? extends V>> events =
+ createEvent(cache, key, value);
+ for (CacheEntryListenerRegistration<? super K, ? super V> reg : removedListeners) {
+ ((CacheEntryRemovedListener<K, V>) reg.getCacheEntryListener())
+ .onRemoved(getEntryIterable(events, reg));
+ }
}
}
@SuppressWarnings("unchecked")
- public void notifyEntryRead(CacheEntryEvent<? extends K, ? extends V> event) {
- List<CacheEntryEvent<? extends K, ? extends V>> events =
- Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(event);
- for (CacheEntryListenerRegistration<? super K, ? super V> reg : readListeners) {
- ((CacheEntryReadListener<K, V>) reg.getCacheEntryListener())
- .onRead(getEntryIterable(events, reg));
+ public void notifyEntryRead(Cache<K, V> cache, K key, V value) {
+ if (!readListeners.isEmpty()) {
+ List<CacheEntryEvent<? extends K, ? extends V>> events =
+ createEvent(cache, key, value);
+ for (CacheEntryListenerRegistration<? super K, ? super V> reg : readListeners) {
+ ((CacheEntryReadListener<K, V>) reg.getCacheEntryListener())
+ .onRead(getEntryIterable(events, reg));
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void notifyEntryExpired(Cache<K, V> cache, K key, V value) {
+ if (!expiredListeners.isEmpty()) {
+ List<CacheEntryEvent<? extends K, ? extends V>> events =
+ createEvent(cache, key, value);
+ for (CacheEntryListenerRegistration<? super K, ? super V> reg : expiredListeners) {
+ ((CacheEntryExpiredListener<K, V>) reg.getCacheEntryListener())
+ .onExpired(getEntryIterable(events, reg));
+ }
}
}
@@ -144,37 +170,51 @@ public void notifyEntryRead(CacheEntryEvent<? extends K, ? extends V> event) {
: new JCacheEventFilteringIterable<K, V>(events, filter);
}
- private boolean addListener(CacheEntryListenerRegistration<? super K, ? super V> reg, boolean ifAbsent) {
+ private boolean addListener(CacheEntryListenerRegistration<? super K, ? super V> reg,
+ boolean addIfAbsent) {
boolean added = false;
CacheEntryListener<? super K, ? super V> listener = reg.getCacheEntryListener();
if (listener instanceof CacheEntryCreatedListener) {
- if (!ifAbsent || !containsListener(listener, createdListeners))
- added = createdListeners.add(reg);
+ added = addListener(addIfAbsent, reg, listener, createdListeners);
}
if (listener instanceof CacheEntryUpdatedListener) {
- if (!ifAbsent || !containsListener(listener, updatedListeners))
- added = updatedListeners.add(reg);
+ added = addListener(addIfAbsent, reg, listener, updatedListeners);
}
if (listener instanceof CacheEntryRemovedListener) {
- if (!ifAbsent || !containsListener(listener, removedListeners))
- added = removedListeners.add(reg);
+ added = addListener(addIfAbsent, reg, listener, removedListeners);
}
if (listener instanceof CacheEntryReadListener) {
- if (!ifAbsent || !containsListener(listener, readListeners))
- added = readListeners.add(reg);
+ added = addListener(addIfAbsent, reg, listener, readListeners);
+ }
+
+ if (listener instanceof CacheEntryExpiredListener) {
+ added = addListener(addIfAbsent, reg, listener, expiredListeners);
}
return added;
}
- private boolean containsListener(CacheEntryListener<? super K, ? super V> listener,
+ private boolean addListener(boolean addIfAbsent,
+ CacheEntryListenerRegistration<? super K, ? super V> reg,
+ CacheEntryListener<? super K, ? super V> listener,
List<CacheEntryListenerRegistration<? super K, ? super V>> listeners) {
- for (CacheEntryListenerRegistration<? super K, ? super V> reg : listeners) {
- if (reg.getCacheEntryListener().equals(listener))
- return true;
+ return !containsListener(addIfAbsent, listener, listeners)
+ && listeners.add(reg);
+
+ }
+
+ private boolean containsListener(boolean addIfAbsent,
+ CacheEntryListener<? super K, ? super V> listener,
+ List<CacheEntryListenerRegistration<? super K, ? super V>> listeners) {
+ // If add only if no listener present, check the listeners collection
+ if (addIfAbsent) {
+ for (CacheEntryListenerRegistration<? super K, ? super V> reg : listeners) {
+ if (reg.getCacheEntryListener().equals(listener))
+ return true;
+ }
}
return false;
@@ -190,4 +230,13 @@ private boolean removeListener(CacheEntryListener<?, ?> listener,
return false;
}
+ private List<CacheEntryEvent<? extends K, ? extends V>> createEvent(
+ Cache<K, V> cache, K key, V value) {
+ List<CacheEntryEvent<? extends K, ? extends V>> events =
+ Collections.<CacheEntryEvent<? extends K, ? extends V>>singletonList(
+ new RICacheEntryEvent<K, V>(cache, key, value));
+ if (isTrace) log.tracef("Received event: %s", events);
+ return events;
+ }
+
}
Oops, something went wrong.

0 comments on commit 088eeba

Please sign in to comment.