Skip to content

Commit

Permalink
Refactored io.scalecube.cluster.membership.MembershipProtocolImpl.send
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v committed Sep 2, 2023
1 parent 77e8552 commit ccbe0ed
Showing 1 changed file with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -349,7 +350,7 @@ private void doSync() {

Message message = prepareSyncDataMsg(SYNC, null);
LOGGER.debug("[{}][doSync] Send Sync to {}", localMember, addresses);
send(transport, addresses, 0, message)
send(transport, addresses, message)
.subscribe(
null,
ex ->
Expand Down Expand Up @@ -890,14 +891,14 @@ private Mono<Void> spreadMembershipGossip(MembershipRecord record) {
});
}

private static Mono<Void> send(
Transport transport, List<Address> addresses, int currentIndex, Message request) {
if (currentIndex >= addresses.size()) {
return Mono.error(new RuntimeException("All addresses have been tried and failed"));
}

return transport
.send(addresses.get(currentIndex), request)
.onErrorResume(th -> send(transport, addresses, currentIndex + 1, request));
private static Mono<Void> send(Transport transport, List<Address> addresses, Message request) {
final AtomicInteger currentIndex = new AtomicInteger();
return Mono.defer(
() -> {
final Address address = addresses.get(currentIndex.get());
return transport.send(address, request);
})
.doOnError(ex -> currentIndex.incrementAndGet())
.retry(addresses.size() - 1);
}
}

0 comments on commit ccbe0ed

Please sign in to comment.