Skip to content

Commit

Permalink
Start/Stop the event listener as a regular session fixture
Browse files Browse the repository at this point in the history
Signed-off-by: Pedro Algarvio <palgarvio@vmware.com>
  • Loading branch information
s0undt3ch committed Dec 15, 2022
1 parent 6dcb91d commit a5ac0d5
Showing 1 changed file with 93 additions and 100 deletions.
193 changes: 93 additions & 100 deletions src/saltfactories/plugins/event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,79 +154,84 @@ def __attrs_post_init__(self):
def _run(self):
context = zmq.Context()
puller = context.socket(zmq.PULL) # pylint: disable=no-member
log.debug("%s Binding PULL socket to %s", self, self.address)
puller.bind(self.address)
if msgpack.version >= (0, 5, 2):
msgpack_kwargs = {"raw": False}
else: # pragma: no cover
msgpack_kwargs = {"encoding": "utf-8"}
log.debug("%s started", self)
self.running_event.set()
while self.running_event.is_set():
payload = puller.recv()
if payload == self.sentinel:
log.info("%s Received stop sentinel...", self)
self.sentinel_event.set()
break
try:
decoded = msgpack.loads(payload, **msgpack_kwargs)
except ValueError: # pragma: no cover
log.error(
"%s Failed to msgpack.load message with payload: %s",
self,
payload,
exc_info=True,
)
continue
if decoded is None:
log.info("%s Received stop sentinel...", self)
self.sentinel_event.set()
break
try:
daemon_id, tag, data = decoded
# Salt's event data has some "private" keys, for example, "_stamp" which
# get in the way of direct assertions.
# We'll just store a full_data attribute and clean up the regular data of these keys
full_data = copy.deepcopy(data)
for key in list(data):
if key.startswith("_"):
data.pop(key)
event = Event(
daemon_id=daemon_id,
tag=tag,
stamp=full_data["_stamp"],
data=data,
full_data=full_data,
expire_seconds=self.timeout,
)
log.info("%s received event: %s", self, event)
self.store.append(event)
if tag == "salt/auth":
auth_event_callback = self.auth_event_handlers.get(daemon_id)
if auth_event_callback:
try:
auth_event_callback(data)
except Exception as exc: # pragma: no cover pylint: disable=broad-except
log.error(
"%s Error calling %r: %s",
self,
auth_event_callback,
exc,
exc_info=True,
)
log.debug("%s store size after event received: %d", self, len(self.store))
except Exception: # pragma: no cover pylint: disable=broad-except
log.error("%s Something funky happened", self, exc_info=True)
puller.close(0)
context.term()
# We need to keep these events stored, restart zmq socket
context = zmq.Context()
puller = context.socket(zmq.PULL) # pylint: disable=no-member
log.debug("%s Binding PULL socket to %s", self, self.address)
puller.bind(self.address)
puller.close(1500)
context.term()
log.debug("%s is no longer running", self)
try:
log.debug("%s Binding PULL socket to %s", self, self.address)
puller.bind(self.address)
if msgpack.version >= (0, 5, 2):
msgpack_kwargs = {"raw": False}
else: # pragma: no cover
msgpack_kwargs = {"encoding": "utf-8"}
log.debug("%s started", self)
self.running_event.set()
while self.running_event.is_set():
payload = puller.recv()
if payload == self.sentinel:
log.info("%s Received stop sentinel...", self)
self.sentinel_event.set()
break
try:
decoded = msgpack.loads(payload, **msgpack_kwargs)
except ValueError: # pragma: no cover
log.error(
"%s Failed to msgpack.load message with payload: %s",
self,
payload,
exc_info=True,
)
continue
if decoded is None:
log.info("%s Received stop sentinel...", self)
self.sentinel_event.set()
break
try:
daemon_id, tag, data = decoded
# Salt's event data has some "private" keys, for example, "_stamp" which
# get in the way of direct assertions.
# We'll just store a full_data attribute and clean up the regular data of these keys
full_data = copy.deepcopy(data)
for key in list(data):
if key.startswith("_"):
data.pop(key)
event = Event(
daemon_id=daemon_id,
tag=tag,
stamp=full_data["_stamp"],
data=data,
full_data=full_data,
expire_seconds=self.timeout,
)
log.info("%s received event: %s", self, event)
self.store.append(event)
if tag == "salt/auth":
auth_event_callback = self.auth_event_handlers.get(daemon_id)
if auth_event_callback:
try:
auth_event_callback(data)
except Exception as exc: # pragma: no cover pylint: disable=broad-except
log.error(
"%s Error calling %r: %s",
self,
auth_event_callback,
exc,
exc_info=True,
)
log.debug("%s store size after event received: %d", self, len(self.store))
except Exception: # pragma: no cover pylint: disable=broad-except
log.error("%s Something funky happened", self, exc_info=True)
puller.close(0)
context.term()
# We need to keep these events stored, restart zmq socket
context = zmq.Context()
puller = context.socket(zmq.PULL) # pylint: disable=no-member
log.debug("%s Binding PULL socket to %s", self, self.address)
puller.bind(self.address)
except Exception as exc:
log.error("%s Something funky happened", self, exc_info=True)
raise exc from None
finally:
puller.close(1500)
context.term()
log.debug("%s is no longer running", self)

def _cleanup(self):
cleanup_at = time.time() + 30
Expand All @@ -249,6 +254,19 @@ def _cleanup(self):
self.store.remove(event)
log.debug("%s store size after cleanup: %s", self, len(self.store))

def __enter__(self):
"""
Context manager support to start the event listener.
"""
self.start()
return self

def __exit__(self, *_):
"""
Context manager support to stop the event listener.
"""
self.stop()

def start(self):
"""
Start the event listener.
Expand Down Expand Up @@ -481,30 +499,5 @@ def test_send(event_listener, salt_master, salt_minion, salt_call_cli):
assert event.data["cmd"] == "_minion_event"
assert "event.fire" in event.data["data"]
"""
return request.config.pluginmanager.get_plugin("saltfactories-event-listener")


def pytest_configure(config):
"""
Configure the plugins.
"""
_event_listener = EventListener()
config.pluginmanager.register(_event_listener, "saltfactories-event-listener")


@pytest.hookimpl(tryfirst=True)
def pytest_sessionstart(session):
"""
Start the event listener plugin.
"""
_event_listener = session.config.pluginmanager.get_plugin("saltfactories-event-listener")
_event_listener.start()


@pytest.hookimpl(trylast=True)
def pytest_sessionfinish(session):
"""
Stop the event listener plugin.
"""
_event_listener = session.config.pluginmanager.get_plugin("saltfactories-event-listener")
_event_listener.stop()
with EventListener() as event_listener:
yield event_listener

0 comments on commit a5ac0d5

Please sign in to comment.