1
- package com .baeldung .kafka ;
1
+ package com .baeldung .kafka . exactlyonce ;
2
2
3
3
import org .apache .kafka .clients .consumer .ConsumerRecord ;
4
4
import org .apache .kafka .clients .consumer .ConsumerRecords ;
@@ -43,10 +43,11 @@ public static void main(String[] args) {
43
43
ConsumerRecords <String , String > records = consumer .poll (ofSeconds (60 ));
44
44
45
45
Map <String , Integer > wordCountMap = records .records (new TopicPartition (INPUT_TOPIC , 0 ))
46
- .stream ()
47
- .flatMap (record -> Stream .of (record .value ().split (" " )))
48
- .map (word -> Tuple .of (word , 1 ))
49
- .collect (Collectors .toMap (tuple -> tuple .getKey (), t1 -> t1 .getValue (), (v1 , v2 ) -> v1 + v2 ));
46
+ .stream ()
47
+ .flatMap (record -> Stream .of (record .value ()
48
+ .split (" " )))
49
+ .map (word -> Tuple .of (word , 1 ))
50
+ .collect (Collectors .toMap (tuple -> tuple .getKey (), t1 -> t1 .getValue (), (v1 , v2 ) -> v1 + v2 ));
50
51
51
52
producer .beginTransaction ();
52
53
@@ -56,7 +57,8 @@ public static void main(String[] args) {
56
57
57
58
for (TopicPartition partition : records .partitions ()) {
58
59
List <ConsumerRecord <String , String >> partitionedRecords = records .records (partition );
59
- long offset = partitionedRecords .get (partitionedRecords .size () - 1 ).offset ();
60
+ long offset = partitionedRecords .get (partitionedRecords .size () - 1 )
61
+ .offset ();
60
62
61
63
offsetsToCommit .put (partition , new OffsetAndMetadata (offset + 1 ));
62
64
}
@@ -72,7 +74,6 @@ public static void main(String[] args) {
72
74
73
75
}
74
76
75
-
76
77
}
77
78
78
79
private static KafkaConsumer <String , String > createKafkaConsumer () {
0 commit comments