diff --git a/cluster-api/pom.xml b/cluster-api/pom.xml
index edb867fa..641fd17b 100644
--- a/cluster-api/pom.xml
+++ b/cluster-api/pom.xml
@@ -5,7 +5,7 @@
io.scalecube
scalecube-cluster-parent
- 2.4.2
+ 2.4.3-SNAPSHOT
scalecube-cluster-api
diff --git a/cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java b/cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java
index d8d8f6ef..dc7ab51e 100644
--- a/cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java
+++ b/cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java
@@ -7,6 +7,7 @@
import io.scalecube.cluster.metadata.MetadataEncoder;
import io.scalecube.cluster.transport.api.TransportConfig;
import java.util.Optional;
+import java.util.StringJoiner;
import java.util.function.UnaryOperator;
import reactor.core.Exceptions;
@@ -34,6 +35,7 @@ public final class ClusterConfig implements Cloneable {
private MetadataEncoder metadataEncoder = MetadataEncoder.INSTANCE;
private MetadataDecoder metadataDecoder = MetadataDecoder.INSTANCE;
+ private String memberId;
private String memberHost;
private Integer memberPort;
@@ -158,7 +160,7 @@ public String memberHost() {
* Sets a memberHost.
*
* @param memberHost member host
- * @return new {@code ClusterConfig} instance@return
+ * @return new {@code ClusterConfig} instance
*/
public ClusterConfig memberHost(String memberHost) {
ClusterConfig c = clone();
@@ -166,6 +168,22 @@ public ClusterConfig memberHost(String memberHost) {
return c;
}
+ public String memberId() {
+ return memberId;
+ }
+
+ /**
+ * Sets a memberId.
+ *
+ * @param memberId member id
+ * @return new {@code ClusterConfig} instance
+ */
+ public ClusterConfig memberId(String memberId) {
+ ClusterConfig c = clone();
+ c.memberId = memberId;
+ return c;
+ }
+
public Integer memberPort() {
return memberPort;
}
@@ -262,29 +280,19 @@ public ClusterConfig clone() {
@Override
public String toString() {
- return "ClusterConfig{"
- + "metadata="
- + metadataAsString()
- + ", metadataTimeout="
- + metadataTimeout
- + ", metadataEncoder="
- + metadataEncoder
- + ", metadataDecoder="
- + metadataDecoder
- + ", memberHost='"
- + memberHost
- + '\''
- + ", memberPort="
- + memberPort
- + ", transportConfig="
- + transportConfig
- + ", failureDetectorConfig="
- + failureDetectorConfig
- + ", gossipConfig="
- + gossipConfig
- + ", membershipConfig="
- + membershipConfig
- + '}';
+ return new StringJoiner(", ", ClusterConfig.class.getSimpleName() + "[", "]")
+ .add("metadata=" + metadataAsString())
+ .add("metadataTimeout=" + metadataTimeout)
+ .add("metadataEncoder=" + metadataEncoder)
+ .add("metadataDecoder=" + metadataDecoder)
+ .add("memberId='" + memberId + "'")
+ .add("memberHost='" + memberHost + "'")
+ .add("memberPort=" + memberPort)
+ .add("transportConfig=" + transportConfig)
+ .add("failureDetectorConfig=" + failureDetectorConfig)
+ .add("gossipConfig=" + gossipConfig)
+ .add("membershipConfig=" + membershipConfig)
+ .toString();
}
private String metadataAsString() {
diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Member.java b/cluster-api/src/main/java/io/scalecube/cluster/Member.java
index eec04fe8..fe9a0f22 100644
--- a/cluster-api/src/main/java/io/scalecube/cluster/Member.java
+++ b/cluster-api/src/main/java/io/scalecube/cluster/Member.java
@@ -68,6 +68,6 @@ public int hashCode() {
@Override
public String toString() {
- return id + "@" + address;
+ return id + ":" + address.port();
}
}
diff --git a/cluster-api/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorConfig.java b/cluster-api/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorConfig.java
index 9f1366cd..b02ba19b 100644
--- a/cluster-api/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorConfig.java
+++ b/cluster-api/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorConfig.java
@@ -1,5 +1,6 @@
package io.scalecube.cluster.fdetector;
+import java.util.StringJoiner;
import reactor.core.Exceptions;
public final class FailureDetectorConfig implements Cloneable {
@@ -121,13 +122,10 @@ public FailureDetectorConfig clone() {
@Override
public String toString() {
- return "FailureDetectorConfig{"
- + "pingInterval="
- + pingInterval
- + ", pingTimeout="
- + pingTimeout
- + ", pingReqMembers="
- + pingReqMembers
- + '}';
+ return new StringJoiner(", ", FailureDetectorConfig.class.getSimpleName() + "[", "]")
+ .add("pingInterval=" + pingInterval)
+ .add("pingTimeout=" + pingTimeout)
+ .add("pingReqMembers=" + pingReqMembers)
+ .toString();
}
}
diff --git a/cluster-api/src/main/java/io/scalecube/cluster/gossip/GossipConfig.java b/cluster-api/src/main/java/io/scalecube/cluster/gossip/GossipConfig.java
index 45c8f595..ff8cba27 100644
--- a/cluster-api/src/main/java/io/scalecube/cluster/gossip/GossipConfig.java
+++ b/cluster-api/src/main/java/io/scalecube/cluster/gossip/GossipConfig.java
@@ -1,5 +1,6 @@
package io.scalecube.cluster.gossip;
+import java.util.StringJoiner;
import reactor.core.Exceptions;
public final class GossipConfig implements Cloneable {
@@ -115,13 +116,10 @@ public GossipConfig clone() {
@Override
public String toString() {
- return "GossipConfig{"
- + "gossipFanout="
- + gossipFanout
- + ", gossipInterval="
- + gossipInterval
- + ", gossipRepeatMult="
- + gossipRepeatMult
- + '}';
+ return new StringJoiner(", ", GossipConfig.class.getSimpleName() + "[", "]")
+ .add("gossipFanout=" + gossipFanout)
+ .add("gossipInterval=" + gossipInterval)
+ .add("gossipRepeatMult=" + gossipRepeatMult)
+ .toString();
}
}
diff --git a/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java b/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java
index 7fb42f7c..f22a44d8 100644
--- a/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java
+++ b/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java
@@ -5,6 +5,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.StringJoiner;
import reactor.core.Exceptions;
public final class MembershipConfig implements Cloneable {
@@ -167,18 +168,12 @@ public MembershipConfig clone() {
@Override
public String toString() {
- return "MembershipConfig{"
- + "seedMembers="
- + seedMembers
- + ", syncInterval="
- + syncInterval
- + ", syncTimeout="
- + syncTimeout
- + ", suspicionMult="
- + suspicionMult
- + ", syncGroup='"
- + syncGroup
- + '\''
- + '}';
+ return new StringJoiner(", ", MembershipConfig.class.getSimpleName() + "[", "]")
+ .add("seedMembers=" + seedMembers)
+ .add("syncInterval=" + syncInterval)
+ .add("syncTimeout=" + syncTimeout)
+ .add("suspicionMult=" + suspicionMult)
+ .add("syncGroup='" + syncGroup + "'")
+ .toString();
}
}
diff --git a/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipEvent.java b/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipEvent.java
index 2e127bae..b374a2b0 100644
--- a/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipEvent.java
+++ b/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipEvent.java
@@ -3,6 +3,7 @@
import io.scalecube.cluster.Member;
import java.nio.ByteBuffer;
import java.util.Objects;
+import java.util.StringJoiner;
/**
* Event which is emitted on cluster membership changes when new member added, updated in the
@@ -97,15 +98,12 @@ public ByteBuffer newMetadata() {
@Override
public String toString() {
- return "MembershipEvent{type="
- + type
- + ", member="
- + member
- + ", newMetadata="
- + metadataAsString(newMetadata)
- + ", oldMetadata="
- + metadataAsString(oldMetadata)
- + '}';
+ return new StringJoiner(", ", MembershipEvent.class.getSimpleName() + "[", "]")
+ .add("type=" + type)
+ .add("member=" + member)
+ .add("oldMetadata=" + metadataAsString(oldMetadata))
+ .add("newMetadata=" + metadataAsString(newMetadata))
+ .toString();
}
private String metadataAsString(ByteBuffer metadata) {
diff --git a/cluster-testlib/pom.xml b/cluster-testlib/pom.xml
index dd7f1987..97dd88a0 100644
--- a/cluster-testlib/pom.xml
+++ b/cluster-testlib/pom.xml
@@ -3,7 +3,7 @@
scalecube-cluster-parent
io.scalecube
- 2.4.2
+ 2.4.3-SNAPSHOT
4.0.0
diff --git a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java
index 80f93ec3..82d7a758 100644
--- a/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java
+++ b/cluster-testlib/src/main/java/io/scalecube/cluster/utils/NetworkEmulator.java
@@ -6,6 +6,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
+import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
@@ -369,7 +370,10 @@ public long evaluateDelay() {
@Override
public String toString() {
- return "OutboundSettings{loss=" + lossPercent + ", delay=" + meanDelay + '}';
+ return new StringJoiner(", ", OutboundSettings.class.getSimpleName() + "[", "]")
+ .add("lossPercent=" + lossPercent)
+ .add("meanDelay=" + meanDelay)
+ .toString();
}
}
@@ -405,7 +409,9 @@ public boolean shallPass() {
@Override
public String toString() {
- return "InboundSettings{shallPass=" + shallPass + '}';
+ return new StringJoiner(", ", InboundSettings.class.getSimpleName() + "[", "]")
+ .add("shallPass=" + shallPass)
+ .toString();
}
}
}
diff --git a/cluster/pom.xml b/cluster/pom.xml
index 1e822721..8c2513a3 100644
--- a/cluster/pom.xml
+++ b/cluster/pom.xml
@@ -5,7 +5,7 @@
io.scalecube
scalecube-cluster-parent
- 2.4.2
+ 2.4.3-SNAPSHOT
scalecube-cluster
diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
index e91bcd47..ad5df0f3 100644
--- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
@@ -1,13 +1,17 @@
package io.scalecube.cluster;
+import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.fdetector.FailureDetectorImpl;
+import io.scalecube.cluster.gossip.GossipConfig;
import io.scalecube.cluster.gossip.GossipProtocolImpl;
+import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.membership.MembershipProtocolImpl;
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.metadata.MetadataStoreImpl;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
+import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.TransportImpl;
import java.lang.management.ManagementFactory;
@@ -38,7 +42,7 @@
/** Cluster implementation. */
public final class ClusterImpl implements Cluster {
- private static final Logger LOGGER = LoggerFactory.getLogger(ClusterImpl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Cluster.class);
private static final Set SYSTEM_MESSAGES =
Collections.unmodifiableSet(
@@ -129,6 +133,58 @@ public ClusterImpl config(UnaryOperator options) {
return cluster;
}
+ /**
+ * Returns a new cluster's instance which will apply the given options.
+ *
+ * @param options transport config options
+ * @return new {@code ClusterImpl} instance
+ */
+ public ClusterImpl transport(UnaryOperator options) {
+ Objects.requireNonNull(options);
+ ClusterImpl cluster = new ClusterImpl(this);
+ cluster.config = config.transport(options);
+ return cluster;
+ }
+
+ /**
+ * Returns a new cluster's instance which will apply the given options.
+ *
+ * @param options failureDetector config options
+ * @return new {@code ClusterImpl} instance
+ */
+ public ClusterImpl failureDetector(UnaryOperator options) {
+ Objects.requireNonNull(options);
+ ClusterImpl cluster = new ClusterImpl(this);
+ cluster.config = config.failureDetector(options);
+ return cluster;
+ }
+
+ /**
+ * Returns a new cluster's instance which will apply the given options.
+ *
+ * @param options gossip config options
+ * @return new {@code ClusterImpl} instance
+ */
+ public ClusterImpl gossip(UnaryOperator options) {
+ Objects.requireNonNull(options);
+ ClusterImpl cluster = new ClusterImpl(this);
+ cluster.config = config.gossip(options);
+ return cluster;
+ }
+
+ /**
+ * Returns a new cluster's instance which will apply the given options.
+ *
+ * @param options membership config options
+ * @return new {@code ClusterImpl} instance
+ */
+ public ClusterImpl membership(UnaryOperator options) {
+ Objects.requireNonNull(options);
+ ClusterImpl cluster = new ClusterImpl(this);
+ cluster.config = config.membership(options);
+ return cluster;
+ }
+
/**
* Returns a new cluster's instance with given handler. The previous handler will be replaced.
*
@@ -284,7 +340,11 @@ private Member createLocalMember(int listenPort) {
.map(memberHost -> Address.create(memberHost, port))
.orElseGet(() -> Address.create(localAddress, listenPort));
- return new Member(memberAddress);
+ if (config.memberId() != null) {
+ return new Member(config.memberId(), memberAddress);
+ } else {
+ return new Member(memberAddress);
+ }
}
@Override
@@ -377,7 +437,7 @@ private Mono doShutdown() {
return Mono.defer(
() -> {
LOGGER.info("Cluster member {} is shutting down", localMember);
- return Flux.concatDelayError(leaveCluster(localMember), dispose(), transport.stop())
+ return Flux.concatDelayError(leaveCluster(), dispose(), transport.stop())
.then()
.doFinally(s -> scheduler.dispose())
.doOnSuccess(
@@ -389,21 +449,15 @@ private Mono doShutdown() {
});
}
- private Mono leaveCluster(Member member) {
+ private Mono leaveCluster() {
return membership
.leaveCluster()
.subscribeOn(scheduler)
- .doOnSuccess(
- s ->
- LOGGER.debug(
- "Cluster member {} notified about his leaving and shutting down", member))
+ .doOnSuccess(s -> LOGGER.debug("Cluster member {} has left a cluster", localMember))
.doOnError(
ex ->
LOGGER.info(
- "Cluster member {} failed to spread leave notification "
- + "to other cluster members: {}",
- member,
- ex.toString()))
+ "Cluster member {} failed on leaveCluster: {}", localMember, ex.toString()))
.then();
}
@@ -433,9 +487,9 @@ public boolean isShutdown() {
public interface MonitorMBean {
- Collection getMember();
+ Collection getMemberId();
- String getMemberAsString();
+ String getMemberIdAsString();
Collection getMetadata();
@@ -461,13 +515,13 @@ private static JmxMonitorMBean start(ClusterImpl cluster) throws Exception {
}
@Override
- public Collection getMember() {
+ public Collection getMemberId() {
return Collections.singleton(cluster.member().id());
}
@Override
- public String getMemberAsString() {
- return getMember().iterator().next();
+ public String getMemberIdAsString() {
+ return getMemberId().iterator().next();
}
@Override
diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorEvent.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorEvent.java
index 336e7d13..39de6600 100644
--- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorEvent.java
+++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorEvent.java
@@ -2,6 +2,7 @@
import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MemberStatus;
+import java.util.StringJoiner;
/** CLass contains result of ping check. */
public final class FailureDetectorEvent {
@@ -24,6 +25,9 @@ public MemberStatus status() {
@Override
public String toString() {
- return "FailureDetectorEvent{member=" + member + ", status=" + status + '}';
+ return new StringJoiner(", ", FailureDetectorEvent.class.getSimpleName() + "[", "]")
+ .add("member=" + member)
+ .add("status=" + status)
+ .toString();
}
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
index 5c9c52a0..0ba35ab6 100644
--- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java
@@ -28,7 +28,7 @@
public final class FailureDetectorImpl implements FailureDetector {
- private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetectorImpl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetector.class);
// Qualifiers
diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/PingData.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/PingData.java
index 8034c9cf..4a4ffff8 100644
--- a/cluster/src/main/java/io/scalecube/cluster/fdetector/PingData.java
+++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/PingData.java
@@ -1,6 +1,7 @@
package io.scalecube.cluster.fdetector;
import io.scalecube.cluster.Member;
+import java.util.StringJoiner;
/** DTO class. Supports FailureDetector messages (Ping, Ack, PingReq). */
final class PingData {
@@ -79,15 +80,11 @@ public PingData withAckType(AckType ackType) {
@Override
public String toString() {
- return "PingData{"
- + "from="
- + from
- + ", to="
- + to
- + ", originalIssuer="
- + originalIssuer
- + ", ackType="
- + ackType
- + '}';
+ return new StringJoiner(", ", PingData.class.getSimpleName() + "[", "]")
+ .add("from=" + from)
+ .add("to=" + to)
+ .add("originalIssuer=" + originalIssuer)
+ .add("ackType=" + ackType)
+ .toString();
}
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/Gossip.java b/cluster/src/main/java/io/scalecube/cluster/gossip/Gossip.java
index 48399d9f..146ebdce 100644
--- a/cluster/src/main/java/io/scalecube/cluster/gossip/Gossip.java
+++ b/cluster/src/main/java/io/scalecube/cluster/gossip/Gossip.java
@@ -2,6 +2,7 @@
import io.scalecube.cluster.transport.api.Message;
import java.util.Objects;
+import java.util.StringJoiner;
/** Data model for gossip, include gossip id, qualifier and object need to disseminate. */
final class Gossip {
@@ -44,6 +45,9 @@ public int hashCode() {
@Override
public String toString() {
- return "Gossip{gossipId=" + gossipId + ", message=" + message + '}';
+ return new StringJoiner(", ", Gossip.class.getSimpleName() + "[", "]")
+ .add("gossipId='" + gossipId + "'")
+ .add("message=" + message)
+ .toString();
}
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
index d9a0e650..e28d568b 100644
--- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java
@@ -30,7 +30,7 @@
public final class GossipProtocolImpl implements GossipProtocol {
- private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocolImpl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(GossipProtocol.class);
// Qualifiers
@@ -122,7 +122,7 @@ public void stop() {
@Override
public Mono spread(Message message) {
- return Mono.fromCallable(() -> message)
+ return Mono.just(message)
.subscribeOn(scheduler)
.flatMap(msg -> Mono.create(sink -> futures.put(createAndPutGossip(msg), sink)));
}
@@ -150,9 +150,27 @@ private void doSpreadGossip() {
selectGossipMembers().forEach(member -> spreadGossipsTo(period, member));
// Sweep gossips
- sweepGossips(period);
+ Set gossipsToRemove = getGossipsToRemove(period);
+ if (!gossipsToRemove.isEmpty()) {
+ LOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove);
+ for (String gossipId : gossipsToRemove) {
+ gossips.remove(gossipId);
+ }
+ }
+
+ // Check spread gossips
+ Set gossipsThatSpread = getGossipsThatMostLikelyDisseminated(period);
+ if (!gossipsThatSpread.isEmpty()) {
+ LOGGER.debug("Most likely disseminated gossips[{}]: {}", period, gossipsThatSpread);
+ for (String gossipId : gossipsThatSpread) {
+ MonoSink sink = futures.remove(gossipId);
+ if (sink != null) {
+ sink.success(gossipId);
+ }
+ }
+ }
} catch (Exception ex) {
- LOGGER.warn("Exception at doSpreadGossip[{}]: {}", period, ex.getMessage(), ex);
+ LOGGER.warn("Exception at doSpreadGossip[{}]: ", period, ex);
}
}
@@ -278,29 +296,24 @@ private Message buildGossipRequestMessage(Gossip gossip) {
return Message.withData(gossipRequest).qualifier(GOSSIP_REQ).build();
}
- private void sweepGossips(long period) {
+ private Set getGossipsToRemove(long period) {
// Select gossips to sweep
int periodsToSweep =
ClusterMath.gossipPeriodsToSweep(config.gossipRepeatMult(), remoteMembers.size() + 1);
- Set gossipsToRemove =
- gossips.values().stream()
- .filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSweep)
- .collect(Collectors.toSet());
-
- // Check if anything selected
- if (gossipsToRemove.isEmpty()) {
- return; // nothing to sweep
- }
+ return gossips.values().stream()
+ .filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSweep)
+ .map(gossipState -> gossipState.gossip().gossipId())
+ .collect(Collectors.toSet());
+ }
- // Sweep gossips
- LOGGER.debug("Sweep gossips[{}]: {}", period, gossipsToRemove);
- for (GossipState gossipState : gossipsToRemove) {
- gossips.remove(gossipState.gossip().gossipId());
- MonoSink sink = futures.remove(gossipState.gossip().gossipId());
- if (sink != null) {
- sink.success(gossipState.gossip().gossipId());
- }
- }
+ private Set getGossipsThatMostLikelyDisseminated(long period) {
+ // Select gossips to spread
+ int periodsToSpread =
+ ClusterMath.gossipPeriodsToSpread(config.gossipRepeatMult(), remoteMembers.size() + 1);
+ return gossips.values().stream()
+ .filter(gossipState -> period > gossipState.infectionPeriod() + periodsToSpread)
+ .map(gossipState -> gossipState.gossip().gossipId())
+ .collect(Collectors.toSet());
}
/**
diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipRequest.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipRequest.java
index 840cfcb6..cc0ba7af 100644
--- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipRequest.java
+++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipRequest.java
@@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.StringJoiner;
/** Gossip request which be transmitted through the network, contains list of gossips. */
final class GossipRequest {
@@ -32,6 +33,9 @@ public String from() {
@Override
public String toString() {
- return "GossipRequest{gossips=" + gossips + ", from=" + from + '}';
+ return new StringJoiner(", ", GossipRequest.class.getSimpleName() + "[", "]")
+ .add("gossips=" + gossips)
+ .add("from='" + from + "'")
+ .toString();
}
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipState.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipState.java
index 2ac7ee31..49372ef2 100644
--- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipState.java
+++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipState.java
@@ -3,6 +3,7 @@
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
+import java.util.StringJoiner;
/** Data related to gossip, maintained locally on each node. */
final class GossipState {
@@ -39,12 +40,10 @@ public boolean isInfected(String memberId) {
@Override
public String toString() {
- return "GossipState{gossip="
- + gossip
- + ", infectionPeriod="
- + infectionPeriod
- + ", infected="
- + infected
- + '}';
+ return new StringJoiner(", ", GossipState.class.getSimpleName() + "[", "]")
+ .add("gossip=" + gossip)
+ .add("infectionPeriod=" + infectionPeriod)
+ .add("infected=" + infected)
+ .toString();
}
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipPingData.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipPingData.java
index a78ff728..bf569c29 100644
--- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipPingData.java
+++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipPingData.java
@@ -1,6 +1,7 @@
package io.scalecube.cluster.membership;
import io.scalecube.cluster.Member;
+import java.util.StringJoiner;
final class MembershipPingData {
/** Message's destination address. */
@@ -19,9 +20,8 @@ public Member getTarget() {
@Override
public String toString() {
- final StringBuilder sb = new StringBuilder("MembershipPingData{");
- sb.append("target=").append(target);
- sb.append('}');
- return sb.toString();
+ return new StringJoiner(", ", MembershipPingData.class.getSimpleName() + "[", "]")
+ .add("target=" + target)
+ .toString();
}
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
index d385d5e6..212374d4 100644
--- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java
@@ -51,7 +51,7 @@
public final class MembershipProtocolImpl implements MembershipProtocol {
- private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocolImpl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(MembershipProtocol.class);
private static final Logger LOGGER_MEMBERSHIP =
LoggerFactory.getLogger("io.scalecube.cluster.Membership");
diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/SyncData.java b/cluster/src/main/java/io/scalecube/cluster/membership/SyncData.java
index b5209335..ba6a0741 100644
--- a/cluster/src/main/java/io/scalecube/cluster/membership/SyncData.java
+++ b/cluster/src/main/java/io/scalecube/cluster/membership/SyncData.java
@@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.StringJoiner;
/**
* A class containing full membership table from specific member and used full synchronization
@@ -36,6 +37,9 @@ public String getSyncGroup() {
@Override
public String toString() {
- return "SyncData{membership=" + membership + ", syncGroup=" + syncGroup + '}';
+ return new StringJoiner(", ", SyncData.class.getSimpleName() + "[", "]")
+ .add("membership=" + membership)
+ .add("syncGroup='" + syncGroup + "'")
+ .toString();
}
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/GetMetadataRequest.java b/cluster/src/main/java/io/scalecube/cluster/metadata/GetMetadataRequest.java
index 2580592e..1645242f 100644
--- a/cluster/src/main/java/io/scalecube/cluster/metadata/GetMetadataRequest.java
+++ b/cluster/src/main/java/io/scalecube/cluster/metadata/GetMetadataRequest.java
@@ -2,6 +2,7 @@
import io.scalecube.cluster.Member;
import java.util.Objects;
+import java.util.StringJoiner;
/** DTO class. Stands for remote request on getting metadata in remote MetadataStore. */
final class GetMetadataRequest {
@@ -22,6 +23,8 @@ public Member getMember() {
@Override
public String toString() {
- return "GetMetadataRequest{" + "member=" + member + '}';
+ return new StringJoiner(", ", GetMetadataRequest.class.getSimpleName() + "[", "]")
+ .add("member=" + member)
+ .toString();
}
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/GetMetadataResponse.java b/cluster/src/main/java/io/scalecube/cluster/metadata/GetMetadataResponse.java
index 99a3e59e..9166617a 100644
--- a/cluster/src/main/java/io/scalecube/cluster/metadata/GetMetadataResponse.java
+++ b/cluster/src/main/java/io/scalecube/cluster/metadata/GetMetadataResponse.java
@@ -2,6 +2,7 @@
import io.scalecube.cluster.Member;
import java.nio.ByteBuffer;
+import java.util.StringJoiner;
/**
* DTO class. Stands for response for preceding remote request on getting metadata in remote
@@ -33,6 +34,9 @@ ByteBuffer getMetadata() {
@Override
public String toString() {
- return "GetMetadataResponse{" + "member=" + member + ", metadata=" + metadata.remaining() + '}';
+ return new StringJoiner(", ", GetMetadataResponse.class.getSimpleName() + "[", "]")
+ .add("member=" + member)
+ .add("metadata=" + metadata.remaining())
+ .toString();
}
}
diff --git a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java
index 4b58e2cb..f63b3934 100644
--- a/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java
+++ b/cluster/src/main/java/io/scalecube/cluster/metadata/MetadataStoreImpl.java
@@ -21,7 +21,7 @@
public class MetadataStoreImpl implements MetadataStore {
- private static final Logger LOGGER = LoggerFactory.getLogger(MetadataStoreImpl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(MetadataStore.class);
// Qualifiers
diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
index f3d8ff52..9db80472 100644
--- a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
+++ b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
@@ -35,9 +35,7 @@ public void testMembersAccessFromScheduler() {
// Start seed node
Cluster seedNode = new ClusterImpl().startAwait();
Cluster otherNode =
- new ClusterImpl()
- .config(config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
- .startAwait();
+ new ClusterImpl().membership(opts -> opts.seedMembers(seedNode.address())).startAwait();
assertEquals(2, seedNode.members().size());
assertEquals(2, otherNode.members().size());
@@ -53,15 +51,13 @@ public void testMembersAccessFromScheduler() {
@Test
public void testJoinLocalhostIgnored() {
+ Address[] addresses = {Address.from("localhost:4801"), Address.from("127.0.0.1:4801")};
+
// Start seed node
Cluster seedNode =
- new ClusterImpl(
- new ClusterConfig()
- .transport(opts -> opts.port(4801).connectTimeout(500))
- .membership(
- opts ->
- opts.seedMembers(
- Address.from("localhost:4801"), Address.from("127.0.0.1:4801"))))
+ new ClusterImpl()
+ .transport(opts -> opts.port(4801).connectTimeout(500))
+ .membership(opts -> opts.seedMembers(addresses))
.startAwait();
Collection otherMembers = seedNode.otherMembers();
@@ -72,12 +68,9 @@ public void testJoinLocalhostIgnored() {
public void testJoinLocalhostIgnoredWithOverride() {
// Start seed node
Cluster seedNode =
- new ClusterImpl(
- new ClusterConfig()
- .memberHost("localhost")
- .memberPort(7878)
- .transport(opts -> opts.port(7878).connectTimeout(500))
- .membership(opts -> opts.seedMembers(Address.from("localhost:7878"))))
+ new ClusterImpl(new ClusterConfig().memberHost("localhost").memberPort(7878))
+ .transport(opts -> opts.port(7878).connectTimeout(500))
+ .membership(opts -> opts.seedMembers(Address.from("localhost:7878")))
.startAwait();
Collection otherMembers = seedNode.otherMembers();
@@ -97,7 +90,7 @@ public void testJoinDynamicPort() {
for (int i = 0; i < membersNum; i++) {
otherNodes.add(
new ClusterImpl()
- .config(config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
+ .membership(opts -> opts.seedMembers(seedNode.address()))
.startAwait());
}
LOGGER.info("Start up time: {} ms", System.currentTimeMillis() - startAt);
@@ -129,11 +122,8 @@ public void testUpdateMetadata() throws Exception {
metadata.put("key2", "value2");
metadataNode =
new ClusterImpl()
- .config(
- config ->
- config
- .membership(opts -> opts.seedMembers(seedNode.address()))
- .metadata(metadata))
+ .config(opts -> opts.metadata(metadata))
+ .membership(opts -> opts.seedMembers(seedNode.address()))
.startAwait();
// Start other test members
@@ -141,8 +131,7 @@ public void testUpdateMetadata() throws Exception {
.flatMap(
integer ->
new ClusterImpl()
- .config(
- config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
+ .membership(opts -> opts.seedMembers(seedNode.address()))
.handler(
cluster ->
new ClusterMessageHandler() {
@@ -206,11 +195,8 @@ public void testUpdateMetadataProperty() throws Exception {
metadata.put("key2", "value2");
metadataNode =
new ClusterImpl()
- .config(
- config ->
- config
- .membership(opts -> opts.seedMembers(seedNode.address()))
- .metadata(metadata))
+ .config(opts -> opts.metadata(metadata))
+ .membership(opts -> opts.seedMembers(seedNode.address()))
.startAwait();
// Start other test members
@@ -218,8 +204,7 @@ public void testUpdateMetadataProperty() throws Exception {
.flatMap(
integer ->
new ClusterImpl()
- .config(
- config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
+ .membership(opts -> opts.seedMembers(seedNode.address()))
.handler(
cluster ->
new ClusterMessageHandler() {
@@ -288,11 +273,8 @@ public void testRemoveMetadataProperty() throws Exception {
metadata.put("key2", "value2");
metadataNode =
new ClusterImpl()
- .config(
- config ->
- config
- .membership(opts -> opts.seedMembers(seedNode.address()))
- .metadata(metadata))
+ .config(opts -> opts.metadata(metadata))
+ .membership(opts -> opts.seedMembers(seedNode.address()))
.startAwait();
// Start other test members
@@ -300,8 +282,7 @@ public void testRemoveMetadataProperty() throws Exception {
.flatMap(
integer ->
new ClusterImpl()
- .config(
- config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
+ .membership(opts -> opts.seedMembers(seedNode.address()))
.handler(
cluster ->
new ClusterMessageHandler() {
@@ -374,17 +355,17 @@ public void onMembershipEvent(MembershipEvent event) {
// Start nodes
final Cluster node1 =
new ClusterImpl()
- .config(config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
+ .membership(opts -> opts.seedMembers(seedNode.address()))
.handler(cluster -> listener)
.startAwait();
final Cluster node2 =
new ClusterImpl()
- .config(config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
+ .membership(opts -> opts.seedMembers(seedNode.address()))
.handler(cluster -> listener)
.startAwait();
final Cluster node3 =
new ClusterImpl()
- .config(config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
+ .membership(opts -> opts.seedMembers(seedNode.address()))
.handler(cluster -> listener)
.startAwait();
@@ -405,7 +386,7 @@ public void testMemberMetadataRemoved() throws InterruptedException {
seedMetadata.put("seed", "shmid");
final Cluster seedNode =
new ClusterImpl()
- .config(options -> options.metadata(seedMetadata))
+ .config(opts -> opts.metadata(seedMetadata))
.handler(
cluster ->
new ClusterMessageHandler() {
@@ -423,11 +404,8 @@ public void onMembershipEvent(MembershipEvent event) {
ReplayProcessor node1Events = ReplayProcessor.create();
final Cluster node1 =
new ClusterImpl()
- .config(
- config ->
- config
- .membership(opts -> opts.seedMembers(seedNode.address()))
- .metadata(node1Metadata))
+ .config(opts -> opts.metadata(node1Metadata))
+ .membership(opts -> opts.seedMembers(seedNode.address()))
.handler(
cluster ->
new ClusterMessageHandler() {
@@ -478,10 +456,7 @@ public void testJoinSeedClusterWithNoExistingSeedMember() {
Address nonExistingSeed2 = Address.from("localhost:5678");
Address[] seeds = new Address[] {nonExistingSeed1, nonExistingSeed2, seedNode.address()};
- Cluster otherNode =
- new ClusterImpl()
- .config(config -> config.membership(opts -> opts.seedMembers(seeds)))
- .startAwait();
+ Cluster otherNode = new ClusterImpl().membership(opts -> opts.seedMembers(seeds)).startAwait();
assertEquals(otherNode.member(), seedNode.otherMembers().iterator().next());
assertEquals(seedNode.member(), otherNode.otherMembers().iterator().next());
diff --git a/examples/pom.xml b/examples/pom.xml
index 12ab4d47..336b5eb3 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -4,7 +4,7 @@
io.scalecube
scalecube-cluster-parent
- 2.4.2
+ 2.4.3-SNAPSHOT
scalecube-cluster-examples
diff --git a/examples/src/main/java/io/scalecube/examples/ClusterCustomMetadataEncodingExample.java b/examples/src/main/java/io/scalecube/examples/ClusterCustomMetadataEncodingExample.java
new file mode 100644
index 00000000..44095579
--- /dev/null
+++ b/examples/src/main/java/io/scalecube/examples/ClusterCustomMetadataEncodingExample.java
@@ -0,0 +1,96 @@
+package io.scalecube.examples;
+
+import io.scalecube.cluster.Cluster;
+import io.scalecube.cluster.ClusterImpl;
+import io.scalecube.cluster.metadata.MetadataDecoder;
+import io.scalecube.cluster.metadata.MetadataEncoder;
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+public class ClusterCustomMetadataEncodingExample {
+
+ /** Main method. */
+ public static void main(String[] args) throws Exception {
+ // Start seed cluster member Alice
+ Cluster alice =
+ new ClusterImpl()
+ .config(opts -> opts.metadataDecoder(new LongMetadataDecoder()))
+ .startAwait();
+ System.out.println(
+ "[" + alice.member().id() + "] Alice's metadata: " + alice.metadata().orElse(null));
+
+ Cluster joe =
+ new ClusterImpl()
+ .config(
+ opts ->
+ opts.metadataDecoder(new LongMetadataDecoder())
+ .metadataEncoder(new LongMetadataEncoder())
+ .metadata(123L))
+ .membership(opts -> opts.seedMembers(alice.address()))
+ .startAwait();
+ System.out.println(
+ "[" + joe.member().id() + "] Joe's metadata: " + joe.metadata().orElse(null));
+
+ Cluster bob =
+ new ClusterImpl()
+ .config(
+ opts ->
+ opts.metadataDecoder(new LongMetadataDecoder())
+ .metadataEncoder(new LongMetadataEncoder())
+ .metadata(456L))
+ .membership(opts -> opts.seedMembers(alice.address()))
+ .startAwait();
+ System.out.println(
+ "[" + bob.member().id() + "] Bob's metadata: " + bob.metadata().orElse(null));
+
+ TimeUnit.SECONDS.sleep(3);
+
+ alice
+ .otherMembers()
+ .forEach(
+ member -> {
+ Long metadata = (Long) alice.metadata(member).orElse(null);
+ System.out.println(
+ "Alice knows [" + member.id() + "] has `" + metadata + "` as a metadata");
+ });
+
+ joe.otherMembers()
+ .forEach(
+ member -> {
+ Long metadata = (Long) alice.metadata(member).orElse(null);
+ System.out.println(
+ "Joe knows [" + member.id() + "] has `" + metadata + "` as a metadata");
+ });
+
+ bob.otherMembers()
+ .forEach(
+ member -> {
+ Long metadata = (Long) alice.metadata(member).orElse(null);
+ System.out.println(
+ "Bob knows [" + member.id() + "] has `" + metadata + "` as a metadata");
+ });
+
+ TimeUnit.SECONDS.sleep(3);
+ }
+
+ static class LongMetadataDecoder implements MetadataDecoder {
+ @Override
+ public Object decode(ByteBuffer buffer) {
+ return buffer.getLong();
+ }
+ }
+
+ static class LongMetadataEncoder implements MetadataEncoder {
+ @Override
+ public ByteBuffer encode(Object metadata) {
+ if (metadata == null) {
+ return null;
+ }
+
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+ buffer.putLong((Long) metadata);
+ buffer.flip();
+ return buffer;
+ }
+ }
+}
diff --git a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java
index 0ecd617c..6d2e7f19 100644
--- a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java
+++ b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java
@@ -17,23 +17,20 @@
public class ClusterJoinExamples {
/** Main method. */
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
// Start seed member Alice
Cluster alice = new ClusterImpl().startAwait();
// Join Bob to cluster with Alice
Cluster bob =
- new ClusterImpl()
- .config(config -> config.membership(opts -> opts.seedMembers(alice.address())))
- .startAwait();
+ new ClusterImpl().membership(opts -> opts.seedMembers(alice.address())).startAwait();
// Join Carol to cluster with metadata
Map metadata = Collections.singletonMap("name", "Carol");
Cluster carol =
new ClusterImpl()
- .config(
- config ->
- config.membership(opts -> opts.seedMembers(alice.address())).metadata(metadata))
+ .config(opts -> opts.metadata(metadata))
+ .membership(opts -> opts.seedMembers(alice.address()))
.startAwait();
// Start Dan on port 3000
diff --git a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java
index f9463fcc..9547d00e 100644
--- a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java
+++ b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java
@@ -29,11 +29,8 @@ public static void main(String[] args) throws Exception {
//noinspection unused
Cluster joe =
new ClusterImpl()
- .config(
- config ->
- config
- .membership(opts -> opts.seedMembers(alice.address()))
- .metadata(Collections.singletonMap("name", "Joe")))
+ .config(opts -> opts.metadata(Collections.singletonMap("name", "Joe")))
+ .membership(opts -> opts.seedMembers(alice.address()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
diff --git a/examples/src/main/java/io/scalecube/examples/GossipExample.java b/examples/src/main/java/io/scalecube/examples/GossipExample.java
index 5c728637..d33ebaaf 100644
--- a/examples/src/main/java/io/scalecube/examples/GossipExample.java
+++ b/examples/src/main/java/io/scalecube/examples/GossipExample.java
@@ -32,7 +32,7 @@ public void onGossip(Message gossip) {
//noinspection unused
Cluster bob =
new ClusterImpl()
- .config(config -> config.membership(opts -> opts.seedMembers(alice.address())))
+ .membership(opts -> opts.seedMembers(alice.address()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
@@ -47,7 +47,7 @@ public void onGossip(Message gossip) {
//noinspection unused
Cluster carol =
new ClusterImpl()
- .config(config -> config.membership(opts -> opts.seedMembers(alice.address())))
+ .membership(opts -> opts.seedMembers(alice.address()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
@@ -62,7 +62,7 @@ public void onGossip(Message gossip) {
//noinspection unused
Cluster dan =
new ClusterImpl()
- .config(config -> config.membership(opts -> opts.seedMembers(alice.address())))
+ .membership(opts -> opts.seedMembers(alice.address()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
@@ -76,9 +76,7 @@ public void onGossip(Message gossip) {
// Start cluster node Eve that joins cluster and spreads gossip
Cluster eve =
- new ClusterImpl()
- .config(config -> config.membership(opts -> opts.seedMembers(alice.address())))
- .startAwait();
+ new ClusterImpl().membership(opts -> opts.seedMembers(alice.address())).startAwait();
eve.spreadGossip(Message.fromData("Gossip from Eve"))
.doOnError(System.err::println)
.subscribe(null, Throwable::printStackTrace);
diff --git a/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java b/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java
index 38321685..e8311358 100644
--- a/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java
+++ b/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java
@@ -26,7 +26,7 @@ public static void main(String[] args) throws Exception {
// Alice init cluster
Cluster alice =
new ClusterImpl()
- .config(options -> options.metadata(Collections.singletonMap("name", "Alice")))
+ .config(opts -> opts.metadata(Collections.singletonMap("name", "Alice")))
.handler(
cluster -> {
return new ClusterMessageHandler() {
@@ -42,11 +42,8 @@ public void onMembershipEvent(MembershipEvent event) {
// Bob join cluster
Cluster bob =
new ClusterImpl()
- .config(
- config ->
- config
- .membership(opts -> opts.seedMembers(alice.address()))
- .metadata(Collections.singletonMap("name", "Bob")))
+ .config(opts -> opts.metadata(Collections.singletonMap("name", "Bob")))
+ .membership(opts -> opts.seedMembers(alice.address()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
@@ -62,11 +59,8 @@ public void onMembershipEvent(MembershipEvent event) {
// Carol join cluster
Cluster carol =
new ClusterImpl()
- .config(
- config ->
- config
- .membership(opts -> opts.seedMembers(alice.address(), bob.address()))
- .metadata(Collections.singletonMap("name", "Carol")))
+ .config(opts -> opts.metadata(Collections.singletonMap("name", "Carol")))
+ .membership(opts -> opts.seedMembers(alice.address(), bob.address()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
diff --git a/examples/src/main/java/io/scalecube/examples/MessagingExample.java b/examples/src/main/java/io/scalecube/examples/MessagingExample.java
index 6dce0ca0..8a695a38 100644
--- a/examples/src/main/java/io/scalecube/examples/MessagingExample.java
+++ b/examples/src/main/java/io/scalecube/examples/MessagingExample.java
@@ -37,7 +37,7 @@ public void onMessage(Message msg) {
// messages
Cluster bob =
new ClusterImpl()
- .config(config -> config.membership(opts -> opts.seedMembers(alice.address())))
+ .membership(opts -> opts.seedMembers(alice.address()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
@@ -55,9 +55,7 @@ public void onMessage(Message msg) {
// Join cluster node Carol to cluster with Alice and Bob
Cluster carol =
new ClusterImpl()
- .config(
- config ->
- config.membership(opts -> opts.seedMembers(alice.address(), bob.address())))
+ .membership(opts -> opts.seedMembers(alice.address(), bob.address()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
diff --git a/pom.xml b/pom.xml
index 62f757b9..7bdf0988 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
4.0.0
scalecube-cluster-parent
- 2.4.2
+ 2.4.3-SNAPSHOT
io.scalecube
scalecube-parent-pom
@@ -17,13 +17,13 @@
scm:git:git@github.com:scalecube/scalecube-cluster.git
scm:git:git@github.com:scalecube/scalecube-cluster.git
- v2.4.2
+ HEAD
1.0.1
1.7.7
- 2.9.9
+ 2.10.0.pr1
2.8.2
Californium-SR8
diff --git a/transport-parent/pom.xml b/transport-parent/pom.xml
index 17623280..27c08cca 100644
--- a/transport-parent/pom.xml
+++ b/transport-parent/pom.xml
@@ -5,7 +5,7 @@
scalecube-cluster-parent
io.scalecube
- 2.4.2
+ 2.4.3-SNAPSHOT
scalecube-transport-parent
diff --git a/transport-parent/transport-api/pom.xml b/transport-parent/transport-api/pom.xml
index 964d9172..202773ca 100644
--- a/transport-parent/transport-api/pom.xml
+++ b/transport-parent/transport-api/pom.xml
@@ -3,7 +3,7 @@
scalecube-transport-parent
io.scalecube
- 2.4.2
+ 2.4.3-SNAPSHOT
4.0.0
diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java
index b7360750..a4da77bb 100644
--- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java
+++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/Message.java
@@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import java.util.StringJoiner;
/**
* The Class Message introduces generic protocol used for point to point communication by transport.
@@ -184,7 +185,11 @@ public Address sender() {
@Override
public String toString() {
- return "Message {headers: " + headers + ", sender: " + sender + ", data: " + data + '}';
+ return new StringJoiner(", ", Message.class.getSimpleName() + "[", "]")
+ .add("sender=" + sender)
+ .add("headers=" + headers)
+ .add("data=" + data)
+ .toString();
}
public static class Builder {
diff --git a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java
index 5b6acf5a..be14c5e4 100644
--- a/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java
+++ b/transport-parent/transport-api/src/main/java/io/scalecube/cluster/transport/api/TransportConfig.java
@@ -1,5 +1,6 @@
package io.scalecube.cluster.transport.api;
+import java.util.StringJoiner;
import reactor.core.Exceptions;
public final class TransportConfig implements Cloneable {
@@ -143,17 +144,12 @@ public TransportConfig clone() {
@Override
public String toString() {
- return "TransportConfig{"
- + "host="
- + host
- + ", port="
- + port
- + ", connectTimeout="
- + connectTimeout
- + ", messageCodec="
- + messageCodec
- + ", maxFrameLength="
- + maxFrameLength
- + '}';
+ return new StringJoiner(", ", TransportConfig.class.getSimpleName() + "[", "]")
+ .add("host='" + host + "'")
+ .add("port=" + port)
+ .add("connectTimeout=" + connectTimeout)
+ .add("messageCodec=" + messageCodec)
+ .add("maxFrameLength=" + maxFrameLength)
+ .toString();
}
}
diff --git a/transport-parent/transport-netty/pom.xml b/transport-parent/transport-netty/pom.xml
index b340c09b..b7bcd890 100644
--- a/transport-parent/transport-netty/pom.xml
+++ b/transport-parent/transport-netty/pom.xml
@@ -5,7 +5,7 @@
scalecube-transport-parent
io.scalecube
- 2.4.2
+ 2.4.3-SNAPSHOT
scalecube-transport-netty
diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java
index 90fd82fd..ea719185 100644
--- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java
+++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java
@@ -44,7 +44,7 @@
public final class TransportImpl implements Transport {
- private static final Logger LOGGER = LoggerFactory.getLogger(TransportImpl.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(Transport.class);
private final TransportConfig config;
private final LoopResources loopResources;