Skip to content

Commit

Permalink
Merge pull request #50 from soasme/predecessors
Browse files Browse the repository at this point in the history
Bugfix: set correct arrow direction for graph visualization.
  • Loading branch information
soasme committed Jul 3, 2021
2 parents 598eef3 + 299ac60 commit d1e69b4
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 5 deletions.
2 changes: 1 addition & 1 deletion ROADMAP.md
Expand Up @@ -62,7 +62,7 @@
- [ ] core: support task argument `_timeout`.
- [ ] core: preview all task execution statuses in ascii chars.
- [ ] core: get all task execution statuses.
- [ ] bugfix: visualize: the arrow in the output is reversed.
- [x] bugfix: visualize: the arrow in the output is reversed. v0.8.1.
- [x] core: remove context from Task.run signature. v0.8.1.
- [x] bugfix: resolve deps failed when implicit deps is in inner map. v0.8.1.
- [x] FEATURES FOR ALPHA RELEASE
Expand Down
4 changes: 4 additions & 0 deletions examples/sqlite3_exec.hcl
Expand Up @@ -40,5 +40,9 @@ flow "sqlite3_exec" {
value = "v3"
}
}

_depends_on = [
task.sql_exec.create_table,
]
}
}
12 changes: 8 additions & 4 deletions runflow/core.py
Expand Up @@ -188,7 +188,7 @@ def __init__(self, flow):
self.results = {}

def check_depends_on(self, task):
for upstream_task in self.flow.graph.successors(task):
for upstream_task in self.flow.graph.predecessors(task):
if self.results[upstream_task].status in (
TaskStatus.FAILED,
TaskStatus.CANCELED,
Expand Down Expand Up @@ -230,7 +230,7 @@ def __init__(self, name, runner_cls=None):
def __iter__(self):
"""Iterate through all tasks in a dependent order."""
try:
return reversed(list(networkx.topological_sort(self.graph)))
yield from networkx.topological_sort(self.graph)
except networkx.exception.NetworkXUnfeasible as err:
raise RunflowAcyclicTasksError(str(err)) from err

Expand Down Expand Up @@ -274,8 +274,12 @@ def add_task(self, task):
self.graph.add_node(task)

def set_dependency(self, task, depends_on):
"""Let `task` depends on `depends_on`."""
self.graph.add_edge(task, depends_on)
"""Let `task` depends on `depends_on`,
e.g. `depends_on` is a predecessor edge of `task`.
"""
self.graph.add_edge(depends_on, task)

def set_default_var(self, name, value):
"""Set default value for variable."""
Expand Down
10 changes: 10 additions & 0 deletions tests/test_cli.py
Expand Up @@ -80,3 +80,13 @@ def test_visualize(tmpdir):
svg = output.read()
assert "task.bash_run.echo" in svg
assert "</svg>" in svg

def test_visualize_dot(tmpdir):
output = tmpdir / "visualize.dot"
cli(['visualize', '--output', str(output), 'examples/hello-deps.hcl'])
assert '"task.bash_run.greeter" -> "task.bash_run.echo"' in output.read()

def test_visualize_dot2(tmpdir):
output = tmpdir / "visualize.dot"
cli(['visualize', '--output', str(output), 'examples/hello-implicit-deps.hcl'])
assert '"task.bash_run.greeter" -> "task.bash_run.echo"' in output.read()

0 comments on commit d1e69b4

Please sign in to comment.