Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions reframe/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ class TaskExit(ReframeError):
'''Raised when a regression task must exit the pipeline prematurely.'''


class TaskDependencyError(ReframeError):
    '''Raised from within a regression task whenever at least one of the
    tasks it depends on has failed.'''


class AbortTaskError(ReframeError):
'''Raised into a regression task to denote that it has been aborted due to
an external reason (e.g., keyboard interrupt, fatal error in other places
Expand Down
10 changes: 7 additions & 3 deletions reframe/frontend/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import reframe.core.runtime as runtime
import reframe.frontend.argparse as argparse
import reframe.frontend.check_filters as filters
import reframe.frontend.dependency as dependency
import reframe.utility.os_ext as os_ext
from reframe.core.exceptions import (EnvironError, ConfigError, ReframeError,
ReframeFatalError, format_exception,
Expand Down Expand Up @@ -493,14 +494,15 @@ def main():
for p in rt.system.partitions
for e in p.environs if re.match(env_patt, e.name)}

# Generate the test cases
# Generate the test cases, validate dependencies and sort them
checks_matched = list(checks_matched)
testcases = generate_testcases(checks_matched,
options.skip_system_check,
options.skip_prgenv_check,
allowed_environs)

# Act on checks
testgraph = dependency.build_deps(testcases)
dependency.validate_deps(testgraph)
testcases = dependency.toposort(testgraph)

# Unload regression's module and load user-specified modules
if hasattr(settings, 'reframe_module'):
Expand Down Expand Up @@ -532,6 +534,8 @@ def main():
"Skipping..." % m)
printer.debug(str(e))

# Act on checks

success = True
if options.list:
# List matched checks
Expand Down
100 changes: 74 additions & 26 deletions reframe/frontend/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,60 @@
from reframe.core.exceptions import DependencyError


def build_deps(cases):
def build_deps(cases, default_cases=None):
'''Build dependency graph from test cases.

The graph is represented as an adjacency list in a Python dictionary
holding test cases. The dependency information is also encoded inside each
test cases.
test case.
'''

# Index cases for quick access
cases_by_part = {}
cases_revmap = {}
for c in cases:
cname = c.check.name
pname = c.partition.fullname
ename = c.environ.name
cases_by_part.setdefault((cname, pname), [])
cases_revmap.setdefault((cname, pname, ename), None)
cases_by_part[cname, pname].append(c)
cases_revmap[cname, pname, ename] = c
def build_partition_index(cases):
if cases is None:
return {}

ret = {}
for c in cases:
cname, pname = c.check.name, c.partition.fullname
ret.setdefault((cname, pname), [])
ret[cname, pname].append(c)

return ret

def build_cases_index(cases):
if cases is None:
return {}

ret = {}
for c in cases:
cname = c.check.name
pname = c.partition.fullname
ename = c.environ.name
ret.setdefault((cname, pname, ename), c)

return ret

def resolve_dep(target, from_map, *args):
errmsg = 'could not resolve dependency: %s' % target
def resolve_dep(target, from_map, fallback_map, *args):
errmsg = 'could not resolve dependency: %s -> %s' % (target, args)
try:
ret = from_map[args]
except KeyError:
# try to resolve the dependency in the fallback map
try:
ret = fallback_map[args]
except KeyError:
raise DependencyError(errmsg) from None

if not ret:
raise DependencyError(errmsg)
else:
if not ret:
raise DependencyError(errmsg)

return ret
return ret

cases_by_part = build_partition_index(cases)
cases_revmap = build_cases_index(cases)
default_cases_by_part = build_partition_index(default_cases)
default_cases_revmap = build_cases_index(default_cases)

# NOTE on variable names
#
Expand All @@ -59,21 +82,31 @@ def resolve_dep(target, from_map, *args):
for dep in c.check.user_deps():
tname, how, subdeps = dep
if how == rfm.DEPEND_FULLY:
c.deps.extend(resolve_dep(c, cases_by_part, tname, pname))
c.deps.extend(resolve_dep(c, cases_by_part,
default_cases_by_part, tname, pname))
elif how == rfm.DEPEND_BY_ENV:
c.deps.append(resolve_dep(c, cases_revmap,
tname, pname, ename))
c.deps.append(
resolve_dep(c, cases_revmap, default_cases_revmap,
tname, pname, ename)
)
elif how == rfm.DEPEND_EXACT:
for env, tenvs in subdeps.items():
if env != ename:
continue

for te in tenvs:
c.deps.append(resolve_dep(c, cases_revmap,
tname, pname, te))
c.deps.append(
resolve_dep(c, cases_revmap, default_cases_revmap,
tname, pname, te)
)

graph[c] = util.OrderedSet(c.deps)

# Calculate in-degree of each node
for u, adjacent in graph.items():
for v in adjacent:
v.in_degree += 1

return graph


Expand Down Expand Up @@ -136,18 +169,32 @@ def validate_deps(graph):
sources -= visited


def toposort(graph):
def toposort(graph, is_subgraph=False):
'''Return a list of the graph nodes topologically sorted.

If ``is_subgraph`` is ``True``, graph will by treated a subgraph, meaning
that any dangling edges will be ignored.
'''
test_deps = _reduce_deps(graph)
visited = util.OrderedSet()

def retrieve(d, key, default):
try:
return d[key]
except KeyError:
if is_subgraph:
return default
else:
raise

def visit(node, path):
# We assume an acyclic graph
assert node not in path

path.add(node)

# Do a DFS visit of all the adjacent nodes
for adj in test_deps[node]:
for adj in retrieve(test_deps, node, []):
if adj not in visited:
visit(adj, path)

Expand All @@ -166,4 +213,5 @@ def visit(node, path):
except KeyError:
cases_by_name[c.check.name] = [c]

return list(itertools.chain(*(cases_by_name[n] for n in visited)))
return list(itertools.chain(*(retrieve(cases_by_name, n, [])
for n in visited)))
25 changes: 22 additions & 3 deletions reframe/frontend/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import reframe.core.environments as env
import reframe.core.logging as logging
import reframe.core.runtime as runtime
import reframe.frontend.dependency as dependency
from reframe.core.exceptions import (AbortTaskError, JobNotStartedError,
ReframeFatalError, TaskExit)
from reframe.frontend.printer import PrettyPrinter
Expand All @@ -28,6 +29,9 @@ def __init__(self, check, partition, environ):
self.__check._case = weakref.ref(self)
self.__deps = []

# Incoming dependencies
self.in_degree = 0

def __iter__(self):
# Allow unpacking a test case with a single liner:
# c, p, e = case
Expand Down Expand Up @@ -66,6 +70,10 @@ def environ(self):
def deps(self):
return self.__deps

@property
def num_dependents(self):
    # Number of test cases that depend on this one, i.e., the node's
    # in-degree in the dependency graph (incremented while building the
    # graph); used to seed the cleanup reference count of the
    # corresponding RegressionTask.
    return self.in_degree

def clone(self):
# Return a fresh clone, i.e., one based on the original check
return TestCase(self.__check_orig, self.__partition, self.__environ)
Expand Down Expand Up @@ -105,11 +113,15 @@ class RegressionTask:
def __init__(self, case, listeners=[]):
self._case = case
self._failed_stage = None
self._current_stage = None
self._current_stage = 'startup'
self._exc_info = (None, None, None)
self._environ = None
self._listeners = list(listeners)

# Reference count for dependent tests; safe to cleanup the test only
# if it is zero
self.ref_count = case.num_dependents

# Test case has finished, but has not been waited for yet
self.zombie = False

Expand Down Expand Up @@ -183,9 +195,11 @@ def sanity(self):
def performance(self):
self._safe_call(self.check.performance)

def finalize(self):
self._notify_listeners('on_task_success')

def cleanup(self, *args, **kwargs):
self._safe_call(self.check.cleanup, *args, **kwargs)
self._notify_listeners('on_task_success')

def fail(self, exc_info=None):
self._failed_stage = self._current_stage
Expand Down Expand Up @@ -288,7 +302,12 @@ def _retry_failed(self, cases):
'Retrying %d failed check(s) (retry %d/%d)' %
(num_failed_checks, rt.current_run, self._max_retries)
)
self._runall(t.testcase.clone() for t in failures)

# Clone failed cases and rebuild dependencies among them
failed_cases = [t.testcase.clone() for t in failures]
cases_graph = dependency.build_deps(failed_cases, cases)
failed_cases = dependency.toposort(cases_graph, is_subgraph=True)
self._runall(failed_cases)
failures = self._stats.failures()

def _runall(self, testcases):
Expand Down
52 changes: 42 additions & 10 deletions reframe/frontend/executors/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
import time
from datetime import datetime

from reframe.core.exceptions import TaskExit
from reframe.core.exceptions import (TaskDependencyError, TaskExit)
from reframe.core.logging import getlogger
from reframe.frontend.executors import (ExecutionPolicy, RegressionTask,
TaskEventListener, ABORT_REASONS)


class SerialExecutionPolicy(ExecutionPolicy):
class SerialExecutionPolicy(ExecutionPolicy, TaskEventListener):
def __init__(self):
super().__init__()
self._tasks = []
self._task_index = {}

# Tasks that have finished, but have not performed their cleanup phase
self._retired_tasks = []
self.task_listeners.append(self)

def runcase(self, case):
super().runcase(case)
Expand All @@ -22,10 +26,14 @@ def runcase(self, case):
'RUN', '%s on %s using %s' %
(check.name, partition.fullname, environ.name)
)
task = RegressionTask(case)
self._tasks.append(task)
task = RegressionTask(case, self.task_listeners)
self._task_index[case] = task
self.stats.add_task(task)
try:
# Do not run test if any of its dependencies has failed
if any(self._task_index[c].failed for c in case.deps):
raise TaskDependencyError('dependencies failed')

task.setup(partition, environ,
sched_flex_alloc_tasks=self.sched_flex_alloc_tasks,
sched_account=self.sched_account,
Expand All @@ -45,7 +53,8 @@ def runcase(self, case):
if not self.skip_performance_check:
task.performance()

task.cleanup(not self.keep_stage_files)
self._retired_tasks.append(task)
task.finalize()

except TaskExit:
return
Expand All @@ -58,6 +67,32 @@ def runcase(self, case):
self.printer.status('FAIL' if task.failed else 'OK',
task.check.info(), just='right')

def _cleanup_all(self):
for task in self._retired_tasks:
if task.ref_count == 0:
task.cleanup(not self.keep_stage_files)
self._retired_tasks.remove(task)

def on_task_run(self, task):
    # Event-listener hook invoked when a task starts running; the
    # serial policy takes no action at this point.
    pass

def on_task_exit(self, task):
    # Event-listener hook invoked when a task's job exits; the serial
    # policy takes no action at this point.
    pass

def on_task_failure(self, task):
    # Event-listener hook invoked when a task fails; the serial policy
    # takes no action at this point.
    pass

def on_task_success(self, task):
# update reference count of dependencies
for c in task.testcase.deps:
self._task_index[c].ref_count -= 1

self._cleanup_all()

def exit(self):
# Clean up all remaining tasks
self._cleanup_all()


class PollRateFunction:
def __init__(self, min_rate, decay_time):
Expand Down Expand Up @@ -108,9 +143,6 @@ def __init__(self):
# Ready tasks to be executed per partition
self._ready_tasks = {}

# The tasks associated with the running checks
self._tasks = []

# Job limit per partition
self._max_jobs = {}

Expand Down Expand Up @@ -154,7 +186,6 @@ def runcase(self, case):
self._max_jobs.setdefault(partition.fullname, partition.max_jobs)

task = RegressionTask(case, self.task_listeners)
self._tasks.append(task)
self.stats.add_task(task)
try:
task.setup(partition, environ,
Expand Down Expand Up @@ -222,6 +253,7 @@ def _finalize_task(self, task):
if not self.skip_performance_check:
task.performance()

task.finalize()
task.cleanup(not self.keep_stage_files)

def _failall(self, cause):
Expand Down
Loading