Skip to content

Commit

Permalink
Merge pull request #14 from pypr/check-for-errors-in-running-tasks
Browse files Browse the repository at this point in the history
 Fix subtle bugs in task runner.
  • Loading branch information
prabhuramachandran committed Oct 23, 2018
2 parents ae38dd9 + d92a993 commit 170a9cf
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Support for inter Problem/Simulation/Task dependencies.
* Print more useful messages when running tasks.
* Fix bug with computing the available cores.
* Improve handling of cases when tasks fail with errors.
* Minor bug fixes.


Expand Down
40 changes: 35 additions & 5 deletions automan/automation.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ class Task(object):

def complete(self):
    """Return True/False indicating whether the task succeeded.

    This implementation reports success when every path returned by
    `self.output()` exists.

    If the task was just executed (in this invocation) but failed, raise
    an exception (any subclass of `Exception`), as this signals an error
    to the task execution engine.

    If the task was executed in an earlier invocation of the automation,
    then just return True/False so as to be able to re-run the simulation.
    """
    # A generator lets `all` stop at the first missing output instead of
    # building a throwaway list with one entry per output path.
    return all(os.path.exists(path) for path in self.output())

Expand All @@ -31,7 +38,12 @@ def output(self):
return []

def run(self, scheduler):
"""Run the task, using the given scheduler if needed.
"""Run the task, using the given scheduler.
Using the scheduler is optional but recommended for any long-running
tasks. It is safe to raise an exception immediately when running the
task, but for long-running tasks the exception will not matter; the
`complete` method should report the failure instead.
"""
pass

Expand All @@ -40,7 +52,6 @@ def requires(self):
It is important that one either return tasks that are idempotent or
return the same instance as this method is called repeatedly.
"""
return []

Expand Down Expand Up @@ -74,6 +85,13 @@ def __init__(self, tasks, scheduler):

# #### Private protocol ##############################################

def _check_error_in_running_tasks(self):
running = self._get_tasks_with_status('running')
for task in running:
if self._check_status_of_task(task) == 'error':
return True
return False

def _check_status_of_requires(self, task):
status = [self._check_status_of_task(t) for t in task.requires()]

Expand Down Expand Up @@ -143,6 +161,8 @@ def _wait_for_running_tasks(self, wait):
running = self._get_tasks_with_status('running')
errors = self._get_tasks_with_status('error')
print("{n_err} jobs had errors.".format(n_err=len(errors)))
print("Please fix the issues and re-run.")
return len(errors)

# #### Public protocol ##############################################

Expand All @@ -161,13 +181,22 @@ def add_task(self, task):
self.task_status[task] = 'done'

def run(self, wait=5):
'''Run the tasks that were given.
Wait for the given amount of time to poll for completed tasks.
Returns the number of tasks that had errors.
'''
self._show_remaining_tasks()
status = 'running'
while len(self.todo) > 0 and status != 'error':
to_remove = []
for i in range(len(self.todo) - 1, -1, -1):
task = self.todo[i]
status = self._check_status_of_requires(task)
if self._check_error_in_running_tasks():
status = 'error'

if status == 'error':
break
elif status == 'done':
Expand All @@ -181,9 +210,10 @@ def run(self, wait=5):
self._show_remaining_tasks(replace_line=True)
time.sleep(wait)

if status == 'error':
self._wait_for_running_tasks(wait)
print("Finished!")
n_errors = self._wait_for_running_tasks(wait)
if n_errors == 0:
print("Finished!")
return n_errors


class CommandTask(Task):
Expand Down
7 changes: 4 additions & 3 deletions automan/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,11 @@ def get_info(self, job_id):


class Scheduler(object):
def __init__(self, root='.', worker_config=()):
def __init__(self, root='.', worker_config=(), wait=5):
self.workers = deque()
self.worker_config = list(worker_config)
self.root = os.path.abspath(os.path.expanduser(root))
self.wait = wait
self._completed_jobs = []
self.jobs = []

Expand Down Expand Up @@ -523,7 +524,7 @@ def load(self, fname):
def add_worker(self, conf):
    """Append the worker configuration `conf` to `self.worker_config`."""
    configs = self.worker_config
    configs.append(conf)

def submit(self, job, wait=5):
def submit(self, job):
proxy = None
slept = False
while proxy is None:
Expand All @@ -538,7 +539,7 @@ def submit(self, job, wait=5):
self.jobs.append(proxy)
break
else:
time.sleep(wait)
time.sleep(self.wait)
slept = True
print("\rWaiting for free worker ...", end='')
sys.stdout.flush()
Expand Down
54 changes: 53 additions & 1 deletion automan/tests/test_automation.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,66 @@ def tearDown(self):
class TestTaskRunner(TestAutomationBase):
def _make_scheduler(self):
worker = dict(host='localhost')
s = Scheduler(root='.', worker_config=[worker])
s = Scheduler(root='.', worker_config=[worker], wait=0.1)
return s

def _get_time(self, path):
with open(os.path.join(path, 'stdout.txt')) as f:
t = float(f.read())
return t

@mock.patch('automan.jobs.total_cores', return_value=2)
def test_task_runner_waits_for_tasks_in_the_end(self, m_t_cores):
    # Given: three short commands that each exit with status 1.
    scheduler = self._make_scheduler()
    failing_cmd = 'python -c "import sys, time; time.sleep(0.1); sys.exit(1)"'
    tasks = [
        CommandTask(failing_cmd, output_dir=os.path.join(self.sim_dir, name))
        for name in ('1', '2', '3')
    ]

    # When
    runner = TaskRunner(tasks=tasks, scheduler=scheduler)
    n_errors = runner.run(wait=0.1)

    # Then
    # Not every task may have been run, but each one that did run fails,
    # so the failed tasks plus those left in `todo` account for all three.
    self.assertEqual(n_errors + len(runner.todo), 3)
    self.assertTrue(n_errors > 0)

@mock.patch('automan.jobs.total_cores', return_value=2)
def test_task_runner_checks_for_error_in_running_tasks(self, m_t_cores):
    # Given: three failing commands sharing one job_info that asks for
    # 2 cores and 2 threads.
    scheduler = self._make_scheduler()
    failing_cmd = 'python -c "import sys, time; time.sleep(0.1); sys.exit(1)"'
    out_dirs = [os.path.join(self.sim_dir, name) for name in ('1', '2', '3')]
    ct1_dir, ct2_dir, ct3_dir = out_dirs
    job_info = dict(n_core=2, n_thread=2)
    tasks = [
        CommandTask(failing_cmd, output_dir=out_dir, job_info=job_info)
        for out_dir in out_dirs
    ]

    # When
    runner = TaskRunner(tasks=tasks, scheduler=scheduler)

    # Then
    self.assertEqual(len(runner.todo), 3)

    # When
    n_errors = runner.run(wait=0.1)

    # Then
    # In this case, two tasks should have run and one should not have run
    # as the other two had errors.
    self.assertEqual(n_errors, 2)
    self.assertEqual(len(runner.todo), 1)
    self.assertTrue(os.path.exists(ct3_dir))
    self.assertTrue(os.path.exists(ct2_dir))
    self.assertFalse(os.path.exists(ct1_dir))

def test_task_runner_does_not_add_repeated_tasks(self):
# Given
s = self._make_scheduler()
Expand Down
15 changes: 9 additions & 6 deletions automan/tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ def safe_rmtree(*args, **kw):
except WindowsError:
pass
else:
shutil.rmtree(*args, **kw)
try:
shutil.rmtree(*args, **kw)
except OSError:
pass


class TestJob(unittest.TestCase):
Expand Down Expand Up @@ -445,18 +448,18 @@ def test_scheduler_should_not_overload_worker(self, m_total_cores,
# Given
n_core = jobs.total_cores()
config = [dict(host='localhost')]
s = jobs.Scheduler(worker_config=config)
s = jobs.Scheduler(worker_config=config, wait=0.5)

j1 = self._make_dummy_job(n_core, sleep=0.5)
j2 = self._make_dummy_job(n_core, sleep=0.5)
j3 = self._make_dummy_job(n_core, sleep=0.5)
j4 = self._make_dummy_job(0, sleep=0.5)

# When
proxy1 = s.submit(j1, wait=0.5)
proxy2 = s.submit(j2, wait=0.5)
proxy3 = s.submit(j3, wait=0.5)
proxy4 = s.submit(j4, wait=0.5)
proxy1 = s.submit(j1)
proxy2 = s.submit(j2)
proxy3 = s.submit(j3)
proxy4 = s.submit(j4)

# Then
self.assertEqual(len(s.workers), 1)
Expand Down

0 comments on commit 170a9cf

Please sign in to comment.