diff --git a/core/src/main/java/org/infinispan/AdvancedCache.java b/core/src/main/java/org/infinispan/AdvancedCache.java index 95f94037ae85..b5a9053a5d6d 100644 --- a/core/src/main/java/org/infinispan/AdvancedCache.java +++ b/core/src/main/java/org/infinispan/AdvancedCache.java @@ -775,14 +775,37 @@ default Map getAndPutAll(Map map) { *

* This command will only remove the value if the value and lifespan also match if provided. *

+ * This method will suspend any ongoing transaction and start a new one just for the invocation of this command. It + * is automatically committed or rolled back after the command completes, either successfully or via an exception. + *

* NOTE: This method may be removed at any point including in a minor release and is not supported for external * usage. * * @param key the key that is expiring * @param value the value that mapped to the given. Null means it will match any value * @param lifespan the lifespan that should match. If null is provided it will match any lifespan value + * @return if the entry was removed + */ + CompletableFuture removeLifespanExpired(K key, V value, Long lifespan); + + /** + * Attempts to remove the entry for the given key, when it has expired due to max idle. This command first locks + * the key and then verifies that the entry has expired via maxIdle across all nodes. If it has this will then + * remove the given key. + *

+ * This method returns a boolean when it has determined if the entry has expired. This is useful for when a backup + * node invokes this command for a get that found the entry expired. This way the node can return back to the caller + * much faster when the entry is not expired. + *

+ * This method will suspend any ongoing transaction and start a new one just for the invocation of this command. It + * is automatically committed or rolled back after the command completes, either successfully or via an exception. + *

+ * NOTE: This method may be removed at any point including in a minor release and is not supported for external + * usage. + * @param key the key that expired via max idle for the given entry + * @return if the entry was removed */ - void removeExpired(K key, V value, Long lifespan); + CompletableFuture removeMaxIdleExpired(K key, V value); /** * Performs any cache operations using the specified pair of {@link Encoder}. diff --git a/core/src/main/java/org/infinispan/cache/impl/AbstractDelegatingAdvancedCache.java b/core/src/main/java/org/infinispan/cache/impl/AbstractDelegatingAdvancedCache.java index 82c062d86ef1..0122e243c5b2 100644 --- a/core/src/main/java/org/infinispan/cache/impl/AbstractDelegatingAdvancedCache.java +++ b/core/src/main/java/org/infinispan/cache/impl/AbstractDelegatingAdvancedCache.java @@ -436,8 +436,13 @@ public LockedStream lockedStream() { } @Override - public void removeExpired(K key, V value, Long lifespan) { - cache.removeExpired(key, value, lifespan); + public CompletableFuture removeLifespanExpired(K key, V value, Long lifespan) { + return cache.removeLifespanExpired(key, value, lifespan); + } + + @Override + public CompletableFuture removeMaxIdleExpired(K key, V value) { + return cache.removeMaxIdleExpired(key, value); } @Override diff --git a/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java b/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java index 16022b292756..7780d661b9c0 100644 --- a/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java +++ b/core/src/main/java/org/infinispan/cache/impl/CacheImpl.java @@ -176,6 +176,7 @@ public class CacheImpl implements AdvancedCache { private boolean transactional; private boolean batchingEnabled; private final ContextBuilder contextBuilder = this::getInvocationContextWithImplicitTransaction; + private final ContextBuilder expiredContextBuilder = i -> this.getInvocationContextWithImplicitTransaction(i, true); private final ContextBuilder pferContextBuilder = this::putForExternalReadContext; public CacheImpl(String name) { @@ -629,10 +630,31 @@ private RemoveCommand createRemoveCommand(Object key, long explicitFlags) { } @Override - public void removeExpired(K key, V value, Long lifespan) { + public CompletableFuture removeLifespanExpired(K key, V value, Long lifespan) { RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value, lifespan); - // Send an expired remove command to everyone - executeCommandAndCommitIfNeeded(contextBuilder, command, 1); + // Remove expired returns a boolean - just ignore it, the caller just needs to know that the expired + // entry is removed when this completes + CompletableFuture completableFuture = performRemoveExpiredCommand(command); + return completableFuture.thenApply(b -> null); + } + + @Override + public CompletableFuture removeMaxIdleExpired(K key, V value) { + RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value); + return performRemoveExpiredCommand(command); + } + + private CompletableFuture performRemoveExpiredCommand(RemoveExpiredCommand command) { + Transaction ongoingTransaction = null; + try { + ongoingTransaction = suspendOngoingTransactionIfExists(); + return executeCommandAndCommitIfNeededAsync(expiredContextBuilder, command, 1); + } catch (Exception e) { + if (log.isDebugEnabled()) log.debug("Caught exception while doing removeExpired()", e); + return CompletableFutures.completedExceptionFuture(e); + } finally { + resumePreviousOngoingTransaction(ongoingTransaction, true, "Had problems trying to resume a transaction after removeExpired()"); + } } @Override @@ -912,15 +934,28 @@ void addFilteredListener(ListenerHolder listener, } /** - * If this is a transactional cache and autoCommit is set to true then starts a transaction if this is not a - * transactional call. + * Creates an invocation context with an implicit transaction if it is required. An implicit transaction is created + * if there is no current transaction and autoCommit is enabled. + * @param keyCount how many keys are expected to be changed + * @return the invocation context */ InvocationContext getInvocationContextWithImplicitTransaction(int keyCount) { + return getInvocationContextWithImplicitTransaction(keyCount, false); + } + + /** + * Same as {@link #getInvocationContextWithImplicitTransaction(int)} except if forceCreateTransaction + * is true then autoCommit doesn't have to be enabled to start a new transaction. + * @param keyCount how many keys are expected to be changed + * @param forceCreateTransaction if true then a transaction is always started if there wasn't one + * @return the invocation context + */ + InvocationContext getInvocationContextWithImplicitTransaction(int keyCount, boolean forceCreateTransaction) { InvocationContext invocationContext; boolean txInjected = false; if (transactional) { Transaction transaction = getOngoingTransaction(true); - if (transaction == null && config.transaction().autoCommit()) { + if (transaction == null && (forceCreateTransaction || config.transaction().autoCommit())) { transaction = tryBegin(); txInjected = true; } diff --git a/core/src/main/java/org/infinispan/cache/impl/EncoderCache.java b/core/src/main/java/org/infinispan/cache/impl/EncoderCache.java index 6763e3e747e2..0949fea2a0df 100644 --- a/core/src/main/java/org/infinispan/cache/impl/EncoderCache.java +++ b/core/src/main/java/org/infinispan/cache/impl/EncoderCache.java @@ -477,8 +477,13 @@ public CacheSet> cacheEntrySet() { } @Override - public void removeExpired(K key, V value, Long lifespan) { - super.removeExpired(keyToStorage(key), valueToStorage(value), lifespan); + public CompletableFuture removeLifespanExpired(K key, V value, Long lifespan) { + return super.removeLifespanExpired(keyToStorage(key), valueToStorage(value), lifespan); + } + + @Override + public CompletableFuture removeMaxIdleExpired(K key, V value) { + return super.removeMaxIdleExpired(keyToStorage(key), valueToStorage(value)); } @Override diff --git a/core/src/main/java/org/infinispan/cache/impl/SimpleCacheImpl.java b/core/src/main/java/org/infinispan/cache/impl/SimpleCacheImpl.java index d1f4b71a7bf8..51a56bf340ea 100644 --- a/core/src/main/java/org/infinispan/cache/impl/SimpleCacheImpl.java +++ b/core/src/main/java/org/infinispan/cache/impl/SimpleCacheImpl.java @@ -462,8 +462,17 @@ public LockedStream lockedStream() { } @Override - public void removeExpired(K key, V value, Long lifespan) { + public CompletableFuture removeLifespanExpired(K key, V value, Long lifespan) { checkExpiration(getDataContainer().get(key), timeService.wallClockTime()); + return CompletableFutures.completedNull(); + } + + @Override + public CompletableFuture removeMaxIdleExpired(K key, V value) { + if (checkExpiration(getDataContainer().get(key), timeService.wallClockTime())) { + return CompletableFutures.completedTrue(); + } + return CompletableFutures.completedFalse(); } @Override diff --git a/core/src/main/java/org/infinispan/commands/CommandsFactory.java b/core/src/main/java/org/infinispan/commands/CommandsFactory.java index 6ce177354a12..80bc93c4acfc 100644 --- a/core/src/main/java/org/infinispan/commands/CommandsFactory.java +++ b/core/src/main/java/org/infinispan/commands/CommandsFactory.java @@ -41,6 +41,8 @@ import org.infinispan.commands.remote.RenewBiasCommand; import org.infinispan.commands.remote.RevokeBiasCommand; import org.infinispan.commands.remote.SingleRpcCommand; +import org.infinispan.commands.remote.expiration.RetrieveLastAccessCommand; +import org.infinispan.commands.remote.expiration.UpdateLastAccessCommand; import org.infinispan.commands.remote.recovery.CompleteTransactionCommand; import org.infinispan.commands.remote.recovery.GetInDoubtTransactionsCommand; import org.infinispan.commands.remote.recovery.GetInDoubtTxInfoCommand; @@ -153,7 +155,7 @@ public interface CommandsFactory { InvalidateCommand buildInvalidateFromL1Command(Address origin, long flagsBitSet, Collection keys); /** - * Builds an expired remove command that is used to remove only a specific expired entry + * Builds an expired remove command that is used to remove only a specific entry when it expires via lifespan * @param key the key of the expired entry * @param value the value of the entry when it was expired * @param lifespan the lifespan that expired from the command @@ -161,6 +163,29 @@ public interface CommandsFactory { */ RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, Long lifespan); + /** + * Builds an expired remove command that is used to remove only a specific entry when it expires via maxIdle + * @param key the key of the expired entry + * @param value the value of the entry when it was expired + * @return a RemovedExpiredCommand + */ + RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value); + + /** + * Builds a retrieve max idle command that is used to get the last access time for a given key. + * @param key the key of the entry to get the last access time of + * @return a RetrieveLastAccessCommand + */ + RetrieveLastAccessCommand buildRetrieveLastAccessCommand(Object key, Object value); + + /** + * Builds an update last access command that is used to update the last access time for a given key. + * @param key the key of the entry to update the last access time of + * @param accessTime the time to set the access time to + * @return a UpdateLastAccessCommand + */ + UpdateLastAccessCommand buildUpdateLastAccessCommand(Object key, long accessTime); + /** * Builds a ReplaceCommand * @param key key to replace diff --git a/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java b/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java index 605fe1a856af..4c884aa37d63 100644 --- a/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java +++ b/core/src/main/java/org/infinispan/commands/CommandsFactoryImpl.java @@ -46,6 +46,8 @@ import org.infinispan.commands.remote.RenewBiasCommand; import org.infinispan.commands.remote.RevokeBiasCommand; import org.infinispan.commands.remote.SingleRpcCommand; +import org.infinispan.commands.remote.expiration.RetrieveLastAccessCommand; +import org.infinispan.commands.remote.expiration.UpdateLastAccessCommand; import org.infinispan.commands.remote.recovery.CompleteTransactionCommand; import org.infinispan.commands.remote.recovery.GetInDoubtTransactionsCommand; import org.infinispan.commands.remote.recovery.GetInDoubtTxInfoCommand; @@ -139,6 +141,7 @@ import org.infinispan.transaction.xa.GlobalTransaction; import org.infinispan.transaction.xa.recovery.RecoveryManager; import org.infinispan.util.ByteString; +import org.infinispan.util.TimeService; import org.infinispan.util.concurrent.CommandAckCollector; import org.infinispan.util.concurrent.locks.LockManager; import org.infinispan.util.logging.Log; @@ -199,6 +202,7 @@ public class CommandsFactoryImpl implements CommandsFactory { private Map moduleCommandInitializers; @Inject private VersionGenerator versionGenerator; @Inject private KeyPartitioner keyPartitioner; + @Inject private TimeService timeService; private ByteString cacheName; private boolean transactional; @@ -247,10 +251,26 @@ public InvalidateCommand buildInvalidateFromL1Command(Address origin, long flags @Override public RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, Long lifespan) { - return new RemoveExpiredCommand(key, value, lifespan, notifier, generateUUID(transactional), + return new RemoveExpiredCommand(key, value, lifespan, false, notifier, generateUUID(transactional), versionGenerator.nonExistingVersion()); } + @Override + public RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value) { + return new RemoveExpiredCommand(key, value, null, true, notifier, generateUUID(transactional), + versionGenerator.nonExistingVersion()); + } + + @Override + public RetrieveLastAccessCommand buildRetrieveLastAccessCommand(Object key, Object value) { + return new RetrieveLastAccessCommand(cacheName, key, value); + } + + @Override + public UpdateLastAccessCommand buildUpdateLastAccessCommand(Object key, long accessTime) { + return new UpdateLastAccessCommand(cacheName, key, accessTime); + } + @Override public ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object newValue, Metadata metadata, long flagsBitSet) { return new ReplaceCommand(key, oldValue, newValue, notifier, metadata, flagsBitSet, @@ -509,6 +529,14 @@ public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) { RemoveExpiredCommand removeExpiredCommand = (RemoveExpiredCommand) c; removeExpiredCommand.init(notifier, versionGenerator.nonExistingVersion()); break; + case RetrieveLastAccessCommand.COMMAND_ID: + RetrieveLastAccessCommand retrieveLastAccessCommand = (RetrieveLastAccessCommand) c; + retrieveLastAccessCommand.inject(dataContainer, timeService); + break; + case UpdateLastAccessCommand.COMMAND_ID: + UpdateLastAccessCommand updateLastAccessCommand = (UpdateLastAccessCommand) c; + updateLastAccessCommand.inject(dataContainer); + break; case BackupAckCommand.COMMAND_ID: BackupAckCommand command = (BackupAckCommand) c; command.setCommandAckCollector(commandAckCollector); diff --git a/core/src/main/java/org/infinispan/commands/RemoteCommandsFactory.java b/core/src/main/java/org/infinispan/commands/RemoteCommandsFactory.java index 3a6d2e13eccb..b2ab1e04dff7 100644 --- a/core/src/main/java/org/infinispan/commands/RemoteCommandsFactory.java +++ b/core/src/main/java/org/infinispan/commands/RemoteCommandsFactory.java @@ -26,6 +26,8 @@ import org.infinispan.commands.remote.RenewBiasCommand; import org.infinispan.commands.remote.RevokeBiasCommand; import org.infinispan.commands.remote.SingleRpcCommand; +import org.infinispan.commands.remote.expiration.RetrieveLastAccessCommand; +import org.infinispan.commands.remote.expiration.UpdateLastAccessCommand; import org.infinispan.commands.remote.recovery.CompleteTransactionCommand; import org.infinispan.commands.remote.recovery.GetInDoubtTransactionsCommand; import org.infinispan.commands.remote.recovery.GetInDoubtTxInfoCommand; @@ -354,6 +356,12 @@ public CacheRpcCommand fromStream(byte id, byte type, ByteString cacheName) { case RenewBiasCommand.COMMAND_ID: command = new RenewBiasCommand(cacheName); break; + case RetrieveLastAccessCommand.COMMAND_ID: + command = new RetrieveLastAccessCommand(cacheName); + break; + case UpdateLastAccessCommand.COMMAND_ID: + command = new UpdateLastAccessCommand(cacheName); + break; default: throw new CacheException("Unknown command id " + id + "!"); } diff --git a/core/src/main/java/org/infinispan/commands/Visitor.java b/core/src/main/java/org/infinispan/commands/Visitor.java index 376bd8d42a09..dd36df3d05c7 100644 --- a/core/src/main/java/org/infinispan/commands/Visitor.java +++ b/core/src/main/java/org/infinispan/commands/Visitor.java @@ -32,6 +32,7 @@ import org.infinispan.commands.write.PutKeyValueCommand; import org.infinispan.commands.write.PutMapCommand; import org.infinispan.commands.write.RemoveCommand; +import org.infinispan.commands.write.RemoveExpiredCommand; import org.infinispan.commands.write.ReplaceCommand; import org.infinispan.context.InvocationContext; import org.infinispan.context.impl.TxInvocationContext; @@ -61,6 +62,10 @@ public interface Visitor { Object visitEvictCommand(InvocationContext ctx, EvictCommand command) throws Throwable; + default Object visitRemoveExpiredCommand(InvocationContext ctx, RemoveExpiredCommand command) throws Throwable { + return visitRemoveCommand(ctx, command); + } + @Deprecated default Object visitApplyDeltaCommand(InvocationContext ctx, ApplyDeltaCommand command) throws Throwable { throw new UnsupportedOperationException(); diff --git a/core/src/main/java/org/infinispan/commands/remote/expiration/RetrieveLastAccessCommand.java b/core/src/main/java/org/infinispan/commands/remote/expiration/RetrieveLastAccessCommand.java new file mode 100644 index 000000000000..55e3464cf242 --- /dev/null +++ b/core/src/main/java/org/infinispan/commands/remote/expiration/RetrieveLastAccessCommand.java @@ -0,0 +1,100 @@ +package org.infinispan.commands.remote.expiration; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +import org.infinispan.commands.TopologyAffectedCommand; +import org.infinispan.commands.remote.BaseRpcCommand; +import org.infinispan.container.DataContainer; +import org.infinispan.container.entries.InternalCacheEntry; +import org.infinispan.util.ByteString; +import org.infinispan.util.TimeService; +import org.infinispan.util.concurrent.CompletableFutures; + +/** + * Command that when invoked will retrieve the last access time from an entry without updating it + * @author wburns + * @since 9.3 + */ +public class RetrieveLastAccessCommand extends BaseRpcCommand implements TopologyAffectedCommand { + + private Object key; + private Object value; + + private DataContainer container; + private TimeService timeService; + private int topologyId = -1; + + public static final byte COMMAND_ID = 81; + + // Only here for CommandIdUniquenessTest + private RetrieveLastAccessCommand() { super(null); } + + public RetrieveLastAccessCommand(ByteString cacheName) { + super(cacheName); + } + + public RetrieveLastAccessCommand(ByteString cacheName, Object key, Object value) { + super(cacheName); + this.key = Objects.requireNonNull(key); + this.value = value; + } + + public void inject(DataContainer container, TimeService timeService) { + this.container = container; + this.timeService = timeService; + } + + @Override + public byte getCommandId() { + return COMMAND_ID; + } + + @Override + public boolean isReturnValueExpected() { + return true; + } + + @Override + public void writeTo(ObjectOutput output) throws IOException { + output.writeObject(key); + if (value == null) { + output.writeBoolean(false); + } else { + output.writeBoolean(true); + output.writeObject(value); + } + } + + @Override + public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { + key = input.readObject(); + boolean hasValue = input.readBoolean(); + if (hasValue) { + value = input.readObject(); + } + } + + @Override + public int getTopologyId() { + return topologyId; + } + + @Override + public void setTopologyId(int topologyId) { + this.topologyId = topologyId; + } + + @Override + public CompletableFuture invokeAsync() throws Throwable { + InternalCacheEntry ice = container.peek(key); + if (ice != null && (value == null || value.equals(ice.getValue())) && + !ice.isExpired(timeService.wallClockTime())) { + return CompletableFuture.completedFuture(ice.getLastUsed()); + } + return CompletableFutures.completedNull(); + } +} diff --git a/core/src/main/java/org/infinispan/commands/remote/expiration/UpdateLastAccessCommand.java b/core/src/main/java/org/infinispan/commands/remote/expiration/UpdateLastAccessCommand.java new file mode 100644 index 000000000000..72fb00125ecc --- /dev/null +++ b/core/src/main/java/org/infinispan/commands/remote/expiration/UpdateLastAccessCommand.java @@ -0,0 +1,88 @@ +package org.infinispan.commands.remote.expiration; + +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.concurrent.CompletableFuture; + +import org.infinispan.commands.TopologyAffectedCommand; +import org.infinispan.commands.remote.BaseRpcCommand; +import org.infinispan.commons.io.UnsignedNumeric; +import org.infinispan.container.DataContainer; +import org.infinispan.container.entries.InternalCacheEntry; +import org.infinispan.util.ByteString; +import org.infinispan.util.concurrent.CompletableFutures; + +/** + * Command that will update the last access time for an entry given the specific time + * @author wburns + * @since 9.3 + */ +public class UpdateLastAccessCommand extends BaseRpcCommand implements TopologyAffectedCommand { + + private Object key; + private long acessTime; + + private DataContainer container; + private int topologyId = -1; + + public static final byte COMMAND_ID = 82; + + // Only here for CommandIdUniquenessTest + private UpdateLastAccessCommand() { super(null); } + + public UpdateLastAccessCommand(ByteString cacheName) { + super(cacheName); + } + + public UpdateLastAccessCommand(ByteString cacheName, Object key, long accessTime) { + super(cacheName); + this.key = key; + this.acessTime = accessTime; + } + + public void inject(DataContainer container) { + this.container = container; + } + + @Override + public byte getCommandId() { + return COMMAND_ID; + } + + @Override + public boolean isReturnValueExpected() { + return false; + } + + @Override + public void writeTo(ObjectOutput output) throws IOException { + output.writeObject(key); + UnsignedNumeric.writeUnsignedLong(output, acessTime); + } + + @Override + public void readFrom(ObjectInput input) throws IOException, ClassNotFoundException { + key = input.readObject(); + acessTime = UnsignedNumeric.readUnsignedLong(input); + } + + @Override + public int getTopologyId() { + return topologyId; + } + + @Override + public void setTopologyId(int topologyId) { + this.topologyId = topologyId; + } + + @Override + public CompletableFuture invokeAsync() throws Throwable { + InternalCacheEntry ice = container.peek(key); + if (ice != null) { + ice.touch(acessTime); + } + return CompletableFutures.completedNull(); + } +} diff --git a/core/src/main/java/org/infinispan/commands/triangle/SingleKeyBackupWriteCommand.java b/core/src/main/java/org/infinispan/commands/triangle/SingleKeyBackupWriteCommand.java index 2e01e9871a20..7f1ec8ffb812 100644 --- a/core/src/main/java/org/infinispan/commands/triangle/SingleKeyBackupWriteCommand.java +++ b/core/src/main/java/org/infinispan/commands/triangle/SingleKeyBackupWriteCommand.java @@ -170,7 +170,8 @@ WriteCommand createWriteCommand() { return new ReplaceCommand(key, null, valueOrFunction, cacheNotifier, metadata, getFlags(), getCommandInvocationId()); case REMOVE_EXPIRED: - return new RemoveExpiredCommand(key, valueOrFunction, null, cacheNotifier, getCommandInvocationId(), + // Doesn't matter if it is max idle or not - important thing is that it raises expired event + return new RemoveExpiredCommand(key, valueOrFunction, null, false, cacheNotifier, getCommandInvocationId(), versionGenerator.nonExistingVersion()); case COMPUTE_IF_PRESENT: return new ComputeCommand(key, (BiFunction) valueOrFunction, true, getFlags(), getCommandInvocationId(), diff --git a/core/src/main/java/org/infinispan/commands/write/RemoveExpiredCommand.java b/core/src/main/java/org/infinispan/commands/write/RemoveExpiredCommand.java index ba38996004f2..13c0ecc4b786 100644 --- a/core/src/main/java/org/infinispan/commands/write/RemoveExpiredCommand.java +++ b/core/src/main/java/org/infinispan/commands/write/RemoveExpiredCommand.java @@ -8,6 +8,7 @@ import java.util.Objects; import org.infinispan.commands.CommandInvocationId; +import org.infinispan.commands.Visitor; import org.infinispan.commons.util.EnumUtil; import org.infinispan.container.entries.MVCCEntry; import org.infinispan.container.versioning.IncrementableEntryVersion; @@ -29,6 +30,7 @@ public class RemoveExpiredCommand extends RemoveCommand { public static final int COMMAND_ID = 58; private static final Log log = LogFactory.getLog(RemoveExpiredCommand.class); + private boolean maxIdle; private Long lifespan; private IncrementableEntryVersion nonExistentVersion; @@ -37,15 +39,21 @@ public RemoveExpiredCommand() { this.valueMatcher = ValueMatcher.MATCH_EXPECTED_OR_NULL; } - public RemoveExpiredCommand(Object key, Object value, Long lifespan, CacheNotifier notifier, + public RemoveExpiredCommand(Object key, Object value, Long lifespan, boolean maxIdle, CacheNotifier notifier, CommandInvocationId commandInvocationId, IncrementableEntryVersion nonExistentVersion) { //valueEquivalence can be null because this command never compares values. super(key, value, notifier, EnumUtil.EMPTY_BIT_SET, commandInvocationId); this.lifespan = lifespan; + this.maxIdle = maxIdle; this.valueMatcher = ValueMatcher.MATCH_EXPECTED_OR_NULL; this.nonExistentVersion = nonExistentVersion; } + @Override + public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable { + return visitor.visitRemoveExpiredCommand(ctx, this); + } + /** * Performs an expiration on a specified entry * @@ -97,8 +105,7 @@ public boolean isConditional() { } @Override - public void notify(InvocationContext ctx, Object removedValue, Metadata removedMetadata, - boolean isPre) { + public void notify(InvocationContext ctx, Object removedValue, Metadata removedMetadata, boolean isPre) { if (!isPre) { notifier.notifyCacheEntryExpired(key, value, removedMetadata, ctx); } @@ -115,6 +122,7 @@ public String toString() { "key=" + toStr(key) + ", value=" + toStr(value) + ", lifespan=" + lifespan + + ", maxIde=" + maxIdle + '}'; } @@ -129,6 +137,7 @@ public void writeTo(ObjectOutput output) throws IOException { } else { output.writeBoolean(false); } + output.writeBoolean(maxIdle); } @Override @@ -142,6 +151,7 @@ public void readFrom(ObjectInput input) throws IOException, ClassNotFoundExcepti } else { lifespan = null; } + maxIdle = input.readBoolean(); } @Override @@ -150,12 +160,12 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; RemoveExpiredCommand that = (RemoveExpiredCommand) o; - return Objects.equals(lifespan, that.lifespan); + return maxIdle == that.maxIdle && Objects.equals(lifespan, that.lifespan); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), lifespan); + return Objects.hash(super.hashCode(), lifespan, maxIdle); } @Override @@ -164,6 +174,14 @@ public long getFlagsBitSet() { return FlagBitSets.SKIP_CACHE_LOAD; } + /** + * Whether this remove expired was fired because of max idle + * @return if this command is max idle based expiration + */ + public boolean isMaxIdle() { + return maxIdle; + } + public void init(CacheNotifier notifier, IncrementableEntryVersion nonExistentVersion) { super.init(notifier); this.nonExistentVersion = nonExistentVersion; diff --git a/core/src/main/java/org/infinispan/configuration/cache/MemoryConfigurationBuilder.java b/core/src/main/java/org/infinispan/configuration/cache/MemoryConfigurationBuilder.java index 2b4b6b9e5c2f..f9b7f4512f22 100644 --- a/core/src/main/java/org/infinispan/configuration/cache/MemoryConfigurationBuilder.java +++ b/core/src/main/java/org/infinispan/configuration/cache/MemoryConfigurationBuilder.java @@ -174,8 +174,9 @@ public void validate() { EvictionStrategy strategy = attributes.attribute(EVICTION_STRATEGY).get(); if (!strategy.isEnabled()) { if (size > 0) { - evictionStrategy(EvictionStrategy.REMOVE); - log.debugf("Max entries configured (%d) without eviction strategy. Eviction strategy overridden to %s", size, strategy); + EvictionStrategy newStrategy = EvictionStrategy.REMOVE; + evictionStrategy(newStrategy); + log.debugf("Max entries configured (%d) without eviction strategy. Eviction strategy overridden to %s", size, newStrategy); } else if (getBuilder().persistence().passivation() && strategy != EvictionStrategy.MANUAL && !getBuilder().template()) { log.passivationWithoutEviction(); diff --git a/core/src/main/java/org/infinispan/container/DefaultDataContainer.java b/core/src/main/java/org/infinispan/container/DefaultDataContainer.java index 2d619ce629be..42168a605556 100644 --- a/core/src/main/java/org/infinispan/container/DefaultDataContainer.java +++ b/core/src/main/java/org/infinispan/container/DefaultDataContainer.java @@ -198,8 +198,9 @@ public InternalCacheEntry get(Object k) { if (e != null && e.canExpire()) { long currentTimeMillis = timeService.wallClockTime(); if (e.isExpired(currentTimeMillis)) { - expirationManager.handleInMemoryExpiration(e, currentTimeMillis); - e = null; + if (expirationManager.entryExpiredInMemory(e, currentTimeMillis).join() == Boolean.TRUE) { + e = null; + } } else { e.touch(currentTimeMillis); } @@ -244,8 +245,9 @@ public boolean containsKey(Object k) { if (ice != null && ice.canExpire()) { long currentTimeMillis = timeService.wallClockTime(); if (ice.isExpired(currentTimeMillis)) { - expirationManager.handleInMemoryExpiration(ice, currentTimeMillis); - ice = null; + if (expirationManager.entryExpiredInMemory(ice, currentTimeMillis).join() == Boolean.TRUE) { + ice = null; + } } } return ice != null; @@ -435,13 +437,14 @@ private InternalCacheEntry getNext() { now = timeService.wallClockTime(); initializedTime = true; } - if (!entry.isExpired(now)) { + // If the entry isn't expired or the manager says it isn't expired then we can remove it + // Manager says it isn't expired usually for something like maxIdle when another node accessed it + if (!entry.isExpired(now) || + expirationManager.entryExpiredInMemoryFromIteration(entry, now).join() == Boolean.FALSE) { if (trace) { log.tracef("Return next entry %s", entry); } return entry; - } else if (trace) { - log.tracef("%s is expired", entry); } } } @@ -515,7 +518,10 @@ public boolean tryAdvance(Consumer> action) { now = timeService.wallClockTime(); initializedTime = true; } - if (entryToUse.isExpired(now)) { + // If the entry isn't expired or the manager says it isn't expired then we can remove it + // Manager says it isn't expired usually for something like maxIdle when another node accessed it + if (entryToUse.isExpired(now) && + expirationManager.entryExpiredInMemoryFromIteration(entryToUse, now).join() == Boolean.TRUE) { entryToUse = null; } } @@ -542,7 +548,8 @@ public void forEachRemaining(Consumer> action) now = timeService.wallClockTime(); initializedTime = true; } - if (currentEntry.isExpired(now)) { + if (currentEntry.isExpired(now) && + expirationManager.entryExpiredInMemoryFromIteration(currentEntry, now).join() == Boolean.TRUE) { continue; } } diff --git a/core/src/main/java/org/infinispan/container/EntryFactory.java b/core/src/main/java/org/infinispan/container/EntryFactory.java index 003d55327b78..00e79be55b21 100644 --- a/core/src/main/java/org/infinispan/container/EntryFactory.java +++ b/core/src/main/java/org/infinispan/container/EntryFactory.java @@ -107,6 +107,17 @@ public interface EntryFactory { */ void wrapEntryForWriting(InvocationContext ctx, Object key, boolean isOwner, boolean isRead); + /** + * Insert an entry that exists in the data container into the context, even if it is expired + * + * Doesn't do anything if the key was already wrapped + * @param ctx current + * @param ctx current invocation context + * @param key key to look up and wrap + * @param isOwner true if this node is current owner in readCH (or we ignore CH) + */ + void wrapEntryForExpired(InvocationContext ctx, Object key, boolean isOwner); + /** * Insert an external entry (e.g. loaded from a cache loader or from a remote node) into the context. * diff --git a/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java b/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java index 5d7bda72e45f..8bedb021b591 100644 --- a/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java +++ b/core/src/main/java/org/infinispan/container/EntryFactoryImpl.java @@ -111,6 +111,35 @@ public void wrapEntryForWriting(InvocationContext ctx, Object key, boolean isOwn } } + @Override + public void wrapEntryForExpired(InvocationContext ctx, Object key, boolean isOwner) { + CacheEntry contextEntry = getFromContext(ctx, key); + if (contextEntry instanceof MVCCEntry) { + // Nothing to do, already wrapped. + } else if (contextEntry != null) { + // Already in the context as an InternalCacheEntry + // Need to wrap it in a MVCCEntry. + MVCCEntry mvccEntry = createWrappedEntry(key, contextEntry); + ctx.putLookedUpEntry(key, mvccEntry); + if (trace) + log.tracef("Updated context entry %s -> %s", contextEntry, mvccEntry); + } else { + // Not in the context yet. + CacheEntry cacheEntry = innerGetFromContainer(key, true, true); + if (cacheEntry == null) { + cacheEntry = NullCacheEntry.getInstance(); + } + MVCCEntry mvccEntry = createWrappedEntry(key, cacheEntry); + if (cacheEntry.isNull()) { + mvccEntry.setCreated(true); + } + mvccEntry.setRead(); + ctx.putLookedUpEntry(key, mvccEntry); + if (trace) + log.tracef("Updated context entry %s -> %s", contextEntry, mvccEntry); + } + } + @Override public void wrapExternalEntry(InvocationContext ctx, Object key, CacheEntry externalEntry, boolean isRead, boolean isWrite) { // For a write operation, the entry is always already wrapped. For a read operation, the entry may be @@ -167,7 +196,7 @@ private CacheEntry getFromContext(InvocationContext ctx, Object key) { private CacheEntry getFromContainer(Object key, boolean isOwner, boolean writeOperation) { if (isOwner) { - final InternalCacheEntry ice = innerGetFromContainer(key, writeOperation); + final InternalCacheEntry ice = innerGetFromContainer(key, writeOperation, false); if (trace) log.tracef("Retrieved from container %s", ice); if (ice == null) { @@ -175,7 +204,7 @@ private CacheEntry getFromContainer(Object key, boolean isOwner, boolean writeOp } return ice; } else if (isL1Enabled) { - final InternalCacheEntry ice = innerGetFromContainer(key, writeOperation); + final InternalCacheEntry ice = innerGetFromContainer(key, writeOperation, false); if (trace) log.tracef("Retrieved from container %s", ice); if (ice == null || !ice.isL1Entry()) return null; @@ -196,13 +225,13 @@ private CacheEntry getFromContainerForRead(Object key, boolean isOwner) { } } - private InternalCacheEntry innerGetFromContainer(Object key, boolean writeOperation) { + private InternalCacheEntry innerGetFromContainer(Object key, boolean writeOperation, boolean returnExpired) { InternalCacheEntry ice; // Write operations should not cause expiration events to occur, because we will most likely overwrite the // value anyways - also required for remove expired to not cause infinite loop if (writeOperation) { ice = container.peek(key); - if (ice != null && ice.canExpire()) { + if (ice != null && !returnExpired && ice.canExpire()) { long wallClockTime = timeService.wallClockTime(); if (ice.isExpired(wallClockTime)) { ice = null; diff --git a/core/src/main/java/org/infinispan/container/entries/VersionedRepeatableReadEntry.java b/core/src/main/java/org/infinispan/container/entries/VersionedRepeatableReadEntry.java index 1a6c74c942a0..2d43bde1cd98 100644 --- a/core/src/main/java/org/infinispan/container/entries/VersionedRepeatableReadEntry.java +++ b/core/src/main/java/org/infinispan/container/entries/VersionedRepeatableReadEntry.java @@ -53,6 +53,10 @@ public boolean performWriteSkewCheck(DataContainer container, PersistenceManager prevVersion = getCurrentEntryVersion(container, persistenceManager, ctx, versionGenerator, timeService); } } + // If it is expired then it is possible the previous version doesn't exist - because entry didn't exist) + if (isExpired() && prevVersion == versionGenerator.nonExistingVersion()) { + return true; + } // ISPN-7170: With total-order protocol, a command may skip loading the entry from persistence layer, and keep // the entry would have non-existing version. Then TotalOrderVersionedEntryWrappingInterceptor would // increase the version and store the entry during commit phase, potentially overwriting newer version. diff --git a/core/src/main/java/org/infinispan/expiration/ExpirationManager.java b/core/src/main/java/org/infinispan/expiration/ExpirationManager.java index a22b51a81121..128d9984fb65 100644 --- a/core/src/main/java/org/infinispan/expiration/ExpirationManager.java +++ b/core/src/main/java/org/infinispan/expiration/ExpirationManager.java @@ -1,11 +1,14 @@ package org.infinispan.expiration; +import java.util.concurrent.CompletableFuture; + import org.infinispan.configuration.cache.ExpirationConfigurationBuilder; import org.infinispan.configuration.global.GlobalConfigurationBuilder; import org.infinispan.container.entries.InternalCacheEntry; import org.infinispan.factories.scopes.Scope; import org.infinispan.factories.scopes.Scopes; import org.infinispan.marshall.core.MarshalledEntry; +import org.infinispan.util.concurrent.CompletableFutures; import net.jcip.annotations.ThreadSafe; @@ -40,8 +43,43 @@ public interface ExpirationManager { * preserve atomicity. * @param entry entry that is now expired * @param currentTime the current time in milliseconds + * @deprecated since 9.3 Please use {@link #entryExpiredInMemory(InternalCacheEntry, long)} instead as it is + * possible that an entry may not be removed even though it was expired here + */ + @Deprecated + default void handleInMemoryExpiration(InternalCacheEntry entry, long currentTime) { + throw new UnsupportedOperationException("Should invoke entryExpiredInMemory instead!"); + } + + /** + * This should be invoked passing in an entry that is now expired. This method may attempt to lock this key to + * preserve atomicity. This method should be invoked when an entry was read via get but found to be expired. + *

+ * This method returns true if the entry was removed due to expiration or false if the entry was + * not removed due to expiration + * @param entry the entry that has expired + * @param currentTime the current time when it expired + * @return if this entry actually expired or not */ - void handleInMemoryExpiration(InternalCacheEntry entry, long currentTime); + default CompletableFuture entryExpiredInMemory(InternalCacheEntry entry, long currentTime) { + handleInMemoryExpiration(entry, currentTime); + return CompletableFutures.completedTrue(); + } + + /** + * This method is very similar to {@link #entryExpiredInMemory(InternalCacheEntry, long)} except that it does the + * bare minimum when an entry expired to guarantee if the entry is valid or not. This is important to reduce time + * spent per entry when iterating. This method may not actually remove the entry and may just return immediately + * if it is safe to do so. + * @param entry the entry that has expired + * @param currentTime the current time when it expired + * @return if this entry actually expired or not + */ + default CompletableFuture entryExpiredInMemoryFromIteration(InternalCacheEntry entry, + long currentTime) { + handleInMemoryExpiration(entry, currentTime); + return CompletableFutures.completedTrue(); + } /** * This is to be invoked when a store entry expires. This method may attempt to lock this key to preserve atomicity. @@ -60,16 +98,31 @@ public interface ExpirationManager { */ void handleInStoreExpiration(MarshalledEntry marshalledEntry); + /** + * Retrieves the last access time for the given key in the cache. + * If the entry is not in the cache or it is expired it will return null. + * If the entry is present but cannot expire via max idle, it will return -1 + * If the entry is present and can expire via max idle but hasn't it will return a number > 0 + * @param key the key to retrieve the access time for + * @param value the value to match if desired (this can be null) + * @return the last access time if available + */ + CompletableFuture retrieveLastAccess(Object key, Object value); + /** * This is to be invoked with a when a write is known to occur to prevent expiration from happening. This way we * won't have a swarm of remote calls required. * @param key the key to use + * @deprecated since 9.3 There is no reason for this method and is implementation specific */ + @Deprecated void registerWriteIncoming(K key); /** * This should always be invoked after registering write but after performing any operations required. * @param key the key to use + * @deprecated since 9.3 There is no reason for this method and is implementation specific */ + @Deprecated void unregisterWrite(K key); } diff --git a/core/src/main/java/org/infinispan/expiration/impl/ClusterExpirationManager.java b/core/src/main/java/org/infinispan/expiration/impl/ClusterExpirationManager.java index ceea54c813d4..33cc4abc3770 100644 --- a/core/src/main/java/org/infinispan/expiration/impl/ClusterExpirationManager.java +++ b/core/src/main/java/org/infinispan/expiration/impl/ClusterExpirationManager.java @@ -2,30 +2,30 @@ import static org.infinispan.commons.util.Util.toStr; +import java.util.Arrays; import java.util.Iterator; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import javax.transaction.HeuristicMixedException; -import javax.transaction.HeuristicRollbackException; -import javax.transaction.InvalidTransactionException; -import javax.transaction.NotSupportedException; -import javax.transaction.RollbackException; -import javax.transaction.SystemException; -import javax.transaction.Transaction; -import javax.transaction.TransactionManager; - import org.infinispan.AdvancedCache; import org.infinispan.cache.impl.AbstractDelegatingCache; -import org.infinispan.commons.CacheException; +import org.infinispan.commands.CommandsFactory; +import org.infinispan.commands.remote.expiration.RetrieveLastAccessCommand; import org.infinispan.commons.util.Util; import org.infinispan.container.entries.ExpiryHelper; import org.infinispan.container.entries.InternalCacheEntry; -import org.infinispan.factories.KnownComponentNames; -import org.infinispan.factories.annotations.ComponentName; +import org.infinispan.distribution.DistributionInfo; +import org.infinispan.distribution.DistributionManager; +import org.infinispan.distribution.LocalizedCacheTopology; import org.infinispan.factories.annotations.Inject; import org.infinispan.marshall.core.MarshalledEntry; import org.infinispan.metadata.InternalMetadata; +import org.infinispan.remoting.responses.ValidResponse; +import org.infinispan.remoting.rpc.RpcManager; +import org.infinispan.remoting.transport.Address; +import org.infinispan.remoting.transport.ResponseCollectors; +import org.infinispan.remoting.transport.ValidResponseCollector; +import org.infinispan.util.concurrent.CompletableFutures; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -50,21 +50,18 @@ public class ClusterExpirationManager extends ExpirationManagerImpl private static final Log log = LogFactory.getLog(ClusterExpirationManager.class); private static final boolean trace = log.isTraceEnabled(); - @Inject @ComponentName(KnownComponentNames.ASYNC_OPERATIONS_EXECUTOR) - private ExecutorService asyncExecutor; - @Inject private AdvancedCache cache; - private boolean needTransaction; + private static final int MAX_ASYNC_EXPIRATIONS = 5; - public ExecutorService getAsyncExecutor() { - return asyncExecutor; - } + @Inject protected AdvancedCache cache; + @Inject protected CommandsFactory cf; + @Inject protected RpcManager rpcManager; + @Inject protected DistributionManager distributionManager; @Override public void start() { super.start(); // Data container entries are retrieved directly, so we don't need to worry about an encodings this.cache = AbstractDelegatingCache.unwrapCache(cache).getAdvancedCache(); - needTransaction = configuration.transaction().transactionMode().isTransactional(); } @Override @@ -76,6 +73,9 @@ public void processExpiration() { log.trace("Purging data container of expired entries"); start = timeService.time(); } + int offset = 0; + // We limit it so there is only so many async expiration removals done at the same time + CompletableFuture[] futures = new CompletableFuture[MAX_ASYNC_EXPIRATIONS]; long currentTimeMillis = timeService.wallClockTime(); for (Iterator> purgeCandidates = dataContainer.iteratorIncludingExpired(); purgeCandidates.hasNext();) { @@ -86,19 +86,25 @@ public void processExpiration() { boolean expiredTransient; V value; long lifespan; + long maxIdle; synchronized (e) { value = e.getValue(); lifespan = e.getLifespan(); + maxIdle = e.getMaxIdle(); expiredMortal = ExpiryHelper.isExpiredMortal(lifespan, e.getCreated(), currentTimeMillis); - expiredTransient = ExpiryHelper.isExpiredTransient(e.getMaxIdle(), e.getLastUsed(), currentTimeMillis); + expiredTransient = ExpiryHelper.isExpiredTransient(maxIdle, e.getLastUsed(), currentTimeMillis); } if (expiredMortal) { - handleLifespanExpireEntry(e.getKey(), value, lifespan, true); + offset = addAndWaitIfFull(handleLifespanExpireEntry(e.getKey(), value, lifespan), futures, offset); } else if (expiredTransient) { - super.handleInMemoryExpiration(e, currentTimeMillis); + offset = addAndWaitIfFull(actualRemoveMaxIdleExpireEntry(e.getKey(), value, maxIdle), futures, offset); } } } + if (offset != 0) { + // Make sure that all of the futures are complete before returning + CompletableFuture.allOf(Arrays.copyOf(futures, offset)).join(); + } if (trace) { log.tracef("Purging data container completed in %s", Util.prettyPrintTime(timeService.timeDuration(start, TimeUnit.MILLISECONDS))); @@ -113,55 +119,68 @@ public void processExpiration() { } } - void handleLifespanExpireEntry(K key, V value, long lifespan, boolean sync) { + private int addAndWaitIfFull(CompletableFuture future, CompletableFuture[] futures, int offset) { + futures[offset++] = future; + if (offset == futures.length) { + // Wait for them to complete + CompletableFuture.allOf(futures).join(); + Arrays.fill(futures, null); + offset = 0; + } + return offset; + } + + CompletableFuture handleLifespanExpireEntry(K key, V value, long lifespan) { // The most used case will be a miss so no extra read before if (expiring.putIfAbsent(key, key) == null) { if (trace) { log.tracef("Submitting expiration removal for key %s which had lifespan of %s", toStr(key), lifespan); } - Runnable runnable = () -> { - try { - removeExpired(key, value, lifespan); - } finally { - expiring.remove(key); - } - }; - if (sync) { - runnable.run(); - } else { - asyncExecutor.submit(runnable); - } + CompletableFuture future = cache.removeLifespanExpired(key, value, lifespan); + return future.whenComplete((v, t) -> expiring.remove(key, key)); } + return CompletableFutures.completedNull(); + } + + // Method invoked when an entry is found to be expired via get + CompletableFuture handleMaxIdleExpireEntry(K key, V value, long maxIdle) { + return actualRemoveMaxIdleExpireEntry(key, value, maxIdle); } - private void removeExpired(K key, V value, Long lifespan) { - if (needTransaction) { - TransactionManager tm = cache.getTransactionManager(); + // Method invoked when entry should be attempted to be removed via max idle + CompletableFuture actualRemoveMaxIdleExpireEntry(K key, V value, long maxIdle) { + CompletableFuture completableFuture = new CompletableFuture<>(); + Object expiringObject = expiring.putIfAbsent(key, completableFuture); + if (expiringObject == null) { + if (trace) { + log.tracef("Submitting expiration removal for key %s which had maxIdle of %s", toStr(key), maxIdle); + } + completableFuture.whenComplete((b, t) -> expiring.remove(key, completableFuture)); try { - Transaction tx = tm.suspend(); - try { - tm.begin(); - cache.removeExpired(key, value, lifespan); - } catch (NotSupportedException | SystemException e) { - tm.rollback(); - throw e; - } finally { - tm.commit(); - } - if (tx != null) { - tm.resume(tx); - } - } catch (RollbackException | NotSupportedException | SystemException | HeuristicMixedException | - HeuristicRollbackException | InvalidTransactionException e) { - throw new CacheException(e); + CompletableFuture expired = cache.removeMaxIdleExpired(key, value); + expired.whenComplete((b, t) -> { + if (t != null) { + completableFuture.completeExceptionally(t); + } else { + completableFuture.complete(b); + } + }); + return completableFuture; + } catch (Throwable t) { + completableFuture.completeExceptionally(t); + throw t; } + } else if (expiringObject instanceof CompletableFuture) { + // This means there was another thread that found it had expired via max idle + return (CompletableFuture) expiringObject; } else { - cache.removeExpired(key, value, lifespan); + // If it wasn't a CompletableFuture we had a lifespan removal occurring so it will be removed for sure + return CompletableFutures.completedTrue(); } } @Override - public void handleInMemoryExpiration(InternalCacheEntry entry, long currentTime) { + public CompletableFuture entryExpiredInMemory(InternalCacheEntry entry, long currentTime) { // We need to synchronize on the entry since {@link InternalCacheEntry} locks the entry when doing an update // so we can see both the new value and the metadata boolean expiredMortal; @@ -173,9 +192,30 @@ public void handleInMemoryExpiration(InternalCacheEntry entry, long curren expiredMortal = ExpiryHelper.isExpiredMortal(lifespan, entry.getCreated(), currentTime); } if (expiredMortal) { - handleLifespanExpireEntry(entry.getKey(), value, lifespan, false); + handleLifespanExpireEntry(entry.getKey(), value, lifespan); + // We don't want to block the user while the remove expired is happening for lifespan + return CompletableFutures.completedTrue(); } else { - super.handleInMemoryExpiration(entry, currentTime); + // This means it expired transiently - this will block user until we confirm the entry is okay + return handleMaxIdleExpireEntry(entry.getKey(), value, entry.getMaxIdle()); + } + } + + @Override + public CompletableFuture entryExpiredInMemoryFromIteration(InternalCacheEntry entry, long currentTime) { + // We need to synchronize on the entry since {@link InternalCacheEntry} locks the entry when doing an update + // so we can see both the new value and the metadata + boolean expiredTransient; + synchronized (entry) { + expiredTransient = ExpiryHelper.isExpiredTransient(entry.getMaxIdle(), entry.getLastUsed(), currentTime); + } + if (expiredTransient) { + // Max idle expiration - we just return it (otherwise we would have to incur remote overhead) + // This entry will be removed on next get or reaper running + return CompletableFutures.completedFalse(); + } else { + // Lifespan was expired - but we don't want to take the hit of causing an expire command to be fired + return CompletableFutures.completedTrue(); } } @@ -185,9 +225,9 @@ public void handleInStoreExpiration(K key) { // Unfortunately stores don't pull the entry so we can't tell exactly why it expired and thus we have to remove // the entire value. Unfortunately this could cause a concurrent write to be undone try { - removeExpired(key, null, null); + cache.removeLifespanExpired(key, null, null).join(); } finally { - expiring.remove(key); + expiring.remove(key, key); } } } @@ -198,10 +238,64 @@ public void handleInStoreExpiration(MarshalledEntry marshalledEntry) { if (expiring.putIfAbsent(key, key) == null) { try { InternalMetadata metadata = marshalledEntry.getMetadata(); - removeExpired(key, marshalledEntry.getValue(), metadata.lifespan() == -1 ? null : metadata.lifespan()); + cache.removeLifespanExpired(key, marshalledEntry.getValue(), metadata.lifespan() == -1 ? null : metadata.lifespan()) + .join(); } finally { - expiring.remove(key); + expiring.remove(key, key); + } + } + } + + @Override + public CompletableFuture retrieveLastAccess(Object key, Object value) { + Long access = localLastAccess(key, value); + + LocalizedCacheTopology topology = distributionManager.getCacheTopology(); + DistributionInfo info = topology.getDistribution(key); + + if (trace) { + log.tracef("Asking all read owners %s for key: %s - for latest access time", info.readOwners(), key); + } + + // Need to gather last access times + RetrieveLastAccessCommand rlac = cf.buildRetrieveLastAccessCommand(key, value); + rlac.setTopologyId(rpcManager.getTopologyId()); + + // In scattered cache read owners will only contain primary + return rpcManager.invokeCommand(info.readOwners(), rlac, new MaxResponseCollector<>(access), + rpcManager.getSyncRpcOptions()).toCompletableFuture(); + } + + static class MaxResponseCollector> extends ValidResponseCollector { + T highest; + + MaxResponseCollector(T highest) { + this.highest = highest; + } + + @Override + public T finish() { + return highest; + } + + @Override + protected T addValidResponse(Address sender, ValidResponse response) { + T value = (T) response.getResponseValue(); + if (value != null && (highest == null || highest.compareTo(value) < 0)) { + highest = value; } + return null; + } + + @Override + protected T addTargetNotFound(Address sender) { + // We don't care about a node leaving + return null; + } + + @Override + protected T addException(Address sender, Exception exception) { + throw ResponseCollectors.wrapRemoteException(sender, exception); } } } diff --git a/core/src/main/java/org/infinispan/expiration/impl/ExpirationManagerImpl.java b/core/src/main/java/org/infinispan/expiration/impl/ExpirationManagerImpl.java index f0013530d889..fda2ad9ed6e9 100644 --- a/core/src/main/java/org/infinispan/expiration/impl/ExpirationManagerImpl.java +++ b/core/src/main/java/org/infinispan/expiration/impl/ExpirationManagerImpl.java @@ -1,6 +1,7 @@ package org.infinispan.expiration.impl; import java.util.Iterator; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; @@ -22,6 +23,7 @@ import org.infinispan.notifications.cachelistener.CacheNotifier; import org.infinispan.persistence.manager.PersistenceManager; import org.infinispan.util.TimeService; +import org.infinispan.util.concurrent.CompletableFutures; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -91,7 +93,7 @@ public void processExpiration() { purgeCandidates.hasNext();) { InternalCacheEntry e = purgeCandidates.next(); if (e.isExpired(currentTimeMillis)) { - handleInMemoryExpiration(e, currentTimeMillis); + entryExpiredInMemory(e, currentTimeMillis); } } if (trace) { @@ -114,7 +116,10 @@ public boolean isEnabled() { } @Override - public void handleInMemoryExpiration(InternalCacheEntry entry, long currentTime) { + public CompletableFuture entryExpiredInMemory(InternalCacheEntry entry, long currentTime) { + // We ignore the return from this method. It is possible for the entry to no longer be expired, but this means + // it was updated by another thread. In that case it is a completely valid value for it to be expired then not. + // So for this we just tell them it was expired. dataContainer.compute(entry.getKey(), ((k, oldEntry, factory) -> { if (oldEntry != null) { synchronized (oldEntry) { @@ -127,6 +132,13 @@ public void handleInMemoryExpiration(InternalCacheEntry entry, long curren } return null; })); + return CompletableFutures.completedTrue(); + } + + @Override + public CompletableFuture entryExpiredInMemoryFromIteration(InternalCacheEntry entry, long currentTime) { + // Local we just remove the entry as we see them + return entryExpiredInMemory(entry, currentTime); } @Override @@ -191,6 +203,24 @@ private void deleteFromStores(K key) { persistenceManager.deleteFromAllStores(key, PersistenceManager.AccessMode.BOTH); } + protected Long localLastAccess(Object key, Object value) { + InternalCacheEntry ice = dataContainer.peek(key); + if (ice != null && (value == null || value.equals(ice.getValue())) && + !ice.isExpired(timeService.wallClockTime())) { + return ice.getLastUsed(); + } + return null; + } + + @Override + public CompletableFuture retrieveLastAccess(Object key, Object value) { + Long lastAccess = localLastAccess(key, value); + if (lastAccess != null) { + return CompletableFuture.completedFuture(lastAccess); + } + return CompletableFutures.completedNull(); + } + @Override public void registerWriteIncoming(K key) { expiring.put(key, key); diff --git a/core/src/main/java/org/infinispan/expiration/impl/TxClusterExpirationManager.java b/core/src/main/java/org/infinispan/expiration/impl/TxClusterExpirationManager.java new file mode 100644 index 000000000000..2a3ac2c6d7cc --- /dev/null +++ b/core/src/main/java/org/infinispan/expiration/impl/TxClusterExpirationManager.java @@ -0,0 +1,15 @@ +package org.infinispan.expiration.impl; + +import java.util.concurrent.CompletableFuture; + +import net.jcip.annotations.ThreadSafe; + +@ThreadSafe +public class TxClusterExpirationManager extends ClusterExpirationManager { + @Override + CompletableFuture handleMaxIdleExpireEntry(K key, V value, long maxIdle) { + // We cannot remove an entry in a tx, due to possible dead lock - thus we just return + // if the entry is expired or not + return retrieveLastAccess(key, value).thenApply(hasMaxIdle -> hasMaxIdle == null); + } +} diff --git a/core/src/main/java/org/infinispan/factories/ExpirationManagerFactory.java b/core/src/main/java/org/infinispan/factories/ExpirationManagerFactory.java index 29fc8db0fab2..8e8db5364444 100644 --- a/core/src/main/java/org/infinispan/factories/ExpirationManagerFactory.java +++ b/core/src/main/java/org/infinispan/factories/ExpirationManagerFactory.java @@ -4,6 +4,7 @@ import org.infinispan.expiration.ExpirationManager; import org.infinispan.expiration.impl.ClusterExpirationManager; import org.infinispan.expiration.impl.ExpirationManagerImpl; +import org.infinispan.expiration.impl.TxClusterExpirationManager; import org.infinispan.factories.annotations.DefaultFactoryFor; /** @@ -21,6 +22,9 @@ public class ExpirationManagerFactory extends AbstractNamedCacheComponentFactory public T construct(Class componentType) { CacheMode cacheMode = configuration.clustering().cacheMode(); if (cacheMode.needsStateTransfer()) { + if (configuration.transaction().transactionMode().isTransactional()) { + return componentType.cast(new TxClusterExpirationManager<>()); + } return componentType.cast(new ClusterExpirationManager<>()); } else { return componentType.cast(new ExpirationManagerImpl<>()); diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java index e6430ff52f9d..ff0e6d67e6d3 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/BaseDistributionInterceptor.java @@ -26,9 +26,11 @@ import org.infinispan.commands.remote.ClusteredGetAllCommand; import org.infinispan.commands.remote.ClusteredGetCommand; import org.infinispan.commands.remote.GetKeysInGroupCommand; +import org.infinispan.commands.remote.expiration.UpdateLastAccessCommand; import org.infinispan.commands.write.AbstractDataWriteCommand; import org.infinispan.commands.write.ClearCommand; import org.infinispan.commands.write.DataWriteCommand; +import org.infinispan.commands.write.RemoveExpiredCommand; import org.infinispan.commands.write.ValueMatcher; import org.infinispan.commons.CacheException; import org.infinispan.commons.util.ArrayCollector; @@ -40,11 +42,11 @@ import org.infinispan.context.impl.FlagBitSets; import org.infinispan.context.impl.TxInvocationContext; import org.infinispan.distribution.DistributionInfo; -import org.infinispan.distribution.DistributionManager; import org.infinispan.distribution.LocalizedCacheTopology; import org.infinispan.distribution.RemoteValueRetrievedListener; import org.infinispan.distribution.ch.ConsistentHash; import org.infinispan.distribution.ch.KeyPartitioner; +import org.infinispan.expiration.ExpirationManager; import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Start; import org.infinispan.interceptors.InvocationSuccessFunction; @@ -64,6 +66,7 @@ import org.infinispan.statetransfer.AllOwnersLostException; import org.infinispan.statetransfer.OutdatedTopologyException; import org.infinispan.transaction.xa.GlobalTransaction; +import org.infinispan.util.TimeService; import org.infinispan.util.concurrent.CompletableFutures; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -83,9 +86,10 @@ public abstract class BaseDistributionInterceptor extends ClusteringInterceptor private static final boolean trace = log.isTraceEnabled(); private static final Object LOST_PLACEHOLDER = new Object(); - @Inject protected DistributionManager dm; @Inject protected RemoteValueRetrievedListener rvrl; @Inject protected KeyPartitioner keyPartitioner; + @Inject protected TimeService timeService; + @Inject protected ExpirationManager expirationManager; protected boolean isL1Enabled; protected boolean isReplicated; @@ -122,7 +126,7 @@ public final Object visitGetKeysInGroupCommand(InvocationContext ctx, GetKeysInG //don't go remote if we are an owner. return invokeNext(ctx, command); } - Address primaryOwner = dm.getCacheTopology().getDistribution(command.getGroupName()).primary(); + Address primaryOwner = distributionManager.getCacheTopology().getDistribution(command.getGroupName()).primary(); CompletionStage future = rpcManager.invokeCommand(primaryOwner, command, SingleResponseCollector.validOnly(), rpcManager.getSyncRpcOptions()); @@ -280,37 +284,22 @@ private boolean shouldLoad(InvocationContext ctx, AbstractDataWriteCommand comma } } - private Object invokeRemotely(InvocationContext ctx, DataWriteCommand command, Address primaryOwner) { - if (trace) log.tracef("I'm not the primary owner, so sending the command to the primary owner(%s) in order to be forwarded", primaryOwner); - boolean isSyncForwarding = isSynchronous(command) || command.isReturnValueExpected(); - - if (!isSyncForwarding) { - rpcManager.sendTo(primaryOwner, command, DeliverOrder.PER_SENDER); - return null; + protected LocalizedCacheTopology checkTopologyId(TopologyAffectedCommand command) { + LocalizedCacheTopology cacheTopology = distributionManager.getCacheTopology(); + int currentTopologyId = cacheTopology.getTopologyId(); + int cmdTopology = command.getTopologyId(); + if (command instanceof FlagAffectedCommand && ((((FlagAffectedCommand) command).hasAnyFlag(FlagBitSets.SKIP_OWNERSHIP_CHECK | FlagBitSets.CACHE_MODE_LOCAL)))) { + getLog().tracef("Skipping topology check for command %s", command); + return cacheTopology; } - CompletionStage remoteInvocation; - try { - remoteInvocation = rpcManager.invokeCommand(primaryOwner, command, SingleResponseCollector.validOnly(), - rpcManager.getSyncRpcOptions()); - } catch (Throwable t) { - command.setValueMatcher(command.getValueMatcher().matcherForRetry()); - throw t; + if (cmdTopology >= 0 && currentTopologyId != cmdTopology) { + throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " + + cmdTopology + ", got " + currentTopologyId); } - return asyncValue(remoteInvocation).andHandle(ctx, command, (rCtx, rCommand, rv, t) -> { - DataWriteCommand dataWriteCommand = (DataWriteCommand) rCommand; - dataWriteCommand.setValueMatcher(dataWriteCommand.getValueMatcher().matcherForRetry()); - CompletableFutures.rethrowException(t); - - Response response = ((Response) rv); - if (!response.isSuccessful()) { - dataWriteCommand.fail(); - // FIXME A response cannot be successful and not valid - } else if (!(response instanceof ValidResponse)) { - throw unexpected(response); - } - // We expect only successful/unsuccessful responses, not unsure - return ((ValidResponse) response).getResponseValue(); - }); + if (trace) { + getLog().tracef("Current topology %d, command topology %d", currentTopologyId, cmdTopology); + } + return cacheTopology; } private Object primaryReturnHandler(InvocationContext ctx, VisitableCommand visitableCommand, Object localResult) { @@ -873,22 +862,37 @@ protected Object unwrapFunctionalResultOnOrigin(InvocationContext ctx, Object ke return responseValue; } - protected LocalizedCacheTopology checkTopologyId(TopologyAffectedCommand command) { - LocalizedCacheTopology cacheTopology = dm.getCacheTopology(); - int currentTopologyId = cacheTopology.getTopologyId(); - int cmdTopology = command.getTopologyId(); - if (command instanceof FlagAffectedCommand && ((((FlagAffectedCommand) command).hasAnyFlag(FlagBitSets.SKIP_OWNERSHIP_CHECK | FlagBitSets.CACHE_MODE_LOCAL)))) { - log.tracef("Skipping topology check for command %s", command); - return cacheTopology; - } - if (cmdTopology >= 0 && currentTopologyId != cmdTopology) { - throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " + - cmdTopology + ", got " + currentTopologyId); + protected Object invokeRemotely(InvocationContext ctx, DataWriteCommand command, Address primaryOwner) { + if (trace) getLog().tracef("I'm not the primary owner, so sending the command to the primary owner(%s) in order to be forwarded", primaryOwner); + boolean isSyncForwarding = isSynchronous(command) || command.isReturnValueExpected(); + + if (!isSyncForwarding) { + rpcManager.sendTo(primaryOwner, command, DeliverOrder.PER_SENDER); + return null; } - if (trace) { - log.tracef("Current topology %d, command topology %d", currentTopologyId, cmdTopology); + CompletionStage remoteInvocation; + try { + remoteInvocation = rpcManager.invokeCommand(primaryOwner, command, SingleResponseCollector.validOnly(), + rpcManager.getSyncRpcOptions()); + } catch (Throwable t) { + command.setValueMatcher(command.getValueMatcher().matcherForRetry()); + throw t; } - return cacheTopology; + return asyncValue(remoteInvocation).andHandle(ctx, command, (rCtx, rCommand, rv, t) -> { + DataWriteCommand dataWriteCommand = (DataWriteCommand) rCommand; + dataWriteCommand.setValueMatcher(dataWriteCommand.getValueMatcher().matcherForRetry()); + CompletableFutures.rethrowException(t); + + Response response = ((Response) rv); + if (!response.isSuccessful()) { + dataWriteCommand.fail(); + // FIXME A response cannot be successful and not valid + } else if (!(response instanceof ValidResponse)) { + throw unexpected(response); + } + // We expect only successful/unsuccessful responses, not unsure + return ((ValidResponse) response).getResponseValue(); + }); } /** @@ -939,4 +943,87 @@ public Object transformResult(Object[] results) { } } + @Override + public Object visitRemoveExpiredCommand(InvocationContext ctx, RemoveExpiredCommand command) throws Throwable { + // Lifespan expiration just behaves like a remove command + if (!command.isMaxIdle()) { + return visitRemoveCommand(ctx, command); + } + + Object key = command.getKey(); + CacheEntry entry = ctx.lookupEntry(key); + + if (isLocalModeForced(command)) { + if (entry == null) { + entryFactory.wrapExternalEntry(ctx, key, null, false, true); + } + return invokeNext(ctx, command); + } + + LocalizedCacheTopology cacheTopology = checkTopologyId(command); + DistributionInfo info = cacheTopology.getDistribution(key); + if (entry == null) { + if (info.isPrimary()) { + throw new IllegalStateException("Primary owner in writeCH should always be an owner in readCH as well."); + } else if (ctx.isOriginLocal()) { + // Primary has to handle max idle removal + return invokeRemotely(ctx, command, info.primary()); + } else { + throw new IllegalStateException("Non primary owner recipient of remote remove expired command."); + } + } else { + if (info.isPrimary()) { + // We don't pass the value for performance as we already have the lock obtained for this key - so it can't + // change from its current value + CompletableFuture completableFuture = expirationManager.retrieveLastAccess(key, null); + + return asyncValue(completableFuture).thenApply(ctx, command, (rCtx, rCommand, max) -> { + if (max != null) { + // Make sure to fail the command for other interceptors + command.fail(); + if (trace) { + log.tracef("Received %s as the latest last access time for key %s", max, key); + } + long longMax = ((Long) max); + if (longMax == -1) { + // If it was -1 that means it has been written to, so in this case just assume it expired, but + // was overwritten by a concurrent write + return Boolean.TRUE; + } else { + // If it wasn't -1 it has to be > 0 so send that update + UpdateLastAccessCommand ulac = cf.buildUpdateLastAccessCommand(key, longMax); + ulac.setTopologyId(rpcManager.getTopologyId()); + CompletionStage updateState = rpcManager.invokeCommand(info.readOwners(), ulac, + MapResponseCollector.ignoreLeavers(info.writeOwners().size()), rpcManager.getSyncRpcOptions()); + ulac.inject(dataContainer); + // We update locally as well + ulac.invokeAsync(); + return asyncValue(updateState).thenApply(rCtx, rCommand, (rCtx2, rCommand2, ignore) -> Boolean.FALSE); + } + } else { + if (trace) { + log.tracef("No node has a non expired max idle time for key %s, proceeding to remove entry", key); + } + RemoveExpiredCommand realRemoveCommand; + if (!ctx.isOriginLocal()) { + // Have to build a new command since the command id points to the originating node - causes + // issues with triangle since it needs to know the originating node to respond to + realRemoveCommand = cf.buildRemoveExpiredCommand(key, command.getValue()); + realRemoveCommand.setTopologyId(rpcManager.getTopologyId()); + } else { + realRemoveCommand = command; + } + + return makeStage(visitRemoveCommand(ctx, realRemoveCommand)).thenApply(rCtx, rCommand, + (rCtx2, rCommand2, ignore) -> Boolean.TRUE); + } + }); + } else if (ctx.isOriginLocal()) { + // Primary has to handle max idle removal + return invokeRemotely(ctx, command, info.primary()); + } else { + return invokeNext(ctx, command); + } + } + } } diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java index a39576e388eb..1ad6a30efbb3 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/ScatteredDistributionInterceptor.java @@ -36,6 +36,8 @@ import org.infinispan.commands.remote.ClusteredGetAllCommand; import org.infinispan.commands.remote.ClusteredGetCommand; import org.infinispan.commands.remote.GetKeysInGroupCommand; +import org.infinispan.commands.remote.expiration.RetrieveLastAccessCommand; +import org.infinispan.commands.remote.expiration.UpdateLastAccessCommand; import org.infinispan.commands.write.ClearCommand; import org.infinispan.commands.write.ComputeCommand; import org.infinispan.commands.write.ComputeIfAbsentCommand; @@ -43,6 +45,7 @@ import org.infinispan.commands.write.PutKeyValueCommand; import org.infinispan.commands.write.PutMapCommand; import org.infinispan.commands.write.RemoveCommand; +import org.infinispan.commands.write.RemoveExpiredCommand; import org.infinispan.commands.write.ReplaceCommand; import org.infinispan.commands.write.ValueMatcher; import org.infinispan.commands.write.WriteCommand; @@ -61,7 +64,6 @@ import org.infinispan.context.InvocationContext; import org.infinispan.context.impl.FlagBitSets; import org.infinispan.distribution.DistributionInfo; -import org.infinispan.distribution.DistributionManager; import org.infinispan.distribution.LocalizedCacheTopology; import org.infinispan.distribution.ch.ConsistentHash; import org.infinispan.distribution.ch.KeyPartitioner; @@ -118,7 +120,6 @@ public class ScatteredDistributionInterceptor extends ClusteringInterceptor { @Inject protected CacheNotifier cacheNotifier; @Inject protected FunctionalNotifier functionalNotifier; @Inject protected KeyPartitioner keyPartitioner; - @Inject protected DistributionManager distributionManager; private volatile Address cachedNextMember; private volatile int cachedNextMemberTopology = -1; @@ -609,6 +610,61 @@ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) t return handleWriteCommand(ctx, command); } + @Override + public Object visitRemoveExpiredCommand(InvocationContext ctx, RemoveExpiredCommand command) throws Throwable { + if (!command.isMaxIdle() || isLocalModeForced(command)) { + return visitRemoveCommand(ctx, command); + } + + RepeatableReadEntry cacheEntry = (RepeatableReadEntry) ctx.lookupEntry(command.getKey()); + + LocalizedCacheTopology cacheTopology = checkTopology(command); + Object key = command.getKey(); + DistributionInfo info = cacheTopology.getDistribution(key); + if (info.primary() == null) { + throw new OutdatedTopologyException(cacheTopology.getTopologyId() + 1); + } + + if (ctx.isOriginLocal()) { + if (info.isPrimary()) { + // If primary originated it that means it is removed + return visitRemoveCommand(ctx, command); + } else { + // We are a backup, we just ask the primary what the last access time was + RetrieveLastAccessCommand rlac = cf.buildRetrieveLastAccessCommand(key, command.getValue()); + rlac.setTopologyId(rpcManager.getTopologyId()); + CompletionStage completionStage = rpcManager.invokeCommand(info.primary(), rlac, + new SingleResponseCollector(), rpcManager.getSyncRpcOptions()) + .thenApply(vr -> (Long) vr.getResponseValue()); + return asyncValue(completionStage).thenApply(ctx, command, (rCtx, rCommand, access) -> { + // Make sure to add the entry to the context if it wasn't there before - which is likely since this + // isn't the primary owner + RepeatableReadEntry contextEntry = cacheEntry; + if (cacheEntry == null) { + entryFactory.wrapExternalEntry(ctx, key, null, false, true); + contextEntry = (RepeatableReadEntry) ctx.lookupEntry(key); + } + // If it responded with a time we assume it wasn't expired + if (access != null) { + UpdateLastAccessCommand ulac = cf.buildUpdateLastAccessCommand(key, (long) access); + ulac.inject(dataContainer); + // This command doesn't block + ulac.invokeAsync().join(); + // Make sure to notify other interceptors the command failed + command.fail(); + return Boolean.FALSE; + } else { + // Go ahead with removal now + return makeStage(commitSingleEntryOnReturn(ctx, command, contextEntry, null)).thenApply(ctx, command, + (rCtx2, rCommand2, ignore) -> Boolean.TRUE); + } + }); + } + } else { + throw new IllegalStateException("Remove expired should never be replicated!"); + } + } + @Override public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable { return handleWriteCommand(ctx, command); diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/TriangleDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/TriangleDistributionInterceptor.java index ba552e5a4500..84858987821b 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/TriangleDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/TriangleDistributionInterceptor.java @@ -389,7 +389,9 @@ private Object handleSingleKeyWriteCommand(Invocati if (distributionInfo.isPrimary()) { assert context.lookupEntry(command.getKey()) != null; - return context.isOriginLocal() ? + // Not only if the context is local, but also if the command id is local (this occurs with newly created + // commands in response to a remote request (ie. RemoveExpiredCommannd) + return context.isOriginLocal() || command.getCommandInvocationId().getAddress().equals(localAddress) ? localPrimaryOwnerWrite(context, command, distributionInfo, backupBuilder) : remotePrimaryOwnerWrite(context, command, distributionInfo, backupBuilder); } else if (distributionInfo.isWriteBackup()) { diff --git a/core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java index af79147f2116..e69d2f4deed4 100644 --- a/core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java @@ -30,6 +30,7 @@ import org.infinispan.commands.functional.WriteOnlyKeyValueCommand; import org.infinispan.commands.functional.WriteOnlyManyCommand; import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand; +import org.infinispan.commands.remote.expiration.UpdateLastAccessCommand; import org.infinispan.commands.tx.CommitCommand; import org.infinispan.commands.tx.PrepareCommand; import org.infinispan.commands.tx.RollbackCommand; @@ -41,6 +42,7 @@ import org.infinispan.commands.write.PutKeyValueCommand; import org.infinispan.commands.write.PutMapCommand; import org.infinispan.commands.write.RemoveCommand; +import org.infinispan.commands.write.RemoveExpiredCommand; import org.infinispan.commands.write.ReplaceCommand; import org.infinispan.commands.write.ValueMatcher; import org.infinispan.commands.write.WriteCommand; @@ -136,6 +138,29 @@ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) t return handleTxWriteCommand(ctx, command, command.getKey()); } + @Override + public Object visitRemoveExpiredCommand(InvocationContext ctx, RemoveExpiredCommand command) throws Throwable { + if (ctx.isOriginLocal() && command.isMaxIdle()) { + Object key = command.getKey(); + CompletableFuture completableFuture = expirationManager.retrieveLastAccess(key, null); + return asyncValue(completableFuture).thenApply(ctx, command, (rCtx, rCommand, max) -> { + if (max == null) { + // If there was no max value just remove the entry as normal + return handleTxWriteCommand(ctx, command, command.getKey()); + } + // If max was returned update our time with it, so we don't query again + UpdateLastAccessCommand ulac = cf.buildUpdateLastAccessCommand(key, (long) max); + ulac.inject(dataContainer); + // This command doesn't block + ulac.invokeAsync().join(); + // Make sure to notify other interceptors the command failed + command.fail(); + return Boolean.FALSE; + }); + } + return handleTxWriteCommand(ctx, command, command.getKey()); + } + @Override public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { if (command.hasAnyFlag(FlagBitSets.PUT_FOR_EXTERNAL_READ)) { diff --git a/core/src/main/java/org/infinispan/interceptors/impl/ClusteringInterceptor.java b/core/src/main/java/org/infinispan/interceptors/impl/ClusteringInterceptor.java index 6835a28c9140..95f204f8f787 100644 --- a/core/src/main/java/org/infinispan/interceptors/impl/ClusteringInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/impl/ClusteringInterceptor.java @@ -8,6 +8,7 @@ import org.infinispan.commands.CommandsFactory; import org.infinispan.container.DataContainer; import org.infinispan.container.EntryFactory; +import org.infinispan.distribution.DistributionManager; import org.infinispan.factories.annotations.Inject; import org.infinispan.remoting.responses.CacheNotFoundResponse; import org.infinispan.remoting.responses.Response; @@ -31,6 +32,7 @@ public abstract class ClusteringInterceptor extends BaseRpcInterceptor { @Inject protected LockManager lockManager; @Inject protected DataContainer dataContainer; @Inject protected StateTransferManager stateTransferManager; + @Inject protected DistributionManager distributionManager; protected static Response getSingleResponse(Map responseMap) { Iterator it = responseMap.values().iterator(); diff --git a/core/src/main/java/org/infinispan/interceptors/impl/EntryWrappingInterceptor.java b/core/src/main/java/org/infinispan/interceptors/impl/EntryWrappingInterceptor.java index cb66ef8a8f65..22be07ea9e5a 100644 --- a/core/src/main/java/org/infinispan/interceptors/impl/EntryWrappingInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/impl/EntryWrappingInterceptor.java @@ -41,6 +41,7 @@ import org.infinispan.commands.write.PutKeyValueCommand; import org.infinispan.commands.write.PutMapCommand; import org.infinispan.commands.write.RemoveCommand; +import org.infinispan.commands.write.RemoveExpiredCommand; import org.infinispan.commands.write.ReplaceCommand; import org.infinispan.commands.write.WriteCommand; import org.infinispan.configuration.cache.Configurations; @@ -353,6 +354,12 @@ public final Object visitRemoveCommand(InvocationContext ctx, RemoveCommand comm return setSkipRemoteGetsAndInvokeNextForDataCommand(ctx, command); } + @Override + public Object visitRemoveExpiredCommand(InvocationContext ctx, RemoveExpiredCommand command) throws Throwable { + entryFactory.wrapEntryForExpired(ctx, command.getKey(), ignoreOwnership(command) || canRead(command.getKey())); + return setSkipRemoteGetsAndInvokeNextForDataCommand(ctx, command); + } + @Override public final Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable { diff --git a/core/src/main/java/org/infinispan/interceptors/impl/TransactionalExceptionEvictionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/impl/TransactionalExceptionEvictionInterceptor.java index eae6d950e603..a770947640a7 100644 --- a/core/src/main/java/org/infinispan/interceptors/impl/TransactionalExceptionEvictionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/impl/TransactionalExceptionEvictionInterceptor.java @@ -105,7 +105,11 @@ public void start() { public void entryExpired(CacheEntryExpiredEvent event) { // If this is null it means it was from the store, so we don't care about that if (event.getValue() != null) { - increaseSize(- calculator.calculateSize(event.getKey(), event.getValue(), event.getMetadata())); + Object key = event.getKey(); + if (isTrace) { + log.tracef("Key %s found to have expired", key); + } + increaseSize(- calculator.calculateSize(key, event.getValue(), event.getMetadata())); } } @@ -174,23 +178,29 @@ public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand comman InternalCacheEntry containerEntry = container.peek(key); Object value = containerEntry != null ? containerEntry.getValue() : null; if (value != null) { + if (isTrace) { + log.tracef("Key %s was removed", key); + } changeAmount -= calculator.calculateSize(key, value, entry.getMetadata()); } } else { + // We check the container directly - this is to handle entries that are expired as the command + // won't think it replaced a value + InternalCacheEntry containerEntry = container.peek(key); + if (isTrace) { + log.tracef("Key %s was put into cache, replacing existing %s", key, containerEntry != null); + } // Create and replace both add for the new value changeAmount += calculator.calculateSize(key, entry.getValue(), entry.getMetadata()); - if (!entry.isCreated()) { - // Need to subtract old value here - InternalCacheEntry containerEntry = container.peek(key); - if (containerEntry != null) { - changeAmount -= calculator.calculateSize(key, containerEntry.getValue(), containerEntry.getMetadata()); - } + // Need to subtract old value here + if (containerEntry != null) { + changeAmount -= calculator.calculateSize(key, containerEntry.getValue(), containerEntry.getMetadata()); } } } } - if (!increaseSize(changeAmount)) { + if (changeAmount != 0 && !increaseSize(changeAmount)) { throw log.containerFull(maxSize); } diff --git a/core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderDistributionInterceptor.java index c82ba977267d..82c3c34867cc 100644 --- a/core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderDistributionInterceptor.java @@ -73,7 +73,7 @@ protected CompletionStage prepareOnAffectedNodes(TxInvocationContext @Override protected LocalizedCacheTopology checkTopologyId(TopologyAffectedCommand command) { // TODO Remove this and catch OutdatedTopologyException in TotalOrderStateTransferInterceptor - LocalizedCacheTopology cacheTopology = dm.getCacheTopology(); + LocalizedCacheTopology cacheTopology = distributionManager.getCacheTopology(); int currentTopologyId = cacheTopology.getTopologyId(); int cmdTopology = command.getTopologyId(); if (trace) { diff --git a/core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderVersionedDistributionInterceptor.java b/core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderVersionedDistributionInterceptor.java index de04c3b07891..3bbeda2a144e 100644 --- a/core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderVersionedDistributionInterceptor.java +++ b/core/src/main/java/org/infinispan/interceptors/totalorder/TotalOrderVersionedDistributionInterceptor.java @@ -77,7 +77,7 @@ protected CompletionStage prepareOnAffectedNodes(TxInvocationContext @Override protected LocalizedCacheTopology checkTopologyId(TopologyAffectedCommand command) { - LocalizedCacheTopology cacheTopology = dm.getCacheTopology(); + LocalizedCacheTopology cacheTopology = distributionManager.getCacheTopology(); int currentTopologyId = cacheTopology.getTopologyId(); int cmdTopology = command.getTopologyId(); if (trace) { diff --git a/core/src/main/java/org/infinispan/marshall/exts/CacheRpcCommandExternalizer.java b/core/src/main/java/org/infinispan/marshall/exts/CacheRpcCommandExternalizer.java index c35ecc012bec..450b30db8c4d 100644 --- a/core/src/main/java/org/infinispan/marshall/exts/CacheRpcCommandExternalizer.java +++ b/core/src/main/java/org/infinispan/marshall/exts/CacheRpcCommandExternalizer.java @@ -16,6 +16,8 @@ import org.infinispan.commands.remote.RenewBiasCommand; import org.infinispan.commands.remote.RevokeBiasCommand; import org.infinispan.commands.remote.SingleRpcCommand; +import org.infinispan.commands.remote.expiration.RetrieveLastAccessCommand; +import org.infinispan.commands.remote.expiration.UpdateLastAccessCommand; import org.infinispan.commands.remote.recovery.CompleteTransactionCommand; import org.infinispan.commands.remote.recovery.GetInDoubtTransactionsCommand; import org.infinispan.commands.remote.recovery.GetInDoubtTxInfoCommand; @@ -95,7 +97,8 @@ public Set> getTypeClasses() { MultiKeyFunctionalBackupWriteCommand.class, InvalidateVersionsCommand.class, StreamIteratorRequestCommand.class, StreamIteratorNextCommand.class, StreamIteratorCloseCommand.class, - RevokeBiasCommand.class, RenewBiasCommand.class); + RevokeBiasCommand.class, RenewBiasCommand.class, RetrieveLastAccessCommand.class, + UpdateLastAccessCommand.class); // Only interested in cache specific replicable commands coreCommands.addAll(gcr.getModuleProperties().moduleCacheRpcCommands()); return coreCommands; diff --git a/core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java b/core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java index 146e565cba49..143dc912cdcb 100644 --- a/core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java +++ b/core/src/main/java/org/infinispan/persistence/manager/PersistenceManagerImpl.java @@ -406,9 +406,7 @@ public void purgeExpired() { ((AdvancedCacheExpirationWriter)writer).purge(persistenceExecutor, advancedListener); } else if (writer instanceof AdvancedCacheWriter) { //noinspection unchecked - ((AdvancedCacheWriter)writer).purge(persistenceExecutor, key -> { - expirationManager.handleInStoreExpiration(key); - }); + ((AdvancedCacheWriter)writer).purge(persistenceExecutor, advancedListener); } }; nonTxWriters.forEach(purgeWriter); diff --git a/core/src/main/java/org/infinispan/security/impl/SecureCacheImpl.java b/core/src/main/java/org/infinispan/security/impl/SecureCacheImpl.java index a5412df16e93..b26daaca6d75 100644 --- a/core/src/main/java/org/infinispan/security/impl/SecureCacheImpl.java +++ b/core/src/main/java/org/infinispan/security/impl/SecureCacheImpl.java @@ -646,9 +646,15 @@ public LockedStream lockedStream() { } @Override - public void removeExpired(K key, V value, Long lifespan) { + public CompletableFuture removeLifespanExpired(K key, V value, Long lifespan) { authzManager.checkPermission(subject, AuthorizationPermission.WRITE); - delegate.removeExpired(key, value, lifespan); + return delegate.removeLifespanExpired(key, value, lifespan); + } + + @Override + public CompletableFuture removeMaxIdleExpired(K key, V value) { + authzManager.checkPermission(subject, AuthorizationPermission.WRITE); + return delegate.removeMaxIdleExpired(key, value); } @Override diff --git a/core/src/main/java/org/infinispan/util/concurrent/CompletableFutures.java b/core/src/main/java/org/infinispan/util/concurrent/CompletableFutures.java index 9dfec94fc663..c53cc720c2a2 100644 --- a/core/src/main/java/org/infinispan/util/concurrent/CompletableFutures.java +++ b/core/src/main/java/org/infinispan/util/concurrent/CompletableFutures.java @@ -20,6 +20,8 @@ */ public class CompletableFutures { + private static final CompletableFuture completedTrueFuture = CompletableFuture.completedFuture(Boolean.TRUE); + private static final CompletableFuture completedFalseFuture = CompletableFuture.completedFuture(Boolean.FALSE); private static final CompletableFuture completedEmptyMapFuture = CompletableFuture.completedFuture(Collections.emptyMap()); private static final CompletableFuture completedNullFuture = CompletableFuture.completedFuture(null); private static final long BIG_DELAY_NANOS = TimeUnit.DAYS.toNanos(1); @@ -34,6 +36,14 @@ public static CompletableFuture completedNull() { return completedNullFuture; } + public static CompletableFuture completedTrue() { + return completedTrueFuture; + } + + public static CompletableFuture completedFalse() { + return completedFalseFuture; + } + public static CompletableFuture> sequence(List> futures) { CompletableFuture all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); return all.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); diff --git a/core/src/test/java/org/infinispan/container/SimpleDataContainerTest.java b/core/src/test/java/org/infinispan/container/SimpleDataContainerTest.java index 07be56fa412f..c61d74839259 100644 --- a/core/src/test/java/org/infinispan/container/SimpleDataContainerTest.java +++ b/core/src/test/java/org/infinispan/container/SimpleDataContainerTest.java @@ -26,6 +26,7 @@ import org.infinispan.test.TestingUtil; import org.infinispan.util.ControlledTimeService; import org.infinispan.util.CoreImmutables; +import org.infinispan.util.concurrent.CompletableFutures; import org.mockito.Mockito; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -54,7 +55,10 @@ protected DataContainer createContainer() { TestingUtil.inject(internalEntryFactory, timeService); ActivationManager activationManager = mock(ActivationManager.class); doNothing().when(activationManager).onUpdate(Mockito.any(), Mockito.anyBoolean()); - TestingUtil.inject(dc, internalEntryFactory, activationManager, timeService, mock(ExpirationManager.class)); + ExpirationManager expirationManager = mock(ExpirationManager.class); + Mockito.when(expirationManager.entryExpiredInMemory(Mockito.any(), Mockito.anyLong())).thenReturn(CompletableFutures.completedTrue()); + Mockito.when(expirationManager.entryExpiredInMemoryFromIteration(Mockito.any(), Mockito.anyLong())).thenReturn(CompletableFutures.completedTrue()); + TestingUtil.inject(dc, internalEntryFactory, activationManager, timeService, expirationManager); return dc; } diff --git a/core/src/test/java/org/infinispan/eviction/impl/ExceptionEvictionTest.java b/core/src/test/java/org/infinispan/eviction/impl/ExceptionEvictionTest.java index 778ea6cedcfa..b4ac9cc6f586 100644 --- a/core/src/test/java/org/infinispan/eviction/impl/ExceptionEvictionTest.java +++ b/core/src/test/java/org/infinispan/eviction/impl/ExceptionEvictionTest.java @@ -40,6 +40,7 @@ import org.infinispan.util.ControlledTimeService; import org.infinispan.util.TimeService; import org.testng.annotations.AfterMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** @@ -404,24 +405,87 @@ public void testExceptionWithCommitMultipleEntries() throws SystemException, Not } } + @DataProvider(name = "expiration") + public Object[][] expirationParams() { + return new Object[][] { + { true, true }, + { true, false }, + { false, true }, + { false, false } + }; + } + + @Test(dataProvider = "expiration") + public void testEntryExpirationOverwritten(boolean maxIdle, boolean readInTx) throws Exception { + Object expiringKey = 0; + if (maxIdle) { + cache(0).put(expiringKey, 0, -1, null, 10, TimeUnit.SECONDS); + } else { + cache(0).put(expiringKey, 0, 10, TimeUnit.SECONDS); + } + // Note that i starts at 1 so this adds SIZE - 1 entries + for (int i = 1; i < SIZE; ++i) { + cache(0).put(i, i); + } + + timeService.advance(TimeUnit.SECONDS.toMillis(11)); + + if (readInTx) { + TestingUtil.withTx(cache(0).getAdvancedCache().getTransactionManager(), () -> { + // This should eventually expire all entries + assertNull(cache(0).get(expiringKey)); + return null; + }); + } else { + // Make sure that it is updated outside of tx as well + assertNull(cache(0).get(expiringKey)); + } + + // We overwrite the existing key - which should work + cache(0).put(expiringKey, 0); + + // This should fail now as we are back to full again + try { + cache(0).put(-1, -1); + fail("Should have thrown an exception!"); + } catch (Throwable t) { + Exceptions.assertException(ContainerFullException.class, getMostNestedSuppressedThrowable(t)); + } + } + /** * This tests to verify that when an entry is expired and removed from the data container that it properly updates * the current count */ - public void testOnEntryExpiration() { - cache(0).put(0, 0, 10, TimeUnit.SECONDS); - + @Test(dataProvider = "expiration") + public void testEntryExpiration(boolean maxIdle, boolean readInTx) throws Exception { + Object expiringKey = 0; + if (maxIdle) { + cache(0).put(expiringKey, 0, -1, null, 10, TimeUnit.SECONDS); + } else { + cache(0).put(expiringKey, 0, 10, TimeUnit.SECONDS); + } + // Note that i starts at 1 so this adds SIZE - 1 entries for (int i = 1; i < SIZE; ++i) { cache(0).put(i, i); } timeService.advance(TimeUnit.SECONDS.toMillis(11)); - // This should eventually expire all entries - assertNull(cache(0).get(0)); + if (readInTx) { + TestingUtil.withTx(cache(0).getAdvancedCache().getTransactionManager(), () -> { + // This should eventually expire all entries + assertNull(cache(0).get(expiringKey)); + return null; + }); + } else { + // Make sure that it is updated outside of tx as well + assertNull(cache(0).get(expiringKey)); + } // Off heap doesn't expire entries on access yet ISPN-8380 - if (storageType == StorageType.OFF_HEAP) { + // Also max idle in a transaction don't remove entries until reaper runs + if (storageType == StorageType.OFF_HEAP || maxIdle) { for (Cache cache : caches()) { ExpirationManager em = TestingUtil.extractComponent(cache, ExpirationManager.class); em.processExpiration(); @@ -431,7 +495,7 @@ public void testOnEntryExpiration() { // Entry should be completely removed at some point - note that expired entries, that haven't been removed, still // count against counts for (Cache cache : caches()) { - eventually(() -> cache.getAdvancedCache().getDataContainer().peek(0) == null); + eventually(() -> cache.getAdvancedCache().getDataContainer().peek(expiringKey) == null); } // This insert should work now diff --git a/core/src/test/java/org/infinispan/expiration/impl/ClusterExpirationFunctionalTest.java b/core/src/test/java/org/infinispan/expiration/impl/ClusterExpirationFunctionalTest.java index 1feda49f1e9f..3b21ffd020cc 100644 --- a/core/src/test/java/org/infinispan/expiration/impl/ClusterExpirationFunctionalTest.java +++ b/core/src/test/java/org/infinispan/expiration/impl/ClusterExpirationFunctionalTest.java @@ -1,19 +1,28 @@ package org.infinispan.expiration.impl; import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; +import static org.testng.AssertJUnit.assertTrue; import java.lang.invoke.MethodHandles; +import java.util.Arrays; +import java.util.Map; import java.util.concurrent.TimeUnit; +import org.infinispan.AdvancedCache; import org.infinispan.Cache; +import org.infinispan.commons.util.CloseableIterator; import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.container.entries.CacheEntry; import org.infinispan.context.Flag; import org.infinispan.distribution.MagicKey; import org.infinispan.test.MultipleCacheManagersTest; import org.infinispan.test.TestingUtil; import org.infinispan.test.fwk.InCacheMode; +import org.infinispan.test.fwk.InTransactionMode; +import org.infinispan.transaction.TransactionMode; import org.infinispan.util.ControlledTimeService; import org.infinispan.util.TimeService; import org.infinispan.util.logging.Log; @@ -27,7 +36,8 @@ * @since 8.0 */ @Test(groups = "functional", testName = "expiration.impl.ClusterExpirationFunctionalTest") -@InCacheMode({CacheMode.DIST_SYNC, CacheMode.REPL_SYNC, CacheMode.SCATTERED_SYNC}) +@InCacheMode({CacheMode.DIST_SYNC, CacheMode.REPL_SYNC}) +@InTransactionMode({TransactionMode.NON_TRANSACTIONAL, TransactionMode.TRANSACTIONAL}) public class ClusterExpirationFunctionalTest extends MultipleCacheManagersTest { protected static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass()); @@ -44,6 +54,7 @@ public class ClusterExpirationFunctionalTest extends MultipleCacheManagersTest { protected void createCacheManagers() throws Throwable { ConfigurationBuilder builder = new ConfigurationBuilder(); builder.clustering().cacheMode(cacheMode); + builder.transaction().transactionMode(transactionMode()); createCluster(builder, 3); waitForClusterToForm(); injectTimeServices(); @@ -63,15 +74,15 @@ protected void injectTimeServices() { TestingUtil.replaceComponent(manager(2), TimeService.class, ts2, true); } - public void testExpiredOnPrimaryOwner() throws Exception { - testExpiredEntryRetrieval(cache0, cache1, ts0, true); + public void testLifespanExpiredOnPrimaryOwner() throws Exception { + testLifespanExpiredEntryRetrieval(cache0, cache1, ts0, true); } - public void testExpiredOnBackupOwner() throws Exception { - testExpiredEntryRetrieval(cache0, cache1, ts1, false); + public void testLifespanExpiredOnBackupOwner() throws Exception { + testLifespanExpiredEntryRetrieval(cache0, cache1, ts1, false); } - private void testExpiredEntryRetrieval(Cache primaryOwner, Cache backupOwner, + private void testLifespanExpiredEntryRetrieval(Cache primaryOwner, Cache backupOwner, ControlledTimeService timeService, boolean expireOnPrimary) throws Exception { MagicKey key = createKey(primaryOwner, backupOwner); primaryOwner.put(key, key.toString(), 10, TimeUnit.MINUTES); @@ -94,7 +105,7 @@ private void testExpiredEntryRetrieval(Cache primaryOwner, Cache Cache other; if (cacheMode.isScattered()) { - // In scattered cache the read would go + // In scattered cache the read would go to primary always other = otherCache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_OWNERSHIP_CHECK); } else { other = otherCache; @@ -122,7 +133,7 @@ private MagicKey createKey(Cache primaryOwner, Cache primaryOwner = cache0.getAdvancedCache(); + AdvancedCache backupOwner = cache1.getAdvancedCache(); + MagicKey key = createKey(primaryOwner, backupOwner); + primaryOwner.put(key, key.toString(), -1, null, 10, TimeUnit.MINUTES); + + assertEquals(key.toString(), primaryOwner.get(key)); + assertEquals(key.toString(), backupOwner.get(key)); + + AdvancedCache expiredCache; + AdvancedCache otherCache; + if (expireOnPrimary) { + expiredCache = primaryOwner; + otherCache = backupOwner; + } else { + expiredCache = backupOwner; + otherCache = primaryOwner; + } + + // We don't want to go remote on accident + expiredCache = expiredCache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_OWNERSHIP_CHECK); + otherCache = otherCache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_OWNERSHIP_CHECK); + + // Now we increment it a bit and force an access on the node that it doesn't expire on + incrementAllTimeServices(5, TimeUnit.MINUTES); + assertNotNull(otherCache.get(key)); + + // Now increment enough to cause it to be expired on the other node that didn't access it + incrementAllTimeServices(6, TimeUnit.MINUTES); + + if (cacheMode == CacheMode.SCATTERED_SYNC) { + // Scattered cache doesn't report last access time via getCacheEntry so we just verify the entry + // was removed only if primary expired - since it controls everything + String expiredValue = expiredCache.get(key); + String otherValue = otherCache.get(key); + if (expireOnPrimary) { + assertNull(expiredValue); + assertNull(otherValue); + } else { + assertNotNull(expiredValue); + assertNotNull(otherValue); + } + } else { + long targetTime = ts0.wallClockTime(); + // Now both nodes should return the value + CacheEntry ce = otherCache.getCacheEntry(key); + assertNotNull(ce); + // Transactional cache doesn't report last access times to user + if (transactional == Boolean.FALSE) { + assertEquals(targetTime, ce.getLastUsed()); + } + ce = expiredCache.getCacheEntry(key); + assertNotNull(ce); + if (transactional == Boolean.FALSE) { + assertEquals(targetTime, ce.getLastUsed()); + } + } + } + + private void testMaxIdleExpireExpireIteration(boolean expireOnPrimary, boolean iterateOnPrimary) { + // Cache0 is always the primary and cache1 is backup + MagicKey key = createKey(cache0, cache1); + cache1.put(key, key.toString(), -1, null, 10, TimeUnit.SECONDS); + + ControlledTimeService expiredTimeService; + if (expireOnPrimary) { + expiredTimeService = ts0; + } else { + expiredTimeService = ts1; + } + expiredTimeService.advance(TimeUnit.SECONDS.toMillis(11)); + + Cache cacheToIterate; + if (iterateOnPrimary) { + cacheToIterate = cache0; + } else { + cacheToIterate = cache1; + } + + // Iteration always works with max idle expired entries + try (CloseableIterator> iterator = cacheToIterate.entrySet().iterator()) { + assertTrue(iterator.hasNext()); + Map.Entry entry = iterator.next(); + assertEquals(key, entry.getKey()); + assertEquals(key.toString(), entry.getValue()); + } + } + + public void testMaxIdleExpirePrimaryIteratePrimary() { + testMaxIdleExpireExpireIteration(true, true); + } + + public void testMaxIdleExpireBackupIteratePrimary() { + testMaxIdleExpireExpireIteration(false, true); + } + + public void testMaxIdleExpirePrimaryIterateBackup() { + testMaxIdleExpireExpireIteration(true, false); + } + + public void testMaxIdleExpireBackupIterateBackup() { + testMaxIdleExpireExpireIteration(false, false); + } } diff --git a/core/src/test/java/org/infinispan/expiration/impl/ClusterExpirationLoaderFunctionalTest.java b/core/src/test/java/org/infinispan/expiration/impl/ClusterExpirationLoaderFunctionalTest.java index 01a694c32d17..0d139d764e8c 100644 --- a/core/src/test/java/org/infinispan/expiration/impl/ClusterExpirationLoaderFunctionalTest.java +++ b/core/src/test/java/org/infinispan/expiration/impl/ClusterExpirationLoaderFunctionalTest.java @@ -4,6 +4,8 @@ import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder; import org.infinispan.test.fwk.InCacheMode; +import org.infinispan.test.fwk.InTransactionMode; +import org.infinispan.transaction.TransactionMode; import org.testng.annotations.Test; /** @@ -13,7 +15,8 @@ * @since 8.0 */ @Test(groups = "functional", testName = "expiration.impl.ClusterExpirationLoaderFunctionalTest") -@InCacheMode({CacheMode.DIST_SYNC, CacheMode.REPL_SYNC, CacheMode.SCATTERED_SYNC}) +@InCacheMode({CacheMode.DIST_SYNC, CacheMode.REPL_SYNC}) +@InTransactionMode({TransactionMode.NON_TRANSACTIONAL, TransactionMode.TRANSACTIONAL}) public class ClusterExpirationLoaderFunctionalTest extends ClusterExpirationFunctionalTest { @Override protected void createCluster(ConfigurationBuilder builder, int count) { diff --git a/core/src/test/java/org/infinispan/expiration/impl/ExpirationFunctionalTest.java b/core/src/test/java/org/infinispan/expiration/impl/ExpirationFunctionalTest.java index 0eb0a1a2b8de..be57b71af372 100644 --- a/core/src/test/java/org/infinispan/expiration/impl/ExpirationFunctionalTest.java +++ b/core/src/test/java/org/infinispan/expiration/impl/ExpirationFunctionalTest.java @@ -137,11 +137,10 @@ public void testExpirationMaxIdleInExec() throws Exception { } public void testExpiredEntriesCleared() { - for (int i = 0; i < 2; i++) { - cache.put("key-" + i, "value-" + i,-1, null, i, TimeUnit.MILLISECONDS); - } + cache.put("key-" + 0, "value-" + 1, -1, null, 0, TimeUnit.MILLISECONDS); + cache.put("key-" + 1, "value-" + 1, -1, null, 1, TimeUnit.MILLISECONDS); - // This should expire approximately half of the entries + // This should expire 1 of the entries timeService.advance(1); cache.clear(); diff --git a/core/src/test/java/org/infinispan/expiration/impl/ExpirationStoreListenerFunctionalTest.java b/core/src/test/java/org/infinispan/expiration/impl/ExpirationStoreListenerFunctionalTest.java index f0981c2699b1..c0597abb7e1b 100644 --- a/core/src/test/java/org/infinispan/expiration/impl/ExpirationStoreListenerFunctionalTest.java +++ b/core/src/test/java/org/infinispan/expiration/impl/ExpirationStoreListenerFunctionalTest.java @@ -41,8 +41,13 @@ public void testSimpleExpirationLifespan() throws Exception { @Override public void testSimpleExpirationMaxIdle() throws Exception { - super.testSimpleExpirationMaxIdle(); + for (int i = 0; i < SIZE; i++) { + cache.put("key-" + i, "value-" + i,-1, null, 1, TimeUnit.MILLISECONDS); + } + timeService.advance(2); + // We have to process expiration for store and max idle manager.processExpiration(); + assertEquals(0, cache.size()); assertExpiredEvents(SIZE); } diff --git a/core/src/test/java/org/infinispan/expiration/impl/ScatteredExpirationFunctionalTest.java b/core/src/test/java/org/infinispan/expiration/impl/ScatteredExpirationFunctionalTest.java new file mode 100644 index 000000000000..0449745448e3 --- /dev/null +++ b/core/src/test/java/org/infinispan/expiration/impl/ScatteredExpirationFunctionalTest.java @@ -0,0 +1,19 @@ +package org.infinispan.expiration.impl; + +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.test.fwk.InCacheMode; +import org.infinispan.test.fwk.InTransactionMode; +import org.infinispan.transaction.TransactionMode; +import org.testng.annotations.Test; + +/** + * Tests to make sure that when expiration occurs it occurs across the cluster in a scattered cache. + * This test is needed since scattered cache only works with non transactional caches + * @author William Burns + * @since 9.3 + */ +@Test(groups = "functional", testName = "expiration.impl.ScatteredExpirationFunctionalTest") +@InCacheMode({CacheMode.SCATTERED_SYNC}) +@InTransactionMode({TransactionMode.NON_TRANSACTIONAL}) +public class ScatteredExpirationFunctionalTest extends ClusterExpirationFunctionalTest { +} diff --git a/core/src/test/java/org/infinispan/expiration/impl/ScatteredExpirationLoaderFunctionalTest.java b/core/src/test/java/org/infinispan/expiration/impl/ScatteredExpirationLoaderFunctionalTest.java new file mode 100644 index 000000000000..f5e6df95f75f --- /dev/null +++ b/core/src/test/java/org/infinispan/expiration/impl/ScatteredExpirationLoaderFunctionalTest.java @@ -0,0 +1,27 @@ +package org.infinispan.expiration.impl; + +import org.infinispan.configuration.cache.CacheMode; +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.persistence.dummy.DummyInMemoryStoreConfigurationBuilder; +import org.infinispan.test.fwk.InCacheMode; +import org.infinispan.test.fwk.InTransactionMode; +import org.infinispan.transaction.TransactionMode; +import org.testng.annotations.Test; + +/** + * Tests to make sure that when expiration occurs it occurs across the cluster when a loader is in use and a scattered + * cache is in use + * + * @author William Burns + * @since 9.3 + */ +@Test(groups = "functional", testName = "expiration.impl.ClusterExpirationLoaderFunctionalTest") +@InCacheMode({CacheMode.SCATTERED_SYNC}) +@InTransactionMode({TransactionMode.NON_TRANSACTIONAL}) +public class ScatteredExpirationLoaderFunctionalTest extends ClusterExpirationFunctionalTest { + @Override + protected void createCluster(ConfigurationBuilder builder, int count) { + builder.persistence().addStore(DummyInMemoryStoreConfigurationBuilder.class); + super.createCluster(builder, count); + } +} diff --git a/core/src/test/java/org/infinispan/expiry/AutoCommitExpiryTest.java b/core/src/test/java/org/infinispan/expiry/AutoCommitExpiryTest.java index 39bd16b46f8c..f085e8bc5055 100644 --- a/core/src/test/java/org/infinispan/expiry/AutoCommitExpiryTest.java +++ b/core/src/test/java/org/infinispan/expiry/AutoCommitExpiryTest.java @@ -1,5 +1,7 @@ package org.infinispan.expiry; +import static org.testng.AssertJUnit.assertEquals; + import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -62,7 +64,7 @@ public void testNoAutCommitAndExpiryListener() throws SystemException, NotSuppor ExpirationManager manager = applicationCache.getAdvancedCache().getExpirationManager(); manager.processExpiration(); - eventually(() -> 2 == expiryListener.getCount()); + assertEquals(2, expiryListener.getCount()); } @Override diff --git a/core/src/test/java/org/infinispan/functional/decorators/FunctionalAdvancedCache.java b/core/src/test/java/org/infinispan/functional/decorators/FunctionalAdvancedCache.java index 3d55488cc0f3..dc08339547c2 100644 --- a/core/src/test/java/org/infinispan/functional/decorators/FunctionalAdvancedCache.java +++ b/core/src/test/java/org/infinispan/functional/decorators/FunctionalAdvancedCache.java @@ -534,8 +534,13 @@ public LockedStream lockedStream() { } @Override - public void removeExpired(K key, V value, Long lifespan) { - // TODO: Customise this generated block + public CompletableFuture removeLifespanExpired(K key, V value, Long lifespan) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture removeMaxIdleExpired(K key, V value) { + throw new UnsupportedOperationException(); } @Override diff --git a/core/src/test/java/org/infinispan/security/SecureCacheTestDriver.java b/core/src/test/java/org/infinispan/security/SecureCacheTestDriver.java index 8b97a871756f..9d3300d656c6 100644 --- a/core/src/test/java/org/infinispan/security/SecureCacheTestDriver.java +++ b/core/src/test/java/org/infinispan/security/SecureCacheTestDriver.java @@ -710,8 +710,13 @@ public void testCacheEntrySet(SecureCache cache) { } @TestCachePermission(AuthorizationPermission.WRITE) - public void testRemoveExpired_Object_Object_Long(SecureCache cache) { - cache.getAdvancedCache().removeExpired("a", "a", null); + public void testRemoveLifespanExpired_Object_Object_Long(SecureCache cache) { + cache.getAdvancedCache().removeLifespanExpired("a", "a", null); + } + + @TestCachePermission(AuthorizationPermission.WRITE) + public void testRemoveMaxIdleExpired_Object_Object(SecureCache cache) { + cache.getAdvancedCache().removeMaxIdleExpired("a", "a"); } @TestCachePermission(value = AuthorizationPermission.LIFECYCLE, needsSecurityManager = true) diff --git a/core/src/test/java/org/infinispan/stress/DataContainerStressTest.java b/core/src/test/java/org/infinispan/stress/DataContainerStressTest.java index a604c76261ff..51eb73a522ab 100644 --- a/core/src/test/java/org/infinispan/stress/DataContainerStressTest.java +++ b/core/src/test/java/org/infinispan/stress/DataContainerStressTest.java @@ -2,6 +2,7 @@ import java.util.Arrays; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; @@ -156,6 +157,11 @@ public void handleInStoreExpiration(MarshalledEntry marshalledEntry) { } + @Override + public CompletableFuture retrieveLastAccess(Object key, Object value) { + return null; + } + @Override public void registerWriteIncoming(Object key) { diff --git a/core/src/test/java/org/infinispan/util/mocks/ControlledCommandFactory.java b/core/src/test/java/org/infinispan/util/mocks/ControlledCommandFactory.java index f4d800952f87..5725e02ad86e 100644 --- a/core/src/test/java/org/infinispan/util/mocks/ControlledCommandFactory.java +++ b/core/src/test/java/org/infinispan/util/mocks/ControlledCommandFactory.java @@ -49,6 +49,8 @@ import org.infinispan.commands.remote.RenewBiasCommand; import org.infinispan.commands.remote.RevokeBiasCommand; import org.infinispan.commands.remote.SingleRpcCommand; +import org.infinispan.commands.remote.expiration.RetrieveLastAccessCommand; +import org.infinispan.commands.remote.expiration.UpdateLastAccessCommand; import org.infinispan.commands.remote.recovery.CompleteTransactionCommand; import org.infinispan.commands.remote.recovery.GetInDoubtTransactionsCommand; import org.infinispan.commands.remote.recovery.GetInDoubtTxInfoCommand; @@ -198,6 +200,21 @@ public RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, return actual.buildRemoveExpiredCommand(key, value, lifespan); } + @Override + public RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value) { + return actual.buildRemoveExpiredCommand(key, value); + } + + @Override + public RetrieveLastAccessCommand buildRetrieveLastAccessCommand(Object key, Object value) { + return actual.buildRetrieveLastAccessCommand(key, value); + } + + @Override + public UpdateLastAccessCommand buildUpdateLastAccessCommand(Object key, long accessTime) { + return actual.buildUpdateLastAccessCommand(key, accessTime); + } + @Override public ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object newValue, Metadata metadata, long flagsBitSet) { return actual.buildReplaceCommand(key, oldValue, newValue, metadata, flagsBitSet);