From 2e3d8f1173199c2262d98614532907baed2664e6 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Fri, 20 Sep 2019 13:15:11 +0300 Subject: [PATCH 1/2] Added AbstractMonitorMBean --- .../cluster/AbstractMonitorMBean.java | 40 ++++++++++++++ .../io/scalecube/cluster/ClusterImpl.java | 55 ++++++++++++------- .../membership/MembershipProtocolImpl.java | 37 ++++++------- 3 files changed, 92 insertions(+), 40 deletions(-) create mode 100644 cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java diff --git a/cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java b/cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java new file mode 100644 index 00000000..a8400fda --- /dev/null +++ b/cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java @@ -0,0 +1,40 @@ +package io.scalecube.cluster; + +import java.lang.management.ManagementFactory; +import javax.management.MBeanServer; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.StandardMBean; + +public abstract class AbstractMonitorMBean { + + /** + * Registers monitor mbean. + * + * @param other monitor mbean instance + * @throws Exception in case of error + * @return object instance + */ + public static ObjectInstance register(AbstractMonitorMBean other) throws Exception { + Object bean = other.getBeanType().cast(other); + //noinspection unchecked + StandardMBean standardMBean = new StandardMBean(bean, other.getBeanType()); + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + ObjectName objectName = new ObjectName(other.getObjectName()); + return server.registerMBean(standardMBean, objectName); + } + + /** + * Returns bean type. + * + * @return bean type + */ + protected abstract Class getBeanType(); + + /** + * Returns jmx object name. + * + * @return object name + */ + protected abstract String getObjectName(); +} diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index ad5df0f3..7eb34127 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -14,7 +14,6 @@ import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.net.Address; import io.scalecube.transport.netty.TransportImpl; -import java.lang.management.ManagementFactory; import java.util.Collection; import java.util.Collections; import java.util.Objects; @@ -24,9 +23,6 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.Stream; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import javax.management.StandardMBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; @@ -277,7 +273,7 @@ private Mono doStart0() { .then(Mono.fromRunnable(() -> metadataStore.start())) .then(Mono.fromRunnable(this::startHandler)) .then((membership.start())) - .then(Mono.fromCallable(() -> JmxMonitorMBean.start(this))); + .then(startJmxMonitor()); }) .thenReturn(this); } @@ -304,6 +300,10 @@ private void startHandler() { actionsDisposables.add(listenGossip().subscribe(handler::onGossip, this::onError)); } + private Mono startJmxMonitor() { + return Mono.fromCallable(() -> AbstractMonitorMBean.register(new JmxMonitorMBean(this))).then(); + } + private void onError(Throwable th) { LOGGER.error("Received unexpected error: ", th); } @@ -485,18 +485,23 @@ public boolean isShutdown() { return onShutdown.isDisposed(); } + @SuppressWarnings("unused") public interface MonitorMBean { - Collection getMemberId(); + Collection getId(); + + String getIdAsString(); + + Collection getAddress(); - String getMemberIdAsString(); + String getAddressAsString(); Collection getMetadata(); String getMetadataAsString(); } - public static class JmxMonitorMBean implements MonitorMBean { + public static class JmxMonitorMBean extends AbstractMonitorMBean implements MonitorMBean { private final ClusterImpl cluster; @@ -504,24 +509,24 @@ private JmxMonitorMBean(ClusterImpl cluster) { this.cluster = cluster; } - private static JmxMonitorMBean start(ClusterImpl cluster) throws Exception { - JmxMonitorMBean monitorMBean = new JmxMonitorMBean(cluster); - MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - StandardMBean standardMBean = new StandardMBean(monitorMBean, MonitorMBean.class); - ObjectName objectName = - new ObjectName("io.scalecube.cluster:name=Cluster@" + cluster.member().id()); - server.registerMBean(standardMBean, objectName); - return monitorMBean; + @Override + public Collection getId() { + return Collections.singleton(cluster.member().id()); } @Override - public Collection getMemberId() { - return Collections.singleton(cluster.member().id()); + public String getIdAsString() { + return getId().iterator().next(); + } + + @Override + public Collection getAddress() { + return Collections.singleton(String.valueOf(cluster.member().address())); } @Override - public String getMemberIdAsString() { - return getMemberId().iterator().next(); + public String getAddressAsString() { + return getAddress().iterator().next(); } @Override @@ -534,6 +539,16 @@ public Collection getMetadata() { public String getMetadataAsString() { return getMetadata().iterator().next(); } + + @Override + protected Class getBeanType() { + return MonitorMBean.class; + } + + @Override + protected String getObjectName() { + return "io.scalecube.cluster:name=Cluster@" + cluster.member().id(); + } } private static class SenderAwareTransport implements Transport { diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index 212374d4..1a5e3fd2 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -3,6 +3,7 @@ import static io.scalecube.cluster.membership.MemberStatus.ALIVE; import static io.scalecube.cluster.membership.MemberStatus.DEAD; +import io.scalecube.cluster.AbstractMonitorMBean; import io.scalecube.cluster.ClusterConfig; import io.scalecube.cluster.ClusterMath; import io.scalecube.cluster.CorrelationIdGenerator; @@ -15,7 +16,6 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.net.Address; -import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; @@ -33,9 +33,6 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; -import javax.management.MBeanServer; -import javax.management.ObjectName; -import javax.management.StandardMBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; @@ -214,9 +211,7 @@ public Mono leaveCluster() { @Override public Mono start() { // Make initial sync with all seed members - return Mono.create(this::start0) - .then(Mono.fromCallable(() -> JmxMonitorMBean.start(this))) - .then(); + return Mono.create(this::start0).then(startJmxMonitor()); } private void start0(MonoSink sink) { @@ -256,6 +251,10 @@ private void start0(MonoSink sink) { null, ex -> LOGGER.debug("Exception on initial SyncAck, cause: {}", ex.toString())); } + private Mono startJmxMonitor() { + return Mono.fromCallable(() -> AbstractMonitorMBean.register(new JmxMonitorMBean(this))).then(); + } + @Override public void stop() { // Stop accepting requests, events and sending sync @@ -735,7 +734,7 @@ public interface MonitorMBean { String getDeadMembersAsString(); } - public static class JmxMonitorMBean implements MonitorMBean { + public static class JmxMonitorMBean extends AbstractMonitorMBean implements MonitorMBean { public static final int REMOVED_MEMBERS_HISTORY_SIZE = 42; @@ -751,18 +750,6 @@ private JmxMonitorMBean(MembershipProtocolImpl membershipProtocol) { .subscribe(removedMembersHistory); } - private static JmxMonitorMBean start(MembershipProtocolImpl membershipProtocol) - throws Exception { - JmxMonitorMBean monitorMBean = new JmxMonitorMBean(membershipProtocol); - MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - ObjectName objectName = - new ObjectName( - "io.scalecube.cluster:name=Membership@" + membershipProtocol.localMember.id()); - StandardMBean standardMBean = new StandardMBean(monitorMBean, MonitorMBean.class); - server.registerMBean(standardMBean, objectName); - return monitorMBean; - } - @Override public int getIncarnation() { Map membershipTable = membershipProtocol.membershipTable; @@ -809,5 +796,15 @@ private List findRecordsByCondition(Predicate conditio .map(Member::toString) .collect(Collectors.toList()); } + + @Override + protected Class getBeanType() { + return MonitorMBean.class; + } + + @Override + protected String getObjectName() { + return "io.scalecube.cluster:name=Membership@" + membershipProtocol.localMember.id(); + } } } From 2df9fdee899f9213308e246ecd4649ed863ce7e8 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Fri, 20 Sep 2019 14:32:16 +0300 Subject: [PATCH 2/2] Enhanced cluster.jmx-monitor: exposed few more properties; changed constructor for Member; removed `memberId` instead started to use `memberAlias` --- .../io/scalecube/cluster/ClusterConfig.java | 16 +++--- .../java/io/scalecube/cluster/Member.java | 35 ++++++------ .../cluster/AbstractMonitorMBean.java | 2 +- .../io/scalecube/cluster/ClusterImpl.java | 33 +++++++----- .../membership/MembershipProtocolImpl.java | 53 ++++++++++--------- .../cluster/membership/MembershipRecord.java | 9 ---- .../fdetector/FailureDetectorTest.java | 5 +- .../cluster/gossip/GossipProtocolTest.java | 5 +- .../cluster/gossip/GossipRequestTest.java | 2 +- .../membership/MembershipProtocolTest.java | 2 +- .../membership/MembershipRecordTest.java | 4 +- .../examples/ClusterJoinExamples.java | 11 ++-- 12 files changed, 94 insertions(+), 83 deletions(-) diff --git a/cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java b/cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java index dc7ab51e..7926c543 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/ClusterConfig.java @@ -35,7 +35,7 @@ public final class ClusterConfig implements Cloneable { private MetadataEncoder metadataEncoder = MetadataEncoder.INSTANCE; private MetadataDecoder metadataDecoder = MetadataDecoder.INSTANCE; - private String memberId; + private String memberAlias; private String memberHost; private Integer memberPort; @@ -168,19 +168,19 @@ public ClusterConfig memberHost(String memberHost) { return c; } - public String memberId() { - return memberId; + public String memberAlias() { + return memberAlias; } /** - * Sets a memberId. + * Sets a memberAlias. * - * @param memberId member id + * @param memberAlias member alias * @return new {@code ClusterConfig} instance */ - public ClusterConfig memberId(String memberId) { + public ClusterConfig memberAlias(String memberAlias) { ClusterConfig c = clone(); - c.memberId = memberId; + c.memberAlias = memberAlias; return c; } @@ -285,7 +285,7 @@ public String toString() { .add("metadataTimeout=" + metadataTimeout) .add("metadataEncoder=" + metadataEncoder) .add("metadataDecoder=" + metadataDecoder) - .add("memberId='" + memberId + "'") + .add("memberAlias='" + memberAlias + "'") .add("memberHost='" + memberHost + "'") .add("memberPort=" + memberPort) .add("transportConfig=" + transportConfig) diff --git a/cluster-api/src/main/java/io/scalecube/cluster/Member.java b/cluster-api/src/main/java/io/scalecube/cluster/Member.java index fe9a0f22..3ad454b9 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/Member.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/Member.java @@ -11,41 +11,38 @@ public final class Member { private String id; + private String alias; private Address address; /** Instantiates empty member for deserialization purpose. */ Member() {} /** - * Create instance of cluster member by given address; member id will be generated by {@link - * #generateId()}. - * - * @param address address on which given member listens for incoming messages - */ - public Member(Address address) { - this(generateId(), address); - } - - /** - * Create instance of cluster member with given parameters. + * Constructor. * * @param id member id - * @param address address on which given member listens for incoming messages + * @param alias member alias (optional) + * @param address member address */ - public Member(String id, Address address) { - this.id = Objects.requireNonNull(id); - this.address = Objects.requireNonNull(address); + public Member(String id, String alias, Address address) { + this.id = Objects.requireNonNull(id, "member id"); + this.alias = alias; // optional + this.address = Objects.requireNonNull(address, "member address"); } public String id() { return id; } + public String alias() { + return alias; + } + public Address address() { return address; } - private static String generateId() { + public static String generateId() { return Long.toHexString(UUID.randomUUID().getMostSignificantBits() & Long.MAX_VALUE); } @@ -68,6 +65,10 @@ public int hashCode() { @Override public String toString() { - return id + ":" + address.port(); + if (alias == null) { + return id + "@" + address; + } else { + return alias + "/" + id + "@" + address; + } } } diff --git a/cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java b/cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java index a8400fda..87129564 100644 --- a/cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java +++ b/cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java @@ -12,8 +12,8 @@ public abstract class AbstractMonitorMBean { * Registers monitor mbean. * * @param other monitor mbean instance - * @throws Exception in case of error * @return object instance + * @throws Exception in case of error */ public static ObjectInstance register(AbstractMonitorMBean other) throws Exception { Object bean = other.getBeanType().cast(other); diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 7eb34127..8f0a878d 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -340,11 +340,7 @@ private Member createLocalMember(int listenPort) { .map(memberHost -> Address.create(memberHost, port)) .orElseGet(() -> Address.create(localAddress, listenPort)); - if (config.memberId() != null) { - return new Member(config.memberId(), memberAddress); - } else { - return new Member(memberAddress); - } + return new Member(Member.generateId(), config.memberAlias(), memberAddress); } @Override @@ -492,6 +488,10 @@ public interface MonitorMBean { String getIdAsString(); + Collection getAlias(); + + String getAliasAsString(); + Collection getAddress(); String getAddressAsString(); @@ -511,33 +511,42 @@ private JmxMonitorMBean(ClusterImpl cluster) { @Override public Collection getId() { - return Collections.singleton(cluster.member().id()); + return Collections.singleton(getIdAsString()); } @Override public String getIdAsString() { - return getId().iterator().next(); + return cluster.member().id(); + } + + @Override + public Collection getAlias() { + return Collections.singleton(getAliasAsString()); + } + + @Override + public String getAliasAsString() { + return cluster.member().alias(); } @Override public Collection getAddress() { - return Collections.singleton(String.valueOf(cluster.member().address())); + return Collections.singleton(getAddressAsString()); } @Override public String getAddressAsString() { - return getAddress().iterator().next(); + return String.valueOf(cluster.member().address()); } @Override public Collection getMetadata() { - return Collections.singletonList( - String.valueOf(cluster.metadataStore.metadata().map(Object::toString).orElse(null))); + return Collections.singletonList(getMetadataAsString()); } @Override public String getMetadataAsString() { - return getMetadata().iterator().next(); + return String.valueOf(cluster.metadataStore.metadata().map(Object::toString).orElse(null)); } @Override diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index 1a5e3fd2..e7ec2d7d 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -482,7 +482,7 @@ private Mono updateMembership(MembershipRecord r1, MembershipUpdateReason () -> { Objects.requireNonNull(r1, "Membership record can't be null"); // Get current record - MembershipRecord r0 = membershipTable.get(r1.id()); + MembershipRecord r0 = membershipTable.get(r1.member().id()); // Check if new record r1 overrides existing membership record r0 if (r1.equals(r0) || !r1.isOverrides(r0)) { @@ -509,7 +509,7 @@ private Mono updateMembership(MembershipRecord r1, MembershipUpdateReason if (r1.isSuspect()) { // Update membership and schedule/cancel suspicion timeout task - membershipTable.put(r1.id(), r1); + membershipTable.put(r1.member().id(), r1); scheduleSuspicionTimeoutTask(r1); spreadMembershipGossipUnlessGossiped(r1, reason); } @@ -530,7 +530,7 @@ private Mono updateMembership(MembershipRecord r1, MembershipUpdateReason .doOnSuccess( metadata1 -> { // If metadata was received then member is Alive - cancelSuspicionTimeoutTask(r1.id()); + cancelSuspicionTimeoutTask(r1.member().id()); spreadMembershipGossipUnlessGossiped(r1, reason); // Update membership ByteBuffer metadata0 = metadataStore.updateMetadata(r1.member(), metadata1); @@ -567,28 +567,32 @@ private Mono onSelfMemberDetected( }); } - private Mono onDeadMemberDetected(MembershipRecord r1) { + private Mono onDeadMemberDetected(MembershipRecord r) { return Mono.fromRunnable( () -> { - cancelSuspicionTimeoutTask(r1.id()); - if (!members.containsKey(r1.id())) { + final Member member = r.member(); + + cancelSuspicionTimeoutTask(member.id()); + + if (!members.containsKey(member.id())) { return; } + // Update membership - members.remove(r1.id()); - membershipTable.remove(r1.id()); + members.remove(member.id()); + membershipTable.remove(member.id()); // removed - ByteBuffer metadata0 = metadataStore.removeMetadata(r1.member()); - MembershipEvent event = MembershipEvent.createRemoved(r1.member(), metadata0); + ByteBuffer metadata0 = metadataStore.removeMetadata(member); + MembershipEvent event = MembershipEvent.createRemoved(member, metadata0); LOGGER_MEMBERSHIP.debug("Emitting membership event {}", event); sink.next(event); }); } private void onAliveMemberDetected( - MembershipRecord r1, ByteBuffer metadata0, ByteBuffer metadata1) { + MembershipRecord r, ByteBuffer metadata0, ByteBuffer metadata1) { - final Member member = r1.member(); + final Member member = r.member(); boolean memberExists = members.containsKey(member.id()); @@ -600,7 +604,7 @@ private void onAliveMemberDetected( } members.put(member.id(), member); - membershipTable.put(member.id(), r1); + membershipTable.put(member.id(), r); if (event != null) { LOGGER_MEMBERSHIP.debug("Emitting membership event {}", event); @@ -616,7 +620,7 @@ private void cancelSuspicionTimeoutTask(String memberId) { } } - private void scheduleSuspicionTimeoutTask(MembershipRecord record) { + private void scheduleSuspicionTimeoutTask(MembershipRecord r) { long suspicionTimeout = ClusterMath.suspicionTimeout( membershipConfig.suspicionMult(), @@ -624,7 +628,7 @@ private void scheduleSuspicionTimeoutTask(MembershipRecord record) { failureDetectorConfig.pingInterval()); suspicionTimeoutTasks.computeIfAbsent( - record.id(), + r.member().id(), id -> { LOGGER.debug( "Scheduled SuspicionTimeoutTask for {}, suspicionTimeout {}", id, suspicionTimeout); @@ -635,29 +639,28 @@ private void scheduleSuspicionTimeoutTask(MembershipRecord record) { private void onSuspicionTimeout(String memberId) { suspicionTimeoutTasks.remove(memberId); - MembershipRecord record = membershipTable.get(memberId); - if (record != null) { - LOGGER.debug("Declare SUSPECTED member {} as DEAD by timeout", record); - MembershipRecord deadRecord = - new MembershipRecord(record.member(), DEAD, record.incarnation()); + MembershipRecord r = membershipTable.get(memberId); + if (r != null) { + LOGGER.debug("Declare SUSPECTED member {} as DEAD by timeout", r); + MembershipRecord deadRecord = new MembershipRecord(r.member(), DEAD, r.incarnation()); updateMembership(deadRecord, MembershipUpdateReason.SUSPICION_TIMEOUT) .subscribe(null, this::onError); } } private void spreadMembershipGossipUnlessGossiped( - MembershipRecord r1, MembershipUpdateReason reason) { + MembershipRecord r, MembershipUpdateReason reason) { // Spread gossip (unless already gossiped) if (reason != MembershipUpdateReason.MEMBERSHIP_GOSSIP && reason != MembershipUpdateReason.INITIAL_SYNC) { - spreadMembershipGossip(r1).doOnError(this::onErrorIgnore).subscribe(); + spreadMembershipGossip(r).doOnError(this::onErrorIgnore).subscribe(); } } - private Mono spreadMembershipGossip(MembershipRecord record) { + private Mono spreadMembershipGossip(MembershipRecord r) { return Mono.defer( () -> { - Message msg = Message.withData(record).qualifier(MEMBERSHIP_GOSSIP).build(); + Message msg = Message.withData(r).qualifier(MEMBERSHIP_GOSSIP).build(); LOGGER.debug("Spead membreship: {} with gossip", msg); return gossipProtocol .spread(msg) @@ -792,7 +795,7 @@ public String getDeadMembersAsString() { private List findRecordsByCondition(Predicate condition) { return membershipProtocol.getMembershipRecords().stream() .filter(condition) - .map(record -> new Member(record.id(), record.address())) + .map(MembershipRecord::member) .map(Member::toString) .collect(Collectors.toList()); } diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipRecord.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipRecord.java index f6ba474f..7512d919 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipRecord.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipRecord.java @@ -5,7 +5,6 @@ import static io.scalecube.cluster.membership.MemberStatus.SUSPECT; import io.scalecube.cluster.Member; -import io.scalecube.net.Address; import java.util.Objects; /** Cluster membership record which represents member, status, and incarnation. */ @@ -29,14 +28,6 @@ public Member member() { return member; } - public String id() { - return member.id(); - } - - public Address address() { - return member.address(); - } - public MemberStatus status() { return status; } diff --git a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java index 067135ca..49780bcf 100644 --- a/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/fdetector/FailureDetectorTest.java @@ -409,12 +409,13 @@ private FailureDetectorImpl createFd(Transport transport, List
members) private FailureDetectorImpl createFd( Transport transport, List
addresses, FailureDetectorConfig config) { - Member localMember = new Member("member-" + transport.address().port(), transport.address()); + Member localMember = + new Member("member-" + transport.address().port(), null, transport.address()); Flux membershipFlux = Flux.fromIterable(addresses) .filter(address -> !transport.address().equals(address)) - .map(address -> new Member("member-" + address.port(), address)) + .map(address -> new Member("member-" + address.port(), null, address)) .map(member -> MembershipEvent.createAdded(member, null)); CorrelationIdGenerator cidGenerator = new CorrelationIdGenerator(localMember.id()); diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java index f21cccc4..3ef98286 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipProtocolTest.java @@ -255,12 +255,13 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List
.gossipInterval(gossipInterval) .gossipRepeatMult(gossipRepeatMultiplier); - Member localMember = new Member("member-" + transport.address().port(), transport.address()); + Member localMember = + new Member("member-" + transport.address().port(), null, transport.address()); Flux membershipFlux = Flux.fromIterable(members) .filter(address -> !transport.address().equals(address)) - .map(address -> new Member("member-" + address.port(), address)) + .map(address -> new Member("member-" + address.port(), null, address)) .map(member -> MembershipEvent.createAdded(member, null)); GossipProtocolImpl gossipProtocol = diff --git a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java index 0a6dd714..d7bf5033 100644 --- a/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/gossip/GossipRequestTest.java @@ -38,7 +38,7 @@ public void init() { @Test public void testSerializationAndDeserialization() throws Exception { - Member from = new Member("0", Address.from("localhost:1234")); + Member from = new Member("0", null, Address.from("localhost:1234")); List gossips = getGossips(); Message message = Message.withData(new GossipRequest(gossips, from.id())).correlationId("CORR_ID").build(); diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index cf4530d0..503c6181 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -933,7 +933,7 @@ private MembershipProtocolImpl createMembership( } private MembershipProtocolImpl createMembership(Transport transport, ClusterConfig config) { - Member localMember = new Member(transport.address()); + Member localMember = new Member(Member.generateId(), null, transport.address()); DirectProcessor membershipProcessor = DirectProcessor.create(); FluxSink membershipSink = membershipProcessor.sink(); diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipRecordTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipRecordTest.java index 939586e0..eb51f1e7 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipRecordTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipRecordTest.java @@ -14,8 +14,8 @@ public class MembershipRecordTest extends BaseTest { - private final Member member = new Member("0", Address.from("localhost:1234")); - private final Member anotherMember = new Member("1", Address.from("localhost:4567")); + private final Member member = new Member("0", null, Address.from("localhost:1234")); + private final Member anotherMember = new Member("1", null, Address.from("localhost:4567")); private final MembershipRecord r0Null = null; diff --git a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java index 6d2e7f19..a874f9b1 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java @@ -19,23 +19,27 @@ public class ClusterJoinExamples { /** Main method. */ public static void main(String[] args) { // Start seed member Alice - Cluster alice = new ClusterImpl().startAwait(); + Cluster alice = new ClusterImpl().config(opts -> opts.memberAlias("Alice")).startAwait(); // Join Bob to cluster with Alice Cluster bob = - new ClusterImpl().membership(opts -> opts.seedMembers(alice.address())).startAwait(); + new ClusterImpl() + .config(opts -> opts.memberAlias("Bob")) + .membership(opts -> opts.seedMembers(alice.address())) + .startAwait(); // Join Carol to cluster with metadata Map metadata = Collections.singletonMap("name", "Carol"); Cluster carol = new ClusterImpl() - .config(opts -> opts.metadata(metadata)) + .config(opts -> opts.memberAlias("Carol").metadata(metadata)) .membership(opts -> opts.seedMembers(alice.address())) .startAwait(); // Start Dan on port 3000 ClusterConfig configWithFixedPort = new ClusterConfig() + .memberAlias("Dan") .membership(opts -> opts.seedMembers(alice.address())) .transport(opts -> opts.port(3000)); Cluster dan = new ClusterImpl(configWithFixedPort).startAwait(); @@ -43,6 +47,7 @@ public static void main(String[] args) { // Start Eve in separate cluster (separate sync group) ClusterConfig configWithSyncGroup = new ClusterConfig() + .memberAlias("Eve") .membership( opts -> opts.seedMembers(