diff --git a/servc/svc/com/bus/rabbitmq.py b/servc/svc/com/bus/rabbitmq.py index ad99794..e36dc05 100644 --- a/servc/svc/com/bus/rabbitmq.py +++ b/servc/svc/com/bus/rabbitmq.py @@ -37,8 +37,6 @@ class BusRabbitMQ(BusComponent): _conn: AsyncioConnection | BlockingConnection | None = None - _consumingArgs: Tuple[str, InputProcessor, OnConsuming | None, bool] | None = None - @property def isReady(self) -> bool: return ( @@ -83,15 +81,10 @@ def _close(self): def on_connection_closed(self, _conn: AsyncioConnection, reason: pika.exceptions): if reason == pika.exceptions.StreamLostError: + # Async connection always is impossible to reconstitute for some reason print(str(reason), flush=True) self._conn = None - - # if this happens while consuming, then we need to re establish the on-consuming method - if self._consumingArgs: - route, inputProcessor, onConsuming, bindEventExchange = ( - self._consumingArgs - ) - self.subscribe(route, inputProcessor, onConsuming, bindEventExchange) + exit(1) def get_channel(self, method: Callable | None, args: Tuple | None): if not self.isReady: @@ -205,7 +198,6 @@ def subscribe( # type: ignore return self.get_channel( self.subscribe, (route, inputProcessor, onConsuming, bindEventExchange) ) - self._consumingArgs = (route, inputProcessor, onConsuming, bindEventExchange) channel.add_on_close_callback(lambda _c, _r: self.close()) channel.add_on_cancel_callback(lambda _c: self.close())