Skip to content

Commit

Permalink
ISPN-5607 Preemptively invalidate near cache after writes
Browse files Browse the repository at this point in the history
* Near cache stale reads can't be fully guaranteed since there's always
  the possibility of a read to come in when the server has already
  executed a write operation but it's in process of sending back the
  response.
* However, we can't at least guarantee that within a single
  thread, a read after a write will read the written value. The change
  proposed in this PR addresses this, by preemptively invalidating data
  after a successful write.
  • Loading branch information
galderz authored and gustavonalle committed Aug 21, 2015
1 parent a73d0f5 commit c1d600b
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 24 deletions.
Expand Up @@ -17,7 +17,8 @@
import org.infinispan.client.hotrod.event.ClientListenerNotifier;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;
import org.infinispan.client.hotrod.impl.NearRemoteCache;
import org.infinispan.client.hotrod.impl.EagerNearRemoteCache;
import org.infinispan.client.hotrod.impl.InvalidatedNearRemoteCache;
import org.infinispan.client.hotrod.impl.RemoteCacheImpl;
import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
import org.infinispan.client.hotrod.impl.operations.PingOperation.PingResult;
Expand Down Expand Up @@ -645,11 +646,17 @@ private <K, V> RemoteCache<K, V> createRemoteCache(String cacheName, Boolean for
}

private <K, V> RemoteCacheImpl<K, V> createRemoteCache(String cacheName) {
if (configuration.nearCache().mode().enabled()) {
NearCacheService<K, V> srv = createNearCacheService(configuration.nearCache());
return new NearRemoteCache<K, V>(this, cacheName, srv);
} else {
return new RemoteCacheImpl<K, V>(this, cacheName);
switch (configuration.nearCache().mode()) {
case INVALIDATED:
case LAZY:
return new InvalidatedNearRemoteCache<>(this, cacheName,
createNearCacheService(configuration.nearCache()));
case EAGER:
return new EagerNearRemoteCache<>(this, cacheName,
createNearCacheService(configuration.nearCache()));
case DISABLED:
default:
return new RemoteCacheImpl<>(this, cacheName);
}
}

Expand Down
Expand Up @@ -11,11 +11,12 @@
* @param <K>
* @param <V>
*/
public class NearRemoteCache<K, V> extends RemoteCacheImpl<K, V> {
@Deprecated
public class EagerNearRemoteCache<K, V> extends RemoteCacheImpl<K, V> {

private final NearCacheService<K, V> nearcache;

public NearRemoteCache(RemoteCacheManager rcm, String name, NearCacheService<K, V> nearcache) {
public EagerNearRemoteCache(RemoteCacheManager rcm, String name, NearCacheService<K, V> nearcache) {
super(rcm, name);
this.nearcache = nearcache;
}
Expand Down
@@ -0,0 +1,111 @@
package org.infinispan.client.hotrod.impl;

import org.infinispan.client.hotrod.Flag;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.VersionedValue;
import org.infinispan.client.hotrod.near.NearCacheService;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Near {@link org.infinispan.client.hotrod.RemoteCache} implementation
* enabling
*
* @param <K>
* @param <V>
*/
public class InvalidatedNearRemoteCache<K, V> extends RemoteCacheImpl<K, V> {

private final NearCacheService<K, V> nearcache;

public InvalidatedNearRemoteCache(RemoteCacheManager rcm, String name, NearCacheService<K, V> nearcache) {
super(rcm, name);
this.nearcache = nearcache;
}

@Override
public V get(Object key) {
VersionedValue<V> versioned = getVersioned((K) key);
return versioned != null ? versioned.getValue() : null;
}

@Override
public VersionedValue<V> getVersioned(K key) {
VersionedValue<V> nearValue = nearcache.get(key);
if (nearValue == null) {
VersionedValue<V> remoteValue = super.getVersioned(key);
if (remoteValue != null)
nearcache.putIfAbsent(key, remoteValue);

return remoteValue;
}

return nearValue;
}

@Override
public V put(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
V ret = super.put(key, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
nearcache.remove(key); // Eager invalidation to avoid race
return ret;
}

@Override
public void putAll(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
super.putAll(map, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
map.keySet().forEach(nearcache::remove);
}

@Override
public V replace(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit) {
boolean hasForceReturnValue = operationsFactory.hasFlag(Flag.FORCE_RETURN_VALUE);
V prev = super.replace(key, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
invalidateNearCacheIfNeeded(hasForceReturnValue, key, prev);
return prev;
}

@Override
public boolean replaceWithVersion(K key, V newValue, long version, long lifespan, TimeUnit lifespanTimeUnit, long maxIdle, TimeUnit maxIdleTimeUnit) {
boolean replaced = super.replaceWithVersion(key, newValue, version, lifespan, lifespanTimeUnit, maxIdle, maxIdleTimeUnit);
if (replaced) nearcache.remove(key);
return replaced;
}

@Override
public V remove(Object key) {
boolean hasForceReturnValue = operationsFactory.hasFlag(Flag.FORCE_RETURN_VALUE);
V prev = super.remove(key);
invalidateNearCacheIfNeeded(hasForceReturnValue, key, prev);
return prev;
}

@Override
public boolean removeWithVersion(K key, long version) {
boolean removed = super.removeWithVersion(key, version);
if (removed) nearcache.remove(key); // Eager invalidation to avoid race
return removed;
}

@Override
public void clear() {
super.clear();
nearcache.clear(); // Clear near cache too
}

@SuppressWarnings("unchecked")
void invalidateNearCacheIfNeeded(boolean hasForceReturnValue, Object key, Object prev) {
if (!hasForceReturnValue || prev != null)
nearcache.remove((K) key);
}

@Override
public void start() {
nearcache.start(this);
}

@Override
public void stop() {
nearcache.stop(this);
}
}
Expand Up @@ -48,7 +48,7 @@ public class RemoteCacheImpl<K, V> extends RemoteCacheSupport<K, V> {
private final String name;
private final RemoteCacheManager remoteCacheManager;
private volatile ExecutorService executorService;
private OperationsFactory operationsFactory;
protected OperationsFactory operationsFactory;
private int estimateKeySize;
private int estimateValueSize;

Expand Down
Expand Up @@ -236,6 +236,11 @@ public void addFlags(Flag... flags) {
list.add(flag);
}

public boolean hasFlag(Flag flag) {
List<Flag> list = this.flagsMap.get();
return list != null && list.contains(flag);
}

public CacheTopologyInfo getCacheTopologyInfo() {
return transportFactory.getCacheTopologyInfo(cacheNameBytes);
}
Expand Down
Expand Up @@ -6,6 +6,7 @@
import org.infinispan.client.hotrod.VersionedValue;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.configuration.NearCacheConfiguration;
import org.infinispan.client.hotrod.configuration.NearCacheMode;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand All @@ -21,12 +22,14 @@ class AssertsNearCache<K, V> {
final Cache<byte[], ?> server;
final BlockingQueue<MockEvent> events;
final RemoteCacheManager manager;
final NearCacheMode nearCacheMode;

private AssertsNearCache(RemoteCacheManager manager, Cache<byte[], ?> server, BlockingQueue<MockEvent> events) {
this.manager = manager;
this.remote = manager.getCache();
this.server = server;
this.events = events;
this.nearCacheMode = manager.getConfiguration().nearCache().mode();
}

static <K, V> AssertsNearCache<K, V> create(Cache<byte[], ?> server, ConfigurationBuilder builder) {
Expand All @@ -41,11 +44,6 @@ protected NearCacheService<K, V> createNearCacheService(NearCacheConfiguration c
return new AssertsNearCache<>(manager, server, events);
}

static <K, V> AssertsNearCache<K, V> create(AssertsNearCache<K, V> client) {
final BlockingQueue<MockEvent> events = new ArrayBlockingQueue<>(128);
return new AssertsNearCache<>(client.manager, client.server, events);
}

AssertsNearCache<K, V> get(K key, V expected) {
assertEquals(expected, remote.get(key));
return this;
Expand Down Expand Up @@ -116,9 +114,9 @@ AssertsNearCache<K, V> expectNearPutIfAbsent(K key, V value) {

@SafeVarargs
final AssertsNearCache<K, V> expectNearRemove(K key, AssertsNearCache<K, V>... affected) {
expectNearRemoveInClient(this, key);
expectLocalNearRemoveInClient(this, key);
for (AssertsNearCache<K, V> client : affected)
expectNearRemoveInClient(client, key);
expectRemoteNearRemoveInClient(client, key);

return this;
}
Expand All @@ -141,9 +139,21 @@ void stop() {
killRemoteCacheManager(manager);
}

private static <K, V> void expectNearRemoveInClient(AssertsNearCache<K, V> client, K key) {
MockRemoveEvent remove = pollEvent(client.events);
assertEquals(key, remove.key);
private static <K, V> void expectLocalNearRemoveInClient(AssertsNearCache<K, V> client, K key) {
if (client.nearCacheMode.invalidated()) {
// Preemptive remove
MockRemoveEvent preemptiveRemove = pollEvent(client.events);
assertEquals(key, preemptiveRemove.key);
}
// Remote event remove
MockRemoveEvent remoteRemove = pollEvent(client.events);
assertEquals(key, remoteRemove.key);
}

private static <K, V> void expectRemoteNearRemoveInClient(AssertsNearCache<K, V> client, K key) {
// Remote event remove
MockRemoveEvent remoteRemove = pollEvent(client.events);
assertEquals(key, remoteRemove.key);
}

private static <E extends MockEvent> E pollEvent(BlockingQueue<MockEvent> events) {
Expand Down
@@ -0,0 +1,109 @@
package org.infinispan.client.hotrod.near;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.VersionedValue;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.configuration.NearCacheMode;
import org.infinispan.client.hotrod.test.SingleHotRodServerTest;
import org.infinispan.commons.util.concurrent.NotifyingFuture;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNull;

@Test(groups = "functional", testName = "client.hotrod.near.AvoidStaleNearCacheReadsTest")
public class AvoidStaleNearCacheReadsTest extends SingleHotRodServerTest {

@AfterMethod(alwaysRun=true)
@Override
protected void clearContent() {
super.clearContent();
remoteCacheManager.getCache().clear(); // Clear the near cache too
}

@Override
protected RemoteCacheManager getRemoteCacheManager() {
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.addServer().host("127.0.0.1").port(hotrodServer.getPort());
builder.nearCache().mode(NearCacheMode.INVALIDATED).maxEntries(-1);
return new RemoteCacheManager(builder.build());
}

public void testAvoidStaleReadsAfterPutRemove() {
repeated((i, remote) -> {
String value = "v" + i;
remote.put(1, value);
assertEquals(value, remote.get(1));
remote.remove(1);
assertNull(remote.get(1));
});
}

public void testAvoidStaleReadsAfterPutAll() {
repeated((i, remote) -> {
String value = "v" + i;
Map<Integer, String> map = new HashMap<>();
map.put(1, value);
remote.putAll(map);
assertEquals(value, remote.get(1));
});
}

public void testAvoidStaleReadsAfterReplace() {
repeated((i, remote) -> {
String value = "v" + i;
remote.replace(1, value);
VersionedValue<String> versioned = remote.getVersioned(1);
assertEquals(value, versioned.getValue());
});
}

public void testAvoidStaleReadsAfterReplaceWithVersion() {
repeated((i, remote) -> {
String value = "v" + i;
VersionedValue<String> versioned = remote.getVersioned(1);
remote.replaceWithVersion(1, value, versioned.getVersion());
assertEquals(value, remote.get(1));
});
}

public void testAvoidStaleReadsAfterPutAsyncRemoveVersioned() {
repeated((i, remote) -> {
String value = "v" + i;
await(remote.putAsync(1, value));
VersionedValue<String> versioned = remote.getVersioned(1);
assertEquals(value, versioned.getValue());
remote.removeWithVersion(1, versioned.getVersion());
assertNull(remote.get(1));
});
}

private void repeated(BiConsumer<Integer, RemoteCache<Integer, String>> c) {
RemoteCache<Integer, String> remote = remoteCacheManager.getCache();
remote.putIfAbsent(1, "v0");
IntStream.range(1, 1000).forEach(i -> {
c.accept(i, remote);
});
}

static <T> T await(Future<T> f) {
try {
return f.get(10000, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e ) {
throw new AssertionError(e);
}
}

}

0 comments on commit c1d600b

Please sign in to comment.