diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 8f708515ccd9..3749f01caad1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -271,6 +271,15 @@ public enum OperationStatusCode { /** Configuration key for ZooKeeper session timeout */ public static final String ZK_SESSION_TIMEOUT = "zookeeper.session.timeout"; + /** Timeout for the ZK sync() call */ + public static final String ZK_SYNC_BLOCKING_TIMEOUT_MS = "hbase.zookeeper.sync.timeout.millis"; + // Choice of the default value is based on the following ZK recommendation (from docs). Keeping it + // lower lets the callers fail fast in case of any issues. + // "The clients view of the system is guaranteed to be up-to-date within a certain time bound. + // (On the order of tens of seconds.) Either system changes will be seen by a client within this + // bound, or the client will detect a service outage." + public static final long ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS = 30 * 1000; + /** Default value for ZooKeeper session timeout */ public static final int DEFAULT_ZK_SESSION_TIMEOUT = 90 * 1000; diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index 1ca6c2e583e9..352734064ab3 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -204,7 +204,7 @@ public static ThreadPoolExecutor getBoundedCachedThreadPool( * @param prefix The prefix of every created Thread's name * @return a {@link java.util.concurrent.ThreadFactory} that names threads */ - private static ThreadFactory getNamedThreadFactory(final String prefix) { + public static ThreadFactory getNamedThreadFactory(final String prefix) { SecurityManager s = System.getSecurityManager(); final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread() .getThreadGroup(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java index c51d4937a143..7abf6bb0ad0f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/example/HFileArchiveManager.java @@ -125,7 +125,7 @@ private void enable(ZKWatcher zooKeeper, byte[] table) */ private void disable(ZKWatcher zooKeeper, byte[] table) throws KeeperException { // ensure the latest state of the archive node is found - zooKeeper.sync(archiveZnode); + zooKeeper.syncOrTimeout(archiveZnode); // if the top-level archive node is gone, then we are done if (ZKUtil.checkExists(zooKeeper, archiveZnode) < 0) { @@ -134,7 +134,7 @@ private void disable(ZKWatcher zooKeeper, byte[] table) throws KeeperException { // delete the table node, from the archive String tableNode = this.getTableNode(table); // make sure the table is the latest version so the delete takes - zooKeeper.sync(tableNode); + zooKeeper.syncOrTimeout(tableNode); LOG.debug("Attempting to delete table node:" + tableNode); ZKUtil.deleteNodeRecursively(zooKeeper, tableNode); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java index 0bbc8d3a35c4..bcb3b8ba4fbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/ZKVisibilityLabelWatcher.java @@ -111,7 +111,7 @@ public void nodeDeleted(String path) { public void nodeDataChanged(String path) { if (path.equals(labelZnode) || path.equals(userAuthsZnode)) { try { - watcher.sync(path); + watcher.syncOrTimeout(path); byte[] data = ZKUtil.getDataAndWatch(watcher, path); if (path.equals(labelZnode)) { refreshVisibilityLabelsCache(data); diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java index f3d6884d27b8..c3a7470e6b71 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RecoverableZooKeeper.java @@ -721,7 +721,7 @@ public synchronized byte[] getSessionPasswd() { } public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) throws KeeperException { - checkZk().sync(path, cb, null); + checkZk().sync(path, cb, ctx); } /** diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java index ac7538f008ff..8fdc4b218db3 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKWatcher.java @@ -24,14 +24,20 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.AuthUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.security.Superusers; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.security.UserGroupInformation; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -81,12 +87,22 @@ public class ZKWatcher implements Watcher, Abortable, Closeable { // listeners to be notified private final List listeners = new CopyOnWriteArrayList<>(); - // Used by ZKUtil:waitForZKConnectionIfAuthenticating to wait for SASL - // negotiation to complete - private CountDownLatch saslLatch = new CountDownLatch(1); + // Single threaded executor pool that processes event notifications from Zookeeper. Events are + // processed in the order in which they arrive (pool backed by an unbounded fifo queue). We do + // this to decouple the event processing from Zookeeper's ClientCnxn's EventThread context. + // EventThread internally runs a single while loop to serially process all the events. When events + // are processed by the listeners in the same thread, that blocks the EventThread from processing + // subsequent events. Processing events in a separate thread frees up the event thread to continue + // and further prevents deadlocks if the process method itself makes other zookeeper calls. + // It is ok to do it in a single thread because the Zookeeper ClientCnxn already serializes the + // requests using a single while loop and hence there is no performance degradation. + private final ExecutorService zkEventProcessor = + Executors.newSingleThreadExecutor(Threads.getNamedThreadFactory("zk-event-processor")); private final Configuration conf; + private final long zkSyncTimeout; + /* A pattern that matches a Kerberos name, borrowed from Hadoop's KerberosName */ private static final Pattern NAME_PATTERN = Pattern.compile("([^/@]*)(/([^/@]*))?@([^/@]*)"); @@ -175,6 +191,8 @@ public ZKWatcher(Configuration conf, String identifier, Abortable abortable, throw zce; } } + this.zkSyncTimeout = conf.getLong(HConstants.ZK_SYNC_BLOCKING_TIMEOUT_MS, + HConstants.ZK_SYNC_BLOCKING_TIMEOUT_DEFAULT_MS); } private void createBaseZNodes() throws ZooKeeperConnectionException { @@ -466,21 +484,8 @@ public ZNodePaths getZNodePaths() { return znodePaths; } - /** - * Method called from ZooKeeper for events and connection status. - *

- * Valid events are passed along to listeners. Connection status changes - * are dealt with locally. - */ - @Override - public void process(WatchedEvent event) { - LOG.debug(prefix("Received ZooKeeper Event, " + - "type=" + event.getType() + ", " + - "state=" + event.getState() + ", " + - "path=" + event.getPath())); - + private void processEvent(WatchedEvent event) { switch(event.getType()) { - // If event type is NONE, this is a connection status change case None: { connectionEvent(event); @@ -488,7 +493,6 @@ public void process(WatchedEvent event) { } // Otherwise pass along to the listeners - case NodeCreated: { for(ZKListener listener : listeners) { listener.nodeCreated(event.getPath()); @@ -517,10 +521,26 @@ public void process(WatchedEvent event) { break; } default: - throw new IllegalStateException("Received event is not valid: " + event.getState()); + LOG.error("Invalid event of type {} received for path {}. Ignoring.", + event.getState(), event.getPath()); } } + /** + * Method called from ZooKeeper for events and connection status. + *

+ * Valid events are passed along to listeners. Connection status changes + * are dealt with locally. + */ + @Override + public void process(WatchedEvent event) { + LOG.debug(prefix("Received ZooKeeper Event, " + + "type=" + event.getType() + ", " + + "state=" + event.getState() + ", " + + "path=" + event.getPath())); + zkEventProcessor.submit(() -> processEvent(event)); + } + // Connection management /** @@ -572,7 +592,8 @@ private void connectionEvent(WatchedEvent event) { } /** - * Forces a synchronization of this ZooKeeper client connection. + * Forces a synchronization of this ZooKeeper client connection within a timeout. Enforcing a + * timeout lets the callers fail-fast rather than wait forever for the sync to finish. *

* Executing this method before running other methods will ensure that the * subsequent operations are up-to-date and consistent as of the time that @@ -582,9 +603,28 @@ private void connectionEvent(WatchedEvent event) { * data of an existing node and delete or transition that node, utilizing the * previously read version and data. We want to ensure that the version read * is up-to-date from when we begin the operation. + *

*/ - public void sync(String path) throws KeeperException { - this.recoverableZooKeeper.sync(path, null, null); + public void syncOrTimeout(String path) throws KeeperException { + final CountDownLatch latch = new CountDownLatch(1); + long startTime = EnvironmentEdgeManager.currentTime(); + this.recoverableZooKeeper.sync(path, (i, s, o) -> latch.countDown(), null); + try { + if (!latch.await(zkSyncTimeout, TimeUnit.MILLISECONDS)) { + LOG.warn("sync() operation to ZK timed out. Configured timeout: {}ms. This usually points " + + "to a ZK side issue. Check ZK server logs and metrics.", zkSyncTimeout); + throw new KeeperException.OperationTimeoutException(); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted waiting for ZK sync() to finish.", e); + Thread.currentThread().interrupt(); + return; + } + if (LOG.isDebugEnabled()) { + // TODO: Switch to a metric once server side ZK watcher metrics are implemented. This is a + // useful metric to have since the latency of sync() impacts the callers. + LOG.debug("ZK sync() operation took {}ms", EnvironmentEdgeManager.currentTime() - startTime); + } } /** @@ -634,6 +674,7 @@ public void interruptedExceptionNoThrow(InterruptedException ie, boolean throwLa */ @Override public void close() { + zkEventProcessor.shutdownNow(); try { recoverableZooKeeper.close(); } catch (InterruptedException e) {