Skip to content

Commit 7ab3d8e

Browse files
committed
消费指定时间窗内所产生的消息
1 parent 979baf1 commit 7ab3d8e

File tree

1 file changed

+86
-0
lines changed

1 file changed

+86
-0
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
### 参考代码
2+
3+
```python
4+
# -*- coding: utf-8 -*-
5+
"""
6+
Subject: 消费指定时间窗内所产生的消息
7+
@Author YH YR
8+
@Time 2019/01/23 21:23
9+
"""
10+
import time
11+
from kafka import KafkaConsumer, TopicPartition
12+
13+
14+
class ConsumerTimeStampWindow:
15+
def __init__(self, broker_list, group_name, topic, enable_auto_commit=True, auto_offset_reset='latest'):
16+
self.topic = topic
17+
self.consumer = KafkaConsumer(group_id=group_name, bootstrap_servers=broker_list,
18+
enable_auto_commit=enable_auto_commit, auto_offset_reset=auto_offset_reset)
19+
20+
def consumer_from_offset_window(self, process_msg, begin_time, end_time):
21+
self.consumer.subscribe(self.topic)
22+
self.consumer.poll(0)
23+
24+
begin_offset_dic, end_offset_dic = self.get_offset_time_window(begin_time, end_time)
25+
for topic_partition, offset_and_timestamp in begin_offset_dic.items():
26+
self.consumer.seek(topic_partition, offset_and_timestamp[0])
27+
28+
topic_partition_info = self.consumer.assignment()
29+
partition_consumer_finish_flag = dict(zip(topic_partition_info, [False] * len(topic_partition_info)))
30+
31+
while True:
32+
if False not in partition_consumer_finish_flag.values():
33+
return
34+
consumer_records = self.consumer.poll(100)
35+
for partition_info, records in consumer_records.items():
36+
if partition_consumer_finish_flag[partition_info]:
37+
print('-------------- {0} consumer finish --------------'.format(partition_info))
38+
break
39+
for record in records:
40+
if record.offset <= end_offset_dic[partition_info][0]:
41+
process_msg(record)
42+
else:
43+
partition_consumer_finish_flag[partition_info] = True
44+
45+
def get_offset_time_window(self, begin_time, end_time):
46+
partitions_structs = []
47+
48+
for partition_id in self.consumer.partitions_for_topic(self.topic):
49+
partitions_structs.append(TopicPartition(self.topic, partition_id))
50+
51+
begin_search = {}
52+
for partition in partitions_structs:
53+
begin_search[partition] = begin_time if isinstance(begin_time, int) else self.__str_to_timestamp(begin_time)
54+
begin_offset = self.consumer.offsets_for_times(begin_search)
55+
56+
end_search = {}
57+
for partition in partitions_structs:
58+
end_search[partition] = end_time if isinstance(end_time, int) else self.__str_to_timestamp(end_time)
59+
end_offset = self.consumer.offsets_for_times(end_search)
60+
61+
for topic_partition, offset_and_timestamp in begin_offset.items():
62+
b_offset = 'null' if offset_and_timestamp is None else offset_and_timestamp[0]
63+
e_offset = 'null' if end_offset[topic_partition] is None else end_offset[topic_partition][0]
64+
print('Between {0} and {1}, {2} offset range = [{3}, {4}]'.format(begin_time, end_time, topic_partition,
65+
b_offset, e_offset))
66+
return begin_offset, end_offset
67+
68+
@staticmethod
69+
def __str_to_timestamp(str_time, format_type='%Y-%m-%d %H:%M:%S'):
70+
time_array = time.strptime(str_time, format_type)
71+
return int(time.mktime(time_array)) * 1000
72+
73+
74+
def print_msg(msg_dic):
75+
print(msg_dic)
76+
77+
78+
if __name__ == '__main__':
79+
broker_list = 'localhost:9092'
80+
group_name = 'group_test'
81+
topic = 'topic_demo'
82+
83+
action = ConsumerTimeStampWindow(broker_list, group_name, topic)
84+
action.consumer_from_offset_window(print_msg, '2019-01-23 21:30:00', '2019-01-23 21:36:00')
85+
```
86+

0 commit comments

Comments
 (0)