Skip to content

Commit

Permalink
Add shortcut methods that create event loops and blocking task execut…
Browse files Browse the repository at this point in the history
…or thread pool (#3602)

Motivation:

It may be useful if a user can create a new event loop or blocking task executor that has the same life cycle with a `ClientFactory` or `Server`:

```java
ClientFactory
  .builder()
  .workerGroup(3)
  .build()

Server
  .builder()
  .workerGroup(3)
  .blockingTaskExecutor(10)
  ...
  .build()
```

Modifications:

- Added `ClientFactoryBuilder.workerGroup(int)`
- Added `ServerBuilder.workerGroup(int)`
- Added `ServerBuilder.blockingTaskExecutor(int)`

Result:

- Closes #3597
  • Loading branch information
kezhenxu94 committed Jun 21, 2021
1 parent cc495bc commit 0fbd1cc
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 17 deletions.
Expand Up @@ -55,6 +55,7 @@
import com.linecorp.armeria.common.Flags;
import com.linecorp.armeria.common.Http1HeaderNaming;
import com.linecorp.armeria.common.Request;
import com.linecorp.armeria.common.util.EventLoopGroups;
import com.linecorp.armeria.internal.common.RequestContextUtil;

import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -131,6 +132,17 @@ public ClientFactoryBuilder workerGroup(EventLoopGroup workerGroup, boolean shut
return this;
}

/**
* Uses a newly created {@link EventLoopGroup} with the specified number of threads for
* performing socket I/O and running {@link Client#execute(ClientRequestContext, Request)}.
* The worker {@link EventLoopGroup} will be shut down when the {@link ClientFactory} is closed.
*
* @param numThreads the number of event loop threads
*/
public ClientFactoryBuilder workerGroup(int numThreads) {
return workerGroup(EventLoopGroups.newEventLoopGroup(numThreads), true);
}

/**
* Sets the factory that creates an {@link EventLoopScheduler} which is responsible for assigning an
* {@link EventLoop} to handle a connection to the specified {@link Endpoint}.
Expand Down
31 changes: 31 additions & 0 deletions core/src/main/java/com/linecorp/armeria/server/ServerBuilder.java
Expand Up @@ -66,6 +66,9 @@
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.common.logging.RequestOnlyLog;
import com.linecorp.armeria.common.util.BlockingTaskExecutor;
import com.linecorp.armeria.common.util.BlockingTaskExecutorBuilder;
import com.linecorp.armeria.common.util.EventLoopGroups;
import com.linecorp.armeria.common.util.SystemInfo;
import com.linecorp.armeria.internal.common.RequestContextUtil;
import com.linecorp.armeria.internal.common.util.ChannelUtil;
Expand Down Expand Up @@ -445,6 +448,19 @@ public ServerBuilder workerGroup(EventLoopGroup workerGroup, boolean shutdownOnS
return this;
}

/**
* Uses a newly created {@link EventLoopGroup} with the specified number of threads for
* performing socket I/O and running {@link Service#serve(ServiceRequestContext, Request)}.
* The worker {@link EventLoopGroup} will be shut down when the {@link Server} stops.
*
* @param numThreads the number of event loop threads
*/
public ServerBuilder workerGroup(int numThreads) {
checkArgument(numThreads >= 0, "numThreads: %s (expected: >= 0)", numThreads);
workerGroup(EventLoopGroups.newEventLoopGroup(numThreads), true);
return this;
}

/**
* Sets the {@link Executor} which will invoke the callbacks of {@link Server#start()},
* {@link Server#stop()} and {@link ServerListener}. If not set, {@link GlobalEventExecutor} will be used
Expand Down Expand Up @@ -719,6 +735,21 @@ public ServerBuilder blockingTaskExecutor(ScheduledExecutorService blockingTaskE
return this;
}

/**
* Uses a newly created {@link BlockingTaskExecutor} with the specified number of threads dedicated to
* the execution of blocking tasks or invocations.
* The {@link BlockingTaskExecutor} will be shut down when the {@link Server} stops.
*
* @param numThreads the number of threads in the executor
*/
public ServerBuilder blockingTaskExecutor(int numThreads) {
checkArgument(numThreads >= 0, "numThreads: %s (expected: >= 0)", numThreads);
final BlockingTaskExecutor executor = BlockingTaskExecutor.builder()
.numThreads(numThreads)
.build();
return blockingTaskExecutor(executor, true);
}

/**
* Sets the {@link MeterRegistry} that collects various stats.
*/
Expand Down
Expand Up @@ -44,7 +44,6 @@
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.logging.ClientConnectionTimings;
import com.linecorp.armeria.common.logging.RequestLogProperty;
import com.linecorp.armeria.common.util.EventLoopGroups;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

Expand Down Expand Up @@ -122,7 +121,7 @@ public void connectionClosed(SessionProtocol protocol, InetSocketAddress remoteA
@BeforeEach
void setUp() {
clientFactory = ClientFactory.builder()
.workerGroup(EventLoopGroups.newEventLoopGroup(1), true)
.workerGroup(1)
.connectionPoolListener(connectionPoolListenerWrapper)
.build();
}
Expand Down
Expand Up @@ -75,7 +75,6 @@
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.common.stream.AbortedStreamException;
import com.linecorp.armeria.common.util.EventLoopGroups;
import com.linecorp.armeria.internal.testing.AnticipatedException;
import com.linecorp.armeria.server.AbstractHttpService;
import com.linecorp.armeria.server.ServerBuilder;
Expand All @@ -94,7 +93,7 @@ class RetryingClientTest {
@BeforeAll
static void beforeAll() {
// use different eventLoop from server's so that clients don't hang when the eventLoop in server hangs
clientFactory = ClientFactory.builder().workerGroup(EventLoopGroups.newEventLoopGroup(2), true).build();
clientFactory = ClientFactory.builder().workerGroup(2).build();
}

@AfterAll
Expand Down Expand Up @@ -525,7 +524,7 @@ void retryWithContentOnUnprocessedException() {

try (ClientFactory clientFactory = ClientFactory.builder()
.options(RetryingClientTest.clientFactory.options())
.workerGroup(EventLoopGroups.newEventLoopGroup(2), true)
.workerGroup(2)
.connectTimeoutMillis(Long.MAX_VALUE)
.build()) {
final WebClient client = WebClient.builder("http://127.0.0.1:1")
Expand Down Expand Up @@ -573,7 +572,7 @@ void retryWithRequestBody() {
@Test
void shouldGetExceptionWhenFactoryIsClosed() {
final ClientFactory factory =
ClientFactory.builder().workerGroup(EventLoopGroups.newEventLoopGroup(2), true).build();
ClientFactory.builder().workerGroup(2).build();

// Retry after 8000 which is slightly less than responseTimeoutMillis(10000).
final Function<? super HttpClient, RetryingClient> retryingDecorator =
Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.linecorp.armeria.common.util.EventLoopGroups;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;

class ConnectionLimitingHandlerIntegrationTest {
Expand All @@ -39,7 +38,7 @@ class ConnectionLimitingHandlerIntegrationTest {
static final ServerExtension server = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb.workerGroup(EventLoopGroups.newEventLoopGroup(1), true);
sb.workerGroup(1);
sb.maxNumConnections(2);
sb.serviceUnder("/", new AbstractHttpService() {});
}
Expand Down
Expand Up @@ -73,7 +73,6 @@
import com.linecorp.armeria.common.grpc.GrpcSerializationFormats;
import com.linecorp.armeria.common.grpc.protocol.GrpcHeaderNames;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.common.util.EventLoopGroups;
import com.linecorp.armeria.common.util.Exceptions;
import com.linecorp.armeria.grpc.testing.Messages.CompressionType;
import com.linecorp.armeria.grpc.testing.Messages.EchoStatus;
Expand Down Expand Up @@ -138,7 +137,7 @@ class GrpcClientTest {
public static final ServerExtension server = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) {
sb.workerGroup(EventLoopGroups.newEventLoopGroup(1), true);
sb.workerGroup(1);
sb.maxRequestLength(MAX_MESSAGE_SIZE);
sb.idleTimeoutMillis(0);
sb.http(0);
Expand Down
Expand Up @@ -80,7 +80,6 @@
import com.linecorp.armeria.common.grpc.protocol.GrpcHeaderNames;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.common.stream.ClosedStreamException;
import com.linecorp.armeria.common.util.EventLoopGroups;
import com.linecorp.armeria.common.util.TimeoutMode;
import com.linecorp.armeria.grpc.testing.Messages.EchoStatus;
import com.linecorp.armeria.grpc.testing.Messages.Payload;
Expand Down Expand Up @@ -393,7 +392,7 @@ public <REQ, RESP> Listener<REQ> interceptCall(ServerCall<REQ, RESP> call, Metad
static final ServerExtension server = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb.workerGroup(EventLoopGroups.newEventLoopGroup(1), true);
sb.workerGroup(1);
sb.maxRequestLength(0);

sb.service(
Expand Down Expand Up @@ -445,7 +444,7 @@ protected void configure(ServerBuilder sb) throws Exception {
static final ServerExtension serverWithBlockingExecutor = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb.workerGroup(EventLoopGroups.newEventLoopGroup(1), true);
sb.workerGroup(1);
sb.maxRequestLength(0);

sb.serviceUnder("/",
Expand All @@ -471,7 +470,7 @@ protected void configure(ServerBuilder sb) throws Exception {
static final ServerExtension serverWithNoMaxMessageSize = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb.workerGroup(EventLoopGroups.newEventLoopGroup(1), true);
sb.workerGroup(1);
sb.maxRequestLength(0);

sb.serviceUnder("/",
Expand All @@ -490,7 +489,7 @@ protected void configure(ServerBuilder sb) throws Exception {
static final ServerExtension serverWithLongMaxRequestLimit = new ServerExtension() {
@Override
protected void configure(ServerBuilder sb) throws Exception {
sb.workerGroup(EventLoopGroups.newEventLoopGroup(1), true);
sb.workerGroup(1);
sb.maxRequestLength(Long.MAX_VALUE);

sb.serviceUnder("/",
Expand Down
Expand Up @@ -54,7 +54,6 @@
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.RpcResponse;
import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.common.util.EventLoopGroups;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.thrift.THttpService;
import com.linecorp.armeria.service.test.thrift.main.DevNullService;
Expand Down Expand Up @@ -242,7 +241,7 @@ void execute_void() throws Exception {
@Test
void shouldGetExceptionWhenFactoryIsClosed() throws Exception {
final ClientFactory factory =
ClientFactory.builder().workerGroup(EventLoopGroups.newEventLoopGroup(2), true).build();
ClientFactory.builder().workerGroup(2).build();

final RetryRuleWithContent<RpcResponse> ruleWithContent =
(ctx, response, cause) -> {
Expand Down

0 comments on commit 0fbd1cc

Please sign in to comment.