diff --git a/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java b/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java index f22a44d8..0b6b37c7 100644 --- a/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java +++ b/cluster-api/src/main/java/io/scalecube/cluster/membership/MembershipConfig.java @@ -29,6 +29,7 @@ public final class MembershipConfig implements Cloneable { private int syncTimeout = DEFAULT_SYNC_TIMEOUT; private int suspicionMult = DEFAULT_SUSPICION_MULT; private String syncGroup = "default"; + private int removedMembersHistorySize = 42; public MembershipConfig() {} @@ -157,6 +158,22 @@ public MembershipConfig syncGroup(String syncGroup) { return m; } + public int removedMembersHistorySize() { + return removedMembersHistorySize; + } + + /** + * Sets a removedMembersHistorySize. + * + * @param removedMembersHistorySize history size for remove members + * @return new {@code MembershipConfig} instance + */ + public MembershipConfig removedMembersHistorySize(int removedMembersHistorySize) { + MembershipConfig m = clone(); + m.removedMembersHistorySize = removedMembersHistorySize; + return m; + } + @Override public MembershipConfig clone() { try { @@ -174,6 +191,7 @@ public String toString() { .add("syncTimeout=" + syncTimeout) .add("suspicionMult=" + suspicionMult) .add("syncGroup='" + syncGroup + "'") + .add("removedMembersHistorySize=" + removedMembersHistorySize) .toString(); } } diff --git a/cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java b/cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java deleted file mode 100644 index 87129564..00000000 --- a/cluster/src/main/java/io/scalecube/cluster/AbstractMonitorMBean.java +++ /dev/null @@ -1,40 +0,0 @@ -package io.scalecube.cluster; - -import java.lang.management.ManagementFactory; -import javax.management.MBeanServer; -import javax.management.ObjectInstance; -import javax.management.ObjectName; -import javax.management.StandardMBean; - -public abstract class AbstractMonitorMBean { - - /** - * Registers monitor mbean. - * - * @param other monitor mbean instance - * @return object instance - * @throws Exception in case of error - */ - public static ObjectInstance register(AbstractMonitorMBean other) throws Exception { - Object bean = other.getBeanType().cast(other); - //noinspection unchecked - StandardMBean standardMBean = new StandardMBean(bean, other.getBeanType()); - MBeanServer server = ManagementFactory.getPlatformMBeanServer(); - ObjectName objectName = new ObjectName(other.getObjectName()); - return server.registerMBean(standardMBean, objectName); - } - - /** - * Returns bean type. - * - * @return bean type - */ - protected abstract Class getBeanType(); - - /** - * Returns jmx object name. - * - * @return object name - */ - protected abstract String getObjectName(); -} diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 8f0a878d..718602df 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -9,11 +9,15 @@ import io.scalecube.cluster.membership.MembershipProtocolImpl; import io.scalecube.cluster.metadata.MetadataStore; import io.scalecube.cluster.metadata.MetadataStoreImpl; +import io.scalecube.cluster.monitor.ClusterMonitorMBean; +import io.scalecube.cluster.monitor.ClusterMonitorModel; +import io.scalecube.cluster.monitor.JmxClusterMonitorMBean; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.net.Address; import io.scalecube.transport.netty.TransportImpl; +import java.lang.management.ManagementFactory; import java.util.Collection; import java.util.Collections; import java.util.Objects; @@ -23,6 +27,10 @@ import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.management.MBeanServer; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.StandardMBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; @@ -82,6 +90,7 @@ public final class ClusterImpl implements Cluster { private MetadataStore metadataStore; private Scheduler scheduler; private CorrelationIdGenerator cidGenerator; + private ClusterMonitorModel.Builder monitorModelBuilder; public ClusterImpl() { this(ClusterConfig.defaultConfig()); @@ -228,6 +237,7 @@ private Mono doStart0() { cidGenerator = new CorrelationIdGenerator(localMember.id()); scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true); + monitorModelBuilder = new ClusterMonitorModel.Builder(); failureDetector = new FailureDetectorImpl( @@ -259,14 +269,16 @@ private Mono doStart0() { metadataStore, config, scheduler, - cidGenerator); + cidGenerator, + monitorModelBuilder); actionsDisposables.add( + // Retransmit inner membership events to public api layer membership .listen() /*.publishOn(scheduler)*/ - // dont uncomment, already beign executed inside sc-cluster thread - .subscribe(membershipSink::next, this::onError)); + // Dont uncomment, already beign executed inside sc-cluster thread + .subscribe(membershipSink::next, this::onError, membershipSink::complete)); return Mono.fromRunnable(() -> failureDetector.start()) .then(Mono.fromRunnable(() -> gossip.start())) @@ -301,7 +313,18 @@ private void startHandler() { } private Mono startJmxMonitor() { - return Mono.fromCallable(() -> AbstractMonitorMBean.register(new JmxMonitorMBean(this))).then(); + return Mono.fromCallable(this::startJmxMonitor0).then(); + } + + private ObjectInstance startJmxMonitor0() throws Exception { + ClusterMonitorModel monitorModel = monitorModelBuilder.config(config).cluster(this).build(); + + JmxClusterMonitorMBean bean = new JmxClusterMonitorMBean(monitorModel); + StandardMBean standardMBean = new StandardMBean(bean, ClusterMonitorMBean.class); + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + ObjectName objectName = new ObjectName("io.scalecube.cluster:name=Cluster@" + member().id()); + + return server.registerMBean(standardMBean, objectName); } private void onError(Throwable th) { @@ -481,85 +504,6 @@ public boolean isShutdown() { return onShutdown.isDisposed(); } - @SuppressWarnings("unused") - public interface MonitorMBean { - - Collection getId(); - - String getIdAsString(); - - Collection getAlias(); - - String getAliasAsString(); - - Collection getAddress(); - - String getAddressAsString(); - - Collection getMetadata(); - - String getMetadataAsString(); - } - - public static class JmxMonitorMBean extends AbstractMonitorMBean implements MonitorMBean { - - private final ClusterImpl cluster; - - private JmxMonitorMBean(ClusterImpl cluster) { - this.cluster = cluster; - } - - @Override - public Collection getId() { - return Collections.singleton(getIdAsString()); - } - - @Override - public String getIdAsString() { - return cluster.member().id(); - } - - @Override - public Collection getAlias() { - return Collections.singleton(getAliasAsString()); - } - - @Override - public String getAliasAsString() { - return cluster.member().alias(); - } - - @Override - public Collection getAddress() { - return Collections.singleton(getAddressAsString()); - } - - @Override - public String getAddressAsString() { - return String.valueOf(cluster.member().address()); - } - - @Override - public Collection getMetadata() { - return Collections.singletonList(getMetadataAsString()); - } - - @Override - public String getMetadataAsString() { - return String.valueOf(cluster.metadataStore.metadata().map(Object::toString).orElse(null)); - } - - @Override - protected Class getBeanType() { - return MonitorMBean.class; - } - - @Override - protected String getObjectName() { - return "io.scalecube.cluster:name=Cluster@" + cluster.member().id(); - } - } - private static class SenderAwareTransport implements Transport { private final Transport transport; diff --git a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java index 0ba35ab6..362b63cd 100644 --- a/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/fdetector/FailureDetectorImpl.java @@ -89,11 +89,11 @@ public FailureDetectorImpl( // Subscribe actionsDisposables.addAll( Arrays.asList( - membershipProcessor // + membershipProcessor // Listen membership events to update remoteMembers .publishOn(scheduler) .subscribe(this::onMemberEvent, this::onError), transport - .listen() // + .listen() // Listen failure detector requests .publishOn(scheduler) .subscribe(this::onMessage, this::onError))); } diff --git a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java index e28d568b..ef6d5a34 100644 --- a/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/gossip/GossipProtocolImpl.java @@ -91,11 +91,11 @@ public GossipProtocolImpl( // Subscribe actionsDisposables.addAll( Arrays.asList( - membershipProcessor // + membershipProcessor // Listen membership events to update remoteMembers .publishOn(scheduler) .subscribe(this::onMemberEvent, this::onError), transport - .listen() + .listen() // Listen gossip requests .publishOn(scheduler) .filter(this::isGossipReq) .subscribe(this::onGossipReq, this::onError))); diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index 5bea1087..edd2b007 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -3,7 +3,6 @@ import static io.scalecube.cluster.membership.MemberStatus.ALIVE; import static io.scalecube.cluster.membership.MemberStatus.DEAD; -import io.scalecube.cluster.AbstractMonitorMBean; import io.scalecube.cluster.ClusterConfig; import io.scalecube.cluster.ClusterMath; import io.scalecube.cluster.CorrelationIdGenerator; @@ -13,6 +12,7 @@ import io.scalecube.cluster.fdetector.FailureDetectorEvent; import io.scalecube.cluster.gossip.GossipProtocol; import io.scalecube.cluster.metadata.MetadataStore; +import io.scalecube.cluster.monitor.ClusterMonitorModel; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.net.Address; @@ -78,11 +78,13 @@ private enum MembershipUpdateReason { private final GossipProtocol gossipProtocol; private final MetadataStore metadataStore; private final CorrelationIdGenerator cidGenerator; + private final ClusterMonitorModel.Builder monitorModelBuilder; // State private final Map membershipTable = new HashMap<>(); private final Map members = new HashMap<>(); + private final List removedMembersHistory = new CopyOnWriteArrayList<>(); // Subject @@ -108,6 +110,7 @@ private enum MembershipUpdateReason { * @param config cluster config parameters * @param scheduler scheduler * @param cidGenerator correlation id generator + * @param monitorModelBuilder monitor model builder */ public MembershipProtocolImpl( Member localMember, @@ -117,7 +120,8 @@ public MembershipProtocolImpl( MetadataStore metadataStore, ClusterConfig config, Scheduler scheduler, - CorrelationIdGenerator cidGenerator) { + CorrelationIdGenerator cidGenerator, + ClusterMonitorModel.Builder monitorModelBuilder) { this.transport = Objects.requireNonNull(transport); this.failureDetector = Objects.requireNonNull(failureDetector); @@ -126,6 +130,7 @@ public MembershipProtocolImpl( this.localMember = Objects.requireNonNull(localMember); this.scheduler = Objects.requireNonNull(scheduler); this.cidGenerator = Objects.requireNonNull(cidGenerator); + this.monitorModelBuilder = Objects.requireNonNull(monitorModelBuilder); this.membershipConfig = Objects.requireNonNull(config).membershipConfig(); this.failureDetectorConfig = Objects.requireNonNull(config).failureDetectorConfig(); @@ -140,23 +145,21 @@ public MembershipProtocolImpl( actionsDisposables.addAll( Arrays.asList( - // Listen to incoming SYNC and SYNC ACK requests from other members transport - .listen() // + .listen() // Listen to incoming SYNC and SYNC ACK requests from other members .publishOn(scheduler) .subscribe(this::onMessage, this::onError), - - // Listen to events from failure detector failureDetector - .listen() + .listen() // Listen to events from failure detector .publishOn(scheduler) .subscribe(this::onFailureDetectorEvent, this::onError), - - // Listen to membership gossips gossipProtocol - .listen() + .listen() // Listen to membership gossips .publishOn(scheduler) - .subscribe(this::onMembershipGossip, this::onError))); + .subscribe(this::onMembershipGossip, this::onError), + listen() // Listen removed members for monitoring + .filter(MembershipEvent::isRemoved) + .subscribe(this::onMemberRemoved))); } // Remove duplicates and local address @@ -211,10 +214,18 @@ public Mono leaveCluster() { @Override public Mono start() { // Make initial sync with all seed members - return Mono.create(this::start0).then(startJmxMonitor()); + return Mono.create(this::start0).then(); } private void start0(MonoSink sink) { + // Prepare monitor model + monitorModelBuilder + .seedMembers(seedMembers) + .incarnationSupplier(this::getIncarnation) + .aliveMembersSupplier(this::getAliveMembers) + .suspectedMembersSupplier(this::getSuspectedMembers) + .removedMembersSupplier(this::getRemovedMembers); + // In case no members at the moment just schedule periodic sync if (seedMembers.isEmpty()) { schedulePeriodicSync(); @@ -251,10 +262,6 @@ private void start0(MonoSink sink) { null, ex -> LOGGER.debug("Exception on initial SyncAck, cause: {}", ex.toString())); } - private Mono startJmxMonitor() { - return Mono.fromCallable(() -> AbstractMonitorMBean.register(new JmxMonitorMBean(this))).then(); - } - @Override public void stop() { // Stop accepting requests, events and sending sync @@ -721,102 +728,44 @@ List getMembershipRecords() { return Collections.unmodifiableList(new ArrayList<>(membershipTable.values())); } - @SuppressWarnings("unused") - public interface MonitorMBean { - - int getIncarnation(); - - List getAliveMembers(); - - String getAliveMembersAsString(); - - List getSuspectedMembers(); + // =============================================================== + // ============== Helper Methods for Monitoring ================== + // =============================================================== - String getSuspectedMembersAsString(); - - List getDeadMembers(); - - String getDeadMembersAsString(); + private int getIncarnation() { + return membershipTable.get(localMember.id()).incarnation(); } - public static class JmxMonitorMBean extends AbstractMonitorMBean implements MonitorMBean { - - public static final int REMOVED_MEMBERS_HISTORY_SIZE = 42; - - private final MembershipProtocolImpl membershipProtocol; - private final List removedMembersHistory; - - private JmxMonitorMBean(MembershipProtocolImpl membershipProtocol) { - this.membershipProtocol = membershipProtocol; - this.removedMembersHistory = new CopyOnWriteArrayList<>(); - - membershipProtocol - .listen() - .filter(MembershipEvent::isRemoved) - .subscribe( - event -> { - removedMembersHistory.add(event); - if (removedMembersHistory.size() > REMOVED_MEMBERS_HISTORY_SIZE) { - removedMembersHistory.remove(0); - } - }); - } - - @Override - public int getIncarnation() { - Map membershipTable = membershipProtocol.membershipTable; - String localMemberId = membershipProtocol.localMember.id(); - return membershipTable.get(localMemberId).incarnation(); - } - - @Override - public List getAliveMembers() { - return findRecordsByCondition(MembershipRecord::isAlive); - } - - @Override - public String getAliveMembersAsString() { - return getAliveMembers().stream().collect(Collectors.joining(",", "[", "]")); - } - - @Override - public List getSuspectedMembers() { - return findRecordsByCondition(MembershipRecord::isSuspect); - } - - @Override - public String getSuspectedMembersAsString() { - return getSuspectedMembers().stream().collect(Collectors.joining(",", "[", "]")); - } + private List getAliveMembers() { + return findRecordsByCondition(MembershipRecord::isAlive); + } - @Override - public List getDeadMembers() { - return removedMembersHistory.stream() - .map(MembershipEvent::toString) - .collect(Collectors.toList()); - } + private List getSuspectedMembers() { + return findRecordsByCondition(MembershipRecord::isSuspect); + } - @Override - public String getDeadMembersAsString() { - return getDeadMembers().stream().collect(Collectors.joining(",", "[", "]")); - } + private List getRemovedMembers() { + return removedMembersHistory // + .stream() + .map(MembershipEvent::member) + .collect(Collectors.toList()); + } - private List findRecordsByCondition(Predicate condition) { - return membershipProtocol.getMembershipRecords().stream() - .filter(condition) - .map(MembershipRecord::member) - .map(Member::toString) - .collect(Collectors.toList()); - } + private List findRecordsByCondition(Predicate condition) { + return getMembershipRecords().stream() + .filter(condition) + .map(MembershipRecord::member) + .collect(Collectors.toList()); + } - @Override - protected Class getBeanType() { - return MonitorMBean.class; + private void onMemberRemoved(MembershipEvent event) { + int s = membershipConfig.removedMembersHistorySize(); + if (s <= 0) { + return; } - - @Override - protected String getObjectName() { - return "io.scalecube.cluster:name=Membership@" + membershipProtocol.localMember.id(); + removedMembersHistory.add(event); + if (removedMembersHistory.size() > s) { + removedMembersHistory.remove(0); } } } diff --git a/cluster/src/main/java/io/scalecube/cluster/monitor/ClusterMonitorMBean.java b/cluster/src/main/java/io/scalecube/cluster/monitor/ClusterMonitorMBean.java new file mode 100644 index 00000000..24eaeeb7 --- /dev/null +++ b/cluster/src/main/java/io/scalecube/cluster/monitor/ClusterMonitorMBean.java @@ -0,0 +1,22 @@ +package io.scalecube.cluster.monitor; + +public interface ClusterMonitorMBean { + + String getClusterConfig(); + + int getClusterSize(); + + int getMemberIncarnation(); + + String getMember(); + + String getMetadata(); + + String getSeedMembers(); + + String getAliveMembers(); + + String getSuspectedMembers(); + + String getRemovedMembers(); +} diff --git a/cluster/src/main/java/io/scalecube/cluster/monitor/ClusterMonitorModel.java b/cluster/src/main/java/io/scalecube/cluster/monitor/ClusterMonitorModel.java new file mode 100644 index 00000000..ef1fe11a --- /dev/null +++ b/cluster/src/main/java/io/scalecube/cluster/monitor/ClusterMonitorModel.java @@ -0,0 +1,115 @@ +package io.scalecube.cluster.monitor; + +import io.scalecube.cluster.Cluster; +import io.scalecube.cluster.ClusterConfig; +import io.scalecube.cluster.Member; +import io.scalecube.net.Address; +import java.util.List; +import java.util.function.Supplier; + +public class ClusterMonitorModel { + + private Cluster cluster; + private ClusterConfig config; + private Supplier incarnationSupplier; + private List
seedMembers; + private Supplier> aliveMembersSupplier; + private Supplier> suspectedMembersSupplier; + private Supplier> removedMembersSupplier; + + private ClusterMonitorModel(Builder builder) { + this.cluster = builder.cluster; + this.config = builder.config; + this.incarnationSupplier = builder.incarnationSupplier; + this.seedMembers = builder.seedMembers; + this.aliveMembersSupplier = builder.aliveMembersSupplier; + this.suspectedMembersSupplier = builder.suspectedMembersSupplier; + this.removedMembersSupplier = builder.removedMembersSupplier; + } + + public ClusterConfig config() { + return config; + } + + public int incarnation() { + return incarnationSupplier.get(); + } + + public int clusterSize() { + return cluster.otherMembers().size() + 1; + } + + public Member member() { + return cluster.member(); + } + + public Object metadata() { + return cluster.metadata().orElse(null); + } + + public List
seedMembers() { + return seedMembers; + } + + public List aliveMembers() { + return aliveMembersSupplier.get(); + } + + public List suspectedMembers() { + return suspectedMembersSupplier.get(); + } + + public List removedMembers() { + return removedMembersSupplier.get(); + } + + public static class Builder { + + private Cluster cluster; + private ClusterConfig config; + private Supplier incarnationSupplier; + private List
seedMembers; + private Supplier> aliveMembersSupplier; + private Supplier> suspectedMembersSupplier; + private Supplier> removedMembersSupplier; + + public ClusterMonitorModel build() { + return new ClusterMonitorModel(this); + } + + public Builder config(ClusterConfig config) { + this.config = config; + return this; + } + + public Builder cluster(Cluster cluster) { + this.cluster = cluster; + return this; + } + + public Builder incarnationSupplier(Supplier incarnationSupplier) { + this.incarnationSupplier = incarnationSupplier; + return this; + } + + public Builder seedMembers(List
seedMembers) { + this.seedMembers = seedMembers; + return this; + } + + public Builder aliveMembersSupplier(Supplier> aliveMembersSupplier) { + this.aliveMembersSupplier = aliveMembersSupplier; + return this; + } + + public Builder suspectedMembersSupplier(Supplier> suspectedMembersSupplier) { + this.suspectedMembersSupplier = suspectedMembersSupplier; + return this; + } + + public Builder removedMembersSupplier(Supplier> removedMembersSupplier) { + this.removedMembersSupplier = removedMembersSupplier; + return this; + } + } +} diff --git a/cluster/src/main/java/io/scalecube/cluster/monitor/JmxClusterMonitorMBean.java b/cluster/src/main/java/io/scalecube/cluster/monitor/JmxClusterMonitorMBean.java new file mode 100644 index 00000000..5b120e22 --- /dev/null +++ b/cluster/src/main/java/io/scalecube/cluster/monitor/JmxClusterMonitorMBean.java @@ -0,0 +1,69 @@ +package io.scalecube.cluster.monitor; + +import io.scalecube.cluster.Member; +import io.scalecube.net.Address; +import java.util.Optional; +import java.util.stream.Collectors; + +public class JmxClusterMonitorMBean implements ClusterMonitorMBean { + + private final ClusterMonitorModel monitorModel; + + public JmxClusterMonitorMBean(ClusterMonitorModel monitorModel) { + this.monitorModel = monitorModel; + } + + @Override + public String getClusterConfig() { + return String.valueOf(monitorModel.config()); + } + + @Override + public int getClusterSize() { + return monitorModel.clusterSize(); + } + + @Override + public int getMemberIncarnation() { + return monitorModel.incarnation(); + } + + @Override + public String getMember() { + return String.valueOf(monitorModel.member()); + } + + @Override + public String getMetadata() { + return String.valueOf( + Optional.ofNullable(monitorModel.metadata()).map(Object::toString).orElse(null)); + } + + @Override + public String getSeedMembers() { + return monitorModel.seedMembers().stream() + .map(Address::toString) + .collect(Collectors.joining(",", "[", "]")); + } + + @Override + public String getAliveMembers() { + return monitorModel.aliveMembers().stream() + .map(Member::toString) + .collect(Collectors.joining(",", "[", "]")); + } + + @Override + public String getSuspectedMembers() { + return monitorModel.suspectedMembers().stream() + .map(Member::toString) + .collect(Collectors.joining(",", "[", "]")); + } + + @Override + public String getRemovedMembers() { + return monitorModel.removedMembers().stream() + .map(Member::toString) + .collect(Collectors.joining(",", "[", "]")); + } +} diff --git a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java index 503c6181..b4e236c1 100644 --- a/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/membership/MembershipProtocolTest.java @@ -10,6 +10,7 @@ import io.scalecube.cluster.fdetector.FailureDetectorImpl; import io.scalecube.cluster.gossip.GossipProtocolImpl; import io.scalecube.cluster.metadata.MetadataStoreImpl; +import io.scalecube.cluster.monitor.ClusterMonitorModel; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.cluster.utils.NetworkEmulator; @@ -965,7 +966,8 @@ private MembershipProtocolImpl createMembership(Transport transport, ClusterConf metadataStore, config, scheduler, - cidGenerator); + cidGenerator, + new ClusterMonitorModel.Builder()); membership.listen().subscribe(membershipSink::next);