Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISPN-9511 Expired event is not fired when modifying an expired entry #6260

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -159,7 +159,7 @@ AssertsNearCache<K, V> expectNearPutIfAbsent(K key, V value) {
return this;
}

public AssertsNearCache<K, V> expectNearPreemptiveRemove(K key, AssertsNearCache<K, V>... affected) {
public AssertsNearCache<K, V> expectNearPreemptiveRemove(K key) {
// Preemptive remove
MockRemoveEvent preemptiveRemove = pollEvent(events);
assertEquals(key, preemptiveRemove.key);
Expand Down
Expand Up @@ -69,7 +69,7 @@ protected NearCacheMode getNearCacheMode() {
protected void expectNearCacheUpdates(AssertsNearCache<Integer, String> producer,
Integer key, AssertsNearCache<Integer, String> consumer) {
producer.get(key, null).expectNearGetNull(key);
producer.put(key, "v1").expectNearPreemptiveRemove(key, consumer);
producer.put(key, "v1").expectNearPreemptiveRemove(key);
producer.get(key, "v1").expectNearGetNull(key).expectNearPutIfAbsent(key, "v1");
producer.put(key, "v2").expectNearRemove(key, consumer);
producer.get(key, "v2").expectNearGetNull(key).expectNearPutIfAbsent(key, "v2");
Expand Down
Expand Up @@ -55,11 +55,11 @@ protected NearCacheMode getNearCacheMode() {
public void testNearCacheClearedUponFailover() {
AssertsNearCache<Integer, String> stickyClient = createStickyAssertClient();
try {
stickyClient.put(1, "v1").expectNearPreemptiveRemove(1, headClient(), tailClient());
stickyClient.put(1, "v1").expectNearPreemptiveRemove(1);
stickyClient.get(1, "v1").expectNearGetNull(1).expectNearPutIfAbsent(1, "v1");
stickyClient.put(2, "v1").expectNearPreemptiveRemove(2, headClient(), tailClient());
stickyClient.put(2, "v1").expectNearPreemptiveRemove(2);
stickyClient.get(2, "v1").expectNearGetNull(2).expectNearPutIfAbsent(2, "v1");
stickyClient.put(3, "v1").expectNearPreemptiveRemove(3, headClient(), tailClient());
stickyClient.put(3, "v1").expectNearPreemptiveRemove(3);
stickyClient.get(3, "v1").expectNearGetNull(3).expectNearPutIfAbsent(3, "v1");
findServerAndKill(stickyClient.manager, servers, cacheManagers);
// The clear will be executed when the connection to the server is closed from the listener.
Expand Down
13 changes: 11 additions & 2 deletions core/src/main/java/org/infinispan/cache/impl/CacheImpl.java
Expand Up @@ -684,8 +684,12 @@ private RemoveCommand createRemoveCommand(Object key, long explicitFlags) {

@Override
public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan) {
return removeLifespanExpired(key, value, lifespan, EnumUtil.EMPTY_BIT_SET);
}

final CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan, long explicitFlags) {
RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value, keyPartitioner.getSegment(key),
lifespan);
lifespan, explicitFlags | FlagBitSets.SKIP_CACHE_LOAD);
// Remove expired returns a boolean - just ignore it, the caller just needs to know that the expired
// entry is removed when this completes
CompletableFuture<Boolean> completableFuture = performRemoveExpiredCommand(command);
Expand All @@ -694,7 +698,12 @@ public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifesp

@Override
public CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value) {
RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value, keyPartitioner.getSegment(key));
return removeMaxIdleExpired(key, value, EnumUtil.EMPTY_BIT_SET);
}

final CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value, long explicitFlags) {
RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value, keyPartitioner.getSegment(key),
explicitFlags | FlagBitSets.SKIP_CACHE_LOAD);
return performRemoveExpiredCommand(command);
}

Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/infinispan/cache/impl/DecoratedCache.java
Expand Up @@ -432,6 +432,16 @@ public CompletableFuture<Boolean> removeAsync(Object key, Object value) {
return cacheImplementation.removeAsync(key, value, flags, contextBuilder);
}

@Override
public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan) {
return cacheImplementation.removeLifespanExpired(key, value, lifespan, flags);
}

@Override
public CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value) {
return cacheImplementation.removeMaxIdleExpired(key, value, flags);
}

@Override
public CompletableFuture<V> replaceAsync(K key, V value) {
return replaceAsync(key, value, cacheImplementation.defaultMetadata);
Expand Down
Expand Up @@ -161,18 +161,20 @@ public interface CommandsFactory {
* @param value the value of the entry when it was expired
* @param segment the segment of the given key
* @param lifespan the lifespan that expired from the command
* @param flagsBitSet Command flags provided by cache
* @return a RemovedExpiredCommand
*/
RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, int segment, Long lifespan);
RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, int segment, Long lifespan, long flagsBitSet);

/**
* Builds an expired remove command that is used to remove only a specific entry when it expires via maxIdle
* @param key the key of the expired entry
* @param value the value of the entry when it was expired
* @param segment the segment of the given key
* @param flagsBitSet Command flags provided by cache
* @return a RemovedExpiredCommand
*/
RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, int segment);
RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, int segment, long flagsBitSet);

/**
* Builds a retrieve max idle command that is used to get the last access time for a given key.
Expand Down
Expand Up @@ -251,15 +251,16 @@ public InvalidateCommand buildInvalidateFromL1Command(Address origin, long flags
}

@Override
public RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, int segment, Long lifespan) {
return new RemoveExpiredCommand(key, value, lifespan, false, notifier, segment, generateUUID(transactional),
versionGenerator.nonExistingVersion(), timeService);
public RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, int segment, Long lifespan,
long flagsBitSet) {
return new RemoveExpiredCommand(key, value, lifespan, false, notifier, segment, flagsBitSet,
generateUUID(transactional), versionGenerator.nonExistingVersion(), timeService);
}

@Override
public RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, int segment) {
return new RemoveExpiredCommand(key, value, null, true, notifier, segment, generateUUID(transactional),
versionGenerator.nonExistingVersion(), timeService);
public RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, int segment, long flagsBitSet) {
return new RemoveExpiredCommand(key, value, null, true, notifier, segment, flagsBitSet,
generateUUID(transactional), versionGenerator.nonExistingVersion(), timeService);
}

@Override
Expand Down
Expand Up @@ -120,6 +120,7 @@ public void writeTo(ObjectOutput output) throws IOException {
case REPLACE:
case WRITE:
output.writeObject(metadata);
// falls through
case REMOVE_EXPIRED:
output.writeObject(valueOrFunction);
break;
Expand All @@ -141,6 +142,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti
case REPLACE:
case WRITE:
metadata = (Metadata) input.readObject();
// falls through
case REMOVE_EXPIRED:
valueOrFunction = input.readObject();
break;
Expand Down Expand Up @@ -171,8 +173,8 @@ WriteCommand createWriteCommand() {
getCommandInvocationId());
case REMOVE_EXPIRED:
// Doesn't matter if it is max idle or not - important thing is that it raises expired event
return new RemoveExpiredCommand(key, valueOrFunction, null, false, cacheNotifier, segmentId, getCommandInvocationId(),
versionGenerator.nonExistingVersion(), componentRegistry.getTimeService());
return new RemoveExpiredCommand(key, valueOrFunction, null, false, cacheNotifier, segmentId, getFlags(),
getCommandInvocationId(), versionGenerator.nonExistingVersion(), componentRegistry.getTimeService());
case COMPUTE_IF_PRESENT:
return new ComputeCommand(key, (BiFunction) valueOrFunction, true, segmentId, getFlags(), getCommandInvocationId(),
metadata, cacheNotifier, componentRegistry);
Expand Down
Expand Up @@ -11,7 +11,6 @@
import org.infinispan.commands.Visitor;
import org.infinispan.commons.io.UnsignedNumeric;
import org.infinispan.commons.time.TimeService;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.container.entries.ExpiryHelper;
import org.infinispan.container.entries.MVCCEntry;
import org.infinispan.container.versioning.IncrementableEntryVersion;
Expand Down Expand Up @@ -48,10 +47,10 @@ public RemoveExpiredCommand() {
}

public RemoveExpiredCommand(Object key, Object value, Long lifespan, boolean maxIdle, CacheNotifier notifier, int segment,
CommandInvocationId commandInvocationId, IncrementableEntryVersion nonExistentVersion,
long flagBitSet, CommandInvocationId commandInvocationId, IncrementableEntryVersion nonExistentVersion,
TimeService timeService) {
//valueEquivalence can be null because this command never compares values.
super(key, value, notifier, segment, EnumUtil.EMPTY_BIT_SET, commandInvocationId);
super(key, value, notifier, segment, flagBitSet, commandInvocationId);
this.lifespan = lifespan;
this.maxIdle = maxIdle;
this.valueMatcher = ValueMatcher.MATCH_EXPECTED_OR_NULL;
Expand Down Expand Up @@ -160,6 +159,8 @@ public void writeTo(ObjectOutput output) throws IOException {
output.writeBoolean(false);
}
output.writeBoolean(maxIdle);
output.writeLong(FlagBitSets.copyWithoutRemotableFlags(getFlagsBitSet()));

}

@Override
Expand All @@ -175,6 +176,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti
lifespan = null;
}
maxIdle = input.readBoolean();
setFlagsBitSet(input.readLong());
}

@Override
Expand All @@ -191,12 +193,6 @@ public int hashCode() {
return Objects.hash(super.hashCode(), lifespan, maxIdle);
}

@Override
public long getFlagsBitSet() {
// Override the flags
return FlagBitSets.SKIP_CACHE_LOAD;
}

/**
* Whether this remove expired was fired because of max idle
* @return if this command is max idle based expiration
Expand Down
Expand Up @@ -80,7 +80,7 @@ public InternalCacheEntry<K, V> get(int segment, Object k) {
if (e != null && e.canExpire()) {
long currentTimeMillis = timeService.wallClockTime();
if (e.isExpired(currentTimeMillis) &&
expirationManager.entryExpiredInMemory(e, currentTimeMillis).join() == Boolean.TRUE) {
expirationManager.entryExpiredInMemory(e, currentTimeMillis, false).join() == Boolean.TRUE) {
e = null;
} else {
e.touch(currentTimeMillis);
Expand Down Expand Up @@ -159,7 +159,7 @@ public boolean containsKey(int segment, Object k) {
if (ice != null && ice.canExpire()) {
long currentTimeMillis = timeService.wallClockTime();
if (ice.isExpired(currentTimeMillis)) {
if (expirationManager.entryExpiredInMemory(ice, currentTimeMillis).join() == Boolean.TRUE) {
if (expirationManager.entryExpiredInMemory(ice, currentTimeMillis, false).join() == Boolean.TRUE) {
ice = null;
}
}
Expand Down
Expand Up @@ -2,6 +2,7 @@

import static org.infinispan.commons.util.Util.toStr;

import org.infinispan.commons.time.TimeService;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.Configurations;
import org.infinispan.container.entries.CacheEntry;
Expand All @@ -13,11 +14,12 @@
import org.infinispan.container.entries.VersionedRepeatableReadEntry;
import org.infinispan.container.versioning.VersionGenerator;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.expiration.impl.InternalExpirationManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.metadata.EmbeddedMetadata;
import org.infinispan.metadata.Metadata;
import org.infinispan.commons.time.TimeService;
import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
Expand All @@ -37,6 +39,8 @@ public class EntryFactoryImpl implements EntryFactory {
@Inject private Configuration configuration;
@Inject private TimeService timeService;
@Inject private VersionGenerator versionGenerator;
@Inject private DistributionManager distributionManager;
@Inject private InternalExpirationManager expirationManager;

private boolean isL1Enabled;
private boolean useRepeatableRead;
Expand Down Expand Up @@ -94,7 +98,7 @@ public void wrapEntryForWriting(InvocationContext ctx, Object key, int segment,
log.tracef("Updated context entry %s -> %s", contextEntry, mvccEntry);
} else {
// Not in the context yet.
CacheEntry cacheEntry = getFromContainer(key, segment, isOwner, true);
CacheEntry cacheEntry = getFromContainerForWrite(key, segment, isOwner);
if (cacheEntry == null) {
return;
}
Expand Down Expand Up @@ -125,7 +129,7 @@ public void wrapEntryForExpired(InvocationContext ctx, Object key, int segment,
log.tracef("Updated context entry %s -> %s", contextEntry, mvccEntry);
} else {
// Not in the context yet.
CacheEntry cacheEntry = innerGetFromContainer(key, segment, true, true);
CacheEntry cacheEntry = innerGetFromContainerForWrite(key, segment, true, false);
if (cacheEntry == null) {
cacheEntry = NullCacheEntry.getInstance();
}
Expand Down Expand Up @@ -197,17 +201,22 @@ private CacheEntry getFromContext(InvocationContext ctx, Object key) {
return cacheEntry;
}

private CacheEntry getFromContainer(Object key, int segment, boolean isOwner, boolean writeOperation) {
private boolean isPrimaryOwner(int segment) {
return distributionManager == null ||
distributionManager.getCacheTopology().getSegmentDistribution(segment).isPrimary();
}

private CacheEntry getFromContainerForWrite(Object key, int segment, boolean isOwner) {
if (isOwner) {
final InternalCacheEntry ice = innerGetFromContainer(key, segment, writeOperation, false);
final InternalCacheEntry ice = innerGetFromContainerForWrite(key, segment, false, isPrimaryOwner(segment));
if (trace)
log.tracef("Retrieved from container %s", ice);
if (ice == null) {
return NullCacheEntry.getInstance();
}
return ice;
} else if (isL1Enabled) {
final InternalCacheEntry ice = innerGetFromContainer(key, segment, writeOperation, false);
final InternalCacheEntry ice = innerGetFromContainerForWrite(key, segment, false, false);
if (trace)
log.tracef("Retrieved from container %s", ice);
if (ice == null || !ice.isL1Entry()) return null;
Expand All @@ -228,22 +237,20 @@ private CacheEntry getFromContainerForRead(Object key, int segment, boolean isOw
}
}

private InternalCacheEntry innerGetFromContainer(Object key, int segment, boolean writeOperation, boolean returnExpired) {
InternalCacheEntry ice;
// Write operations should not cause expiration events to occur, because we will most likely overwrite the
// value anyways - also required for remove expired to not cause infinite loop
if (writeOperation) {
ice = container.peek(segment, key);
if (ice != null && !returnExpired && ice.canExpire()) {
long wallClockTime = timeService.wallClockTime();
if (ice.isExpired(wallClockTime)) {
ice = null;
} else {
ice.touch(wallClockTime);
private InternalCacheEntry innerGetFromContainerForWrite(Object key, int segment, boolean returnExpired, boolean isPrimaryOwner) {
InternalCacheEntry ice = container.peek(segment, key);
if (ice != null && !returnExpired) {
long currentTime = timeService.wallClockTime();
if (ice.isExpired(currentTime)) {
// This means it is a write operation that isn't expiration and we are the owner, thus we should
// actually expire the entry from memory
if (isPrimaryOwner) {
// This method is always called from a write operation - we have to wait for the remove expired to
// complete to guarantee any expiration event is notified before performing the actual write operation
expirationManager.entryExpiredInMemory(ice, currentTime, true).join();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this, it feels like we're spreading the expiration logic in too many places. And wrapping entries shouldn't block...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where would you recommend the logic should go? I don't have a good spot since we don't want this in the DataContainer. Unless you think we should move the other logic out of the DataContainer into here?

Wrapping entries already blocks for max idle :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not in the DataContainer? It already deals with expiration...

Although moving the expiration logic somewhere where it could be made non-blocking does sound better.

Sorry I missed the blocking in the max idle PR review :P

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I responded quickly to run out, but the more I am thinking I would rather probably have this in EntryFactoryImpl and make both methods non blocking with interceptor stack. Although I haven't looked at details if this is possible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I got a chance to look at this, I am not sure how to make this non blocking without changing the EntryFactory interface. What do you think @danberindei ? You have had a lot more experience changing these things.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, making it non-blocking is a lot more involved. It's in an impl package so changing the interface is not a big deal, but I wouldn't push for it in 9.4.

Not sure if moving everything expiration-related to EntryFactoryImpl right now is feasible either, that's why my first thought was to keep everything in the data container.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah to add it to the DataContainer would require changing that interface, which is a lot more difficult to do. So I would rather move it into EntryFactoryImpl at some point and eventually try to push that to the EntryWrappingInterceptor, but I don't see a nice easy way to do that. I would say what is here is probably what will have to be for 9.4.0.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok for now :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree.

}
return null;
}
} else {
ice = container.get(segment, key);
}
return ice;
}
Expand Down