diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 402d19f..dec8b2c 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @idanasulinmemphis @shohamroditimemphis \ No newline at end of file +* @idanasulinmemphis @rnowling-memphis diff --git a/memphis/consumer.py b/memphis/consumer.py index 9dcfca8..c1731ab 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -53,7 +53,40 @@ def set_context(self, context): self.context = context def consume(self, callback): - """Consume events.""" + """ + This method starts consuming events from the specified station and invokes the provided callback function for each batch of messages received. + + Parameters: + callback (function): A function that will be called with each batch of messages received. The function should have the following signature: + - `callback(messages: List[Message], error: Optional[MemphisError], context: Dict) -> Awaitable[None]` + - `messages`: A list of `Message` objects representing the batch of messages received. + - `error`: An optional `MemphisError` object if there was an error while consuming the messages. + - `context`: A dictionary representing the context that was set using the `set_context()` method. + + Example: + import asyncio + from memphis import Memphis + + async def message_handler(messages, error, context): + if error: + print(f"Error occurred: {error}") + return + + for message in messages: + print(f"Received message: {message}") + + async def main(): + memphis = Memphis() + await memphis.connect(host='localhost', username='user', password='pass') + consumer = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') + consumer.set_context({'key': 'value'}) + consumer.consume(message_handler) + + # Keep the event loop running + while True: + await asyncio.sleep(1) + asyncio.run(main()) + """ self.dls_callback_func = callback self.t_consume = asyncio.create_task(self.__consume(callback)) @@ -129,7 +162,7 @@ async def fetch(self, batch_size: int = 10): from memphis import Memphis - async def main(/, host, username, password, station): + async def main(host, username, password, station): memphis = Memphis() await memphis.connect(host=host, username=username, @@ -149,10 +182,10 @@ async def main(/, host, username, password, station): await memphis.close() if __name__ == '__main__': - asyncio.run(main(host=host, - username=username, - password=password, - station=station)) + asyncio.run(main(host, + username, + password, + station)) """ messages = [] diff --git a/memphis/producer.py b/memphis/producer.py index fb4d7fe..a89df34 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -265,13 +265,12 @@ async def produce( schemaverse_fail_alert_type, ) raise e - except Exception as e: + except Exception as e: # pylint: disable-next=no-member if hasattr(e, "status_code") and e.status_code == "503": raise MemphisError( "Produce operation has failed, please check whether Station/Producer still exist" ) - else: - raise MemphisError(str(e)) from e + raise MemphisError(str(e)) from e async def destroy(self): """Destroy the producer.""" diff --git a/setup.py b/setup.py index 51def80..7261c02 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setup( name="memphis-py", packages=["memphis"], - version="1.0.5", + version="1.0.6", license="Apache-2.0", description="A powerful messaging platform for modern developers", long_description=long_description, @@ -17,7 +17,7 @@ author="Memphis.dev", author_email="team@memphis.dev", url="https://github.com/memphisdev/memphis.py", - download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.5.tar.gz", + download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.6.tar.gz", keywords=["message broker", "devtool", "streaming", "data"], install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core"], classifiers=[ diff --git a/version-beta.conf b/version-beta.conf index f9cbc01..337a6a8 100644 --- a/version-beta.conf +++ b/version-beta.conf @@ -1 +1 @@ -1.0.7 \ No newline at end of file +1.0.8 \ No newline at end of file diff --git a/version.conf b/version.conf index 1464c52..ece61c6 100644 --- a/version.conf +++ b/version.conf @@ -1 +1 @@ -1.0.5 \ No newline at end of file +1.0.6 \ No newline at end of file