Squashed commit of the following:
commit 938fee2
Author: David Arthur <mumrah@gmail.com>
Date:   Mon Jul 31 09:21:22 2023 -0400

    Fix a Scala 2.12 compile issue (apache#14126)

    Reviewers: Luke Chen <showuon@gmail.com>, Qichao Chu

commit 3ba718e
Author: Yash Mayya <yash.mayya@gmail.com>
Date:   Fri Jul 28 19:35:42 2023 +0100

    MINOR: Remove duplicate instantiation of MockConnectMetrics in AbstractWorkerSourceTaskTest (apache#14091)

    Reviewers: Christo Lolov <christololov@gmail.com>, Manyanda Chitimbo <manyanda.chitimbo@gmail.com>, Greg Harris <greg.harris@aiven.io>

commit 1574b9f
Author: David Jacot <djacot@confluent.io>
Date:   Fri Jul 28 20:28:54 2023 +0200

    MINOR: Code cleanups in group-coordinator module (apache#14117)

    This patch does a few code cleanups in the group-coordinator module.

    * Renames Coordinator to CoordinatorShard.
    * Renames ReplicatedGroupCoordinator to GroupCoordinatorShard; I was never really happy with the old name, and the new one makes more sense.
    * Removes TopicPartition from the GroupMetadataManager. It was only used in log messages, and the log context already includes it, so we don't have to log it again.
    * Renames assignors to consumerGroupAssignors.

    Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>

commit 3709901
Author: Ritika Reddy <98577846+rreddy-22@users.noreply.github.com>
Date:   Fri Jul 28 10:30:04 2023 -0700

    KAFKA-14702: Extend server side assignor to support rack aware replica placement (apache#14099)

    This patch updates the `PartitionAssignor` interface to support rack-awareness. The change introduces the `SubscribedTopicDescriber` interface, which can be used from within an assignor to retrieve topic metadata such as the number of partitions or the racks. We use an interface because it allows us to wrap internal data structures instead of having to copy them, which is more efficient.
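
    A minimal sketch of what such a describer could look like, assuming illustrative method
    names (the exact API in the patch may differ):

        import java.util.Set;
        import org.apache.kafka.common.Uuid;

        // Lets an assignor query topic metadata without copying the coordinator's
        // internal data structures; names here are assumptions for illustration.
        interface TopicMetadataDescriber {
            // Number of partitions for the topic, or -1 if the topic is unknown.
            int numPartitions(Uuid topicId);

            // Racks that host replicas of the given partition (empty if unknown).
            Set<String> racksForPartition(Uuid topicId, int partition);
        }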

    Reviewers: David Jacot <djacot@confluent.io>

commit 32c39c8
Author: David Arthur <mumrah@gmail.com>
Date:   Fri Jul 28 13:02:47 2023 -0400

    KAFKA-15263 Check KRaftMigrationDriver state in each event (apache#14115)

    Reviewers: Colin P. McCabe <cmccabe@apache.org>

commit 811ae01
Author: Philip Nee <pnee@confluent.io>
Date:   Fri Jul 28 09:11:20 2023 -0700

    MINOR: Test assign() and assignment() in the integration test (apache#14086)

    A missing piece from KAFKA-14950: this adds tests for assign() and assignment() in the integration test.

    Also fixes an accidental mistake in the committed() API.
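
    A rough sketch of the kind of check such a test performs (broker address, topic name,
    and config values are placeholders, not the actual test code):

        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.TopicPartition;
        import org.apache.kafka.common.serialization.StringDeserializer;

        public class AssignAssignmentCheck {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    TopicPartition tp = new TopicPartition("test-topic", 0);
                    consumer.assign(Collections.singleton(tp));   // manual assignment
                    // assignment() should echo back exactly what was assigned
                    if (!consumer.assignment().equals(Collections.singleton(tp)))
                        throw new AssertionError("assignment() did not match assign()");
                }
            }
        }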

    Reviewers: Jun Rao <junrao@gmail.com>

commit 19f9e1e
Author: Jeff Kim <kimkb2011@gmail.com>
Date:   Fri Jul 28 09:13:27 2023 -0400

    KAFKA-14501: Implement Heartbeat protocol in new GroupCoordinator (apache#14056)

    This patch implements the existing Heartbeat API in the new Group Coordinator.

    Reviewers: David Jacot <djacot@confluent.io>

commit dcabc29
Author: David Jacot <djacot@confluent.io>
Date:   Fri Jul 28 14:49:48 2023 +0200

    KAFKA-14048; CoordinatorContext should be protected by a lock (apache#14090)

    Accessing the `CoordinatorContext` in the `CoordinatorRuntime` should be protected by a lock. The runtime guarantees that the context is never accessed concurrently; however, it is accessed by multiple threads, so the lock ensures that we have a proper memory barrier. The patch does the following:
    1) Adds a lock to `CoordinatorContext`;
    2) Adds helper methods to get the context and acquire/release the lock;
    3) Allows the transition from Failed to Loading. Previously, the context was recreated in this case.
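
    A minimal sketch of this locking pattern, with illustrative names rather than the
    actual CoordinatorRuntime code:

        import java.util.concurrent.locks.ReentrantLock;
        import java.util.function.Function;

        final class ContextHolder<T> {
            private final ReentrantLock lock = new ReentrantLock();
            private final T context;

            ContextHolder(T context) {
                this.context = context;
            }

            // Acquire the lock, run the action against the context, then release it.
            // The lock mainly provides the memory barrier mentioned above, since the
            // context is accessed by multiple threads but never concurrently.
            <R> R withContext(Function<T, R> action) {
                lock.lock();
                try {
                    return action.apply(context);
                } finally {
                    lock.unlock();
                }
            }
        }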

    Reviewers: Justine Olshan <jolshan@confluent.io>

commit afe631c
Author: James Shaw <js102@zepler.net>
Date:   Fri Jul 28 10:45:15 2023 +0100

    KAFKA-14967: fix NPE in CreateTopicsResult in MockAdminClient (apache#13671)

    Co-authored-by: James Shaw <james.shaw@masabi.com>
    Reviewers: Mickael Maison <mickael.maison@gmail.com>

commit 722b259
Author: Christo Lolov <lolovc@amazon.com>
Date:   Fri Jul 28 06:40:37 2023 +0100

    KAFKA-14038: Optimise calculation of size for log in remote tier (apache#14049)

    Reviewers: Kamal Chandraprakash<kamal.chandraprakash@gmail.com>, Divij Vaidya <diviv@amazon.com>, Luke Chen <showuon@gmail.com>, Satish Duggana <satishd@apache.org>

commit 10bcd4f
Author: Colin Patrick McCabe <cmccabe@apache.org>
Date:   Thu Jul 27 17:01:55 2023 -0700

    KAFKA-15213: provide the exact offset to QuorumController.replay (apache#13643)

    Provide the exact record offset to QuorumController.replay() in all cases. There are several situations
    where this is useful, such as logging, implementing metadata transactions, or handling broker
    registration records.

    When the QC is inactive and simply replaying records, it is easy to compute the exact
    record offset from the batch base offset and the record index.

    The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can
    choose a batch base offset later than the one we expect, if someone else is also adding records.
    While the QC is the only entity submitting data records, control records may be added at any time.
    In the current implementation, these are really only used for leadership elections. However, this
    could change with the addition of quorum reconfiguration or similar features.

    Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it
    would have resulted in a batch base offset other than what was expected. This in turn will trigger a
    controller failover. In the future, if automatically added control records become more common, we
    may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But
    for now, this will allow us to rely on the offset as correct.

    In order that the active QC can learn what offset to start writing at, the PR also adds a new
    RaftClient#endOffset function.

    At the Raft level, this PR adds a new exception, UnexpectedBaseOffsetException. This gets thrown
    when we request a base offset that doesn't match the one the Raft layer would have given us.
    Although this exception should cause a failover, it should not be considered a fault. This
    complicated the exception handling a bit and motivated splitting more of it out into the new
    EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a
    bit better.
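
    A simplified sketch of the optimistic base-offset check described above; the names mirror
    the description but this is not the actual Raft-layer code:

        // Thrown when an append would not start at the base offset the caller expected.
        final class UnexpectedBaseOffsetException extends RuntimeException {
            UnexpectedBaseOffsetException(String message) {
                super(message);
            }
        }

        final class InMemoryLog {
            private long nextOffset = 0L;

            // Offset at which the next batch would start (cf. RaftClient#endOffset).
            synchronized long endOffset() {
                return nextOffset;
            }

            // Append a batch, failing if it would not start at the expected base offset.
            // Upstream, this failure triggers a controller failover rather than a fault.
            synchronized long append(int recordCount, long expectedBaseOffset) {
                if (nextOffset != expectedBaseOffset) {
                    throw new UnexpectedBaseOffsetException(
                        "expected base offset " + expectedBaseOffset + " but log is at " + nextOffset);
                }
                long baseOffset = nextOffset;
                nextOffset += recordCount;
                return baseOffset;
            }
        }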

    Reviewers: David Arthur <mumrah@gmail.com>, José Armando García Sancio <jsancio@apache.org>

commit e5861ee
Author: Alyssa Huang <ahuang@confluent.io>
Date:   Thu Jul 27 13:12:25 2023 -0700

    [MINOR] Add latest versions to kraft upgrade kafkatest (apache#14084)

    Reviewers: Ron Dagostino <rndgstn@gmail.com>

commit 6f39ef0
Author: Justine Olshan <jolshan@confluent.io>
Date:   Thu Jul 27 09:36:32 2023 -0700

    MINOR: Adjust Invalid Record Exception for Invalid Txn State as mentioned in KIP-890 (apache#14088)

    INVALID_RECORD is a newer error; INVALID_TXN_STATE has been around as long as transactions and is not retriable, which is the desired behavior here.

commit 29825ee
Author: David Jacot <djacot@confluent.io>
Date:   Thu Jul 27 13:18:10 2023 +0200

    KAFKA-14499: [3/N] Implement OffsetCommit API (apache#14067)

    This patch introduces the `OffsetMetadataManager` and implements the `OffsetCommit` API for both the old rebalance protocol and the new rebalance protocol. It introduces version 9 of the API but keeps it as unstable for now. The patch adds unit tests to test the API. Integration tests will be done separately.

    Reviewers: Jeff Kim <jeff.kim@confluent.io>, Justine Olshan <jolshan@confluent.io>

commit 353141e
Author: Divij Vaidya <diviv@amazon.com>
Date:   Thu Jul 27 12:33:34 2023 +0200

    KAFKA-15251: Add 3.5.1 to system tests (apache#14069)

    Reviewers: Matthias J. Sax <matthias@confluent.io>

commit d2fc907
Author: Jeff Kim <kimkb2011@gmail.com>
Date:   Thu Jul 27 02:02:29 2023 -0400

    KAFKA-14500; [6/6] Implement SyncGroup protocol in new GroupCoordinator (apache#14017)

    This patch implements the SyncGroup API in the new group coordinator. All the new unit tests are based on the existing scala tests.

    Reviewers: David Jacot <djacot@confluent.io>

commit ed44bcd
Author: Hao Li <1127478+lihaosky@users.noreply.github.com>
Date:   Wed Jul 26 16:02:52 2023 -0700

    KAFKA-15022: [3/N] use graph to compute rack aware assignment for active stateful tasks (apache#14030)

    Part of KIP-925.

    Reviewers: Matthias J. Sax <matthias@confluent.io>

commit 8135b6d
Author: Said Boudjelda <bmscomp@gmail.com>
Date:   Wed Jul 26 19:52:02 2023 +0200

    KAFKA-15235: Fix broken coverage reports since migration to Gradle 8.x (apache#14075)

    Reviewers: Divij Vaidya <diviv@amazon.com>

commit e5fb9b6
Author: Said Boudjelda <bmscomp@gmail.com>
Date:   Wed Jul 26 19:12:27 2023 +0200

    MINOR: upgrade version of gradle plugin (ben-manes.versions) to 0.47.0 (apache#14098)

    Reviewers: Divij Vaidya <diviv@amazon.com>

commit a900794
Author: David Arthur <mumrah@gmail.com>
Date:   Wed Jul 26 12:54:59 2023 -0400

    KAFKA-15196 Additional ZK migration metrics (apache#14028)

    This patch adds several metrics defined in KIP-866:

    * MigratingZkBrokerCount: the number of zk brokers registered with KRaft
    * ZkWriteDeltaTimeMs: time spent writing MetadataDelta to ZK
    * ZkWriteSnapshotTimeMs: time spent writing MetadataImage to ZK
    * Adds value 4 for "ZK" to ZkMigrationState

    Also fixes a typo in the metric name introduced in apache#14009 (ZKWriteBehindLag -> ZkWriteBehindLag)
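
    A rough sketch of how one of these gauges might be registered with the Yammer metrics
    registry Kafka uses for controller metrics; the wiring and metric coordinates below are
    assumptions for illustration, not the migration driver's actual code:

        import com.yammer.metrics.core.Gauge;
        import com.yammer.metrics.core.MetricName;
        import com.yammer.metrics.core.MetricsRegistry;
        import java.util.concurrent.atomic.AtomicInteger;

        final class MigrationMetricsSketch {
            private final AtomicInteger migratingZkBrokerCount = new AtomicInteger(0);

            MigrationMetricsSketch(MetricsRegistry registry) {
                registry.newGauge(
                    new MetricName("kafka.controller", "KafkaController", "MigratingZkBrokerCount"),
                    new Gauge<Integer>() {
                        @Override
                        public Integer value() {
                            return migratingZkBrokerCount.get();
                        }
                    });
            }

            // Called as ZK brokers register with the KRaft controller.
            void setMigratingZkBrokerCount(int count) {
                migratingZkBrokerCount.set(count);
            }
        }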

    Reviewers: Luke Chen <showuon@gmail.com>, Colin P. McCabe <cmccabe@apache.org>

commit 6d81698
Author: sciclon2 <74413315+sciclon2@users.noreply.github.com>
Date:   Wed Jul 26 15:48:09 2023 +0200

    KAFKA-15243: Set decoded user names to DescribeUserScramCredentialsResponse (apache#14094)

    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>

commit ff390ab
Author: vamossagar12 <sagarmeansocean@gmail.com>
Date:   Wed Jul 26 17:56:20 2023 +0530

    [MINOR] Fix Javadoc comment in KafkaFuture#toCompletionStage (apache#14100)

    Fix Javadoc comment in KafkaFuture#toCompletionStage

    Reviewers: Luke Chen <showuon@gmail.com>

commit bb677c4
Author: Federico Valeri <fedevaleri@gmail.com>
Date:   Wed Jul 26 12:04:34 2023 +0200

    KAFKA-14583: Move ReplicaVerificationTool to tools (apache#14059)

    Reviewers: Mickael Maison <mickael.maison@gmail.com>

commit 4d30cbf
Author: Said Boudjelda <bmscomp@gmail.com>
Date:   Wed Jul 26 11:21:36 2023 +0200

    MINOR: Upgrade the minor version of snappy dependency to 1.1.10.3 (apache#14072)

    Reviewers: Divij Vaidya <diviv@amazon.com>

commit 206a4af
Author: Divij Vaidya <diviv@amazon.com>
Date:   Wed Jul 26 11:19:56 2023 +0200

    MINOR: Add co-authors to release email template (apache#14080)

    Reviewers: Mickael Maison <mickael.maison@gmail.com>

commit 46a8a28
Author: vamossagar12 <sagarmeansocean@gmail.com>
Date:   Wed Jul 26 07:21:23 2023 +0530

    KAFKA-15218: Avoid NPE thrown while deleting topic and fetch from follower concurrently (apache#14051)

    When deleting topics, we first clear the remoteReplicaMap in stopPartitions. At that point a fetch request may still arrive from a follower and try to check whether the replica is eligible to be added to the ISR, which throws an NPE. Although this is harmless since the topic is already deleted, it is better to avoid it.
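
    A minimal sketch of the kind of guard that avoids the NPE, using illustrative names
    (remoteReplicaMap, isReplicaEligible) rather than the actual partition code:

        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;

        final class ReplicaEligibilitySketch {
            private final Map<Integer, Long> remoteReplicaMap = new ConcurrentHashMap<>();

            // Called from the fetch path. The topic may have been deleted concurrently,
            // in which case stopPartitions() has already cleared the map.
            boolean isReplicaEligible(int replicaId, long leaderEndOffset) {
                Long lastCaughtUpOffset = remoteReplicaMap.get(replicaId);
                if (lastCaughtUpOffset == null) {
                    return false; // partition is being deleted; skip instead of throwing an NPE
                }
                return lastCaughtUpOffset >= leaderEndOffset;
            }
        }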

    Reviewers: Luke Chen <showuon@gmail.com>

commit af1f50f
Author: Matthias J. Sax <matthias@confluent.io>
Date:   Tue Jul 25 14:56:58 2023 -0700

    MINOR: fix docs markup (apache#14085)

    Reviewers: Qichao Chu (@ex172000), Mickael Maison <mickael.maison@gmail.com>

commit e794bc7
Author: David Arthur <mumrah@gmail.com>
Date:   Tue Jul 25 16:05:04 2023 -0400

    MINOR: Add a Builder for KRaftMigrationDriver (apache#14062)

    Reviewers: Justine Olshan <jolshan@confluent.io>

commit 8b027b6
Author: tison <wander4096@gmail.com>
Date:   Tue Jul 25 23:56:49 2023 +0800

    MINOR: Fix typo in ProduceRequest.json (apache#14070)

    Reviewers: Mickael Maison <mickael.maison@gmail.com>

commit 08b3820
Author: Yash Mayya <yash.mayya@gmail.com>
Date:   Tue Jul 25 14:03:29 2023 +0100

    KAFKA-15238: Move DLQ reporter setup from the DistributedHerder's tick thread to the sink task thread (apache#14079)

    Reviewers: Chris Egerton <chrise@aiven.io>

commit 58b8c5c
Author: Chris Egerton <chrise@aiven.io>
Date:   Tue Jul 25 05:12:46 2023 -0700

    MINOR: Downgrade log level for conflicting Connect plugin aliases (apache#14081)

    Reviewers: Greg Harris <greg.harris@aiven.io>

commit c7de30f
Author: Colin Patrick McCabe <cmccabe@apache.org>
Date:   Mon Jul 24 21:13:58 2023 -0700

    KAFKA-15183: Add more controller, loader, snapshot emitter metrics (apache#14010)

    Implement some of the metrics from KIP-938: Add more metrics for
    measuring KRaft performance.

    Add these metrics to QuorumControllerMetrics:
        kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
        kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
        kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
        kafka.controller:type=KafkaController,name=NewActiveControllersCount

    Create LoaderMetrics with these new metrics:
        kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
        kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount

    Create SnapshotEmitterMetrics with these new metrics:
        kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
        kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs

    Reviewers: Ron Dagostino <rndgstn@gmail.com>

commit 79b8c96
Author: David Mao <47232755+splett2@users.noreply.github.com>
Date:   Mon Jul 24 13:22:25 2023 -0700

    KAFKA-14990: Dynamic producer ID expiration should be applied on a broker restart (apache#13707)

    Dynamic overrides for the producer ID expiration config are not picked up on broker restart in ZooKeeper mode; based on the integration test, this does not affect KRaft mode.

    Adds a broker restart to the test, which fails without the corresponding KafkaConfig change.

    Reviewers: Justine Olshan <jolshan@confluent.io>

commit 38781f9
Author: Justine Olshan <jolshan@confluent.io>
Date:   Mon Jul 24 13:08:57 2023 -0700

    KAFKA-14920: Address timeouts and out of order sequences (apache#14033)

    When creating a verification state entry, we also store sequence and epoch. On subsequent requests, we will take the latest epoch seen and the earliest sequence seen. That way, if we try to append a sequence after the earliest seen sequence, we can block that and retry. This addresses potential OutOfOrderSequence loops caused by errors during verification (coordinator loading, timeouts, etc).
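
    A simplified sketch of that bookkeeping (the class is illustrative, not the actual
    verification state entry):

        final class VerificationStateSketch {
            private short epoch;
            private int firstSequence;

            VerificationStateSketch(short epoch, int firstSequence) {
                this.epoch = epoch;
                this.firstSequence = firstSequence;
            }

            // On subsequent requests, take the latest epoch seen and the earliest sequence
            // seen; an append with a sequence after the stored one can then be blocked and
            // retried, avoiding OutOfOrderSequence loops caused by verification errors.
            void update(short requestEpoch, int requestFirstSequence) {
                this.epoch = (short) Math.max(this.epoch, requestEpoch);
                this.firstSequence = Math.min(this.firstSequence, requestFirstSequence);
            }

            short epoch() { return epoch; }
            int firstSequence() { return firstSequence; }
        }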

    Reviewers:  David Jacot <david.jacot@gmail.com>,  Artem Livshits <alivshits@confluent.io>
rreddy-22 committed Jul 31, 2023
1 parent 060dc0b commit 88324ac
Showing 158 changed files with 12,733 additions and 3,038 deletions.
2 changes: 1 addition & 1 deletion LICENSE-binary
@@ -259,7 +259,7 @@ scala-library-2.13.11
scala-logging_2.13-3.9.4
scala-reflect-2.13.11
scala-java8-compat_2.13-1.0.2
snappy-java-1.1.10.1
snappy-java-1.1.10.3
swagger-annotations-2.2.8
zookeeper-3.6.4
zookeeper-jute-3.6.4
2 changes: 1 addition & 1 deletion README.md
@@ -288,4 +288,4 @@ See [vagrant/README.md](vagrant/README.md).
Apache Kafka is interested in building the community; we would welcome any thoughts or [patches](https://issues.apache.org/jira/browse/KAFKA). You can reach us [on the Apache mailing lists](http://kafka.apache.org/contact.html).

To contribute follow the instructions here:
* https://kafka.apache.org/contributing.html
* https://kafka.apache.org/contributing.html
2 changes: 1 addition & 1 deletion bin/kafka-replica-verification.sh
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.tools.ReplicaVerificationTool "$@"
exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.ReplicaVerificationTool "$@"
2 changes: 1 addition & 1 deletion bin/windows/kafka-replica-verification.bat
@@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

"%~dp0kafka-run-class.bat" kafka.tools.ReplicaVerificationTool %*
"%~dp0kafka-run-class.bat" org.apache.kafka.tools.ReplicaVerificationTool %*
14 changes: 7 additions & 7 deletions build.gradle
@@ -31,8 +31,9 @@ buildscript {
}

plugins {
id 'com.github.ben-manes.versions' version '0.46.0'
id 'com.github.ben-manes.versions' version '0.47.0'
id 'idea'
id 'jacoco'
id 'java-library'
id 'org.owasp.dependencycheck' version '8.2.1'
id 'org.nosphere.apache.rat' version "0.8.0"
@@ -735,9 +736,9 @@ subprojects {
dependsOn tasks.test
sourceSets sourceSets.main
reports {
html.enabled = true
xml.enabled = true
csv.enabled = false
html.required = true
xml.required = true
csv.required = false
}
}

@@ -808,10 +809,9 @@ if (userEnableTestCoverage) {
executionData.from = javaProjects.jacocoTestReport.executionData

reports {
html.enabled = true
xml.enabled = true
html.required = true
xml.required = true
}

// workaround to ignore projects that don't have any tests at all
onlyIf = { true }
doFirst {
24 changes: 21 additions & 3 deletions checkstyle/import-control-metadata.xml
@@ -63,7 +63,6 @@
</subpackage>

<subpackage name="controller">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.common.acl" />
@@ -73,7 +72,6 @@
<allow pkg="org.apache.kafka.common.internals" />
<allow pkg="org.apache.kafka.common.message" />
<allow pkg="org.apache.kafka.common.metadata" />
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.common.network" />
<allow pkg="org.apache.kafka.common.protocol" />
<allow pkg="org.apache.kafka.common.quota" />
@@ -93,13 +91,17 @@
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.mutable" />
<allow pkg="org.apache.kafka.server.policy"/>
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>

<subpackage name="image">
@@ -122,6 +124,22 @@
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.snapshot" />
<allow pkg="org.apache.kafka.test" />
<subpackage name="loader">
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.controller.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>
<subpackage name="publisher">
<subpackage name="metrics">
<allow pkg="com.yammer.metrics"/>
<allow pkg="org.apache.kafka.common.metrics" />
<allow pkg="org.apache.kafka.controller.metrics" />
<allow pkg="org.apache.kafka.server.metrics" />
</subpackage>
</subpackage>
</subpackage>

<subpackage name="metadata">
6 changes: 3 additions & 3 deletions checkstyle/suppressions.xml
@@ -201,7 +201,7 @@
files="Murmur3.java"/>

<suppress checks="(NPathComplexity|CyclomaticComplexity)"
files="KStreamSlidingWindowAggregate.java"/>
files="(KStreamSlidingWindowAggregate|RackAwareTaskAssignor).java"/>

<!-- suppress FinalLocalVariable outside of the streams package. -->
<suppress checks="FinalLocalVariable"
@@ -266,7 +266,7 @@
<suppress checks="BooleanExpressionComplexity"
files="StreamsResetter.java"/>
<suppress checks="NPathComplexity"
files="(ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier).java"/>
files="(ProducerPerformance|StreamsResetter|Agent|TransactionalMessageCopier|ReplicaVerificationTool).java"/>
<suppress checks="ImportControl"
files="SignalLogger.java"/>
<suppress checks="IllegalImport"
@@ -339,7 +339,7 @@
<suppress checks="ParameterNumber"
files="(ConsumerGroupMember|GroupMetadataManager).java"/>
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest|GroupMetadataManagerTest).java"/>
files="(RecordHelpersTest|GroupMetadataManagerTest|GroupCoordinatorServiceTest).java"/>
<suppress checks="JavaNCSS"
files="GroupMetadataManagerTest.java"/>

@@ -236,7 +236,12 @@ AbstractRequest.Builder<?> requestBuilder() {

@Override
public String toString() {
return "UnsentRequest(builder=" + requestBuilder + ")";
return "UnsentRequest{" +
"requestBuilder=" + requestBuilder +
", handler=" + handler +
", node=" + node +
", timer=" + timer +
'}';
}
}

@@ -333,7 +333,7 @@ public Map<TopicPartition, OffsetAndMetadata> committed(final Set<TopicPartition
final OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
eventHandler.add(event);
try {
return event.complete(Duration.ofMillis(100));
return event.complete(timeout);
} catch (InterruptedException e) {
throw new InterruptException(e);
} catch (TimeoutException e) {
@@ -110,7 +110,7 @@ public static KafkaFuture<Void> allOf(KafkaFuture<?>... futures) {
* {@code CompletionStage} will work normally.
*
* <p>If you want to block on the completion of a KafkaFuture you should use
* {@link #get()}, {@link #get(long, TimeUnit)} or {@link #getNow(Object)}, rather then calling
* {@link #get()}, {@link #get(long, TimeUnit)} or {@link #getNow(Object)}, rather than calling
* {@code .toCompletionStage().toCompletableFuture().get()} etc.
*
* @since Kafka 3.0
@@ -30,7 +30,6 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OffsetCommitRequest extends AbstractRequest {
// default values for the current version
@@ -121,8 +120,4 @@ public OffsetCommitResponse getErrorResponse(Throwable e) {
public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
return new OffsetCommitRequest(new OffsetCommitRequestData(new ByteBufferAccessor(buffer), version), version);
}

public static Optional<String> groupInstanceId(OffsetCommitRequestData request) {
return Optional.ofNullable(request.groupInstanceId());
}
}
@@ -30,7 +30,8 @@
// Version 8 is the first flexible version.
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
// the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used.
// the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used and
// GROUP_ID_NOT_FOUND when the group does not exist for both protocols.
"validVersions": "0-9",
"flexibleVersions": "8+",
// Supported errors:
@@ -42,6 +43,7 @@
// - UNKNOWN_MEMBER_ID (version 1+)
// - INVALID_COMMIT_OFFSET_SIZE (version 0+)
// - FENCED_MEMBER_EPOCH (version 7+)
// - GROUP_ID_NOT_FOUND (version 9+)
// - STALE_MEMBER_EPOCH (version 9+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
@@ -30,7 +30,7 @@
//
// Starting in version 7, records can be produced using ZStandard compression. See KIP-110.
//
// Starting in Version 8, response has RecordErrors and ErrorMEssage. See KIP-467.
// Starting in Version 8, response has RecordErrors and ErrorMessage. See KIP-467.
//
// Version 9 enables flexible versions.
"validVersions": "0-9",
@@ -391,13 +391,23 @@ synchronized public CreateTopicsResult createTopics(Collection<NewTopic> newTopi
topicIds.put(topicName, topicId);
topicNames.put(topicId, topicName);
allTopics.put(topicName, new TopicMetadata(topicId, false, partitions, logDirs, newTopic.configs()));
future.complete(null);
future.complete(new CreateTopicsResult.TopicMetadataAndConfig(topicId, numberOfPartitions, replicationFactor, config(newTopic)));
createTopicResult.put(topicName, future);
}

return new CreateTopicsResult(createTopicResult);
}

private static Config config(NewTopic newTopic) {
Collection<ConfigEntry> configEntries = new ArrayList<>();
if (newTopic.configs() != null) {
for (Map.Entry<String, String> entry : newTopic.configs().entrySet()) {
configEntries.add(new ConfigEntry(entry.getKey(), entry.getValue()));
}
}
return new Config(configEntries);
}

@Override
synchronized public ListTopicsResult listTopics(ListTopicsOptions options) {
Map<String, TopicListing> topicListings = new HashMap<>();
@@ -32,6 +32,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
@@ -3085,7 +3086,7 @@ public void testReceiveFailedBatchTwiceWithTransactions() throws Exception {

time.sleep(20);

sendIdempotentProducerResponse(0, tp0, Errors.INVALID_RECORD, -1);
sendIdempotentProducerResponse(0, tp0, Errors.INVALID_TXN_STATE, -1);
sender.runOnce(); // receive late response

// Loop once and confirm that the transaction manager does not enter a fatal error state
@@ -3107,6 +3108,45 @@
txnManager.beginTransaction();
}

@Test
public void testInvalidTxnStateIsAnAbortableError() throws Exception {
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
apiVersions.update("0", NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
TransactionManager txnManager = new TransactionManager(logContext, "testInvalidTxnState", 60000, 100, apiVersions);

setupWithTransactionState(txnManager);
doInitTransactions(txnManager, producerIdAndEpoch);

txnManager.beginTransaction();
txnManager.maybeAddPartition(tp0);
client.prepareResponse(buildAddPartitionsToTxnResponseData(0, Collections.singletonMap(tp0, Errors.NONE)));
sender.runOnce();

Future<RecordMetadata> request = appendToAccumulator(tp0);
sender.runOnce(); // send request
sendIdempotentProducerResponse(0, tp0, Errors.INVALID_TXN_STATE, -1);

// Return InvalidTxnState error. It should be abortable.
sender.runOnce();
assertFutureFailure(request, InvalidTxnStateException.class);
assertTrue(txnManager.hasAbortableError());
TransactionalRequestResult result = txnManager.beginAbort();
sender.runOnce();

// Once the transaction is aborted, we should be able to begin a new one.
respondToEndTxn(Errors.NONE);
sender.runOnce();
assertTrue(txnManager::isInitializing);
prepareInitProducerResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
sender.runOnce();
assertTrue(txnManager::isReady);

assertTrue(result.isSuccessful());
result.await();

txnManager.beginTransaction();
}

private void verifyErrorMessage(ProduceResponse response, String expectedMessage) throws Exception {
Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // connect
@@ -37,6 +37,7 @@
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.runtime.errors.ToleranceType;
@@ -65,6 +66,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;

Expand Down Expand Up @@ -195,6 +197,7 @@ protected abstract void producerSendFailed(
private final boolean topicTrackingEnabled;
private final TopicCreation topicCreation;
private final Executor closeExecutor;
private final Supplier<List<ErrorReporter>> errorReportersSupplier;

// Visible for testing
List<SourceRecord> toSend;
Expand Down Expand Up @@ -224,7 +227,8 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id,
Time time,
RetryWithToleranceOperator retryWithToleranceOperator,
StatusBackingStore statusBackingStore,
Executor closeExecutor) {
Executor closeExecutor,
Supplier<List<ErrorReporter>> errorReportersSupplier) {

super(id, statusListener, initialState, loader, connectMetrics, errorMetrics,
retryWithToleranceOperator, time, statusBackingStore);
@@ -242,6 +246,7 @@ protected AbstractWorkerSourceTask(ConnectorTaskId id,
this.offsetStore = Objects.requireNonNull(offsetStore, "offset store cannot be null for source tasks");
this.closeExecutor = closeExecutor;
this.sourceTaskContext = sourceTaskContext;
this.errorReportersSupplier = errorReportersSupplier;

this.stopRequestedLatch = new CountDownLatch(1);
this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics);
@@ -261,6 +266,7 @@ public void initialize(TaskConfig taskConfig) {

@Override
protected void initializeAndStart() {
retryWithToleranceOperator.reporters(errorReportersSupplier.get());
prepareToInitializeTask();
offsetStore.start();
// If we try to start the task at all by invoking initialize, then count this as
@@ -28,6 +28,7 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.errors.ErrorHandlingMetrics;
import org.apache.kafka.connect.runtime.errors.ErrorReporter;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -47,11 +48,13 @@
import org.slf4j.LoggerFactory;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;


/**
@@ -94,11 +97,12 @@ public ExactlyOnceWorkerSourceTask(ConnectorTaskId id,
SourceConnectorConfig sourceConfig,
Executor closeExecutor,
Runnable preProducerCheck,
Runnable postProducerCheck) {
Runnable postProducerCheck,
Supplier<List<ErrorReporter>> errorReportersSupplier) {
super(id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, transformationChain,
new WorkerSourceTaskContext(offsetReader, id, configState, buildTransactionContext(sourceConfig)),
producer, admin, topicGroups, offsetReader, offsetWriter, offsetStore, workerConfig, connectMetrics, errorMetrics,
loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor);
loader, time, retryWithToleranceOperator, statusBackingStore, closeExecutor, errorReportersSupplier);

this.transactionOpen = false;
this.committableRecords = new LinkedHashMap<>();
