From 1721845f99c87f5ead07c6dc273930509e2361a6 Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Mon, 29 Aug 2022 10:17:04 -0700 Subject: [PATCH 01/13] break out the code that handles the sub proxies into its own function --- salt/metaproxy/deltaproxy.py | 288 +++++++++++++++++++---------------- salt/minion.py | 10 ++ 2 files changed, 165 insertions(+), 133 deletions(-) diff --git a/salt/metaproxy/deltaproxy.py b/salt/metaproxy/deltaproxy.py index 73712b0418fe..83f8bbcd5bb8 100644 --- a/salt/metaproxy/deltaproxy.py +++ b/salt/metaproxy/deltaproxy.py @@ -2,7 +2,6 @@ # Proxy minion metaproxy modules # -import copy import logging import os import signal @@ -320,163 +319,186 @@ def post_master_init(self, master): self.proxy_pillar = {} self.proxy_context = {} self.add_periodic_callback("cleanup", self.cleanup_subprocesses) - for _id in self.opts["proxy"].get("ids", []): - control_id = self.opts["id"] - proxyopts = self.opts.copy() - proxyopts["id"] = _id + responses = yield { + _id: self.subproxy_post_master_init(_id, uid) + for _id in self.opts["proxy"].get("ids", []) + } + + log.debug("=== responses %s ===", responses.keys()) + for _id in responses: + self.deltaproxy_objs[_id], self.deltaproxy_opts[_id] = responses[_id] + self.ready = True - proxyopts = salt.config.proxy_config( - self.opts["conf_file"], defaults=proxyopts, minion_id=_id - ) - proxyopts["id"] = proxyopts["proxyid"] = _id - proxyopts["subproxy"] = True +def subproxy_post_master_init(self, minion_id, uid): + """ + Function to finish init after a deltaproxy proxy + minion has finished connecting to a master. - self.proxy_context[_id] = {"proxy_id": _id} + This is primarily loading modules, pillars, etc. (since they need + to know which master they connected to) for the sub proxy minions. + """ + proxy_grains = {} + proxy_pillar = {} - # We need grains first to be able to load pillar, which is where we keep the proxy - # configurations - self.proxy_grains[_id] = salt.loader.grains( - proxyopts, proxy=self.proxy, context=self.proxy_context[_id] - ) - self.proxy_pillar[_id] = yield salt.pillar.get_async_pillar( - proxyopts, - self.proxy_grains[_id], - _id, - saltenv=proxyopts["saltenv"], - pillarenv=proxyopts.get("pillarenv"), - ).compile_pillar() + control_id = self.opts["id"] + proxyopts = self.opts.copy() - proxyopts["proxy"] = self.proxy_pillar[_id].get("proxy", {}) - if not proxyopts["proxy"]: - log.warning( - "Pillar data for proxy minion %s could not be loaded, skipping.", _id - ) - continue + proxyopts = salt.config.proxy_config( + self.opts["conf_file"], defaults=proxyopts, minion_id=minion_id + ) + proxyopts.update({"id": minion_id, "proxyid": minion_id, "subproxy": True}) - # Remove ids - proxyopts["proxy"].pop("ids", None) + self.proxy_context[minion_id] = {"proxy_id": minion_id} - proxyopts["pillar"] = self.proxy_pillar[_id] - proxyopts["grains"] = self.proxy_grains[_id] + # We need grains first to be able to load pillar, which is where we keep the proxy + # configurations + proxy_grains[minion_id] = salt.loader.grains( + proxyopts, proxy=self.proxy, context=self.proxy_context[minion_id] + ) + proxy_pillar[minion_id] = yield salt.pillar.get_async_pillar( + proxyopts, + proxy_grains[minion_id], + minion_id, + saltenv=proxyopts["saltenv"], + pillarenv=proxyopts.get("pillarenv"), + ).compile_pillar() + + proxyopts["proxy"] = proxy_pillar[minion_id].get("proxy", {}) + if not proxyopts["proxy"]: + log.warning( + "Pillar data for proxy minion %s could not be loaded, skipping.", minion_id + ) + return None, None - proxyopts["hash_id"] = self.opts["id"] + # Remove ids + proxyopts["proxy"].pop("ids", None) - _proxy_minion = ProxyMinion(proxyopts) - _proxy_minion.proc_dir = salt.minion.get_proc_dir( - proxyopts["cachedir"], uid=uid - ) + proxyopts.update( + { + "pillar": proxy_pillar[minion_id], + "grains": proxy_grains[minion_id], + "hash_id": self.opts["id"], + } + ) - _proxy_minion.proxy = salt.loader.proxy( - proxyopts, utils=self.utils, context=self.proxy_context[_id] - ) - _proxy_minion.subprocess_list = self.subprocess_list + _proxy_minion = ProxyMinion(proxyopts) + _proxy_minion.proc_dir = salt.minion.get_proc_dir(proxyopts["cachedir"], uid=uid) - # And load the modules - ( - _proxy_minion.functions, - _proxy_minion.returners, - _proxy_minion.function_errors, - _proxy_minion.executors, - ) = _proxy_minion._load_modules( - opts=proxyopts, grains=proxyopts["grains"], context=self.proxy_context[_id] - ) + _proxy_minion.proxy = salt.loader.proxy( + proxyopts, utils=self.utils, context=self.proxy_context[minion_id] + ) + _proxy_minion.subprocess_list = self.subprocess_list - # we can then sync any proxymodules down from the master - # we do a sync_all here in case proxy code was installed by - # SPM or was manually placed in /srv/salt/_modules etc. - _proxy_minion.functions["saltutil.sync_all"](saltenv=self.opts["saltenv"]) + # And load the modules + ( + _proxy_minion.functions, + _proxy_minion.returners, + _proxy_minion.function_errors, + _proxy_minion.executors, + ) = _proxy_minion._load_modules( + opts=proxyopts, + grains=proxyopts["grains"], + context=self.proxy_context[minion_id], + ) - # And re-load the modules so the __proxy__ variable gets injected - ( - _proxy_minion.functions, - _proxy_minion.returners, - _proxy_minion.function_errors, - _proxy_minion.executors, - ) = _proxy_minion._load_modules( - opts=proxyopts, grains=proxyopts["grains"], context=self.proxy_context[_id] - ) + # we can then sync any proxymodules down from the master + # we do a sync_all here in case proxy code was installed by + # SPM or was manually placed in /srv/salt/_modules etc. + _proxy_minion.functions["saltutil.sync_all"](saltenv=self.opts["saltenv"]) - _proxy_minion.functions.pack["__proxy__"] = _proxy_minion.proxy - _proxy_minion.proxy.pack["__salt__"] = _proxy_minion.functions - _proxy_minion.proxy.pack["__ret__"] = _proxy_minion.returners - _proxy_minion.proxy.pack["__pillar__"] = proxyopts["pillar"] - _proxy_minion.proxy.pack["__grains__"] = proxyopts["grains"] + # And re-load the modules so the __proxy__ variable gets injected + ( + _proxy_minion.functions, + _proxy_minion.returners, + _proxy_minion.function_errors, + _proxy_minion.executors, + ) = _proxy_minion._load_modules( + opts=proxyopts, + grains=proxyopts["grains"], + context=self.proxy_context[minion_id], + ) - # Reload utils as well (chicken and egg, __utils__ needs __proxy__ and __proxy__ needs __utils__ - _proxy_minion.proxy.utils = salt.loader.utils( - proxyopts, proxy=_proxy_minion.proxy, context=self.proxy_context[_id] - ) + _proxy_minion.functions.pack["__proxy__"] = _proxy_minion.proxy + _proxy_minion.proxy.pack["__salt__"] = _proxy_minion.functions + _proxy_minion.proxy.pack["__ret__"] = _proxy_minion.returners + _proxy_minion.proxy.pack["__pillar__"] = proxyopts["pillar"] + _proxy_minion.proxy.pack["__grains__"] = proxyopts["grains"] - _proxy_minion.proxy.pack["__utils__"] = _proxy_minion.proxy.utils + # Reload utils as well (chicken and egg, __utils__ needs __proxy__ and __proxy__ needs __utils__ + _proxy_minion.proxy.utils = salt.loader.utils( + proxyopts, proxy=_proxy_minion.proxy, context=self.proxy_context[minion_id] + ) - # Reload all modules so all dunder variables are injected - _proxy_minion.proxy.reload_modules() + _proxy_minion.proxy.pack["__utils__"] = _proxy_minion.proxy.utils - _proxy_minion.connected = True + # Reload all modules so all dunder variables are injected + _proxy_minion.proxy.reload_modules() - _fq_proxyname = proxyopts["proxy"]["proxytype"] + _proxy_minion.connected = True - proxy_init_fn = _proxy_minion.proxy[_fq_proxyname + ".init"] - try: - proxy_init_fn(proxyopts) - except Exception as exc: # pylint: disable=broad-except - log.error( - "An exception occured during the initialization of minion %s: %s", - _id, - exc, - exc_info=True, - ) - continue + _fq_proxyname = proxyopts["proxy"]["proxytype"] - # Reload the grains - self.proxy_grains[_id] = salt.loader.grains( - proxyopts, proxy=_proxy_minion.proxy, context=self.proxy_context[_id] + proxy_init_fn = _proxy_minion.proxy[_fq_proxyname + ".init"] + try: + proxy_init_fn(proxyopts) + except Exception as exc: # pylint: disable=broad-except + log.error( + "An exception occured during the initialization of minion %s: %s", + minion_id, + exc, + exc_info=True, ) - proxyopts["grains"] = self.proxy_grains[_id] - - if not hasattr(_proxy_minion, "schedule"): - _proxy_minion.schedule = salt.utils.schedule.Schedule( - proxyopts, - _proxy_minion.functions, - _proxy_minion.returners, - cleanup=[salt.minion.master_event(type="alive")], - proxy=_proxy_minion.proxy, - new_instance=True, - _subprocess_list=_proxy_minion.subprocess_list, - ) + yield None, None - self.deltaproxy_objs[_id] = _proxy_minion - self.deltaproxy_opts[_id] = copy.deepcopy(proxyopts) + # Reload the grains + self.proxy_grains[minion_id] = salt.loader.grains( + proxyopts, proxy=_proxy_minion.proxy, context=self.proxy_context[minion_id] + ) + proxyopts["grains"] = self.proxy_grains[minion_id] - # proxy keepalive - _proxy_alive_fn = _fq_proxyname + ".alive" - if ( - _proxy_alive_fn in _proxy_minion.proxy - and "status.proxy_reconnect" in self.deltaproxy_objs[_id].functions - and proxyopts.get("proxy_keep_alive", True) - ): - # if `proxy_keep_alive` is either not specified, either set to False does not retry reconnecting - _proxy_minion.schedule.add_job( - { - "__proxy_keepalive": { - "function": "status.proxy_reconnect", - "minutes": proxyopts.get( - "proxy_keep_alive_interval", 1 - ), # by default, check once per minute - "jid_include": True, - "maxrunning": 1, - "return_job": False, - "kwargs": {"proxy_name": _fq_proxyname}, - } - }, - persist=True, - ) - _proxy_minion.schedule.enable_schedule() - else: - _proxy_minion.schedule.delete_job("__proxy_keepalive", persist=True) + if not hasattr(_proxy_minion, "schedule"): + _proxy_minion.schedule = salt.utils.schedule.Schedule( + proxyopts, + _proxy_minion.functions, + _proxy_minion.returners, + cleanup=[salt.minion.master_event(type="alive")], + proxy=_proxy_minion.proxy, + new_instance=True, + _subprocess_list=_proxy_minion.subprocess_list, + ) - self.ready = True + # self.deltaproxy_objs[minion_id] = _proxy_minion + # self.deltaproxy_opts[minion_id] = copy.deepcopy(proxyopts) + + # proxy keepalive + _proxy_alive_fn = _fq_proxyname + ".alive" + if ( + _proxy_alive_fn in _proxy_minion.proxy + and "status.proxy_reconnect" in self.deltaproxy_objs[minion_id].functions + and proxyopts.get("proxy_keep_alive", True) + ): + # if `proxy_keep_alive` is either not specified, either set to False does not retry reconnecting + _proxy_minion.schedule.add_job( + { + "__proxy_keepalive": { + "function": "status.proxy_reconnect", + "minutes": proxyopts.get( + "proxy_keep_alive_interval", 1 + ), # by default, check once per minute + "jid_include": True, + "maxrunning": 1, + "return_job": False, + "kwargs": {"proxy_name": _fq_proxyname}, + } + }, + persist=True, + ) + _proxy_minion.schedule.enable_schedule() + else: + _proxy_minion.schedule.delete_job("__proxy_keepalive", persist=True) + + return (_proxy_minion, proxyopts) def target(cls, minion_instance, opts, data, connected): diff --git a/salt/minion.py b/salt/minion.py index edb04d87c075..79b4c33a2867 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -3798,6 +3798,16 @@ def _post_master_init(self, master): mp_call = _metaproxy_call(self.opts, "post_master_init") return mp_call(self, master) + @salt.ext.tornado.gen.coroutine + def subproxy_post_master_init(self, minion_id, uid): + """ + Function to finish init for the sub proxies + + :rtype : None + """ + mp_call = _metaproxy_call(self.opts, "subproxy_post_master_init") + return mp_call(self, minion_id, uid) + def tune_in(self, start=True): """ Lock onto the publisher. This is the main event loop for the minion From c9563abaa096a69e0a6db20cbf474c55a2fe7268 Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Fri, 16 Sep 2022 11:15:23 -0700 Subject: [PATCH 02/13] experiment with deltaproxy using concurrent.futures.as_completed --- salt/metaproxy/deltaproxy.py | 142 +++++++++++++++++++---------------- 1 file changed, 76 insertions(+), 66 deletions(-) diff --git a/salt/metaproxy/deltaproxy.py b/salt/metaproxy/deltaproxy.py index 83f8bbcd5bb8..6fe8e2c80427 100644 --- a/salt/metaproxy/deltaproxy.py +++ b/salt/metaproxy/deltaproxy.py @@ -2,6 +2,7 @@ # Proxy minion metaproxy modules # +import concurrent.futures import logging import os import signal @@ -319,18 +320,26 @@ def post_master_init(self, master): self.proxy_pillar = {} self.proxy_context = {} self.add_periodic_callback("cleanup", self.cleanup_subprocesses) - responses = yield { - _id: self.subproxy_post_master_init(_id, uid) - for _id in self.opts["proxy"].get("ids", []) - } - - log.debug("=== responses %s ===", responses.keys()) - for _id in responses: - self.deltaproxy_objs[_id], self.deltaproxy_opts[_id] = responses[_id] + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit( + subproxy_post_master_init, _id, uid, self.opts, self.proxy, self.utils + ) + for _id in self.opts["proxy"].get("ids", []) + ] + + for f in concurrent.futures.as_completed(futures): + sub_proxy_data = f.result() + minion_id = sub_proxy_data["proxy_opts"].get("id") + self.deltaproxy_opts[minion_id] = sub_proxy_data["proxy_opts"] + self.deltaproxy_objs[minion_id] = sub_proxy_data["proxy_minion"] + + self.deltaproxy_objs[minion_id].subprocess_list = self.subprocess_list + self.ready = True -def subproxy_post_master_init(self, minion_id, uid): +def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils): """ Function to finish init after a deltaproxy proxy minion has finished connecting to a master. @@ -338,25 +347,26 @@ def subproxy_post_master_init(self, minion_id, uid): This is primarily loading modules, pillars, etc. (since they need to know which master they connected to) for the sub proxy minions. """ + log.info("=== starting %s ===", minion_id) proxy_grains = {} proxy_pillar = {} - control_id = self.opts["id"] - proxyopts = self.opts.copy() + control_id = opts["id"] + proxyopts = opts.copy() proxyopts = salt.config.proxy_config( - self.opts["conf_file"], defaults=proxyopts, minion_id=minion_id + opts["conf_file"], defaults=proxyopts, minion_id=minion_id ) proxyopts.update({"id": minion_id, "proxyid": minion_id, "subproxy": True}) - self.proxy_context[minion_id] = {"proxy_id": minion_id} + proxy_context = {"proxy_id": minion_id} # We need grains first to be able to load pillar, which is where we keep the proxy # configurations proxy_grains[minion_id] = salt.loader.grains( - proxyopts, proxy=self.proxy, context=self.proxy_context[minion_id] + proxyopts, proxy=main_proxy, context=proxy_context ) - proxy_pillar[minion_id] = yield salt.pillar.get_async_pillar( + proxy_pillar[minion_id] = salt.pillar.get_pillar( proxyopts, proxy_grains[minion_id], minion_id, @@ -369,7 +379,7 @@ def subproxy_post_master_init(self, minion_id, uid): log.warning( "Pillar data for proxy minion %s could not be loaded, skipping.", minion_id ) - return None, None + return {"proxy_minion": None, "proxy_opts": None} # Remove ids proxyopts["proxy"].pop("ids", None) @@ -378,7 +388,7 @@ def subproxy_post_master_init(self, minion_id, uid): { "pillar": proxy_pillar[minion_id], "grains": proxy_grains[minion_id], - "hash_id": self.opts["id"], + "hash_id": opts["id"], } ) @@ -386,9 +396,9 @@ def subproxy_post_master_init(self, minion_id, uid): _proxy_minion.proc_dir = salt.minion.get_proc_dir(proxyopts["cachedir"], uid=uid) _proxy_minion.proxy = salt.loader.proxy( - proxyopts, utils=self.utils, context=self.proxy_context[minion_id] + proxyopts, utils=main_utils, context=proxy_context ) - _proxy_minion.subprocess_list = self.subprocess_list + # _proxy_minion.subprocess_list = self.subprocess_list # And load the modules ( @@ -399,13 +409,13 @@ def subproxy_post_master_init(self, minion_id, uid): ) = _proxy_minion._load_modules( opts=proxyopts, grains=proxyopts["grains"], - context=self.proxy_context[minion_id], + context=proxy_context, ) # we can then sync any proxymodules down from the master # we do a sync_all here in case proxy code was installed by # SPM or was manually placed in /srv/salt/_modules etc. - _proxy_minion.functions["saltutil.sync_all"](saltenv=self.opts["saltenv"]) + _proxy_minion.functions["saltutil.sync_all"](saltenv=opts["saltenv"]) # And re-load the modules so the __proxy__ variable gets injected ( @@ -416,7 +426,7 @@ def subproxy_post_master_init(self, minion_id, uid): ) = _proxy_minion._load_modules( opts=proxyopts, grains=proxyopts["grains"], - context=self.proxy_context[minion_id], + context=proxy_context, ) _proxy_minion.functions.pack["__proxy__"] = _proxy_minion.proxy @@ -427,7 +437,7 @@ def subproxy_post_master_init(self, minion_id, uid): # Reload utils as well (chicken and egg, __utils__ needs __proxy__ and __proxy__ needs __utils__ _proxy_minion.proxy.utils = salt.loader.utils( - proxyopts, proxy=_proxy_minion.proxy, context=self.proxy_context[minion_id] + proxyopts, proxy=_proxy_minion.proxy, context=proxy_context ) _proxy_minion.proxy.pack["__utils__"] = _proxy_minion.proxy.utils @@ -449,56 +459,56 @@ def subproxy_post_master_init(self, minion_id, uid): exc, exc_info=True, ) - yield None, None + return {"proxy_minion": None, "proxy_opts": None} # Reload the grains - self.proxy_grains[minion_id] = salt.loader.grains( - proxyopts, proxy=_proxy_minion.proxy, context=self.proxy_context[minion_id] - ) - proxyopts["grains"] = self.proxy_grains[minion_id] - - if not hasattr(_proxy_minion, "schedule"): - _proxy_minion.schedule = salt.utils.schedule.Schedule( - proxyopts, - _proxy_minion.functions, - _proxy_minion.returners, - cleanup=[salt.minion.master_event(type="alive")], - proxy=_proxy_minion.proxy, - new_instance=True, - _subprocess_list=_proxy_minion.subprocess_list, - ) + # self.proxy_grains[minion_id] = salt.loader.grains( + # proxyopts, proxy=_proxy_minion.proxy, context=self.proxy_context[minion_id] + # ) + # proxyopts["grains"] = self.proxy_grains[minion_id] + + # if not hasattr(_proxy_minion, "schedule"): + # _proxy_minion.schedule = salt.utils.schedule.Schedule( + # proxyopts, + # _proxy_minion.functions, + # _proxy_minion.returners, + # cleanup=[salt.minion.master_event(type="alive")], + # proxy=_proxy_minion.proxy, + # new_instance=True, + # _subprocess_list=_proxy_minion.subprocess_list, + # ) # self.deltaproxy_objs[minion_id] = _proxy_minion # self.deltaproxy_opts[minion_id] = copy.deepcopy(proxyopts) # proxy keepalive - _proxy_alive_fn = _fq_proxyname + ".alive" - if ( - _proxy_alive_fn in _proxy_minion.proxy - and "status.proxy_reconnect" in self.deltaproxy_objs[minion_id].functions - and proxyopts.get("proxy_keep_alive", True) - ): - # if `proxy_keep_alive` is either not specified, either set to False does not retry reconnecting - _proxy_minion.schedule.add_job( - { - "__proxy_keepalive": { - "function": "status.proxy_reconnect", - "minutes": proxyopts.get( - "proxy_keep_alive_interval", 1 - ), # by default, check once per minute - "jid_include": True, - "maxrunning": 1, - "return_job": False, - "kwargs": {"proxy_name": _fq_proxyname}, - } - }, - persist=True, - ) - _proxy_minion.schedule.enable_schedule() - else: - _proxy_minion.schedule.delete_job("__proxy_keepalive", persist=True) - - return (_proxy_minion, proxyopts) + # _proxy_alive_fn = _fq_proxyname + ".alive" + # if ( + # _proxy_alive_fn in _proxy_minion.proxy + # and "status.proxy_reconnect" in self.deltaproxy_objs[minion_id].functions + # and proxyopts.get("proxy_keep_alive", True) + # ): + # # if `proxy_keep_alive` is either not specified, either set to False does not retry reconnecting + # _proxy_minion.schedule.add_job( + # { + # "__proxy_keepalive": { + # "function": "status.proxy_reconnect", + # "minutes": proxyopts.get( + # "proxy_keep_alive_interval", 1 + # ), # by default, check once per minute + # "jid_include": True, + # "maxrunning": 1, + # "return_job": False, + # "kwargs": {"proxy_name": _fq_proxyname}, + # } + # }, + # persist=True, + # ) + # _proxy_minion.schedule.enable_schedule() + # else: + # _proxy_minion.schedule.delete_job("__proxy_keepalive", persist=True) + + return {"proxy_minion": _proxy_minion, "proxy_opts": proxyopts} def target(cls, minion_instance, opts, data, connected): From 985f12d5492b90a54ae54e40310ddc23232f765c Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Sun, 18 Sep 2022 18:17:00 -0700 Subject: [PATCH 03/13] add req_channel fix. --- salt/metaproxy/deltaproxy.py | 28 +++++++------ salt/minion.py | 6 +-- tests/pytests/conftest.py | 2 + tests/pytests/integration/proxy/conftest.py | 40 ++++++++++++++----- .../integration/proxy/test_deltaproxy.py | 16 +++++++- 5 files changed, 64 insertions(+), 28 deletions(-) diff --git a/salt/metaproxy/deltaproxy.py b/salt/metaproxy/deltaproxy.py index 6fe8e2c80427..31c2bc670a66 100644 --- a/salt/metaproxy/deltaproxy.py +++ b/salt/metaproxy/deltaproxy.py @@ -335,6 +335,11 @@ def post_master_init(self, master): self.deltaproxy_objs[minion_id] = sub_proxy_data["proxy_minion"] self.deltaproxy_objs[minion_id].subprocess_list = self.subprocess_list + self.deltaproxy_objs[ + minion_id + ].req_channel = salt.transport.client.AsyncReqChannel.factory( + sub_proxy_data["proxy_opts"], io_loop=self.io_loop + ) self.ready = True @@ -467,19 +472,16 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils): # ) # proxyopts["grains"] = self.proxy_grains[minion_id] - # if not hasattr(_proxy_minion, "schedule"): - # _proxy_minion.schedule = salt.utils.schedule.Schedule( - # proxyopts, - # _proxy_minion.functions, - # _proxy_minion.returners, - # cleanup=[salt.minion.master_event(type="alive")], - # proxy=_proxy_minion.proxy, - # new_instance=True, - # _subprocess_list=_proxy_minion.subprocess_list, - # ) - - # self.deltaproxy_objs[minion_id] = _proxy_minion - # self.deltaproxy_opts[minion_id] = copy.deepcopy(proxyopts) + if not hasattr(_proxy_minion, "schedule"): + _proxy_minion.schedule = salt.utils.schedule.Schedule( + proxyopts, + _proxy_minion.functions, + _proxy_minion.returners, + cleanup=[salt.minion.master_event(type="alive")], + proxy=_proxy_minion.proxy, + new_instance=True, + _subprocess_list=_proxy_minion.subprocess_list, + ) # proxy keepalive # _proxy_alive_fn = _fq_proxyname + ".alive" diff --git a/salt/minion.py b/salt/minion.py index 835e1315428f..f12b44a6d44f 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -2693,10 +2693,10 @@ def handle_event(self, package): notify=data.get("notify", False), ) elif tag.startswith("__master_req_channel_payload"): - yield self.req_channel.send( + yield _minion.req_channel.send( data, - timeout=self._return_retry_timer(), - tries=self.opts["return_retry_tries"], + timeout=_minion._return_retry_timer(), + tries=_minion.opts["return_retry_tries"], ) elif tag.startswith("pillar_refresh"): yield _minion.pillar_refresh( diff --git a/tests/pytests/conftest.py b/tests/pytests/conftest.py index 4549d36b8d54..4305cc192867 100644 --- a/tests/pytests/conftest.py +++ b/tests/pytests/conftest.py @@ -606,6 +606,8 @@ def delta_proxy_minion_ids(): return [ "dummy_proxy_one", "dummy_proxy_two", + "dummy_proxy_three", + "dummy_proxy_four", ] diff --git a/tests/pytests/integration/proxy/conftest.py b/tests/pytests/integration/proxy/conftest.py index 6c2ae1e03164..7406e59f5995 100644 --- a/tests/pytests/integration/proxy/conftest.py +++ b/tests/pytests/integration/proxy/conftest.py @@ -16,7 +16,13 @@ def deltaproxy_pillar_tree(salt_master, salt_delta_proxy_factory): """ Create the pillar files for controlproxy and two dummy proxy minions """ - proxy_one, proxy_two = pytest.helpers.proxy.delta_proxy_minion_ids() + ( + proxy_one, + proxy_two, + proxy_three, + proxy_four, + ) = pytest.helpers.proxy.delta_proxy_minion_ids() + top_file = """ base: {control}: @@ -25,10 +31,16 @@ def deltaproxy_pillar_tree(salt_master, salt_delta_proxy_factory): - {one} {two}: - {two} + {three}: + - {three} + {four}: + - {four} """.format( control=salt_delta_proxy_factory.id, one=proxy_one, two=proxy_two, + three=proxy_three, + four=proxy_four, ) controlproxy_pillar_file = """ proxy: @@ -36,16 +48,16 @@ def deltaproxy_pillar_tree(salt_master, salt_delta_proxy_factory): ids: - {} - {} + - {} + - {} """.format( - proxy_one, proxy_two + proxy_one, + proxy_two, + proxy_three, + proxy_four, ) - dummy_proxy_one_pillar_file = """ - proxy: - proxytype: dummy - """ - - dummy_proxy_two_pillar_file = """ + dummy_proxy_pillar_file = """ proxy: proxytype: dummy """ @@ -55,12 +67,18 @@ def deltaproxy_pillar_tree(salt_master, salt_delta_proxy_factory): "controlproxy.sls", controlproxy_pillar_file ) dummy_proxy_one_tempfile = salt_master.pillar_tree.base.temp_file( - "{}.sls".format(proxy_one), dummy_proxy_one_pillar_file + "{}.sls".format(proxy_one), dummy_proxy_pillar_file ) dummy_proxy_two_tempfile = salt_master.pillar_tree.base.temp_file( - "{}.sls".format(proxy_two), dummy_proxy_two_pillar_file + "{}.sls".format(proxy_two), dummy_proxy_pillar_file + ) + dummy_proxy_three_tempfile = salt_master.pillar_tree.base.temp_file( + "{}.sls".format(proxy_three), dummy_proxy_pillar_file + ) + dummy_proxy_four_tempfile = salt_master.pillar_tree.base.temp_file( + "{}.sls".format(proxy_four), dummy_proxy_pillar_file ) - with top_tempfile, controlproxy_tempfile, dummy_proxy_one_tempfile, dummy_proxy_two_tempfile: + with top_tempfile, controlproxy_tempfile, dummy_proxy_one_tempfile, dummy_proxy_two_tempfile, dummy_proxy_three_tempfile, dummy_proxy_four_tempfile: yield diff --git a/tests/pytests/integration/proxy/test_deltaproxy.py b/tests/pytests/integration/proxy/test_deltaproxy.py index 565014f31bc6..01231334c990 100644 --- a/tests/pytests/integration/proxy/test_deltaproxy.py +++ b/tests/pytests/integration/proxy/test_deltaproxy.py @@ -25,7 +25,12 @@ def proxy_id(request, salt_delta_proxy, skip_on_tcp_transport): return request.param -def test_can_it_ping(salt_cli, proxy_id): +@pytest.fixture +def proxy_ids(request, salt_delta_proxy, skip_on_tcp_transport): + return pytest.helpers.proxy.delta_proxy_minion_ids() + + +def test_can_it_ping(salt_cli, proxy_id, proxy_ids): """ Ensure the proxy can ping """ @@ -33,6 +38,15 @@ def test_can_it_ping(salt_cli, proxy_id): assert ret.data is True +def test_can_it_ping_all(salt_cli, proxy_ids): + """ + Ensure the proxy can ping (all proxy minions) + """ + ret = salt_cli.run("-L", "test.ping", minion_tgt=",".join(proxy_ids)) + for _id in proxy_ids: + assert ret.data[_id] is True + + def test_list_pkgs(salt_cli, proxy_id): """ Package test 1, really just tests that the virtual function capability From d6ab6d62d644154bf9563cc899c7990d55410cec Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Mon, 19 Sep 2022 15:42:21 -0700 Subject: [PATCH 04/13] Only add sub proxy to the list if the post_master_init succeeds. Add concurrency to the tune in functions for the sub proxies. --- salt/metaproxy/deltaproxy.py | 113 ++++++++++++++++++++--------------- 1 file changed, 64 insertions(+), 49 deletions(-) diff --git a/salt/metaproxy/deltaproxy.py b/salt/metaproxy/deltaproxy.py index 31c2bc670a66..b561f19db71b 100644 --- a/salt/metaproxy/deltaproxy.py +++ b/salt/metaproxy/deltaproxy.py @@ -331,15 +331,16 @@ def post_master_init(self, master): for f in concurrent.futures.as_completed(futures): sub_proxy_data = f.result() minion_id = sub_proxy_data["proxy_opts"].get("id") + self.deltaproxy_opts[minion_id] = sub_proxy_data["proxy_opts"] self.deltaproxy_objs[minion_id] = sub_proxy_data["proxy_minion"] - self.deltaproxy_objs[minion_id].subprocess_list = self.subprocess_list - self.deltaproxy_objs[ - minion_id - ].req_channel = salt.transport.client.AsyncReqChannel.factory( - sub_proxy_data["proxy_opts"], io_loop=self.io_loop - ) + if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]: + self.deltaproxy_objs[ + minion_id + ].req_channel = salt.transport.client.AsyncReqChannel.factory( + sub_proxy_data["proxy_opts"], io_loop=self.io_loop + ) self.ready = True @@ -352,12 +353,11 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils): This is primarily loading modules, pillars, etc. (since they need to know which master they connected to) for the sub proxy minions. """ - log.info("=== starting %s ===", minion_id) proxy_grains = {} proxy_pillar = {} - control_id = opts["id"] proxyopts = opts.copy() + proxyopts["id"] = minion_id proxyopts = salt.config.proxy_config( opts["conf_file"], defaults=proxyopts, minion_id=minion_id @@ -368,18 +368,18 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils): # We need grains first to be able to load pillar, which is where we keep the proxy # configurations - proxy_grains[minion_id] = salt.loader.grains( + proxy_grains = salt.loader.grains( proxyopts, proxy=main_proxy, context=proxy_context ) - proxy_pillar[minion_id] = salt.pillar.get_pillar( + proxy_pillar = salt.pillar.get_pillar( proxyopts, - proxy_grains[minion_id], + proxy_grains, minion_id, saltenv=proxyopts["saltenv"], pillarenv=proxyopts.get("pillarenv"), ).compile_pillar() - proxyopts["proxy"] = proxy_pillar[minion_id].get("proxy", {}) + proxyopts["proxy"] = proxy_pillar.get("proxy", {}) if not proxyopts["proxy"]: log.warning( "Pillar data for proxy minion %s could not be loaded, skipping.", minion_id @@ -391,8 +391,8 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils): proxyopts.update( { - "pillar": proxy_pillar[minion_id], - "grains": proxy_grains[minion_id], + "pillar": proxy_pillar, + "grains": proxy_grains, "hash_id": opts["id"], } ) @@ -403,7 +403,6 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils): _proxy_minion.proxy = salt.loader.proxy( proxyopts, utils=main_utils, context=proxy_context ) - # _proxy_minion.subprocess_list = self.subprocess_list # And load the modules ( @@ -467,10 +466,10 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils): return {"proxy_minion": None, "proxy_opts": None} # Reload the grains - # self.proxy_grains[minion_id] = salt.loader.grains( - # proxyopts, proxy=_proxy_minion.proxy, context=self.proxy_context[minion_id] - # ) - # proxyopts["grains"] = self.proxy_grains[minion_id] + proxy_grains = salt.loader.grains( + proxyopts, proxy=_proxy_minion.proxy, context=proxy_context + ) + proxyopts["grains"] = proxy_grains if not hasattr(_proxy_minion, "schedule"): _proxy_minion.schedule = salt.utils.schedule.Schedule( @@ -484,31 +483,31 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils): ) # proxy keepalive - # _proxy_alive_fn = _fq_proxyname + ".alive" - # if ( - # _proxy_alive_fn in _proxy_minion.proxy - # and "status.proxy_reconnect" in self.deltaproxy_objs[minion_id].functions - # and proxyopts.get("proxy_keep_alive", True) - # ): - # # if `proxy_keep_alive` is either not specified, either set to False does not retry reconnecting - # _proxy_minion.schedule.add_job( - # { - # "__proxy_keepalive": { - # "function": "status.proxy_reconnect", - # "minutes": proxyopts.get( - # "proxy_keep_alive_interval", 1 - # ), # by default, check once per minute - # "jid_include": True, - # "maxrunning": 1, - # "return_job": False, - # "kwargs": {"proxy_name": _fq_proxyname}, - # } - # }, - # persist=True, - # ) - # _proxy_minion.schedule.enable_schedule() - # else: - # _proxy_minion.schedule.delete_job("__proxy_keepalive", persist=True) + _proxy_alive_fn = _fq_proxyname + ".alive" + if ( + _proxy_alive_fn in _proxy_minion.proxy + and "status.proxy_reconnect" in _proxy_minion.functions + and proxyopts.get("proxy_keep_alive", True) + ): + # if `proxy_keep_alive` is either not specified, either set to False does not retry reconnecting + _proxy_minion.schedule.add_job( + { + "__proxy_keepalive": { + "function": "status.proxy_reconnect", + "minutes": proxyopts.get( + "proxy_keep_alive_interval", 1 + ), # by default, check once per minute + "jid_include": True, + "maxrunning": 1, + "return_job": False, + "kwargs": {"proxy_name": _fq_proxyname}, + } + }, + persist=True, + ) + _proxy_minion.schedule.enable_schedule() + else: + _proxy_minion.schedule.delete_job("__proxy_keepalive", persist=True) return {"proxy_minion": _proxy_minion, "proxy_opts": proxyopts} @@ -1060,9 +1059,25 @@ def tune_in(self, start=True): Lock onto the publisher. This is the main event loop for the minion :rtype : None """ - for proxy_id in self.deltaproxy_objs: - _proxy_minion = self.deltaproxy_objs[proxy_id] - _proxy_minion.setup_scheduler() - _proxy_minion.setup_beacons() - _proxy_minion._state_run() + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit(subproxy_tune_in, self.deltaproxy_objs[proxy_minion]) + for proxy_minion in self.deltaproxy_objs + ] + + for f in concurrent.futures.as_completed(futures): + _proxy_minion = f.result() + log.debug("Tune in for sub proxy %r finished", _proxy_minion.opts.get("id")) super(ProxyMinion, self).tune_in(start=start) + + +def subproxy_tune_in(proxy_minion, start=True): + """ + Tunein sub proxy minions + """ + proxy_minion.setup_scheduler() + proxy_minion.setup_beacons() + proxy_minion.add_periodic_callback("cleanup", proxy_minion.cleanup_subprocesses) + proxy_minion._state_run() + + return proxy_minion From aec8c0bdb5dfd19eb8677b41521a035324b41421 Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Tue, 20 Sep 2022 12:18:59 -0700 Subject: [PATCH 05/13] Adding context to ssh_sample proxy. --- salt/proxy/ssh_sample.py | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/salt/proxy/ssh_sample.py b/salt/proxy/ssh_sample.py index 6e7e31aa8787..a952f52199d5 100644 --- a/salt/proxy/ssh_sample.py +++ b/salt/proxy/ssh_sample.py @@ -14,8 +14,6 @@ # This must be present or the Salt loader won't load this module __proxyenabled__ = ["ssh_sample"] -DETAILS = {} - log = logging.getLogger(__file__) @@ -36,13 +34,13 @@ def init(opts): Can be used to initialize the server connection. """ try: - DETAILS["server"] = SSHConnection( + __context__["server"] = SSHConnection( host=__opts__["proxy"]["host"], username=__opts__["proxy"]["username"], password=__opts__["proxy"]["password"], ) - out, err = DETAILS["server"].sendline("help") - DETAILS["initialized"] = True + out, err = __context__["server"].sendline("help") + __context__["initialized"] = True except TerminalException as e: log.error(e) @@ -55,7 +53,7 @@ def initialized(): places occur before the proxy can be initialized, return whether our init() function has been called """ - return DETAILS.get("initialized", False) + return __context__.get("initialized", False) def grains(): @@ -63,23 +61,23 @@ def grains(): Get the grains from the proxied device """ - if not DETAILS.get("grains_cache", {}): + if not __context__.get("grains_cache", {}): cmd = "info" # Send the command to execute - out, err = DETAILS["server"].sendline(cmd) + out, err = __context__["server"].sendline(cmd) # "scrape" the output and return the right fields as a dict - DETAILS["grains_cache"] = parse(out) + __context__["grains_cache"] = parse(out) - return DETAILS["grains_cache"] + return __context__["grains_cache"] def grains_refresh(): """ Refresh the grains from the proxied device """ - DETAILS["grains_cache"] = None + __context__["grains_cache"] = None return grains() @@ -101,7 +99,7 @@ def ping(): Ping the device on the other end of the connection """ try: - out, err = DETAILS["server"].sendline("help") + out, err = __context__["server"].sendline("help") return True except TerminalException as e: log.error(e) @@ -112,7 +110,7 @@ def shutdown(opts): """ Disconnect """ - DETAILS["server"].close_connection() + __context__["server"].close_connection() def parse(out): @@ -146,7 +144,7 @@ def package_list(): """ # Send the command to execute - out, err = DETAILS["server"].sendline("pkg_list\n") + out, err = __context__["server"].sendline("pkg_list\n") # "scrape" the output and return the right fields as a dict return parse(out) @@ -161,7 +159,7 @@ def package_install(name, **kwargs): cmd += " " + kwargs["version"] # Send the command to execute - out, err = DETAILS["server"].sendline(cmd) + out, err = __context__["server"].sendline(cmd) # "scrape" the output and return the right fields as a dict return parse(out) @@ -174,7 +172,7 @@ def package_remove(name): cmd = "pkg_remove " + name # Send the command to execute - out, err = DETAILS["server"].sendline(cmd) + out, err = __context__["server"].sendline(cmd) # "scrape" the output and return the right fields as a dict return parse(out) @@ -189,7 +187,7 @@ def service_list(): cmd = "ps" # Send the command to execute - out, err = DETAILS["server"].sendline(cmd) + out, err = __context__["server"].sendline(cmd) # "scrape" the output and return the right fields as a dict return parse(out) @@ -204,7 +202,7 @@ def service_start(name): cmd = "start " + name # Send the command to execute - out, err = DETAILS["server"].sendline(cmd) + out, err = __context__["server"].sendline(cmd) # "scrape" the output and return the right fields as a dict return parse(out) @@ -219,7 +217,7 @@ def service_stop(name): cmd = "stop " + name # Send the command to execute - out, err = DETAILS["server"].sendline(cmd) + out, err = __context__["server"].sendline(cmd) # "scrape" the output and return the right fields as a dict return parse(out) @@ -234,7 +232,7 @@ def service_restart(name): cmd = "restart " + name # Send the command to execute - out, err = DETAILS["server"].sendline(cmd) + out, err = __context__["server"].sendline(cmd) # "scrape" the output and return the right fields as a dict return parse(out) From d2ae4050944885c7b36988db58f9b16077ec4703 Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Thu, 27 Oct 2022 14:58:10 -0700 Subject: [PATCH 06/13] Updating ssh_sample proxy to aid in testing. Adding unit tests for ssh_sample proxy. --- salt/proxy/ssh_sample.py | 4 +- tests/pytests/unit/proxy/test_ssh_sample.py | 269 ++++++++++++++++++++ 2 files changed, 271 insertions(+), 2 deletions(-) create mode 100644 tests/pytests/unit/proxy/test_ssh_sample.py diff --git a/salt/proxy/ssh_sample.py b/salt/proxy/ssh_sample.py index a952f52199d5..eb4c975d6acd 100644 --- a/salt/proxy/ssh_sample.py +++ b/salt/proxy/ssh_sample.py @@ -8,8 +8,8 @@ import logging import salt.utils.json +import salt.utils.vt_helper from salt.utils.vt import TerminalException -from salt.utils.vt_helper import SSHConnection # This must be present or the Salt loader won't load this module __proxyenabled__ = ["ssh_sample"] @@ -34,7 +34,7 @@ def init(opts): Can be used to initialize the server connection. """ try: - __context__["server"] = SSHConnection( + __context__["server"] = salt.utils.vt_helper.SSHConnection( host=__opts__["proxy"]["host"], username=__opts__["proxy"]["username"], password=__opts__["proxy"]["password"], diff --git a/tests/pytests/unit/proxy/test_ssh_sample.py b/tests/pytests/unit/proxy/test_ssh_sample.py new file mode 100644 index 000000000000..58bfb30eb6bd --- /dev/null +++ b/tests/pytests/unit/proxy/test_ssh_sample.py @@ -0,0 +1,269 @@ +""" + :codeauthor: Gareth J. Greenaway +""" + +import copy +import logging + +import pytest +from saltfactories.utils import random_string + +import salt.proxy.ssh_sample as ssh_sample_proxy +from salt.utils.vt import TerminalException +from tests.support.mock import MagicMock, patch + +log = logging.getLogger(__name__) + + +@pytest.fixture(scope="module") +def proxy_minion_config_module(salt_master_factory): + factory = salt_master_factory.salt_proxy_minion_daemon( + random_string("proxy-minion-"), + ) + return factory.config + + +@pytest.fixture +def proxy_minion_config(proxy_minion_config_module): + + minion_config = copy.deepcopy(proxy_minion_config_module) + minion_config["proxy"]["proxytype"] = "ssh_sample" + minion_config["proxy"]["host"] = "localhost" + minion_config["proxy"]["username"] = "username" + minion_config["proxy"]["password"] = "password" + return minion_config + + +@pytest.fixture +def configure_loader_modules(): + return {ssh_sample_proxy: {}} + + +class MockSSHConnection: + def __init__(self, *args, **kwargs): + return None + + def sendline(self, *args, **kwargs): + return "", "" + + +def test_init(proxy_minion_config): + """ + check ssh_sample_proxy init method + """ + + with patch( + "salt.utils.vt_helper.SSHConnection", + MagicMock(autospec=True, return_value=MockSSHConnection()), + ): + with patch.dict(ssh_sample_proxy.__opts__, proxy_minion_config): + ssh_sample_proxy.init(proxy_minion_config) + assert "server" in ssh_sample_proxy.__context__ + assert "initialized" in ssh_sample_proxy.__context__ + assert ssh_sample_proxy.__context__["initialized"] + + +def test_initialized(proxy_minion_config): + """ + check ssh_sample_proxy initialized method + """ + + with patch( + "salt.utils.vt_helper.SSHConnection", + MagicMock(autospec=True, return_value=MockSSHConnection()), + ): + with patch.dict(ssh_sample_proxy.__opts__, proxy_minion_config): + ssh_sample_proxy.init(proxy_minion_config) + + ret = ssh_sample_proxy.initialized() + assert ret + + +def test_grains(proxy_minion_config): + """ + check ssh_sample_proxy grains method + """ + + GRAINS_INFO = """{ + "os": "SshExampleOS", + "kernel": "0.0000001", + "housecat": "Are you kidding?" +} +""" + mock_sendline = MagicMock(autospec=True, side_effect=[("", ""), (GRAINS_INFO, "")]) + with patch.object(MockSSHConnection, "sendline", mock_sendline): + with patch( + "salt.utils.vt_helper.SSHConnection", + MagicMock(autospec=True, return_value=MockSSHConnection()), + ): + with patch.dict(ssh_sample_proxy.__opts__, proxy_minion_config): + ssh_sample_proxy.init(proxy_minion_config) + + ret = ssh_sample_proxy.grains() + assert "os" in ret + assert "kernel" in ret + assert "housecat" in ret + + assert ret["os"] == "SshExampleOS" + assert ret["kernel"] == "0.0000001" + assert ret["housecat"] == "Are you kidding?" + + # Read from __context__ cache + mock_sendline = MagicMock(autospec=True, side_effect=[("", ""), (GRAINS_INFO, "")]) + mock_context = { + "grains_cache": { + "os": "SSH-ExampleOS", + "kernel": "0.0000002", + "dog": "Not kidding.", + } + } + + with patch.object(MockSSHConnection, "sendline", mock_sendline): + with patch( + "salt.utils.vt_helper.SSHConnection", + MagicMock(autospec=True, return_value=MockSSHConnection()), + ): + with patch.dict(ssh_sample_proxy.__opts__, proxy_minion_config): + with patch.dict(ssh_sample_proxy.__context__, mock_context): + ssh_sample_proxy.init(proxy_minion_config) + + ret = ssh_sample_proxy.grains() + assert "os" in ret + assert "kernel" in ret + assert "dog" in ret + + assert ret["os"] == "SSH-ExampleOS" + assert ret["kernel"] == "0.0000002" + assert ret["dog"] == "Not kidding." + + +def test_grains_refresh(proxy_minion_config): + """ + check ssh_sample_proxy grains_refresh method + """ + + GRAINS_INFO = """{ + "os": "SshExampleOS", + "kernel": "0.0000001", + "housecat": "Are you kidding?" +} +""" + + mock_sendline = MagicMock(autospec=True, side_effect=[("", ""), (GRAINS_INFO, "")]) + mock_context = { + "grains_cache": { + "os": "SSH-ExampleOS", + "kernel": "0.0000002", + "dog": "Not kidding.", + } + } + + with patch.object(MockSSHConnection, "sendline", mock_sendline): + with patch( + "salt.utils.vt_helper.SSHConnection", + MagicMock(autospec=True, return_value=MockSSHConnection()), + ): + with patch.dict(ssh_sample_proxy.__opts__, proxy_minion_config): + with patch.dict(ssh_sample_proxy.__context__, mock_context): + ssh_sample_proxy.init(proxy_minion_config) + + ret = ssh_sample_proxy.grains_refresh() + assert "os" in ret + assert "kernel" in ret + assert "housecat" in ret + + assert ret["os"] == "SshExampleOS" + assert ret["kernel"] == "0.0000001" + assert ret["housecat"] == "Are you kidding?" + + +def test_fns(): + """ + check ssh_sample_proxy fns method + """ + + ret = ssh_sample_proxy.fns() + + assert "details" in ret + assert ret["details"] == ( + "This key is here because a function in " + "grains/ssh_sample.py called fns() here in the proxymodule." + ) + + +def test_ping(proxy_minion_config): + """ + check ssh_sample_proxy ping method + """ + + mock_sendline = MagicMock(autospec=True, side_effect=[("", ""), ("", "")]) + with patch.object(MockSSHConnection, "sendline", mock_sendline): + with patch( + "salt.utils.vt_helper.SSHConnection", + MagicMock(autospec=True, return_value=MockSSHConnection()), + ): + with patch.dict(ssh_sample_proxy.__opts__, proxy_minion_config): + ssh_sample_proxy.init(proxy_minion_config) + + ret = ssh_sample_proxy.ping() + assert ret + + mock_sendline = MagicMock(autospec=True, side_effect=[("", ""), TerminalException]) + with patch.object(MockSSHConnection, "sendline", mock_sendline): + with patch( + "salt.utils.vt_helper.SSHConnection", + MagicMock(autospec=True, return_value=MockSSHConnection()), + ): + with patch.dict(ssh_sample_proxy.__opts__, proxy_minion_config): + ssh_sample_proxy.init(proxy_minion_config) + ret = ssh_sample_proxy.ping() + assert not ret + + +def test_package_list(proxy_minion_config): + """ + check ssh_sample_proxy package_list method + """ + + PKG_LIST = """{ + "coreutils": "1.05" +} +""" + + mock_sendline = MagicMock(autospec=True, side_effect=[("", ""), (PKG_LIST, "")]) + with patch.object(MockSSHConnection, "sendline", mock_sendline): + with patch( + "salt.utils.vt_helper.SSHConnection", + MagicMock(autospec=True, return_value=MockSSHConnection()), + ): + with patch.dict(ssh_sample_proxy.__opts__, proxy_minion_config): + ssh_sample_proxy.init(proxy_minion_config) + + ret = ssh_sample_proxy.package_list() + assert ret + assert "coreutils" in ret + assert ret["coreutils"] == "1.05" + + +def test_package_install(proxy_minion_config): + """ + check ssh_sample_proxy package_list method + """ + PKG_INSTALL = """{ + "redbull": "1.0" +} +""" + + mock_sendline = MagicMock(autospec=True, side_effect=[("", ""), (PKG_INSTALL, "")]) + with patch.object(MockSSHConnection, "sendline", mock_sendline): + with patch( + "salt.utils.vt_helper.SSHConnection", + MagicMock(autospec=True, return_value=MockSSHConnection()), + ): + with patch.dict(ssh_sample_proxy.__opts__, proxy_minion_config): + ssh_sample_proxy.init(proxy_minion_config) + + ret = ssh_sample_proxy.package_install("redbull") + assert ret + assert "redbull" in ret + assert ret["redbull"] == "1.0" From 061586faf09799d73a23ce25580cdbb137802279 Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Thu, 27 Oct 2022 16:58:17 -0700 Subject: [PATCH 07/13] Adding some more tests for deltaproxy functionality. --- .../integration/proxy/test_deltaproxy.py | 93 +++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/tests/pytests/integration/proxy/test_deltaproxy.py b/tests/pytests/integration/proxy/test_deltaproxy.py index 01231334c990..cf33e558d27e 100644 --- a/tests/pytests/integration/proxy/test_deltaproxy.py +++ b/tests/pytests/integration/proxy/test_deltaproxy.py @@ -58,6 +58,16 @@ def test_list_pkgs(salt_cli, proxy_id): assert "redbull" in ret.data +def test_list_pkgs_all(salt_cli, proxy_ids): + """ + Ensure the proxy can ping (all proxy minions) + """ + pkg_list = {"apache": "2.4", "coreutils": "1.0", "redbull": "999.99", "tinc": "1.4"} + ret = salt_cli.run("-L", "pkg.list_pkgs", minion_tgt=",".join(proxy_ids)) + for _id in proxy_ids: + assert ret.data[_id] == pkg_list + + def test_install_pkgs(salt_cli, proxy_id): """ Package test 2, really just tests that the virtual function capability @@ -74,11 +84,39 @@ def test_install_pkgs(salt_cli, proxy_id): assert ret.data["thispkg"] == "1.0" +def test_install_pkgs_all(salt_cli, proxy_ids): + """ + Ensure the proxy can ping (all proxy minions) + """ + install_ret = salt_cli.run( + "-L", "pkg.install", "thispkg", minion_tgt=",".join(proxy_ids) + ) + list_ret = salt_cli.run("-L", "pkg.list_pkgs", minion_tgt=",".join(proxy_ids)) + + for _id in proxy_ids: + + assert install_ret.data[_id]["thispkg"] == "1.0" + + assert list_ret.data[_id]["apache"] == "2.4" + assert list_ret.data[_id]["redbull"] == "999.99" + assert list_ret.data[_id]["thispkg"] == "1.0" + + def test_remove_pkgs(salt_cli, proxy_id): ret = salt_cli.run("pkg.remove", "apache", minion_tgt=proxy_id) assert "apache" not in ret.data +def test_remove_pkgs_all(salt_cli, proxy_ids): + """ + Ensure the proxy can ping (all proxy minions) + """ + ret = salt_cli.run("-L", "pkg.remove", "apache", minion_tgt=",".join(proxy_ids)) + + for _id in proxy_ids: + assert "apache" not in ret.data[_id] + + def test_upgrade(salt_cli, proxy_id): ret = salt_cli.run("pkg.upgrade", minion_tgt=proxy_id) assert ret.data["coreutils"]["new"] == "2.0" @@ -166,3 +204,58 @@ def test_config_get(salt_cli, proxy_id): """ ret = salt_cli.run("config.get", "id", minion_tgt=proxy_id) assert ret.data == proxy_id + + +def test_schedule_list(salt_cli, proxy_id): + """ + Ensure schedule.list works + """ + ret = salt_cli.run("schedule.list", minion_tgt=proxy_id) + assert ret.data == "schedule: {}\n" + + +def test_schedule_add_list(salt_cli, proxy_id): + """ + Ensure schedule.add works + """ + ret = salt_cli.run( + "schedule.add", name="job1", function="test.ping", minion_tgt=proxy_id + ) + assert "result" in ret.data + assert ret.data["result"] + + assert "comment" in ret.data + assert ret.data["comment"] == "Added job: job1 to schedule." + + assert "changes" in ret.data + assert ret.data["changes"] == {"job1": "added"} + + _expected = """schedule: + job1: + enabled: true + function: test.ping + jid_include: true + maxrunning: 1 + name: job1 + saved: true +""" + ret = salt_cli.run("schedule.list", minion_tgt=proxy_id) + assert ret.data == _expected + + +def test_schedule_add_list_all(salt_cli, proxy_ids): + """ + Ensure schedule.add works when targeting a single minion + and that the others are not affected. + """ + ret = salt_cli.run( + "schedule.add", name="job1", function="test.ping", minion_tgt=proxy_ids[0] + ) + assert "result" in ret.data + assert ret.data["result"] + + ret = salt_cli.run("-L", "schedule.list", minion_tgt=",".join(proxy_ids)) + + # check every proxy except the first one + for _id in proxy_ids[1:]: + assert ret.data[_id] == "schedule: {}\n" From 3eaf3903cfdff573ba3cbe777e5149080ce2451b Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Fri, 28 Oct 2022 16:55:20 -0700 Subject: [PATCH 08/13] fixing failing tests. --- .../integration/proxy/test_deltaproxy.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/tests/pytests/integration/proxy/test_deltaproxy.py b/tests/pytests/integration/proxy/test_deltaproxy.py index cf33e558d27e..4ea69e0b8a51 100644 --- a/tests/pytests/integration/proxy/test_deltaproxy.py +++ b/tests/pytests/integration/proxy/test_deltaproxy.py @@ -106,15 +106,21 @@ def test_remove_pkgs(salt_cli, proxy_id): ret = salt_cli.run("pkg.remove", "apache", minion_tgt=proxy_id) assert "apache" not in ret.data + # reinstall + ret = salt_cli.run("pkg.install", "apache", minion_tgt=proxy_id) + def test_remove_pkgs_all(salt_cli, proxy_ids): """ Ensure the proxy can ping (all proxy minions) """ - ret = salt_cli.run("-L", "pkg.remove", "apache", minion_tgt=",".join(proxy_ids)) + ret = salt_cli.run("-L", "pkg.remove", "coreutils", minion_tgt=",".join(proxy_ids)) for _id in proxy_ids: - assert "apache" not in ret.data[_id] + assert "coreutils" not in ret.data[_id] + + # reinstall + salt_cli.run("-L", "pkg.install", "coreutils", minion_tgt=",".join(proxy_ids)) def test_upgrade(salt_cli, proxy_id): @@ -242,6 +248,9 @@ def test_schedule_add_list(salt_cli, proxy_id): ret = salt_cli.run("schedule.list", minion_tgt=proxy_id) assert ret.data == _expected + # clean out the scheduler + salt_cli.run("schedule.purge", minion_tgt=proxy_id) + def test_schedule_add_list_all(salt_cli, proxy_ids): """ @@ -249,7 +258,7 @@ def test_schedule_add_list_all(salt_cli, proxy_ids): and that the others are not affected. """ ret = salt_cli.run( - "schedule.add", name="job1", function="test.ping", minion_tgt=proxy_ids[0] + "schedule.add", name="job2", function="test.ping", minion_tgt=proxy_ids[0] ) assert "result" in ret.data assert ret.data["result"] @@ -259,3 +268,6 @@ def test_schedule_add_list_all(salt_cli, proxy_ids): # check every proxy except the first one for _id in proxy_ids[1:]: assert ret.data[_id] == "schedule: {}\n" + + # clean out the scheduler + salt_cli.run("-L", "schedule.purge", minion_tgt=",".join(proxy_ids)) From f769a1c4bb7fe6a4366982a80308c9be53525ed9 Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Mon, 31 Oct 2022 11:29:10 -0700 Subject: [PATCH 09/13] Only add the sub proxy to the list of sub proxies if the proxy minion object is valid, eg. pillar data was unavailable so the minion could not be loaded. --- salt/metaproxy/deltaproxy.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/salt/metaproxy/deltaproxy.py b/salt/metaproxy/deltaproxy.py index b561f19db71b..7ff305ca4bfc 100644 --- a/salt/metaproxy/deltaproxy.py +++ b/salt/metaproxy/deltaproxy.py @@ -332,15 +332,16 @@ def post_master_init(self, master): sub_proxy_data = f.result() minion_id = sub_proxy_data["proxy_opts"].get("id") - self.deltaproxy_opts[minion_id] = sub_proxy_data["proxy_opts"] - self.deltaproxy_objs[minion_id] = sub_proxy_data["proxy_minion"] - - if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]: - self.deltaproxy_objs[ - minion_id - ].req_channel = salt.transport.client.AsyncReqChannel.factory( - sub_proxy_data["proxy_opts"], io_loop=self.io_loop - ) + if sub_proxy_data["proxy_minion"]: + self.deltaproxy_opts[minion_id] = sub_proxy_data["proxy_opts"] + self.deltaproxy_objs[minion_id] = sub_proxy_data["proxy_minion"] + + if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]: + self.deltaproxy_objs[ + minion_id + ].req_channel = salt.transport.client.AsyncReqChannel.factory( + sub_proxy_data["proxy_opts"], io_loop=self.io_loop + ) self.ready = True @@ -384,7 +385,7 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils): log.warning( "Pillar data for proxy minion %s could not be loaded, skipping.", minion_id ) - return {"proxy_minion": None, "proxy_opts": None} + return {"proxy_minion": None, "proxy_opts": {}} # Remove ids proxyopts["proxy"].pop("ids", None) From ab558db0b81e05409703be25b259683aa10034b7 Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Mon, 31 Oct 2022 16:25:43 -0700 Subject: [PATCH 10/13] Changes to handle scenario when the we are unable to connect to the minion and an exception happens. --- salt/metaproxy/deltaproxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/salt/metaproxy/deltaproxy.py b/salt/metaproxy/deltaproxy.py index 7ff305ca4bfc..6ac9c0899a46 100644 --- a/salt/metaproxy/deltaproxy.py +++ b/salt/metaproxy/deltaproxy.py @@ -464,7 +464,7 @@ def subproxy_post_master_init(minion_id, uid, opts, main_proxy, main_utils): exc, exc_info=True, ) - return {"proxy_minion": None, "proxy_opts": None} + return {"proxy_minion": None, "proxy_opts": {}} # Reload the grains proxy_grains = salt.loader.grains( From bb2dd3da3b845947a560795fc486ffbcdbbf23ac Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Tue, 1 Nov 2022 12:40:25 -0700 Subject: [PATCH 11/13] Adding option to having deltaproxy sub proxies startup sequentially by default and in parallel if parallel_startup is configured on the deltaproxy. Updating tests to test with parallel and non-parallel startup. --- salt/metaproxy/deltaproxy.py | 86 +++++++++++++------ .../integration/cli/test_salt_deltaproxy.py | 27 +++++- tests/pytests/integration/proxy/conftest.py | 13 ++- 3 files changed, 95 insertions(+), 31 deletions(-) diff --git a/salt/metaproxy/deltaproxy.py b/salt/metaproxy/deltaproxy.py index 6ac9c0899a46..8db2bf5dfe1d 100644 --- a/salt/metaproxy/deltaproxy.py +++ b/salt/metaproxy/deltaproxy.py @@ -320,28 +320,55 @@ def post_master_init(self, master): self.proxy_pillar = {} self.proxy_context = {} self.add_periodic_callback("cleanup", self.cleanup_subprocesses) - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = [ - executor.submit( - subproxy_post_master_init, _id, uid, self.opts, self.proxy, self.utils + + if self.opts["proxy"].get("parallel_startup"): + log.debug("Doing parallel startup") + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit( + subproxy_post_master_init, + _id, + uid, + self.opts, + self.proxy, + self.utils, + ) + for _id in self.opts["proxy"].get("ids", []) + ] + + for f in concurrent.futures.as_completed(futures): + sub_proxy_data = f.result() + minion_id = sub_proxy_data["proxy_opts"].get("id") + + if sub_proxy_data["proxy_minion"]: + self.deltaproxy_opts[minion_id] = sub_proxy_data["proxy_opts"] + self.deltaproxy_objs[minion_id] = sub_proxy_data["proxy_minion"] + + if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]: + self.deltaproxy_objs[ + minion_id + ].req_channel = salt.transport.client.AsyncReqChannel.factory( + sub_proxy_data["proxy_opts"], io_loop=self.io_loop + ) + else: + log.debug("Doing non-parallel startup") + for _id in self.opts["proxy"].get("ids", []): + sub_proxy_data = subproxy_post_master_init( + _id, uid, self.opts, self.proxy, self.utils ) - for _id in self.opts["proxy"].get("ids", []) - ] - for f in concurrent.futures.as_completed(futures): - sub_proxy_data = f.result() - minion_id = sub_proxy_data["proxy_opts"].get("id") + minion_id = sub_proxy_data["proxy_opts"].get("id") - if sub_proxy_data["proxy_minion"]: - self.deltaproxy_opts[minion_id] = sub_proxy_data["proxy_opts"] - self.deltaproxy_objs[minion_id] = sub_proxy_data["proxy_minion"] + if sub_proxy_data["proxy_minion"]: + self.deltaproxy_opts[minion_id] = sub_proxy_data["proxy_opts"] + self.deltaproxy_objs[minion_id] = sub_proxy_data["proxy_minion"] - if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]: - self.deltaproxy_objs[ - minion_id - ].req_channel = salt.transport.client.AsyncReqChannel.factory( - sub_proxy_data["proxy_opts"], io_loop=self.io_loop - ) + if self.deltaproxy_opts[minion_id] and self.deltaproxy_objs[minion_id]: + self.deltaproxy_objs[ + minion_id + ].req_channel = salt.transport.client.AsyncReqChannel.factory( + sub_proxy_data["proxy_opts"], io_loop=self.io_loop + ) self.ready = True @@ -1060,15 +1087,20 @@ def tune_in(self, start=True): Lock onto the publisher. This is the main event loop for the minion :rtype : None """ - with concurrent.futures.ThreadPoolExecutor() as executor: - futures = [ - executor.submit(subproxy_tune_in, self.deltaproxy_objs[proxy_minion]) - for proxy_minion in self.deltaproxy_objs - ] - - for f in concurrent.futures.as_completed(futures): - _proxy_minion = f.result() - log.debug("Tune in for sub proxy %r finished", _proxy_minion.opts.get("id")) + if self.opts["proxy"].get("parallel_startup"): + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = [ + executor.submit(subproxy_tune_in, self.deltaproxy_objs[proxy_minion]) + for proxy_minion in self.deltaproxy_objs + ] + + for f in concurrent.futures.as_completed(futures): + _proxy_minion = f.result() + log.debug("Tune in for sub proxy %r finished", _proxy_minion.opts.get("id")) + else: + for proxy_minion in self.deltaproxy_objs: + _proxy_minion = subproxy_tune_in(self.deltaproxy_objs[proxy_minion]) + log.debug("Tune in for sub proxy %r finished", _proxy_minion.opts.get("id")) super(ProxyMinion, self).tune_in(start=start) diff --git a/tests/pytests/integration/cli/test_salt_deltaproxy.py b/tests/pytests/integration/cli/test_salt_deltaproxy.py index 61823fb6bcd9..59bf3c59fae9 100644 --- a/tests/pytests/integration/cli/test_salt_deltaproxy.py +++ b/tests/pytests/integration/cli/test_salt_deltaproxy.py @@ -116,10 +116,16 @@ def test_exit_status_unknown_argument(salt_master, proxy_minion_id): # Hangs on Windows. You can add a timeout to the proxy.run command, but then # it just times out. @pytest.mark.skip_on_windows(reason=PRE_PYTEST_SKIP_REASON) +@pytest.mark.parametrize( + "parallel_startup", + [True, False], + ids=["parallel_startup=True", "parallel_startup=False"], +) def test_exit_status_correct_usage( salt_master, salt_cli, proxy_minion_id, + parallel_startup, ): """ Ensure the salt-proxy control proxy starts and @@ -153,11 +159,12 @@ def test_exit_status_correct_usage( controlproxy_pillar_file = """ proxy: proxytype: deltaproxy + parallel_startup: {} ids: - {} - {} """.format( - proxy_one, proxy_two + parallel_startup, proxy_one, proxy_two ) dummy_proxy_one_pillar_file = """ @@ -227,10 +234,16 @@ def test_exit_status_correct_usage( # Hangs on Windows. You can add a timeout to the proxy.run command, but then # it just times out. @pytest.mark.skip_on_windows(reason=PRE_PYTEST_SKIP_REASON) +@pytest.mark.parametrize( + "parallel_startup", + [True, False], + ids=["parallel_startup=True", "parallel_startup=False"], +) def test_missing_pillar_file( salt_master, salt_cli, proxy_minion_id, + parallel_startup, ): """ Ensure that the control proxy minion starts up when @@ -258,11 +271,12 @@ def test_missing_pillar_file( controlproxy_pillar_file = """ proxy: proxytype: deltaproxy + parallel_startup: {} ids: - {} - {} """.format( - proxy_one, proxy_two + parallel_startup, proxy_one, proxy_two ) dummy_proxy_one_pillar_file = """ @@ -318,10 +332,16 @@ def test_missing_pillar_file( # Hangs on Windows. You can add a timeout to the proxy.run command, but then # it just times out. @pytest.mark.skip_on_windows(reason=PRE_PYTEST_SKIP_REASON) +@pytest.mark.parametrize( + "parallel_startup", + [True, False], + ids=["parallel_startup=True", "parallel_startup=False"], +) def test_invalid_connection( salt_master, salt_cli, proxy_minion_id, + parallel_startup, ): """ Ensure that the control proxy minion starts up when @@ -356,12 +376,13 @@ def test_invalid_connection( controlproxy_pillar_file = """ proxy: proxytype: deltaproxy + parallel_startup: {} ids: - {} - {} - {} """.format( - broken_proxy_one, broken_proxy_two, proxy_one + parallel_startup, broken_proxy_one, broken_proxy_two, proxy_one ) dummy_proxy_one_pillar_file = """ diff --git a/tests/pytests/integration/proxy/conftest.py b/tests/pytests/integration/proxy/conftest.py index 7406e59f5995..d924f4eba8ad 100644 --- a/tests/pytests/integration/proxy/conftest.py +++ b/tests/pytests/integration/proxy/conftest.py @@ -12,7 +12,16 @@ def salt_proxy(salt_master, salt_proxy_factory): @pytest.fixture(scope="module") -def deltaproxy_pillar_tree(salt_master, salt_delta_proxy_factory): +def deltaproxy_parallel_startup(): + yield from [True, False] + + +@pytest.fixture( + scope="module", + params=[True, False], + ids=["parallel_startup=True", "parallel_startup=False"], +) +def deltaproxy_pillar_tree(request, salt_master, salt_delta_proxy_factory): """ Create the pillar files for controlproxy and two dummy proxy minions """ @@ -45,12 +54,14 @@ def deltaproxy_pillar_tree(salt_master, salt_delta_proxy_factory): controlproxy_pillar_file = """ proxy: proxytype: deltaproxy + parallel_startup: {} ids: - {} - {} - {} - {} """.format( + request.param, proxy_one, proxy_two, proxy_three, From f0dcd401952bdbe0bdd44f2aa015105338379cc1 Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Wed, 2 Nov 2022 09:49:29 -0700 Subject: [PATCH 12/13] Adding changelog. --- changelog/61153.fixed | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/61153.fixed diff --git a/changelog/61153.fixed b/changelog/61153.fixed new file mode 100644 index 000000000000..ad7b978a7f2f --- /dev/null +++ b/changelog/61153.fixed @@ -0,0 +1 @@ +Initial work to allow parallel startup of proxy minions when used as sub proxies with Deltaproxy. From b5ff9931e0a31eda004baf26a7bdfd6647256452 Mon Sep 17 00:00:00 2001 From: "Gareth J. Greenaway" Date: Thu, 3 Nov 2022 11:39:45 -0700 Subject: [PATCH 13/13] requested changes --- changelog/{61153.fixed => 61153.added} | 0 salt/metaproxy/deltaproxy.py | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) rename changelog/{61153.fixed => 61153.added} (100%) diff --git a/changelog/61153.fixed b/changelog/61153.added similarity index 100% rename from changelog/61153.fixed rename to changelog/61153.added diff --git a/salt/metaproxy/deltaproxy.py b/salt/metaproxy/deltaproxy.py index 8db2bf5dfe1d..206cfce5cf4f 100644 --- a/salt/metaproxy/deltaproxy.py +++ b/salt/metaproxy/deltaproxy.py @@ -322,7 +322,7 @@ def post_master_init(self, master): self.add_periodic_callback("cleanup", self.cleanup_subprocesses) if self.opts["proxy"].get("parallel_startup"): - log.debug("Doing parallel startup") + log.debug("Initiating parallel startup for proxies") with concurrent.futures.ThreadPoolExecutor() as executor: futures = [ executor.submit( @@ -351,7 +351,7 @@ def post_master_init(self, master): sub_proxy_data["proxy_opts"], io_loop=self.io_loop ) else: - log.debug("Doing non-parallel startup") + log.debug("Initiating non-parallel startup for proxies") for _id in self.opts["proxy"].get("ids", []): sub_proxy_data = subproxy_post_master_init( _id, uid, self.opts, self.proxy, self.utils