Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion memphis/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,40 @@ def set_context(self, context):
self.context = context

def consume(self, callback):
"""Consume events."""
"""
This method starts consuming events from the specified station and invokes the provided callback function for each batch of messages received.

Parameters:
callback (function): A function that will be called with each batch of messages received. The function should have the following signature:
- `callback(messages: List[Message], error: Optional[MemphisError], context: Dict) -> Awaitable[None]`
- `messages`: A list of `Message` objects representing the batch of messages received.
- `error`: An optional `MemphisError` object if there was an error while consuming the messages.
- `context`: A dictionary representing the context that was set using the `set_context()` method.

Example:
import asyncio
from memphis import Memphis

async def message_handler(messages, error, context):
if error:
print(f"Error occurred: {error}")
return

for message in messages:
print(f"Received message: {message}")

async def main():
memphis = Memphis()
await memphis.connect(host='localhost', username='user', password='pass')
consumer = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group')
consumer.set_context({'key': 'value'})
consumer.consume(message_handler)

# Keep the event loop running
while True:
await asyncio.sleep(1)
asyncio.run(main())
"""
self.dls_callback_func = callback
self.t_consume = asyncio.create_task(self.__consume(callback))

Expand Down