Skip to content

Commit

Permalink
Handle case where ListOffset returns an empty array of offsets (#78)
Browse files Browse the repository at this point in the history
There was an assumption made that ListOffset always returns at least one offset. However, in the case where the timestamp requested is out of range (or for brokers that do not have a time index, when the offset is in the first segment) where an empty array is returned. This updates the TopicOffsets model to handle that case and insert a -1 instead to indicate that no offset was available. I also added a test specifically for that case.
  • Loading branch information
toddpalino committed Sep 29, 2017
1 parent 3609630 commit 1031313
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 2 deletions.
6 changes: 5 additions & 1 deletion kafka/tools/models/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ def set_offsets_from_list(self, partitions):
"""
for partition in partitions:
raise_if_error(OffsetError, partition['error'])
self.partitions[partition['partition']] = partition['offsets'][0]
if len(partition['offsets']) > 0:
self.partitions[partition['partition']] = partition['offsets'][0]
else:
# We received no offsets back, so we'll just return -1 to indicate that
self.partitions[partition['partition']] = -1

def set_offsets_from_fetch(self, partitions):
"""
Expand Down
10 changes: 10 additions & 0 deletions tests/tools/client/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ def list_offset():
'offsets': [8904]}]}]})


def list_offset_none():
return ListOffsetV0Response({'responses': [{'topic': 'topic1',
'partition_responses': [{'partition': 0,
'error': 0,
'offsets': [4829]},
{'partition': 1,
'error': 0,
'offsets': []}]}]})


def list_offset_error():
return ListOffsetV0Response({'responses': [{'topic': 'topic1',
'partition_responses': [{'partition': 0,
Expand Down
11 changes: 10 additions & 1 deletion tests/tools/models/test_topic_and_partition.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import unittest

from tests.tools.client.fixtures import list_offset, list_offset_error, offset_fetch, offset_fetch_error
from tests.tools.client.fixtures import list_offset, list_offset_none, list_offset_error, offset_fetch, offset_fetch_error

from kafka.tools.exceptions import OffsetError
from kafka.tools.models.cluster import Cluster
Expand Down Expand Up @@ -271,6 +271,15 @@ def test_set_offsets_from_list(self):
assert offsets.partitions[0] == 4829
assert offsets.partitions[1] == 8904

def test_set_offsets_empty_from_list(self):
topic = Topic('topic1', 2)
offsets = TopicOffsets(topic)
response = list_offset_none()

offsets.set_offsets_from_list(response['responses'][0]['partition_responses'])
assert offsets.partitions[0] == 4829
assert offsets.partitions[1] == -1

def test_set_offsets_from_list_error(self):
topic = Topic('topic1', 2)
offsets = TopicOffsets(topic)
Expand Down

0 comments on commit 1031313

Please sign in to comment.