Skip to content

Commit

Permalink
ISPN-15211 Listeners have no way to ignore a previous old value
Browse files Browse the repository at this point in the history
  • Loading branch information
wburns committed Oct 17, 2023
1 parent 93ef7d5 commit b5fa8d5
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void testRemovedEvent() {
withClientListener(l, remote -> {
l.expectNoEvents();
remote.remove(1);
l.expectNoEvents();
l.expectOnlyRemovedEvent(1);
remote.put(1, "one");
l.expectOnlyCreatedEvent(1);
remote.remove(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1959,7 +1959,8 @@ private EventImpl<K, V> convertEventToRequestFormat(EventImpl<K, V> eventImpl,
}
Object convertedKey = convertToRequestFormat(eventImpl.getKey(), keyFromFormat, keyDataConversion);
Object convertedValue = convertToRequestFormat(newValue, valueFromFormat, valueDataConversion);
Object convertedOldValue = convertToRequestFormat(eventImpl.getOldValue(), valueFromFormat, valueDataConversion);
Object convertedOldValue = converter == null || converter.includeOldValue() ?
convertToRequestFormat(eventImpl.getOldValue(), valueFromFormat, valueDataConversion) : null;
EventImpl<K, V> clone = eventImpl.clone();
clone.setKey((K) convertedKey);
clone.setValue((V) convertedValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,17 @@ default boolean useRequestFormat() {
return false;
}

/**
* Whether the old value should be returned in the event with the converted value. This is useful when you only
* care about the converted value and do not want to send around the old value payload.
* When this is false a <b>null</b> value will be present for any event that has an oldValue.
* <p>
* Note the oldValue is only present in {@link org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent}
* and {@link org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent} for local events and
* only {@link org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent} for cluster listeners.
* @return whether the old value is included in any raised event.
*/
default boolean includeOldValue() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/**
* {@link CacheEventFilterConverter} that uses an underlying {@link KeyValueFilterConverter} to do the conversion and
* filtering. The new value and metadata are used as arguments to the underlying filter converter as it doesn't take
* both new and old.
* both new and old. The old value is not returned in any event.
* @author wburns
* @since 9.4
*/
Expand Down Expand Up @@ -58,6 +58,12 @@ public MediaType format() {
return format;
}

@Override
public boolean includeOldValue() {
// No reason to include old if new value is only ever used for conversions
return false;
}

@Inject
protected void injectDependencies(ComponentRegistry cr) {
cr.wireDependencies(keyValueFilterConverter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,55 @@ public void testPreviousValueFilterEventRaisedBackupOwnerNode() {
verifySimpleModification(cache0, key, newValue, newExpiration, clusterListener, newValue);
}

public void testRemoveConverterEventRaisedBackupOwner() {
Cache<Object, String> cache0 = cache(0, CACHE_NAME);
Cache<Object, String> cache1 = cache(1, CACHE_NAME);

String previousValue = "myOldValue";
MagicKey key = new MagicKey(cache0, cache1);
// This event is ignored because no previous lifespan
cache0.put(key, previousValue);

ClusterListener clusterListener = listener();
cache1.addListener(clusterListener, null, new StringAppender());

// StringAppender doesn't include old value
verifySimpleRemove(cache0, key, clusterListener, null);
}

public void testRemoveConverterEventRaisedNonOwner() {
Cache<Object, String> cache0 = cache(0, CACHE_NAME);
Cache<Object, String> cache1 = cache(1, CACHE_NAME);
Cache<Object, String> cache2 = cache(2, CACHE_NAME);

String previousValue = "myOldValue";
MagicKey key = new MagicKey(cache0, cache1);
// This event is ignored because no previous lifespan
cache0.put(key, previousValue);

ClusterListener clusterListener = listener();
cache2.addListener(clusterListener, null, new StringAppender());

// StringAppender doesn't include old value
verifySimpleRemove(cache0, key, clusterListener, null);
}

public void testRemoveConverterEventRaisedLocalNode() {
Cache<Object, String> cache0 = cache(0, CACHE_NAME);
Cache<Object, String> cache1 = cache(1, CACHE_NAME);

String previousValue = "myOldValue";
MagicKey key = new MagicKey(cache0, cache1);
// This event is ignored because no previous lifespan
cache0.put(key, previousValue);

ClusterListener clusterListener = listener();
cache0.addListener(clusterListener, null, new StringAppender());

// StringAppender doesn't include old value
verifySimpleRemove(cache0, key, clusterListener, null);
}

@Test
public void testCacheEventFilterConverter() {
Cache<Object, String> cache0 = cache(0, CACHE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.withSettings;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNull;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -259,6 +260,11 @@ public static class StringAppender implements CacheEventConverter<Object, String
public String convert(Object key, String oldValue, Metadata oldMetadata, String newValue, Metadata newMetadata, EventType eventType) {
return oldValue + (oldMetadata != null ? oldMetadata.lifespan() : "null") + newValue + (newMetadata != null ? newMetadata.lifespan() : "null");
}

@Override
public boolean includeOldValue() {
return false;
}
}

public static class FilterConverter implements CacheEventFilterConverter<Object, Object, Object> {
Expand Down Expand Up @@ -308,6 +314,11 @@ protected void verifySimpleInsertion(Cache<Object, String> cache, Object key, St
verifySimpleInsertionEvents(listener, key, expectedValue);
}

protected void verifySimpleRemove(Cache<Object, String> cache, Object key, ClusterListener listener, Object expectedValue) {
cache.remove(key);
verifySimpleRemovalEvents(listener, key, expectedValue);
}

protected void verifySimpleModification(Cache<Object, String> cache, Object key, String value, Long lifespan,
ClusterListener listener, Object expectedValue) {
if (lifespan != null) {
Expand Down Expand Up @@ -336,6 +347,20 @@ protected void verifySimpleModificationEvents(ClusterListener listener, Object k
assertEquals(expectedValue, event.getValue());
}

protected void verifySimpleRemovalEvents(ClusterListener listener, Object key, Object oldValue) {
assertEquals(listener.hasIncludeState() ? 2 : 1, listener.events.size());
CacheEntryEvent event = listener.events.get(listener.hasIncludeState() ? 1 :0);

assertEquals(Event.Type.CACHE_ENTRY_REMOVED, event.getType());
assertEquals(key, event.getKey());
Object eventOldValue = ((CacheEntryRemovedEvent) event).getOldValue();
if (oldValue != null) {
assertEquals(oldValue, eventOldValue);
} else {
assertNull(eventOldValue);
}
}

protected void verifySimpleExpirationEvents(ClusterListener listener, int expectedNumEvents, Object key, Object expectedValue) {
eventually(() -> listener.events.size() >= expectedNumEvents);

Expand Down
6 changes: 6 additions & 0 deletions documentation/src/main/asciidoc/topics/upgrading.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ The default `availability-interval` has been increased to 30 seconds. The previo
The RESP endpoint cache now requires the key storage media type to be
application/octet-stream.

== Client listeners remove events

Client listeners remove events will now be propagated even if the remove did not remove a value.
This is required to properly support new changes around the new `includeOldValue` method on `CacheEventConverter`.
NOTE: Remote events do not include any values by default

= Upgrading to 14.0.14 or newer

== NearCache SPI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public void testEventReceiveBasic() {
withClientListener(l, remote -> {
l.expectNoEvents();
remote.remove(1);
l.expectNoEvents();
l.expectOnlyRemovedEvent(1);
remote.put(1, "one");
assertEquals("one", getEmbeddedCache().get(1));
l.expectOnlyCreatedEvent(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,10 +409,8 @@ boolean isSendEvent(CacheEntryEvent<byte[], byte[]> event) {
switch (event.getType()) {
case CACHE_ENTRY_CREATED:
case CACHE_ENTRY_MODIFIED:
return !event.isPre();
case CACHE_ENTRY_REMOVED:
CacheEntryRemovedEvent removedEvent = (CacheEntryRemovedEvent) event;
return !event.isPre() && removedEvent.getOldValue() != null;
return !event.isPre();
case CACHE_ENTRY_EXPIRED:
return true;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void testCustomEvents(Method m) {
eventListener.expectNoEvents(Optional.empty());
byte[] key = k(m);
client().remove(key);
eventListener.expectNoEvents(Optional.empty());
eventListener.expectSingleCustomEvent(cache, addLengthPrefix(key));
byte[] value = v(m);
client().put(key, 0, 0, value);
eventListener.expectSingleCustomEvent(cache, addLengthPrefix(key, value));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testRemovedEvent(Method m) {
eventListener.expectNoEvents(Optional.empty());
byte[] key = k(m);
client().remove(key);
eventListener.expectNoEvents(Optional.empty());
eventListener.expectOnlyRemovedEvent(cache, key);
client().put(key, 0, 0, v(m));
eventListener.expectOnlyCreatedEvent(cache, key);
client().remove(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testRemovedEvent(ProtocolVersion protocolVersion) {
new EventLogListener<>(remoteCache(protocolVersion)).accept((l, remote) -> {
l.expectNoEvents();
remote.remove(1);
l.expectNoEvents();
l.expectOnlyRemovedEvent(1);
remote.put(1, "one");
l.expectOnlyCreatedEvent(1);
remote.remove(1);
Expand Down

0 comments on commit b5fa8d5

Please sign in to comment.