Skip to content

Commit

Permalink
ISPN-9003 Clustered maxIdle expiration
Browse files Browse the repository at this point in the history
* Initial implementation
  • Loading branch information
wburns authored and tristantarrant committed May 9, 2018
1 parent f8b2142 commit a044e5f
Show file tree
Hide file tree
Showing 49 changed files with 1,298 additions and 194 deletions.
25 changes: 24 additions & 1 deletion core/src/main/java/org/infinispan/AdvancedCache.java
Expand Up @@ -775,14 +775,37 @@ default Map<K, V> getAndPutAll(Map<? extends K, ? extends V> map) {
* <p>
* This command will only remove the value if the value and lifespan also match if provided.
* <p>
* This method will suspend any ongoing transaction and start a new one just for the invocation of this command. It
* is automatically committed or rolled back after the command completes, either successfully or via an exception.
* <p>
* NOTE: This method may be removed at any point including in a minor release and is not supported for external
* usage.
*
* @param key the key that is expiring
* @param value the value that mapped to the given. Null means it will match any value
* @param lifespan the lifespan that should match. If null is provided it will match any lifespan value
* @return if the entry was removed
*/
CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan);

/**
* Attempts to remove the entry for the given key, when it has expired due to max idle. This command first locks
* the key and then verifies that the entry has expired via maxIdle across all nodes. If it has this will then
* remove the given key.
* <p>
* This method returns a boolean when it has determined if the entry has expired. This is useful for when a backup
* node invokes this command for a get that found the entry expired. This way the node can return back to the caller
* much faster when the entry is not expired.
* <p>
* This method will suspend any ongoing transaction and start a new one just for the invocation of this command. It
* is automatically committed or rolled back after the command completes, either successfully or via an exception.
* <p>
* NOTE: This method may be removed at any point including in a minor release and is not supported for external
* usage.
* @param key the key that expired via max idle for the given entry
* @return if the entry was removed
*/
void removeExpired(K key, V value, Long lifespan);
CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value);

/**
* Performs any cache operations using the specified pair of {@link Encoder}.
Expand Down
Expand Up @@ -436,8 +436,13 @@ public LockedStream<K, V> lockedStream() {
}

@Override
public void removeExpired(K key, V value, Long lifespan) {
cache.removeExpired(key, value, lifespan);
public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan) {
return cache.removeLifespanExpired(key, value, lifespan);
}

@Override
public CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value) {
return cache.removeMaxIdleExpired(key, value);
}

@Override
Expand Down
47 changes: 41 additions & 6 deletions core/src/main/java/org/infinispan/cache/impl/CacheImpl.java
Expand Up @@ -176,6 +176,7 @@ public class CacheImpl<K, V> implements AdvancedCache<K, V> {
private boolean transactional;
private boolean batchingEnabled;
private final ContextBuilder contextBuilder = this::getInvocationContextWithImplicitTransaction;
private final ContextBuilder expiredContextBuilder = i -> this.getInvocationContextWithImplicitTransaction(i, true);
private final ContextBuilder pferContextBuilder = this::putForExternalReadContext;

public CacheImpl(String name) {
Expand Down Expand Up @@ -629,10 +630,31 @@ private RemoveCommand createRemoveCommand(Object key, long explicitFlags) {
}

@Override
public void removeExpired(K key, V value, Long lifespan) {
public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan) {
RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value, lifespan);
// Send an expired remove command to everyone
executeCommandAndCommitIfNeeded(contextBuilder, command, 1);
// Remove expired returns a boolean - just ignore it, the caller just needs to know that the expired
// entry is removed when this completes
CompletableFuture<Boolean> completableFuture = performRemoveExpiredCommand(command);
return completableFuture.thenApply(b -> null);
}

@Override
public CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value) {
RemoveExpiredCommand command = commandsFactory.buildRemoveExpiredCommand(key, value);
return performRemoveExpiredCommand(command);
}

private CompletableFuture<Boolean> performRemoveExpiredCommand(RemoveExpiredCommand command) {
Transaction ongoingTransaction = null;
try {
ongoingTransaction = suspendOngoingTransactionIfExists();
return executeCommandAndCommitIfNeededAsync(expiredContextBuilder, command, 1);
} catch (Exception e) {
if (log.isDebugEnabled()) log.debug("Caught exception while doing removeExpired()", e);
return CompletableFutures.completedExceptionFuture(e);
} finally {
resumePreviousOngoingTransaction(ongoingTransaction, true, "Had problems trying to resume a transaction after removeExpired()");
}
}

@Override
Expand Down Expand Up @@ -912,15 +934,28 @@ <C> void addFilteredListener(ListenerHolder listener,
}

/**
* If this is a transactional cache and autoCommit is set to true then starts a transaction if this is not a
* transactional call.
* Creates an invocation context with an implicit transaction if it is required. An implicit transaction is created
* if there is no current transaction and autoCommit is enabled.
* @param keyCount how many keys are expected to be changed
* @return the invocation context
*/
InvocationContext getInvocationContextWithImplicitTransaction(int keyCount) {
return getInvocationContextWithImplicitTransaction(keyCount, false);
}

/**
* Same as {@link #getInvocationContextWithImplicitTransaction(int)} except if <b>forceCreateTransaction</b>
* is true then autoCommit doesn't have to be enabled to start a new transaction.
* @param keyCount how many keys are expected to be changed
* @param forceCreateTransaction if true then a transaction is always started if there wasn't one
* @return the invocation context
*/
InvocationContext getInvocationContextWithImplicitTransaction(int keyCount, boolean forceCreateTransaction) {
InvocationContext invocationContext;
boolean txInjected = false;
if (transactional) {
Transaction transaction = getOngoingTransaction(true);
if (transaction == null && config.transaction().autoCommit()) {
if (transaction == null && (forceCreateTransaction || config.transaction().autoCommit())) {
transaction = tryBegin();
txInjected = true;
}
Expand Down
Expand Up @@ -477,8 +477,13 @@ public CacheSet<CacheEntry<K, V>> cacheEntrySet() {
}

@Override
public void removeExpired(K key, V value, Long lifespan) {
super.removeExpired(keyToStorage(key), valueToStorage(value), lifespan);
public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan) {
return super.removeLifespanExpired(keyToStorage(key), valueToStorage(value), lifespan);
}

@Override
public CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value) {
return super.removeMaxIdleExpired(keyToStorage(key), valueToStorage(value));
}

@Override
Expand Down
Expand Up @@ -462,8 +462,17 @@ public LockedStream<K, V> lockedStream() {
}

@Override
public void removeExpired(K key, V value, Long lifespan) {
public CompletableFuture<Void> removeLifespanExpired(K key, V value, Long lifespan) {
checkExpiration(getDataContainer().get(key), timeService.wallClockTime());
return CompletableFutures.completedNull();
}

@Override
public CompletableFuture<Boolean> removeMaxIdleExpired(K key, V value) {
if (checkExpiration(getDataContainer().get(key), timeService.wallClockTime())) {
return CompletableFutures.completedTrue();
}
return CompletableFutures.completedFalse();
}

@Override
Expand Down
27 changes: 26 additions & 1 deletion core/src/main/java/org/infinispan/commands/CommandsFactory.java
Expand Up @@ -41,6 +41,8 @@
import org.infinispan.commands.remote.RenewBiasCommand;
import org.infinispan.commands.remote.RevokeBiasCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commands.remote.expiration.RetrieveLastAccessCommand;
import org.infinispan.commands.remote.expiration.UpdateLastAccessCommand;
import org.infinispan.commands.remote.recovery.CompleteTransactionCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTransactionsCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTxInfoCommand;
Expand Down Expand Up @@ -153,14 +155,37 @@ public interface CommandsFactory {
InvalidateCommand buildInvalidateFromL1Command(Address origin, long flagsBitSet, Collection<Object> keys);

/**
* Builds an expired remove command that is used to remove only a specific expired entry
* Builds an expired remove command that is used to remove only a specific entry when it expires via lifespan
* @param key the key of the expired entry
* @param value the value of the entry when it was expired
* @param lifespan the lifespan that expired from the command
* @return a RemovedExpiredCommand
*/
RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, Long lifespan);

/**
* Builds an expired remove command that is used to remove only a specific entry when it expires via maxIdle
* @param key the key of the expired entry
* @param value the value of the entry when it was expired
* @return a RemovedExpiredCommand
*/
RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value);

/**
* Builds a retrieve max idle command that is used to get the last access time for a given key.
* @param key the key of the entry to get the last access time of
* @return a RetrieveLastAccessCommand
*/
RetrieveLastAccessCommand buildRetrieveLastAccessCommand(Object key, Object value);

/**
* Builds an update last access command that is used to update the last access time for a given key.
* @param key the key of the entry to update the last access time of
* @param accessTime the time to set the access time to
* @return a UpdateLastAccessCommand
*/
UpdateLastAccessCommand buildUpdateLastAccessCommand(Object key, long accessTime);

/**
* Builds a ReplaceCommand
* @param key key to replace
Expand Down
Expand Up @@ -46,6 +46,8 @@
import org.infinispan.commands.remote.RenewBiasCommand;
import org.infinispan.commands.remote.RevokeBiasCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commands.remote.expiration.RetrieveLastAccessCommand;
import org.infinispan.commands.remote.expiration.UpdateLastAccessCommand;
import org.infinispan.commands.remote.recovery.CompleteTransactionCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTransactionsCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTxInfoCommand;
Expand Down Expand Up @@ -139,6 +141,7 @@
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.transaction.xa.recovery.RecoveryManager;
import org.infinispan.util.ByteString;
import org.infinispan.util.TimeService;
import org.infinispan.util.concurrent.CommandAckCollector;
import org.infinispan.util.concurrent.locks.LockManager;
import org.infinispan.util.logging.Log;
Expand Down Expand Up @@ -199,6 +202,7 @@ public class CommandsFactoryImpl implements CommandsFactory {
private Map<Byte, ModuleCommandInitializer> moduleCommandInitializers;
@Inject private VersionGenerator versionGenerator;
@Inject private KeyPartitioner keyPartitioner;
@Inject private TimeService timeService;

private ByteString cacheName;
private boolean transactional;
Expand Down Expand Up @@ -247,10 +251,26 @@ public InvalidateCommand buildInvalidateFromL1Command(Address origin, long flags

@Override
public RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value, Long lifespan) {
return new RemoveExpiredCommand(key, value, lifespan, notifier, generateUUID(transactional),
return new RemoveExpiredCommand(key, value, lifespan, false, notifier, generateUUID(transactional),
versionGenerator.nonExistingVersion());
}

@Override
public RemoveExpiredCommand buildRemoveExpiredCommand(Object key, Object value) {
return new RemoveExpiredCommand(key, value, null, true, notifier, generateUUID(transactional),
versionGenerator.nonExistingVersion());
}

@Override
public RetrieveLastAccessCommand buildRetrieveLastAccessCommand(Object key, Object value) {
return new RetrieveLastAccessCommand(cacheName, key, value);
}

@Override
public UpdateLastAccessCommand buildUpdateLastAccessCommand(Object key, long accessTime) {
return new UpdateLastAccessCommand(cacheName, key, accessTime);
}

@Override
public ReplaceCommand buildReplaceCommand(Object key, Object oldValue, Object newValue, Metadata metadata, long flagsBitSet) {
return new ReplaceCommand(key, oldValue, newValue, notifier, metadata, flagsBitSet,
Expand Down Expand Up @@ -509,6 +529,14 @@ public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) {
RemoveExpiredCommand removeExpiredCommand = (RemoveExpiredCommand) c;
removeExpiredCommand.init(notifier, versionGenerator.nonExistingVersion());
break;
case RetrieveLastAccessCommand.COMMAND_ID:
RetrieveLastAccessCommand retrieveLastAccessCommand = (RetrieveLastAccessCommand) c;
retrieveLastAccessCommand.inject(dataContainer, timeService);
break;
case UpdateLastAccessCommand.COMMAND_ID:
UpdateLastAccessCommand updateLastAccessCommand = (UpdateLastAccessCommand) c;
updateLastAccessCommand.inject(dataContainer);
break;
case BackupAckCommand.COMMAND_ID:
BackupAckCommand command = (BackupAckCommand) c;
command.setCommandAckCollector(commandAckCollector);
Expand Down
Expand Up @@ -26,6 +26,8 @@
import org.infinispan.commands.remote.RenewBiasCommand;
import org.infinispan.commands.remote.RevokeBiasCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.commands.remote.expiration.RetrieveLastAccessCommand;
import org.infinispan.commands.remote.expiration.UpdateLastAccessCommand;
import org.infinispan.commands.remote.recovery.CompleteTransactionCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTransactionsCommand;
import org.infinispan.commands.remote.recovery.GetInDoubtTxInfoCommand;
Expand Down Expand Up @@ -354,6 +356,12 @@ public CacheRpcCommand fromStream(byte id, byte type, ByteString cacheName) {
case RenewBiasCommand.COMMAND_ID:
command = new RenewBiasCommand(cacheName);
break;
case RetrieveLastAccessCommand.COMMAND_ID:
command = new RetrieveLastAccessCommand(cacheName);
break;
case UpdateLastAccessCommand.COMMAND_ID:
command = new UpdateLastAccessCommand(cacheName);
break;
default:
throw new CacheException("Unknown command id " + id + "!");
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/infinispan/commands/Visitor.java
Expand Up @@ -32,6 +32,7 @@
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.RemoveExpiredCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
Expand Down Expand Up @@ -61,6 +62,10 @@ public interface Visitor {

Object visitEvictCommand(InvocationContext ctx, EvictCommand command) throws Throwable;

default Object visitRemoveExpiredCommand(InvocationContext ctx, RemoveExpiredCommand command) throws Throwable {
return visitRemoveCommand(ctx, command);
}

@Deprecated
default Object visitApplyDeltaCommand(InvocationContext ctx, ApplyDeltaCommand command) throws Throwable {
throw new UnsupportedOperationException();
Expand Down

0 comments on commit a044e5f

Please sign in to comment.