Skip to content

Commit

Permalink
[#4558] Add gRPC option
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed Mar 8, 2019
1 parent 82bb31f commit 655e85d
Show file tree
Hide file tree
Showing 42 changed files with 1,186 additions and 1,027 deletions.
Expand Up @@ -27,50 +27,63 @@
public final class AgentBaseDataReceiverConfiguration {

private static final String PREFIX = "collector.receiver.base";
private static final String BIND_IP = PREFIX + ".ip";
private static final String BIND_PORT = PREFIX + ".port";
private static final String WORKER_THREAD_SIZE = PREFIX + ".worker.threadSize";
private static final String WORKER_QUEUE_SIZE = PREFIX + ".worker.queueSize";
private static final String WORKER_MONITOR_ENABLE = PREFIX + ".worker.monitor";

private static final String GRPC_PREFIX = "collector.receiver.grpc";
private static final String GRPC_BIND_IP = GRPC_PREFIX + ".ip";
private static final String GRPC_BIND_PORT = GRPC_PREFIX + ".port";
private static final String GRPC_WORKER_THREAD_SIZE = GRPC_PREFIX + ".worker.threadSize";
private static final String GRPC_WORKER_QUEUE_SIZE = GRPC_PREFIX + ".worker.queueSize";
private static final String GRPC_WORKER_MONITOR_ENABLE = GRPC_PREFIX + ".worker.monitor";
private static final String GRPC_KEEP_ALIVE_TIME = GRPC_PREFIX + ".keepalive.time";
private static final String GRPC_KEEP_ALIVE_TIMEOUT = GRPC_PREFIX + ".keepalive.timeout";

private static final String BIND_IP = PREFIX + ".ip";
private final String bindIp;
private static final String BIND_PORT = PREFIX + ".port";
private final int bindPort;

private static final String WORKER_THREAD_SIZE = PREFIX + ".worker.threadSize";
private final int workerThreadSize;
private static final String WORKER_QUEUE_SIZE = PREFIX + ".worker.queueSize";
private final int workerQueueSize;

private static final String WORKER_MONITOR_ENABLE = PREFIX + ".worker.monitor";
private final boolean workerMonitorEnable;

private final boolean grpcEnable;
private final String grpcBindIp;
private final int grpcBindPort;
private final int grpcWorkerThreadSize;
private final int grpcWorkerQueueSize;
private final boolean grpcWorkerMonitorEnable;
private final long grpcKeepAliveTime;
private final long grpcKeepAliveTimeout;

public AgentBaseDataReceiverConfiguration(Properties properties, DeprecatedConfiguration deprecatedConfiguration) {
Objects.requireNonNull(properties, "properties must not be null");
Objects.requireNonNull(deprecatedConfiguration, "deprecatedConfiguration must not be null");

this.bindIp = getBindIp(properties, deprecatedConfiguration, CollectorConfiguration.DEFAULT_LISTEN_IP);
Objects.requireNonNull(bindIp);

this.bindPort = getBindPort(properties, deprecatedConfiguration, 9994);
Assert.isTrue(bindPort > 0, "bindPort must be greater than 0");

this.workerThreadSize = getWorkerThreadSize(properties, deprecatedConfiguration, 128);
Assert.isTrue(workerThreadSize > 0, "workerThreadSize must be greater than 0");

this.workerQueueSize = getWorkerQueueSize(properties, deprecatedConfiguration, 1024 * 5);
Assert.isTrue(workerQueueSize > 0, "workerQueueSize must be greater than 0");

this.workerMonitorEnable = isWorkerThreadMonitorEnable(properties, deprecatedConfiguration);

this.grpcEnable = getGrpcEnable(properties, false);
this.grpcBindIp = getGrpcBindIp(properties, CollectorConfiguration.DEFAULT_LISTEN_IP);
// gRPC
this.grpcEnable = CollectorConfiguration.readBoolean(properties, GRPC_PREFIX);
this.grpcBindIp = CollectorConfiguration.readString(properties, GRPC_BIND_IP, CollectorConfiguration.DEFAULT_LISTEN_IP);
Objects.requireNonNull(grpcBindIp);
this.grpcBindPort = getGrpcBindPort(properties, 9997);
this.grpcBindPort = CollectorConfiguration.readInt(properties, GRPC_BIND_PORT, 9997);
Assert.isTrue(grpcBindPort > 0, "grpcBindPort must be greater than 0");
this.grpcWorkerThreadSize =CollectorConfiguration.readInt(properties, GRPC_WORKER_THREAD_SIZE, 128);
Assert.isTrue(grpcWorkerThreadSize > 0, "grpcWorkerThreadSize must be greater than 0");
this.grpcWorkerQueueSize =CollectorConfiguration.readInt(properties, GRPC_WORKER_QUEUE_SIZE, 1024 * 5);
Assert.isTrue(grpcWorkerQueueSize > 0, "grpcWorkerQueueSize must be greater than 0");
this.grpcWorkerMonitorEnable = CollectorConfiguration.readBoolean(properties, GRPC_WORKER_MONITOR_ENABLE);
this.grpcKeepAliveTime = CollectorConfiguration.readLong(properties, GRPC_KEEP_ALIVE_TIME, 300000L);
this.grpcKeepAliveTimeout = CollectorConfiguration.readLong(properties, GRPC_KEEP_ALIVE_TIMEOUT, 1800000L);
}

private String getBindIp(Properties properties, DeprecatedConfiguration deprecatedConfiguration, String defaultValue) {
Expand All @@ -97,27 +110,6 @@ private int getBindPort(Properties properties, DeprecatedConfiguration deprecate
return defaultValue;
}

private boolean getGrpcEnable(Properties properties, boolean defaultValue) {
if (properties.containsKey(GRPC_PREFIX)) {
return CollectorConfiguration.readBoolean(properties, GRPC_PREFIX);
}
return defaultValue;
}

private String getGrpcBindIp(Properties properties, String defaultValue) {
if (properties.containsKey(GRPC_BIND_IP)) {
return CollectorConfiguration.readString(properties, GRPC_BIND_IP, null);
}
return defaultValue;
}

private int getGrpcBindPort(Properties properties, int defaultValue) {
if (properties.containsKey(GRPC_BIND_PORT)) {
return CollectorConfiguration.readInt(properties, GRPC_BIND_PORT, -1);
}
return defaultValue;
}

private int getWorkerThreadSize(Properties properties, DeprecatedConfiguration deprecatedConfiguration, int defaultValue) {
if (properties.containsKey(WORKER_THREAD_SIZE)) {
return CollectorConfiguration.readInt(properties, WORKER_THREAD_SIZE, -1);
Expand Down Expand Up @@ -186,6 +178,26 @@ public boolean isGrpcEnable() {
return grpcEnable;
}

public int getGrpcWorkerThreadSize() {
return grpcWorkerThreadSize;
}

public int getGrpcWorkerQueueSize() {
return grpcWorkerQueueSize;
}

public boolean isGrpcWorkerMonitorEnable() {
return grpcWorkerMonitorEnable;
}

public long getGrpcKeepAliveTime() {
return grpcKeepAliveTime;
}

public long getGrpcKeepAliveTimeout() {
return grpcKeepAliveTimeout;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("AgentBaseDataReceiverConfiguration{");
Expand All @@ -194,8 +206,15 @@ public String toString() {
sb.append(", workerThreadSize=").append(workerThreadSize);
sb.append(", workerQueueSize=").append(workerQueueSize);
sb.append(", workerMonitorEnable=").append(workerMonitorEnable);
sb.append(", grpcEnable=").append(grpcEnable);
sb.append(", grpcBindIp='").append(grpcBindIp).append('\'');
sb.append(", grpcBindPort=").append(grpcBindPort);
sb.append(", grpcWorkerThreadSize=").append(grpcWorkerThreadSize);
sb.append(", grpcWorkerQueueSize=").append(grpcWorkerQueueSize);
sb.append(", grpcWorkerMonitorEnable=").append(grpcWorkerMonitorEnable);
sb.append(", grpcKeepAliveTime=").append(grpcKeepAliveTime);
sb.append(", grpcKeepAliveTimeout=").append(grpcKeepAliveTimeout);
sb.append('}');
return sb.toString();
}

}
Expand Up @@ -31,34 +31,42 @@ public final class SpanReceiverConfiguration implements DataReceiverGroupConfigu
private static final String GRPC_ENABLE = PREFIX + ".grpc";
private static final String GRPC_BIND_IP = PREFIX + ".grpc.ip";
private static final String GRPC_BIND_PORT = PREFIX + ".grpc.port";
private static final String GRPC_WORKER_THREAD_SIZE = PREFIX + ".grpc.worker.threadSize";
private static final String GRPC_WORKER_QUEUE_SIZE = PREFIX + ".grpc.worker.queueSize";
private static final String GRPC_WORKER_MONITOR_ENABLE = PREFIX + ".grpc.worker.monitor";
private static final String GRPC_KEEP_ALIVE_TIME = PREFIX + ".grpc.keepalive.time";
private static final String GRPC_KEEP_ALIVE_TIMEOUT = PREFIX + ".grpc.keepalive.timeout";

private static final String TCP_ENABLE = PREFIX + ".tcp";
private final boolean isTcpEnable;
private static final String TCP_BIND_IP = PREFIX + ".tcp.ip";
private final String tcpBindIp;
private static final String TCP_BIND_PORT = PREFIX + ".tcp.port";
private final int tcpBindPort;

private static final String UDP_ENABLE = PREFIX + ".udp";
private final boolean isUdpEnable;
private static final String UDP_BIND_IP = PREFIX + ".udp.ip";
private final String udpBindIp;
private static final String UDP_BIND_PORT = PREFIX + ".udp.port";
private final int udpBindPort;
private static final String UDP_RECEIVE_BUFFER_SIZE = PREFIX + ".udp.receiveBufferSize";
private final int udpReceiveBufferSize;

private static final String WORKER_THREAD_SIZE = PREFIX + ".worker.threadSize";
private final int workerThreadSize;
private static final String WORKER_QUEUE_SIZE = PREFIX + ".worker.queueSize";
private final int workerQueueSize;

private static final String WORKER_MONITOR_ENABLE = PREFIX + ".worker.monitor";

private final boolean isTcpEnable;
private final String tcpBindIp;
private final int tcpBindPort;
private final boolean isUdpEnable;
private final String udpBindIp;
private final int udpBindPort;
private final int udpReceiveBufferSize;
private final int workerThreadSize;
private final int workerQueueSize;
private final boolean workerMonitorEnable;

private final boolean isGrpcEnable;
private final String grpcBindIp;
private final int grpcBindPort;
private final int grpcWorkerThreadSize;
private final int grpcWorkerQueueSize;
private final boolean grpcWorkerMonitorEnable;
private final long grpcKeepAliveTime;
private final long grpcKeepAliveTimeout;

public SpanReceiverConfiguration(Properties properties, DeprecatedConfiguration deprecatedConfiguration) {
Objects.requireNonNull(properties, "properties must not be null");
Expand All @@ -72,18 +80,24 @@ public SpanReceiverConfiguration(Properties properties, DeprecatedConfiguration
this.udpBindIp = getUdpBindIp(properties, deprecatedConfiguration, CollectorConfiguration.DEFAULT_LISTEN_IP);
this.udpBindPort = getUdpBindPort(properties, deprecatedConfiguration, 9996);
this.udpReceiveBufferSize = getUdpReceiveBufferSize(properties, deprecatedConfiguration, 1024 * 4096);

this.isGrpcEnable = isGrpcEnable(properties, false);
this.grpcBindIp = CollectorConfiguration.readString(properties, GRPC_BIND_IP, CollectorConfiguration.DEFAULT_LISTEN_IP);
this.grpcBindPort = CollectorConfiguration.readInt(properties, GRPC_BIND_PORT, -1);

this.workerThreadSize = getWorkerThreadSize(properties, deprecatedConfiguration, 256);
Assert.isTrue(workerThreadSize > 0, "workerThreadSize must be greater than 0");
this.workerQueueSize = getWorkerQueueSize(properties, deprecatedConfiguration, 1024 * 5);
Assert.isTrue(workerQueueSize > 0, "workerQueueSize must be greater than 0");

this.workerMonitorEnable = isWorkerThreadMonitorEnable(properties, deprecatedConfiguration);

// gRPC
this.isGrpcEnable = CollectorConfiguration.readBoolean(properties, GRPC_ENABLE);
this.grpcBindIp = CollectorConfiguration.readString(properties, GRPC_BIND_IP, CollectorConfiguration.DEFAULT_LISTEN_IP);
this.grpcBindPort = CollectorConfiguration.readInt(properties, GRPC_BIND_PORT, 9998);
this.grpcWorkerThreadSize = CollectorConfiguration.readInt(properties, GRPC_WORKER_THREAD_SIZE, 128);
Assert.isTrue(grpcWorkerThreadSize > 0, "grpcWorkerThreadSize must be greater than 0");
this.grpcWorkerQueueSize = CollectorConfiguration.readInt(properties, GRPC_WORKER_QUEUE_SIZE, 1024 * 5);
Assert.isTrue(grpcWorkerQueueSize > 0, "grpcWorkerQueueSize must be greater than 0");
this.grpcWorkerMonitorEnable = CollectorConfiguration.readBoolean(properties, GRPC_WORKER_MONITOR_ENABLE);
this.grpcKeepAliveTime = CollectorConfiguration.readLong(properties, GRPC_KEEP_ALIVE_TIME, 300000L);
this.grpcKeepAliveTimeout = CollectorConfiguration.readLong(properties, GRPC_KEEP_ALIVE_TIMEOUT, 1800000L);

validate();
}

Expand Down Expand Up @@ -147,14 +161,6 @@ private int getUdpReceiveBufferSize(Properties properties, DeprecatedConfigurati
return defaultValue;
}

private boolean isGrpcEnable(Properties properties, boolean defaultValue) {
if (properties.containsKey(GRPC_ENABLE)) {
return CollectorConfiguration.readBoolean(properties, GRPC_ENABLE);
}

return defaultValue;
}

private int getWorkerThreadSize(Properties properties, DeprecatedConfiguration deprecatedConfiguration, int defaultValue) {
if (properties.containsKey(WORKER_THREAD_SIZE)) {
return CollectorConfiguration.readInt(properties, WORKER_THREAD_SIZE, -1);
Expand Down Expand Up @@ -256,9 +262,29 @@ public int getGrpcBindPort() {
return grpcBindPort;
}

public int getGrpcWorkerThreadSize() {
return grpcWorkerThreadSize;
}

public int getGrpcWorkerQueueSize() {
return grpcWorkerQueueSize;
}

public boolean isGrpcWorkerMonitorEnable() {
return grpcWorkerMonitorEnable;
}

public long getGrpcKeepAliveTime() {
return grpcKeepAliveTime;
}

public long getGrpcKeepAliveTimeout() {
return grpcKeepAliveTimeout;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("SpanReceiverConfig{");
final StringBuilder sb = new StringBuilder("SpanReceiverConfiguration{");
sb.append("isTcpEnable=").append(isTcpEnable);
sb.append(", tcpBindIp='").append(tcpBindIp).append('\'');
sb.append(", tcpBindPort=").append(tcpBindPort);
Expand All @@ -269,6 +295,14 @@ public String toString() {
sb.append(", workerThreadSize=").append(workerThreadSize);
sb.append(", workerQueueSize=").append(workerQueueSize);
sb.append(", workerMonitorEnable=").append(workerMonitorEnable);
sb.append(", isGrpcEnable=").append(isGrpcEnable);
sb.append(", grpcBindIp='").append(grpcBindIp).append('\'');
sb.append(", grpcBindPort=").append(grpcBindPort);
sb.append(", grpcWorkerThreadSize=").append(grpcWorkerThreadSize);
sb.append(", grpcWorkerQueueSize=").append(grpcWorkerQueueSize);
sb.append(", grpcWorkerMonitorEnable=").append(grpcWorkerMonitorEnable);
sb.append(", grpcKeepAliveTime=").append(grpcKeepAliveTime);
sb.append(", grpcKeepAliveTimeout=").append(grpcKeepAliveTimeout);
sb.append('}');
return sb.toString();
}
Expand Down

0 comments on commit 655e85d

Please sign in to comment.