Skip to content

Commit

Permalink
Add better error message.
Browse files Browse the repository at this point in the history
  • Loading branch information
tobiasraabe committed Sep 29, 2023
1 parent 132f515 commit 9d58501
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/source/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
- {pull}`427` fixes type annotations for {attr}`pytask.PTask.depends_on` and
{attr}`pytask.PTask.produces`.
- {pull}`428` updates the example in the readme.
- {pull}`429` implements a more informative error message when `node.state()` throws an
exception. Now, it is easy to see which tasks are affected.

## 0.3.2 - 2023-06-07

Expand Down
21 changes: 20 additions & 1 deletion src/_pytask/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from _pytask.report import DagReport
from _pytask.shared import reduce_names_of_multiple_nodes
from _pytask.shared import reduce_node_name
from _pytask.traceback import remove_internal_traceback_frames_from_exception
from _pytask.traceback import render_exc_info
from _pytask.tree_util import tree_map
from rich.text import Text
Expand Down Expand Up @@ -201,6 +202,8 @@ def _format_cycles(cycles: list[tuple[str, ...]]) -> str:


def _check_if_root_nodes_are_available(dag: nx.DiGraph, paths: Sequence[Path]) -> None:
__tracebackhide__ = True

missing_root_nodes = []
is_task_skipped: dict[str, bool] = {}

Expand All @@ -212,7 +215,12 @@ def _check_if_root_nodes_are_available(dag: nx.DiGraph, paths: Sequence[Path]) -
node, dag, is_task_skipped
)
if not are_all_tasks_skipped:
node_exists = dag.nodes[node]["node"].state()
try:
node_exists = dag.nodes[node]["node"].state()
except Exception as e: # noqa: BLE001
e = remove_internal_traceback_frames_from_exception(e)
msg = _format_exception_from_failed_node_state(node, dag)
raise ResolvingDependenciesError(msg) from e
if not node_exists:
missing_root_nodes.append(node)

Expand All @@ -232,6 +240,17 @@ def _check_if_root_nodes_are_available(dag: nx.DiGraph, paths: Sequence[Path]) -
raise ResolvingDependenciesError(_TEMPLATE_ERROR.format(text)) from None


def _format_exception_from_failed_node_state(node_name: str, dag: nx.DiGraph) -> str:
"""Format message when ``node.state()`` threw an exception."""
tasks = [dag.nodes[i]["task"] for i in dag.successors(node_name)]
names = [getattr(x, "display_name", x.name) for x in tasks]
successors = ", ".join([f"{name!r}" for name in names])
return (
f"While checking whether dependency {node_name!r} from task(s) "
f"{successors} exists, an error was raised."
)


def _check_if_tasks_are_skipped(
node: PNode, dag: nx.DiGraph, is_task_skipped: dict[str, bool]
) -> tuple[bool, dict[str, bool]]:
Expand Down
15 changes: 15 additions & 0 deletions src/_pytask/traceback.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

__all__ = [
"format_exception_without_traceback",
"remove_internal_traceback_frames_from_exception",
"remove_internal_traceback_frames_from_exc_info",
"remove_traceback_from_exc_info",
"render_exc_info",
Expand Down Expand Up @@ -57,6 +58,20 @@ def remove_traceback_from_exc_info(exc_info: ExceptionInfo) -> ExceptionInfo:
return (*exc_info[:2], None)


def remove_internal_traceback_frames_from_exception(exc: Exception) -> Exception:
"""Remove internal traceback frames from exception.
The conversion between exceptions and ``sys.exc_info`` is explained here:
https://stackoverflow.com/a/59041463/7523785.
"""
_, _, tb = remove_internal_traceback_frames_from_exc_info(
(type(exc), exc, exc.__traceback__)
)
exc.__traceback__ = tb
return exc


def remove_internal_traceback_frames_from_exc_info(
exc_info: ExceptionInfo,
) -> ExceptionInfo:
Expand Down
14 changes: 14 additions & 0 deletions tests/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,17 @@ def task_example(produces):

result = runner.invoke(cli, [tmp_path.as_posix()])
assert result.exit_code == ExitCode.OK


def test_error_when_node_state_throws_error(runner, tmp_path):
source = """
from pytask import PythonNode
def task_example(a = PythonNode(value={"a": 1}, hash=True)):
pass
"""
tmp_path.joinpath("task_example.py").write_text(textwrap.dedent(source))

result = runner.invoke(cli, [tmp_path.as_posix()])
assert result.exit_code == ExitCode.DAG_FAILED
assert "task_example" in result.output

0 comments on commit 9d58501

Please sign in to comment.