
Commit

Merge branch 'apache:trunk' into trunk
rreddy-22 committed Jul 25, 2023
2 parents 2d42e3f + af1f50f commit d83fde1
Showing 46 changed files with 1,730 additions and 586 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -288,4 +288,4 @@ See [vagrant/README.md](vagrant/README.md).
Apache Kafka is interested in building the community; we would welcome any thoughts or [patches](https://issues.apache.org/jira/browse/KAFKA). You can reach us [on the Apache mailing lists](http://kafka.apache.org/contact.html).

To contribute follow the instructions here:
* https://kafka.apache.org/contributing.html
* https://kafka.apache.org/contributing.html
24 changes: 21 additions & 3 deletions checkstyle/import-control-metadata.xml
@@ -63,7 +63,6 @@
</subpackage>

<subpackage name="controller">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.common.acl" />
@@ -73,7 +72,6 @@
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
@@ -93,13 +91,17 @@
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.mutable" />
<allow pkg="org.apache.kafka.server.policy"/>
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>

<subpackage name="image">
@@ -122,6 +124,22 @@
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<subpackage name="loader">
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.controller.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>
<subpackage name="publisher">
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.controller.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>
</subpackage>

<subpackage name="metadata">
clients/src/main/resources/common/message/ProduceResponse.json
@@ -30,7 +30,7 @@
//
// Starting in version 7, records can be produced using ZStandard compression. See KIP-110.
//
// Starting in Version 8, response has RecordErrors and ErrorMEssage. See KIP-467.
// Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467.
//
// Version 9 enables flexible versions.
"validVersions": "0-9",
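The version-history comments above belong to the produce RPC's JSON schema. As a hedged, client-side illustration of one of those notes (ZStandard support from version 7, KIP-110): enabling zstd is a single producer config, and the request version is negotiated automatically. The class name and broker address below are placeholders, not part of this commit.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

// Placeholder sketch: zstd compression rides on produce request v7+ (KIP-110).
public class ZstdProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd"); // requires brokers on 2.1+
        System.out.println(props);
    }
}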
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -37,6 +37,7 @@
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
@@ -65,6 +66,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;

@@ -195,6 +197,7 @@ protected abstract void producerSendFailed(
private final boolean topicTrackingEnabled;
private final TopicCreation topicCreation;
private final Executor closeExecutor;
private final Supplier<List<ErrorReporter>> errorReportersSupplier;

// Visible for testing
List<SourceRecord> toSend;
@@ -224,7 +227,8 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id,
Time time,
RetryWithToleranceOperator retryWithToleranceOperator,
StatusBackingStore statusBackingStore,
Executor closeExecutor) {
Executor closeExecutor,
Supplier<List<ErrorReporter>> errorReportersSupplier) {

super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
retryWithToleranceOperator, time, statusBackingStore);
@@ -242,6 +246,7 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id,
this.offsetStore = Objects.requireNonNull(offsetStore, "offset store cannot be null for source tasks");
this.closeExecutor = closeExecutor;
this.sourceTaskContext = sourceTaskContext;
this.errorReportersSupplier = errorReportersSupplier;

this.stopRequestedLatch = new CountDownLatch(1);
this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
@@ -261,6 +266,7 @@ public void initialize(TaskConfig taskConfig) {

@Override
protected void initializeAndStart() {
retryWithToleranceOperator.reporters(errorReportersSupplier.get());
prepareToInitializeTask();
offsetStore.start();
// If we try to start the task at all by invoking initialize, then count this as
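Taken together, these hunks swap an eagerly-built reporter list for a Supplier<List<ErrorReporter>> that is only invoked inside initializeAndStart(). A minimal sketch of that deferral pattern, with hypothetical names (TaskSketch, plain strings standing in for ErrorReporter):

import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch: the reporter list is described at construction time
// but only materialized when the task starts.
class TaskSketch {
    private final Supplier<List<String>> reportersSupplier;
    private List<String> reporters;

    TaskSketch(Supplier<List<String>> reportersSupplier) {
        this.reportersSupplier = reportersSupplier;
    }

    void initializeAndStart() {
        // Mirrors retryWithToleranceOperator.reporters(errorReportersSupplier.get()):
        // nothing is created until start-up.
        this.reporters = reportersSupplier.get();
    }

    public static void main(String[] args) {
        TaskSketch task = new TaskSketch(() -> List.of("log-reporter", "dlq-reporter"));
        task.initializeAndStart(); // reporters come into existence here
        System.out.println(task.reporters);
    }
}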
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -28,6 +28,7 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -47,11 +48,13 @@
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;


/**
@@ -94,11 +97,12 @@ public ExactlyOnceWorkerSourceTask(ConnectorTaskId id,
SourceConnectorConfig sourceConfig,
Executor closeExecutor,
Runnable preProducerCheck,
Runnable postProducerCheck) {
Runnable postProducerCheck,
Supplier<List<ErrorReporter>> errorReportersSupplier) {
super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
new WorkerSourceTaskContext(offsetReader, id, configState, buildTransactionContext(sourceConfig)),
producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics,
loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier);

this.transactionOpen = false;
this.committableRecords = new LinkedHashMap<>();
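ExactlyOnceWorkerSourceTask never calls the supplier itself; the new parameter is threaded straight through to the shared base constructor. A trivial sketch of that pass-through, again with hypothetical class names:

import java.util.List;
import java.util.function.Supplier;

// Hypothetical pair mirroring the constructor chain: the subclass forwards
// the supplier unchanged; only the base class's start-up path calls get().
class BaseTaskSketch {
    protected final Supplier<List<String>> errorReportersSupplier;

    BaseTaskSketch(Supplier<List<String>> errorReportersSupplier) {
        this.errorReportersSupplier = errorReportersSupplier;
    }
}

class ExactlyOnceTaskSketch extends BaseTaskSketch {
    ExactlyOnceTaskSketch(Supplier<List<String>> errorReportersSupplier) {
        super(errorReportersSupplier); // pure pass-through, no extra wiring
    }
}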
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -1797,7 +1797,6 @@ public WorkerTask doBuild(Task task,
TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformationStages(), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings());
retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
WorkerErrantRecordReporter workerErrantRecordReporter = createWorkerErrantRecordReporter(sinkConfig, retryWithToleranceOperator,
keyConverter, valueConverter, headerConverter);

@@ -1808,7 +1807,8 @@ public WorkerTask doBuild(Task task,

return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter,
valueConverter, errorHandlingMetrics, headerConverter, transformationChain, consumer, classLoader, time,
retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore());
retryWithToleranceOperator, workerErrantRecordReporter, herder.statusBackingStore(),
() -> sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass));
}
}

@@ -1837,7 +1837,6 @@ public WorkerTask doBuild(Task task,

SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
connectorConfig.originalsStrings(), config.topicCreationEnable());
retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);

@@ -1869,7 +1868,7 @@ public WorkerTask doBuild(Task task,
return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, errorHandlingMetrics,
headerConverter, transformationChain, producer, topicAdmin, topicCreationGroups,
offsetReader, offsetWriter, offsetStore, config, configState, metrics, classLoader, time,
retryWithToleranceOperator, herder.statusBackingStore(), executor);
retryWithToleranceOperator, herder.statusBackingStore(), executor, () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
}
}

@@ -1905,7 +1904,6 @@ public WorkerTask doBuild(Task task,

SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins,
connectorConfig.originalsStrings(), config.topicCreationEnable());
retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);

@@ -1935,7 +1933,8 @@ public WorkerTask doBuild(Task task,
return new ExactlyOnceWorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
headerConverter, transformationChain, producer, topicAdmin, topicCreationGroups,
offsetReader, offsetWriter, offsetStore, config, configState, metrics, errorHandlingMetrics, classLoader, time, retryWithToleranceOperator,
herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck);
herder.statusBackingStore(), sourceConfig, executor, preProducerCheck, postProducerCheck,
() -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics));
}
}

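On the Worker side, each doBuild variant drops its eager retryWithToleranceOperator.reporters(...) call and instead passes a lambda such as () -> sourceTaskReporters(id, sourceConfig, errorHandlingMetrics). A rough sketch of that builder-side shape, with simplified stand-in names rather than the real Worker internals:

import java.util.List;
import java.util.function.Supplier;

// Simplified builder: the lambda closes over the config the reporters need,
// but nothing runs until the task calls get() during start-up.
class WorkerSketch {
    private List<String> sourceTaskReporters(String id, String config) {
        return List.of("log-reporter:" + id, "dlq-reporter:" + config);
    }

    Supplier<List<String>> buildReporterSupplier(String id, String config) {
        return () -> sourceTaskReporters(id, config);
    }

    public static void main(String[] args) {
        Supplier<List<String>> supplier =
                new WorkerSketch().buildReporterSupplier("task-0", "dlq.topic=errors");
        System.out.println(supplier.get()); // invoked at task start, not at build time
    }
}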
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -40,13 +40,14 @@
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.WorkerErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -61,6 +62,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

@@ -98,6 +100,7 @@ class WorkerSinkTask extends WorkerTask {
private boolean committing;
private boolean taskStopped;
private final WorkerErrantRecordReporter workerErrantRecordReporter;
private final Supplier<List<ErrorReporter>> errorReportersSupplier;

public WorkerSinkTask(ConnectorTaskId id,
SinkTask task,
Expand All @@ -116,7 +119,8 @@ public WorkerSinkTask(ConnectorTaskId id,
Time time,
RetryWithToleranceOperator retryWithToleranceOperator,
WorkerErrantRecordReporter workerErrantRecordReporter,
StatusBackingStore statusBackingStore) {
StatusBackingStore statusBackingStore,
Supplier<List<ErrorReporter>> errorReportersSupplier) {
super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
retryWithToleranceOperator, time, statusBackingStore);

@@ -145,6 +149,7 @@ public WorkerSinkTask(ConnectorTaskId id,
this.isTopicTrackingEnabled = workerConfig.getBoolean(TOPIC_TRACKING_ENABLE_CONFIG);
this.taskStopped = false;
this.workerErrantRecordReporter = workerErrantRecordReporter;
this.errorReportersSupplier = errorReportersSupplier;
}

@Override
@@ -299,6 +304,7 @@ public int commitFailures() {
@Override
protected void initializeAndStart() {
SinkConnectorConfig.validate(taskConfig);
retryWithToleranceOperator.reporters(errorReportersSupplier.get());

if (SinkConnectorConfig.hasTopicsConfig(taskConfig)) {
List<String> topics = SinkConnectorConfig.parseTopicsList(taskConfig);
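In the sink task the supplier is resolved inside initializeAndStart(), right after SinkConnectorConfig.validate(taskConfig). One presumed side benefit, not stated in the diff itself, is that the creation moment becomes observable and mockable; a tiny illustration:

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Hypothetical probe: wrapping the supplier shows that reporters exist only
// after the start-up path runs, not when the task object is constructed.
class SupplierTimingSketch {
    public static void main(String[] args) {
        AtomicBoolean created = new AtomicBoolean(false);
        Supplier<List<String>> reporters = () -> {
            created.set(true);
            return List.of("fake-reporter");
        };

        System.out.println(created.get()); // false: construction wires nothing up
        reporters.get();                   // what initializeAndStart() would trigger
        System.out.println(created.get()); // true: reporters exist only after start
    }
}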
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -21,6 +21,7 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
@@ -40,6 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -48,6 +50,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;

@@ -84,12 +87,13 @@ public WorkerSourceTask(ConnectorTaskId id,
Time time,
RetryWithToleranceOperator retryWithToleranceOperator,
StatusBackingStore statusBackingStore,
Executor closeExecutor) {
Executor closeExecutor,
Supplier<List<ErrorReporter>> errorReportersSupplier) {

super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
new WorkerSourceTaskContext(offsetReader, id, configState, null), producer,
admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics, loader,
time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier);

this.committableOffsets = CommittableOffsets.EMPTY;
this.submittedRecords = new SubmittedRecords();
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -408,7 +408,7 @@ public static Map<String, String> computeAliases(PluginScanResult scanResult) {
if (classNames.size() == 1) {
aliases.put(alias, classNames.stream().findAny().get());
} else {
log.warn("Ignoring ambiguous alias '{}' since it refers to multiple distinct plugins {}", alias, classNames);
log.debug("Ignoring ambiguous alias '{}' since it refers to multiple distinct plugins {}", alias, classNames);
}
}
return aliases;
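For context on the hunk above: computeAliases only registers an alias that resolves to exactly one plugin class, and this commit merely downgrades the ambiguous-alias message from warn to debug. A simplified sketch of that selection rule (the real method takes a PluginScanResult, not a Map):

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified alias rule: unambiguous aliases are kept; ambiguous ones are
// skipped (and, after this change, logged at debug rather than warn).
class AliasSketch {
    static Map<String, String> computeAliases(Map<String, Set<String>> aliasToClasses) {
        Map<String, String> aliases = new HashMap<>();
        aliasToClasses.forEach((alias, classNames) -> {
            if (classNames.size() == 1) {
                aliases.put(alias, classNames.iterator().next());
            }
            // size > 1: alias is ambiguous and dropped
        });
        return aliases;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> scan = Map.of(
                "FileSink", Set.of("org.example.FileSinkConnector"),
                "Dup", Set.of("a.DupConnector", "b.DupConnector"));
        System.out.println(computeAliases(scan)); // only FileSink survives
    }
}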
