From bd82c7b0fc42f778d110405f7e1e3adc4346e6c9 Mon Sep 17 00:00:00 2001
From: Nathan Lee <43053363+natsunlee@users.noreply.github.com>
Date: Wed, 14 Feb 2024 00:59:01 -0500
Subject: [PATCH] 0.7 (#58)

* Addition of include functionality for YAML (#54)

* Made behavior for paths in include block to be consistent with typical behavior for Job Definition paths (#57)

---
 .bumpversion.cfg                     |   2 +-
 README.md                            |  80 ++++++++++
 flowmancer/flowmancer.py             |  24 ++-
 flowmancer/jobdefinition/__init__.py |  19 ++-
 flowmancer/jobdefinition/file.py     |  58 +++++--
 flowmancer/version.py                |   2 +-
 pyproject.toml                       |   2 +-
 tests/jobdefinition/test_file.py     | 228 ++++++++++++++++++++++-----
 8 files changed, 349 insertions(+), 66 deletions(-)

diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index 7b9f3ab..bbbc20c 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,3 +1,3 @@
 [bumpversion]
-current_version = 0.6.3
+current_version = 0.7.0
 files = flowmancer/version.py pyproject.toml
diff --git a/README.md b/README.md
index 37758b9..cfa6174 100644
--- a/README.md
+++ b/README.md
@@ -166,6 +166,86 @@ config:
     - internal_flowmancer_package
 ```
 
+### Include YAML Files
+An optional `include` block may be defined in the Job Definition in order to merge multiple Job Definition YAML files.
+YAML files are provided in a list and processed in the order given, with the containing YAML being processed last.
+
+For example:
+```yaml
+# /jobdefs/template.yaml
+config:
+  name: generic-template
+
+tasks:
+  do-something:
+    task: DoSomething
+    parameters:
+      some_required_param: I am a required string parameter
+```
+
+```yaml
+# /jobdefs/cleanup_addon.yaml
+include:
+  - $SYS{APP_ROOT_DIR}/jobdefs/template.yaml
+
+tasks:
+  cleanup:
+    task: Cleanup
+    dependencies:
+      - do-something
+```
+
+```yaml
+# /jobdefs/complete.yaml
+config:
+  name: complete-job
+
+include:
+  - $SYS{APP_ROOT_DIR}/jobdefs/cleanup_addon.yaml
+
+tasks:
+  do-something:
+    task: DoSomething
+    parameters:
+      added_optional_param: 99
+```
+
+Loading the `complete.yaml` job definition will result in a YAML equivalent to:
+```yaml
+config:
+  name: complete-job
+
+tasks:
+  do-something:
+    task: DoSomething
+    parameters:
+      some_required_param: I am a required string parameter
+      added_optional_param: 99
+
+  cleanup:
+    task: Cleanup
+    dependencies:
+      - do-something
+```
+
+> :warning: Array values are **NOT** merged like dictionaries are. Any array values (and therefore any nested structures within them) will be replaced if modified in a later YAML.
+
+Alternatively, the above example could define all `include` values in the `complete.yaml` file and remove the `include` block from `cleanup_addon.yaml`:
+```yaml
+# /jobdefs/complete.yaml
+config:
+  name: complete-job
+
+# As with most paths in the Job Definition, paths to `include` YAML files are relative to the `.py` file where the
+# `.start()` method for Flowmancer is invoked.
+include:
+  - ./jobdefs/template.yaml
+  - ./jobdefs/cleanup_addon.yaml
+
+tasks:
+  do-something:
+    task: DoSomething
+    parameters:
+      added_optional_param: 99
+```
+
+The `include` values are processed in order and result in the same outcome as the original example.
+
 ### Changing Default File Logger Directory
 The Job Definition accepts an optional `loggers` section, which if left empty will default to using a `FileLogger` with default settings.
 To utilize the default `FileLogger`, but with a different configuration, explicitly provide the `loggers` block:
diff --git a/flowmancer/flowmancer.py b/flowmancer/flowmancer.py
index b797eba..fa0f726 100644
--- a/flowmancer/flowmancer.py
+++ b/flowmancer/flowmancer.py
@@ -24,9 +24,10 @@
 from .executor import ExecutionStateMap, Executor
 from .extensions.extension import Extension, _extension_classes
 from .jobdefinition import (
-    Configuration,
+    ConfigurationDefinition,
     ExtensionDefinition,
     JobDefinition,
+    LoadParams,
     LoggerDefinition,
     TaskDefinition,
     _job_definition_classes,
@@ -97,7 +98,7 @@ def _load_extensions_path(path: str, package_chain: Optional[List[str]] = None):
 class Flowmancer:
     def __init__(self, *, test: bool = False, debug: bool = False) -> None:
         manager = Manager()
-        self._config: Configuration = Configuration()
+        self._config: ConfigurationDefinition = ConfigurationDefinition()
         self._test = test
         self._debug = debug
         self._log_event_bus = EventBus[SerializableLogEvent](manager.Queue())
@@ -116,9 +117,10 @@ def start(self) -> int:
         try:
             # Ensure any components, such as file loggers, work with respect to the .py file in which the `start`
             # command is invoked, which is usually the project root dir.
-            os.chdir(os.path.dirname(os.path.abspath(inspect.stack()[-1][1])))
+            app_root_dir = os.path.dirname(os.path.abspath(inspect.stack()[-1][1]))
+            os.chdir(app_root_dir)
             if not self._test:
-                self._process_cmd_args(orig_cwd)
+                self._process_cmd_args(orig_cwd, app_root_dir)
             if not self._executors:
                 raise NoTasksLoadedError(
                     'No Tasks have been loaded! Please check that you have provided a valid Job Definition file.'
@@ -142,9 +144,10 @@ async def _initiate(self) -> int:
         await asyncio.gather(*observer_tasks, *executor_tasks, *logger_tasks, checkpoint_task)
         return len(self._states[ExecutionState.FAILED]) + len(self._states[ExecutionState.DEFAULTED])
 
-    def _process_cmd_args(self, caller_cwd: str) -> None:
+    def _process_cmd_args(self, caller_cwd: str, app_root_dir: str) -> None:
         parser = ArgumentParser(description='Flowmancer job execution options.')
         parser.add_argument('-j', '--jobdef', action='store', dest='jobdef')
+        parser.add_argument('-t', '--type', action='store', dest='jobdef_type', default='yaml')
         parser.add_argument('-r', '--restart', action='store_true', dest='restart', default=False)
         parser.add_argument('-d', '--debug', action='store_true', dest='debug', default=False)
         parser.add_argument('--skip', action='append', dest='skip', default=[])
@@ -157,7 +160,7 @@ def _process_cmd_args(self, caller_cwd: str) -> None:
 
         if args.jobdef:
             jobdef_path = args.jobdef if args.jobdef.startswith('/') else os.path.join(caller_cwd, args.jobdef)
-            self.load_job_definition(jobdef_path)
+            self.load_job_definition(jobdef_path, app_root_dir, args.jobdef_type)
 
         if args.restart:
             try:
@@ -336,11 +339,16 @@ async def await_dependencies() -> bool:
             self._executors[name] = ExecutorDetails(instance=e, dependencies=(deps or []))
             self._states[ExecutionState.INIT].add(name)
 
-    def load_job_definition(self, j: Union[JobDefinition, str], filetype: str = 'yaml') -> Flowmancer:
+    def load_job_definition(
+        self,
+        j: Union[JobDefinition, str],
+        app_root_dir: str,
+        jobdef_type: str = 'yaml'
+    ) -> Flowmancer:
         if isinstance(j, JobDefinition):
             jobdef = j
         else:
-            jobdef = _job_definition_classes[filetype]().load(j)
+            jobdef = _job_definition_classes[jobdef_type]().load(j, LoadParams(APP_ROOT_DIR=app_root_dir))
 
         # Configurations
         self._config = jobdef.config
diff --git a/flowmancer/jobdefinition/__init__.py b/flowmancer/jobdefinition/__init__.py
index c7cafeb..c99e656 100644
--- a/flowmancer/jobdefinition/__init__.py
+++ b/flowmancer/jobdefinition/__init__.py
@@ -1,7 +1,8 @@
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
-from typing import Any, Callable, Dict, List, Type
+from pathlib import Path
+from typing import Any, Callable, Dict, List, Type, Union
 
 from pydantic import BaseModel, Extra
 
@@ -42,7 +43,7 @@ class ExtensionDefinition(JobDefinitionComponent):
     parameters: Dict[str, Any] = dict()
 
 
-class Configuration(JobDefinitionComponent):
+class ConfigurationDefinition(JobDefinitionComponent):
     name: str = 'flowmancer'
     max_concurrency: int = 0
     extension_directories: List[str] = []
@@ -56,18 +57,26 @@ class CheckpointerDefinition(JobDefinitionComponent):
 
 class JobDefinition(JobDefinitionComponent):
     version: float = 0.1
-    config: Configuration = Configuration()
+    include: List[Path] = []
+    config: ConfigurationDefinition = ConfigurationDefinition()
     tasks: Dict[str, TaskDefinition]
     loggers: Dict[str, LoggerDefinition] = {'file-logger': LoggerDefinition(logger='FileLogger')}
     extensions: Dict[str, ExtensionDefinition] = {'progress-bar': ExtensionDefinition(extension='RichProgressBar')}
     checkpointer_config: CheckpointerDefinition = CheckpointerDefinition(checkpointer='FileCheckpointer')
 
 
+class LoadParams(BaseModel):
+    APP_ROOT_DIR: str = '.'
+
+    class Config:
+        extra = Extra.forbid
+
+
 class SerializableJobDefinition(ABC):
     @abstractmethod
-    def load(self, filename: str) -> JobDefinition:
+    def load(self, filename: Union[Path, str], params: LoadParams = LoadParams()) -> JobDefinition:
         pass
 
     @abstractmethod
-    def dump(self, jdef: JobDefinition, filename: str) -> None:
+    def dump(self, jdef: JobDefinition, filename: Union[Path, str]) -> None:
         pass
diff --git a/flowmancer/jobdefinition/file.py b/flowmancer/jobdefinition/file.py
index 7dc0866..ca002b6 100644
--- a/flowmancer/jobdefinition/file.py
+++ b/flowmancer/jobdefinition/file.py
@@ -1,29 +1,61 @@
 import os
 import re
 from pathlib import Path
-from typing import Any, Union
+from typing import Any, Callable, Dict, Pattern, Union
 
 import yaml
 
-from . import JobDefinition, SerializableJobDefinition, job_definition
+from . import JobDefinition, LoadParams, SerializableJobDefinition, job_definition
 
 
-def _path_constructor(_: Any, node: Any) -> str:
-    def replace_fn(match):
-        parts = f"{match.group(1)}:".split(":")
-        return os.environ.get(parts[0], parts[1])
-    _env_var_matcher = re.compile(r'\$ENV{([^}^{]+)}')
-    return _env_var_matcher.sub(replace_fn, node.value)
+def _merge(a: Any, b: Any) -> None:
+    for k in b.keys():
+        if k in a and isinstance(a[k], dict) and isinstance(b[k], dict):
+            _merge(a[k], b[k])
+        else:
+            a[k] = b[k]
+
+
+def _build_path_constructor(var_matcher: Pattern[str], val_dict: Dict[str, Any]) -> Callable:
+    def _path_constructor(_: yaml.SafeLoader, node: yaml.Node) -> str:
+        def replace_fn(match):
+            parts = f"{match.group(1)}:".split(":")
+            return val_dict.get(parts[0], parts[1])
+        return var_matcher.sub(replace_fn, node.value)
+    return _path_constructor
 
 
 @job_definition('yaml')
 class YAMLJobDefinition(SerializableJobDefinition):
-    def load(self, filename: Union[Path, str]) -> JobDefinition:
-        _env_tag_matcher = re.compile(r'[^$]*\$ENV{([^}^{]+)}.*')
-        yaml.add_implicit_resolver("!envvar", _env_tag_matcher, None, yaml.SafeLoader)
-        yaml.add_constructor("!envvar", _path_constructor, yaml.SafeLoader)
+    def load(self, filename: Union[Path, str], params: LoadParams = LoadParams()) -> JobDefinition:
+        # Add constructor for built-in vars
+        sys_tag = re.compile(r'[^$]*\$SYS{([^}^{]+)}.*')
+        sys_var = re.compile(r'\$SYS{([^}^{]+)}')
+        yaml.add_implicit_resolver("!sysvar", sys_tag, None, yaml.SafeLoader)
+        yaml.add_constructor("!sysvar", _build_path_constructor(sys_var, dict(params)), yaml.SafeLoader)
+
+        # Add constructor for env vars
+        env_tag = re.compile(r'[^$]*\$ENV{([^}^{]+)}.*')
+        env_var = re.compile(r'\$ENV{([^}^{]+)}')
+        yaml.add_implicit_resolver("!envvar", env_tag, None, yaml.SafeLoader)
+        yaml.add_constructor("!envvar", _build_path_constructor(env_var, dict(os.environ)), yaml.SafeLoader)
+
+        def process_includes(jdef, merged, seen):
+            for p in jdef.get('include', []):
+                if not p.startswith('/'):
+                    p = os.path.abspath(os.path.join(params.APP_ROOT_DIR, p))
+                if p in seen:
+                    raise RuntimeError(f'Circular include detected: {p}')
+                seen.add(p)
+                with open(p, 'r') as f:
+                    cur = yaml.safe_load(f.read())
+                process_includes(cur, merged, seen)
+            _merge(merged, jdef)
+
+        merged: Dict[str, Any] = dict()
         with open(filename, 'r') as f:
-            return JobDefinition(**yaml.safe_load(f.read()))
+            process_includes(yaml.safe_load(f.read()), merged, set())
+        return JobDefinition(**merged)
 
     def dump(self, jdef: JobDefinition, filename: Union[Path, str]) -> None:
         with open(filename, 'r') as f:
diff --git a/flowmancer/version.py b/flowmancer/version.py
index 63af887..49e0fc1 100644
--- a/flowmancer/version.py
+++ b/flowmancer/version.py
@@ -1 +1 @@
-__version__ = "0.6.3"
+__version__ = "0.7.0"
diff --git a/pyproject.toml b/pyproject.toml
index 57d33bd..8038d15 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "flowmancer"
-version = "0.6.3"
+version = "0.7.0"
 description = "The Python Thing-Doer"
 authors = ["Nathan Lee "]
 license = "MIT"
diff --git a/tests/jobdefinition/test_file.py b/tests/jobdefinition/test_file.py
index bdb9f1f..d3301ed 100644
--- a/tests/jobdefinition/test_file.py
+++ b/tests/jobdefinition/test_file.py
@@ -1,49 +1,203 @@
 import os
 from pathlib import Path
+from typing import Any, Callable, Dict
 
 import pytest
+import yaml
 
+from flowmancer.jobdefinition import JobDefinition, LoadParams
 from flowmancer.jobdefinition.file import YAMLJobDefinition
 
-_sample_valid_yaml_content = """
-config:
-  name: test_job
-tasks:
-  first-task:
-    task: DoSomething
-    parameters:
-      init_step: $ENV{INIT_STEP_NAME:startup step}
-      run_step: running on and on
+@pytest.fixture(scope='module')
+def jobdef_dir(tmp_path_factory) -> Path:
+    return tmp_path_factory.mktemp('jobdef')
+
-  second-task:
-    task: SaveVars
-    parameters:
-      var_a: $ENV{VAL_WITHOUT_DEFAULT}
-"""
+@pytest.fixture(scope='module')
+def write_yaml(jobdef_dir: Path) -> Callable[[str, Dict[str, Any], bool], Path]:
+    def f(name: str, content: Dict[str, Any], wrap_vals_with_quotes: bool = False) -> Path:
+        fpath = jobdef_dir / name
+        with open(fpath, 'w') as f:
+            if wrap_vals_with_quotes:
+                # Regular dump wraps values in quotes...
+                f.write(yaml.dump(content, default_flow_style=False))
+            else:
+                f.write(yaml.safe_dump(content, default_flow_style=False))
+        return fpath
+    return f
 
 
 @pytest.fixture(scope='module')
-def valid_yaml(tmp_path_factory) -> Path:
-    fpath = tmp_path_factory.mktemp('jobdef') / 'valid.yaml'
-    with open(fpath, 'w') as f:
-        f.write(_sample_valid_yaml_content)
-    return fpath
-
-
-def test_env_vars_defaults(valid_yaml: Path) -> None:
-    loader = YAMLJobDefinition()
-    jdef = loader.load(str(valid_yaml))
-    assert(jdef.tasks['first-task'].parameters['init_step'] == 'startup step')
-    assert(jdef.tasks['second-task'].parameters['var_a'] == '')
-
-
-def test_env_vars_with_inputs(valid_yaml: Path) -> None:
-    os.environ['INIT_STEP_NAME'] = 'custom step name'
-    os.environ['VAL_WITHOUT_DEFAULT'] = 'hello world'
-    loader = YAMLJobDefinition()
-    jdef = loader.load(str(valid_yaml))
-    del os.environ['INIT_STEP_NAME']
-    del os.environ['VAL_WITHOUT_DEFAULT']
-    assert(jdef.tasks['first-task'].parameters['init_step'] == 'custom step name')
-    assert(jdef.tasks['second-task'].parameters['var_a'] == 'hello world')
+def load_yaml_jobdef(jobdef_dir: Path) -> Callable[[str], JobDefinition]:
+    def f(p: str) -> JobDefinition:
+        loader = YAMLJobDefinition()
+        params = LoadParams(APP_ROOT_DIR=str(jobdef_dir))
+        return loader.load(str(jobdef_dir / p), params)
+    return f
+
+
+def test_env_var_defaults(
+    write_yaml: Callable[[str, Dict[str, Any]], Path],
+    load_yaml_jobdef: Callable[[str], JobDefinition]
+) -> None:
+    write_yaml('a.yaml', {
+        'config': {'name': '$ENV{JOB_NAME:default name}'},
+        'tasks': {'test': {'task': '$ENV{TASK_CLASS:Test}'}}
+    })
+    j = load_yaml_jobdef('a.yaml')
+    assert(j.config.name == 'default name')
+    assert(j.tasks['test'].task == 'Test')
+
+
+def test_env_var_with_inputs(
+    write_yaml: Callable[[str, Dict[str, Any]], Path],
+    load_yaml_jobdef: Callable[[str], JobDefinition]
+) -> None:
+    write_yaml('a.yaml', {
+        'config': {'name': '$ENV{JOB_NAME:default name}'},
+        'tasks': {
+            'test': {'task': '$ENV{TASK_CLASS:Test}'},
+            'notreal': {'task': '$ENV{NOT_REAL}'}
+        }
+    })
+    os.environ['JOB_NAME'] = 'custom step name'
+    os.environ['TASK_CLASS'] = 'DoSomething'
+    j = load_yaml_jobdef('a.yaml')
+    del os.environ['JOB_NAME']
+    del os.environ['TASK_CLASS']
+    assert(j.config.name == 'custom step name')
+    assert(j.tasks['test'].task == 'DoSomething')
+    assert(j.tasks['notreal'].task == '')
+
+
+def test_sys_var(
+    jobdef_dir: Path,
+    write_yaml: Callable[[str, Dict[str, Any]], Path],
+    load_yaml_jobdef: Callable[[str], JobDefinition]
+) -> None:
+    write_yaml('a.yaml', {
+        'tasks': {
+            'test': {
+                'task': 'Test',
+                'parameters': {
+                    'app_root_dir': '$SYS{APP_ROOT_DIR}',
+                    'notreal': '$SYS{NOTREAL}',
+                }
+            }
+        }
+    })
+    j = load_yaml_jobdef('a.yaml')
+    assert(j.tasks['test'].parameters['app_root_dir'] == str(jobdef_dir))
+    assert(j.tasks['test'].parameters['notreal'] == '')
+
+
+def test_var_as_literal(
+    write_yaml: Callable[[str, Dict[str, Any], bool], Path],
+    load_yaml_jobdef: Callable[[str], JobDefinition]
+) -> None:
+    write_yaml('a.yaml', {
+        'tasks': {
+            'escaped_env': {'task': '$ENV{LITERAL}'},
+            'escaped_sys': {'task': '$SYS{LITERAL}'}
+        }
+    }, True)
+    j = load_yaml_jobdef('a.yaml')
+    assert(j.tasks['escaped_env'].task == '$ENV{LITERAL}')
+    assert(j.tasks['escaped_sys'].task == '$SYS{LITERAL}')
+
+
+def test_include_order(
+    write_yaml: Callable[[str, Dict[str, Any]], Path],
+    load_yaml_jobdef: Callable[[str], JobDefinition]
+) -> None:
+    a = {
+        'config': {
+            'name': 'a'
+        },
+        'tasks': {
+            'do-something': {
+                'task': 'DoSomething'
+            }
+        }
+    }
+    b = {
+        'config': {
+            'name': 'b'
+        },
+        'tasks': {
+            'do-something': {
+                'task': 'DoSomethingElse'
+            }
+        }
+    }
+    c = {
+        'include': [
+            '$SYS{APP_ROOT_DIR}/a.yaml',
+            '$SYS{APP_ROOT_DIR}/b.yaml'
+        ],
+        'config': {
+            'name': 'c'
+        }
+    }
+    write_yaml('a.yaml', a)
+    write_yaml('b.yaml', b)
+    write_yaml('c.yaml', c)
+    jdef = load_yaml_jobdef('c.yaml')
+    assert(jdef.config.name == 'c')
+    assert(jdef.tasks['do-something'].task == 'DoSomethingElse')
+
+
+def test_nested_include(
+    write_yaml: Callable[[str, Dict[str, Any]], Path],
+    load_yaml_jobdef: Callable[[str], JobDefinition]
+) -> None:
+    a = {
+        'config': {
+            'name': 'a'
+        },
+        'tasks': {
+            'do-something': {
+                'task': 'DoSomething'
+            }
+        }
+    }
+    b = {
+        'include': [
+            '$SYS{APP_ROOT_DIR}/a.yaml'
+        ],
+        'config': {
+            'name': 'b'
+        },
+        'tasks': {
+            'do-something': {
+                'task': 'DoSomethingElse'
+            }
+        }
+    }
+    c = {
+        'include': [
+            '$SYS{APP_ROOT_DIR}/b.yaml'
+        ],
+        'config': {
+            'name': 'c'
+        }
+    }
+    write_yaml('a.yaml', a)
+    write_yaml('b.yaml', b)
+    write_yaml('c.yaml', c)
+    jdef = load_yaml_jobdef('c.yaml')
+    assert(jdef.config.name == 'c')
+    assert(jdef.tasks['do-something'].task == 'DoSomethingElse')
+
+
+def test_relative_path_include(
+    write_yaml: Callable[[str, Dict[str, Any]], Path],
+    load_yaml_jobdef: Callable[[str], JobDefinition]
+) -> None:
+    a = {'tasks': {'do-something': {'task': 'DoSomething'}}}
+    b = {'include': ['./a.yaml'], 'tasks': {'do-something': {'task': 'DoSomethingElse'}}}
+    write_yaml('a.yaml', a)
+    write_yaml('b.yaml', b)
+    jdef = load_yaml_jobdef('b.yaml')
+    assert(jdef.tasks['do-something'].task == 'DoSomethingElse')
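
Note: the following is an illustrative sketch (not part of the patch) of the merge semantics described in the README's "Include YAML Files" section. The `merge` helper below mirrors the `_merge` function added in `flowmancer/jobdefinition/file.py`; the example dictionaries are hypothetical.

```python
from typing import Any, Dict


def merge(a: Dict[str, Any], b: Dict[str, Any]) -> None:
    # Nested dicts are merged key-by-key; any other value (including lists) is replaced wholesale.
    for k in b:
        if k in a and isinstance(a[k], dict) and isinstance(b[k], dict):
            merge(a[k], b[k])
        else:
            a[k] = b[k]


# Hypothetical contents of an earlier include and of the containing YAML (processed last).
earlier = {
    'tasks': {
        'do-something': {
            'parameters': {'some_required_param': 'I am a required string parameter'},
            'dependencies': ['setup'],
        }
    }
}
containing = {
    'tasks': {
        'do-something': {
            'parameters': {'added_optional_param': 99},
            'dependencies': ['init'],
        }
    }
}

merge(earlier, containing)
# Dictionary values are merged key-by-key...
assert earlier['tasks']['do-something']['parameters'] == {
    'some_required_param': 'I am a required string parameter',
    'added_optional_param': 99,
}
# ...but array values are replaced outright, not combined.
assert earlier['tasks']['do-something']['dependencies'] == ['init']
```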