From 6c9edf698cf1af75f7be2509b1919955c285fd05 Mon Sep 17 00:00:00 2001
From: Vasileios Karakasis
Date: Sun, 27 Oct 2019 01:31:55 +0200
Subject: [PATCH 1/8] Support of test dependencies in serial execution policy

---
 reframe/core/exceptions.py             |  5 +++
 reframe/frontend/cli.py                | 10 +++--
 reframe/frontend/dependency.py         |  9 +++-
 reframe/frontend/executors/__init__.py | 17 +++++++-
 reframe/frontend/executors/policies.py | 52 +++++++++++++++++-----
 unittests/test_policies.py             | 60 +++++++++++++++++++++++++-
 6 files changed, 135 insertions(+), 18 deletions(-)

diff --git a/reframe/core/exceptions.py b/reframe/core/exceptions.py
index e24ff14bbf..fe4c2b51fc 100644
--- a/reframe/core/exceptions.py
+++ b/reframe/core/exceptions.py
@@ -60,6 +60,11 @@ class TaskExit(ReframeError):
     '''Raised when a regression task must exit the pipeline prematurely.'''
 
 
+class TaskDependencyError(ReframeError):
+    '''Raised inside a regression task when one of its dependencies has
+    failed.'''
+
+
 class AbortTaskError(ReframeError):
     '''Raised into a regression task to denote that it has been aborted due to
     an external reason (e.g., keyboard interrupt, fatal error in other places
diff --git a/reframe/frontend/cli.py b/reframe/frontend/cli.py
index 558dbb7b44..0c4862356b 100644
--- a/reframe/frontend/cli.py
+++ b/reframe/frontend/cli.py
@@ -12,6 +12,7 @@
 import reframe.core.runtime as runtime
 import reframe.frontend.argparse as argparse
 import reframe.frontend.check_filters as filters
+import reframe.frontend.dependency as dependency
 import reframe.utility.os_ext as os_ext
 from reframe.core.exceptions import (EnvironError, ConfigError, ReframeError,
                                      ReframeFatalError, format_exception,
@@ -493,14 +494,15 @@ def main():
                             for p in rt.system.partitions
                             for e in p.environs if re.match(env_patt, e.name)}
 
-        # Generate the test cases
+        # Generate the test cases, validate dependencies and sort them
         checks_matched = list(checks_matched)
         testcases = generate_testcases(checks_matched,
                                        options.skip_system_check,
                                        options.skip_prgenv_check,
                                        allowed_environs)
-
-        # Act on checks
+        testgraph = dependency.build_deps(testcases)
+        dependency.validate_deps(testgraph)
+        testcases = dependency.toposort(testgraph)
 
         # Unload regression's module and load user-specified modules
         if hasattr(settings, 'reframe_module'):
@@ -532,6 +534,8 @@ def main():
                                  "Skipping..." % m)
                     printer.debug(str(e))
 
+        # Act on checks
+
         success = True
         if options.list:
             # List matched checks
diff --git a/reframe/frontend/dependency.py b/reframe/frontend/dependency.py
index 1def1791b4..b42d2c8f81 100644
--- a/reframe/frontend/dependency.py
+++ b/reframe/frontend/dependency.py
@@ -15,7 +15,7 @@ def build_deps(cases):
 
     The graph is represented as an adjacency list in a Python dictionary
     holding test cases. The dependency information is also encoded inside each
-    test cases.
+    test case.
''' # Index cases for quick access @@ -31,7 +31,7 @@ def build_deps(cases): cases_revmap[cname, pname, ename] = c def resolve_dep(target, from_map, *args): - errmsg = 'could not resolve dependency: %s' % target + errmsg = 'could not resolve dependency: %s -> %s' % (target, args) try: ret = from_map[args] except KeyError: @@ -74,6 +74,11 @@ def resolve_dep(target, from_map, *args): graph[c] = util.OrderedSet(c.deps) + # Calculate in-degree of each node + for u, adjacent in graph.items(): + for v in adjacent: + v.in_degree += 1 + return graph diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py index f1fbe6b996..c6d711524a 100644 --- a/reframe/frontend/executors/__init__.py +++ b/reframe/frontend/executors/__init__.py @@ -28,6 +28,9 @@ def __init__(self, check, partition, environ): self.__check._case = weakref.ref(self) self.__deps = [] + # Incoming dependencies + self.in_degree = 0 + def __iter__(self): # Allow unpacking a test case with a single liner: # c, p, e = case @@ -66,6 +69,10 @@ def environ(self): def deps(self): return self.__deps + @property + def num_dependents(self): + return self.in_degree + def clone(self): # Return a fresh clone, i.e., one based on the original check return TestCase(self.__check_orig, self.__partition, self.__environ) @@ -105,11 +112,15 @@ class RegressionTask: def __init__(self, case, listeners=[]): self._case = case self._failed_stage = None - self._current_stage = None + self._current_stage = 'startup' self._exc_info = (None, None, None) self._environ = None self._listeners = list(listeners) + # Reference count for dependent tests; safe to cleanup the test only + # if it is zero + self.ref_count = case.num_dependents + # Test case has finished, but has not been waited for yet self.zombie = False @@ -183,9 +194,11 @@ def sanity(self): def performance(self): self._safe_call(self.check.performance) + def finalize(self): + self._notify_listeners('on_task_success') + def cleanup(self, *args, **kwargs): self._safe_call(self.check.cleanup, *args, **kwargs) - self._notify_listeners('on_task_success') def fail(self, exc_info=None): self._failed_stage = self._current_stage diff --git a/reframe/frontend/executors/policies.py b/reframe/frontend/executors/policies.py index 417063ea7d..55f024d651 100644 --- a/reframe/frontend/executors/policies.py +++ b/reframe/frontend/executors/policies.py @@ -3,16 +3,20 @@ import time from datetime import datetime -from reframe.core.exceptions import TaskExit +from reframe.core.exceptions import (TaskDependencyError, TaskExit) from reframe.core.logging import getlogger from reframe.frontend.executors import (ExecutionPolicy, RegressionTask, TaskEventListener, ABORT_REASONS) -class SerialExecutionPolicy(ExecutionPolicy): +class SerialExecutionPolicy(ExecutionPolicy, TaskEventListener): def __init__(self): super().__init__() - self._tasks = [] + self._task_index = {} + + # Tasks that have finished, but have not performed their cleanup phase + self._retired_tasks = [] + self.task_listeners.append(self) def runcase(self, case): super().runcase(case) @@ -22,10 +26,14 @@ def runcase(self, case): 'RUN', '%s on %s using %s' % (check.name, partition.fullname, environ.name) ) - task = RegressionTask(case) - self._tasks.append(task) + task = RegressionTask(case, self.task_listeners) + self._task_index[case] = task self.stats.add_task(task) try: + # Do not run test if any of its dependencies has failed + if any(self._task_index[c].failed for c in case.deps): + raise TaskDependencyError('dependencies 
failed') + task.setup(partition, environ, sched_flex_alloc_tasks=self.sched_flex_alloc_tasks, sched_account=self.sched_account, @@ -45,7 +53,8 @@ def runcase(self, case): if not self.skip_performance_check: task.performance() - task.cleanup(not self.keep_stage_files) + self._retired_tasks.append(task) + task.finalize() except TaskExit: return @@ -58,6 +67,32 @@ def runcase(self, case): self.printer.status('FAIL' if task.failed else 'OK', task.check.info(), just='right') + def _cleanup_all(self): + for task in self._retired_tasks: + if task.ref_count == 0: + task.cleanup(not self.keep_stage_files) + self._retired_tasks.remove(task) + + def on_task_run(self, task): + pass + + def on_task_exit(self, task): + pass + + def on_task_failure(self, task): + pass + + def on_task_success(self, task): + # update reference count of dependencies + for c in task.testcase.deps: + self._task_index[c].ref_count -= 1 + + self._cleanup_all() + + def exit(self): + # Clean up all remaining tasks + self._cleanup_all() + class PollRateFunction: def __init__(self, min_rate, decay_time): @@ -108,9 +143,6 @@ def __init__(self): # Ready tasks to be executed per partition self._ready_tasks = {} - # The tasks associated with the running checks - self._tasks = [] - # Job limit per partition self._max_jobs = {} @@ -154,7 +186,6 @@ def runcase(self, case): self._max_jobs.setdefault(partition.fullname, partition.max_jobs) task = RegressionTask(case, self.task_listeners) - self._tasks.append(task) self.stats.add_task(task) try: task.setup(partition, environ, @@ -222,6 +253,7 @@ def _finalize_task(self, task): if not self.skip_performance_check: task.performance() + task.finalize() task.cleanup(not self.keep_stage_files) def _failall(self, cause): diff --git a/unittests/test_policies.py b/unittests/test_policies.py index 39f7098b41..a8c92e8bc9 100644 --- a/unittests/test_policies.py +++ b/unittests/test_policies.py @@ -13,7 +13,9 @@ import reframe.utility as util import reframe.utility.os_ext as os_ext from reframe.core.environments import Environment -from reframe.core.exceptions import DependencyError, JobNotStartedError +from reframe.core.exceptions import ( + DependencyError, JobNotStartedError, TaskDependencyError +) from reframe.frontend.loader import RegressionCheckLoader import unittests.fixtures as fixtures from unittests.resources.checks.hellocheck import HelloTest @@ -49,6 +51,9 @@ def tearDown(self): def runall(self, checks, *args, **kwargs): cases = executors.generate_testcases(checks, *args, **kwargs) + depgraph = dependency.build_deps(cases) + dependency.validate_deps(depgraph) + cases = dependency.toposort(depgraph) self.runner.runall(cases) def _num_failures_stage(self, stage): @@ -194,6 +199,33 @@ def test_pass_in_retries(self): self.assertEqual(0, len(self.runner.stats.failures())) os.remove(fp.name) + def test_dependencies(self): + self.loader = RegressionCheckLoader( + ['unittests/resources/checks_unlisted/test_with_deps.py'] + ) + + # Setup the runner + self.checks = self.loader.load_all() + self.runall(self.checks) + + stats = self.runner.stats + assert stats.num_cases() == 8 + assert len(stats.failures()) == 2 + for tf in stats.failures(): + check = tf.testcase.check + exc_type, exc_value, _ = tf.exc_info + if check.name == 'T7': + assert isinstance(exc_value, TaskDependencyError) + + # Check that cleanup is executed properly for successful tests as well + for t in stats.tasks(): + check = t.testcase.check + if t.failed: + continue + + if t.ref_count == 0: + assert 
os.path.exists(os.path.join(check.outputdir, 'out.txt')) + class TaskEventMonitor(executors.TaskEventListener): '''Event listener for monitoring the execution of the asynchronous @@ -398,6 +430,10 @@ def test_poll_fails_busy_loop(self): self.assertEqual(num_tasks, stats.num_cases()) self.assertEqual(num_tasks, len(stats.failures())) + def test_dependencies(self): + pytest.skip('test with dependencies are not supported ' + 'by the asynchronous execution policy') + class TestDependencies(unittest.TestCase): class Node: @@ -437,6 +473,11 @@ def num_deps(graph, cname): return sum(len(deps) for c, deps in graph.items() if c.check.name == cname) + def in_degree(graph, node): + for v in graph.keys(): + if v == node: + return v.num_dependents + def find_check(name, checks): for c in checks: if c.name == name: @@ -479,6 +520,7 @@ def test_build_deps(self): Node = TestDependencies.Node has_edge = TestDependencies.has_edge num_deps = TestDependencies.num_deps + in_degree = TestDependencies.in_degree find_check = TestDependencies.find_check find_case = TestDependencies.find_case @@ -528,6 +570,22 @@ def test_build_deps(self): Node('Test1_exact', p, 'e1'), Node('Test0', p, 'e1')) + # Check in-degree of Test0 + + # 2 from Test1_fully, + # 1 from Test1_by_env, + # 1 from Test1_exact, + # 1 from Test1_default + assert in_degree(deps, Node('Test0', 'sys0:p0', 'e0')) == 5 + assert in_degree(deps, Node('Test0', 'sys0:p1', 'e0')) == 5 + + # 2 from Test1_fully, + # 1 from Test1_by_env, + # 2 from Test1_exact, + # 1 from Test1_default + assert in_degree(deps, Node('Test0', 'sys0:p0', 'e1')) == 6 + assert in_degree(deps, Node('Test0', 'sys0:p1', 'e1')) == 6 + # Pick a check to test getdep() check_e0 = find_case('Test1_exact', 'e0', cases).check check_e1 = find_case('Test1_exact', 'e1', cases).check From 621b498fae0f33c844f7df7a40f9b4a30a0302d1 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Mon, 28 Oct 2019 01:02:12 +0100 Subject: [PATCH 2/8] Add missing file --- .../checks_unlisted/test_with_deps.py | 145 ++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100644 unittests/resources/checks_unlisted/test_with_deps.py diff --git a/unittests/resources/checks_unlisted/test_with_deps.py b/unittests/resources/checks_unlisted/test_with_deps.py new file mode 100644 index 0000000000..c12fce486a --- /dev/null +++ b/unittests/resources/checks_unlisted/test_with_deps.py @@ -0,0 +1,145 @@ +import os +import reframe as rfm +import reframe.utility.sanity as sn +from reframe.core.deferrable import make_deferrable + + +# +# t0 +# ^ +# | +# +-->t4<--+ +# | | +# t5<------t1 +# ^ ^ +# | | +# +---t6---+ +# ^ +# | +# +<--t2<--t7 +# ^ +# | +# t3 + + +class BaseTest(rfm.RunOnlyRegressionTest): + def __init__(self): + self.valid_systems = ['*'] + self.valid_prog_environs = ['*'] + self.sourcesdir = None + self.executable = 'echo' + self._count = int(type(self).__name__[1:]) + self.sanity_patterns = make_deferrable(True) + self.keep_files = ['out.txt'] + + @property + @sn.sanity_function + def count(self): + return self._count + + @rfm.run_before('run') + def write_count(self): + self.executable_opts = [str(self.count), '> out.txt'] + + +# NOTE: The order of the tests here should not be topologically sorted + + +@rfm.simple_test +class T0(BaseTest): + pass + + +@rfm.simple_test +class T1(BaseTest): + def __init__(self): + super().__init__() + self.depends_on('T4') + self.depends_on('T5') + self.sanity_patterns = sn.assert_eq(self.count, 14) + + @rfm.require_deps + def prepend_output(self, T4, T5): + with 
open(os.path.join(T4().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+        with open(os.path.join(T5().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+
+@rfm.simple_test
+class T2(BaseTest):
+    def __init__(self):
+        super().__init__()
+        self.depends_on('T6')
+
+        # Make this test fail on purpose: expected value is 31 normally
+        self.sanity_patterns = sn.assert_eq(self.count, 30)
+
+    @rfm.require_deps
+    def prepend_output(self, T6):
+        with open(os.path.join(T6().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+
+@rfm.simple_test
+class T3(T2):
+    def __init__(self):
+        super().__init__()
+        self.sanity_patterns = sn.assert_eq(self.count, 32)
+
+
+@rfm.simple_test
+class T4(BaseTest):
+    def __init__(self):
+        super().__init__()
+        self.depends_on('T0')
+        self.sanity_patterns = sn.assert_eq(self.count, 4)
+
+    @rfm.require_deps
+    def prepend_output(self, T0):
+        with open(os.path.join(T0().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+
+@rfm.simple_test
+class T5(BaseTest):
+    def __init__(self):
+        super().__init__()
+        self.depends_on('T4')
+        self.sanity_patterns = sn.assert_eq(self.count, 9)
+
+    @rfm.require_deps
+    def prepend_output(self, T4):
+        with open(os.path.join(T4().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+
+@rfm.simple_test
+class T6(BaseTest):
+    def __init__(self):
+        super().__init__()
+        self.depends_on('T1')
+        self.depends_on('T5')
+        self.sanity_patterns = sn.assert_eq(self.count, 29)
+
+    @rfm.require_deps
+    def prepend_output(self, T1, T5):
+        with open(os.path.join(T1().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+        with open(os.path.join(T5().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())
+
+
+@rfm.simple_test
+class T7(BaseTest):
+    def __init__(self):
+        super().__init__()
+        self.depends_on('T2')
+        self.sanity_patterns = sn.assert_eq(self.count, 38)
+
+    @rfm.require_deps
+    def prepend_output(self, T2):
+        with open(os.path.join(T2().stagedir, 'out.txt')) as fp:
+            self._count += int(fp.read())

From d9a3ee8cdf3ac9c44348a988797b54a980a2ae35 Mon Sep 17 00:00:00 2001
From: Vasileios Karakasis
Date: Mon, 28 Oct 2019 10:32:10 +0100
Subject: [PATCH 3/8] Fix unit test failure with Python 3.5

Order of tests is important for some of the async policy tests. Sort the
cases topologically only if needed.
---
 unittests/test_policies.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/unittests/test_policies.py b/unittests/test_policies.py
index a8c92e8bc9..72874eed32 100644
--- a/unittests/test_policies.py
+++ b/unittests/test_policies.py
@@ -49,11 +49,13 @@ def setUp(self):
     def tearDown(self):
         os_ext.rmtree(rt.runtime().resources.prefix)
 
-    def runall(self, checks, *args, **kwargs):
+    def runall(self, checks, sort=False, *args, **kwargs):
         cases = executors.generate_testcases(checks, *args, **kwargs)
-        depgraph = dependency.build_deps(cases)
-        dependency.validate_deps(depgraph)
-        cases = dependency.toposort(depgraph)
+        if sort:
+            depgraph = dependency.build_deps(cases)
+            dependency.validate_deps(depgraph)
+            cases = dependency.toposort(depgraph)
+
         self.runner.runall(cases)
 
     def _num_failures_stage(self, stage):
@@ -206,7 +208,7 @@ def test_dependencies(self):
 
         # Setup the runner
         self.checks = self.loader.load_all()
-        self.runall(self.checks)
+        self.runall(self.checks, sort=True)
 
         stats = self.runner.stats
         assert stats.num_cases() == 8

From ee48478553290431636663e53ff8363a6673c5f6 Mon Sep 17 00:00:00 2001
From: Vasileios Karakasis
Date: Mon, 28 Oct 2019 20:30:53 +0100
Subject: [PATCH 4/8] Fix unit test failures on Daint

- Renamed an additional unit test resource file
---
 .../checks_unlisted/{test_with_deps.py => deps_complex.py} | 0
 .../checks_unlisted/{dependencies/normal.py => deps_simple.py} | 0
 2 files changed, 0 insertions(+), 0 deletions(-)
 rename unittests/resources/checks_unlisted/{test_with_deps.py => deps_complex.py} (100%)
 rename unittests/resources/checks_unlisted/{dependencies/normal.py => deps_simple.py} (100%)

diff --git a/unittests/resources/checks_unlisted/test_with_deps.py b/unittests/resources/checks_unlisted/deps_complex.py
similarity index 100%
rename from unittests/resources/checks_unlisted/test_with_deps.py
rename to unittests/resources/checks_unlisted/deps_complex.py

diff --git a/unittests/resources/checks_unlisted/dependencies/normal.py b/unittests/resources/checks_unlisted/deps_simple.py
similarity index 100%
rename from unittests/resources/checks_unlisted/dependencies/normal.py
rename to unittests/resources/checks_unlisted/deps_simple.py

From ef48c4ee4f21f56068065420b908ac1b89aa7ea8 Mon Sep 17 00:00:00 2001
From: Vasileios Karakasis
Date: Mon, 28 Oct 2019 20:37:48 +0100
Subject: [PATCH 5/8] Commit omitted changes

---
 unittests/test_policies.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/unittests/test_policies.py b/unittests/test_policies.py
index 72874eed32..cfce321539 100644
--- a/unittests/test_policies.py
+++ b/unittests/test_policies.py
@@ -203,7 +203,7 @@ def test_pass_in_retries(self):
 
     def test_dependencies(self):
         self.loader = RegressionCheckLoader(
-            ['unittests/resources/checks_unlisted/test_with_deps.py']
+            ['unittests/resources/checks_unlisted/deps_complex.py']
         )
 
         # Setup the runner
@@ -494,7 +494,7 @@ def find_case(cname, ename, cases):
 
     def setUp(self):
         self.loader = RegressionCheckLoader([
-            'unittests/resources/checks_unlisted/dependencies/normal.py'
+            'unittests/resources/checks_unlisted/deps_simple.py'
         ])
 
         # Set runtime prefix

From efb7883a5ab8dfb8423f8c793cb9aaf6d950fefd Mon Sep 17 00:00:00 2001
From: Vasileios Karakasis
Date: Mon, 28 Oct 2019 22:08:45 +0100
Subject: [PATCH 6/8] Add further comments in checks with dependencies for unit tests.
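
For reference, the expected values asserted in the sanity checks of
deps_complex.py follow directly from the rule documented in the comments
added below: each test writes its own id plus the sum of the outputs of the
tests it depends on. A minimal sketch that reproduces those numbers
(illustrative only, not part of the patch; the graph and ids are taken from
deps_complex.py):

    # out(t) = id(t) + sum of out(p) over the tests t depends on
    parents = {
        'T0': [], 'T4': ['T0'], 'T5': ['T4'], 'T1': ['T4', 'T5'],
        'T6': ['T1', 'T5'], 'T2': ['T6'], 'T3': ['T6'], 'T7': ['T2'],
    }
    out = {}
    for t in ('T0', 'T4', 'T5', 'T1', 'T6', 'T2', 'T3', 'T7'):
        out[t] = int(t[1:]) + sum(out[p] for p in parents[t])

    print(out)
    # {'T0': 0, 'T4': 4, 'T5': 9, 'T1': 14, 'T6': 29, 'T2': 31, 'T3': 32, 'T7': 38}
    # T2 deliberately asserts 30 instead of 31, so T2 fails and its dependent
    # T7 is failed with a TaskDependencyError.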
--- unittests/resources/checks_unlisted/deps_complex.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/unittests/resources/checks_unlisted/deps_complex.py b/unittests/resources/checks_unlisted/deps_complex.py index c12fce486a..5f58525483 100644 --- a/unittests/resources/checks_unlisted/deps_complex.py +++ b/unittests/resources/checks_unlisted/deps_complex.py @@ -3,7 +3,9 @@ import reframe.utility.sanity as sn from reframe.core.deferrable import make_deferrable - +# +# The following tests implement the dependency graph below: +# # # t0 # ^ @@ -20,6 +22,12 @@ # ^ # | # t3 +# +# +# Each test has an id, which is the digit in its name and it produces its +# output in the 'out.txt' file. Each test sums up its own id with the output +# produced by its parents and writes in its output file. +# class BaseTest(rfm.RunOnlyRegressionTest): From 84e074afff630d246e8a2d5cc3bddbc45866669c Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Tue, 29 Oct 2019 23:01:21 +0100 Subject: [PATCH 7/8] Support retries with test dependencies --- reframe/frontend/dependency.py | 87 +++++++++++++++++------ reframe/frontend/executors/__init__.py | 8 ++- unittests/test_policies.py | 98 +++++++++++++++++++------- 3 files changed, 142 insertions(+), 51 deletions(-) diff --git a/reframe/frontend/dependency.py b/reframe/frontend/dependency.py index b42d2c8f81..a43906e1cd 100644 --- a/reframe/frontend/dependency.py +++ b/reframe/frontend/dependency.py @@ -10,7 +10,7 @@ from reframe.core.exceptions import DependencyError -def build_deps(cases): +def build_deps(cases, default_cases=None): '''Build dependency graph from test cases. The graph is represented as an adjacency list in a Python dictionary @@ -19,28 +19,51 @@ def build_deps(cases): ''' # Index cases for quick access - cases_by_part = {} - cases_revmap = {} - for c in cases: - cname = c.check.name - pname = c.partition.fullname - ename = c.environ.name - cases_by_part.setdefault((cname, pname), []) - cases_revmap.setdefault((cname, pname, ename), None) - cases_by_part[cname, pname].append(c) - cases_revmap[cname, pname, ename] = c + def build_partition_index(cases): + if cases is None: + return {} + + ret = {} + for c in cases: + cname, pname = c.check.name, c.partition.fullname + ret.setdefault((cname, pname), []) + ret[cname, pname].append(c) + + return ret + + def build_cases_index(cases): + if cases is None: + return {} + + ret = {} + for c in cases: + cname = c.check.name + pname = c.partition.fullname + ename = c.environ.name + ret.setdefault((cname, pname, ename), c) - def resolve_dep(target, from_map, *args): + return ret + + def resolve_dep(target, from_map, fallback_map, *args): errmsg = 'could not resolve dependency: %s -> %s' % (target, args) try: ret = from_map[args] except KeyError: + # try to resolve the dependency in the fallback map + try: + ret = fallback_map[args] + except KeyError: + raise DependencyError(errmsg) from None + + if not ret: raise DependencyError(errmsg) - else: - if not ret: - raise DependencyError(errmsg) - return ret + return ret + + cases_by_part = build_partition_index(cases) + cases_revmap = build_cases_index(cases) + default_cases_by_part = build_partition_index(default_cases) + default_cases_revmap = build_cases_index(default_cases) # NOTE on variable names # @@ -59,9 +82,10 @@ def resolve_dep(target, from_map, *args): for dep in c.check.user_deps(): tname, how, subdeps = dep if how == rfm.DEPEND_FULLY: - c.deps.extend(resolve_dep(c, cases_by_part, tname, pname)) + c.deps.extend(resolve_dep(c, 
cases_by_part, + default_cases_by_part, tname, pname)) elif how == rfm.DEPEND_BY_ENV: - c.deps.append(resolve_dep(c, cases_revmap, + c.deps.append(resolve_dep(c, cases_revmap, default_cases_revmap, tname, pname, ename)) elif how == rfm.DEPEND_EXACT: for env, tenvs in subdeps.items(): @@ -69,8 +93,10 @@ def resolve_dep(target, from_map, *args): continue for te in tenvs: - c.deps.append(resolve_dep(c, cases_revmap, - tname, pname, te)) + c.deps.append( + resolve_dep(c, cases_revmap, default_cases_revmap, + tname, pname, te) + ) graph[c] = util.OrderedSet(c.deps) @@ -141,10 +167,24 @@ def validate_deps(graph): sources -= visited -def toposort(graph): +def toposort(graph, is_subgraph=False): + '''Return a list of the graph nodes topologically sorted. + + If ``is_subgraph`` is ``True``, graph will by treated a subgraph, meaning + that any dangling edges will be ignored. + ''' test_deps = _reduce_deps(graph) visited = util.OrderedSet() + def retrieve(d, key, default): + try: + return d[key] + except KeyError: + if is_subgraph: + return default + else: + raise + def visit(node, path): # We assume an acyclic graph assert node not in path @@ -152,7 +192,7 @@ def visit(node, path): path.add(node) # Do a DFS visit of all the adjacent nodes - for adj in test_deps[node]: + for adj in retrieve(test_deps, node, []): if adj not in visited: visit(adj, path) @@ -171,4 +211,5 @@ def visit(node, path): except KeyError: cases_by_name[c.check.name] = [c] - return list(itertools.chain(*(cases_by_name[n] for n in visited))) + return list(itertools.chain(*(retrieve(cases_by_name, n, []) + for n in visited))) diff --git a/reframe/frontend/executors/__init__.py b/reframe/frontend/executors/__init__.py index c6d711524a..6f4b07f007 100644 --- a/reframe/frontend/executors/__init__.py +++ b/reframe/frontend/executors/__init__.py @@ -7,6 +7,7 @@ import reframe.core.environments as env import reframe.core.logging as logging import reframe.core.runtime as runtime +import reframe.frontend.dependency as dependency from reframe.core.exceptions import (AbortTaskError, JobNotStartedError, ReframeFatalError, TaskExit) from reframe.frontend.printer import PrettyPrinter @@ -301,7 +302,12 @@ def _retry_failed(self, cases): 'Retrying %d failed check(s) (retry %d/%d)' % (num_failed_checks, rt.current_run, self._max_retries) ) - self._runall(t.testcase.clone() for t in failures) + + # Clone failed cases and rebuild dependencies among them + failed_cases = [t.testcase.clone() for t in failures] + cases_graph = dependency.build_deps(failed_cases, cases) + failed_cases = dependency.toposort(cases_graph, is_subgraph=True) + self._runall(failed_cases) failures = self._stats.failures() def _runall(self, testcases): diff --git a/unittests/test_policies.py b/unittests/test_policies.py index cfce321539..e6c217f835 100644 --- a/unittests/test_policies.py +++ b/unittests/test_policies.py @@ -211,7 +211,7 @@ def test_dependencies(self): self.runall(self.checks, sort=True) stats = self.runner.stats - assert stats.num_cases() == 8 + assert stats.num_cases(0) == 8 assert len(stats.failures()) == 2 for tf in stats.failures(): check = tf.testcase.check @@ -228,6 +228,10 @@ def test_dependencies(self): if t.ref_count == 0: assert os.path.exists(os.path.join(check.outputdir, 'out.txt')) + def test_dependencies_with_retries(self): + self.runner._max_retries = 2 + self.test_dependencies() + class TaskEventMonitor(executors.TaskEventListener): '''Event listener for monitoring the execution of the asynchronous @@ -776,6 +780,39 @@ def 
test_cyclic_deps_by_env(self): def test_validate_deps_empty(self): dependency.validate_deps({}) + def assert_topological_order(self, cases, graph): + cases_order = [] + visited_tests = set() + tests = util.OrderedSet() + for c in cases: + check, part, env = c + cases_order.append((check.name, part.fullname, env.name)) + tests.add(check.name) + visited_tests.add(check.name) + + # Assert that all dependencies of c have been visited before + for d in graph[c]: + if d not in cases: + # dependency points outside the subgraph + continue + + assert d.check.name in visited_tests + + # Check the order of systems and prog. environments + # We are checking against all possible orderings + valid_orderings = [] + for partitions in itertools.permutations(['sys0:p0', 'sys0:p1']): + for environs in itertools.permutations(['e0', 'e1']): + ordering = [] + for t in tests: + for p in partitions: + for e in environs: + ordering.append((t, p, e)) + + valid_orderings.append(ordering) + + assert cases_order in valid_orderings + @rt.switch_runtime(fixtures.TEST_SITE_CONFIG, 'sys0') def test_toposort(self): # @@ -812,30 +849,37 @@ def test_toposort(self): t5, t6, t7, t8]) ) cases = dependency.toposort(deps) - cases_order = [] - tests = util.OrderedSet() - visited_tests = set() - for c in cases: - check, part, env = c - cases_order.append((check.name, part.fullname, env.name)) - tests.add(check.name) - visited_tests.add(check.name) - - # Assert that all dependencies of c have been visited before - for d in deps[c]: - assert d.check.name in visited_tests - - # Check the order of systems and prog. environments - # We are checking against all possible orderings - valid_orderings = [] - for partitions in itertools.permutations(['sys0:p0', 'sys0:p1']): - for environs in itertools.permutations(['e0', 'e1']): - ordering = [] - for t in tests: - for p in partitions: - for e in environs: - ordering.append((t, p, e)) - - valid_orderings.append(ordering) + self.assert_topological_order(cases, deps) - assert cases_order in valid_orderings + @rt.switch_runtime(fixtures.TEST_SITE_CONFIG, 'sys0') + def test_toposort_subgraph(self): + # + # t0 + # ^ + # | + # +-->t1<--+ + # | | + # t2<------t3 + # ^ ^ + # | | + # +---t4---+ + # + t0 = self.create_test('t0') + t1 = self.create_test('t1') + t2 = self.create_test('t2') + t3 = self.create_test('t3') + t4 = self.create_test('t4') + t1.depends_on('t0') + t2.depends_on('t1') + t3.depends_on('t1') + t3.depends_on('t2') + t4.depends_on('t2') + t4.depends_on('t3') + full_deps = dependency.build_deps( + executors.generate_testcases([t0, t1, t2, t3, t4]) + ) + partial_deps = dependency.build_deps( + executors.generate_testcases([t3, t4]), full_deps + ) + cases = dependency.toposort(partial_deps, is_subgraph=True) + self.assert_topological_order(cases, partial_deps) From d6f6e821ba59b572bca8eebe6686c2968f613c55 Mon Sep 17 00:00:00 2001 From: Vasileios Karakasis Date: Wed, 30 Oct 2019 00:42:39 +0100 Subject: [PATCH 8/8] Fix PEP8 issue --- reframe/frontend/dependency.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/reframe/frontend/dependency.py b/reframe/frontend/dependency.py index a43906e1cd..aa4a9e71de 100644 --- a/reframe/frontend/dependency.py +++ b/reframe/frontend/dependency.py @@ -85,8 +85,10 @@ def resolve_dep(target, from_map, fallback_map, *args): c.deps.extend(resolve_dep(c, cases_by_part, default_cases_by_part, tname, pname)) elif how == rfm.DEPEND_BY_ENV: - c.deps.append(resolve_dep(c, cases_revmap, default_cases_revmap, - tname, pname, ename)) + 
c.deps.append(
+                    resolve_dep(c, cases_revmap, default_cases_revmap,
+                                tname, pname, ename)
+                )
             elif how == rfm.DEPEND_EXACT:
                 for env, tenvs in subdeps.items():
                     if env != ename:
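
For reference, the core idea behind the dependency handling added in this
series can be illustrated with a minimal, self-contained sketch: the graph
built by build_deps() maps each test case to the cases it depends on,
in-degrees record how many cases depend on each node, and the execution
order must always place dependencies before their dependents. The sketch
below uses plain strings instead of ReFrame's TestCase objects and a
Kahn-style sort with explicit dependency counters instead of the DFS-based
toposort() above; all names are illustrative only:

    from collections import deque

    # graph[c] = the cases c depends on (same shape as build_deps() returns)
    graph = {
        'T0': set(),
        'T4': {'T0'},
        'T5': {'T4'},
        'T1': {'T4', 'T5'},
    }

    # Reverse adjacency: who depends on me? len(dependents[c]) corresponds
    # to TestCase.in_degree / RegressionTask.ref_count in these patches: a
    # case may only be cleaned up once this count has dropped to zero.
    dependents = {c: set() for c in graph}
    for c, deps in graph.items():
        for d in deps:
            dependents[d].add(c)

    # Kahn-style topological sort: repeatedly run a case whose dependencies
    # have all completed, then release its dependents.
    pending = {c: len(deps) for c, deps in graph.items()}
    ready = deque(c for c, n in pending.items() if n == 0)
    order = []
    while ready:
        c = ready.popleft()
        order.append(c)
        for d in dependents[c]:
            pending[d] -= 1
            if pending[d] == 0:
                ready.append(d)

    print(order)   # ['T0', 'T4', 'T5', 'T1']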