From f5267517b66b3e7f1fdaafa4e5b9a32dd7ddb8c6 Mon Sep 17 00:00:00 2001 From: Dhruv Garg Date: Thu, 3 Sep 2015 08:41:20 -0400 Subject: [PATCH] Fix error message naming / docs. Set task.expl earlier. --- luigi/scheduler.py | 6 +++--- luigi/task.py | 2 +- luigi/worker.py | 16 ++++++++------- test/worker_task_test.py | 43 +++++++++++++++++++++++++++++++++++++++- 4 files changed, 55 insertions(+), 12 deletions(-) diff --git a/luigi/scheduler.py b/luigi/scheduler.py index 692309a867..ac0382f673 100644 --- a/luigi/scheduler.py +++ b/luigi/scheduler.py @@ -613,6 +613,9 @@ def add_task(self, task_id=None, status=PENDING, runnable=True, if task.remove is not None: task.remove = None # unmark task for removal so it isn't removed after being added + if expl is not None: + task.expl = expl + if not (task.status == RUNNING and status == PENDING): # don't allow re-scheduling of task while it is running, it must either fail or succeed first if status == PENDING or status != task.status: @@ -649,9 +652,6 @@ def add_task(self, task_id=None, status=PENDING, runnable=True, self._state.get_worker(worker_id).tasks.add(task) task.runnable = runnable - if expl is not None: - task.expl = expl - def add_worker(self, worker, info, **kwargs): self._state.get_worker(worker).add_info(info) diff --git a/luigi/task.py b/luigi/task.py index cc1fbdb0f0..790103e0f8 100644 --- a/luigi/task.py +++ b/luigi/task.py @@ -467,7 +467,7 @@ def on_failure(self, exception): Override for custom error handling. This method gets called if an exception is raised in :py:meth:`run`. - Return value of this method is json encoded and sent to the scheduler + The returned value of this method is json encoded and sent to the scheduler as the `expl` argument. Its string representation will be used as the body of the error email sent out if any. diff --git a/luigi/worker.py b/luigi/worker.py index aaa5523441..4a7fc37a00 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -116,7 +116,7 @@ def run(self): random.seed((os.getpid(), time.time())) status = FAILED - error_message = '' + expl = '' missing = [] new_deps = [] try: @@ -146,7 +146,7 @@ def run(self): elif status == DONE: self.task.trigger_event( Event.PROCESSING_TIME, self.task, time.time() - t0) - error_message = json.dumps(self.task.on_success()) + expl = json.dumps(self.task.on_success()) logger.info('[pid %s] Worker %s done %s', os.getpid(), self.worker_id, self.task.task_id) self.task.trigger_event(Event.SUCCESS, self.task) @@ -159,13 +159,15 @@ def run(self): self.task.trigger_event(Event.FAILURE, self.task, ex) subject = "Luigi: %s FAILED" % self.task - error_message = notifications.wrap_traceback(self.task.on_failure(ex)) + raw_error_message = self.task.on_failure(ex) + notification_error_message = notifications.wrap_traceback(raw_error_message) + expl = json.dumps(raw_error_message) formatted_error_message = notifications.format_task_error(subject, self.task, - formatted_exception=error_message) + formatted_exception=notification_error_message) notifications.send_error_email(subject, formatted_error_message, self.task.owner_email) finally: self.result_queue.put( - (self.task.task_id, status, error_message, missing, new_deps)) + (self.task.task_id, status, expl, missing, new_deps)) class SingleProcessPool(object): @@ -635,7 +637,7 @@ def _handle_next_task(self): self._purge_children() # Deal with subprocess failures try: - task_id, status, error_message, missing, new_requirements = ( + task_id, status, expl, missing, new_requirements = ( self._task_result_queue.get( timeout=float(self._config.wait_interval))) except Queue.Empty: @@ -657,7 +659,7 @@ def _handle_next_task(self): self._add_task(worker=self._id, task_id=task_id, status=status, - expl=error_message, + expl=expl, resources=task.process_resources(), runnable=None, params=task.to_str_params(), diff --git a/test/worker_task_test.py b/test/worker_task_test.py index 8a85be2687..3b8dfa373a 100644 --- a/test/worker_task_test.py +++ b/test/worker_task_test.py @@ -14,13 +14,17 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import multiprocessing from helpers import unittest +import mock +import json import luigi import luigi.date_interval import luigi.notifications -from luigi.worker import TaskException +from luigi.worker import TaskException, TaskProcess +from luigi.scheduler import DONE, FAILED luigi.notifications.DEBUG = True @@ -33,6 +37,21 @@ def __init__(self): pass +class SuccessTask(luigi.Task): + + def on_success(self): + return "test success expl" + + +class FailTask(luigi.Task): + + def run(self): + raise BaseException("Uh oh.") + + def on_failure(self, exception): + return "test failure expl" + + class WorkerTaskTest(unittest.TestCase): def test_constructor(self): @@ -45,5 +64,27 @@ def f(): luigi.build([None], local_scheduler=True) self.assertRaises(TaskException, f) + +class TaskProcessTest(unittest.TestCase): + + def test_update_result_queue_on_success(self): + task = SuccessTask() + result_queue = multiprocessing.Queue() + task_process = TaskProcess(task, 1, result_queue) + + with mock.patch.object(result_queue, 'put') as mock_put: + task_process.run() + mock_put.assert_called_once_with((task.task_id, DONE, json.dumps("test success expl"), [], None)) + + def test_update_result_queue_on_failure(self): + task = FailTask() + result_queue = multiprocessing.Queue() + task_process = TaskProcess(task, 1, result_queue) + + with mock.patch.object(result_queue, 'put') as mock_put: + task_process.run() + mock_put.assert_called_once_with((task.task_id, FAILED, json.dumps("test failure expl"), [], [])) + + if __name__ == '__main__': unittest.main()