Skip to content

Commit

Permalink
[#noissue] Adding Logger for Kafka Callback
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Feb 6, 2023
1 parent 97ded91 commit 3529785
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import com.navercorp.pinpoint.metric.common.model.MetricTagKey;
import com.navercorp.pinpoint.metric.common.model.StringPrecondition;
import com.navercorp.pinpoint.metric.common.model.mybatis.TagListTypeHandler;
import com.navercorp.pinpoint.metric.common.util.KafkaCallbacks;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Repository;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.List;
import java.util.Objects;
Expand All @@ -46,6 +50,9 @@ public class PinotMetricTagDao implements MetricTagDao {
private final TagListTypeHandler tagListTypeHandler = new TagListTypeHandler();
private final String topic;

private final ListenableFutureCallback<SendResult<String, MetricJsonTag>> resultCallback
= KafkaCallbacks.loggingCallback("Kafka(MetricJsonTag)", logger);

public PinotMetricTagDao(SqlSessionTemplate sqlPinotSessionTemplate,
KafkaTemplate<String, MetricJsonTag> kafkaTagTemplate,
@Value("${kafka.metadata.tag.topic}") String topic) {
Expand All @@ -57,7 +64,8 @@ public PinotMetricTagDao(SqlSessionTemplate sqlPinotSessionTemplate,
@Override
public void insertMetricTag(MetricTag metricTag) {
MetricJsonTag metricJsonTag = MetricJsonTag.covertMetricJsonTag(tagListTypeHandler, metricTag);
kafkaTagTemplate.send(topic, metricTag.getHostGroupName(), metricJsonTag);
ListenableFuture<SendResult<String, MetricJsonTag>> callBack = kafkaTagTemplate.send(topic, metricTag.getHostGroupName(), metricJsonTag);
callBack.addCallback(resultCallback);
}

private static class MetricJsonTag {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
import com.navercorp.pinpoint.metric.collector.dao.SystemMetricDataTypeDao;
import com.navercorp.pinpoint.metric.common.model.MetricData;
import com.navercorp.pinpoint.metric.common.model.MetricDataName;
import com.navercorp.pinpoint.metric.common.util.KafkaCallbacks;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Repository;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.List;
import java.util.Objects;
Expand All @@ -43,6 +47,9 @@ public class PinotSystemMetricDataTypeDao implements SystemMetricDataTypeDao {
private final KafkaTemplate<String, MetricData> kafkaDataTypeTemplate;
private final String topic;

private final ListenableFutureCallback<SendResult<String, MetricData>> resultCallback
= KafkaCallbacks.loggingCallback("Kafka(MetricData)", logger);

public PinotSystemMetricDataTypeDao(SqlSessionTemplate sqlPinotSessionTemplate,
KafkaTemplate<String, MetricData> kafkaDataTypeTemplate,
@Value("${kafka.metadata.data.type.topic}") String topic) {
Expand All @@ -64,6 +71,7 @@ public MetricData selectMetricDataType(MetricDataName metricDataName) {

@Override
public void updateMetricDataType(MetricData metricData) {
kafkaDataTypeTemplate.send(topic, metricData.getMetricName(), metricData);
ListenableFuture<SendResult<String, MetricData>> callback = kafkaDataTypeTemplate.send(topic, metricData.getMetricName(), metricData);
callback.addCallback(resultCallback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@
import com.navercorp.pinpoint.metric.collector.dao.SystemMetricDao;
import com.navercorp.pinpoint.metric.collector.view.SystemMetricView;
import com.navercorp.pinpoint.metric.common.model.DoubleMetric;
import com.navercorp.pinpoint.metric.common.util.KafkaCallbacks;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Repository;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.List;
import java.util.Objects;
Expand All @@ -32,10 +38,15 @@
@Repository
public class PinotSystemMetricDoubleDao implements SystemMetricDao<DoubleMetric> {

private final Logger logger = LogManager.getLogger(this.getClass());

private final KafkaTemplate<String, SystemMetricView> kafkaDoubleTemplate;

private final String topic;

private final ListenableFutureCallback<SendResult<String, SystemMetricView>> resultCallback
= KafkaCallbacks.loggingCallback("Kafka(SystemMetricView)", logger);

public PinotSystemMetricDoubleDao(KafkaTemplate<String, SystemMetricView> kafkaDoubleTemplate,
@Value("${kafka.double.topic}") String topic) {
this.kafkaDoubleTemplate = Objects.requireNonNull(kafkaDoubleTemplate, "kafkaDoubleTemplate");
Expand All @@ -51,17 +62,16 @@ public void insert(String tenantId, String hostGroupName, String hostName, List<
for (DoubleMetric doubleMetric : systemMetrics) {
String kafkaKey = generateKafkaKey(doubleMetric);
SystemMetricView systemMetricView = new SystemMetricView(tenantId, hostGroupName, doubleMetric);
this.kafkaDoubleTemplate.send(topic, kafkaKey, systemMetricView);
ListenableFuture<SendResult<String, SystemMetricView>> callback = this.kafkaDoubleTemplate.send(topic, kafkaKey, systemMetricView);
callback.addCallback(resultCallback);
}
}

private String generateKafkaKey(DoubleMetric doubleMetric) {
StringBuilder sb = new StringBuilder();
sb.append(doubleMetric.getHostName());
sb.append("_");
sb.append(doubleMetric.getMetricName());
sb.append("_");
sb.append(doubleMetric.getFieldName());
return sb.toString();
return doubleMetric.getHostName() +
"_" +
doubleMetric.getMetricName() +
"_" +
doubleMetric.getFieldName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.navercorp.pinpoint.metric.common.util;

import org.apache.logging.log4j.Logger;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFutureCallback;

public final class KafkaCallbacks {

public static <T> ListenableFutureCallback<SendResult<String, T>> loggingCallback(String name, Logger logger) {
return new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable ex) {
logger.warn("{} onFailure:{}", name, ex.getMessage(), ex);
}

@Override
public void onSuccess(SendResult<String, T> result) {
if (logger.isDebugEnabled()) {
logger.debug("{} onSuccess:{}", name, result);
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.stubbing.Answer;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -38,6 +39,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

/**
* @author Hyunjoon Cho
Expand All @@ -63,7 +65,7 @@ public void setupTemplate() {
public Object answer(InvocationOnMock invocation) throws Throwable {
sendCount.increment();
logger.info("Sending View {}", sendCount.intValue());
return null;
return mock(ListenableFuture.class);
}
}).when(kafkaTemplate).send(anyString(), anyString(), any(SystemMetricView.class));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
package com.navercorp.pinpoint.uristat.collector.dao;

import com.navercorp.pinpoint.metric.common.util.KafkaCallbacks;
import com.navercorp.pinpoint.uristat.common.util.StringPrecondition;
import com.navercorp.pinpoint.uristat.common.model.UriStat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Repository;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.List;
import java.util.Objects;

@Repository
public class PinotUriStatDao implements UriStatDao {
private final Logger logger = LogManager.getLogger(getClass());

private final KafkaTemplate<String, UriStat> kafkaUriStatTemplate;

private final String topic;

private final ListenableFutureCallback<SendResult<String, UriStat>> resultCallback
= KafkaCallbacks.loggingCallback("Kafka(UriStat)", logger);

public PinotUriStatDao(@Qualifier("kafkaUriStatTemplate") KafkaTemplate<String, UriStat> kafkaUriStatTemplate,
@Value("${kafka.uri.topic}") String topic) {
this.kafkaUriStatTemplate = Objects.requireNonNull(kafkaUriStatTemplate, "kafkaUriStatTemplate");
Expand All @@ -28,7 +38,8 @@ public void insert(List<UriStat> data) {
Objects.requireNonNull(data);

for (UriStat uriStat : data) {
this.kafkaUriStatTemplate.send(topic, uriStat.getApplicationName(), uriStat);
ListenableFuture<SendResult<String, UriStat>> response = this.kafkaUriStatTemplate.send(topic, uriStat.getApplicationName(), uriStat);
response.addCallback(resultCallback);
}

}
Expand Down

0 comments on commit 3529785

Please sign in to comment.