Skip to content

Commit

Permalink
Merge pull request #20 from pypr/improve-automator
Browse files Browse the repository at this point in the history
Improve automator
  • Loading branch information
prabhuramachandran committed Apr 12, 2020
2 parents 512c371 + fd73342 commit 0c56fde
Show file tree
Hide file tree
Showing 8 changed files with 444 additions and 63 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
language: python

python:
- 2.7
- 3.6
- 3.7
- 3.8

install:
- pip install --upgrade pytest coverage codecov
Expand Down
4 changes: 2 additions & 2 deletions appveyor.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
environment:

matrix:
- PYTHON: "C:\\Python27"
- PYTHON: "C:\\Python36-x64"
- PYTHON: "C:\\Python37-x64"
- PYTHON: "C:\\Python38-x64"

install:
- "%PYTHON%\\python.exe -V"
Expand Down
4 changes: 2 additions & 2 deletions automan/api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from .jobs import Job, Worker, LocalWorker, RemoteWorker, Scheduler # noqa

from .automation import ( # noqa
Automator, CommandTask, Problem, PySPHProblem, PySPHTask, RunAll,
Simulation, SolveProblem, Task, TaskRunner, WrapperTask
Automator, CommandTask, FileCommandTask, Problem, PySPHProblem, PySPHTask,
RunAll, Simulation, SolveProblem, Task, TaskRunner, WrapperTask
)

from .automation import compare_runs, filter_by_name, filter_cases # noqa
Expand Down
218 changes: 170 additions & 48 deletions automan/automation.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ class Task(object):
This class is very similar to luigi's Task class.
"""
def __init__(self, depends=None):
# Depends is a list and available for all tasks.
self.depends = depends if depends is not None else []

def complete(self):
"""Should return True/False indicating success of task.
Expand Down Expand Up @@ -53,7 +56,7 @@ def requires(self):
It is important that one either return tasks that are idempotent or
return the same instance as this method is called repeatedly.
"""
return []
return self.depends


class WrapperTask(Task):
Expand Down Expand Up @@ -238,12 +241,13 @@ def __init__(self, command, output_dir, job_info=None, depends=None):
**Parameters**
command: str or list: command to run $output_dir is substituted.
command: str or list: command to run; $output_dir is substituted.
output_dir: str : path of output directory.
job_info: dict: dictionary of job information.
depends: list: list of tasks this depends on.
"""
super().__init__(depends=depends)
if isinstance(command, str):
self.command = shlex.split(command)
else:
Expand All @@ -252,7 +256,6 @@ def __init__(self, command, output_dir, job_info=None, depends=None):
for x in self.command]
self.output_dir = output_dir
self.job_info = job_info if job_info is not None else {}
self.depends = depends if depends is not None else []
self.job_proxy = None
self._copy_proc = None
# This is a sentinel set to true when the job is finished
Expand Down Expand Up @@ -381,7 +384,7 @@ def __init__(self, command, output_dir, job_info=None, depends=None):
**Parameters**
command: str or list: command to run $output_dir is substituted.
command: str or list: command to run; $output_dir is substituted.
output_dir: str : path of output directory.
job_info: dict: dictionary of job information.
depends: list: list of tasks this depends on.
Expand Down Expand Up @@ -418,6 +421,48 @@ def _get_info_filename(self):
return None


class FileCommandTask(CommandTask):
"""Convenience class to run a command which produces as output one or more
files. The difference from the CommandTask is that this does not place its
outputs in a separate directory.
"""
def __init__(self, command, files, job_info=None, depends=None):
"""Constructor
**Parameters**
command: str or list: command to run; $output_dir is substituted.
output_dir: str : path of output directory.
files: list(str): relative paths of output files.
job_info: dict: dictionary of job information.
depends: list: list of tasks this depends on.
"""
self.files = files
output_dir = os.path.join(files[0] + '.job_info')
super().__init__(
command, output_dir, job_info=job_info, depends=depends
)

def clean(self):
"""Clean out any generated results.
This completely removes the output directory.
"""
if os.path.exists(self.output_dir):
shutil.rmtree(self.output_dir)
for f in self.files:
if os.path.exists(f):
os.remove(f)

def output(self):
"""Return list of output paths.
"""
return self.files


class Problem(object):
"""This class represents a numerical problem or computational
problem of interest that needs to be solved.
Expand Down Expand Up @@ -824,7 +869,8 @@ class SolveProblem(Task):
re-run any post-processing.
"""

def __init__(self, problem, match='', force=False):
def __init__(self, problem, match='', force=False, depends=None):
super().__init__(depends=depends)
self.problem = problem
self.match = match
self.force = force
Expand Down Expand Up @@ -872,15 +918,16 @@ def run(self, scheduler):
self.problem.run()

def requires(self):
return self._requires
return self._requires + self.depends


class RunAll(WrapperTask):
"""Solves a given collection of problems.
"""

def __init__(self, simulation_dir, output_dir, problem_classes,
force=False, match=''):
force=False, match='', depends=None):
super().__init__(depends=depends)
self.simulation_dir = simulation_dir
self.output_dir = output_dir
self.force = force
Expand All @@ -906,7 +953,7 @@ def _make_problems(self, problem_classes):
# #### Public protocol ################################################

def requires(self):
return self._requires
return self._requires + self.depends


class Automator(object):
Expand Down Expand Up @@ -946,60 +993,77 @@ def __init__(self, simulation_dir, output_dir, all_problems,
self.simulation_dir = simulation_dir
self.output_dir = output_dir
self.all_problems = all_problems
self.named_tasks = {}
self.tasks = []
self.post_proc_tasks = []
self.runner = None
self.cluster_manager = None
self.runall_task = None
self._args = None
if cluster_manager_factory is None:
from automan.cluster_manager import ClusterManager
self.cluster_manager_factory = ClusterManager
else:
self.cluster_manager_factory = cluster_manager_factory
self._setup_argparse()

# #### Public Protocol ########################################

def run(self, argv=None):
"""Start the automation.
"""
args = self.parser.parse_args(argv)
def add_task(self, task, name=None, post_proc=False):
"""Add a task or a problem instance to also execute.
self._check_positional_arguments(args.problem)
If the `name` is specified then it is a treated as a named task wherein
it must be only invoked explicitly via the command line when asked.
self.cluster_manager = self.cluster_manager_factory(
config_fname=args.config,
exclude_paths=self._get_exclude_paths()
)
from .cluster_manager import BootstrapError
If `post_proc` is True then the task is given an additional dependency
if possible such that the task is run after the `RunAll` task is
completed.
if len(args.host) > 0:
try:
self.cluster_manager.add_worker(args.host, args.home, args.nfs)
except BootstrapError:
pass
return
elif len(args.host) == 0 and args.update_remote:
self.cluster_manager.update(not args.no_rebuild)

problem_classes = self._select_problem_classes(args.problem)
task = RunAll(
simulation_dir=self.simulation_dir,
output_dir=self.output_dir,
problem_classes=problem_classes,
force=args.force, match=args.match
)
**Parameters**
task: Task or Problem instance: Task or Problem to add.
name: str: name of the task (optional).
post_proc: bool: Add a dependency to the task with the RunAll task.
self.scheduler = self.cluster_manager.create_scheduler()
self.runner = TaskRunner([task], self.scheduler)
"""
if isinstance(task, type) and issubclass(task, Problem):
p = task(
simulation_dir=self.simulation_dir, output_dir=self.output_dir
)
_task = SolveProblem(p)
elif isinstance(task, Problem):
_task = SolveProblem(task)
elif isinstance(task, Task):
_task = task
else:
raise ValueError(
'Invalid task: must be Problem class/instance or Task.'
)
if name is not None:
self.named_tasks[name] = _task
else:
self.tasks.append(_task)

if post_proc:
self.post_proc_tasks.append(_task)

def run(self, argv=None):
"""Start the automation.
"""
self._setup(argv)
self._setup_tasks()
self.runner.run()

# #### Private Protocol ########################################

def _check_positional_arguments(self, problems):
names = [c.__name__ for c in self.all_problems]
names = [c.__name__ for c in self.all_problems] + ['all']
lower_names = [x.lower() for x in names]
if problems != 'all':
for p in problems:
if p.lower() not in lower_names:
print("ERROR: %s not a valid problem!" % p)
print("Valid names are %s" % ', '.join(names))
self.parser.exit(1)
lower_names.extend(list(self.named_tasks.keys()))
for p in problems:
if p.lower() not in lower_names:
print("ERROR: %s not a valid problem/task!" % p)
print("Valid names are %s" % ', '.join(names))
self.parser.exit(1)

def _get_exclude_paths(self):
"""Returns a list of exclude paths suitable for passing on to rsync to
Expand All @@ -1012,24 +1076,64 @@ def _get_exclude_paths(self):
return paths

def _select_problem_classes(self, problems):
if problems == 'all':
if 'all' in problems:
return self.all_problems
else:
lower_names = [x.lower() for x in problems]
return [cls for cls in self.all_problems
if cls.__name__.lower() in lower_names]

def _setup(self, argv):
if self.runner is None:
self._setup_argparse()

args = self.parser.parse_args(argv)
self._args = args

self._check_positional_arguments(args.problem)

self.cluster_manager = self.cluster_manager_factory(
config_fname=args.config,
exclude_paths=self._get_exclude_paths()
)
from .cluster_manager import BootstrapError

if len(args.host) > 0:
try:
self.cluster_manager.add_worker(
args.host, args.home, args.nfs
)
except BootstrapError:
pass
return
elif len(args.host) == 0 and args.update_remote:
self.cluster_manager.update(not args.no_rebuild)

problem_classes = self._select_problem_classes(args.problem)
task = RunAll(
simulation_dir=self.simulation_dir,
output_dir=self.output_dir,
problem_classes=problem_classes,
force=args.force, match=args.match
)
self.runall_task = task

self.scheduler = self.cluster_manager.create_scheduler()
self.runner = TaskRunner([task], self.scheduler)

def _setup_argparse(self):
import argparse
desc = "Automation script to run simulations."
parser = argparse.ArgumentParser(
description=desc
)
all_problem_names = [c.__name__ for c in self.all_problems]
all_problem_names += list(self.named_tasks.keys()) + ['all']
parser.add_argument(
'problem', nargs='*', default="all",
help="Specifies problem to run as a string (case-insensitive), "
"valid names are %s. Defaults to running all of them."
'problem', nargs='*', default=["all"],
help="Specifies problem/task to run as a string "
"(case-insensitive), valid names are %s. "
"Defaults to running all of the problems."
% all_problem_names
)

Expand Down Expand Up @@ -1072,3 +1176,21 @@ def _setup_argparse(self):
)

self.parser = parser

def _setup_tasks(self):
for task in self.post_proc_tasks:
task.depends.append(self.runall_task)

# Add generic tasks.
for task in self.tasks:
self.runner.add_task(task)

# Add named tasks only if specifically requested on CLI.
for name, task in self.named_tasks.items():
if name in self._args.problem:
self.runner.add_task(task)

# Reset the tasks so we can use the automator interactively.
self.post_proc_tasks = []
self.tasks = []
self.named_tasks = {}

0 comments on commit 0c56fde

Please sign in to comment.