From 1c790ffc334980ec3478e3e00eb19fa36f060723 Mon Sep 17 00:00:00 2001 From: shohamroditimemphis Date: Sun, 28 Aug 2022 02:22:43 +0300 Subject: [PATCH 1/2] change the publish to request in destroy methods --- memphis/memphis.py | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 0e82d4b..1e400f9 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -21,8 +21,8 @@ from threading import Timer import asyncio -import retention_types -import storage_types +import memphis.retention_types as retention_types +import memphis.storage_types as storage_types class set_interval(): @@ -264,8 +264,12 @@ async def destroy(self): "factory_name":self.name } factory_name = json.dumps(nameReq, indent=2).encode('utf-8') - await self.connection.broker_connection.publish('$memphis_factory_destructions', factory_name) - + res = await self.connection.broker_manager.request('$memphis_factory_destructions', factory_name) + error = res.data.decode('utf-8') + if "mongo: no documents in result" in error: + print('Producer already destroyed') + elif error != "": + raise Exception(error) except Exception as e: raise Exception(e) @@ -284,7 +288,12 @@ async def destroy(self): "station_name":self.name } station_name = json.dumps(nameReq, indent=2).encode('utf-8') - await self.connection.broker_connection.publish('$memphis_station_destructions', station_name) + res = await self.connection.broker_manager.request('$memphis_station_destructions', station_name) + error = res.data.decode('utf-8') + if "mongo: no documents in result" in error: + print('Producer already destroyed') + elif error != "": + raise Exception(error) except Exception as e: raise Exception(e) @@ -321,11 +330,16 @@ async def destroy(self): try: destroyProducerReq = { "name": self.producer_name, - "station_name":self.station_name + "station_name": self.station_name } - producer_name = json.dumps(destroyProducerReq, indent=2).encode('utf-8') - await self.connection.broker_connection.publish('$memphis_producer_destructions', producer_name) - + + producer_name = json.dumps(destroyProducerReq).encode('utf-8') + res = await self.connection.broker_manager.request('$memphis_producer_destructions', producer_name) + error = res.data.decode('utf-8') + if "mongo: no documents in result" in error: + print('Producer already destroyed') + elif error != "": + raise Exception(error) except Exception as e: raise Exception(e) @@ -395,7 +409,12 @@ async def destroy(self): "station_name":self.station_name } consumer_name = json.dumps(destroyConsumerReq, indent=2).encode('utf-8') - await self.connection.broker_connection.publish('$memphis_consumer_destructions', consumer_name) + res = await self.connection.broker_manager.request('$memphis_consumer_destructions', consumer_name) + error = res.data.decode('utf-8') + if "mongo: no documents in result" in error: + print('Producer already destroyed') + elif error != "": + raise Exception(error) except Exception as e: raise Exception(e) From 7f67e0b03a8e938f06a9fb707af8984a704eceab Mon Sep 17 00:00:00 2001 From: shay23b Date: Sun, 28 Aug 2022 10:43:50 +0300 Subject: [PATCH 2/2] changed destroy exception --- memphis/memphis.py | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/memphis/memphis.py b/memphis/memphis.py index 1e400f9..4f3afc2 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -266,9 +266,7 @@ async def destroy(self): factory_name = json.dumps(nameReq, indent=2).encode('utf-8') res = await self.connection.broker_manager.request('$memphis_factory_destructions', factory_name) error = res.data.decode('utf-8') - if "mongo: no documents in result" in error: - print('Producer already destroyed') - elif error != "": + if error != "" and not "not exist" in error: raise Exception(error) except Exception as e: raise Exception(e) @@ -290,11 +288,8 @@ async def destroy(self): station_name = json.dumps(nameReq, indent=2).encode('utf-8') res = await self.connection.broker_manager.request('$memphis_station_destructions', station_name) error = res.data.decode('utf-8') - if "mongo: no documents in result" in error: - print('Producer already destroyed') - elif error != "": + if error != "" and not "not exist" in error: raise Exception(error) - except Exception as e: raise Exception(e) @@ -336,9 +331,7 @@ async def destroy(self): producer_name = json.dumps(destroyProducerReq).encode('utf-8') res = await self.connection.broker_manager.request('$memphis_producer_destructions', producer_name) error = res.data.decode('utf-8') - if "mongo: no documents in result" in error: - print('Producer already destroyed') - elif error != "": + if error != "" and not "not exist" in error: raise Exception(error) except Exception as e: raise Exception(e) @@ -411,11 +404,8 @@ async def destroy(self): consumer_name = json.dumps(destroyConsumerReq, indent=2).encode('utf-8') res = await self.connection.broker_manager.request('$memphis_consumer_destructions', consumer_name) error = res.data.decode('utf-8') - if "mongo: no documents in result" in error: - print('Producer already destroyed') - elif error != "": + if error != "" and not "not exist" in error: raise Exception(error) - except Exception as e: raise Exception(e)