Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package io.scalecube.cluster;

import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.fdetector.FailureDetectorImpl;
import io.scalecube.cluster.gossip.GossipConfig;
import io.scalecube.cluster.gossip.GossipProtocolImpl;
import io.scalecube.cluster.membership.MembershipConfig;
import io.scalecube.cluster.membership.MembershipEvent;
import io.scalecube.cluster.membership.MembershipProtocolImpl;
import io.scalecube.cluster.metadata.MetadataStore;
import io.scalecube.cluster.metadata.MetadataStoreImpl;
import io.scalecube.cluster.transport.api.Message;
import io.scalecube.cluster.transport.api.Transport;
import io.scalecube.cluster.transport.api.TransportConfig;
import io.scalecube.net.Address;
import io.scalecube.transport.netty.TransportImpl;
import java.lang.management.ManagementFactory;
Expand Down Expand Up @@ -129,6 +133,58 @@ public ClusterImpl config(UnaryOperator<ClusterConfig> options) {
return cluster;
}

/**
* Returns a new cluster's instance which will apply the given options.
*
* @param options transport config options
* @return new {@code ClusterImpl} instance
*/
public ClusterImpl transport(UnaryOperator<TransportConfig> options) {
Objects.requireNonNull(options);
ClusterImpl cluster = new ClusterImpl(this);
cluster.config = config.transport(options);
return cluster;
}

/**
* Returns a new cluster's instance which will apply the given options.
*
* @param options failureDetector config options
* @return new {@code ClusterImpl} instance
*/
public ClusterImpl failureDetector(UnaryOperator<FailureDetectorConfig> options) {
Objects.requireNonNull(options);
ClusterImpl cluster = new ClusterImpl(this);
cluster.config = config.failureDetector(options);
return cluster;
}

/**
* Returns a new cluster's instance which will apply the given options.
*
* @param options gossip config options
* @return new {@code ClusterImpl} instance
*/
public ClusterImpl gossip(UnaryOperator<GossipConfig> options) {
Objects.requireNonNull(options);
ClusterImpl cluster = new ClusterImpl(this);
cluster.config = config.gossip(options);
return cluster;
}

/**
* Returns a new cluster's instance which will apply the given options.
*
* @param options membership config options
* @return new {@code ClusterImpl} instance
*/
public ClusterImpl membership(UnaryOperator<MembershipConfig> options) {
Objects.requireNonNull(options);
ClusterImpl cluster = new ClusterImpl(this);
cluster.config = config.membership(options);
return cluster;
}

/**
* Returns a new cluster's instance with given handler. The previous handler will be replaced.
*
Expand Down
77 changes: 26 additions & 51 deletions cluster/src/test/java/io/scalecube/cluster/ClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ public void testMembersAccessFromScheduler() {
// Start seed node
Cluster seedNode = new ClusterImpl().startAwait();
Cluster otherNode =
new ClusterImpl()
.config(config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
.startAwait();
new ClusterImpl().membership(opts -> opts.seedMembers(seedNode.address())).startAwait();

assertEquals(2, seedNode.members().size());
assertEquals(2, otherNode.members().size());
Expand All @@ -53,15 +51,13 @@ public void testMembersAccessFromScheduler() {

@Test
public void testJoinLocalhostIgnored() {
Address[] addresses = {Address.from("localhost:4801"), Address.from("127.0.0.1:4801")};

// Start seed node
Cluster seedNode =
new ClusterImpl(
new ClusterConfig()
.transport(opts -> opts.port(4801).connectTimeout(500))
.membership(
opts ->
opts.seedMembers(
Address.from("localhost:4801"), Address.from("127.0.0.1:4801"))))
new ClusterImpl()
.transport(opts -> opts.port(4801).connectTimeout(500))
.membership(opts -> opts.seedMembers(addresses))
.startAwait();

Collection<Member> otherMembers = seedNode.otherMembers();
Expand All @@ -72,12 +68,9 @@ public void testJoinLocalhostIgnored() {
public void testJoinLocalhostIgnoredWithOverride() {
// Start seed node
Cluster seedNode =
new ClusterImpl(
new ClusterConfig()
.memberHost("localhost")
.memberPort(7878)
.transport(opts -> opts.port(7878).connectTimeout(500))
.membership(opts -> opts.seedMembers(Address.from("localhost:7878"))))
new ClusterImpl(new ClusterConfig().memberHost("localhost").memberPort(7878))
.transport(opts -> opts.port(7878).connectTimeout(500))
.membership(opts -> opts.seedMembers(Address.from("localhost:7878")))
.startAwait();

Collection<Member> otherMembers = seedNode.otherMembers();
Expand All @@ -97,7 +90,7 @@ public void testJoinDynamicPort() {
for (int i = 0; i < membersNum; i++) {
otherNodes.add(
new ClusterImpl()
.config(config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
.membership(opts -> opts.seedMembers(seedNode.address()))
.startAwait());
}
LOGGER.info("Start up time: {} ms", System.currentTimeMillis() - startAt);
Expand Down Expand Up @@ -129,20 +122,16 @@ public void testUpdateMetadata() throws Exception {
metadata.put("key2", "value2");
metadataNode =
new ClusterImpl()
.config(
config ->
config
.membership(opts -> opts.seedMembers(seedNode.address()))
.metadata(metadata))
.config(opts -> opts.metadata(metadata))
.membership(opts -> opts.seedMembers(seedNode.address()))
.startAwait();

// Start other test members
Flux.range(0, testMembersNum)
.flatMap(
integer ->
new ClusterImpl()
.config(
config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
.membership(opts -> opts.seedMembers(seedNode.address()))
.handler(
cluster ->
new ClusterMessageHandler() {
Expand Down Expand Up @@ -206,20 +195,16 @@ public void testUpdateMetadataProperty() throws Exception {
metadata.put("key2", "value2");
metadataNode =
new ClusterImpl()
.config(
config ->
config
.membership(opts -> opts.seedMembers(seedNode.address()))
.metadata(metadata))
.config(opts -> opts.metadata(metadata))
.membership(opts -> opts.seedMembers(seedNode.address()))
.startAwait();

// Start other test members
Flux.range(0, testMembersNum)
.flatMap(
integer ->
new ClusterImpl()
.config(
config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
.membership(opts -> opts.seedMembers(seedNode.address()))
.handler(
cluster ->
new ClusterMessageHandler() {
Expand Down Expand Up @@ -288,20 +273,16 @@ public void testRemoveMetadataProperty() throws Exception {
metadata.put("key2", "value2");
metadataNode =
new ClusterImpl()
.config(
config ->
config
.membership(opts -> opts.seedMembers(seedNode.address()))
.metadata(metadata))
.config(opts -> opts.metadata(metadata))
.membership(opts -> opts.seedMembers(seedNode.address()))
.startAwait();

// Start other test members
Flux.range(0, testMembersNum)
.flatMap(
integer ->
new ClusterImpl()
.config(
config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
.membership(opts -> opts.seedMembers(seedNode.address()))
.handler(
cluster ->
new ClusterMessageHandler() {
Expand Down Expand Up @@ -374,17 +355,17 @@ public void onMembershipEvent(MembershipEvent event) {
// Start nodes
final Cluster node1 =
new ClusterImpl()
.config(config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
.membership(opts -> opts.seedMembers(seedNode.address()))
.handler(cluster -> listener)
.startAwait();
final Cluster node2 =
new ClusterImpl()
.config(config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
.membership(opts -> opts.seedMembers(seedNode.address()))
.handler(cluster -> listener)
.startAwait();
final Cluster node3 =
new ClusterImpl()
.config(config -> config.membership(opts -> opts.seedMembers(seedNode.address())))
.membership(opts -> opts.seedMembers(seedNode.address()))
.handler(cluster -> listener)
.startAwait();

Expand All @@ -405,7 +386,7 @@ public void testMemberMetadataRemoved() throws InterruptedException {
seedMetadata.put("seed", "shmid");
final Cluster seedNode =
new ClusterImpl()
.config(options -> options.metadata(seedMetadata))
.config(opts -> opts.metadata(seedMetadata))
.handler(
cluster ->
new ClusterMessageHandler() {
Expand All @@ -423,11 +404,8 @@ public void onMembershipEvent(MembershipEvent event) {
ReplayProcessor<MembershipEvent> node1Events = ReplayProcessor.create();
final Cluster node1 =
new ClusterImpl()
.config(
config ->
config
.membership(opts -> opts.seedMembers(seedNode.address()))
.metadata(node1Metadata))
.config(opts -> opts.metadata(node1Metadata))
.membership(opts -> opts.seedMembers(seedNode.address()))
.handler(
cluster ->
new ClusterMessageHandler() {
Expand Down Expand Up @@ -478,10 +456,7 @@ public void testJoinSeedClusterWithNoExistingSeedMember() {
Address nonExistingSeed2 = Address.from("localhost:5678");
Address[] seeds = new Address[] {nonExistingSeed1, nonExistingSeed2, seedNode.address()};

Cluster otherNode =
new ClusterImpl()
.config(config -> config.membership(opts -> opts.seedMembers(seeds)))
.startAwait();
Cluster otherNode = new ClusterImpl().membership(opts -> opts.seedMembers(seeds)).startAwait();

assertEquals(otherNode.member(), seedNode.otherMembers().iterator().next());
assertEquals(seedNode.member(), otherNode.otherMembers().iterator().next());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,31 @@ public static void main(String[] args) throws Exception {
// Start seed cluster member Alice
Cluster alice =
new ClusterImpl()
.config(config -> config.metadataDecoder(new LongMetadataDecoder()))
.config(opts -> opts.metadataDecoder(new LongMetadataDecoder()))
.startAwait();
System.out.println(
"[" + alice.member().id() + "] Alice's metadata: " + alice.metadata().orElse(null));

Cluster joe =
new ClusterImpl()
.config(
config ->
config
.membership(opts -> opts.seedMembers(alice.address()))
.metadataDecoder(new LongMetadataDecoder())
opts ->
opts.metadataDecoder(new LongMetadataDecoder())
.metadataEncoder(new LongMetadataEncoder())
.metadata(123L))
.membership(opts -> opts.seedMembers(alice.address()))
.startAwait();
System.out.println(
"[" + joe.member().id() + "] Joe's metadata: " + joe.metadata().orElse(null));

Cluster bob =
new ClusterImpl()
.config(
config ->
config
.membership(opts -> opts.seedMembers(alice.address()))
.metadataDecoder(new LongMetadataDecoder())
opts ->
opts.metadataDecoder(new LongMetadataDecoder())
.metadataEncoder(new LongMetadataEncoder())
.metadata(456L))
.membership(opts -> opts.seedMembers(alice.address()))
.startAwait();
System.out.println(
"[" + bob.member().id() + "] Bob's metadata: " + bob.metadata().orElse(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,20 @@
public class ClusterJoinExamples {

/** Main method. */
public static void main(String[] args) throws Exception {
public static void main(String[] args) {
// Start seed member Alice
Cluster alice = new ClusterImpl().startAwait();

// Join Bob to cluster with Alice
Cluster bob =
new ClusterImpl()
.config(config -> config.membership(opts -> opts.seedMembers(alice.address())))
.startAwait();
new ClusterImpl().membership(opts -> opts.seedMembers(alice.address())).startAwait();

// Join Carol to cluster with metadata
Map<String, String> metadata = Collections.singletonMap("name", "Carol");
Cluster carol =
new ClusterImpl()
.config(
config ->
config.membership(opts -> opts.seedMembers(alice.address())).metadata(metadata))
.config(opts -> opts.metadata(metadata))
.membership(opts -> opts.seedMembers(alice.address()))
.startAwait();

// Start Dan on port 3000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ public static void main(String[] args) throws Exception {
//noinspection unused
Cluster joe =
new ClusterImpl()
.config(
config ->
config
.membership(opts -> opts.seedMembers(alice.address()))
.metadata(Collections.singletonMap("name", "Joe")))
.config(opts -> opts.metadata(Collections.singletonMap("name", "Joe")))
.membership(opts -> opts.seedMembers(alice.address()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
Expand Down
10 changes: 4 additions & 6 deletions examples/src/main/java/io/scalecube/examples/GossipExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void onGossip(Message gossip) {
//noinspection unused
Cluster bob =
new ClusterImpl()
.config(config -> config.membership(opts -> opts.seedMembers(alice.address())))
.membership(opts -> opts.seedMembers(alice.address()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
Expand All @@ -47,7 +47,7 @@ public void onGossip(Message gossip) {
//noinspection unused
Cluster carol =
new ClusterImpl()
.config(config -> config.membership(opts -> opts.seedMembers(alice.address())))
.membership(opts -> opts.seedMembers(alice.address()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
Expand All @@ -62,7 +62,7 @@ public void onGossip(Message gossip) {
//noinspection unused
Cluster dan =
new ClusterImpl()
.config(config -> config.membership(opts -> opts.seedMembers(alice.address())))
.membership(opts -> opts.seedMembers(alice.address()))
.handler(
cluster -> {
return new ClusterMessageHandler() {
Expand All @@ -76,9 +76,7 @@ public void onGossip(Message gossip) {

// Start cluster node Eve that joins cluster and spreads gossip
Cluster eve =
new ClusterImpl()
.config(config -> config.membership(opts -> opts.seedMembers(alice.address())))
.startAwait();
new ClusterImpl().membership(opts -> opts.seedMembers(alice.address())).startAwait();
eve.spreadGossip(Message.fromData("Gossip from Eve"))
.doOnError(System.err::println)
.subscribe(null, Throwable::printStackTrace);
Expand Down
Loading