Skip to content

Commit

Permalink
[#noissue] Fix grpc it
Browse files Browse the repository at this point in the history
cleanup executor resources in grpc it
  • Loading branch information
koo-taejin committed Nov 12, 2021
1 parent 0c04806 commit cfba4cb
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.MetadataUtils;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -47,40 +48,39 @@ public class HelloWorldSimpleClient implements HelloWorldClient {
private final ManagedChannel channel;
private final GreeterGrpc.GreeterBlockingStub blockingStub;

private final ExecutorService workerExecutor;
private final NioEventLoopGroup eventExecutors;

/**
* Construct client connecting to HelloWorld server at {@code host:port}.
*/
@SuppressWarnings("deprecated")
public HelloWorldSimpleClient(String host, int port) {
this(newChannel(host, port));
this.workerExecutor = Executors.newCachedThreadPool();
this.eventExecutors = new NioEventLoopGroup(CpuUtils.cpuCount() + 5, workerExecutor);

this.channel = newChannel(host, port, eventExecutors);
this.blockingStub = GreeterGrpc.newBlockingStub(channel);
}

private static ManagedChannel newChannel(String host, int port) {
private static ManagedChannel newChannel(String host, int port, NioEventLoopGroup eventExecutors) {
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(host, port);
BuilderUtils.usePlainText(builder);

if (builder instanceof NettyChannelBuilder) {
ExecutorService workerExecutor = Executors.newCachedThreadPool();
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(CpuUtils.cpuCount() + 5, workerExecutor);
((NettyChannelBuilder) builder).eventLoopGroup(eventExecutors);
}

builder.intercept(MetadataUtils.newCaptureMetadataInterceptor(new AtomicReference<Metadata>(), new AtomicReference<Metadata>()));
return builder.build();
}


/**
* Construct client for accessing HelloWorld server using the existing channel.
*/
HelloWorldSimpleClient(ManagedChannel channel) {
this.channel = channel;
blockingStub = GreeterGrpc.newBlockingStub(channel);
}

@Override
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
Future<?> future = eventExecutors.shutdownGracefully(500, 500, TimeUnit.MILLISECONDS);
future.await(1000);
workerExecutor.shutdownNow();
}

public String greet(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.navercorp.pinpoint.common.util.CpuUtils;
import com.navercorp.pinpoint.pluginit.utils.SocketUtils;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.examples.helloworld.GreeterGrpc;
Expand All @@ -26,6 +27,7 @@
import io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand All @@ -49,7 +51,13 @@ public class HelloWorldSimpleServer implements HelloWorldServer {

private int bindPort;

private ExecutorService workerExecutor;
private final ExecutorService workerExecutor;
private final NioEventLoopGroup eventExecutors;

public HelloWorldSimpleServer() {
this.workerExecutor = Executors.newCachedThreadPool();
this.eventExecutors = new NioEventLoopGroup(CpuUtils.cpuCount() + 5, workerExecutor);
}

@PostConstruct
public void start() throws IOException {
Expand All @@ -58,9 +66,7 @@ public void start() throws IOException {
/* The port on which the server should run */
ServerBuilder<?> serverBuilder = ServerBuilder.forPort(bindPort);
if (serverBuilder instanceof NettyServerBuilder) {
this.workerExecutor = Executors.newCachedThreadPool();
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(CpuUtils.cpuCount() + 5, workerExecutor);
((NettyServerBuilder) serverBuilder).workerEventLoopGroup(eventExecutors);
((NettyServerBuilder) serverBuilder).bossEventLoopGroup(eventExecutors).workerEventLoopGroup(eventExecutors);
}
this.server = serverBuilder
.addService(new GreeterImpl())
Expand All @@ -87,9 +93,10 @@ public void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
if (workerExecutor != null) {
workerExecutor.shutdownNow();
}

Future<?> future = eventExecutors.shutdownGracefully(500, 500, TimeUnit.MILLISECONDS);
future.await(1000);
workerExecutor.shutdownNow();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.MetadataUtils;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;

import java.util.Arrays;
import java.util.Iterator;
Expand All @@ -49,39 +51,40 @@ public class HelloWorldStreamClient implements HelloWorldClient {
private final ManagedChannel channel;
private final StreamingGreeterGrpc.StreamingGreeterStub stub;

private final ExecutorService workerExecutor;
private final NioEventLoopGroup eventExecutors;

/**
* Construct client connecting to HelloWorld server at {@code host:port}.
*/
@SuppressWarnings("deprecated")
public HelloWorldStreamClient(String host, int port) {
this(newChannel(host, port));
this.workerExecutor = Executors.newCachedThreadPool();
this.eventExecutors = new NioEventLoopGroup(CpuUtils.cpuCount() + 5, workerExecutor);

this.channel = newChannel(host, port, eventExecutors);
this.stub = StreamingGreeterGrpc.newStub(channel);
}

private static ManagedChannel newChannel(String host, int port) {
private static ManagedChannel newChannel(String host, int port, EventLoopGroup eventExecutors) {
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(host, port);
BuilderUtils.usePlainText(builder);

if (builder instanceof NettyChannelBuilder) {
ExecutorService workerExecutor = Executors.newCachedThreadPool();
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(CpuUtils.cpuCount() + 5, workerExecutor);
((NettyChannelBuilder) builder).eventLoopGroup(eventExecutors);
}

builder.intercept(MetadataUtils.newCaptureMetadataInterceptor(new AtomicReference<Metadata>(), new AtomicReference<Metadata>()));
return builder.build();
}

/**
* Construct client for accessing HelloWorld server using the existing channel.
*/
HelloWorldStreamClient(ManagedChannel channel) {
this.channel = channel;
this.stub = StreamingGreeterGrpc.newStub(channel);
}

@Override
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);

Future<?> future = eventExecutors.shutdownGracefully(500, 500, TimeUnit.MILLISECONDS);
future.await(1000);
workerExecutor.shutdownNow();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.navercorp.pinpoint.common.util.CpuUtils;
import com.navercorp.pinpoint.pluginit.utils.SocketUtils;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.Status;
Expand All @@ -28,6 +29,7 @@
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.Future;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand All @@ -52,7 +54,13 @@ public class HelloWorldStreamServer implements HelloWorldServer {

private int bindPort;

private ExecutorService workerExecutor;
private final ExecutorService workerExecutor;
private final NioEventLoopGroup eventExecutors;

public HelloWorldStreamServer() {
this.workerExecutor = Executors.newCachedThreadPool();
this.eventExecutors = new NioEventLoopGroup(CpuUtils.cpuCount() + 5, workerExecutor);
}

@PostConstruct
public void start() throws IOException {
Expand Down Expand Up @@ -153,9 +161,7 @@ public void onCompleted() {

ServerBuilder<?> serverBuilder = ServerBuilder.forPort(bindPort);
if (serverBuilder instanceof NettyServerBuilder) {
this.workerExecutor = Executors.newCachedThreadPool();
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(CpuUtils.cpuCount() + 5, workerExecutor);
((NettyServerBuilder) serverBuilder).workerEventLoopGroup(eventExecutors);
((NettyServerBuilder) serverBuilder).bossEventLoopGroup(eventExecutors).workerEventLoopGroup(eventExecutors);
}
this.server = serverBuilder
.addService(svc)
Expand All @@ -182,9 +188,10 @@ public void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}
if (workerExecutor != null) {
workerExecutor.shutdownNow();
}

Future<?> future = eventExecutors.shutdownGracefully(500, 500, TimeUnit.MILLISECONDS);
future.await(1000);
workerExecutor.shutdownNow();
}


Expand Down

0 comments on commit cfba4cb

Please sign in to comment.