diff --git a/memphis/memphis.py b/memphis/memphis.py index 0e82d4b..4f3afc2 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -21,8 +21,8 @@ from threading import Timer import asyncio -import retention_types -import storage_types +import memphis.retention_types as retention_types +import memphis.storage_types as storage_types class set_interval(): @@ -264,8 +264,10 @@ async def destroy(self): "factory_name":self.name } factory_name = json.dumps(nameReq, indent=2).encode('utf-8') - await self.connection.broker_connection.publish('$memphis_factory_destructions', factory_name) - + res = await self.connection.broker_manager.request('$memphis_factory_destructions', factory_name) + error = res.data.decode('utf-8') + if error != "" and not "not exist" in error: + raise Exception(error) except Exception as e: raise Exception(e) @@ -284,8 +286,10 @@ async def destroy(self): "station_name":self.name } station_name = json.dumps(nameReq, indent=2).encode('utf-8') - await self.connection.broker_connection.publish('$memphis_station_destructions', station_name) - + res = await self.connection.broker_manager.request('$memphis_station_destructions', station_name) + error = res.data.decode('utf-8') + if error != "" and not "not exist" in error: + raise Exception(error) except Exception as e: raise Exception(e) @@ -321,11 +325,14 @@ async def destroy(self): try: destroyProducerReq = { "name": self.producer_name, - "station_name":self.station_name + "station_name": self.station_name } - producer_name = json.dumps(destroyProducerReq, indent=2).encode('utf-8') - await self.connection.broker_connection.publish('$memphis_producer_destructions', producer_name) - + + producer_name = json.dumps(destroyProducerReq).encode('utf-8') + res = await self.connection.broker_manager.request('$memphis_producer_destructions', producer_name) + error = res.data.decode('utf-8') + if error != "" and not "not exist" in error: + raise Exception(error) except Exception as e: raise Exception(e) @@ -395,8 +402,10 @@ async def destroy(self): "station_name":self.station_name } consumer_name = json.dumps(destroyConsumerReq, indent=2).encode('utf-8') - await self.connection.broker_connection.publish('$memphis_consumer_destructions', consumer_name) - + res = await self.connection.broker_manager.request('$memphis_consumer_destructions', consumer_name) + error = res.data.decode('utf-8') + if error != "" and not "not exist" in error: + raise Exception(error) except Exception as e: raise Exception(e)