Skip to content

Commit

Permalink
Move client support up to 0.10.2 (#85)
Browse files Browse the repository at this point in the history
  • Loading branch information
toddpalino committed Jan 12, 2018
1 parent 55993ff commit 7c60cb6
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 11 deletions.
19 changes: 9 additions & 10 deletions kafka/tools/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
from kafka.tools.protocol.requests.describe_groups_v0 import DescribeGroupsV0Request
from kafka.tools.protocol.requests.group_coordinator_v0 import GroupCoordinatorV0Request
from kafka.tools.protocol.requests.list_groups_v0 import ListGroupsV0Request
from kafka.tools.protocol.requests.list_offset_v0 import ListOffsetV0Request
from kafka.tools.protocol.requests.list_offset_v1 import ListOffsetV1Request
from kafka.tools.protocol.requests.offset_commit_v2 import OffsetCommitV2Request
from kafka.tools.protocol.requests.offset_fetch_v1 import OffsetFetchV1Request
from kafka.tools.protocol.requests.topic_metadata_v1 import TopicMetadataV1Request
from kafka.tools.protocol.requests.topic_metadata_v2 import TopicMetadataV2Request
from kafka.tools.models.broker import Broker
from kafka.tools.models.cluster import Cluster
from kafka.tools.models.group import Group
Expand Down Expand Up @@ -157,7 +157,7 @@ def get_topic(self, topic_name, cache=True):

if force_update:
# It doesn't matter what broker we fetch topic metadata from
metadata = self._send_any_broker(TopicMetadataV1Request({'topics': [topic_name]}))
metadata = self._send_any_broker(TopicMetadataV2Request({'topics': [topic_name]}))
raise_if_error(TopicError, metadata['topics'][0]['error'])

self._update_from_metadata(metadata)
Expand Down Expand Up @@ -278,8 +278,7 @@ def get_offsets_for_topics(self, topic_list, timestamp=OFFSET_LATEST):
for topic_name in broker_to_tp[broker_id]:
request_values[broker_id]['topics'].append({'topic': topic_name,
'partitions': [{'partition': i,
'timestamp': timestamp,
'max_num_offsets': 1} for i in broker_to_tp[broker_id][topic_name]]})
'timestamp': timestamp} for i in broker_to_tp[broker_id][topic_name]]})

return self._send_list_offsets_to_brokers(request_values)

Expand Down Expand Up @@ -394,7 +393,7 @@ def _maybe_bootstrap_cluster(self, broker_port):
return False

# Fetch topic metadata for all topics/brokers
req = TopicMetadataV1Request({'topics': None})
req = TopicMetadataV2Request({'topics': None})
correlation_id, metadata = broker.send(req)
broker.close()

Expand Down Expand Up @@ -462,7 +461,7 @@ def _send_some_brokers(self, requests, ignore_errors=True):
they were retrieved from. This method uses a thread pool to parallelize sends.
Args:
request (int -> BaseRequest): A dictionary, where keys are integer broker IDs and the values are valid
requests (int -> BaseRequest): A dictionary, where keys are integer broker IDs and the values are valid
request objects that inherit from BaseRequest.
Returns:
Expand Down Expand Up @@ -565,7 +564,7 @@ def _send_list_offsets_to_brokers(self, request_values):
"""
requests = {}
for broker_id in request_values:
requests[broker_id] = ListOffsetV0Request(request_values[broker_id])
requests[broker_id] = ListOffsetV1Request(request_values[broker_id])
responses = self._send_some_brokers(requests, ignore_errors=False)

rv = {}
Expand Down Expand Up @@ -722,7 +721,7 @@ def _maybe_update_metadata_for_topics(self, topics, cache=True):
force_update = True

if force_update:
self._update_from_metadata(self._send_any_broker(TopicMetadataV1Request({'topics': topics})), delete=False)
self._update_from_metadata(self._send_any_broker(TopicMetadataV2Request({'topics': topics})), delete=False)

def _maybe_update_full_metadata(self, cache=True):
"""
Expand All @@ -736,7 +735,7 @@ def _maybe_update_full_metadata(self, cache=True):
ConnectionError: If there is a failure to send the request to all brokers in the cluster
"""
if (not cache) or (self._last_full_metadata < (time.time() - self.configuration.metadata_refresh)):
self._update_from_metadata(self._send_any_broker(TopicMetadataV1Request({'topics': None})), delete=True)
self._update_from_metadata(self._send_any_broker(TopicMetadataV2Request({'topics': None})), delete=True)
self._last_full_metadata = time.time()

def _add_or_update_group(self, group_info, coordinator):
Expand Down
1 change: 0 additions & 1 deletion tests/tools/client/test_interface_offsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def test_get_offsets_for_topics(self):
assert values[broker_id]['topics'][0]['topic'] == 'topic1'
assert len(values[broker_id]['topics'][0]['partitions']) == 1
assert values[broker_id]['topics'][0]['partitions'][0]['timestamp'] == Client.OFFSET_LATEST
assert values[broker_id]['topics'][0]['partitions'][0]['max_num_offsets'] == 1
assert values[1]['topics'][0]['partitions'][0]['partition'] == 0
assert values[101]['topics'][0]['partitions'][0]['partition'] == 1

Expand Down

0 comments on commit 7c60cb6

Please sign in to comment.