Skip to content

Commit

Permalink
ISPN-9059 Use functional commands for 2LC
Browse files Browse the repository at this point in the history
  • Loading branch information
rvansa authored and galderz committed Jun 14, 2018
1 parent d01c5f3 commit 7e19964
Show file tree
Hide file tree
Showing 35 changed files with 677 additions and 792 deletions.
Expand Up @@ -5,7 +5,6 @@
import org.infinispan.commands.CommandInvocationId; import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.write.AbstractDataWriteCommand; import org.infinispan.commands.write.AbstractDataWriteCommand;
import org.infinispan.commands.write.ValueMatcher; import org.infinispan.commands.write.ValueMatcher;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.encoding.DataConversion; import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry; import org.infinispan.factories.ComponentRegistry;
import org.infinispan.functional.impl.Params; import org.infinispan.functional.impl.Params;
Expand All @@ -22,12 +21,11 @@ public AbstractWriteKeyCommand(Object key, ValueMatcher valueMatcher, int segmen
CommandInvocationId id, Params params, CommandInvocationId id, Params params,
DataConversion keyDataConversion, DataConversion keyDataConversion,
DataConversion valueDataConversion) { DataConversion valueDataConversion) {
super(key, segment, EnumUtil.EMPTY_BIT_SET, id); super(key, segment, params.toFlagsBitSet(), id);
this.valueMatcher = valueMatcher; this.valueMatcher = valueMatcher;
this.params = params; this.params = params;
this.keyDataConversion = keyDataConversion; this.keyDataConversion = keyDataConversion;
this.valueDataConversion = valueDataConversion; this.valueDataConversion = valueDataConversion;
this.setFlagsBitSet(params.toFlagsBitSet());
} }


public AbstractWriteKeyCommand() { public AbstractWriteKeyCommand() {
Expand Down
45 changes: 43 additions & 2 deletions core/src/main/java/org/infinispan/functional/Param.java
Expand Up @@ -95,7 +95,12 @@ public static PersistenceMode valueOf(int ordinal) {
*/ */
@Experimental @Experimental
enum LockingMode implements Param<LockingMode> { enum LockingMode implements Param<LockingMode> {
LOCK, SKIP; LOCK,
SKIP,
/**
* The operation fails when it is not possible to acquire the lock without waiting.
*/
TRY_LOCK;


public static final int ID = ParamIds.LOCKING_MODE_ID; public static final int ID = ParamIds.LOCKING_MODE_ID;
private static final LockingMode[] CACHED_VALUES = values(); private static final LockingMode[] CACHED_VALUES = values();
Expand Down Expand Up @@ -206,4 +211,40 @@ public static StatisticsMode valueOf(int ordinal) {
public static boolean isSkip(Params params) { public static boolean isSkip(Params params) {
return params.<StatisticsMode>get(ID).get() == SKIP; return params.<StatisticsMode>get(ID).get() == SKIP;
} }
}} }

@Experimental
enum ReplicationMode implements Param<ReplicationMode> {
/**
* Command is completed when all owners are updated.
*/
SYNC,
/**
* Invoking node does not know when the owners are updated nor if the command fails.
*/
ASYNC;


public static final int ID = ParamIds.REPLICATION_MODE_ID;
public static final ReplicationMode[] CACHED_VALUES = values();

@Override
public int id() {
return ID;
}

@Override
public ReplicationMode get() {
return this;
}

public static ReplicationMode defaultValue() {
return SYNC;
}

public static ReplicationMode valueOf(int ordinal) {
return CACHED_VALUES[ordinal];
}

}
}
1 change: 1 addition & 0 deletions core/src/main/java/org/infinispan/functional/ParamIds.java
Expand Up @@ -14,6 +14,7 @@ public final class ParamIds {
public static final int LOCKING_MODE_ID = 1; public static final int LOCKING_MODE_ID = 1;
public static final int EXECUTION_MODE_ID = 2; public static final int EXECUTION_MODE_ID = 2;
public static final int STATS_MODE_ID = 3; public static final int STATS_MODE_ID = 3;
public static final int REPLICATION_MODE_ID = 4;


private ParamIds() { private ParamIds() {
// Cannot be instantiated, it's just a holder class // Cannot be instantiated, it's just a holder class
Expand Down
Expand Up @@ -7,6 +7,7 @@
import org.infinispan.commands.CommandsFactory; import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.util.Experimental; import org.infinispan.commons.util.Experimental;
import org.infinispan.context.InvocationContextFactory; import org.infinispan.context.InvocationContextFactory;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.factories.ComponentRegistry; import org.infinispan.factories.ComponentRegistry;
import org.infinispan.functional.FunctionalMap; import org.infinispan.functional.FunctionalMap;
import org.infinispan.functional.Param; import org.infinispan.functional.Param;
Expand Down Expand Up @@ -51,6 +52,11 @@ private static <K, V> long getFlagsBitSet(Cache<K, V> cache) {
break; break;
} }
} }
// By default the commands have Param.ReplicationMode.SYNC and this forces synchronous execution
// We could either have third replication mode (USE_CACHE_MODE) or we enforce that here.
if (!cache.getCacheConfiguration().clustering().cacheMode().isSynchronous()) {
flagsBitSet |= FlagBitSets.FORCE_ASYNCHRONOUS;
}
return flagsBitSet; return flagsBitSet;
} }


Expand Down
94 changes: 70 additions & 24 deletions core/src/main/java/org/infinispan/functional/impl/Params.java
Expand Up @@ -10,6 +10,7 @@
import org.infinispan.functional.Param.ExecutionMode; import org.infinispan.functional.Param.ExecutionMode;
import org.infinispan.functional.Param.LockingMode; import org.infinispan.functional.Param.LockingMode;
import org.infinispan.functional.Param.PersistenceMode; import org.infinispan.functional.Param.PersistenceMode;
import org.infinispan.functional.Param.ReplicationMode;
import org.infinispan.functional.Param.StatisticsMode; import org.infinispan.functional.Param.StatisticsMode;
import org.infinispan.commons.util.Experimental; import org.infinispan.commons.util.Experimental;
import org.infinispan.context.impl.FlagBitSets; import org.infinispan.context.impl.FlagBitSets;
Expand All @@ -31,7 +32,8 @@
public final class Params { public final class Params {


private static final Param<?>[] DEFAULTS = new Param<?>[]{ private static final Param<?>[] DEFAULTS = new Param<?>[]{
PersistenceMode.defaultValue(), LockingMode.defaultValue(), ExecutionMode.defaultValue(), StatisticsMode.defaultValue() PersistenceMode.defaultValue(), LockingMode.defaultValue(), ExecutionMode.defaultValue(),
StatisticsMode.defaultValue(), ReplicationMode.defaultValue()
}; };
// TODO: as Params are immutable and there's only limited number of them, // TODO: as Params are immutable and there's only limited number of them,
// there could be a table with all the possible combinations and we // there could be a table with all the possible combinations and we
Expand Down Expand Up @@ -65,6 +67,12 @@ public Params addAll(Param<?>... ps) {
return new Params(paramsAll); return new Params(paramsAll);
} }


public Params add(Param<?> p) {
Param<?>[] paramsAll = Arrays.copyOf(params, params.length);
paramsAll[p.id()] = p;
return new Params(paramsAll);
}

public Params addAll(Params ps) { public Params addAll(Params ps) {
if (ps == DEFAULT_INSTANCE) { if (ps == DEFAULT_INSTANCE) {
return this; return this;
Expand Down Expand Up @@ -103,10 +111,9 @@ public long toFlagsBitSet() {
LockingMode lockingMode = (LockingMode) params[LockingMode.ID].get(); LockingMode lockingMode = (LockingMode) params[LockingMode.ID].get();
ExecutionMode executionMode = (ExecutionMode) params[ExecutionMode.ID].get(); ExecutionMode executionMode = (ExecutionMode) params[ExecutionMode.ID].get();
StatisticsMode statisticsMode = (StatisticsMode) params[StatisticsMode.ID].get(); StatisticsMode statisticsMode = (StatisticsMode) params[StatisticsMode.ID].get();
ReplicationMode replicationMode = (ReplicationMode) params[ReplicationMode.ID].get();
long flagsBitSet = 0; long flagsBitSet = 0;
switch (persistenceMode) { switch (persistenceMode) {
case LOAD_PERSIST:
break;
case SKIP_PERSIST: case SKIP_PERSIST:
flagsBitSet |= FlagBitSets.SKIP_CACHE_STORE; flagsBitSet |= FlagBitSets.SKIP_CACHE_STORE;
break; break;
Expand All @@ -117,34 +124,67 @@ public long toFlagsBitSet() {
flagsBitSet |= FlagBitSets.SKIP_CACHE_LOAD | FlagBitSets.SKIP_CACHE_STORE; flagsBitSet |= FlagBitSets.SKIP_CACHE_LOAD | FlagBitSets.SKIP_CACHE_STORE;
break; break;
} }
if (lockingMode == LockingMode.SKIP) flagsBitSet |= FlagBitSets.SKIP_LOCKING; switch (lockingMode) {
if (executionMode == ExecutionMode.LOCAL) flagsBitSet |= FlagBitSets.CACHE_MODE_LOCAL; case SKIP:
else if (executionMode == ExecutionMode.LOCAL_SITE) flagsBitSet |= FlagBitSets.SKIP_XSITE_BACKUP; flagsBitSet |= FlagBitSets.SKIP_LOCKING;
if (statisticsMode == StatisticsMode.SKIP) flagsBitSet |= FlagBitSets.SKIP_STATISTICS; break;
case TRY_LOCK:
flagsBitSet |= FlagBitSets.ZERO_LOCK_ACQUISITION_TIMEOUT;
break;
}
switch (executionMode) {
case LOCAL:
flagsBitSet |= FlagBitSets.CACHE_MODE_LOCAL;
break;
case LOCAL_SITE:
flagsBitSet |= FlagBitSets.SKIP_XSITE_BACKUP;
break;
}
if (statisticsMode == StatisticsMode.SKIP) {
flagsBitSet |= FlagBitSets.SKIP_STATISTICS;
}
switch (replicationMode) {
case SYNC:
flagsBitSet |= FlagBitSets.FORCE_SYNCHRONOUS;
break;
case ASYNC:
flagsBitSet |= FlagBitSets.FORCE_ASYNCHRONOUS;
break;
}
return flagsBitSet; return flagsBitSet;
} }


public static Params fromFlagsBitSet(long flagsBitSet) { public static Params fromFlagsBitSet(long flagsBitSet) {
Params params = create(); if (flagsBitSet == 0) {
return DEFAULT_INSTANCE;
}
Param<?>[] paramsAll = Arrays.copyOf(DEFAULTS, DEFAULTS.length);
if ((flagsBitSet & (FlagBitSets.SKIP_CACHE_LOAD | FlagBitSets.SKIP_CACHE_STORE)) != 0) { if ((flagsBitSet & (FlagBitSets.SKIP_CACHE_LOAD | FlagBitSets.SKIP_CACHE_STORE)) != 0) {
params = params.addAll(PersistenceMode.SKIP); paramsAll[PersistenceMode.ID] = PersistenceMode.SKIP;
} else if ((flagsBitSet & FlagBitSets.SKIP_CACHE_STORE) != 0) { } else if ((flagsBitSet & FlagBitSets.SKIP_CACHE_STORE) != 0) {
params = params.addAll(PersistenceMode.SKIP_PERSIST); paramsAll[PersistenceMode.ID] = PersistenceMode.SKIP_PERSIST;
} else if ((flagsBitSet & FlagBitSets.SKIP_CACHE_LOAD) != 0) { } else if ((flagsBitSet & FlagBitSets.SKIP_CACHE_LOAD) != 0) {
params = params.addAll(PersistenceMode.SKIP_LOAD); paramsAll[PersistenceMode.ID] = PersistenceMode.SKIP_LOAD;
} }
if ((flagsBitSet & FlagBitSets.SKIP_LOCKING) != 0) { if ((flagsBitSet & FlagBitSets.SKIP_LOCKING) != 0) {
params = params.addAll(LockingMode.SKIP); paramsAll[LockingMode.ID] = LockingMode.SKIP;
} else if ((flagsBitSet & FlagBitSets.ZERO_LOCK_ACQUISITION_TIMEOUT) != 0) {
paramsAll[LockingMode.ID] = LockingMode.TRY_LOCK;
} }
if ((flagsBitSet & FlagBitSets.CACHE_MODE_LOCAL) != 0) { if ((flagsBitSet & FlagBitSets.CACHE_MODE_LOCAL) != 0) {
params = params.addAll(ExecutionMode.LOCAL); paramsAll[ExecutionMode.ID] = ExecutionMode.LOCAL;
} else if ((flagsBitSet & FlagBitSets.SKIP_XSITE_BACKUP) != 0) { } else if ((flagsBitSet & FlagBitSets.SKIP_XSITE_BACKUP) != 0) {
params = params.addAll(ExecutionMode.LOCAL_SITE); paramsAll[ExecutionMode.ID] = ExecutionMode.LOCAL_SITE;
} }
if ((flagsBitSet & FlagBitSets.SKIP_STATISTICS) != 0) { if ((flagsBitSet & FlagBitSets.SKIP_STATISTICS) != 0) {
params = params.addAll(StatisticsMode.SKIP); paramsAll[StatisticsMode.ID] = StatisticsMode.SKIP;
} }
return params; if ((flagsBitSet & FlagBitSets.FORCE_ASYNCHRONOUS) != 0) {
paramsAll[ReplicationMode.ID] = ReplicationMode.ASYNC;
} else if ((flagsBitSet & FlagBitSets.FORCE_SYNCHRONOUS) != 0) {
paramsAll[ReplicationMode.ID] = ReplicationMode.SYNC;
}
return new Params(paramsAll);
} }


public static Params create() { public static Params create() {
Expand All @@ -165,40 +205,46 @@ public static Params from(Param<?>... ps) {
static { static {
// make sure that bit-set marshalling will work // make sure that bit-set marshalling will work
if (PersistenceMode.values().length > 4) throw new IllegalStateException(); if (PersistenceMode.values().length > 4) throw new IllegalStateException();
if (LockingMode.values().length > 2) throw new IllegalStateException(); if (LockingMode.values().length > 4) throw new IllegalStateException();
if (ExecutionMode.values().length > 4) throw new IllegalStateException(); if (ExecutionMode.values().length > 4) throw new IllegalStateException();
if (StatisticsMode.values().length > 2) throw new IllegalStateException(); if (StatisticsMode.values().length > 2) throw new IllegalStateException();
if (ReplicationMode.values().length > 2) throw new IllegalStateException();
} }


public static void writeObject(ObjectOutput output, Params params) throws IOException { public static void writeObject(ObjectOutput output, Params params) throws IOException {
PersistenceMode persistenceMode = (PersistenceMode) params.get(PersistenceMode.ID).get(); PersistenceMode persistenceMode = (PersistenceMode) params.get(PersistenceMode.ID).get();
LockingMode lockingMode = (LockingMode) params.get(LockingMode.ID).get(); LockingMode lockingMode = (LockingMode) params.get(LockingMode.ID).get();
ExecutionMode executionMode = (ExecutionMode) params.get(ExecutionMode.ID).get(); ExecutionMode executionMode = (ExecutionMode) params.get(ExecutionMode.ID).get();
StatisticsMode statisticsMode = (StatisticsMode) params.get(StatisticsMode.ID).get(); StatisticsMode statisticsMode = (StatisticsMode) params.get(StatisticsMode.ID).get();
ReplicationMode replicationMode = (ReplicationMode) params.get(ReplicationMode.ID).get();
int paramBits = persistenceMode.ordinal() int paramBits = persistenceMode.ordinal()
| (lockingMode.ordinal() << 2) | (lockingMode.ordinal() << 2)
| (executionMode.ordinal() << 3) | (executionMode.ordinal() << 4)
| (statisticsMode.ordinal() << 5); | (statisticsMode.ordinal() << 6)
| (replicationMode.ordinal() << 7);
output.writeByte(paramBits); output.writeByte(paramBits);
} }


public static Params readObject(ObjectInput input) throws IOException, ClassNotFoundException { public static Params readObject(ObjectInput input) throws IOException {
int paramBits = input.readByte(); int paramBits = input.readByte();
PersistenceMode persistenceMode = PersistenceMode.valueOf(paramBits & 3); PersistenceMode persistenceMode = PersistenceMode.valueOf(paramBits & 3);
LockingMode lockingMode = LockingMode.valueOf((paramBits >>> 2) & 1); LockingMode lockingMode = LockingMode.valueOf((paramBits >>> 2) & 3);
ExecutionMode executionMode = ExecutionMode.valueOf((paramBits >>> 3) & 3); ExecutionMode executionMode = ExecutionMode.valueOf((paramBits >>> 4) & 3);
StatisticsMode statisticsMode = StatisticsMode.valueOf((paramBits >>> 5) & 1); StatisticsMode statisticsMode = StatisticsMode.valueOf((paramBits >>> 6) & 1);
ReplicationMode replicationMode = ReplicationMode.valueOf((paramBits >>> 7) & 1);
if (persistenceMode == PersistenceMode.defaultValue() if (persistenceMode == PersistenceMode.defaultValue()
&& lockingMode == LockingMode.defaultValue() && lockingMode == LockingMode.defaultValue()
&& executionMode == ExecutionMode.defaultValue() && executionMode == ExecutionMode.defaultValue()
&& statisticsMode == StatisticsMode.defaultValue()) { && statisticsMode == StatisticsMode.defaultValue()
&& replicationMode == ReplicationMode.defaultValue()) {
return DEFAULT_INSTANCE; return DEFAULT_INSTANCE;
} else { } else {
Param[] params = Arrays.copyOf(DEFAULTS, DEFAULTS.length); Param[] params = Arrays.copyOf(DEFAULTS, DEFAULTS.length);
params[PersistenceMode.ID] = persistenceMode; params[PersistenceMode.ID] = persistenceMode;
params[LockingMode.ID] = lockingMode; params[LockingMode.ID] = lockingMode;
params[ExecutionMode.ID] = executionMode; params[ExecutionMode.ID] = executionMode;
params[StatisticsMode.ID] = statisticsMode; params[StatisticsMode.ID] = statisticsMode;
params[ReplicationMode.ID] = replicationMode;
return new Params(params); return new Params(params);
} }
} }
Expand Down
Expand Up @@ -24,4 +24,6 @@ default void invalidateRegion() {
boolean checkValid(); boolean checkValid();


void destroy(); void destroy();

long getElementCountInMemory();
} }
Expand Up @@ -2,9 +2,13 @@


import java.util.Comparator; import java.util.Comparator;


import org.infinispan.functional.MetaParam;

public interface InfinispanDataRegion extends InfinispanBaseRegion { public interface InfinispanDataRegion extends InfinispanBaseRegion {


long getTombstoneExpiration(); long getTombstoneExpiration();


MetaParam.MetaLifespan getExpiringMetaParam();

Comparator<Object> getComparator(String subclass); Comparator<Object> getComparator(String subclass);
} }
Expand Up @@ -8,14 +8,13 @@


import java.util.UUID; import java.util.UUID;


import org.infinispan.functional.FunctionalMap;
import org.infinispan.hibernate.cache.commons.access.SessionAccess.TransactionCoordinatorAccess; import org.infinispan.hibernate.cache.commons.access.SessionAccess.TransactionCoordinatorAccess;
import org.infinispan.hibernate.cache.commons.InfinispanDataRegion; import org.infinispan.hibernate.cache.commons.InfinispanDataRegion;
import org.infinispan.hibernate.cache.commons.util.FutureUpdate; import org.infinispan.hibernate.cache.commons.util.FutureUpdate;
import org.infinispan.hibernate.cache.commons.util.InfinispanMessageLogger; import org.infinispan.hibernate.cache.commons.util.InfinispanMessageLogger;
import org.infinispan.hibernate.cache.commons.util.InvocationAfterCompletion; import org.infinispan.hibernate.cache.commons.util.InvocationAfterCompletion;


import org.infinispan.AdvancedCache;

/** /**
* @author Radim Vansa &lt;rvansa@redhat.com&gt; * @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/ */
Expand All @@ -27,13 +26,13 @@ public class FutureUpdateSynchronization extends InvocationAfterCompletion {
private final Object value; private final Object value;
private final InfinispanDataRegion region; private final InfinispanDataRegion region;
private final long sessionTimestamp; private final long sessionTimestamp;
private final AdvancedCache cache; private final FunctionalMap.ReadWriteMap<Object, Object> rwMap;


public FutureUpdateSynchronization(TransactionCoordinatorAccess tc, AdvancedCache cache, boolean requiresTransaction, public FutureUpdateSynchronization(TransactionCoordinatorAccess tc, FunctionalMap.ReadWriteMap<Object, Object> rwMap, boolean requiresTransaction,
Object key, Object value, InfinispanDataRegion region, long sessionTimestamp) { Object key, Object value, InfinispanDataRegion region, long sessionTimestamp) {


super(tc, requiresTransaction); super(tc, requiresTransaction);
this.cache = cache; this.rwMap = rwMap;
this.key = key; this.key = key;
this.value = value; this.value = value;
this.region = region; this.region = region;
Expand All @@ -56,7 +55,11 @@ protected void invoke(boolean success) {
FutureUpdate futureUpdate = new FutureUpdate(uuid, region.nextTimestamp(), success ? this.value : null); FutureUpdate futureUpdate = new FutureUpdate(uuid, region.nextTimestamp(), success ? this.value : null);
for (;;) { for (;;) {
try { try {
cache.put(key, futureUpdate); // We expect that when the transaction completes further reads from cache will return the updated value.
// UnorderedDistributionInterceptor makes sure that the update is executed on the node first, and here
// we're waiting for the local update. The remote update does not concern us - the cache is async and
// we won't wait for that.
rwMap.eval(key, futureUpdate).join();
return; return;
} }
catch (Exception e) { catch (Exception e) {
Expand Down
Expand Up @@ -10,9 +10,11 @@


import org.infinispan.commands.write.DataWriteCommand; import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.Ownership; import org.infinispan.distribution.Ownership;
import org.infinispan.interceptors.InvocationFinallyFunction; import org.infinispan.interceptors.InvocationFinallyFunction;
import org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor; import org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.concurrent.locks.LockPromise; import org.infinispan.util.concurrent.locks.LockPromise;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;
Expand Down Expand Up @@ -51,6 +53,11 @@ public class LockingInterceptor extends NonTransactionalLockingInterceptor {
protected final InvocationFinallyFunction invokeNextAndUnlock = (rCtx, rCommand, rv, throwable) -> { protected final InvocationFinallyFunction invokeNextAndUnlock = (rCtx, rCommand, rv, throwable) -> {
if (throwable != null) { if (throwable != null) {
lockManager.unlockAll(rCtx); lockManager.unlockAll(rCtx);
DataWriteCommand dataWriteCommand = (DataWriteCommand) rCommand;
if (throwable instanceof TimeoutException && dataWriteCommand.hasAnyFlag(FlagBitSets.ZERO_LOCK_ACQUISITION_TIMEOUT)) {
dataWriteCommand.fail();
return null;
}
throw throwable; throw throwable;
} else { } else {
return invokeNextAndHandle(rCtx, rCommand, unlockAllReturnCheckCompletableFutureHandler); return invokeNextAndHandle(rCtx, rCommand, unlockAllReturnCheckCompletableFutureHandler);
Expand Down

0 comments on commit 7e19964

Please sign in to comment.