Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 32 additions & 24 deletions cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.scalecube.cluster.metadata.MetadataEncoder;
import io.scalecube.cluster.transport.api.TransportConfig;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.UnaryOperator;
import reactor.core.Exceptions;

Expand Down Expand Up @@ -34,6 +35,7 @@ public final class ClusterConfig implements Cloneable {
private MetadataEncoder metadataEncoder = MetadataEncoder.INSTANCE;
private MetadataDecoder metadataDecoder = MetadataDecoder.INSTANCE;

private String memberId;
private String memberHost;
private Integer memberPort;

Expand Down Expand Up @@ -158,14 +160,30 @@ public String memberHost() {
* Sets a memberHost.
*
* @param memberHost member host
* @return new {@code ClusterConfig} instance@return
* @return new {@code ClusterConfig} instance
*/
public ClusterConfig memberHost(String memberHost) {
ClusterConfig c = clone();
c.memberHost = memberHost;
return c;
}

public String memberId() {
return memberId;
}

/**
* Sets a memberId.
*
* @param memberId member id
* @return new {@code ClusterConfig} instance
*/
public ClusterConfig memberId(String memberId) {
ClusterConfig c = clone();
c.memberId = memberId;
return c;
}

public Integer memberPort() {
return memberPort;
}
Expand Down Expand Up @@ -262,29 +280,19 @@ public ClusterConfig clone() {

@Override
public String toString() {
return "ClusterConfig{"
+ "metadata="
+ metadataAsString()
+ ", metadataTimeout="
+ metadataTimeout
+ ", metadataEncoder="
+ metadataEncoder
+ ", metadataDecoder="
+ metadataDecoder
+ ", memberHost='"
+ memberHost
+ '\''
+ ", memberPort="
+ memberPort
+ ", transportConfig="
+ transportConfig
+ ", failureDetectorConfig="
+ failureDetectorConfig
+ ", gossipConfig="
+ gossipConfig
+ ", membershipConfig="
+ membershipConfig
+ '}';
return new StringJoiner(", ", ClusterConfig.class.getSimpleName() + "[", "]")
.add("metadata=" + metadataAsString())
.add("metadataTimeout=" + metadataTimeout)
.add("metadataEncoder=" + metadataEncoder)
.add("metadataDecoder=" + metadataDecoder)
.add("memberId='" + memberId + "'")
.add("memberHost='" + memberHost + "'")
.add("memberPort=" + memberPort)
.add("transportConfig=" + transportConfig)
.add("failureDetectorConfig=" + failureDetectorConfig)
.add("gossipConfig=" + gossipConfig)
.add("membershipConfig=" + membershipConfig)
.toString();
}

private String metadataAsString() {
Expand Down
2 changes: 1 addition & 1 deletion cluster-api/src/main/java/io/scalecube/cluster/Member.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,6 @@ public int hashCode() {

@Override
public String toString() {
return id + "@" + address;
return id + ":" + address.port();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.scalecube.cluster.fdetector;

import java.util.StringJoiner;
import reactor.core.Exceptions;

public final class FailureDetectorConfig implements Cloneable {
Expand Down Expand Up @@ -121,13 +122,10 @@ public FailureDetectorConfig clone() {

@Override
public String toString() {
return "FailureDetectorConfig{"
+ "pingInterval="
+ pingInterval
+ ", pingTimeout="
+ pingTimeout
+ ", pingReqMembers="
+ pingReqMembers
+ '}';
return new StringJoiner(", ", FailureDetectorConfig.class.getSimpleName() + "[", "]")
.add("pingInterval=" + pingInterval)
.add("pingTimeout=" + pingTimeout)
.add("pingReqMembers=" + pingReqMembers)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.scalecube.cluster.gossip;

import java.util.StringJoiner;
import reactor.core.Exceptions;

public final class GossipConfig implements Cloneable {
Expand Down Expand Up @@ -115,13 +116,10 @@ public GossipConfig clone() {

@Override
public String toString() {
return "GossipConfig{"
+ "gossipFanout="
+ gossipFanout
+ ", gossipInterval="
+ gossipInterval
+ ", gossipRepeatMult="
+ gossipRepeatMult
+ '}';
return new StringJoiner(", ", GossipConfig.class.getSimpleName() + "[", "]")
.add("gossipFanout=" + gossipFanout)
.add("gossipInterval=" + gossipInterval)
.add("gossipRepeatMult=" + gossipRepeatMult)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import reactor.core.Exceptions;

public final class MembershipConfig implements Cloneable {
Expand Down Expand Up @@ -167,18 +168,12 @@ public MembershipConfig clone() {

@Override
public String toString() {
return "MembershipConfig{"
+ "seedMembers="
+ seedMembers
+ ", syncInterval="
+ syncInterval
+ ", syncTimeout="
+ syncTimeout
+ ", suspicionMult="
+ suspicionMult
+ ", syncGroup='"
+ syncGroup
+ '\''
+ '}';
return new StringJoiner(", ", MembershipConfig.class.getSimpleName() + "[", "]")
.add("seedMembers=" + seedMembers)
.add("syncInterval=" + syncInterval)
.add("syncTimeout=" + syncTimeout)
.add("suspicionMult=" + suspicionMult)
.add("syncGroup='" + syncGroup + "'")
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.scalecube.cluster.Member;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.StringJoiner;

/**
* Event which is emitted on cluster membership changes when new member added, updated in the
Expand Down Expand Up @@ -97,15 +98,12 @@ public ByteBuffer newMetadata() {

@Override
public String toString() {
return "MembershipEvent{type="
+ type
+ ", member="
+ member
+ ", newMetadata="
+ metadataAsString(newMetadata)
+ ", oldMetadata="
+ metadataAsString(oldMetadata)
+ '}';
return new StringJoiner(", ", MembershipEvent.class.getSimpleName() + "[", "]")
.add("type=" + type)
.add("member=" + member)
.add("oldMetadata=" + metadataAsString(oldMetadata))
.add("newMetadata=" + metadataAsString(newMetadata))
.toString();
}

private String metadataAsString(ByteBuffer metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -369,7 +370,10 @@ public long evaluateDelay() {

@Override
public String toString() {
return "OutboundSettings{loss=" + lossPercent + ", delay=" + meanDelay + '}';
return new StringJoiner(", ", OutboundSettings.class.getSimpleName() + "[", "]")
.add("lossPercent=" + lossPercent)
.add("meanDelay=" + meanDelay)
.toString();
}
}

Expand Down Expand Up @@ -405,7 +409,9 @@ public boolean shallPass() {

@Override
public String toString() {
return "InboundSettings{shallPass=" + shallPass + '}';
return new StringJoiner(", ", InboundSettings.class.getSimpleName() + "[", "]")
.add("shallPass=" + shallPass)
.toString();
}
}
}
32 changes: 15 additions & 17 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
/** Cluster implementation. */
public final class ClusterImpl implements Cluster {

private static final Logger LOGGER = LoggerFactory.getLogger(ClusterImpl.class);
private static final Logger LOGGER = LoggerFactory.getLogger(Cluster.class);

private static final Set<String> SYSTEM_MESSAGES =
Collections.unmodifiableSet(
Expand Down Expand Up @@ -340,7 +340,11 @@ private Member createLocalMember(int listenPort) {
.map(memberHost -> Address.create(memberHost, port))
.orElseGet(() -> Address.create(localAddress, listenPort));

return new Member(memberAddress);
if (config.memberId() != null) {
return new Member(config.memberId(), memberAddress);
} else {
return new Member(memberAddress);
}
}

@Override
Expand Down Expand Up @@ -433,7 +437,7 @@ private Mono<Void> doShutdown() {
return Mono.defer(
() -> {
LOGGER.info("Cluster member {} is shutting down", localMember);
return Flux.concatDelayError(leaveCluster(localMember), dispose(), transport.stop())
return Flux.concatDelayError(leaveCluster(), dispose(), transport.stop())
.then()
.doFinally(s -> scheduler.dispose())
.doOnSuccess(
Expand All @@ -445,21 +449,15 @@ private Mono<Void> doShutdown() {
});
}

private Mono<Void> leaveCluster(Member member) {
private Mono<Void> leaveCluster() {
return membership
.leaveCluster()
.subscribeOn(scheduler)
.doOnSuccess(
s ->
LOGGER.debug(
"Cluster member {} notified about his leaving and shutting down", member))
.doOnSuccess(s -> LOGGER.debug("Cluster member {} has left a cluster", localMember))
.doOnError(
ex ->
LOGGER.info(
"Cluster member {} failed to spread leave notification "
+ "to other cluster members: {}",
member,
ex.toString()))
"Cluster member {} failed on leaveCluster: {}", localMember, ex.toString()))
.then();
}

Expand Down Expand Up @@ -489,9 +487,9 @@ public boolean isShutdown() {

public interface MonitorMBean {

Collection<String> getMember();
Collection<String> getMemberId();

String getMemberAsString();
String getMemberIdAsString();

Collection<String> getMetadata();

Expand All @@ -517,13 +515,13 @@ private static JmxMonitorMBean start(ClusterImpl cluster) throws Exception {
}

@Override
public Collection<String> getMember() {
public Collection<String> getMemberId() {
return Collections.singleton(cluster.member().id());
}

@Override
public String getMemberAsString() {
return getMember().iterator().next();
public String getMemberIdAsString() {
return getMemberId().iterator().next();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.scalecube.cluster.Member;
import io.scalecube.cluster.membership.MemberStatus;
import java.util.StringJoiner;

/** CLass contains result of ping check. */
public final class FailureDetectorEvent {
Expand All @@ -24,6 +25,9 @@ public MemberStatus status() {

@Override
public String toString() {
return "FailureDetectorEvent{member=" + member + ", status=" + status + '}';
return new StringJoiner(", ", FailureDetectorEvent.class.getSimpleName() + "[", "]")
.add("member=" + member)
.add("status=" + status)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

public final class FailureDetectorImpl implements FailureDetector {

private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetectorImpl.class);
private static final Logger LOGGER = LoggerFactory.getLogger(FailureDetector.class);

// Qualifiers

Expand Down
17 changes: 7 additions & 10 deletions cluster/src/main/java/io/scalecube/cluster/fdetector/PingData.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.scalecube.cluster.fdetector;

import io.scalecube.cluster.Member;
import java.util.StringJoiner;

/** DTO class. Supports FailureDetector messages (Ping, Ack, PingReq). */
final class PingData {
Expand Down Expand Up @@ -79,15 +80,11 @@ public PingData withAckType(AckType ackType) {

@Override
public String toString() {
return "PingData{"
+ "from="
+ from
+ ", to="
+ to
+ ", originalIssuer="
+ originalIssuer
+ ", ackType="
+ ackType
+ '}';
return new StringJoiner(", ", PingData.class.getSimpleName() + "[", "]")
.add("from=" + from)
.add("to=" + to)
.add("originalIssuer=" + originalIssuer)
.add("ackType=" + ackType)
.toString();
}
}
Loading