diff --git a/luigi/worker.py b/luigi/worker.py index 24a8b606d5..85dbd70fc3 100644 --- a/luigi/worker.py +++ b/luigi/worker.py @@ -583,10 +583,11 @@ def _add(self, task, is_complete): else: try: deps = task.deps() - except Exception: + except Exception as ex: formatted_traceback = traceback.format_exc() self.add_succeeded = False self._log_dependency_error(task, formatted_traceback) + task.trigger_event(Event.BROKEN_TASK, task, ex) self._email_dependency_error(task, formatted_traceback) return status = PENDING diff --git a/test/event_callbacks_test.py b/test/event_callbacks_test.py index f63c697955..9eb55f335f 100644 --- a/test/event_callbacks_test.py +++ b/test/event_callbacks_test.py @@ -36,6 +36,15 @@ def run(self): raise DummyException() +class TaskWithBrokenDependency(Task): + + def requires(self): + raise DummyException() + + def run(self): + pass + + class TaskWithCallback(Task): def run(self): @@ -88,6 +97,22 @@ def test_failure(self): self.assertEqual(len(exceptions), 1) self.assertTrue(isinstance(exceptions[0], DummyException)) + def test_broken_dependency(self): + failures = [] + exceptions = [] + + @TaskWithBrokenDependency.event_handler(Event.BROKEN_TASK) + def failure(task, exception): + failures.append(task) + exceptions.append(exception) + + t = TaskWithBrokenDependency() + build([t], local_scheduler=True) + + self.assertEqual(failures, [t]) + self.assertEqual(len(exceptions), 1) + self.assertTrue(isinstance(exceptions[0], DummyException)) + def test_custom_handler(self): dummies = []