Skip to content

Commit

Permalink
[#4558] Change stat-collector type of Agent
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed May 3, 2019
1 parent 9c0d6dd commit e3a297a
Show file tree
Hide file tree
Showing 51 changed files with 1,567 additions and 354 deletions.
Expand Up @@ -20,6 +20,8 @@
import com.navercorp.pinpoint.profiler.monitor.AgentStatMonitor;
import com.navercorp.pinpoint.profiler.monitor.DefaultAgentStatMonitor;
import com.navercorp.pinpoint.profiler.monitor.collector.AgentStatMetricCollector;
import com.navercorp.pinpoint.profiler.monitor.metric.AgentStatMetricSnapshot;
import com.navercorp.pinpoint.profiler.monitor.metric.AgentStatMetricSnapshotBatch;
import com.navercorp.pinpoint.profiler.sender.DataSender;
import com.navercorp.pinpoint.test.ListenableDataSender;
import com.navercorp.pinpoint.test.TBaseRecorder;
Expand All @@ -43,18 +45,18 @@ public class AgentStatMonitorTest {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private TBaseRecorder<TAgentStatBatch> tBaseRecorder;
private TBaseRecorder<AgentStatMetricSnapshotBatch> tBaseRecorder;
private DataSender dataSender;

@Mock
private AgentStatMetricCollector<TAgentStat> agentStatCollector;
private AgentStatMetricCollector<AgentStatMetricSnapshot> agentStatCollector;

@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
when(agentStatCollector.collect()).thenReturn(new TAgentStat());
when(agentStatCollector.collect()).thenReturn(new AgentStatMetricSnapshot());

this.tBaseRecorder = new TBaseRecorder<TAgentStatBatch>();
this.tBaseRecorder = new TBaseRecorder<AgentStatMetricSnapshotBatch>();
TBaseRecorderAdaptor recorderAdaptor = new TBaseRecorderAdaptor(tBaseRecorder);

ListenableDataSender listenableDataSender = new ListenableDataSender("testDataSender");
Expand All @@ -77,7 +79,7 @@ public void testAgentStatMonitor() throws InterruptedException {
monitor.stop();
// Then
assertTrue(tBaseRecorder.size() >= minNumBatchToTest);
for (TAgentStatBatch agentStatBatch : tBaseRecorder) {
for (AgentStatMetricSnapshotBatch agentStatBatch : tBaseRecorder) {
logger.debug("agentStatBatch:{}", agentStatBatch);
assertTrue(agentStatBatch.getAgentStats().size() <= numCollectionsPerBatch);
}
Expand Down
Expand Up @@ -38,6 +38,7 @@
import com.navercorp.pinpoint.profiler.context.provider.TcpDataSenderProvider;
import com.navercorp.pinpoint.profiler.context.provider.grpc.GrpcSpanProcessorProvider;
import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter;
import com.navercorp.pinpoint.profiler.context.thrift.StatThriftMessageConverterProvider;
import com.navercorp.pinpoint.profiler.receiver.CommandDispatcher;
import com.navercorp.pinpoint.profiler.sender.DataSender;
import com.navercorp.pinpoint.profiler.sender.EnhancedDataSender;
Expand Down Expand Up @@ -87,6 +88,10 @@ protected void configure() {
Key<MessageConverter<TBase<?, ?>>> metadataMessageConverterKey = Key.get(thriftMessageConverter, MetadataConverter.class);
bind(metadataMessageConverterKey).toProvider(MetadataMessageConverterProvider.class ).in(Scopes.SINGLETON);

// Stat Thrift Converter
TypeLiteral<MessageConverter<TBase<?, ?>>> statMessageConverter = new TypeLiteral<MessageConverter<TBase<?, ?>>>() {};
Key<MessageConverter<TBase<?, ?>>> statMessageConverterKey = Key.get(statMessageConverter, StatConverter.class);
bind(statMessageConverterKey).toProvider(StatThriftMessageConverterProvider.class ).in(Scopes.SINGLETON);

Key<DataSender> spanDataSender = Key.get(DataSender.class, SpanDataSender.class);
bind(spanDataSender).toProvider(GrpcDataSenderProvider.class).in(Scopes.SINGLETON);
Expand Down
@@ -0,0 +1,34 @@
/*
* Copyright 2019 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.profiler.context.module;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

/**
* @author jaehong.kim
*/
@BindingAnnotation
@Target(PARAMETER)
@Retention(RUNTIME)
public @interface StatConverter {
}
Expand Up @@ -36,6 +36,7 @@
import com.navercorp.pinpoint.profiler.context.thrift.DefaultTransactionIdEncoder;
import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter;
import com.navercorp.pinpoint.profiler.context.thrift.SpanThriftMessageConverterProvider;
import com.navercorp.pinpoint.profiler.context.thrift.StatThriftMessageConverterProvider;
import com.navercorp.pinpoint.profiler.receiver.CommandDispatcher;
import com.navercorp.pinpoint.profiler.sender.DataSender;
import com.navercorp.pinpoint.profiler.sender.EnhancedDataSender;
Expand Down Expand Up @@ -86,6 +87,11 @@ protected void configure() {
// expose(metadataMessageConverterKey);


// Stat Thrift Converter
TypeLiteral<MessageConverter<TBase<?, ?>>> statMessageConverter = new TypeLiteral<MessageConverter<TBase<?, ?>>>() {};
Key<MessageConverter<TBase<?, ?>>> statMessageConverterKey = Key.get(statMessageConverter, StatConverter.class);
bind(statMessageConverterKey).toProvider(StatThriftMessageConverterProvider.class ).in(Scopes.SINGLETON);

Key<DataSender> spanDataSender = Key.get(DataSender.class, SpanDataSender.class);
bind(spanDataSender).toProvider(SpanDataSenderProvider.class).in(Scopes.SINGLETON);
expose(spanDataSender);
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Names;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceHistogram;
import com.navercorp.pinpoint.profiler.context.provider.stat.activethread.ActiveTraceMetricCollectorProvider;
import com.navercorp.pinpoint.profiler.context.provider.stat.buffer.BufferMetricCollectorProvider;
import com.navercorp.pinpoint.profiler.context.provider.stat.cpu.CpuLoadMetricCollectorProvider;
Expand All @@ -31,6 +32,15 @@
import com.navercorp.pinpoint.profiler.context.provider.stat.transaction.TransactionMetricCollectorProvider;
import com.navercorp.pinpoint.profiler.monitor.collector.AgentStatCollector;
import com.navercorp.pinpoint.profiler.monitor.collector.AgentStatMetricCollector;
import com.navercorp.pinpoint.profiler.monitor.metric.AgentStatMetricSnapshot;
import com.navercorp.pinpoint.profiler.monitor.metric.JvmGcMetricSnapshot;
import com.navercorp.pinpoint.profiler.monitor.metric.buffer.BufferMetricSnapshot;
import com.navercorp.pinpoint.profiler.monitor.metric.cpu.CpuLoadMetricSnapshot;
import com.navercorp.pinpoint.profiler.monitor.metric.datasource.DataSourceMetricSnapshot;
import com.navercorp.pinpoint.profiler.monitor.metric.deadlock.DeadlockMetricSnapshot;
import com.navercorp.pinpoint.profiler.monitor.metric.filedescriptor.FileDescriptorMetricSnapshot;
import com.navercorp.pinpoint.profiler.monitor.metric.response.ResponseTimeValue;
import com.navercorp.pinpoint.profiler.monitor.metric.transaction.TransactionMetricSnapshot;
import com.navercorp.pinpoint.thrift.dto.TActiveTrace;
import com.navercorp.pinpoint.thrift.dto.TAgentStat;
import com.navercorp.pinpoint.thrift.dto.TCpuLoad;
Expand All @@ -54,43 +64,43 @@ protected void configure() {
binder().disableCircularProxies();

// gc
TypeLiteral<AgentStatMetricCollector<TJvmGc>> jvmGcCollector = new TypeLiteral<AgentStatMetricCollector<TJvmGc>>() {};
TypeLiteral<AgentStatMetricCollector<JvmGcMetricSnapshot>> jvmGcCollector = new TypeLiteral<AgentStatMetricCollector<JvmGcMetricSnapshot>>() {};
bind(jvmGcCollector).toProvider(JvmGcMetricCollectorProvider.class).in(Scopes.SINGLETON);

// cpu
TypeLiteral<AgentStatMetricCollector<TCpuLoad>> cpuLoadCollector = new TypeLiteral<AgentStatMetricCollector<TCpuLoad>>() {};
TypeLiteral<AgentStatMetricCollector<CpuLoadMetricSnapshot>> cpuLoadCollector = new TypeLiteral<AgentStatMetricCollector<CpuLoadMetricSnapshot>>() {};
bind(cpuLoadCollector).toProvider(CpuLoadMetricCollectorProvider.class).in(Scopes.SINGLETON);

// FD
TypeLiteral<AgentStatMetricCollector<TFileDescriptor>> fdCollector = new TypeLiteral<AgentStatMetricCollector<TFileDescriptor>>() {};
TypeLiteral<AgentStatMetricCollector<FileDescriptorMetricSnapshot>> fdCollector = new TypeLiteral<AgentStatMetricCollector<FileDescriptorMetricSnapshot>>() {};
bind(fdCollector).toProvider(FileDescriptorMetricCollectorProvider.class).in(Scopes.SINGLETON);

// buffer
TypeLiteral<AgentStatMetricCollector<TDirectBuffer>> bufferCollector = new TypeLiteral<AgentStatMetricCollector<TDirectBuffer>>() {};
TypeLiteral<AgentStatMetricCollector<BufferMetricSnapshot>> bufferCollector = new TypeLiteral<AgentStatMetricCollector<BufferMetricSnapshot>>() {};
bind(bufferCollector).toProvider(BufferMetricCollectorProvider.class).in(Scopes.SINGLETON);

// transaction
TypeLiteral<AgentStatMetricCollector<TTransaction>> transactionCollector = new TypeLiteral<AgentStatMetricCollector<TTransaction>>() {};
TypeLiteral<AgentStatMetricCollector<TransactionMetricSnapshot>> transactionCollector = new TypeLiteral<AgentStatMetricCollector<TransactionMetricSnapshot>>() {};
bind(transactionCollector).toProvider(TransactionMetricCollectorProvider.class).in(Scopes.SINGLETON);

// activeTrace
TypeLiteral<AgentStatMetricCollector<TActiveTrace>> activeTraceCollector = new TypeLiteral<AgentStatMetricCollector<TActiveTrace>>() {};
TypeLiteral<AgentStatMetricCollector<ActiveTraceHistogram>> activeTraceCollector = new TypeLiteral<AgentStatMetricCollector<ActiveTraceHistogram>>() {};
bind(activeTraceCollector).toProvider(ActiveTraceMetricCollectorProvider.class).in(Scopes.SINGLETON);

// responseTime
TypeLiteral<AgentStatMetricCollector<TResponseTime>> responseTimeCollector = new TypeLiteral<AgentStatMetricCollector<TResponseTime>>() {};
TypeLiteral<AgentStatMetricCollector<ResponseTimeValue>> responseTimeCollector = new TypeLiteral<AgentStatMetricCollector<ResponseTimeValue>>() {};
bind(responseTimeCollector).toProvider(ResponseTimeMetricCollectorProvider.class).in(Scopes.SINGLETON);

// datasource
TypeLiteral<AgentStatMetricCollector<TDataSourceList>> datasourceCollector = new TypeLiteral<AgentStatMetricCollector<TDataSourceList>>() {};
TypeLiteral<AgentStatMetricCollector<DataSourceMetricSnapshot>> datasourceCollector = new TypeLiteral<AgentStatMetricCollector<DataSourceMetricSnapshot>>() {};
bind(datasourceCollector).toProvider(DataSourceMetricCollectorProvider.class).in(Scopes.SINGLETON);

// deadlock
TypeLiteral<AgentStatMetricCollector<TDeadlock>> deadlockCollector = new TypeLiteral<AgentStatMetricCollector<TDeadlock>>() {};
TypeLiteral<AgentStatMetricCollector<DeadlockMetricSnapshot>> deadlockCollector = new TypeLiteral<AgentStatMetricCollector<DeadlockMetricSnapshot>>() {};
bind(deadlockCollector).toProvider(DeadlockMetricCollectorProvider.class).in(Scopes.SINGLETON);

// stat
TypeLiteral<AgentStatMetricCollector<TAgentStat>> statMetric = new TypeLiteral<AgentStatMetricCollector<TAgentStat>>() {};
TypeLiteral<AgentStatMetricCollector<AgentStatMetricSnapshot>> statMetric = new TypeLiteral<AgentStatMetricCollector<AgentStatMetricSnapshot>>() {};
bind(statMetric).annotatedWith(Names.named("AgentStatCollector"))
.to(AgentStatCollector.class).in(Scopes.SINGLETON);
}
Expand Down
Expand Up @@ -25,7 +25,9 @@
import com.navercorp.pinpoint.profiler.context.module.SpanStatClientFactory;
import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter;
import com.navercorp.pinpoint.profiler.sender.DataSender;
import com.navercorp.pinpoint.profiler.sender.MessageSerializer;
import com.navercorp.pinpoint.profiler.sender.TcpDataSender;
import com.navercorp.pinpoint.profiler.sender.ThriftMessageSerializer;
import com.navercorp.pinpoint.profiler.sender.UdpDataSenderFactory;
import com.navercorp.pinpoint.rpc.client.PinpointClientFactory;
import org.apache.thrift.TBase;
Expand Down Expand Up @@ -77,7 +79,8 @@ public DataSender get() {
}

PinpointClientFactory pinpointClientFactory = clientFactoryProvider.get();
return new TcpDataSender("SpanDataSender", ip, port, pinpointClientFactory);
MessageSerializer<byte[]> messageSerializer = new ThriftMessageSerializer(messageConverter);
return new TcpDataSender("SpanDataSender", ip, port, pinpointClientFactory, messageSerializer);
} else {
UdpDataSenderFactory factory = new UdpDataSenderFactory(ip, port, UDP_EXECUTOR_NAME, writeQueueSize, timeout, sendBufferSize, messageConverter);
return factory.create(ioType);
Expand Down
Expand Up @@ -22,10 +22,13 @@
import com.navercorp.pinpoint.bootstrap.config.ProfilerConfig;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.profiler.context.module.SpanStatClientFactory;
import com.navercorp.pinpoint.profiler.context.module.StatConverter;
import com.navercorp.pinpoint.profiler.context.thrift.BypassMessageConverter;
import com.navercorp.pinpoint.profiler.context.thrift.MessageConverter;
import com.navercorp.pinpoint.profiler.sender.DataSender;
import com.navercorp.pinpoint.profiler.sender.MessageSerializer;
import com.navercorp.pinpoint.profiler.sender.TcpDataSender;
import com.navercorp.pinpoint.profiler.sender.ThriftMessageSerializer;
import com.navercorp.pinpoint.profiler.sender.UdpDataSenderFactory;
import com.navercorp.pinpoint.rpc.client.PinpointClientFactory;
import org.apache.thrift.TBase;
Expand Down Expand Up @@ -54,7 +57,7 @@ public class StatDataSenderProvider implements Provider<DataSender> {
private final MessageConverter<TBase<?, ?>> messageConverter;

@Inject
public StatDataSenderProvider(ProfilerConfig profilerConfig, @SpanStatClientFactory Provider<PinpointClientFactory> clientFactoryProvider) {
public StatDataSenderProvider(ProfilerConfig profilerConfig, @SpanStatClientFactory Provider<PinpointClientFactory> clientFactoryProvider, @StatConverter MessageConverter<TBase<?, ?>> messageConverter) {
Assert.requireNonNull(profilerConfig, "profilerConfig must not be null");

this.clientFactoryProvider = Assert.requireNonNull(clientFactoryProvider, "clientFactoryProvider must not be null");
Expand All @@ -67,8 +70,7 @@ public StatDataSenderProvider(ProfilerConfig profilerConfig, @SpanStatClientFact
this.sendBufferSize = thriftTransportConfig.getStatDataSenderSocketSendBufferSize();
this.ioType = thriftTransportConfig.getStatDataSenderSocketType();
this.transportType = thriftTransportConfig.getStatDataSenderTransportType();

this.messageConverter = new BypassMessageConverter<TBase<?, ?>>();
this.messageConverter = messageConverter;
}

@Override
Expand All @@ -79,7 +81,8 @@ public DataSender get() {
}

PinpointClientFactory pinpointClientFactory = clientFactoryProvider.get();
return new TcpDataSender("StatDataSender", ip, port, pinpointClientFactory);
MessageSerializer<byte[]> messageSerializer = new ThriftMessageSerializer(messageConverter);
return new TcpDataSender("StatDataSender", ip, port, pinpointClientFactory, messageSerializer);
} else {
UdpDataSenderFactory factory = new UdpDataSenderFactory(ip, port, UDP_EXECUTOR_NAME, writeQueueSize, timeout, sendBufferSize, messageConverter);
return factory.create(ioType);
Expand Down
Expand Up @@ -19,16 +19,16 @@
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.profiler.context.active.ActiveTraceHistogram;
import com.navercorp.pinpoint.profiler.monitor.collector.AgentStatMetricCollector;
import com.navercorp.pinpoint.profiler.monitor.collector.UnsupportedMetricCollector;
import com.navercorp.pinpoint.profiler.monitor.collector.activethread.DefaultActiveTraceMetricCollector;
import com.navercorp.pinpoint.profiler.monitor.metric.activethread.ActiveTraceMetric;
import com.navercorp.pinpoint.thrift.dto.TActiveTrace;

/**
* @author HyunGil Jeong
*/
public class ActiveTraceMetricCollectorProvider implements Provider<AgentStatMetricCollector<TActiveTrace>> {
public class ActiveTraceMetricCollectorProvider implements Provider<AgentStatMetricCollector<ActiveTraceHistogram>> {

private final ActiveTraceMetric activeTraceMetric;

Expand All @@ -38,9 +38,9 @@ public ActiveTraceMetricCollectorProvider(ActiveTraceMetric activeTraceMetric) {
}

@Override
public AgentStatMetricCollector<TActiveTrace> get() {
public AgentStatMetricCollector<ActiveTraceHistogram> get() {
if (activeTraceMetric == ActiveTraceMetric.UNSUPPORTED_ACTIVE_TRACE_METRIC) {
return new UnsupportedMetricCollector<TActiveTrace>();
return new UnsupportedMetricCollector<ActiveTraceHistogram>();
}
return new DefaultActiveTraceMetricCollector(activeTraceMetric);
}
Expand Down
Expand Up @@ -23,13 +23,12 @@
import com.navercorp.pinpoint.profiler.monitor.collector.UnsupportedMetricCollector;
import com.navercorp.pinpoint.profiler.monitor.collector.buffer.DefaultBufferMetricCollector;
import com.navercorp.pinpoint.profiler.monitor.metric.buffer.BufferMetric;
import com.navercorp.pinpoint.thrift.dto.TDirectBuffer;

import com.navercorp.pinpoint.profiler.monitor.metric.buffer.BufferMetricSnapshot;

/**
* @author Roy Kim
*/
public class BufferMetricCollectorProvider implements Provider<AgentStatMetricCollector<TDirectBuffer>> {
public class BufferMetricCollectorProvider implements Provider<AgentStatMetricCollector<BufferMetricSnapshot>> {

private final BufferMetric bufferMetric;

Expand All @@ -39,10 +38,10 @@ public BufferMetricCollectorProvider(BufferMetric bufferMetric) {
}

@Override
public AgentStatMetricCollector<TDirectBuffer> get() {
public AgentStatMetricCollector<BufferMetricSnapshot> get() {
if (bufferMetric == BufferMetric.UNSUPPORTED_BUFFER_METRIC) {
return new UnsupportedMetricCollector<TDirectBuffer>();
return new UnsupportedMetricCollector<BufferMetricSnapshot>();
}
return new DefaultBufferMetricCollector(bufferMetric);
}
}
}
Expand Up @@ -23,12 +23,12 @@
import com.navercorp.pinpoint.profiler.monitor.collector.UnsupportedMetricCollector;
import com.navercorp.pinpoint.profiler.monitor.collector.cpu.DefaultCpuLoadMetricCollector;
import com.navercorp.pinpoint.profiler.monitor.metric.cpu.CpuLoadMetric;
import com.navercorp.pinpoint.thrift.dto.TCpuLoad;
import com.navercorp.pinpoint.profiler.monitor.metric.cpu.CpuLoadMetricSnapshot;

/**
* @author HyunGil Jeong
*/
public class CpuLoadMetricCollectorProvider implements Provider<AgentStatMetricCollector<TCpuLoad>> {
public class CpuLoadMetricCollectorProvider implements Provider<AgentStatMetricCollector<CpuLoadMetricSnapshot>> {

private final CpuLoadMetric cpuLoadMetric;

Expand All @@ -38,9 +38,9 @@ public CpuLoadMetricCollectorProvider(CpuLoadMetric cpuLoadMetric) {
}

@Override
public AgentStatMetricCollector<TCpuLoad> get() {
public AgentStatMetricCollector<CpuLoadMetricSnapshot> get() {
if (cpuLoadMetric == CpuLoadMetric.UNSUPPORTED_CPU_LOAD_METRIC) {
return new UnsupportedMetricCollector<TCpuLoad>();
return new UnsupportedMetricCollector<CpuLoadMetricSnapshot>();
}
return new DefaultCpuLoadMetricCollector(cpuLoadMetric);
}
Expand Down

0 comments on commit e3a297a

Please sign in to comment.