New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ISPN-7961 Cross-site replication of functional commands #5232
Conversation
@rvansa needs rebase |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initial review
@@ -122,7 +131,36 @@ protected String key(String site) { | |||
} | |||
|
|||
|
|||
public CacheOperationsTest cacheMode(CacheMode cacheMode) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wrong return type in cacheMode()
, transactional()
and lockingMode()
methods.
// All we can do is to assume that if the user plays with unsafe flags he won't modify the entry once | ||
// in a replicable and another time in a non-replicable way | ||
//TODO: what if after functional command the modified entry is not in the context? | ||
Map<Object, WriteCommand> lastModifyingCommand = new HashMap<>(); | ||
for (WriteCommand writeCommand : modifications) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't work if you have assertions enabled. If you have a transaction with put, remove, put on the same key, when iterating over the remove, the lastModification
won't be a RemoveCommand
.
I would reverse iterate over the modification
(can be changed to a List
) and if the key
exists in lastModifyingCommand
map, it would continue for the next key/command.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reverse iteration is much better, done.
@@ -358,44 +360,55 @@ public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) t | |||
} | |||
return invokeNext(ctx, command); | |||
} | |||
GetAllSuccessHandler getAllSuccessHandler = new GetAllSuccessHandler(command); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are this changes related to xsite?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I need a way to fetch multiple entries (remoteGetAll
) from TxDistributionInterceptor
, because when xsite is on, we will need the final value of the entry to replicate xsite.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not convinced. the read commands aren't sent to the backup site.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You got me wrong; When you execute functional modification in a transaction, by default you don't fetch the whole entry into originator's context, just the version for WSC. However, currently the xsite is implemented in such a way that the whole transaction is replicated xsite by the originator. And since the xsite may not have the previous value, we can't just replay the func modification in there - we need to send the whole updated value. Therefore, if xsite is on, we need to fetch the modified entry in order to have the updated value on the originator.
Since some commands work on multiple entries, I need some kind of remoteGetAll
to get hold of those values. Note that at some places the commands use just multiple remoteGet
s but that's a bug, in fact https://issues.jboss.org/browse/ISPN-7889
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Calling visitGetAllCommand
would also mean that it would call the interceptors below, executing the command and then checking if it loaded all entries or throwing OTE (as write commands should). I think that the error handling was the main reason why I went for the refactoring.
@@ -8,6 +8,7 @@ | |||
import org.infinispan.commands.CommandInvocationId; | |||
import org.infinispan.commands.Visitor; | |||
import org.infinispan.commands.write.ValueMatcher; | |||
import org.infinispan.functional.EntryView; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused import
*/ | ||
CLUSTER, | ||
/** | ||
* Command is executed only locally, it is not sent to remote nodes. If the command is a write and this node | ||
* is not an owner of given entry, the entry is not stored in the cache. If the command reads a value and | ||
* the entry is not available locally, null entry is provided instead. | ||
*/ | ||
LOCAL; | ||
LOCAL, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a bit unclear what happens with writes when the originator is a backup owner, is the entry updated without contacting the primary owner? I'll grant you that's not that clear for non-functional writes and CACHE_MODE_LOCAL
, either, but we have to start somewhere :)
* Command is executed only in the current site (same as {@link #CLUSTER}, but it is not sent for backup | ||
* to other sites) | ||
*/ | ||
SITE_ONLY; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's a confusing to use SITE_ONLY
when the command is only executed in the local cluster, and CLUSTER
when it's also sent to the backup clusters. I'd like to rename CLUSTER
to ALL
(just deprecating CLUSTER
for now, of course) and SITE_ONLY
to LOCAL_SITE
.
Also, I'd remove the details about where exactly the command is executed with CLUSTER
, since we're never going to include here the entire replication algorithm, with retries and so on. And we could still decide to send the full value from the primary to the backups in non-tx caches, or from the originator to the owners in tx caches ;)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll rename CLUSTER
to ALL
, we can mess with this a bit due to the @Experimental
flag. It's not used anywhere, as it's the default anyway. And LOCAL_SITE
sounds good.
I'll add some backdoor to the documentation of CLUSTER
:)
} | ||
|
||
protected void handleRemotelyRetrievedKeys(InvocationContext ctx, List<?> remoteKeys) { | ||
} | ||
|
||
private class ClusteredGetAllHandler implements BiConsumer<Map<Address, Response>, Throwable> { | ||
private class ClusteredGetAllHandler<C extends FlagAffectedCommand & TopologyAffectedCommand> implements BiConsumer<Map<Address, Response>, Throwable> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it time we introduced an interface that extends both FlagAffectedCommand
and TopologyAffectedCommand
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe, but I wouldn't add changes in command hierarchy into this PR.
@@ -358,44 +360,55 @@ public Object visitGetAllCommand(InvocationContext ctx, GetAllCommand command) t | |||
} | |||
return invokeNext(ctx, command); | |||
} | |||
GetAllSuccessHandler getAllSuccessHandler = new GetAllSuccessHandler(command); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try { | ||
remoteGetAllHandler.onKeysLost(keys); | ||
} catch (Throwable t) { | ||
allFuture.completeExceptionally(t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be outside the synchronized block, so that late responses don't have to block while the remaining interceptor callbacks are running.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
completeExceptionally
must be called within a synchronized block, see ClusteredGetAllFuture
javadoc. I could add another allFuture.isDone()
check before the sync block on line 430, but we need to sync the responses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I understand the javadoc of ClusteredGetAllFuture
either...
Completing allFuture
will run some interceptor callbacks synchronously, but not all of them. E.g. the state transfer interceptor callback would suspend the invocation to wait for a new topology, and that would release the allFuture
monitor, but the new topology could arrive while remoteGetAllHandler.onKeysLost(keys)
is running, and you'd still have 2 threads accessing the same invocation context.
So I don't think synchronizing completeExceptionally()
helps, we need to cancel all the BaseDistributionInterceptor
-related callbacks before completing allFuture
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When completeExceptionally
executes before the other responses processing, the future will be marked as done and as soon as the other responses get into the synchronized block, these will check isDone
and return.
If the response is being processed in sync block, running completeExceptionally
and the related callbacks will be blocked until we finish the processing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, having the handlers check isDone()
works. But the ClusteredGetAllFuture
javadoc only mentions synchronizing around completeExceptionally()
, and that's not enough.
And yes, please add an isDone()
check before entering the synchronized
block.
if (forceRemoteReadForFunctionalCommands && !command.hasAnyFlag(FlagBitSets.SKIP_XSITE_BACKUP)) { | ||
CompletableFuture<Void> cf = remoteGetAll(ctx, command, command.getAffectedKeys(), RemoteGetAllForWriteHandler.INSTANCE); | ||
if (cf == null) { | ||
return invokeNext(ctx, command); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remoteGetAll()
could return a CompletableFutures.completedNull()
, and then you wouldn't need the if
.
metadata.lifespan(), TimeUnit.MILLISECONDS, | ||
metadata.maxIdle(), TimeUnit.MILLISECONDS); | ||
public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable { | ||
backupCache.clear(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed another TODO here ;)
public AbstractTwoSitesTest use2Pc(boolean use2Pc) { | ||
this.use2Pc = use2Pc; | ||
return this; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be grouped with the other parameters (and with parameters()
as well).
// is not written. This happens when the failure is thrown on remote primary owner - we don't | ||
// commit local entries until distribution interceptor returns and this now throws an exception. | ||
// This used to work when we were replicating cross-site from origin only after everything was | ||
// committed - the replication failure then did not affect local cluster state. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't that the case now? BaseBackupInterceptor
is still before EntryWrappingInterceptor
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the past:
- origin -> primary
- primary commit
- origin commit
- origin backup -> exception, but data are committed locally
now:
- origin -> primary
- primary commit
- primary backups -> exception
- origin gets exception, does not commit
@@ -55,6 +55,7 @@ public void testXSiteDuringJoin() throws InterruptedException, ExecutionExceptio | |||
doXSiteStateTransferDuringTopologyChange(TopologyEvent.JOIN); | |||
} | |||
|
|||
// TODO: this test is flaky - ISPN-6872 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps you should move it to the "unstable_xsite" group.
return Stream.of(keys).collect(Collectors.toMap(Function.identity(), ignored -> value)); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkstyle error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor comments.
import java.util.function.Function; | ||
|
||
import javax.transaction.Transaction; | ||
|
||
import org.infinispan.Cache; | ||
import org.infinispan.functional.EntryView; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused imports
@@ -6,10 +6,13 @@ | |||
import java.util.function.Consumer; | |||
import java.util.function.Function; | |||
|
|||
import org.infinispan.container.entries.InternalCacheValue; | |||
import org.infinispan.functional.EntryView; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused import
@@ -6,10 +6,12 @@ | |||
import java.util.Set; | |||
|
|||
import org.infinispan.container.versioning.EntryVersion; | |||
import org.infinispan.functional.MetaParam; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused import
} | ||
|
||
public MetaParam.Writable[] toWritableMetas() { | ||
int writable = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: use stream instead?
return Arrays.stream(metas).filter(metaParam -> metaParam instanceof MetaParam.Writable).toArray(MetaParam.Writable[]::new);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That probably creates another intermediate array, and this is on the hot path...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw., I think that @karesti will introduce Metadata setter to the func API, and this will be another use case for that since the double conversion is unfortunate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
merge PR should be merged for this to happen, I guess
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but none should block each other. The current solution is fine, if we decide to not include the setter.
@@ -6,6 +6,7 @@ | |||
import java.util.Optional; | |||
import java.util.Set; | |||
import java.util.concurrent.TimeUnit; | |||
import java.util.stream.StreamSupport; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unused import
* Fix handling of ComputeCommand and ComputeIfAbsentCommand * In transaction replicate values from transactional context (replaying modifications in remote site may have different results)
Fixed or commented. |
Accidentally merged myself :-/ |
modifications in remote site may have different results)
https://issues.jboss.org/browse/ISPN-7961