Skip to content

Commit

Permalink
ISPN-13612 InvalidatedNearRemoteCache can persist stale values in cas…
Browse files Browse the repository at this point in the history
…e of concurrent access
  • Loading branch information
wburns committed Aug 9, 2023
1 parent d0690f6 commit 03acc1c
Show file tree
Hide file tree
Showing 21 changed files with 331 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,20 @@ private int getCurrentVersion() {
@Override
public CompletableFuture<MetadataValue<V>> getWithMetadataAsync(K key) {
MetadataValue<V> nearValue = nearcache.get(key);
if (nearValue == null) {
// We rely upon the fact that a MetadataValue will never be null from a remote lookup but our placeholder will be null
if (nearValue == null || nearValue.getValue() == null) {
clientStatistics.incrementNearCacheMisses();
// Note that MetadataValueImpl does not implement equals method so we rely upon object identity in the replace below
// We cannot cache this value as we could have 2 concurrent gets and an update and we could cache a previous one
MetadataValue<V> calculatingPlaceholder = new MetadataValueImpl<>(-1, -1, -1, -1, -1, null);
boolean cache = nearcache.putIfAbsent(key, calculatingPlaceholder);
int prevVersion = getCurrentVersion();
RetryAwareCompletionStage<MetadataValue<V>> remoteValue = super.getWithMetadataAsync(key, listenerAddress);
if (!cache) {
return remoteValue.toCompletableFuture();
}
return remoteValue.thenApply(v -> {
boolean shouldRemove = true;
// We cannot cache the value if a retry was required - which means we did not talk to the listener node
if (v != null) {
// If previous version is odd we can't cache as that means it was started during
Expand All @@ -97,12 +106,16 @@ else if (listenerAddress != null && remoteValue.wasRetried()) {
log.tracef("Unable to cache returned value for key %s as operation was retried", key);
}
} else {
nearcache.putIfAbsent(key, v);
nearcache.replace(key, calculatingPlaceholder, v);
if (v.getMaxIdle() > 0) {
HOTROD.nearCacheMaxIdleUnsupported();
}
shouldRemove = false;
}
}
if (shouldRemove) {
nearcache.remove(key, calculatingPlaceholder);
}
return v;
}).toCompletableFuture();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,25 @@ public static <K, V> NearCache<K, V> create(final NearCacheConfiguration config,
}

@Override
public void put(K key, MetadataValue<V> value) {
cache.put(key, value);
public boolean putIfAbsent(K key, MetadataValue<V> value) {
return map.putIfAbsent(key, value) == null;
}

@Override
public void putIfAbsent(K key, MetadataValue<V> value) {
map.putIfAbsent(key, value);
public boolean replace(K key, MetadataValue<V> prevValue, MetadataValue<V> newValue) {
return map.replace(key, prevValue, newValue);
}

@Override
public boolean remove(K key) {
return map.remove(key) != null;
}

@Override
public boolean remove(K key, MetadataValue<V> value) {
return map.remove(key, value);
}

@Override
public MetadataValue<V> get(K key) {
return map.get(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,25 @@ final class ConcurrentMapNearCache<K, V> implements NearCache<K, V> {
private final ConcurrentMap<K, MetadataValue<V>> cache = new ConcurrentHashMap<>();

@Override
public void put(K key, MetadataValue<V> value) {
cache.put(key, value);
public boolean putIfAbsent(K key, MetadataValue<V> value) {
return cache.putIfAbsent(key, value) == null;
}

@Override
public void putIfAbsent(K key, MetadataValue<V> value) {
cache.putIfAbsent(key, value);
public boolean replace(K key, MetadataValue<V> prevValue, MetadataValue<V> newValue) {
return cache.replace(key, prevValue, newValue);
}

@Override
public boolean remove(K key) {
return cache.remove(key) != null;
}

@Override
public boolean remove(K key, MetadataValue<V> value) {
return cache.remove(key, value);
}

@Override
public MetadataValue<V> get(Object key) {
return cache.get(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
* @since 7.1
*/
public interface NearCache<K, V> extends Iterable<Map.Entry<K, MetadataValue<V>>> {
void put(K key, MetadataValue<V> value);
void putIfAbsent(K key, MetadataValue<V> value);
boolean putIfAbsent(K key, MetadataValue<V> value);
boolean replace(K key, MetadataValue<V> prevValue, MetadataValue<V> newValue);
boolean remove(K key);

// Removes a specific value from the near cache, note this method does not count towards and invalidation
boolean remove(K key, MetadataValue<V> value);
MetadataValue<V> get(K key);
void clear();
int size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,23 @@ public static <K, V> NearCacheService<K, V> create(
}

@Override
public void put(K key, MetadataValue<V> value) {
cache.put(key, value);

if (log.isTraceEnabled())
log.tracef("Put key=%s and value=%s in near cache (listenerId=%s)",
key, value, Util.printArray(listenerId));
public boolean replace(K key, MetadataValue<V> prevValue, MetadataValue<V> newValue) {
boolean replaced = cache.replace(key, prevValue, newValue);
if (log.isTraceEnabled()) {
log.tracef("Replaced key=%s and value=%s with new value=%s in near cache (listenerId=%s): %s",
key, prevValue, newValue, Util.printArray(listenerId), replaced);
}
return replaced;
}

@Override
public void putIfAbsent(K key, MetadataValue<V> value) {
cache.putIfAbsent(key, value);
public boolean putIfAbsent(K key, MetadataValue<V> value) {
boolean inserted = cache.putIfAbsent(key, value);

if (log.isTraceEnabled())
log.tracef("Conditionally put key=%s and value=%s if absent in near cache (listenerId=%s)",
key, value, Util.printArray(listenerId));
log.tracef("Conditionally put %s if absent in near cache (listenerId=%s): %s", value,
Util.printArray(listenerId), inserted);
return inserted;
}

@Override
Expand All @@ -152,6 +154,14 @@ public boolean remove(K key) {
return removed;
}

@Override
public boolean remove(K key, MetadataValue<V> value) {
boolean removed = cache.remove(key, value);
if (log.isTraceEnabled())
log.tracef("Removed value=%s for key=%s from near cache (listenedId=%s): %s", value, key, Util.printArray(listenerId), removed);
return removed;
}

@Override
public MetadataValue<V> get(K key) {
boolean listenerConnected = isConnected();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static org.infinispan.client.hotrod.near.MockNearCacheService.MockClearEvent;
import static org.infinispan.client.hotrod.near.MockNearCacheService.MockEvent;
import static org.infinispan.client.hotrod.near.MockNearCacheService.MockGetEvent;
import static org.infinispan.client.hotrod.near.MockNearCacheService.MockPutEvent;
import static org.infinispan.client.hotrod.near.MockNearCacheService.MockPutIfAbsentEvent;
import static org.infinispan.client.hotrod.near.MockNearCacheService.MockRemoveEvent;
import static org.infinispan.client.hotrod.test.HotRodClientTestingUtil.entryVersion;
Expand Down Expand Up @@ -35,17 +34,21 @@ class AssertsNearCache<K, V> {
final NearCacheMode nearCacheMode;
final AtomicReference<NearCacheService<K, V>> nearCacheService;

private AssertsNearCache(RemoteCacheManager manager, Cache<byte[], ?> server, BlockingQueue<MockEvent> events,
AtomicReference<NearCacheService<K, V>> nearCacheService) {
private AssertsNearCache(RemoteCacheManager manager, String cacheName, Cache<byte[], ?> server,
BlockingQueue<MockEvent> events, AtomicReference<NearCacheService<K, V>> nearCacheService) {
this.manager = manager;
this.remote = (InternalRemoteCache<K, V>) manager.getCache();
this.remote = (InternalRemoteCache<K, V>) (cacheName == null ? manager.getCache() : manager.getCache(cacheName));
this.server = server;
this.events = events;
this.nearCacheMode = manager.getConfiguration().nearCache().mode();
this.nearCacheService = nearCacheService;
}

static <K, V> AssertsNearCache<K, V> create(Cache<byte[], ?> server, ConfigurationBuilder builder) {
return create(server, null, builder);
}

static <K, V> AssertsNearCache<K, V> create(Cache<byte[], ?> server, String cacheName, ConfigurationBuilder builder) {
final BlockingQueue<MockEvent> events = new ArrayBlockingQueue<>(128);
AtomicReference<NearCacheService<K, V>> nearCacheServiceRef = new AtomicReference<>();
RemoteCacheManager manager = new RemoteCacheManager(builder.build()) {
Expand All @@ -57,7 +60,7 @@ protected <KK, VV> NearCacheService<KK, VV> createNearCacheService(String cacheN
}
};

return new AssertsNearCache<K, V>(manager, server, events, nearCacheServiceRef);
return new AssertsNearCache<K, V>(manager, cacheName, server, events, nearCacheServiceRef);
}

AssertsNearCache<K, V> get(K key, V expected) {
Expand Down Expand Up @@ -137,25 +140,27 @@ AssertsNearCache<K, V> expectNearGetValue(K key, V value) {
return this;
}

AssertsNearCache<K, V> expectNearGetNull(K key) {
AssertsNearCache<K, V> expectNearGetMiss(K key) {
MockGetEvent get = assertGetKey(key);
assertNull(get.value);
expectNearPutIfAbsent(key, null);
expectNearPreemptiveRemove(key);
return this;
}

@SafeVarargs
final AssertsNearCache<K, V> expectNearPut(K key, V value, AssertsNearCache<K, V>... affected) {
expectNearPutInClient(this, key, value);
for (AssertsNearCache<K, V> client : affected)
expectNearPutInClient(client, key, value);

AssertsNearCache<K, V> expectNearGetMissWithValue(K key, V value) {
MockGetEvent get = assertGetKey(key);
assertNull(get.value);
expectNearPutIfAbsent(key, null);
expectNearReplaceEvent(this, key, null, value);
return this;
}

private static <K, V> void expectNearPutInClient(AssertsNearCache<K, V> client, K key, V value) {
MockPutEvent put = pollEvent(client.events);
private static <K, V> void expectNearReplaceEvent(AssertsNearCache<K, V> client, K key, V prevValue, V newValue) {
MockNearCacheService.MockReplaceEvent<K, V> put = pollEvent(client.events);
assertEquals(key, put.key);
assertEquals(value, put.value.getValue());
assertEquals(prevValue, put.prevValue.getValue());
assertEquals(newValue, put.value.getValue());
}

AssertsNearCache<K, V> expectNearPutIfAbsent(K key, V value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
@Test(groups = "functional", testName = "client.hotrod.near.ClusterInvalidatedNearCacheBloomTest")
public class ClusterInvalidatedNearCacheBloomTest extends MultiHotRodServersTest {
private static final int CLUSTER_MEMBERS = 3;
private static final int NEAR_CACHE_SIZE = 4;
// This has to be 16 due to how NearCache bloom filter is calculated by dividing by 16 to require
// one update + 3 before it will send the updated bloom filter
private static final int NEAR_CACHE_SIZE = 16;

List<AssertsNearCache<Integer, String>> assertClients = new ArrayList<>(CLUSTER_MEMBERS);

Expand Down Expand Up @@ -56,7 +58,7 @@ void afterMethod() {
protected void destroy() {
for (AssertsNearCache<Integer, String> assertsNearCache : assertClients) {
try {
assertsNearCache.expectNoNearEvents(50, TimeUnit.MILLISECONDS);
assertsNearCache.expectNoNearEvents(500, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
Expand Down Expand Up @@ -97,19 +99,18 @@ private <K, V> AssertsNearCache<K, V> createAssertClient() {
public void testInvalidationFromOtherClientModification() {
int key = 0;

client1.get(key, null).expectNearGetNull(key);
client2.get(key, null).expectNearGetNull(key);
client1.get(key, null).expectNearGetMiss(key);
client2.get(key, null).expectNearGetMiss(key);

String value = "v1";
client1.put(key, value).expectNearPreemptiveRemove(key);
client2.expectNoNearEvents();

client2.get(key, value).expectNearGetNull(key).expectNearPutIfAbsent(key, value);
client2.get(key, value).expectNearGetMissWithValue(key, value);
client2.get(key, value).expectNearGetValue(key, value);

// Client 1 should only get a preemptive remove as it never cached the value locally
// However client 2 should get a remove as it had it cached
client1.remove(key).expectNearPreemptiveRemove(key, client2);
// Both client1 and client2 have remote removes due to having to add a null get to the bloom filter
// to guarantee consistency in ISPN-13612
client1.remove(key).expectNearRemove(key, client2);
}

public void testClientsBothCachedAndCanUpdate() {
Expand All @@ -118,8 +119,8 @@ public void testClientsBothCachedAndCanUpdate() {

client1.put(key, value).expectNearPreemptiveRemove(key);

client1.get(key, value).expectNearGetNull(key).expectNearPutIfAbsent(key, value);
client2.get(key, value).expectNearGetNull(key).expectNearPutIfAbsent(key, value);
client1.get(key, value).expectNearGetMissWithValue(key, value);
client2.get(key, value).expectNearGetMissWithValue(key, value);

String value2 = "v2";
client1.put(key, value2).expectNearRemove(key, client2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ protected NearCacheMode getNearCacheMode() {

protected void expectNearCacheUpdates(AssertsNearCache<Integer, String> producer,
Integer key, AssertsNearCache<Integer, String> consumer) {
producer.get(key, null).expectNearGetNull(key);
producer.get(key, null).expectNearGetMiss(key);
producer.put(key, "v1").expectNearPreemptiveRemove(key);
producer.get(key, "v1").expectNearGetNull(key).expectNearPutIfAbsent(key, "v1");
producer.get(key, "v1").expectNearGetMissWithValue(key, "v1");
producer.put(key, "v2").expectNearRemove(key, consumer);
producer.get(key, "v2").expectNearGetNull(key).expectNearPutIfAbsent(key, "v2");
producer.get(key, "v2").expectNearGetMissWithValue(key, "v2");
producer.remove(key).expectNearRemove(key, consumer);
producer.get(key, null).expectNearGetNull(key);
producer.get(key, null).expectNearGetMiss(key);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ public void testEvictAfterReachingMax() {
assertClient.expectNoNearEvents();
for (int i = 0; i < entryCount; ++i) {
assertClient.put(i, "v1").expectNearPreemptiveRemove(i);
assertClient.get(i, "v1").expectNearGetNull(i).expectNearPutIfAbsent(i, "v1");
assertClient.get(i, "v1").expectNearGetMissWithValue(i, "v1");
}

int extraKey = entryCount + 1;
assertClient.put(extraKey, "v1").expectNearPreemptiveRemove(extraKey);
assertClient.get(extraKey, "v1").expectNearGetNull(extraKey).expectNearPutIfAbsent(extraKey, "v1");
assertClient.get(extraKey, "v1").expectNearGetMissWithValue(extraKey, "v1");

// Caffeine is not deterministic as to which one it evicts - so we just verify size
assertEquals(entryCount, assertClient.nearCacheSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ public void testNearCacheClearedUponFailover() {
AssertsNearCache<Integer, String> stickyClient = createStickyAssertClient();
try {
stickyClient.put(1, "v1").expectNearPreemptiveRemove(1);
stickyClient.get(1, "v1").expectNearGetNull(1).expectNearPutIfAbsent(1, "v1");
stickyClient.get(1, "v1").expectNearGetMissWithValue(1, "v1");
stickyClient.put(2, "v1").expectNearPreemptiveRemove(2);
stickyClient.get(2, "v1").expectNearGetNull(2).expectNearPutIfAbsent(2, "v1");
stickyClient.get(2, "v1").expectNearGetMissWithValue(2, "v1");
stickyClient.put(3, "v1").expectNearPreemptiveRemove(3);
stickyClient.get(3, "v1").expectNearGetNull(3).expectNearPutIfAbsent(3, "v1");
stickyClient.get(3, "v1").expectNearGetMissWithValue(3, "v1");
boolean headClientClear = isClientListenerAttachedToSameServer(headClient(), stickyClient);
boolean tailClientClear = isClientListenerAttachedToSameServer(tailClient(), stickyClient);
killServerForClient(stickyClient);
Expand All @@ -83,20 +83,19 @@ public void testNearCacheClearedUponFailover() {
eventually(stickyClient::isNearCacheConnected);

stickyClient.get(1, "v1")
.expectNearGetNull(1)
.expectNearPutIfAbsent(1, "v1");
.expectNearGetMissWithValue(1, "v1");
stickyClient.expectNoNearEvents();
if (headClientClear) {
headClient().expectNearClear();
}
eventually(() -> headClient().isNearCacheConnected());
headClient().get(2, "v1").expectNearGetNull(2).expectNearPutIfAbsent(2, "v1");
headClient().get(2, "v1").expectNearGetMissWithValue(2, "v1");
headClient().expectNoNearEvents();
if (tailClientClear) {
tailClient().expectNearClear();
}
eventually(() -> tailClient().isNearCacheConnected());
tailClient().get(3, "v1").expectNearGetNull(3).expectNearPutIfAbsent(3, "v1");
tailClient().get(3, "v1").expectNearGetMissWithValue(3, "v1");
tailClient().expectNoNearEvents();
} finally {
stickyClient.stop();
Expand Down

0 comments on commit 03acc1c

Please sign in to comment.