diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/config/AgentBaseDataReceiverConfiguration.java b/collector/src/main/java/com/navercorp/pinpoint/collector/config/AgentBaseDataReceiverConfiguration.java index b982f9461281..2a13b0cf7184 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/config/AgentBaseDataReceiverConfiguration.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/config/AgentBaseDataReceiverConfiguration.java @@ -33,7 +33,8 @@ public final class AgentBaseDataReceiverConfiguration { private static final String WORKER_QUEUE_SIZE = PREFIX + ".worker.queueSize"; private static final String WORKER_MONITOR_ENABLE = PREFIX + ".worker.monitor"; - private static final String GRPC_PREFIX = "collector.receiver.grpc"; + private static final String GRPC_PREFIX = "collector.receiver.grpc.agent"; + private static final String GRPC_ENABLE = GRPC_PREFIX + ".enable"; private static final String GRPC_BIND_IP = GRPC_PREFIX + ".ip"; private static final String GRPC_BIND_PORT = GRPC_PREFIX + ".port"; private static final String GRPC_WORKER_THREAD_SIZE = GRPC_PREFIX + ".worker.threadSize"; @@ -72,7 +73,7 @@ public AgentBaseDataReceiverConfiguration(Properties properties, DeprecatedConfi this.workerMonitorEnable = isWorkerThreadMonitorEnable(properties, deprecatedConfiguration); // gRPC - this.grpcEnable = CollectorConfiguration.readBoolean(properties, GRPC_PREFIX); + this.grpcEnable = CollectorConfiguration.readBoolean(properties, GRPC_ENABLE); this.grpcBindIp = CollectorConfiguration.readString(properties, GRPC_BIND_IP, CollectorConfiguration.DEFAULT_LISTEN_IP); Objects.requireNonNull(grpcBindIp); this.grpcBindPort = CollectorConfiguration.readInt(properties, GRPC_BIND_PORT, 9997); diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/config/SpanReceiverConfiguration.java b/collector/src/main/java/com/navercorp/pinpoint/collector/config/SpanReceiverConfiguration.java index 5adeb7e251bd..347824b83482 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/config/SpanReceiverConfiguration.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/config/SpanReceiverConfiguration.java @@ -27,15 +27,16 @@ public final class SpanReceiverConfiguration implements DataReceiverGroupConfiguration { private static final String PREFIX = "collector.receiver.span"; - - private static final String GRPC_ENABLE = PREFIX + ".grpc"; - private static final String GRPC_BIND_IP = PREFIX + ".grpc.ip"; - private static final String GRPC_BIND_PORT = PREFIX + ".grpc.port"; - private static final String GRPC_WORKER_THREAD_SIZE = PREFIX + ".grpc.worker.threadSize"; - private static final String GRPC_WORKER_QUEUE_SIZE = PREFIX + ".grpc.worker.queueSize"; - private static final String GRPC_WORKER_MONITOR_ENABLE = PREFIX + ".grpc.worker.monitor"; - private static final String GRPC_KEEP_ALIVE_TIME = PREFIX + ".grpc.keepalive.time"; - private static final String GRPC_KEEP_ALIVE_TIMEOUT = PREFIX + ".grpc.keepalive.timeout"; + private static final String GRPC_PREFIX = "collector.receiver.grpc.span"; + + private static final String GRPC_ENABLE = GRPC_PREFIX + ".enable"; + private static final String GRPC_BIND_IP = GRPC_PREFIX + ".ip"; + private static final String GRPC_BIND_PORT = GRPC_PREFIX + ".port"; + private static final String GRPC_WORKER_THREAD_SIZE = GRPC_PREFIX + ".worker.threadSize"; + private static final String GRPC_WORKER_QUEUE_SIZE = GRPC_PREFIX + ".worker.queueSize"; + private static final String GRPC_WORKER_MONITOR_ENABLE = GRPC_PREFIX + ".worker.monitor"; + private static final String GRPC_KEEP_ALIVE_TIME = GRPC_PREFIX + ".keepalive.time"; + private static final String GRPC_KEEP_ALIVE_TIMEOUT = GRPC_PREFIX + ".keepalive.timeout"; private static final String TCP_ENABLE = PREFIX + ".tcp"; private static final String TCP_BIND_IP = PREFIX + ".tcp.ip"; diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/config/StatReceiverConfiguration.java b/collector/src/main/java/com/navercorp/pinpoint/collector/config/StatReceiverConfiguration.java index 8a88a517401d..775d30deafcd 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/config/StatReceiverConfiguration.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/config/StatReceiverConfiguration.java @@ -27,15 +27,16 @@ public final class StatReceiverConfiguration implements DataReceiverGroupConfiguration { private static final String PREFIX = "collector.receiver.stat"; - - private static final String GRPC_ENABLE = PREFIX + ".grpc"; - private static final String GRPC_BIND_IP = PREFIX + ".grpc.ip"; - private static final String GRPC_BIND_PORT = PREFIX + ".grpc.port"; - private static final String GRPC_WORKER_THREAD_SIZE = PREFIX + ".grpc.worker.threadSize"; - private static final String GRPC_WORKER_QUEUE_SIZE = PREFIX + ".grpc.worker.queueSize"; - private static final String GRPC_WORKER_MONITOR_ENABLE = PREFIX + ".grpc.worker.monitor"; - private static final String GRPC_KEEP_ALIVE_TIME = PREFIX + ".grpc.keepalive.time"; - private static final String GRPC_KEEP_ALIVE_TIMEOUT = PREFIX + ".grpc.keepalive.timeout"; + private static final String GRPC_PREFIX = "collector.receiver.grpc.stat"; + + private static final String GRPC_ENABLE = GRPC_PREFIX + ".enable"; + private static final String GRPC_BIND_IP = GRPC_PREFIX + ".ip"; + private static final String GRPC_BIND_PORT = GRPC_PREFIX + ".port"; + private static final String GRPC_WORKER_THREAD_SIZE = GRPC_PREFIX + ".worker.threadSize"; + private static final String GRPC_WORKER_QUEUE_SIZE = GRPC_PREFIX + ".worker.queueSize"; + private static final String GRPC_WORKER_MONITOR_ENABLE = GRPC_PREFIX + ".worker.monitor"; + private static final String GRPC_KEEP_ALIVE_TIME = GRPC_PREFIX + ".keepalive.time"; + private static final String GRPC_KEEP_ALIVE_TIMEOUT = GRPC_PREFIX + ".keepalive.timeout"; private static final String TCP_ENABLE = PREFIX + ".tcp"; private static final String TCP_BIND_IP = PREFIX + ".tcp.ip"; diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java b/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java index 2d30549a0658..b5231aa0eefc 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/receiver/grpc/GrpcReceiver.java @@ -74,8 +74,9 @@ public void afterPropertiesSet() throws Exception { Assert.requireNonNull(this.bindIp, "bindIp must not be null"); Assert.requireNonNull(this.addressFilter, "addressFilter must not be null"); Assert.isTrue(CollectionUtils.hasLength(this.serviceList), "serviceList must not be empty"); + Assert.requireNonNull(this.serverOption, "serverOption must not be null"); - this.serverFactory = new ServerFactory(beanName, this.bindIp, this.bindPort, this.executor); + this.serverFactory = new ServerFactory(beanName, this.bindIp, this.bindPort, this.executor, serverOption); ServerTransportFilter permissionServerTransportFilter = new PermissionServerTransportFilter(addressFilter); this.serverFactory.addTransportFilter(permissionServerTransportFilter); diff --git a/collector/src/main/resources/applicationContext-collector.xml b/collector/src/main/resources/applicationContext-collector.xml index 096804718748..d3bc0d653674 100644 --- a/collector/src/main/resources/applicationContext-collector.xml +++ b/collector/src/main/resources/applicationContext-collector.xml @@ -444,7 +444,7 @@ - + @@ -456,7 +456,7 @@ - + @@ -523,7 +523,7 @@ - + @@ -554,7 +554,7 @@ - + @@ -577,6 +577,7 @@ + diff --git a/collector/src/main/resources/pinpoint-collector.properties b/collector/src/main/resources/pinpoint-collector.properties index 6d4090a4647d..ba9ace62421a 100644 --- a/collector/src/main/resources/pinpoint-collector.properties +++ b/collector/src/main/resources/pinpoint-collector.properties @@ -115,48 +115,49 @@ flink.cluster.enable=false flink.cluster.zookeeper.address=localhost flink.cluster.zookeeper.sessiontimeout=3000 -collector.receiver.grpc=false -collector.receiver.grpc.ip=0.0.0.0 -collector.receiver.grpc.port=9997 +# gRPC +collector.receiver.grpc.agent.enable=false +collector.receiver.grpc.agent.ip=0.0.0.0 +collector.receiver.grpc.agent.port=9997 # number of tcp worker threads -collector.receiver.grpc.worker.threadSize=8 +collector.receiver.grpc.agent.worker.threadSize=8 # capacity of tcp worker queue -collector.receiver.grpc.worker.queueSize=1024 +collector.receiver.grpc.agent.worker.queueSize=1024 # monitoring for tcp worker -collector.receiver.grpc.worker.monitor=true +collector.receiver.grpc.agent.worker.monitor=true # Milliseconds, Default 5 min # As collector.receiver.grpc.ping.interval -collector.receiver.grpc.keepalive.time=300000 +collector.receiver.grpc.agent.keepalive.time=300000 # Milliseconds, Default 30 min # As collector.receiver.grpc.pingwait.timeout -collector.receiver.grpc.keepalive.timeout=1800000 +collector.receiver.grpc.agent.keepalive.timeout=1800000 -collector.receiver.span.grpc=false -collector.receiver.span.grpc.ip=0.0.0.0 -collector.receiver.span.grpc.port=9998 +collector.receiver.grpc.span.enable=false +collector.receiver.grpc.span.ip=0.0.0.0 +collector.receiver.grpc.span.port=9998 # Milliseconds, Default 5 min -collector.receiver.span.grpc.keepalive.time=300000 +collector.receiver.grpc.span.keepalive.time=300000 # Milliseconds, Default 30 min -collector.receiver.span.grpc.keepalive.timeout=1800000 +collector.receiver.grpc.span.keepalive.timeout=1800000 # number of span worker threads -collector.receiver.span.grpc.worker.threadSize=32 +collector.receiver.grpc.span.worker.threadSize=32 # capacity of span worker queue -collector.receiver.span.grpc.worker.queueSize=256 +collector.receiver.grpc.span.worker.queueSize=256 # monitoring for span worker -collector.receiver.span.grpc.worker.monitor=true +collector.receiver.grpc.span.worker.monitor=true -collector.receiver.stat.grpc=false -collector.receiver.stat.grpc.ip=0.0.0.0 -collector.receiver.stat.grpc.port=9999 +collector.receiver.grpc.stat.enable=false +collector.receiver.grpc.stat.ip=0.0.0.0 +collector.receiver.grpc.stat.port=9999 # Milliseconds, Default 5 min -collector.receiver.stat.grpc.keepalive.time=300000 +collector.receiver.grpc.stat.keepalive.time=300000 # Milliseconds, Default 30 min -collector.receiver.stat.grpc.keepalive.timeout=1800000 +collector.receiver.grpc.stat.keepalive.timeout=1800000 # number of span worker threads -collector.receiver.stat.grpc.worker.threadSize=32 +collector.receiver.grpc.stat.worker.threadSize=32 # capacity of span worker queue -collector.receiver.stat.grpc.worker.queueSize=256 +collector.receiver.grpc.stat.worker.queueSize=256 # monitoring for span worker -collector.receiver.stat.grpc.worker.monitor=true +collector.receiver.grpc.stat.worker.monitor=true collector.receiver.channel.properties.key= diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/config/AgentBaseDataReceiverConfigurationTest.java b/collector/src/test/java/com/navercorp/pinpoint/collector/config/AgentBaseDataReceiverConfigurationTest.java index aa4aec8d4e17..3101ee145392 100644 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/config/AgentBaseDataReceiverConfigurationTest.java +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/config/AgentBaseDataReceiverConfigurationTest.java @@ -28,14 +28,14 @@ public class AgentBaseDataReceiverConfigurationTest { @Test public void properties() throws Exception { Properties properties = new Properties(); - properties.setProperty("collector.receiver.grpc", ""); - properties.setProperty("collector.receiver.grpc.ip", "9.9.9.9"); - properties.setProperty("collector.receiver.grpc.port", "1111"); - properties.setProperty("collector.receiver.grpc.worker.threadSize", "99"); - properties.setProperty("collector.receiver.grpc.worker.queueSize", "9999"); - properties.setProperty("collector.receiver.grpc.worker.monitor", "false"); - properties.setProperty("collector.receiver.grpc.keepalive.time", "3"); - properties.setProperty("collector.receiver.grpc.keepalive.timeout", "7"); + properties.setProperty("collector.receiver.grpc.agent", ""); + properties.setProperty("collector.receiver.grpc.agent.ip", "9.9.9.9"); + properties.setProperty("collector.receiver.grpc.agent.port", "1111"); + properties.setProperty("collector.receiver.grpc.agent.worker.threadSize", "99"); + properties.setProperty("collector.receiver.grpc.agent.worker.queueSize", "9999"); + properties.setProperty("collector.receiver.grpc.agent.worker.monitor", "false"); + properties.setProperty("collector.receiver.grpc.agent.keepalive.time", "3"); + properties.setProperty("collector.receiver.grpc.agent.keepalive.timeout", "7"); AgentBaseDataReceiverConfiguration configuration = new AgentBaseDataReceiverConfiguration(properties, new DeprecatedConfiguration()); assertEquals(Boolean.FALSE, configuration.isGrpcEnable()); diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/config/SpanReceiverConfigurationTest.java b/collector/src/test/java/com/navercorp/pinpoint/collector/config/SpanReceiverConfigurationTest.java index d8f001ef6a3a..f29c729aab18 100644 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/config/SpanReceiverConfigurationTest.java +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/config/SpanReceiverConfigurationTest.java @@ -28,14 +28,14 @@ public class SpanReceiverConfigurationTest { @Test public void properties() throws Exception { Properties properties = new Properties(); - properties.setProperty("collector.receiver.span.grpc", ""); - properties.setProperty("collector.receiver.span.grpc.ip", "9.9.9.9"); - properties.setProperty("collector.receiver.span.grpc.port", "1111"); - properties.setProperty("collector.receiver.span.grpc.worker.threadSize", "99"); - properties.setProperty("collector.receiver.span.grpc.worker.queueSize", "9999"); - properties.setProperty("collector.receiver.span.grpc.worker.monitor", "false"); - properties.setProperty("collector.receiver.span.grpc.keepalive.time", "3"); - properties.setProperty("collector.receiver.span.grpc.keepalive.timeout", "7"); + properties.setProperty("collector.receiver.grpc.span.enable", ""); + properties.setProperty("collector.receiver.grpc.span.ip", "9.9.9.9"); + properties.setProperty("collector.receiver.grpc.span.port", "1111"); + properties.setProperty("collector.receiver.grpc.span.worker.threadSize", "99"); + properties.setProperty("collector.receiver.grpc.span.worker.queueSize", "9999"); + properties.setProperty("collector.receiver.grpc.span.worker.monitor", "false"); + properties.setProperty("collector.receiver.grpc.span.keepalive.time", "3"); + properties.setProperty("collector.receiver.grpc.span.keepalive.timeout", "7"); SpanReceiverConfiguration configuration = new SpanReceiverConfiguration(properties, new DeprecatedConfiguration()); assertEquals(Boolean.FALSE, configuration.isGrpcEnable()); diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/config/StatReceiverConfigurationTest.java b/collector/src/test/java/com/navercorp/pinpoint/collector/config/StatReceiverConfigurationTest.java index 30298ddb55c3..42c7cc34eed1 100644 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/config/StatReceiverConfigurationTest.java +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/config/StatReceiverConfigurationTest.java @@ -27,14 +27,14 @@ public class StatReceiverConfigurationTest { @Test public void properties() throws Exception { Properties properties = new Properties(); - properties.setProperty("collector.receiver.stat.grpc", ""); - properties.setProperty("collector.receiver.stat.grpc.ip", "9.9.9.9"); - properties.setProperty("collector.receiver.stat.grpc.port", "1111"); - properties.setProperty("collector.receiver.stat.grpc.worker.threadSize", "99"); - properties.setProperty("collector.receiver.stat.grpc.worker.queueSize", "9999"); - properties.setProperty("collector.receiver.stat.grpc.worker.monitor", "false"); - properties.setProperty("collector.receiver.stat.grpc.keepalive.time", "3"); - properties.setProperty("collector.receiver.stat.grpc.keepalive.timeout", "7"); + properties.setProperty("collector.receiver.grpc.stat.enable", ""); + properties.setProperty("collector.receiver.grpc.stat.ip", "9.9.9.9"); + properties.setProperty("collector.receiver.grpc.stat.port", "1111"); + properties.setProperty("collector.receiver.grpc.stat.worker.threadSize", "99"); + properties.setProperty("collector.receiver.grpc.stat.worker.queueSize", "9999"); + properties.setProperty("collector.receiver.grpc.stat.worker.monitor", "false"); + properties.setProperty("collector.receiver.grpc.stat.keepalive.time", "3"); + properties.setProperty("collector.receiver.grpc.stat.keepalive.timeout", "7"); StatReceiverConfiguration configuration = new StatReceiverConfiguration(properties, new DeprecatedConfiguration()); assertEquals(Boolean.FALSE, configuration.isGrpcEnable()); diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentClientMock.java b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentClientMock.java index fae88ba18346..829b6d1ffbf9 100644 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentClientMock.java +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentClientMock.java @@ -26,6 +26,7 @@ import com.navercorp.pinpoint.grpc.trace.PSqlMetaData; import com.navercorp.pinpoint.grpc.trace.PStringMetaData; import io.grpc.Attributes; +import io.grpc.CallOptions; import io.grpc.ClientInterceptor; import io.grpc.ConnectivityState; import io.grpc.ConnectivityStateInfo; @@ -34,6 +35,7 @@ import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Status; +import io.grpc.netty.InternalNettyChannelBuilder; import io.grpc.netty.NettyChannelBuilder; import io.grpc.stub.MetadataUtils; import io.grpc.stub.StreamObserver; @@ -70,7 +72,6 @@ public AgentClientMock(final String host, final int port, final boolean agentHea builder.intercept(headersInterceptor); } builder.usePlaintext(); - channel = builder.build(); this.agentStub = AgentGrpc.newBlockingStub(channel); this.metadataStub = MetadataGrpc.newBlockingStub(channel); diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentServerTestMain.java b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentServerTestMain.java index a65dcd97a1cb..c52e71a2b52a 100644 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentServerTestMain.java +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/AgentServerTestMain.java @@ -19,10 +19,12 @@ import com.navercorp.pinpoint.collector.receiver.DispatchHandler; import com.navercorp.pinpoint.collector.receiver.grpc.service.AgentService; import com.navercorp.pinpoint.common.server.util.AddressFilter; +import com.navercorp.pinpoint.grpc.server.ServerOption; import com.navercorp.pinpoint.grpc.trace.PResult; import com.navercorp.pinpoint.io.request.ServerRequest; import com.navercorp.pinpoint.io.request.ServerResponse; import io.grpc.BindableService; +import io.grpc.Status; import java.net.InetAddress; import java.util.Arrays; @@ -47,6 +49,7 @@ public void run() throws Exception { grpcReceiver.setBindableServiceList(Arrays.asList(agentService)); grpcReceiver.setAddressFilter(new MockAddressFilter()); grpcReceiver.setExecutor(Executors.newFixedThreadPool(8)); + grpcReceiver.setServerOption(new ServerOption.Builder().build()); grpcReceiver.afterPropertiesSet(); diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanClientMock.java b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanClientMock.java index 5bfcf932022c..ed949abdc8b2 100644 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanClientMock.java +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanClientMock.java @@ -23,14 +23,20 @@ import com.navercorp.pinpoint.grpc.trace.PSpanChunk; import com.navercorp.pinpoint.grpc.trace.SpanGrpc; import io.grpc.ClientInterceptor; +import io.grpc.LoadBalancer; import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.netty.NettyChannelBuilder; import io.grpc.stub.MetadataUtils; import io.grpc.stub.StreamObserver; +import io.grpc.util.RoundRobinLoadBalancerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class SpanClientMock { @@ -50,6 +56,7 @@ public SpanClientMock(final String host, final int port) throws Exception { channel = builder.build(); this.spanStub = SpanGrpc.newStub(channel); + logger.info("CallOptions={}, channel={}", spanStub.getCallOptions(), spanStub.getChannel()); } public void stop() throws InterruptedException { @@ -64,14 +71,27 @@ public void span() { span(1); } + ExecutorService service = Executors.newFixedThreadPool(1); + public void span(int count) { - StreamObserver responseObserver = getResponseObserver(); - StreamObserver requestObserver = spanStub.sendSpan(responseObserver); - for (int i = 0; i < count; i++) { - final PSpan span = PSpan.newBuilder().build(); - requestObserver.onNext(span); - } - requestObserver.onCompleted(); + service.execute(new Runnable() { + @Override + public void run() { + StreamObserver responseObserver = getResponseObserver(); + + StreamObserver requestObserver = spanStub.sendSpan(responseObserver); + for (int i = 0; i < count; i++) { + final PSpan span = PSpan.newBuilder().build(); + requestObserver.onNext(span); + try { + TimeUnit.SECONDS.sleep(1); + + } catch (InterruptedException e) { + } + } + requestObserver.onCompleted(); + } + }); } public void spanChunk() { @@ -79,14 +99,25 @@ public void spanChunk() { } public void spanChunk(final int count) { - StreamObserver responseObserver = getResponseObserver(); - - StreamObserver requestObserver = spanStub.sendSpanChunk(responseObserver); - for (int i = 0; i < count; i++) { - final PSpanChunk spanChunk = PSpanChunk.newBuilder().build(); - requestObserver.onNext(spanChunk); - } - requestObserver.onCompleted(); + service.execute(new Runnable() { + @Override + public void run() { + + StreamObserver responseObserver = getResponseObserver(); + + StreamObserver requestObserver = spanStub.sendSpanChunk(responseObserver); + for (int i = 0; i < count; i++) { + final PSpanChunk spanChunk = PSpanChunk.newBuilder().build(); + requestObserver.onNext(spanChunk); + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + } + } + requestObserver.onCompleted(); + } + }); + } private StreamObserver getResponseObserver() { diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanClientTestMain.java b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanClientTestMain.java index a5649007b8e7..d7fd7bfee4b1 100644 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanClientTestMain.java +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanClientTestMain.java @@ -22,10 +22,13 @@ public class SpanClientTestMain { private static final int MAX = 100000; public static void main(String[] args) throws Exception { - SpanClientMock clientMock = new SpanClientMock("0.0.0.0", 9998); - clientMock.span(1); + SpanClientMock clientMock = new SpanClientMock("localhost", 9998); + TimeUnit.SECONDS.sleep(10); - clientMock.spanChunk(1); + clientMock.span(10); + clientMock.span(10); + clientMock.spanChunk(10); + clientMock.spanChunk(10); TimeUnit.SECONDS.sleep(60); clientMock.stop(); diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanServerTestMain.java b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanServerTestMain.java index 2bb5c7c566c2..01c0e461a8f2 100644 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanServerTestMain.java +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/SpanServerTestMain.java @@ -19,6 +19,7 @@ import com.navercorp.pinpoint.collector.receiver.DispatchHandler; import com.navercorp.pinpoint.collector.receiver.grpc.service.SpanService; import com.navercorp.pinpoint.common.server.util.AddressFilter; +import com.navercorp.pinpoint.grpc.server.ServerOption; import com.navercorp.pinpoint.grpc.trace.PResult; import com.navercorp.pinpoint.io.request.ServerRequest; import com.navercorp.pinpoint.io.request.ServerResponse; @@ -56,6 +57,7 @@ public void run() throws Exception { grpcReceiver.setAddressFilter(new MockAddressFilter()); grpcReceiver.setExecutor(Executors.newFixedThreadPool(8)); grpcReceiver.setEnable(true); + grpcReceiver.setServerOption(new ServerOption.Builder().build()); grpcReceiver.afterPropertiesSet(); diff --git a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/StatServerTestMain.java b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/StatServerTestMain.java index 4ac479426d96..51122bd0b72b 100644 --- a/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/StatServerTestMain.java +++ b/collector/src/test/java/com/navercorp/pinpoint/collector/receiver/grpc/StatServerTestMain.java @@ -19,6 +19,7 @@ import com.navercorp.pinpoint.collector.receiver.DispatchHandler; import com.navercorp.pinpoint.collector.receiver.grpc.service.StatService; import com.navercorp.pinpoint.common.server.util.AddressFilter; +import com.navercorp.pinpoint.grpc.server.ServerOption; import com.navercorp.pinpoint.grpc.trace.PResult; import com.navercorp.pinpoint.io.request.ServerRequest; import com.navercorp.pinpoint.io.request.ServerResponse; @@ -43,6 +44,7 @@ public void run() throws Exception { grpcReceiver.setAddressFilter(new MockAddressFilter()); grpcReceiver.setExecutor(Executors.newFixedThreadPool(8)); grpcReceiver.setEnable(true); + grpcReceiver.setServerOption(new ServerOption.Builder().build()); grpcReceiver.afterPropertiesSet(); diff --git a/commons/src/main/java/com/navercorp/pinpoint/common/util/PinpointThreadFactory.java b/commons/src/main/java/com/navercorp/pinpoint/common/util/PinpointThreadFactory.java index c56dee340e0a..84c2b21e5af3 100644 --- a/commons/src/main/java/com/navercorp/pinpoint/common/util/PinpointThreadFactory.java +++ b/commons/src/main/java/com/navercorp/pinpoint/common/util/PinpointThreadFactory.java @@ -23,7 +23,7 @@ * @author emeroad */ public class PinpointThreadFactory implements ThreadFactory { - + public static final String DEFAULT_THREAD_NAME_PREFIX = "Pinpoint-"; private final static AtomicInteger FACTORY_NUMBER = new AtomicInteger(0); private final AtomicInteger threadNumber = new AtomicInteger(0); diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ChannelFactory.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ChannelFactory.java index ff36823d78c0..8cebcf171aa1 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ChannelFactory.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ChannelFactory.java @@ -29,6 +29,8 @@ import io.grpc.netty.InternalNettyChannelBuilder; import io.grpc.netty.NettyChannelBuilder; import io.grpc.stub.MetadataUtils; +import io.netty.channel.ChannelOption; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.Future; import org.slf4j.Logger; @@ -47,19 +49,15 @@ * @author Woonduk Kang(emeroad) */ public class ChannelFactory { - private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final String name; private final HeaderFactory headerFactory; - private final NioEventLoopGroup eventLoopGroup; - private final ExecutorService eventLoopExecutor; - private final ExecutorService executorService; - private final NameResolverProvider nameResolverProvider; + private final ClientOption clientOption; private final List clientInterceptorList; @@ -68,28 +66,33 @@ public ChannelFactory(ChannelFactoryOption option) { this.headerFactory = option.getHeaderFactory(); - this.eventLoopExecutor = newCachedExecutorService(name + "-eventLoop"); + this.eventLoopExecutor = newCachedExecutorService(name + "-Channel-Worker"); this.eventLoopGroup = newEventLoopGroup(eventLoopExecutor); - this.executorService = newExecutorService(name + "-executor", option.getExecutorQueueSize()); + this.executorService = newExecutorService(name + "-Channel-Executor", option.getExecutorQueueSize()); this.nameResolverProvider = option.getNameResolverProvider(); - this.clientInterceptorList = Assert.requireNonNull(option.getClientInterceptorList(), "clientInterceptorList"); + this.clientInterceptorList = Assert.requireNonNull(option.getClientInterceptorList(), "clientInterceptorList must not be null"); + this.clientOption = option.getClientOption(); + } + + private ExecutorService newCachedExecutorService(String name) { + ThreadFactory threadFactory = new PinpointThreadFactory(PinpointThreadFactory.DEFAULT_THREAD_NAME_PREFIX + name, true); + return Executors.newCachedThreadPool(threadFactory); + } + + private NioEventLoopGroup newEventLoopGroup(ExecutorService executorService) { + return new NioEventLoopGroup(1, executorService); } private ExecutorService newExecutorService(String name, int executorQueueSize) { - ThreadFactory threadFactory = new PinpointThreadFactory(name, true); + ThreadFactory threadFactory = new PinpointThreadFactory(PinpointThreadFactory.DEFAULT_THREAD_NAME_PREFIX + name, true); BlockingQueue workQueue = new LinkedBlockingQueue(executorQueueSize); return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory); } - private ExecutorService newCachedExecutorService(String name) { - ThreadFactory threadFactory = new PinpointThreadFactory(name, true); - return Executors.newCachedThreadPool(threadFactory); - } - public ManagedChannel build(String channelName, String host, int port) { final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port); channelBuilder.usePlaintext(); @@ -101,20 +104,17 @@ public ManagedChannel build(String channelName, String host, int port) { channelBuilder.executor(executorService); if (this.nameResolverProvider != null) { - logger.info("setNameResolverProvider:{}", this.nameResolverProvider); + logger.info("Set nameResolverProvider {}. channelName={}, host={}, port={}", this.nameResolverProvider, channelName, host, port); channelBuilder.nameResolverFactory(this.nameResolverProvider); } + setupClientOption(channelBuilder); + final ManagedChannel channel = channelBuilder.build(); setChannelStateNotifier(channel, channelName); return channel; } - - private NioEventLoopGroup newEventLoopGroup(ExecutorService executorService) { - return new NioEventLoopGroup(1, executorService); - } - private void setupInternal(NettyChannelBuilder channelBuilder) { InternalNettyChannelBuilder.setStatsEnabled(channelBuilder, false); InternalNettyChannelBuilder.setTracingEnabled(channelBuilder, false); @@ -134,6 +134,22 @@ private void addClientInterceptor(NettyChannelBuilder channelBuilder) { channelBuilder.intercept(clientInterceptorList); } + private void setupClientOption(final NettyChannelBuilder channelBuilder) { + channelBuilder.keepAliveTime(clientOption.getKeepAliveTime(), TimeUnit.MILLISECONDS); + channelBuilder.keepAliveTimeout(clientOption.getKeepAliveTimeout(), TimeUnit.MILLISECONDS); + channelBuilder.keepAliveWithoutCalls(clientOption.isKeepAliveWithoutCalls()); + channelBuilder.maxHeaderListSize(clientOption.getMaxHeaderListSize()); + channelBuilder.maxInboundMessageSize(clientOption.getMaxInboundMessageSize()); + + // ChannelOption + channelBuilder.withOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientOption.getConnectTimeout()); + final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(clientOption.getWriteBufferLowWaterMark(), clientOption.getWriteBufferHighWaterMark()); + channelBuilder.withOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark); + if (logger.isInfoEnabled()) { + logger.info("Set clientOption {}. name={}", clientOption, name); + } + } + private void setChannelStateNotifier(ManagedChannel channel, final String name) { if (logger.isDebugEnabled()) { logger.debug("setChannelStateNotifier()"); @@ -173,8 +189,6 @@ public void run() { if (logger.isDebugEnabled()) { logger.debug("getState(){}", state); } - - } public void close() { @@ -188,5 +202,4 @@ public void close() { ExecutorUtils.shutdownExecutorService(name + "-eventLoopExecutor", eventLoopExecutor); ExecutorUtils.shutdownExecutorService(name + "-executorService", executorService); } - -} +} \ No newline at end of file diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ChannelFactoryOption.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ChannelFactoryOption.java index b88dd0923c2c..e55b9a7ae2f1 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ChannelFactoryOption.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ChannelFactoryOption.java @@ -41,6 +41,8 @@ public class ChannelFactoryOption { private final List clientInterceptorList; + private final ClientOption clientOption; + public String getName() { return name; } @@ -62,7 +64,11 @@ public List getClientInterceptorList() { return clientInterceptorList; } - private ChannelFactoryOption(String name, int executorQueueSize, HeaderFactory headerFactory, NameResolverProvider nameResolverProvider, List clientInterceptorList) { + public ClientOption getClientOption() { + return clientOption; + } + + private ChannelFactoryOption(String name, int executorQueueSize, HeaderFactory headerFactory, NameResolverProvider nameResolverProvider, List clientInterceptorList, ClientOption clientOption) { this.name = Assert.requireNonNull(name, "name must not be null"); Assert.isTrue(executorQueueSize > 0, "must be `executorQueueSize > 0`"); @@ -73,17 +79,20 @@ private ChannelFactoryOption(String name, int executorQueueSize, HeaderFactory h this.nameResolverProvider = nameResolverProvider; this.clientInterceptorList = Assert.requireNonNull(clientInterceptorList, "clientInterceptorList must not be null"); + this.clientOption = Assert.requireNonNull(clientOption, "clientOption must not be null"); } @Override public String toString() { - return "ChannelFactoryOption{" + - "name='" + name + '\'' + - ", executorQueueSize=" + executorQueueSize + - ", headerFactory=" + headerFactory + - ", nameResolverProvider=" + nameResolverProvider + - ", clientInterceptorList=" + clientInterceptorList + - '}'; + final StringBuilder sb = new StringBuilder("ChannelFactoryOption{"); + sb.append("name='").append(name).append('\''); + sb.append(", executorQueueSize=").append(executorQueueSize); + sb.append(", headerFactory=").append(headerFactory); + sb.append(", nameResolverProvider=").append(nameResolverProvider); + sb.append(", clientInterceptorList=").append(clientInterceptorList); + sb.append(", clientOption=").append(clientOption); + sb.append('}'); + return sb.toString(); } public static Builder newBuilder() { @@ -97,9 +106,10 @@ public static class Builder { private HeaderFactory headerFactory; private NameResolverProvider nameResolverProvider; private List clientInterceptorList = new ArrayList(); + private ClientOption clientOption = new ClientOption.Builder().build(); public ChannelFactoryOption build() { - final ChannelFactoryOption channelFactoryOption = new ChannelFactoryOption(name, executorQueueSize, headerFactory, nameResolverProvider, clientInterceptorList); + final ChannelFactoryOption channelFactoryOption = new ChannelFactoryOption(name, executorQueueSize, headerFactory, nameResolverProvider, clientInterceptorList, clientOption); return channelFactoryOption; } @@ -124,6 +134,8 @@ public void addClientInterceptor(ClientInterceptor clientInterceptor) { this.clientInterceptorList.add(clientInterceptor); } + public void setClientOption(ClientOption clientOption) { + this.clientOption = clientOption; + } } - -} +} \ No newline at end of file diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ClientOption.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ClientOption.java new file mode 100644 index 000000000000..4e97c1b5181c --- /dev/null +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/client/ClientOption.java @@ -0,0 +1,197 @@ +/* + * Copyright 2019 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.grpc.client; + +import java.util.concurrent.TimeUnit; + +/** + * @author jaehong.kim + */ +public class ClientOption { + public static final long DEFAULT_KEEPALIVE_TIME = TimeUnit.MINUTES.toMillis(5); + public static final long DEFAULT_KEEPALIVE_TIMEOUT = TimeUnit.MINUTES.toMillis(30); + public static final long IDLE_TIMEOUT_MILLIS_DISABLE = -1; + public static final boolean DEFAULT_KEEPALIVE_WITHOUT_CALLS = Boolean.FALSE; + public static final int DEFAULT_MAX_HEADER_LIST_SIZE = 8192; + public static final int DEFAULT_MAX_MESSAGE_SIZE = 4 * 1024 * 1024; + public static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB + + public static final int DEFAULT_CONNECT_TIMEOUT = 3000; + public static final int DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK = 32 * 1024; + public static final int DEFAULT_WRITE_BUFFER_LOW_WATER_MARK = 16 * 1024; + + private final long keepAliveTime; + private final long keepAliveTimeout; + // KeepAliveManager.keepAliveDuringTransportIdle + private final boolean keepAliveWithoutCalls; + private final long idleTimeoutMillis; + private final int maxHeaderListSize; + private final int maxInboundMessageSize; + private final int flowControlWindow; + + // ChannelOption + private final int connectTimeout; + private final int writeBufferHighWaterMark; + private final int writeBufferLowWaterMark; + + private ClientOption(long keepAliveTime, long keepAliveTimeout, boolean keepAliveWithoutCalls, long idleTimeoutMillis, int maxHeaderListSize, int maxInboundMessageSize, int flowControlWindow, int connectTimeout, int writeBufferHighWaterMark, int writeBufferLowWaterMark) { + this.flowControlWindow = flowControlWindow; + this.maxHeaderListSize = maxHeaderListSize; + this.keepAliveTime = keepAliveTime; + this.keepAliveTimeout = keepAliveTimeout; + this.keepAliveWithoutCalls = keepAliveWithoutCalls; + this.idleTimeoutMillis = idleTimeoutMillis; + this.maxInboundMessageSize = maxInboundMessageSize; + this.connectTimeout = connectTimeout; + this.writeBufferHighWaterMark = writeBufferHighWaterMark; + this.writeBufferLowWaterMark = writeBufferLowWaterMark; + } + + public int getFlowControlWindow() { + return flowControlWindow; + } + + public int getMaxHeaderListSize() { + return maxHeaderListSize; + } + + public long getKeepAliveTime() { + return keepAliveTime; + } + + public long getKeepAliveTimeout() { + return keepAliveTimeout; + } + + public boolean isKeepAliveWithoutCalls() { + return keepAliveWithoutCalls; + } + + public long getIdleTimeoutMillis() { + return idleTimeoutMillis; + } + + public int getMaxInboundMessageSize() { + return maxInboundMessageSize; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public int getWriteBufferHighWaterMark() { + return writeBufferHighWaterMark; + } + + public int getWriteBufferLowWaterMark() { + return writeBufferLowWaterMark; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("ClientOption{"); + sb.append("keepAliveTime=").append(keepAliveTime); + sb.append(", keepAliveTimeout=").append(keepAliveTimeout); + sb.append(", keepAliveWithoutCalls=").append(keepAliveWithoutCalls); + sb.append(", idleTimeoutMillis=").append(idleTimeoutMillis); + sb.append(", maxHeaderListSize=").append(maxHeaderListSize); + sb.append(", maxInboundMessageSize=").append(maxInboundMessageSize); + sb.append(", flowControlWindow=").append(flowControlWindow); + sb.append(", connectTimeout=").append(connectTimeout); + sb.append(", writeBufferHighWaterMark=").append(writeBufferHighWaterMark); + sb.append(", writeBufferLowWaterMark=").append(writeBufferLowWaterMark); + sb.append('}'); + return sb.toString(); + } + + public static class Builder { + private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW; + private int maxHeaderListSize = DEFAULT_MAX_HEADER_LIST_SIZE; + private long keepAliveTime = DEFAULT_KEEPALIVE_TIME; + private long keepAliveTimeout = DEFAULT_KEEPALIVE_TIMEOUT; + private boolean keepAliveWithoutCalls = DEFAULT_KEEPALIVE_WITHOUT_CALLS; + + private long idleTimeoutMillis = IDLE_TIMEOUT_MILLIS_DISABLE; + private int maxInboundMessageSize = DEFAULT_MAX_MESSAGE_SIZE; + + private int connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private int writeBufferHighWaterMark = DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK; + private int writeBufferLowWaterMark = DEFAULT_WRITE_BUFFER_LOW_WATER_MARK; + + public ClientOption build() { + final ClientOption clientOption = new ClientOption(keepAliveTime, keepAliveTimeout, keepAliveWithoutCalls, idleTimeoutMillis, maxHeaderListSize, maxInboundMessageSize, flowControlWindow, connectTimeout, writeBufferHighWaterMark, writeBufferLowWaterMark); + return clientOption; + } + + public void setFlowControlWindow(int flowControlWindow) { + this.flowControlWindow = flowControlWindow; + } + + public void setMaxHeaderListSize(int maxHeaderListSize) { + this.maxHeaderListSize = maxHeaderListSize; + } + + public void setKeepAliveTime(long keepAliveTime) { + this.keepAliveTime = keepAliveTime; + } + + public void setKeepAliveTimeout(long keepAliveTimeout) { + this.keepAliveTimeout = keepAliveTimeout; + } + + public void setKeepAliveWithoutCalls(boolean keepAliveWithoutCalls) { + this.keepAliveWithoutCalls = keepAliveWithoutCalls; + } + + public void setIdleTimeoutMillis(long idleTimeoutMillis) { + this.idleTimeoutMillis = idleTimeoutMillis; + } + + public void setMaxInboundMessageSize(int maxInboundMessageSize) { + this.maxInboundMessageSize = maxInboundMessageSize; + } + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + this.writeBufferHighWaterMark = writeBufferHighWaterMark; + } + + public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + this.writeBufferLowWaterMark = writeBufferLowWaterMark; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("Builder{"); + sb.append("flowControlWindow=").append(flowControlWindow); + sb.append(", maxHeaderListSize=").append(maxHeaderListSize); + sb.append(", keepAliveTime=").append(keepAliveTime); + sb.append(", keepAliveTimeout=").append(keepAliveTimeout); + sb.append(", keepAliveWithoutCalls=").append(keepAliveWithoutCalls); + sb.append(", idleTimeoutMillis=").append(idleTimeoutMillis); + sb.append(", maxInboundMessageSize=").append(maxInboundMessageSize); + sb.append(", connectTimeout=").append(connectTimeout); + sb.append(", writeBufferHighWaterMark=").append(writeBufferHighWaterMark); + sb.append(", writeBufferLowWaterMark=").append(writeBufferLowWaterMark); + sb.append('}'); + return sb.toString(); + } + } +} \ No newline at end of file diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ServerFactory.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ServerFactory.java index c89d89e3c65d..eb2826234441 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ServerFactory.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ServerFactory.java @@ -31,6 +31,7 @@ import io.grpc.netty.NettyServerBuilder; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.Future; import org.slf4j.Logger; @@ -69,23 +70,23 @@ public class ServerFactory { private ServerOption serverOption; - public ServerFactory(String name, String hostname, int port, Executor executor) { - this(name, hostname, port, executor, null); - } - public ServerFactory(String name, String hostname, int port, Executor executor, ServerOption serverOption) { this.name = Assert.requireNonNull(name, "name must not be null"); this.hostname = Assert.requireNonNull(hostname, "hostname must not be null"); + this.serverOption = Assert.requireNonNull(serverOption, "serverOption must not be null"); this.port = port; - this.bossExecutor = newExecutor(name + "-boss"); + this.bossExecutor = newExecutor(name + "-Channel-Boss"); this.bossEventLoopGroup = newEventLoopGroup(1, this.bossExecutor); - - this.workerExecutor = newExecutor(name + "-worker"); - this.workerEventLoopGroup = newEventLoopGroup(CpuUtils.workerCount(), bossExecutor); + this.workerExecutor = newExecutor(name + "-Channel-Worker"); + this.workerEventLoopGroup = newEventLoopGroup(CpuUtils.workerCount(), workerExecutor); this.executor = Assert.requireNonNull(executor, "executor must not be null"); - this.serverOption = serverOption; + } + + private ExecutorService newExecutor(String name) { + ThreadFactory threadFactory = new PinpointThreadFactory(PinpointThreadFactory.DEFAULT_THREAD_NAME_PREFIX + name, true); + return Executors.newCachedThreadPool(threadFactory); } private NioEventLoopGroup newEventLoopGroup(int i, ExecutorService executorService) { @@ -93,10 +94,6 @@ private NioEventLoopGroup newEventLoopGroup(int i, ExecutorService executorServi return new NioEventLoopGroup(i, executorService); } - private ExecutorService newExecutor(String name) { - ThreadFactory threadFactory = new PinpointThreadFactory(name + "-executor", true); - return Executors.newCachedThreadPool(threadFactory); - } public void addService(BindableService bindableService) { Assert.requireNonNull(bindableService, "bindableService must not be null"); @@ -127,19 +124,19 @@ public Server build() { setupInternal(serverBuilder); for (Object service : this.bindableServices) { - logger.info("addService {}", service); + logger.info("Add service={}, server={}", service, name); if (service instanceof BindableService) { serverBuilder.addService((BindableService) service); - } else if(service instanceof ServerServiceDefinition) { + } else if (service instanceof ServerServiceDefinition) { serverBuilder.addService((ServerServiceDefinition) service); } } for (ServerTransportFilter transportFilter : this.serverTransportFilters) { - logger.info("addTransportFilter {}", transportFilter); + logger.info("Add transportFilter={}, server={}", transportFilter, name); serverBuilder.addTransportFilter(transportFilter); } for (ServerInterceptor serverInterceptor : this.serverInterceptors) { - logger.info("addIntercept {}", serverInterceptor); + logger.info("Add intercept={}, server={}", serverInterceptor, name); serverBuilder.intercept(serverInterceptor); } @@ -149,6 +146,7 @@ public Server build() { HeaderFactory headerFactory = new AgentHeaderFactory(); ServerInterceptor headerContext = new HeaderPropagationInterceptor(headerFactory, ServerContext.getAgentInfoKey()); serverBuilder.intercept(headerContext); + Server server = serverBuilder.build(); return server; } @@ -162,14 +160,12 @@ private void setupInternal(NettyServerBuilder serverBuilder) { private void setupServerOption(final NettyServerBuilder builder) { // TODO @see PinpointServerAcceptor builder.withChildOption(ChannelOption.TCP_NODELAY, true); - builder.withChildOption(ChannelOption.SO_KEEPALIVE, true); - builder.withChildOption(ChannelOption.SO_SNDBUF, 1024 * 64); - builder.withChildOption(ChannelOption.SO_RCVBUF, 1024 * 64); - - if (this.serverOption == null) { - // Use default - return; - } + builder.withChildOption(ChannelOption.SO_REUSEADDR, true); + builder.withChildOption(ChannelOption.SO_RCVBUF, this.serverOption.getReceiveBufferSize()); + builder.withChildOption(ChannelOption.SO_BACKLOG, this.serverOption.getBacklogQueueSize()); + builder.withChildOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.serverOption.getConnectTimeout()); + final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(this.serverOption.getWriteBufferLowWaterMark(), this.serverOption.getWriteBufferHighWaterMark()); + builder.withChildOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark); builder.handshakeTimeout(this.serverOption.getHandshakeTimeout(), TimeUnit.MILLISECONDS); builder.flowControlWindow(this.serverOption.getFlowControlWindow()); @@ -177,8 +173,8 @@ private void setupServerOption(final NettyServerBuilder builder) { builder.maxInboundMessageSize(this.serverOption.getMaxInboundMessageSize()); builder.maxHeaderListSize(this.serverOption.getMaxHeaderListSize()); - builder.keepAliveTimeout(this.serverOption.getKeepAliveTimeout(), TimeUnit.MILLISECONDS); builder.keepAliveTime(this.serverOption.getKeepAliveTime(), TimeUnit.MILLISECONDS); + builder.keepAliveTimeout(this.serverOption.getKeepAliveTimeout(), TimeUnit.MILLISECONDS); builder.permitKeepAliveTime(this.serverOption.getPermitKeepAliveTimeout(), TimeUnit.MILLISECONDS); builder.permitKeepAliveWithoutCalls(this.serverOption.isPermitKeepAliveWithoutCalls()); @@ -186,16 +182,19 @@ private void setupServerOption(final NettyServerBuilder builder) { builder.maxConnectionAge(this.serverOption.getMaxConnectionAge(), TimeUnit.MILLISECONDS); builder.maxConnectionAgeGrace(this.serverOption.getMaxConnectionAgeGrace(), TimeUnit.MILLISECONDS); builder.maxConcurrentCallsPerConnection(this.serverOption.getMaxConcurrentCallsPerConnection()); + if (logger.isInfoEnabled()) { + logger.info("Set serverOption {}. name={}, hostname={}, port={}", serverOption, name, hostname, port); + } } public void close() { final Future workerShutdown = this.workerEventLoopGroup.shutdownGracefully(); workerShutdown.awaitUninterruptibly(); - ExecutorUtils.shutdownExecutorService(name + "-worker", workerExecutor); + ExecutorUtils.shutdownExecutorService(name + "-Channel-Worker", workerExecutor); final Future bossShutdown = this.bossEventLoopGroup.shutdownGracefully(); bossShutdown.awaitUninterruptibly(); - ExecutorUtils.shutdownExecutorService(name + "-boss", bossExecutor); + ExecutorUtils.shutdownExecutorService(name + "-Channel-Boss", bossExecutor); } @Override diff --git a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ServerOption.java b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ServerOption.java index 0fada5421919..aee1d12f9a99 100644 --- a/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ServerOption.java +++ b/grpc/src/main/java/com/navercorp/pinpoint/grpc/server/ServerOption.java @@ -18,14 +18,17 @@ import java.util.concurrent.TimeUnit; +/** + * @author jaehong.kim + */ public class ServerOption { private static final int DEFAULT_FLOW_CONTROL_WINDOW = 1048576; // 1MiB private static final long DEFAULT_KEEPALIVE_TIME = TimeUnit.MINUTES.toMillis(5); private static final long DEFAULT_KEEPALIVE_TIMEOUT = TimeUnit.MINUTES.toMillis(30); - private static final long DEFAULT_PERMIT_KEEPALIVE_TIMEOUT = TimeUnit.MINUTES.toMillis(60); + private static final long DEFAULT_PERMIT_KEEPALIVE_TIMEOUT = TimeUnit.MINUTES.toMillis(3); private static final boolean DEFAULT_PERMIT_KEEPALIVE_WITHOUT_CALLS = Boolean.FALSE; - private static final long DEFAULT_MAX_CONNECTION_IDLE = Long.MAX_VALUE; // Disabled + private static final long DEFAULT_MAX_CONNECTION_IDLE = TimeUnit.SECONDS.toMillis(10); // 10s private static final long DEFAULT_MAX_CONNECTION_AGE = Long.MAX_VALUE; // Disabled private static final long DEFAULT_MAX_CONNECTION_AGE_GRACE = Long.MAX_VALUE; // Infinite private static final int DEFAULT_MAX_CONCURRENT_CALLS_PER_CONNECTION = Integer.MAX_VALUE; // Infinite @@ -34,6 +37,11 @@ public class ServerOption { private static final int DEFAULT_MAX_HEADER_LIST_SIZE = 8192; private static final long DEFAULT_HANDSHAKE_TIMEOUT = TimeUnit.SECONDS.toMillis(120); + private static final int DEFAULT_RECEIVE_BUFFER_SIZE = 1024 * 64; + private static final int DEFAULT_BACKLOG_QUEUE_SIZE = 200; + public static final int DEFAULT_CONNECT_TIMEOUT = 3000; + public static final int DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK = 32 * 1024; + public static final int DEFAULT_WRITE_BUFFER_LOW_WATER_MARK = 16 * 1024; // Sets a custom keepalive time, the delay time for sending next keepalive ping. private final long keepAliveTime; @@ -62,7 +70,14 @@ public class ServerOption { // Sets the HTTP/2 flow control window. private final int flowControlWindow; - ServerOption(long keepAliveTime, long keepAliveTimeout, long permitKeepAliveTimeout, boolean permitKeepAliveWithoutCalls, long maxConnectionIdle, long maxConnectionAge, long maxConnectionAgeGrace, int maxConcurrentCallsPerConnection, int maxInboundMessageSize, int maxHeaderListSize, long handshakeTimeout, int flowControlWindow) { + // ChannelOption + private final int receiveBufferSize; + private final int backlogQueueSize; + private final int connectTimeout; + private final int writeBufferHighWaterMark; + private final int writeBufferLowWaterMark; + + ServerOption(long keepAliveTime, long keepAliveTimeout, long permitKeepAliveTimeout, boolean permitKeepAliveWithoutCalls, long maxConnectionIdle, long maxConnectionAge, long maxConnectionAgeGrace, int maxConcurrentCallsPerConnection, int maxInboundMessageSize, int maxHeaderListSize, long handshakeTimeout, int flowControlWindow, int receiveBufferSize, int backlogQueueSize, int connectTimeout, int writeBufferHighWaterMark, int writeBufferLowWaterMark) { this.keepAliveTime = keepAliveTime; this.keepAliveTimeout = keepAliveTimeout; this.permitKeepAliveTimeout = permitKeepAliveTimeout; @@ -75,6 +90,11 @@ public class ServerOption { this.maxHeaderListSize = maxHeaderListSize; this.handshakeTimeout = handshakeTimeout; this.flowControlWindow = flowControlWindow; + this.receiveBufferSize = receiveBufferSize; + this.backlogQueueSize = backlogQueueSize; + this.connectTimeout = connectTimeout; + this.writeBufferHighWaterMark = writeBufferHighWaterMark; + this.writeBufferLowWaterMark = writeBufferLowWaterMark; } public long getKeepAliveTime() { @@ -125,6 +145,26 @@ public int getFlowControlWindow() { return flowControlWindow; } + public int getReceiveBufferSize() { + return receiveBufferSize; + } + + public int getBacklogQueueSize() { + return backlogQueueSize; + } + + public int getConnectTimeout() { + return connectTimeout; + } + + public int getWriteBufferHighWaterMark() { + return writeBufferHighWaterMark; + } + + public int getWriteBufferLowWaterMark() { + return writeBufferLowWaterMark; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("ServerOption{"); @@ -140,6 +180,11 @@ public String toString() { sb.append(", maxHeaderListSize=").append(maxHeaderListSize); sb.append(", handshakeTimeout=").append(handshakeTimeout); sb.append(", flowControlWindow=").append(flowControlWindow); + sb.append(", receiveBufferSize=").append(receiveBufferSize); + sb.append(", backlogQueueSize=").append(backlogQueueSize); + sb.append(", connectTimeout=").append(connectTimeout); + sb.append(", writeBufferHighWaterMark=").append(writeBufferHighWaterMark); + sb.append(", writeBufferLowWaterMark=").append(writeBufferLowWaterMark); sb.append('}'); return sb.toString(); } @@ -172,8 +217,14 @@ public static class Builder { // Sets the HTTP/2 flow control window. private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW; + private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE; + private int backlogQueueSize = DEFAULT_BACKLOG_QUEUE_SIZE; + private int connectTimeout = DEFAULT_CONNECT_TIMEOUT; + private int writeBufferHighWaterMark = DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK; + private int writeBufferLowWaterMark = DEFAULT_WRITE_BUFFER_LOW_WATER_MARK; + public ServerOption build() { - final ServerOption serverOption = new ServerOption(keepAliveTime, keepAliveTimeout, permitKeepAliveTimeout, permitKeepAliveWithoutCalls, maxConnectionIdle, maxConnectionAge, maxConnectionAgeGrace, maxConcurrentCallsPerConnection, maxInboundMessageSize, maxHeaderListSize, handshakeTimeout, flowControlWindow); + final ServerOption serverOption = new ServerOption(keepAliveTime, keepAliveTimeout, permitKeepAliveTimeout, permitKeepAliveWithoutCalls, maxConnectionIdle, maxConnectionAge, maxConnectionAgeGrace, maxConcurrentCallsPerConnection, maxInboundMessageSize, maxHeaderListSize, handshakeTimeout, flowControlWindow, receiveBufferSize, backlogQueueSize, connectTimeout, writeBufferHighWaterMark, writeBufferLowWaterMark); return serverOption; } @@ -224,5 +275,25 @@ public void setHandshakeTimeout(long handshakeTimeout) { public void setFlowControlWindow(int flowControlWindow) { this.flowControlWindow = flowControlWindow; } + + public void setReceiveBufferSize(int receiveBufferSize) { + this.receiveBufferSize = receiveBufferSize; + } + + public void setBacklogQueueSize(int backlogQueueSize) { + this.backlogQueueSize = backlogQueueSize; + } + + public void setConnectTimeout(int connectTimeout) { + this.connectTimeout = connectTimeout; + } + + public void setWriteBufferHighWaterMark(int writeBufferHighWaterMark) { + this.writeBufferHighWaterMark = writeBufferHighWaterMark; + } + + public void setWriteBufferLowWaterMark(int writeBufferLowWaterMark) { + this.writeBufferLowWaterMark = writeBufferLowWaterMark; + } } } \ No newline at end of file diff --git a/grpc/src/test/java/com/navercorp/pinpoint/grpc/ChannelFactoryTest.java b/grpc/src/test/java/com/navercorp/pinpoint/grpc/ChannelFactoryTest.java index 3d9017f6eb87..6f0cf7f05c67 100644 --- a/grpc/src/test/java/com/navercorp/pinpoint/grpc/ChannelFactoryTest.java +++ b/grpc/src/test/java/com/navercorp/pinpoint/grpc/ChannelFactoryTest.java @@ -17,11 +17,17 @@ package com.navercorp.pinpoint.grpc; import com.navercorp.pinpoint.common.util.PinpointThreadFactory; + import com.navercorp.pinpoint.grpc.client.ChannelFactory; import com.navercorp.pinpoint.grpc.client.ChannelFactoryOption; import com.navercorp.pinpoint.grpc.server.MetadataServerTransportFilter; import com.navercorp.pinpoint.grpc.server.ServerContext; import com.navercorp.pinpoint.grpc.server.ServerFactory; + +import com.navercorp.pinpoint.grpc.client.ClientOption; +import com.navercorp.pinpoint.grpc.server.MetadataServerTransportFilter; +import com.navercorp.pinpoint.grpc.server.ServerOption; + import com.navercorp.pinpoint.grpc.server.TransportMetadataFactory; import com.navercorp.pinpoint.grpc.server.TransportMetadataServerInterceptor; import com.navercorp.pinpoint.grpc.server.lifecycle.DefaultLifecycleRegistry; @@ -162,7 +168,7 @@ private PSpan newSpan() { private static Server serverStart(ExecutorService executorService) throws IOException { logger.debug("server start"); - serverFactory = new ServerFactory(ChannelFactoryTest.class.getSimpleName() + "-server", "127.0.0.1", PORT, executorService); + serverFactory = new ServerFactory(ChannelFactoryTest.class.getSimpleName() + "-server", "127.0.0.1", PORT, executorService, new ServerOption.Builder().build()); spanService = new SpanService(1); serverFactory.addService(spanService); diff --git a/grpc/src/test/java/com/navercorp/pinpoint/grpc/client/ClientOptionTest.java b/grpc/src/test/java/com/navercorp/pinpoint/grpc/client/ClientOptionTest.java new file mode 100644 index 000000000000..dae9007d2614 --- /dev/null +++ b/grpc/src/test/java/com/navercorp/pinpoint/grpc/client/ClientOptionTest.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.grpc.client; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class ClientOptionTest { + + @Test + public void build() throws Exception { + ClientOption.Builder clientOptionBuilder = new ClientOption.Builder(); + clientOptionBuilder.setKeepAliveTime(1); + clientOptionBuilder.setKeepAliveTimeout(1); + clientOptionBuilder.setKeepAliveWithoutCalls(true); + + clientOptionBuilder.setIdleTimeoutMillis(1); + + clientOptionBuilder.setFlowControlWindow(1); + clientOptionBuilder.setMaxHeaderListSize(1); + + clientOptionBuilder.setMaxInboundMessageSize(1); + clientOptionBuilder.setConnectTimeout(1); + + clientOptionBuilder.setWriteBufferHighWaterMark(1); + clientOptionBuilder.setWriteBufferLowWaterMark(1); + + ClientOption clientOption = clientOptionBuilder.build(); + assertEquals(1, clientOption.getKeepAliveTime()); + assertEquals(1, clientOption.getKeepAliveTimeout()); + assertEquals(true, clientOption.isKeepAliveWithoutCalls()); + assertEquals(1, clientOption.getIdleTimeoutMillis()); + assertEquals(1, clientOption.getFlowControlWindow()); + assertEquals(1, clientOption.getMaxHeaderListSize()); + assertEquals(1, clientOption.getMaxInboundMessageSize()); + assertEquals(1, clientOption.getConnectTimeout()); + assertEquals(1, clientOption.getWriteBufferHighWaterMark()); + assertEquals(1, clientOption.getWriteBufferLowWaterMark()); + } +} \ No newline at end of file diff --git a/grpc/src/test/java/com/navercorp/pinpoint/grpc/server/ServerOptionTest.java b/grpc/src/test/java/com/navercorp/pinpoint/grpc/server/ServerOptionTest.java new file mode 100644 index 000000000000..e1f43df53272 --- /dev/null +++ b/grpc/src/test/java/com/navercorp/pinpoint/grpc/server/ServerOptionTest.java @@ -0,0 +1,75 @@ +/* + * Copyright 2019 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.navercorp.pinpoint.grpc.server; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class ServerOptionTest { + + @Test + public void build() throws Exception { + ServerOption.Builder builder = new ServerOption.Builder(); + builder.setKeepAliveTime(1); + builder.setKeepAliveTimeout(1); + builder.setPermitKeepAliveTimeout(1); + builder.setPermitKeepAliveWithoutCalls(true); + + builder.setMaxConnectionIdle(1); + builder.setMaxConnectionAge(1); + builder.setMaxConnectionAgeGrace(1); + + builder.setMaxConcurrentCallsPerConnection(1); + builder.setFlowControlWindow(1); + builder.setMaxHeaderListSize(1); + + builder.setBacklogQueueSize(1); + builder.setConnectTimeout(1); + builder.setReceiveBufferSize(1); + builder.setWriteBufferHighWaterMark(1); + builder.setWriteBufferLowWaterMark(1); + + builder.setHandshakeTimeout(1); + builder.setMaxInboundMessageSize(1); + + + ServerOption serverOption = builder.build(); + assertEquals(1, serverOption.getKeepAliveTime()); + assertEquals(1, serverOption.getKeepAliveTimeout()); + assertEquals(1, serverOption.getPermitKeepAliveTimeout()); + assertEquals(true, serverOption.isPermitKeepAliveWithoutCalls()); + + + assertEquals(1, serverOption.getMaxConnectionIdle()); + assertEquals(1, serverOption.getMaxConnectionAge()); + assertEquals(1, serverOption.getMaxConnectionAgeGrace()); + + assertEquals(1, serverOption.getMaxConcurrentCallsPerConnection()); + assertEquals(1, serverOption.getFlowControlWindow()); + assertEquals(1, serverOption.getMaxHeaderListSize()); + + assertEquals(1, serverOption.getBacklogQueueSize()); + assertEquals(1, serverOption.getConnectTimeout()); + assertEquals(1, serverOption.getReceiveBufferSize()); + assertEquals(1, serverOption.getWriteBufferHighWaterMark()); + assertEquals(1, serverOption.getWriteBufferLowWaterMark()); + + assertEquals(1, serverOption.getHandshakeTimeout()); + assertEquals(1, serverOption.getMaxInboundMessageSize()); + } +} \ No newline at end of file diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/GrpcTransportConfig.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/GrpcTransportConfig.java index a3d9199f0e61..0c4e7fc1a5c3 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/GrpcTransportConfig.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/grpc/GrpcTransportConfig.java @@ -18,6 +18,7 @@ import com.navercorp.pinpoint.bootstrap.config.DefaultProfilerConfig; import com.navercorp.pinpoint.bootstrap.config.ProfilerConfig; +import com.navercorp.pinpoint.grpc.client.ClientOption; /** * @author Woonduk Kang(emeroad) @@ -25,32 +26,73 @@ public class GrpcTransportConfig { private static final String DEFAULT_IP = "127.0.0.1"; + private static final long DEFAULT_CLIENT_REQUEST_TIMEOUT = 6000; private String collectorAgentServerIp = DEFAULT_IP; - private int collectorAgentServerPort = 9997; + private int collectorAgentServerPort = 9991; + + private String collectorStatServerIp = DEFAULT_IP; + private int collectorStatServerPort = 9992; private String collectorSpanServerIp = DEFAULT_IP; - private int collectorSpanServerPort = 9998; + private int collectorSpanServerPort = 9993; - private String collectorStatServerIp = DEFAULT_IP; - private int collectorStatServerPort = 9999; + private ClientOption agentClientOption = new ClientOption.Builder().build(); + private ClientOption statClientOption = new ClientOption.Builder().build(); + private ClientOption spanClientOption = new ClientOption.Builder().build(); - private static final long DEFAULT_CLIENT_REQUEST_TIMEOUT = 6000; private long clientRequestTimeout = DEFAULT_CLIENT_REQUEST_TIMEOUT; + private int spanSenderExecutorQueueSize = 1024; + private int statSenderExecutorQueueSize = 1024; public void read(ProfilerConfig profilerConfig) { final ProfilerConfig.ValueResolver placeHolderResolver = new DefaultProfilerConfig.PlaceHolderResolver(); - // Agent + // Agent Collector this.collectorAgentServerIp = profilerConfig.readString("profiler.transport.grpc.collector.agent.ip", DEFAULT_IP, placeHolderResolver); - this.collectorAgentServerPort = profilerConfig.readInt("profiler.transport.grpc.collector.agent.port", 9997); - // Span - this.collectorSpanServerIp = profilerConfig.readString("profiler.transport.grpc.collector.span.ip", DEFAULT_IP, placeHolderResolver); - this.collectorSpanServerPort = profilerConfig.readInt("profiler.transport.grpc.collector.span.port", 9998); - // Stat + this.collectorAgentServerPort = profilerConfig.readInt("profiler.transport.grpc.collector.agent.port", 9991); + // Stat Collector this.collectorStatServerIp = profilerConfig.readString("profiler.transport.grpc.collector.stat.ip", DEFAULT_IP, placeHolderResolver); - this.collectorStatServerPort = profilerConfig.readInt("profiler.transport.grpc.collector.stat.port", 9999); this.clientRequestTimeout = profilerConfig.readLong("profiler.transport.grpc.client.request.timeout", DEFAULT_CLIENT_REQUEST_TIMEOUT); + this.collectorStatServerPort = profilerConfig.readInt("profiler.transport.grpc.collector.stat.port", 9992); + // Span Collector + this.collectorSpanServerIp = profilerConfig.readString("profiler.transport.grpc.collector.span.ip", DEFAULT_IP, placeHolderResolver); + this.collectorSpanServerPort = profilerConfig.readInt("profiler.transport.grpc.collector.span.port", 9993); + + this.statSenderExecutorQueueSize = profilerConfig.readInt("profiler.transport.grpc.stat.sender.executor.queue.size", 1024); + this.spanSenderExecutorQueueSize = profilerConfig.readInt("profiler.transport.grpc.span.sender.executor.queue.size", 1024); + + // ClientOption + this.agentClientOption = readAgentClientOption(profilerConfig); + this.statClientOption = readStatClientOption(profilerConfig); + this.spanClientOption = readSpanClientOption(profilerConfig); + } + + private ClientOption readAgentClientOption(final ProfilerConfig profilerConfig) { + return readClientOption(profilerConfig, "profiler.transport.grpc.agent"); + } + + private ClientOption readStatClientOption(final ProfilerConfig profilerConfig) { + return readClientOption(profilerConfig, "profiler.transport.grpc.stat"); + } + + private ClientOption readSpanClientOption(final ProfilerConfig profilerConfig) { + return readClientOption(profilerConfig, "profiler.transport.grpc.span"); + } + + private ClientOption readClientOption(final ProfilerConfig profilerConfig, final String transportName) { + final ClientOption.Builder builder = new ClientOption.Builder(); + profilerConfig.readLong(transportName + ".keepalive.time", ClientOption.DEFAULT_KEEPALIVE_TIME); + profilerConfig.readLong(transportName + ".keepalive.timeout", ClientOption.DEFAULT_KEEPALIVE_TIMEOUT); + profilerConfig.readBoolean(transportName + ".keepalive.without-calls", ClientOption.DEFAULT_KEEPALIVE_WITHOUT_CALLS); + profilerConfig.readLong(transportName + ".idle.timeout", ClientOption.IDLE_TIMEOUT_MILLIS_DISABLE); + profilerConfig.readInt(transportName + ".headers.size.max", ClientOption.DEFAULT_MAX_HEADER_LIST_SIZE); + profilerConfig.readInt(transportName + ".message.inbound.size.max", ClientOption.DEFAULT_MAX_MESSAGE_SIZE); + profilerConfig.readInt(transportName + ".connect.timeout", ClientOption.DEFAULT_CONNECT_TIMEOUT); + profilerConfig.readInt(transportName + ".write.buffer.highwatermark", ClientOption.DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK); + profilerConfig.readInt(transportName + ".write.buffer.lowwatermark", ClientOption.DEFAULT_WRITE_BUFFER_LOW_WATER_MARK); + + return builder.build(); } public String getCollectorSpanServerIp() { @@ -77,20 +119,45 @@ public int getCollectorStatServerPort() { return collectorStatServerPort; } + public int getSpanSenderExecutorQueueSize() { + return spanSenderExecutorQueueSize; + } + + public int getStatSenderExecutorQueueSize() { + return statSenderExecutorQueueSize; + } + public long getClientRequestTimeout() { return clientRequestTimeout; } + public ClientOption getAgentClientOption() { + return agentClientOption; + } + + public ClientOption getStatClientOption() { + return statClientOption; + } + + public ClientOption getSpanClientOption() { + return spanClientOption; + } + @Override public String toString() { final StringBuilder sb = new StringBuilder("GrpcTransportConfig{"); sb.append("collectorAgentServerIp='").append(collectorAgentServerIp).append('\''); sb.append(", collectorAgentServerPort=").append(collectorAgentServerPort); - sb.append(", collectorSpanServerIp='").append(collectorSpanServerIp).append('\''); - sb.append(", collectorSpanServerPort=").append(collectorSpanServerPort); sb.append(", collectorStatServerIp='").append(collectorStatServerIp).append('\''); sb.append(", collectorStatServerPort=").append(collectorStatServerPort); + sb.append(", collectorSpanServerIp='").append(collectorSpanServerIp).append('\''); + sb.append(", collectorSpanServerPort=").append(collectorSpanServerPort); + sb.append(", agentClientOption=").append(agentClientOption); + sb.append(", statClientOption=").append(statClientOption); + sb.append(", spanClientOption=").append(spanClientOption); sb.append(", clientRequestTimeout=").append(clientRequestTimeout); + sb.append(", spanSenderExecutorQueueSize=").append(spanSenderExecutorQueueSize); + sb.append(", statSenderExecutorQueueSize=").append(statSenderExecutorQueueSize); sb.append('}'); return sb.toString(); } diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/AgentGrpcDataSenderProvider.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/AgentGrpcDataSenderProvider.java index 67bfa14256f3..60ce52091e6e 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/AgentGrpcDataSenderProvider.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/AgentGrpcDataSenderProvider.java @@ -16,6 +16,13 @@ package com.navercorp.pinpoint.profiler.context.provider.grpc; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.protobuf.GeneratedMessageV3; +import com.navercorp.pinpoint.grpc.client.ClientOption; +import com.navercorp.pinpoint.profiler.context.active.ActiveTraceRepository; +import com.navercorp.pinpoint.profiler.context.grpc.GrpcTransportConfig; + import com.navercorp.pinpoint.common.util.Assert; import com.navercorp.pinpoint.grpc.HeaderFactory; import com.navercorp.pinpoint.grpc.client.ChannelFactoryOption; @@ -60,16 +67,16 @@ public AgentGrpcDataSenderProvider(GrpcTransportConfig grpcTransportConfig, public EnhancedDataSender get() { String collectorTcpServerIp = grpcTransportConfig.getCollectorAgentServerIp(); int collectorTcpServerPort = grpcTransportConfig.getCollectorAgentServerPort(); - UnaryCallDeadlineInterceptor unaryCallDeadlineInterceptor = new UnaryCallDeadlineInterceptor(grpcTransportConfig.getClientRequestTimeout()); + final ClientOption clientOption = grpcTransportConfig.getAgentClientOption(); ChannelFactoryOption.Builder channelFactoryOptionBuilder = ChannelFactoryOption.newBuilder(); - channelFactoryOptionBuilder.setName("Default"); + channelFactoryOptionBuilder.setName("AgentGrpcDataSender"); channelFactoryOptionBuilder.setHeaderFactory(headerFactory); channelFactoryOptionBuilder.setNameResolverProvider(nameResolverProvider); channelFactoryOptionBuilder.addClientInterceptor(unaryCallDeadlineInterceptor); + channelFactoryOptionBuilder.setClientOption(clientOption); return new AgentGrpcDataSender(collectorTcpServerIp, collectorTcpServerPort, messageConverter, channelFactoryOptionBuilder.build(), activeTraceRepository); } - } \ No newline at end of file diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/SpanGrpcDataSenderProvider.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/SpanGrpcDataSenderProvider.java index 65323e59192b..6b0ceab391ad 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/SpanGrpcDataSenderProvider.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/SpanGrpcDataSenderProvider.java @@ -22,6 +22,8 @@ import com.navercorp.pinpoint.grpc.client.ChannelFactoryOption; import com.navercorp.pinpoint.grpc.client.UnaryCallDeadlineInterceptor; +import com.navercorp.pinpoint.grpc.client.ClientOption; + import com.navercorp.pinpoint.profiler.context.grpc.GrpcTransportConfig; import com.navercorp.pinpoint.common.util.Assert; import com.navercorp.pinpoint.grpc.HeaderFactory; @@ -55,16 +57,18 @@ public SpanGrpcDataSenderProvider(GrpcTransportConfig grpcTransportConfig, public DataSender get() { String collectorTcpServerIp = grpcTransportConfig.getCollectorSpanServerIp(); int collectorTcpServerPort = grpcTransportConfig.getCollectorSpanServerPort(); - + final int senderExecutorQueueSize = grpcTransportConfig.getSpanSenderExecutorQueueSize(); UnaryCallDeadlineInterceptor unaryCallDeadlineInterceptor = new UnaryCallDeadlineInterceptor(grpcTransportConfig.getClientRequestTimeout()); + final ClientOption clientOption = grpcTransportConfig.getSpanClientOption(); ChannelFactoryOption.Builder channelFactoryOptionBuilder = ChannelFactoryOption.newBuilder(); channelFactoryOptionBuilder.setName("SpanGrpcDataSender"); channelFactoryOptionBuilder.setHeaderFactory(headerFactory); channelFactoryOptionBuilder.setNameResolverProvider(nameResolverProvider); channelFactoryOptionBuilder.addClientInterceptor(unaryCallDeadlineInterceptor); + channelFactoryOptionBuilder.setClientOption(clientOption); - return new SpanGrpcDataSender(collectorTcpServerIp, collectorTcpServerPort, messageConverter, channelFactoryOptionBuilder.build()); + return new SpanGrpcDataSender(collectorTcpServerIp, collectorTcpServerPort, senderExecutorQueueSize, messageConverter, channelFactoryOptionBuilder.build()); } } \ No newline at end of file diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/StatGrpcDataSenderProvider.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/StatGrpcDataSenderProvider.java index 80fc92d8871a..79086bb65332 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/StatGrpcDataSenderProvider.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/context/provider/grpc/StatGrpcDataSenderProvider.java @@ -16,19 +16,21 @@ package com.navercorp.pinpoint.profiler.context.provider.grpc; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.protobuf.GeneratedMessageV3; +import com.navercorp.pinpoint.grpc.client.ClientOption; +import com.navercorp.pinpoint.profiler.context.grpc.GrpcTransportConfig; + import com.navercorp.pinpoint.common.util.Assert; import com.navercorp.pinpoint.grpc.HeaderFactory; import com.navercorp.pinpoint.grpc.client.ChannelFactoryOption; import com.navercorp.pinpoint.grpc.client.UnaryCallDeadlineInterceptor; -import com.navercorp.pinpoint.profiler.context.grpc.GrpcTransportConfig; import com.navercorp.pinpoint.profiler.context.module.StatConverter; import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter; import com.navercorp.pinpoint.profiler.sender.DataSender; import com.navercorp.pinpoint.profiler.sender.grpc.StatGrpcDataSender; -import com.google.inject.Inject; -import com.google.inject.Provider; -import com.google.protobuf.GeneratedMessageV3; import io.grpc.NameResolverProvider; /** @@ -55,17 +57,17 @@ public StatGrpcDataSenderProvider(GrpcTransportConfig grpcTransportConfig, public DataSender get() { String collectorTcpServerIp = grpcTransportConfig.getCollectorStatServerIp(); int collectorTcpServerPort = grpcTransportConfig.getCollectorStatServerPort(); - + int senderExecutorQueueSize = grpcTransportConfig.getStatSenderExecutorQueueSize(); UnaryCallDeadlineInterceptor unaryCallDeadlineInterceptor = new UnaryCallDeadlineInterceptor(grpcTransportConfig.getClientRequestTimeout()); + final ClientOption clientOption = grpcTransportConfig.getStatClientOption(); ChannelFactoryOption.Builder channelFactoryOptionBuilder = ChannelFactoryOption.newBuilder(); channelFactoryOptionBuilder.setName("StatGrpcDataSender"); channelFactoryOptionBuilder.setHeaderFactory(headerFactory); channelFactoryOptionBuilder.setNameResolverProvider(nameResolverProvider); channelFactoryOptionBuilder.addClientInterceptor(unaryCallDeadlineInterceptor); + channelFactoryOptionBuilder.setClientOption(clientOption); - return new StatGrpcDataSender(collectorTcpServerIp, collectorTcpServerPort, messageConverter, channelFactoryOptionBuilder.build()); + return new StatGrpcDataSender(collectorTcpServerIp, collectorTcpServerPort, senderExecutorQueueSize, messageConverter, channelFactoryOptionBuilder.build()); } - - } \ No newline at end of file diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSender.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSender.java index 1d650af58cc7..fcb79ce2b33d 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSender.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSender.java @@ -16,26 +16,32 @@ package com.navercorp.pinpoint.profiler.sender.grpc; +import com.navercorp.pinpoint.grpc.client.ClientOption; +import com.navercorp.pinpoint.grpc.trace.MetadataGrpc; +import com.navercorp.pinpoint.profiler.context.active.ActiveTraceRepository; +import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcCommandService; +import com.navercorp.pinpoint.profiler.sender.EnhancedDataSender; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.GeneratedMessageV3; + import com.navercorp.pinpoint.common.util.Assert; import com.navercorp.pinpoint.common.util.ExecutorFactory; import com.navercorp.pinpoint.common.util.PinpointThreadFactory; -import com.navercorp.pinpoint.common.util.StringUtils; import com.navercorp.pinpoint.grpc.client.ChannelFactory; import com.navercorp.pinpoint.grpc.client.ChannelFactoryOption; import com.navercorp.pinpoint.grpc.trace.AgentGrpc; -import com.navercorp.pinpoint.grpc.trace.MetadataGrpc; import com.navercorp.pinpoint.grpc.trace.PAgentInfo; import com.navercorp.pinpoint.grpc.trace.PApiMetaData; import com.navercorp.pinpoint.grpc.trace.PResult; import com.navercorp.pinpoint.grpc.trace.PSqlMetaData; import com.navercorp.pinpoint.grpc.trace.PStringMetaData; -import com.navercorp.pinpoint.profiler.context.active.ActiveTraceRepository; import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter; -import com.navercorp.pinpoint.profiler.receiver.grpc.GrpcCommandService; import com.navercorp.pinpoint.profiler.sender.AsyncQueueingExecutor; import com.navercorp.pinpoint.profiler.sender.AsyncQueueingExecutorListener; import com.navercorp.pinpoint.profiler.sender.DefaultAsyncQueueingExecutorListener; -import com.navercorp.pinpoint.profiler.sender.EnhancedDataSender; import com.navercorp.pinpoint.profiler.sender.RequestMessage; import com.navercorp.pinpoint.profiler.sender.RequestMessageFactory; import com.navercorp.pinpoint.profiler.sender.RetryMessage; @@ -47,10 +53,6 @@ import com.navercorp.pinpoint.rpc.client.PinpointClientReconnectEventListener; import com.navercorp.pinpoint.rpc.util.TimerFactory; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.GeneratedMessageV3; import io.grpc.ManagedChannel; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.util.HashedWheelTimer; @@ -71,13 +73,15 @@ * @author jaehong.kim */ public class AgentGrpcDataSender implements EnhancedDataSender { - protected final Logger logger = LoggerFactory.getLogger(this.getClass()); static { // preClassLoad ChannelBuffers.buffer(2); } + private final Logger logger = LoggerFactory.getLogger(this.getClass()); + private final boolean isDebug = logger.isDebugEnabled(); + private final Timer timer; private final AtomicBoolean fireState = new AtomicBoolean(false); @@ -102,7 +106,7 @@ public class AgentGrpcDataSender implements EnhancedDataSender { private final MetadataGrpc.MetadataFutureStub metadataStub; private GrpcCommandService grpcCommandService; - + private ClientOption clientOption; public AgentGrpcDataSender(String host, int port, MessageConverter messageConverter, ChannelFactoryOption channelFactoryOption) { this(host, port, messageConverter, channelFactoryOption, null); @@ -114,11 +118,9 @@ public AgentGrpcDataSender(String host, int port, MessageConverter createAsyncQueueingExecutor(int queueSize, String executorName) { + private AsyncQueueingExecutor createAsyncQueueingExecutor(int queueSize) { AsyncQueueingExecutorListener listener = new DefaultAsyncQueueingExecutorListener() { @Override public void execute(Object message) { sendPacket(message); } }; - final AsyncQueueingExecutor executor = new AsyncQueueingExecutor(queueSize, executorName, listener); + final String threadName = PinpointThreadFactory.DEFAULT_THREAD_NAME_PREFIX + name + "-Executor"; + final AsyncQueueingExecutor executor = new AsyncQueueingExecutor(queueSize, threadName, listener); return executor; } @@ -263,17 +256,19 @@ public void onComplete(Future future) { try { PResult result = PResult.parseFrom(responseMessage.getMessage()); if (result.getSuccess()) { - logger.debug("result success"); + if (isDebug) { + logger.debug("Request success. request={}, result={}", targetClass.getClass().getSimpleName(), result.getMessage()); + } } else { - logger.info("request fail. request:{} Caused:{}", targetClass, result.getMessage()); + logger.info("Request fail. request={}, result={}", targetClass.getClass().getSimpleName(), result.getMessage()); RetryMessage retryMessage = new RetryMessage(1, maxRetryCount, requestPacket, targetClass.getClass().getSimpleName()); retryRequest(retryMessage); } } catch (Exception e) { - logger.warn("Invalid response:{}", responseMessage); + logger.warn("Invalid response. request={}, result={}", targetClass.getClass().getSimpleName(), responseMessage); } } else { - logger.info("request fail. request:{} Caused:{}", targetClass, future.getCause().getMessage(), future.getCause()); + logger.info("Request fail. request={}, caused={}", targetClass.getClass().getSimpleName(), future.getCause().getMessage(), future.getCause()); RetryMessage retryMessage = new RetryMessage(1, maxRetryCount, requestPacket, targetClass.getClass().getSimpleName()); retryRequest(retryMessage); } @@ -294,16 +289,18 @@ public void onComplete(Future future) { try { PResult result = PResult.parseFrom(responseMessage.getMessage()); if (result.getSuccess()) { - logger.debug("result success"); + if (isDebug) { + logger.debug("Request success. request={}, result={}", retryMessage, result.getMessage()); + } } else { - logger.info("request fail. request:{}, Caused:{}", retryMessage, result.getMessage()); + logger.info("Request fail. request={}, result={}", retryMessage, result.getMessage()); retryRequest(retryMessage); } } catch (Exception e) { - logger.warn("Invalid response:{}", responseMessage); + logger.warn("Invalid response. request={}, result={}", retryMessage, responseMessage); } } else { - logger.info("request fail. request:{}, caused:{}", retryMessage, future.getCause().getMessage(), future.getCause()); + logger.info("Request fail. request={}, caused={}", retryMessage, future.getCause().getMessage(), future.getCause()); retryRequest(retryMessage); } } diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/GrpcDataSender.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/GrpcDataSender.java index 68f5d47c3714..7d101014a810 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/GrpcDataSender.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/GrpcDataSender.java @@ -21,6 +21,7 @@ import com.navercorp.pinpoint.common.util.PinpointThreadFactory; import com.navercorp.pinpoint.grpc.ExecutorUtils; import com.navercorp.pinpoint.grpc.client.ChannelFactory; + import com.navercorp.pinpoint.grpc.client.ChannelFactoryOption; import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter; import com.navercorp.pinpoint.profiler.sender.DataSender; @@ -41,6 +42,9 @@ * @author Woonduk Kang(emeroad) */ public abstract class GrpcDataSender implements DataSender { + protected static ScheduledExecutorService reconnectScheduler + = Executors.newScheduledThreadPool(1, new PinpointThreadFactory("Pinpoint-reconnect-thread")); + protected final Logger logger = LoggerFactory.getLogger(this.getClass()); protected final String name; @@ -55,9 +59,6 @@ public abstract class GrpcDataSender implements DataSender { protected volatile boolean shutdown; - protected static ScheduledExecutorService reconnectScheduler - = Executors.newScheduledThreadPool(1, new PinpointThreadFactory("pinpoint-reconnect-thread")); - protected final Reconnector reconnector = new Reconnector() { @Override public void reconnect(ReconnectJob reconnectJob) { @@ -65,23 +66,23 @@ public void reconnect(ReconnectJob reconnectJob) { } }; - private ThreadPoolExecutor newExecutorService(String name) { - ThreadFactory threadFactory = new PinpointThreadFactory(name, true); - return ExecutorFactory.newFixedThreadPool(1, 1000, threadFactory); - } - - public GrpcDataSender(String host, int port, MessageConverter messageConverter, ChannelFactoryOption channelFactoryOption) { + public GrpcDataSender(String host, int port, int executorQueueSize, MessageConverter messageConverter, ChannelFactoryOption channelFactoryOption) { Assert.requireNonNull(channelFactoryOption, "channelFactoryOption must not be null"); this.name = Assert.requireNonNull(channelFactoryOption.getName(), "name must not be null"); this.messageConverter = Assert.requireNonNull(messageConverter, "messageConverter must not be null"); - this.executor = newExecutorService(name); + this.executor = newExecutorService(name + "-Executor", executorQueueSize); this.channelFactory = new ChannelFactory(channelFactoryOption); this.managedChannel = channelFactory.build(name, host, port); } + private ThreadPoolExecutor newExecutorService(String name, int senderExecutorQueueSize) { + ThreadFactory threadFactory = new PinpointThreadFactory(PinpointThreadFactory.DEFAULT_THREAD_NAME_PREFIX + name, true); + return ExecutorFactory.newFixedThreadPool(1, senderExecutorQueueSize, threadFactory); + } + @Override public boolean send(final Object data) { final Runnable command = new Runnable() { @@ -122,6 +123,4 @@ private void reconnect(ReconnectJob reconnectAction) { logger.info("recreateStream"); reconnectScheduler.schedule(reconnectAction, reconnectAction.nextBackoffNanos(), TimeUnit.NANOSECONDS); } - - } \ No newline at end of file diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SpanGrpcDataSender.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SpanGrpcDataSender.java index 61926d12a416..fcce58b196e8 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SpanGrpcDataSender.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/SpanGrpcDataSender.java @@ -16,13 +16,16 @@ package com.navercorp.pinpoint.profiler.sender.grpc; + import com.navercorp.pinpoint.grpc.client.ChannelFactoryOption; + +import com.google.protobuf.Empty; + import com.navercorp.pinpoint.grpc.trace.PSpan; import com.navercorp.pinpoint.grpc.trace.PSpanChunk; import com.navercorp.pinpoint.grpc.trace.SpanGrpc; import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter; -import com.google.protobuf.Empty; import com.google.protobuf.GeneratedMessageV3; import io.grpc.stub.StreamObserver; @@ -40,8 +43,8 @@ public class SpanGrpcDataSender extends GrpcDataSender { private volatile StreamObserver spanChunkStream; private final ReconnectJob spanChunkReconnectAction; - public SpanGrpcDataSender(String host, int port, MessageConverter messageConverter, ChannelFactoryOption channelFactoryOption) { - super(host, port, messageConverter, channelFactoryOption); + public SpanGrpcDataSender(String host, int port, int executorQueueSize, MessageConverter messageConverter, ChannelFactoryOption channelFactoryOption) { + super(host, port, executorQueueSize, messageConverter, channelFactoryOption); this.spanStub = SpanGrpc.newStub(managedChannel); @@ -83,7 +86,7 @@ public boolean send0(Object data) { return true; } if (spanMessage instanceof PSpan) { - final PSpan pSpan = (PSpan) spanMessage; + final PSpan pSpan = (PSpan) spanMessage; spanStream.onNext(pSpan); return true; } diff --git a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StatGrpcDataSender.java b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StatGrpcDataSender.java index c868345b2dea..c417ee5820e6 100644 --- a/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StatGrpcDataSender.java +++ b/profiler/src/main/java/com/navercorp/pinpoint/profiler/sender/grpc/StatGrpcDataSender.java @@ -16,7 +16,13 @@ package com.navercorp.pinpoint.profiler.sender.grpc; + import com.navercorp.pinpoint.grpc.client.ChannelFactoryOption; + +import com.google.protobuf.Empty; +import com.navercorp.pinpoint.grpc.HeaderFactory; +import com.navercorp.pinpoint.grpc.client.ClientOption; + import com.navercorp.pinpoint.grpc.trace.PAgentStat; import com.navercorp.pinpoint.grpc.trace.PAgentStatBatch; import com.navercorp.pinpoint.grpc.trace.StatGrpc; @@ -38,8 +44,9 @@ public class StatGrpcDataSender extends GrpcDataSender { private volatile StreamObserver statBatchStream; private final ReconnectJob statBatchStreamReconnectAction; - public StatGrpcDataSender(String host, int port, MessageConverter messageConverter, ChannelFactoryOption channelFactoryOption) { - super(host, port, messageConverter, channelFactoryOption); + + public StatGrpcDataSender(String host, int port, int senderExecutorQueueSize, MessageConverter messageConverter, ChannelFactoryOption channelFactoryOption) { + super(host, port, senderExecutorQueueSize, messageConverter, channelFactoryOption); this.statStub = StatGrpc.newStub(managedChannel); diff --git a/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSenderTestMain.java b/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSenderTestMain.java index 43689d8ff327..9a34c3bdcc7e 100644 --- a/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSenderTestMain.java +++ b/profiler/src/test/java/com/navercorp/pinpoint/profiler/sender/grpc/AgentGrpcDataSenderTestMain.java @@ -54,6 +54,7 @@ public void request() throws Exception { GrpcNameResolverProvider grpcNameResolverProvider = new GrpcNameResolverProvider(dnsExecutorServiceProvider); NameResolverProvider nameResolverProvider = grpcNameResolverProvider.get(); + ChannelFactoryOption.Builder builder = ChannelFactoryOption.newBuilder(); builder.setName("TestAgentGrpcDataSender"); builder.setHeaderFactory(headerFactory);