Skip to content

Commit

Permalink
almost
Browse files Browse the repository at this point in the history
  • Loading branch information
ademariag committed May 13, 2024
1 parent 447e00e commit d7e8463
Show file tree
Hide file tree
Showing 22 changed files with 465 additions and 139 deletions.
1 change: 1 addition & 0 deletions kapitan/inputs/kadet.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@


def inventory(lazy=False):
logger.info(Dict(inventory_func(search_paths.get(), current_target.get(), inventory_path), default_box=lazy)["parameters"])
return Dict(inventory_func(search_paths.get(), current_target.get(), inventory_path), default_box=lazy)


Expand Down
195 changes: 81 additions & 114 deletions kapitan/inventory/inv_omegaconf/inv_omegaconf.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,77 @@
import logging
import multiprocessing as mp
import os
from copy import deepcopy
from dataclasses import dataclass, field
from time import time
from cachetools import cached, LRUCache
import yaml
from omegaconf import ListMergeMode, OmegaConf, DictConfig
from omegaconf import OmegaConf

from typing import Self
from ..inventory import InventoryError, Inventory, InventoryTarget
from .resolvers import register_resolvers
from kadet import Dict


logger = logging.getLogger(__name__)


class OmegaConfTarget(InventoryTarget):
resolved: bool = False

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__add_metadata()

def resolve(self) -> Self:
if not self.resolved:
parameters = self.parameters
if isinstance(parameters, Dict):
parameters = OmegaConf.create(parameters.to_dict())
elif isinstance(parameters, dict):
parameters = OmegaConf.create(parameters)
OmegaConf.resolve(parameters, escape_interpolation_strings=False)

self.parameters = Dict(OmegaConf.to_container(parameters)).to_dict()
self.resolved = True
return self

def __add_metadata(self):
metadata = {
"name": {
"short": self.name.split(".")[-1],
"full": self.name,
"path": self.path,
"parts": self.name.split("."),
}
}
self.parameters["_kapitan_"] = metadata
self.parameters["_reclass_"] = metadata


class OmegaConfInventory(Inventory):
classes_cache: dict = {}

def render_targets(self, targets: list[InventoryTarget] = None, ignore_class_not_found: bool = False) -> None:
targets = self.targets.values()
self.ignore_class_not_found = ignore_class_not_found
manager = mp.Manager() # perf: bottleneck --> 90 % of the inventory time
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs, target_class=OmegaConfTarget)

def render_targets(self, targets: list[OmegaConfTarget] = None, ignore_class_not_found: bool = False) -> None:
manager = mp.Manager()
shared_targets = manager.dict()

mp.set_start_method("spawn", True) # platform independent
with mp.Pool(min(len(targets), os.cpu_count())) as pool:
r = pool.map_async(self.inventory_worker, [(self, target, shared_targets) for target in targets])
r = pool.map_async(self.inventory_worker, [(self, target, shared_targets) for target in targets.values()])
r.wait()

# store parameters and classes
for target_name, rendered_target in shared_targets.items():
self.targets[target_name].parameters = rendered_target["parameters"]
logger.debug(f"Rendered {target_name} using OmegaConf with {len(rendered_target['parameters'])} parameters")



for target in shared_targets.values():
self.targets[target.name] = target

@staticmethod
def inventory_worker(zipped_args):
self, target, nodes = zipped_args
start = time()
self, target, shared_targets = zipped_args
try:
register_resolvers(self.inventory_path)
self.load_target(target)
nodes[target.name] = {"parameters": target.parameters}
register_resolvers(self.inventory_path)
target.resolve()
shared_targets[target.name] = target

except Exception as e:
logger.error(f"{target.name}: could not render due to error {e}")
raise
Expand All @@ -62,107 +92,44 @@ def resolve_class_file_path(self, class_name: str, class_parent_dir: str = None)

class_path_base = os.path.join(self.classes_path, *class_name.split("."))


if os.path.isfile(os.path.join(class_path_base, "init.yml")):
class_file = os.path.join(class_path_base, "init.yml")
elif os.path.isfile(class_path_base + ".yml"):
class_file = f"{class_path_base}.yml"

return class_file


@cached(cache=LRUCache(maxsize=1024))
def load_file(self, filename):
return Dict.from_yaml(filename=filename)

@cached(cache=LRUCache(maxsize=1024))
def load_class(self, class_file: str):
if class_file not in self.classes_cache:
self.classes_cache[class_file] = self._load_file(class_file)
return self.classes_cache[class_file]


def load_classes(self, target: InventoryTarget, classes: list, ignore_class_not_found=False, class_parent_dir=None):
for class_name in classes:
def load_parameters_from_file(self, filename, parameters={}) -> Dict:
parameters = Dict(parameters)
content = self.load_file(filename)

_classes = content.get("classes", [])
_parameters = content.get("parameters", {})

# first processes all classes
for class_name in _classes:
class_parent_dir = os.path.dirname(filename.removeprefix(self.classes_path).removeprefix("/"))
class_file = self.resolve_class_file_path(class_name, class_parent_dir=class_parent_dir)

if not class_file:
if ignore_class_not_found:
logger.warning(f"Class {class_name} not found")
if self.ignore_class_not_found:
continue
raise InventoryError(f"Class {class_name} not found")

content = self.load_class(class_file)

sub_classes = content.get("classes", [])

if sub_classes:
new_class_parent_dir = os.path.dirname(class_file.removeprefix(self.classes_path).removeprefix("/"))
self.load_classes(target, sub_classes, self.ignore_class_not_found, new_class_parent_dir)

parameters = content.get("parameters", {})
target.parameters = self._merge_parameters(target.parameters, parameters)

def load_target(self, target: InventoryTarget):
"""
load only one target with all its classes
"""

# load the target parameters
content = self._load_file(os.path.join(self.targets_path, target.path))
target.classes = content.get("classes", [])
self.load_classes(target, target.classes, ignore_class_not_found=self.ignore_class_not_found)

target.parameters = self._merge_parameters(target.parameters, content.get("parameters", {}))
parameters.merge_update(self.load_parameters_from_file(class_file), box_merge_lists="extend")

logger.debug(f"Loaded {target.name} with {len(target.parameters)} parameters and {len(target.classes)} classes")
# resolve interpolations
self._add_metadata(target)
target.parameters = self._resolve_parameters(target.parameters)

# obtain target name to insert in inv dict
vars_target_name = target.parameters.get("kapitan", {}).get("vars", {}).get("target")
if not vars_target_name:
# add hint to kapitan.vars.target
logger.warning(f"Could not resolve target name on target {target.name}")

def _add_metadata(self, target: InventoryTarget):
metadata = {
"name": {
"short": target.name.split(".")[-1],
"full": target.name,
"path": target.path,
"parts": target.name.split("."),
}
}
target.parameters["_kapitan_"] = metadata
target.parameters["_reclass_"] = metadata

@staticmethod
def _load_file(path: str):
with open(path, "r") as f:
f.seek(0)
config = yaml.load(f, yaml.SafeLoader)
return config

@staticmethod
def _merge_parameters(target_parameters: DictConfig, class_parameters: DictConfig) -> DictConfig:
if not target_parameters:
return class_parameters

return OmegaConf.unsafe_merge(
class_parameters, target_parameters, list_merge_mode=ListMergeMode.EXTEND,
)

@staticmethod
def _resolve_parameters(target_parameters: DictConfig):
# resolve first time
OmegaConf.resolve(target_parameters, escape_interpolation_strings=False)

# remove specified keys between first and second resolve-stage
remove_location = "omegaconf.remove"
removed_keys = OmegaConf.select(target_parameters, remove_location, default=[])
for key in removed_keys:
OmegaConf.update(target_parameters, key, {}, merge=False)

# resolve second time and convert to object
# TODO: add `throw_on_missing = True` when resolving second time (--> wait for to_object support)
# reference: https://github.com/omry/omegaconf/pull/1113
OmegaConf.resolve(target_parameters, escape_interpolation_strings=False)
return OmegaConf.to_container(target_parameters)

# finally merges the parameters from the current file
parameters.merge_update(_parameters, box_merge_lists="extend")
return parameters

def load_target(self, target: OmegaConfTarget):
full_target_path = os.path.join(self.targets_path, target.path)

parameters = Dict(target.parameters, frozen_box=True)
target.parameters = self.load_parameters_from_file(full_target_path, parameters=parameters).to_dict()

def resolve_all_target(self):
map(lambda x: x.resolve(), self.targets)
1 change: 0 additions & 1 deletion kapitan/inventory/inv_omegaconf/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ def register_resolvers(inventory_path: str) -> None:
"""register pre-defined and user-defined resolvers"""
replace = True

logger.debug(f"Registering resolvers from {inventory_path}")
# yaml key utility functions
OmegaConf.register_new_resolver("key", key, replace=replace)
OmegaConf.register_new_resolver("parentkey", parentkey, replace=replace)
Expand Down
20 changes: 13 additions & 7 deletions kapitan/inventory/inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,35 @@
from abc import ABC, abstractmethod
from pydantic import BaseModel, Field
from kapitan.errors import KapitanError
from typing import Annotated, Dict, Any, Optional
import kadet
from typing import Annotated, Any, Optional, Dict
logger = logging.getLogger(__name__)


class InventoryTarget(BaseModel):
name: str = Field(exclude=True)
path: str = Field(exclude=True)
parameters: dict = dict()
parameters: Dict = {}
classes: list = list()
applications: list = list()
exports: list = list()




class Inventory(ABC):
def __init__(self, inventory_path: str = "inventory", compose_target_name: bool = False, ignore_class_not_found=False):
def __init__(self, inventory_path: str = "inventory", compose_target_name: bool = False, ignore_class_not_found=False, initialise=True, target_class=InventoryTarget):
self.inventory_path = inventory_path
self.compose_target_name = compose_target_name
self.targets_path = os.path.join(self.inventory_path, 'targets')
self.classes_path = os.path.join(self.inventory_path, 'classes')
self.initialised: bool = False
self.targets: dict[str, InventoryTarget] = {}
self.targets: dict[str, target_class] = {}
self.ignore_class_not_found = ignore_class_not_found
self.target_class = target_class

self.__initialise(ignore_class_not_found=ignore_class_not_found)
if initialise:
self.__initialise(ignore_class_not_found=ignore_class_not_found)

@property
def inventory(self) -> dict:
Expand Down Expand Up @@ -63,7 +69,7 @@ def __initialise(self, ignore_class_not_found) -> bool:
logger.debug(f"ignoring {file}: targets have to be .yml or .yaml files.")
continue

target = InventoryTarget(name=name, path=path)
target = self.target_class(name=name, path=path)

if self.targets.get(target.name):
raise InventoryError(
Expand Down Expand Up @@ -101,7 +107,7 @@ def get_parameters(self, target_names: str | list[str], ignore_class_not_found:
target = self.get_target(target_names, ignore_class_not_found)
return target.parameters

return {name: target.parameters for name, target in self.get_targets(target_names)}
return {name: {"parameters": Dict(target.parameters)} for name, target in self.get_targets(target_names)}

@abstractmethod
def render_targets(self, targets: list[InventoryTarget] = None, ignore_class_not_found: bool = False) -> None:
Expand Down
2 changes: 1 addition & 1 deletion kapitan/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def search_imports(cwd, import_str, search_paths):
return normalised_path, normalised_path_content.encode()


def inventory(search_paths: list, target_name: str = None, inventory_path: str = "./inventory"):
def inventory(search_paths: list = [], target_name: str = None, inventory_path: str = "./inventory"):
"""
Reads inventory (set by inventory_path) in search_paths.
set nodes_uri to change reclass nodes_uri the default value
Expand Down
3 changes: 1 addition & 2 deletions kapitan/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def load_target_inventory(inventory_path, requested_targets, ignore_class_not_fo
else:
raise InventoryError(f"InventoryError: {target_name}: parameters is empty")

kapitan_target_configs = inv.get_parameters(target_name, ignore_class_not_found).get("kapitan")
kapitan_target_configs = inv.get_parameters(target_name, ignore_class_not_found)["kapitan"]
# check if parameters.kapitan is empty
if not kapitan_target_configs:
raise InventoryError(f"InventoryError: {target_name}: parameters.kapitan has no assignment")
Expand Down Expand Up @@ -493,7 +493,6 @@ def compile_target(target_obj, search_paths, compile_path, ref_controller, globa
logger.info("Compiled %s (%.2fs)", target_obj["target_full_path"], time.time() - start)


@hashable_lru_cache
def valid_target_obj(target_obj, require_compile=True):
"""
Validates a target_obj
Expand Down
Loading

0 comments on commit d7e8463

Please sign in to comment.