Skip to content

Commit

Permalink
HIVE-24179: Memory leak in HS2 DbTxnManager when compiling SHOW LOCKS…
Browse files Browse the repository at this point in the history
… statement (Stamatis Zampetakis, reviewed by Denys Kuzmenko, Jesus Camacho Rodriguez)

1. Avoid multiple shutdown hooks for the same threadpool in DbTxnManager.

Enforce unique initialization and shutdown of the thread pool. Apart
from the apparent memory leak there is no need to register a shutdown
hook for the executor service multiple times. It wastes memory,
complicates code, and serves no purpose.

Change initHeartbeatExecutorService to synchronized putting the class
lock in the whole method. Mixing instance and class locks is confusing
and does not offer any particular benefit.

2. Avoid HiveTxnManager instantiation in lock analyzers.

A transaction manager should already exist inside the
analyzer before calling the analyze method. There is no need to create a
new one just to obtain the useNewShowLocksFormat boolean indicator.

Closes apache/hive#1509
  • Loading branch information
zabetak authored and Sungwoo Park committed Oct 1, 2023
1 parent e9cff2b commit 54d5060
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 37 deletions.
40 changes: 17 additions & 23 deletions ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
// ExecutorService for sending heartbeat to metastore periodically.
private static ScheduledExecutorService heartbeatExecutorService = null;
private ScheduledFuture<?> heartbeatTask = null;
private Runnable shutdownRunner = null;
private static final int SHUTDOWN_HOOK_PRIORITY = 0;

/**
Expand Down Expand Up @@ -194,18 +193,6 @@ IMetaStoreClient getMS() throws LockException {
}
}
DbTxnManager() {
shutdownRunner = new Runnable() {
@Override
public void run() {
if (heartbeatExecutorService != null
&& !heartbeatExecutorService.isShutdown()
&& !heartbeatExecutorService.isTerminated()) {
LOG.info("Shutting down Heartbeater thread pool.");
heartbeatExecutorService.shutdown();
}
}
};
ShutdownHookManager.addShutdownHook(shutdownRunner, SHUTDOWN_HOOK_PRIORITY);
}

@Override
Expand Down Expand Up @@ -794,11 +781,12 @@ public boolean isImplicitTransactionOpen() {
protected void destruct() {
try {
stopHeartbeat();
if (shutdownRunner != null) {
ShutdownHookManager.removeShutdownHook(shutdownRunner);
if (isTxnOpen()) {
rollbackTxn();
}
if (lockMgr != null) {
lockMgr.close();
}
if (isTxnOpen()) rollbackTxn();
if (lockMgr != null) lockMgr.close();
} catch (Exception e) {
LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage()
+ ">, swallowing as there is nothing we can do with it.");
Expand All @@ -810,18 +798,17 @@ private void init() throws LockException {
if (conf == null) {
throw new RuntimeException("Must call setHiveConf before any other methods.");
}
initHeartbeatExecutorService();
initHeartbeatExecutorService(conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE));
}

private synchronized void initHeartbeatExecutorService() {
synchronized (DbTxnManager.class) {
if (heartbeatExecutorService != null && !heartbeatExecutorService.isShutdown()
&& !heartbeatExecutorService.isTerminated()) {
private synchronized static void initHeartbeatExecutorService(int corePoolSize) {
if(heartbeatExecutorService != null) {
return;
}
// The following code will be executed only once when the service is not initialized
heartbeatExecutorService =
Executors.newScheduledThreadPool(
conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE),
corePoolSize,
new ThreadFactory() {
private final AtomicInteger threadCounter = new AtomicInteger();

Expand All @@ -831,6 +818,13 @@ public Thread newThread(Runnable r) {
}
});
((ScheduledThreadPoolExecutor) heartbeatExecutorService).setRemoveOnCancelPolicy(true);
ShutdownHookManager.addShutdownHook(DbTxnManager::shutdownHeartbeatExecutorService, SHUTDOWN_HOOK_PRIORITY);
}

private synchronized static void shutdownHeartbeatExecutorService() {
if (heartbeatExecutorService != null && !heartbeatExecutorService.isShutdown()) {
LOG.info("Shutting down Heartbeater thread pool.");
heartbeatExecutorService.shutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2821,13 +2821,7 @@ private void analyzeShowLocks(ASTNode ast) throws SemanticException {
}
}

HiveTxnManager txnManager = null;
try {
txnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
} catch (LockException e) {
throw new SemanticException(e.getMessage());
}

assert txnManager != null : "Transaction manager should be set before calling analyze";
ShowLocksDesc showLocksDesc = new ShowLocksDesc(ctx.getResFile(), tableName,
partSpec, isExtended, txnManager.useNewShowLocksFormat());
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
Expand All @@ -2851,13 +2845,7 @@ private void analyzeShowDbLocks(ASTNode ast) throws SemanticException {
boolean isExtended = (ast.getChildCount() > 1);
String dbName = stripQuotes(ast.getChild(0).getText());

HiveTxnManager txnManager = null;
try {
txnManager = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
} catch (LockException e) {
throw new SemanticException(e.getMessage());
}

assert txnManager != null : "Transaction manager should be set before calling analyze";
ShowLocksDesc showLocksDesc = new ShowLocksDesc(ctx.getResFile(), dbName,
isExtended, txnManager.useNewShowLocksFormat());
rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
Expand Down

0 comments on commit 54d5060

Please sign in to comment.