From 8b0974d66ed97105036c7fb9d8cdc4e15f39374e Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Thu, 17 Oct 2019 16:40:49 +0300 Subject: [PATCH] Fixed MembershipProtocolImpl; updated test in ClusterTest --- .../io/scalecube/cluster/ClusterImpl.java | 21 +++++----- .../membership/MembershipProtocolImpl.java | 32 ++++++++++++++-- .../io/scalecube/cluster/ClusterTest.java | 38 ++++++++++++++++--- .../transport/netty/TransportImpl.java | 30 +++++++++++---- 4 files changed, 93 insertions(+), 28 deletions(-) diff --git a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java index 718602df..85916193 100644 --- a/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/ClusterImpl.java @@ -232,7 +232,7 @@ private Mono doStart0() { return TransportImpl.bind(config.transportConfig()) .flatMap( transport1 -> { - localMember = createLocalMember(transport1.address().port()); + localMember = createLocalMember(transport1.address()); transport = new SenderAwareTransport(transport1, localMember.address()); cidGenerator = new CorrelationIdGenerator(localMember.id()); @@ -350,18 +350,17 @@ private Flux listenMembership() { * Creates and prepares local cluster member. An address of member that's being constructed may be * overriden from config variables. * - * @param listenPort transport listen port + * @param address transport address * @return local cluster member with cluster address and cluster member id */ - private Member createLocalMember(int listenPort) { - String localAddress = Address.getLocalIpAddress().getHostAddress(); - Integer port = Optional.ofNullable(config.memberPort()).orElse(listenPort); + private Member createLocalMember(Address address) { + int port = Optional.ofNullable(config.memberPort()).orElse(address.port()); // calculate local member cluster address Address memberAddress = Optional.ofNullable(config.memberHost()) - .map(memberHost -> Address.create(memberHost, port)) - .orElseGet(() -> Address.create(localAddress, listenPort)); + .map(host -> Address.create(host, port)) + .orElseGet(() -> Address.create(address.host(), port)); return new Member(Member.generateId(), config.memberAlias(), memberAddress); } @@ -507,11 +506,11 @@ public boolean isShutdown() { private static class SenderAwareTransport implements Transport { private final Transport transport; - private final Address sender; + private final Address address; - private SenderAwareTransport(Transport transport, Address sender) { + private SenderAwareTransport(Transport transport, Address address) { this.transport = Objects.requireNonNull(transport); - this.sender = Objects.requireNonNull(sender); + this.address = Objects.requireNonNull(address); } @Override @@ -545,7 +544,7 @@ public Flux listen() { } private Message enhanceWithSender(Message message) { - return Message.with(message).sender(sender).build(); + return Message.with(message).sender(address).build(); } } } diff --git a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java index 9462d5a2..53f482a3 100644 --- a/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java +++ b/cluster/src/main/java/io/scalecube/cluster/membership/MembershipProtocolImpl.java @@ -17,6 +17,7 @@ import io.scalecube.cluster.transport.api.Message; import io.scalecube.cluster.transport.api.Transport; import io.scalecube.net.Address; +import java.net.InetAddress; import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; @@ -166,15 +167,40 @@ public MembershipProtocolImpl( .subscribe(this::onMemberRemoved))); } - // Remove duplicates and local address + // Remove duplicates and local address(es) private List
cleanUpSeedMembers(Collection
seedMembers) { + InetAddress localIpAddress = Address.getLocalIpAddress(); + + String hostAddress = localIpAddress.getHostAddress(); + String hostName = localIpAddress.getHostName(); + + Address memberAddr = localMember.address(); + Address transportAddr = transport.address(); + Address memberAddrByHostAddress = Address.create(hostAddress, memberAddr.port()); + Address transportAddrByHostAddress = Address.create(hostAddress, transportAddr.port()); + Address memberAddByHostName = Address.create(hostName, memberAddr.port()); + Address transportAddrByHostName = Address.create(hostName, transportAddr.port()); + return new LinkedHashSet<>(seedMembers) .stream() - .filter(address -> !address.equals(localMember.address())) - .filter(address -> !address.equals(transport.address())) + .filter(addr -> checkAddressesNotEqual(addr, memberAddr)) + .filter(addr -> checkAddressesNotEqual(addr, transportAddr)) + .filter(addr -> checkAddressesNotEqual(addr, memberAddrByHostAddress)) + .filter(addr -> checkAddressesNotEqual(addr, transportAddrByHostAddress)) + .filter(addr -> checkAddressesNotEqual(addr, memberAddByHostName)) + .filter(addr -> checkAddressesNotEqual(addr, transportAddrByHostName)) .collect(Collectors.toList()); } + private boolean checkAddressesNotEqual(Address address0, Address address1) { + if (!address0.equals(address1)) { + return true; + } else { + LOGGER.warn("Filtering out seed address: {}", address0); + return false; + } + } + @Override public Flux listen() { return subject.onBackpressureBuffer(); diff --git a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java index b11d791c..98412215 100644 --- a/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java +++ b/cluster/src/test/java/io/scalecube/cluster/ClusterTest.java @@ -8,6 +8,7 @@ import io.scalecube.cluster.membership.MembershipEvent.Type; import io.scalecube.cluster.metadata.SimpleMapMetadataCodec; import io.scalecube.net.Address; +import java.net.InetAddress; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -29,6 +30,7 @@ public class ClusterTest extends BaseTest { public static final Duration TIMEOUT = Duration.ofSeconds(30); + public static final int CONNECT_TIMEOUT = 3000; @Test public void testMembersAccessFromScheduler() { @@ -50,29 +52,53 @@ public void testMembersAccessFromScheduler() { } @Test - public void testJoinLocalhostIgnored() { - Address[] addresses = {Address.from("localhost:4801"), Address.from("127.0.0.1:4801")}; + public void testJoinLocalhostIgnored() throws InterruptedException { + InetAddress localIpAddress = Address.getLocalIpAddress(); + Address localAddressByHostname = Address.create(localIpAddress.getHostName(), 4801); + Address localAddressByIp = Address.create(localIpAddress.getHostAddress(), 4801); + Address[] addresses = { + Address.from("localhost:4801"), + Address.from("127.0.0.1:4801"), + Address.from("127.0.1.1:4801"), + localAddressByHostname, + localAddressByIp + }; // Start seed node Cluster seedNode = new ClusterImpl() - .transport(opts -> opts.port(4801).connectTimeout(500)) + .transport(opts -> opts.port(4801).connectTimeout(CONNECT_TIMEOUT)) .membership(opts -> opts.seedMembers(addresses)) .startAwait(); + Thread.sleep(CONNECT_TIMEOUT); + Collection otherMembers = seedNode.otherMembers(); assertEquals(0, otherMembers.size(), "otherMembers: " + otherMembers); } @Test - public void testJoinLocalhostIgnoredWithOverride() { + public void testJoinLocalhostIgnoredWithOverride() throws InterruptedException { + InetAddress localIpAddress = Address.getLocalIpAddress(); + Address localAddressByHostname = Address.create(localIpAddress.getHostName(), 7878); + Address localAddressByIp = Address.create(localIpAddress.getHostAddress(), 7878); + Address[] addresses = { + Address.from("localhost:7878"), + Address.from("127.0.0.1:7878"), + Address.from("127.0.1.1:7878"), + localAddressByHostname, + localAddressByIp + }; + // Start seed node Cluster seedNode = new ClusterImpl(new ClusterConfig().memberHost("localhost").memberPort(7878)) - .transport(opts -> opts.port(7878).connectTimeout(500)) - .membership(opts -> opts.seedMembers(Address.from("localhost:7878"))) + .transport(opts -> opts.port(7878).connectTimeout(CONNECT_TIMEOUT)) + .membership(opts -> opts.seedMembers(addresses)) .startAwait(); + Thread.sleep(CONNECT_TIMEOUT); + Collection otherMembers = seedNode.otherMembers(); assertEquals(0, otherMembers.size(), "otherMembers: " + otherMembers); } diff --git a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java index ea719185..ee076de9 100644 --- a/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java +++ b/transport-parent/transport-netty/src/main/java/io/scalecube/transport/netty/TransportImpl.java @@ -16,6 +16,7 @@ import io.scalecube.cluster.transport.api.Transport; import io.scalecube.cluster.transport.api.TransportConfig; import io.scalecube.net.Address; +import java.net.InetAddress; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -98,7 +99,7 @@ public TransportImpl(TransportConfig config) { */ private TransportImpl(DisposableServer server, TransportImpl other) { this.server = server; - this.address = Address.create(server.address().getHostString(), server.address().getPort()); + this.address = prepareAddress(server); this.config = other.config; this.loopResources = other.loopResources; this.messagesSubject = other.messagesSubject; @@ -116,6 +117,16 @@ private TransportImpl(DisposableServer server, TransportImpl other) { .subscribe(null, ex -> LOGGER.warn("Exception occurred on transport stop: " + ex)); } + private static Address prepareAddress(DisposableServer server) { + InetAddress address = server.address().getAddress(); + int port = server.address().getPort(); + if (address.isAnyLocalAddress()) { + return Address.create(Address.getLocalIpAddress().getHostAddress(), port); + } else { + return Address.create(address.getHostAddress(), port); + } + } + /** * Init transport with the default configuration synchronously. Starts to accept connections on * local address. @@ -171,15 +182,22 @@ public Mono bind0() { .handle(this::onMessage) .bind() .doOnSuccess( - server -> - LOGGER.debug("Bound cluster transport on {}:{}", server.host(), server.port())) + server -> { + InetAddress address = server.address().getAddress(); + if (address.isAnyLocalAddress()) { + LOGGER.debug("Bound cluster transport on *:{}", server.port()); + } else { + LOGGER.debug( + "Bound cluster transport on {}:{}", address.getHostAddress(), server.port()); + } + }) .doOnError( ex -> LOGGER.error( "Failed to bind cluster transport on port={}, cause: {}", config.port(), ex.toString())) - .map(this::onBind); + .map(server -> new TransportImpl(server, this)); } @Override @@ -270,10 +288,6 @@ private Message toMessage(ByteBuf byteBuf) { } } - private TransportImpl onBind(DisposableServer server) { - return new TransportImpl(server, this); - } - private Mono send0(Connection conn, Message message) { // do send return conn.outbound()