-
Notifications
You must be signed in to change notification settings - Fork 220
/
Copy pathRebalanceListner.java
40 lines (31 loc) · 1.47 KB
/
RebalanceListner.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.*;
public class RebalanceListner implements ConsumerRebalanceListener {
private KafkaConsumer consumer;
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap();
public RebalanceListner(KafkaConsumer con){
this.consumer=con;
}
public void addOffset(String topic, int partition, long offset){
currentOffsets.put(new TopicPartition(topic, partition),new OffsetAndMetadata(offset,"Commit"));
}
public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets(){
return currentOffsets;
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Following Partitions Assigned ....");
for(TopicPartition partition: partitions)
System.out.println(partition.partition()+",");
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Following Partitions Revoked ....");
for(TopicPartition partition: partitions)
System.out.println(partition.partition()+",");
System.out.println("Following Partitions commited ...." );
for(TopicPartition tp: currentOffsets.keySet())
System.out.println(tp.partition());
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
}