Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JMX descriptions #119

Merged
merged 6 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<suppress checks="MethodLength" files=".*MongoSinkTopicConfig.java"/>
<suppress checks="MethodLength" files=".*MongoSourceConfig.java"/>
<suppress checks="LineLength" files=".*RecordConverterTest.java"/>
<suppress checks="LineLength" files=".*TaskStatistics.java"/>
<suppress checks="MethodLength" files=".*RecordConverterTest.java"/>

<suppress checks="Javadoc*" files="com[\\/]mongodb[\\/]kafka[\\/]connect[\\/]"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import static com.mongodb.kafka.connect.sink.MongoSinkConfig.TOPIC_OVERRIDE_CONFIG;
import static com.mongodb.kafka.connect.sink.MongoSinkTopicConfig.COLLECTION_CONFIG;
import static com.mongodb.kafka.connect.util.jmx.internal.MBeanServerUtils.getMBeanAttributes;
import static com.mongodb.kafka.connect.util.jmx.internal.MBeanServerUtils.getMBeanDescriptionFor;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -70,15 +72,15 @@ void testSinkSavesAvroDataToMongoDB() {
for (Map<String, Long> attrs : mBeansMap.values()) {
assertEventually(
() -> {
assertNotEquals(0, attrs.get("records-received"));
assertNotEquals(0, attrs.get("records-succeeded"));
assertNotEquals(0, attrs.get("records"));
assertNotEquals(0, attrs.get("records-successful"));
assertEquals(0, attrs.get("records-failed"));
assertNotEquals(0, attrs.get("latest-offset-ms")); // potentially flaky
assertNotEquals(0, attrs.get("task-invocations"));
assertNotEquals(0, attrs.get("between-task-invocations"));
assertNotEquals(0, attrs.get("records-processing"));
assertNotEquals(0, attrs.get("successful-batch-writes"));
assertEquals(0, attrs.get("failed-batch-writes"));
assertNotEquals(0, attrs.get("latest-kafka-time-difference-ms")); // potentially flaky
assertNotEquals(0, attrs.get("in-task-put"));
assertNotEquals(0, attrs.get("in-connect-framework"));
assertNotEquals(0, attrs.get("processing-phases"));
assertNotEquals(0, attrs.get("batch-writes-successful"));
assertEquals(0, attrs.get("batch-writes-failed"));
});
}
}
Expand Down Expand Up @@ -194,54 +196,15 @@ void testSinkSurvivesARestart() {
}

private void assertMetrics() {
Set<String> names =
new HashSet<>(
Arrays.asList(
"records-received",
"records-succeeded",
"records-failed",
"latest-offset-ms",
"task-invocations",
"task-invocations-over-1ms",
"task-invocations-over-10ms",
"task-invocations-over-100ms",
"task-invocations-over-1000ms",
"task-invocations-over-10000ms",
"task-invocations-total-ms",
"between-task-invocations",
"between-task-invocations-over-1ms",
"between-task-invocations-over-10ms",
"between-task-invocations-over-100ms",
"between-task-invocations-over-1000ms",
"between-task-invocations-over-10000ms",
"between-task-invocations-total-ms",
"records-processing",
"records-processing-over-1ms",
"records-processing-over-10ms",
"records-processing-over-100ms",
"records-processing-over-1000ms",
"records-processing-over-10000ms",
"records-processing-total-ms",
"successful-batch-writes",
"successful-batch-writes-over-1ms",
"successful-batch-writes-over-10ms",
"successful-batch-writes-over-100ms",
"successful-batch-writes-over-1000ms",
"successful-batch-writes-over-10000ms",
"successful-batch-writes-total-ms",
"failed-batch-writes",
"failed-batch-writes-over-1ms",
"failed-batch-writes-over-10ms",
"failed-batch-writes-over-100ms",
"failed-batch-writes-over-1000ms",
"failed-batch-writes-over-10000ms",
"failed-batch-writes-total-ms"));

Map<String, Map<String, Long>> mBeansMap =
getMBeanAttributes("com.mongodb:type=MongoDBKafkaConnector,name=SinkTask0");
Set<String> names = SinkTaskStatistics.DESCRIPTIONS.keySet();

String mBeanName = "com.mongodb.kafka.connect:type=sink-task-metrics,task=sink-task-0";
Map<String, Map<String, Long>> mBeansMap = getMBeanAttributes(mBeanName);
assertTrue(mBeansMap.size() > 0);
for (Map.Entry<String, Map<String, Long>> entry : mBeansMap.entrySet()) {
assertEquals(
names, entry.getValue().keySet(), "Mismatched MBean attributes for " + entry.getKey());
entry.getValue().keySet().forEach(n -> assertNotNull(getMBeanDescriptionFor(mBeanName, n)));
}
Set<String> initialNames = new HashSet<>();
new SinkTaskStatistics("name").emit(v -> initialNames.add(v.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@
import static com.mongodb.kafka.connect.source.MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG;
import static com.mongodb.kafka.connect.source.MongoSourceConfig.PIPELINE_CONFIG;
import static com.mongodb.kafka.connect.util.jmx.internal.MBeanServerUtils.getMBeanAttributes;
import static com.mongodb.kafka.connect.util.jmx.internal.MBeanServerUtils.getMBeanDescriptionFor;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.rangeClosed;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -123,21 +124,21 @@ void testSourceLoadsDataFromMongoClient() {

private void assertMBeanAttributesRecorded(
final Map<String, Long> attrs, final boolean skipInitiating) {
assertNotEquals(0, attrs.get("records-returned"));
assertNotEquals(0, attrs.get("records"));
assertEquals(0, attrs.get("records-filtered"));
assertNotEquals(0, attrs.get("records-acknowledged"));
assertNotEquals(0, attrs.get("records-read-bytes"));
assertNotEquals(0, attrs.get("mongodb-bytes-read"));
// skip "latest-offset-secs"
assertNotEquals(0, attrs.get("task-invocations"));
if (attrs.get("task-invocations") > 1) {
assertNotEquals(0, attrs.get("between-task-invocations"));
assertNotEquals(0, attrs.get("in-task-poll"));
if (attrs.get("in-task-poll") > 1) {
assertNotEquals(0, attrs.get("in-connect-framework"));
}
if (!skipInitiating) {
assertNotEquals(0, attrs.get("successful-initiating-commands"));
assertNotEquals(0, attrs.get("initial-commands-successful"));
}
assertNotEquals(0, attrs.get("successful-getmore-commands"));
assertEquals(0, attrs.get("failed-initiating-commands"));
assertEquals(0, attrs.get("failed-getmore-commands"));
assertNotEquals(0, attrs.get("getmore-commands-successful"));
assertEquals(0, attrs.get("initial-commands-failed"));
assertEquals(0, attrs.get("getmore-commands-failed"));
}

@Test
Expand Down Expand Up @@ -356,6 +357,8 @@ void testSourceUsesHeartbeatsForOffsets() {
insertMany(rangeClosed(1, 50), altColl);
getProducedStrings(heartbeatTopic, 1);

assertMetrics();

stopStartSourceConnector(sourceProperties);

boolean resumedFromHeartbeat =
Expand All @@ -365,7 +368,6 @@ void testSourceUsesHeartbeatsForOffsets() {

assertTrue(resumedFromHeartbeat);
}
assertMetrics();
}

@Test
Expand Down Expand Up @@ -419,66 +421,18 @@ void testSourceHasFriendlyErrorMessagesForInvalidPipelines() {

assertTrue(containsIllegalChangeStreamOperation);
}
assertMetrics();
}

private void assertMetrics() {
Set<String> names =
new HashSet<>(
Arrays.asList(
"records-returned",
"records-filtered",
"records-acknowledged",
"records-read-bytes",
"latest-offset-secs",
"task-invocations",
"task-invocations-over-1ms",
"task-invocations-over-10ms",
"task-invocations-over-100ms",
"task-invocations-over-1000ms",
"task-invocations-over-10000ms",
"task-invocations-total-ms",
"between-task-invocations",
"between-task-invocations-over-1ms",
"between-task-invocations-over-10ms",
"between-task-invocations-over-100ms",
"between-task-invocations-over-1000ms",
"between-task-invocations-over-10000ms",
"between-task-invocations-total-ms",
"successful-initiating-commands",
"successful-initiating-commands-over-1ms",
"successful-initiating-commands-over-10ms",
"successful-initiating-commands-over-100ms",
"successful-initiating-commands-over-1000ms",
"successful-initiating-commands-over-10000ms",
"successful-initiating-commands-total-ms",
"successful-getmore-commands",
"successful-getmore-commands-over-1ms",
"successful-getmore-commands-over-10ms",
"successful-getmore-commands-over-100ms",
"successful-getmore-commands-over-1000ms",
"successful-getmore-commands-over-10000ms",
"successful-getmore-commands-total-ms",
"failed-initiating-commands",
"failed-initiating-commands-over-1ms",
"failed-initiating-commands-over-10ms",
"failed-initiating-commands-over-100ms",
"failed-initiating-commands-over-1000ms",
"failed-initiating-commands-over-10000ms",
"failed-initiating-commands-total-ms",
"failed-getmore-commands",
"failed-getmore-commands-over-1ms",
"failed-getmore-commands-over-10ms",
"failed-getmore-commands-over-100ms",
"failed-getmore-commands-over-1000ms",
"failed-getmore-commands-over-10000ms",
"failed-getmore-commands-total-ms"));

String mBeanName = "com.mongodb:type=MongoDBKafkaConnector,name=SourceTask*";
Set<String> names = SourceTaskStatistics.DESCRIPTIONS.keySet();

String mBeanName = "com.mongodb.kafka.connect:type=source-task-metrics,task=source-task-0";
Map<String, Map<String, Long>> mBeansMap = getMBeanAttributes(mBeanName);
assertTrue(mBeansMap.size() > 0);
for (Map.Entry<String, Map<String, Long>> entry : mBeansMap.entrySet()) {
assertEquals(
names, entry.getValue().keySet(), "Mismatched MBean attributes for " + entry.getKey());
entry.getValue().keySet().forEach(n -> assertNotNull(getMBeanDescriptionFor(mBeanName, n)));
}
Set<String> initialNames = new HashSet<>();
new SourceTaskStatistics("name").emit(v -> initialNames.add(v.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,12 +383,12 @@ void testSourceCanHandleInvalidResumeTokenWhenErrorToleranceIsAll() {
getMBeanAttributes(
"com.mongodb.kafka.connect:type=source-task-metrics,task=source-task-change-stream-unknown");
for (Map<String, Long> attrs : mBeansMap.values()) {
assertEquals(50, attrs.get("records-returned"));
assertNotEquals(0, attrs.get("records-read-bytes"));
assertNotEquals(0, attrs.get("successful-initiating-commands"));
assertEquals(2, attrs.get("successful-getmore-commands"));
assertEquals(1, attrs.get("failed-initiating-commands"));
assertEquals(0, attrs.get("failed-getmore-commands"));
assertEquals(50, attrs.get("records"));
assertNotEquals(0, attrs.get("mongodb-bytes-read"));
assertNotEquals(0, attrs.get("initial-commands-successful"));
assertEquals(2, attrs.get("getmore-commands-successful"));
assertEquals(1, attrs.get("initial-commands-failed"));
assertEquals(0, attrs.get("getmore-commands-failed"));
}
}
task.stop();
Expand Down Expand Up @@ -816,12 +816,12 @@ void testErrorToleranceAllSupport16MbError() {
getMBeanAttributes(
"com.mongodb.kafka.connect:type=source-task-metrics,task=source-task-change-stream-unknown");
for (Map<String, Long> attrs : mBeansMap.values()) {
assertEquals(10, attrs.get("records-returned"));
assertNotEquals(0, attrs.get("records-read-bytes"));
assertEquals(2, attrs.get("successful-initiating-commands"));
assertEquals(3, attrs.get("successful-getmore-commands"));
assertEquals(0, attrs.get("failed-initiating-commands"));
assertEquals(1, attrs.get("failed-getmore-commands"));
assertEquals(10, attrs.get("records"));
assertNotEquals(0, attrs.get("mongodb-bytes-read"));
assertEquals(2, attrs.get("initial-commands-successful"));
assertEquals(3, attrs.get("getmore-commands-successful"));
assertEquals(0, attrs.get("initial-commands-failed"));
assertEquals(1, attrs.get("getmore-commands-failed"));
}
task.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ void stop() {
void put(final Collection<SinkRecord> records) {
if (lastTaskInvocation != null) {
statistics
.getBetweenTaskInvocations()
.getInConnectFramework()
.sample(lastTaskInvocation.getElapsedTime(TimeUnit.MILLISECONDS));
}
Timer taskTime = Timer.start();
statistics.getRecordsReceived().sample(records.size());
statistics.getRecords().sample(records.size());
trackLatestRecordTimestampOffset(records);
if (records.isEmpty()) {
LOGGER.debug("No sink records to process for current poll operation");
Expand All @@ -100,14 +100,12 @@ void put(final Collection<SinkRecord> records) {
List<List<MongoProcessedSinkRecordData>> batches =
MongoSinkRecordProcessor.orderedGroupByTopicAndNamespace(
records, sinkConfig, errorReporter);
statistics
.getRecordsProcessing()
.sample(processingTime.getElapsedTime(TimeUnit.MILLISECONDS));
statistics.getProcessingPhases().sample(processingTime.getElapsedTime(TimeUnit.MILLISECONDS));
for (List<MongoProcessedSinkRecordData> batch : batches) {
bulkWriteBatch(batch);
}
}
statistics.getTaskInvocations().sample(taskTime.getElapsedTime(TimeUnit.MILLISECONDS));
statistics.getInTaskPut().sample(taskTime.getElapsedTime(TimeUnit.MILLISECONDS));
lastTaskInvocation = Timer.start();
if (LOGGER.isDebugEnabled()) {
// toJSON relatively expensive
Expand All @@ -123,7 +121,7 @@ private void trackLatestRecordTimestampOffset(final Collection<SinkRecord> recor
.max();
if (latestRecord.isPresent()) {
long offsetMs = System.currentTimeMillis() - latestRecord.getAsLong();
statistics.getLatestOffsetMs().sample(offsetMs);
statistics.getLatestKafkaTimeDifferenceMs().sample(offsetMs);
}
}

Expand Down Expand Up @@ -154,12 +152,12 @@ private void bulkWriteBatch(final List<MongoProcessedSinkRecordData> batch) {
.getDatabase(namespace.getDatabaseName())
.getCollection(namespace.getCollectionName(), BsonDocument.class)
.bulkWrite(writeModels, new BulkWriteOptions().ordered(bulkWriteOrdered));
statistics.getSuccessfulBatchWrites().sample(writeTime.getElapsedTime(TimeUnit.MILLISECONDS));
statistics.getRecordsSucceeded().sample(batch.size());
statistics.getBatchWritesSuccessful().sample(writeTime.getElapsedTime(TimeUnit.MILLISECONDS));
statistics.getRecordsSuccessful().sample(batch.size());
LOGGER.debug("Mongodb bulk write result: {}", result);
} catch (RuntimeException e) {
statistics.getFailedBatchWrites().sample(writeTime.getElapsedTime(TimeUnit.MILLISECONDS));
statistics.getFailedRecords().sample(batch.size());
statistics.getBatchWritesFailed().sample(writeTime.getElapsedTime(TimeUnit.MILLISECONDS));
statistics.getRecordsFailed().sample(batch.size());
handleTolerableWriteException(
batch.stream()
.map(MongoProcessedSinkRecordData::getSinkRecord)
Expand Down
Loading