27
27
import org .apache .kafka .common .KafkaException ;
28
28
import org .apache .kafka .common .MetricName ;
29
29
import org .apache .kafka .common .Node ;
30
+ import org .apache .kafka .common .TopicIdPartition ;
30
31
import org .apache .kafka .common .TopicPartition ;
31
32
import org .apache .kafka .common .Uuid ;
32
33
import org .apache .kafka .common .errors .AuthenticationException ;
@@ -565,7 +566,7 @@ private boolean awaitNodeReady(Node node, FindCoordinatorRequest.CoordinatorType
565
566
/**
566
567
* Handle a produce response
567
568
*/
568
- private void handleProduceResponse (ClientResponse response , Map <TopicPartition , ProducerBatch > batches , long now ) {
569
+ private void handleProduceResponse (ClientResponse response , Map <TopicIdPartition , ProducerBatch > batches , long now ) {
569
570
RequestHeader requestHeader = response .requestHeader ();
570
571
int correlationId = requestHeader .correlationId ();
571
572
if (response .wasTimedOut ()) {
@@ -595,9 +596,6 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
595
596
// This will be set by completeBatch.
596
597
Map <TopicPartition , Metadata .LeaderIdAndEpoch > partitionsWithUpdatedLeaderInfo = new HashMap <>();
597
598
produceResponse .data ().responses ().forEach (r -> r .partitionResponses ().forEach (p -> {
598
- // Version 13 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name.
599
- String topicName = metadata .topicNames ().getOrDefault (r .topicId (), r .name ());
600
- TopicPartition tp = new TopicPartition (topicName , p .index ());
601
599
ProduceResponse .PartitionResponse partResp = new ProduceResponse .PartitionResponse (
602
600
Errors .forCode (p .errorCode ()),
603
601
p .baseOffset (),
@@ -609,7 +607,22 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
609
607
.collect (Collectors .toList ()),
610
608
p .errorMessage (),
611
609
p .currentLeader ());
612
- ProducerBatch batch = batches .get (tp );
610
+ ProducerBatch batch = null ;
611
+ // Version 13 drop topic name and add support to topic id.
612
+ // We need to find batch based on topic id and partition index only as
613
+ // topic name in the response might be empty.
614
+ List <ProducerBatch > matchedBatchesForTopicId = batches .entrySet ().stream ()
615
+ .filter (entry -> entry .getKey ().matchesTopicIdForPartition (r .topicId (), p .index ()))
616
+ .map (Map .Entry ::getValue )
617
+ .collect (Collectors .toList ());
618
+
619
+ if (matchedBatchesForTopicId .size () > 1 ) {
620
+ matchedBatchesForTopicId .forEach ((matchedBatch ) ->
621
+ failBatch (matchedBatch , new RuntimeException ("More than one batch with same topic id and partition." ), false ));
622
+ } else {
623
+ batch = matchedBatchesForTopicId .stream ().findFirst ().orElse (null );
624
+ }
625
+
613
626
completeBatch (batch , partResp , correlationId , now , partitionsWithUpdatedLeaderInfo );
614
627
}));
615
628
@@ -855,7 +868,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
855
868
if (batches .isEmpty ())
856
869
return ;
857
870
858
- final Map <TopicPartition , ProducerBatch > recordsByPartition = new HashMap <>(batches .size ());
871
+ final Map <TopicIdPartition , ProducerBatch > recordsByPartition = new HashMap <>(batches .size ());
859
872
Map <String , Uuid > topicIds = topicIdsForBatches (batches );
860
873
861
874
ProduceRequestData .TopicProduceDataCollection tpd = new ProduceRequestData .TopicProduceDataCollection ();
@@ -874,7 +887,7 @@ private void sendProduceRequest(long now, int destination, short acks, int timeo
874
887
tpData .partitionData ().add (new ProduceRequestData .PartitionProduceData ()
875
888
.setIndex (tp .partition ())
876
889
.setRecords (records ));
877
- recordsByPartition .put (tp , batch );
890
+ recordsByPartition .put (new TopicIdPartition ( topicId , tp ) , batch );
878
891
}
879
892
880
893
String transactionalId = null ;
0 commit comments