From a5ac0d5a7e66f28b00a4f4d9860162d31143ce13 Mon Sep 17 00:00:00 2001 From: Pedro Algarvio Date: Mon, 12 Dec 2022 19:08:06 +0000 Subject: [PATCH] Start/Stop the event listener as a regular session fixture Signed-off-by: Pedro Algarvio --- src/saltfactories/plugins/event_listener.py | 193 ++++++++++---------- 1 file changed, 93 insertions(+), 100 deletions(-) diff --git a/src/saltfactories/plugins/event_listener.py b/src/saltfactories/plugins/event_listener.py index 31e5ffcc..b83d26b7 100644 --- a/src/saltfactories/plugins/event_listener.py +++ b/src/saltfactories/plugins/event_listener.py @@ -154,79 +154,84 @@ def __attrs_post_init__(self): def _run(self): context = zmq.Context() puller = context.socket(zmq.PULL) # pylint: disable=no-member - log.debug("%s Binding PULL socket to %s", self, self.address) - puller.bind(self.address) - if msgpack.version >= (0, 5, 2): - msgpack_kwargs = {"raw": False} - else: # pragma: no cover - msgpack_kwargs = {"encoding": "utf-8"} - log.debug("%s started", self) - self.running_event.set() - while self.running_event.is_set(): - payload = puller.recv() - if payload == self.sentinel: - log.info("%s Received stop sentinel...", self) - self.sentinel_event.set() - break - try: - decoded = msgpack.loads(payload, **msgpack_kwargs) - except ValueError: # pragma: no cover - log.error( - "%s Failed to msgpack.load message with payload: %s", - self, - payload, - exc_info=True, - ) - continue - if decoded is None: - log.info("%s Received stop sentinel...", self) - self.sentinel_event.set() - break - try: - daemon_id, tag, data = decoded - # Salt's event data has some "private" keys, for example, "_stamp" which - # get in the way of direct assertions. - # We'll just store a full_data attribute and clean up the regular data of these keys - full_data = copy.deepcopy(data) - for key in list(data): - if key.startswith("_"): - data.pop(key) - event = Event( - daemon_id=daemon_id, - tag=tag, - stamp=full_data["_stamp"], - data=data, - full_data=full_data, - expire_seconds=self.timeout, - ) - log.info("%s received event: %s", self, event) - self.store.append(event) - if tag == "salt/auth": - auth_event_callback = self.auth_event_handlers.get(daemon_id) - if auth_event_callback: - try: - auth_event_callback(data) - except Exception as exc: # pragma: no cover pylint: disable=broad-except - log.error( - "%s Error calling %r: %s", - self, - auth_event_callback, - exc, - exc_info=True, - ) - log.debug("%s store size after event received: %d", self, len(self.store)) - except Exception: # pragma: no cover pylint: disable=broad-except - log.error("%s Something funky happened", self, exc_info=True) - puller.close(0) - context.term() - # We need to keep these events stored, restart zmq socket - context = zmq.Context() - puller = context.socket(zmq.PULL) # pylint: disable=no-member - log.debug("%s Binding PULL socket to %s", self, self.address) - puller.bind(self.address) - puller.close(1500) - context.term() - log.debug("%s is no longer running", self) + try: + log.debug("%s Binding PULL socket to %s", self, self.address) + puller.bind(self.address) + if msgpack.version >= (0, 5, 2): + msgpack_kwargs = {"raw": False} + else: # pragma: no cover + msgpack_kwargs = {"encoding": "utf-8"} + log.debug("%s started", self) + self.running_event.set() + while self.running_event.is_set(): + payload = puller.recv() + if payload == self.sentinel: + log.info("%s Received stop sentinel...", self) + self.sentinel_event.set() + break + try: + decoded = msgpack.loads(payload, **msgpack_kwargs) + except ValueError: # pragma: no cover + log.error( + "%s Failed to msgpack.load message with payload: %s", + self, + payload, + exc_info=True, + ) + continue + if decoded is None: + log.info("%s Received stop sentinel...", self) + self.sentinel_event.set() + break + try: + daemon_id, tag, data = decoded + # Salt's event data has some "private" keys, for example, "_stamp" which + # get in the way of direct assertions. + # We'll just store a full_data attribute and clean up the regular data of these keys + full_data = copy.deepcopy(data) + for key in list(data): + if key.startswith("_"): + data.pop(key) + event = Event( + daemon_id=daemon_id, + tag=tag, + stamp=full_data["_stamp"], + data=data, + full_data=full_data, + expire_seconds=self.timeout, + ) + log.info("%s received event: %s", self, event) + self.store.append(event) + if tag == "salt/auth": + auth_event_callback = self.auth_event_handlers.get(daemon_id) + if auth_event_callback: + try: + auth_event_callback(data) + except Exception as exc: # pragma: no cover pylint: disable=broad-except + log.error( + "%s Error calling %r: %s", + self, + auth_event_callback, + exc, + exc_info=True, + ) + log.debug("%s store size after event received: %d", self, len(self.store)) + except Exception: # pragma: no cover pylint: disable=broad-except + log.error("%s Something funky happened", self, exc_info=True) + puller.close(0) + context.term() + # We need to keep these events stored, restart zmq socket + context = zmq.Context() + puller = context.socket(zmq.PULL) # pylint: disable=no-member + log.debug("%s Binding PULL socket to %s", self, self.address) + puller.bind(self.address) + except Exception as exc: + log.error("%s Something funky happened", self, exc_info=True) + raise exc from None + finally: + puller.close(1500) + context.term() + log.debug("%s is no longer running", self) def _cleanup(self): cleanup_at = time.time() + 30 @@ -249,6 +254,19 @@ def _cleanup(self): self.store.remove(event) log.debug("%s store size after cleanup: %s", self, len(self.store)) + def __enter__(self): + """ + Context manager support to start the event listener. + """ + self.start() + return self + + def __exit__(self, *_): + """ + Context manager support to stop the event listener. + """ + self.stop() + def start(self): """ Start the event listener. @@ -481,30 +499,5 @@ def test_send(event_listener, salt_master, salt_minion, salt_call_cli): assert event.data["cmd"] == "_minion_event" assert "event.fire" in event.data["data"] """ - return request.config.pluginmanager.get_plugin("saltfactories-event-listener") - - -def pytest_configure(config): - """ - Configure the plugins. - """ - _event_listener = EventListener() - config.pluginmanager.register(_event_listener, "saltfactories-event-listener") - - -@pytest.hookimpl(tryfirst=True) -def pytest_sessionstart(session): - """ - Start the event listener plugin. - """ - _event_listener = session.config.pluginmanager.get_plugin("saltfactories-event-listener") - _event_listener.start() - - -@pytest.hookimpl(trylast=True) -def pytest_sessionfinish(session): - """ - Stop the event listener plugin. - """ - _event_listener = session.config.pluginmanager.get_plugin("saltfactories-event-listener") - _event_listener.stop() + with EventListener() as event_listener: + yield event_listener