Skip to content

Commit

Permalink
[#4558] Add gRPC ClientOption
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim authored and emeroad committed Jun 25, 2019
1 parent 27142d1 commit 11fd0ba
Show file tree
Hide file tree
Showing 33 changed files with 812 additions and 249 deletions.
Expand Up @@ -33,7 +33,8 @@ public final class AgentBaseDataReceiverConfiguration {
private static final String WORKER_QUEUE_SIZE = PREFIX + ".worker.queueSize";
private static final String WORKER_MONITOR_ENABLE = PREFIX + ".worker.monitor";

private static final String GRPC_PREFIX = "collector.receiver.grpc";
private static final String GRPC_PREFIX = "collector.receiver.grpc.agent";
private static final String GRPC_ENABLE = GRPC_PREFIX + ".enable";
private static final String GRPC_BIND_IP = GRPC_PREFIX + ".ip";
private static final String GRPC_BIND_PORT = GRPC_PREFIX + ".port";
private static final String GRPC_WORKER_THREAD_SIZE = GRPC_PREFIX + ".worker.threadSize";
Expand Down Expand Up @@ -72,7 +73,7 @@ public AgentBaseDataReceiverConfiguration(Properties properties, DeprecatedConfi
this.workerMonitorEnable = isWorkerThreadMonitorEnable(properties, deprecatedConfiguration);

// gRPC
this.grpcEnable = CollectorConfiguration.readBoolean(properties, GRPC_PREFIX);
this.grpcEnable = CollectorConfiguration.readBoolean(properties, GRPC_ENABLE);
this.grpcBindIp = CollectorConfiguration.readString(properties, GRPC_BIND_IP, CollectorConfiguration.DEFAULT_LISTEN_IP);
Objects.requireNonNull(grpcBindIp);
this.grpcBindPort = CollectorConfiguration.readInt(properties, GRPC_BIND_PORT, 9997);
Expand Down
Expand Up @@ -27,15 +27,16 @@
public final class SpanReceiverConfiguration implements DataReceiverGroupConfiguration {

private static final String PREFIX = "collector.receiver.span";

private static final String GRPC_ENABLE = PREFIX + ".grpc";
private static final String GRPC_BIND_IP = PREFIX + ".grpc.ip";
private static final String GRPC_BIND_PORT = PREFIX + ".grpc.port";
private static final String GRPC_WORKER_THREAD_SIZE = PREFIX + ".grpc.worker.threadSize";
private static final String GRPC_WORKER_QUEUE_SIZE = PREFIX + ".grpc.worker.queueSize";
private static final String GRPC_WORKER_MONITOR_ENABLE = PREFIX + ".grpc.worker.monitor";
private static final String GRPC_KEEP_ALIVE_TIME = PREFIX + ".grpc.keepalive.time";
private static final String GRPC_KEEP_ALIVE_TIMEOUT = PREFIX + ".grpc.keepalive.timeout";
private static final String GRPC_PREFIX = "collector.receiver.grpc.span";

private static final String GRPC_ENABLE = GRPC_PREFIX + ".enable";
private static final String GRPC_BIND_IP = GRPC_PREFIX + ".ip";
private static final String GRPC_BIND_PORT = GRPC_PREFIX + ".port";
private static final String GRPC_WORKER_THREAD_SIZE = GRPC_PREFIX + ".worker.threadSize";
private static final String GRPC_WORKER_QUEUE_SIZE = GRPC_PREFIX + ".worker.queueSize";
private static final String GRPC_WORKER_MONITOR_ENABLE = GRPC_PREFIX + ".worker.monitor";
private static final String GRPC_KEEP_ALIVE_TIME = GRPC_PREFIX + ".keepalive.time";
private static final String GRPC_KEEP_ALIVE_TIMEOUT = GRPC_PREFIX + ".keepalive.timeout";

private static final String TCP_ENABLE = PREFIX + ".tcp";
private static final String TCP_BIND_IP = PREFIX + ".tcp.ip";
Expand Down
Expand Up @@ -27,15 +27,16 @@
public final class StatReceiverConfiguration implements DataReceiverGroupConfiguration {

private static final String PREFIX = "collector.receiver.stat";

private static final String GRPC_ENABLE = PREFIX + ".grpc";
private static final String GRPC_BIND_IP = PREFIX + ".grpc.ip";
private static final String GRPC_BIND_PORT = PREFIX + ".grpc.port";
private static final String GRPC_WORKER_THREAD_SIZE = PREFIX + ".grpc.worker.threadSize";
private static final String GRPC_WORKER_QUEUE_SIZE = PREFIX + ".grpc.worker.queueSize";
private static final String GRPC_WORKER_MONITOR_ENABLE = PREFIX + ".grpc.worker.monitor";
private static final String GRPC_KEEP_ALIVE_TIME = PREFIX + ".grpc.keepalive.time";
private static final String GRPC_KEEP_ALIVE_TIMEOUT = PREFIX + ".grpc.keepalive.timeout";
private static final String GRPC_PREFIX = "collector.receiver.grpc.stat";

private static final String GRPC_ENABLE = GRPC_PREFIX + ".enable";
private static final String GRPC_BIND_IP = GRPC_PREFIX + ".ip";
private static final String GRPC_BIND_PORT = GRPC_PREFIX + ".port";
private static final String GRPC_WORKER_THREAD_SIZE = GRPC_PREFIX + ".worker.threadSize";
private static final String GRPC_WORKER_QUEUE_SIZE = GRPC_PREFIX + ".worker.queueSize";
private static final String GRPC_WORKER_MONITOR_ENABLE = GRPC_PREFIX + ".worker.monitor";
private static final String GRPC_KEEP_ALIVE_TIME = GRPC_PREFIX + ".keepalive.time";
private static final String GRPC_KEEP_ALIVE_TIMEOUT = GRPC_PREFIX + ".keepalive.timeout";

private static final String TCP_ENABLE = PREFIX + ".tcp";
private static final String TCP_BIND_IP = PREFIX + ".tcp.ip";
Expand Down
Expand Up @@ -74,8 +74,9 @@ public void afterPropertiesSet() throws Exception {
Assert.requireNonNull(this.bindIp, "bindIp must not be null");
Assert.requireNonNull(this.addressFilter, "addressFilter must not be null");
Assert.isTrue(CollectionUtils.hasLength(this.serviceList), "serviceList must not be empty");
Assert.requireNonNull(this.serverOption, "serverOption must not be null");

this.serverFactory = new ServerFactory(beanName, this.bindIp, this.bindPort, this.executor);
this.serverFactory = new ServerFactory(beanName, this.bindIp, this.bindPort, this.executor, serverOption);
ServerTransportFilter permissionServerTransportFilter = new PermissionServerTransportFilter(addressFilter);
this.serverFactory.addTransportFilter(permissionServerTransportFilter);

Expand Down
9 changes: 5 additions & 4 deletions collector/src/main/resources/applicationContext-collector.xml
Expand Up @@ -444,7 +444,7 @@
<property name="corePoolSize" value="#{baseDataReceiverConfig.grpcWorkerThreadSize}"/>
<property name="maxPoolSize" value="#{baseDataReceiverConfig.grpcWorkerThreadSize}"/>
<property name="queueCapacity" value="#{baseDataReceiverConfig.grpcWorkerQueueSize}"/>
<property name="threadNamePrefix" value="Pinpoint-AgentServer-Worker"/>
<property name="threadNamePrefix" value="Pinpoint-GrpcAgent-Worker-"/>
<property name="registry" value="#{baseDataReceiverConfig.grpcWorkerMonitorEnable ? metricRegistry : null}"/>
</bean>

Expand All @@ -456,7 +456,7 @@

<bean id="grpcLifecycleScheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="poolSize" value="1"/>
<property name="threadNamePrefix" value="Pinpoint-LifecycleFlusher-"/>
<property name="threadNamePrefix" value="Pinpoint-GrpcLifecycleFlusher-"/>
<property name="daemon" value="true"/>
<property name="waitForTasksToCompleteOnShutdown" value="true"/>
<property name="awaitTerminationSeconds" value="10"/>
Expand Down Expand Up @@ -523,7 +523,7 @@
<property name="corePoolSize" value="#{spanReceiverConfig.grpcWorkerThreadSize}"/>
<property name="maxPoolSize" value="#{spanReceiverConfig.grpcWorkerThreadSize}"/>
<property name="queueCapacity" value="#{spanReceiverConfig.grpcWorkerQueueSize}"/>
<property name="threadNamePrefix" value="Pinpoint-TraceServer-Worker-"/>
<property name="threadNamePrefix" value="Pinpoint-GrpcSpan-Worker-"/>
<property name="registry" value="#{spanReceiverConfig.grpcWorkerMonitorEnable ? metricRegistry : null}"/>
</bean>

Expand Down Expand Up @@ -554,7 +554,7 @@
<property name="corePoolSize" value="#{statReceiverConfig.workerThreadSize}"/>
<property name="maxPoolSize" value="#{statReceiverConfig.workerThreadSize}"/>
<property name="queueCapacity" value="#{statReceiverConfig.workerQueueSize}"/>
<property name="threadNamePrefix" value="Pinpoint-Stat-Worker-"/>
<property name="threadNamePrefix" value="Pinpoint-GrpcStat-Worker-"/>
<property name="registry" value="#{statReceiverConfig.workerMonitorEnable ? metricRegistry : null}"/>
</bean>
<bean id="grpcStatServerOptionBuilder" class="com.navercorp.pinpoint.grpc.server.ServerOption.Builder">
Expand All @@ -577,6 +577,7 @@
<property name="bindableServiceList" ref="statServiceList"/>
<property name="executor" ref="grpcStatServerExecutor"/>
<property name="enable" value="#{statReceiverConfig.isGrpcEnable()}"/>
<property name="serverOption" ref="grpcStatServerOption"/>
</bean>

<!-- end receiver configuration -->
Expand Down
49 changes: 25 additions & 24 deletions collector/src/main/resources/pinpoint-collector.properties
Expand Up @@ -115,48 +115,49 @@ flink.cluster.enable=false
flink.cluster.zookeeper.address=localhost
flink.cluster.zookeeper.sessiontimeout=3000

collector.receiver.grpc=false
collector.receiver.grpc.ip=0.0.0.0
collector.receiver.grpc.port=9997
# gRPC
collector.receiver.grpc.agent.enable=false
collector.receiver.grpc.agent.ip=0.0.0.0
collector.receiver.grpc.agent.port=9997
# number of tcp worker threads
collector.receiver.grpc.worker.threadSize=8
collector.receiver.grpc.agent.worker.threadSize=8
# capacity of tcp worker queue
collector.receiver.grpc.worker.queueSize=1024
collector.receiver.grpc.agent.worker.queueSize=1024
# monitoring for tcp worker
collector.receiver.grpc.worker.monitor=true
collector.receiver.grpc.agent.worker.monitor=true
# Milliseconds, Default 5 min
# As collector.receiver.grpc.ping.interval
collector.receiver.grpc.keepalive.time=300000
collector.receiver.grpc.agent.keepalive.time=300000
# Milliseconds, Default 30 min
# As collector.receiver.grpc.pingwait.timeout
collector.receiver.grpc.keepalive.timeout=1800000
collector.receiver.grpc.agent.keepalive.timeout=1800000

collector.receiver.span.grpc=false
collector.receiver.span.grpc.ip=0.0.0.0
collector.receiver.span.grpc.port=9998
collector.receiver.grpc.span.enable=false
collector.receiver.grpc.span.ip=0.0.0.0
collector.receiver.grpc.span.port=9998
# Milliseconds, Default 5 min
collector.receiver.span.grpc.keepalive.time=300000
collector.receiver.grpc.span.keepalive.time=300000
# Milliseconds, Default 30 min
collector.receiver.span.grpc.keepalive.timeout=1800000
collector.receiver.grpc.span.keepalive.timeout=1800000
# number of span worker threads
collector.receiver.span.grpc.worker.threadSize=32
collector.receiver.grpc.span.worker.threadSize=32
# capacity of span worker queue
collector.receiver.span.grpc.worker.queueSize=256
collector.receiver.grpc.span.worker.queueSize=256
# monitoring for span worker
collector.receiver.span.grpc.worker.monitor=true
collector.receiver.grpc.span.worker.monitor=true

collector.receiver.stat.grpc=false
collector.receiver.stat.grpc.ip=0.0.0.0
collector.receiver.stat.grpc.port=9999
collector.receiver.grpc.stat.enable=false
collector.receiver.grpc.stat.ip=0.0.0.0
collector.receiver.grpc.stat.port=9999
# Milliseconds, Default 5 min
collector.receiver.stat.grpc.keepalive.time=300000
collector.receiver.grpc.stat.keepalive.time=300000
# Milliseconds, Default 30 min
collector.receiver.stat.grpc.keepalive.timeout=1800000
collector.receiver.grpc.stat.keepalive.timeout=1800000
# number of span worker threads
collector.receiver.stat.grpc.worker.threadSize=32
collector.receiver.grpc.stat.worker.threadSize=32
# capacity of span worker queue
collector.receiver.stat.grpc.worker.queueSize=256
collector.receiver.grpc.stat.worker.queueSize=256
# monitoring for span worker
collector.receiver.stat.grpc.worker.monitor=true
collector.receiver.grpc.stat.worker.monitor=true

collector.receiver.channel.properties.key=
Expand Up @@ -28,14 +28,14 @@ public class AgentBaseDataReceiverConfigurationTest {
@Test
public void properties() throws Exception {
Properties properties = new Properties();
properties.setProperty("collector.receiver.grpc", "");
properties.setProperty("collector.receiver.grpc.ip", "9.9.9.9");
properties.setProperty("collector.receiver.grpc.port", "1111");
properties.setProperty("collector.receiver.grpc.worker.threadSize", "99");
properties.setProperty("collector.receiver.grpc.worker.queueSize", "9999");
properties.setProperty("collector.receiver.grpc.worker.monitor", "false");
properties.setProperty("collector.receiver.grpc.keepalive.time", "3");
properties.setProperty("collector.receiver.grpc.keepalive.timeout", "7");
properties.setProperty("collector.receiver.grpc.agent", "");
properties.setProperty("collector.receiver.grpc.agent.ip", "9.9.9.9");
properties.setProperty("collector.receiver.grpc.agent.port", "1111");
properties.setProperty("collector.receiver.grpc.agent.worker.threadSize", "99");
properties.setProperty("collector.receiver.grpc.agent.worker.queueSize", "9999");
properties.setProperty("collector.receiver.grpc.agent.worker.monitor", "false");
properties.setProperty("collector.receiver.grpc.agent.keepalive.time", "3");
properties.setProperty("collector.receiver.grpc.agent.keepalive.timeout", "7");

AgentBaseDataReceiverConfiguration configuration = new AgentBaseDataReceiverConfiguration(properties, new DeprecatedConfiguration());
assertEquals(Boolean.FALSE, configuration.isGrpcEnable());
Expand Down
Expand Up @@ -28,14 +28,14 @@ public class SpanReceiverConfigurationTest {
@Test
public void properties() throws Exception {
Properties properties = new Properties();
properties.setProperty("collector.receiver.span.grpc", "");
properties.setProperty("collector.receiver.span.grpc.ip", "9.9.9.9");
properties.setProperty("collector.receiver.span.grpc.port", "1111");
properties.setProperty("collector.receiver.span.grpc.worker.threadSize", "99");
properties.setProperty("collector.receiver.span.grpc.worker.queueSize", "9999");
properties.setProperty("collector.receiver.span.grpc.worker.monitor", "false");
properties.setProperty("collector.receiver.span.grpc.keepalive.time", "3");
properties.setProperty("collector.receiver.span.grpc.keepalive.timeout", "7");
properties.setProperty("collector.receiver.grpc.span.enable", "");
properties.setProperty("collector.receiver.grpc.span.ip", "9.9.9.9");
properties.setProperty("collector.receiver.grpc.span.port", "1111");
properties.setProperty("collector.receiver.grpc.span.worker.threadSize", "99");
properties.setProperty("collector.receiver.grpc.span.worker.queueSize", "9999");
properties.setProperty("collector.receiver.grpc.span.worker.monitor", "false");
properties.setProperty("collector.receiver.grpc.span.keepalive.time", "3");
properties.setProperty("collector.receiver.grpc.span.keepalive.timeout", "7");

SpanReceiverConfiguration configuration = new SpanReceiverConfiguration(properties, new DeprecatedConfiguration());
assertEquals(Boolean.FALSE, configuration.isGrpcEnable());
Expand Down
Expand Up @@ -27,14 +27,14 @@ public class StatReceiverConfigurationTest {
@Test
public void properties() throws Exception {
Properties properties = new Properties();
properties.setProperty("collector.receiver.stat.grpc", "");
properties.setProperty("collector.receiver.stat.grpc.ip", "9.9.9.9");
properties.setProperty("collector.receiver.stat.grpc.port", "1111");
properties.setProperty("collector.receiver.stat.grpc.worker.threadSize", "99");
properties.setProperty("collector.receiver.stat.grpc.worker.queueSize", "9999");
properties.setProperty("collector.receiver.stat.grpc.worker.monitor", "false");
properties.setProperty("collector.receiver.stat.grpc.keepalive.time", "3");
properties.setProperty("collector.receiver.stat.grpc.keepalive.timeout", "7");
properties.setProperty("collector.receiver.grpc.stat.enable", "");
properties.setProperty("collector.receiver.grpc.stat.ip", "9.9.9.9");
properties.setProperty("collector.receiver.grpc.stat.port", "1111");
properties.setProperty("collector.receiver.grpc.stat.worker.threadSize", "99");
properties.setProperty("collector.receiver.grpc.stat.worker.queueSize", "9999");
properties.setProperty("collector.receiver.grpc.stat.worker.monitor", "false");
properties.setProperty("collector.receiver.grpc.stat.keepalive.time", "3");
properties.setProperty("collector.receiver.grpc.stat.keepalive.timeout", "7");

StatReceiverConfiguration configuration = new StatReceiverConfiguration(properties, new DeprecatedConfiguration());
assertEquals(Boolean.FALSE, configuration.isGrpcEnable());
Expand Down
Expand Up @@ -26,6 +26,7 @@
import com.navercorp.pinpoint.grpc.trace.PSqlMetaData;
import com.navercorp.pinpoint.grpc.trace.PStringMetaData;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientInterceptor;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
Expand All @@ -34,6 +35,7 @@
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.MetadataUtils;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -70,7 +72,6 @@ public AgentClientMock(final String host, final int port, final boolean agentHea
builder.intercept(headersInterceptor);
}
builder.usePlaintext();

channel = builder.build();
this.agentStub = AgentGrpc.newBlockingStub(channel);
this.metadataStub = MetadataGrpc.newBlockingStub(channel);
Expand Down
Expand Up @@ -19,10 +19,12 @@
import com.navercorp.pinpoint.collector.receiver.DispatchHandler;
import com.navercorp.pinpoint.collector.receiver.grpc.service.AgentService;
import com.navercorp.pinpoint.common.server.util.AddressFilter;
import com.navercorp.pinpoint.grpc.server.ServerOption;
import com.navercorp.pinpoint.grpc.trace.PResult;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.ServerResponse;
import io.grpc.BindableService;
import io.grpc.Status;

import java.net.InetAddress;
import java.util.Arrays;
Expand All @@ -47,6 +49,7 @@ public void run() throws Exception {
grpcReceiver.setBindableServiceList(Arrays.asList(agentService));
grpcReceiver.setAddressFilter(new MockAddressFilter());
grpcReceiver.setExecutor(Executors.newFixedThreadPool(8));
grpcReceiver.setServerOption(new ServerOption.Builder().build());

grpcReceiver.afterPropertiesSet();

Expand Down

0 comments on commit 11fd0ba

Please sign in to comment.