Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into ZOOKEEPER-3188
Browse files Browse the repository at this point in the history
  • Loading branch information
symat committed Aug 26, 2019
2 parents de7bad2 + b5399da commit e823af4
Show file tree
Hide file tree
Showing 12 changed files with 670 additions and 4 deletions.
1 change: 1 addition & 0 deletions bin/zkServer.sh
Expand Up @@ -212,6 +212,7 @@ stop)
else
$KILL $(cat "$ZOOPIDFILE")
rm "$ZOOPIDFILE"
sleep 1
echo STOPPED
fi
exit 0
Expand Down
2 changes: 1 addition & 1 deletion build.xml
Expand Up @@ -55,7 +55,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant">
<property name="javacc.version" value="5.0"/>

<property name="jetty.version" value="9.4.18.v20190429"/>
<property name="jackson.version" value="2.9.9.1"/>
<property name="jackson.version" value="2.9.9.3"/>
<property name="dependency-check-ant.version" value="4.0.2"/>

<property name="commons-io.version" value="2.6"/>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -280,7 +280,7 @@
<commons-cli.version>1.2</commons-cli.version>
<netty.version>4.1.36.Final</netty.version>
<jetty.version>9.4.18.v20190429</jetty.version>
<jackson.version>2.9.9.1</jackson.version>
<jackson.version>2.9.9.3</jackson.version>
<json.version>1.1.1</json.version>
<jline.version>2.11</jline.version>
<snappy.version>1.1.7</snappy.version>
Expand Down
20 changes: 20 additions & 0 deletions zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
Expand Up @@ -1494,6 +1494,26 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t
minute. This prevents herding during container deletion.
Default is "10000".

<a name="sc_debug_observability_config"></a>

#### Debug Observability Configurations

**New in 3.6.0:** The following options are introduced to make zookeeper easier to debug.

* *zookeeper.messageTracker.BufferSize* :
(Java system property only)
Controls the maximum number of messages stored in **MessageTracker**. Value should be positive
integers. The default value is 10. **MessageTracker** is introduced in **3.6.0** to record the
last set of messages between a server (follower or observer) and a leader, when a server
disconnects with leader. These set of messages will then be dumped to zookeeper's log file,
and will help reconstruct the state of the servers at the time of the disconnection and
will be useful for debugging purpose.

* *zookeeper.messageTracker.Enabled* :
(Java system property only)
When set to "true", will enable **MessageTracker** to track and record messages. Default value
is "false".

<a name="sc_adminserver_config"></a>

#### AdminServer configuration
Expand Down
Expand Up @@ -74,11 +74,16 @@ void followLeader() throws InterruptedException {
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);

long connectionTime = 0;
boolean completedSync = false;

try {
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
QuorumServer leaderServer = findLeader();
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);
connectionTime = System.currentTimeMillis();
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
if (self.isReconfigStateChange()) {
throw new Exception("learned about role change");
Expand All @@ -99,6 +104,7 @@ void followLeader() throws InterruptedException {
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
syncWithLeader(newEpochZxid);
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync = true;
} finally {
long syncTime = Time.currentElapsedTime() - startTime;
ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);
Expand Down Expand Up @@ -129,6 +135,14 @@ void followLeader() throws InterruptedException {
om.stop();
}
zk.unregisterJMX(this);

if (connectionTime != 0) {
long connectionDuration = System.currentTimeMillis() - connectionTime;
LOG.info("Disconnected from leader (with address: {}). "
+ "Was connected for {}ms. Sync state: {}",
leaderAddr, connectionDuration, completedSync);
messageTracker.dumpToLog(leaderAddr.toString());
}
}
}

Expand Down
Expand Up @@ -52,6 +52,7 @@
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.MessageTracker;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.SetDataTxn;
Expand Down Expand Up @@ -79,6 +80,7 @@ static class PacketInFlight {
protected BufferedOutputStream bufferedOutput;

protected Socket sock;
protected MultipleAddresses leaderAddr;

/**
* Socket getter
Expand All @@ -93,6 +95,9 @@ public Socket getSocket() {
/** the protocol version of the leader */
protected int leaderProtocolVersion = 0x01;

private static final int BUFFERED_MESSAGE_SIZE = 10;
protected final MessageTracker messageTracker = new MessageTracker(BUFFERED_MESSAGE_SIZE);

protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);

/**
Expand Down Expand Up @@ -151,6 +156,7 @@ void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOExcep
void writePacket(QuorumPacket pp, boolean flush) throws IOException {
synchronized (leaderOs) {
if (pp != null) {
messageTracker.trackSent(pp.getType());
leaderOs.writeRecord(pp, "packet");
}
if (flush) {
Expand All @@ -169,6 +175,7 @@ void writePacket(QuorumPacket pp, boolean flush) throws IOException {
void readPacket(QuorumPacket pp) throws IOException {
synchronized (leaderIs) {
leaderIs.readRecord(pp, "packet");
messageTracker.trackReceived(pp.getType());
}
long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (pp.getType() == Leader.PING) {
Expand Down Expand Up @@ -256,6 +263,7 @@ protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) thr
protected void connectToLeader(MultipleAddresses addr, String hostname)
throws IOException, InterruptedException {

this.leaderAddr = addr;
Set<InetSocketAddress> addresses = addr.getAllAddresses();
ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
CountDownLatch latch = new CountDownLatch(addresses.size());
Expand Down
Expand Up @@ -49,6 +49,7 @@
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
import org.apache.zookeeper.server.util.MessageTracker;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.TxnHeader;
Expand Down Expand Up @@ -220,6 +221,8 @@ public boolean equals(Object o) {
private final BufferedInputStream bufferedInput;
private BufferedOutputStream bufferedOutput;

protected final MessageTracker messageTracker;

// for test only
protected void setOutputArchive(BinaryOutputArchive oa) {
this.oa = oa;
Expand Down Expand Up @@ -280,6 +283,8 @@ protected void setBufferedOutput(BufferedOutputStream bufferedOutput) {
}
throw new SaslException("Authentication failure: " + e.getMessage());
}

this.messageTracker = new MessageTracker(MessageTracker.BUFFERED_MESSAGE_SIZE);
}

@Override
Expand Down Expand Up @@ -349,6 +354,7 @@ private void sendPackets() throws InterruptedException {
}
oa.writeRecord(p, "packet");
packetsSent.incrementAndGet();
messageTracker.trackSent(p.getType());
} catch (IOException e) {
if (!sock.isClosed()) {
LOG.warn("Unexpected exception at " + this, e);
Expand Down Expand Up @@ -464,8 +470,11 @@ public void run() {

QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");

messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO) {
LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!");

return;
}

Expand Down Expand Up @@ -526,9 +535,11 @@ public void run() {
ByteBuffer.wrap(ver).putInt(0x10000);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
messageTracker.trackSent(Leader.LEADERINFO);
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
messageTracker.trackReceived(ackEpochPacket.getType());
if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH");
return;
Expand All @@ -554,6 +565,7 @@ public void run() {
try {
long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
messageTracker.trackSent(Leader.SNAP);
bufferedOutput.flush();

LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
Expand Down Expand Up @@ -600,6 +612,8 @@ public void run() {
*/
qp = new QuorumPacket();
ia.readRecord(qp, "packet");

messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.ACK) {
LOG.error("Next packet was supposed to be an ACK," + " but received packet: {}", packetToString(qp));
return;
Expand Down Expand Up @@ -632,6 +646,7 @@ public void run() {
while (true) {
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());

long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
if (qp.getType() == Leader.PING) {
Expand Down Expand Up @@ -716,7 +731,9 @@ public void run() {
syncThrottler.endSync();
syncThrottler = null;
}
LOG.warn("******* GOODBYE {} ********", getRemoteAddress());
String remoteAddr = getRemoteAddress();
LOG.warn("******* GOODBYE {} ********", remoteAddr);
messageTracker.dumpToLog(remoteAddr);
shutdown();
}
}
Expand Down
Expand Up @@ -97,12 +97,14 @@ public String toString() {
*/
void observeLeader() throws Exception {
zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);

long connectTime = 0;
boolean completedSync = false;
try {
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
QuorumServer master = findLearnerMaster();
try {
connectToLeader(master.addr, master.hostname);
connectTime = System.currentTimeMillis();
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
if (self.isReconfigStateChange()) {
throw new Exception("learned about role change");
Expand All @@ -112,6 +114,7 @@ void observeLeader() throws Exception {
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);
syncWithLeader(newLeaderZxid);
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync = true;
QuorumPacket qp = new QuorumPacket();
while (this.isRunning() && nextLearnerMaster.get() == null) {
readPacket(qp);
Expand All @@ -127,6 +130,14 @@ void observeLeader() throws Exception {
} finally {
currentLearnerMaster = null;
zk.unregisterJMX(this);
if (connectTime != 0) {
long connectionDuration = System.currentTimeMillis() - connectTime;

LOG.info("Disconnected from leader (with address: {}). "
+ "Was connected for {}ms. Sync state: {}",
leaderAddr, connectionDuration, completedSync);
messageTracker.dumpToLog(leaderAddr.toString());
}
}
}

Expand Down
@@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zookeeper.server.util;

import java.lang.reflect.Array;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Thread safe FIFO CircularBuffer implementation.
* When the buffer is full write operation overwrites the oldest element.
*
* Fun thing @todo, make this lock free as this is called on every quorum message
*/
public class CircularBuffer<T> {

private final T[] buffer;
private final int capacity;
private int oldest;
private AtomicInteger numberOfElements = new AtomicInteger();

@SuppressWarnings("unchecked")
public CircularBuffer(Class<T> clazz, int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException("CircularBuffer capacity should be greater than 0");
}
this.buffer = (T[]) Array.newInstance(clazz, capacity);
this.capacity = capacity;
}

/**
* Puts elements in the next available index in the array.
* If the array is full the oldest element is replaced with
* the new value.
* @param element
*/
public synchronized void write(T element) {
int newSize = numberOfElements.incrementAndGet();
if (newSize > capacity) {
buffer[oldest] = element;
oldest = ++oldest % capacity;
numberOfElements.decrementAndGet();
} else {
int index = (oldest + numberOfElements.get() - 1) % capacity;
buffer[index] = element;
}
}

/**
* Reads from the buffer in a FIFO manner.
* Returns the oldest element in the buffer if the buffer ie not empty
* Returns null if the buffer is empty
* @return
*/
public synchronized T take() {
int newSize = numberOfElements.decrementAndGet();
if (newSize < 0) {
numberOfElements.incrementAndGet();
return null;
}
T polled = buffer[oldest];
oldest = ++oldest % capacity;
return polled;
}

public synchronized T peek() {
if (numberOfElements.get() <= 0) {
return null;
}
return buffer[oldest];
}

public int size() {
return numberOfElements.get();
}

public boolean isEmpty() {
return numberOfElements.get() <= 0;
}

public boolean isFull() {
return numberOfElements.get() >= capacity;
}

public synchronized void reset() {
numberOfElements.set(0);
}
}

0 comments on commit e823af4

Please sign in to comment.