Skip to content

Commit

Permalink
Added basic tests for file-based job def (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
natsunlee committed Feb 7, 2024
1 parent 0f2873c commit 6027468
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 18 deletions.
30 changes: 12 additions & 18 deletions flowmancer/jobdefinition/file.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,30 @@
import os
import re
from typing import Any, Callable, Pattern
from pathlib import Path
from typing import Any, Union

import yaml

from . import JobDefinition, SerializableJobDefinition, job_definition

_env_var_matcher = re.compile(r'\$ENV{([^}^{]+)}')
_env_tag_matcher = re.compile(r'[^$]*\$ENV{([^}^{]+)}.*')

# TODO: parameters file
# _param_var_matcher = re.compile(r'\$PARAM{([^}^{]+)}')
# _param_tag_matcher = re.compile(r'[^$]*\$PARAM{([^}^{]+)}.*')


def _build_constructor(var_matcher: Pattern[str]) -> Callable[[Any, Any], str]:
def _path_constructor(_: Any, node: Any) -> str:
def replace_fn(match):
parts = f"{match.group(1)}:".split(":")
return os.environ.get(parts[0], parts[1])
return var_matcher.sub(replace_fn, node.value)
return _path_constructor
def _path_constructor(_: Any, node: Any) -> str:
def replace_fn(match):
parts = f"{match.group(1)}:".split(":")
return os.environ.get(parts[0], parts[1])
_env_var_matcher = re.compile(r'\$ENV{([^}^{]+)}')
return _env_var_matcher.sub(replace_fn, node.value)


@job_definition('yaml')
class YAMLJobDefinition(SerializableJobDefinition):
def load(self, filename: str) -> JobDefinition:
def load(self, filename: Union[Path, str]) -> JobDefinition:
_env_tag_matcher = re.compile(r'[^$]*\$ENV{([^}^{]+)}.*')
yaml.add_implicit_resolver("!envvar", _env_tag_matcher, None, yaml.SafeLoader)
yaml.add_constructor("!envvar", _build_constructor(_env_var_matcher), yaml.SafeLoader)
yaml.add_constructor("!envvar", _path_constructor, yaml.SafeLoader)
with open(filename, 'r') as f:
return JobDefinition(**yaml.safe_load(f.read()))

def dump(self, jdef: JobDefinition, filename: str) -> None:
def dump(self, jdef: JobDefinition, filename: Union[Path, str]) -> None:
with open(filename, 'r') as f:
f.write(yaml.dump(jdef.dict()))
Empty file added tests/jobdefinition/__init__.py
Empty file.
49 changes: 49 additions & 0 deletions tests/jobdefinition/test_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import os
from pathlib import Path

import pytest

from flowmancer.jobdefinition.file import YAMLJobDefinition

_sample_valid_yaml_content = """
config:
name: test_job
tasks:
first-task:
task: DoSomething
parameters:
init_step: $ENV{INIT_STEP_NAME:startup step}
run_step: running on and on
second-task:
task: SaveVars
parameters:
var_a: $ENV{VAL_WITHOUT_DEFAULT}
"""


@pytest.fixture(scope='module')
def valid_yaml(tmp_path_factory) -> Path:
fpath = tmp_path_factory.mktemp('jobdef') / 'valid.yaml'
with open(fpath, 'w') as f:
f.write(_sample_valid_yaml_content)
return fpath


def test_env_vars_defaults(valid_yaml: Path) -> None:
loader = YAMLJobDefinition()
jdef = loader.load(str(valid_yaml))
assert(jdef.tasks['first-task'].parameters['init_step'] == 'startup step')
assert(jdef.tasks['second-task'].parameters['var_a'] == '')


def test_env_vars_with_inputs(valid_yaml: Path) -> None:
os.environ['INIT_STEP_NAME'] = 'custom step name'
os.environ['VAL_WITHOUT_DEFAULT'] = 'hello world'
loader = YAMLJobDefinition()
jdef = loader.load(str(valid_yaml))
del os.environ['INIT_STEP_NAME']
del os.environ['VAL_WITHOUT_DEFAULT']
assert(jdef.tasks['first-task'].parameters['init_step'] == 'custom step name')
assert(jdef.tasks['second-task'].parameters['var_a'] == 'hello world')

0 comments on commit 6027468

Please sign in to comment.