diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java index 1996807edd2..97de99a753c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java @@ -19,10 +19,19 @@ package org.apache.zookeeper.server.admin; import java.net.InetSocketAddress; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; import java.util.stream.Collectors; -import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.zookeeper.Environment; import org.apache.zookeeper.Environment.Entry; import org.apache.zookeeper.Version; @@ -36,7 +45,9 @@ import org.apache.zookeeper.server.quorum.FollowerZooKeeperServer; import org.apache.zookeeper.server.quorum.Leader; import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer; +import org.apache.zookeeper.server.quorum.MultipleAddresses; import org.apache.zookeeper.server.quorum.QuorumPeer; +import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType; import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer; import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer; @@ -620,7 +631,8 @@ public CommandResponse run(ZooKeeperServer zkServer, Map kwargs) CommandResponse response = initializeResponse(); if (zkServer instanceof QuorumZooKeeperServer) { QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self; - VotingView votingView = new VotingView(peer.getVotingView()); + Map votingView = peer.getVotingView().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> new QuorumServerView(e.getValue()))); response.put("current_config", votingView); } else { response.put("current_config", Collections.emptyMap()); @@ -628,49 +640,44 @@ public CommandResponse run(ZooKeeperServer zkServer, Map kwargs) return response; } + private static class QuorumServerView { - private static class VotingView { - private final Map view; + @JsonProperty + private List serverAddresses; - VotingView(Map view) { - this.view = view.entrySet().stream() - .filter(e -> e.getValue().addr != null) - .collect(Collectors.toMap(Map.Entry::getKey, - e -> String.format("%s:%s%s", - getMultiAddressString(e.getValue()), - e.getValue().type.equals(QuorumPeer.LearnerType.PARTICIPANT) ? "participant" : "observer", - e.getValue().clientAddr ==null || e.getValue().isClientAddrFromStatic ? "" : - String.format(";%s:%d", - QuorumPeer.QuorumServer.delimitedHostString(e.getValue().clientAddr), - e.getValue().clientAddr.getPort())), - (v1, v2) -> v1, // cannot get duplicates as this straight draws from the other map - TreeMap::new)); - } - - private String getMultiAddressString(QuorumPeer.QuorumServer qs) { - return qs.addr.getAllAddresses().stream() - .map(address -> getSingleAddressString(qs, address)) - .collect(Collectors.joining(",")); - } + @JsonProperty + private List electionAddresses; - private String getSingleAddressString(QuorumPeer.QuorumServer qs, InetSocketAddress address) { - final String addressHostString = address.getHostString(); - final String delimitedHostString = QuorumPeer.QuorumServer.delimitedHostString(address); + @JsonProperty + private String clientAddress; - Optional matchingElectionAddress = qs.electionAddr.getAllAddresses().stream() - .filter(electionAddress -> electionAddress.getHostString().equals(addressHostString)) - .findFirst(); - final String electionPort = matchingElectionAddress.map(e-> ":" + e.getPort()).orElse(""); + @JsonProperty + private String learnerType; - return String.format("%s:%d%s", delimitedHostString, address.getPort(), electionPort); + public QuorumServerView(QuorumPeer.QuorumServer quorumServer) { + this.serverAddresses = getMultiAddressString(quorumServer.addr); + this.electionAddresses = getMultiAddressString(quorumServer.electionAddr); + this.learnerType = quorumServer.type.equals(LearnerType.PARTICIPANT) ? "participant" : "observer"; + this.clientAddress = getAddressString(quorumServer.clientAddr); } - @JsonAnyGetter - public Map getView() { - return view; + private static List getMultiAddressString(MultipleAddresses multipleAddresses) { + if(multipleAddresses == null) { + return Collections.emptyList(); + } + + return multipleAddresses.getAllAddresses().stream() + .map(QuorumServerView::getAddressString) + .collect(Collectors.toList()); } - } + private static String getAddressString(InetSocketAddress address) { + if(address == null) { + return ""; + } + return String.format("%s:%d", QuorumPeer.QuorumServer.delimitedHostString(address), address.getPort()); + } + } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java index b5b1a507300..c991a28fbad 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Leader.java @@ -311,7 +311,7 @@ ServerSocket createServerSocket(InetSocketAddress address, boolean portUnificati serverSocket.bind(address); return serverSocket; } catch (BindException e) { - LOG.error("Couldn't bind to " + self.getQuorumAddress(), e); + LOG.error("Couldn't bind to " + address.toString(), e); throw e; } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java index a8488334b3b..2f8b8ccd69b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java @@ -114,7 +114,7 @@ public class QuorumCnxManager { /* * Protocol identifier used among peers */ - public static final long PROTOCOL_VERSION = -65536L; + public static final long PROTOCOL_VERSION = -65535L; /* * Max buffer size to be read from the network. diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java index 86b5207ee3b..e432dd22218 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java @@ -292,6 +292,12 @@ public void testWatchSummary() throws IOException, InterruptedException { new Field("num_total_watches", Integer.class)); } + @Test + public void testVotingViewCommand() throws IOException, InterruptedException { + testCommand("voting_view", + new Field("current_config", Map.class)); + } + @Test public void testConsCommandSecureOnly() { // Arrange diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java index 25373fae035..400996eee51 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -58,6 +59,8 @@ import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; + public class CnxManagerTest extends ZKTestCase { protected static final Logger LOG = LoggerFactory.getLogger(FLENewEpochTest.class); protected static final int THRESHOLD = 4; @@ -608,7 +611,7 @@ public void testInitialMessage() throws Exception { Assert.fail("bad hostport accepted"); } catch (InitialMessage.InitialMessageException ex) {} - // good message + // good message, single election address try { hostport = "10.0.0.2:3888"; @@ -621,6 +624,30 @@ public void testInitialMessage() throws Exception { // now parse it din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray())); msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din); + assertEquals(new Long(5L), msg.sid); + assertEquals(Arrays.asList(new InetSocketAddress("10.0.0.2", 3888)), msg.electionAddr); + } catch (InitialMessage.InitialMessageException ex) { + Assert.fail(ex.toString()); + } + + // good message, multiple election addresses (ZOOKEEPER-3188) + try { + + hostport = "1.1.1.1:9999,2.2.2.2:8888,3.3.3.3:7777"; + bos = new ByteArrayOutputStream(); + dout = new DataOutputStream(bos); + dout.writeLong(5L); // sid + dout.writeInt(hostport.getBytes().length); + dout.writeBytes(hostport); + + // now parse it + din = new DataInputStream(new ByteArrayInputStream(bos.toByteArray())); + msg = InitialMessage.parse(QuorumCnxManager.PROTOCOL_VERSION, din); + assertEquals(new Long(5L), msg.sid); + assertEquals(Arrays.asList(new InetSocketAddress("1.1.1.1", 9999), + new InetSocketAddress("2.2.2.2", 8888), + new InetSocketAddress("3.3.3.3", 7777)), + msg.electionAddr); } catch (InitialMessage.InitialMessageException ex) { Assert.fail(ex.toString()); }