Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class MembershipConfig implements Cloneable {
private int syncTimeout = DEFAULT_SYNC_TIMEOUT;
private int suspicionMult = DEFAULT_SUSPICION_MULT;
private String syncGroup = "default";
private int removedMembersHistorySize = 42;

public MembershipConfig() {}

Expand Down Expand Up @@ -157,6 +158,22 @@ public MembershipConfig syncGroup(String syncGroup) {
return m;
}

public int removedMembersHistorySize() {
return removedMembersHistorySize;
}

/**
* Sets a removedMembersHistorySize.
*
* @param removedMembersHistorySize history size for remove members
* @return new {@code MembershipConfig} instance
*/
public MembershipConfig removedMembersHistorySize(int removedMembersHistorySize) {
MembershipConfig m = clone();
m.removedMembersHistorySize = removedMembersHistorySize;
return m;
}

@Override
public MembershipConfig clone() {
try {
Expand All @@ -174,6 +191,7 @@ public String toString() {
.add("syncTimeout=" + syncTimeout)
.add("suspicionMult=" + suspicionMult)
.add("syncGroup='" + syncGroup + "'")
.add("removedMembersHistorySize=" + removedMembersHistorySize)
.toString();
}
}

This file was deleted.

110 changes: 27 additions & 83 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
import io.scalecube.cluster.membership.MembershipProtocolImpl;
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.metadata.MetadataStoreImpl;
import io.scalecube.cluster.monitor.ClusterMonitorMBean;
import io.scalecube.cluster.monitor.ClusterMonitorModel;
import io.scalecube.cluster.monitor.JmxClusterMonitorMBean;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.TransportImpl;
import java.lang.management.ManagementFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
Expand All @@ -23,6 +27,10 @@
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.MBeanServer;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
Expand Down Expand Up @@ -82,6 +90,7 @@ public final class ClusterImpl implements Cluster {
private MetadataStore metadataStore;
private Scheduler scheduler;
private CorrelationIdGenerator cidGenerator;
private ClusterMonitorModel.Builder monitorModelBuilder;

public ClusterImpl() {
this(ClusterConfig.defaultConfig());
Expand Down Expand Up @@ -228,6 +237,7 @@ private Mono<Cluster> doStart0() {

cidGenerator = new CorrelationIdGenerator(localMember.id());
scheduler = Schedulers.newSingle("sc-cluster-" + localMember.address().port(), true);
monitorModelBuilder = new ClusterMonitorModel.Builder();

failureDetector =
new FailureDetectorImpl(
Expand Down Expand Up @@ -259,14 +269,16 @@ private Mono<Cluster> doStart0() {
metadataStore,
config,
scheduler,
cidGenerator);
cidGenerator,
monitorModelBuilder);

actionsDisposables.add(
// Retransmit inner membership events to public api layer
membership
.listen()
/*.publishOn(scheduler)*/
// dont uncomment, already beign executed inside sc-cluster thread
.subscribe(membershipSink::next, this::onError));
// Dont uncomment, already beign executed inside sc-cluster thread
.subscribe(membershipSink::next, this::onError, membershipSink::complete));

return Mono.fromRunnable(() -> failureDetector.start())
.then(Mono.fromRunnable(() -> gossip.start()))
Expand Down Expand Up @@ -301,7 +313,18 @@ private void startHandler() {
}

private Mono<Void> startJmxMonitor() {
return Mono.fromCallable(() -> AbstractMonitorMBean.register(new JmxMonitorMBean(this))).then();
return Mono.fromCallable(this::startJmxMonitor0).then();
}

private ObjectInstance startJmxMonitor0() throws Exception {
ClusterMonitorModel monitorModel = monitorModelBuilder.config(config).cluster(this).build();

JmxClusterMonitorMBean bean = new JmxClusterMonitorMBean(monitorModel);
StandardMBean standardMBean = new StandardMBean(bean, ClusterMonitorMBean.class);
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName = new ObjectName("io.scalecube.cluster:name=Cluster@" + member().id());

return server.registerMBean(standardMBean, objectName);
}

private void onError(Throwable th) {
Expand Down Expand Up @@ -481,85 +504,6 @@ public boolean isShutdown() {
return onShutdown.isDisposed();
}

@SuppressWarnings("unused")
public interface MonitorMBean {

Collection<String> getId();

String getIdAsString();

Collection<String> getAlias();

String getAliasAsString();

Collection<String> getAddress();

String getAddressAsString();

Collection<String> getMetadata();

String getMetadataAsString();
}

public static class JmxMonitorMBean extends AbstractMonitorMBean implements MonitorMBean {

private final ClusterImpl cluster;

private JmxMonitorMBean(ClusterImpl cluster) {
this.cluster = cluster;
}

@Override
public Collection<String> getId() {
return Collections.singleton(getIdAsString());
}

@Override
public String getIdAsString() {
return cluster.member().id();
}

@Override
public Collection<String> getAlias() {
return Collections.singleton(getAliasAsString());
}

@Override
public String getAliasAsString() {
return cluster.member().alias();
}

@Override
public Collection<String> getAddress() {
return Collections.singleton(getAddressAsString());
}

@Override
public String getAddressAsString() {
return String.valueOf(cluster.member().address());
}

@Override
public Collection<String> getMetadata() {
return Collections.singletonList(getMetadataAsString());
}

@Override
public String getMetadataAsString() {
return String.valueOf(cluster.metadataStore.metadata().map(Object::toString).orElse(null));
}

@Override
protected Class getBeanType() {
return MonitorMBean.class;
}

@Override
protected String getObjectName() {
return "io.scalecube.cluster:name=Cluster@" + cluster.member().id();
}
}

private static class SenderAwareTransport implements Transport {

private final Transport transport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ public FailureDetectorImpl(
// Subscribe
actionsDisposables.addAll(
Arrays.asList(
membershipProcessor //
membershipProcessor // Listen membership events to update remoteMembers
.publishOn(scheduler)
.subscribe(this::onMemberEvent, this::onError),
transport
.listen() //
.listen() // Listen failure detector requests
.publishOn(scheduler)
.subscribe(this::onMessage, this::onError)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ public GossipProtocolImpl(
// Subscribe
actionsDisposables.addAll(
Arrays.asList(
membershipProcessor //
membershipProcessor // Listen membership events to update remoteMembers
.publishOn(scheduler)
.subscribe(this::onMemberEvent, this::onError),
transport
.listen()
.listen() // Listen gossip requests
.publishOn(scheduler)
.filter(this::isGossipReq)
.subscribe(this::onGossipReq, this::onError)));
Expand Down
Loading