diff --git a/.pylint-spelling-words b/.pylint-spelling-words index 1cbb2e22..cc1eebf4 100644 --- a/.pylint-spelling-words +++ b/.pylint-spelling-words @@ -107,6 +107,7 @@ nocover nodeid noqa nox +onedir openbsd ordereddict os diff --git a/changelog/146.bugfix.rst b/changelog/146.bugfix.rst new file mode 100644 index 00000000..887a1093 --- /dev/null +++ b/changelog/146.bugfix.rst @@ -0,0 +1,5 @@ +Fixed Salt's deferred imports to allow onedir builds while not breaking non-onedir builds: + +* Additionally, stopped relying on `salt.utils.files` and `salt.utils.yaml` +* Stopped using `zmq` to forward events(this was where the breakage was showing) for a plain TCP implementation. +* The `event_listener` fixture is now started/stopped like a regular pytest fixture diff --git a/requirements/base.txt b/requirements/base.txt index fa0cdb70..c4c5d573 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -8,5 +8,5 @@ pytest-shell-utilities>=1.4.0 psutil pyyaml pyzmq -msgpack +msgpack>=0.5.2 virtualenv diff --git a/src/saltfactories/bases.py b/src/saltfactories/bases.py index 770196ac..111e66fb 100644 --- a/src/saltfactories/bases.py +++ b/src/saltfactories/bases.py @@ -593,7 +593,9 @@ def verify_config(cls, config): """ Verify the configuration dictionary. """ - import salt.utils.verify + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.utils.verify # pylint: disable=import-outside-toplevel salt.utils.verify.verify_env( cls._get_verify_config_entries(config), diff --git a/src/saltfactories/cli/cloud.py b/src/saltfactories/cli/cloud.py index 60ae15bc..62c6c412 100644 --- a/src/saltfactories/cli/cloud.py +++ b/src/saltfactories/cli/cloud.py @@ -26,6 +26,8 @@ def default_config(root_dir, master_id, defaults=None, overrides=None): """ Return the default configuration for the daemon. """ + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. import salt.utils.dictupdate if defaults is None: @@ -78,6 +80,8 @@ def verify_config(cls, config): """ Verify the configuration dictionary. """ + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. import salt.config import salt.utils.verify diff --git a/src/saltfactories/cli/key.py b/src/saltfactories/cli/key.py index 66e82f4c..66a68879 100644 --- a/src/saltfactories/cli/key.py +++ b/src/saltfactories/cli/key.py @@ -30,9 +30,12 @@ def _default___cli_log_level_supported__(self): try: # Salt >= 3005 return "--log-level" in SaltKeyOptionParser._console_log_level_cli_flags - except AttributeError: + except AttributeError: # pragma: no cover # Salt <= 3004 - return SaltKeyOptionParser._skip_console_logging_config_ is False + return ( + SaltKeyOptionParser._skip_console_logging_config_ # pylint: disable=no-member + is False + ) def get_minion_tgt(self, minion_tgt=None): """ diff --git a/src/saltfactories/cli/spm.py b/src/saltfactories/cli/spm.py index bf6e5ae9..fca14e9a 100644 --- a/src/saltfactories/cli/spm.py +++ b/src/saltfactories/cli/spm.py @@ -34,7 +34,9 @@ def default_config(root_dir, master_factory, defaults=None, overrides=None): """ Return the default configuration for the daemon. """ - import salt.utils.dictupdate + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.utils.dictupdate # pylint: disable=import-outside-toplevel if defaults is None: defaults = {} @@ -89,8 +91,10 @@ def verify_config(cls, config): """ Verify the configuration dictionary. """ - import salt.config - import salt.utils.verify + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.config # pylint: disable=import-outside-toplevel + import salt.utils.verify # pylint: disable=import-outside-toplevel prepend_root_dirs = [ "formula_path", @@ -127,8 +131,10 @@ def write_config(cls, config): """ Verify the loaded configuration. """ - import salt.config - import salt.utils.verify + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.config # pylint: disable=import-outside-toplevel + import salt.utils.verify # pylint: disable=import-outside-toplevel cls.verify_config(config) config_file = config.pop("spm_conf_file") diff --git a/src/saltfactories/client.py b/src/saltfactories/client.py index 9a820dd1..b3d0d429 100644 --- a/src/saltfactories/client.py +++ b/src/saltfactories/client.py @@ -39,7 +39,9 @@ def _set_functions_known_to_return_none(self): @__client.default def _set_client(self): - import salt.client + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.client # pylint: disable=import-outside-toplevel return salt.client.get_local_client(mopts=self.master_config) diff --git a/src/saltfactories/daemons/master.py b/src/saltfactories/daemons/master.py index 9c40f31d..382f4998 100644 --- a/src/saltfactories/daemons/master.py +++ b/src/saltfactories/daemons/master.py @@ -69,8 +69,10 @@ def default_config( """ Return the default configuration. """ - import salt.config - import salt.utils.dictupdate + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.config # pylint: disable=import-outside-toplevel + import salt.utils.dictupdate # pylint: disable=import-outside-toplevel if defaults is None: defaults = {} @@ -274,7 +276,9 @@ def load_config(cls, config_file, config): """ Return the loaded configuration. """ - import salt.config + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.config # pylint: disable=import-outside-toplevel return salt.config.master_config(config_file) diff --git a/src/saltfactories/daemons/minion.py b/src/saltfactories/daemons/minion.py index 1045b88d..a079c86c 100644 --- a/src/saltfactories/daemons/minion.py +++ b/src/saltfactories/daemons/minion.py @@ -49,8 +49,10 @@ def default_config( """ Return the default configuration. """ - import salt.config - import salt.utils.dictupdate + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.config # pylint: disable=import-outside-toplevel + import salt.utils.dictupdate # pylint: disable=import-outside-toplevel if defaults is None: defaults = {} @@ -224,7 +226,9 @@ def load_config(cls, config_file, config): """ Return the loaded configuration. """ - import salt.config + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.config # pylint: disable=import-outside-toplevel return salt.config.minion_config(config_file, minion_id=config["id"], cache_minion_id=True) diff --git a/src/saltfactories/daemons/proxy.py b/src/saltfactories/daemons/proxy.py index 8f1a8b19..37a13b9d 100644 --- a/src/saltfactories/daemons/proxy.py +++ b/src/saltfactories/daemons/proxy.py @@ -71,8 +71,10 @@ def default_config( """ Return the default configuration. """ - import salt.config - import salt.utils.dictupdate + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.config # pylint: disable=import-outside-toplevel + import salt.utils.dictupdate # pylint: disable=import-outside-toplevel if defaults is None: defaults = {} @@ -240,7 +242,9 @@ def load_config(cls, config_file, config): """ Return the loaded configuration. """ - import salt.config + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.config # pylint: disable=import-outside-toplevel return salt.config.proxy_config(config_file, minion_id=config["id"], cache_minion_id=True) diff --git a/src/saltfactories/daemons/syndic.py b/src/saltfactories/daemons/syndic.py index ec2467b8..afbd986d 100644 --- a/src/saltfactories/daemons/syndic.py +++ b/src/saltfactories/daemons/syndic.py @@ -34,8 +34,10 @@ def default_config( """ Return the default configuration. """ - import salt.config - import salt.utils.dictupdate + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.config # pylint: disable=import-outside-toplevel + import salt.utils.dictupdate # pylint: disable=import-outside-toplevel if defaults is None: defaults = {} @@ -143,7 +145,9 @@ def load_config(cls, config_file, config): """ Return the loaded configuration. """ - import salt.config + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.config # pylint: disable=import-outside-toplevel conf_dir = pathlib.Path(config_file).parent.parent master_config_file = str(conf_dir / "master") diff --git a/src/saltfactories/manager.py b/src/saltfactories/manager.py index 0c139cd8..c4736313 100644 --- a/src/saltfactories/manager.py +++ b/src/saltfactories/manager.py @@ -162,7 +162,10 @@ def final_minion_config_tweaks(self, config): pytest_key = "pytest-minion" if pytest_key not in config: # pragma: no cover config[pytest_key] = {} - config[pytest_key]["returner_address"] = self.event_listener.address + config[pytest_key]["returner_address"] = { + "host": self.event_listener.host, + "port": self.event_listener.port, + } self.final_common_config_tweaks(config, "minion") def final_master_config_tweaks(self, config): @@ -172,7 +175,10 @@ def final_master_config_tweaks(self, config): pytest_key = "pytest-master" if pytest_key not in config: # pragma: no cover config[pytest_key] = {} - config[pytest_key]["returner_address"] = self.event_listener.address + config[pytest_key]["returner_address"] = { + "host": self.event_listener.host, + "port": self.event_listener.port, + } self.final_common_config_tweaks(config, "master") def final_syndic_config_tweaks(self, config): diff --git a/src/saltfactories/plugins/__init__.py b/src/saltfactories/plugins/__init__.py index 2e4a0280..315f248a 100644 --- a/src/saltfactories/plugins/__init__.py +++ b/src/saltfactories/plugins/__init__.py @@ -80,7 +80,9 @@ def pytest_load_initial_conftests(*_): """ Register our pytest helpers. """ - import salt.version + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.version # pylint: disable=import-outside-toplevel if salt.version.__saltstack_version__ < "3004": raise pytest.UsageError("Only salt>=3004 is supported") diff --git a/src/saltfactories/plugins/event_listener.py b/src/saltfactories/plugins/event_listener.py index b83d26b7..2f7e3684 100644 --- a/src/saltfactories/plugins/event_listener.py +++ b/src/saltfactories/plugins/event_listener.py @@ -3,6 +3,7 @@ A salt events store for all daemons started by salt-factories """ +import asyncio import copy import fnmatch import logging @@ -13,9 +14,8 @@ from datetime import timedelta import attr -import msgpack +import msgpack.exceptions import pytest -import zmq from pytestshellutils.utils import ports from pytestshellutils.utils import time @@ -116,7 +116,46 @@ def __iter__(self): return iter(self.matches) -@attr.s(kw_only=True, slots=True, hash=True) +class EventListenerServer(asyncio.Protocol): + """ + TCP Server to receive events forwarded. + """ + + def __init__(self, _event_listener, *args, **kwargs): + self._event_listener = _event_listener + super().__init__(*args, **kwargs) + + def connection_made(self, transport): + """ + Connection established. + """ + peername = transport.get_extra_info("peername") + log.debug("Connection from %s", peername) + # pylint: disable=attribute-defined-outside-init + self.transport = transport + self.unpacker = msgpack.Unpacker(raw=False) + # pylint: enable=attribute-defined-outside-init + + def data_received(self, data): + """ + Received data. + """ + try: + self.unpacker.feed(data) + except msgpack.exceptions.BufferFull: + # Start over loosing some data?! + self.unpacker = msgpack.Unpacker( # pylint: disable=attribute-defined-outside-init + raw=False + ) + self.unpacker.feed(data) + for payload in self.unpacker: + if payload is None: + self.transport.close() + break + self._event_listener._process_event_payload(payload) + + +@attr.s(kw_only=True, slots=True, hash=False) class EventListener: """ EventListener implementation. @@ -129,109 +168,108 @@ class EventListener: """ timeout = attr.ib(default=120) + host = attr.ib(init=False, repr=False) + port = attr.ib(init=False, repr=False) address = attr.ib(init=False) store = attr.ib(init=False, repr=False, hash=False) - sentinel = attr.ib(init=False, repr=False, hash=False) - sentinel_event = attr.ib(init=False, repr=False, hash=False) running_event = attr.ib(init=False, repr=False, hash=False) running_thread = attr.ib(init=False, repr=False, hash=False) cleanup_thread = attr.ib(init=False, repr=False, hash=False) auth_event_handlers = attr.ib(init=False, repr=False, hash=False) + loop = attr.ib(init=False, repr=False, hash=False) + server = attr.ib(init=False, repr=False, hash=False) + server_running_event = attr.ib(init=False, repr=False, hash=False) def __attrs_post_init__(self): """ Post attrs initialization routines. """ self.store = deque(maxlen=10000) - self.address = "tcp://127.0.0.1:{}".format(ports.get_unused_localhost_port()) + self.host = "127.0.0.1" + self.port = ports.get_unused_localhost_port() + self.address = f"tcp://{self.host}:{self.port}" self.running_event = threading.Event() - self.running_thread = threading.Thread(target=self._run) self.cleanup_thread = threading.Thread(target=self._cleanup) - self.sentinel = msgpack.dumps(None) - self.sentinel_event = threading.Event() self.auth_event_handlers = weakref.WeakValueDictionary() + self.loop = asyncio.new_event_loop() + self.server = None + self.server_running_event = threading.Event() + self.running_thread = threading.Thread(target=self._run_loop_in_thread, args=(self.loop,)) - def _run(self): - context = zmq.Context() - puller = context.socket(zmq.PULL) # pylint: disable=no-member + def _run_loop_in_thread(self, loop): + asyncio.set_event_loop(loop) try: - log.debug("%s Binding PULL socket to %s", self, self.address) - puller.bind(self.address) - if msgpack.version >= (0, 5, 2): - msgpack_kwargs = {"raw": False} - else: # pragma: no cover - msgpack_kwargs = {"encoding": "utf-8"} - log.debug("%s started", self) - self.running_event.set() - while self.running_event.is_set(): - payload = puller.recv() - if payload == self.sentinel: - log.info("%s Received stop sentinel...", self) - self.sentinel_event.set() - break - try: - decoded = msgpack.loads(payload, **msgpack_kwargs) - except ValueError: # pragma: no cover - log.error( - "%s Failed to msgpack.load message with payload: %s", - self, - payload, - exc_info=True, - ) - continue - if decoded is None: - log.info("%s Received stop sentinel...", self) - self.sentinel_event.set() - break - try: - daemon_id, tag, data = decoded - # Salt's event data has some "private" keys, for example, "_stamp" which - # get in the way of direct assertions. - # We'll just store a full_data attribute and clean up the regular data of these keys - full_data = copy.deepcopy(data) - for key in list(data): - if key.startswith("_"): - data.pop(key) - event = Event( - daemon_id=daemon_id, - tag=tag, - stamp=full_data["_stamp"], - data=data, - full_data=full_data, - expire_seconds=self.timeout, - ) - log.info("%s received event: %s", self, event) - self.store.append(event) - if tag == "salt/auth": - auth_event_callback = self.auth_event_handlers.get(daemon_id) - if auth_event_callback: - try: - auth_event_callback(data) - except Exception as exc: # pragma: no cover pylint: disable=broad-except - log.error( - "%s Error calling %r: %s", - self, - auth_event_callback, - exc, - exc_info=True, - ) - log.debug("%s store size after event received: %d", self, len(self.store)) - except Exception: # pragma: no cover pylint: disable=broad-except - log.error("%s Something funky happened", self, exc_info=True) - puller.close(0) - context.term() - # We need to keep these events stored, restart zmq socket - context = zmq.Context() - puller = context.socket(zmq.PULL) # pylint: disable=no-member - log.debug("%s Binding PULL socket to %s", self, self.address) - puller.bind(self.address) - except Exception as exc: - log.error("%s Something funky happened", self, exc_info=True) - raise exc from None + loop.run_until_complete(self._run_server()) finally: - puller.close(1500) - context.term() - log.debug("%s is no longer running", self) + log.debug("shutdown asyncgens") + loop.run_until_complete(loop.shutdown_asyncgens()) + log.debug("loop close") + loop.close() + + async def _run_server(self): + loop = asyncio.get_running_loop() + self.server = await self.loop.create_server( + lambda: EventListenerServer(self), + self.host, + self.port, + start_serving=False, + ) + + async with self.server: + loop.call_soon(self.server_running_event.set) + log.debug("%s server is starting", self) + await self.server.start_serving() + while self.server_running_event.is_set(): + await asyncio.sleep(1) + self.server.close() + log.debug("%s server await server close", self) + await self.server.wait_closed() + # await self.server.serve_forever() + log.debug("%s server stoppped", self) + + def _process_event_payload(self, decoded): + try: + daemon_id = decoded["id"] + tag = decoded["tag"] + data = decoded["data"] + # Salt's event data has some "private" keys, for example, "_stamp" which + # get in the way of direct assertions. + # We'll just store a full_data attribute and clean up the regular data of these keys + full_data = copy.deepcopy(data) + for key in list(data): + if key.startswith("_"): + data.pop(key) + event = Event( + daemon_id=daemon_id, + tag=tag, + stamp=full_data["_stamp"], + data=data, + full_data=full_data, + expire_seconds=self.timeout, + ) + log.info("%s received event: %s", self, event) + self.store.append(event) + if tag == "salt/auth": + auth_event_callback = self.auth_event_handlers.get(daemon_id) + if auth_event_callback: + try: + auth_event_callback(data) + except Exception as exc: # pragma: no cover pylint: disable=broad-except + log.error( + "%s Error calling %r: %s", + self, + auth_event_callback, + exc, + exc_info=True, + ) + log.debug( + "%s store(id: %s) size after event received: %d", + self, + id(self.store), + len(self.store), + ) + except Exception: # pragma: no cover pylint: disable=broad-except + log.error("%s Something funky happened", self, exc_info=True) def _cleanup(self): cleanup_at = time.time() + 30 @@ -274,11 +312,13 @@ def start(self): if self.running_event.is_set(): # pragma: no cover return log.debug("%s is starting", self) + self.running_event.set() self.running_thread.start() # Wait for the thread to start - if self.running_event.wait(5) is not True: - self.running_event.clear() + if self.server_running_event.wait(5) is not True: + self.server_running_event.clear() raise RuntimeError("Failed to start the event listener") + log.debug("%s is started", self) self.cleanup_thread.start() def stop(self): @@ -290,21 +330,8 @@ def stop(self): log.debug("%s is stopping", self) self.store.clear() self.auth_event_handlers.clear() - context = zmq.Context() - push = context.socket(zmq.PUSH) # pylint: disable=no-member - push.connect(self.address) - try: - push.send(self.sentinel) - log.debug("%s Sent sentinel to trigger log server shutdown", self) - if self.sentinel_event.wait(5) is not True: # pragma: no cover - log.warning( - "%s Failed to wait for the reception of the stop sentinel message. Stopping anyway.", - self, - ) - finally: - push.close(1500) - context.term() self.running_event.clear() + self.server_running_event.clear() log.debug("%s Joining running thread...", self) self.running_thread.join(7) if self.running_thread.is_alive(): # pragma: no cover @@ -458,7 +485,7 @@ def unregister_auth_event_handler(self, master_id): @pytest.fixture(scope="session") -def event_listener(request): +def event_listener(): """ Event listener session scoped fixture. @@ -499,5 +526,5 @@ def test_send(event_listener, salt_master, salt_minion, salt_call_cli): assert event.data["cmd"] == "_minion_event" assert "event.fire" in event.data["data"] """ - with EventListener() as event_listener: - yield event_listener + with EventListener() as _event_listener: + yield _event_listener diff --git a/src/saltfactories/plugins/factories.py b/src/saltfactories/plugins/factories.py index 2e667502..e1dee94b 100644 --- a/src/saltfactories/plugins/factories.py +++ b/src/saltfactories/plugins/factories.py @@ -9,7 +9,6 @@ import pytest import saltfactories -from saltfactories.manager import FactoriesManager log = logging.getLogger(__name__) @@ -56,6 +55,10 @@ def salt_factories( """ Instantiate the salt factories manager. """ + # Do not move this deferred import. It allows running against a Salt onedir build + # in salt's repo checkout. + from saltfactories.manager import FactoriesManager # pylint: disable=import-outside-toplevel + if not isinstance(salt_factories_config, dict): raise pytest.UsageError("The 'salt_factories_config' fixture MUST return a dictionary") if salt_factories_config: diff --git a/src/saltfactories/plugins/log_server.py b/src/saltfactories/plugins/log_server.py index 3de8b00b..22694262 100644 --- a/src/saltfactories/plugins/log_server.py +++ b/src/saltfactories/plugins/log_server.py @@ -129,6 +129,10 @@ def process_logs(self): puller.set_hwm(self.socket_hwm) exit_timeout_seconds = 5 exit_timeout = None + if msgpack.version >= (0, 5, 2): + msgpack_kwargs = dict(raw=False) + else: # pragma: no cover + msgpack_kwargs = dict(encoding="utf-8") try: puller.bind(address) except zmq.ZMQError: # pragma: no cover @@ -159,10 +163,7 @@ def process_logs(self): if not poller.poll(1000): continue msg = puller.recv() - if msgpack.version >= (0, 5, 2): - record_dict = msgpack.loads(msg, raw=False) - else: # pragma: no cover - record_dict = msgpack.loads(msg, encoding="utf-8") + record_dict = msgpack.loads(msg, **msgpack_kwargs) if record_dict is None: # A sentinel to stop processing the queue log.info("%s Received the sentinel to shutdown", self) diff --git a/src/saltfactories/plugins/sysinfo.py b/src/saltfactories/plugins/sysinfo.py index b2c227b7..5a155b9e 100644 --- a/src/saltfactories/plugins/sysinfo.py +++ b/src/saltfactories/plugins/sysinfo.py @@ -113,10 +113,15 @@ def pytest_sessionstart(session): :param _pytest.main.Session session: the pytest session object """ + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + # pylint: disable=import-outside-toplevel import salt.config import salt.loader - import salt.version import salt.utils.yaml + import salt.version + + # pylint: enable=import-outside-toplevel # Let PyTest do its own thing yield diff --git a/src/saltfactories/utils/__init__.py b/src/saltfactories/utils/__init__.py index 7d200194..dba4d719 100644 --- a/src/saltfactories/utils/__init__.py +++ b/src/saltfactories/utils/__init__.py @@ -45,7 +45,9 @@ def running_username(): """ Return the username that is running the code. """ - import salt.utils.user + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.utils.user # pylint: disable=import-outside-toplevel return salt.utils.user.get_user() diff --git a/src/saltfactories/utils/functional.py b/src/saltfactories/utils/functional.py index 74e4f962..36e5708e 100644 --- a/src/saltfactories/utils/functional.py +++ b/src/saltfactories/utils/functional.py @@ -12,7 +12,6 @@ from pytestshellutils.utils import format_callback_to_string from pytestshellutils.utils.processes import MatchString - PATCH_TARGET = "salt.loader.lazy.LOADED_BASE_NAME" log = logging.getLogger(__name__) @@ -54,11 +53,13 @@ def reset_loaders_state(loaders): """ def __init__(self, opts, loaded_base_name=None): + self.opts = opts if loaded_base_name is None: - from salt.loader.lazy import LOADED_BASE_NAME + # Do not move this deferred import. It allows running against a Salt onedir build + # in salt's repo checkout. + from salt.loader.lazy import LOADED_BASE_NAME # pylint: disable=import-outside-toplevel loaded_base_name = LOADED_BASE_NAME - self.opts = opts self.loaded_base_name = loaded_base_name self.context = {} self._cachedir = pathlib.Path(opts["cachedir"]) @@ -72,7 +73,9 @@ def __init__(self, opts, loaded_base_name=None): self._states = None self._utils = None - import salt.features + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.features # pylint: disable=import-outside-toplevel salt.features.setup_features(self.opts) self.reload_all() @@ -128,7 +131,9 @@ def grains(self): """ The grains loaded by the salt loader. """ - import salt.loader + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.loader # pylint: disable=import-outside-toplevel if self._grains is None: try: @@ -148,7 +153,9 @@ def utils(self): """ The utils loaded by the salt loader. """ - import salt.loader + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.loader # pylint: disable=import-outside-toplevel if self._utils is None: try: @@ -168,7 +175,9 @@ def modules(self): """ The execution modules loaded by the salt loader. """ - import salt.loader + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.loader # pylint: disable=import-outside-toplevel if self._modules is None: _modules = salt.loader.minion_mods( @@ -224,7 +233,9 @@ def serializers(self): """ The serializers loaded by the salt loader. """ - import salt.loader + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.loader # pylint: disable=import-outside-toplevel if self._serializers is None: try: @@ -245,7 +256,9 @@ def states(self): """ The state modules loaded by the salt loader. """ - import salt.loader + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.loader # pylint: disable=import-outside-toplevel if self._states is None: try: @@ -310,7 +323,9 @@ def pillar(self): """ The pillar loaded by the salt loader. """ - import salt.pillar + # Do not move these deferred imports. It allows running against a Salt + # onedir build in salt's repo checkout. + import salt.pillar # pylint: disable=import-outside-toplevel if self._pillar is None: try: diff --git a/src/saltfactories/utils/loader.py b/src/saltfactories/utils/loader.py index e5e10b52..d9f71875 100644 --- a/src/saltfactories/utils/loader.py +++ b/src/saltfactories/utils/loader.py @@ -11,14 +11,14 @@ import pytest from pytestshellutils.utils import format_callback_to_string -log = logging.getLogger(__name__) - try: LOGGING_TRACE_LEVEL = logging.TRACE except AttributeError: # Salt's logging hasn't been setup yet LOGGING_TRACE_LEVEL = 5 +log = logging.getLogger(__name__) + @attr.s(init=True, slots=True, frozen=True) class LoaderModuleMock: diff --git a/src/saltfactories/utils/saltext/engines/pytest_engine.py b/src/saltfactories/utils/saltext/engines/pytest_engine.py index cf2f6a87..6d0f9de6 100644 --- a/src/saltfactories/utils/saltext/engines/pytest_engine.py +++ b/src/saltfactories/utils/saltext/engines/pytest_engine.py @@ -5,25 +5,29 @@ Simple salt engine which will setup a socket to accept connections allowing us to know when a daemon is up and running """ +import asyncio import atexit import datetime import logging import threading +import time +from collections import deque from collections.abc import MutableMapping +import salt.utils.event +import salt.utils.immutabletypes as immutabletypes + +try: + from salt.utils.data import CaseInsensitiveDict +except ImportError: + CaseInsensitiveDict = None + try: import msgpack HAS_MSGPACK = True except ImportError: # pragma: no cover HAS_MSGPACK = False -try: - import zmq - - HAS_ZMQ = True -except ImportError: # pragma: no cover - HAS_ZMQ = False - log = logging.getLogger(__name__) @@ -33,8 +37,6 @@ def __virtual__(): if HAS_MSGPACK is False: return False, "msgpack was not importable. Please install msgpack." - if HAS_ZMQ is False: - return False, "zmq was not importable. Please install pyzmq." if "__role" not in __opts__: return False, "The required '__role' key could not be found in the options dictionary" role = __opts__["__role"] @@ -65,16 +67,6 @@ def ext_type_encoder(obj): """ Convert any types that msgpack cannot handle on it's own. """ - try: - import salt.utils.immutabletypes as immutabletypes - except ImportError: - immutabletypes = None - - try: - from salt.utils.data import CaseInsensitiveDict - except ImportError: - CaseInsensitiveDict = None - if isinstance(obj, (datetime.datetime, datetime.date)): # msgpack doesn't support datetime.datetime and datetime.date datatypes. return obj.strftime("%Y%m%dT%H:%M:%S.%f") @@ -97,26 +89,141 @@ def ext_type_encoder(obj): return obj +class PyTestEventForwardClient(asyncio.Protocol): + """ + TCP Client to forward events. + """ + + def __init__(self, connection_lost_future, queue, client_running_event): + self.connection_lost_future = connection_lost_future + self.queue = queue + self.task = None + self.running = client_running_event + + def connection_made(self, transport): + """ + Connection established. + """ + peername = transport.get_extra_info("peername") + log.debug("%s: Connected to %s", self.__class__.__name__, peername) + # pylint: disable=attribute-defined-outside-init + self.transport = transport + loop = asyncio.get_running_loop() + self.task = loop.create_task(self._process_queue()) + # pylint: enable=attribute-defined-outside-init + + def connection_lost(self, exc): + """ + Connection lost. + """ + log.debug("%s: The server closed the connection", self.__class__.__name__) + self.connection_lost_future.set_result(True) + if self.task is not None: + self.task.cancel() + + async def _process_queue(self): + self.running.set() + log.info("%s: Now processing the queue", self.__class__.__name__) + restarts = 0 + while True: + if restarts > 10: + break + if not self.running.is_set(): + self.connection_lost_future.set_result(True) + break + try: + try: + payload = self.queue.popleft() + except IndexError: + await asyncio.sleep(1) + continue + if payload is None: + return + dumped = msgpack.packb(payload, use_bin_type=True, default=ext_type_encoder) + self.transport.write(dumped) + log.info("%s: forwarded event: %r", self.__class__.__name__, payload) + except asyncio.CancelledError: + break + except Exception as exc: # pylint: disable=broad-except + log.exception( + "%s: Caught exception while pulling data from queue: %s", + self.__class__.__name__, + exc, + ) + restarts += 1 + + class PyTestEventForwardEngine: """ Salt Engine instance. """ - __slots__ = ("opts", "id", "role", "returner_address", "running_event") + __slots__ = ( + "opts", + "id", + "role", + "returner_address_host", + "returner_address_port", + "running_event", + "client_running_event", + "loop", + "client", + "queue", + "running_thread", + ) def __init__(self, opts): self.opts = opts self.id = self.opts["id"] # pylint: disable=invalid-name self.role = self.opts["__role"] - self.returner_address = self.opts["pytest-{}".format(self.role)]["returner_address"] + returner_address = self.opts["pytest-{}".format(self.role)]["returner_address"] + self.returner_address_host = returner_address["host"] + self.returner_address_port = returner_address["port"] self.running_event = threading.Event() + self.client_running_event = threading.Event() + self.loop = asyncio.new_event_loop() + self.client = None + self.queue = deque(maxlen=1000) + self.running_thread = threading.Thread(target=self._run_loop_in_thread, args=(self.loop,)) + + def _run_loop_in_thread(self, loop): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(self._run_client(loop)) + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() + + async def _run_client(self, loop): + on_con_lost = loop.create_future() + log.debug( + "%s client connecting to %s:%s", + self.__class__.__name__, + self.returner_address_host, + self.returner_address_port, + ) + self.client = PyTestEventForwardClient(on_con_lost, self.queue, self.client_running_event) + transport, _ = await loop.create_connection( + lambda: self.client, + self.returner_address_host, + self.returner_address_port, + ) + # Wait until the protocol signals that the connection + # is lost and close the transport. + try: + log.info("%s client started", self.__class__.__name__) + await on_con_lost + finally: + transport.close() def __repr__(self): # noqa: D105 - return "<{} role={!r} id={!r}, returner_address={!r} running={!r}>".format( + return "<{} role={!r} id={!r}, returner_address='{}:{}' running={!r}>".format( self.__class__.__name__, self.role, self.id, - self.returner_address, + self.returner_address_host, + self.returner_address_port, self.running_event.is_set(), ) @@ -124,19 +231,26 @@ def start(self): """ Start the engine. """ - import salt.utils.event - if self.running_event.is_set(): return log.info("%s is starting", self) atexit.register(self.stop) self.running_event.set() + self.running_thread.start() + timeout_at = time.time() + 10 + while True: + log.info("Waiting for %s.client to start...", self.__class__.__name__) + if time.time() > timeout_at: + raise Exception("Failed to start client") + if self.client is None: + time.sleep(1) + continue + if not self.client_running_event.is_set(): + time.sleep(1) + continue + break try: - context = zmq.Context() - push = context.socket(zmq.PUSH) # pylint: disable=no-member - log.debug("%s connecting PUSH socket to %s", self, self.returner_address) - push.connect(self.returner_address) opts = self.opts.copy() opts["file_client"] = "local" with salt.utils.event.get_event( @@ -158,25 +272,12 @@ def start(self): tag = event["tag"] data = event["data"] log.debug("%s Received Event; TAG: %r DATA: %r", self, tag, data) - forward = (self.id, tag, data) - try: - dumped = msgpack.dumps( - forward, use_bin_type=True, default=ext_type_encoder - ) - push.send(dumped) - log.info("%s forwarded event: %r", self, forward) - except Exception: # pragma: no cover pylint: disable=broad-except - log.error( - "%s failed to forward event: %r", self, forward, exc_info=True - ) + forward = {"id": self.id, "tag": tag, "data": data} + self.queue.append(forward) finally: if self.running_event.is_set(): # Some exception happened, unset self.running_event.clear() - if not push.closed: - push.close(1500) - if not context.closed: - context.term() def stop(self): """ @@ -187,4 +288,7 @@ def stop(self): log.info("Stopping %s", self) self.running_event.clear() + self.queue.append(None) + self.client_running_event.clear() + self.running_thread.join() log.info("%s stopped", self)