From 99b7db0ed0b404364eddd8f6834feb107219f7dd Mon Sep 17 00:00:00 2001 From: Yaniv Ben Hemo <70286779+yanivbh1@users.noreply.github.com> Date: Wed, 28 Jun 2023 17:16:29 +0300 Subject: [PATCH 01/12] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 2cd5113..b5f153a 100644 --- a/README.md +++ b/README.md @@ -65,10 +65,10 @@ async def main(): await memphis.connect( host="", username="", - account_id="", # You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored + account_id=, # You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored connection_token="", # you will get it on application type user creation password="", # depends on how Memphis deployed - default is connection token-based authentication - port="", # defaults to 6666 + port=, # defaults to 6666 reconnect=True, # defaults to True max_reconnect=3, # defaults to 3 reconnect_interval_ms=1500, # defaults to 1500 From 4424d4219e91090293903afb6e53c5efafefede4 Mon Sep 17 00:00:00 2001 From: RJ Nowling <130711295+rnowling-memphis@users.noreply.github.com> Date: Wed, 28 Jun 2023 16:11:59 -0500 Subject: [PATCH 02/12] Fix return none (#172) * Fix case where consumer.fetch() returns None * Add docs * Begin adding documentation --- memphis/consumer.py | 50 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index c6e3531..9a6c708 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -35,7 +35,7 @@ def __init__( self.batch_max_time_to_wait_ms = batch_max_time_to_wait_ms self.max_ack_time_ms = max_ack_time_ms self.max_msg_deliveries = max_msg_deliveries - self.ping_consumer_invterval_ms = 30000 + self.ping_consumer_interval_ms = 30000 if error_callback is None: error_callback = default_error_handler self.t_ping = asyncio.create_task(self.__ping_consumer(error_callback)) @@ -116,7 +116,45 @@ async def __consume_dls(self): return async def fetch(self, batch_size: int = 10): - """Fetch a batch of messages.""" + """ + Fetch a batch of messages. + + Returns a list of Message objects. If the connection is + not active or no messages are recieved before timing out, + an empty list is returned. + + Example: + + import asyncio + + from memphis import Memphis + + async def main(/, host, username, password, station): + memphis = Memphis() + await memphis.connect(host=host, + username=username, + password=password) + + consumer = await memphis.consumer(station_name=station, + consumer_name="test-consumer", + consumer_group="test-consumer-group") + + while True: + batch = await consumer.fetch() + print("Recieved {} messages".format(len(batch))) + for msg in batch: + serialized_record = msg.get_data() + print("Message:", serialized_record) + + await memphis.close() + + if __name__ == '__main__': + asyncio.run(main(host=host, + username=username, + password=password, + station=station)) + + """ messages = [] if self.connection.is_connection_active: try: @@ -150,15 +188,15 @@ async def fetch(self, batch_size: int = 10): Message(msg, self.connection, self.consumer_group)) return messages except Exception as e: - if "timeout" not in str(e): + if "timeout" not in str(e).lower(): raise MemphisError(str(e)) from e - else: - return messages + + return messages async def __ping_consumer(self, callback): while True: try: - await asyncio.sleep(self.ping_consumer_invterval_ms / 1000) + await asyncio.sleep(self.ping_consumer_interval_ms / 1000) consumer_group = get_internal_name(self.consumer_group) await self.connection.broker_connection.consumer_info( self.station_name, consumer_group, timeout=30 From e2204b4a0262ff04b63431f7c6ed4e9b652cd97f Mon Sep 17 00:00:00 2001 From: Shay Bratslavsky Date: Sun, 9 Jul 2023 13:47:21 +0300 Subject: [PATCH 03/12] add connid ti destroy prodcuer consumer (#176) --- memphis/consumer.py | 4 +++- memphis/producer.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index 9a6c708..9dcfca8 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -218,7 +218,9 @@ async def destroy(self): destroy_consumer_req = { "name": self.consumer_name, "station_name": self.station_name, - "username": self.connection.username + "username": self.connection.username, + "connection_id": self.connection.connection_id, + "req_version": 1, } consumer_name = json.dumps( destroy_consumer_req, indent=2).encode("utf-8") diff --git a/memphis/producer.py b/memphis/producer.py index f9e7f35..fb4d7fe 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -279,7 +279,9 @@ async def destroy(self): destroy_producer_req = { "name": self.producer_name, "station_name": self.station_name, - "username": self.connection.username + "username": self.connection.username, + "connection_id": self.connection.connection_id, + "req_version": 1, } producer_name = json.dumps(destroy_producer_req).encode("utf-8") From bb2640359b7b2a02068745d1d2fca0edd4eecb43 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 9 Jul 2023 17:14:30 +0300 Subject: [PATCH 04/12] version update --- setup.py | 4 ++-- version-beta.conf | 2 +- version.conf | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index bd1ed91..51def80 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setup( name="memphis-py", packages=["memphis"], - version="1.0.4", + version="1.0.5", license="Apache-2.0", description="A powerful messaging platform for modern developers", long_description=long_description, @@ -17,7 +17,7 @@ author="Memphis.dev", author_email="team@memphis.dev", url="https://github.com/memphisdev/memphis.py", - download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.4.tar.gz", + download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.5.tar.gz", keywords=["message broker", "devtool", "streaming", "data"], install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core"], classifiers=[ diff --git a/version-beta.conf b/version-beta.conf index ece61c6..f9cbc01 100644 --- a/version-beta.conf +++ b/version-beta.conf @@ -1 +1 @@ -1.0.6 \ No newline at end of file +1.0.7 \ No newline at end of file diff --git a/version.conf b/version.conf index a6a3a43..1464c52 100644 --- a/version.conf +++ b/version.conf @@ -1 +1 @@ -1.0.4 \ No newline at end of file +1.0.5 \ No newline at end of file From f62768ebc71b1ffa414f1dee892dc0ce255e3865 Mon Sep 17 00:00:00 2001 From: Adarsh jaiswal Date: Tue, 11 Jul 2023 23:58:57 +0530 Subject: [PATCH 05/12] Fix : no-member error for Exception status_code member (#1) Bug Fix for no-member error for Exception status_code member in which i have fixed the following erros: memphis/producer.py:210:45: E1101: Instance of 'Exception' has no 'status_code' member (no-member) and after fixing that, this error occured on the same line 269:12: R1720: Unnecessary "else" after "raise", remove the "else" and de-indent the code inside it (no-else-raise) --- memphis/producer.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/memphis/producer.py b/memphis/producer.py index fb4d7fe..a89df34 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -265,13 +265,12 @@ async def produce( schemaverse_fail_alert_type, ) raise e - except Exception as e: + except Exception as e: # pylint: disable-next=no-member if hasattr(e, "status_code") and e.status_code == "503": raise MemphisError( "Produce operation has failed, please check whether Station/Producer still exist" ) - else: - raise MemphisError(str(e)) from e + raise MemphisError(str(e)) from e async def destroy(self): """Destroy the producer.""" From d664b62b2baa663856b0d10452b6b7d165f0e2ba Mon Sep 17 00:00:00 2001 From: RJ Nowling <130711295+rnowling-memphis@users.noreply.github.com> Date: Tue, 11 Jul 2023 15:17:10 -0500 Subject: [PATCH 06/12] Fix doc example syntax for Consumer.fetch() --- memphis/consumer.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index 9dcfca8..e191e8d 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -129,7 +129,7 @@ async def fetch(self, batch_size: int = 10): from memphis import Memphis - async def main(/, host, username, password, station): + async def main(host, username, password, station): memphis = Memphis() await memphis.connect(host=host, username=username, @@ -149,10 +149,10 @@ async def main(/, host, username, password, station): await memphis.close() if __name__ == '__main__': - asyncio.run(main(host=host, - username=username, - password=password, - station=station)) + asyncio.run(main(host, + username, + password, + station)) """ messages = [] From ae4514cb2974eb457d6f2c766f020a84969ee3aa Mon Sep 17 00:00:00 2001 From: Idan Asulin <74712806+idanasulinmemphis@users.noreply.github.com> Date: Wed, 12 Jul 2023 09:23:12 +0300 Subject: [PATCH 07/12] Update CODEOWNERS (#180) --- .github/CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 402d19f..ed33d6b 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @idanasulinmemphis @shohamroditimemphis \ No newline at end of file +* @idanasulinmemphis From c8a89555e9ad18774ab4449f512b1bd6da21ed2e Mon Sep 17 00:00:00 2001 From: Idan Asulin <74712806+idanasulinmemphis@users.noreply.github.com> Date: Wed, 12 Jul 2023 09:43:58 +0300 Subject: [PATCH 08/12] Update CODEOWNERS --- .github/CODEOWNERS | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index ed33d6b..dec8b2c 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @idanasulinmemphis +* @idanasulinmemphis @rnowling-memphis From 75483dc5781f6acf3bc57c037a2c5a9e74628f07 Mon Sep 17 00:00:00 2001 From: Adarsh-jaiss Date: Thu, 13 Jul 2023 09:49:37 +0530 Subject: [PATCH 09/12] Add examples to docstrings for consumer methods --- memphis/consumer.py | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index e191e8d..ef036cd 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -53,7 +53,45 @@ def set_context(self, context): self.context = context def consume(self, callback): - """Consume events.""" + """ + Consume events. + + This method starts consuming events from the specified station and invokes the provided callback function for each batch of messages received. + + Parameters: + callback (function): A function that will be called with each batch of messages received. The function should have the following signature: + - `callback(messages: List[Message], error: Optional[MemphisError], context: Dict) -> Awaitable[None]` + - `messages`: A list of `Message` objects representing the batch of messages received. + - `error`: An optional `MemphisError` object if there was an error while consuming the messages. + - `context`: A dictionary representing the context that was set using the `set_context()` method. + + Example: + import asyncio + from memphis import Memphis + + async def message_handler(messages, error, context): + if error: + print(f"Error occurred: {error}") + return + + for message in messages: + print(f"Received message: {message}") + + async def main(): + memphis = Memphis() + await memphis.connect(host='localhost', username='user', password='pass') + consumer = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') + consumer.set_context({'key': 'value'}) + consumer.consume(callback=message_handler) + + # Keep the event loop running + while True: + await asyncio.sleep(1) + + asyncio.run(main()) + + + """ self.dls_callback_func = callback self.t_consume = asyncio.create_task(self.__consume(callback)) From 3604fcbd2d0e16335cda426ddf66f2a902c2a5a6 Mon Sep 17 00:00:00 2001 From: Adarsh jaiswal Date: Fri, 14 Jul 2023 18:54:12 +0530 Subject: [PATCH 10/12] Added required changes in comsumer.py - removed the first line and blank lines - Used the named parameters --- memphis/consumer.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index ef036cd..eeb0737 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -54,8 +54,6 @@ def set_context(self, context): def consume(self, callback): """ - Consume events. - This method starts consuming events from the specified station and invokes the provided callback function for each batch of messages received. Parameters: @@ -80,17 +78,14 @@ async def message_handler(messages, error, context): async def main(): memphis = Memphis() await memphis.connect(host='localhost', username='user', password='pass') - consumer = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') + callback = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') consumer.set_context({'key': 'value'}) consumer.consume(callback=message_handler) # Keep the event loop running while True: await asyncio.sleep(1) - asyncio.run(main()) - - """ self.dls_callback_func = callback self.t_consume = asyncio.create_task(self.__consume(callback)) From 5ce4f76a4eb4dee23db97fdb07b5b68baea6b50e Mon Sep 17 00:00:00 2001 From: Adarsh jaiswal Date: Sat, 15 Jul 2023 00:26:35 +0530 Subject: [PATCH 11/12] Removed callback parameter --- memphis/consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index eeb0737..c1731ab 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -78,9 +78,9 @@ async def message_handler(messages, error, context): async def main(): memphis = Memphis() await memphis.connect(host='localhost', username='user', password='pass') - callback = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') + consumer = await memphis.consumer(station_name='my_station', consumer_name='my_consumer', consumer_group='my_group') consumer.set_context({'key': 'value'}) - consumer.consume(callback=message_handler) + consumer.consume(message_handler) # Keep the event loop running while True: From 5da04d2624bdb04ccad1c590710a4bf687776d93 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Mon, 17 Jul 2023 22:28:00 +0300 Subject: [PATCH 12/12] version update --- setup.py | 4 ++-- version-beta.conf | 2 +- version.conf | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 51def80..7261c02 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setup( name="memphis-py", packages=["memphis"], - version="1.0.5", + version="1.0.6", license="Apache-2.0", description="A powerful messaging platform for modern developers", long_description=long_description, @@ -17,7 +17,7 @@ author="Memphis.dev", author_email="team@memphis.dev", url="https://github.com/memphisdev/memphis.py", - download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.5.tar.gz", + download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.6.tar.gz", keywords=["message broker", "devtool", "streaming", "data"], install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core"], classifiers=[ diff --git a/version-beta.conf b/version-beta.conf index f9cbc01..337a6a8 100644 --- a/version-beta.conf +++ b/version-beta.conf @@ -1 +1 @@ -1.0.7 \ No newline at end of file +1.0.8 \ No newline at end of file diff --git a/version.conf b/version.conf index 1464c52..ece61c6 100644 --- a/version.conf +++ b/version.conf @@ -1 +1 @@ -1.0.5 \ No newline at end of file +1.0.6 \ No newline at end of file