From 593dd317f1f07c6397d15178ec7dd63ea93466d5 Mon Sep 17 00:00:00 2001
From: Paul van der Linden
Date: Fri, 2 Apr 2021 13:02:07 +0200
Subject: [PATCH] fixing multiprocessing

---
 kapitan/cached.py    | 32 ++++++++++++++++++++++++++++++++
 kapitan/resources.py |  3 +--
 kapitan/targets.py   | 29 +++++++++++++++++------------
 kapitan/utils.py     |  3 ++-
 setup.py             |  4 +---
 5 files changed, 53 insertions(+), 18 deletions(-)

diff --git a/kapitan/cached.py b/kapitan/cached.py
index 58e8425e4..17c7edba4 100644
--- a/kapitan/cached.py
+++ b/kapitan/cached.py
@@ -35,6 +35,38 @@ def reset_cache():
     revealer_obj = None
 
 
+def from_dict(cache_dict):
+    global inv, inv_cache, gpg_obj, gkms_obj, awskms_obj, azkms_obj, dot_kapitan, ref_controller_obj, revealer_obj, inv_sources, args
+
+    inv = cache_dict["inv"]
+    inv_cache = cache_dict["inv_cache"]
+    inv_sources = cache_dict["inv_sources"]
+    gpg_obj = cache_dict["gpg_obj"]
+    gkms_obj = cache_dict["gkms_obj"]
+    awskms_obj = cache_dict["awskms_obj"]
+    azkms_obj = cache_dict["azkms_obj"]
+    dot_kapitan = cache_dict["dot_kapitan"]
+    ref_controller_obj = cache_dict["ref_controller_obj"]
+    revealer_obj = cache_dict["revealer_obj"]
+    args = cache_dict["args"]
+
+
+def as_dict():
+    return {
+        "inv": inv,
+        "inv_cache": inv_cache,
+        "inv_sources": inv_sources,
+        "gpg_obj": gpg_obj,
+        "gkms_obj": gkms_obj,
+        "awskms_obj": awskms_obj,
+        "azkms_obj": azkms_obj,
+        "dot_kapitan": dot_kapitan,
+        "ref_controller_obj": ref_controller_obj,
+        "revealer_obj": revealer_obj,
+        "args": args,
+    }
+
+
 def reset_inv():
     """clears the inv while fetching remote inventories"""
     global inv
diff --git a/kapitan/resources.py b/kapitan/resources.py
index af7ac0e14..3627c217c 100644
--- a/kapitan/resources.py
+++ b/kapitan/resources.py
@@ -21,7 +21,6 @@
 import kapitan.cached as cached
 import yaml
 from kapitan import __file__ as kapitan_install_path
-from kapitan import cached as cached
 from kapitan.errors import CompileError, InventoryError, KapitanError
 from kapitan.utils import PrettyDumper, deep_get, flatten_dict, render_jinja2_file, sha256_string
 
@@ -38,6 +37,7 @@
 
 JSONNET_CACHE = {}
 
+
 def resource_callbacks(search_paths):
     """
     Returns a dict with all the functions to be used
@@ -258,7 +258,6 @@ def inventory(search_paths, target, inventory_path=None):
     set inventory_path to read custom path. None defaults to value set via cli
     Returns a dictionary with the inventory for target
     """
-
     if inventory_path is None:
         # grab inventory_path value from cli subcommand
         inventory_path_arg = cached.args.get("compile") or cached.args.get("inventory")
diff --git a/kapitan/targets.py b/kapitan/targets.py
index c9c5b0862..9ce51dcef 100644
--- a/kapitan/targets.py
+++ b/kapitan/targets.py
@@ -84,7 +84,7 @@ def compile_targets(
             logger.info("No changes since last compilation.")
             return
 
-    pool = multiprocessing.Pool(parallel)
+    pool = multiprocessing.get_context("spawn").Pool(parallel)
 
     try:
         if kwargs.get("fetch_inventories", False):
@@ -96,14 +96,6 @@ def compile_targets(
 
         # append "compiled" to output_path so we can safely overwrite it
         compile_path = os.path.join(output_path, "compiled")
-        worker = partial(
-            compile_target,
-            search_paths=search_paths,
-            compile_path=temp_compile_path,
-            ref_controller=ref_controller,
-            inventory_path=inventory_path,
-            **kwargs,
-        )
 
         if not target_objs:
             raise CompileError("Error: no targets found")
@@ -129,9 +121,19 @@ def compile_targets(
                 output_path, target_objs, dep_cache_dir, kwargs.get("force_fetch", False), pool
             )
 
+        worker = partial(
+            compile_target,
+            search_paths=search_paths,
+            compile_path=temp_compile_path,
+            ref_controller=ref_controller,
+            inventory_path=inventory_path,
+            globals_cached=cached.as_dict(),
+            **kwargs,
+        )
+
         # compile_target() returns None on success
-        for target_obj in target_objs:
-            worker(target_obj)
+        # so p is only not None when raising an exception
+        [p.get() for p in pool.imap_unordered(worker, target_objs) if p]
 
         os.makedirs(compile_path, exist_ok=True)
 
@@ -416,13 +418,16 @@ def search_targets(inventory_path, targets, labels):
     return targets_found
 
 
-def compile_target(target_obj, search_paths, compile_path, ref_controller, **kwargs):
+def compile_target(target_obj, search_paths, compile_path, ref_controller, globals_cached=None, **kwargs):
     """Compiles target_obj and writes to compile_path"""
     start = time.time()
     compile_objs = target_obj["compile"]
     ext_vars = target_obj["vars"]
     target_name = ext_vars["target"]
 
+    if globals_cached:
+        cached.from_dict(globals_cached)
+
     for comp_obj in compile_objs:
         input_type = comp_obj["input_type"]
         output_path = comp_obj["output_path"]
diff --git a/kapitan/utils.py b/kapitan/utils.py
index 80a997a37..e20638850 100644
--- a/kapitan/utils.py
+++ b/kapitan/utils.py
@@ -24,7 +24,8 @@
 JSONNET_AVAILABLE = True
 try:
     import _gojsonnet as jsonnet
-    logging.debug('Using GO jsonnet over C jsonnet')
+
+    logging.debug("Using GO jsonnet over C jsonnet")
 except ImportError:
     try:
         import _jsonnet as jsonnet
diff --git a/setup.py b/setup.py
index 9c921313f..a126b03b1 100644
--- a/setup.py
+++ b/setup.py
@@ -75,7 +75,5 @@ def install_deps():
             "kapitan=kapitan.cli:main",
         ],
     },
-    extras_require={
-        "gojsonnet": ["gojsonnet==0.17.0"]
-    }
+    extras_require={"gojsonnet": ["gojsonnet==0.17.0"]},
 )
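
Note on the approach: with the "spawn" start method, worker processes begin from a fresh interpreter and do not inherit the module-level globals that kapitan.cached accumulates in the parent, which is why the patch snapshots them with cached.as_dict() when building the worker partial and restores them with cached.from_dict() inside compile_target. The standalone sketch below shows the same hand-off pattern in isolation; it is not Kapitan code, and the names (settings, configure, snapshot, restore, compile_one) are illustrative only.

# Minimal sketch of the spawn-safe hand-off pattern used above.
# All names here are illustrative, not part of Kapitan.
import multiprocessing
from functools import partial

settings = {}  # stands in for kapitan.cached's module-level globals


def configure(args):
    # parent-side setup, analogous to populating kapitan.cached from the CLI
    global settings
    settings = {"inventory_path": args["inventory_path"], "verbose": args["verbose"]}


def snapshot():
    # like cached.as_dict(): capture the globals so they can cross the process boundary
    return dict(settings)


def restore(state):
    # like cached.from_dict(): rebuild the globals inside a freshly spawned worker
    global settings
    settings = dict(state)


def compile_one(target, globals_snapshot=None):
    # worker, analogous to compile_target(..., globals_cached=...)
    if globals_snapshot:
        restore(globals_snapshot)
    # without restore(), settings would still be {} here under "spawn"
    return "%s: inventory=%s" % (target, settings.get("inventory_path"))


if __name__ == "__main__":
    configure({"inventory_path": "inventory/", "verbose": False})
    worker = partial(compile_one, globals_snapshot=snapshot())
    with multiprocessing.get_context("spawn").Pool(2) as pool:
        for result in pool.imap_unordered(worker, ["target-a", "target-b"]):
            print(result)

Passing the snapshot explicitly as an argument of the partial, instead of relying on fork-style inheritance of globals, keeps behaviour consistent across start methods and platforms (spawn is the only start method on Windows and the default on macOS since Python 3.8).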