diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java index d96ee40ba8..132ec04d06 100644 --- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java +++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java @@ -40,24 +40,32 @@ import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.metrics.GRPCMetrics; +import org.apache.uniffle.common.util.Constants; import org.apache.uniffle.common.util.ExitUtils; +import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.common.util.ThreadUtils; public class GrpcServer implements ServerInterface { private static final Logger LOG = LoggerFactory.getLogger(GrpcServer.class); - private final Server server; + private Server server; private final int port; private int listenPort; private final ExecutorService pool; + private List>> servicesWithInterceptors; + private GRPCMetrics grpcMetrics; + private RssBaseConf rssConf; protected GrpcServer( RssBaseConf conf, List>> servicesWithInterceptors, GRPCMetrics grpcMetrics) { - this.port = conf.getInteger(RssBaseConf.RPC_SERVER_PORT); - long maxInboundMessageSize = conf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE); + this.rssConf = conf; + this.port = rssConf.getInteger(RssBaseConf.RPC_SERVER_PORT); + this.servicesWithInterceptors = servicesWithInterceptors; + this.grpcMetrics = grpcMetrics; + int rpcExecutorSize = conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE); pool = new GrpcThreadPoolExecutor( rpcExecutorSize, @@ -68,12 +76,15 @@ protected GrpcServer( ThreadUtils.getThreadFactory("Grpc"), grpcMetrics ); + } - boolean isMetricsEnabled = conf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED); + private Server buildGrpcServer(int serverPort) { + boolean isMetricsEnabled = rssConf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED); + long maxInboundMessageSize = rssConf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE); ServerBuilder builder = ServerBuilder - .forPort(port) + .forPort(serverPort) .executor(pool) - .maxInboundMessageSize((int)maxInboundMessageSize); + .maxInboundMessageSize((int) maxInboundMessageSize); if (isMetricsEnabled) { builder.addTransportFilter(new MonitoringServerTransportFilter(grpcMetrics)); } @@ -88,7 +99,7 @@ protected GrpcServer( } builder.addService(ServerInterceptors.intercept(serviceWithInterceptors.getLeft(), interceptors)); }); - this.server = builder.build(); + return builder.build(); } public static class Builder { @@ -155,21 +166,27 @@ protected void afterExecute(Runnable r, Throwable t) { } } + @Override public int start() throws IOException { try { - server.start(); - listenPort = server.getPort(); - } catch (IOException e) { - ExitUtils.terminate(1, "Fail to start grpc server", e, LOG); + this.listenPort = RssUtils.startServiceOnPort(this, + Constants.GRPC_SERVICE_NAME, port, rssConf); + } catch (Exception e) { + ExitUtils.terminate(1, "Fail to start grpc server on conf port:" + port, e, LOG); } - LOG.info("Grpc server started, configured port: {}, listening on {}.", port, listenPort); - return port; + return listenPort; } @Override - public void startOnPort(int port) { - ExitUtils.terminate(1, "Fail to start grpc server", - new RuntimeException("GRpcServer not implement now"), LOG); + public void startOnPort(int startPort) throws Exception { + this.server = buildGrpcServer(startPort); + try { + server.start(); + listenPort = server.getPort(); + } catch (Exception e) { + throw e; + } + LOG.info("Grpc server started, configured port: {}, listening on {}.", port, listenPort); } public void stop() throws InterruptedException { @@ -189,7 +206,7 @@ public void blockUntilShutdown() throws InterruptedException { } public int getPort() { - return port <= 0 ? listenPort : port; + return listenPort; } } diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java b/common/src/main/java/org/apache/uniffle/common/util/Constants.java index 5e946866d2..514cf0e7fa 100644 --- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java +++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java @@ -72,4 +72,5 @@ private Constants() { public static final double MILLION_SECONDS_PER_SECOND = 1E3D; public static final String DEVICE_NO_SPACE_ERROR_MESSAGE = "No space left on device"; public static final String NETTY_STREAM_SERVICE_NAME = "netty.rpc.server"; + public static final String GRPC_SERVICE_NAME = "grpc.server"; } diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java index 888e96fa78..92688083fa 100644 --- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java +++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java @@ -204,6 +204,9 @@ public static boolean isServerPortBindCollision(Throwable e) { } else if (e instanceof Errors.NativeIoException) { return (e.getMessage() != null && e.getMessage().startsWith("bind() failed: ")) || isServerPortBindCollision(e.getCause()); + } else if (e instanceof IOException) { + return (e.getMessage() != null && e.getMessage().startsWith("Failed to bind to address")) + || isServerPortBindCollision(e.getCause()); } else { return false; } diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java similarity index 72% rename from integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java rename to integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java index f98af1a4dd..67664a6852 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerEnableStreamServerTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerOnRandomPortTest.java @@ -27,7 +27,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; -public class ShuffleServerEnableStreamServerTest extends CoordinatorTestBase { +public class ShuffleServerOnRandomPortTest extends CoordinatorTestBase { @BeforeAll public static void setupServers() throws Exception { CoordinatorConf coordinatorConf = getCoordinatorConf(); @@ -38,10 +38,10 @@ public static void setupServers() throws Exception { createCoordinatorServer(coordinatorConf); ShuffleServerConf shuffleServerConf = getShuffleServerConf(); shuffleServerConf.setInteger("rss.server.netty.port", 0); + shuffleServerConf.setInteger("rss.rpc.server.port", 0); shuffleServerConf.setInteger("rss.random.port.min", 30000); shuffleServerConf.setInteger("rss.random.port.max", 40000); createShuffleServer(shuffleServerConf); - shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 1); shuffleServerConf.setInteger("rss.jetty.http.port", 18081); createShuffleServer(shuffleServerConf); startServers(); @@ -58,9 +58,9 @@ public void startStreamServerOnRandomPort() throws Exception { int maxRetries = 100; ShuffleServerConf shuffleServerConf = getShuffleServerConf(); + // start netty server with already bind port shuffleServerConf.setInteger("rss.server.netty.port", actualPort); shuffleServerConf.setInteger("rss.jetty.http.port", 18082); - shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 2); shuffleServerConf.setInteger("rss.port.max.retry", maxRetries); ShuffleServer ss = new ShuffleServer(shuffleServerConf); ss.start(); @@ -68,4 +68,25 @@ public void startStreamServerOnRandomPort() throws Exception { ss.stopServer(); } + @Test + public void startGrpcServerOnRandomPort() throws Exception { + CoordinatorTestUtils.waitForRegister(coordinatorClient, 2); + Thread.sleep(5000); + int actualPort = shuffleServers.get(0).getGrpcPort(); + assertTrue(actualPort >= 30000 && actualPort < 40000); + actualPort = shuffleServers.get(1).getGrpcPort(); + assertTrue(actualPort >= 30000 && actualPort <= 40000); + + int maxRetries = 100; + ShuffleServerConf shuffleServerConf = getShuffleServerConf(); + // start grpc server with already bind port + shuffleServerConf.setInteger("rss.rpc.server.port", actualPort); + shuffleServerConf.setInteger("rss.jetty.http.port", 18083); + shuffleServerConf.setInteger("rss.port.max.retry", maxRetries); + ShuffleServer ss = new ShuffleServer(shuffleServerConf); + ss.start(); + assertTrue(ss.getGrpcPort() > actualPort && actualPort <= actualPort + maxRetries); + ss.stopServer(); + } + } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java index e713649aa4..c23dbb0a26 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java @@ -126,7 +126,7 @@ public static void main(String[] args) throws Exception { public void start() throws Exception { jettyServer.start(); - server.start(); + grpcPort = server.start(); if (nettyServerEnabled) { nettyPort = streamServer.start(); }