Skip to content

Commit

Permalink
ISPN-15239 HotRod client repeatedly retries iteration operation again…
Browse files Browse the repository at this point in the history
…st failed server after removal from topology
  • Loading branch information
jabolina authored and wburns committed Oct 24, 2023
1 parent 96de0b5 commit 9199e2b
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.infinispan.client.hotrod.impl.iteration;

import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -112,7 +113,7 @@ protected long handleNextResponse(IterationNextResponse<K, E> nextResponse, Map.

@Override
protected void handleThrowableInResponse(Throwable t, Map.Entry<SocketAddress, IntSet> target) {
if (t instanceof TransportException || t instanceof RemoteIllegalLifecycleStateException) {
if (t instanceof TransportException || t instanceof RemoteIllegalLifecycleStateException || t instanceof ConnectException) {
log.throwableDuringPublisher(t);
if (log.isTraceEnabled()) {
IntSet targetSegments = target.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ protected void handleThrowableInResponse(Throwable t, Map.Entry<SocketAddress, I
int batchSize = (this.batchSize / actualTargets.size()) + 1;
return Flowable.fromIterable(actualTargets.entrySet())
.map(entry -> {
log.tracef("Requesting next for: %s", entry);
RemoteInnerPublisherHandler<K, E> innerHandler = new RemoteInnerPublisherHandler<>(this,
batchSize, () -> null, entry);
return innerHandler.startPublisher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static org.testng.AssertJUnit.assertEquals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -14,6 +15,7 @@
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.client.hotrod.test.MultiHotRodServersTest;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.CloseableIteratorSet;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.protostream.SerializationContextInitializer;
import org.infinispan.query.dsl.embedded.DslSCI;
Expand Down Expand Up @@ -74,6 +76,24 @@ public void testFailOver() throws InterruptedException {
assertEquals(rangeAsSet(0, cacheSize), keys);
}

@Test(groups = "functional")
public void testServerLeaveButIterateTwice() throws InterruptedException {
testFailOver();
int cacheSize = 1_000;

RemoteCache<Integer, AccountHS> cache = clients.get(0).getCache();
Collection<Map.Entry<Object, Object>> entries = new ArrayList<>();
CloseableIteratorSet<Map.Entry<Integer, AccountHS>> iterator = cache.entrySet();
for (Map.Entry<Integer, AccountHS> entry : iterator) {
entries.add((Map.Entry) entry);
}

assertEquals(cacheSize, entries.size());

Set<Integer> keys = extractKeys(entries);
assertEquals(rangeAsSet(0, cacheSize), keys);
}

protected void killAnIterationServer() {
servers.stream()
.filter(s -> s.getIterationManager().activeIterations() > 0)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.infinispan.hotrod.impl.iteration;

import java.lang.invoke.MethodHandles;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -113,7 +114,7 @@ protected long handleNextResponse(IterationNextResponse<K, E> nextResponse, Map.

@Override
protected void handleThrowableInResponse(Throwable t, Map.Entry<SocketAddress, IntSet> target) {
if (t instanceof TransportException || t instanceof RemoteIllegalLifecycleStateException) {
if (t instanceof TransportException || t instanceof RemoteIllegalLifecycleStateException || t instanceof ConnectException) {
log.throwableDuringPublisher(t);
if (log.isTraceEnabled()) {
IntSet targetSegments = target.getValue();
Expand Down

0 comments on commit 9199e2b

Please sign in to comment.