0.7 (#58)
* Addition of include functionality for YAML (#54)
* Made behavior for paths in include block to be consistent with typical behavior for Job Definition paths (#57)
natsunlee committed Feb 14, 2024
1 parent e7a151f commit bd82c7b
Showing 8 changed files with 349 additions and 66 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -1,3 +1,3 @@
[bumpversion]
current_version = 0.6.3
current_version = 0.7.0
files = flowmancer/version.py pyproject.toml
80 changes: 80 additions & 0 deletions README.md
@@ -166,6 +166,86 @@ config:
- internal_flowmancer_package
```

### Include YAML Files
An optional `include` block may be defined in the Job Definition to merge multiple Job Definition YAML files.
The files are provided as a list and processed in the order given, with the containing YAML processed last, so its
values take precedence over those of the included files.

For example:
```yaml
# <app_root_dir>/jobdefs/template.yaml
config:
name: generic-template

tasks:
do-something:
task: DoSomething
parameters:
some_required_param: I am a required string parameter
```

```yaml
# <app_root_dir>/jobdefs/cleanup_addon.yaml
include:
- $SYS{APP_ROOT_DIR}/jobdefs/template.yaml

tasks:
cleanup:
task: Cleanup
dependencies:
- do-something
```

```yaml
# <app_root_dir>/jobdefs/complete.yaml
config:
name: complete-job

include:
- $SYS{APP_ROOT_DIR}/jobdefs/cleanup_addon.yaml

tasks:
do-something:
    task: DoSomething
parameters:
added_optional_param: 99
```

Loading the `complete.yaml` job definition results in a merged Job Definition equivalent to the following YAML:
```yaml
config:
name: complete-job

tasks:
do-something:
    task: DoSomething
parameters:
some_required_param: I am a required string parameter
added_optional_param: 99
```

> :warning: Array values are **NOT** merged like dictionaries are. Any array value (and therefore any nested structures within it) will be replaced entirely if it is redefined in a later YAML.
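
For a concrete picture of that behavior, below is a minimal Python sketch of the merge semantics, mirroring the `_merge` helper introduced in this commit: dictionaries are merged key by key, while any other value, including lists, is overwritten wholesale by the later YAML. The function name and directory values here are purely illustrative.

```python
from typing import Any, Dict


def merge(a: Dict[str, Any], b: Dict[str, Any]) -> None:
    # Recursively merge nested dictionaries; any non-dict value (lists included)
    # from `b` replaces the corresponding value in `a` wholesale.
    for k in b:
        if k in a and isinstance(a[k], dict) and isinstance(b[k], dict):
            merge(a[k], b[k])
        else:
            a[k] = b[k]


base = {'config': {'extension_directories': ['./first_ext_dir']}}
later = {'config': {'extension_directories': ['./second_ext_dir']}}
merge(base, later)
print(base)  # {'config': {'extension_directories': ['./second_ext_dir']}} -- the list is replaced, not appended
```
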
Alternatively, all `include` values could be listed in `complete.yaml`, with the `include` block removed from `cleanup_addon.yaml` entirely:
```yaml
# <app_root_dir>/jobdefs/complete.yaml
config:
name: complete-job

# As with most paths in a Job Definition, paths to `include` YAML files are relative to the `.py` file in which
# Flowmancer's `.start()` method is invoked.
include:
- ./jobdefs/template.yaml
- ./jobdefs/cleanup_addon.yaml

tasks:
do-something:
    task: DoSomething
parameters:
added_optional_param: 99
```

The `include` values are processed in order and result in the same outcome as the original example.
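
Since relative `include` paths (and `$SYS{APP_ROOT_DIR}`) are resolved from the location of the script that calls `.start()`, a typical layout places that script at `<app_root_dir>`. Below is a minimal, hypothetical launcher sketch; it assumes the `Flowmancer` class is importable from the package root and that the Job Definition is passed with the existing `-j`/`--jobdef` flag (the new `-t`/`--type` option defaults to `yaml`).

```python
# main.py -- assumed to live at <app_root_dir>, so relative `include` paths
# and $SYS{APP_ROOT_DIR} resolve from this file's directory.
from flowmancer import Flowmancer  # assumption: the class is exported at the package root

if __name__ == '__main__':
    # Example invocation: python main.py -j ./jobdefs/complete.yaml
    raise SystemExit(Flowmancer().start())
```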

### Changing Default File Logger Directory
The Job Definition accepts an optional `loggers` section which, if left empty, defaults to a single `FileLogger` with default settings.
To use the default `FileLogger` with a different configuration, explicitly provide the `loggers` block:
24 changes: 16 additions & 8 deletions flowmancer/flowmancer.py
@@ -24,9 +24,10 @@
from .executor import ExecutionStateMap, Executor
from .extensions.extension import Extension, _extension_classes
from .jobdefinition import (
Configuration,
ConfigurationDefinition,
ExtensionDefinition,
JobDefinition,
LoadParams,
LoggerDefinition,
TaskDefinition,
_job_definition_classes,
@@ -97,7 +98,7 @@ def _load_extensions_path(path: str, package_chain: Optional[List[str]] = None):
class Flowmancer:
def __init__(self, *, test: bool = False, debug: bool = False) -> None:
manager = Manager()
self._config: Configuration = Configuration()
self._config: ConfigurationDefinition = ConfigurationDefinition()
self._test = test
self._debug = debug
self._log_event_bus = EventBus[SerializableLogEvent](manager.Queue())
@@ -116,9 +117,10 @@ def start(self) -> int:
try:
# Ensure any components, such as file loggers, work with respect to the .py file in which the `start`
# command is invoked, which is usually the project root dir.
os.chdir(os.path.dirname(os.path.abspath(inspect.stack()[-1][1])))
app_root_dir = os.path.dirname(os.path.abspath(inspect.stack()[-1][1]))
os.chdir(app_root_dir)
if not self._test:
self._process_cmd_args(orig_cwd)
self._process_cmd_args(orig_cwd, app_root_dir)
if not self._executors:
raise NoTasksLoadedError(
'No Tasks have been loaded! Please check that you have provided a valid Job Definition file.'
@@ -142,9 +144,10 @@ async def _initiate(self) -> int:
await asyncio.gather(*observer_tasks, *executor_tasks, *logger_tasks, checkpoint_task)
return len(self._states[ExecutionState.FAILED]) + len(self._states[ExecutionState.DEFAULTED])

def _process_cmd_args(self, caller_cwd: str) -> None:
def _process_cmd_args(self, caller_cwd: str, app_root_dir: str) -> None:
parser = ArgumentParser(description='Flowmancer job execution options.')
parser.add_argument('-j', '--jobdef', action='store', dest='jobdef')
parser.add_argument('-t', '--type', action='store', dest='jobdef_type', default='yaml')
parser.add_argument('-r', '--restart', action='store_true', dest='restart', default=False)
parser.add_argument('-d', '--debug', action='store_true', dest='debug', default=False)
parser.add_argument('--skip', action='append', dest='skip', default=[])
@@ -157,7 +160,7 @@ def _process_cmd_args(self, caller_cwd: str) -> None:

if args.jobdef:
jobdef_path = args.jobdef if args.jobdef.startswith('/') else os.path.join(caller_cwd, args.jobdef)
self.load_job_definition(jobdef_path)
self.load_job_definition(jobdef_path, app_root_dir, args.jobdef_type)

if args.restart:
try:
@@ -336,11 +339,16 @@ async def await_dependencies() -> bool:
self._executors[name] = ExecutorDetails(instance=e, dependencies=(deps or []))
self._states[ExecutionState.INIT].add(name)

def load_job_definition(self, j: Union[JobDefinition, str], filetype: str = 'yaml') -> Flowmancer:
def load_job_definition(
self,
j: Union[JobDefinition, str],
app_root_dir: str,
jobdef_type: str = 'yaml'
) -> Flowmancer:
if isinstance(j, JobDefinition):
jobdef = j
else:
jobdef = _job_definition_classes[filetype]().load(j)
jobdef = _job_definition_classes[jobdef_type]().load(j, LoadParams(APP_ROOT_DIR=app_root_dir))

# Configurations
self._config = jobdef.config
19 changes: 14 additions & 5 deletions flowmancer/jobdefinition/__init__.py
@@ -1,7 +1,8 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, List, Type
from pathlib import Path
from typing import Any, Callable, Dict, List, Type, Union

from pydantic import BaseModel, Extra

@@ -42,7 +43,7 @@ class ExtensionDefinition(JobDefinitionComponent):
parameters: Dict[str, Any] = dict()


class Configuration(JobDefinitionComponent):
class ConfigurationDefinition(JobDefinitionComponent):
name: str = 'flowmancer'
max_concurrency: int = 0
extension_directories: List[str] = []
@@ -56,18 +57,26 @@ class CheckpointerDefinition(JobDefinitionComponent):

class JobDefinition(JobDefinitionComponent):
version: float = 0.1
config: Configuration = Configuration()
include: List[Path] = []
config: ConfigurationDefinition = ConfigurationDefinition()
tasks: Dict[str, TaskDefinition]
loggers: Dict[str, LoggerDefinition] = {'file-logger': LoggerDefinition(logger='FileLogger')}
extensions: Dict[str, ExtensionDefinition] = {'progress-bar': ExtensionDefinition(extension='RichProgressBar')}
checkpointer_config: CheckpointerDefinition = CheckpointerDefinition(checkpointer='FileCheckpointer')


class LoadParams(BaseModel):
APP_ROOT_DIR: str = '.'

class Config:
extra = Extra.forbid


class SerializableJobDefinition(ABC):
@abstractmethod
def load(self, filename: str) -> JobDefinition:
def load(self, filename: Union[Path, str], params: LoadParams = LoadParams()) -> JobDefinition:
pass

@abstractmethod
def dump(self, jdef: JobDefinition, filename: str) -> None:
def dump(self, jdef: JobDefinition, filename: Union[Path, str]) -> None:
pass
58 changes: 45 additions & 13 deletions flowmancer/jobdefinition/file.py
@@ -1,29 +1,61 @@
import os
import re
from pathlib import Path
from typing import Any, Union
from typing import Any, Callable, Dict, Pattern, Union

import yaml

from . import JobDefinition, SerializableJobDefinition, job_definition
from . import JobDefinition, LoadParams, SerializableJobDefinition, job_definition


def _path_constructor(_: Any, node: Any) -> str:
def replace_fn(match):
parts = f"{match.group(1)}:".split(":")
return os.environ.get(parts[0], parts[1])
_env_var_matcher = re.compile(r'\$ENV{([^}^{]+)}')
return _env_var_matcher.sub(replace_fn, node.value)
def _merge(a: Any, b: Any) -> None:
for k in b.keys():
if k in a and isinstance(a[k], dict) and isinstance(b[k], dict):
_merge(a[k], b[k])
else:
a[k] = b[k]


def _build_path_constructor(var_matcher: Pattern[str], val_dict: Dict[str, Any]) -> Callable:
def _path_constructor(_: yaml.SafeLoader, node: yaml.Node) -> str:
def replace_fn(match):
parts = f"{match.group(1)}:".split(":")
return val_dict.get(parts[0], parts[1])
return var_matcher.sub(replace_fn, node.value)
return _path_constructor


@job_definition('yaml')
class YAMLJobDefinition(SerializableJobDefinition):
def load(self, filename: Union[Path, str]) -> JobDefinition:
_env_tag_matcher = re.compile(r'[^$]*\$ENV{([^}^{]+)}.*')
yaml.add_implicit_resolver("!envvar", _env_tag_matcher, None, yaml.SafeLoader)
yaml.add_constructor("!envvar", _path_constructor, yaml.SafeLoader)
def load(self, filename: Union[Path, str], params: LoadParams = LoadParams()) -> JobDefinition:
# Add constructor for built-in vars
sys_tag = re.compile(r'[^$]*\$SYS{([^}^{]+)}.*')
sys_var = re.compile(r'\$SYS{([^}^{]+)}')
yaml.add_implicit_resolver("!sysvar", sys_tag, None, yaml.SafeLoader)
yaml.add_constructor("!sysvar", _build_path_constructor(sys_var, dict(params)), yaml.SafeLoader)

# Add constructor for env vars
env_tag = re.compile(r'[^$]*\$ENV{([^}^{]+)}.*')
env_var = re.compile(r'\$ENV{([^}^{]+)}')
yaml.add_implicit_resolver("!envvar", env_tag, None, yaml.SafeLoader)
yaml.add_constructor("!envvar", _build_path_constructor(env_var, dict(os.environ)), yaml.SafeLoader)

def process_includes(jdef, merged, seen):
for p in jdef.get('include', []):
if not p.startswith('/'):
p = os.path.abspath(os.path.join(params.APP_ROOT_DIR, p))
if p in seen:
                    raise RuntimeError(f'Duplicate or circular include detected: {p}')
seen.add(p)
with open(p, 'r') as f:
cur = yaml.safe_load(f.read())
process_includes(cur, merged, seen)
_merge(merged, jdef)

merged: Dict[str, Any] = dict()
with open(filename, 'r') as f:
return JobDefinition(**yaml.safe_load(f.read()))
process_includes(yaml.safe_load(f.read()), merged, set())
return JobDefinition(**merged)

def dump(self, jdef: JobDefinition, filename: Union[Path, str]) -> None:
with open(filename, 'r') as f:
2 changes: 1 addition & 1 deletion flowmancer/version.py
@@ -1 +1 @@
__version__ = "0.6.3"
__version__ = "0.7.0"
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "flowmancer"
version = "0.6.3"
version = "0.7.0"
description = "The Python Thing-Doer"
authors = ["Nathan Lee <lee.nathan.sh@outlook.com>"]
license = "MIT"