Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.scalecube.cluster.Member;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Objects;
import java.util.StringJoiner;

Expand All @@ -21,37 +22,41 @@ public enum Type {
private final Member member;
private final ByteBuffer oldMetadata;
private final ByteBuffer newMetadata;
private final long timestamp;

private MembershipEvent(
Type type, Member member, ByteBuffer oldMetadata, ByteBuffer newMetadata) {
Type type, Member member, ByteBuffer oldMetadata, ByteBuffer newMetadata, long timestamp) {
this.type = type;
this.member = member;
this.oldMetadata = oldMetadata;
this.newMetadata = newMetadata;
this.timestamp = timestamp;
}

/**
* Creates REMOVED membership event with cluster member and its metadata (optional).
*
* @param member cluster member; not null
* @param metadata member metadata; optional
* @param timestamp event timestamp
* @return membership event
*/
public static MembershipEvent createRemoved(Member member, ByteBuffer metadata) {
public static MembershipEvent createRemoved(Member member, ByteBuffer metadata, long timestamp) {
Objects.requireNonNull(member, "member must be not null");
return new MembershipEvent(Type.REMOVED, member, metadata, null);
return new MembershipEvent(Type.REMOVED, member, metadata, null, timestamp);
}

/**
* Creates ADDED membership event with cluster member and its metadata.
*
* @param member cluster memeber; not null
* @param metadata member metadata; not null
* @param timestamp event timestamp
* @return membership event
*/
public static MembershipEvent createAdded(Member member, ByteBuffer metadata) {
public static MembershipEvent createAdded(Member member, ByteBuffer metadata, long timestamp) {
Objects.requireNonNull(member, "member must be not null");
return new MembershipEvent(Type.ADDED, member, null, metadata);
return new MembershipEvent(Type.ADDED, member, null, metadata, timestamp);
}

/**
Expand All @@ -60,12 +65,13 @@ public static MembershipEvent createAdded(Member member, ByteBuffer metadata) {
* @param member cluster member; not null
* @param oldMetadata previous metadata; not null
* @param newMetadata new metadata; not null
* @param timestamp event timestamp
* @return membership event
*/
public static MembershipEvent createUpdated(
Member member, ByteBuffer oldMetadata, ByteBuffer newMetadata) {
Member member, ByteBuffer oldMetadata, ByteBuffer newMetadata, long timestamp) {
Objects.requireNonNull(member, "member must be not null");
return new MembershipEvent(Type.UPDATED, member, oldMetadata, newMetadata);
return new MembershipEvent(Type.UPDATED, member, oldMetadata, newMetadata, timestamp);
}

public Type type() {
Expand Down Expand Up @@ -96,16 +102,25 @@ public ByteBuffer newMetadata() {
return newMetadata;
}

public long timestamp() {
return timestamp;
}

@Override
public String toString() {
return new StringJoiner(", ", MembershipEvent.class.getSimpleName() + "[", "]")
.add("type=" + type)
.add("member=" + member)
.add("oldMetadata=" + metadataAsString(oldMetadata))
.add("newMetadata=" + metadataAsString(newMetadata))
.add("timestamp=" + timestampAsString(timestamp))
.toString();
}

private String timestampAsString(long timestamp) {
return Instant.ofEpochMilli(timestamp).toString();
}

private String metadataAsString(ByteBuffer metadata) {
if (metadata == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,8 @@ private Mono<Void> onDeadMemberDetected(MembershipRecord r) {
membershipTable.remove(member.id());
// removed
ByteBuffer metadata0 = metadataStore.removeMetadata(member);
MembershipEvent event = MembershipEvent.createRemoved(member, metadata0);
MembershipEvent event =
MembershipEvent.createRemoved(member, metadata0, System.currentTimeMillis());
LOGGER_MEMBERSHIP.debug("Emitting membership event {}", event);
sink.next(event);
});
Expand All @@ -597,10 +598,11 @@ private void onAliveMemberDetected(
boolean memberExists = members.containsKey(member.id());

MembershipEvent event = null;
long timestamp = System.currentTimeMillis();
if (!memberExists) {
event = MembershipEvent.createAdded(member, metadata1);
event = MembershipEvent.createAdded(member, metadata1, timestamp);
} else if (!metadata1.equals(metadata0)) {
event = MembershipEvent.createUpdated(member, metadata0, metadata1);
event = MembershipEvent.createUpdated(member, metadata0, metadata1, timestamp);
}

members.put(member.id(), member);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ private FailureDetectorImpl createFd(
Flux.fromIterable(addresses)
.filter(address -> !transport.address().equals(address))
.map(address -> new Member("member-" + address.port(), null, address))
.map(member -> MembershipEvent.createAdded(member, null));
.map(member -> MembershipEvent.createAdded(member, null, 0));

CorrelationIdGenerator cidGenerator = new CorrelationIdGenerator(localMember.id());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ private GossipProtocolImpl initGossipProtocol(Transport transport, List<Address>
Flux.fromIterable(members)
.filter(address -> !transport.address().equals(address))
.map(address -> new Member("member-" + address.port(), null, address))
.map(member -> MembershipEvent.createAdded(member, null));
.map(member -> MembershipEvent.createAdded(member, null, 0));

GossipProtocolImpl gossipProtocol =
new GossipProtocolImpl(localMember, transport, membershipFlux, gossipConfig, scheduler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public static void main(String[] args) throws Exception {
// Alice init cluster
Cluster alice =
new ClusterImpl()
.config(opts -> opts.memberAlias("Alice"))
.config(opts -> opts.metadata(Collections.singletonMap("name", "Alice")))
.handler(
cluster -> {
Expand All @@ -42,6 +43,7 @@ public void onMembershipEvent(MembershipEvent event) {
// Bob join cluster
Cluster bob =
new ClusterImpl()
.config(opts -> opts.memberAlias("Bob"))
.config(opts -> opts.metadata(Collections.singletonMap("name", "Bob")))
.membership(opts -> opts.seedMembers(alice.address()))
.handler(
Expand All @@ -59,6 +61,7 @@ public void onMembershipEvent(MembershipEvent event) {
// Carol join cluster
Cluster carol =
new ClusterImpl()
.config(opts -> opts.memberAlias("Carol"))
.config(opts -> opts.metadata(Collections.singletonMap("name", "Carol")))
.membership(opts -> opts.seedMembers(alice.address(), bob.address()))
.handler(
Expand Down