1414from _pytask .console import format_task_name
1515from _pytask .console import render_to_string
1616from _pytask .console import TASK_ICON
17- from _pytask .dag_utils import node_and_neighbors
18- from _pytask .dag_utils import task_and_descending_tasks
19- from _pytask .dag_utils import TopologicalSorter
20- from _pytask .database_utils import DatabaseSession
21- from _pytask .database_utils import State
2217from _pytask .exceptions import ResolvingDependenciesError
23- from _pytask .mark import Mark
2418from _pytask .node_protocols import PNode
2519from _pytask .node_protocols import PTask
2620from _pytask .nodes import PythonNode
3125from rich .tree import Tree
3226
3327if TYPE_CHECKING :
34- from _pytask .node_protocols import MetaNode
3528 from pathlib import Path
3629 from _pytask .session import Session
3730
@@ -44,7 +37,6 @@ def pytask_dag(session: Session) -> bool | None:
4437 session = session , tasks = session .tasks
4538 )
4639 session .hook .pytask_dag_modify_dag (session = session , dag = session .dag )
47- session .hook .pytask_dag_select_execution_dag (session = session , dag = session .dag )
4840
4941 except Exception : # noqa: BLE001
5042 report = DagReport .from_exception (sys .exc_info ())
@@ -101,59 +93,6 @@ def _add_product(dag: nx.DiGraph, task: PTask, node: PNode) -> None:
10193 return dag
10294
10395
104- @hookimpl
105- def pytask_dag_select_execution_dag (session : Session , dag : nx .DiGraph ) -> None :
106- """Select the tasks which need to be executed."""
107- scheduler = TopologicalSorter .from_dag (dag )
108- visited_nodes : set [str ] = set ()
109-
110- while scheduler .is_active ():
111- task_signature = scheduler .get_ready ()[0 ]
112- if task_signature not in visited_nodes :
113- task = dag .nodes [task_signature ]["task" ]
114- have_changed = _have_task_or_neighbors_changed (session , dag , task )
115- if have_changed :
116- visited_nodes .update (task_and_descending_tasks (task_signature , dag ))
117- else :
118- dag .nodes [task_signature ]["task" ].markers .append (
119- Mark ("skip_unchanged" , (), {})
120- )
121- scheduler .done (task_signature )
122-
123-
124- def _have_task_or_neighbors_changed (
125- session : Session , dag : nx .DiGraph , task : PTask
126- ) -> bool :
127- """Indicate whether dependencies or products of a task have changed."""
128- return any (
129- session .hook .pytask_dag_has_node_changed (
130- session = session ,
131- dag = dag ,
132- task = task ,
133- node = dag .nodes [node_name ].get ("task" ) or dag .nodes [node_name ].get ("node" ),
134- )
135- for node_name in node_and_neighbors (dag , task .signature )
136- )
137-
138-
139- @hookimpl (trylast = True )
140- def pytask_dag_has_node_changed (task : PTask , node : MetaNode ) -> bool :
141- """Indicate whether a single dependency or product has changed."""
142- # If node does not exist, we receive None.
143- node_state = node .state ()
144- if node_state is None :
145- return True
146-
147- with DatabaseSession () as session :
148- db_state = session .get (State , (task .signature , node .signature ))
149-
150- # If the node is not in the database.
151- if db_state is None :
152- return True
153-
154- return node_state != db_state .hash_
155-
156-
15796def _check_if_dag_has_cycles (dag : nx .DiGraph ) -> None :
15897 """Check if DAG has cycles."""
15998 try :
0 commit comments