Skip to content

Commit

Permalink
ISPN-9511 Expired event is not raised when modifying an expired entry
Browse files Browse the repository at this point in the history
  • Loading branch information
wburns committed Sep 20, 2018
1 parent 8849a79 commit aceaecd
Show file tree
Hide file tree
Showing 17 changed files with 110 additions and 50 deletions.
Expand Up @@ -159,7 +159,7 @@ AssertsNearCache<K, V> expectNearPutIfAbsent(K key, V value) {
return this;
}

public AssertsNearCache<K, V> expectNearPreemptiveRemove(K key, AssertsNearCache<K, V>... affected) {
public AssertsNearCache<K, V> expectNearPreemptiveRemove(K key) {
// Preemptive remove
MockRemoveEvent preemptiveRemove = pollEvent(events);
assertEquals(key, preemptiveRemove.key);
Expand Down
Expand Up @@ -69,7 +69,7 @@ protected NearCacheMode getNearCacheMode() {
protected void expectNearCacheUpdates(AssertsNearCache<Integer, String> producer,
Integer key, AssertsNearCache<Integer, String> consumer) {
producer.get(key, null).expectNearGetNull(key);
producer.put(key, "v1").expectNearPreemptiveRemove(key, consumer);
producer.put(key, "v1").expectNearPreemptiveRemove(key);
producer.get(key, "v1").expectNearGetNull(key).expectNearPutIfAbsent(key, "v1");
producer.put(key, "v2").expectNearRemove(key, consumer);
producer.get(key, "v2").expectNearGetNull(key).expectNearPutIfAbsent(key, "v2");
Expand Down
Expand Up @@ -55,11 +55,11 @@ protected NearCacheMode getNearCacheMode() {
public void testNearCacheClearedUponFailover() {
AssertsNearCache<Integer, String> stickyClient = createStickyAssertClient();
try {
stickyClient.put(1, "v1").expectNearPreemptiveRemove(1, headClient(), tailClient());
stickyClient.put(1, "v1").expectNearPreemptiveRemove(1);
stickyClient.get(1, "v1").expectNearGetNull(1).expectNearPutIfAbsent(1, "v1");
stickyClient.put(2, "v1").expectNearPreemptiveRemove(2, headClient(), tailClient());
stickyClient.put(2, "v1").expectNearPreemptiveRemove(2);
stickyClient.get(2, "v1").expectNearGetNull(2).expectNearPutIfAbsent(2, "v1");
stickyClient.put(3, "v1").expectNearPreemptiveRemove(3, headClient(), tailClient());
stickyClient.put(3, "v1").expectNearPreemptiveRemove(3);
stickyClient.get(3, "v1").expectNearGetNull(3).expectNearPutIfAbsent(3, "v1");
findServerAndKill(stickyClient.manager, servers, cacheManagers);
// The clear will be executed when the connection to the server is closed from the listener.
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/infinispan/cache/impl/CacheImpl.java
Expand Up @@ -638,8 +638,13 @@ private RemoveCommand createRemoveCommand(Object key, long explicitFlags) {

@Override
public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan) {
return removeLifespanExpired(key, value, lifespan, EnumUtil.EMPTY_BIT_SET);
}

final CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan, long explicitFlags) {
RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value, keyPartitioner.getSegment(key),
lifespan);
command.setFlagsBitSet(explicitFlags);
// Remove expired returns a boolean - just ignore it, the caller just needs to know that the expired
// entry is removed when this completes
CompletableFuture<Boolean> completableFuture = performRemoveExpiredCommand(command);
Expand All @@ -648,7 +653,12 @@ public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifesp

@Override
public CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value) {
return removeMaxIdleExpired(key, value, EnumUtil.EMPTY_BIT_SET);
}

final CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value, long explicitFlags) {
RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value, keyPartitioner.getSegment(key));
command.setFlagsBitSet(explicitFlags);
return performRemoveExpiredCommand(command);
}

Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/infinispan/cache/impl/DecoratedCache.java
Expand Up @@ -432,6 +432,16 @@ public CompletableFuture<Boolean> removeAsync(Object key, Object value) {
return cacheImplementation.removeAsync(key, value, flags, contextBuilder);
}

@Override
public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan) {
return cacheImplementation.removeLifespanExpired(key, value, lifespan, flags);
}

@Override
public CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value) {
return cacheImplementation.removeMaxIdleExpired(key, value, flags);
}

@Override
public CompletableFuture<V> replaceAsync(K key, V value) {
return replaceAsync(key, value, cacheImplementation.defaultMetadata);
Expand Down
Expand Up @@ -16,7 +16,7 @@
*/
public abstract class AbstractDataCommand implements DataCommand, SegmentSpecificCommand {
protected Object key;
private long flags;
protected long flags;
// These 2 ints have to stay next to each other to ensure they are aligned together
private int topologyId = -1;
protected int segment;
Expand Down
Expand Up @@ -140,6 +140,8 @@ public void writeTo(ObjectOutput output) throws IOException {
output.writeBoolean(false);
}
output.writeBoolean(maxIdle);
output.writeLong(FlagBitSets.copyWithoutRemotableFlags(getFlagsBitSet()));

}

@Override
Expand All @@ -155,6 +157,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti
lifespan = null;
}
maxIdle = input.readBoolean();
setFlagsBitSet(input.readLong());
}

@Override
Expand All @@ -173,8 +176,8 @@ public int hashCode() {

@Override
public long getFlagsBitSet() {
// Override the flags
return FlagBitSets.SKIP_CACHE_LOAD;
// Override the flags to always include skip cache load
return flags | FlagBitSets.SKIP_CACHE_LOAD;
}

/**
Expand Down
Expand Up @@ -80,7 +80,7 @@ public InternalCacheEntry<K, V> get(int segment, Object k) {
if (e != null && e.canExpire()) {
long currentTimeMillis = timeService.wallClockTime();
if (e.isExpired(currentTimeMillis) &&
expirationManager.entryExpiredInMemory(e, currentTimeMillis).join() == Boolean.TRUE) {
expirationManager.entryExpiredInMemory(e, currentTimeMillis, false).join() == Boolean.TRUE) {
e = null;
} else {
e.touch(currentTimeMillis);
Expand Down Expand Up @@ -159,7 +159,7 @@ public boolean containsKey(int segment, Object k) {
if (ice != null && ice.canExpire()) {
long currentTimeMillis = timeService.wallClockTime();
if (ice.isExpired(currentTimeMillis)) {
if (expirationManager.entryExpiredInMemory(ice, currentTimeMillis).join() == Boolean.TRUE) {
if (expirationManager.entryExpiredInMemory(ice, currentTimeMillis, false).join() == Boolean.TRUE) {
ice = null;
}
}
Expand Down
Expand Up @@ -2,6 +2,7 @@

import static org.infinispan.commons.util.Util.toStr;

import org.infinispan.commons.time.TimeService;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.Configurations;
import org.infinispan.container.entries.CacheEntry;
Expand All @@ -13,11 +14,12 @@
import org.infinispan.container.entries.VersionedRepeatableReadEntry;
import org.infinispan.container.versioning.VersionGenerator;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.expiration.impl.InternalExpirationManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.metadata.EmbeddedMetadata;
import org.infinispan.metadata.Metadata;
import org.infinispan.commons.time.TimeService;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
Expand All @@ -37,6 +39,8 @@ public class EntryFactoryImpl implements EntryFactory {
@Inject private Configuration configuration;
@Inject private TimeService timeService;
@Inject private VersionGenerator versionGenerator;
@Inject private DistributionManager distributionManager;
@Inject private InternalExpirationManager expirationManager;

private boolean isL1Enabled;
private boolean useRepeatableRead;
Expand Down Expand Up @@ -94,7 +98,7 @@ public void wrapEntryForWriting(InvocationContext ctx, Object key, int segment,
log.tracef("Updated context entry %s -> %s", contextEntry, mvccEntry);
} else {
// Not in the context yet.
CacheEntry cacheEntry = getFromContainer(key, segment, isOwner, true);
CacheEntry cacheEntry = getFromContainerForWrite(key, segment, isOwner);
if (cacheEntry == null) {
return;
}
Expand Down Expand Up @@ -125,7 +129,7 @@ public void wrapEntryForExpired(InvocationContext ctx, Object key, int segment,
log.tracef("Updated context entry %s -> %s", contextEntry, mvccEntry);
} else {
// Not in the context yet.
CacheEntry cacheEntry = innerGetFromContainer(key, segment, true, true);
CacheEntry cacheEntry = innerGetFromContainer(key, segment, true, false);
if (cacheEntry == null) {
cacheEntry = NullCacheEntry.getInstance();
}
Expand Down Expand Up @@ -195,17 +199,25 @@ private CacheEntry getFromContext(InvocationContext ctx, Object key) {
return cacheEntry;
}

private CacheEntry getFromContainer(Object key, int segment, boolean isOwner, boolean writeOperation) {
private boolean isPrimaryOwner(boolean isOwner, int segment) {
if (distributionManager != null) {
return distributionManager.getCacheTopology().getSegmentDistribution(segment).isPrimary();
} else {
return isOwner;
}
}

private CacheEntry getFromContainerForWrite(Object key, int segment, boolean isOwner) {
if (isOwner) {
final InternalCacheEntry ice = innerGetFromContainer(key, segment, writeOperation, false);
final InternalCacheEntry ice = innerGetFromContainer(key, segment, false, isPrimaryOwner(isOwner, segment));
if (trace)
log.tracef("Retrieved from container %s", ice);
if (ice == null) {
return NullCacheEntry.getInstance();
}
return ice;
} else if (isL1Enabled) {
final InternalCacheEntry ice = innerGetFromContainer(key, segment, writeOperation, false);
final InternalCacheEntry ice = innerGetFromContainer(key, segment, false, isPrimaryOwner(isOwner, segment));
if (trace)
log.tracef("Retrieved from container %s", ice);
if (ice == null || !ice.isL1Entry()) return null;
Expand All @@ -226,22 +238,19 @@ private CacheEntry getFromContainerForRead(Object key, int segment, boolean isOw
}
}

private InternalCacheEntry innerGetFromContainer(Object key, int segment, boolean writeOperation, boolean returnExpired) {
InternalCacheEntry ice;
// Write operations should not cause expiration events to occur, because we will most likely overwrite the
// value anyways - also required for remove expired to not cause infinite loop
if (writeOperation) {
ice = container.peek(segment, key);
if (ice != null && !returnExpired && ice.canExpire()) {
long wallClockTime = timeService.wallClockTime();
if (ice.isExpired(wallClockTime)) {
ice = null;
} else {
ice.touch(wallClockTime);
private InternalCacheEntry innerGetFromContainer(Object key, int segment, boolean returnExpired, boolean isPrimaryOwner) {
InternalCacheEntry ice = container.peek(segment, key);
if (ice != null && !returnExpired) {
long currentTime = timeService.wallClockTime();
if (ice.isExpired(currentTime)) {
// This means it is a write operation that isn't expiration and we are the owner, thus we should
// actually expire the entry from memory
if (isPrimaryOwner) {
// This method is always called from a write operation
expirationManager.entryExpiredInMemory(ice, currentTime, true).join();
}
return null;
}
} else {
ice = container.get(segment, key);
}
return ice;
}
Expand Down
Expand Up @@ -14,6 +14,7 @@
import org.infinispan.commons.util.Util;
import org.infinispan.container.entries.ExpiryHelper;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
Expand Down Expand Up @@ -93,9 +94,9 @@ public void processExpiration() {
}
// We check lifespan first as this is much less expensive to remove than max idle.
if (expiredMortal) {
addAndWaitIfFull(handleLifespanExpireEntry(ice.getKey(), value, lifespan), futures);
addAndWaitIfFull(handleLifespanExpireEntry(ice.getKey(), value, lifespan, false), futures);
} else if (expiredTransient) {
addAndWaitIfFull(actualRemoveMaxIdleExpireEntry(ice.getKey(), value, maxIdle), futures);
addAndWaitIfFull(actualRemoveMaxIdleExpireEntry(ice.getKey(), value, maxIdle, false), futures);
}
}
});
Expand Down Expand Up @@ -126,25 +127,26 @@ private void addAndWaitIfFull(CompletableFuture future, List<CompletableFuture>
}
}

CompletableFuture<Void> handleLifespanExpireEntry(K key, V value, long lifespan) {
CompletableFuture<Void> handleLifespanExpireEntry(K key, V value, long lifespan, boolean skipLocking) {
// The most used case will be a miss so no extra read before
if (expiring.putIfAbsent(key, key) == null) {
if (trace) {
log.tracef("Submitting expiration removal for key %s which had lifespan of %s", toStr(key), lifespan);
}
CompletableFuture<Void> future = cache.removeLifespanExpired(key, value, lifespan);
AdvancedCache<K, V> cacheToUse = skipLocking ? cache.withFlags(Flag.SKIP_LOCKING) : cache;
CompletableFuture<Void> future = cacheToUse.removeLifespanExpired(key, value, lifespan);
return future.whenComplete((v, t) -> expiring.remove(key, key));
}
return CompletableFutures.completedNull();
}

// Method invoked when an entry is found to be expired via get
CompletableFuture<Boolean> handleMaxIdleExpireEntry(K key, V value, long maxIdle) {
return actualRemoveMaxIdleExpireEntry(key, value, maxIdle);
CompletableFuture<Boolean> handleMaxIdleExpireEntry(K key, V value, long maxIdle, boolean skipLocking) {
return actualRemoveMaxIdleExpireEntry(key, value, maxIdle, skipLocking);
}

// Method invoked when entry should be attempted to be removed via max idle
CompletableFuture<Boolean> actualRemoveMaxIdleExpireEntry(K key, V value, long maxIdle) {
CompletableFuture<Boolean> actualRemoveMaxIdleExpireEntry(K key, V value, long maxIdle, boolean skipLocking) {
CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
Object expiringObject = expiring.putIfAbsent(key, completableFuture);
if (expiringObject == null) {
Expand All @@ -153,7 +155,8 @@ CompletableFuture<Boolean> actualRemoveMaxIdleExpireEntry(K key, V value, long m
}
completableFuture.whenComplete((b, t) -> expiring.remove(key, completableFuture));
try {
CompletableFuture<Boolean> expired = cache.removeMaxIdleExpired(key, value);
AdvancedCache<K, V> cacheToUse = skipLocking ? cache.withFlags(Flag.SKIP_LOCKING) : cache;
CompletableFuture<Boolean> expired = cacheToUse.removeMaxIdleExpired(key, value);
expired.whenComplete((b, t) -> {
if (t != null) {
completableFuture.completeExceptionally(t);
Expand All @@ -176,7 +179,8 @@ CompletableFuture<Boolean> actualRemoveMaxIdleExpireEntry(K key, V value, long m
}

@Override
public CompletableFuture<Boolean> entryExpiredInMemory(InternalCacheEntry<K, V> entry, long currentTime) {
public CompletableFuture<Boolean> entryExpiredInMemory(InternalCacheEntry<K, V> entry, long currentTime,
boolean writeOperation) {
// We need to synchronize on the entry since {@link InternalCacheEntry} locks the entry when doing an update
// so we can see both the new value and the metadata
boolean expiredMortal;
Expand All @@ -188,12 +192,15 @@ public CompletableFuture<Boolean> entryExpiredInMemory(InternalCacheEntry<K, V>
expiredMortal = ExpiryHelper.isExpiredMortal(lifespan, entry.getCreated(), currentTime);
}
if (expiredMortal) {
handleLifespanExpireEntry(entry.getKey(), value, lifespan);
CompletableFuture<Void> future = handleLifespanExpireEntry(entry.getKey(), value, lifespan, writeOperation);
if (writeOperation) {
return future.thenCompose(CompletableFutures.composeTrue());
}
// We don't want to block the user while the remove expired is happening for lifespan
return CompletableFutures.completedTrue();
} else {
// This means it expired transiently - this will block user until we confirm the entry is okay
return handleMaxIdleExpireEntry(entry.getKey(), value, entry.getMaxIdle());
return handleMaxIdleExpireEntry(entry.getKey(), value, entry.getMaxIdle(), writeOperation);
}
}

Expand Down
Expand Up @@ -94,7 +94,7 @@ public void processExpiration() {
purgeCandidates.hasNext();) {
InternalCacheEntry<K, V> e = purgeCandidates.next();
if (e.isExpired(currentTimeMillis)) {
entryExpiredInMemory(e, currentTimeMillis);
entryExpiredInMemory(e, currentTimeMillis, false);
}
}
if (trace) {
Expand All @@ -117,7 +117,8 @@ public boolean isEnabled() {
}

@Override
public CompletableFuture<Boolean> entryExpiredInMemory(InternalCacheEntry<K, V> entry, long currentTime) {
public CompletableFuture<Boolean> entryExpiredInMemory(InternalCacheEntry<K, V> entry, long currentTime,
boolean writeOperation) {
// We ignore the return from this method. It is possible for the entry to no longer be expired, but this means
// it was updated by another thread. In that case it is a completely valid value for it to be expired then not.
// So for this we just tell the caller it was expired.
Expand All @@ -139,13 +140,13 @@ public CompletableFuture<Boolean> entryExpiredInMemory(InternalCacheEntry<K, V>
@Override
public CompletableFuture<Boolean> entryExpiredInMemoryFromIteration(InternalCacheEntry<K, V> entry, long currentTime) {
// Local we just remove the entry as we see them
return entryExpiredInMemory(entry, currentTime);
return entryExpiredInMemory(entry, currentTime, false);
}

@Override
public void handleInMemoryExpiration(InternalCacheEntry<K, V> entry, long currentTime) {
// Just invoke the new method and join
entryExpiredInMemory(entry, currentTime).join();
entryExpiredInMemory(entry, currentTime, false).join();
}

@Override
Expand Down
Expand Up @@ -23,12 +23,13 @@ public interface InternalExpirationManager<K, V> extends ExpirationManager<K, V>
* not removed due to expiration
* @param entry the entry that has expired
* @param currentTime the current time when it expired
* @param writeOperation if the expiration was found during a write operation
* @return if this entry actually expired or not
*/
CompletableFuture<Boolean> entryExpiredInMemory(InternalCacheEntry<K, V> entry, long currentTime);
CompletableFuture<Boolean> entryExpiredInMemory(InternalCacheEntry<K, V> entry, long currentTime, boolean writeOperation);

/**
* This method is very similar to {@link #entryExpiredInMemory(InternalCacheEntry, long)} except that it does the
* This method is very similar to {@link #entryExpiredInMemory(InternalCacheEntry, long, boolean)} except that it does the
* bare minimum when an entry expired to guarantee if the entry is valid or not. This is important to reduce time
* spent per entry when iterating. This method may not actually remove the entry and may just return immediately
* if it is safe to do so.
Expand Down

0 comments on commit aceaecd

Please sign in to comment.