diff --git a/pykafka/balancedconsumer.py b/pykafka/balancedconsumer.py index 5073d740f..ab8af6fa1 100644 --- a/pykafka/balancedconsumer.py +++ b/pykafka/balancedconsumer.py @@ -33,6 +33,7 @@ from .exceptions import (KafkaException, PartitionOwnedError, ConsumerStoppedException) from .simpleconsumer import SimpleConsumer +from .utils.compat import range log = logging.getLogger(__name__) @@ -409,7 +410,7 @@ def _rebalance(self): self._consumer_id, self._topic.name) ) - for i in xrange(self._rebalance_max_retries): + for i in range(self._rebalance_max_retries): try: # If retrying, be sure to make sure the # partition allocation is correct. diff --git a/pykafka/producer.py b/pykafka/producer.py index 329837091..1a7b2d944 100644 --- a/pykafka/producer.py +++ b/pykafka/producer.py @@ -31,7 +31,7 @@ ) from .partitioners import random_partitioner from .protocol import Message, ProduceRequest -from .utils.compat import string_types, get_bytes +from .utils.compat import string_types, get_bytes, iteritems log = logging.getLogger(__name__) @@ -126,8 +126,8 @@ def _get_partition_msgs(partition_id, req): """Get all the messages for the partitions from the request.""" messages = itertools.chain.from_iterable( mset.messages - for topic, partitions in req.msets.iteritems() - for p_id, mset in partitions.iteritems() + for topic, partitions in iteritems(req.msets) + for p_id, mset in iteritems(partitions) if p_id == partition_id ) for message in messages: @@ -141,8 +141,8 @@ def _get_partition_msgs(partition_id, req): # Figure out if we need to retry any messages # TODO: Convert to using utils.handle_partition_responses to_retry = [] - for topic, partitions in response.topics.iteritems(): - for partition, presponse in partitions.iteritems(): + for topic, partitions in iteritems(response.topics): + for partition, presponse in iteritems(partitions): if presponse.err == 0: continue # All's well if presponse.err == UnknownTopicOrPartition.ERROR_CODE: @@ -171,8 +171,8 @@ def _get_partition_msgs(partition_id, req): self._cluster.update() to_retry = [ ((message.partition_key, message.value), p_id) - for topic, partitions in req.msets.iteritems() - for p_id, mset in partitions.iteritems() + for topic, partitions in iteritems(req.msets) + for p_id, mset in iteritems(partitions) for message in mset.messages ] @@ -231,7 +231,7 @@ def _produce(self, message_partition_tups, attempt): attempt) # Send any still not sent - for leader, req in requests.iteritems(): + for leader, req in iteritems(requests): self._send_request(leader, req, attempt) def produce(self, messages): diff --git a/pykafka/simpleconsumer.py b/pykafka/simpleconsumer.py index e383fb553..f4c564cea 100644 --- a/pykafka/simpleconsumer.py +++ b/pykafka/simpleconsumer.py @@ -25,7 +25,8 @@ from collections import defaultdict from .common import OffsetType -from .utils.compat import Semaphore, Queue, Empty, iteritems, itervalues +from .utils.compat import (Semaphore, Queue, Empty, iteritems, itervalues, + range) from .exceptions import (OffsetOutOfRangeError, UnknownTopicOrPartition, OffsetMetadataTooLarge, OffsetsLoadInProgress, NotCoordinatorForConsumer, SocketDisconnectedError, @@ -279,7 +280,7 @@ def fetcher(): log.debug("Fetcher thread exiting") log.info("Starting %s fetcher threads", self._num_consumer_fetchers) return [self._cluster.handler.spawn(fetcher) - for i in xrange(self._num_consumer_fetchers)] + for i in range(self._num_consumer_fetchers)] def __iter__(self): """Yield an infinite stream of messages until the consumer times out""" @@ -340,7 +341,7 @@ def commit_offsets(self): reqs = [p.build_offset_commit_request() for p in self._partitions.values()] log.debug("Committing offsets for %d partitions to broker id %s", len(reqs), self._offset_manager.id) - for i in xrange(self._offsets_commit_max_retries): + for i in range(self._offsets_commit_max_retries): if i > 0: log.debug("Retrying") time.sleep(i * (self._offsets_channel_backoff_ms / 1000)) @@ -391,7 +392,7 @@ def _handle_success(parts): log.debug("Fetching offsets for %d partitions from broker id %s", len(reqs), self._offset_manager.id) - for i in xrange(self._offsets_fetch_max_retries): + for i in range(self._offsets_fetch_max_retries): if i > 0: log.debug("Retrying offset fetch") @@ -486,7 +487,7 @@ def _handle_success(parts): log.info("Resetting offsets for %s partitions", len(list(owned_partition_offsets))) - for i in xrange(self._offsets_reset_max_retries): + for i in range(self._offsets_reset_max_retries): # group partitions by leader by_leader = defaultdict(list) for partition, offset in iteritems(owned_partition_offsets): diff --git a/tests/pykafka/test_simpleconsumer.py b/tests/pykafka/test_simpleconsumer.py index fb2253e33..580891240 100644 --- a/tests/pykafka/test_simpleconsumer.py +++ b/tests/pykafka/test_simpleconsumer.py @@ -6,6 +6,7 @@ from pykafka import KafkaClient from pykafka.simpleconsumer import OwnedPartition, OffsetType from pykafka.test.utils import get_cluster, stop_cluster +from pykafka.utils.compat import range class TestSimpleConsumer(unittest.TestCase): @@ -26,7 +27,7 @@ def setUpClass(cls): for _ in range(3): cls.kafka.produce_messages( cls.topic_name, - ('msg {i}'.format(i=i) for i in xrange(batch))) + ('msg {i}'.format(i=i) for i in range(batch))) cls.client = KafkaClient(cls.kafka.brokers) @@ -44,7 +45,7 @@ def _get_simple_consumer(self, **kwargs): def test_consume(self): with self._get_simple_consumer() as consumer: - messages = [consumer.consume() for _ in xrange(self.total_msgs)] + messages = [consumer.consume() for _ in range(self.total_msgs)] self.assertEquals(len(messages), self.total_msgs) self.assertTrue(None not in messages) @@ -52,7 +53,7 @@ def test_offset_commit(self): """Check fetched offsets match pre-commit internal state""" with self._get_simple_consumer( consumer_group='test_offset_commit') as consumer: - [consumer.consume() for _ in xrange(100)] + [consumer.consume() for _ in range(100)] offsets_committed = consumer.held_offsets consumer.commit_offsets() @@ -64,7 +65,7 @@ def test_offset_resume(self): """Check resumed internal state matches committed offsets""" with self._get_simple_consumer( consumer_group='test_offset_resume') as consumer: - [consumer.consume() for _ in xrange(100)] + [consumer.consume() for _ in range(100)] offsets_committed = consumer.held_offsets consumer.commit_offsets()