diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java index d9a0e650..e28d568b 100644 --- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java @@ -30,7 +30,7 @@ public final class GossipProtocolImpl implements GossipProtocol { - private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocolImpl.class); + private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocol.class); // Qualifiers @@ -122,7 +122,7 @@ public void stop() { @Override public Mono spread(Message message) { - return Mono.fromCallable(() -> message) + return Mono.just(message) .subscribeOn(scheduler) .flatMap(msg -> Mono.create(sink -> futures.put(createAndPutGossip(msg), sink))); } @@ -150,9 +150,27 @@ private void doSpreadGossip() { selectGossipMembers().forEach(member -> spreadGossipsTo(period, member)); // Sweep gossips - sweepGossips(period); + Set gossipsToRemove = getGossipsToRemove(period); + if (!gossipsToRemove.isEmpty()) { + LOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove); + for (String gossipId : gossipsToRemove) { + gossips.remove(gossipId); + } + } + + // Check spread gossips + Set gossipsThatSpread = getGossipsThatMostLikelyDisseminated(period); + if (!gossipsThatSpread.isEmpty()) { + LOGGER.debug("Most likely disseminated gossips[{}]: {}", period, gossipsThatSpread); + for (String gossipId : gossipsThatSpread) { + MonoSink sink = futures.remove(gossipId); + if (sink != null) { + sink.success(gossipId); + } + } + } } catch (Exception ex) { - LOGGER.warn("Exception at doSpreadGossip[{}]: {}", period, ex.getMessage(), ex); + LOGGER.warn("Exception at doSpreadGossip[{}]: ", period, ex); } } @@ -278,29 +296,24 @@ private Message buildGossipRequestMessage(Gossip gossip) { return Message.withData(gossipRequest).qualifier(GOSSIP_REQ).build(); } - private void sweepGossips(long period) { + private Set getGossipsToRemove(long period) { // Select gossips to sweep int periodsToSweep = ClusterMath.gossipPeriodsToSweep(config.gossipRepeatMult(), remoteMembers.size() + 1); - Set gossipsToRemove = - gossips.values().stream() - .filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSweep) - .collect(Collectors.toSet()); - - // Check if anything selected - if (gossipsToRemove.isEmpty()) { - return; // nothing to sweep - } + return gossips.values().stream() + .filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSweep) + .map(gossipState -> gossipState.gossip().gossipId()) + .collect(Collectors.toSet()); + } - // Sweep gossips - LOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove); - for (GossipState gossipState : gossipsToRemove) { - gossips.remove(gossipState.gossip().gossipId()); - MonoSink sink = futures.remove(gossipState.gossip().gossipId()); - if (sink != null) { - sink.success(gossipState.gossip().gossipId()); - } - } + private Set getGossipsThatMostLikelyDisseminated(long period) { + // Select gossips to spread + int periodsToSpread = + ClusterMath.gossipPeriodsToSpread(config.gossipRepeatMult(), remoteMembers.size() + 1); + return gossips.values().stream() + .filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSpread) + .map(gossipState -> gossipState.gossip().gossipId()) + .collect(Collectors.toSet()); } /**