Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

public final class GossipProtocolImpl implements GossipProtocol {

private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocolImpl.class);
private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocol.class);

// Qualifiers

Expand Down Expand Up @@ -122,7 +122,7 @@ public void stop() {

@Override
public Mono<String> spread(Message message) {
return Mono.fromCallable(() -> message)
return Mono.just(message)
.subscribeOn(scheduler)
.flatMap(msg -> Mono.create(sink -> futures.put(createAndPutGossip(msg), sink)));
}
Expand Down Expand Up @@ -150,9 +150,27 @@ private void doSpreadGossip() {
selectGossipMembers().forEach(member -> spreadGossipsTo(period, member));

// Sweep gossips
sweepGossips(period);
Set<String> gossipsToRemove = getGossipsToRemove(period);
if (!gossipsToRemove.isEmpty()) {
LOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove);
for (String gossipId : gossipsToRemove) {
gossips.remove(gossipId);
}
}

// Check spread gossips
Set<String> gossipsThatSpread = getGossipsThatMostLikelyDisseminated(period);
if (!gossipsThatSpread.isEmpty()) {
LOGGER.debug("Most likely disseminated gossips[{}]: {}", period, gossipsThatSpread);
for (String gossipId : gossipsThatSpread) {
MonoSink<String> sink = futures.remove(gossipId);
if (sink != null) {
sink.success(gossipId);
}
}
}
} catch (Exception ex) {
LOGGER.warn("Exception at doSpreadGossip[{}]: {}", period, ex.getMessage(), ex);
LOGGER.warn("Exception at doSpreadGossip[{}]: ", period, ex);
}
}

Expand Down Expand Up @@ -278,29 +296,24 @@ private Message buildGossipRequestMessage(Gossip gossip) {
return Message.withData(gossipRequest).qualifier(GOSSIP_REQ).build();
}

private void sweepGossips(long period) {
private Set<String> getGossipsToRemove(long period) {
// Select gossips to sweep
int periodsToSweep =
ClusterMath.gossipPeriodsToSweep(config.gossipRepeatMult(), remoteMembers.size() + 1);
Set<GossipState> gossipsToRemove =
gossips.values().stream()
.filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSweep)
.collect(Collectors.toSet());

// Check if anything selected
if (gossipsToRemove.isEmpty()) {
return; // nothing to sweep
}
return gossips.values().stream()
.filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSweep)
.map(gossipState -> gossipState.gossip().gossipId())
.collect(Collectors.toSet());
}

// Sweep gossips
LOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove);
for (GossipState gossipState : gossipsToRemove) {
gossips.remove(gossipState.gossip().gossipId());
MonoSink<String> sink = futures.remove(gossipState.gossip().gossipId());
if (sink != null) {
sink.success(gossipState.gossip().gossipId());
}
}
private Set<String> getGossipsThatMostLikelyDisseminated(long period) {
// Select gossips to spread
int periodsToSpread =
ClusterMath.gossipPeriodsToSpread(config.gossipRepeatMult(), remoteMembers.size() + 1);
return gossips.values().stream()
.filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSpread)
.map(gossipState -> gossipState.gossip().gossipId())
.collect(Collectors.toSet());
}

/**
Expand Down