From 3b624092cf350e9720222b6a4e9da2b17b37684d Mon Sep 17 00:00:00 2001 From: Sunny Jiao <> Date: Fri, 21 Nov 2025 12:06:44 +0800 Subject: [PATCH] fix rpc close leakage --- .../tron/common/application/RpcService.java | 27 ++++++++++-- .../core/services/RpcApiServicesTest.java | 43 +++++++++++++++---- 2 files changed, 58 insertions(+), 12 deletions(-) diff --git a/framework/src/main/java/org/tron/common/application/RpcService.java b/framework/src/main/java/org/tron/common/application/RpcService.java index 4a7b3fa0f9..c398b71ae4 100644 --- a/framework/src/main/java/org/tron/common/application/RpcService.java +++ b/framework/src/main/java/org/tron/common/application/RpcService.java @@ -19,6 +19,7 @@ import io.grpc.netty.NettyServerBuilder; import io.grpc.protobuf.services.ProtoReflectionService; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -34,6 +35,7 @@ public abstract class RpcService extends AbstractService { private Server apiServer; + private ExecutorService executorService; protected String executorName; @Autowired @@ -58,7 +60,24 @@ public void innerStart() throws Exception { @Override public void innerStop() throws Exception { if (this.apiServer != null) { - this.apiServer.shutdown().awaitTermination(5, TimeUnit.SECONDS); + this.apiServer.shutdown(); + try { + if (!this.apiServer.awaitTermination(5, TimeUnit.SECONDS)) { + logger.warn("gRPC server did not shutdown gracefully, forcing shutdown"); + this.apiServer.shutdownNow(); + } + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting for gRPC server shutdown", e); + this.apiServer.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + // Close executor + if (this.executorService != null) { + ExecutorServiceManager.shutdownAndAwaitTermination( + this.executorService, this.executorName); + this.executorService = null; } } @@ -76,9 +95,9 @@ protected NettyServerBuilder initServerBuilder() { NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(this.port); CommonParameter parameter = Args.getInstance(); if (parameter.getRpcThreadNum() > 0) { - serverBuilder = serverBuilder - .executor(ExecutorServiceManager.newFixedThreadPool( - this.executorName, parameter.getRpcThreadNum())); + this.executorService = ExecutorServiceManager.newFixedThreadPool( + this.executorName, parameter.getRpcThreadNum()); + serverBuilder = serverBuilder.executor(this.executorService); } // Set configs from config.conf or default value serverBuilder diff --git a/framework/src/test/java/org/tron/core/services/RpcApiServicesTest.java b/framework/src/test/java/org/tron/core/services/RpcApiServicesTest.java index dbd06fabff..c873613bb9 100644 --- a/framework/src/test/java/org/tron/core/services/RpcApiServicesTest.java +++ b/framework/src/test/java/org/tron/core/services/RpcApiServicesTest.java @@ -11,6 +11,7 @@ import io.grpc.ManagedChannelBuilder; import java.io.IOException; import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.junit.AfterClass; import org.junit.Assert; @@ -53,6 +54,7 @@ import org.tron.common.application.Application; import org.tron.common.application.ApplicationFactory; import org.tron.common.application.TronApplicationContext; +import org.tron.common.es.ExecutorServiceManager; import org.tron.common.utils.ByteArray; import org.tron.common.utils.PublicMethod; import org.tron.common.utils.Sha256Hash; @@ -140,6 +142,9 @@ public class RpcApiServicesTest { private static ByteString ivk; private static ByteString d; + private static ExecutorService executorService; + private static final String executorName = "rpc-test-executor"; + @BeforeClass public static void init() throws IOException { Args.setParam(new String[] {"-d", temporaryFolder.newFolder().toString()}, Constant.TEST_CONF); @@ -163,16 +168,22 @@ public static void init() throws IOException { String pBFTNode = String.format("%s:%d", Constant.LOCAL_HOST, getInstance().getRpcOnPBFTPort()); + executorService = ExecutorServiceManager.newFixedThreadPool( + executorName, 3); + channelFull = ManagedChannelBuilder.forTarget(fullNode) .usePlaintext() + .executor(executorService) .intercept(new TimeoutInterceptor(5000)) .build(); channelPBFT = ManagedChannelBuilder.forTarget(pBFTNode) .usePlaintext() + .executor(executorService) .intercept(new TimeoutInterceptor(5000)) .build(); channelSolidity = ManagedChannelBuilder.forTarget(solidityNode) .usePlaintext() + .executor(executorService) .intercept(new TimeoutInterceptor(5000)) .build(); context = new TronApplicationContext(DefaultConfig.class); @@ -197,19 +208,35 @@ public static void init() throws IOException { @AfterClass public static void destroy() { - if (channelFull != null) { - channelFull.shutdownNow(); - } - if (channelPBFT != null) { - channelPBFT.shutdownNow(); - } - if (channelSolidity != null) { - channelSolidity.shutdownNow(); + shutdownChannel(channelFull); + shutdownChannel(channelPBFT); + shutdownChannel(channelSolidity); + + if (executorService != null) { + ExecutorServiceManager.shutdownAndAwaitTermination( + executorService, executorName); + executorService = null; } + context.close(); Args.clearParam(); } + private static void shutdownChannel(ManagedChannel channel) { + if (channel == null) { + return; + } + try { + channel.shutdown(); + if (!channel.awaitTermination(5, TimeUnit.SECONDS)) { + channel.shutdownNow(); + } + } catch (InterruptedException e) { + channel.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + @Test public void testGetBlockByNum() { NumberMessage message = NumberMessage.newBuilder().setNum(0).build();