From e097fbcf5edb5d4b34533f886fb4fb59caad2904 Mon Sep 17 00:00:00 2001 From: artem-v Date: Sat, 7 Sep 2019 16:18:39 +0300 Subject: [PATCH] Created shorter Cluster config api --- .../io/scalecube/cluster/ClusterImpl.java | 56 ++++++++++++++ .../io/scalecube/cluster/ClusterTest.java | 77 +++++++------------ .../ClusterCustomMetadataEncodingExample.java | 16 ++-- .../examples/ClusterJoinExamples.java | 11 +-- .../examples/ClusterMetadataExample.java | 7 +- .../io/scalecube/examples/GossipExample.java | 10 +-- .../examples/MembershipEventsExample.java | 16 ++-- .../scalecube/examples/MessagingExample.java | 6 +- 8 files changed, 106 insertions(+), 93 deletions(-) diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index e91bcd47..373d56b4 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -1,13 +1,17 @@ package io.scalecube.cluster; +import io.scalecube.cluster.fdetector.FailureDetectorConfig; import io.scalecube.cluster.fdetector.FailureDetectorImpl; +import io.scalecube.cluster.gossip.GossipConfig; import io.scalecube.cluster.gossip.GossipProtocolImpl; +import io.scalecube.cluster.membership.MembershipConfig; import io.scalecube.cluster.membership.MembershipEvent; import io.scalecube.cluster.membership.MembershipProtocolImpl; import io.scalecube.cluster.metadata.MetadataStore; import io.scalecube.cluster.metadata.MetadataStoreImpl; import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; +import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.net.Address; import io.scalecube.transport.netty.TransportImpl; import java.lang.management.ManagementFactory; @@ -129,6 +133,58 @@ public ClusterImpl config(UnaryOperator options) { return cluster; } + /** + * Returns a new cluster's instance which will apply the given options. + * + * @param options transport config options + * @return new {@code ClusterImpl} instance + */ + public ClusterImpl transport(UnaryOperator options) { + Objects.requireNonNull(options); + ClusterImpl cluster = new ClusterImpl(this); + cluster.config = config.transport(options); + return cluster; + } + + /** + * Returns a new cluster's instance which will apply the given options. + * + * @param options failureDetector config options + * @return new {@code ClusterImpl} instance + */ + public ClusterImpl failureDetector(UnaryOperator options) { + Objects.requireNonNull(options); + ClusterImpl cluster = new ClusterImpl(this); + cluster.config = config.failureDetector(options); + return cluster; + } + + /** + * Returns a new cluster's instance which will apply the given options. + * + * @param options gossip config options + * @return new {@code ClusterImpl} instance + */ + public ClusterImpl gossip(UnaryOperator options) { + Objects.requireNonNull(options); + ClusterImpl cluster = new ClusterImpl(this); + cluster.config = config.gossip(options); + return cluster; + } + + /** + * Returns a new cluster's instance which will apply the given options. + * + * @param options membership config options + * @return new {@code ClusterImpl} instance + */ + public ClusterImpl membership(UnaryOperator options) { + Objects.requireNonNull(options); + ClusterImpl cluster = new ClusterImpl(this); + cluster.config = config.membership(options); + return cluster; + } + /** * Returns a new cluster's instance with given handler. The previous handler will be replaced. * diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java index f3d8ff52..9db80472 100644 --- a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java @@ -35,9 +35,7 @@ public void testMembersAccessFromScheduler() { // Start seed node Cluster seedNode = new ClusterImpl().startAwait(); Cluster otherNode = - new ClusterImpl() - .config(config -> config.membership(opts -> opts.seedMembers(seedNode.address()))) - .startAwait(); + new ClusterImpl().membership(opts -> opts.seedMembers(seedNode.address())).startAwait(); assertEquals(2, seedNode.members().size()); assertEquals(2, otherNode.members().size()); @@ -53,15 +51,13 @@ public void testMembersAccessFromScheduler() { @Test public void testJoinLocalhostIgnored() { + Address[] addresses = {Address.from("localhost:4801"), Address.from("127.0.0.1:4801")}; + // Start seed node Cluster seedNode = - new ClusterImpl( - new ClusterConfig() - .transport(opts -> opts.port(4801).connectTimeout(500)) - .membership( - opts -> - opts.seedMembers( - Address.from("localhost:4801"), Address.from("127.0.0.1:4801")))) + new ClusterImpl() + .transport(opts -> opts.port(4801).connectTimeout(500)) + .membership(opts -> opts.seedMembers(addresses)) .startAwait(); Collection otherMembers = seedNode.otherMembers(); @@ -72,12 +68,9 @@ public void testJoinLocalhostIgnored() { public void testJoinLocalhostIgnoredWithOverride() { // Start seed node Cluster seedNode = - new ClusterImpl( - new ClusterConfig() - .memberHost("localhost") - .memberPort(7878) - .transport(opts -> opts.port(7878).connectTimeout(500)) - .membership(opts -> opts.seedMembers(Address.from("localhost:7878")))) + new ClusterImpl(new ClusterConfig().memberHost("localhost").memberPort(7878)) + .transport(opts -> opts.port(7878).connectTimeout(500)) + .membership(opts -> opts.seedMembers(Address.from("localhost:7878"))) .startAwait(); Collection otherMembers = seedNode.otherMembers(); @@ -97,7 +90,7 @@ public void testJoinDynamicPort() { for (int i = 0; i < membersNum; i++) { otherNodes.add( new ClusterImpl() - .config(config -> config.membership(opts -> opts.seedMembers(seedNode.address()))) + .membership(opts -> opts.seedMembers(seedNode.address())) .startAwait()); } LOGGER.info("Start up time: {} ms", System.currentTimeMillis() - startAt); @@ -129,11 +122,8 @@ public void testUpdateMetadata() throws Exception { metadata.put("key2", "value2"); metadataNode = new ClusterImpl() - .config( - config -> - config - .membership(opts -> opts.seedMembers(seedNode.address())) - .metadata(metadata)) + .config(opts -> opts.metadata(metadata)) + .membership(opts -> opts.seedMembers(seedNode.address())) .startAwait(); // Start other test members @@ -141,8 +131,7 @@ public void testUpdateMetadata() throws Exception { .flatMap( integer -> new ClusterImpl() - .config( - config -> config.membership(opts -> opts.seedMembers(seedNode.address()))) + .membership(opts -> opts.seedMembers(seedNode.address())) .handler( cluster -> new ClusterMessageHandler() { @@ -206,11 +195,8 @@ public void testUpdateMetadataProperty() throws Exception { metadata.put("key2", "value2"); metadataNode = new ClusterImpl() - .config( - config -> - config - .membership(opts -> opts.seedMembers(seedNode.address())) - .metadata(metadata)) + .config(opts -> opts.metadata(metadata)) + .membership(opts -> opts.seedMembers(seedNode.address())) .startAwait(); // Start other test members @@ -218,8 +204,7 @@ public void testUpdateMetadataProperty() throws Exception { .flatMap( integer -> new ClusterImpl() - .config( - config -> config.membership(opts -> opts.seedMembers(seedNode.address()))) + .membership(opts -> opts.seedMembers(seedNode.address())) .handler( cluster -> new ClusterMessageHandler() { @@ -288,11 +273,8 @@ public void testRemoveMetadataProperty() throws Exception { metadata.put("key2", "value2"); metadataNode = new ClusterImpl() - .config( - config -> - config - .membership(opts -> opts.seedMembers(seedNode.address())) - .metadata(metadata)) + .config(opts -> opts.metadata(metadata)) + .membership(opts -> opts.seedMembers(seedNode.address())) .startAwait(); // Start other test members @@ -300,8 +282,7 @@ public void testRemoveMetadataProperty() throws Exception { .flatMap( integer -> new ClusterImpl() - .config( - config -> config.membership(opts -> opts.seedMembers(seedNode.address()))) + .membership(opts -> opts.seedMembers(seedNode.address())) .handler( cluster -> new ClusterMessageHandler() { @@ -374,17 +355,17 @@ public void onMembershipEvent(MembershipEvent event) { // Start nodes final Cluster node1 = new ClusterImpl() - .config(config -> config.membership(opts -> opts.seedMembers(seedNode.address()))) + .membership(opts -> opts.seedMembers(seedNode.address())) .handler(cluster -> listener) .startAwait(); final Cluster node2 = new ClusterImpl() - .config(config -> config.membership(opts -> opts.seedMembers(seedNode.address()))) + .membership(opts -> opts.seedMembers(seedNode.address())) .handler(cluster -> listener) .startAwait(); final Cluster node3 = new ClusterImpl() - .config(config -> config.membership(opts -> opts.seedMembers(seedNode.address()))) + .membership(opts -> opts.seedMembers(seedNode.address())) .handler(cluster -> listener) .startAwait(); @@ -405,7 +386,7 @@ public void testMemberMetadataRemoved() throws InterruptedException { seedMetadata.put("seed", "shmid"); final Cluster seedNode = new ClusterImpl() - .config(options -> options.metadata(seedMetadata)) + .config(opts -> opts.metadata(seedMetadata)) .handler( cluster -> new ClusterMessageHandler() { @@ -423,11 +404,8 @@ public void onMembershipEvent(MembershipEvent event) { ReplayProcessor node1Events = ReplayProcessor.create(); final Cluster node1 = new ClusterImpl() - .config( - config -> - config - .membership(opts -> opts.seedMembers(seedNode.address())) - .metadata(node1Metadata)) + .config(opts -> opts.metadata(node1Metadata)) + .membership(opts -> opts.seedMembers(seedNode.address())) .handler( cluster -> new ClusterMessageHandler() { @@ -478,10 +456,7 @@ public void testJoinSeedClusterWithNoExistingSeedMember() { Address nonExistingSeed2 = Address.from("localhost:5678"); Address[] seeds = new Address[] {nonExistingSeed1, nonExistingSeed2, seedNode.address()}; - Cluster otherNode = - new ClusterImpl() - .config(config -> config.membership(opts -> opts.seedMembers(seeds))) - .startAwait(); + Cluster otherNode = new ClusterImpl().membership(opts -> opts.seedMembers(seeds)).startAwait(); assertEquals(otherNode.member(), seedNode.otherMembers().iterator().next()); assertEquals(seedNode.member(), otherNode.otherMembers().iterator().next()); diff --git a/examples/src/main/java/io/scalecube/examples/ClusterCustomMetadataEncodingExample.java b/examples/src/main/java/io/scalecube/examples/ClusterCustomMetadataEncodingExample.java index cbfa634c..44095579 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterCustomMetadataEncodingExample.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterCustomMetadataEncodingExample.java @@ -14,7 +14,7 @@ public static void main(String[] args) throws Exception { // Start seed cluster member Alice Cluster alice = new ClusterImpl() - .config(config -> config.metadataDecoder(new LongMetadataDecoder())) + .config(opts -> opts.metadataDecoder(new LongMetadataDecoder())) .startAwait(); System.out.println( "[" + alice.member().id() + "] Alice's metadata: " + alice.metadata().orElse(null)); @@ -22,12 +22,11 @@ public static void main(String[] args) throws Exception { Cluster joe = new ClusterImpl() .config( - config -> - config - .membership(opts -> opts.seedMembers(alice.address())) - .metadataDecoder(new LongMetadataDecoder()) + opts -> + opts.metadataDecoder(new LongMetadataDecoder()) .metadataEncoder(new LongMetadataEncoder()) .metadata(123L)) + .membership(opts -> opts.seedMembers(alice.address())) .startAwait(); System.out.println( "[" + joe.member().id() + "] Joe's metadata: " + joe.metadata().orElse(null)); @@ -35,12 +34,11 @@ public static void main(String[] args) throws Exception { Cluster bob = new ClusterImpl() .config( - config -> - config - .membership(opts -> opts.seedMembers(alice.address())) - .metadataDecoder(new LongMetadataDecoder()) + opts -> + opts.metadataDecoder(new LongMetadataDecoder()) .metadataEncoder(new LongMetadataEncoder()) .metadata(456L)) + .membership(opts -> opts.seedMembers(alice.address())) .startAwait(); System.out.println( "[" + bob.member().id() + "] Bob's metadata: " + bob.metadata().orElse(null)); diff --git a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java index 0ecd617c..6d2e7f19 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterJoinExamples.java @@ -17,23 +17,20 @@ public class ClusterJoinExamples { /** Main method. */ - public static void main(String[] args) throws Exception { + public static void main(String[] args) { // Start seed member Alice Cluster alice = new ClusterImpl().startAwait(); // Join Bob to cluster with Alice Cluster bob = - new ClusterImpl() - .config(config -> config.membership(opts -> opts.seedMembers(alice.address()))) - .startAwait(); + new ClusterImpl().membership(opts -> opts.seedMembers(alice.address())).startAwait(); // Join Carol to cluster with metadata Map metadata = Collections.singletonMap("name", "Carol"); Cluster carol = new ClusterImpl() - .config( - config -> - config.membership(opts -> opts.seedMembers(alice.address())).metadata(metadata)) + .config(opts -> opts.metadata(metadata)) + .membership(opts -> opts.seedMembers(alice.address())) .startAwait(); // Start Dan on port 3000 diff --git a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java index f9463fcc..9547d00e 100644 --- a/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java +++ b/examples/src/main/java/io/scalecube/examples/ClusterMetadataExample.java @@ -29,11 +29,8 @@ public static void main(String[] args) throws Exception { //noinspection unused Cluster joe = new ClusterImpl() - .config( - config -> - config - .membership(opts -> opts.seedMembers(alice.address())) - .metadata(Collections.singletonMap("name", "Joe"))) + .config(opts -> opts.metadata(Collections.singletonMap("name", "Joe"))) + .membership(opts -> opts.seedMembers(alice.address())) .handler( cluster -> { return new ClusterMessageHandler() { diff --git a/examples/src/main/java/io/scalecube/examples/GossipExample.java b/examples/src/main/java/io/scalecube/examples/GossipExample.java index 5c728637..d33ebaaf 100644 --- a/examples/src/main/java/io/scalecube/examples/GossipExample.java +++ b/examples/src/main/java/io/scalecube/examples/GossipExample.java @@ -32,7 +32,7 @@ public void onGossip(Message gossip) { //noinspection unused Cluster bob = new ClusterImpl() - .config(config -> config.membership(opts -> opts.seedMembers(alice.address()))) + .membership(opts -> opts.seedMembers(alice.address())) .handler( cluster -> { return new ClusterMessageHandler() { @@ -47,7 +47,7 @@ public void onGossip(Message gossip) { //noinspection unused Cluster carol = new ClusterImpl() - .config(config -> config.membership(opts -> opts.seedMembers(alice.address()))) + .membership(opts -> opts.seedMembers(alice.address())) .handler( cluster -> { return new ClusterMessageHandler() { @@ -62,7 +62,7 @@ public void onGossip(Message gossip) { //noinspection unused Cluster dan = new ClusterImpl() - .config(config -> config.membership(opts -> opts.seedMembers(alice.address()))) + .membership(opts -> opts.seedMembers(alice.address())) .handler( cluster -> { return new ClusterMessageHandler() { @@ -76,9 +76,7 @@ public void onGossip(Message gossip) { // Start cluster node Eve that joins cluster and spreads gossip Cluster eve = - new ClusterImpl() - .config(config -> config.membership(opts -> opts.seedMembers(alice.address()))) - .startAwait(); + new ClusterImpl().membership(opts -> opts.seedMembers(alice.address())).startAwait(); eve.spreadGossip(Message.fromData("Gossip from Eve")) .doOnError(System.err::println) .subscribe(null, Throwable::printStackTrace); diff --git a/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java b/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java index 38321685..e8311358 100644 --- a/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java +++ b/examples/src/main/java/io/scalecube/examples/MembershipEventsExample.java @@ -26,7 +26,7 @@ public static void main(String[] args) throws Exception { // Alice init cluster Cluster alice = new ClusterImpl() - .config(options -> options.metadata(Collections.singletonMap("name", "Alice"))) + .config(opts -> opts.metadata(Collections.singletonMap("name", "Alice"))) .handler( cluster -> { return new ClusterMessageHandler() { @@ -42,11 +42,8 @@ public void onMembershipEvent(MembershipEvent event) { // Bob join cluster Cluster bob = new ClusterImpl() - .config( - config -> - config - .membership(opts -> opts.seedMembers(alice.address())) - .metadata(Collections.singletonMap("name", "Bob"))) + .config(opts -> opts.metadata(Collections.singletonMap("name", "Bob"))) + .membership(opts -> opts.seedMembers(alice.address())) .handler( cluster -> { return new ClusterMessageHandler() { @@ -62,11 +59,8 @@ public void onMembershipEvent(MembershipEvent event) { // Carol join cluster Cluster carol = new ClusterImpl() - .config( - config -> - config - .membership(opts -> opts.seedMembers(alice.address(), bob.address())) - .metadata(Collections.singletonMap("name", "Carol"))) + .config(opts -> opts.metadata(Collections.singletonMap("name", "Carol"))) + .membership(opts -> opts.seedMembers(alice.address(), bob.address())) .handler( cluster -> { return new ClusterMessageHandler() { diff --git a/examples/src/main/java/io/scalecube/examples/MessagingExample.java b/examples/src/main/java/io/scalecube/examples/MessagingExample.java index 6dce0ca0..8a695a38 100644 --- a/examples/src/main/java/io/scalecube/examples/MessagingExample.java +++ b/examples/src/main/java/io/scalecube/examples/MessagingExample.java @@ -37,7 +37,7 @@ public void onMessage(Message msg) { // messages Cluster bob = new ClusterImpl() - .config(config -> config.membership(opts -> opts.seedMembers(alice.address()))) + .membership(opts -> opts.seedMembers(alice.address())) .handler( cluster -> { return new ClusterMessageHandler() { @@ -55,9 +55,7 @@ public void onMessage(Message msg) { // Join cluster node Carol to cluster with Alice and Bob Cluster carol = new ClusterImpl() - .config( - config -> - config.membership(opts -> opts.seedMembers(alice.address(), bob.address()))) + .membership(opts -> opts.seedMembers(alice.address(), bob.address())) .handler( cluster -> { return new ClusterMessageHandler() {