-
Notifications
You must be signed in to change notification settings - Fork 4
/
consumergroup_controller.py
195 lines (180 loc) · 8.49 KB
/
consumergroup_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import logging
import re
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import pendulum
import pykafka
from confluent_kafka.cimpl import TopicPartition

from esque.cli.output import red_bold
from esque.clients.consumer import ConsumerFactory
from esque.cluster import Cluster
from esque.controller.topic_controller import TopicController
from esque.resources.consumergroup import ConsumerGroup
@dataclass
class ConsumerGroupOffsetPlan:
    """Planned offset change for one partition of one topic.

    Captures the committed offset a consumer group currently has for the
    partition, the offset we propose to commit instead, and the partition's
    low/high watermarks used to validate the proposal.
    """

    # Name of the topic the partition belongs to.
    topic_name: str
    # Offset currently committed by the consumer group.
    current_offset: int
    # Offset that should be committed instead.
    proposed_offset: int
    # Offset of the next message to be produced (upper bound, exclusive).
    high_watermark: int
    # Earliest available offset (lower bound).
    low_watermark: int
    # ID of the partition within the topic.
    partition_id: int

    @property
    def offset_equal(self) -> bool:
        """True if the proposal is a no-op (proposed == current)."""
        return self.current_offset == self.proposed_offset
class ConsumerGroupController:
    """Inspect and manipulate Kafka consumer groups on a given cluster."""

    def __init__(self, cluster: Cluster):
        self.cluster = cluster
        self._logger = logging.getLogger(__name__)

    def get_consumergroup(self, consumer_id: str) -> ConsumerGroup:
        """Return a ConsumerGroup handle for *consumer_id* on this cluster."""
        return ConsumerGroup(consumer_id, self.cluster)

    def list_consumer_groups(self) -> List[str]:
        """Return the deduplicated consumer group IDs known to any broker of the cluster."""
        brokers: Dict[int, pykafka.broker.Broker] = self.cluster.pykafka_client.cluster.brokers
        # Each broker only coordinates a subset of groups, so query all of them
        # and deduplicate via a set.
        return list(
            {group.decode("UTF-8") for broker in brokers.values() for group in broker.list_groups().groups}
        )

    def edit_consumer_group_offsets(self, consumer_id: str, offset_plan: List[ConsumerGroupOffsetPlan]):
        """
        Commit consumergroup offsets to specific values

        :param consumer_id: ID of the consumer group
        :param offset_plan: List of ConsumerGroupOffsetPlan objects denoting the offsets for each partition in different topics
        :return:
        """
        consumer = ConsumerFactory().create_consumer(
            group_id=consumer_id,
            topic_name=None,
            output_directory=None,
            last=False,
            avro=False,
            initialize_default_output_directory=False,
            match=None,
            enable_auto_commit=False,
        )
        # Only commit partitions whose offset actually changes; no-op plan
        # elements are filtered out.
        offsets = [
            TopicPartition(
                topic=plan_element.topic_name, partition=plan_element.partition_id, offset=plan_element.proposed_offset
            )
            for plan_element in offset_plan
            if not plan_element.offset_equal
        ]
        consumer.commit(offsets=offsets)

    def create_consumer_group_offset_change_plan(
        self,
        consumer_id: str,
        topic_name: str,
        offset_to_value: Optional[int],
        offset_by_delta: Optional[int],
        offset_to_timestamp: Optional[str],
        offset_from_group: Optional[str],
    ) -> Optional[List[ConsumerGroupOffsetPlan]]:
        """
        Build a plan describing the proposed new offset for every partition
        of the topics matching *topic_name*.

        Exactly one of the ``offset_*`` arguments is expected to be given.
        A plan is only computed when the group is in state "Empty" (exists,
        but has no active members).

        :param consumer_id: ID of the consumer group to change
        :param topic_name: regular expression matched against subscribed topic names
        :param offset_to_value: absolute offset to set each partition to
        :param offset_by_delta: relative change applied to each current offset
        :param offset_to_timestamp: timestamp string; offsets move to the closest message
        :param offset_from_group: ID of another group whose committed offsets are copied
        :return: list of ConsumerGroupOffsetPlan elements, or None if the group does not exist
        """
        consumer_group_state, offset_plans = self._read_current_consumergroup_offsets(
            consumer_id=consumer_id, topic_name_expression=topic_name
        )
        if consumer_group_state == "Dead":
            self._logger.error("The consumer group {} does not exist.".format(consumer_id))
            return None
        if consumer_group_state == "Empty":
            # Compare against None (not truthiness) so that an explicit target
            # offset or delta of 0 is still honoured.
            if offset_to_value is not None:
                for plan_element in offset_plans.values():
                    allowed_offset, error, message = self._select_new_offset_for_consumer(
                        offset_to_value, plan_element
                    )
                    plan_element.proposed_offset = allowed_offset
                    if error:
                        self._logger.error(message)
            elif offset_by_delta is not None:
                for plan_element in offset_plans.values():
                    requested_offset = plan_element.current_offset + offset_by_delta
                    allowed_offset, error, message = self._select_new_offset_for_consumer(
                        requested_offset, plan_element
                    )
                    plan_element.proposed_offset = allowed_offset
                    if error:
                        self._logger.error(message)
            elif offset_to_timestamp:
                timestamp_limit = pendulum.parse(offset_to_timestamp)
                proposed_offset_dict = TopicController(self.cluster, None).get_offsets_closest_to_timestamp(
                    group_id=consumer_id, topic_name=topic_name, timestamp_limit=timestamp_limit
                )
                for plan_element in offset_plans.values():
                    # Partitions with no message at/after the timestamp fall back to 0.
                    plan_element.proposed_offset = proposed_offset_dict.get(plan_element.partition_id, 0)
            elif offset_from_group:
                _, mirror_consumer_group = self._read_current_consumergroup_offsets(
                    consumer_id=offset_from_group, topic_name_expression=topic_name
                )
                for key, value in mirror_consumer_group.items():
                    if key in offset_plans:
                        offset_plans[key].proposed_offset = value.current_offset
                    else:
                        # Partition never committed by this group: adopt the
                        # mirrored element, treating the current offset as 0.
                        value.current_offset = 0
                        offset_plans[key] = value
        else:
            # Group has active members; refuse to touch its offsets.
            self._logger.error(
                "Consumergroup {} is not empty. Use the {} option if you want to override this safety mechanism.".format(
                    consumer_id, red_bold("--force")
                )
            )
        return list(offset_plans.values())

    @staticmethod
    def _select_new_offset_for_consumer(
        requested_offset: int, offset_plan: ConsumerGroupOffsetPlan
    ) -> Tuple[int, bool, str]:
        """
        Clamp *requested_offset* to the partition's watermark range.

        :param requested_offset: the offset the caller asked for
        :param offset_plan: plan element carrying the partition's watermarks
        :return: (clamped offset, True if clamping occurred, warning message or "")
        """
        if requested_offset < offset_plan.low_watermark:
            final_value = offset_plan.low_watermark
            error = True
            message = "The requested offset ({}) is outside of the allowable range [{},{}] for partition {} in topic {}. Setting to low watermark.".format(
                requested_offset,
                red_bold(str(offset_plan.low_watermark)),
                offset_plan.high_watermark,
                offset_plan.partition_id,
                offset_plan.topic_name,
            )
        elif requested_offset > offset_plan.high_watermark:
            final_value = offset_plan.high_watermark
            error = True
            message = "The requested offset ({}) is outside of the allowable range [{},{}] for partition {} in topic {}. Setting to high watermark.".format(
                requested_offset,
                offset_plan.low_watermark,
                red_bold(str(offset_plan.high_watermark)),
                offset_plan.partition_id,
                offset_plan.topic_name,
            )
        else:
            final_value = requested_offset
            error = False
            message = ""
        return final_value, error, message

    def _read_current_consumergroup_offsets(
        self, consumer_id: str, topic_name_expression: str
    ) -> Tuple[str, Dict[str, ConsumerGroupOffsetPlan]]:
        """
        Read the currently committed offsets of *consumer_id* for all topics
        whose name matches *topic_name_expression* (case-insensitive regex).

        :param consumer_id: ID of the consumer group to inspect
        :param topic_name_expression: regular expression matched against topic names
        :return: (group state, mapping "topic::partition" -> ConsumerGroupOffsetPlan);
                 state is "Dead" when the group does not exist.
        """
        offset_plans: Dict[str, ConsumerGroupOffsetPlan] = {}
        topic_name_pattern = re.compile(topic_name_expression, re.IGNORECASE)
        try:
            consumer_group = self.get_consumergroup(consumer_id)
            consumer_group_desc = consumer_group.describe(verbose=True)
            consumer_group_state = consumer_group_desc["meta"]["state"].decode("UTF-8")
            for subscribed_topic_name, partitions in consumer_group_desc["offsets"].items():
                decoded_topic_name = subscribed_topic_name.decode("UTF-8")
                if not topic_name_pattern.match(decoded_topic_name):
                    continue
                for partition_id, partition_info in partitions.items():
                    offset_plans[f"{decoded_topic_name}::{partition_id}"] = ConsumerGroupOffsetPlan(
                        topic_name=decoded_topic_name,
                        current_offset=partition_info["consumer_offset"],
                        proposed_offset=partition_info["consumer_offset"],
                        high_watermark=partition_info["topic_high_watermark"],
                        low_watermark=partition_info["topic_low_watermark"],
                        partition_id=partition_id,
                    )
        except AttributeError:
            # describe() failing this way means the group is unknown to the cluster.
            return "Dead", offset_plans
        if not offset_plans:
            self._logger.warning("No offsets have ever been committed by consumergroup {}.".format(consumer_id))
        return consumer_group_state, offset_plans