Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[14.0] SIFS and persistence changes #11806

Merged
merged 6 commits into from Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -8,6 +8,8 @@
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.processors.AsyncProcessor;
import io.reactivex.rxjava3.processors.FlowableProcessor;

/**
* Static factory class that provides methods to obtain commonly used instances for interoperation between RxJava
Expand Down Expand Up @@ -54,9 +56,25 @@ public static <R> Consumer<R> emptyConsumer() {
return (Consumer) emptyConsumer;
}

/**
* Returns a {@link FlowableProcessor} that is already complete and will ignore any value submitted to it and will
* immediately cancel any subscriptions it receives.
* @return processor that is completed
* @param <R> user value type
*/
public static <R> FlowableProcessor<R> completedFlowableProcessor() {
return (FlowableProcessor<R>) completeFlowableProcessor;
}

private static final Function<Object, Object> identityFunction = i -> i;
private static final Consumer<Object> emptyConsumer = ignore -> { };
private static final Function<Map.Entry<Object, Object>, Object> entryToKeyFunction = Map.Entry::getKey;
private static final Function<Map.Entry<Object, Object>, Object> entryToValueFunction = Map.Entry::getValue;
private static final Function<? super Throwable, Publisher<?>> wrapThrowable = t -> Flowable.error(Util.rewrapAsCacheException(t));
private static final FlowableProcessor<Object> completeFlowableProcessor;

static {
completeFlowableProcessor = AsyncProcessor.create();
completeFlowableProcessor.onComplete();
}
}
Expand Up @@ -55,6 +55,19 @@ public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throw
return visitor.visitRemoveCommand(ctx, this);
}

@Override
public boolean shouldReplicate(InvocationContext ctx, boolean requireReplicateIfRemote) {
if (!isSuccessful()) {
return false;
}
// XSITE backup should always replicate remove command
// If skip cache load is set we don't know if the store had a null value for remove so we have to replicate still
// Also if this is a backup write then we can't skip replication to stores
// Also if the caller says we must replicte on remote, make sure we are local
return (!nonExistent || hasAnyFlag(FlagBitSets.SKIP_XSITE_BACKUP |
FlagBitSets.SKIP_CACHE_LOAD) || (requireReplicateIfRemote && (ctx == null || !ctx.isOriginLocal())));
}

@Override
public byte getCommandId() {
return COMMAND_ID;
Expand Down Expand Up @@ -117,7 +130,7 @@ public boolean isConditional() {
}

public void nonExistant() {
nonExistent = false;
nonExistent = true;
}

public boolean isNonExistent() {
Expand Down
Expand Up @@ -57,6 +57,12 @@ public byte getCommandId() {
return COMMAND_ID;
}

@Override
public boolean shouldReplicate(InvocationContext ctx, boolean requireReplicateIfRemote) {
// TODO: I think expiration always has to replicate - check if works later
return isSuccessful();
}

@Override
public String toString() {
return "RemoveExpiredCommand{" +
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/org/infinispan/commands/write/WriteCommand.java
Expand Up @@ -6,6 +6,7 @@
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.TopologyAffectedCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.metadata.impl.PrivateMetadata;

/**
Expand All @@ -24,6 +25,19 @@ public interface WriteCommand extends VisitableCommand, FlagAffectedCommand, Top
*/
boolean isSuccessful();

/**
* Some commands may be successful but not need to be replicated to other nodes, stores or listeners. For example
* a unconditional remove may be performed on a key that doesn't exist. In that case the command is still successful
* but does not need to replicate that information other places.
* @param ctx invocation context if present, may be null
* @param requireReplicateIfRemote if the command can replicate even if not a locally invoked command
* @return whether the command should replicate
* @implSpec default just invokes {@link #isSuccessful()}
*/
default boolean shouldReplicate(InvocationContext ctx, boolean requireReplicateIfRemote) {
return isSuccessful();
}

/**
* Certain commands only work based on a certain condition or state of the cache. For example, {@link
* org.infinispan.Cache#putIfAbsent(Object, Object)} only does anything if a condition is met, i.e., the entry in
Expand Down
Expand Up @@ -1369,7 +1369,7 @@ private void parseIndex(ConfigurationReader reader, SoftIndexFileStoreConfigurat
break;
}
case SEGMENTS:
builder.indexSegments(ParseUtils.parseInt(reader, i, value));
ignoreAttribute(reader, i);
break;
case INDEX_QUEUE_LENGTH:
builder.indexQueueLength(ParseUtils.parseInt(reader, i, value));
Expand Down
Expand Up @@ -6,7 +6,6 @@

import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalEntryFactory;
import org.infinispan.eviction.impl.ActivationManager;
import org.infinispan.eviction.impl.PassivationManager;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
Expand Down Expand Up @@ -59,9 +58,6 @@ public interface DataContainer<K, V> extends Iterable<InternalCacheEntry<K, V>>
/**
* Puts an entry in the cache along with metadata adding information such lifespan of entry, max idle time, version
* information...etc.
* <p/>
* The {@code key} must be activate by invoking {@link ActivationManager#activateAsync(Object, int)}
* boolean)}.
*
* @param k key under which to store entry
* @param v value to store
Expand All @@ -79,8 +75,6 @@ public interface DataContainer<K, V> extends Iterable<InternalCacheEntry<K, V>>

/**
* Removes an entry from the cache
* <p/>
* The {@code key} must be activate by invoking {@link ActivationManager#activateAsync(Object, int)}
*
* @param k key to remove
* @return entry removed, or null if it didn't exist or had expired
Expand Down Expand Up @@ -127,8 +121,6 @@ default int size() {
* <p/>
* See {@link org.infinispan.container.DataContainer.ComputeAction#compute(Object,
* org.infinispan.container.entries.InternalCacheEntry, InternalEntryFactory)}.
* <p/>
* The {@code key} must be activated by invoking {@link ActivationManager#activateAsync(Object, int)}.
* <p>
* Note the entry provided to {@link org.infinispan.container.DataContainer.ComputeAction} may be expired as these
* entries are not filtered as many other methods do.
Expand Down
Expand Up @@ -233,6 +233,10 @@ protected Object primaryReturnHandler(InvocationContext ctx, AbstractDataWriteCo
if (log.isTraceEnabled()) log.tracef("Skipping the replication of the conditional command as it did not succeed on primary owner (%s).", command);
return localResult;
}
if (!command.shouldReplicate(ctx, false)) {
if (log.isTraceEnabled()) log.tracef("Skipping the replication of the command as it does not need to be (%s).", command);
return localResult;
}
LocalizedCacheTopology cacheTopology = CacheTopologyUtil.checkTopology(command, getCacheTopology());
int segment = SegmentSpecificCommand.extractSegment(command, command.getKey(), keyPartitioner);
DistributionInfo distributionInfo = cacheTopology.getSegmentDistribution(segment);
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.context.impl.TxInvocationContext;
Expand All @@ -34,7 +35,6 @@
import org.infinispan.interceptors.InvocationSuccessFunction;
import org.infinispan.interceptors.impl.BaseRpcInterceptor;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

Expand Down Expand Up @@ -143,7 +143,7 @@ public Object visitDataWriteCommand(InvocationContext ctx, DataWriteCommand comm
private Object handleDataWriteCommand(InvocationContext rCtx, DataWriteCommand writeCommand, Object rv, boolean assumeOriginKeptEntryInL1) {
Object key;
Object key1 = (key = writeCommand.getKey());
if (shouldUpdateOnWriteCommand(writeCommand) && writeCommand.isSuccessful() &&
if (shouldUpdateOnWriteCommand(writeCommand) && writeCommand.shouldReplicate(rCtx, true) &&
cdl.getCacheTopology().isWriteOwner(key1)) {
if (log.isTraceEnabled()) {
log.trace("Sending additional invalidation for requestors if necessary.");
Expand Down
Expand Up @@ -447,7 +447,7 @@ private <C extends DataWriteCommand> Object localPrimaryOwnerWrite(InvocationCon
final C dwCommand = (C) rCommand;
final CommandInvocationId id = dwCommand.getCommandInvocationId();
Collection<Address> backupOwners = distributionInfo.writeBackups();
if (!dwCommand.isSuccessful() || backupOwners.isEmpty()) {
if (!dwCommand.shouldReplicate(rCtx, true) || backupOwners.isEmpty()) {
if (log.isTraceEnabled()) {
log.tracef("Not sending command %s to backups", id);
}
Expand Down Expand Up @@ -496,9 +496,11 @@ private <C extends DataWriteCommand> Object remotePrimaryOwnerWrite(InvocationCo
final C dwCommand = (C) rCommand;
final CommandInvocationId id = dwCommand.getCommandInvocationId();
Collection<Address> backupOwners = distributionInfo.writeBackups();
// Note we have to replicate even if the command says not to with triangle if the command
// didn't originate at the primary since the backup may be the originator
if (!dwCommand.isSuccessful() || backupOwners.isEmpty()) {
if (log.isTraceEnabled()) {
log.tracef("Command %s not successful in primary owner.", id);
log.tracef("Command %s not replicating from primary owner.", id);
}
return rv;
}
Expand Down
Expand Up @@ -192,7 +192,7 @@ protected void visitNonTxKey(InvocationContext ctx, Object key, WriteCommand com
@SuppressWarnings("unused")
private void handleNonTxDataWriteCommand(InvocationContext ctx, DataWriteCommand command, Object rv, Throwable t) {
final Object key = command.getKey();
if (!command.isSuccessful() || skipEntryCommit(ctx, command, key)) {
if (!command.shouldReplicate(ctx, true) || skipEntryCommit(ctx, command, key)) {
return;
}
setMetadataToCacheEntry(ctx.lookupEntry(key), command.getSegment(), command.getInternalMetadata(key).iracMetadata());
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.cache.StoreConfiguration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
Expand All @@ -60,7 +61,6 @@
import org.infinispan.transaction.impl.AbstractCacheTransaction;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
Expand Down Expand Up @@ -175,10 +175,14 @@ private Transaction suspendRunningTx(TxInvocationContext<?> ctx) throws SystemEx
return xaTx;
}

boolean shouldReplicateRemove(InvocationContext ctx, RemoveCommand removeCommand) {
return removeCommand.shouldReplicate(ctx, true);
}

@Override
public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
return invokeNextThenApply(ctx, command, (rCtx, removeCommand, rv) -> {
if (!isStoreEnabled(removeCommand) || rCtx.isInTxScope() || !removeCommand.isSuccessful() ||
if (!isStoreEnabled(removeCommand) || rCtx.isInTxScope() || !shouldReplicateRemove(ctx, command) ||
!isProperWriter(rCtx, removeCommand, removeCommand.getKey())) {
return rv;
}
Expand Down Expand Up @@ -220,7 +224,7 @@ public Object visitIracPutKeyValueCommand(InvocationContext ctx, IracPutKeyValue
@Override
public Object visitComputeCommand(InvocationContext ctx, ComputeCommand command) throws Throwable {
return invokeNextThenApply(ctx, command, (rCtx, computeCommand, rv) -> {
if (!isStoreEnabled(computeCommand) || rCtx.isInTxScope() || !computeCommand.isSuccessful() ||
if (!isStoreEnabled(computeCommand) || rCtx.isInTxScope() || !computeCommand.shouldReplicate(ctx, true) ||
!isProperWriter(rCtx, computeCommand, computeCommand.getKey()))
return rv;

Expand All @@ -244,7 +248,7 @@ public Object visitComputeCommand(InvocationContext ctx, ComputeCommand command)
@Override
public Object visitComputeIfAbsentCommand(InvocationContext ctx, ComputeIfAbsentCommand command) throws Throwable {
return invokeNextThenApply(ctx, command, (rCtx, computeIfAbsentCommand, rv) -> {
if (!isStoreEnabled(computeIfAbsentCommand) || rCtx.isInTxScope() || !computeIfAbsentCommand.isSuccessful())
if (!isStoreEnabled(computeIfAbsentCommand) || rCtx.isInTxScope() || !computeIfAbsentCommand.shouldReplicate(ctx, true))
return rv;
if (!isProperWriter(rCtx, computeIfAbsentCommand, computeIfAbsentCommand.getKey()))
return rv;
Expand Down Expand Up @@ -301,7 +305,7 @@ public Object visitWriteOnlyKeyValueCommand(InvocationContext ctx, WriteOnlyKeyV

private <T extends DataWriteCommand & FunctionalCommand> Object visitWriteCommand(InvocationContext ctx, T command) {
return invokeNextThenApply(ctx, command, (rCtx, dataWriteCommand, rv) -> {
if (!isStoreEnabled(dataWriteCommand) || rCtx.isInTxScope() || !dataWriteCommand.isSuccessful() ||
if (!isStoreEnabled(dataWriteCommand) || rCtx.isInTxScope() || !dataWriteCommand.shouldReplicate(ctx, true) ||
!isProperWriter(rCtx, dataWriteCommand, dataWriteCommand.getKey()))
return rv;

Expand Down Expand Up @@ -409,8 +413,8 @@ private <T extends WriteCommand & FunctionalCommand> Object visitWriteManyComman
});
}

protected final InvocationStage store(TxInvocationContext<AbstractCacheTransaction> ctx) throws Throwable {
CompletionStage<Long> batchStage = persistenceManager.performBatch(ctx, ((writeCommand, o) -> isProperWriter(ctx, writeCommand, o)));
protected InvocationStage store(TxInvocationContext<AbstractCacheTransaction> ctx) throws Throwable {
CompletionStage<Long> batchStage = persistenceManager.performBatch(ctx, ((writeCommand, k, v) -> isProperWriter(ctx, writeCommand, k)));
if (getStatisticsEnabled()) {
batchStage.thenAccept(cacheStores::addAndGet);
}
Expand Down Expand Up @@ -490,7 +494,7 @@ protected boolean skipSharedStores(InvocationContext ctx, Object key, FlagAffect

private Object visitDataWriteCommandToStore(InvocationContext ctx, DataWriteCommand command) {
return invokeNextThenApply(ctx, command, (rCtx, cmd, rv) -> {
if (!isStoreEnabled(cmd) || rCtx.isInTxScope() || !cmd.isSuccessful())
if (!isStoreEnabled(cmd) || rCtx.isInTxScope() || !cmd.shouldReplicate(ctx, true))
return rv;
if (!isProperWriter(rCtx, cmd, cmd.getKey()))
return rv;
Expand Down