Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public final class ClusterConfig implements Cloneable {
private MetadataEncoder metadataEncoder = MetadataEncoder.INSTANCE;
private MetadataDecoder metadataDecoder = MetadataDecoder.INSTANCE;

private String memberId;
private String memberAlias;
private String memberHost;
private Integer memberPort;

Expand Down Expand Up @@ -168,19 +168,19 @@ public ClusterConfig memberHost(String memberHost) {
return c;
}

public String memberId() {
return memberId;
public String memberAlias() {
return memberAlias;
}

/**
* Sets a memberId.
* Sets a memberAlias.
*
* @param memberId member id
* @param memberAlias member alias
* @return new {@code ClusterConfig} instance
*/
public ClusterConfig memberId(String memberId) {
public ClusterConfig memberAlias(String memberAlias) {
ClusterConfig c = clone();
c.memberId = memberId;
c.memberAlias = memberAlias;
return c;
}

Expand Down Expand Up @@ -285,7 +285,7 @@ public String toString() {
.add("metadataTimeout=" + metadataTimeout)
.add("metadataEncoder=" + metadataEncoder)
.add("metadataDecoder=" + metadataDecoder)
.add("memberId='" + memberId + "'")
.add("memberAlias='" + memberAlias + "'")
.add("memberHost='" + memberHost + "'")
.add("memberPort=" + memberPort)
.add("transportConfig=" + transportConfig)
Expand Down
35 changes: 18 additions & 17 deletions cluster-api/src/main/java/io/scalecube/cluster/Member.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,38 @@
public final class Member {

private String id;
private String alias;
private Address address;

/** Instantiates empty member for deserialization purpose. */
Member() {}

/**
* Create instance of cluster member by given address; member id will be generated by {@link
* #generateId()}.
*
* @param address address on which given member listens for incoming messages
*/
public Member(Address address) {
this(generateId(), address);
}

/**
* Create instance of cluster member with given parameters.
* Constructor.
*
* @param id member id
* @param address address on which given member listens for incoming messages
* @param alias member alias (optional)
* @param address member address
*/
public Member(String id, Address address) {
this.id = Objects.requireNonNull(id);
this.address = Objects.requireNonNull(address);
public Member(String id, String alias, Address address) {
this.id = Objects.requireNonNull(id, "member id");
this.alias = alias; // optional
this.address = Objects.requireNonNull(address, "member address");
}

public String id() {
return id;
}

public String alias() {
return alias;
}

public Address address() {
return address;
}

private static String generateId() {
public static String generateId() {
return Long.toHexString(UUID.randomUUID().getMostSignificantBits() & Long.MAX_VALUE);
}

Expand All @@ -68,6 +65,10 @@ public int hashCode() {

@Override
public String toString() {
return id + ":" + address.port();
if (alias == null) {
return id + "@" + address;
} else {
return alias + "/" + id + "@" + address;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.scalecube.cluster;

import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.StandardMBean;

public abstract class AbstractMonitorMBean {

/**
* Registers monitor mbean.
*
* @param other monitor mbean instance
* @return object instance
* @throws Exception in case of error
*/
public static ObjectInstance register(AbstractMonitorMBean other) throws Exception {
Object bean = other.getBeanType().cast(other);
//noinspection unchecked
StandardMBean standardMBean = new StandardMBean(bean, other.getBeanType());
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName = new ObjectName(other.getObjectName());
return server.registerMBean(standardMBean, objectName);
}

/**
* Returns bean type.
*
* @return bean type
*/
protected abstract Class getBeanType();

/**
* Returns jmx object name.
*
* @return object name
*/
protected abstract String getObjectName();
}
80 changes: 52 additions & 28 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.TransportImpl;
import java.lang.management.ManagementFactory;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
Expand All @@ -24,9 +23,6 @@
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
Expand Down Expand Up @@ -277,7 +273,7 @@ private Mono<Cluster> doStart0() {
.then(Mono.fromRunnable(() -> metadataStore.start()))
.then(Mono.fromRunnable(this::startHandler))
.then((membership.start()))
.then(Mono.fromCallable(() -> JmxMonitorMBean.start(this)));
.then(startJmxMonitor());
})
.thenReturn(this);
}
Expand All @@ -304,6 +300,10 @@ private void startHandler() {
actionsDisposables.add(listenGossip().subscribe(handler::onGossip, this::onError));
}

private Mono<Void> startJmxMonitor() {
return Mono.fromCallable(() -> AbstractMonitorMBean.register(new JmxMonitorMBean(this))).then();
}

private void onError(Throwable th) {
LOGGER.error("Received unexpected error: ", th);
}
Expand Down Expand Up @@ -340,11 +340,7 @@ private Member createLocalMember(int listenPort) {
.map(memberHost -> Address.create(memberHost, port))
.orElseGet(() -> Address.create(localAddress, listenPort));

if (config.memberId() != null) {
return new Member(config.memberId(), memberAddress);
} else {
return new Member(memberAddress);
}
return new Member(Member.generateId(), config.memberAlias(), memberAddress);
}

@Override
Expand Down Expand Up @@ -485,54 +481,82 @@ public boolean isShutdown() {
return onShutdown.isDisposed();
}

@SuppressWarnings("unused")
public interface MonitorMBean {

Collection<String> getMemberId();
Collection<String> getId();

String getIdAsString();

Collection<String> getAlias();

String getAliasAsString();

String getMemberIdAsString();
Collection<String> getAddress();

String getAddressAsString();

Collection<String> getMetadata();

String getMetadataAsString();
}

public static class JmxMonitorMBean implements MonitorMBean {
public static class JmxMonitorMBean extends AbstractMonitorMBean implements MonitorMBean {

private final ClusterImpl cluster;

private JmxMonitorMBean(ClusterImpl cluster) {
this.cluster = cluster;
}

private static JmxMonitorMBean start(ClusterImpl cluster) throws Exception {
JmxMonitorMBean monitorMBean = new JmxMonitorMBean(cluster);
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
StandardMBean standardMBean = new StandardMBean(monitorMBean, MonitorMBean.class);
ObjectName objectName =
new ObjectName("io.scalecube.cluster:name=Cluster@" + cluster.member().id());
server.registerMBean(standardMBean, objectName);
return monitorMBean;
@Override
public Collection<String> getId() {
return Collections.singleton(getIdAsString());
}

@Override
public String getIdAsString() {
return cluster.member().id();
}

@Override
public Collection<String> getMemberId() {
return Collections.singleton(cluster.member().id());
public Collection<String> getAlias() {
return Collections.singleton(getAliasAsString());
}

@Override
public String getMemberIdAsString() {
return getMemberId().iterator().next();
public String getAliasAsString() {
return cluster.member().alias();
}

@Override
public Collection<String> getAddress() {
return Collections.singleton(getAddressAsString());
}

@Override
public String getAddressAsString() {
return String.valueOf(cluster.member().address());
}

@Override
public Collection<String> getMetadata() {
return Collections.singletonList(
String.valueOf(cluster.metadataStore.metadata().map(Object::toString).orElse(null)));
return Collections.singletonList(getMetadataAsString());
}

@Override
public String getMetadataAsString() {
return getMetadata().iterator().next();
return String.valueOf(cluster.metadataStore.metadata().map(Object::toString).orElse(null));
}

@Override
protected Class getBeanType() {
return MonitorMBean.class;
}

@Override
protected String getObjectName() {
return "io.scalecube.cluster:name=Cluster@" + cluster.member().id();
}
}

Expand Down
Loading