From fd2ab0132cf28f86939815788b8c8231ec5fe0ca Mon Sep 17 00:00:00 2001 From: Stamatis Zampetakis Date: Fri, 13 Nov 2020 18:24:43 +0100 Subject: [PATCH] HIVE-24179: Memory leak in HS2 DbTxnManager when compiling SHOW LOCKS statement (Stamatis Zampetakis, reviewed by Denys Kuzmenko, Jesus Camacho Rodriguez) 1. Avoid multiple shutdown hooks for the same threadpool in DbTxnManager. Enforce unique initialization and shutdown of the thread pool. Apart from the apparent memory leak there is no need to register a shutdown hook for the executor service multiple times. It wastes memory, complicates code, and serves no purpose. Change initHeartbeatExecutorService to synchronized putting the class lock in the whole method. Mixing instance and class locks is confusing and does not offer any particular benefit. 2. Avoid HiveTxnManager instantiation in lock analyzers. A transaction manager should already exist inside the analyzer before calling the analyze method. There is no need to create a new one just to obtain the useNewShowLocksFormat boolean indicator. Closes apache/hive#1509 --- .../hadoop/hive/ql/lockmgr/DbTxnManager.java | 40 ++++++++----------- .../hive/ql/parse/DDLSemanticAnalyzer.java | 16 +------- 2 files changed, 19 insertions(+), 37 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java index c644b2157af..97caac13cc5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java @@ -165,7 +165,6 @@ public final class DbTxnManager extends HiveTxnManagerImpl { // ExecutorService for sending heartbeat to metastore periodically. private static ScheduledExecutorService heartbeatExecutorService = null; private ScheduledFuture heartbeatTask = null; - private Runnable shutdownRunner = null; private static final int SHUTDOWN_HOOK_PRIORITY = 0; /** @@ -194,18 +193,6 @@ IMetaStoreClient getMS() throws LockException { } } DbTxnManager() { - shutdownRunner = new Runnable() { - @Override - public void run() { - if (heartbeatExecutorService != null - && !heartbeatExecutorService.isShutdown() - && !heartbeatExecutorService.isTerminated()) { - LOG.info("Shutting down Heartbeater thread pool."); - heartbeatExecutorService.shutdown(); - } - } - }; - ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY); } @Override @@ -794,11 +781,12 @@ public boolean isImplicitTransactionOpen() { protected void destruct() { try { stopHeartbeat(); - if (shutdownRunner != null) { - ShutdownHookManager.removeShutdownHook(shutdownRunner); + if (isTxnOpen()) { + rollbackTxn(); + } + if (lockMgr != null) { + lockMgr.close(); } - if (isTxnOpen()) rollbackTxn(); - if (lockMgr != null) lockMgr.close(); } catch (Exception e) { LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage() + ">, swallowing as there is nothing we can do with it."); @@ -810,18 +798,17 @@ private void init() throws LockException { if (conf == null) { throw new RuntimeException("Must call setHiveConf before any other methods."); } - initHeartbeatExecutorService(); + initHeartbeatExecutorService(conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE)); } - private synchronized void initHeartbeatExecutorService() { - synchronized (DbTxnManager.class) { - if (heartbeatExecutorService != null && !heartbeatExecutorService.isShutdown() - && !heartbeatExecutorService.isTerminated()) { + private synchronized static void initHeartbeatExecutorService(int corePoolSize) { + if(heartbeatExecutorService != null) { return; } + // The following code will be executed only once when the service is not initialized heartbeatExecutorService = Executors.newScheduledThreadPool( - conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE), + corePoolSize, new ThreadFactory() { private final AtomicInteger threadCounter = new AtomicInteger(); @@ -831,6 +818,13 @@ public Thread newThread(Runnable r) { } }); ((ScheduledThreadPoolExecutor) heartbeatExecutorService).setRemoveOnCancelPolicy(true); + ShutdownHookManager.addShutdownHook(DbTxnManager::shutdownHeartbeatExecutorService, SHUTDOWN_HOOK_PRIORITY); + } + + private synchronized static void shutdownHeartbeatExecutorService() { + if (heartbeatExecutorService != null && !heartbeatExecutorService.isShutdown()) { + LOG.info("Shutting down Heartbeater thread pool."); + heartbeatExecutorService.shutdown(); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index d0b2de2e14d..4e64248748c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -2821,13 +2821,7 @@ private void analyzeShowLocks(ASTNode ast) throws SemanticException { } } - HiveTxnManager txnManager = null; - try { - txnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - } catch (LockException e) { - throw new SemanticException(e.getMessage()); - } - + assert txnManager != null : "Transaction manager should be set before calling analyze"; ShowLocksDesc showLocksDesc = new ShowLocksDesc(ctx.getResFile(), tableName, partSpec, isExtended, txnManager.useNewShowLocksFormat()); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), @@ -2851,13 +2845,7 @@ private void analyzeShowDbLocks(ASTNode ast) throws SemanticException { boolean isExtended = (ast.getChildCount() > 1); String dbName = stripQuotes(ast.getChild(0).getText()); - HiveTxnManager txnManager = null; - try { - txnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf); - } catch (LockException e) { - throw new SemanticException(e.getMessage()); - } - + assert txnManager != null : "Transaction manager should be set before calling analyze"; ShowLocksDesc showLocksDesc = new ShowLocksDesc(ctx.getResFile(), dbName, isExtended, txnManager.useNewShowLocksFormat()); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),