From 75483dc5781f6acf3bc57c037a2c5a9e74628f07 Mon Sep 17 00:00:00 2001 From: Adarsh-jaiss Date: Thu, 13 Jul 2023 09:49:37 +0530 Subject: [PATCH 1/3] Add examples to docstrings for consumer methods --- memphis/consumer.py | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index e191e8d..ef036cd 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -53,7 +53,45 @@ def set_context(self, context): self.context = context def consume(self, callback): - """Consume events.""" + """ + Consume events. + + This method starts consuming events from the specified station and invokes the provided callback function for each batch of messages received. + + Parameters: + callback (function): A function that will be called with each batch of messages received. The function should have the following signature: + - `callback(messages: List[Message], error: Optional[MemphisError], context: Dict) -> Awaitable[None]` + - `messages`: A list of `Message` objects representing the batch of messages received. + - `error`: An optional `MemphisError` object if there was an error while consuming the messages. + - `context`: A dictionary representing the context that was set using the `set_context()` method. + + Example: + import asyncio + from memphis import Memphis + + async def message_handler(messages, error, context): + if error: + print(f"Error occurred: {error}") + return + + for message in messages: + print(f"Received message: {message}") + + async def main(): + memphis = Memphis() + await memphis.connect(host='localhost', username='user', password='pass') + consumer = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') + consumer.set_context({'key': 'value'}) + consumer.consume(callback=message_handler) + + # Keep the event loop running + while True: + await asyncio.sleep(1) + + asyncio.run(main()) + + + """ self.dls_callback_func = callback self.t_consume = asyncio.create_task(self.__consume(callback)) From 3604fcbd2d0e16335cda426ddf66f2a902c2a5a6 Mon Sep 17 00:00:00 2001 From: Adarsh jaiswal Date: Fri, 14 Jul 2023 18:54:12 +0530 Subject: [PATCH 2/3] Added required changes in comsumer.py - removed the first line and blank lines - Used the named parameters --- memphis/consumer.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index ef036cd..eeb0737 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -54,8 +54,6 @@ def set_context(self, context): def consume(self, callback): """ - Consume events. - This method starts consuming events from the specified station and invokes the provided callback function for each batch of messages received. Parameters: @@ -80,17 +78,14 @@ async def message_handler(messages, error, context): async def main(): memphis = Memphis() await memphis.connect(host='localhost', username='user', password='pass') - consumer = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') + callback = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') consumer.set_context({'key': 'value'}) consumer.consume(callback=message_handler) # Keep the event loop running while True: await asyncio.sleep(1) - asyncio.run(main()) - - """ self.dls_callback_func = callback self.t_consume = asyncio.create_task(self.__consume(callback)) From 5ce4f76a4eb4dee23db97fdb07b5b68baea6b50e Mon Sep 17 00:00:00 2001 From: Adarsh jaiswal Date: Sat, 15 Jul 2023 00:26:35 +0530 Subject: [PATCH 3/3] Removed callback parameter --- memphis/consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index eeb0737..c1731ab 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -78,9 +78,9 @@ async def message_handler(messages, error, context): async def main(): memphis = Memphis() await memphis.connect(host='localhost', username='user', password='pass') - callback = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') + consumer = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') consumer.set_context({'key': 'value'}) - consumer.consume(callback=message_handler) + consumer.consume(message_handler) # Keep the event loop running while True: