Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion ci-scripts/ci-runner.bash
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,14 @@ checked_exec()

run_user_checks()
{
cmd="python reframe.py --prefix . --notimestamp -r -t production $@"
cmd="./bin/reframe --exec-policy=async -r -t production $@"
echo "Running user checks with \`$cmd'"
checked_exec $cmd
}

run_serial_user_checks()
{
cmd="./bin/reframe --exec-policy=serial -r -t production-serial $@"
echo "Running user checks with \`$cmd'"
checked_exec $cmd
}
Expand Down Expand Up @@ -194,6 +201,7 @@ if [ ${#userchecks[@]} -ne 0 ]; then
#
for i in ${!invocations[@]}; do
run_user_checks ${userchecks_path} ${invocations[i]}
run_serial_user_checks ${userchecks_path} ${invocations[i]}
done
fi

Expand Down
19 changes: 19 additions & 0 deletions reframe/core/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,20 @@ def __set__(self, obj, value):
obj.__dict__[self.name] = value


class ForwardField(object):
"""Simple field that forwards set/get to a target object."""
def __init__(self, obj, attr):
self.target = obj
self.attr = attr

def __get__(self, obj, objtype):
return self.target.__dict__[self.attr]


def __set__(self, obj, value):
self.target.__dict__[self.attr] = value


class TypedField(Field):
"""Stores a field of predefined type"""
def __init__(self, fieldname, fieldtype, allow_none = False):
Expand Down Expand Up @@ -354,6 +368,11 @@ def __init__(self, mapping={}, scope_sep=':', global_scope='*'):
self.global_scope = global_scope


def __str__(self):
# just print the internal dictionary
return str(self.scopes)


def _check_scope_type(self, key, value):
if not isinstance(key, str):
raise TypeError('scope keys in a scoped dict must be strings')
Expand Down
100 changes: 66 additions & 34 deletions reframe/core/launchers.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,86 @@
from math import ceil


class JobLauncher:
def __init__(self, job, options):
self.job = job
def __init__(self, job, options=[]):
self.job = job
self.options = options

def emit_run_command(self, cmd, builder, **builder_opts):
@property
def executable(self):
raise NotImplementedError('Attempt to call an abstract method')

@property
def fixed_options(self):
return []

class LocalLauncher(JobLauncher):
def __init__(self, job, options = []):
super().__init__(job, options)

def emit_run_command(self, cmd, builder, **builder_opts):
# Just emit the command
return builder.verbatim(cmd, **builder_opts)
def emit_run_command(self, target_executable, builder, **builder_opts):
options = ' '.join(self.fixed_options + self.options)
return builder.verbatim('%s %s %s' % \
(self.executable, options, target_executable),
**builder_opts)


class NativeSlurmLauncher(JobLauncher):
def __init__(self, job, options = []):
super().__init__(job, options)
self.launcher = 'srun %s' % (' '.join(self.options))


def emit_run_command(self, cmd, builder, **builder_opts):
return builder.verbatim('%s %s' % (self.launcher, cmd), **builder_opts)
@property
def executable(self):
return 'srun'


class AlpsLauncher(JobLauncher):
def __init__(self, job, options = []):
super().__init__(job, options)
self.launcher = 'aprun -B %s' % (' '.join(self.options))
@property
def executable(self):
return 'aprun'

def emit_run_command(self, cmd, builder, **builder_opts):
return builder.verbatim('%s %s' % (self.launcher, cmd), **builder_opts)
@property
def fixed_options(self):
return [ '-B' ]


class LauncherWrapper(JobLauncher):
"""
Wraps a launcher object so that you can modify the launcher's invocation
"""
def __init__(self, launcher, wrapper_cmd, wrapper_options = []):
self.launcher = launcher
self.wrapper = wrapper_cmd
"""Wrap a launcher object so that its invocation may be modified."""
def __init__(self, target_launcher, wrapper_command, wrapper_options=[]):
super().__init__(target_launcher.job, target_launcher.options)
self.target_launcher = target_launcher
self.wrapper_command = wrapper_command
self.wrapper_options = wrapper_options

@property
def executable(self):
return self.wrapper_command

@property
def fixed_options(self):
return self.wrapper_options + [ self.target_launcher.executable ] + \
self.target_launcher.fixed_options


class LocalLauncher(JobLauncher):
def emit_run_command(self, cmd, builder, **builder_opts):
# Suppress the output of the wrapped launcher in the builder
launcher_cmd = self.launcher.emit_run_command(cmd, builder,
suppress=True)
return builder.verbatim(
'%s %s %s' % (self.wrapper, ' '.join(self.wrapper_options),
launcher_cmd), **builder_opts)
# Just emit the command
return builder.verbatim(cmd, **builder_opts)


class VisitLauncher(JobLauncher):
def __init__(self, job, options=[]):
super().__init__(job, options)
if self.job:
# The self.job.launcher must be stored at the moment of the
# VisitLauncher construction, because the user will afterwards set
# the newly created VisitLauncher as new self.job.launcher!
self.target_launcher = self.job.launcher

@property
def executable(self):
return 'visit'

@property
def fixed_options(self):
options = []
if self.target_launcher and \
not isinstance(self.target_launcher, LocalLauncher):
num_nodes = ceil(self.job.num_tasks/self.job.num_tasks_per_node)
options.append('-np %s' % self.job.num_tasks)
options.append('-nn %s' % num_nodes)
options.append('-l %s' % self.target_launcher.executable)
return options
53 changes: 43 additions & 10 deletions reframe/core/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class RegressionTest(object):
use_multithreading = BooleanField('use_multithreading', allow_none=True)
local = BooleanField('local')
prefix = StringField('prefix')
sourcesdir = StringField('sourcesdir')
sourcesdir = StringField('sourcesdir', allow_none=True)
stagedir = StringField('stagedir', allow_none=True)
stdout = StringField('stdout', allow_none=True)
stderr = StringField('stderr', allow_none=True)
Expand Down Expand Up @@ -171,6 +171,13 @@ def is_local(self):
return self.local or self.current_partition.scheduler == 'local'


def _sanitize_basename(self, name):
"""Create a basename safe to be used as path component

Replace all path separator characters in `name` with underscores."""
return name.replace(os.sep, '_')


def _setup_environ(self, environ):
"""Setup the current environment and load it."""

Expand All @@ -196,10 +203,17 @@ def _setup_environ(self, environ):
def _setup_paths(self):
"""Setup the check's dynamic paths."""
self.logger.debug('setting up paths')

self.stagedir = self._resources.stagedir(
self.current_partition.name, self.name, self.current_environ.name)
self._sanitize_basename(self.current_partition.name),
self.name,
self._sanitize_basename(self.current_environ.name)
)
self.outputdir = self._resources.outputdir(
self.current_partition.name, self.name, self.current_environ.name)
self._sanitize_basename(self.current_partition.name),
self.name,
self._sanitize_basename(self.current_environ.name)
)
self.stdout = os.path.join(self.stagedir, '%s.out' % self.name)
self.stderr = os.path.join(self.stagedir, '%s.err' % self.name)

Expand Down Expand Up @@ -230,10 +244,13 @@ def _setup_job(self, **job_opts):
raise ReframeFatalError('Oops: unsupported launcher: %s' %
self.current_partition.scheduler)

job_name = '%s_%s_%s_%s' % (self.name,
self.current_system.name,
self.current_partition.name,
self.current_environ.name)
job_name = '%s_%s_%s_%s' % (
self.name,
self._sanitize_basename(self.current_system.name),
self._sanitize_basename(self.current_partition.name),
self._sanitize_basename(self.current_environ.name)
)

if self.is_local():
self.job = LocalJob(
job_name=job_name,
Expand Down Expand Up @@ -342,6 +359,9 @@ def compile(self, **compile_opts):
if not self.current_environ:
raise ReframeError('no programming environment set')

if not self.sourcesdir:
raise ReframeError('sourcesdir is not set')

# if self.sourcepath refers to a directory, stage it first
target_sourcepath = os.path.join(self.sourcesdir, self.sourcepath)
if os.path.isdir(target_sourcepath):
Expand Down Expand Up @@ -420,8 +440,8 @@ def check_performance(self):
return self._match_patterns(self.perf_patterns, self.reference)


def cleanup(self, remove_files=False, unload_env=True):
# Copy stdout/stderr and job script
def _copy_to_outputdir(self):
"""Copy checks interesting files to the output directory."""
self.logger.debug('copying interesting files to output directory')
shutil.copy(self.stdout, self.outputdir)
shutil.copy(self.stderr, self.outputdir)
Expand All @@ -434,6 +454,15 @@ def cleanup(self, remove_files=False, unload_env=True):
f = os.path.join(self.stagedir, f)
shutil.copy(f, self.outputdir)


def cleanup(self, remove_files=False, unload_env=True):
aliased = os.path.samefile(self.stagedir, self.outputdir)
if aliased:
self.logger.debug('skipping copy to output dir '
'since they alias each other')
else:
self._copy_to_outputdir()

if remove_files:
self.logger.debug('removing stage directory')
shutil.rmtree(self.stagedir)
Expand Down Expand Up @@ -550,7 +579,11 @@ def compile(self, **compile_opts):


def run(self):
self._copy_to_stagedir(os.path.join(self.sourcesdir, self.sourcepath))
# The sourcesdir can be set to None by the user; then we don't copy.
if self.sourcesdir:
self._copy_to_stagedir(os.path.join(self.sourcesdir,
self.sourcepath))

super().run()


Expand Down
Loading