diff --git a/reframe/core/exceptions.py b/reframe/core/exceptions.py
index e24ff14bbf..fe4c2b51fc 100644
--- a/reframe/core/exceptions.py
+++ b/reframe/core/exceptions.py
@@ -60,6 +60,11 @@ class TaskExit(ReframeError):
     '''Raised when a regression task must exit the pipeline prematurely.'''
 
 
+class TaskDependencyError(ReframeError):
+    '''Raised inside a regression task when one of its dependencies has
+    failed.'''
+
+
 class AbortTaskError(ReframeError):
     '''Raised into a regression task to denote that it has been aborted due to
     an external reason (e.g., keyboard interrupt, fatal error in other places
diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py
index 558dbb7b44..0c4862356b 100644
--- a/reframe/frontend/cli.py
+++ b/reframe/frontend/cli.py
@@ -12,6 +12,7 @@
 import reframe.core.runtime as runtime
 import reframe.frontend.argparse as argparse
 import reframe.frontend.check_filters as filters
+import reframe.frontend.dependency as dependency
 import reframe.utility.os_ext as os_ext
 from reframe.core.exceptions import (EnvironError, ConfigError, ReframeError,
                                      ReframeFatalError, format_exception,
@@ -493,14 +494,15 @@ def main():
                         for p in rt.system.partitions
                         for e in p.environs if re.match(env_patt, e.name)}
 
-    # Generate the test cases
+    # Generate the test cases, validate dependencies and sort them
     checks_matched = list(checks_matched)
     testcases = generate_testcases(checks_matched,
                                    options.skip_system_check,
                                    options.skip_prgenv_check,
                                    allowed_environs)
-
-    # Act on checks
+    testgraph = dependency.build_deps(testcases)
+    dependency.validate_deps(testgraph)
+    testcases = dependency.toposort(testgraph)
 
     # Unload regression's module and load user-specified modules
     if hasattr(settings, 'reframe_module'):
@@ -532,6 +534,8 @@ def main():
                            "Skipping..." % m)
             printer.debug(str(e))
 
+    # Act on checks
+
     success = True
     if options.list:
         # List matched checks
diff --git a/reframe/frontend/dependency.py b/reframe/frontend/dependency.py
index 1def1791b4..aa4a9e71de 100644
--- a/reframe/frontend/dependency.py
+++ b/reframe/frontend/dependency.py
@@ -10,37 +10,60 @@ from reframe.core.exceptions import DependencyError
 
 
-def build_deps(cases):
+def build_deps(cases, default_cases=None):
     '''Build dependency graph from test cases.
 
     The graph is represented as an adjacency list in a Python dictionary
     holding test cases. The dependency information is also encoded inside each
-    test cases.
+    test case.
     '''
     # Index cases for quick access
-    cases_by_part = {}
-    cases_revmap = {}
-    for c in cases:
-        cname = c.check.name
-        pname = c.partition.fullname
-        ename = c.environ.name
-        cases_by_part.setdefault((cname, pname), [])
-        cases_revmap.setdefault((cname, pname, ename), None)
-        cases_by_part[cname, pname].append(c)
-        cases_revmap[cname, pname, ename] = c
+    def build_partition_index(cases):
+        if cases is None:
+            return {}
+
+        ret = {}
+        for c in cases:
+            cname, pname = c.check.name, c.partition.fullname
+            ret.setdefault((cname, pname), [])
+            ret[cname, pname].append(c)
+
+        return ret
+
+    def build_cases_index(cases):
+        if cases is None:
+            return {}
+
+        ret = {}
+        for c in cases:
+            cname = c.check.name
+            pname = c.partition.fullname
+            ename = c.environ.name
+            ret.setdefault((cname, pname, ename), c)
+
+        return ret
 
-    def resolve_dep(target, from_map, *args):
-        errmsg = 'could not resolve dependency: %s' % target
+    def resolve_dep(target, from_map, fallback_map, *args):
+        errmsg = 'could not resolve dependency: %s -> %s' % (target, args)
         try:
             ret = from_map[args]
         except KeyError:
+            # try to resolve the dependency in the fallback map
+            try:
+                ret = fallback_map[args]
+            except KeyError:
+                raise DependencyError(errmsg) from None
+
+        if not ret:
             raise DependencyError(errmsg)
-        else:
-            if not ret:
-                raise DependencyError(errmsg)
-            return ret
+
+        return ret
+
+    cases_by_part = build_partition_index(cases)
+    cases_revmap = build_cases_index(cases)
+    default_cases_by_part = build_partition_index(default_cases)
+    default_cases_revmap = build_cases_index(default_cases)
 
     # NOTE on variable names
     #
@@ -59,21 +82,31 @@ def resolve_dep(target, from_map, *args):
         for dep in c.check.user_deps():
             tname, how, subdeps = dep
             if how == rfm.DEPEND_FULLY:
-                c.deps.extend(resolve_dep(c, cases_by_part, tname, pname))
+                c.deps.extend(resolve_dep(c, cases_by_part,
+                                          default_cases_by_part, tname, pname))
             elif how == rfm.DEPEND_BY_ENV:
-                c.deps.append(resolve_dep(c, cases_revmap,
-                                          tname, pname, ename))
+                c.deps.append(
+                    resolve_dep(c, cases_revmap, default_cases_revmap,
+                                tname, pname, ename)
+                )
             elif how == rfm.DEPEND_EXACT:
                 for env, tenvs in subdeps.items():
                     if env != ename:
                         continue
 
                     for te in tenvs:
-                        c.deps.append(resolve_dep(c, cases_revmap,
-                                                  tname, pname, te))
+                        c.deps.append(
+                            resolve_dep(c, cases_revmap, default_cases_revmap,
+                                        tname, pname, te)
+                        )
 
         graph[c] = util.OrderedSet(c.deps)
 
+    # Calculate in-degree of each node
+    for u, adjacent in graph.items():
+        for v in adjacent:
+            v.in_degree += 1
+
     return graph
 
 
@@ -136,10 +169,24 @@ def validate_deps(graph):
         sources -= visited
 
 
-def toposort(graph):
+def toposort(graph, is_subgraph=False):
+    '''Return a list of the graph nodes topologically sorted.
+
+    If ``is_subgraph`` is ``True``, the graph will be treated as a subgraph,
+    meaning that any dangling edges will be ignored.
+    '''
     test_deps = _reduce_deps(graph)
     visited = util.OrderedSet()
 
+    def retrieve(d, key, default):
+        try:
+            return d[key]
+        except KeyError:
+            if is_subgraph:
+                return default
+            else:
+                raise
+
     def visit(node, path):
         # We assume an acyclic graph
         assert node not in path
@@ -147,7 +194,7 @@ def visit(node, path):
         path.add(node)
 
         # Do a DFS visit of all the adjacent nodes
-        for adj in test_deps[node]:
+        for adj in retrieve(test_deps, node, []):
             if adj not in visited:
                 visit(adj, path)
@@ -166,4 +213,5 @@ def visit(node, path):
         except KeyError:
             cases_by_name[c.check.name] = [c]
 
-    return list(itertools.chain(*(cases_by_name[n] for n in visited)))
+    return list(itertools.chain(*(retrieve(cases_by_name, n, [])
+                                  for n in visited)))
diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py
index f1fbe6b996..6f4b07f007 100644
--- a/reframe/frontend/executors/__init__.py
+++ b/reframe/frontend/executors/__init__.py
@@ -7,6 +7,7 @@
 import reframe.core.environments as env
 import reframe.core.logging as logging
 import reframe.core.runtime as runtime
+import reframe.frontend.dependency as dependency
 from reframe.core.exceptions import (AbortTaskError, JobNotStartedError,
                                      ReframeFatalError, TaskExit)
 from reframe.frontend.printer import PrettyPrinter
@@ -28,6 +29,9 @@ def __init__(self, check, partition, environ):
         self.__check._case = weakref.ref(self)
         self.__deps = []
 
+        # Incoming dependencies
+        self.in_degree = 0
+
     def __iter__(self):
         # Allow unpacking a test case with a single liner:
         # c, p, e = case
@@ -66,6 +70,10 @@ def environ(self):
     def deps(self):
         return self.__deps
 
+    @property
+    def num_dependents(self):
+        return self.in_degree
+
     def clone(self):
         # Return a fresh clone, i.e., one based on the original check
         return TestCase(self.__check_orig, self.__partition, self.__environ)
@@ -105,11 +113,15 @@ class RegressionTask:
     def __init__(self, case, listeners=[]):
         self._case = case
         self._failed_stage = None
-        self._current_stage = None
+        self._current_stage = 'startup'
         self._exc_info = (None, None, None)
         self._environ = None
         self._listeners = list(listeners)
 
+        # Reference count for dependent tests; safe to cleanup the test only
+        # if it is zero
+        self.ref_count = case.num_dependents
+
         # Test case has finished, but has not been waited for yet
         self.zombie = False
@@ -183,9 +195,11 @@ def sanity(self):
     def performance(self):
         self._safe_call(self.check.performance)
 
+    def finalize(self):
+        self._notify_listeners('on_task_success')
+
     def cleanup(self, *args, **kwargs):
         self._safe_call(self.check.cleanup, *args, **kwargs)
-        self._notify_listeners('on_task_success')
 
     def fail(self, exc_info=None):
         self._failed_stage = self._current_stage
@@ -288,7 +302,12 @@ def _retry_failed(self, cases):
                 'Retrying %d failed check(s) (retry %d/%d)' %
                 (num_failed_checks, rt.current_run, self._max_retries)
             )
-            self._runall(t.testcase.clone() for t in failures)
+
+            # Clone failed cases and rebuild dependencies among them
+            failed_cases = [t.testcase.clone() for t in failures]
+            cases_graph = dependency.build_deps(failed_cases, cases)
+            failed_cases = dependency.toposort(cases_graph, is_subgraph=True)
+            self._runall(failed_cases)
             failures = self._stats.failures()
 
     def _runall(self, testcases):
diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py
index 417063ea7d..55f024d651 100644
--- a/reframe/frontend/executors/policies.py
+++ b/reframe/frontend/executors/policies.py
@@ -3,16 +3,20 @@
 import time
 from datetime import datetime
 
-from reframe.core.exceptions import TaskExit
+from reframe.core.exceptions import (TaskDependencyError, TaskExit)
 from reframe.core.logging import getlogger
 from reframe.frontend.executors import (ExecutionPolicy, RegressionTask,
                                         TaskEventListener, ABORT_REASONS)
 
 
-class SerialExecutionPolicy(ExecutionPolicy):
+class SerialExecutionPolicy(ExecutionPolicy, TaskEventListener):
     def __init__(self):
         super().__init__()
-        self._tasks = []
+        self._task_index = {}
+
+        # Tasks that have finished, but have not performed their cleanup phase
+        self._retired_tasks = []
+        self.task_listeners.append(self)
 
     def runcase(self, case):
         super().runcase(case)
@@ -22,10 +26,14 @@ def runcase(self, case):
             'RUN', '%s on %s using %s' % (check.name, partition.fullname,
                                           environ.name)
         )
-        task = RegressionTask(case)
-        self._tasks.append(task)
+        task = RegressionTask(case, self.task_listeners)
+        self._task_index[case] = task
         self.stats.add_task(task)
         try:
+            # Do not run test if any of its dependencies has failed
+            if any(self._task_index[c].failed for c in case.deps):
+                raise TaskDependencyError('dependencies failed')
+
             task.setup(partition, environ,
                        sched_flex_alloc_tasks=self.sched_flex_alloc_tasks,
                        sched_account=self.sched_account,
@@ -45,7 +53,8 @@ def runcase(self, case):
             if not self.skip_performance_check:
                 task.performance()
 
-            task.cleanup(not self.keep_stage_files)
+            self._retired_tasks.append(task)
+            task.finalize()
         except TaskExit:
             return
@@ -58,6 +67,32 @@ def runcase(self, case):
         self.printer.status('FAIL' if task.failed else 'OK',
                             task.check.info(), just='right')
 
+    def _cleanup_all(self):
+        for task in list(self._retired_tasks):
+            if task.ref_count == 0:
+                task.cleanup(not self.keep_stage_files)
+                self._retired_tasks.remove(task)
+
+    def on_task_run(self, task):
+        pass
+
+    def on_task_exit(self, task):
+        pass
+
+    def on_task_failure(self, task):
+        pass
+
+    def on_task_success(self, task):
+        # update reference count of dependencies
+        for c in task.testcase.deps:
+            self._task_index[c].ref_count -= 1
+
+        self._cleanup_all()
+
+    def exit(self):
+        # Clean up all remaining tasks
+        self._cleanup_all()
+
 
 class PollRateFunction:
     def __init__(self, min_rate, decay_time):
@@ -108,9 +143,6 @@ def __init__(self):
         # Ready tasks to be executed per partition
         self._ready_tasks = {}
 
-        # The tasks associated with the running checks
-        self._tasks = []
-
         # Job limit per partition
         self._max_jobs = {}
@@ -154,7 +186,6 @@ def runcase(self, case):
         self._max_jobs.setdefault(partition.fullname, partition.max_jobs)
         task = RegressionTask(case, self.task_listeners)
-        self._tasks.append(task)
         self.stats.add_task(task)
         try:
             task.setup(partition, environ,
@@ -222,6 +253,7 @@ def _finalize_task(self, task):
         if not self.skip_performance_check:
             task.performance()
 
+        task.finalize()
         task.cleanup(not self.keep_stage_files)
 
     def _failall(self, cause):
diff --git a/unittests/resources/checks_unlisted/deps_complex.py b/unittests/resources/checks_unlisted/deps_complex.py
new file mode 100644
index 0000000000..5f58525483
--- /dev/null
+++ b/unittests/resources/checks_unlisted/deps_complex.py
@@ -0,0 +1,153 @@
+import os
+import reframe as rfm
+import reframe.utility.sanity as sn
+from reframe.core.deferrable import make_deferrable
+
+#
+# The following tests implement the dependency graph below:
+#
+#
+#      t0
+#      ^
+#      |
+#  +-->t4<--+
+#  |        |
+#  t5<------t1
+#  ^        ^
+#  |        |
+#  +---t6---+
+#      ^
+#      |
+#  +<--t2<--t7
+#  ^
+#  |
+#  t3
+#
+#
+# Each test has an id, which is the digit in its name and it produces its
+# output in the 'out.txt' file. Each test sums up its own id with the output
+# produced by its parents and writes it in its output file.
+#
+
+
+class BaseTest(rfm.RunOnlyRegressionTest):
+    def __init__(self):
+        self.valid_systems = ['*']
+        self.valid_prog_environs = ['*']
+        self.sourcesdir = None
+        self.executable = 'echo'
+        self._count = int(type(self).__name__[1:])
+        self.sanity_patterns = make_deferrable(True)
+        self.keep_files = ['out.txt']
+
+    @property
+    @sn.sanity_function
+    def count(self):
+        return self._count
+
+    @rfm.run_before('run')
+    def write_count(self):
+        self.executable_opts = [str(self.count), '> out.txt']
+
+
+# NOTE: The order of the tests here should not be topologically sorted
+
+
+@rfm.simple_test
+class T0(BaseTest):
+    pass
+
+
+@rfm.simple_test
+class T1(BaseTest):
+    def __init__(self):
+        super().__init__()
+        self.depends_on('T4')
+        self.depends_on('T5')
+        self.sanity_patterns = sn.assert_eq(self.count, 14)
+
+    @rfm.require_deps
+    def prepend_output(self, T4, T5):
+        with open(os.path.join(T4().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+        with open(os.path.join(T5().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+
+@rfm.simple_test
+class T2(BaseTest):
+    def __init__(self):
+        super().__init__()
+        self.depends_on('T6')
+
+        # Make this test fail on purpose: expected value is 31 normally
+        self.sanity_patterns = sn.assert_eq(self.count, 30)
+
+    @rfm.require_deps
+    def prepend_output(self, T6):
+        with open(os.path.join(T6().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+
+@rfm.simple_test
+class T3(T2):
+    def __init__(self):
+        super().__init__()
+        self.sanity_patterns = sn.assert_eq(self.count, 32)
+
+
+@rfm.simple_test
+class T4(BaseTest):
+    def __init__(self):
+        super().__init__()
+        self.depends_on('T0')
+        self.sanity_patterns = sn.assert_eq(self.count, 4)
+
+    @rfm.require_deps
+    def prepend_output(self, T0):
+        with open(os.path.join(T0().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+
+@rfm.simple_test
+class T5(BaseTest):
+    def __init__(self):
+        super().__init__()
+        self.depends_on('T4')
+        self.sanity_patterns = sn.assert_eq(self.count, 9)
+
+    @rfm.require_deps
+    def prepend_output(self, T4):
+        with open(os.path.join(T4().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+
+@rfm.simple_test
+class T6(BaseTest):
+    def __init__(self):
+        super().__init__()
+        self.depends_on('T1')
+        self.depends_on('T5')
+        self.sanity_patterns = sn.assert_eq(self.count, 29)
+
+    @rfm.require_deps
+    def prepend_output(self, T1, T5):
+        with open(os.path.join(T1().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+        with open(os.path.join(T5().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+
+@rfm.simple_test
+class T7(BaseTest):
+    def __init__(self):
+        super().__init__()
+        self.depends_on('T2')
+        self.sanity_patterns = sn.assert_eq(self.count, 38)
+
+    @rfm.require_deps
+    def prepend_output(self, T2):
+        with open(os.path.join(T2().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
diff --git a/unittests/resources/checks_unlisted/dependencies/normal.py b/unittests/resources/checks_unlisted/deps_simple.py
similarity index 100%
rename from unittests/resources/checks_unlisted/dependencies/normal.py
rename to unittests/resources/checks_unlisted/deps_simple.py
diff --git a/unittests/test_policies.py b/unittests/test_policies.py
index 39f7098b41..e6c217f835 100644
--- a/unittests/test_policies.py
+++ b/unittests/test_policies.py
@@ -13,7 +13,9 @@
 import reframe.utility as util
 import reframe.utility.os_ext as os_ext
 from reframe.core.environments import Environment
-from reframe.core.exceptions import DependencyError, JobNotStartedError
+from reframe.core.exceptions import (
+    DependencyError, JobNotStartedError, TaskDependencyError
+)
 from reframe.frontend.loader import RegressionCheckLoader
 import unittests.fixtures as fixtures
 from unittests.resources.checks.hellocheck import HelloTest
@@ -47,8 +49,13 @@ def setUp(self):
     def tearDown(self):
         os_ext.rmtree(rt.runtime().resources.prefix)
 
-    def runall(self, checks, *args, **kwargs):
+    def runall(self, checks, sort=False, *args, **kwargs):
         cases = executors.generate_testcases(checks, *args, **kwargs)
+        if sort:
+            depgraph = dependency.build_deps(cases)
+            dependency.validate_deps(depgraph)
+            cases = dependency.toposort(depgraph)
+
         self.runner.runall(cases)
 
     def _num_failures_stage(self, stage):
@@ -194,6 +201,37 @@ def test_pass_in_retries(self):
         self.assertEqual(0, len(self.runner.stats.failures()))
         os.remove(fp.name)
 
+    def test_dependencies(self):
+        self.loader = RegressionCheckLoader(
+            ['unittests/resources/checks_unlisted/deps_complex.py']
+        )
+
+        # Setup the runner
+        self.checks = self.loader.load_all()
+        self.runall(self.checks, sort=True)
+
+        stats = self.runner.stats
+        assert stats.num_cases(0) == 8
+        assert len(stats.failures()) == 2
+        for tf in stats.failures():
+            check = tf.testcase.check
+            exc_type, exc_value, _ = tf.exc_info
+            if check.name == 'T7':
+                assert isinstance(exc_value, TaskDependencyError)
+
+        # Check that cleanup is executed properly for successful tests as well
+        for t in stats.tasks():
+            check = t.testcase.check
+            if t.failed:
+                continue
+
+            if t.ref_count == 0:
+                assert os.path.exists(os.path.join(check.outputdir, 'out.txt'))
+
+    def test_dependencies_with_retries(self):
+        self.runner._max_retries = 2
+        self.test_dependencies()
+
 
 class TaskEventMonitor(executors.TaskEventListener):
     '''Event listener for monitoring the execution of the asynchronous
@@ -398,6 +436,10 @@ def test_poll_fails_busy_loop(self):
         self.assertEqual(num_tasks, stats.num_cases())
         self.assertEqual(num_tasks, len(stats.failures()))
 
+    def test_dependencies(self):
+        pytest.skip('tests with dependencies are not supported '
+                    'by the asynchronous execution policy')
+
 
 class TestDependencies(unittest.TestCase):
     class Node:
@@ -437,6 +479,11 @@ def num_deps(graph, cname):
         return sum(len(deps) for c, deps in graph.items()
                    if c.check.name == cname)
 
+    def in_degree(graph, node):
+        for v in graph.keys():
+            if v == node:
+                return v.num_dependents
+
     def find_check(name, checks):
         for c in checks:
             if c.name == name:
@@ -451,7 +498,7 @@ def find_case(cname, ename, cases):
 
     def setUp(self):
         self.loader = RegressionCheckLoader([
-            'unittests/resources/checks_unlisted/dependencies/normal.py'
+            'unittests/resources/checks_unlisted/deps_simple.py'
         ])
 
         # Set runtime prefix
@@ -479,6 +526,7 @@ def test_build_deps(self):
         Node = TestDependencies.Node
         has_edge = TestDependencies.has_edge
         num_deps = TestDependencies.num_deps
+        in_degree = TestDependencies.in_degree
         find_check = TestDependencies.find_check
         find_case = TestDependencies.find_case
@@ -528,6 +576,22 @@ def test_build_deps(self):
                         Node('Test1_exact', p, 'e1'),
                         Node('Test0', p, 'e1'))
 
+        # Check in-degree of Test0
+
+        # 2 from Test1_fully,
+        # 1 from Test1_by_env,
+        # 1 from Test1_exact,
+        # 1 from Test1_default
+        assert in_degree(deps, Node('Test0', 'sys0:p0', 'e0')) == 5
+        assert in_degree(deps, Node('Test0', 'sys0:p1', 'e0')) == 5
+
+        # 2 from Test1_fully,
+        # 1 from Test1_by_env,
+        # 2 from Test1_exact,
+        # 1 from Test1_default
+        assert in_degree(deps, Node('Test0', 'sys0:p0', 'e1')) == 6
+        assert in_degree(deps, Node('Test0', 'sys0:p1', 'e1')) == 6
+
         # Pick a check to test getdep()
         check_e0 = find_case('Test1_exact', 'e0', cases).check
         check_e1 = find_case('Test1_exact', 'e1', cases).check
@@ -716,6 +780,39 @@ def test_cyclic_deps_by_env(self):
     def test_validate_deps_empty(self):
         dependency.validate_deps({})
 
+    def assert_topological_order(self, cases, graph):
+        cases_order = []
+        visited_tests = set()
+        tests = util.OrderedSet()
+        for c in cases:
+            check, part, env = c
+            cases_order.append((check.name, part.fullname, env.name))
+            tests.add(check.name)
+            visited_tests.add(check.name)
+
+            # Assert that all dependencies of c have been visited before
+            for d in graph[c]:
+                if d not in cases:
+                    # dependency points outside the subgraph
+                    continue
+
+                assert d.check.name in visited_tests
+
+        # Check the order of systems and prog. environments
+        # We are checking against all possible orderings
+        valid_orderings = []
+        for partitions in itertools.permutations(['sys0:p0', 'sys0:p1']):
+            for environs in itertools.permutations(['e0', 'e1']):
+                ordering = []
+                for t in tests:
+                    for p in partitions:
+                        for e in environs:
+                            ordering.append((t, p, e))
+
+                valid_orderings.append(ordering)
+
+        assert cases_order in valid_orderings
+
     @rt.switch_runtime(fixtures.TEST_SITE_CONFIG, 'sys0')
     def test_toposort(self):
         #
@@ -752,30 +849,37 @@ def test_toposort(self):
                                          t5, t6, t7, t8])
         )
         cases = dependency.toposort(deps)
-        cases_order = []
-        tests = util.OrderedSet()
-        visited_tests = set()
-        for c in cases:
-            check, part, env = c
-            cases_order.append((check.name, part.fullname, env.name))
-            tests.add(check.name)
-            visited_tests.add(check.name)
-
-            # Assert that all dependencies of c have been visited before
-            for d in deps[c]:
-                assert d.check.name in visited_tests
+        self.assert_topological_order(cases, deps)
 
-        # Check the order of systems and prog. environments
-        # We are checking against all possible orderings
-        valid_orderings = []
-        for partitions in itertools.permutations(['sys0:p0', 'sys0:p1']):
-            for environs in itertools.permutations(['e0', 'e1']):
-                ordering = []
-                for t in tests:
-                    for p in partitions:
-                        for e in environs:
-                            ordering.append((t, p, e))
-
-                valid_orderings.append(ordering)
-
-        assert cases_order in valid_orderings
+    @rt.switch_runtime(fixtures.TEST_SITE_CONFIG, 'sys0')
+    def test_toposort_subgraph(self):
+        #
+        #      t0
+        #      ^
+        #      |
+        #  +-->t1<--+
+        #  |        |
+        #  t2<------t3
+        #  ^        ^
+        #  |        |
+        #  +---t4---+
+        #
+        t0 = self.create_test('t0')
+        t1 = self.create_test('t1')
+        t2 = self.create_test('t2')
+        t3 = self.create_test('t3')
+        t4 = self.create_test('t4')
+        t1.depends_on('t0')
+        t2.depends_on('t1')
+        t3.depends_on('t1')
+        t3.depends_on('t2')
+        t4.depends_on('t2')
+        t4.depends_on('t3')
+        full_deps = dependency.build_deps(
+            executors.generate_testcases([t0, t1, t2, t3, t4])
+        )
+        partial_deps = dependency.build_deps(
+            executors.generate_testcases([t3, t4]), full_deps
+        )
+        cases = dependency.toposort(partial_deps, is_subgraph=True)
+        self.assert_topological_order(cases, partial_deps)
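
Usage sketch (illustrative only, not part of the patch): the frontend wires the new dependency API together as in the cli.py hunk above, and the retry path reuses the same functions on the failed subset. The names `testcases`, `failures` and `cases` below are assumed to come from the surrounding frontend code (`generate_testcases()` and the runner's failure list), exactly as in the patch.

    import reframe.frontend.dependency as dependency

    # Full run: build the adjacency list (this also sets each case's
    # in-degree), reject cyclic or unresolvable dependencies, then run the
    # cases in an order where dependencies always precede their dependents.
    testgraph = dependency.build_deps(testcases)
    dependency.validate_deps(testgraph)
    testcases = dependency.toposort(testgraph)

    # Retry run: rebuild the graph only for the cloned failures, passing the
    # original cases as the fallback (`default_cases`) so that edges leaving
    # the retried subset can still be resolved; `is_subgraph=True` makes
    # toposort() ignore those dangling edges.
    failed_cases = [t.testcase.clone() for t in failures]
    cases_graph = dependency.build_deps(failed_cases, cases)
    failed_cases = dependency.toposort(cases_graph, is_subgraph=True)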