From 8a1d2c6d8cf630e708a5119e210517c27ff9954c Mon Sep 17 00:00:00 2001 From: svetaStrech Date: Sun, 18 Sep 2022 17:57:59 +0300 Subject: [PATCH] fix sdk close bug --- memphis/memphis.py | 74 ++++++++++++++++++++++++---------------------- setup.cfg | 2 -- setup.py | 8 ++--- 3 files changed, 43 insertions(+), 41 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index a46e178..f5dfe77 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -49,7 +49,7 @@ class Memphis: def __init__(self): self.is_connection_active = False - + async def connect(self, host, username, connection_token, port=6666, reconnect=True, max_reconnect=10, reconnect_interval_ms=1500, timeout_ms=15000): """Creates connection with Memphis. Args: @@ -72,20 +72,19 @@ async def connect(self, host, username, connection_token, port=6666, reconnect=T self.timeout_ms = timeout_ms self.connection_id = self.__generateConnectionID() try: - self.broker_manager = await broker.connect(servers=self.host+":"+str(self.port), - allow_reconnect=self.reconnect, - reconnect_time_wait=self.reconnect_interval_ms/1000, - connect_timeout=self.timeout_ms/1000, - max_reconnect_attempts=self.max_reconnect, - token=self.connection_token, - name=self.connection_id + "::" + self.username, max_outstanding_pings=1) - + self.broker_manager = await broker.connect(servers=self.host+":"+str(self.port), + allow_reconnect=self.reconnect, + reconnect_time_wait=self.reconnect_interval_ms/1000, + connect_timeout=self.timeout_ms/1000, + max_reconnect_attempts=self.max_reconnect, + token=self.connection_token, + name=self.connection_id + "::" + self.username, max_outstanding_pings=1) + self.broker_connection = self.broker_manager.jetstream() self.is_connection_active = True except Exception as e: raise Exception(e) - async def station(self, name, retention_type=retention_types.MAX_MESSAGE_AGE_SECONDS, retention_value=604800, storage_type=storage_types.FILE, replicas=1, dedup_enabled=False, dedup_window_ms=0): """Creates a station. Args: @@ -102,7 +101,7 @@ async def station(self, name, retention_type=retention_types.MAX_MESSAGE_AGE_SEC try: if not self.is_connection_active: raise Exception("Connection is dead") - + createStationReq = { "name": name, "retention_type": retention_type, @@ -111,13 +110,14 @@ async def station(self, name, retention_type=retention_types.MAX_MESSAGE_AGE_SEC "replicas": replicas, "dedup_enabled": dedup_enabled, "dedup_window_in_ms": dedup_window_ms - } - create_station_req_bytes = json.dumps(createStationReq, indent=2).encode('utf-8') - err_msg = await self.broker_manager.request("$memphis_station_creations", create_station_req_bytes) - err_msg = err_msg.data.decode("utf-8") + } + create_station_req_bytes = json.dumps( + createStationReq, indent=2).encode('utf-8') + err_msg = await self.broker_manager.request("$memphis_station_creations", create_station_req_bytes) + err_msg = err_msg.data.decode("utf-8") if err_msg != "": - raise Exception(err_msg) + raise Exception(err_msg) return Station(self, name) except Exception as e: @@ -165,25 +165,25 @@ async def producer(self, station_name, producer_name): try: if not self.is_connection_active: raise Exception("Connection is dead") - + createProducerReq = { "name": producer_name, "station_name": station_name, "connection_id": self.connection_id, "producer_type": "application" - } - create_producer_req_bytes = json.dumps(createProducerReq, indent=2).encode('utf-8') - err_msg = await self.broker_manager.request("$memphis_producer_creations", create_producer_req_bytes) - err_msg = err_msg.data.decode("utf-8") + } + create_producer_req_bytes = json.dumps( + createProducerReq, indent=2).encode('utf-8') + err_msg = await self.broker_manager.request("$memphis_producer_creations", create_producer_req_bytes) + err_msg = err_msg.data.decode("utf-8") if err_msg != "": - raise Exception(err_msg) + raise Exception(err_msg) return Producer(self, producer_name, station_name) except Exception as e: raise Exception(e) - async def consumer(self, station_name, consumer_name, consumer_group="", pull_interval_ms=1000, batch_size=10, batch_max_time_to_wait_ms=5000, max_ack_time_ms=30000, max_msg_deliveries=10): """Creates a consumer. Args:. @@ -202,7 +202,7 @@ async def consumer(self, station_name, consumer_name, consumer_group="", pull_in if not self.is_connection_active: raise Exception("Connection is dead") cg = consumer_name if not consumer_group else consumer_group - + createConsumerReq = { 'name': consumer_name, "station_name": station_name, @@ -211,18 +211,19 @@ async def consumer(self, station_name, consumer_name, consumer_group="", pull_in "consumers_group": consumer_group, "max_ack_time_ms": max_ack_time_ms, "max_msg_deliveries": max_msg_deliveries - } + } - create_consumer_req_bytes = json.dumps(createConsumerReq, indent=2).encode('utf-8') - err_msg = await self.broker_manager.request("$memphis_consumer_creations", create_consumer_req_bytes) - err_msg = err_msg.data.decode("utf-8") + create_consumer_req_bytes = json.dumps( + createConsumerReq, indent=2).encode('utf-8') + err_msg = await self.broker_manager.request("$memphis_consumer_creations", create_consumer_req_bytes) + err_msg = err_msg.data.decode("utf-8") if err_msg != "": - raise Exception(err_msg) + raise Exception(err_msg) return Consumer(self, station_name, consumer_name, cg, pull_interval_ms, batch_size, batch_max_time_to_wait_ms, max_ack_time_ms, max_msg_deliveries) - + except Exception as e: - raise Exception(e) + raise Exception(e) class Station: @@ -235,7 +236,7 @@ async def destroy(self): """ try: nameReq = { - "station_name":self.name + "station_name": self.name } station_name = json.dumps(nameReq, indent=2).encode('utf-8') res = await self.connection.broker_manager.request('$memphis_station_destructions', station_name) @@ -279,7 +280,7 @@ async def destroy(self): "name": self.producer_name, "station_name": self.station_name } - + producer_name = json.dumps(destroyProducerReq).encode('utf-8') res = await self.connection.broker_manager.request('$memphis_producer_destructions', producer_name) error = res.data.decode('utf-8') @@ -347,13 +348,16 @@ async def __ping_consumer(self): async def destroy(self): """Destroy the consumer. """ + self.t_consume.cancel() + self.t_dlq.cancel() self.pull_interval_ms = None try: destroyConsumerReq = { "name": self.consumer_name, - "station_name":self.station_name + "station_name": self.station_name } - consumer_name = json.dumps(destroyConsumerReq, indent=2).encode('utf-8') + consumer_name = json.dumps( + destroyConsumerReq, indent=2).encode('utf-8') res = await self.connection.broker_manager.request('$memphis_consumer_destructions', consumer_name) error = res.data.decode('utf-8') if error != "" and not "not exist" in error: diff --git a/setup.cfg b/setup.cfg index 224a779..e69de29 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +0,0 @@ -[metadata] -description-file = README.md \ No newline at end of file diff --git a/setup.py b/setup.py index 4662b66..1ba1318 100644 --- a/setup.py +++ b/setup.py @@ -3,16 +3,16 @@ setup( name='memphis-py', packages=['memphis'], - version='0.1.9', + version='0.2.0', license='GPL', description='A powerful messaging platform for modern developers', - long_description = 'file: README.md', - long_description_content_type = 'text/markdown; charset=UTF-8; variant=GF', + long_description='file: README.md', + long_description_content_type='text/markdown; charset=UTF-8; variant=GF', readme="README.md", author='Memphis.dev', author_email='team@memphis.dev', url='https://github.com/memphisdev/memphis.py', - download_url='https://github.com/memphisdev/memphis.py/archive/refs/tags/v0.1.8.tar.gz', + download_url='https://github.com/memphisdev/memphis.py/archive/refs/tags/v0.2.0.tar.gz', keywords=['message broker', 'devtool', 'streaming', 'data'], install_requires=[ 'asyncio',