From ec0cd54969aa9c65fe2b5d7097b06f2a9984db57 Mon Sep 17 00:00:00 2001
From: Dana Powers
Date: Fri, 23 Mar 2018 05:58:55 -0700
Subject: [PATCH] Change SimpleProducer to use async_send (async is reserved in py37) (#1454)

---
 docs/simple.rst                   |  8 +++----
 kafka/producer/base.py            | 38 +++++++++++++++++++------------
 kafka/producer/keyed.py           |  2 +-
 kafka/producer/simple.py          |  2 +-
 test/test_failover_integration.py |  8 +++----
 test/test_producer_integration.py |  8 +++----
 test/test_producer_legacy.py      | 10 ++++----
 7 files changed, 42 insertions(+), 34 deletions(-)

diff --git a/docs/simple.rst b/docs/simple.rst
index 8192a8b76..afdb9756c 100644
--- a/docs/simple.rst
+++ b/docs/simple.rst
@@ -49,7 +49,7 @@ Asynchronous Mode

     # To send messages asynchronously
     client = SimpleClient('localhost:9092')
-    producer = SimpleProducer(client, async=True)
+    producer = SimpleProducer(client, async_send=True)
     producer.send_messages('my-topic', b'async message')

     # To send messages in batch. You can use any of the available
@@ -60,7 +60,7 @@ Asynchronous Mode
     # * If the producer dies before the messages are sent, there will be losses
     # * Call producer.stop() to send the messages and cleanup
     producer = SimpleProducer(client,
-                              async=True,
+                              async_send=True,
                               batch_send_every_n=20,
                               batch_send_every_t=60)

@@ -73,7 +73,7 @@ Synchronous Mode

     # To send messages synchronously
     client = SimpleClient('localhost:9092')
-    producer = SimpleProducer(client, async=False)
+    producer = SimpleProducer(client, async_send=False)

     # Note that the application is responsible for encoding messages to type bytes
     producer.send_messages('my-topic', b'some message')
@@ -88,7 +88,7 @@ Synchronous Mode
     # ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
     #                            by all in sync replicas before sending a response
     producer = SimpleProducer(client,
-                              async=False,
+                              async_send=False,
                               req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
                               ack_timeout=2000,
                               sync_fail_on_error=False)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index c038bd3a0..e8d6c3d27 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -226,7 +226,7 @@ class Producer(object):

     Arguments:
         client (kafka.SimpleClient): instance to use for broker
-                communications. If async=True, the background thread will use
+                communications. If async_send=True, the background thread will use
                 :meth:`client.copy`, which is expected to return a thread-safe
                 object.
         codec (kafka.protocol.ALL_CODECS): compression codec to use.
@@ -238,11 +238,11 @@ class Producer(object):
         sync_fail_on_error (bool, optional): whether sync producer should
             raise exceptions (True), or just return errors (False),
             defaults to True.
-        async (bool, optional): send message using a background thread,
+        async_send (bool, optional): send message using a background thread,
             defaults to False.
-        batch_send_every_n (int, optional): If async is True, messages are
+        batch_send_every_n (int, optional): If async_send is True, messages are
             sent in batches of this size, defaults to 20.
-        batch_send_every_t (int or float, optional): If async is True,
+        batch_send_every_t (int or float, optional): If async_send is True,
             messages are sent immediately after this timeout in seconds, even
             if there are fewer than batch_send_every_n, defaults to 20.
         async_retry_limit (int, optional): number of retries for failed messages
@@ -268,8 +268,10 @@ class Producer(object):
             defaults to 30.

     Deprecated Arguments:
+        async (bool, optional): send message using a background thread,
+            defaults to False. Deprecated, use 'async_send'
         batch_send (bool, optional): If True, messages are sent by a background
-            thread in batches, defaults to False. Deprecated, use 'async'
+            thread in batches, defaults to False. Deprecated, use 'async_send'
     """
     ACK_NOT_REQUIRED = 0            # No ack is required
     ACK_AFTER_LOCAL_WRITE = 1       # Send response after it is written to log
@@ -282,8 +284,8 @@ def __init__(self, client,
                  codec=None,
                  codec_compresslevel=None,
                  sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
-                 async=False,
-                 batch_send=False,  # deprecated, use async
+                 async_send=False,
+                 batch_send=False,  # deprecated, use async_send
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
                  async_retry_limit=ASYNC_RETRY_LIMIT,
@@ -292,15 +294,21 @@ def __init__(self, client,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
                  async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
-                 async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
+                 async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
+                 **kwargs):
+
+        # async renamed async_send for python3.7 support
+        if 'async' in kwargs:
+            log.warning('Deprecated async option found -- use async_send')
+            async_send = kwargs['async']

-        if async:
+        if async_send:
             assert batch_send_every_n > 0
             assert batch_send_every_t > 0
             assert async_queue_maxsize >= 0

         self.client = client
-        self.async = async
+        self.async_send = async_send
         self.req_acks = req_acks
         self.ack_timeout = ack_timeout
         self.stopped = False
@@ -313,7 +321,7 @@ def __init__(self, client,
         self.codec = codec
         self.codec_compresslevel = codec_compresslevel

-        if self.async:
+        if self.async_send:
             # Messages are sent through this queue
             self.queue = Queue(async_queue_maxsize)
             self.async_queue_put_timeout = async_queue_put_timeout
@@ -400,7 +408,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
         if key is not None and not isinstance(key, six.binary_type):
             raise TypeError("the key must be type bytes")

-        if self.async:
+        if self.async_send:
             for idx, m in enumerate(msg):
                 try:
                     item = (TopicPartition(topic, partition), m, key)
@@ -435,7 +443,7 @@ def stop(self, timeout=None):
             log.warning('timeout argument to stop() is deprecated - '
                         'it will be removed in future release')

-        if not self.async:
+        if not self.async_send:
             log.warning('producer.stop() called, but producer is not async')
             return

@@ -443,7 +451,7 @@ def stop(self, timeout=None):
             log.warning('producer.stop() called, but producer is already stopped')
             return

-        if self.async:
+        if self.async_send:
             self.queue.put((STOP_ASYNC_PRODUCER, None, None))
             self.thread_stop_event.set()
             self.thread.join()
@@ -471,5 +479,5 @@ def stop(self, timeout=None):
         self.stopped = True

     def __del__(self):
-        if self.async and not self.stopped:
+        if self.async_send and not self.stopped:
             self.stop()
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 9fba33bbf..475b54ef5 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -46,4 +46,4 @@ def send(self, topic, key, msg):
         return self.send_messages(topic, key, msg)

     def __repr__(self):
-        return '<KeyedProducer batch=%s>' % self.async
+        return '<KeyedProducer batch=%s>' % self.async_send
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index 90b3d4a7d..835909fe6 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -51,4 +51,4 @@ def send_messages(self, topic, *msg):
         )

     def __repr__(self):
-        return '<SimpleProducer batch=%s>' % self.async
+        return '<SimpleProducer batch=%s>' % self.async_send
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 2439b5899..465afb713 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -61,7 +61,7 @@ def test_switch_leader(self):
         # require that the server commit messages to all in-sync replicas
         # so that failover doesn't lose any messages on server-side
         # and we can assert that server-side message count equals client-side
-        producer = Producer(self.client, async=False,
+        producer = Producer(self.client, async_send=False,
                             req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)

         # Send 100 random messages to a specific partition
@@ -102,7 +102,7 @@ def test_switch_leader_async(self):
         partition = 0

         # Test the base class Producer -- send_messages to a specific partition
-        producer = Producer(self.client, async=True,
+        producer = Producer(self.client, async_send=True,
                             batch_send_every_n=15,
                             batch_send_every_t=3,
                             req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
@@ -147,7 +147,7 @@ def test_switch_leader_async(self):
     def test_switch_leader_keyed_producer(self):
         topic = self.topic

-        producer = KeyedProducer(self.client, async=False)
+        producer = KeyedProducer(self.client, async_send=False)

         # Send 10 random messages
         for _ in range(10):
@@ -183,7 +183,7 @@ def test_switch_leader_keyed_producer(self):
             producer.send_messages(topic, key, msg)

     def test_switch_leader_simple_consumer(self):
-        producer = Producer(self.client, async=False)
+        producer = Producer(self.client, async_send=False)
         consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
         self._send_random_messages(producer, self.topic, 0, 2)
         consumer.get_messages()
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py
index a304e83b6..9e3fb5f78 100644
--- a/test/test_producer_integration.py
+++ b/test/test_producer_integration.py
@@ -184,7 +184,7 @@ def test_async_simple_producer(self):
         partition = self.client.get_partition_ids_for_topic(self.topic)[0]
         start_offset = self.current_offset(self.topic, partition)

-        producer = SimpleProducer(self.client, async=True, random_start=False)
+        producer = SimpleProducer(self.client, async_send=True, random_start=False)
         resp = producer.send_messages(self.topic, self.msg("one"))
         self.assertEqual(len(resp), 0)

@@ -203,7 +203,7 @@ def test_batched_simple_producer__triggers_by_message(self):
         batch_interval = 5
         producer = SimpleProducer(
             self.client,
-            async=True,
+            async_send=True,
             batch_send_every_n=batch_messages,
             batch_send_every_t=batch_interval,
             random_start=False)
@@ -268,7 +268,7 @@ def test_batched_simple_producer__triggers_by_time(self):
         batch_interval = 5
         producer = SimpleProducer(
             self.client,
-            async=True,
+            async_send=True,
             batch_send_every_n=100,
             batch_send_every_t=batch_interval,
             random_start=False)
@@ -400,7 +400,7 @@ def test_async_keyed_producer(self):

         producer = KeyedProducer(self.client,
                                  partitioner=RoundRobinPartitioner,
-                                 async=True,
+                                 async_send=True,
                                  batch_send_every_t=1)

         resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))
diff --git a/test/test_producer_legacy.py b/test/test_producer_legacy.py
index 9b87c7664..6d00116c3 100644
--- a/test/test_producer_legacy.py
+++ b/test/test_producer_legacy.py
@@ -73,7 +73,7 @@ def partitions(topic):
     @patch('kafka.producer.base._send_upstream')
     def test_producer_async_queue_overfilled(self, mock):
         queue_size = 2
-        producer = Producer(MagicMock(), async=True,
+        producer = Producer(MagicMock(), async_send=True,
                             async_queue_maxsize=queue_size)

         topic = b'test-topic'
@@ -95,25 +95,25 @@ def test_producer_sync_fail_on_error(self):
        with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
            client = SimpleClient(MagicMock())

-           producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
+           producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False)
            # This should not raise
            (response,) = producer.send_messages('foobar', b'test message')
            self.assertEqual(response, error)

-           producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
+           producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True)
            with self.assertRaises(FailedPayloadsError):
                producer.send_messages('foobar', b'test message')

     def test_cleanup_is_not_called_on_stopped_producer(self):
-        producer = Producer(MagicMock(), async=True)
+        producer = Producer(MagicMock(), async_send=True)
         producer.stopped = True
         with patch.object(producer, 'stop') as mocked_stop:
             producer._cleanup_func(producer)
             self.assertEqual(mocked_stop.call_count, 0)

     def test_cleanup_is_called_on_running_producer(self):
-        producer = Producer(MagicMock(), async=True)
+        producer = Producer(MagicMock(), async_send=True)
         producer.stopped = False
         with patch.object(producer, 'stop') as mocked_stop:
             producer._cleanup_func(producer)
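
For context, a minimal usage sketch of the renamed keyword, mirroring the updated docs/simple.rst hunks above (not taken verbatim from the patch). It assumes a kafka-python build that includes this change and a broker reachable at localhost:9092; the topic name 'my-topic' is illustrative.

    from kafka import SimpleClient, SimpleProducer

    # Assumes a running broker at localhost:9092.
    client = SimpleClient('localhost:9092')

    # New spelling: background-thread (batched) sending is requested with
    # async_send=True. The old async=True keyword is still accepted through
    # **kwargs and logs "Deprecated async option found -- use async_send",
    # but async is a reserved word on Python 3.7, so callers should migrate.
    producer = SimpleProducer(client,
                              async_send=True,
                              batch_send_every_n=20,
                              batch_send_every_t=60)
    producer.send_messages('my-topic', b'async message')

    # Flush any queued messages and stop the background send thread.
    producer.stop()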