KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (apache#14079)

Reviewers: Chris Egerton <chrise@aiven.io>
yashmayya committed Jul 25, 2023
1 parent 58b8c5c commit 08b3820
Showing 11 changed files with 150 additions and 56 deletions.
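
The gist of the change: building a task's error reporters (dead letter queue setup) may involve blocking operations such as admin-client calls, so it no longer happens eagerly on the DistributedHerder's tick thread while a task is being built. Instead, the Worker hands each task a Supplier<List<ErrorReporter>>, and the task evaluates it in initializeAndStart(), which runs on the task's own thread. A minimal sketch of the pattern, using simplified stand-in types rather than the actual Connect classes:

    import java.util.Collections;
    import java.util.List;
    import java.util.function.Supplier;

    // Hypothetical stand-ins for the real Connect types, for illustration only.
    interface ErrorReporter {
        void report(Object failedRecord);
    }

    class ToleranceOperatorSketch {
        private List<ErrorReporter> reporters = Collections.emptyList();

        // Called once the reporters have actually been built.
        void reporters(List<ErrorReporter> reporters) {
            this.reporters = reporters;
        }

        void report(Object failedRecord) {
            reporters.forEach(r -> r.report(failedRecord));
        }
    }

    class TaskSketch {
        private final ToleranceOperatorSketch retryWithToleranceOperator = new ToleranceOperatorSketch();
        private final Supplier<List<ErrorReporter>> errorReportersSupplier;

        // The herder's tick thread only captures the supplier here; nothing blocking runs yet.
        TaskSketch(Supplier<List<ErrorReporter>> errorReportersSupplier) {
            this.errorReportersSupplier = errorReportersSupplier;
        }

        // Runs on the task's own thread, so slow DLQ setup no longer stalls the herder.
        void initializeAndStart() {
            retryWithToleranceOperator.reporters(errorReportersSupplier.get());
        }
    }

This mirrors the diffs below: Worker now passes () -> sinkTaskReporters(...) or () -> sourceTaskReporters(...) into the task constructors instead of calling retryWithToleranceOperator.reporters(...) itself, and each task's initializeAndStart() resolves the supplier.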
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -37,6 +37,7 @@
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.runtime.errors.ToleranceType;
@@ -65,6 +66,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 
@@ -195,6 +197,7 @@ protected abstract void producerSendFailed(
 private final boolean topicTrackingEnabled;
 private final TopicCreation topicCreation;
 private final Executor closeExecutor;
+private final Supplier<List<ErrorReporter>> errorReportersSupplier;
 
 // Visible for testing
 List<SourceRecord> toSend;
@@ -224,7 +227,8 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id,
 Time time,
 RetryWithToleranceOperator retryWithToleranceOperator,
 StatusBackingStore statusBackingStore,
-Executor closeExecutor) {
+Executor closeExecutor,
+Supplier<List<ErrorReporter>> errorReportersSupplier) {
 
 super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
 retryWithToleranceOperator, time, statusBackingStore);
@@ -242,6 +246,7 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id,
 this.offsetStore = Objects.requireNonNull(offsetStore, "offset store cannot be null for source tasks");
 this.closeExecutor = closeExecutor;
 this.sourceTaskContext = sourceTaskContext;
+this.errorReportersSupplier = errorReportersSupplier;
 
 this.stopRequestedLatch = new CountDownLatch(1);
 this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
@@ -261,6 +266,7 @@ public void initialize(TaskConfig taskConfig) {
 
 @Override
 protected void initializeAndStart() {
+retryWithToleranceOperator.reporters(errorReportersSupplier.get());
 prepareToInitializeTask();
 offsetStore.start();
 // If we try to start the task at all by invoking initialize, then count this as
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -28,6 +28,7 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -47,11 +48,13 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 
 /**
@@ -94,11 +97,12 @@ public ExactlyOnceWorkerSourceTask(ConnectorTaskId id,
 SourceConnectorConfig sourceConfig,
 Executor closeExecutor,
 Runnable preProducerCheck,
-Runnable postProducerCheck) {
+Runnable postProducerCheck,
+Supplier<List<ErrorReporter>> errorReportersSupplier) {
 super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
 new WorkerSourceTaskContext(offsetReader, id, configState, buildTransactionContext(sourceConfig)),
 producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics,
-loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
+loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier);
 
 this.transactionOpen = false;
 this.committableRecords = new LinkedHashMap<>();
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -1797,7 +1797,6 @@ public WorkerTask doBuild(Task task,
 TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformationStages(), retryWithToleranceOperator);
 log.info("Initializing: {}", transformationChain);
 SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings());
-retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
 WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
 keyConverter, valueConverter, headerConverter);
 
@@ -1808,7 +1807,8 @@
 
 return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
 valueConverter, errorHandlingMetrics, headerConverter, transformationChain, consumer, classLoader, time,
-retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
+retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore(),
+() -> sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
 }
 }
 
@@ -1837,7 +1837,6 @@ public WorkerTask doBuild(Task task,
 
 SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
 connectorConfig.originalsStrings(), config.topicCreationEnable());
-retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
 TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), retryWithToleranceOperator);
 log.info("Initializing: {}", transformationChain);
 
@@ -1869,7 +1868,7 @@ public WorkerTask doBuild(Task task,
 return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, errorHandlingMetrics,
 headerConverter, transformationChain, producer, topicAdmin, topicCreationGroups,
 offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time,
-retryWithToleranceOperator, herder.statusBackingStore(), executor);
+retryWithToleranceOperator, herder.statusBackingStore(), executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
 }
 }
 
@@ -1905,7 +1904,6 @@ public WorkerTask doBuild(Task task,
 
 SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
 connectorConfig.originalsStrings(), config.topicCreationEnable());
-retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
 TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), retryWithToleranceOperator);
 log.info("Initializing: {}", transformationChain);
 
@@ -1935,7 +1933,8 @@ public WorkerTask doBuild(Task task,
 return new ExactlyOnceWorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
 headerConverter, transformationChain, producer, topicAdmin, topicCreationGroups,
 offsetReader, offsetWriter, offsetStore, config, configState, metrics, errorHandlingMetrics, classLoader, time, retryWithToleranceOperator,
-herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck);
+herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck,
+() -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
 }
 }
 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -40,13 +40,14 @@
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
-import org.apache.kafka.connect.storage.ClusterConfigState;
-import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -61,6 +62,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -98,6 +100,7 @@ class WorkerSinkTask extends WorkerTask {
 private boolean committing;
 private boolean taskStopped;
 private final WorkerErrantRecordReporter workerErrantRecordReporter;
+private final Supplier<List<ErrorReporter>> errorReportersSupplier;
 
 public WorkerSinkTask(ConnectorTaskId id,
 SinkTask task,
@@ -116,7 +119,8 @@ public WorkerSinkTask(ConnectorTaskId id,
 Time time,
 RetryWithToleranceOperator retryWithToleranceOperator,
 WorkerErrantRecordReporter workerErrantRecordReporter,
-StatusBackingStore statusBackingStore) {
+StatusBackingStore statusBackingStore,
+Supplier<List<ErrorReporter>> errorReportersSupplier) {
 super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
 retryWithToleranceOperator, time, statusBackingStore);
 
@@ -145,6 +149,7 @@ public WorkerSinkTask(ConnectorTaskId id,
 this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
 this.taskStopped = false;
 this.workerErrantRecordReporter = workerErrantRecordReporter;
+this.errorReportersSupplier = errorReportersSupplier;
 }
 
 @Override
@@ -299,6 +304,7 @@ public int commitFailures() {
 @Override
 protected void initializeAndStart() {
 SinkConnectorConfig.validate(taskConfig);
+retryWithToleranceOperator.reporters(errorReportersSupplier.get());
 
 if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
 List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig);
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
@@ -40,6 +41,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -48,6 +50,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
 
@@ -84,12 +87,13 @@ public WorkerSourceTask(ConnectorTaskId id,
 Time time,
 RetryWithToleranceOperator retryWithToleranceOperator,
 StatusBackingStore statusBackingStore,
-Executor closeExecutor) {
+Executor closeExecutor,
+Supplier<List<ErrorReporter>> errorReportersSupplier) {
 
 super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
 new WorkerSourceTaskContext(offsetReader, id, configState, null), producer,
 admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics, loader,
-time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
+time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier);
 
 this.committableOffsets = CommittableOffsets.EMPTY;
 this.submittedRecords = new SubmittedRecords();
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -37,6 +37,8 @@
 import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.integration.MonitorableSourceConnector;
 import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
+import org.apache.kafka.connect.runtime.errors.ErrorReporter;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
@@ -72,6 +74,7 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
@@ -95,6 +98,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -348,7 +352,8 @@ public void testHeadersWithCustomConverter() throws Exception {
 StringConverter stringConverter = new StringConverter();
 SampleConverterWithHeaders testConverter = new SampleConverterWithHeaders();
 
-createWorkerTask(stringConverter, testConverter, stringConverter);
+createWorkerTask(stringConverter, testConverter, stringConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR,
+Collections::emptyList);
 
 expectSendRecord(null);
 expectTopicCreation(TOPIC);
@@ -689,6 +694,28 @@ public void testSendRecordsRetriableException() {
 verify(transformationChain, times(2)).apply(eq(record3));
 }
 
+@Test
+public void testErrorReportersConfigured() {
+RetryWithToleranceOperator retryWithToleranceOperator = mock(RetryWithToleranceOperator.class);
+List<ErrorReporter> errorReporters = Collections.singletonList(mock(ErrorReporter.class));
+createWorkerTask(keyConverter, valueConverter, headerConverter, retryWithToleranceOperator, () -> errorReporters);
+workerTask.initializeAndStart();
+
+ArgumentCaptor<List<ErrorReporter>> errorReportersCapture = ArgumentCaptor.forClass(List.class);
+verify(retryWithToleranceOperator).reporters(errorReportersCapture.capture());
+assertEquals(errorReporters, errorReportersCapture.getValue());
+}
+
+@Test
+public void testErrorReporterConfigurationExceptionPropagation() {
+createWorkerTask(keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR,
+() -> {
+throw new ConnectException("Failed to create error reporters");
+}
+);
+assertThrows(ConnectException.class, () -> workerTask.initializeAndStart());
+}
+
 private void expectSendRecord(Headers headers) {
 if (headers != null)
 expectConvertHeadersAndKeyValue(headers, TOPIC);
@@ -799,15 +826,16 @@ private RecordHeaders emptyHeaders() {
 }
 
 private void createWorkerTask() {
-createWorkerTask(keyConverter, valueConverter, headerConverter);
+createWorkerTask(keyConverter, valueConverter, headerConverter, RetryWithToleranceOperatorTest.NOOP_OPERATOR, Collections::emptyList);
 }
 
-private void createWorkerTask(Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
+private void createWorkerTask(Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter,
+RetryWithToleranceOperator retryWithToleranceOperator, Supplier<List<ErrorReporter>> errorReportersSupplier) {
 workerTask = new AbstractWorkerSourceTask(
 taskId, sourceTask, statusListener, TargetState.STARTED, keyConverter, valueConverter, headerConverter, transformationChain,
 sourceTaskContext, producer, admin, TopicCreationGroup.configuredGroups(sourceConfig), offsetReader, offsetWriter, offsetStore,
-config, metrics, errorHandlingMetrics, plugins.delegatingLoader(), Time.SYSTEM, RetryWithToleranceOperatorTest.NOOP_OPERATOR,
-statusBackingStore, Runnable::run) {
+config, metrics, errorHandlingMetrics, plugins.delegatingLoader(), Time.SYSTEM, retryWithToleranceOperator,
+statusBackingStore, Runnable::run, errorReportersSupplier) {
 @Override
 protected void prepareToInitializeTask() {
 }
