
ISPN-9080 Distributed cache stream should wait for new topology when segments aren't complete #5924

Merged

Conversation

@wburns (Member) commented Apr 16, 2018

  • Block waiting for next topology on retries for distributed streams

https://issues.jboss.org/browse/ISPN-9080
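
For readers skimming the PR, the following is a minimal self-contained sketch of the retry pattern the change introduces: when a distributed stream iteration cannot complete all of its segments (a node left, or data rehashed away from the node we requested from), the caller blocks until the next topology is installed before spawning a new iterator, rather than retrying immediately under the same topology. The TopologyWaiter interface, the runUntilSegmentsComplete method, and the exception handling below are illustrative stand-ins, not Infinispan's actual API; the merged code does the equivalent inside the distributed stream iterator using StateTransferLock.topologyFuture, as the suggested code later in this conversation shows.

      import java.util.HashSet;
      import java.util.Set;
      import java.util.concurrent.CompletableFuture;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.TimeoutException;
      import java.util.function.Function;

      public class TopologyAwareRetrySketch {
         /** Illustrative stand-in for something like StateTransferLock#topologyFuture. */
         interface TopologyWaiter {
            int currentTopologyId();
            CompletableFuture<Void> topologyFuture(int expectedTopologyId);
         }

         private final TopologyWaiter waiter;
         private final long timeout;
         private final TimeUnit timeoutUnit;

         TopologyAwareRetrySketch(TopologyWaiter waiter, long timeout, TimeUnit timeoutUnit) {
            this.waiter = waiter;
            this.timeout = timeout;
            this.timeoutUnit = timeoutUnit;
         }

         /**
          * Runs {@code attempt} until no segments are left. {@code attempt} receives the
          * outstanding segments and returns the ones it could NOT complete under the
          * current topology. After a partial attempt, block for the next topology before
          * retrying, so the retry actually sees a different ownership layout.
          */
         void runUntilSegmentsComplete(Function<Set<Integer>, Set<Integer>> attempt, Set<Integer> segments) {
            Set<Integer> remaining = new HashSet<>(segments);
            boolean firstAttempt = true;
            while (!remaining.isEmpty()) {
               if (!firstAttempt) {
                  // Retrying under the same topology would just lose the same segments again,
                  // so wait until the next topology is installed (or the timeout elapses).
                  int nextTopology = waiter.currentTopologyId() + 1;
                  try {
                     waiter.topologyFuture(nextTopology).get(timeout, timeoutUnit);
                  } catch (InterruptedException | ExecutionException | TimeoutException e) {
                     throw new IllegalStateException("Unable to wait for topology " + nextTopology, e);
                  }
               }
               firstAttempt = false;
               remaining = attempt.apply(remaining);
            }
         }
      }

The key design point, which the review below refines, is that the wait happens only on retries, and only while there are still outstanding segments.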

@wburns (Member, Author) commented Apr 16, 2018

Core tests seem to pass fine. I'm still going to run some heavier tests to verify as well, though.

@tristantarrant (Member) commented Apr 16, 2018 via email

@wburns (Member, Author) commented Apr 16, 2018

Whoops, I thought I deleted that :D

@wburns force-pushed the ISPN-9080_dist_streams_wait branch from 0fc3c1f to 1fc1d04 on April 16, 2018 at 16:53
@wburns (Member, Author) commented Apr 16, 2018

Removed

@danberindei (Member) left a comment

Looks good, I'm only requesting a few small changes to improve readability.

if (complete) {
break;
}
// This is only entered if we couldn't complete all segments. This is because we either had a node leave
@danberindei (Member):

Is "this" the body of the if statement? If so, I think it would be clearer to put the comment in the body, or to say something like "cacheTopology is non-null only if the previous iteration finished without completing all the segments".

@wburns (Member, Author):

I moved it inside the if block.

publisherDecorator(completedHandler, lostSegments -> {
// This covers when an iterator invokes hasNext multiple times each time returning false
// Otherwise the second hasNext would block for the timeout since we may not ever see a topology update
if (complete) {
@danberindei (Member):

complete is set after the loop, so I think this check should be before the loop.

The if (currentIterator == null) body is also getting a little large, so I'd start the loop with the "else" branch instead:

        if (currentIterator != null && currentIterator.hasNext()) {
           return currentIterator.next();
        }

If this doesn't return, you know the currentIterator is either null or exhausted, so you don't need to set currentIterator = null.

@wburns (Member, Author):

I have to play around with this a bit, as it unfortunately isn't this straightforward.

@wburns (Member, Author):

I can't really move that code up higher: we shouldn't check the cache topology unless the iterator is exhausted and segmentsToUse is also non-empty. Sadly it isn't just a simple else.

Also, checking complete adds an additional variable access per entry, which keeping it inside the if avoids. But I can still change that if you really want.
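
In code form, the constraint described above is roughly the following (a condensed sketch reusing the field names from the snippets in this thread; retryWithNextTopology is a hypothetical placeholder for the topology wait plus a new remote iterator, not the merged code):

      // Hot path: while the current iterator still has entries, nothing else is checked,
      // so there is no extra per-entry field read.
      if (currentIterator != null && currentIterator.hasNext()) {
         return currentIterator.next();
      }
      // The iterator is null or exhausted; only retry if segments are actually outstanding.
      if (segmentsToUse.isEmpty()) {
         return null;
      }
      // Only when both conditions hold is the cache topology consulted: wait for the next
      // topology if needed and spawn a new remote iterator for segmentsToUse.
      return retryWithNextTopology(segmentsToUse);   // hypothetical placeholder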

@danberindei (Member):

It took me a while to understand what you meant about segmentsToUse also not being empty, but if I got it right this time, then I don't think you need the complete field at all:

     while (true) {
        if (currentIterator != null && currentIterator.hasNext()) {
           return currentIterator.next();
        }

        // Either we don't have an iterator or the current iterator is exhausted
        if (segmentsToUse.isEmpty()) {
           // No more segments to spawn new iterators
           return null;
        }

        // An iterator completes all segments, unless we either had a node leave (SuspectException)
        // or a new node came up and data rehashed away from the node we requested from.
        // In either case we need to wait for a new topology before spawning a new iterator.
        if (currentIterator != null) {
           try {
              int nextTopology = cacheTopology.getTopologyId() + 1;
              log.tracef("Waiting for topology %d to continue iterator operation with segments %s", nextTopology,
                    segmentsToUse);
              stateTransferLock.topologyFuture(nextTopology).get(timeout, timeoutUnit);
           } catch (InterruptedException | ExecutionException | TimeoutException e) {
              throw new CacheException(e);
           }
        }

        cacheTopology = dm.getCacheTopology();
        log.tracef("Creating non-rehash iterator for segments %s using topology id: %d", segmentsToUse, cacheTopology.getTopologyId());
        currentIterator = nonRehashRemoteIterator(cacheTopology.getReadConsistentHash(), segmentsToUse,
              receivedKeys::get, publisherDecorator(completedHandler, lostSegments -> {
              }, k -> {
                 // Every time a key is retrieved from iterator we add it to the keys received
                 // Then when we retry we exclude those keys to keep out duplicates
                 Set<Object> set = receivedKeys.get(keyPartitioner.getSegment(k));
                 if (set != null) {
                    set.add(k);
                 }
              }), intermediateOperations);
     }
  }

@wburns (Member, Author):

Yeah, tbh this is more like what I wanted to do, but I was quite distracted yesterday. So thanks for this :) I will try it out.

@wburns force-pushed the ISPN-9080_dist_streams_wait branch from 1fc1d04 to e298242 on April 17, 2018 at 17:47
@wburns force-pushed the ISPN-9080_dist_streams_wait branch from e298242 to 8017bec on April 18, 2018 at 15:05
@wburns (Member, Author) commented Apr 18, 2018

Updated

@tristantarrant merged commit 399f898 into infinispan:master on Apr 19, 2018