
ISPN-13428 Distributed streams collectors ignore distributedBatchSize #9622

Merged
merged 3 commits into from Nov 5, 2021

Conversation

@danberindei (Member) commented Nov 2, 2021

https://issues.redhat.com/browse/ISPN-13428
https://issues.redhat.com/browse/ISPN-13438

BytesObjectOutput does not handle overflow correctly
Add warning to CacheStream collector operations javadoc
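The overflow fix itself lives in Infinispan's marshalling internals, but the failure mode is generic: growing a byte buffer with int arithmetic can wrap negative as the position approaches 2 GB. A minimal sketch of overflow-safe growth, with hypothetical names (this is not BytesObjectOutput's actual code):

```java
import java.util.Arrays;

// Hypothetical sketch: a growable byte buffer whose capacity computation
// is done in long arithmetic so that pos + needed cannot silently wrap.
public class GrowableBuffer {
    private byte[] buf = new byte[32];
    private int pos;

    void ensureCapacity(int needed) {
        int required = pos + needed;          // may overflow to a negative int
        if (required < 0 || required > buf.length) {
            // Compute the target size in long, then clamp near the VM's
            // maximum array length instead of wrapping around.
            long target = Math.max((long) buf.length * 2, (long) pos + needed);
            if (target > Integer.MAX_VALUE - 8)
                throw new OutOfMemoryError("Buffer exceeds maximum array size");
            buf = Arrays.copyOf(buf, (int) target);
        }
    }

    void write(byte[] data) {
        ensureCapacity(data.length);
        System.arraycopy(data, 0, buf, pos, data.length);
        pos += data.length;
    }

    int size() { return pos; }
}
```

The `required < 0` check is the key line: once `pos + needed` wraps, a plain `required > buf.length` comparison would wrongly conclude there is enough room.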

@danberindei (Member, Author) commented:
@wburns if you are ok with only running the collectors on the originators, I can add another commit to deprecate the serializable collectors API and to remove the internal scaffolding.

@wburns (Member) commented Nov 2, 2021

I do not like this change at all. The javadoc for CacheStream states that operations are shipped to remote nodes and performed there. This is changing and removing functionality that users cannot reproduce anywhere else, whereas a user can very easily convert the stream to an iterator and do this themselves.

The distributedBatchSize javadoc also explicitly lists which methods utilize it.

@danberindei (Member, Author) commented Nov 2, 2021

The javadoc for CacheStream states that operations are shipped to remote nodes and performed there.

True, but the Javadoc does not say anything about not being able to use a collector with more than 2 GB of data from each node.

The first option I considered was to make the collectors use distributedBatchSize(), but that seemed much too complicated, so I went for the simpler option.

This is changing and removing functionality that you cannot reproduce anywhere else. Where as a user can very easily convert the stream to an iterator and do this themselves.

I believe users can also emulate a collector using reduce(), e.g.

      List<Map.Entry<Integer, String>> result2 =
            createStream(entrySet).reduce(Collections.emptyList(),
                                          (list, e) -> {
                                             if (list.isEmpty()) list = new ArrayList<>();
                                             list.add(e);
                                             return list;
                                          },
                                          (left, right) -> {
                                             if (left.isEmpty()) return right;
                                             left.addAll(right);
                                             return left;
                                          });
      assertEquals(range, result2.size());

I know it's ugly, but somehow I like that it's ugly and users would give it a second thought before using that code in production :)
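For comparison, the collect() path that the new javadoc warning targets expresses the same accumulation. A plain java.util.stream sketch (viaCollect and viaReduce are illustrative names; CacheStream's collect with CacheCollectors would be analogous):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectVsReduce {
    // collect(): the terminal operation the PR adds a javadoc warning to;
    // on a distributed stream the combined result is built on the originator.
    static List<Integer> viaCollect(Stream<Integer> s) {
        return s.collect(Collectors.toList());
    }

    // reduce(): the emulation from the comment above, with an immutable
    // empty-list identity and a copy-on-first-add accumulator so the
    // shared identity value is never mutated.
    static List<Integer> viaReduce(Stream<Integer> s) {
        return s.reduce(List.of(),
                        (list, e) -> {
                           List<Integer> l = list.isEmpty() ? new ArrayList<>() : list;
                           l.add(e);
                           return l;
                        },
                        (left, right) -> {
                           if (left.isEmpty()) return right;
                           left.addAll(right);
                           return left;
                        });
    }
}
```

Both produce the same list; the difference under discussion is only where the accumulation runs in a distributed stream.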

On the one hand I can imagine applications needing a "mutable reduction" that actually reduces the size of the values and is just nicer to express using the collector methods compared to the reduce methods; OTOH our CacheCollectors methods steer users towards java.util.Collectors, and basically all our tests/examples just add the values in a collection.

@wburns (Member) commented Nov 2, 2021

True, but the Javadoc does not say anything about not being able to use a collector with more than 2GB's worth of data from each node.

Yeah, sadly that is a limitation for everything we do since we are tied to Integer.MAX_VALUE for array lengths.

The first option I considered was to make the collectors use distributedBatchSize(), but that seemed much too complicated, so I went for the simpler option.

Yeah, you don't want to do that, heh.

I believe users can also emulate a collector using reduce(), e.g.

But then we would have different terminal operations where some go remote and some don't. Reduce and collect are very similar to each other, and it would be weird for only some of them to be local. Sorry, I meant that if we changed all of these then there would be no alternative.

OTOH our CacheCollectors methods steer users towards java.util.Collectors, and basically all our tests/examples just add the values in a collection.

Yeah, that is because this is using the Stream API, and java.util.stream.Collector is a pretty predominant API in it, so I was trying to keep aligned with it.

Once the new API comes in, assuming it exposes something like ClusterPublisherManager, it will be a lot simpler to distinguish local and remote invocations: only the functions that transform Publishers are sent remotely, and anything done to the returned Publisher will be local only.
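A rough sketch of that split, with entirely hypothetical names (not the actual ClusterPublisherManager API): only the transformer function would be serialized and shipped to each node, while everything applied to the combined result runs on the originator. Stream stands in for Publisher here to keep the sketch self-contained:

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;

public class PublisherSplitSketch {
    // Stand-in for a clustered invocation: in a real cluster the transformer
    // would be marshalled and applied to each node's local data, and only
    // the per-node results would flow back to the originator.
    static <T, R> Stream<R> invokeAcrossCluster(
            List<List<T>> perNodeData,
            Function<Stream<T>, Stream<R>> remoteTransformer) {
        // Each node applies the transformer to its own data (the remote part)...
        // ...and the originator concatenates the per-node results (the local part).
        return perNodeData.stream()
              .flatMap(nodeData -> remoteTransformer.apply(nodeData.stream()));
    }
}
```

Anything chained onto the returned stream (collect, reduce, iterate) would then unambiguously execute locally, which is exactly the distinction the collector methods currently blur.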

@danberindei (Member, Author) commented:
Updated to fix the overflow exception and to add a warning in the collector methods javadoc.

@wburns wburns merged commit b598450 into infinispan:main Nov 5, 2021
@wburns (Member) commented Nov 5, 2021

Integrated into main, thanks @danberindei !

@danberindei danberindei deleted the ISPN-13428_collectors branch November 5, 2021 19:23