Skip to content

Commit

Permalink
MINOR: Delete task-level commit sensor (apache#14677)
Browse files Browse the repository at this point in the history
The task-level commit metrics were removed without deprecation in KIP-447 and the corresponding PR apache#8218. However, we forgot to update the docs and to remove the code that creates the task-level commit sensor.
This PR removes the task-level commit metrics from the docs and removes the code that creates the task-level commit sensor. The code was effectively dead since it was only used in tests.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
  • Loading branch information
cadonna authored and rreddy-22 committed Jan 2, 2024
1 parent e681e6c commit e6ae97f
Show file tree
Hide file tree
Showing 5 changed files with 2 additions and 116 deletions.
20 changes: 0 additions & 20 deletions docs/ops.html
Expand Up @@ -2956,26 +2956,6 @@ <h5 class="anchor-heading"><a id="kafka_streams_task_monitoring" class="anchor-l
<td>The total number of processed records across all source processor nodes of this task.</td>
<td>kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)</td>
</tr>
<tr>
<td>commit-latency-avg</td>
<td>The average execution time in ns, for committing.</td>
<td>kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)</td>
</tr>
<tr>
<td>commit-latency-max</td>
<td>The maximum execution time in ns, for committing.</td>
<td>kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)</td>
</tr>
<tr>
<td>commit-rate</td>
<td>The average number of commit calls per second.</td>
<td>kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)</td>
</tr>
<tr>
<td>commit-total</td>
<td>The total number of commit calls.</td>
<td>kafka.streams:type=stream-task-metrics,thread-id=([-.\w]+),task-id=([-.\w]+)</td>
</tr>
<tr>
<td>record-lateness-avg</td>
<td>The average observed lateness of records (stream time - record timestamp).</td>
Expand Down
Expand Up @@ -41,12 +41,6 @@ private TaskMetrics() {}
private static final String RATE_DESCRIPTION_SUFFIX = " per second";
private static final String ACTIVE_TASK_PREFIX = "active-";

private static final String COMMIT = "commit";
private static final String COMMIT_DESCRIPTION = "calls to commit";
private static final String COMMIT_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_DESCRIPTION;
private static final String COMMIT_RATE_DESCRIPTION =
RATE_DESCRIPTION_PREFIX + COMMIT_DESCRIPTION + RATE_DESCRIPTION_SUFFIX;

private static final String PUNCTUATE = "punctuate";
private static final String PUNCTUATE_DESCRIPTION = "calls to punctuate";
private static final String PUNCTUATE_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + PUNCTUATE_DESCRIPTION;
Expand Down Expand Up @@ -191,22 +185,6 @@ public static Sensor punctuateSensor(final String threadId,
);
}

public static Sensor commitSensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics,
final Sensor... parentSensor) {
return invocationRateAndCountSensor(
threadId,
taskId,
COMMIT,
COMMIT_RATE_DESCRIPTION,
COMMIT_TOTAL_DESCRIPTION,
Sensor.RecordingLevel.DEBUG,
streamsMetrics,
parentSensor
);
}

public static Sensor restoreSensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics,
Expand Down
Expand Up @@ -27,8 +27,6 @@
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATIO_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RECORDS_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TASK_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.THREAD_LEVEL_GROUP;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.TOTAL_DESCRIPTION;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addAvgAndMaxToSensor;
Expand Down Expand Up @@ -78,10 +76,6 @@ private ThreadMetrics() {}
private static final String PUNCTUATE_RATE_DESCRIPTION = RATE_DESCRIPTION + PUNCTUATE_DESCRIPTION;
private static final String PUNCTUATE_AVG_LATENCY_DESCRIPTION = "The average punctuate latency";
private static final String PUNCTUATE_MAX_LATENCY_DESCRIPTION = "The maximum punctuate latency";
private static final String COMMIT_OVER_TASKS_DESCRIPTION =
"calls to commit over all tasks assigned to one stream thread";
private static final String COMMIT_OVER_TASKS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + COMMIT_OVER_TASKS_DESCRIPTION;
private static final String COMMIT_OVER_TASKS_RATE_DESCRIPTION = RATE_DESCRIPTION + COMMIT_OVER_TASKS_DESCRIPTION;
private static final String PROCESS_RATIO_DESCRIPTION =
"The fraction of time the thread spent on processing active tasks";
private static final String PUNCTUATE_RATIO_DESCRIPTION =
Expand Down Expand Up @@ -225,22 +219,6 @@ public static Sensor punctuateSensor(final String threadId,
);
}

public static Sensor commitOverTasksSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
final Sensor commitOverTasksSensor =
streamsMetrics.threadLevelSensor(threadId, COMMIT, Sensor.RecordingLevel.DEBUG);
final Map<String, String> tagMap = streamsMetrics.taskLevelTagMap(threadId, ROLLUP_VALUE);
addInvocationRateAndCountToSensor(
commitOverTasksSensor,
TASK_LEVEL_GROUP,
tagMap,
COMMIT,
COMMIT_OVER_TASKS_RATE_DESCRIPTION,
COMMIT_OVER_TASKS_TOTAL_DESCRIPTION
);
return commitOverTasksSensor;
}

public static Sensor processRatioSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
final Sensor sensor =
Expand Down
Expand Up @@ -177,30 +177,6 @@ public void shouldGetPunctuateSensor() {
}
}

@Test
public void shouldGetCommitSensor() {
final String operation = "commit";
final String totalDescription = "The total number of calls to commit";
final String rateDescription = "The average number of calls to commit per second";
when(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, operation, RecordingLevel.DEBUG)).thenReturn(expectedSensor);
when(streamsMetrics.taskLevelTagMap(THREAD_ID, TASK_ID)).thenReturn(tagMap);

try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
final Sensor sensor = TaskMetrics.commitSensor(THREAD_ID, TASK_ID, streamsMetrics);
streamsMetricsStaticMock.verify(
() -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
TASK_LEVEL_GROUP,
tagMap,
operation,
rateDescription,
totalDescription
)
);
assertThat(sensor, is(expectedSensor));
}
}

@Test
public void shouldGetEnforcedProcessingSensor() {
final String operation = "enforced-processing";
Expand Down
Expand Up @@ -35,15 +35,13 @@

import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.LATENCY_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.RATE_SUFFIX;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.ROLLUP_VALUE;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

public class ThreadMetricsTest {

private static final String THREAD_ID = "thread-id";
private static final String THREAD_LEVEL_GROUP = "stream-thread-metrics";
private static final String TASK_LEVEL_GROUP = "stream-task-metrics";

private final Sensor expectedSensor = mock(Sensor.class);
private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class);
Expand Down Expand Up @@ -287,32 +285,6 @@ public void shouldGetCommitRatioSensor() {
}
}

@Test
public void shouldGetCommitOverTasksSensor() {
final String operation = "commit";
final String totalDescription =
"The total number of calls to commit over all tasks assigned to one stream thread";
final String rateDescription =
"The average per-second number of calls to commit over all tasks assigned to one stream thread";
when(streamsMetrics.threadLevelSensor(THREAD_ID, operation, RecordingLevel.DEBUG)).thenReturn(expectedSensor);
when(streamsMetrics.taskLevelTagMap(THREAD_ID, ROLLUP_VALUE)).thenReturn(tagMap);

try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) {
final Sensor sensor = ThreadMetrics.commitOverTasksSensor(THREAD_ID, streamsMetrics);
streamsMetricsStaticMock.verify(
() -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
expectedSensor,
TASK_LEVEL_GROUP,
tagMap,
operation,
rateDescription,
totalDescription
)
);
assertThat(sensor, is(expectedSensor));
}
}

@Test
public void shouldGetPunctuateSensor() {
final String operation = "punctuate";
Expand Down Expand Up @@ -371,6 +343,8 @@ public void shouldGetPunctuateRatioSensor() {
assertThat(sensor, is(expectedSensor));
}
}

@Test
public void shouldGetCreateTaskSensor() {
final String operation = "task-created";
final String totalDescription = "The total number of newly created tasks";
Expand Down

0 comments on commit e6ae97f

Please sign in to comment.