Skip to content

Commit

Permalink
apache#779 grpc server support random port
Browse files Browse the repository at this point in the history
  • Loading branch information
xumanbu committed Apr 12, 2023
1 parent cae7cd9 commit cad1903
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 21 deletions.
51 changes: 34 additions & 17 deletions common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,32 @@

import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;

public class GrpcServer implements ServerInterface {

private static final Logger LOG = LoggerFactory.getLogger(GrpcServer.class);

private final Server server;
private Server server;
private final int port;
private int listenPort;
private final ExecutorService pool;
private List<Pair<BindableService, List<ServerInterceptor>>> servicesWithInterceptors;
private GRPCMetrics grpcMetrics;
private RssBaseConf rssConf;

protected GrpcServer(
RssBaseConf conf,
List<Pair<BindableService, List<ServerInterceptor>>> servicesWithInterceptors,
GRPCMetrics grpcMetrics) {
this.port = conf.getInteger(RssBaseConf.RPC_SERVER_PORT);
long maxInboundMessageSize = conf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
this.rssConf = conf;
this.port = rssConf.getInteger(RssBaseConf.RPC_SERVER_PORT);
this.servicesWithInterceptors = servicesWithInterceptors;
this.grpcMetrics = grpcMetrics;

int rpcExecutorSize = conf.getInteger(RssBaseConf.RPC_EXECUTOR_SIZE);
pool = new GrpcThreadPoolExecutor(
rpcExecutorSize,
Expand All @@ -68,12 +76,15 @@ protected GrpcServer(
ThreadUtils.getThreadFactory("Grpc"),
grpcMetrics
);
}

boolean isMetricsEnabled = conf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
private Server buildGrpcServer(int serverPort) {
boolean isMetricsEnabled = rssConf.getBoolean(RssBaseConf.RPC_METRICS_ENABLED);
long maxInboundMessageSize = rssConf.getLong(RssBaseConf.RPC_MESSAGE_MAX_SIZE);
ServerBuilder<?> builder = ServerBuilder
.forPort(port)
.forPort(serverPort)
.executor(pool)
.maxInboundMessageSize((int)maxInboundMessageSize);
.maxInboundMessageSize((int) maxInboundMessageSize);
if (isMetricsEnabled) {
builder.addTransportFilter(new MonitoringServerTransportFilter(grpcMetrics));
}
Expand All @@ -88,7 +99,7 @@ protected GrpcServer(
}
builder.addService(ServerInterceptors.intercept(serviceWithInterceptors.getLeft(), interceptors));
});
this.server = builder.build();
return builder.build();
}

public static class Builder {
Expand Down Expand Up @@ -155,21 +166,27 @@ protected void afterExecute(Runnable r, Throwable t) {
}
}

@Override
public int start() throws IOException {
try {
server.start();
listenPort = server.getPort();
} catch (IOException e) {
ExitUtils.terminate(1, "Fail to start grpc server", e, LOG);
this.listenPort = RssUtils.startServiceOnPort(this,
Constants.GRPC_SERVICE_NAME, port, rssConf);
} catch (Exception e) {
ExitUtils.terminate(1, "Fail to start grpc server on conf port:" + port, e, LOG);
}
LOG.info("Grpc server started, configured port: {}, listening on {}.", port, listenPort);
return port;
return listenPort;
}

@Override
public void startOnPort(int port) {
ExitUtils.terminate(1, "Fail to start grpc server",
new RuntimeException("GRpcServer not implement now"), LOG);
public void startOnPort(int startPort) throws Exception {
this.server = buildGrpcServer(startPort);
try {
server.start();
listenPort = server.getPort();
} catch (Exception e) {
throw e;
}
LOG.info("Grpc server started, configured port: {}, listening on {}.", port, listenPort);
}

public void stop() throws InterruptedException {
Expand All @@ -189,7 +206,7 @@ public void blockUntilShutdown() throws InterruptedException {
}

public int getPort() {
return port <= 0 ? listenPort : port;
return listenPort;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,5 @@ private Constants() {
public static final double MILLION_SECONDS_PER_SECOND = 1E3D;
public static final String DEVICE_NO_SPACE_ERROR_MESSAGE = "No space left on device";
public static final String NETTY_STREAM_SERVICE_NAME = "netty.rpc.server";
public static final String GRPC_SERVICE_NAME = "grpc.server";
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ public static boolean isServerPortBindCollision(Throwable e) {
} else if (e instanceof Errors.NativeIoException) {
return (e.getMessage() != null && e.getMessage().startsWith("bind() failed: "))
|| isServerPortBindCollision(e.getCause());
} else if (e instanceof IOException) {
return (e.getMessage() != null && e.getMessage().startsWith("Failed to bind to address"))
|| isServerPortBindCollision(e.getCause());
} else {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import static org.junit.jupiter.api.Assertions.assertTrue;

public class ShuffleServerEnableStreamServerTest extends CoordinatorTestBase {
public class ShuffleServerOnRandomPortTest extends CoordinatorTestBase {
@BeforeAll
public static void setupServers() throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
Expand All @@ -38,10 +38,10 @@ public static void setupServers() throws Exception {
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setInteger("rss.server.netty.port", 0);
shuffleServerConf.setInteger("rss.rpc.server.port", 0);
shuffleServerConf.setInteger("rss.random.port.min", 30000);
shuffleServerConf.setInteger("rss.random.port.max", 40000);
createShuffleServer(shuffleServerConf);
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 1);
shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
createShuffleServer(shuffleServerConf);
startServers();
Expand All @@ -58,14 +58,35 @@ public void startStreamServerOnRandomPort() throws Exception {

int maxRetries = 100;
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
// start netty server with already bind port
shuffleServerConf.setInteger("rss.server.netty.port", actualPort);
shuffleServerConf.setInteger("rss.jetty.http.port", 18082);
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 2);
shuffleServerConf.setInteger("rss.port.max.retry", maxRetries);
ShuffleServer ss = new ShuffleServer(shuffleServerConf);
ss.start();
assertTrue(ss.getNettyPort() > actualPort && actualPort <= actualPort + maxRetries);
ss.stopServer();
}

@Test
public void startGrpcServerOnRandomPort() throws Exception {
CoordinatorTestUtils.waitForRegister(coordinatorClient, 2);
Thread.sleep(5000);
int actualPort = shuffleServers.get(0).getGrpcPort();
assertTrue(actualPort >= 30000 && actualPort < 40000);
actualPort = shuffleServers.get(1).getGrpcPort();
assertTrue(actualPort >= 30000 && actualPort <= 40000);

int maxRetries = 100;
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
// start grpc server with already bind port
shuffleServerConf.setInteger("rss.rpc.server.port", actualPort);
shuffleServerConf.setInteger("rss.jetty.http.port", 18083);
shuffleServerConf.setInteger("rss.port.max.retry", maxRetries);
ShuffleServer ss = new ShuffleServer(shuffleServerConf);
ss.start();
assertTrue(ss.getGrpcPort() > actualPort && actualPort <= actualPort + maxRetries);
ss.stopServer();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public static void main(String[] args) throws Exception {

public void start() throws Exception {
jettyServer.start();
server.start();
grpcPort = server.start();
if (nettyServerEnabled) {
nettyPort = streamServer.start();
}
Expand Down

0 comments on commit cad1903

Please sign in to comment.