Skip to content

Commit fbb3bb2

Browse files
authored
Allow tasks to be pending. (#609)
1 parent 7ba6243 commit fbb3bb2

File tree

8 files changed

+63
-30
lines changed

8 files changed

+63
-30
lines changed

docs/source/changes.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
4747
- {pull}`603` fixes an example in the documentation about capturing warnings.
4848
- {pull}`604` fixes some examples with `PythonNode`s in the documentation.
4949
- {pull}`605` improves checks and CI.
50+
- {pull}`609` allows a pending status for tasks. Useful for async backends implemented
51+
in pytask-parallel.
5052

5153
## 0.4.7 - 2024-03-19
5254

src/_pytask/dag_utils.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,7 @@ def from_dag(cls, dag: nx.DiGraph) -> TopologicalSorter:
9292
priorities = _extract_priorities_from_tasks(tasks)
9393

9494
task_signatures = {task.signature for task in tasks}
95-
task_dict = {
96-
name: nx.ancestors(dag, name) & task_signatures for name in task_signatures
97-
}
95+
task_dict = {s: nx.ancestors(dag, s) & task_signatures for s in task_signatures}
9896
task_dag = nx.DiGraph(task_dict).reverse()
9997

10098
return cls(dag=task_dag, priorities=priorities)

src/_pytask/execute.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from _pytask.exceptions import ExecutionError
2626
from _pytask.exceptions import NodeLoadError
2727
from _pytask.exceptions import NodeNotFoundError
28+
from _pytask.logging_utils import TaskExecutionStatus
2829
from _pytask.mark import Mark
2930
from _pytask.mark_utils import has_mark
3031
from _pytask.node_protocols import PNode
@@ -98,7 +99,9 @@ def pytask_execute_build(session: Session) -> bool | None:
9899
@hookimpl
99100
def pytask_execute_task_protocol(session: Session, task: PTask) -> ExecutionReport:
100101
"""Follow the protocol to execute each task."""
101-
session.hook.pytask_execute_task_log_start(session=session, task=task)
102+
session.hook.pytask_execute_task_log_start(
103+
session=session, task=task, status=TaskExecutionStatus.RUNNING
104+
)
102105
try:
103106
session.hook.pytask_execute_task_setup(session=session, task=task)
104107
session.hook.pytask_execute_task(session=session, task=task)

src/_pytask/hookspecs.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import click
1919
from pluggy import PluginManager
2020

21+
from _pytask.logging_utils import TaskExecutionStatus
2122
from _pytask.models import NodeInfo
2223
from _pytask.node_protocols import PNode
2324
from _pytask.node_protocols import PProvisionalNode
@@ -255,7 +256,9 @@ def pytask_execute_task_protocol(session: Session, task: PTask) -> ExecutionRepo
255256

256257

257258
@hookspec(firstresult=True)
258-
def pytask_execute_task_log_start(session: Session, task: PTask) -> None:
259+
def pytask_execute_task_log_start(
260+
session: Session, task: PTask, status: TaskExecutionStatus
261+
) -> None:
259262
"""Start logging of task execution.
260263
261264
This hook can be used to provide more verbose output during the execution.

src/_pytask/live.py

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from __future__ import annotations
44

5+
from dataclasses import dataclass
56
from typing import TYPE_CHECKING
67
from typing import Any
78
from typing import Generator
@@ -24,6 +25,7 @@
2425
from _pytask.pluginmanager import hookimpl
2526

2627
if TYPE_CHECKING:
28+
from _pytask.logging_utils import TaskExecutionStatus
2729
from _pytask.node_protocols import PTask
2830
from _pytask.reports import CollectionReport
2931
from _pytask.reports import ExecutionReport
@@ -129,6 +131,12 @@ def is_started(self) -> bool:
129131
return self._live.is_started
130132

131133

134+
@dataclass
135+
class _TaskEntry:
136+
task: PTask
137+
status: TaskExecutionStatus
138+
139+
132140
class _ReportEntry(NamedTuple):
133141
name: str
134142
outcome: TaskOutcome
@@ -146,7 +154,7 @@ class LiveExecution:
146154
sort_final_table: bool = False
147155
n_tasks: int | str = "x"
148156
_reports: list[_ReportEntry] = field(factory=list)
149-
_running_tasks: dict[str, PTask] = field(factory=dict)
157+
_running_tasks: dict[str, _TaskEntry] = field(factory=dict)
150158

151159
@hookimpl(wrapper=True)
152160
def pytask_execute_build(self) -> Generator[None, None, None]:
@@ -162,15 +170,17 @@ def pytask_execute_build(self) -> Generator[None, None, None]:
162170
return result
163171

164172
@hookimpl(tryfirst=True)
165-
def pytask_execute_task_log_start(self, task: PTask) -> bool:
173+
def pytask_execute_task_log_start(
174+
self, task: PTask, status: TaskExecutionStatus
175+
) -> bool:
166176
"""Mark a new task as running."""
167-
self.update_running_tasks(task)
177+
self.add_task(new_running_task=task, status=status)
168178
return True
169179

170180
@hookimpl
171181
def pytask_execute_task_log_end(self, report: ExecutionReport) -> bool:
172182
"""Mark a task as being finished and update outcome."""
173-
self.update_reports(report)
183+
self.update_report(report)
174184
return True
175185

176186
def _generate_table(
@@ -232,16 +242,17 @@ def _generate_table(
232242
format_task_name(report.task, editor_url_scheme=self.editor_url_scheme),
233243
Text(report.outcome.symbol, style=report.outcome.style),
234244
)
235-
for task in self._running_tasks.values():
245+
for task_entry in self._running_tasks.values():
236246
table.add_row(
237-
format_task_name(task, editor_url_scheme=self.editor_url_scheme),
238-
"running",
247+
format_task_name(
248+
task_entry.task, editor_url_scheme=self.editor_url_scheme
249+
),
250+
task_entry.status.value,
239251
)
240252

241253
# If the table is empty, do not display anything.
242254
if table.rows == []:
243-
table = None
244-
255+
return None
245256
return table
246257

247258
def _update_table(
@@ -256,14 +267,21 @@ def _update_table(
256267
)
257268
self.live_manager.update(table)
258269

259-
def update_running_tasks(self, new_running_task: PTask) -> None:
270+
def add_task(self, new_running_task: PTask, status: TaskExecutionStatus) -> None:
260271
"""Add a new running task."""
261-
self._running_tasks[new_running_task.name] = new_running_task
272+
self._running_tasks[new_running_task.signature] = _TaskEntry(
273+
task=new_running_task, status=status
274+
)
275+
self._update_table()
276+
277+
def update_task(self, signature: str, status: TaskExecutionStatus) -> None:
278+
"""Update the status of a running task."""
279+
self._running_tasks[signature].status = status
262280
self._update_table()
263281

264-
def update_reports(self, new_report: ExecutionReport) -> None:
282+
def update_report(self, new_report: ExecutionReport) -> None:
265283
"""Update the status of a running task by adding its report."""
266-
self._running_tasks.pop(new_report.task.name)
284+
self._running_tasks.pop(new_report.task.signature)
267285
self._reports.append(
268286
_ReportEntry(
269287
name=new_report.task.name,

src/_pytask/logging_utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from enum import Enum
2+
3+
4+
class TaskExecutionStatus(Enum):
5+
PENDING = "pending"
6+
RUNNING = "running"

src/pytask/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from _pytask.build import build
88
from _pytask.capture_utils import CaptureMethod
99
from _pytask.capture_utils import ShowCapture
10+
from _pytask.logging_utils import TaskExecutionStatus
1011

1112

1213
from _pytask.click import ColoredCommand
@@ -129,6 +130,7 @@
129130
"SkippedUnchanged",
130131
"State",
131132
"Task",
133+
"TaskExecutionStatus",
132134
"TaskOutcome",
133135
"TaskWithoutPath",
134136
"Traceback",

tests/test_live.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import pytest
77
from _pytask.live import LiveExecution
88
from _pytask.live import LiveManager
9+
from _pytask.logging_utils import TaskExecutionStatus
910
from pytask import ExecutionReport
1011
from pytask import ExitCode
1112
from pytask import Task
@@ -41,7 +42,7 @@ def test_live_execution_sequentially(capsys, tmp_path):
4142
)
4243

4344
live_manager.start()
44-
live.update_running_tasks(task)
45+
live.add_task(task, status=TaskExecutionStatus.RUNNING)
4546
live_manager.pause()
4647

4748
# Test pause removes the table.
@@ -69,7 +70,7 @@ def test_live_execution_sequentially(capsys, tmp_path):
6970
report = ExecutionReport(task=task, outcome=TaskOutcome.SUCCESS, exc_info=None)
7071

7172
live_manager.resume()
72-
live.update_reports(report)
73+
live.update_report(report)
7374
live_manager.stop()
7475

7576
# Test final table with reported outcome.
@@ -99,13 +100,13 @@ def test_live_execution_displays_skips_and_persists(capsys, tmp_path, verbose, o
99100
)
100101

101102
live_manager.start()
102-
live.update_running_tasks(task)
103+
live.add_task(task, status=TaskExecutionStatus.RUNNING)
103104
live_manager.pause()
104105

105106
report = ExecutionReport(task=task, outcome=outcome, exc_info=None)
106107

107108
live_manager.resume()
108-
live.update_reports(report)
109+
live.update_report(report)
109110
live_manager.stop()
110111

111112
# Test final table with reported outcome.
@@ -149,7 +150,7 @@ def test_live_execution_displays_subset_of_table(capsys, tmp_path, n_entries_in_
149150
)
150151

151152
live_manager.start()
152-
live.update_running_tasks(running_task)
153+
live.add_task(running_task, status=TaskExecutionStatus.RUNNING)
153154
live_manager.stop(transient=False)
154155

155156
captured = capsys.readouterr()
@@ -161,13 +162,13 @@ def test_live_execution_displays_subset_of_table(capsys, tmp_path, n_entries_in_
161162

162163
completed_task = Task(base_name="task_completed", path=path, function=lambda x: x)
163164
completed_task.name = "task_module.py::task_completed"
164-
live.update_running_tasks(completed_task)
165+
live.add_task(completed_task, status=TaskExecutionStatus.RUNNING)
165166
report = ExecutionReport(
166167
task=completed_task, outcome=TaskOutcome.SUCCESS, exc_info=None
167168
)
168169

169170
live_manager.resume()
170-
live.update_reports(report)
171+
live.update_report(report)
171172
live_manager.stop()
172173

173174
# Test that report is or is not included.
@@ -202,7 +203,7 @@ def test_live_execution_skips_do_not_crowd_out_displayed_tasks(capsys, tmp_path)
202203
)
203204

204205
live_manager.start()
205-
live.update_running_tasks(task)
206+
live.add_task(task, status=TaskExecutionStatus.RUNNING)
206207
live_manager.stop()
207208

208209
# Test table with running task.
@@ -224,9 +225,9 @@ def test_live_execution_skips_do_not_crowd_out_displayed_tasks(capsys, tmp_path)
224225
tasks.append(skipped_task)
225226

226227
live_manager.start()
227-
live.update_running_tasks(successful_task)
228+
live.add_task(successful_task, status=TaskExecutionStatus.RUNNING)
228229
for task in tasks:
229-
live.update_running_tasks(task)
230+
live.add_task(task, status=TaskExecutionStatus.RUNNING)
230231
live_manager.stop()
231232

232233
captured = capsys.readouterr()
@@ -239,10 +240,10 @@ def test_live_execution_skips_do_not_crowd_out_displayed_tasks(capsys, tmp_path)
239240
report = ExecutionReport(
240241
task=successful_task, outcome=TaskOutcome.SUCCESS, exc_info=None
241242
)
242-
live.update_reports(report)
243+
live.update_report(report)
243244
for task in tasks:
244245
report = ExecutionReport(task=task, outcome=TaskOutcome.SKIP, exc_info=None)
245-
live.update_reports(report)
246+
live.update_report(report)
246247
live_manager.stop()
247248

248249
# Test final table with reported outcome.

0 commit comments

Comments
 (0)