
Merge pull request #163 from zalando-nakadi/old-topics
Wait for leadership finish and exclude new topics
ferbncode committed Oct 22, 2020
2 parents 17c0105 + 1a6150b commit 0c9c943
Showing 4 changed files with 40 additions and 3 deletions.
3 changes: 2 additions & 1 deletion bubuku/broker.py
@@ -152,7 +152,8 @@ def _is_leadership_transferred(self, active_broker_ids=None, dead_broker_ids=None):
        _LOG.info('Checking if leadership is transferred: active_broker_ids={}, dead_broker_ids={}'.format(
            active_broker_ids, dead_broker_ids))
        if self._is_clean_election():
-            for topic, partition, state in self.exhibitor.load_partition_states():
+            topics = self.exhibitor.load_active_topics()
+            for topic, partition, state in self.exhibitor.load_partition_states(topics=topics):
                leader = str(state['leader'])
                if active_broker_ids and leader not in active_broker_ids:
                    if any(str(x) in active_broker_ids for x in state.get('isr', [])):
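The leadership check now considers only the topics returned by the new load_active_topics helper (added below in bubuku/zookeeper/__init__.py), so partitions of topics queued for deletion no longer block the check. load_partition_states itself is not shown in this diff; the snippet below is a minimal sketch, assuming the standard Kafka ZooKeeper layout /brokers/topics/<topic>/partitions/<partition>/state and an exhibitor client exposing get()/get_children(), of how such a topics filter could look:

# A minimal sketch, not the repository's actual implementation: one plausible
# shape for a topics-aware load_partition_states, assuming the standard Kafka
# ZooKeeper layout and an exhibitor client like the one used elsewhere in this diff.
import json


def load_partition_states(exhibitor, topics=None):
    """Yield (topic, partition, state) tuples, limited to `topics` when given."""
    if topics is None:
        topics = exhibitor.get_children('/brokers/topics')
    for topic in topics:
        partitions_path = '/brokers/topics/{}/partitions'.format(topic)
        for partition in exhibitor.get_children(partitions_path):
            data, _ = exhibitor.get('{}/{}/state'.format(partitions_path, partition))
            yield topic, int(partition), json.loads(data.decode('utf-8'))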
19 changes: 19 additions & 0 deletions bubuku/zookeeper/__init__.py
@@ -4,6 +4,7 @@
import time
import uuid
from typing import Dict, List, Iterable, Tuple
+from datetime import datetime, timezone, timedelta

from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError, NoNodeError, ConnectionLossException
@@ -220,6 +221,24 @@ def get_broker_racks(self) -> Dict[int, str]:
            int(broker): json.loads(self.exhibitor.get('/brokers/ids/{}'.format(broker))[0].decode('utf-8')).get('rack') for
            broker in self.get_broker_ids()}

+    def load_active_topics(self, minimum_age_seconds=None) -> List[str]:
+        """
+        Lists the topics that are not being deleted from Kafka. Newer topics can be excluded by specifying a minimum age in seconds.
+        :return: a list of topics
+        """
+        topics = [topic_ for topic_ in self.exhibitor.get_children('/brokers/topics')
+                  if topic_ not in self.exhibitor.get_children('/admin/delete_topics')]
+        if not minimum_age_seconds:
+            return topics
+
+        topics_ = []
+        for topic in topics:
+            metadata = self.exhibitor.get('/brokers/topics/{}'.format(topic))[1]
+            if datetime.fromtimestamp(metadata.created, timezone.utc) < \
+                    datetime.now(timezone.utc) - timedelta(seconds=minimum_age_seconds):
+                topics_.append(topic)
+        return topics_

    def load_partition_assignment(self, topics=None) -> Iterable[Tuple[str, int, List[int]]]:
        """
        Lists all the assignments of partitions to particular broker ids.
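For context, a usage sketch of the new helper follows. It mirrors the mock style of the added test rather than a real ZooKeeper connection; the topic names and the one-hour threshold are illustrative assumptions, not part of this commit:

# Usage sketch only; topic names and the one-hour threshold are assumptions.
from unittest.mock import MagicMock

from bubuku.zookeeper import BukuExhibitor

exhibitor_mock = MagicMock()
exhibitor_mock.get_children = lambda path: {
    '/brokers/topics': ['orders', 'payments'],
    '/admin/delete_topics': ['payments'],
}[path]
buku = BukuExhibitor(exhibitor_mock)

# Topics that are not queued for deletion under /admin/delete_topics:
print(buku.load_active_topics())  # ['orders']

# Passing minimum_age_seconds additionally drops topics created within the
# last hour, so the leadership check does not wait on partitions that may not
# have elected a leader yet. This call needs exhibitor.get() to return znode
# metadata with a `created` timestamp (as kazoo's ZnodeStat does), which the
# bare MagicMock above does not provide:
# print(buku.load_active_topics(minimum_age_seconds=3600))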
4 changes: 2 additions & 2 deletions tests/test_broker.py
@@ -47,7 +47,7 @@ def test_broker_checks_death(self):
        exhibitor = MagicMock()
        states = [2, 2]

-        def _load_states():
+        def _load_states(topics=None):
            for idx in range(0, len(states)):
                states[idx] -= 1
            return [
@@ -81,7 +81,7 @@ def test_broker_start_fail_isr(self):
        # suppose that leader is not present
        try:
            broker.start_kafka_process(zk_fake_host)
-            assert False, 'broker 1 must be in leaders, it must be impossible to start it'
+            assert False, 'broker 2 must be in leaders, it must be impossible to start 1'
        except LeaderElectionInProgress:
            pass

17 changes: 17 additions & 0 deletions tests/test_zookeeper.py
@@ -26,6 +26,23 @@ def _get_children(path):
    assert ['1', '2', '3'] == buku.get_broker_ids()  # ensure that return list is sorted


+def test_load_active_topics():
+    exhibitor_mock = MagicMock()
+
+    def _get_children(path):
+        if path == '/brokers/topics':
+            return ['3', '1', '2']
+        elif path == '/admin/delete_topics':
+            return ['3', '1']
+        else:
+            raise NotImplementedError()
+
+    exhibitor_mock.get_children = _get_children
+    buku = BukuExhibitor(exhibitor_mock)
+
+    assert ['2'] == buku.load_active_topics()


def test_is_broker_registered():
    def _get(path):
        if path == '/brokers/ids/123':
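The new test covers only the deletion filter. A companion test for the minimum_age_seconds path could look like the sketch below; it follows the same mocking style, and the topic names, ages, and the SimpleNamespace stand-in for znode metadata are assumptions rather than part of this commit:

# Hedged sketch of a companion test, not part of this commit. It fakes znode
# metadata with a `created` field in epoch seconds (the attribute kazoo's
# ZnodeStat exposes) so the minimum_age_seconds filter can be exercised.
import time
from types import SimpleNamespace
from unittest.mock import MagicMock

from bubuku.zookeeper import BukuExhibitor


def test_load_active_topics_minimum_age():
    exhibitor_mock = MagicMock()

    def _get_children(path):
        if path == '/brokers/topics':
            return ['old', 'new']
        elif path == '/admin/delete_topics':
            return []
        else:
            raise NotImplementedError()

    def _get(path):
        created = {
            '/brokers/topics/old': time.time() - 7200,  # two hours old
            '/brokers/topics/new': time.time() - 60,    # one minute old
        }[path]
        return b'{}', SimpleNamespace(created=created)

    exhibitor_mock.get_children = _get_children
    exhibitor_mock.get = _get
    buku = BukuExhibitor(exhibitor_mock)

    # Only the topic older than one hour should survive the age filter.
    assert ['old'] == buku.load_active_topics(minimum_age_seconds=3600)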

