New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ISPN-8078 Support functional commands in scattered mode #5316
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's super hard for me to understand all the code here, I added some minor comments.
assertEquals(null, cache(1, cacheName).merge(key, "x", (o, n) -> "abc".equals(o) ? null : "unexpected")); | ||
assertLocalValue(0, key, null); | ||
assertLocalValue(1, key, null); | ||
assertLocalValue(2, key, "ab"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is the value not modified into "unexpected" here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, this is because is the local value and the cal was done from node 1, is that ok ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, the merge should be run only on originator (1) and owner (0).
@@ -43,8 +48,15 @@ protected void createCacheManagers() throws Throwable { | |||
replBuilder.clustering().cacheMode(isSync ? CacheMode.REPL_SYNC : CacheMode.REPL_ASYNC); | |||
configureCache(replBuilder); | |||
cacheManagers.stream().forEach(cm -> cm.defineConfiguration(REPL, replBuilder.build())); | |||
// Create scattered caches | |||
if (!Boolean.TRUE.equals(transactional)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Personally I would init to Boolean.FALSE the MultipleCacheManagersTest
and just test !transactional. Or in this case do Boolean.FALSE.equals(transactional)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can init the transactional
to false in the constructor... I think there was some reason to have this as a tri-state, maybe related to @InTransactionalMode
.
@@ -94,6 +106,8 @@ private void initMaps() { | |||
fmapD2 = FunctionalMapImpl.create(cacheManagers.get(1).<Object, String>getCache(DIST).getAdvancedCache()); | |||
fmapR1 = FunctionalMapImpl.create(cacheManagers.get(0).<Object, String>getCache(REPL).getAdvancedCache()); | |||
fmapR2 = FunctionalMapImpl.create(cacheManagers.get(1).<Object, String>getCache(REPL).getAdvancedCache()); | |||
fmapS1 = FunctionalMapImpl.create(cacheManagers.get(0).<Object, String>getCache(SCATTERED).getAdvancedCache()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not related, but I noticed when I've been working on encoding+tx in functional maps.
Did you notice that fmapL1 and fmapL2 work on the same cache ? Is this normal ? I don't understand why it's like that. When I did a quick test to use 0 and 1, I got errors, but well, I don't know if this is normal or not ... I didn't dig more yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that the whole class hierarchy for functional tests would use some refactoring, having only fmap1
and fmap2
and not all those by-cache-mode fields. I guess that all local mode ops should use only fmapL1, because in non-local caches the commands are not sent to any other node.
BiConsumer<ArrayIterator, Object> consumer = ArrayIterator::add; | ||
BiConsumer<ArrayIterator, ArrayIterator> combiner = ArrayIterator::combine; | ||
((Stream) rv).collect(supplier, consumer, combiner); | ||
((Stream) rv).collect(new ArrayCollector(allFuture.results)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rv is Always a Stream
? Impossible to get a ClassCastException
here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, ReadOnlyManyCommand.perform() returns a Stream. If we get something else, there's not much else to do about it than throw an exception.
import java.util.function.Function; | ||
import java.util.function.Supplier; | ||
|
||
public class ArrayCollector implements java.util.stream.Collector<Object, ArrayCollector, ArrayCollector>, Supplier<ArrayCollector> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As this is an utility class now and not a private class, could be nice to test it unitary and document it uses ?
} | ||
|
||
public void add(Object item) { | ||
array[pos] = item; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
calling this method too much could lead into ArrayIndexOutOfBounds
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not much else I could do if the array is not sized properly.
@@ -93,7 +96,8 @@ public Object perform(InvocationContext ctx) throws Throwable { | |||
|
|||
@Override | |||
public boolean isReturnValueExpected() { | |||
return false; | |||
// Scattered cache always needs some response. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but for this case, the return is null ... isn't it ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In scattered cache it will return an array of InternalCacheValue (after modification), because originator cannot apply the function locally (it does not have the previous version).
if (command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER)) { | ||
contextEntry.setMetadata(command.getMetadata()); | ||
// we don't increment versions with state transfer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why check to do nothing ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could use if (info.isPrimary() && !command.hasAnyFlag(FlagBitSets.PUT_FOR_STATE_TRANSFER))
...
Temporarily closing because of CI overload |
* Also move BaseDistributionInterceptor$ArrayIterator -> commons/ArrayCollector
* Scattered cache now uses RepeatableReadEntries to support concurrent changes in multi-key operations
@karesti Rebased, could you re-check? |
CI looks good, anything else to add here? |
@galderz Not from me... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM :)
Merged ! |
https://issues.jboss.org/browse/ISPN-8078
Depends on #5312