From 99b7db0ed0b404364eddd8f6834feb107219f7dd Mon Sep 17 00:00:00 2001 From: Yaniv Ben Hemo <70286779+yanivbh1@users.noreply.github.com> Date: Wed, 28 Jun 2023 17:16:29 +0300 Subject: [PATCH 1/4] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 2cd5113..b5f153a 100644 --- a/README.md +++ b/README.md @@ -65,10 +65,10 @@ async def main(): await memphis.connect( host="", username="", - account_id="", # You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored + account_id=, # You can find it on the profile page in the Memphis UI. This field should be sent only on the cloud version of Memphis, otherwise it will be ignored connection_token="", # you will get it on application type user creation password="", # depends on how Memphis deployed - default is connection token-based authentication - port="", # defaults to 6666 + port=, # defaults to 6666 reconnect=True, # defaults to True max_reconnect=3, # defaults to 3 reconnect_interval_ms=1500, # defaults to 1500 From 4424d4219e91090293903afb6e53c5efafefede4 Mon Sep 17 00:00:00 2001 From: RJ Nowling <130711295+rnowling-memphis@users.noreply.github.com> Date: Wed, 28 Jun 2023 16:11:59 -0500 Subject: [PATCH 2/4] Fix return none (#172) * Fix case where consumer.fetch() returns None * Add docs * Begin adding documentation --- memphis/consumer.py | 50 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index c6e3531..9a6c708 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -35,7 +35,7 @@ def __init__( self.batch_max_time_to_wait_ms = batch_max_time_to_wait_ms self.max_ack_time_ms = max_ack_time_ms self.max_msg_deliveries = max_msg_deliveries - self.ping_consumer_invterval_ms = 30000 + self.ping_consumer_interval_ms = 30000 if error_callback is None: error_callback = default_error_handler self.t_ping = asyncio.create_task(self.__ping_consumer(error_callback)) @@ -116,7 +116,45 @@ async def __consume_dls(self): return async def fetch(self, batch_size: int = 10): - """Fetch a batch of messages.""" + """ + Fetch a batch of messages. + + Returns a list of Message objects. If the connection is + not active or no messages are recieved before timing out, + an empty list is returned. + + Example: + + import asyncio + + from memphis import Memphis + + async def main(/, host, username, password, station): + memphis = Memphis() + await memphis.connect(host=host, + username=username, + password=password) + + consumer = await memphis.consumer(station_name=station, + consumer_name="test-consumer", + consumer_group="test-consumer-group") + + while True: + batch = await consumer.fetch() + print("Recieved {} messages".format(len(batch))) + for msg in batch: + serialized_record = msg.get_data() + print("Message:", serialized_record) + + await memphis.close() + + if __name__ == '__main__': + asyncio.run(main(host=host, + username=username, + password=password, + station=station)) + + """ messages = [] if self.connection.is_connection_active: try: @@ -150,15 +188,15 @@ async def fetch(self, batch_size: int = 10): Message(msg, self.connection, self.consumer_group)) return messages except Exception as e: - if "timeout" not in str(e): + if "timeout" not in str(e).lower(): raise MemphisError(str(e)) from e - else: - return messages + + return messages async def __ping_consumer(self, callback): while True: try: - await asyncio.sleep(self.ping_consumer_invterval_ms / 1000) + await asyncio.sleep(self.ping_consumer_interval_ms / 1000) consumer_group = get_internal_name(self.consumer_group) await self.connection.broker_connection.consumer_info( self.station_name, consumer_group, timeout=30 From e2204b4a0262ff04b63431f7c6ed4e9b652cd97f Mon Sep 17 00:00:00 2001 From: Shay Bratslavsky Date: Sun, 9 Jul 2023 13:47:21 +0300 Subject: [PATCH 3/4] add connid ti destroy prodcuer consumer (#176) --- memphis/consumer.py | 4 +++- memphis/producer.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/memphis/consumer.py b/memphis/consumer.py index 9a6c708..9dcfca8 100644 --- a/memphis/consumer.py +++ b/memphis/consumer.py @@ -218,7 +218,9 @@ async def destroy(self): destroy_consumer_req = { "name": self.consumer_name, "station_name": self.station_name, - "username": self.connection.username + "username": self.connection.username, + "connection_id": self.connection.connection_id, + "req_version": 1, } consumer_name = json.dumps( destroy_consumer_req, indent=2).encode("utf-8") diff --git a/memphis/producer.py b/memphis/producer.py index f9e7f35..fb4d7fe 100644 --- a/memphis/producer.py +++ b/memphis/producer.py @@ -279,7 +279,9 @@ async def destroy(self): destroy_producer_req = { "name": self.producer_name, "station_name": self.station_name, - "username": self.connection.username + "username": self.connection.username, + "connection_id": self.connection.connection_id, + "req_version": 1, } producer_name = json.dumps(destroy_producer_req).encode("utf-8") From bb2640359b7b2a02068745d1d2fca0edd4eecb43 Mon Sep 17 00:00:00 2001 From: idanasulinStrech Date: Sun, 9 Jul 2023 17:14:30 +0300 Subject: [PATCH 4/4] version update --- setup.py | 4 ++-- version-beta.conf | 2 +- version.conf | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index bd1ed91..51def80 100644 --- a/setup.py +++ b/setup.py @@ -8,7 +8,7 @@ setup( name="memphis-py", packages=["memphis"], - version="1.0.4", + version="1.0.5", license="Apache-2.0", description="A powerful messaging platform for modern developers", long_description=long_description, @@ -17,7 +17,7 @@ author="Memphis.dev", author_email="team@memphis.dev", url="https://github.com/memphisdev/memphis.py", - download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.4.tar.gz", + download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/1.0.5.tar.gz", keywords=["message broker", "devtool", "streaming", "data"], install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core"], classifiers=[ diff --git a/version-beta.conf b/version-beta.conf index ece61c6..f9cbc01 100644 --- a/version-beta.conf +++ b/version-beta.conf @@ -1 +1 @@ -1.0.6 \ No newline at end of file +1.0.7 \ No newline at end of file diff --git a/version.conf b/version.conf index a6a3a43..1464c52 100644 --- a/version.conf +++ b/version.conf @@ -1 +1 @@ -1.0.4 \ No newline at end of file +1.0.5 \ No newline at end of file