Permalink
Browse files

ISPN-2504 - WriteSkew check fails for entries which are inserted firs…

…t time
  • Loading branch information...
1 parent d334b7d commit 75222a9535818217dd2c3c7bde52bd515dfca4fe @pruivo pruivo committed with galderz Mar 11, 2013
Showing with 226 additions and 17 deletions.
  1. +16 −0 core/src/main/java/org/infinispan/commands/write/AbstractDataWriteCommand.java
  2. +5 −0 core/src/main/java/org/infinispan/commands/write/ClearCommand.java
  3. +2 −1 core/src/main/java/org/infinispan/commands/write/PutKeyValueCommand.java
  4. +5 −0 core/src/main/java/org/infinispan/commands/write/PutMapCommand.java
  5. +2 −1 core/src/main/java/org/infinispan/commands/write/RemoveCommand.java
  6. +2 −1 core/src/main/java/org/infinispan/commands/write/ReplaceCommand.java
  7. +2 −1 core/src/main/java/org/infinispan/commands/write/VersionedPutKeyValueCommand.java
  8. +5 −0 core/src/main/java/org/infinispan/commands/write/WriteCommand.java
  9. +5 −2 core/src/main/java/org/infinispan/container/entries/ClusteredRepeatableReadEntry.java
  10. +2 −1 core/src/main/java/org/infinispan/context/InvocationContext.java
  11. +4 −2 core/src/main/java/org/infinispan/context/impl/AbstractInvocationContext.java
  12. +2 −1 core/src/main/java/org/infinispan/context/impl/ImmutableContext.java
  13. +17 −0 core/src/main/java/org/infinispan/interceptors/EntryWrappingInterceptor.java
  14. +2 −2 core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
  15. +28 −0 core/src/main/java/org/infinispan/interceptors/VersionedEntryWrappingInterceptor.java
  16. +2 −2 core/src/main/java/org/infinispan/interceptors/distribution/NonTxDistributionInterceptor.java
  17. +2 −2 core/src/main/java/org/infinispan/interceptors/distribution/TxDistributionInterceptor.java
  18. +1 −1 core/src/main/java/org/infinispan/transaction/WriteSkewHelper.java
  19. +49 −0 core/src/test/java/org/infinispan/container/versioning/DistWriteSkewTest.java
  20. +24 −0 core/src/test/java/org/infinispan/container/versioning/LocalWriteSkewTest.java
  21. +49 −0 core/src/test/java/org/infinispan/container/versioning/ReplWriteSkewTest.java
@@ -36,6 +36,8 @@
*/
public abstract class AbstractDataWriteCommand extends AbstractDataCommand implements DataWriteCommand {
+ protected boolean previousRead;
+
protected AbstractDataWriteCommand() {
}
@@ -53,4 +55,18 @@ public boolean isReturnValueExpected() {
return flags == null || (!flags.contains(Flag.SKIP_REMOTE_LOOKUP)
&& !flags.contains(Flag.IGNORE_RETURN_VALUES));
}
+
+ /**
+ * It marks the key as read when this write command was executed. This is only used when write skew check is enabled.
+ *
+ * @param value {@code true} if the key was previous read before this command execution
+ */
+ public final void setPreviousRead(boolean value) {
+ this.previousRead = value;
+ }
+
+ @Override
+ public final boolean wasPreviousRead() {
+ return previousRead;
+ }
}
@@ -122,6 +122,11 @@ public boolean isConditional() {
}
@Override
+ public boolean wasPreviousRead() {
+ return false; //no return value from clear
+ }
+
+ @Override
public boolean isReturnValueExpected() {
return false;
}
@@ -127,7 +127,7 @@ public byte getCommandId() {
@Override
public Object[] getParameters() {
- return new Object[]{key, value, lifespanMillis, maxIdleTimeMillis, putIfAbsent, Flag.copyWithoutRemotableFlags(flags)};
+ return new Object[]{key, value, lifespanMillis, maxIdleTimeMillis, putIfAbsent, Flag.copyWithoutRemotableFlags(flags), previousRead};
}
@Override
@@ -140,6 +140,7 @@ public void setParameters(int commandId, Object[] parameters) {
maxIdleTimeMillis = (Long) parameters[3];
putIfAbsent = (Boolean) parameters[4];
flags = (Set<Flag>) parameters[5];
+ previousRead = (Boolean) parameters[6];
}
public boolean isPutIfAbsent() {
@@ -188,6 +188,11 @@ public boolean isConditional() {
}
@Override
+ public boolean wasPreviousRead() {
+ return false; //no return value
+ }
+
+ @Override
public boolean isReturnValueExpected() {
return false;
}
@@ -180,11 +180,12 @@ public void setParameters(int commandId, Object[] parameters) {
value = parameters[1];
flags = (Set<Flag>) parameters[2];
ignorePreviousValue = (Boolean) parameters[3];
+ previousRead = (Boolean) parameters[4];
}
@Override
public Object[] getParameters() {
- return new Object[]{key, value, Flag.copyWithoutRemotableFlags(flags), ignorePreviousValue};
+ return new Object[]{key, value, Flag.copyWithoutRemotableFlags(flags), ignorePreviousValue, previousRead};
}
public void setIgnorePreviousValue(boolean ignorePreviousValue) {
@@ -119,7 +119,7 @@ public byte getCommandId() {
@Override
public Object[] getParameters() {
return new Object[]{key, oldValue, newValue, lifespanMillis, maxIdleTimeMillis, ignorePreviousValue,
- Flag.copyWithoutRemotableFlags(flags)};
+ Flag.copyWithoutRemotableFlags(flags),previousRead};
}
@Override
@@ -133,6 +133,7 @@ public void setParameters(int commandId, Object[] parameters) {
maxIdleTimeMillis = (Long) parameters[4];
ignorePreviousValue = (Boolean) parameters[5];
flags = (Set<Flag>) parameters[6];
+ previousRead = (Boolean) parameters[7];
}
@Override
@@ -79,7 +79,7 @@ public byte getCommandId() {
@Override
public Object[] getParameters() {
return new Object[]{key, value, lifespanMillis, maxIdleTimeMillis, version,
- Flag.copyWithoutRemotableFlags(flags)};
+ Flag.copyWithoutRemotableFlags(flags), previousRead};
}
@SuppressWarnings("unchecked")
@@ -92,6 +92,7 @@ public void setParameters(int commandId, Object[] parameters) {
maxIdleTimeMillis = (Long) parameters[3];
version = (EntryVersion) parameters[4];
flags = (Set<Flag>) parameters[5];
+ previousRead = (Boolean) parameters[6];
}
@Override
@@ -58,4 +58,9 @@
* an empty collection for this method.
*/
Set<Object> getAffectedKeys();
+
+ /**
+ * @return {@code true} if the key was previously read in the transaction during this command execution
+ */
+ boolean wasPreviousRead();
}
@@ -43,7 +43,7 @@ public ClusteredRepeatableReadEntry(Object key, Object value, EntryVersion versi
this.version = version;
}
- public boolean performWriteSkewCheck(DataContainer container, TxInvocationContext ctx) {
+ public boolean performWriteSkewCheck(DataContainer container, TxInvocationContext ctx, boolean previousRead) {
EntryVersion prevVersion;
InternalCacheEntry ice = container.get(key);
if (ice == null) {
@@ -58,8 +58,11 @@ public boolean performWriteSkewCheck(DataContainer container, TxInvocationContex
if (prevVersion == null)
throw new IllegalStateException("Entries cannot have null versions!");
}
+ if (log.isTraceEnabled()) {
+ log.tracef("Is going to compare versions %s and %s for key %s. Was previously read? %s", prevVersion, version, key, previousRead);
+ }
// Could be that we didn't do a remote get first ... so we haven't effectively read this entry yet.
- if (version == null) return true;
+ if (version == null) return !previousRead;
log.tracef("Comparing versions %s and %s for key %s: %s", prevVersion, version, key, prevVersion.compareTo(version));
return InequalVersionComparisonResult.AFTER != prevVersion.compareTo(version);
}
@@ -24,6 +24,7 @@
import java.util.Set;
+import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.remoting.transport.Address;
/**
@@ -121,5 +122,5 @@
*
* @return true if the context already contained a wrapped entry for which this value was changed, false otherwise.
*/
- boolean replaceValue(Object key, Object value);
+ boolean replaceValue(Object key, InternalCacheEntry cacheEntry);
}
@@ -23,6 +23,7 @@
package org.infinispan.context.impl;
import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.remoting.transport.Address;
@@ -142,11 +143,12 @@ public void setClassLoader(ClassLoader classLoader) {
}
@Override
- public boolean replaceValue(Object key, Object value) {
+ public boolean replaceValue(Object key, InternalCacheEntry cacheEntry) {
CacheEntry ce = lookupEntry(key);
if (ce == null || ce.isNull() || ce.isLockPlaceholder() || ce.getValue() == null) {
if (ce != null && ce.isChanged()) {
- ce.setValue(value);
+ ce.setValue(cacheEntry.getValue());
+ ce.setVersion(cacheEntry.getVersion());
} else {
return false;
}
@@ -21,6 +21,7 @@
import org.infinispan.CacheException;
import org.infinispan.container.entries.CacheEntry;
+import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.InfinispanCollections;
@@ -144,7 +145,7 @@ public void clearLockedKeys() {
}
@Override
- public boolean replaceValue(Object key, Object value) {
+ public boolean replaceValue(Object key, InternalCacheEntry cacheEntry) {
throw new CacheException("This context is immutable");
}
}
@@ -22,6 +22,7 @@
import org.infinispan.commands.AbstractVisitor;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.FlagAffectedCommand;
+import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
@@ -122,6 +123,7 @@ public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command)
@Override
public final Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
try {
+ checkIfKeyRead(ctx, command.getKey(), command);
entryFactory.wrapEntryForReading(ctx, command.getKey());
return invokeNextInterceptor(ctx, command);
} finally {
@@ -163,6 +165,7 @@ public final Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCo
if (shouldWrap(command.getKey(), ctx, command)) {
entryFactory.wrapEntryForPut(ctx, command.getKey(), null, !command.isPutIfAbsent(), command);
}
+ checkIfKeyRead(ctx, command.getKey(), command);
return invokeNextAndApplyChanges(ctx, command);
}
@@ -196,6 +199,7 @@ public final Object visitRemoveCommand(InvocationContext ctx, RemoveCommand comm
if (shouldWrap(command.getKey(), ctx, command)) {
entryFactory.wrapEntryForRemove(ctx, command.getKey());
}
+ checkIfKeyRead(ctx, command.getKey(), command);
return invokeNextAndApplyChanges(ctx, command);
}
@@ -204,6 +208,7 @@ public final Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand co
if (shouldWrap(command.getKey(), ctx, command)) {
entryFactory.wrapEntryForReplace(ctx, command.getKey());
}
+ checkIfKeyRead(ctx, command.getKey(), command);
return invokeNextAndApplyChanges(ctx, command);
}
@@ -358,6 +363,18 @@ public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command)
}
}
+ /**
+ * invoked when a command that may return a value to the application, it has the logic of keep track of the keys read
+ * by the transaction. This information is later used to perform the write skew check.
+ *
+ * @param context the invocation context
+ * @param key the key accessed
+ * @param command the visitable command (can be a read or write command)
+ */
+ protected void checkIfKeyRead(InvocationContext context, Object key, VisitableCommand command) {
+ //no-op, it is only needed to check the write skew
+ }
+
private boolean commitEntryIfNeeded(InvocationContext ctx, boolean skipOwnershipCheck, Object key, CacheEntry entry, boolean isPutForStateTransfer) {
if (entry == null) {
if (key != null && !isPutForStateTransfer && stateConsumer != null) {
@@ -183,7 +183,7 @@ private Object remoteGet(InvocationContext ctx, Object key, FlagAffectedCommand
}
if (ice != null) {
- if (!ctx.replaceValue(key, ice.getValue())) {
+ if (!ctx.replaceValue(key, ice)) {
if (isWrite) {
lockAndWrap(ctx, key, ice, command);
} else {
@@ -223,7 +223,7 @@ protected InternalCacheEntry retrieveFromRemoteSource(Object key, InvocationCont
private Object localGet(InvocationContext ctx, Object key, boolean isWrite, FlagAffectedCommand command) throws Throwable {
InternalCacheEntry ice = dataContainer.get(key);
if (ice != null) {
- if (!ctx.replaceValue(key, ice.getValue())) {
+ if (!ctx.replaceValue(key, ice)) {
if (isWrite)
lockAndWrap(ctx, key, ice, command);
else
@@ -19,15 +19,19 @@
package org.infinispan.interceptors;
+import org.infinispan.commands.VisitableCommand;
+import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.VersionedCommitCommand;
import org.infinispan.commands.tx.VersionedPrepareCommand;
+import org.infinispan.commands.write.AbstractDataWriteCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.EntryVersionsMap;
import org.infinispan.container.versioning.VersionGenerator;
+import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.factories.annotations.Inject;
@@ -98,4 +102,28 @@ protected void commitContextEntry(CacheEntry entry, InvocationContext ctx, boole
cdl.commitEntry(entry, entry.getVersion(), skipOwnershipCheck, ctx);
}
}
+
+ @Override
+ protected void checkIfKeyRead(InvocationContext context, Object key, VisitableCommand command) {
+ if (command instanceof AbstractDataWriteCommand) {
+ AbstractDataWriteCommand writeCommand = (AbstractDataWriteCommand) command;
+ //keep track is only need in a clustered and transactional environment to perform the write skew check
+ if (context.isInTxScope() && context.isOriginLocal()) {
+ TxInvocationContext txInvocationContext = (TxInvocationContext) context;
+ if (!writeCommand.hasFlag(Flag.PUT_FOR_STATE_TRANSFER) && writeCommand.isConditional() ||
+ !writeCommand.hasFlag(Flag.IGNORE_RETURN_VALUES)) {
+ //State transfer does not show the old value for the application neither with the IGNORE_RETURN_VALUES.
+ //on other hand, the conditional always read key!
+ txInvocationContext.getCacheTransaction().addReadKey(key);
+ }
+ writeCommand.setPreviousRead(txInvocationContext.getCacheTransaction().keyRead(key));
+ }
+ } else if (command instanceof GetKeyValueCommand) {
+ if (context.isInTxScope() && context.isOriginLocal()) {
+ //always show the value to the application
+ TxInvocationContext txInvocationContext = (TxInvocationContext) context;
+ txInvocationContext.getCacheTransaction().addReadKey(key);
+ }
+ }
+ }
}
@@ -142,7 +142,7 @@ private Object remoteGetBeforeWrite(InvocationContext ctx, Object key, FlagAffec
// attempt a remote lookup
InternalCacheEntry ice = retrieveFromRemoteSource(key, ctx, false, command);
if (ice != null) {
- if (!ctx.replaceValue(key, ice.getValue())) {
+ if (!ctx.replaceValue(key, ice)) {
entryFactory.wrapEntryForPut(ctx, key, ice, false, command);
}
return ice.getValue();
@@ -156,7 +156,7 @@ private Object remoteGetBeforeWrite(InvocationContext ctx, Object key, FlagAffec
private Object localGet(InvocationContext ctx, Object key, boolean isWrite, FlagAffectedCommand command) throws Throwable {
InternalCacheEntry ice = dataContainer.get(key);
if (ice != null) {
- if (!ctx.replaceValue(key, ice.getValue())) {
+ if (!ctx.replaceValue(key, ice)) {
if (isWrite)
entryFactory.wrapEntryForPut(ctx, key, ice, false, command);
else
@@ -299,7 +299,7 @@ private Object localGet(InvocationContext ctx, Object key, boolean isWrite, Flag
if (isWrite && isPessimisticCache && ctx.isInTxScope()) {
((TxInvocationContext) ctx).addAffectedKey(key);
}
- if (!ctx.replaceValue(key, ice.getValue())) {
+ if (!ctx.replaceValue(key, ice)) {
if (isWrite)
lockAndWrap(ctx, key, ice, command);
else
@@ -380,7 +380,7 @@ private Object remoteGetAndStoreInL1(InvocationContext ctx, Object key, boolean
log.debug("Inability to store in L1 caused by", e);
}
} else {
- if (!ctx.replaceValue(key, ice.getValue())) {
+ if (!ctx.replaceValue(key, ice)) {
if (isWrite)
lockAndWrap(ctx, key, ice, command);
else
@@ -86,7 +86,7 @@ public static EntryVersionsMap performWriteSkewCheckAndReturnNewVersions(Version
if (versionSeen != null) entry.setVersion(versionSeen);
}
- if (entry.performWriteSkewCheck(dataContainer, context)) {
+ if (entry.performWriteSkewCheck(dataContainer, context, c.wasPreviousRead())) {
IncrementableEntryVersion newVersion = entry.isCreated() ? versionGenerator.generateNew() : versionGenerator.increment((IncrementableEntryVersion) entry.getVersion());
uv.put(k, newVersion);
} else {
Oops, something went wrong.

0 comments on commit 75222a9

Please sign in to comment.