Skip to content

Commit

Permalink
HBASE-25307 ThreadLocal pooling leads to NullPointerException (apache…
Browse files Browse the repository at this point in the history
…#2685)

* PoolMap does not discard any elements anymore. If an element is put,
it always stores it. The reason: it stores expensive resources (rpc
connections) which would lead to resource leak if we simple discard it.
RpcClients can reference netty ByteBufs which are reference counted.
Resource cleanup is done by AbstractRpcClient.cleanupIdleConnections().
* PoolMap does not implement Map interface anymore, so ensuring
thread-safety has become easier. Put method is replaced with getOrCreate().
* ThreadLocalPool doesn't use ThreadLocal class anymore. It stores
resources on thread basis, but it doesn't remove values when a thread
exits. Again, proper cleanup is done by cleanupIdleConnections().

Signed-off-by: Sean Busbey <busbey@apache.org>
Signed-off-by: Wellington Chevreuil <wellington.chevreuil@gmail.com>
  • Loading branch information
meszibalu committed Nov 25, 2020
1 parent 3dd425a commit e0ae679
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 303 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
new ThreadFactoryBuilder().setNameFormat("Idle-Rpc-Conn-Sweeper-pool-%d").setDaemon(true)
.setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());

protected boolean running = true; // if client runs
private boolean running = true; // if client runs

protected final Configuration conf;
protected final String clusterId;
Expand All @@ -127,7 +127,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
protected final int readTO;
protected final int writeTO;

protected final PoolMap<ConnectionId, T> connections;
private final PoolMap<ConnectionId, T> connections;

private final AtomicInteger callIdCnt = new AtomicInteger(0);

Expand Down Expand Up @@ -209,7 +209,7 @@ private void cleanupIdleConnections() {
if (LOG.isTraceEnabled()) {
LOG.trace("Cleanup idle connection to {}", conn.remoteId().address);
}
connections.removeValue(conn.remoteId(), conn);
connections.remove(conn.remoteId(), conn);
conn.cleanupConnection();
}
}
Expand Down Expand Up @@ -294,7 +294,14 @@ private static PoolMap.PoolType getPoolType(Configuration config) {
* @return the maximum pool size
*/
private static int getPoolSize(Configuration config) {
return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
int poolSize = config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);

if (poolSize <= 0) {
LOG.warn("{} must be positive. Using default value: 1", HConstants.HBASE_CLIENT_IPC_POOL_SIZE);
return 1;
} else {
return poolSize;
}
}

private int nextCallId() {
Expand Down Expand Up @@ -350,11 +357,7 @@ private T getConnection(ConnectionId remoteId) throws IOException {
if (!running) {
throw new StoppedRpcClientException();
}
conn = connections.get(remoteId);
if (conn == null) {
conn = createConnection(remoteId);
connections.put(remoteId, conn);
}
conn = connections.getOrCreate(remoteId, () -> createConnection(remoteId));
conn.setLastTouched(EnvironmentEdgeManager.currentTime());
}
return conn;
Expand Down Expand Up @@ -453,7 +456,7 @@ public void cancelConnections(ServerName sn) {
&& remoteId.address.getHostName().equals(sn.getHostname())) {
LOG.info("The server on " + sn.toString() + " is dead - stopping the connection "
+ connection.remoteId);
connections.removeValue(remoteId, connection);
connections.remove(remoteId, connection);
connection.shutdown();
connection.cleanupConnection();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,13 @@ private synchronized boolean waitForWork() {
@Override
public void run() {
if (LOG.isTraceEnabled()) {
LOG.trace(threadName + ": starting, connections " + this.rpcClient.connections.size());
LOG.trace(threadName + ": starting");
}
while (waitForWork()) {
readResponse();
}
if (LOG.isTraceEnabled()) {
LOG.trace(threadName + ": stopped, connections " + this.rpcClient.connections.size());
LOG.trace(threadName + ": stopped");
}
}

Expand Down

0 comments on commit e0ae679

Please sign in to comment.